LCOV - code coverage report
Current view: top level - src/backend/executor - execParallel.c (source / functions)
Test:      Code coverage
Test Date: 2026-01-26 10:56:24
              Coverage    Total    Hit
Lines:        84.6 %        723    612
Functions:    95.0 %         20     19
Branches:     64.9 %        279    181
Legend: Lines: hit / not hit    Branches: + taken  - not taken  # not executed

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * execParallel.c
       4                 :             :  *        Support routines for parallel execution.
       5                 :             :  *
       6                 :             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7                 :             :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :             :  *
       9                 :             :  * This file contains routines that are intended to support setting up,
      10                 :             :  * using, and tearing down a ParallelContext from within the PostgreSQL
      11                 :             :  * executor.  The ParallelContext machinery will handle starting the
      12                 :             :  * workers and ensuring that their state generally matches that of the
      13                 :             :  * leader; see src/backend/access/transam/README.parallel for details.
      14                 :             :  * However, we must save and restore relevant executor state, such as
      15                 :             :  * any ParamListInfo associated with the query, buffer/WAL usage info, and
      16                 :             :  * the actual plan to be passed down to the worker.
      17                 :             :  *
      18                 :             :  * IDENTIFICATION
      19                 :             :  *        src/backend/executor/execParallel.c
      20                 :             :  *
      21                 :             :  *-------------------------------------------------------------------------
      22                 :             :  */
      23                 :             : 
      24                 :             : #include "postgres.h"
      25                 :             : 
      26                 :             : #include "executor/execParallel.h"
      27                 :             : #include "executor/executor.h"
      28                 :             : #include "executor/nodeAgg.h"
      29                 :             : #include "executor/nodeAppend.h"
      30                 :             : #include "executor/nodeBitmapHeapscan.h"
      31                 :             : #include "executor/nodeBitmapIndexscan.h"
      32                 :             : #include "executor/nodeCustom.h"
      33                 :             : #include "executor/nodeForeignscan.h"
      34                 :             : #include "executor/nodeHash.h"
      35                 :             : #include "executor/nodeHashjoin.h"
      36                 :             : #include "executor/nodeIncrementalSort.h"
      37                 :             : #include "executor/nodeIndexonlyscan.h"
      38                 :             : #include "executor/nodeIndexscan.h"
      39                 :             : #include "executor/nodeMemoize.h"
      40                 :             : #include "executor/nodeSeqscan.h"
      41                 :             : #include "executor/nodeSort.h"
      42                 :             : #include "executor/nodeSubplan.h"
      43                 :             : #include "executor/nodeTidrangescan.h"
      44                 :             : #include "executor/tqueue.h"
      45                 :             : #include "jit/jit.h"
      46                 :             : #include "nodes/nodeFuncs.h"
      47                 :             : #include "pgstat.h"
      48                 :             : #include "tcop/tcopprot.h"
      49                 :             : #include "utils/datum.h"
      50                 :             : #include "utils/dsa.h"
      51                 :             : #include "utils/lsyscache.h"
      52                 :             : #include "utils/snapmgr.h"
      53                 :             : 
      54                 :             : /*
      55                 :             :  * Magic numbers for parallel executor communication.  We use constants
      56                 :             :  * greater than any 32-bit integer here so that values < 2^32 can be used
      57                 :             :  * by individual parallel nodes to store their own state.
      58                 :             :  */
       59                 :             : #define PARALLEL_KEY_EXECUTOR_FIXED         UINT64CONST(0xE000000000000001)
       60                 :             : #define PARALLEL_KEY_PLANNEDSTMT            UINT64CONST(0xE000000000000002)
       61                 :             : #define PARALLEL_KEY_PARAMLISTINFO          UINT64CONST(0xE000000000000003)
       62                 :             : #define PARALLEL_KEY_BUFFER_USAGE           UINT64CONST(0xE000000000000004)
       63                 :             : #define PARALLEL_KEY_TUPLE_QUEUE            UINT64CONST(0xE000000000000005)
       64                 :             : #define PARALLEL_KEY_INSTRUMENTATION        UINT64CONST(0xE000000000000006)
       65                 :             : #define PARALLEL_KEY_DSA                    UINT64CONST(0xE000000000000007)
       66                 :             : #define PARALLEL_KEY_QUERY_TEXT             UINT64CONST(0xE000000000000008)
       67                 :             : #define PARALLEL_KEY_JIT_INSTRUMENTATION    UINT64CONST(0xE000000000000009)
       68                 :             : #define PARALLEL_KEY_WAL_USAGE              UINT64CONST(0xE00000000000000A)
      69                 :             : 
      70                 :             : #define PARALLEL_TUPLE_QUEUE_SIZE               65536
      71                 :             : 
      72                 :             : /*
      73                 :             :  * Fixed-size random stuff that we need to pass to parallel workers.
      74                 :             :  */
      75                 :             : typedef struct FixedParallelExecutorState
      76                 :             : {
      77                 :             :         int64           tuples_needed;  /* tuple bound, see ExecSetTupleBound */
      78                 :             :         dsa_pointer param_exec;
      79                 :             :         int                     eflags;
      80                 :             :         int                     jit_flags;
      81                 :             : } FixedParallelExecutorState;
      82                 :             : 
      83                 :             : /*
      84                 :             :  * DSM structure for accumulating per-PlanState instrumentation.
      85                 :             :  *
      86                 :             :  * instrument_options: Same meaning here as in instrument.c.
      87                 :             :  *
      88                 :             :  * instrument_offset: Offset, relative to the start of this structure,
      89                 :             :  * of the first Instrumentation object.  This will depend on the length of
      90                 :             :  * the plan_node_id array.
      91                 :             :  *
      92                 :             :  * num_workers: Number of workers.
      93                 :             :  *
      94                 :             :  * num_plan_nodes: Number of plan nodes.
      95                 :             :  *
      96                 :             :  * plan_node_id: Array of plan nodes for which we are gathering instrumentation
      97                 :             :  * from parallel workers.  The length of this array is given by num_plan_nodes.
      98                 :             :  */
      99                 :             : struct SharedExecutorInstrumentation
     100                 :             : {
     101                 :             :         int                     instrument_options;
     102                 :             :         int                     instrument_offset;
     103                 :             :         int                     num_workers;
     104                 :             :         int                     num_plan_nodes;
     105                 :             :         int                     plan_node_id[FLEXIBLE_ARRAY_MEMBER];
     106                 :             :         /* array of num_plan_nodes * num_workers Instrumentation objects follows */
     107                 :             : };
     108                 :             : #define GetInstrumentationArray(sei) \
     109                 :             :         (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
     110                 :             :          (Instrumentation *) (((char *) sei) + sei->instrument_offset))
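
Because the plan_node_id array is flexible-length, the Instrumentation slots begin at a runtime-computed offset, which is what instrument_offset records. A short sketch of locating one slot, assuming the node-major layout described in the comment above (i, w, and sei are assumed variables):

    /* Slot for plan node i, worker w: all of node 0's worker slots come
     * first, then node 1's, and so on. */
    Instrumentation *instrument = GetInstrumentationArray(sei);
    Instrumentation *slot = &instrument[i * sei->num_workers + w];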
     111                 :             : 
     112                 :             : /* Context object for ExecParallelEstimate. */
     113                 :             : typedef struct ExecParallelEstimateContext
     114                 :             : {
     115                 :             :         ParallelContext *pcxt;
     116                 :             :         int                     nnodes;
     117                 :             : } ExecParallelEstimateContext;
     118                 :             : 
     119                 :             : /* Context object for ExecParallelInitializeDSM. */
     120                 :             : typedef struct ExecParallelInitializeDSMContext
     121                 :             : {
     122                 :             :         ParallelContext *pcxt;
     123                 :             :         SharedExecutorInstrumentation *instrumentation;
     124                 :             :         int                     nnodes;
     125                 :             : } ExecParallelInitializeDSMContext;
     126                 :             : 
     127                 :             : /* Helper functions that run in the parallel leader. */
     128                 :             : static char *ExecSerializePlan(Plan *plan, EState *estate);
     129                 :             : static bool ExecParallelEstimate(PlanState *planstate,
     130                 :             :                                                                  ExecParallelEstimateContext *e);
     131                 :             : static bool ExecParallelInitializeDSM(PlanState *planstate,
     132                 :             :                                                                           ExecParallelInitializeDSMContext *d);
     133                 :             : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
     134                 :             :                                                                                                         bool reinitialize);
     135                 :             : static bool ExecParallelReInitializeDSM(PlanState *planstate,
     136                 :             :                                                                                 ParallelContext *pcxt);
     137                 :             : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
     138                 :             :                                                                                                 SharedExecutorInstrumentation *instrumentation);
     139                 :             : 
     140                 :             : /* Helper function that runs in the parallel worker. */
     141                 :             : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
     142                 :             : 
     143                 :             : /*
     144                 :             :  * Create a serialized representation of the plan to be sent to each worker.
     145                 :             :  */
     146                 :             : static char *
     147                 :         121 : ExecSerializePlan(Plan *plan, EState *estate)
     148                 :             : {
     149                 :         121 :         PlannedStmt *pstmt;
     150                 :         121 :         ListCell   *lc;
     151                 :             : 
     152                 :             :         /* We can't scribble on the original plan, so make a copy. */
     153                 :         121 :         plan = copyObject(plan);
     154                 :             : 
     155                 :             :         /*
     156                 :             :          * The worker will start its own copy of the executor, and that copy will
     157                 :             :          * insert a junk filter if the toplevel node has any resjunk entries. We
     158                 :             :          * don't want that to happen, because while resjunk columns shouldn't be
     159                 :             :          * sent back to the user, here the tuples are coming back to another
     160                 :             :          * backend which may very well need them.  So mutate the target list
     161                 :             :          * accordingly.  This is sort of a hack; there might be better ways to do
     162                 :             :          * this...
     163                 :             :          */
     164   [ +  +  +  +  :         334 :         foreach(lc, plan->targetlist)
                   +  + ]
     165                 :             :         {
     166                 :         213 :                 TargetEntry *tle = lfirst_node(TargetEntry, lc);
     167                 :             : 
     168                 :         213 :                 tle->resjunk = false;
     169                 :         213 :         }
     170                 :             : 
     171                 :             :         /*
     172                 :             :          * Create a dummy PlannedStmt.  Most of the fields don't need to be valid
     173                 :             :          * for our purposes, but the worker will need at least a minimal
     174                 :             :          * PlannedStmt to start the executor.
     175                 :             :          */
     176                 :         121 :         pstmt = makeNode(PlannedStmt);
     177                 :         121 :         pstmt->commandType = CMD_SELECT;
     178                 :         121 :         pstmt->queryId = pgstat_get_my_query_id();
     179                 :         121 :         pstmt->planId = pgstat_get_my_plan_id();
     180                 :         121 :         pstmt->hasReturning = false;
     181                 :         121 :         pstmt->hasModifyingCTE = false;
     182                 :         121 :         pstmt->canSetTag = true;
     183                 :         121 :         pstmt->transientPlan = false;
     184                 :         121 :         pstmt->dependsOnRole = false;
     185                 :         121 :         pstmt->parallelModeNeeded = false;
     186                 :         121 :         pstmt->planTree = plan;
     187                 :         121 :         pstmt->partPruneInfos = estate->es_part_prune_infos;
     188                 :         121 :         pstmt->rtable = estate->es_range_table;
     189                 :         121 :         pstmt->unprunableRelids = estate->es_unpruned_relids;
     190                 :         121 :         pstmt->permInfos = estate->es_rteperminfos;
     191                 :         121 :         pstmt->resultRelations = NIL;
     192                 :         121 :         pstmt->appendRelations = NIL;
     193                 :         121 :         pstmt->planOrigin = PLAN_STMT_INTERNAL;
     194                 :             : 
     195                 :             :         /*
     196                 :             :          * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
     197                 :             :          * for unsafe ones (so that the list indexes of the safe ones are
     198                 :             :          * preserved).  This positively ensures that the worker won't try to run,
     199                 :             :          * or even do ExecInitNode on, an unsafe subplan.  That's important to
     200                 :             :          * protect, eg, non-parallel-aware FDWs from getting into trouble.
     201                 :             :          */
     202                 :         121 :         pstmt->subplans = NIL;
     203   [ +  +  +  +  :         130 :         foreach(lc, estate->es_plannedstmt->subplans)
                   +  + ]
     204                 :             :         {
     205                 :           9 :                 Plan       *subplan = (Plan *) lfirst(lc);
     206                 :             : 
     207   [ +  -  +  + ]:           9 :                 if (subplan && !subplan->parallel_safe)
     208                 :           2 :                         subplan = NULL;
     209                 :           9 :                 pstmt->subplans = lappend(pstmt->subplans, subplan);
     210                 :           9 :         }
     211                 :             : 
     212                 :         121 :         pstmt->rewindPlanIDs = NULL;
     213                 :         121 :         pstmt->rowMarks = NIL;
     214                 :         121 :         pstmt->relationOids = NIL;
     215                 :         121 :         pstmt->invalItems = NIL;     /* workers can't replan anyway... */
     216                 :         121 :         pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
     217                 :         121 :         pstmt->utilityStmt = NULL;
     218                 :         121 :         pstmt->stmt_location = -1;
     219                 :         121 :         pstmt->stmt_len = -1;
     220                 :             : 
     221                 :             :         /* Return serialized copy of our dummy PlannedStmt. */
     222                 :         242 :         return nodeToString(pstmt);
     223                 :         121 : }
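
The returned string is later copied into the DSM segment under PARALLEL_KEY_PLANNEDSTMT, and workers reverse the transformation with stringToNode(). A sketch of the worker-side read, assuming toc is the worker's table of contents:

    /* nodeToString() produced a plain NUL-terminated string, so the
     * worker can reconstruct the PlannedStmt directly from the chunk. */
    char        *pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
    PlannedStmt *pstmt = (PlannedStmt *) stringToNode(pstmtspace);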
     224                 :             : 
     225                 :             : /*
     226                 :             :  * Parallel-aware plan nodes (and occasionally others) may need some state
     227                 :             :  * which is shared across all parallel workers.  Before we size the DSM, give
     228                 :             :  * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
     229                 :             :  * &pcxt->estimator.
     230                 :             :  *
     231                 :             :  * While we're at it, count the number of PlanState nodes in the tree, so
     232                 :             :  * we know how many Instrumentation structures we need.
     233                 :             :  */
     234                 :             : static bool
     235                 :         500 : ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
     236                 :             : {
     237         [ +  - ]:         500 :         if (planstate == NULL)
     238                 :           0 :                 return false;
     239                 :             : 
     240                 :             :         /* Count this node. */
     241                 :         500 :         e->nnodes++;
     242                 :             : 
     243   [ +  +  +  +  :         500 :         switch (nodeTag(planstate))
          +  +  +  +  +  
          -  +  +  -  +  
                   -  + ]
     244                 :             :         {
     245                 :             :                 case T_SeqScanState:
     246         [ +  + ]:         190 :                         if (planstate->plan->parallel_aware)
     247                 :         302 :                                 ExecSeqScanEstimate((SeqScanState *) planstate,
     248                 :         151 :                                                                         e->pcxt);
     249                 :         190 :                         break;
     250                 :             :                 case T_IndexScanState:
     251                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
     252                 :          98 :                         ExecIndexScanEstimate((IndexScanState *) planstate,
     253                 :          49 :                                                                   e->pcxt);
     254                 :          49 :                         break;
     255                 :             :                 case T_IndexOnlyScanState:
     256                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
     257                 :          18 :                         ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
     258                 :           9 :                                                                           e->pcxt);
     259                 :           9 :                         break;
     260                 :             :                 case T_BitmapIndexScanState:
     261                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
     262                 :           6 :                         ExecBitmapIndexScanEstimate((BitmapIndexScanState *) planstate,
     263                 :           3 :                                                                                 e->pcxt);
     264                 :           3 :                         break;
     265                 :             :                 case T_ForeignScanState:
     266         [ #  # ]:           0 :                         if (planstate->plan->parallel_aware)
     267                 :           0 :                                 ExecForeignScanEstimate((ForeignScanState *) planstate,
     268                 :           0 :                                                                                 e->pcxt);
     269                 :           0 :                         break;
     270                 :             :                 case T_TidRangeScanState:
     271         [ -  + ]:           4 :                         if (planstate->plan->parallel_aware)
     272                 :           8 :                                 ExecTidRangeScanEstimate((TidRangeScanState *) planstate,
     273                 :           4 :                                                                                  e->pcxt);
     274                 :           4 :                         break;
     275                 :             :                 case T_AppendState:
     276         [ +  + ]:          31 :                         if (planstate->plan->parallel_aware)
     277                 :          46 :                                 ExecAppendEstimate((AppendState *) planstate,
     278                 :          23 :                                                                    e->pcxt);
     279                 :          31 :                         break;
     280                 :             :                 case T_CustomScanState:
     281         [ #  # ]:           0 :                         if (planstate->plan->parallel_aware)
     282                 :           0 :                                 ExecCustomScanEstimate((CustomScanState *) planstate,
     283                 :           0 :                                                                            e->pcxt);
     284                 :           0 :                         break;
     285                 :             :                 case T_BitmapHeapScanState:
     286         [ -  + ]:           3 :                         if (planstate->plan->parallel_aware)
     287                 :           6 :                                 ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
     288                 :           3 :                                                                            e->pcxt);
     289                 :           3 :                         break;
     290                 :             :                 case T_HashJoinState:
     291         [ +  + ]:          33 :                         if (planstate->plan->parallel_aware)
     292                 :          42 :                                 ExecHashJoinEstimate((HashJoinState *) planstate,
     293                 :          21 :                                                                          e->pcxt);
     294                 :          33 :                         break;
     295                 :             :                 case T_HashState:
     296                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
     297                 :          33 :                         ExecHashEstimate((HashState *) planstate, e->pcxt);
     298                 :          33 :                         break;
     299                 :             :                 case T_SortState:
     300                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
     301                 :          26 :                         ExecSortEstimate((SortState *) planstate, e->pcxt);
     302                 :          26 :                         break;
     303                 :             :                 case T_IncrementalSortState:
     304                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
     305                 :           0 :                         ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
     306                 :           0 :                         break;
     307                 :             :                 case T_AggState:
     308                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
     309                 :          95 :                         ExecAggEstimate((AggState *) planstate, e->pcxt);
     310                 :          95 :                         break;
     311                 :             :                 case T_MemoizeState:
     312                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
     313                 :           1 :                         ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
     314                 :           1 :                         break;
     315                 :             :                 default:
     316                 :          23 :                         break;
     317                 :             :         }
     318                 :             : 
     319                 :         500 :         return planstate_tree_walker(planstate, ExecParallelEstimate, e);
     320                 :         500 : }
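
Every Estimate function dispatched from the switch above follows the same two-step pattern: reserve a chunk of the needed size, then reserve one table-of-contents key to point at it. A hedged sketch of such a callback (FooScanState, FooScanShared, and pscan_len are illustrative names):

    static void
    ExecFooScanEstimate(FooScanState *node, ParallelContext *pcxt)
    {
        /* Reserve space for this node's shared state... */
        node->pscan_len = sizeof(FooScanShared);
        shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
        /* ...and one shm_toc key so workers can find it later. */
        shm_toc_estimate_keys(&pcxt->estimator, 1);
    }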
     321                 :             : 
     322                 :             : /*
     323                 :             :  * Estimate the amount of space required to serialize the indicated parameters.
     324                 :             :  */
     325                 :             : static Size
     326                 :           4 : EstimateParamExecSpace(EState *estate, Bitmapset *params)
     327                 :             : {
     328                 :           4 :         int                     paramid;
     329                 :           4 :         Size            sz = sizeof(int);
     330                 :             : 
     331                 :           4 :         paramid = -1;
     332         [ +  + ]:           9 :         while ((paramid = bms_next_member(params, paramid)) >= 0)
     333                 :             :         {
     334                 :           5 :                 Oid                     typeOid;
     335                 :           5 :                 int16           typLen;
     336                 :           5 :                 bool            typByVal;
     337                 :           5 :                 ParamExecData *prm;
     338                 :             : 
     339                 :           5 :                 prm = &(estate->es_param_exec_vals[paramid]);
     340                 :          10 :                 typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
     341                 :           5 :                                                            paramid);
     342                 :             : 
     343                 :           5 :                 sz = add_size(sz, sizeof(int)); /* space for paramid */
     344                 :             : 
     345                 :             :                 /* space for datum/isnull */
     346         [ +  - ]:           5 :                 if (OidIsValid(typeOid))
     347                 :           5 :                         get_typlenbyval(typeOid, &typLen, &typByVal);
     348                 :             :                 else
     349                 :             :                 {
     350                 :             :                         /* If no type OID, assume by-value, like copyParamList does. */
     351                 :           0 :                         typLen = sizeof(Datum);
     352                 :           0 :                         typByVal = true;
     353                 :             :                 }
     354                 :          10 :                 sz = add_size(sz,
     355                 :          10 :                                           datumEstimateSpace(prm->value, prm->isnull,
     356                 :           5 :                                                                                  typByVal, typLen));
     357                 :           5 :         }
     358                 :           8 :         return sz;
     359                 :           4 : }
     360                 :             : 
     361                 :             : /*
     362                 :             :  * Serialize specified PARAM_EXEC parameters.
     363                 :             :  *
     364                 :             :  * We write the number of parameters first, as a 4-byte integer, and then
     365                 :             :  * write details for each parameter in turn.  The details for each parameter
     366                 :             :  * consist of a 4-byte paramid (location of param in execution time internal
     367                 :             :  * parameter array) and then the datum as serialized by datumSerialize().
     368                 :             :  */
     369                 :             : static dsa_pointer
     370                 :           4 : SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
     371                 :             : {
     372                 :           4 :         Size            size;
     373                 :           4 :         int                     nparams;
     374                 :           4 :         int                     paramid;
     375                 :           4 :         ParamExecData *prm;
     376                 :           4 :         dsa_pointer handle;
     377                 :           4 :         char       *start_address;
     378                 :             : 
     379                 :             :         /* Allocate enough space for the current parameter values. */
     380                 :           4 :         size = EstimateParamExecSpace(estate, params);
     381                 :           4 :         handle = dsa_allocate(area, size);
     382                 :           4 :         start_address = dsa_get_address(area, handle);
     383                 :             : 
     384                 :             :         /* First write the number of parameters as a 4-byte integer. */
     385                 :           4 :         nparams = bms_num_members(params);
     386                 :           4 :         memcpy(start_address, &nparams, sizeof(int));
     387                 :           4 :         start_address += sizeof(int);
     388                 :             : 
     389                 :             :         /* Write details for each parameter in turn. */
     390                 :           4 :         paramid = -1;
     391         [ +  + ]:           9 :         while ((paramid = bms_next_member(params, paramid)) >= 0)
     392                 :             :         {
     393                 :           5 :                 Oid                     typeOid;
     394                 :           5 :                 int16           typLen;
     395                 :           5 :                 bool            typByVal;
     396                 :             : 
     397                 :           5 :                 prm = &(estate->es_param_exec_vals[paramid]);
     398                 :          10 :                 typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
     399                 :           5 :                                                            paramid);
     400                 :             : 
     401                 :             :                 /* Write paramid. */
     402                 :           5 :                 memcpy(start_address, &paramid, sizeof(int));
     403                 :           5 :                 start_address += sizeof(int);
     404                 :             : 
     405                 :             :                 /* Write datum/isnull */
     406         [ +  - ]:           5 :                 if (OidIsValid(typeOid))
     407                 :           5 :                         get_typlenbyval(typeOid, &typLen, &typByVal);
     408                 :             :                 else
     409                 :             :                 {
     410                 :             :                         /* If no type OID, assume by-value, like copyParamList does. */
     411                 :           0 :                         typLen = sizeof(Datum);
     412                 :           0 :                         typByVal = true;
     413                 :             :                 }
     414                 :           5 :                 datumSerialize(prm->value, prm->isnull, typByVal, typLen,
     415                 :             :                                            &start_address);
     416                 :           5 :         }
     417                 :             : 
     418                 :           8 :         return handle;
     419                 :           4 : }
     420                 :             : 
     421                 :             : /*
     422                 :             :  * Restore specified PARAM_EXEC parameters.
     423                 :             :  */
     424                 :             : static void
     425                 :          12 : RestoreParamExecParams(char *start_address, EState *estate)
     426                 :             : {
     427                 :          12 :         int                     nparams;
     428                 :          12 :         int                     i;
     429                 :          12 :         int                     paramid;
     430                 :             : 
     431                 :          12 :         memcpy(&nparams, start_address, sizeof(int));
     432                 :          12 :         start_address += sizeof(int);
     433                 :             : 
     434         [ +  + ]:          26 :         for (i = 0; i < nparams; i++)
     435                 :             :         {
     436                 :          14 :                 ParamExecData *prm;
     437                 :             : 
     438                 :             :                 /* Read paramid */
     439                 :          14 :                 memcpy(&paramid, start_address, sizeof(int));
     440                 :          14 :                 start_address += sizeof(int);
     441                 :          14 :                 prm = &(estate->es_param_exec_vals[paramid]);
     442                 :             : 
     443                 :             :                 /* Read datum/isnull. */
     444                 :          14 :                 prm->value = datumRestore(&start_address, &prm->isnull);
     445                 :          14 :                 prm->execPlan = NULL;
     446                 :          14 :         }
     447                 :          12 : }
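
Taken together, SerializeParamExecParams and RestoreParamExecParams agree on a simple buffer layout, sketched here for two parameters:

    /* int   nparams                  (4 bytes)
     * int   paramid of 1st param     (4 bytes)
     * bytes written by datumSerialize() for the 1st value (variable length)
     * int   paramid of 2nd param     (4 bytes)
     * bytes written by datumSerialize() for the 2nd value (variable length)
     */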
     448                 :             : 
     449                 :             : /*
     450                 :             :  * Initialize the dynamic shared memory segment that will be used to control
     451                 :             :  * parallel execution.
     452                 :             :  */
     453                 :             : static bool
     454                 :         500 : ExecParallelInitializeDSM(PlanState *planstate,
     455                 :             :                                                   ExecParallelInitializeDSMContext *d)
     456                 :             : {
     457         [ +  - ]:         500 :         if (planstate == NULL)
     458                 :           0 :                 return false;
     459                 :             : 
     460                 :             :         /* If instrumentation is enabled, initialize slot for this node. */
     461         [ +  + ]:         500 :         if (d->instrumentation != NULL)
     462                 :         171 :                 d->instrumentation->plan_node_id[d->nnodes] =
     463                 :         171 :                         planstate->plan->plan_node_id;
     464                 :             : 
     465                 :             :         /* Count this node. */
     466                 :         500 :         d->nnodes++;
     467                 :             : 
     468                 :             :         /*
     469                 :             :          * Call initializers for DSM-using plan nodes.
     470                 :             :          *
     471                 :             :          * Most plan nodes won't do anything here, but plan nodes that allocated
     472                 :             :          * DSM may need to initialize shared state in the DSM before parallel
     473                 :             :          * workers are launched.  They can allocate the space they previously
     474                 :             :          * estimated using shm_toc_allocate, and add the keys they previously
     475                 :             :          * estimated using shm_toc_insert, in each case targeting pcxt->toc.
     476                 :             :          */
     477   [ +  +  +  +  :         500 :         switch (nodeTag(planstate))
          +  +  +  +  +  
          -  +  +  -  +  
                   -  + ]
     478                 :             :         {
     479                 :             :                 case T_SeqScanState:
     480         [ +  + ]:         190 :                         if (planstate->plan->parallel_aware)
     481                 :         302 :                                 ExecSeqScanInitializeDSM((SeqScanState *) planstate,
     482                 :         151 :                                                                                  d->pcxt);
     483                 :         190 :                         break;
     484                 :             :                 case T_IndexScanState:
     485                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
     486                 :          49 :                         ExecIndexScanInitializeDSM((IndexScanState *) planstate, d->pcxt);
     487                 :          49 :                         break;
     488                 :             :                 case T_IndexOnlyScanState:
     489                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
     490                 :          18 :                         ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
     491                 :           9 :                                                                                    d->pcxt);
     492                 :           9 :                         break;
     493                 :             :                 case T_BitmapIndexScanState:
     494                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
     495                 :           3 :                         ExecBitmapIndexScanInitializeDSM((BitmapIndexScanState *) planstate, d->pcxt);
     496                 :           3 :                         break;
     497                 :             :                 case T_ForeignScanState:
     498         [ #  # ]:           0 :                         if (planstate->plan->parallel_aware)
     499                 :           0 :                                 ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
     500                 :           0 :                                                                                          d->pcxt);
     501                 :           0 :                         break;
     502                 :             :                 case T_TidRangeScanState:
     503         [ -  + ]:           4 :                         if (planstate->plan->parallel_aware)
     504                 :           8 :                                 ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate,
     505                 :           4 :                                                                                           d->pcxt);
     506                 :           4 :                         break;
     507                 :             :                 case T_AppendState:
     508         [ +  + ]:          31 :                         if (planstate->plan->parallel_aware)
     509                 :          46 :                                 ExecAppendInitializeDSM((AppendState *) planstate,
     510                 :          23 :                                                                                 d->pcxt);
     511                 :          31 :                         break;
     512                 :             :                 case T_CustomScanState:
     513         [ #  # ]:           0 :                         if (planstate->plan->parallel_aware)
     514                 :           0 :                                 ExecCustomScanInitializeDSM((CustomScanState *) planstate,
     515                 :           0 :                                                                                         d->pcxt);
     516                 :           0 :                         break;
     517                 :             :                 case T_BitmapHeapScanState:
     518         [ -  + ]:           3 :                         if (planstate->plan->parallel_aware)
     519                 :           6 :                                 ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
     520                 :           3 :                                                                                         d->pcxt);
     521                 :           3 :                         break;
     522                 :             :                 case T_HashJoinState:
     523         [ +  + ]:          33 :                         if (planstate->plan->parallel_aware)
     524                 :          42 :                                 ExecHashJoinInitializeDSM((HashJoinState *) planstate,
     525                 :          21 :                                                                                   d->pcxt);
     526                 :          33 :                         break;
     527                 :             :                 case T_HashState:
     528                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
     529                 :          33 :                         ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
     530                 :          33 :                         break;
     531                 :             :                 case T_SortState:
     532                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
     533                 :          26 :                         ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
     534                 :          26 :                         break;
     535                 :             :                 case T_IncrementalSortState:
     536                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
     537                 :           0 :                         ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
     538                 :           0 :                         break;
     539                 :             :                 case T_AggState:
     540                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
     541                 :          95 :                         ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
     542                 :          95 :                         break;
     543                 :             :                 case T_MemoizeState:
     544                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
     545                 :           1 :                         ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
     546                 :           1 :                         break;
     547                 :             :                 default:
     548                 :          23 :                         break;
     549                 :             :         }
     550                 :             : 
     551                 :         500 :         return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
     552                 :         500 : }
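
The matching InitializeDSM callback allocates what was estimated and publishes it under the node's plan_node_id. Continuing the illustrative FooScan sketch from above:

    static void
    ExecFooScanInitializeDSM(FooScanState *node, ParallelContext *pcxt)
    {
        FooScanShared *shared;

        /* Allocate the chunk estimated earlier and initialize it. */
        shared = shm_toc_allocate(pcxt->toc, node->pscan_len);
        shared->cursor = 0;     /* e.g. a shared scan position; illustrative */
        /* Publish it under this node's plan_node_id (always < 2^32). */
        shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, shared);
    }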
     553                 :             : 
     554                 :             : /*
      555                 :             :  * Set up the response queues for backend workers to return tuples
      556                 :             :  * to the main backend and start the workers.
     557                 :             :  */
     558                 :             : static shm_mq_handle **
     559                 :         164 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
     560                 :             : {
     561                 :         164 :         shm_mq_handle **responseq;
     562                 :         164 :         char       *tqueuespace;
     563                 :         164 :         int                     i;
     564                 :             : 
     565                 :             :         /* Skip this if no workers. */
     566         [ +  - ]:         164 :         if (pcxt->nworkers == 0)
     567                 :           0 :                 return NULL;
     568                 :             : 
     569                 :             :         /* Allocate memory for shared memory queue handles. */
     570                 :         164 :         responseq = (shm_mq_handle **)
     571                 :         164 :                 palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
     572                 :             : 
     573                 :             :         /*
     574                 :             :          * If not reinitializing, allocate space from the DSM for the queues;
     575                 :             :          * otherwise, find the already allocated space.
     576                 :             :          */
     577         [ +  + ]:         164 :         if (!reinitialize)
     578                 :         121 :                 tqueuespace =
     579                 :         242 :                         shm_toc_allocate(pcxt->toc,
     580                 :         121 :                                                          mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
     581                 :         121 :                                                                           pcxt->nworkers));
     582                 :             :         else
     583                 :          43 :                 tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
     584                 :             : 
     585                 :             :         /* Create the queues, and become the receiver for each. */
     586         [ +  + ]:         613 :         for (i = 0; i < pcxt->nworkers; ++i)
     587                 :             :         {
     588                 :         449 :                 shm_mq     *mq;
     589                 :             : 
     590                 :         898 :                 mq = shm_mq_create(tqueuespace +
     591                 :         449 :                                                    ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
     592                 :             :                                                    (Size) PARALLEL_TUPLE_QUEUE_SIZE);
     593                 :             : 
     594                 :         449 :                 shm_mq_set_receiver(mq, MyProc);
     595                 :         449 :                 responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
     596                 :         449 :         }
     597                 :             : 
     598                 :             :         /* Add array of queues to shm_toc, so others can find it. */
     599         [ +  + ]:         164 :         if (!reinitialize)
     600                 :         121 :                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
     601                 :             : 
     602                 :             :         /* Return array of handles. */
     603                 :         164 :         return responseq;
     604                 :         164 : }
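
The leader becomes the receiver on each queue; the sender side is attached in the worker by ExecParallelGetReceiver (declared above). A sketch consistent with that declaration, using the per-worker slice of the same tqueuespace:

    /* Each worker takes the PARALLEL_TUPLE_QUEUE_SIZE slice indexed by its
     * own ParallelWorkerNumber, attaches as sender, and wraps the queue in
     * a DestReceiver that ships result tuples back to the leader. */
    char   *tqueuespace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
    shm_mq *mq = (shm_mq *) (tqueuespace +
                             ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE);

    shm_mq_set_sender(mq, MyProc);
    return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));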
     605                 :             : 
     606                 :             : /*
     607                 :             :  * Sets up the required infrastructure for backend workers to perform
     608                 :             :  * execution and return results to the main backend.
     609                 :             :  */
     610                 :             : ParallelExecutorInfo *
     611                 :         121 : ExecInitParallelPlan(PlanState *planstate, EState *estate,
     612                 :             :                                          Bitmapset *sendParams, int nworkers,
     613                 :             :                                          int64 tuples_needed)
     614                 :             : {
     615                 :         121 :         ParallelExecutorInfo *pei;
     616                 :         121 :         ParallelContext *pcxt;
     617                 :         121 :         ExecParallelEstimateContext e;
     618                 :         121 :         ExecParallelInitializeDSMContext d;
     619                 :         121 :         FixedParallelExecutorState *fpes;
     620                 :         121 :         char       *pstmt_data;
     621                 :         121 :         char       *pstmt_space;
     622                 :         121 :         char       *paramlistinfo_space;
     623                 :         121 :         BufferUsage *bufusage_space;
     624                 :         121 :         WalUsage   *walusage_space;
     625                 :         121 :         SharedExecutorInstrumentation *instrumentation = NULL;
     626                 :         121 :         SharedJitInstrumentation *jit_instrumentation = NULL;
     627                 :         121 :         int                     pstmt_len;
     628                 :         121 :         int                     paramlistinfo_len;
     629                 :         121 :         int                     instrumentation_len = 0;
     630                 :         121 :         int                     jit_instrumentation_len = 0;
     631                 :         121 :         int                     instrument_offset = 0;
     632                 :         121 :         Size            dsa_minsize = dsa_minimum_size();
     633                 :         121 :         char       *query_string;
     634                 :         121 :         int                     query_len;
     635                 :             : 
     636                 :             :         /*
     637                 :             :          * Force any initplan outputs that we're going to pass to workers to be
     638                 :             :          * evaluated, if they weren't already.
     639                 :             :          *
     640                 :             :          * For simplicity, we use the EState's per-output-tuple ExprContext here.
     641                 :             :          * That risks intra-query memory leakage, since we might pass through here
     642                 :             :          * many times before that ExprContext gets reset; but ExecSetParamPlan
     643                 :             :          * doesn't normally leak any memory in the context (see its comments), so
     644                 :             :          * it doesn't seem worth complicating this function's API to pass it a
     645                 :             :          * shorter-lived ExprContext.  This might need to change someday.
     646                 :             :          */
     647         [ +  + ]:         121 :         ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
     648                 :             : 
     649                 :             :         /* Allocate object for return value. */
     650                 :         121 :         pei = palloc0_object(ParallelExecutorInfo);
     651                 :         121 :         pei->finished = false;
     652                 :         121 :         pei->planstate = planstate;
     653                 :             : 
     654                 :             :         /* Fix up and serialize plan to be sent to workers. */
     655                 :         121 :         pstmt_data = ExecSerializePlan(planstate->plan, estate);
     656                 :             : 
     657                 :             :         /* Create a parallel context. */
     658                 :         121 :         pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
     659                 :         121 :         pei->pcxt = pcxt;
     660                 :             : 
     661                 :             :         /*
     662                 :             :          * Before telling the parallel context to create a dynamic shared memory
     663                 :             :          * segment, we need to figure out how big it should be.  Estimate space
     664                 :             :          * for the various things we need to store.
     665                 :             :          */
     666                 :             : 
     667                 :             :         /* Estimate space for fixed-size state. */
     668                 :         121 :         shm_toc_estimate_chunk(&pcxt->estimator,
     669                 :             :                                                    sizeof(FixedParallelExecutorState));
     670                 :         121 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     671                 :             : 
     672                 :             :         /* Estimate space for query text. */
     673                 :         121 :         query_len = strlen(estate->es_sourceText);
     674                 :         121 :         shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
     675                 :         121 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     676                 :             : 
     677                 :             :         /* Estimate space for serialized PlannedStmt. */
     678                 :         121 :         pstmt_len = strlen(pstmt_data) + 1;
     679                 :         121 :         shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
     680                 :         121 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     681                 :             : 
     682                 :             :         /* Estimate space for serialized ParamListInfo. */
     683                 :         121 :         paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
     684                 :         121 :         shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
     685                 :         121 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     686                 :             : 
     687                 :             :         /*
     688                 :             :          * Estimate space for BufferUsage.
     689                 :             :          *
     690                 :             :          * If EXPLAIN is not in use and there are no extensions loaded that care,
     691                 :             :          * we could skip this.  But we have no way of knowing whether anyone's
     692                 :             :          * looking at pgBufferUsage, so do it unconditionally.
     693                 :             :          */
     694                 :         121 :         shm_toc_estimate_chunk(&pcxt->estimator,
     695                 :             :                                                    mul_size(sizeof(BufferUsage), pcxt->nworkers));
     696                 :         121 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     697                 :             : 
     698                 :             :         /*
     699                 :             :          * Same thing for WalUsage.
     700                 :             :          */
     701                 :         121 :         shm_toc_estimate_chunk(&pcxt->estimator,
     702                 :             :                                                    mul_size(sizeof(WalUsage), pcxt->nworkers));
     703                 :         121 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     704                 :             : 
     705                 :             :         /* Estimate space for tuple queues. */
     706                 :         121 :         shm_toc_estimate_chunk(&pcxt->estimator,
     707                 :             :                                                    mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
     708                 :         121 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     709                 :             : 
     710                 :             :         /*
     711                 :             :          * Give parallel-aware nodes a chance to add to the estimates, and get a
     712                 :             :          * count of how many PlanState nodes there are.
     713                 :             :          */
     714                 :         121 :         e.pcxt = pcxt;
     715                 :         121 :         e.nnodes = 0;
     716                 :         121 :         ExecParallelEstimate(planstate, &e);
     717                 :             : 
     718                 :             :         /* Estimate space for instrumentation, if required. */
     719         [ +  + ]:         121 :         if (estate->es_instrument)
     720                 :             :         {
     721                 :          30 :                 instrumentation_len =
     722                 :          30 :                         offsetof(SharedExecutorInstrumentation, plan_node_id) +
     723                 :          30 :                         sizeof(int) * e.nnodes;
     724                 :          30 :                 instrumentation_len = MAXALIGN(instrumentation_len);
     725                 :          30 :                 instrument_offset = instrumentation_len;
     726                 :          30 :                 instrumentation_len +=
     727                 :          30 :                         mul_size(sizeof(Instrumentation),
     728                 :          30 :                                          mul_size(e.nnodes, nworkers));
     729                 :          30 :                 shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
     730                 :          30 :                 shm_toc_estimate_keys(&pcxt->estimator, 1);
     731                 :             : 
     732                 :             :                 /* Estimate space for JIT instrumentation, if required. */
     733         [ +  - ]:          30 :                 if (estate->es_jit_flags != PGJIT_NONE)
     734                 :             :                 {
     735                 :           0 :                         jit_instrumentation_len =
     736                 :           0 :                                 offsetof(SharedJitInstrumentation, jit_instr) +
     737                 :           0 :                                 sizeof(JitInstrumentation) * nworkers;
     738                 :           0 :                         shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
     739                 :           0 :                         shm_toc_estimate_keys(&pcxt->estimator, 1);
     740                 :           0 :                 }
     741                 :          30 :         }
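                          :             : 
                          :             :         /*
                          :             :          * Layout note: the instrumentation chunk estimated above consists of
                          :             :          * a MAXALIGN'd header -- the fixed fields of
                          :             :          * SharedExecutorInstrumentation plus an int plan_node_id[e.nnodes]
                          :             :          * array -- followed by a two-dimensional array of Instrumentation
                          :             :          * indexed as [plan_node][worker].  For example, with e.nnodes = 4 and
                          :             :          * nworkers = 2 there are 4 * 2 = 8 Instrumentation slots, and the
                          :             :          * slots for plan node i begin at
                          :             :          * GetInstrumentationArray(instrumentation) + i * nworkers.
                          :             :          */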
     742                 :             : 
     743                 :             :         /* Estimate space for DSA area. */
     744                 :         121 :         shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
     745                 :         121 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     746                 :             : 
     747                 :             :         /*
     748                 :             :          * InitializeParallelDSM() passes the active snapshot to the parallel
     749                 :             :          * worker, which uses it to set es_snapshot.  Make sure we don't set
     750                 :             :          * es_snapshot differently in the child.
     751                 :             :          */
     752         [ +  - ]:         121 :         Assert(GetActiveSnapshot() == estate->es_snapshot);
     753                 :             : 
     754                 :             :         /* Everyone's had a chance to ask for space, so now create the DSM. */
     755                 :         121 :         InitializeParallelDSM(pcxt);
     756                 :             : 
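                          :             :         /*
                          :             :          * Illustrative sketch of the shm_toc protocol used throughout this
                          :             :          * function; MyStateStruct and PARALLEL_KEY_MY_STATE are hypothetical
                          :             :          * names, while the shm_toc_* calls are the real API.  Every chunk
                          :             :          * and key estimated before InitializeParallelDSM() is matched by a
                          :             :          * shm_toc_allocate() and shm_toc_insert() afterward, and workers
                          :             :          * later find each chunk with shm_toc_lookup().
                          :             :          */
                          :             : #ifdef NOT_USED
                          :             :         MyStateStruct *mystate; /* hypothetical per-query shared state */
                          :             : 
                          :             :         /* Phase 1: before InitializeParallelDSM(), reserve space. */
                          :             :         shm_toc_estimate_chunk(&pcxt->estimator, sizeof(MyStateStruct));
                          :             :         shm_toc_estimate_keys(&pcxt->estimator, 1);
                          :             : 
                          :             :         /* Phase 2: after the DSM exists, carve out and publish the chunk. */
                          :             :         mystate = shm_toc_allocate(pcxt->toc, sizeof(MyStateStruct));
                          :             :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_MY_STATE, mystate);
                          :             : 
                          :             :         /* Phase 3: in a worker, retrieve the chunk by key. */
                          :             :         mystate = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_MY_STATE, false);
                          :             : #endif
                          :             : 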
     757                 :             :         /*
     758                 :             :          * OK, now we have a dynamic shared memory segment, and it should be big
     759                 :             :          * enough to store all of the data we estimated we would want to put into
     760                 :             :          * it, plus whatever general stuff (not specifically executor-related) the
     761                 :             :          * ParallelContext itself needs to store there.  None of the space we
     762                 :             :          * asked for has been allocated or initialized yet, though, so do that.
     763                 :             :          */
     764                 :             : 
     765                 :             :         /* Store fixed-size state. */
     766                 :         121 :         fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
     767                 :         121 :         fpes->tuples_needed = tuples_needed;
     768                 :         121 :         fpes->param_exec = InvalidDsaPointer;
     769                 :         121 :         fpes->eflags = estate->es_top_eflags;
     770                 :         121 :         fpes->jit_flags = estate->es_jit_flags;
     771                 :         121 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
     772                 :             : 
     773                 :             :         /* Store query string */
     774                 :         121 :         query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
     775                 :         121 :         memcpy(query_string, estate->es_sourceText, query_len + 1);
     776                 :         121 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
     777                 :             : 
     778                 :             :         /* Store serialized PlannedStmt. */
     779                 :         121 :         pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
     780                 :         121 :         memcpy(pstmt_space, pstmt_data, pstmt_len);
     781                 :         121 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
     782                 :             : 
     783                 :             :         /* Store serialized ParamListInfo. */
     784                 :         121 :         paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
     785                 :         121 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
     786                 :         121 :         SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
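                          :             : 
                          :             :         /*
                          :             :          * Note the ordering above: shm_toc_insert() runs before
                          :             :          * SerializeParamList() because SerializeParamList() advances its
                          :             :          * char ** argument past the bytes it writes; inserting afterward
                          :             :          * would publish an address beyond the end of the serialized data.
                          :             :          */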
     787                 :             : 
     788                 :             :         /* Allocate space for each worker's BufferUsage; no need to initialize. */
     789                 :         242 :         bufusage_space = shm_toc_allocate(pcxt->toc,
     790                 :         121 :                                                                           mul_size(sizeof(BufferUsage), pcxt->nworkers));
     791                 :         121 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
     792                 :         121 :         pei->buffer_usage = bufusage_space;
     793                 :             : 
     794                 :             :         /* Same for WalUsage. */
     795                 :         242 :         walusage_space = shm_toc_allocate(pcxt->toc,
     796                 :         121 :                                                                           mul_size(sizeof(WalUsage), pcxt->nworkers));
     797                 :         121 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
     798                 :         121 :         pei->wal_usage = walusage_space;
     799                 :             : 
     800                 :             :         /* Set up the tuple queues that the workers will write into. */
     801                 :         121 :         pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
     802                 :             : 
     803                 :             :         /* We don't need the TupleQueueReaders yet, though. */
     804                 :         121 :         pei->reader = NULL;
     805                 :             : 
     806                 :             :         /*
     807                 :             :          * If instrumentation options were supplied, allocate space for the data.
     808                 :             :          * It only gets partially initialized here; the rest happens during
     809                 :             :          * ExecParallelInitializeDSM.
     810                 :             :          */
     811         [ +  + ]:         121 :         if (estate->es_instrument)
     812                 :             :         {
     813                 :          30 :                 Instrumentation *instrument;
     814                 :          30 :                 int                     i;
     815                 :             : 
     816                 :          30 :                 instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
     817                 :          30 :                 instrumentation->instrument_options = estate->es_instrument;
     818                 :          30 :                 instrumentation->instrument_offset = instrument_offset;
     819                 :          30 :                 instrumentation->num_workers = nworkers;
     820                 :          30 :                 instrumentation->num_plan_nodes = e.nnodes;
     821                 :          30 :                 instrument = GetInstrumentationArray(instrumentation);
     822         [ +  + ]:         310 :                 for (i = 0; i < nworkers * e.nnodes; ++i)
     823                 :         280 :                         InstrInit(&instrument[i], estate->es_instrument);
     824                 :          60 :                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
     825                 :          30 :                                            instrumentation);
     826                 :          30 :                 pei->instrumentation = instrumentation;
     827                 :             : 
     828         [ +  - ]:          30 :                 if (estate->es_jit_flags != PGJIT_NONE)
     829                 :             :                 {
     830                 :           0 :                         jit_instrumentation = shm_toc_allocate(pcxt->toc,
     831                 :           0 :                                                                                                    jit_instrumentation_len);
     832                 :           0 :                         jit_instrumentation->num_workers = nworkers;
     833                 :           0 :                         memset(jit_instrumentation->jit_instr, 0,
     834                 :             :                                    sizeof(JitInstrumentation) * nworkers);
     835                 :           0 :                         shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
     836                 :           0 :                                                    jit_instrumentation);
     837                 :           0 :                         pei->jit_instrumentation = jit_instrumentation;
     838                 :           0 :                 }
     839                 :          30 :         }
     840                 :             : 
     841                 :             :         /*
     842                 :             :          * Create a DSA area that can be used by the leader and all workers.
     843                 :             :          * (However, if we failed to create a DSM and are using private memory
     844                 :             :          * instead, then skip this.)
     845                 :             :          */
     846         [ +  - ]:         121 :         if (pcxt->seg != NULL)
     847                 :             :         {
     848                 :         121 :                 char       *area_space;
     849                 :             : 
     850                 :         121 :                 area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
     851                 :         121 :                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
     852                 :         121 :                 pei->area = dsa_create_in_place(area_space, dsa_minsize,
     853                 :             :                                                                                 LWTRANCHE_PARALLEL_QUERY_DSA,
     854                 :             :                                                                                 pcxt->seg);
     855                 :             : 
     856                 :             :                 /*
     857                 :             :                  * Serialize parameters, if any, using DSA storage.  We don't dare use
     858                 :             :                  * the main parallel query DSM for this because we might relaunch
     859                 :             :                  * workers after the values have changed (and thus the amount of
     860                 :             :                  * storage required has changed).
     861                 :             :                  */
     862         [ +  + ]:         121 :                 if (!bms_is_empty(sendParams))
     863                 :             :                 {
     864                 :           8 :                         pei->param_exec = SerializeParamExecParams(estate, sendParams,
     865                 :           4 :                                                                                                            pei->area);
     866                 :           4 :                         fpes->param_exec = pei->param_exec;
     867                 :           4 :                 }
     868                 :         121 :         }
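                          :             : 
                          :             :         /*
                          :             :          * Illustrative fragment of the DSA API used above and by per-node
                          :             :          * code ("size" is a hypothetical value): dsa_allocate() returns a
                          :             :          * dsa_pointer valid in every process attached to the area, and
                          :             :          * dsa_get_address() maps it to a local pointer.
                          :             :          */
                          :             : #ifdef NOT_USED
                          :             :         Size            size = 1024;   /* hypothetical allocation size */
                          :             :         dsa_pointer p = dsa_allocate(pei->area, size);
                          :             :         void       *local = dsa_get_address(pei->area, p);
                          :             : 
                          :             :         /* Free explicitly, or let the memory die with the area. */
                          :             :         dsa_free(pei->area, p);
                          :             : #endif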
     869                 :             : 
     870                 :             :         /*
     871                 :             :          * Give parallel-aware nodes a chance to initialize their shared data.
      872                 :             :          * This also initializes the per-node entries of the shared
      873                 :             :          * instrumentation array, if instrumentation is in use.
     874                 :             :          */
     875                 :         121 :         d.pcxt = pcxt;
     876                 :         121 :         d.instrumentation = instrumentation;
     877                 :         121 :         d.nnodes = 0;
     878                 :             : 
     879                 :             :         /* Install our DSA area while initializing the plan. */
     880                 :         121 :         estate->es_query_dsa = pei->area;
     881                 :         121 :         ExecParallelInitializeDSM(planstate, &d);
     882                 :         121 :         estate->es_query_dsa = NULL;
     883                 :             : 
     884                 :             :         /*
     885                 :             :          * Make sure that the world hasn't shifted under our feet.  This could
     886                 :             :          * probably just be an Assert(), but let's be conservative for now.
     887                 :             :          */
     888         [ +  - ]:         121 :         if (e.nnodes != d.nnodes)
     889   [ #  #  #  # ]:           0 :                 elog(ERROR, "inconsistent count of PlanState nodes");
     890                 :             : 
     891                 :             :         /* OK, we're ready to rock and roll. */
     892                 :         242 :         return pei;
     893                 :         121 : }
     894                 :             : 
     895                 :             : /*
     896                 :             :  * Set up tuple queue readers to read the results of a parallel subplan.
     897                 :             :  *
     898                 :             :  * This is separate from ExecInitParallelPlan() because we can launch the
     899                 :             :  * worker processes and let them start doing something before we do this.
     900                 :             :  */
     901                 :             : void
     902                 :         161 : ExecParallelCreateReaders(ParallelExecutorInfo *pei)
     903                 :             : {
     904                 :         161 :         int                     nworkers = pei->pcxt->nworkers_launched;
     905                 :         161 :         int                     i;
     906                 :             : 
     907         [ +  - ]:         161 :         Assert(pei->reader == NULL);
     908                 :             : 
     909         [ +  - ]:         161 :         if (nworkers > 0)
     910                 :             :         {
     911                 :         161 :                 pei->reader = (TupleQueueReader **)
     912                 :         161 :                         palloc(nworkers * sizeof(TupleQueueReader *));
     913                 :             : 
     914         [ +  + ]:         597 :                 for (i = 0; i < nworkers; i++)
     915                 :             :                 {
     916                 :         872 :                         shm_mq_set_handle(pei->tqueue[i],
     917                 :         436 :                                                           pei->pcxt->worker[i].bgwhandle);
     918                 :         436 :                         pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
     919                 :         436 :                 }
     920                 :         161 :         }
     921                 :         161 : }
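                          :             : 
                          :             : /*
                          :             :  * Illustrative fragment (assuming "pei" and a reader index "i" in scope)
                          :             :  * of how a Gather-like caller drains the readers created above.
                          :             :  * TupleQueueReaderNext() returns NULL with *done set once a worker's
                          :             :  * queue is exhausted; with nowait = true it can also return NULL when no
                          :             :  * tuple is ready yet.
                          :             :  */
                          :             : #ifdef NOT_USED
                          :             :         bool            readerdone = false;
                          :             :         MinimalTuple tup;
                          :             : 
                          :             :         tup = TupleQueueReaderNext(pei->reader[i], true, &readerdone);
                          :             :         if (readerdone)
                          :             :         {
                          :             :                 /* worker i is finished; drop its reader from the active set */
                          :             :         }
                          :             :         else if (tup != NULL)
                          :             :         {
                          :             :                 /* place tup in the caller's output slot */
                          :             :         }
                          :             : #endif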
     922                 :             : 
     923                 :             : /*
     924                 :             :  * Re-initialize the parallel executor shared memory state before launching
     925                 :             :  * a fresh batch of workers.
     926                 :             :  */
     927                 :             : void
     928                 :          43 : ExecParallelReinitialize(PlanState *planstate,
     929                 :             :                                                  ParallelExecutorInfo *pei,
     930                 :             :                                                  Bitmapset *sendParams)
     931                 :             : {
     932                 :          43 :         EState     *estate = planstate->state;
     933                 :          43 :         FixedParallelExecutorState *fpes;
     934                 :             : 
     935                 :             :         /* Old workers must already be shut down */
     936         [ +  - ]:          43 :         Assert(pei->finished);
     937                 :             : 
     938                 :             :         /*
     939                 :             :          * Force any initplan outputs that we're going to pass to workers to be
     940                 :             :          * evaluated, if they weren't already (see comments in
     941                 :             :          * ExecInitParallelPlan).
     942                 :             :          */
     943         [ +  - ]:          43 :         ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
     944                 :             : 
     945                 :          43 :         ReinitializeParallelDSM(pei->pcxt);
     946                 :          43 :         pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
     947                 :          43 :         pei->reader = NULL;
     948                 :          43 :         pei->finished = false;
     949                 :             : 
     950                 :          43 :         fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
     951                 :             : 
     952                 :             :         /* Free any serialized parameters from the last round. */
     953         [ +  - ]:          43 :         if (DsaPointerIsValid(fpes->param_exec))
     954                 :             :         {
     955                 :           0 :                 dsa_free(pei->area, fpes->param_exec);
     956                 :           0 :                 fpes->param_exec = InvalidDsaPointer;
     957                 :           0 :         }
     958                 :             : 
     959                 :             :         /* Serialize current parameter values if required. */
     960         [ +  - ]:          43 :         if (!bms_is_empty(sendParams))
     961                 :             :         {
     962                 :           0 :                 pei->param_exec = SerializeParamExecParams(estate, sendParams,
     963                 :           0 :                                                                                                    pei->area);
     964                 :           0 :                 fpes->param_exec = pei->param_exec;
     965                 :           0 :         }
     966                 :             : 
     967                 :             :         /* Traverse plan tree and let each child node reset associated state. */
     968                 :          43 :         estate->es_query_dsa = pei->area;
     969                 :          43 :         ExecParallelReInitializeDSM(planstate, pei->pcxt);
     970                 :          43 :         estate->es_query_dsa = NULL;
     971                 :          43 : }
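                          :             : 
                          :             : /*
                          :             :  * Illustrative rescan sequence for a Gather-like caller (fragment; the
                          :             :  * real control flow lives in nodeGather.c and nodeGatherMerge.c):
                          :             :  */
                          :             : #ifdef NOT_USED
                          :             :         ExecParallelFinish(pei);        /* old workers must be shut down */
                          :             :         ExecParallelReinitialize(planstate, pei, sendParams);
                          :             :         LaunchParallelWorkers(pei->pcxt);       /* start a fresh batch */
                          :             :         ExecParallelCreateReaders(pei);
                          :             : #endif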
     972                 :             : 
     973                 :             : /*
     974                 :             :  * Traverse plan tree to reinitialize per-node dynamic shared memory state
     975                 :             :  */
     976                 :             : static bool
     977                 :         111 : ExecParallelReInitializeDSM(PlanState *planstate,
     978                 :             :                                                         ParallelContext *pcxt)
     979                 :             : {
     980         [ +  - ]:         111 :         if (planstate == NULL)
     981                 :           0 :                 return false;
     982                 :             : 
     983                 :             :         /*
     984                 :             :          * Call reinitializers for DSM-using plan nodes.
     985                 :             :          */
     986   [ +  +  +  +  :         111 :         switch (nodeTag(planstate))
          +  -  -  -  -  
                   +  + ]
     987                 :             :         {
     988                 :             :                 case T_SeqScanState:
     989         [ +  + ]:          46 :                         if (planstate->plan->parallel_aware)
     990                 :          76 :                                 ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
     991                 :          38 :                                                                                    pcxt);
     992                 :          46 :                         break;
     993                 :             :                 case T_IndexScanState:
     994         [ -  + ]:           2 :                         if (planstate->plan->parallel_aware)
     995                 :           4 :                                 ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
     996                 :           2 :                                                                                          pcxt);
     997                 :           2 :                         break;
     998                 :             :                 case T_IndexOnlyScanState:
     999         [ -  + ]:           2 :                         if (planstate->plan->parallel_aware)
    1000                 :           4 :                                 ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
    1001                 :           2 :                                                                                                  pcxt);
    1002                 :           2 :                         break;
    1003                 :             :                 case T_ForeignScanState:
    1004         [ #  # ]:           0 :                         if (planstate->plan->parallel_aware)
    1005                 :           0 :                                 ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
    1006                 :           0 :                                                                                            pcxt);
    1007                 :           0 :                         break;
    1008                 :             :                 case T_TidRangeScanState:
    1009         [ #  # ]:           0 :                         if (planstate->plan->parallel_aware)
    1010                 :           0 :                                 ExecTidRangeScanReInitializeDSM((TidRangeScanState *) planstate,
    1011                 :           0 :                                                                                                 pcxt);
    1012                 :           0 :                         break;
    1013                 :             :                 case T_AppendState:
    1014         [ #  # ]:           0 :                         if (planstate->plan->parallel_aware)
    1015                 :           0 :                                 ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
    1016                 :           0 :                         break;
    1017                 :             :                 case T_CustomScanState:
    1018         [ #  # ]:           0 :                         if (planstate->plan->parallel_aware)
    1019                 :           0 :                                 ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
    1020                 :           0 :                                                                                           pcxt);
    1021                 :           0 :                         break;
    1022                 :             :                 case T_BitmapHeapScanState:
    1023         [ -  + ]:           9 :                         if (planstate->plan->parallel_aware)
    1024                 :          18 :                                 ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
    1025                 :           9 :                                                                                           pcxt);
    1026                 :           9 :                         break;
    1027                 :             :                 case T_HashJoinState:
    1028         [ +  + ]:          16 :                         if (planstate->plan->parallel_aware)
    1029                 :          16 :                                 ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
    1030                 :           8 :                                                                                         pcxt);
    1031                 :          16 :                         break;
    1032                 :             :                 case T_BitmapIndexScanState:
    1033                 :             :                 case T_HashState:
    1034                 :             :                 case T_SortState:
    1035                 :             :                 case T_IncrementalSortState:
    1036                 :             :                 case T_MemoizeState:
    1037                 :             :                         /* these nodes have DSM state, but no reinitialization is required */
    1038                 :          30 :                         break;
    1039                 :             : 
    1040                 :             :                 default:
    1041                 :           6 :                         break;
    1042                 :             :         }
    1043                 :             : 
    1044                 :         111 :         return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
    1045                 :         111 : }
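                          :             : 
                          :             : /*
                          :             :  * All of the ExecParallel* tree walkers in this file share the shape
                          :             :  * sketched below (hypothetical walker name): do the per-node work, then
                          :             :  * recurse with planstate_tree_walker(), which applies the walker to each
                          :             :  * child node and returns true as soon as any invocation does.
                          :             :  */
                          :             : #ifdef NOT_USED
                          :             : static bool
                          :             : sketch_walker(PlanState *planstate, void *context)
                          :             : {
                          :             :         if (planstate == NULL)
                          :             :                 return false;
                          :             : 
                          :             :         /* per-node work for planstate goes here */
                          :             : 
                          :             :         return planstate_tree_walker(planstate, sketch_walker, context);
                          :             : }
                          :             : #endif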
    1046                 :             : 
    1047                 :             : /*
    1048                 :             :  * Copy instrumentation information about this node and its descendants from
    1049                 :             :  * dynamic shared memory.
    1050                 :             :  */
    1051                 :             : static bool
    1052                 :         171 : ExecParallelRetrieveInstrumentation(PlanState *planstate,
    1053                 :             :                                                                         SharedExecutorInstrumentation *instrumentation)
    1054                 :             : {
    1055                 :         171 :         Instrumentation *instrument;
    1056                 :         171 :         int                     i;
    1057                 :         171 :         int                     n;
    1058                 :         171 :         int                     ibytes;
    1059                 :         171 :         int                     plan_node_id = planstate->plan->plan_node_id;
    1060                 :         171 :         MemoryContext oldcontext;
    1061                 :             : 
    1062                 :             :         /* Find the instrumentation for this node. */
    1063         [ -  + ]:         773 :         for (i = 0; i < instrumentation->num_plan_nodes; ++i)
    1064         [ +  + ]:         773 :                 if (instrumentation->plan_node_id[i] == plan_node_id)
    1065                 :         171 :                         break;
    1066         [ +  - ]:         171 :         if (i >= instrumentation->num_plan_nodes)
    1067   [ #  #  #  # ]:           0 :                 elog(ERROR, "plan node %d not found", plan_node_id);
    1068                 :             : 
    1069                 :             :         /* Accumulate the statistics from all workers. */
    1070                 :         171 :         instrument = GetInstrumentationArray(instrumentation);
    1071                 :         171 :         instrument += i * instrumentation->num_workers;
    1072         [ +  + ]:         451 :         for (n = 0; n < instrumentation->num_workers; ++n)
    1073                 :         280 :                 InstrAggNode(planstate->instrument, &instrument[n]);
    1074                 :             : 
    1075                 :             :         /*
    1076                 :             :          * Also store the per-worker detail.
    1077                 :             :          *
    1078                 :             :          * Worker instrumentation should be allocated in the same context as the
    1079                 :             :          * regular instrumentation information, which is the per-query context.
    1080                 :             :          * Switch into per-query memory context.
    1081                 :             :          */
    1082                 :         171 :         oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
    1083                 :         171 :         ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
    1084                 :         171 :         planstate->worker_instrument =
    1085                 :         171 :                 palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
    1086                 :         171 :         MemoryContextSwitchTo(oldcontext);
    1087                 :             : 
    1088                 :         171 :         planstate->worker_instrument->num_workers = instrumentation->num_workers;
    1089                 :         171 :         memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
    1090                 :             : 
    1091                 :             :         /* Perform any node-type-specific work that needs to be done. */
    1092   [ +  +  +  -  :         171 :         switch (nodeTag(planstate))
          -  -  +  +  -  
                      - ]
    1093                 :             :         {
    1094                 :             :                 case T_IndexScanState:
    1095                 :          45 :                         ExecIndexScanRetrieveInstrumentation((IndexScanState *) planstate);
    1096                 :          45 :                         break;
    1097                 :             :                 case T_IndexOnlyScanState:
    1098                 :           0 :                         ExecIndexOnlyScanRetrieveInstrumentation((IndexOnlyScanState *) planstate);
    1099                 :           0 :                         break;
    1100                 :             :                 case T_BitmapIndexScanState:
    1101                 :           0 :                         ExecBitmapIndexScanRetrieveInstrumentation((BitmapIndexScanState *) planstate);
    1102                 :           0 :                         break;
    1103                 :             :                 case T_SortState:
    1104                 :           2 :                         ExecSortRetrieveInstrumentation((SortState *) planstate);
    1105                 :           2 :                         break;
    1106                 :             :                 case T_IncrementalSortState:
    1107                 :           0 :                         ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
    1108                 :           0 :                         break;
    1109                 :             :                 case T_HashState:
    1110                 :          14 :                         ExecHashRetrieveInstrumentation((HashState *) planstate);
    1111                 :          14 :                         break;
    1112                 :             :                 case T_AggState:
    1113                 :          17 :                         ExecAggRetrieveInstrumentation((AggState *) planstate);
    1114                 :          17 :                         break;
    1115                 :             :                 case T_MemoizeState:
    1116                 :           0 :                         ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
    1117                 :           0 :                         break;
    1118                 :             :                 case T_BitmapHeapScanState:
    1119                 :           0 :                         ExecBitmapHeapRetrieveInstrumentation((BitmapHeapScanState *) planstate);
    1120                 :           0 :                         break;
    1121                 :             :                 default:
    1122                 :          93 :                         break;
    1123                 :             :         }
    1124                 :             : 
    1125                 :         342 :         return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
    1126                 :             :                                                                  instrumentation);
    1127                 :         171 : }
    1128                 :             : 
    1129                 :             : /*
    1130                 :             :  * Add up the workers' JIT instrumentation from dynamic shared memory.
    1131                 :             :  */
    1132                 :             : static void
    1133                 :           0 : ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
    1134                 :             :                                                                            SharedJitInstrumentation *shared_jit)
    1135                 :             : {
    1136                 :           0 :         JitInstrumentation *combined;
    1137                 :           0 :         int                     ibytes;
    1138                 :             : 
    1139                 :           0 :         int                     n;
    1140                 :             : 
    1141                 :             :         /*
    1142                 :             :          * Accumulate worker JIT instrumentation into the combined JIT
    1143                 :             :          * instrumentation, allocating it if required.
    1144                 :             :          */
    1145         [ #  # ]:           0 :         if (!planstate->state->es_jit_worker_instr)
    1146                 :           0 :                 planstate->state->es_jit_worker_instr =
    1147                 :           0 :                         MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
    1148                 :           0 :         combined = planstate->state->es_jit_worker_instr;
    1149                 :             : 
    1150                 :             :         /* Accumulate all the workers' instrumentations. */
    1151         [ #  # ]:           0 :         for (n = 0; n < shared_jit->num_workers; ++n)
    1152                 :           0 :                 InstrJitAgg(combined, &shared_jit->jit_instr[n]);
    1153                 :             : 
    1154                 :             :         /*
    1155                 :             :          * Store the per-worker detail.
    1156                 :             :          *
    1157                 :             :          * Similar to ExecParallelRetrieveInstrumentation(), allocate the
    1158                 :             :          * instrumentation in per-query context.
    1159                 :             :          */
    1160                 :           0 :         ibytes = offsetof(SharedJitInstrumentation, jit_instr)
    1161                 :           0 :                 + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
    1162                 :           0 :         planstate->worker_jit_instrument =
    1163                 :           0 :                 MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
    1164                 :             : 
    1165                 :           0 :         memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
    1166                 :           0 : }
    1167                 :             : 
    1168                 :             : /*
    1169                 :             :  * Finish parallel execution.  We wait for parallel workers to finish, and
    1170                 :             :  * accumulate their buffer/WAL usage.
    1171                 :             :  */
    1172                 :             : void
    1173                 :         295 : ExecParallelFinish(ParallelExecutorInfo *pei)
    1174                 :             : {
    1175                 :         295 :         int                     nworkers = pei->pcxt->nworkers_launched;
    1176                 :         295 :         int                     i;
    1177                 :             : 
    1178                 :             :         /* Make this be a no-op if called twice in a row. */
    1179         [ +  + ]:         295 :         if (pei->finished)
    1180                 :         133 :                 return;
    1181                 :             : 
    1182                 :             :         /*
    1183                 :             :          * Detach from tuple queues ASAP, so that any still-active workers will
    1184                 :             :          * notice that no further results are wanted.
    1185                 :             :          */
    1186         [ -  + ]:         162 :         if (pei->tqueue != NULL)
    1187                 :             :         {
    1188         [ +  + ]:         596 :                 for (i = 0; i < nworkers; i++)
    1189                 :         434 :                         shm_mq_detach(pei->tqueue[i]);
    1190                 :         162 :                 pfree(pei->tqueue);
    1191                 :         162 :                 pei->tqueue = NULL;
    1192                 :         162 :         }
    1193                 :             : 
    1194                 :             :         /*
    1195                 :             :          * While we're waiting for the workers to finish, let's get rid of the
    1196                 :             :          * tuple queue readers.  (Any other local cleanup could be done here too.)
    1197                 :             :          */
    1198         [ +  + ]:         162 :         if (pei->reader != NULL)
    1199                 :             :         {
    1200         [ +  + ]:         593 :                 for (i = 0; i < nworkers; i++)
    1201                 :         434 :                         DestroyTupleQueueReader(pei->reader[i]);
    1202                 :         159 :                 pfree(pei->reader);
    1203                 :         159 :                 pei->reader = NULL;
    1204                 :         159 :         }
    1205                 :             : 
    1206                 :             :         /* Now wait for the workers to finish. */
    1207                 :         162 :         WaitForParallelWorkersToFinish(pei->pcxt);
    1208                 :             : 
    1209                 :             :         /*
    1210                 :             :          * Next, accumulate buffer/WAL usage.  (This must wait for the workers to
    1211                 :             :          * finish, or we might get incomplete data.)
    1212                 :             :          */
    1213         [ +  + ]:         596 :         for (i = 0; i < nworkers; i++)
    1214                 :         434 :                 InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
    1215                 :             : 
    1216                 :         162 :         pei->finished = true;
    1217         [ -  + ]:         295 : }
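                          :             : 
                          :             : /*
                          :             :  * Note on ordering above: InstrAccumParallelQuery() folds each worker's
                          :             :  * counters into the leader's global buffer/WAL usage totals, so it must
                          :             :  * run only after WaitForParallelWorkersToFinish() has guaranteed that
                          :             :  * every worker wrote its final numbers into the DSM arrays.
                          :             :  */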
    1218                 :             : 
    1219                 :             : /*
    1220                 :             :  * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
    1221                 :             :  * resources still exist after ExecParallelFinish.  We separate these
    1222                 :             :  * routines because someone might want to examine the contents of the DSM
    1223                 :             :  * after ExecParallelFinish and before calling this routine.
    1224                 :             :  */
    1225                 :             : void
    1226                 :         119 : ExecParallelCleanup(ParallelExecutorInfo *pei)
    1227                 :             : {
    1228                 :             :         /* Accumulate instrumentation, if any. */
    1229         [ +  + ]:         119 :         if (pei->instrumentation)
    1230                 :          60 :                 ExecParallelRetrieveInstrumentation(pei->planstate,
    1231                 :          30 :                                                                                         pei->instrumentation);
    1232                 :             : 
    1233                 :             :         /* Accumulate JIT instrumentation, if any. */
    1234         [ +  - ]:         119 :         if (pei->jit_instrumentation)
    1235                 :           0 :                 ExecParallelRetrieveJitInstrumentation(pei->planstate,
    1236                 :           0 :                                                                                            pei->jit_instrumentation);
    1237                 :             : 
    1238                 :             :         /* Free any serialized parameters. */
    1239         [ +  + ]:         119 :         if (DsaPointerIsValid(pei->param_exec))
    1240                 :             :         {
    1241                 :           4 :                 dsa_free(pei->area, pei->param_exec);
    1242                 :           4 :                 pei->param_exec = InvalidDsaPointer;
    1243                 :           4 :         }
    1244         [ -  + ]:         119 :         if (pei->area != NULL)
    1245                 :             :         {
    1246                 :         119 :                 dsa_detach(pei->area);
    1247                 :         119 :                 pei->area = NULL;
    1248                 :         119 :         }
    1249         [ -  + ]:         119 :         if (pei->pcxt != NULL)
    1250                 :             :         {
    1251                 :         119 :                 DestroyParallelContext(pei->pcxt);
    1252                 :         119 :                 pei->pcxt = NULL;
    1253                 :         119 :         }
    1254                 :         119 :         pfree(pei);
    1255                 :         119 : }
    1256                 :             : 
    1257                 :             : /*
    1258                 :             :  * Create a DestReceiver to write tuples we produce to the shm_mq designated
    1259                 :             :  * for that purpose.
    1260                 :             :  */
    1261                 :             : static DestReceiver *
    1262                 :         436 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
    1263                 :             : {
    1264                 :         436 :         char       *mqspace;
    1265                 :         436 :         shm_mq     *mq;
    1266                 :             : 
    1267                 :         436 :         mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
    1268                 :         436 :         mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
    1269                 :         436 :         mq = (shm_mq *) mqspace;
    1270                 :         436 :         shm_mq_set_sender(mq, MyProc);
    1271                 :         872 :         return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
    1272                 :         436 : }
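                          :             : 
                          :             : /*
                          :             :  * The matching receiver side is set up in ExecParallelSetupTupleQueues(),
                          :             :  * where the leader calls shm_mq_set_receiver() on each queue; a queue
                          :             :  * carries tuples only once both ends have been set and attached.
                          :             :  */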
    1273                 :             : 
    1274                 :             : /*
    1275                 :             :  * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
    1276                 :             :  */
    1277                 :             : static QueryDesc *
    1278                 :         436 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
    1279                 :             :                                                  int instrument_options)
    1280                 :             : {
    1281                 :         436 :         char       *pstmtspace;
    1282                 :         436 :         char       *paramspace;
    1283                 :         436 :         PlannedStmt *pstmt;
    1284                 :         436 :         ParamListInfo paramLI;
    1285                 :         436 :         char       *queryString;
    1286                 :             : 
    1287                 :             :         /* Get the query string from shared memory */
    1288                 :         436 :         queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
    1289                 :             : 
    1290                 :             :         /* Reconstruct leader-supplied PlannedStmt. */
    1291                 :         436 :         pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
    1292                 :         436 :         pstmt = (PlannedStmt *) stringToNode(pstmtspace);
    1293                 :             : 
    1294                 :             :         /* Reconstruct ParamListInfo. */
    1295                 :         436 :         paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
    1296                 :         436 :         paramLI = RestoreParamList(&paramspace);
    1297                 :             : 
    1298                 :             :         /* Create a QueryDesc for the query. */
    1299                 :        1308 :         return CreateQueryDesc(pstmt,
    1300                 :         436 :                                                    queryString,
    1301                 :         436 :                                                    GetActiveSnapshot(), InvalidSnapshot,
    1302                 :         436 :                                                    receiver, paramLI, NULL, instrument_options);
    1303                 :         436 : }
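                          :             : 
                          :             : /*
                          :             :  * Condensed sketch of how ParallelQueryMain() (later in this file) uses
                          :             :  * the two helpers above; declarations and error handling are omitted and
                          :             :  * the exact executor call signatures may differ between versions.
                          :             :  */
                          :             : #ifdef NOT_USED
                          :             :         receiver = ExecParallelGetReceiver(seg, toc);
                          :             :         queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
                          :             :         ExecutorStart(queryDesc, fpes->eflags);
                          :             :         ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
                          :             :         ExecutorRun(queryDesc, ForwardScanDirection,
                          :             :                                 fpes->tuples_needed < 0 ? 0 : fpes->tuples_needed);
                          :             :         ExecutorFinish(queryDesc);
                          :             :         ExecutorEnd(queryDesc);
                          :             : #endif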
    1304                 :             : 
    1305                 :             : /*
    1306                 :             :  * Copy instrumentation information from this node and its descendants into
    1307                 :             :  * dynamic shared memory, so that the parallel leader can retrieve it.
    1308                 :             :  */
    1309                 :             : static bool
    1310                 :         396 : ExecParallelReportInstrumentation(PlanState *planstate,
    1311                 :             :                                                                   SharedExecutorInstrumentation *instrumentation)
    1312                 :             : {
    1313                 :         396 :         int                     i;
    1314                 :         396 :         int                     plan_node_id = planstate->plan->plan_node_id;
    1315                 :         396 :         Instrumentation *instrument;
    1316                 :             : 
    1317                 :         396 :         InstrEndLoop(planstate->instrument);
    1318                 :             : 
    1319                 :             :         /*
     1320                 :             :          * If we kept the plan_node_id values in the shared instrumentation
     1321                 :             :          * array in sorted order, we could use binary search here.  This might
     1322                 :             :          * matter someday if we're pushing down sufficiently large plan trees.
     1323                 :             :          * For now, do it the slow, dumb way.
    1324                 :             :          */
    1325         [ -  + ]:        1302 :         for (i = 0; i < instrumentation->num_plan_nodes; ++i)
    1326         [ +  + ]:        1302 :                 if (instrumentation->plan_node_id[i] == plan_node_id)
    1327                 :         396 :                         break;
    1328         [ +  - ]:         396 :         if (i >= instrumentation->num_plan_nodes)
    1329   [ #  #  #  # ]:           0 :                 elog(ERROR, "plan node %d not found", plan_node_id);
    1330                 :             : 
    1331                 :             :         /*
    1332                 :             :          * Add our statistics to the per-node, per-worker totals.  It's possible
    1333                 :             :          * that this could happen more than once if we relaunched workers.
    1334                 :             :          */
    1335                 :         396 :         instrument = GetInstrumentationArray(instrumentation);
    1336                 :         396 :         instrument += i * instrumentation->num_workers;
    1337         [ +  - ]:         396 :         Assert(IsParallelWorker());
    1338         [ +  - ]:         396 :         Assert(ParallelWorkerNumber < instrumentation->num_workers);
    1339                 :         396 :         InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
    1340                 :             : 
    1341                 :         792 :         return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
    1342                 :             :                                                                  instrumentation);
    1343                 :         396 : }
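                          :             : 
                          :             : /*
                          :             :  * Sketch of the binary-search alternative mentioned above, assuming the
                          :             :  * plan_node_id array were kept in sorted order (today it is not):
                          :             :  */
                          :             : #ifdef NOT_USED
                          :             :         int                     lo = 0;
                          :             :         int                     hi = instrumentation->num_plan_nodes;
                          :             : 
                          :             :         while (lo < hi)
                          :             :         {
                          :             :                 int                     mid = lo + (hi - lo) / 2;
                          :             : 
                          :             :                 if (instrumentation->plan_node_id[mid] < plan_node_id)
                          :             :                         lo = mid + 1;
                          :             :                 else
                          :             :                         hi = mid;
                          :             :         }
                          :             :         if (lo >= instrumentation->num_plan_nodes ||
                          :             :                 instrumentation->plan_node_id[lo] != plan_node_id)
                          :             :                 elog(ERROR, "plan node %d not found", plan_node_id);
                          :             :         i = lo;
                          :             : #endif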
    1344                 :             : 
    1345                 :             : /*
    1346                 :             :  * Initialize the PlanState and its descendants with the information
    1347                 :             :  * retrieved from shared memory.  This has to be done once the PlanState
    1348                 :             :  * is allocated and initialized by the executor; that is, after ExecutorStart().  (A sketch of the per-node hookup pattern follows this function.)
    1349                 :             :  */
    1350                 :             : static bool
    1351                 :        1399 : ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
    1352                 :             : {
    1353         [ +  - ]:        1399 :         if (planstate == NULL)
    1354                 :           0 :                 return false;
    1355                 :             : 
    1356   [ +  +  +  +  :        1399 :         switch (nodeTag(planstate))
          +  +  +  +  +  
          -  +  +  -  +  
                   -  + ]
    1357                 :             :         {
    1358                 :             :                 case T_SeqScanState:
    1359         [ +  + ]:         556 :                         if (planstate->plan->parallel_aware)
    1360                 :         452 :                                 ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
    1361                 :         556 :                         break;
    1362                 :             :                 case T_IndexScanState:
    1363                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1364                 :          66 :                         ExecIndexScanInitializeWorker((IndexScanState *) planstate, pwcxt);
    1365                 :          66 :                         break;
    1366                 :             :                 case T_IndexOnlyScanState:
    1367                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1368                 :          78 :                         ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
    1369                 :          39 :                                                                                           pwcxt);
    1370                 :          39 :                         break;
    1371                 :             :                 case T_BitmapIndexScanState:
    1372                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1373                 :          90 :                         ExecBitmapIndexScanInitializeWorker((BitmapIndexScanState *) planstate,
    1374                 :          45 :                                                                                                 pwcxt);
    1375                 :          45 :                         break;
    1376                 :             :                 case T_ForeignScanState:
    1377         [ #  # ]:           0 :                         if (planstate->plan->parallel_aware)
    1378                 :           0 :                                 ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
    1379                 :           0 :                                                                                                 pwcxt);
    1380                 :           0 :                         break;
    1381                 :             :                 case T_TidRangeScanState:
    1382         [ -  + ]:          16 :                         if (planstate->plan->parallel_aware)
    1383                 :          32 :                                 ExecTidRangeScanInitializeWorker((TidRangeScanState *) planstate,
    1384                 :          16 :                                                                                                  pwcxt);
    1385                 :          16 :                         break;
    1386                 :             :                 case T_AppendState:
    1387         [ +  + ]:          63 :                         if (planstate->plan->parallel_aware)
    1388                 :          53 :                                 ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
    1389                 :          63 :                         break;
    1390                 :             :                 case T_CustomScanState:
    1391         [ #  # ]:           0 :                         if (planstate->plan->parallel_aware)
    1392                 :           0 :                                 ExecCustomScanInitializeWorker((CustomScanState *) planstate,
    1393                 :           0 :                                                                                            pwcxt);
    1394                 :           0 :                         break;
    1395                 :             :                 case T_BitmapHeapScanState:
    1396         [ -  + ]:          45 :                         if (planstate->plan->parallel_aware)
    1397                 :          90 :                                 ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
    1398                 :          45 :                                                                                            pwcxt);
    1399                 :          45 :                         break;
    1400                 :             :                 case T_HashJoinState:
    1401         [ +  + ]:          94 :                         if (planstate->plan->parallel_aware)
    1402                 :         108 :                                 ExecHashJoinInitializeWorker((HashJoinState *) planstate,
    1403                 :          54 :                                                                                          pwcxt);
    1404                 :          94 :                         break;
    1405                 :             :                 case T_HashState:
    1406                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1407                 :          94 :                         ExecHashInitializeWorker((HashState *) planstate, pwcxt);
    1408                 :          94 :                         break;
    1409                 :             :                 case T_SortState:
    1410                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1411                 :          77 :                         ExecSortInitializeWorker((SortState *) planstate, pwcxt);
    1412                 :          77 :                         break;
    1413                 :             :                 case T_IncrementalSortState:
    1414                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1415                 :           0 :                         ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
    1416                 :           0 :                                                                                                 pwcxt);
    1417                 :           0 :                         break;
    1418                 :             :                 case T_AggState:
    1419                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1420                 :         271 :                         ExecAggInitializeWorker((AggState *) planstate, pwcxt);
    1421                 :         271 :                         break;
    1422                 :             :                 case T_MemoizeState:
    1423                 :             :                         /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1424                 :           2 :                         ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
    1425                 :           2 :                         break;
    1426                 :             :                 default:
    1427                 :          31 :                         break;
    1428                 :             :         }
    1429                 :             : 
    1430                 :        1399 :         return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
    1431                 :             :                                                                  pwcxt);
    1432                 :        1399 : }
    1433                 :             : 
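Each node-specific call above follows the same basic pattern: look up the
per-node shared state that the leader stored in the toc, keyed by the node's
plan_node_id, and attach this worker's node state to it.  The sketch below
shows only the shape of that pattern; FooScanState, ParallelFooScanDesc, and
the shared_desc field are invented names (nodeSeqscan.c's
ExecSeqScanInitializeWorker is a real instance):

    /* Hypothetical illustration of the per-node pattern; these types do
     * not exist in PostgreSQL. */
    static void
    ExecFooScanInitializeWorker(FooScanState *node,
                                ParallelWorkerContext *pwcxt)
    {
        ParallelFooScanDesc *pscan;

        /* The leader stored this toc entry under the node's plan_node_id. */
        pscan = shm_toc_lookup(pwcxt->toc,
                               node->ss.ps.plan->plan_node_id,
                               false);  /* false: a missing entry is an error */

        /* Attach this worker to the shared scan state. */
        node->shared_desc = pscan;
    }
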
    1434                 :             : /*
    1435                 :             :  * Main entrypoint for parallel query worker processes.
    1436                 :             :  *
    1437                 :             :  * We reach this function from ParallelWorkerMain, so the setup necessary to
    1438                 :             :  * create a sensible parallel environment has already been done;
    1439                 :             :  * ParallelWorkerMain worries about stuff like the transaction state, combo
    1440                 :             :  * CID mappings, and GUC values, so we don't need to deal with any of that
    1441                 :             :  * here.
    1442                 :             :  *
    1443                 :             :  * Our job is to deal with concerns specific to the executor.  The parallel
    1444                 :             :  * group leader will have stored a serialized PlannedStmt, and it's our job
    1445                 :             :  * to execute that plan and write the resulting tuples to the appropriate
    1446                 :             :  * tuple queue.  Various bits of supporting information that we need in order
    1447                 :             :  * to do this are also stored in the dsm_segment and can be accessed through
    1448                 :             :  * the shm_toc.  (A condensed sketch of the leader-side lifecycle that launches these workers follows this function.)
    1449                 :             :  */
    1450                 :             : void
    1451                 :         436 : ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
    1452                 :             : {
    1453                 :         436 :         FixedParallelExecutorState *fpes;
    1454                 :         436 :         BufferUsage *buffer_usage;
    1455                 :         436 :         WalUsage   *wal_usage;
    1456                 :         436 :         DestReceiver *receiver;
    1457                 :         436 :         QueryDesc  *queryDesc;
    1458                 :         436 :         SharedExecutorInstrumentation *instrumentation;
    1459                 :         436 :         SharedJitInstrumentation *jit_instrumentation;
    1460                 :         436 :         int                     instrument_options = 0;
    1461                 :         436 :         void       *area_space;
    1462                 :         436 :         dsa_area   *area;
    1463                 :         436 :         ParallelWorkerContext pwcxt;
    1464                 :             : 
    1465                 :             :         /* Get fixed-size state. */
    1466                 :         436 :         fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
    1467                 :             : 
    1468                 :             :         /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
    1469                 :         436 :         receiver = ExecParallelGetReceiver(seg, toc);
    1470                 :         436 :         instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
    1471         [ +  + ]:         436 :         if (instrumentation != NULL)
    1472                 :         121 :                 instrument_options = instrumentation->instrument_options;
    1473                 :         436 :         jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
    1474                 :             :                                                                                  true);
    1475                 :         436 :         queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
    1476                 :             : 
    1477                 :             :         /* Setting debug_query_string for individual workers */
    1478                 :         436 :         debug_query_string = queryDesc->sourceText;
    1479                 :             : 
    1480                 :             :         /* Report workers' query for monitoring purposes */
    1481                 :         436 :         pgstat_report_activity(STATE_RUNNING, debug_query_string);
    1482                 :             : 
    1483                 :             :         /* Attach to the dynamic shared memory area. */
    1484                 :         436 :         area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
    1485                 :         436 :         area = dsa_attach_in_place(area_space, seg);
    1486                 :             : 
    1487                 :             :         /* Start up the executor */
    1488                 :         436 :         queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
    1489                 :         436 :         ExecutorStart(queryDesc, fpes->eflags);
    1490                 :             : 
    1491                 :             :         /* Special executor initialization steps for parallel workers */
    1492                 :         436 :         queryDesc->planstate->state->es_query_dsa = area;
    1493         [ +  + ]:         436 :         if (DsaPointerIsValid(fpes->param_exec))
    1494                 :             :         {
    1495                 :          12 :                 char       *paramexec_space;
    1496                 :             : 
    1497                 :          12 :                 paramexec_space = dsa_get_address(area, fpes->param_exec);
    1498                 :          12 :                 RestoreParamExecParams(paramexec_space, queryDesc->estate);
    1499                 :          12 :         }
    1500                 :         436 :         pwcxt.toc = toc;
    1501                 :         436 :         pwcxt.seg = seg;
    1502                 :         436 :         ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
    1503                 :             : 
    1504                 :             :         /* Pass down any tuple bound */
    1505                 :         436 :         ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
    1506                 :             : 
    1507                 :             :         /*
    1508                 :             :          * Prepare to track buffer/WAL usage during query execution.
    1509                 :             :          *
    1510                 :             :          * We do this after starting up the executor to match what happens in the
    1511                 :             :          * leader, which also doesn't count buffer accesses and WAL activity that
    1512                 :             :          * occur during executor startup.
    1513                 :             :          */
    1514                 :         436 :         InstrStartParallelQuery();
    1515                 :             : 
    1516                 :             :         /*
    1517                 :             :          * Run the plan.  If we specified a tuple bound, be careful not to demand
    1518                 :             :          * more tuples than that.  (ExecutorRun treats a count of zero as "run to completion", hence the mapping of a negative tuples_needed to zero below.)
    1519                 :             :          */
    1520                 :         872 :         ExecutorRun(queryDesc,
    1521                 :             :                                 ForwardScanDirection,
    1522         [ +  + ]:         436 :                                 fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
    1523                 :             : 
    1524                 :             :         /* Shut down the executor */
    1525                 :         436 :         ExecutorFinish(queryDesc);
    1526                 :             : 
    1527                 :             :         /* Report buffer/WAL usage during parallel execution. */
    1528                 :         436 :         buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
    1529                 :         436 :         wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
    1530                 :         872 :         InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
    1531                 :         436 :                                                   &wal_usage[ParallelWorkerNumber]);
    1532                 :             : 
    1533                 :             :         /* Report instrumentation data if any instrumentation options are set. */
    1534         [ +  + ]:         436 :         if (instrumentation != NULL)
    1535                 :         242 :                 ExecParallelReportInstrumentation(queryDesc->planstate,
    1536                 :         121 :                                                                                   instrumentation);
    1537                 :             : 
    1538                 :             :         /* Report JIT instrumentation data if any */
    1539   [ -  +  #  # ]:         436 :         if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
    1540                 :             :         {
    1541         [ #  # ]:           0 :                 Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
    1542                 :           0 :                 jit_instrumentation->jit_instr[ParallelWorkerNumber] =
    1543                 :           0 :                         queryDesc->estate->es_jit->instr;
    1544                 :           0 :         }
    1545                 :             : 
    1546                 :             :         /* Must do this after capturing instrumentation. */
    1547                 :         436 :         ExecutorEnd(queryDesc);
    1548                 :             : 
    1549                 :             :         /* Cleanup. */
    1550                 :         436 :         dsa_detach(area);
    1551                 :         436 :         FreeQueryDesc(queryDesc);
    1552                 :         436 :         receiver->rDestroy(receiver);
    1553                 :         436 : }
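
For context on how control arrives here: the leader-side half lives earlier
in this file (ExecInitParallelPlan and friends), which fills in the DSM
segment's toc entries and then uses the generic ParallelContext machinery to
launch workers whose entry point is this function.  The sketch below condenses
that lifecycle using the real parallel.c API; the wrapper function and its
nworkers parameter are invented, all executor-specific toc setup is elided,
and EnterParallelMode() is assumed to have been called already:

    /* Hypothetical condensed sketch; the real logic is spread across
     * ExecInitParallelPlan, ExecGather, and ExecParallelFinish. */
    static void
    launch_query_workers_sketch(int nworkers)
    {
        ParallelContext *pcxt;

        pcxt = CreateParallelContext("postgres", "ParallelQueryMain",
                                     nworkers);
        /* ... shm_toc_estimate_chunk/keys for each PARALLEL_KEY_* entry ... */
        InitializeParallelDSM(pcxt);    /* creates the seg and toc workers see */
        /* ... shm_toc_allocate + shm_toc_insert for each entry ... */
        LaunchParallelWorkers(pcxt);    /* workers enter ParallelQueryMain */
        /* ... leader runs its own copy of the plan, drains tuple queues ... */
        WaitForParallelWorkersToFinish(pcxt);
        DestroyParallelContext(pcxt);
    }

After WaitForParallelWorkersToFinish(), the leader folds each worker's slot of
the shared BufferUsage/WalUsage arrays into its own totals with
InstrAccumParallelQuery(); those slots are exactly what the
InstrEndParallelQuery() call above filled in.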
        

Generated by: LCOV version 2.3.2-1