LCOV - code coverage report
Current view: top level - src/backend/executor - nodeAppend.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 57.8 % 460 266
Test Date: 2026-01-26 10:56:24 Functions: 61.1 % 18 11
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Branches: 44.6 % 260 116

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * nodeAppend.c
       4                 :             :  *        routines to handle append nodes.
       5                 :             :  *
       6                 :             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7                 :             :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :             :  *
       9                 :             :  *
      10                 :             :  * IDENTIFICATION
      11                 :             :  *        src/backend/executor/nodeAppend.c
      12                 :             :  *
      13                 :             :  *-------------------------------------------------------------------------
      14                 :             :  */
      15                 :             : /* INTERFACE ROUTINES
      16                 :             :  *              ExecInitAppend  - initialize the append node
      17                 :             :  *              ExecAppend              - retrieve the next tuple from the node
      18                 :             :  *              ExecEndAppend   - shut down the append node
      19                 :             :  *              ExecReScanAppend - rescan the append node
      20                 :             :  *
      21                 :             :  *       NOTES
      22                 :             :  *              Each append node contains a list of one or more subplans which
      23                 :             :  *              must be iteratively processed (forwards or backwards).
      24                 :             :  *              Tuples are retrieved by executing the 'whichplan'th subplan
      25                 :             :  *              until the subplan stops returning tuples, at which point that
      26                 :             :  *              plan is shut down and the next started up.
      27                 :             :  *
      28                 :             :  *              Append nodes don't make use of their left and right
      29                 :             :  *              subtrees, rather they maintain a list of subplans so
      30                 :             :  *              a typical append node looks like this in the plan tree:
      31                 :             :  *
      32                 :             :  *                                 ...
      33                 :             :  *                                 /
      34                 :             :  *                              Append -------+------+------+--- nil
      35                 :             :  *                              /       \                 |              |              |
      36                 :             :  *                        nil   nil              ...    ...    ...
      37                 :             :  *                                                               subplans
      38                 :             :  *
      39                 :             :  *              Append nodes are currently used for unions, and to support
      40                 :             :  *              inheritance queries, where several relations need to be scanned.
      41                 :             :  *              For example, in our standard person/student/employee/student-emp
      42                 :             :  *              example, where student and employee inherit from person
      43                 :             :  *              and student-emp inherits from student and employee, the
      44                 :             :  *              query:
      45                 :             :  *
      46                 :             :  *                              select name from person
      47                 :             :  *
      48                 :             :  *              generates the plan:
      49                 :             :  *
      50                 :             :  *                                |
      51                 :             :  *                              Append -------+-------+--------+--------+
      52                 :             :  *                              /       \                 |               |                |            |
      53                 :             :  *                        nil   nil              Scan    Scan     Scan     Scan
      54                 :             :  *                                                        |               |                |            |
      55                 :             :  *                                                      person employee student student-emp
      56                 :             :  */
      57                 :             : 
      58                 :             : #include "postgres.h"
      59                 :             : 
      60                 :             : #include "executor/execAsync.h"
      61                 :             : #include "executor/execPartition.h"
      62                 :             : #include "executor/executor.h"
      63                 :             : #include "executor/nodeAppend.h"
      64                 :             : #include "miscadmin.h"
      65                 :             : #include "pgstat.h"
      66                 :             : #include "storage/latch.h"
      67                 :             : 
      68                 :             : /* Shared state for parallel-aware Append. */
      69                 :             : struct ParallelAppendState
      70                 :             : {
      71                 :             :         LWLock          pa_lock;                /* mutual exclusion to choose next subplan */
      72                 :             :         int                     pa_next_plan;   /* next plan to choose by any worker */
      73                 :             : 
      74                 :             :         /*
      75                 :             :          * pa_finished[i] should be true if no more workers should select subplan
      76                 :             :          * i.  for a non-partial plan, this should be set to true as soon as a
      77                 :             :          * worker selects the plan; for a partial plan, it remains false until
      78                 :             :          * some worker executes the plan to completion.
      79                 :             :          */
      80                 :             :         bool            pa_finished[FLEXIBLE_ARRAY_MEMBER];
      81                 :             : };
      82                 :             : 
      83                 :             : #define INVALID_SUBPLAN_INDEX           -1
      84                 :             : #define EVENT_BUFFER_SIZE                       16
      85                 :             : 
      86                 :             : static TupleTableSlot *ExecAppend(PlanState *pstate);
      87                 :             : static bool choose_next_subplan_locally(AppendState *node);
      88                 :             : static bool choose_next_subplan_for_leader(AppendState *node);
      89                 :             : static bool choose_next_subplan_for_worker(AppendState *node);
      90                 :             : static void mark_invalid_subplans_as_finished(AppendState *node);
      91                 :             : static void ExecAppendAsyncBegin(AppendState *node);
      92                 :             : static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
      93                 :             : static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
      94                 :             : static void ExecAppendAsyncEventWait(AppendState *node);
      95                 :             : static void classify_matching_subplans(AppendState *node);
      96                 :             : 
      97                 :             : /* ----------------------------------------------------------------
      98                 :             :  *              ExecInitAppend
      99                 :             :  *
     100                 :             :  *              Begin all of the subscans of the append node.
     101                 :             :  *
     102                 :             :  *         (This is potentially wasteful, since the entire result of the
     103                 :             :  *              append node may not be scanned, but this way all of the
     104                 :             :  *              structures get allocated in the executor's top level memory
     105                 :             :  *              block instead of that of the call to ExecAppend.)
     106                 :             :  * ----------------------------------------------------------------
     107                 :             :  */
     108                 :             : AppendState *
     109                 :        2577 : ExecInitAppend(Append *node, EState *estate, int eflags)
     110                 :             : {
     111                 :        2577 :         AppendState *appendstate = makeNode(AppendState);
     112                 :        2577 :         PlanState **appendplanstates;
     113                 :        2577 :         const TupleTableSlotOps *appendops;
     114                 :        2577 :         Bitmapset  *validsubplans;
     115                 :        2577 :         Bitmapset  *asyncplans;
     116                 :        2577 :         int                     nplans;
     117                 :        2577 :         int                     nasyncplans;
     118                 :        2577 :         int                     firstvalid;
     119                 :        2577 :         int                     i,
     120                 :             :                                 j;
     121                 :             : 
     122                 :             :         /* check for unsupported flags */
     123         [ +  - ]:        2577 :         Assert(!(eflags & EXEC_FLAG_MARK));
     124                 :             : 
     125                 :             :         /*
     126                 :             :          * create new AppendState for our append node
     127                 :             :          */
     128                 :        2577 :         appendstate->ps.plan = (Plan *) node;
     129                 :        2577 :         appendstate->ps.state = estate;
     130                 :        2577 :         appendstate->ps.ExecProcNode = ExecAppend;
     131                 :             : 
     132                 :             :         /* Let choose_next_subplan_* function handle setting the first subplan */
     133                 :        2577 :         appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
     134                 :        2577 :         appendstate->as_syncdone = false;
     135                 :        2577 :         appendstate->as_begun = false;
     136                 :             : 
     137                 :             :         /* If run-time partition pruning is enabled, then set that up now */
     138         [ +  + ]:        2577 :         if (node->part_prune_index >= 0)
     139                 :             :         {
     140                 :         120 :                 PartitionPruneState *prunestate;
     141                 :             : 
     142                 :             :                 /*
     143                 :             :                  * Set up pruning data structure.  This also initializes the set of
     144                 :             :                  * subplans to initialize (validsubplans) by taking into account the
     145                 :             :                  * result of performing initial pruning if any.
     146                 :             :                  */
     147                 :         240 :                 prunestate = ExecInitPartitionExecPruning(&appendstate->ps,
     148                 :         120 :                                                                                                   list_length(node->appendplans),
     149                 :         120 :                                                                                                   node->part_prune_index,
     150                 :         120 :                                                                                                   node->apprelids,
     151                 :             :                                                                                                   &validsubplans);
     152                 :         120 :                 appendstate->as_prune_state = prunestate;
     153                 :         120 :                 nplans = bms_num_members(validsubplans);
     154                 :             : 
     155                 :             :                 /*
     156                 :             :                  * When no run-time pruning is required and there's at least one
     157                 :             :                  * subplan, we can fill as_valid_subplans immediately, preventing
     158                 :             :                  * later calls to ExecFindMatchingSubPlans.
     159                 :             :                  */
     160   [ +  +  +  + ]:         120 :                 if (!prunestate->do_exec_prune && nplans > 0)
     161                 :             :                 {
     162                 :          43 :                         appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
     163                 :          43 :                         appendstate->as_valid_subplans_identified = true;
     164                 :          43 :                 }
     165                 :         120 :         }
     166                 :             :         else
     167                 :             :         {
     168                 :        2457 :                 nplans = list_length(node->appendplans);
     169                 :             : 
     170                 :             :                 /*
     171                 :             :                  * When run-time partition pruning is not enabled we can just mark all
     172                 :             :                  * subplans as valid; they must also all be initialized.
     173                 :             :                  */
     174         [ +  - ]:        2457 :                 Assert(nplans > 0);
     175                 :        2457 :                 appendstate->as_valid_subplans = validsubplans =
     176                 :        2457 :                         bms_add_range(NULL, 0, nplans - 1);
     177                 :        2457 :                 appendstate->as_valid_subplans_identified = true;
     178                 :        2457 :                 appendstate->as_prune_state = NULL;
     179                 :             :         }
     180                 :             : 
     181                 :        2577 :         appendplanstates = (PlanState **) palloc(nplans *
     182                 :             :                                                                                          sizeof(PlanState *));
     183                 :             : 
     184                 :             :         /*
     185                 :             :          * call ExecInitNode on each of the valid plans to be executed and save
     186                 :             :          * the results into the appendplanstates array.
     187                 :             :          *
     188                 :             :          * While at it, find out the first valid partial plan.
     189                 :             :          */
     190                 :        2577 :         j = 0;
     191                 :        2577 :         asyncplans = NULL;
     192                 :        2577 :         nasyncplans = 0;
     193                 :        2577 :         firstvalid = nplans;
     194                 :        2577 :         i = -1;
     195         [ +  + ]:        9857 :         while ((i = bms_next_member(validsubplans, i)) >= 0)
     196                 :             :         {
     197                 :        7280 :                 Plan       *initNode = (Plan *) list_nth(node->appendplans, i);
     198                 :             : 
     199                 :             :                 /*
     200                 :             :                  * Record async subplans.  When executing EvalPlanQual, we treat them
     201                 :             :                  * as sync ones; don't do this when initializing an EvalPlanQual plan
     202                 :             :                  * tree.
     203                 :             :                  */
     204   [ -  +  #  # ]:        7280 :                 if (initNode->async_capable && estate->es_epq_active == NULL)
     205                 :             :                 {
     206                 :           0 :                         asyncplans = bms_add_member(asyncplans, j);
     207                 :           0 :                         nasyncplans++;
     208                 :           0 :                 }
     209                 :             : 
     210                 :             :                 /*
     211                 :             :                  * Record the lowest appendplans index which is a valid partial plan.
     212                 :             :                  */
     213   [ +  +  +  + ]:        7280 :                 if (i >= node->first_partial_plan && j < firstvalid)
     214                 :          75 :                         firstvalid = j;
     215                 :             : 
     216                 :        7280 :                 appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
     217                 :        7280 :         }
     218                 :             : 
     219                 :        2577 :         appendstate->as_first_partial_plan = firstvalid;
     220                 :        2577 :         appendstate->appendplans = appendplanstates;
     221                 :        2577 :         appendstate->as_nplans = nplans;
     222                 :             : 
     223                 :             :         /*
     224                 :             :          * Initialize Append's result tuple type and slot.  If the child plans all
     225                 :             :          * produce the same fixed slot type, we can use that slot type; otherwise
     226                 :             :          * make a virtual slot.  (Note that the result slot itself is used only to
     227                 :             :          * return a null tuple at end of execution; real tuples are returned to
     228                 :             :          * the caller in the children's own result slots.  What we are doing here
     229                 :             :          * is allowing the parent plan node to optimize if the Append will return
     230                 :             :          * only one kind of slot.)
     231                 :             :          */
     232                 :        2577 :         appendops = ExecGetCommonSlotOps(appendplanstates, j);
     233         [ +  + ]:        2577 :         if (appendops != NULL)
     234                 :             :         {
     235                 :        2468 :                 ExecInitResultTupleSlotTL(&appendstate->ps, appendops);
     236                 :        2468 :         }
     237                 :             :         else
     238                 :             :         {
     239                 :         109 :                 ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
     240                 :             :                 /* show that the output slot type is not fixed */
     241                 :         109 :                 appendstate->ps.resultopsset = true;
     242                 :         109 :                 appendstate->ps.resultopsfixed = false;
     243                 :             :         }
     244                 :             : 
     245                 :             :         /* Initialize async state */
     246                 :        2577 :         appendstate->as_asyncplans = asyncplans;
     247                 :        2577 :         appendstate->as_nasyncplans = nasyncplans;
     248                 :        2577 :         appendstate->as_asyncrequests = NULL;
     249                 :        2577 :         appendstate->as_asyncresults = NULL;
     250                 :        2577 :         appendstate->as_nasyncresults = 0;
     251                 :        2577 :         appendstate->as_nasyncremain = 0;
     252                 :        2577 :         appendstate->as_needrequest = NULL;
     253                 :        2577 :         appendstate->as_eventset = NULL;
     254                 :        2577 :         appendstate->as_valid_asyncplans = NULL;
     255                 :             : 
     256         [ +  - ]:        2577 :         if (nasyncplans > 0)
     257                 :             :         {
     258                 :           0 :                 appendstate->as_asyncrequests = (AsyncRequest **)
     259                 :           0 :                         palloc0(nplans * sizeof(AsyncRequest *));
     260                 :             : 
     261                 :           0 :                 i = -1;
     262         [ #  # ]:           0 :                 while ((i = bms_next_member(asyncplans, i)) >= 0)
     263                 :             :                 {
     264                 :           0 :                         AsyncRequest *areq;
     265                 :             : 
     266                 :           0 :                         areq = palloc_object(AsyncRequest);
     267                 :           0 :                         areq->requestor = (PlanState *) appendstate;
     268                 :           0 :                         areq->requestee = appendplanstates[i];
     269                 :           0 :                         areq->request_index = i;
     270                 :           0 :                         areq->callback_pending = false;
     271                 :           0 :                         areq->request_complete = false;
     272                 :           0 :                         areq->result = NULL;
     273                 :             : 
     274                 :           0 :                         appendstate->as_asyncrequests[i] = areq;
     275                 :           0 :                 }
     276                 :             : 
     277                 :           0 :                 appendstate->as_asyncresults = (TupleTableSlot **)
     278                 :           0 :                         palloc0(nasyncplans * sizeof(TupleTableSlot *));
     279                 :             : 
     280         [ #  # ]:           0 :                 if (appendstate->as_valid_subplans_identified)
     281                 :           0 :                         classify_matching_subplans(appendstate);
     282                 :           0 :         }
     283                 :             : 
     284                 :             :         /*
     285                 :             :          * Miscellaneous initialization
     286                 :             :          */
     287                 :             : 
     288                 :        2577 :         appendstate->ps.ps_ProjInfo = NULL;
     289                 :             : 
     290                 :             :         /* For parallel query, this will be overridden later. */
     291                 :        2577 :         appendstate->choose_next_subplan = choose_next_subplan_locally;
     292                 :             : 
     293                 :        5154 :         return appendstate;
     294                 :        2577 : }
     295                 :             : 
     296                 :             : /* ----------------------------------------------------------------
     297                 :             :  *         ExecAppend
     298                 :             :  *
     299                 :             :  *              Handles iteration over multiple subplans.
     300                 :             :  * ----------------------------------------------------------------
     301                 :             :  */
     302                 :             : static TupleTableSlot *
     303                 :      298512 : ExecAppend(PlanState *pstate)
     304                 :             : {
     305                 :      298512 :         AppendState *node = castNode(AppendState, pstate);
     306                 :      298512 :         TupleTableSlot *result;
     307                 :             : 
     308                 :             :         /*
     309                 :             :          * If this is the first call after Init or ReScan, we need to do the
     310                 :             :          * initialization work.
     311                 :             :          */
     312         [ +  + ]:      298512 :         if (!node->as_begun)
     313                 :             :         {
     314         [ +  - ]:        6420 :                 Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
     315         [ +  - ]:        6420 :                 Assert(!node->as_syncdone);
     316                 :             : 
     317                 :             :                 /* Nothing to do if there are no subplans */
     318         [ +  + ]:        6420 :                 if (node->as_nplans == 0)
     319                 :          10 :                         return ExecClearTuple(node->ps.ps_ResultTupleSlot);
     320                 :             : 
     321                 :             :                 /* If there are any async subplans, begin executing them. */
     322         [ +  - ]:        6410 :                 if (node->as_nasyncplans > 0)
     323                 :           0 :                         ExecAppendAsyncBegin(node);
     324                 :             : 
     325                 :             :                 /*
     326                 :             :                  * If no sync subplan has been chosen, we must choose one before
     327                 :             :                  * proceeding.
     328                 :             :                  */
     329   [ +  +  -  + ]:        6410 :                 if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
     330                 :         537 :                         return ExecClearTuple(node->ps.ps_ResultTupleSlot);
     331                 :             : 
     332   [ -  +  +  - ]:        5873 :                 Assert(node->as_syncdone ||
     333                 :             :                            (node->as_whichplan >= 0 &&
     334                 :             :                                 node->as_whichplan < node->as_nplans));
     335                 :             : 
     336                 :             :                 /* And we're initialized. */
     337                 :        5873 :                 node->as_begun = true;
     338                 :        5873 :         }
     339                 :             : 
     340                 :      305066 :         for (;;)
     341                 :             :         {
     342                 :      305066 :                 PlanState  *subnode;
     343                 :             : 
     344         [ +  + ]:      305066 :                 CHECK_FOR_INTERRUPTS();
     345                 :             : 
     346                 :             :                 /*
     347                 :             :                  * try to get a tuple from an async subplan if any
     348                 :             :                  */
     349   [ +  +  +  + ]:      305066 :                 if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
     350                 :             :                 {
     351         [ #  # ]:          16 :                         if (ExecAppendAsyncGetNext(node, &result))
     352                 :           0 :                                 return result;
     353         [ #  # ]:           0 :                         Assert(!node->as_syncdone);
     354         [ #  # ]:           0 :                         Assert(bms_is_empty(node->as_needrequest));
     355                 :           0 :                 }
     356                 :             : 
     357                 :             :                 /*
     358                 :             :                  * figure out which sync subplan we are currently processing
     359                 :             :                  */
     360         [ +  - ]:      305050 :                 Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
     361                 :      305050 :                 subnode = node->appendplans[node->as_whichplan];
     362                 :             : 
     363                 :             :                 /*
     364                 :             :                  * get a tuple from the subplan
     365                 :             :                  */
     366                 :      305050 :                 result = ExecProcNode(subnode);
     367                 :             : 
     368   [ +  +  +  + ]:      305050 :                 if (!TupIsNull(result))
     369                 :             :                 {
     370                 :             :                         /*
     371                 :             :                          * If the subplan gave us something then return it as-is. We do
     372                 :             :                          * NOT make use of the result slot that was set up in
     373                 :             :                          * ExecInitAppend; there's no need for it.
     374                 :             :                          */
     375                 :      292175 :                         return result;
     376                 :             :                 }
     377                 :             : 
     378                 :             :                 /*
     379                 :             :                  * wait or poll for async events if any. We do this before checking
     380                 :             :                  * for the end of iteration, because it might drain the remaining
     381                 :             :                  * async subplans.
     382                 :             :                  */
     383         [ +  - ]:       12875 :                 if (node->as_nasyncremain > 0)
     384                 :           0 :                         ExecAppendAsyncEventWait(node);
     385                 :             : 
     386                 :             :                 /* choose new sync subplan; if no sync/async subplans, we're done */
     387   [ +  +  -  + ]:       12875 :                 if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
     388                 :        5782 :                         return ExecClearTuple(node->ps.ps_ResultTupleSlot);
     389         [ +  + ]:      305050 :         }
     390                 :      298496 : }
     391                 :             : 
     392                 :             : /* ----------------------------------------------------------------
     393                 :             :  *              ExecEndAppend
     394                 :             :  *
     395                 :             :  *              Shuts down the subscans of the append node.
     396                 :             :  *
     397                 :             :  *              Returns nothing of interest.
     398                 :             :  * ----------------------------------------------------------------
     399                 :             :  */
     400                 :             : void
     401                 :        2534 : ExecEndAppend(AppendState *node)
     402                 :             : {
     403                 :        2534 :         PlanState **appendplans;
     404                 :        2534 :         int                     nplans;
     405                 :        2534 :         int                     i;
     406                 :             : 
     407                 :             :         /*
     408                 :             :          * get information from the node
     409                 :             :          */
     410                 :        2534 :         appendplans = node->appendplans;
     411                 :        2534 :         nplans = node->as_nplans;
     412                 :             : 
     413                 :             :         /*
     414                 :             :          * shut down each of the subscans
     415                 :             :          */
     416         [ +  + ]:        9710 :         for (i = 0; i < nplans; i++)
     417                 :        7176 :                 ExecEndNode(appendplans[i]);
     418                 :        2534 : }
     419                 :             : 
     420                 :             : void
     421                 :        4959 : ExecReScanAppend(AppendState *node)
     422                 :             : {
     423                 :        4959 :         int                     nasyncplans = node->as_nasyncplans;
     424                 :        4959 :         int                     i;
     425                 :             : 
     426                 :             :         /*
     427                 :             :          * If any PARAM_EXEC Params used in pruning expressions have changed, then
     428                 :             :          * we'd better unset the valid subplans so that they are reselected for
     429                 :             :          * the new parameter values.
     430                 :             :          */
     431   [ +  +  -  + ]:        4959 :         if (node->as_prune_state &&
     432                 :        1088 :                 bms_overlap(node->ps.chgParam,
     433                 :         544 :                                         node->as_prune_state->execparamids))
     434                 :             :         {
     435                 :         544 :                 node->as_valid_subplans_identified = false;
     436                 :         544 :                 bms_free(node->as_valid_subplans);
     437                 :         544 :                 node->as_valid_subplans = NULL;
     438                 :         544 :                 bms_free(node->as_valid_asyncplans);
     439                 :         544 :                 node->as_valid_asyncplans = NULL;
     440                 :         544 :         }
     441                 :             : 
     442         [ +  + ]:       18586 :         for (i = 0; i < node->as_nplans; i++)
     443                 :             :         {
     444                 :       13627 :                 PlanState  *subnode = node->appendplans[i];
     445                 :             : 
     446                 :             :                 /*
     447                 :             :                  * ExecReScan doesn't know about my subplans, so I have to do
     448                 :             :                  * changed-parameter signaling myself.
     449                 :             :                  */
     450         [ +  + ]:       13627 :                 if (node->ps.chgParam != NULL)
     451                 :        5488 :                         UpdateChangedParamSet(subnode, node->ps.chgParam);
     452                 :             : 
     453                 :             :                 /*
     454                 :             :                  * If chgParam of subnode is not null then plan will be re-scanned by
     455                 :             :                  * first ExecProcNode or by first ExecAsyncRequest.
     456                 :             :                  */
     457         [ +  + ]:       13627 :                 if (subnode->chgParam == NULL)
     458                 :        8278 :                         ExecReScan(subnode);
     459                 :       13627 :         }
     460                 :             : 
     461                 :             :         /* Reset async state */
     462         [ +  - ]:        4959 :         if (nasyncplans > 0)
     463                 :             :         {
     464                 :           0 :                 i = -1;
     465         [ #  # ]:           0 :                 while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
     466                 :             :                 {
     467                 :           0 :                         AsyncRequest *areq = node->as_asyncrequests[i];
     468                 :             : 
     469                 :           0 :                         areq->callback_pending = false;
     470                 :           0 :                         areq->request_complete = false;
     471                 :           0 :                         areq->result = NULL;
     472                 :           0 :                 }
     473                 :             : 
     474                 :           0 :                 node->as_nasyncresults = 0;
     475                 :           0 :                 node->as_nasyncremain = 0;
     476                 :           0 :                 bms_free(node->as_needrequest);
     477                 :           0 :                 node->as_needrequest = NULL;
     478                 :           0 :         }
     479                 :             : 
     480                 :             :         /* Let choose_next_subplan_* function handle setting the first subplan */
     481                 :        4959 :         node->as_whichplan = INVALID_SUBPLAN_INDEX;
     482                 :        4959 :         node->as_syncdone = false;
     483                 :        4959 :         node->as_begun = false;
     484                 :        4959 : }
     485                 :             : 
     486                 :             : /* ----------------------------------------------------------------
     487                 :             :  *                                              Parallel Append Support
     488                 :             :  * ----------------------------------------------------------------
     489                 :             :  */
     490                 :             : 
     491                 :             : /* ----------------------------------------------------------------
     492                 :             :  *              ExecAppendEstimate
     493                 :             :  *
     494                 :             :  *              Compute the amount of space we'll need in the parallel
     495                 :             :  *              query DSM, and inform pcxt->estimator about our needs.
     496                 :             :  * ----------------------------------------------------------------
     497                 :             :  */
     498                 :             : void
     499                 :          23 : ExecAppendEstimate(AppendState *node,
     500                 :             :                                    ParallelContext *pcxt)
     501                 :             : {
     502                 :          23 :         node->pstate_len =
     503                 :          23 :                 add_size(offsetof(ParallelAppendState, pa_finished),
     504                 :          23 :                                  sizeof(bool) * node->as_nplans);
     505                 :             : 
     506                 :          23 :         shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
     507                 :          23 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     508                 :          23 : }
     509                 :             : 
     510                 :             : 
     511                 :             : /* ----------------------------------------------------------------
     512                 :             :  *              ExecAppendInitializeDSM
     513                 :             :  *
     514                 :             :  *              Set up shared state for Parallel Append.
     515                 :             :  * ----------------------------------------------------------------
     516                 :             :  */
     517                 :             : void
     518                 :          23 : ExecAppendInitializeDSM(AppendState *node,
     519                 :             :                                                 ParallelContext *pcxt)
     520                 :             : {
     521                 :          23 :         ParallelAppendState *pstate;
     522                 :             : 
     523                 :          23 :         pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
     524                 :          23 :         memset(pstate, 0, node->pstate_len);
     525                 :          23 :         LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
     526                 :          23 :         shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
     527                 :             : 
     528                 :          23 :         node->as_pstate = pstate;
     529                 :          23 :         node->choose_next_subplan = choose_next_subplan_for_leader;
     530                 :          23 : }
     531                 :             : 
     532                 :             : /* ----------------------------------------------------------------
     533                 :             :  *              ExecAppendReInitializeDSM
     534                 :             :  *
     535                 :             :  *              Reset shared state before beginning a fresh scan.
     536                 :             :  * ----------------------------------------------------------------
     537                 :             :  */
     538                 :             : void
     539                 :           0 : ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
     540                 :             : {
     541                 :           0 :         ParallelAppendState *pstate = node->as_pstate;
     542                 :             : 
     543                 :           0 :         pstate->pa_next_plan = 0;
     544                 :           0 :         memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
     545                 :           0 : }
     546                 :             : 
     547                 :             : /* ----------------------------------------------------------------
     548                 :             :  *              ExecAppendInitializeWorker
     549                 :             :  *
     550                 :             :  *              Copy relevant information from TOC into planstate, and initialize
     551                 :             :  *              whatever is required to choose and execute the optimal subplan.
     552                 :             :  * ----------------------------------------------------------------
     553                 :             :  */
     554                 :             : void
     555                 :          53 : ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
     556                 :             : {
     557                 :          53 :         node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
     558                 :          53 :         node->choose_next_subplan = choose_next_subplan_for_worker;
     559                 :          53 : }
     560                 :             : 
     561                 :             : /* ----------------------------------------------------------------
     562                 :             :  *              choose_next_subplan_locally
     563                 :             :  *
     564                 :             :  *              Choose next sync subplan for a non-parallel-aware Append,
     565                 :             :  *              returning false if there are no more.
     566                 :             :  * ----------------------------------------------------------------
     567                 :             :  */
     568                 :             : static bool
     569                 :       19133 : choose_next_subplan_locally(AppendState *node)
     570                 :             : {
     571                 :       19133 :         int                     whichplan = node->as_whichplan;
     572                 :       19133 :         int                     nextplan;
     573                 :             : 
     574                 :             :         /* We should never be called when there are no subplans */
     575         [ +  - ]:       19133 :         Assert(node->as_nplans > 0);
     576                 :             : 
     577                 :             :         /* Nothing to do if syncdone */
     578         [ -  + ]:       19133 :         if (node->as_syncdone)
     579                 :           0 :                 return false;
     580                 :             : 
     581                 :             :         /*
     582                 :             :          * If first call then have the bms member function choose the first valid
     583                 :             :          * sync subplan by initializing whichplan to -1.  If there happen to be no
     584                 :             :          * valid sync subplans then the bms member function will handle that by
     585                 :             :          * returning a negative number which will allow us to exit returning a
     586                 :             :          * false value.
     587                 :             :          */
     588         [ +  + ]:       19133 :         if (whichplan == INVALID_SUBPLAN_INDEX)
     589                 :             :         {
     590         [ -  + ]:        6343 :                 if (node->as_nasyncplans > 0)
     591                 :             :                 {
     592                 :             :                         /* We'd have filled as_valid_subplans already */
     593         [ #  # ]:           0 :                         Assert(node->as_valid_subplans_identified);
     594                 :           0 :                 }
     595         [ +  + ]:        6343 :                 else if (!node->as_valid_subplans_identified)
     596                 :             :                 {
     597                 :         562 :                         node->as_valid_subplans =
     598                 :         562 :                                 ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
     599                 :         562 :                         node->as_valid_subplans_identified = true;
     600                 :         562 :                 }
     601                 :             : 
     602                 :        6343 :                 whichplan = -1;
     603                 :        6343 :         }
     604                 :             : 
     605                 :             :         /* Ensure whichplan is within the expected range */
     606         [ +  - ]:       19133 :         Assert(whichplan >= -1 && whichplan <= node->as_nplans);
     607                 :             : 
     608         [ +  + ]:       19133 :         if (ScanDirectionIsForward(node->ps.state->es_direction))
     609                 :       19130 :                 nextplan = bms_next_member(node->as_valid_subplans, whichplan);
     610                 :             :         else
     611                 :           3 :                 nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
     612                 :             : 
     613         [ +  + ]:       19133 :         if (nextplan < 0)
     614                 :             :         {
     615                 :             :                 /* Set as_syncdone if in async mode */
     616         [ +  - ]:        6252 :                 if (node->as_nasyncplans > 0)
     617                 :           0 :                         node->as_syncdone = true;
     618                 :        6252 :                 return false;
     619                 :             :         }
     620                 :             : 
     621                 :       12881 :         node->as_whichplan = nextplan;
     622                 :             : 
     623                 :       12881 :         return true;
     624                 :       19133 : }
     625                 :             : 
     626                 :             : /* ----------------------------------------------------------------
     627                 :             :  *              choose_next_subplan_for_leader
     628                 :             :  *
     629                 :             :  *      Try to pick a plan which doesn't commit us to doing much
     630                 :             :  *      work locally, so that as much work as possible is done in
     631                 :             :  *      the workers.  Cheapest subplans are at the end.
     632                 :             :  * ----------------------------------------------------------------
     633                 :             :  */
     634                 :             : static bool
     635                 :          83 : choose_next_subplan_for_leader(AppendState *node)
     636                 :             : {
     637                 :          83 :         ParallelAppendState *pstate = node->as_pstate;
     638                 :             : 
     639                 :             :         /* Backward scan is not supported by parallel-aware plans */
     640         [ +  - ]:          83 :         Assert(ScanDirectionIsForward(node->ps.state->es_direction));
     641                 :             : 
     642                 :             :         /* We should never be called when there are no subplans */
     643         [ +  - ]:          83 :         Assert(node->as_nplans > 0);
     644                 :             : 
     645                 :          83 :         LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
     646                 :             : 
     647         [ +  + ]:          83 :         if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
     648                 :             :         {
     649                 :             :                 /* Mark just-completed subplan as finished. */
     650                 :          63 :                 node->as_pstate->pa_finished[node->as_whichplan] = true;
     651                 :          63 :         }
     652                 :             :         else
     653                 :             :         {
     654                 :             :                 /* Start with last subplan. */
     655                 :          20 :                 node->as_whichplan = node->as_nplans - 1;
     656                 :             : 
     657                 :             :                 /*
     658                 :             :                  * If we've yet to determine the valid subplans then do so now.  If
     659                 :             :                  * run-time pruning is disabled then the valid subplans will always be
     660                 :             :                  * set to all subplans.
     661                 :             :                  */
     662         [ +  + ]:          20 :                 if (!node->as_valid_subplans_identified)
     663                 :             :                 {
     664                 :           4 :                         node->as_valid_subplans =
     665                 :           4 :                                 ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
     666                 :           4 :                         node->as_valid_subplans_identified = true;
     667                 :             : 
     668                 :             :                         /*
     669                 :             :                          * Mark each invalid plan as finished to allow the loop below to
     670                 :             :                          * select the first valid subplan.
     671                 :             :                          */
     672                 :           4 :                         mark_invalid_subplans_as_finished(node);
     673                 :           4 :                 }
     674                 :             :         }
     675                 :             : 
     676                 :             :         /* Loop until we find a subplan to execute. */
     677         [ +  + ]:         133 :         while (pstate->pa_finished[node->as_whichplan])
     678                 :             :         {
     679         [ +  + ]:          70 :                 if (node->as_whichplan == 0)
     680                 :             :                 {
     681                 :          20 :                         pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
     682                 :          20 :                         node->as_whichplan = INVALID_SUBPLAN_INDEX;
     683                 :          20 :                         LWLockRelease(&pstate->pa_lock);
     684                 :          20 :                         return false;
     685                 :             :                 }
     686                 :             : 
     687                 :             :                 /*
     688                 :             :                  * We needn't pay attention to as_valid_subplans here as all invalid
     689                 :             :                  * plans have been marked as finished.
     690                 :             :                  */
     691                 :          50 :                 node->as_whichplan--;
     692                 :             :         }
     693                 :             : 
     694                 :             :         /* If non-partial, immediately mark as finished. */
     695         [ +  + ]:          63 :         if (node->as_whichplan < node->as_first_partial_plan)
     696                 :          21 :                 node->as_pstate->pa_finished[node->as_whichplan] = true;
     697                 :             : 
     698                 :          63 :         LWLockRelease(&pstate->pa_lock);
     699                 :             : 
     700                 :          63 :         return true;
     701                 :          83 : }
     702                 :             : 
     703                 :             : /* ----------------------------------------------------------------
     704                 :             :  *              choose_next_subplan_for_worker
     705                 :             :  *
     706                 :             :  *              Choose next subplan for a parallel-aware Append, returning
     707                 :             :  *              false if there are no more.
     708                 :             :  *
     709                 :             :  *              We start from the first plan and advance through the list;
     710                 :             :  *              when we get back to the end, we loop back to the first
     711                 :             :  *              partial plan.  This assigns the non-partial plans first in
     712                 :             :  *              order of descending cost and then spreads out the workers
     713                 :             :  *              as evenly as possible across the remaining partial plans.
     714                 :             :  * ----------------------------------------------------------------
     715                 :             :  */
     716                 :             : static bool
     717                 :          69 : choose_next_subplan_for_worker(AppendState *node)
     718                 :             : {
     719                 :          69 :         ParallelAppendState *pstate = node->as_pstate;
     720                 :             : 
     721                 :             :         /* Backward scan is not supported by parallel-aware plans */
     722         [ +  - ]:          69 :         Assert(ScanDirectionIsForward(node->ps.state->es_direction));
     723                 :             : 
     724                 :             :         /* We should never be called when there are no subplans */
     725         [ +  - ]:          69 :         Assert(node->as_nplans > 0);
     726                 :             : 
     727                 :          69 :         LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
     728                 :             : 
     729                 :             :         /* Mark just-completed subplan as finished. */
     730         [ +  + ]:          69 :         if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
     731                 :          22 :                 node->as_pstate->pa_finished[node->as_whichplan] = true;
     732                 :             : 
     733                 :             :         /*
     734                 :             :          * If we've yet to determine the valid subplans then do so now.  If
     735                 :             :          * run-time pruning is disabled then the valid subplans will always be set
     736                 :             :          * to all subplans.
     737                 :             :          */
     738         [ +  + ]:          47 :         else if (!node->as_valid_subplans_identified)
     739                 :             :         {
     740                 :           4 :                 node->as_valid_subplans =
     741                 :           4 :                         ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
     742                 :           4 :                 node->as_valid_subplans_identified = true;
     743                 :             : 
     744                 :           4 :                 mark_invalid_subplans_as_finished(node);
     745                 :           4 :         }
     746                 :             : 
     747                 :             :         /* If all the plans are already done, we have nothing to do */
     748         [ +  + ]:          69 :         if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
     749                 :             :         {
     750                 :          39 :                 LWLockRelease(&pstate->pa_lock);
     751                 :          39 :                 return false;
     752                 :             :         }
     753                 :             : 
     754                 :             :         /* Save the plan from which we are starting the search. */
     755                 :          30 :         node->as_whichplan = pstate->pa_next_plan;
     756                 :             : 
     757                 :             :         /* Loop until we find a valid subplan to execute. */
     758         [ +  + ]:          69 :         while (pstate->pa_finished[pstate->pa_next_plan])
     759                 :             :         {
     760                 :          47 :                 int                     nextplan;
     761                 :             : 
     762                 :          94 :                 nextplan = bms_next_member(node->as_valid_subplans,
     763                 :          47 :                                                                    pstate->pa_next_plan);
     764         [ +  + ]:          47 :                 if (nextplan >= 0)
     765                 :             :                 {
     766                 :             :                         /* Advance to the next valid plan. */
     767                 :          38 :                         pstate->pa_next_plan = nextplan;
     768                 :          38 :                 }
     769         [ +  + ]:           9 :                 else if (node->as_whichplan > node->as_first_partial_plan)
     770                 :             :                 {
     771                 :             :                         /*
     772                 :             :                          * Try looping back to the first valid partial plan, if there is
     773                 :             :                          * one.  If there isn't, arrange to bail out below.
     774                 :             :                          */
     775                 :           6 :                         nextplan = bms_next_member(node->as_valid_subplans,
     776                 :           3 :                                                                            node->as_first_partial_plan - 1);
     777                 :           3 :                         pstate->pa_next_plan =
     778         [ +  - ]:           3 :                                 nextplan < 0 ? node->as_whichplan : nextplan;
     779                 :           3 :                 }
     780                 :             :                 else
     781                 :             :                 {
     782                 :             :                         /*
     783                 :             :                          * At last plan, and either there are no partial plans or we've
     784                 :             :                          * tried them all.  Arrange to bail out.
     785                 :             :                          */
     786                 :           6 :                         pstate->pa_next_plan = node->as_whichplan;
     787                 :             :                 }
     788                 :             : 
     789         [ +  + ]:          47 :                 if (pstate->pa_next_plan == node->as_whichplan)
     790                 :             :                 {
     791                 :             :                         /* We've tried everything! */
     792                 :           8 :                         pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
     793                 :           8 :                         LWLockRelease(&pstate->pa_lock);
     794                 :           8 :                         return false;
     795                 :             :                 }
     796         [ +  + ]:          47 :         }
     797                 :             : 
     798                 :             :         /* Pick the plan we found, and advance pa_next_plan one more time. */
     799                 :          22 :         node->as_whichplan = pstate->pa_next_plan;
     800                 :          44 :         pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
     801                 :          22 :                                                                                    pstate->pa_next_plan);
     802                 :             : 
     803                 :             :         /*
     804                 :             :          * If there are no more valid plans then try setting the next plan to the
     805                 :             :          * first valid partial plan.
     806                 :             :          */
     807         [ +  + ]:          22 :         if (pstate->pa_next_plan < 0)
     808                 :             :         {
     809                 :          16 :                 int                     nextplan = bms_next_member(node->as_valid_subplans,
     810                 :           8 :                                                                                            node->as_first_partial_plan - 1);
     811                 :             : 
     812         [ +  - ]:           8 :                 if (nextplan >= 0)
     813                 :           8 :                         pstate->pa_next_plan = nextplan;
     814                 :             :                 else
     815                 :             :                 {
     816                 :             :                         /*
     817                 :             :                          * There are no valid partial plans, and we already chose the last
     818                 :             :                          * non-partial plan; so flag that there's nothing more for our
     819                 :             :                          * fellow workers to do.
     820                 :             :                          */
     821                 :           0 :                         pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
     822                 :             :                 }
     823                 :           8 :         }
     824                 :             : 
     825                 :             :         /* If non-partial, immediately mark as finished. */
     826         [ +  + ]:          22 :         if (node->as_whichplan < node->as_first_partial_plan)
     827                 :           2 :                 node->as_pstate->pa_finished[node->as_whichplan] = true;
     828                 :             : 
     829                 :          22 :         LWLockRelease(&pstate->pa_lock);
     830                 :             : 
     831                 :          22 :         return true;
     832                 :          69 : }
     833                 :             : 
     834                 :             : /*
     835                 :             :  * mark_invalid_subplans_as_finished
     836                 :             :  *              Marks the ParallelAppendState's pa_finished as true for each invalid
     837                 :             :  *              subplan.
     838                 :             :  *
     839                 :             :  * This function should only be called for parallel Append with run-time
     840                 :             :  * pruning enabled.
     841                 :             :  */
     842                 :             : static void
     843                 :           8 : mark_invalid_subplans_as_finished(AppendState *node)
     844                 :             : {
     845                 :           8 :         int                     i;
     846                 :             : 
     847                 :             :         /* Only valid to call this while in parallel Append mode */
     848         [ +  - ]:           8 :         Assert(node->as_pstate);
     849                 :             : 
     850                 :             :         /* Shouldn't have been called when run-time pruning is not enabled */
     851         [ +  - ]:           8 :         Assert(node->as_prune_state);
     852                 :             : 
     853                 :             :         /* Nothing to do if all plans are valid */
     854         [ -  + ]:           8 :         if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
     855                 :           0 :                 return;
     856                 :             : 
     857                 :             :         /* Mark all non-valid plans as finished */
     858         [ +  + ]:          27 :         for (i = 0; i < node->as_nplans; i++)
     859                 :             :         {
     860         [ +  + ]:          19 :                 if (!bms_is_member(i, node->as_valid_subplans))
     861                 :           8 :                         node->as_pstate->pa_finished[i] = true;
     862                 :          19 :         }
     863         [ -  + ]:           8 : }
     864                 :             : 
     865                 :             : /* ----------------------------------------------------------------
     866                 :             :  *                                              Asynchronous Append Support
     867                 :             :  * ----------------------------------------------------------------
     868                 :             :  */
     869                 :             : 
     870                 :             : /* ----------------------------------------------------------------
     871                 :             :  *              ExecAppendAsyncBegin
     872                 :             :  *
     873                 :             :  *              Begin executing designed async-capable subplans.
     874                 :             :  * ----------------------------------------------------------------
     875                 :             :  */
     876                 :             : static void
     877                 :           0 : ExecAppendAsyncBegin(AppendState *node)
     878                 :             : {
     879                 :           0 :         int                     i;
     880                 :             : 
     881                 :             :         /* Backward scan is not supported by async-aware Appends. */
     882         [ #  # ]:           0 :         Assert(ScanDirectionIsForward(node->ps.state->es_direction));
     883                 :             : 
     884                 :             :         /* We should never be called when there are no subplans */
     885         [ #  # ]:           0 :         Assert(node->as_nplans > 0);
     886                 :             : 
     887                 :             :         /* We should never be called when there are no async subplans. */
     888         [ #  # ]:           0 :         Assert(node->as_nasyncplans > 0);
     889                 :             : 
     890                 :             :         /* If we've yet to determine the valid subplans then do so now. */
     891         [ #  # ]:           0 :         if (!node->as_valid_subplans_identified)
     892                 :             :         {
     893                 :           0 :                 node->as_valid_subplans =
     894                 :           0 :                         ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
     895                 :           0 :                 node->as_valid_subplans_identified = true;
     896                 :             : 
     897                 :           0 :                 classify_matching_subplans(node);
     898                 :           0 :         }
     899                 :             : 
     900                 :             :         /* Initialize state variables. */
     901                 :           0 :         node->as_syncdone = bms_is_empty(node->as_valid_subplans);
     902                 :           0 :         node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);
     903                 :             : 
     904                 :             :         /* Nothing to do if there are no valid async subplans. */
     905         [ #  # ]:           0 :         if (node->as_nasyncremain == 0)
     906                 :           0 :                 return;
     907                 :             : 
     908                 :             :         /* Make a request for each of the valid async subplans. */
     909                 :           0 :         i = -1;
     910         [ #  # ]:           0 :         while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
     911                 :             :         {
     912                 :           0 :                 AsyncRequest *areq = node->as_asyncrequests[i];
     913                 :             : 
     914         [ #  # ]:           0 :                 Assert(areq->request_index == i);
     915         [ #  # ]:           0 :                 Assert(!areq->callback_pending);
     916                 :             : 
     917                 :             :                 /* Do the actual work. */
     918                 :           0 :                 ExecAsyncRequest(areq);
     919                 :           0 :         }
     920         [ #  # ]:           0 : }
     921                 :             : 
     922                 :             : /* ----------------------------------------------------------------
     923                 :             :  *              ExecAppendAsyncGetNext
     924                 :             :  *
     925                 :             :  *              Get the next tuple from any of the asynchronous subplans.
     926                 :             :  * ----------------------------------------------------------------
     927                 :             :  */
     928                 :             : static bool
     929                 :           0 : ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
     930                 :             : {
     931                 :           0 :         *result = NULL;
     932                 :             : 
     933                 :             :         /* We should never be called when there are no valid async subplans. */
     934         [ #  # ]:           0 :         Assert(node->as_nasyncremain > 0);
     935                 :             : 
     936                 :             :         /* Request a tuple asynchronously. */
     937         [ #  # ]:           0 :         if (ExecAppendAsyncRequest(node, result))
     938                 :           0 :                 return true;
     939                 :             : 
     940         [ #  # ]:           0 :         while (node->as_nasyncremain > 0)
     941                 :             :         {
     942         [ #  # ]:           0 :                 CHECK_FOR_INTERRUPTS();
     943                 :             : 
     944                 :             :                 /* Wait or poll for async events. */
     945                 :           0 :                 ExecAppendAsyncEventWait(node);
     946                 :             : 
     947                 :             :                 /* Request a tuple asynchronously. */
     948         [ #  # ]:           0 :                 if (ExecAppendAsyncRequest(node, result))
     949                 :           0 :                         return true;
     950                 :             : 
     951                 :             :                 /* Break from loop if there's any sync subplan that isn't complete. */
     952         [ #  # ]:           0 :                 if (!node->as_syncdone)
     953                 :           0 :                         break;
     954                 :             :         }
     955                 :             : 
     956                 :             :         /*
     957                 :             :          * If all sync subplans are complete, we're totally done scanning the
     958                 :             :          * given node.  Otherwise, we're done with the asynchronous stuff but must
     959                 :             :          * continue scanning the sync subplans.
     960                 :             :          */
     961         [ #  # ]:           0 :         if (node->as_syncdone)
     962                 :             :         {
     963         [ #  # ]:           0 :                 Assert(node->as_nasyncremain == 0);
     964                 :           0 :                 *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
     965                 :           0 :                 return true;
     966                 :             :         }
     967                 :             : 
     968                 :           0 :         return false;
     969                 :           0 : }
     970                 :             : 
     971                 :             : /* ----------------------------------------------------------------
     972                 :             :  *              ExecAppendAsyncRequest
     973                 :             :  *
     974                 :             :  *              Request a tuple asynchronously.
     975                 :             :  * ----------------------------------------------------------------
     976                 :             :  */
     977                 :             : static bool
     978                 :           0 : ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
     979                 :             : {
     980                 :           0 :         Bitmapset  *needrequest;
     981                 :           0 :         int                     i;
     982                 :             : 
     983                 :             :         /* Nothing to do if there are no async subplans needing a new request. */
     984         [ #  # ]:           0 :         if (bms_is_empty(node->as_needrequest))
     985                 :             :         {
     986         [ #  # ]:           0 :                 Assert(node->as_nasyncresults == 0);
     987                 :           0 :                 return false;
     988                 :             :         }
     989                 :             : 
     990                 :             :         /*
     991                 :             :          * If there are any asynchronously-generated results that have not yet
     992                 :             :          * been returned, we have nothing to do; just return one of them.
     993                 :             :          */
     994         [ #  # ]:           0 :         if (node->as_nasyncresults > 0)
     995                 :             :         {
     996                 :           0 :                 --node->as_nasyncresults;
     997                 :           0 :                 *result = node->as_asyncresults[node->as_nasyncresults];
     998                 :           0 :                 return true;
     999                 :             :         }
    1000                 :             : 
    1001                 :             :         /* Make a new request for each of the async subplans that need it. */
    1002                 :           0 :         needrequest = node->as_needrequest;
    1003                 :           0 :         node->as_needrequest = NULL;
    1004                 :           0 :         i = -1;
    1005         [ #  # ]:           0 :         while ((i = bms_next_member(needrequest, i)) >= 0)
    1006                 :             :         {
    1007                 :           0 :                 AsyncRequest *areq = node->as_asyncrequests[i];
    1008                 :             : 
    1009                 :             :                 /* Do the actual work. */
    1010                 :           0 :                 ExecAsyncRequest(areq);
    1011                 :           0 :         }
    1012                 :           0 :         bms_free(needrequest);
    1013                 :             : 
    1014                 :             :         /* Return one of the asynchronously-generated results if any. */
    1015         [ #  # ]:           0 :         if (node->as_nasyncresults > 0)
    1016                 :             :         {
    1017                 :           0 :                 --node->as_nasyncresults;
    1018                 :           0 :                 *result = node->as_asyncresults[node->as_nasyncresults];
    1019                 :           0 :                 return true;
    1020                 :             :         }
    1021                 :             : 
    1022                 :           0 :         return false;
    1023                 :           0 : }
    1024                 :             : 
    1025                 :             : /* ----------------------------------------------------------------
    1026                 :             :  *              ExecAppendAsyncEventWait
    1027                 :             :  *
    1028                 :             :  *              Wait or poll for file descriptor events and fire callbacks.
    1029                 :             :  * ----------------------------------------------------------------
    1030                 :             :  */
    1031                 :             : static void
    1032                 :           0 : ExecAppendAsyncEventWait(AppendState *node)
    1033                 :             : {
    1034                 :           0 :         int                     nevents = node->as_nasyncplans + 2;
    1035                 :           0 :         long            timeout = node->as_syncdone ? -1 : 0;
    1036                 :           0 :         WaitEvent       occurred_event[EVENT_BUFFER_SIZE];
    1037                 :           0 :         int                     noccurred;
    1038                 :           0 :         int                     i;
    1039                 :             : 
    1040                 :             :         /* We should never be called when there are no valid async subplans. */
    1041         [ #  # ]:           0 :         Assert(node->as_nasyncremain > 0);
    1042                 :             : 
    1043         [ #  # ]:           0 :         Assert(node->as_eventset == NULL);
    1044                 :           0 :         node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
    1045                 :           0 :         AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
    1046                 :             :                                           NULL, NULL);
    1047                 :             : 
    1048                 :             :         /* Give each waiting subplan a chance to add an event. */
    1049                 :           0 :         i = -1;
    1050         [ #  # ]:           0 :         while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
    1051                 :             :         {
    1052                 :           0 :                 AsyncRequest *areq = node->as_asyncrequests[i];
    1053                 :             : 
    1054         [ #  # ]:           0 :                 if (areq->callback_pending)
    1055                 :           0 :                         ExecAsyncConfigureWait(areq);
    1056                 :           0 :         }
    1057                 :             : 
    1058                 :             :         /*
    1059                 :             :          * No need for further processing if none of the subplans configured any
    1060                 :             :          * events.
    1061                 :             :          */
    1062         [ #  # ]:           0 :         if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
    1063                 :             :         {
    1064                 :           0 :                 FreeWaitEventSet(node->as_eventset);
    1065                 :           0 :                 node->as_eventset = NULL;
    1066                 :           0 :                 return;
    1067                 :             :         }
    1068                 :             : 
    1069                 :             :         /*
    1070                 :             :          * Add the process latch to the set, so that we wake up to process the
    1071                 :             :          * standard interrupts with CHECK_FOR_INTERRUPTS().
    1072                 :             :          *
    1073                 :             :          * NOTE: For historical reasons, it's important that this is added to the
    1074                 :             :          * WaitEventSet after the ExecAsyncConfigureWait() calls.  Namely,
    1075                 :             :          * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
    1076                 :             :          * any other events are in the set.  That's a poor design, it's
    1077                 :             :          * questionable for postgres_fdw to be doing that in the first place, but
    1078                 :             :          * we cannot change it now.  The pattern has possibly been copied to other
    1079                 :             :          * extensions too.
    1080                 :             :          */
    1081                 :           0 :         AddWaitEventToSet(node->as_eventset, WL_LATCH_SET, PGINVALID_SOCKET,
    1082                 :           0 :                                           MyLatch, NULL);
    1083                 :             : 
    1084                 :             :         /* Return at most EVENT_BUFFER_SIZE events in one call. */
    1085         [ #  # ]:           0 :         if (nevents > EVENT_BUFFER_SIZE)
    1086                 :           0 :                 nevents = EVENT_BUFFER_SIZE;
    1087                 :             : 
    1088                 :             :         /*
    1089                 :             :          * If the timeout is -1, wait until at least one event occurs.  If the
    1090                 :             :          * timeout is 0, poll for events, but do not wait at all.
    1091                 :             :          */
    1092                 :           0 :         noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
    1093                 :           0 :                                                                  nevents, WAIT_EVENT_APPEND_READY);
    1094                 :           0 :         FreeWaitEventSet(node->as_eventset);
    1095                 :           0 :         node->as_eventset = NULL;
    1096         [ #  # ]:           0 :         if (noccurred == 0)
    1097                 :           0 :                 return;
    1098                 :             : 
    1099                 :             :         /* Deliver notifications. */
    1100         [ #  # ]:           0 :         for (i = 0; i < noccurred; i++)
    1101                 :             :         {
    1102                 :           0 :                 WaitEvent  *w = &occurred_event[i];
    1103                 :             : 
    1104                 :             :                 /*
    1105                 :             :                  * Each waiting subplan should have registered its wait event with
    1106                 :             :                  * user_data pointing back to its AsyncRequest.
    1107                 :             :                  */
    1108         [ #  # ]:           0 :                 if ((w->events & WL_SOCKET_READABLE) != 0)
    1109                 :             :                 {
    1110                 :           0 :                         AsyncRequest *areq = (AsyncRequest *) w->user_data;
    1111                 :             : 
    1112         [ #  # ]:           0 :                         if (areq->callback_pending)
    1113                 :             :                         {
    1114                 :             :                                 /*
    1115                 :             :                                  * Mark it as no longer needing a callback.  We must do this
    1116                 :             :                                  * before dispatching the callback in case the callback resets
    1117                 :             :                                  * the flag.
    1118                 :             :                                  */
    1119                 :           0 :                                 areq->callback_pending = false;
    1120                 :             : 
    1121                 :             :                                 /* Do the actual work. */
    1122                 :           0 :                                 ExecAsyncNotify(areq);
    1123                 :           0 :                         }
    1124                 :           0 :                 }
    1125                 :             : 
    1126                 :             :                 /* Handle standard interrupts */
    1127         [ #  # ]:           0 :                 if ((w->events & WL_LATCH_SET) != 0)
    1128                 :             :                 {
    1129                 :           0 :                         ResetLatch(MyLatch);
    1130         [ #  # ]:           0 :                         CHECK_FOR_INTERRUPTS();
    1131                 :           0 :                 }
    1132                 :           0 :         }
    1133         [ #  # ]:           0 : }
    1134                 :             : 
    1135                 :             : /* ----------------------------------------------------------------
    1136                 :             :  *              ExecAsyncAppendResponse
    1137                 :             :  *
    1138                 :             :  *              Receive a response from an asynchronous request we made.
    1139                 :             :  * ----------------------------------------------------------------
    1140                 :             :  */
    1141                 :             : void
    1142                 :           0 : ExecAsyncAppendResponse(AsyncRequest *areq)
    1143                 :             : {
    1144                 :           0 :         AppendState *node = (AppendState *) areq->requestor;
    1145                 :           0 :         TupleTableSlot *slot = areq->result;
    1146                 :             : 
    1147                 :             :         /* The result should be a TupleTableSlot or NULL. */
    1148   [ #  #  #  # ]:           0 :         Assert(slot == NULL || IsA(slot, TupleTableSlot));
    1149                 :             : 
    1150                 :             :         /* Nothing to do if the request is pending. */
    1151         [ #  # ]:           0 :         if (!areq->request_complete)
    1152                 :             :         {
    1153                 :             :                 /* The request would have been pending for a callback. */
    1154         [ #  # ]:           0 :                 Assert(areq->callback_pending);
    1155                 :           0 :                 return;
    1156                 :             :         }
    1157                 :             : 
    1158                 :             :         /* If the result is NULL or an empty slot, there's nothing more to do. */
    1159   [ #  #  #  # ]:           0 :         if (TupIsNull(slot))
    1160                 :             :         {
    1161                 :             :                 /* The ending subplan wouldn't have been pending for a callback. */
    1162         [ #  # ]:           0 :                 Assert(!areq->callback_pending);
    1163                 :           0 :                 --node->as_nasyncremain;
    1164                 :           0 :                 return;
    1165                 :             :         }
    1166                 :             : 
    1167                 :             :         /* Save result so we can return it. */
    1168         [ #  # ]:           0 :         Assert(node->as_nasyncresults < node->as_nasyncplans);
    1169                 :           0 :         node->as_asyncresults[node->as_nasyncresults++] = slot;
    1170                 :             : 
    1171                 :             :         /*
    1172                 :             :          * Mark the subplan that returned a result as ready for a new request.  We
    1173                 :             :          * don't launch another one here immediately because it might complete.
    1174                 :             :          */
    1175                 :           0 :         node->as_needrequest = bms_add_member(node->as_needrequest,
    1176                 :           0 :                                                                                   areq->request_index);
    1177         [ #  # ]:           0 : }
    1178                 :             : 
    1179                 :             : /* ----------------------------------------------------------------
    1180                 :             :  *              classify_matching_subplans
    1181                 :             :  *
    1182                 :             :  *              Classify the node's as_valid_subplans into sync ones and
    1183                 :             :  *              async ones, adjust it to contain sync ones only, and save
    1184                 :             :  *              async ones in the node's as_valid_asyncplans.
    1185                 :             :  * ----------------------------------------------------------------
    1186                 :             :  */
    1187                 :             : static void
    1188                 :           0 : classify_matching_subplans(AppendState *node)
    1189                 :             : {
    1190                 :           0 :         Bitmapset  *valid_asyncplans;
    1191                 :             : 
    1192         [ #  # ]:           0 :         Assert(node->as_valid_subplans_identified);
    1193         [ #  # ]:           0 :         Assert(node->as_valid_asyncplans == NULL);
    1194                 :             : 
    1195                 :             :         /* Nothing to do if there are no valid subplans. */
    1196         [ #  # ]:           0 :         if (bms_is_empty(node->as_valid_subplans))
    1197                 :             :         {
    1198                 :           0 :                 node->as_syncdone = true;
    1199                 :           0 :                 node->as_nasyncremain = 0;
    1200                 :           0 :                 return;
    1201                 :             :         }
    1202                 :             : 
    1203                 :             :         /* Nothing to do if there are no valid async subplans. */
    1204         [ #  # ]:           0 :         if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
    1205                 :             :         {
    1206                 :           0 :                 node->as_nasyncremain = 0;
    1207                 :           0 :                 return;
    1208                 :             :         }
    1209                 :             : 
    1210                 :             :         /* Get valid async subplans. */
    1211                 :           0 :         valid_asyncplans = bms_intersect(node->as_asyncplans,
    1212                 :           0 :                                                                          node->as_valid_subplans);
    1213                 :             : 
    1214                 :             :         /* Adjust the valid subplans to contain sync subplans only. */
    1215                 :           0 :         node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
    1216                 :           0 :                                                                                           valid_asyncplans);
    1217                 :             : 
    1218                 :             :         /* Save valid async subplans. */
    1219                 :           0 :         node->as_valid_asyncplans = valid_asyncplans;
    1220         [ #  # ]:           0 : }
        

Generated by: LCOV version 2.3.2-1