LCOV - code coverage report
Current view: top level - src/backend/commands - vacuumparallel.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 94.4 % 395 373
Test Date: 2026-01-26 10:56:24 Functions: 92.9 % 14 13
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Branches: 69.7 % 218 152

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * vacuumparallel.c
       4                 :             :  *        Support routines for parallel vacuum execution.
       5                 :             :  *
       6                 :             :  * This file contains routines that are intended to support setting up, using,
       7                 :             :  * and tearing down a ParallelVacuumState.
       8                 :             :  *
       9                 :             :  * In a parallel vacuum, we perform both index bulk deletion and index cleanup
      10                 :             :  * with parallel worker processes.  Individual indexes are processed by one
      11                 :             :  * vacuum process.  ParallelVacuumState contains shared information as well as
      12                 :             :  * the memory space for storing dead items allocated in the DSA area.  We
      13                 :             :  * launch parallel worker processes at the start of parallel index
      14                 :             :  * bulk-deletion and index cleanup and once all indexes are processed, the
      15                 :             :  * parallel worker processes exit.  Each time we process indexes in parallel,
      16                 :             :  * the parallel context is re-initialized so that the same DSM can be used for
      17                 :             :  * multiple passes of index bulk-deletion and index cleanup.
      18                 :             :  *
      19                 :             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      20                 :             :  * Portions Copyright (c) 1994, Regents of the University of California
      21                 :             :  *
      22                 :             :  * IDENTIFICATION
      23                 :             :  *        src/backend/commands/vacuumparallel.c
      24                 :             :  *
      25                 :             :  *-------------------------------------------------------------------------
      26                 :             :  */
      27                 :             : #include "postgres.h"
      28                 :             : 
      29                 :             : #include "access/amapi.h"
      30                 :             : #include "access/table.h"
      31                 :             : #include "access/xact.h"
      32                 :             : #include "commands/progress.h"
      33                 :             : #include "commands/vacuum.h"
      34                 :             : #include "executor/instrument.h"
      35                 :             : #include "optimizer/paths.h"
      36                 :             : #include "pgstat.h"
      37                 :             : #include "storage/bufmgr.h"
      38                 :             : #include "tcop/tcopprot.h"
      39                 :             : #include "utils/lsyscache.h"
      40                 :             : #include "utils/rel.h"
      41                 :             : 
      42                 :             : /*
      43                 :             :  * DSM keys for parallel vacuum.  Unlike other parallel execution code, since
      44                 :             :  * we don't need to worry about DSM keys conflicting with plan_node_id we can
      45                 :             :  * use small integers.
      46                 :             :  */
      47                 :             : #define PARALLEL_VACUUM_KEY_SHARED                      1
      48                 :             : #define PARALLEL_VACUUM_KEY_QUERY_TEXT          2
      49                 :             : #define PARALLEL_VACUUM_KEY_BUFFER_USAGE        3
      50                 :             : #define PARALLEL_VACUUM_KEY_WAL_USAGE           4
      51                 :             : #define PARALLEL_VACUUM_KEY_INDEX_STATS         5
      52                 :             : 
      53                 :             : /*
      54                 :             :  * Shared information among parallel workers.  So this is allocated in the DSM
      55                 :             :  * segment.
      56                 :             :  */
      57                 :             : typedef struct PVShared
      58                 :             : {
      59                 :             :         /*
      60                 :             :          * Target table relid, log level (for messages about parallel workers
      61                 :             :          * launched during VACUUM VERBOSE) and query ID.  These fields are not
      62                 :             :          * modified during the parallel vacuum.
      63                 :             :          */
      64                 :             :         Oid                     relid;
      65                 :             :         int                     elevel;
      66                 :             :         int64           queryid;
      67                 :             : 
      68                 :             :         /*
      69                 :             :          * Fields for both index vacuum and cleanup.
      70                 :             :          *
      71                 :             :          * reltuples is the total number of input heap tuples.  We set either old
      72                 :             :          * live tuples in the index vacuum case or the new live tuples in the
      73                 :             :          * index cleanup case.
      74                 :             :          *
      75                 :             :          * estimated_count is true if reltuples is an estimated value.  (Note that
      76                 :             :          * reltuples could be -1 in this case, indicating we have no idea.)
      77                 :             :          */
      78                 :             :         double          reltuples;
      79                 :             :         bool            estimated_count;
      80                 :             : 
      81                 :             :         /*
      82                 :             :          * In single process vacuum we could consume more memory during index
      83                 :             :          * vacuuming or cleanup apart from the memory for heap scanning.  In
      84                 :             :          * parallel vacuum, since individual vacuum workers can consume memory
      85                 :             :          * equal to maintenance_work_mem, the new maintenance_work_mem for each
      86                 :             :          * worker is set such that the parallel operation doesn't consume more
      87                 :             :          * memory than single process vacuum.
      88                 :             :          */
      89                 :             :         int                     maintenance_work_mem_worker;
      90                 :             : 
      91                 :             :         /*
      92                 :             :          * The number of buffers each worker's Buffer Access Strategy ring should
      93                 :             :          * contain.
      94                 :             :          */
      95                 :             :         int                     ring_nbuffers;
      96                 :             : 
      97                 :             :         /*
      98                 :             :          * Shared vacuum cost balance.  During parallel vacuum,
      99                 :             :          * VacuumSharedCostBalance points to this value and it accumulates the
     100                 :             :          * balance of each parallel vacuum worker.
     101                 :             :          */
     102                 :             :         pg_atomic_uint32 cost_balance;
     103                 :             : 
     104                 :             :         /*
     105                 :             :          * Number of active parallel workers.  This is used for computing the
     106                 :             :          * minimum threshold of the vacuum cost balance before a worker sleeps for
     107                 :             :          * cost-based delay.
     108                 :             :          */
     109                 :             :         pg_atomic_uint32 active_nworkers;
     110                 :             : 
     111                 :             :         /* Counter for vacuuming and cleanup */
     112                 :             :         pg_atomic_uint32 idx;
     113                 :             : 
     114                 :             :         /* DSA handle where the TidStore lives */
     115                 :             :         dsa_handle      dead_items_dsa_handle;
     116                 :             : 
     117                 :             :         /* DSA pointer to the shared TidStore */
     118                 :             :         dsa_pointer dead_items_handle;
     119                 :             : 
     120                 :             :         /* Statistics of shared dead items */
     121                 :             :         VacDeadItemsInfo dead_items_info;
     122                 :             : } PVShared;
     123                 :             : 
     124                 :             : /* Status used during parallel index vacuum or cleanup */
     125                 :             : typedef enum PVIndVacStatus
     126                 :             : {
     127                 :             :         PARALLEL_INDVAC_STATUS_INITIAL = 0,
     128                 :             :         PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
     129                 :             :         PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
     130                 :             :         PARALLEL_INDVAC_STATUS_COMPLETED,
     131                 :             : } PVIndVacStatus;
     132                 :             : 
     133                 :             : /*
     134                 :             :  * Struct for index vacuum statistics of an index that is used for parallel vacuum.
     135                 :             :  * This includes the status of parallel index vacuum as well as index statistics.
     136                 :             :  */
     137                 :             : typedef struct PVIndStats
     138                 :             : {
     139                 :             :         /*
     140                 :             :          * The following two fields are set by leader process before executing
     141                 :             :          * parallel index vacuum or parallel index cleanup.  These fields are not
     142                 :             :          * fixed for the entire VACUUM operation.  They are only fixed for an
     143                 :             :          * individual parallel index vacuum and cleanup.
     144                 :             :          *
     145                 :             :          * parallel_workers_can_process is true if both leader and worker can
     146                 :             :          * process the index, otherwise only leader can process it.
     147                 :             :          */
     148                 :             :         PVIndVacStatus status;
     149                 :             :         bool            parallel_workers_can_process;
     150                 :             : 
     151                 :             :         /*
     152                 :             :          * Individual worker or leader stores the result of index vacuum or
     153                 :             :          * cleanup.
     154                 :             :          */
     155                 :             :         bool            istat_updated;  /* are the stats updated? */
     156                 :             :         IndexBulkDeleteResult istat;
     157                 :             : } PVIndStats;
     158                 :             : 
     159                 :             : /*
     160                 :             :  * Struct for maintaining a parallel vacuum state. typedef appears in vacuum.h.
     161                 :             :  */
     162                 :             : struct ParallelVacuumState
     163                 :             : {
     164                 :             :         /* NULL for worker processes */
     165                 :             :         ParallelContext *pcxt;
     166                 :             : 
     167                 :             :         /* Parent Heap Relation */
     168                 :             :         Relation        heaprel;
     169                 :             : 
     170                 :             :         /* Target indexes */
     171                 :             :         Relation   *indrels;
     172                 :             :         int                     nindexes;
     173                 :             : 
     174                 :             :         /* Shared information among parallel vacuum workers */
     175                 :             :         PVShared   *shared;
     176                 :             : 
     177                 :             :         /*
     178                 :             :          * Shared index statistics among parallel vacuum workers. The array
     179                 :             :          * element is allocated for every index, even those indexes where parallel
     180                 :             :          * index vacuuming is unsafe or not worthwhile (e.g.,
     181                 :             :          * will_parallel_vacuum[] is false).  During parallel vacuum,
     182                 :             :          * IndexBulkDeleteResult of each index is kept in DSM and is copied into
     183                 :             :          * local memory at the end of parallel vacuum.
     184                 :             :          */
     185                 :             :         PVIndStats *indstats;
     186                 :             : 
     187                 :             :         /* Shared dead items space among parallel vacuum workers */
     188                 :             :         TidStore   *dead_items;
     189                 :             : 
     190                 :             :         /* Points to buffer usage area in DSM */
     191                 :             :         BufferUsage *buffer_usage;
     192                 :             : 
     193                 :             :         /* Points to WAL usage area in DSM */
     194                 :             :         WalUsage   *wal_usage;
     195                 :             : 
     196                 :             :         /*
     197                 :             :          * False if the index is totally unsuitable target for all parallel
     198                 :             :          * processing. For example, the index could be <
     199                 :             :          * min_parallel_index_scan_size cutoff.
     200                 :             :          */
     201                 :             :         bool       *will_parallel_vacuum;
     202                 :             : 
     203                 :             :         /*
     204                 :             :          * The number of indexes that support parallel index bulk-deletion and
     205                 :             :          * parallel index cleanup respectively.
     206                 :             :          */
     207                 :             :         int                     nindexes_parallel_bulkdel;
     208                 :             :         int                     nindexes_parallel_cleanup;
     209                 :             :         int                     nindexes_parallel_condcleanup;
     210                 :             : 
     211                 :             :         /* Buffer access strategy used by leader process */
     212                 :             :         BufferAccessStrategy bstrategy;
     213                 :             : 
     214                 :             :         /*
     215                 :             :          * Error reporting state.  The error callback is set only for workers
     216                 :             :          * processes during parallel index vacuum.
     217                 :             :          */
     218                 :             :         char       *relnamespace;
     219                 :             :         char       *relname;
     220                 :             :         char       *indname;
     221                 :             :         PVIndVacStatus status;
     222                 :             : };
     223                 :             : 
     224                 :             : static int      parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
     225                 :             :                                                                                         bool *will_parallel_vacuum);
     226                 :             : static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
     227                 :             :                                                                                                 bool vacuum);
     228                 :             : static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
     229                 :             : static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
     230                 :             : static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
     231                 :             :                                                                                           PVIndStats *indstats);
     232                 :             : static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
     233                 :             :                                                                                                    bool vacuum);
     234                 :             : static void parallel_vacuum_error_callback(void *arg);
     235                 :             : 
     236                 :             : /*
     237                 :             :  * Try to enter parallel mode and create a parallel context.  Then initialize
     238                 :             :  * shared memory state.
     239                 :             :  *
     240                 :             :  * On success, return parallel vacuum state.  Otherwise return NULL.
     241                 :             :  */
     242                 :             : ParallelVacuumState *
     243                 :         127 : parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
     244                 :             :                                          int nrequested_workers, int vac_work_mem,
     245                 :             :                                          int elevel, BufferAccessStrategy bstrategy)
     246                 :             : {
     247                 :         127 :         ParallelVacuumState *pvs;
     248                 :         127 :         ParallelContext *pcxt;
     249                 :         127 :         PVShared   *shared;
     250                 :         127 :         TidStore   *dead_items;
     251                 :         127 :         PVIndStats *indstats;
     252                 :         127 :         BufferUsage *buffer_usage;
     253                 :         127 :         WalUsage   *wal_usage;
     254                 :         127 :         bool       *will_parallel_vacuum;
     255                 :         127 :         Size            est_indstats_len;
     256                 :         127 :         Size            est_shared_len;
     257                 :         127 :         int                     nindexes_mwm = 0;
     258                 :         127 :         int                     parallel_workers = 0;
     259                 :         127 :         int                     querylen;
     260                 :             : 
     261                 :             :         /*
     262                 :             :          * A parallel vacuum must be requested and there must be indexes on the
     263                 :             :          * relation
     264                 :             :          */
     265         [ +  - ]:         127 :         Assert(nrequested_workers >= 0);
     266         [ +  - ]:         127 :         Assert(nindexes > 0);
     267                 :             : 
     268                 :             :         /*
     269                 :             :          * Compute the number of parallel vacuum workers to launch
     270                 :             :          */
     271                 :         127 :         will_parallel_vacuum = palloc0_array(bool, nindexes);
     272                 :         254 :         parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
     273                 :         127 :                                                                                                            nrequested_workers,
     274                 :         127 :                                                                                                            will_parallel_vacuum);
     275         [ +  + ]:         127 :         if (parallel_workers <= 0)
     276                 :             :         {
     277                 :             :                 /* Can't perform vacuum in parallel -- return NULL */
     278                 :         121 :                 pfree(will_parallel_vacuum);
     279                 :         121 :                 return NULL;
     280                 :             :         }
     281                 :             : 
     282                 :           6 :         pvs = palloc0_object(ParallelVacuumState);
     283                 :           6 :         pvs->indrels = indrels;
     284                 :           6 :         pvs->nindexes = nindexes;
     285                 :           6 :         pvs->will_parallel_vacuum = will_parallel_vacuum;
     286                 :           6 :         pvs->bstrategy = bstrategy;
     287                 :           6 :         pvs->heaprel = rel;
     288                 :             : 
     289                 :           6 :         EnterParallelMode();
     290                 :           6 :         pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
     291                 :           6 :                                                                  parallel_workers);
     292         [ +  - ]:           6 :         Assert(pcxt->nworkers > 0);
     293                 :           6 :         pvs->pcxt = pcxt;
     294                 :             : 
     295                 :             :         /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
     296                 :           6 :         est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
     297                 :           6 :         shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
     298                 :           6 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     299                 :             : 
     300                 :             :         /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
     301                 :           6 :         est_shared_len = sizeof(PVShared);
     302                 :           6 :         shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
     303                 :           6 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     304                 :             : 
     305                 :             :         /*
     306                 :             :          * Estimate space for BufferUsage and WalUsage --
     307                 :             :          * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
     308                 :             :          *
     309                 :             :          * If there are no extensions loaded that care, we could skip this.  We
     310                 :             :          * have no way of knowing whether anyone's looking at pgBufferUsage or
     311                 :             :          * pgWalUsage, so do it unconditionally.
     312                 :             :          */
     313                 :           6 :         shm_toc_estimate_chunk(&pcxt->estimator,
     314                 :             :                                                    mul_size(sizeof(BufferUsage), pcxt->nworkers));
     315                 :           6 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     316                 :           6 :         shm_toc_estimate_chunk(&pcxt->estimator,
     317                 :             :                                                    mul_size(sizeof(WalUsage), pcxt->nworkers));
     318                 :           6 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     319                 :             : 
     320                 :             :         /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
     321         [ +  - ]:           6 :         if (debug_query_string)
     322                 :             :         {
     323                 :           6 :                 querylen = strlen(debug_query_string);
     324                 :           6 :                 shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
     325                 :           6 :                 shm_toc_estimate_keys(&pcxt->estimator, 1);
     326                 :           6 :         }
     327                 :             :         else
     328                 :           0 :                 querylen = 0;                   /* keep compiler quiet */
     329                 :             : 
     330                 :           6 :         InitializeParallelDSM(pcxt);
     331                 :             : 
     332                 :             :         /* Prepare index vacuum stats */
     333                 :           6 :         indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
     334   [ +  -  +  -  :         132 :         MemSet(indstats, 0, est_indstats_len);
          +  -  -  +  +  
                      + ]
     335         [ +  + ]:          27 :         for (int i = 0; i < nindexes; i++)
     336                 :             :         {
     337                 :          21 :                 Relation        indrel = indrels[i];
     338                 :          21 :                 uint8           vacoptions = indrel->rd_indam->amparallelvacuumoptions;
     339                 :             : 
     340                 :             :                 /*
     341                 :             :                  * Cleanup option should be either disabled, always performing in
     342                 :             :                  * parallel or conditionally performing in parallel.
     343                 :             :                  */
     344   [ +  +  -  + ]:          21 :                 Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
     345                 :             :                            ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
     346         [ -  + ]:          21 :                 Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
     347                 :             : 
     348         [ +  + ]:          21 :                 if (!will_parallel_vacuum[i])
     349                 :           1 :                         continue;
     350                 :             : 
     351         [ +  + ]:          20 :                 if (indrel->rd_indam->amusemaintenanceworkmem)
     352                 :           2 :                         nindexes_mwm++;
     353                 :             : 
     354                 :             :                 /*
     355                 :             :                  * Remember the number of indexes that support parallel operation for
     356                 :             :                  * each phase.
     357                 :             :                  */
     358         [ +  + ]:          20 :                 if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
     359                 :          18 :                         pvs->nindexes_parallel_bulkdel++;
     360         [ +  + ]:          20 :                 if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
     361                 :           4 :                         pvs->nindexes_parallel_cleanup++;
     362         [ +  + ]:          20 :                 if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
     363                 :          14 :                         pvs->nindexes_parallel_condcleanup++;
     364      [ -  +  + ]:          21 :         }
     365                 :           6 :         shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
     366                 :           6 :         pvs->indstats = indstats;
     367                 :             : 
     368                 :             :         /* Prepare shared information */
     369                 :           6 :         shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
     370   [ +  -  +  -  :          66 :         MemSet(shared, 0, est_shared_len);
          +  -  -  +  +  
                      + ]
     371                 :           6 :         shared->relid = RelationGetRelid(rel);
     372                 :           6 :         shared->elevel = elevel;
     373                 :           6 :         shared->queryid = pgstat_get_my_query_id();
     374                 :           6 :         shared->maintenance_work_mem_worker =
     375         [ +  + ]:           8 :                 (nindexes_mwm > 0) ?
     376         [ -  + ]:           2 :                 maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
     377                 :           4 :                 maintenance_work_mem;
     378                 :           6 :         shared->dead_items_info.max_bytes = vac_work_mem * (size_t) 1024;
     379                 :             : 
     380                 :             :         /* Prepare DSA space for dead items */
     381                 :           6 :         dead_items = TidStoreCreateShared(shared->dead_items_info.max_bytes,
     382                 :             :                                                                           LWTRANCHE_PARALLEL_VACUUM_DSA);
     383                 :           6 :         pvs->dead_items = dead_items;
     384                 :           6 :         shared->dead_items_handle = TidStoreGetHandle(dead_items);
     385                 :           6 :         shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));
     386                 :             : 
     387                 :             :         /* Use the same buffer size for all workers */
     388                 :           6 :         shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);
     389                 :             : 
     390                 :           6 :         pg_atomic_init_u32(&(shared->cost_balance), 0);
     391                 :           6 :         pg_atomic_init_u32(&(shared->active_nworkers), 0);
     392                 :           6 :         pg_atomic_init_u32(&(shared->idx), 0);
     393                 :             : 
     394                 :           6 :         shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
     395                 :           6 :         pvs->shared = shared;
     396                 :             : 
     397                 :             :         /*
     398                 :             :          * Allocate space for each worker's BufferUsage and WalUsage; no need to
     399                 :             :          * initialize
     400                 :             :          */
     401                 :          12 :         buffer_usage = shm_toc_allocate(pcxt->toc,
     402                 :           6 :                                                                         mul_size(sizeof(BufferUsage), pcxt->nworkers));
     403                 :           6 :         shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
     404                 :           6 :         pvs->buffer_usage = buffer_usage;
     405                 :          12 :         wal_usage = shm_toc_allocate(pcxt->toc,
     406                 :           6 :                                                                  mul_size(sizeof(WalUsage), pcxt->nworkers));
     407                 :           6 :         shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
     408                 :           6 :         pvs->wal_usage = wal_usage;
     409                 :             : 
     410                 :             :         /* Store query string for workers */
     411         [ -  + ]:           6 :         if (debug_query_string)
     412                 :             :         {
     413                 :           6 :                 char       *sharedquery;
     414                 :             : 
     415                 :           6 :                 sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
     416                 :           6 :                 memcpy(sharedquery, debug_query_string, querylen + 1);
     417                 :           6 :                 sharedquery[querylen] = '\0';
     418                 :          12 :                 shm_toc_insert(pcxt->toc,
     419                 :           6 :                                            PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
     420                 :           6 :         }
     421                 :             : 
     422                 :             :         /* Success -- return parallel vacuum state */
     423                 :           6 :         return pvs;
     424                 :         127 : }
     425                 :             : 
     426                 :             : /*
     427                 :             :  * Destroy the parallel context, and end parallel mode.
     428                 :             :  *
     429                 :             :  * Since writes are not allowed during parallel mode, copy the
     430                 :             :  * updated index statistics from DSM into local memory and then later use that
     431                 :             :  * to update the index statistics.  One might think that we can exit from
     432                 :             :  * parallel mode, update the index statistics and then destroy parallel
     433                 :             :  * context, but that won't be safe (see ExitParallelMode).
     434                 :             :  */
     435                 :             : void
     436                 :           6 : parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
     437                 :             : {
     438         [ +  - ]:           6 :         Assert(!IsParallelWorker());
     439                 :             : 
     440                 :             :         /* Copy the updated statistics */
     441         [ +  + ]:          27 :         for (int i = 0; i < pvs->nindexes; i++)
     442                 :             :         {
     443                 :          21 :                 PVIndStats *indstats = &(pvs->indstats[i]);
     444                 :             : 
     445         [ +  + ]:          21 :                 if (indstats->istat_updated)
     446                 :             :                 {
     447                 :          15 :                         istats[i] = palloc0_object(IndexBulkDeleteResult);
     448                 :          15 :                         memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
     449                 :          15 :                 }
     450                 :             :                 else
     451                 :           6 :                         istats[i] = NULL;
     452                 :          21 :         }
     453                 :             : 
     454                 :           6 :         TidStoreDestroy(pvs->dead_items);
     455                 :             : 
     456                 :           6 :         DestroyParallelContext(pvs->pcxt);
     457                 :           6 :         ExitParallelMode();
     458                 :             : 
     459                 :           6 :         pfree(pvs->will_parallel_vacuum);
     460                 :           6 :         pfree(pvs);
     461                 :           6 : }
     462                 :             : 
     463                 :             : /*
     464                 :             :  * Returns the dead items space and dead items information.
     465                 :             :  */
     466                 :             : TidStore *
     467                 :          13 : parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
     468                 :             : {
     469                 :          13 :         *dead_items_info_p = &(pvs->shared->dead_items_info);
     470                 :          13 :         return pvs->dead_items;
     471                 :             : }
     472                 :             : 
     473                 :             : /* Forget all items in dead_items */
     474                 :             : void
     475                 :           7 : parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
     476                 :             : {
     477                 :           7 :         VacDeadItemsInfo *dead_items_info = &(pvs->shared->dead_items_info);
     478                 :             : 
     479                 :             :         /*
     480                 :             :          * Free the current tidstore and return allocated DSA segments to the
     481                 :             :          * operating system. Then we recreate the tidstore with the same max_bytes
     482                 :             :          * limitation we just used.
     483                 :             :          */
     484                 :           7 :         TidStoreDestroy(pvs->dead_items);
     485                 :           7 :         pvs->dead_items = TidStoreCreateShared(dead_items_info->max_bytes,
     486                 :             :                                                                                    LWTRANCHE_PARALLEL_VACUUM_DSA);
     487                 :             : 
     488                 :             :         /* Update the DSA pointer for dead_items to the new one */
     489                 :           7 :         pvs->shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(pvs->dead_items));
     490                 :           7 :         pvs->shared->dead_items_handle = TidStoreGetHandle(pvs->dead_items);
     491                 :             : 
     492                 :             :         /* Reset the counter */
     493                 :           7 :         dead_items_info->num_items = 0;
     494                 :           7 : }
     495                 :             : 
     496                 :             : /*
     497                 :             :  * Do parallel index bulk-deletion with parallel workers.
     498                 :             :  */
     499                 :             : void
     500                 :           7 : parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
     501                 :             :                                                                         int num_index_scans)
     502                 :             : {
     503         [ +  - ]:           7 :         Assert(!IsParallelWorker());
     504                 :             : 
     505                 :             :         /*
     506                 :             :          * We can only provide an approximate value of num_heap_tuples, at least
     507                 :             :          * for now.
     508                 :             :          */
     509                 :           7 :         pvs->shared->reltuples = num_table_tuples;
     510                 :           7 :         pvs->shared->estimated_count = true;
     511                 :             : 
     512                 :           7 :         parallel_vacuum_process_all_indexes(pvs, num_index_scans, true);
     513                 :           7 : }
     514                 :             : 
     515                 :             : /*
     516                 :             :  * Do parallel index cleanup with parallel workers.
     517                 :             :  */
     518                 :             : void
     519                 :           6 : parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
     520                 :             :                                                                         int num_index_scans, bool estimated_count)
     521                 :             : {
     522         [ +  - ]:           6 :         Assert(!IsParallelWorker());
     523                 :             : 
     524                 :             :         /*
     525                 :             :          * We can provide a better estimate of total number of surviving tuples
     526                 :             :          * (we assume indexes are more interested in that than in the number of
     527                 :             :          * nominally live tuples).
     528                 :             :          */
     529                 :           6 :         pvs->shared->reltuples = num_table_tuples;
     530                 :           6 :         pvs->shared->estimated_count = estimated_count;
     531                 :             : 
     532                 :           6 :         parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
     533                 :           6 : }
     534                 :             : 
     535                 :             : /*
     536                 :             :  * Compute the number of parallel worker processes to request.  Both index
     537                 :             :  * vacuum and index cleanup can be executed with parallel workers.
     538                 :             :  * The index is eligible for parallel vacuum iff its size is greater than
     539                 :             :  * min_parallel_index_scan_size as invoking workers for very small indexes
     540                 :             :  * can hurt performance.
     541                 :             :  *
     542                 :             :  * nrequested is the number of parallel workers that user requested.  If
     543                 :             :  * nrequested is 0, we compute the parallel degree based on nindexes, that is
     544                 :             :  * the number of indexes that support parallel vacuum.  This function also
     545                 :             :  * sets will_parallel_vacuum to remember indexes that participate in parallel
     546                 :             :  * vacuum.
     547                 :             :  */
     548                 :             : static int
     549                 :         127 : parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
     550                 :             :                                                                 bool *will_parallel_vacuum)
     551                 :             : {
     552                 :         127 :         int                     nindexes_parallel = 0;
     553                 :         127 :         int                     nindexes_parallel_bulkdel = 0;
     554                 :         127 :         int                     nindexes_parallel_cleanup = 0;
     555                 :         127 :         int                     parallel_workers;
     556                 :             : 
     557                 :             :         /*
     558                 :             :          * We don't allow performing parallel operation in standalone backend or
     559                 :             :          * when parallelism is disabled.
     560                 :             :          */
     561   [ +  +  -  + ]:         127 :         if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
     562                 :          49 :                 return 0;
     563                 :             : 
     564                 :             :         /*
     565                 :             :          * Compute the number of indexes that can participate in parallel vacuum.
     566                 :             :          */
     567         [ +  + ]:         261 :         for (int i = 0; i < nindexes; i++)
     568                 :             :         {
     569                 :         183 :                 Relation        indrel = indrels[i];
     570                 :         183 :                 uint8           vacoptions = indrel->rd_indam->amparallelvacuumoptions;
     571                 :             : 
     572                 :             :                 /* Skip index that is not a suitable target for parallel index vacuum */
     573   [ +  -  +  + ]:         183 :                 if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
     574                 :         183 :                         RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
     575                 :         159 :                         continue;
     576                 :             : 
     577                 :          24 :                 will_parallel_vacuum[i] = true;
     578                 :             : 
     579         [ +  + ]:          24 :                 if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
     580                 :          22 :                         nindexes_parallel_bulkdel++;
     581   [ +  +  +  + ]:          24 :                 if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
     582                 :          20 :                         ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
     583                 :          22 :                         nindexes_parallel_cleanup++;
     584      [ -  +  + ]:         183 :         }
     585                 :             : 
     586         [ -  + ]:          78 :         nindexes_parallel = Max(nindexes_parallel_bulkdel,
     587                 :             :                                                         nindexes_parallel_cleanup);
     588                 :             : 
     589                 :             :         /* The leader process takes one index */
     590                 :          78 :         nindexes_parallel--;
     591                 :             : 
     592                 :             :         /* No index supports parallel vacuum */
     593         [ +  + ]:          78 :         if (nindexes_parallel <= 0)
     594                 :          72 :                 return 0;
     595                 :             : 
     596                 :             :         /* Compute the parallel degree */
     597         [ +  + ]:          11 :         parallel_workers = (nrequested > 0) ?
     598         [ +  + ]:           6 :                 Min(nrequested, nindexes_parallel) : nindexes_parallel;
     599                 :             : 
     600                 :             :         /* Cap by max_parallel_maintenance_workers */
     601         [ +  + ]:           6 :         parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
     602                 :             : 
     603                 :           6 :         return parallel_workers;
     604                 :         127 : }
     605                 :             : 
     606                 :             : /*
     607                 :             :  * Perform index vacuum or index cleanup with parallel workers.  This function
     608                 :             :  * must be used by the parallel vacuum leader process.
     609                 :             :  */
     610                 :             : static void
     611                 :          13 : parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
     612                 :             :                                                                         bool vacuum)
     613                 :             : {
     614                 :          13 :         int                     nworkers;
     615                 :          13 :         PVIndVacStatus new_status;
     616                 :             : 
     617         [ +  - ]:          13 :         Assert(!IsParallelWorker());
     618                 :             : 
     619         [ +  + ]:          13 :         if (vacuum)
     620                 :             :         {
     621                 :           7 :                 new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
     622                 :             : 
     623                 :             :                 /* Determine the number of parallel workers to launch */
     624                 :           7 :                 nworkers = pvs->nindexes_parallel_bulkdel;
     625                 :           7 :         }
     626                 :             :         else
     627                 :             :         {
     628                 :           6 :                 new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
     629                 :             : 
     630                 :             :                 /* Determine the number of parallel workers to launch */
     631                 :           6 :                 nworkers = pvs->nindexes_parallel_cleanup;
     632                 :             : 
     633                 :             :                 /* Add conditionally parallel-aware indexes if in the first time call */
     634         [ +  + ]:           6 :                 if (num_index_scans == 0)
     635                 :           3 :                         nworkers += pvs->nindexes_parallel_condcleanup;
     636                 :             :         }
     637                 :             : 
     638                 :             :         /* The leader process will participate */
     639                 :          13 :         nworkers--;
     640                 :             : 
     641                 :             :         /*
     642                 :             :          * It is possible that parallel context is initialized with fewer workers
     643                 :             :          * than the number of indexes that need a separate worker in the current
     644                 :             :          * phase, so we need to consider it.  See
     645                 :             :          * parallel_vacuum_compute_workers().
     646                 :             :          */
     647         [ +  + ]:          13 :         nworkers = Min(nworkers, pvs->pcxt->nworkers);
     648                 :             : 
     649                 :             :         /*
     650                 :             :          * Set index vacuum status and mark whether parallel vacuum worker can
     651                 :             :          * process it.
     652                 :             :          */
     653         [ +  + ]:          53 :         for (int i = 0; i < pvs->nindexes; i++)
     654                 :             :         {
     655                 :          40 :                 PVIndStats *indstats = &(pvs->indstats[i]);
     656                 :             : 
     657         [ +  - ]:          40 :                 Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
     658                 :          40 :                 indstats->status = new_status;
     659                 :          40 :                 indstats->parallel_workers_can_process =
     660         [ +  + ]:          40 :                         (pvs->will_parallel_vacuum[i] &&
     661                 :          76 :                          parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
     662                 :          38 :                                                                                                         num_index_scans,
     663                 :          38 :                                                                                                         vacuum));
     664                 :          40 :         }
     665                 :             : 
     666                 :             :         /* Reset the parallel index processing and progress counters */
     667                 :          13 :         pg_atomic_write_u32(&(pvs->shared->idx), 0);
     668                 :             : 
     669                 :             :         /* Setup the shared cost-based vacuum delay and launch workers */
     670         [ +  + ]:          13 :         if (nworkers > 0)
     671                 :             :         {
     672                 :             :                 /* Reinitialize parallel context to relaunch parallel workers */
     673         [ +  + ]:          11 :                 if (num_index_scans > 0)
     674                 :           5 :                         ReinitializeParallelDSM(pvs->pcxt);
     675                 :             : 
     676                 :             :                 /*
     677                 :             :                  * Set up shared cost balance and the number of active workers for
     678                 :             :                  * vacuum delay.  We need to do this before launching workers as
     679                 :             :                  * otherwise, they might not see the updated values for these
     680                 :             :                  * parameters.
     681                 :             :                  */
     682                 :          11 :                 pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
     683                 :          11 :                 pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
     684                 :             : 
     685                 :             :                 /*
     686                 :             :                  * The number of workers can vary between bulkdelete and cleanup
     687                 :             :                  * phase.
     688                 :             :                  */
     689                 :          11 :                 ReinitializeParallelWorkers(pvs->pcxt, nworkers);
     690                 :             : 
     691                 :          11 :                 LaunchParallelWorkers(pvs->pcxt);
     692                 :             : 
     693         [ +  - ]:          11 :                 if (pvs->pcxt->nworkers_launched > 0)
     694                 :             :                 {
     695                 :             :                         /*
     696                 :             :                          * Reset the local cost values for leader backend as we have
     697                 :             :                          * already accumulated the remaining balance of heap.
     698                 :             :                          */
     699                 :          11 :                         VacuumCostBalance = 0;
     700                 :          11 :                         VacuumCostBalanceLocal = 0;
     701                 :             : 
     702                 :             :                         /* Enable shared cost balance for leader backend */
     703                 :          11 :                         VacuumSharedCostBalance = &(pvs->shared->cost_balance);
     704                 :          11 :                         VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
     705                 :          11 :                 }
     706                 :             : 
     707         [ +  + ]:          11 :                 if (vacuum)
     708   [ -  +  #  #  :           7 :                         ereport(pvs->shared->elevel,
          -  +  -  +  #  
                      # ]
     709                 :             :                                         (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
     710                 :             :                                                                          "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
     711                 :             :                                                                          pvs->pcxt->nworkers_launched),
     712                 :             :                                                         pvs->pcxt->nworkers_launched, nworkers)));
     713                 :             :                 else
     714   [ -  +  #  #  :           4 :                         ereport(pvs->shared->elevel,
          -  +  -  +  #  
                      # ]
     715                 :             :                                         (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
     716                 :             :                                                                          "launched %d parallel vacuum workers for index cleanup (planned: %d)",
     717                 :             :                                                                          pvs->pcxt->nworkers_launched),
     718                 :             :                                                         pvs->pcxt->nworkers_launched, nworkers)));
     719                 :          11 :         }
     720                 :             : 
     721                 :             :         /* Vacuum the indexes that can be processed by only leader process */
     722                 :          13 :         parallel_vacuum_process_unsafe_indexes(pvs);
     723                 :             : 
     724                 :             :         /*
     725                 :             :          * Join as a parallel worker.  The leader vacuums alone processes all
     726                 :             :          * parallel-safe indexes in the case where no workers are launched.
     727                 :             :          */
     728                 :          13 :         parallel_vacuum_process_safe_indexes(pvs);
     729                 :             : 
     730                 :             :         /*
     731                 :             :          * Next, accumulate buffer and WAL usage.  (This must wait for the workers
     732                 :             :          * to finish, or we might get incomplete data.)
     733                 :             :          */
     734         [ +  + ]:          13 :         if (nworkers > 0)
     735                 :             :         {
     736                 :             :                 /* Wait for all vacuum workers to finish */
     737                 :          11 :                 WaitForParallelWorkersToFinish(pvs->pcxt);
     738                 :             : 
     739         [ +  + ]:          24 :                 for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
     740                 :          13 :                         InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
     741                 :          11 :         }
     742                 :             : 
     743                 :             :         /*
     744                 :             :          * Reset all index status back to initial (while checking that we have
     745                 :             :          * vacuumed all indexes).
     746                 :             :          */
     747         [ +  + ]:          53 :         for (int i = 0; i < pvs->nindexes; i++)
     748                 :             :         {
     749                 :          40 :                 PVIndStats *indstats = &(pvs->indstats[i]);
     750                 :             : 
     751         [ +  - ]:          40 :                 if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
     752   [ #  #  #  # ]:           0 :                         elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
     753                 :             :                                  RelationGetRelationName(pvs->indrels[i]));
     754                 :             : 
     755                 :          40 :                 indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
     756                 :          40 :         }
     757                 :             : 
     758                 :             :         /*
     759                 :             :          * Carry the shared balance value to heap scan and disable shared costing
     760                 :             :          */
     761         [ +  + ]:          13 :         if (VacuumSharedCostBalance)
     762                 :             :         {
     763                 :          11 :                 VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
     764                 :          11 :                 VacuumSharedCostBalance = NULL;
     765                 :          11 :                 VacuumActiveNWorkers = NULL;
     766                 :          11 :         }
     767                 :          13 : }
     768                 :             : 
     769                 :             : /*
     770                 :             :  * Index vacuum/cleanup routine used by the leader process and parallel
     771                 :             :  * vacuum worker processes to vacuum the indexes in parallel.
     772                 :             :  */
     773                 :             : static void
     774                 :          26 : parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
     775                 :             : {
     776                 :             :         /*
     777                 :             :          * Increment the active worker count if we are able to launch any worker.
     778                 :             :          */
     779         [ +  + ]:          26 :         if (VacuumActiveNWorkers)
     780                 :          24 :                 pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
     781                 :             : 
     782                 :             :         /* Loop until all indexes are vacuumed */
     783                 :          54 :         for (;;)
     784                 :             :         {
     785                 :          66 :                 int                     idx;
     786                 :          66 :                 PVIndStats *indstats;
     787                 :             : 
     788                 :             :                 /* Get an index number to process */
     789                 :          66 :                 idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
     790                 :             : 
     791                 :             :                 /* Done for all indexes? */
     792         [ +  + ]:          66 :                 if (idx >= pvs->nindexes)
     793                 :          26 :                         break;
     794                 :             : 
     795                 :          40 :                 indstats = &(pvs->indstats[idx]);
     796                 :             : 
     797                 :             :                 /*
     798                 :             :                  * Skip vacuuming index that is unsafe for workers or has an
     799                 :             :                  * unsuitable target for parallel index vacuum (this is vacuumed in
     800                 :             :                  * parallel_vacuum_process_unsafe_indexes() by the leader).
     801                 :             :                  */
     802         [ +  + ]:          40 :                 if (!indstats->parallel_workers_can_process)
     803                 :          12 :                         continue;
     804                 :             : 
     805                 :             :                 /* Do vacuum or cleanup of the index */
     806                 :          28 :                 parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
     807   [ +  -  +  + ]:          66 :         }
     808                 :             : 
     809                 :             :         /*
     810                 :             :          * We have completed the index vacuum so decrement the active worker
     811                 :             :          * count.
     812                 :             :          */
     813         [ +  + ]:          26 :         if (VacuumActiveNWorkers)
     814                 :          24 :                 pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
     815                 :          26 : }
     816                 :             : 
     817                 :             : /*
     818                 :             :  * Perform parallel vacuuming of indexes in leader process.
     819                 :             :  *
     820                 :             :  * Handles index vacuuming (or index cleanup) for indexes that are not
     821                 :             :  * parallel safe.  It's possible that this will vary for a given index, based
     822                 :             :  * on details like whether we're performing index cleanup right now.
     823                 :             :  *
     824                 :             :  * Also performs vacuuming of smaller indexes that fell under the size cutoff
     825                 :             :  * enforced by parallel_vacuum_compute_workers().
     826                 :             :  */
     827                 :             : static void
     828                 :          13 : parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
     829                 :             : {
     830         [ +  - ]:          13 :         Assert(!IsParallelWorker());
     831                 :             : 
     832                 :             :         /*
     833                 :             :          * Increment the active worker count if we are able to launch any worker.
     834                 :             :          */
     835         [ +  + ]:          13 :         if (VacuumActiveNWorkers)
     836                 :          11 :                 pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
     837                 :             : 
     838         [ +  + ]:          53 :         for (int i = 0; i < pvs->nindexes; i++)
     839                 :             :         {
     840                 :          40 :                 PVIndStats *indstats = &(pvs->indstats[i]);
     841                 :             : 
     842                 :             :                 /* Skip, indexes that are safe for workers */
     843         [ +  + ]:          40 :                 if (indstats->parallel_workers_can_process)
     844                 :          28 :                         continue;
     845                 :             : 
     846                 :             :                 /* Do vacuum or cleanup of the index */
     847                 :          12 :                 parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
     848      [ -  +  + ]:          40 :         }
     849                 :             : 
     850                 :             :         /*
     851                 :             :          * We have completed the index vacuum so decrement the active worker
     852                 :             :          * count.
     853                 :             :          */
     854         [ +  + ]:          13 :         if (VacuumActiveNWorkers)
     855                 :          11 :                 pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
     856                 :          13 : }
     857                 :             : 
     858                 :             : /*
     859                 :             :  * Vacuum or cleanup index either by leader process or by one of the worker
     860                 :             :  * process.  After vacuuming the index this function copies the index
     861                 :             :  * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
     862                 :             :  * segment.
     863                 :             :  */
     864                 :             : static void
     865                 :          40 : parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
     866                 :             :                                                                   PVIndStats *indstats)
     867                 :             : {
     868                 :          40 :         IndexBulkDeleteResult *istat = NULL;
     869                 :          40 :         IndexBulkDeleteResult *istat_res;
     870                 :          40 :         IndexVacuumInfo ivinfo;
     871                 :             : 
     872                 :             :         /*
     873                 :             :          * Update the pointer to the corresponding bulk-deletion result if someone
     874                 :             :          * has already updated it
     875                 :             :          */
     876         [ +  + ]:          40 :         if (indstats->istat_updated)
     877                 :          19 :                 istat = &(indstats->istat);
     878                 :             : 
     879                 :          40 :         ivinfo.index = indrel;
     880                 :          40 :         ivinfo.heaprel = pvs->heaprel;
     881                 :          40 :         ivinfo.analyze_only = false;
     882                 :          40 :         ivinfo.report_progress = false;
     883                 :          40 :         ivinfo.message_level = DEBUG2;
     884                 :          40 :         ivinfo.estimated_count = pvs->shared->estimated_count;
     885                 :          40 :         ivinfo.num_heap_tuples = pvs->shared->reltuples;
     886                 :          40 :         ivinfo.strategy = pvs->bstrategy;
     887                 :             : 
     888                 :             :         /* Update error traceback information */
     889                 :          40 :         pvs->indname = pstrdup(RelationGetRelationName(indrel));
     890                 :          40 :         pvs->status = indstats->status;
     891                 :             : 
     892      [ +  +  - ]:          40 :         switch (indstats->status)
     893                 :             :         {
     894                 :             :                 case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
     895                 :          38 :                         istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items,
     896                 :          19 :                                                                                           &pvs->shared->dead_items_info);
     897                 :          19 :                         break;
     898                 :             :                 case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
     899                 :          21 :                         istat_res = vac_cleanup_one_index(&ivinfo, istat);
     900                 :          21 :                         break;
     901                 :             :                 default:
     902   [ #  #  #  # ]:           0 :                         elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
     903                 :             :                                  indstats->status,
     904                 :             :                                  RelationGetRelationName(indrel));
     905                 :           0 :         }
     906                 :             : 
     907                 :             :         /*
     908                 :             :          * Copy the index bulk-deletion result returned from ambulkdelete and
     909                 :             :          * amvacuumcleanup to the DSM segment if it's the first cycle because they
     910                 :             :          * allocate locally and it's possible that an index will be vacuumed by a
     911                 :             :          * different vacuum process the next cycle.  Copying the result normally
     912                 :             :          * happens only the first time an index is vacuumed.  For any additional
     913                 :             :          * vacuum pass, we directly point to the result on the DSM segment and
     914                 :             :          * pass it to vacuum index APIs so that workers can update it directly.
     915                 :             :          *
     916                 :             :          * Since all vacuum workers write the bulk-deletion result at different
     917                 :             :          * slots we can write them without locking.
     918                 :             :          */
     919   [ +  +  +  + ]:          40 :         if (!indstats->istat_updated && istat_res != NULL)
     920                 :             :         {
     921                 :          15 :                 memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
     922                 :          15 :                 indstats->istat_updated = true;
     923                 :             : 
     924                 :             :                 /* Free the locally-allocated bulk-deletion result */
     925                 :          15 :                 pfree(istat_res);
     926                 :          15 :         }
     927                 :             : 
     928                 :             :         /*
     929                 :             :          * Update the status to completed. No need to lock here since each worker
     930                 :             :          * touches different indexes.
     931                 :             :          */
     932                 :          40 :         indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;
     933                 :             : 
     934                 :             :         /* Reset error traceback information */
     935                 :          40 :         pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
     936                 :          40 :         pfree(pvs->indname);
     937                 :          40 :         pvs->indname = NULL;
     938                 :             : 
     939                 :             :         /*
     940                 :             :          * Call the parallel variant of pgstat_progress_incr_param so workers can
     941                 :             :          * report progress of index vacuum to the leader.
     942                 :             :          */
     943                 :          40 :         pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_INDEXES_PROCESSED, 1);
     944                 :          40 : }
     945                 :             : 
     946                 :             : /*
     947                 :             :  * Returns false, if the given index can't participate in the next execution of
     948                 :             :  * parallel index vacuum or parallel index cleanup.
     949                 :             :  */
     950                 :             : static bool
     951                 :          38 : parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
     952                 :             :                                                                            bool vacuum)
     953                 :             : {
     954                 :          38 :         uint8           vacoptions;
     955                 :             : 
     956                 :          38 :         vacoptions = indrel->rd_indam->amparallelvacuumoptions;
     957                 :             : 
     958                 :             :         /* In parallel vacuum case, check if it supports parallel bulk-deletion */
     959         [ +  + ]:          38 :         if (vacuum)
     960                 :          18 :                 return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
     961                 :             : 
     962                 :             :         /* Not safe, if the index does not support parallel cleanup */
     963   [ +  +  +  + ]:          20 :         if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
     964                 :          16 :                 ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
     965                 :           2 :                 return false;
     966                 :             : 
     967                 :             :         /*
     968                 :             :          * Not safe, if the index supports parallel cleanup conditionally, but we
     969                 :             :          * have already processed the index (for bulkdelete).  We do this to avoid
     970                 :             :          * the need to invoke workers when parallel index cleanup doesn't need to
     971                 :             :          * scan the index.  See the comments for option
     972                 :             :          * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
     973                 :             :          * parallel cleanup conditionally.
     974                 :             :          */
     975   [ +  +  +  + ]:          18 :         if (num_index_scans > 0 &&
     976                 :           9 :                 ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
     977                 :           7 :                 return false;
     978                 :             : 
     979                 :          11 :         return true;
     980                 :          38 : }
     981                 :             : 
     982                 :             : /*
     983                 :             :  * Perform work within a launched parallel process.
     984                 :             :  *
     985                 :             :  * Since parallel vacuum workers perform only index vacuum or index cleanup,
     986                 :             :  * we don't need to report progress information.
     987                 :             :  */
     988                 :             : void
     989                 :          13 : parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
     990                 :             : {
     991                 :          13 :         ParallelVacuumState pvs;
     992                 :          13 :         Relation        rel;
     993                 :          13 :         Relation   *indrels;
     994                 :          13 :         PVIndStats *indstats;
     995                 :          13 :         PVShared   *shared;
     996                 :          13 :         TidStore   *dead_items;
     997                 :          13 :         BufferUsage *buffer_usage;
     998                 :          13 :         WalUsage   *wal_usage;
     999                 :          13 :         int                     nindexes;
    1000                 :          13 :         char       *sharedquery;
    1001                 :          13 :         ErrorContextCallback errcallback;
    1002                 :             : 
    1003                 :             :         /*
    1004                 :             :          * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
    1005                 :             :          * don't support parallel vacuum for autovacuum as of now.
    1006                 :             :          */
    1007         [ +  - ]:          13 :         Assert(MyProc->statusFlags == PROC_IN_VACUUM);
    1008                 :             : 
    1009   [ -  +  -  + ]:          13 :         elog(DEBUG1, "starting parallel vacuum worker");
    1010                 :             : 
    1011                 :          13 :         shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
    1012                 :             : 
    1013                 :             :         /* Set debug_query_string for individual workers */
    1014                 :          13 :         sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
    1015                 :          13 :         debug_query_string = sharedquery;
    1016                 :          13 :         pgstat_report_activity(STATE_RUNNING, debug_query_string);
    1017                 :             : 
    1018                 :             :         /* Track query ID */
    1019                 :          13 :         pgstat_report_query_id(shared->queryid, false);
    1020                 :             : 
    1021                 :             :         /*
    1022                 :             :          * Open table.  The lock mode is the same as the leader process.  It's
    1023                 :             :          * okay because the lock mode does not conflict among the parallel
    1024                 :             :          * workers.
    1025                 :             :          */
    1026                 :          13 :         rel = table_open(shared->relid, ShareUpdateExclusiveLock);
    1027                 :             : 
    1028                 :             :         /*
    1029                 :             :          * Open all indexes. indrels are sorted in order by OID, which should be
    1030                 :             :          * matched to the leader's one.
    1031                 :             :          */
    1032                 :          13 :         vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
    1033         [ +  - ]:          13 :         Assert(nindexes > 0);
    1034                 :             : 
    1035                 :             :         /*
    1036                 :             :          * Apply the desired value of maintenance_work_mem within this process.
    1037                 :             :          * Really we should use SetConfigOption() to change a GUC, but since we're
    1038                 :             :          * already in parallel mode guc.c would complain about that.  Fortunately,
    1039                 :             :          * by the same token guc.c will not let any user-defined code change it.
    1040                 :             :          * So just avert your eyes while we do this:
    1041                 :             :          */
    1042         [ -  + ]:          13 :         if (shared->maintenance_work_mem_worker > 0)
    1043                 :          13 :                 maintenance_work_mem = shared->maintenance_work_mem_worker;
    1044                 :             : 
    1045                 :             :         /* Set index statistics */
    1046                 :          13 :         indstats = (PVIndStats *) shm_toc_lookup(toc,
    1047                 :             :                                                                                          PARALLEL_VACUUM_KEY_INDEX_STATS,
    1048                 :             :                                                                                          false);
    1049                 :             : 
    1050                 :             :         /* Find dead_items in shared memory */
    1051                 :          26 :         dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
    1052                 :          13 :                                                                 shared->dead_items_handle);
    1053                 :             : 
    1054                 :             :         /* Set cost-based vacuum delay */
    1055                 :          13 :         VacuumUpdateCosts();
    1056                 :          13 :         VacuumCostBalance = 0;
    1057                 :          13 :         VacuumCostBalanceLocal = 0;
    1058                 :          13 :         VacuumSharedCostBalance = &(shared->cost_balance);
    1059                 :          13 :         VacuumActiveNWorkers = &(shared->active_nworkers);
    1060                 :             : 
    1061                 :             :         /* Set parallel vacuum state */
    1062                 :          13 :         pvs.indrels = indrels;
    1063                 :          13 :         pvs.nindexes = nindexes;
    1064                 :          13 :         pvs.indstats = indstats;
    1065                 :          13 :         pvs.shared = shared;
    1066                 :          13 :         pvs.dead_items = dead_items;
    1067                 :          13 :         pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
    1068                 :          13 :         pvs.relname = pstrdup(RelationGetRelationName(rel));
    1069                 :          13 :         pvs.heaprel = rel;
    1070                 :             : 
    1071                 :             :         /* These fields will be filled during index vacuum or cleanup */
    1072                 :          13 :         pvs.indname = NULL;
    1073                 :          13 :         pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;
    1074                 :             : 
    1075                 :             :         /* Each parallel VACUUM worker gets its own access strategy. */
    1076                 :          13 :         pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM,
    1077                 :          13 :                                                                                           shared->ring_nbuffers * (BLCKSZ / 1024));
    1078                 :             : 
    1079                 :             :         /* Setup error traceback support for ereport() */
    1080                 :          13 :         errcallback.callback = parallel_vacuum_error_callback;
    1081                 :          13 :         errcallback.arg = &pvs;
    1082                 :          13 :         errcallback.previous = error_context_stack;
    1083                 :          13 :         error_context_stack = &errcallback;
    1084                 :             : 
    1085                 :             :         /* Prepare to track buffer usage during parallel execution */
    1086                 :          13 :         InstrStartParallelQuery();
    1087                 :             : 
    1088                 :             :         /* Process indexes to perform vacuum/cleanup */
    1089                 :          13 :         parallel_vacuum_process_safe_indexes(&pvs);
    1090                 :             : 
    1091                 :             :         /* Report buffer/WAL usage during parallel execution */
    1092                 :          13 :         buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
    1093                 :          13 :         wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
    1094                 :          26 :         InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
    1095                 :          13 :                                                   &wal_usage[ParallelWorkerNumber]);
    1096                 :             : 
    1097                 :             :         /* Report any remaining cost-based vacuum delay time */
    1098         [ +  - ]:          13 :         if (track_cost_delay_timing)
    1099                 :           0 :                 pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_DELAY_TIME,
    1100                 :           0 :                                                                                         parallel_vacuum_worker_delay_ns);
    1101                 :             : 
    1102                 :          13 :         TidStoreDetach(dead_items);
    1103                 :             : 
    1104                 :             :         /* Pop the error context stack */
    1105                 :          13 :         error_context_stack = errcallback.previous;
    1106                 :             : 
    1107                 :          13 :         vac_close_indexes(nindexes, indrels, RowExclusiveLock);
    1108                 :          13 :         table_close(rel, ShareUpdateExclusiveLock);
    1109                 :          13 :         FreeAccessStrategy(pvs.bstrategy);
    1110                 :          13 : }
    1111                 :             : 
    1112                 :             : /*
    1113                 :             :  * Error context callback for errors occurring during parallel index vacuum.
    1114                 :             :  * The error context messages should match the messages set in the lazy vacuum
    1115                 :             :  * error context.  If you change this function, change vacuum_error_callback()
    1116                 :             :  * as well.
    1117                 :             :  */
    1118                 :             : static void
    1119                 :           0 : parallel_vacuum_error_callback(void *arg)
    1120                 :             : {
    1121                 :           0 :         ParallelVacuumState *errinfo = arg;
    1122                 :             : 
    1123   [ #  #  #  # ]:           0 :         switch (errinfo->status)
    1124                 :             :         {
    1125                 :             :                 case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
    1126                 :           0 :                         errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
    1127                 :           0 :                                            errinfo->indname,
    1128                 :           0 :                                            errinfo->relnamespace,
    1129                 :           0 :                                            errinfo->relname);
    1130                 :           0 :                         break;
    1131                 :             :                 case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
    1132                 :           0 :                         errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
    1133                 :           0 :                                            errinfo->indname,
    1134                 :           0 :                                            errinfo->relnamespace,
    1135                 :           0 :                                            errinfo->relname);
    1136                 :           0 :                         break;
    1137                 :             :                 case PARALLEL_INDVAC_STATUS_INITIAL:
    1138                 :           0 :                 case PARALLEL_INDVAC_STATUS_COMPLETED:
    1139                 :             :                 default:
    1140                 :           0 :                         return;
    1141                 :             :         }
    1142         [ #  # ]:           0 : }
        

Generated by: LCOV version 2.3.2-1