Branch data Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * execParallel.c
4 : : * Support routines for parallel execution.
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * This file contains routines that are intended to support setting up,
10 : : * using, and tearing down a ParallelContext from within the PostgreSQL
11 : : * executor. The ParallelContext machinery will handle starting the
12 : : * workers and ensuring that their state generally matches that of the
13 : : * leader; see src/backend/access/transam/README.parallel for details.
14 : : * However, we must save and restore relevant executor state, such as
15 : : * any ParamListInfo associated with the query, buffer/WAL usage info, and
16 : : * the actual plan to be passed down to the worker.
17 : : *
18 : : * IDENTIFICATION
19 : : * src/backend/executor/execParallel.c
20 : : *
21 : : *-------------------------------------------------------------------------
22 : : */
23 : :
24 : : #include "postgres.h"
25 : :
26 : : #include "executor/execParallel.h"
27 : : #include "executor/executor.h"
28 : : #include "executor/nodeAgg.h"
29 : : #include "executor/nodeAppend.h"
30 : : #include "executor/nodeBitmapHeapscan.h"
31 : : #include "executor/nodeBitmapIndexscan.h"
32 : : #include "executor/nodeCustom.h"
33 : : #include "executor/nodeForeignscan.h"
34 : : #include "executor/nodeHash.h"
35 : : #include "executor/nodeHashjoin.h"
36 : : #include "executor/nodeIncrementalSort.h"
37 : : #include "executor/nodeIndexonlyscan.h"
38 : : #include "executor/nodeIndexscan.h"
39 : : #include "executor/nodeMemoize.h"
40 : : #include "executor/nodeSeqscan.h"
41 : : #include "executor/nodeSort.h"
42 : : #include "executor/nodeSubplan.h"
43 : : #include "executor/nodeTidrangescan.h"
44 : : #include "executor/tqueue.h"
45 : : #include "jit/jit.h"
46 : : #include "nodes/nodeFuncs.h"
47 : : #include "pgstat.h"
48 : : #include "tcop/tcopprot.h"
49 : : #include "utils/datum.h"
50 : : #include "utils/dsa.h"
51 : : #include "utils/lsyscache.h"
52 : : #include "utils/snapmgr.h"
53 : :
54 : : /*
55 : : * Magic numbers for parallel executor communication. We use constants
56 : : * greater than any 32-bit integer here so that values < 2^32 can be used
57 : : * by individual parallel nodes to store their own state.
58 : : */
59 : : #define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
60 : : #define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
61 : : #define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
62 : : #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
63 : : #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
64 : : #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
65 : : #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
66 : : #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
67 : : #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
68 : : #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
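
/*
 * For illustration (a sketch, not taken from this file; names like pscan
 * and pscan_len are illustrative): parallel-aware plan nodes conventionally
 * use their plan_node_id as the per-node toc key, which is always far below
 * 2^32 and therefore cannot collide with the PARALLEL_KEY_* constants above.
 * A node's InitializeDSM callback typically does something like
 *
 *     pscan = shm_toc_allocate(pcxt->toc, pscan_len);
 *     ... fill in pscan ...
 *     shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
 *
 * and its InitializeWorker callback retrieves the same chunk with
 * shm_toc_lookup(pwcxt->toc, plan_node_id, false).
 */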
69 : :
70 : : #define PARALLEL_TUPLE_QUEUE_SIZE 65536
71 : :
72 : : /*
73 : : * Fixed-size random stuff that we need to pass to parallel workers.
74 : : */
75 : : typedef struct FixedParallelExecutorState
76 : : {
77 : : int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
78 : : dsa_pointer param_exec;
79 : : int eflags;
80 : : int jit_flags;
81 : : } FixedParallelExecutorState;
82 : :
83 : : /*
84 : : * DSM structure for accumulating per-PlanState instrumentation.
85 : : *
86 : : * instrument_options: Same meaning here as in instrument.c.
87 : : *
88 : : * instrument_offset: Offset, relative to the start of this structure,
89 : : * of the first Instrumentation object. This will depend on the length of
90 : : * the plan_node_id array.
91 : : *
92 : : * num_workers: Number of workers.
93 : : *
94 : : * num_plan_nodes: Number of plan nodes.
95 : : *
96 : : * plan_node_id: Array of plan nodes for which we are gathering instrumentation
97 : : * from parallel workers. The length of this array is given by num_plan_nodes.
98 : : */
99 : : struct SharedExecutorInstrumentation
100 : : {
101 : : int instrument_options;
102 : : int instrument_offset;
103 : : int num_workers;
104 : : int num_plan_nodes;
105 : : int plan_node_id[FLEXIBLE_ARRAY_MEMBER];
106 : : /* array of num_plan_nodes * num_workers Instrumentation objects follows */
107 : : };
108 : : #define GetInstrumentationArray(sei) \
109 : : (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
110 : : (Instrumentation *) (((char *) sei) + sei->instrument_offset))
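
/*
 * A worked example of the layout implied by the accessor above: the
 * Instrumentation entry for plan-node index i and worker w is
 *
 *     GetInstrumentationArray(sei)[i * sei->num_workers + w]
 *
 * i.e. all workers' entries for a given plan node are contiguous, which is
 * what ExecParallelRetrieveInstrumentation() and
 * ExecParallelReportInstrumentation() below assume when they advance the
 * array pointer by i * num_workers.
 */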
111 : :
112 : : /* Context object for ExecParallelEstimate. */
113 : : typedef struct ExecParallelEstimateContext
114 : : {
115 : : ParallelContext *pcxt;
116 : : int nnodes;
117 : : } ExecParallelEstimateContext;
118 : :
119 : : /* Context object for ExecParallelInitializeDSM. */
120 : : typedef struct ExecParallelInitializeDSMContext
121 : : {
122 : : ParallelContext *pcxt;
123 : : SharedExecutorInstrumentation *instrumentation;
124 : : int nnodes;
125 : : } ExecParallelInitializeDSMContext;
126 : :
127 : : /* Helper functions that run in the parallel leader. */
128 : : static char *ExecSerializePlan(Plan *plan, EState *estate);
129 : : static bool ExecParallelEstimate(PlanState *planstate,
130 : : ExecParallelEstimateContext *e);
131 : : static bool ExecParallelInitializeDSM(PlanState *planstate,
132 : : ExecParallelInitializeDSMContext *d);
133 : : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
134 : : bool reinitialize);
135 : : static bool ExecParallelReInitializeDSM(PlanState *planstate,
136 : : ParallelContext *pcxt);
137 : : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
138 : : SharedExecutorInstrumentation *instrumentation);
139 : :
140 : : /* Helper function that runs in the parallel worker. */
141 : : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
142 : :
143 : : /*
144 : : * Create a serialized representation of the plan to be sent to each worker.
145 : : */
146 : : static char *
147 : 121 : ExecSerializePlan(Plan *plan, EState *estate)
148 : : {
149 : 121 : PlannedStmt *pstmt;
150 : 121 : ListCell *lc;
151 : :
152 : : /* We can't scribble on the original plan, so make a copy. */
153 : 121 : plan = copyObject(plan);
154 : :
155 : : /*
156 : : * The worker will start its own copy of the executor, and that copy will
157 : : * insert a junk filter if the toplevel node has any resjunk entries. We
158 : : * don't want that to happen, because while resjunk columns shouldn't be
159 : : * sent back to the user, here the tuples are coming back to another
160 : : * backend which may very well need them. So mutate the target list
161 : : * accordingly. This is sort of a hack; there might be better ways to do
162 : : * this...
163 : : */
164 [ + + + + : 334 : foreach(lc, plan->targetlist)
+ + ]
165 : : {
166 : 213 : TargetEntry *tle = lfirst_node(TargetEntry, lc);
167 : :
168 : 213 : tle->resjunk = false;
169 : 213 : }
170 : :
171 : : /*
172 : : * Create a dummy PlannedStmt. Most of the fields don't need to be valid
173 : : * for our purposes, but the worker will need at least a minimal
174 : : * PlannedStmt to start the executor.
175 : : */
176 : 121 : pstmt = makeNode(PlannedStmt);
177 : 121 : pstmt->commandType = CMD_SELECT;
178 : 121 : pstmt->queryId = pgstat_get_my_query_id();
179 : 121 : pstmt->planId = pgstat_get_my_plan_id();
180 : 121 : pstmt->hasReturning = false;
181 : 121 : pstmt->hasModifyingCTE = false;
182 : 121 : pstmt->canSetTag = true;
183 : 121 : pstmt->transientPlan = false;
184 : 121 : pstmt->dependsOnRole = false;
185 : 121 : pstmt->parallelModeNeeded = false;
186 : 121 : pstmt->planTree = plan;
187 : 121 : pstmt->partPruneInfos = estate->es_part_prune_infos;
188 : 121 : pstmt->rtable = estate->es_range_table;
189 : 121 : pstmt->unprunableRelids = estate->es_unpruned_relids;
190 : 121 : pstmt->permInfos = estate->es_rteperminfos;
191 : 121 : pstmt->resultRelations = NIL;
192 : 121 : pstmt->appendRelations = NIL;
193 : 121 : pstmt->planOrigin = PLAN_STMT_INTERNAL;
194 : :
195 : : /*
196 : : * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
197 : : * for unsafe ones (so that the list indexes of the safe ones are
198 : : * preserved). This positively ensures that the worker won't try to run,
199 : : * or even do ExecInitNode on, an unsafe subplan. That's important to
200 : : * protect, eg, non-parallel-aware FDWs from getting into trouble.
201 : : */
202 : 121 : pstmt->subplans = NIL;
203 [ + + + + : 130 : foreach(lc, estate->es_plannedstmt->subplans)
+ + ]
204 : : {
205 : 9 : Plan *subplan = (Plan *) lfirst(lc);
206 : :
207 [ + - + + ]: 9 : if (subplan && !subplan->parallel_safe)
208 : 2 : subplan = NULL;
209 : 9 : pstmt->subplans = lappend(pstmt->subplans, subplan);
210 : 9 : }
211 : :
212 : 121 : pstmt->rewindPlanIDs = NULL;
213 : 121 : pstmt->rowMarks = NIL;
214 : 121 : pstmt->relationOids = NIL;
215 : 121 : pstmt->invalItems = NIL; /* workers can't replan anyway... */
216 : 121 : pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
217 : 121 : pstmt->utilityStmt = NULL;
218 : 121 : pstmt->stmt_location = -1;
219 : 121 : pstmt->stmt_len = -1;
220 : :
221 : : /* Return serialized copy of our dummy PlannedStmt. */
222 : 242 : return nodeToString(pstmt);
223 : 121 : }
224 : :
225 : : /*
226 : : * Parallel-aware plan nodes (and occasionally others) may need some state
227 : : * which is shared across all parallel workers. Before we size the DSM, give
228 : : * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
229 : : * &pcxt->estimator.
230 : : *
231 : : * While we're at it, count the number of PlanState nodes in the tree, so
232 : : * we know how many Instrumentation structures we need.
233 : : */
234 : : static bool
235 : 500 : ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
236 : : {
237 [ + - ]: 500 : if (planstate == NULL)
238 : 0 : return false;
239 : :
240 : : /* Count this node. */
241 : 500 : e->nnodes++;
242 : :
243 [ + + + + : 500 : switch (nodeTag(planstate))
+ + + + +
- + + - +
- + ]
244 : : {
245 : : case T_SeqScanState:
246 [ + + ]: 190 : if (planstate->plan->parallel_aware)
247 : 302 : ExecSeqScanEstimate((SeqScanState *) planstate,
248 : 151 : e->pcxt);
249 : 190 : break;
250 : : case T_IndexScanState:
251 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
252 : 98 : ExecIndexScanEstimate((IndexScanState *) planstate,
253 : 49 : e->pcxt);
254 : 49 : break;
255 : : case T_IndexOnlyScanState:
256 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
257 : 18 : ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
258 : 9 : e->pcxt);
259 : 9 : break;
260 : : case T_BitmapIndexScanState:
261 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
262 : 6 : ExecBitmapIndexScanEstimate((BitmapIndexScanState *) planstate,
263 : 3 : e->pcxt);
264 : 3 : break;
265 : : case T_ForeignScanState:
266 [ # # ]: 0 : if (planstate->plan->parallel_aware)
267 : 0 : ExecForeignScanEstimate((ForeignScanState *) planstate,
268 : 0 : e->pcxt);
269 : 0 : break;
270 : : case T_TidRangeScanState:
271 [ - + ]: 4 : if (planstate->plan->parallel_aware)
272 : 8 : ExecTidRangeScanEstimate((TidRangeScanState *) planstate,
273 : 4 : e->pcxt);
274 : 4 : break;
275 : : case T_AppendState:
276 [ + + ]: 31 : if (planstate->plan->parallel_aware)
277 : 46 : ExecAppendEstimate((AppendState *) planstate,
278 : 23 : e->pcxt);
279 : 31 : break;
280 : : case T_CustomScanState:
281 [ # # ]: 0 : if (planstate->plan->parallel_aware)
282 : 0 : ExecCustomScanEstimate((CustomScanState *) planstate,
283 : 0 : e->pcxt);
284 : 0 : break;
285 : : case T_BitmapHeapScanState:
286 [ - + ]: 3 : if (planstate->plan->parallel_aware)
287 : 6 : ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
288 : 3 : e->pcxt);
289 : 3 : break;
290 : : case T_HashJoinState:
291 [ + + ]: 33 : if (planstate->plan->parallel_aware)
292 : 42 : ExecHashJoinEstimate((HashJoinState *) planstate,
293 : 21 : e->pcxt);
294 : 33 : break;
295 : : case T_HashState:
296 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
297 : 33 : ExecHashEstimate((HashState *) planstate, e->pcxt);
298 : 33 : break;
299 : : case T_SortState:
300 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
301 : 26 : ExecSortEstimate((SortState *) planstate, e->pcxt);
302 : 26 : break;
303 : : case T_IncrementalSortState:
304 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
305 : 0 : ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
306 : 0 : break;
307 : : case T_AggState:
308 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
309 : 95 : ExecAggEstimate((AggState *) planstate, e->pcxt);
310 : 95 : break;
311 : : case T_MemoizeState:
312 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
313 : 1 : ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
314 : 1 : break;
315 : : default:
316 : 23 : break;
317 : : }
318 : :
319 : 500 : return planstate_tree_walker(planstate, ExecParallelEstimate, e);
320 : 500 : }
321 : :
322 : : /*
323 : : * Estimate the amount of space required to serialize the indicated parameters.
324 : : */
325 : : static Size
326 : 4 : EstimateParamExecSpace(EState *estate, Bitmapset *params)
327 : : {
328 : 4 : int paramid;
329 : 4 : Size sz = sizeof(int);
330 : :
331 : 4 : paramid = -1;
332 [ + + ]: 9 : while ((paramid = bms_next_member(params, paramid)) >= 0)
333 : : {
334 : 5 : Oid typeOid;
335 : 5 : int16 typLen;
336 : 5 : bool typByVal;
337 : 5 : ParamExecData *prm;
338 : :
339 : 5 : prm = &(estate->es_param_exec_vals[paramid]);
340 : 10 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
341 : 5 : paramid);
342 : :
343 : 5 : sz = add_size(sz, sizeof(int)); /* space for paramid */
344 : :
345 : : /* space for datum/isnull */
346 [ + - ]: 5 : if (OidIsValid(typeOid))
347 : 5 : get_typlenbyval(typeOid, &typLen, &typByVal);
348 : : else
349 : : {
350 : : /* If no type OID, assume by-value, like copyParamList does. */
351 : 0 : typLen = sizeof(Datum);
352 : 0 : typByVal = true;
353 : : }
354 : 10 : sz = add_size(sz,
355 : 10 : datumEstimateSpace(prm->value, prm->isnull,
356 : 5 : typByVal, typLen));
357 : 5 : }
358 : 8 : return sz;
359 : 4 : }
360 : :
361 : : /*
362 : : * Serialize specified PARAM_EXEC parameters.
363 : : *
364 : : * We write the number of parameters first, as a 4-byte integer, and then
365 : : * write details for each parameter in turn. The details for each parameter
366 : : * consist of a 4-byte paramid (location of param in execution time internal
367 : : * parameter array) and then the datum as serialized by datumSerialize().
368 : : */
369 : : static dsa_pointer
370 : 4 : SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
371 : : {
372 : 4 : Size size;
373 : 4 : int nparams;
374 : 4 : int paramid;
375 : 4 : ParamExecData *prm;
376 : 4 : dsa_pointer handle;
377 : 4 : char *start_address;
378 : :
379 : : /* Allocate enough space for the current parameter values. */
380 : 4 : size = EstimateParamExecSpace(estate, params);
381 : 4 : handle = dsa_allocate(area, size);
382 : 4 : start_address = dsa_get_address(area, handle);
383 : :
384 : : /* First write the number of parameters as a 4-byte integer. */
385 : 4 : nparams = bms_num_members(params);
386 : 4 : memcpy(start_address, &nparams, sizeof(int));
387 : 4 : start_address += sizeof(int);
388 : :
389 : : /* Write details for each parameter in turn. */
390 : 4 : paramid = -1;
391 [ + + ]: 9 : while ((paramid = bms_next_member(params, paramid)) >= 0)
392 : : {
393 : 5 : Oid typeOid;
394 : 5 : int16 typLen;
395 : 5 : bool typByVal;
396 : :
397 : 5 : prm = &(estate->es_param_exec_vals[paramid]);
398 : 10 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
399 : 5 : paramid);
400 : :
401 : : /* Write paramid. */
 402 : 5 : memcpy(start_address, &paramid, sizeof(int));
403 : 5 : start_address += sizeof(int);
404 : :
405 : : /* Write datum/isnull */
406 [ + - ]: 5 : if (OidIsValid(typeOid))
407 : 5 : get_typlenbyval(typeOid, &typLen, &typByVal);
408 : : else
409 : : {
410 : : /* If no type OID, assume by-value, like copyParamList does. */
411 : 0 : typLen = sizeof(Datum);
412 : 0 : typByVal = true;
413 : : }
414 : 5 : datumSerialize(prm->value, prm->isnull, typByVal, typLen,
415 : : &start_address);
416 : 5 : }
417 : :
418 : 8 : return handle;
419 : 4 : }
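
/*
 * For reference, a sketch of the DSA chunk produced above (matching the
 * format described in the function's header comment):
 *
 *     4-byte int   nparams
 *     then, repeated nparams times:
 *         4-byte int   paramid   (index into es_param_exec_vals)
 *         <datum/isnull as written by datumSerialize()>
 *
 * RestoreParamExecParams() below consumes the chunk in the same order.
 */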
420 : :
421 : : /*
422 : : * Restore specified PARAM_EXEC parameters.
423 : : */
424 : : static void
425 : 12 : RestoreParamExecParams(char *start_address, EState *estate)
426 : : {
427 : 12 : int nparams;
428 : 12 : int i;
429 : 12 : int paramid;
430 : :
431 : 12 : memcpy(&nparams, start_address, sizeof(int));
432 : 12 : start_address += sizeof(int);
433 : :
434 [ + + ]: 26 : for (i = 0; i < nparams; i++)
435 : : {
436 : 14 : ParamExecData *prm;
437 : :
438 : : /* Read paramid */
 439 : 14 : memcpy(&paramid, start_address, sizeof(int));
440 : 14 : start_address += sizeof(int);
441 : 14 : prm = &(estate->es_param_exec_vals[paramid]);
442 : :
443 : : /* Read datum/isnull. */
444 : 14 : prm->value = datumRestore(&start_address, &prm->isnull);
445 : 14 : prm->execPlan = NULL;
446 : 14 : }
447 : 12 : }
448 : :
449 : : /*
450 : : * Initialize the dynamic shared memory segment that will be used to control
451 : : * parallel execution.
452 : : */
453 : : static bool
454 : 500 : ExecParallelInitializeDSM(PlanState *planstate,
455 : : ExecParallelInitializeDSMContext *d)
456 : : {
457 [ + - ]: 500 : if (planstate == NULL)
458 : 0 : return false;
459 : :
460 : : /* If instrumentation is enabled, initialize slot for this node. */
461 [ + + ]: 500 : if (d->instrumentation != NULL)
462 : 171 : d->instrumentation->plan_node_id[d->nnodes] =
463 : 171 : planstate->plan->plan_node_id;
464 : :
465 : : /* Count this node. */
466 : 500 : d->nnodes++;
467 : :
468 : : /*
469 : : * Call initializers for DSM-using plan nodes.
470 : : *
471 : : * Most plan nodes won't do anything here, but plan nodes that allocated
472 : : * DSM may need to initialize shared state in the DSM before parallel
473 : : * workers are launched. They can allocate the space they previously
474 : : * estimated using shm_toc_allocate, and add the keys they previously
475 : : * estimated using shm_toc_insert, in each case targeting pcxt->toc.
476 : : */
477 [ + + + + : 500 : switch (nodeTag(planstate))
+ + + + +
- + + - +
- + ]
478 : : {
479 : : case T_SeqScanState:
480 [ + + ]: 190 : if (planstate->plan->parallel_aware)
481 : 302 : ExecSeqScanInitializeDSM((SeqScanState *) planstate,
482 : 151 : d->pcxt);
483 : 190 : break;
484 : : case T_IndexScanState:
485 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
486 : 49 : ExecIndexScanInitializeDSM((IndexScanState *) planstate, d->pcxt);
487 : 49 : break;
488 : : case T_IndexOnlyScanState:
489 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
490 : 18 : ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
491 : 9 : d->pcxt);
492 : 9 : break;
493 : : case T_BitmapIndexScanState:
494 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
495 : 3 : ExecBitmapIndexScanInitializeDSM((BitmapIndexScanState *) planstate, d->pcxt);
496 : 3 : break;
497 : : case T_ForeignScanState:
498 [ # # ]: 0 : if (planstate->plan->parallel_aware)
499 : 0 : ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
500 : 0 : d->pcxt);
501 : 0 : break;
502 : : case T_TidRangeScanState:
503 [ - + ]: 4 : if (planstate->plan->parallel_aware)
504 : 8 : ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate,
505 : 4 : d->pcxt);
506 : 4 : break;
507 : : case T_AppendState:
508 [ + + ]: 31 : if (planstate->plan->parallel_aware)
509 : 46 : ExecAppendInitializeDSM((AppendState *) planstate,
510 : 23 : d->pcxt);
511 : 31 : break;
512 : : case T_CustomScanState:
513 [ # # ]: 0 : if (planstate->plan->parallel_aware)
514 : 0 : ExecCustomScanInitializeDSM((CustomScanState *) planstate,
515 : 0 : d->pcxt);
516 : 0 : break;
517 : : case T_BitmapHeapScanState:
518 [ - + ]: 3 : if (planstate->plan->parallel_aware)
519 : 6 : ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
520 : 3 : d->pcxt);
521 : 3 : break;
522 : : case T_HashJoinState:
523 [ + + ]: 33 : if (planstate->plan->parallel_aware)
524 : 42 : ExecHashJoinInitializeDSM((HashJoinState *) planstate,
525 : 21 : d->pcxt);
526 : 33 : break;
527 : : case T_HashState:
528 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
529 : 33 : ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
530 : 33 : break;
531 : : case T_SortState:
532 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
533 : 26 : ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
534 : 26 : break;
535 : : case T_IncrementalSortState:
536 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
537 : 0 : ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
538 : 0 : break;
539 : : case T_AggState:
540 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
541 : 95 : ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
542 : 95 : break;
543 : : case T_MemoizeState:
544 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
545 : 1 : ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
546 : 1 : break;
547 : : default:
548 : 23 : break;
549 : : }
550 : :
551 : 500 : return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
552 : 500 : }
553 : :
554 : : /*
 555 : : * Set up the response queues that backend workers will use to return
 556 : : * tuples to the leader backend.
557 : : */
558 : : static shm_mq_handle **
559 : 164 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
560 : : {
561 : 164 : shm_mq_handle **responseq;
562 : 164 : char *tqueuespace;
563 : 164 : int i;
564 : :
565 : : /* Skip this if no workers. */
566 [ + - ]: 164 : if (pcxt->nworkers == 0)
567 : 0 : return NULL;
568 : :
569 : : /* Allocate memory for shared memory queue handles. */
570 : 164 : responseq = (shm_mq_handle **)
571 : 164 : palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
572 : :
573 : : /*
574 : : * If not reinitializing, allocate space from the DSM for the queues;
575 : : * otherwise, find the already allocated space.
576 : : */
577 [ + + ]: 164 : if (!reinitialize)
578 : 121 : tqueuespace =
579 : 242 : shm_toc_allocate(pcxt->toc,
580 : 121 : mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
581 : 121 : pcxt->nworkers));
582 : : else
583 : 43 : tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
584 : :
585 : : /* Create the queues, and become the receiver for each. */
586 [ + + ]: 613 : for (i = 0; i < pcxt->nworkers; ++i)
587 : : {
588 : 449 : shm_mq *mq;
589 : :
590 : 898 : mq = shm_mq_create(tqueuespace +
591 : 449 : ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
592 : : (Size) PARALLEL_TUPLE_QUEUE_SIZE);
593 : :
594 : 449 : shm_mq_set_receiver(mq, MyProc);
595 : 449 : responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
596 : 449 : }
597 : :
598 : : /* Add array of queues to shm_toc, so others can find it. */
599 [ + + ]: 164 : if (!reinitialize)
600 : 121 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
601 : :
602 : : /* Return array of handles. */
603 : 164 : return responseq;
604 : 164 : }
605 : :
606 : : /*
607 : : * Sets up the required infrastructure for backend workers to perform
608 : : * execution and return results to the main backend.
609 : : */
610 : : ParallelExecutorInfo *
611 : 121 : ExecInitParallelPlan(PlanState *planstate, EState *estate,
612 : : Bitmapset *sendParams, int nworkers,
613 : : int64 tuples_needed)
614 : : {
615 : 121 : ParallelExecutorInfo *pei;
616 : 121 : ParallelContext *pcxt;
617 : 121 : ExecParallelEstimateContext e;
618 : 121 : ExecParallelInitializeDSMContext d;
619 : 121 : FixedParallelExecutorState *fpes;
620 : 121 : char *pstmt_data;
621 : 121 : char *pstmt_space;
622 : 121 : char *paramlistinfo_space;
623 : 121 : BufferUsage *bufusage_space;
624 : 121 : WalUsage *walusage_space;
625 : 121 : SharedExecutorInstrumentation *instrumentation = NULL;
626 : 121 : SharedJitInstrumentation *jit_instrumentation = NULL;
627 : 121 : int pstmt_len;
628 : 121 : int paramlistinfo_len;
629 : 121 : int instrumentation_len = 0;
630 : 121 : int jit_instrumentation_len = 0;
631 : 121 : int instrument_offset = 0;
632 : 121 : Size dsa_minsize = dsa_minimum_size();
633 : 121 : char *query_string;
634 : 121 : int query_len;
635 : :
636 : : /*
637 : : * Force any initplan outputs that we're going to pass to workers to be
638 : : * evaluated, if they weren't already.
639 : : *
640 : : * For simplicity, we use the EState's per-output-tuple ExprContext here.
641 : : * That risks intra-query memory leakage, since we might pass through here
642 : : * many times before that ExprContext gets reset; but ExecSetParamPlan
643 : : * doesn't normally leak any memory in the context (see its comments), so
644 : : * it doesn't seem worth complicating this function's API to pass it a
645 : : * shorter-lived ExprContext. This might need to change someday.
646 : : */
647 [ + + ]: 121 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
648 : :
649 : : /* Allocate object for return value. */
650 : 121 : pei = palloc0_object(ParallelExecutorInfo);
651 : 121 : pei->finished = false;
652 : 121 : pei->planstate = planstate;
653 : :
654 : : /* Fix up and serialize plan to be sent to workers. */
655 : 121 : pstmt_data = ExecSerializePlan(planstate->plan, estate);
656 : :
657 : : /* Create a parallel context. */
658 : 121 : pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
659 : 121 : pei->pcxt = pcxt;
660 : :
661 : : /*
662 : : * Before telling the parallel context to create a dynamic shared memory
663 : : * segment, we need to figure out how big it should be. Estimate space
664 : : * for the various things we need to store.
665 : : */
666 : :
667 : : /* Estimate space for fixed-size state. */
668 : 121 : shm_toc_estimate_chunk(&pcxt->estimator,
669 : : sizeof(FixedParallelExecutorState));
670 : 121 : shm_toc_estimate_keys(&pcxt->estimator, 1);
671 : :
672 : : /* Estimate space for query text. */
673 : 121 : query_len = strlen(estate->es_sourceText);
674 : 121 : shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
675 : 121 : shm_toc_estimate_keys(&pcxt->estimator, 1);
676 : :
677 : : /* Estimate space for serialized PlannedStmt. */
678 : 121 : pstmt_len = strlen(pstmt_data) + 1;
679 : 121 : shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
680 : 121 : shm_toc_estimate_keys(&pcxt->estimator, 1);
681 : :
682 : : /* Estimate space for serialized ParamListInfo. */
683 : 121 : paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
684 : 121 : shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
685 : 121 : shm_toc_estimate_keys(&pcxt->estimator, 1);
686 : :
687 : : /*
688 : : * Estimate space for BufferUsage.
689 : : *
690 : : * If EXPLAIN is not in use and there are no extensions loaded that care,
691 : : * we could skip this. But we have no way of knowing whether anyone's
692 : : * looking at pgBufferUsage, so do it unconditionally.
693 : : */
694 : 121 : shm_toc_estimate_chunk(&pcxt->estimator,
695 : : mul_size(sizeof(BufferUsage), pcxt->nworkers));
696 : 121 : shm_toc_estimate_keys(&pcxt->estimator, 1);
697 : :
698 : : /*
699 : : * Same thing for WalUsage.
700 : : */
701 : 121 : shm_toc_estimate_chunk(&pcxt->estimator,
702 : : mul_size(sizeof(WalUsage), pcxt->nworkers));
703 : 121 : shm_toc_estimate_keys(&pcxt->estimator, 1);
704 : :
705 : : /* Estimate space for tuple queues. */
706 : 121 : shm_toc_estimate_chunk(&pcxt->estimator,
707 : : mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
708 : 121 : shm_toc_estimate_keys(&pcxt->estimator, 1);
709 : :
710 : : /*
711 : : * Give parallel-aware nodes a chance to add to the estimates, and get a
712 : : * count of how many PlanState nodes there are.
713 : : */
714 : 121 : e.pcxt = pcxt;
715 : 121 : e.nnodes = 0;
716 : 121 : ExecParallelEstimate(planstate, &e);
717 : :
718 : : /* Estimate space for instrumentation, if required. */
719 [ + + ]: 121 : if (estate->es_instrument)
720 : : {
721 : 30 : instrumentation_len =
722 : 30 : offsetof(SharedExecutorInstrumentation, plan_node_id) +
723 : 30 : sizeof(int) * e.nnodes;
724 : 30 : instrumentation_len = MAXALIGN(instrumentation_len);
725 : 30 : instrument_offset = instrumentation_len;
726 : 30 : instrumentation_len +=
727 : 30 : mul_size(sizeof(Instrumentation),
728 : 30 : mul_size(e.nnodes, nworkers));
729 : 30 : shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
730 : 30 : shm_toc_estimate_keys(&pcxt->estimator, 1);
731 : :
732 : : /* Estimate space for JIT instrumentation, if required. */
733 [ + - ]: 30 : if (estate->es_jit_flags != PGJIT_NONE)
734 : : {
735 : 0 : jit_instrumentation_len =
736 : 0 : offsetof(SharedJitInstrumentation, jit_instr) +
737 : 0 : sizeof(JitInstrumentation) * nworkers;
738 : 0 : shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
739 : 0 : shm_toc_estimate_keys(&pcxt->estimator, 1);
740 : 0 : }
741 : 30 : }
742 : :
743 : : /* Estimate space for DSA area. */
744 : 121 : shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
745 : 121 : shm_toc_estimate_keys(&pcxt->estimator, 1);
746 : :
747 : : /*
748 : : * InitializeParallelDSM() passes the active snapshot to the parallel
749 : : * worker, which uses it to set es_snapshot. Make sure we don't set
750 : : * es_snapshot differently in the child.
751 : : */
752 [ + - ]: 121 : Assert(GetActiveSnapshot() == estate->es_snapshot);
753 : :
754 : : /* Everyone's had a chance to ask for space, so now create the DSM. */
755 : 121 : InitializeParallelDSM(pcxt);
756 : :
757 : : /*
758 : : * OK, now we have a dynamic shared memory segment, and it should be big
759 : : * enough to store all of the data we estimated we would want to put into
760 : : * it, plus whatever general stuff (not specifically executor-related) the
761 : : * ParallelContext itself needs to store there. None of the space we
762 : : * asked for has been allocated or initialized yet, though, so do that.
763 : : */
764 : :
765 : : /* Store fixed-size state. */
766 : 121 : fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
767 : 121 : fpes->tuples_needed = tuples_needed;
768 : 121 : fpes->param_exec = InvalidDsaPointer;
769 : 121 : fpes->eflags = estate->es_top_eflags;
770 : 121 : fpes->jit_flags = estate->es_jit_flags;
771 : 121 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
772 : :
773 : : /* Store query string */
774 : 121 : query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
775 : 121 : memcpy(query_string, estate->es_sourceText, query_len + 1);
776 : 121 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
777 : :
778 : : /* Store serialized PlannedStmt. */
779 : 121 : pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
780 : 121 : memcpy(pstmt_space, pstmt_data, pstmt_len);
781 : 121 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
782 : :
783 : : /* Store serialized ParamListInfo. */
784 : 121 : paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
785 : 121 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
 786 : 121 : SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
787 : :
788 : : /* Allocate space for each worker's BufferUsage; no need to initialize. */
789 : 242 : bufusage_space = shm_toc_allocate(pcxt->toc,
790 : 121 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
791 : 121 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
792 : 121 : pei->buffer_usage = bufusage_space;
793 : :
794 : : /* Same for WalUsage. */
795 : 242 : walusage_space = shm_toc_allocate(pcxt->toc,
796 : 121 : mul_size(sizeof(WalUsage), pcxt->nworkers));
797 : 121 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
798 : 121 : pei->wal_usage = walusage_space;
799 : :
800 : : /* Set up the tuple queues that the workers will write into. */
801 : 121 : pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
802 : :
803 : : /* We don't need the TupleQueueReaders yet, though. */
804 : 121 : pei->reader = NULL;
805 : :
806 : : /*
807 : : * If instrumentation options were supplied, allocate space for the data.
808 : : * It only gets partially initialized here; the rest happens during
809 : : * ExecParallelInitializeDSM.
810 : : */
811 [ + + ]: 121 : if (estate->es_instrument)
812 : : {
813 : 30 : Instrumentation *instrument;
814 : 30 : int i;
815 : :
816 : 30 : instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
817 : 30 : instrumentation->instrument_options = estate->es_instrument;
818 : 30 : instrumentation->instrument_offset = instrument_offset;
819 : 30 : instrumentation->num_workers = nworkers;
820 : 30 : instrumentation->num_plan_nodes = e.nnodes;
821 : 30 : instrument = GetInstrumentationArray(instrumentation);
822 [ + + ]: 310 : for (i = 0; i < nworkers * e.nnodes; ++i)
823 : 280 : InstrInit(&instrument[i], estate->es_instrument);
824 : 60 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
825 : 30 : instrumentation);
826 : 30 : pei->instrumentation = instrumentation;
827 : :
828 [ + - ]: 30 : if (estate->es_jit_flags != PGJIT_NONE)
829 : : {
830 : 0 : jit_instrumentation = shm_toc_allocate(pcxt->toc,
831 : 0 : jit_instrumentation_len);
832 : 0 : jit_instrumentation->num_workers = nworkers;
833 : 0 : memset(jit_instrumentation->jit_instr, 0,
834 : : sizeof(JitInstrumentation) * nworkers);
835 : 0 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
836 : 0 : jit_instrumentation);
837 : 0 : pei->jit_instrumentation = jit_instrumentation;
838 : 0 : }
839 : 30 : }
840 : :
841 : : /*
842 : : * Create a DSA area that can be used by the leader and all workers.
843 : : * (However, if we failed to create a DSM and are using private memory
844 : : * instead, then skip this.)
845 : : */
846 [ + - ]: 121 : if (pcxt->seg != NULL)
847 : : {
848 : 121 : char *area_space;
849 : :
850 : 121 : area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
851 : 121 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
852 : 121 : pei->area = dsa_create_in_place(area_space, dsa_minsize,
853 : : LWTRANCHE_PARALLEL_QUERY_DSA,
854 : : pcxt->seg);
855 : :
856 : : /*
857 : : * Serialize parameters, if any, using DSA storage. We don't dare use
858 : : * the main parallel query DSM for this because we might relaunch
859 : : * workers after the values have changed (and thus the amount of
860 : : * storage required has changed).
861 : : */
862 [ + + ]: 121 : if (!bms_is_empty(sendParams))
863 : : {
864 : 8 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
865 : 4 : pei->area);
866 : 4 : fpes->param_exec = pei->param_exec;
867 : 4 : }
868 : 121 : }
869 : :
870 : : /*
871 : : * Give parallel-aware nodes a chance to initialize their shared data.
 872 : : * This also initializes the per-node instrumentation slots in
 873 : : * *instrumentation, if it exists.
874 : : */
875 : 121 : d.pcxt = pcxt;
876 : 121 : d.instrumentation = instrumentation;
877 : 121 : d.nnodes = 0;
878 : :
879 : : /* Install our DSA area while initializing the plan. */
880 : 121 : estate->es_query_dsa = pei->area;
881 : 121 : ExecParallelInitializeDSM(planstate, &d);
882 : 121 : estate->es_query_dsa = NULL;
883 : :
884 : : /*
885 : : * Make sure that the world hasn't shifted under our feet. This could
886 : : * probably just be an Assert(), but let's be conservative for now.
887 : : */
888 [ + - ]: 121 : if (e.nnodes != d.nnodes)
889 [ # # # # ]: 0 : elog(ERROR, "inconsistent count of PlanState nodes");
890 : :
891 : : /* OK, we're ready to rock and roll. */
892 : 242 : return pei;
893 : 121 : }
894 : :
895 : : /*
896 : : * Set up tuple queue readers to read the results of a parallel subplan.
897 : : *
898 : : * This is separate from ExecInitParallelPlan() because we can launch the
899 : : * worker processes and let them start doing something before we do this.
900 : : */
901 : : void
902 : 161 : ExecParallelCreateReaders(ParallelExecutorInfo *pei)
903 : : {
904 : 161 : int nworkers = pei->pcxt->nworkers_launched;
905 : 161 : int i;
906 : :
907 [ + - ]: 161 : Assert(pei->reader == NULL);
908 : :
909 [ + - ]: 161 : if (nworkers > 0)
910 : : {
911 : 161 : pei->reader = (TupleQueueReader **)
912 : 161 : palloc(nworkers * sizeof(TupleQueueReader *));
913 : :
914 [ + + ]: 597 : for (i = 0; i < nworkers; i++)
915 : : {
916 : 872 : shm_mq_set_handle(pei->tqueue[i],
917 : 436 : pei->pcxt->worker[i].bgwhandle);
918 : 436 : pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
919 : 436 : }
920 : 161 : }
921 : 161 : }
922 : :
923 : : /*
924 : : * Re-initialize the parallel executor shared memory state before launching
925 : : * a fresh batch of workers.
926 : : */
927 : : void
928 : 43 : ExecParallelReinitialize(PlanState *planstate,
929 : : ParallelExecutorInfo *pei,
930 : : Bitmapset *sendParams)
931 : : {
932 : 43 : EState *estate = planstate->state;
933 : 43 : FixedParallelExecutorState *fpes;
934 : :
935 : : /* Old workers must already be shut down */
936 [ + - ]: 43 : Assert(pei->finished);
937 : :
938 : : /*
939 : : * Force any initplan outputs that we're going to pass to workers to be
940 : : * evaluated, if they weren't already (see comments in
941 : : * ExecInitParallelPlan).
942 : : */
943 [ + - ]: 43 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
944 : :
945 : 43 : ReinitializeParallelDSM(pei->pcxt);
946 : 43 : pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
947 : 43 : pei->reader = NULL;
948 : 43 : pei->finished = false;
949 : :
950 : 43 : fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
951 : :
952 : : /* Free any serialized parameters from the last round. */
953 [ + - ]: 43 : if (DsaPointerIsValid(fpes->param_exec))
954 : : {
955 : 0 : dsa_free(pei->area, fpes->param_exec);
956 : 0 : fpes->param_exec = InvalidDsaPointer;
957 : 0 : }
958 : :
959 : : /* Serialize current parameter values if required. */
960 [ + - ]: 43 : if (!bms_is_empty(sendParams))
961 : : {
962 : 0 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
963 : 0 : pei->area);
964 : 0 : fpes->param_exec = pei->param_exec;
965 : 0 : }
966 : :
967 : : /* Traverse plan tree and let each child node reset associated state. */
968 : 43 : estate->es_query_dsa = pei->area;
969 : 43 : ExecParallelReInitializeDSM(planstate, pei->pcxt);
970 : 43 : estate->es_query_dsa = NULL;
971 : 43 : }
972 : :
973 : : /*
974 : : * Traverse plan tree to reinitialize per-node dynamic shared memory state
975 : : */
976 : : static bool
977 : 111 : ExecParallelReInitializeDSM(PlanState *planstate,
978 : : ParallelContext *pcxt)
979 : : {
980 [ + - ]: 111 : if (planstate == NULL)
981 : 0 : return false;
982 : :
983 : : /*
984 : : * Call reinitializers for DSM-using plan nodes.
985 : : */
986 [ + + + + : 111 : switch (nodeTag(planstate))
+ - - - -
+ + ]
987 : : {
988 : : case T_SeqScanState:
989 [ + + ]: 46 : if (planstate->plan->parallel_aware)
990 : 76 : ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
991 : 38 : pcxt);
992 : 46 : break;
993 : : case T_IndexScanState:
994 [ - + ]: 2 : if (planstate->plan->parallel_aware)
995 : 4 : ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
996 : 2 : pcxt);
997 : 2 : break;
998 : : case T_IndexOnlyScanState:
999 [ - + ]: 2 : if (planstate->plan->parallel_aware)
1000 : 4 : ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
1001 : 2 : pcxt);
1002 : 2 : break;
1003 : : case T_ForeignScanState:
1004 [ # # ]: 0 : if (planstate->plan->parallel_aware)
1005 : 0 : ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
1006 : 0 : pcxt);
1007 : 0 : break;
1008 : : case T_TidRangeScanState:
1009 [ # # ]: 0 : if (planstate->plan->parallel_aware)
1010 : 0 : ExecTidRangeScanReInitializeDSM((TidRangeScanState *) planstate,
1011 : 0 : pcxt);
1012 : 0 : break;
1013 : : case T_AppendState:
1014 [ # # ]: 0 : if (planstate->plan->parallel_aware)
1015 : 0 : ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
1016 : 0 : break;
1017 : : case T_CustomScanState:
1018 [ # # ]: 0 : if (planstate->plan->parallel_aware)
1019 : 0 : ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
1020 : 0 : pcxt);
1021 : 0 : break;
1022 : : case T_BitmapHeapScanState:
1023 [ - + ]: 9 : if (planstate->plan->parallel_aware)
1024 : 18 : ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
1025 : 9 : pcxt);
1026 : 9 : break;
1027 : : case T_HashJoinState:
1028 [ + + ]: 16 : if (planstate->plan->parallel_aware)
1029 : 16 : ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
1030 : 8 : pcxt);
1031 : 16 : break;
1032 : : case T_BitmapIndexScanState:
1033 : : case T_HashState:
1034 : : case T_SortState:
1035 : : case T_IncrementalSortState:
1036 : : case T_MemoizeState:
1037 : : /* these nodes have DSM state, but no reinitialization is required */
1038 : 30 : break;
1039 : :
1040 : : default:
1041 : 6 : break;
1042 : : }
1043 : :
1044 : 111 : return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
1045 : 111 : }
1046 : :
1047 : : /*
1048 : : * Copy instrumentation information about this node and its descendants from
1049 : : * dynamic shared memory.
1050 : : */
1051 : : static bool
1052 : 171 : ExecParallelRetrieveInstrumentation(PlanState *planstate,
1053 : : SharedExecutorInstrumentation *instrumentation)
1054 : : {
1055 : 171 : Instrumentation *instrument;
1056 : 171 : int i;
1057 : 171 : int n;
1058 : 171 : int ibytes;
1059 : 171 : int plan_node_id = planstate->plan->plan_node_id;
1060 : 171 : MemoryContext oldcontext;
1061 : :
1062 : : /* Find the instrumentation for this node. */
1063 [ - + ]: 773 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1064 [ + + ]: 773 : if (instrumentation->plan_node_id[i] == plan_node_id)
1065 : 171 : break;
1066 [ + - ]: 171 : if (i >= instrumentation->num_plan_nodes)
1067 [ # # # # ]: 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1068 : :
1069 : : /* Accumulate the statistics from all workers. */
1070 : 171 : instrument = GetInstrumentationArray(instrumentation);
1071 : 171 : instrument += i * instrumentation->num_workers;
1072 [ + + ]: 451 : for (n = 0; n < instrumentation->num_workers; ++n)
1073 : 280 : InstrAggNode(planstate->instrument, &instrument[n]);
1074 : :
1075 : : /*
1076 : : * Also store the per-worker detail.
1077 : : *
1078 : : * Worker instrumentation should be allocated in the same context as the
1079 : : * regular instrumentation information, which is the per-query context.
1080 : : * Switch into per-query memory context.
1081 : : */
1082 : 171 : oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
1083 : 171 : ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
1084 : 171 : planstate->worker_instrument =
1085 : 171 : palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
1086 : 171 : MemoryContextSwitchTo(oldcontext);
1087 : :
1088 : 171 : planstate->worker_instrument->num_workers = instrumentation->num_workers;
1089 : 171 : memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
1090 : :
1091 : : /* Perform any node-type-specific work that needs to be done. */
1092 [ + + + - : 171 : switch (nodeTag(planstate))
- - + + -
- ]
1093 : : {
1094 : : case T_IndexScanState:
1095 : 45 : ExecIndexScanRetrieveInstrumentation((IndexScanState *) planstate);
1096 : 45 : break;
1097 : : case T_IndexOnlyScanState:
1098 : 0 : ExecIndexOnlyScanRetrieveInstrumentation((IndexOnlyScanState *) planstate);
1099 : 0 : break;
1100 : : case T_BitmapIndexScanState:
1101 : 0 : ExecBitmapIndexScanRetrieveInstrumentation((BitmapIndexScanState *) planstate);
1102 : 0 : break;
1103 : : case T_SortState:
1104 : 2 : ExecSortRetrieveInstrumentation((SortState *) planstate);
1105 : 2 : break;
1106 : : case T_IncrementalSortState:
1107 : 0 : ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
1108 : 0 : break;
1109 : : case T_HashState:
1110 : 14 : ExecHashRetrieveInstrumentation((HashState *) planstate);
1111 : 14 : break;
1112 : : case T_AggState:
1113 : 17 : ExecAggRetrieveInstrumentation((AggState *) planstate);
1114 : 17 : break;
1115 : : case T_MemoizeState:
1116 : 0 : ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
1117 : 0 : break;
1118 : : case T_BitmapHeapScanState:
1119 : 0 : ExecBitmapHeapRetrieveInstrumentation((BitmapHeapScanState *) planstate);
1120 : 0 : break;
1121 : : default:
1122 : 93 : break;
1123 : : }
1124 : :
1125 : 342 : return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
1126 : : instrumentation);
1127 : 171 : }
1128 : :
1129 : : /*
1130 : : * Add up the workers' JIT instrumentation from dynamic shared memory.
1131 : : */
1132 : : static void
1133 : 0 : ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1134 : : SharedJitInstrumentation *shared_jit)
1135 : : {
1136 : 0 : JitInstrumentation *combined;
1137 : 0 : int ibytes;
1138 : :
1139 : 0 : int n;
1140 : :
1141 : : /*
1142 : : * Accumulate worker JIT instrumentation into the combined JIT
1143 : : * instrumentation, allocating it if required.
1144 : : */
1145 [ # # ]: 0 : if (!planstate->state->es_jit_worker_instr)
1146 : 0 : planstate->state->es_jit_worker_instr =
1147 : 0 : MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
1148 : 0 : combined = planstate->state->es_jit_worker_instr;
1149 : :
1150 : : /* Accumulate all the workers' instrumentations. */
1151 [ # # ]: 0 : for (n = 0; n < shared_jit->num_workers; ++n)
1152 : 0 : InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1153 : :
1154 : : /*
1155 : : * Store the per-worker detail.
1156 : : *
1157 : : * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1158 : : * instrumentation in per-query context.
1159 : : */
1160 : 0 : ibytes = offsetof(SharedJitInstrumentation, jit_instr)
1161 : 0 : + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
1162 : 0 : planstate->worker_jit_instrument =
1163 : 0 : MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1164 : :
1165 : 0 : memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1166 : 0 : }
1167 : :
1168 : : /*
1169 : : * Finish parallel execution. We wait for parallel workers to finish, and
1170 : : * accumulate their buffer/WAL usage.
1171 : : */
1172 : : void
1173 : 295 : ExecParallelFinish(ParallelExecutorInfo *pei)
1174 : : {
1175 : 295 : int nworkers = pei->pcxt->nworkers_launched;
1176 : 295 : int i;
1177 : :
1178 : : /* Make this be a no-op if called twice in a row. */
1179 [ + + ]: 295 : if (pei->finished)
1180 : 133 : return;
1181 : :
1182 : : /*
1183 : : * Detach from tuple queues ASAP, so that any still-active workers will
1184 : : * notice that no further results are wanted.
1185 : : */
1186 [ - + ]: 162 : if (pei->tqueue != NULL)
1187 : : {
1188 [ + + ]: 596 : for (i = 0; i < nworkers; i++)
1189 : 434 : shm_mq_detach(pei->tqueue[i]);
1190 : 162 : pfree(pei->tqueue);
1191 : 162 : pei->tqueue = NULL;
1192 : 162 : }
1193 : :
1194 : : /*
1195 : : * While we're waiting for the workers to finish, let's get rid of the
1196 : : * tuple queue readers. (Any other local cleanup could be done here too.)
1197 : : */
1198 [ + + ]: 162 : if (pei->reader != NULL)
1199 : : {
1200 [ + + ]: 593 : for (i = 0; i < nworkers; i++)
1201 : 434 : DestroyTupleQueueReader(pei->reader[i]);
1202 : 159 : pfree(pei->reader);
1203 : 159 : pei->reader = NULL;
1204 : 159 : }
1205 : :
1206 : : /* Now wait for the workers to finish. */
1207 : 162 : WaitForParallelWorkersToFinish(pei->pcxt);
1208 : :
1209 : : /*
1210 : : * Next, accumulate buffer/WAL usage. (This must wait for the workers to
1211 : : * finish, or we might get incomplete data.)
1212 : : */
1213 [ + + ]: 596 : for (i = 0; i < nworkers; i++)
1214 : 434 : InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
1215 : :
1216 : 162 : pei->finished = true;
1217 [ - + ]: 295 : }
1218 : :
1219 : : /*
1220 : : * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1221 : : * resources still exist after ExecParallelFinish. We separate these
1222 : : * routines because someone might want to examine the contents of the DSM
1223 : : * after ExecParallelFinish and before calling this routine.
1224 : : */
1225 : : void
1226 : 119 : ExecParallelCleanup(ParallelExecutorInfo *pei)
1227 : : {
1228 : : /* Accumulate instrumentation, if any. */
1229 [ + + ]: 119 : if (pei->instrumentation)
1230 : 60 : ExecParallelRetrieveInstrumentation(pei->planstate,
1231 : 30 : pei->instrumentation);
1232 : :
1233 : : /* Accumulate JIT instrumentation, if any. */
1234 [ + - ]: 119 : if (pei->jit_instrumentation)
1235 : 0 : ExecParallelRetrieveJitInstrumentation(pei->planstate,
1236 : 0 : pei->jit_instrumentation);
1237 : :
1238 : : /* Free any serialized parameters. */
1239 [ + + ]: 119 : if (DsaPointerIsValid(pei->param_exec))
1240 : : {
1241 : 4 : dsa_free(pei->area, pei->param_exec);
1242 : 4 : pei->param_exec = InvalidDsaPointer;
1243 : 4 : }
1244 [ - + ]: 119 : if (pei->area != NULL)
1245 : : {
1246 : 119 : dsa_detach(pei->area);
1247 : 119 : pei->area = NULL;
1248 : 119 : }
1249 [ - + ]: 119 : if (pei->pcxt != NULL)
1250 : : {
1251 : 119 : DestroyParallelContext(pei->pcxt);
1252 : 119 : pei->pcxt = NULL;
1253 : 119 : }
1254 : 119 : pfree(pei);
1255 : 119 : }
1256 : :
1257 : : /*
1258 : : * Create a DestReceiver to write tuples we produce to the shm_mq designated
1259 : : * for that purpose.
1260 : : */
1261 : : static DestReceiver *
1262 : 436 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1263 : : {
1264 : 436 : char *mqspace;
1265 : 436 : shm_mq *mq;
1266 : :
1267 : 436 : mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
1268 : 436 : mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
1269 : 436 : mq = (shm_mq *) mqspace;
1270 : 436 : shm_mq_set_sender(mq, MyProc);
1271 : 872 : return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
1272 : 436 : }
1273 : :
1274 : : /*
1275 : : * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1276 : : */
1277 : : static QueryDesc *
1278 : 436 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1279 : : int instrument_options)
1280 : : {
1281 : 436 : char *pstmtspace;
1282 : 436 : char *paramspace;
1283 : 436 : PlannedStmt *pstmt;
1284 : 436 : ParamListInfo paramLI;
1285 : 436 : char *queryString;
1286 : :
1287 : : /* Get the query string from shared memory */
1288 : 436 : queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1289 : :
1290 : : /* Reconstruct leader-supplied PlannedStmt. */
1291 : 436 : pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
1292 : 436 : pstmt = (PlannedStmt *) stringToNode(pstmtspace);
1293 : :
1294 : : /* Reconstruct ParamListInfo. */
1295 : 436 : paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
1296 : 436 : paramLI = RestoreParamList(¶mspace);
1297 : :
1298 : : /* Create a QueryDesc for the query. */
1299 : 1308 : return CreateQueryDesc(pstmt,
1300 : 436 : queryString,
1301 : 436 : GetActiveSnapshot(), InvalidSnapshot,
1302 : 436 : receiver, paramLI, NULL, instrument_options);
1303 : 436 : }
1304 : :
1305 : : /*
1306 : : * Copy instrumentation information from this node and its descendants into
1307 : : * dynamic shared memory, so that the parallel leader can retrieve it.
1308 : : */
1309 : : static bool
1310 : 396 : ExecParallelReportInstrumentation(PlanState *planstate,
1311 : : SharedExecutorInstrumentation *instrumentation)
1312 : : {
1313 : 396 : int i;
1314 : 396 : int plan_node_id = planstate->plan->plan_node_id;
1315 : 396 : Instrumentation *instrument;
1316 : :
1317 : 396 : InstrEndLoop(planstate->instrument);
1318 : :
1319 : : /*
 1320 : : * If we shuffled the plan_node_id array in *instrumentation into sorted
1321 : : * order, we could use binary search here. This might matter someday if
1322 : : * we're pushing down sufficiently large plan trees. For now, do it the
1323 : : * slow, dumb way.
1324 : : */
1325 [ - + ]: 1302 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1326 [ + + ]: 1302 : if (instrumentation->plan_node_id[i] == plan_node_id)
1327 : 396 : break;
1328 [ + - ]: 396 : if (i >= instrumentation->num_plan_nodes)
1329 [ # # # # ]: 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1330 : :
1331 : : /*
1332 : : * Add our statistics to the per-node, per-worker totals. It's possible
1333 : : * that this could happen more than once if we relaunched workers.
1334 : : */
1335 : 396 : instrument = GetInstrumentationArray(instrumentation);
1336 : 396 : instrument += i * instrumentation->num_workers;
1337 [ + - ]: 396 : Assert(IsParallelWorker());
1338 [ + - ]: 396 : Assert(ParallelWorkerNumber < instrumentation->num_workers);
1339 : 396 : InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
1340 : :
1341 : 792 : return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
1342 : : instrumentation);
1343 : 396 : }
1344 : :
1345 : : /*
1346 : : * Initialize the PlanState and its descendants with the information
1347 : : * retrieved from shared memory. This has to be done once the PlanState
 1348 : : * is allocated and initialized by the executor; that is, after ExecutorStart().
1349 : : */
1350 : : static bool
1351 : 1399 : ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
1352 : : {
1353 [ + - ]: 1399 : if (planstate == NULL)
1354 : 0 : return false;
1355 : :
1356 [ + + + + : 1399 : switch (nodeTag(planstate))
+ + + + +
- + + - +
- + ]
1357 : : {
1358 : : case T_SeqScanState:
1359 [ + + ]: 556 : if (planstate->plan->parallel_aware)
1360 : 452 : ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
1361 : 556 : break;
1362 : : case T_IndexScanState:
1363 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1364 : 66 : ExecIndexScanInitializeWorker((IndexScanState *) planstate, pwcxt);
1365 : 66 : break;
1366 : : case T_IndexOnlyScanState:
1367 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1368 : 78 : ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1369 : 39 : pwcxt);
1370 : 39 : break;
1371 : : case T_BitmapIndexScanState:
1372 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1373 : 90 : ExecBitmapIndexScanInitializeWorker((BitmapIndexScanState *) planstate,
1374 : 45 : pwcxt);
1375 : 45 : break;
1376 : : case T_ForeignScanState:
1377 [ # # ]: 0 : if (planstate->plan->parallel_aware)
1378 : 0 : ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1379 : 0 : pwcxt);
1380 : 0 : break;
1381 : : case T_TidRangeScanState:
1382 [ - + ]: 16 : if (planstate->plan->parallel_aware)
1383 : 32 : ExecTidRangeScanInitializeWorker((TidRangeScanState *) planstate,
1384 : 16 : pwcxt);
1385 : 16 : break;
1386 : : case T_AppendState:
1387 [ + + ]: 63 : if (planstate->plan->parallel_aware)
1388 : 53 : ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
1389 : 63 : break;
1390 : : case T_CustomScanState:
1391 [ # # ]: 0 : if (planstate->plan->parallel_aware)
1392 : 0 : ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1393 : 0 : pwcxt);
1394 : 0 : break;
1395 : : case T_BitmapHeapScanState:
1396 [ - + ]: 45 : if (planstate->plan->parallel_aware)
1397 : 90 : ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1398 : 45 : pwcxt);
1399 : 45 : break;
1400 : : case T_HashJoinState:
1401 [ + + ]: 94 : if (planstate->plan->parallel_aware)
1402 : 108 : ExecHashJoinInitializeWorker((HashJoinState *) planstate,
1403 : 54 : pwcxt);
1404 : 94 : break;
1405 : : case T_HashState:
1406 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1407 : 94 : ExecHashInitializeWorker((HashState *) planstate, pwcxt);
1408 : 94 : break;
1409 : : case T_SortState:
1410 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1411 : 77 : ExecSortInitializeWorker((SortState *) planstate, pwcxt);
1412 : 77 : break;
1413 : : case T_IncrementalSortState:
1414 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1415 : 0 : ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
1416 : 0 : pwcxt);
1417 : 0 : break;
1418 : : case T_AggState:
1419 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1420 : 271 : ExecAggInitializeWorker((AggState *) planstate, pwcxt);
1421 : 271 : break;
1422 : : case T_MemoizeState:
1423 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1424 : 2 : ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
1425 : 2 : break;
1426 : : default:
1427 : 31 : break;
1428 : : }
1429 : :
1430 : 1399 : return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1431 : : pwcxt);
1432 : 1399 : }
1433 : :
1434 : : /*
1435 : : * Main entrypoint for parallel query worker processes.
1436 : : *
1437 : : * We reach this function from ParallelWorkerMain, so the setup necessary to
1438 : : * create a sensible parallel environment has already been done;
1439 : : * ParallelWorkerMain worries about stuff like the transaction state, combo
1440 : : * CID mappings, and GUC values, so we don't need to deal with any of that
1441 : : * here.
1442 : : *
1443 : : * Our job is to deal with concerns specific to the executor. The parallel
1444 : : * group leader will have stored a serialized PlannedStmt, and it's our job
1445 : : * to execute that plan and write the resulting tuples to the appropriate
1446 : : * tuple queue. Various bits of supporting information that we need in order
1447 : : * to do this are also stored in the dsm_segment and can be accessed through
1448 : : * the shm_toc.
1449 : : */
1450 : : void
1451 : 436 : ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
1452 : : {
1453 : 436 : FixedParallelExecutorState *fpes;
1454 : 436 : BufferUsage *buffer_usage;
1455 : 436 : WalUsage *wal_usage;
1456 : 436 : DestReceiver *receiver;
1457 : 436 : QueryDesc *queryDesc;
1458 : 436 : SharedExecutorInstrumentation *instrumentation;
1459 : 436 : SharedJitInstrumentation *jit_instrumentation;
1460 : 436 : int instrument_options = 0;
1461 : 436 : void *area_space;
1462 : 436 : dsa_area *area;
1463 : 436 : ParallelWorkerContext pwcxt;
1464 : :
1465 : : /* Get fixed-size state. */
1466 : 436 : fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1467 : :
1468 : : /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
1469 : 436 : receiver = ExecParallelGetReceiver(seg, toc);
1470 : 436 : instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
1471 [ + + ]: 436 : if (instrumentation != NULL)
1472 : 121 : instrument_options = instrumentation->instrument_options;
1473 : 436 : jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1474 : : true);
1475 : 436 : queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1476 : :
1477 : : /* Setting debug_query_string for individual workers */
1478 : 436 : debug_query_string = queryDesc->sourceText;
1479 : :
1480 : : /* Report workers' query for monitoring purposes */
1481 : 436 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
1482 : :
1483 : : /* Attach to the dynamic shared memory area. */
1484 : 436 : area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
1485 : 436 : area = dsa_attach_in_place(area_space, seg);
1486 : :
1487 : : /* Start up the executor */
1488 : 436 : queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
1489 : 436 : ExecutorStart(queryDesc, fpes->eflags);
1490 : :
1491 : : /* Special executor initialization steps for parallel workers */
1492 : 436 : queryDesc->planstate->state->es_query_dsa = area;
1493 [ + + ]: 436 : if (DsaPointerIsValid(fpes->param_exec))
1494 : : {
1495 : 12 : char *paramexec_space;
1496 : :
1497 : 12 : paramexec_space = dsa_get_address(area, fpes->param_exec);
1498 : 12 : RestoreParamExecParams(paramexec_space, queryDesc->estate);
1499 : 12 : }
1500 : 436 : pwcxt.toc = toc;
1501 : 436 : pwcxt.seg = seg;
1502 : 436 : ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
1503 : :
1504 : : /* Pass down any tuple bound */
1505 : 436 : ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1506 : :
1507 : : /*
1508 : : * Prepare to track buffer/WAL usage during query execution.
1509 : : *
1510 : : * We do this after starting up the executor to match what happens in the
1511 : : * leader, which also doesn't count buffer accesses and WAL activity that
1512 : : * occur during executor startup.
1513 : : */
1514 : 436 : InstrStartParallelQuery();
1515 : :
1516 : : /*
1517 : : * Run the plan. If we specified a tuple bound, be careful not to demand
1518 : : * more tuples than that.
1519 : : */
1520 : 872 : ExecutorRun(queryDesc,
1521 : : ForwardScanDirection,
1522 [ + + ]: 436 : fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
1523 : :
1524 : : /* Shut down the executor */
1525 : 436 : ExecutorFinish(queryDesc);
1526 : :
1527 : : /* Report buffer/WAL usage during parallel execution. */
1528 : 436 : buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
1529 : 436 : wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
1530 : 872 : InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1531 : 436 : &wal_usage[ParallelWorkerNumber]);
1532 : :
1533 : : /* Report instrumentation data if any instrumentation options are set. */
1534 [ + + ]: 436 : if (instrumentation != NULL)
1535 : 242 : ExecParallelReportInstrumentation(queryDesc->planstate,
1536 : 121 : instrumentation);
1537 : :
1538 : : /* Report JIT instrumentation data if any */
1539 [ - + # # ]: 436 : if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1540 : : {
1541 [ # # ]: 0 : Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1542 : 0 : jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1543 : 0 : queryDesc->estate->es_jit->instr;
1544 : 0 : }
1545 : :
1546 : : /* Must do this after capturing instrumentation. */
1547 : 436 : ExecutorEnd(queryDesc);
1548 : :
1549 : : /* Cleanup. */
1550 : 436 : dsa_detach(area);
1551 : 436 : FreeQueryDesc(queryDesc);
1552 : 436 : receiver->rDestroy(receiver);
1553 : 436 : }
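
/*
 * For orientation, a simplified sketch of the leader-side call sequence
 * that drives this file (the actual callers are nodeGather.c and
 * nodeGatherMerge.c; details vary):
 *
 *     pei = ExecInitParallelPlan(outerPlanState(node), estate,
 *                                initParam, nworkers, tuples_needed);
 *     LaunchParallelWorkers(pei->pcxt);
 *     if (pei->pcxt->nworkers_launched > 0)
 *         ExecParallelCreateReaders(pei);
 *     ... read tuples from pei->reader[] ...
 *     ExecParallelFinish(pei);     (wait for workers, collect buffer/WAL usage)
 *     ExecParallelCleanup(pei);    (gather instrumentation, release DSM)
 */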
|