LCOV - code coverage report
Current view: top level - src/backend/access/transam - parallel.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 90.2 % 642 579
Test Date: 2026-01-26 10:56:24 Functions: 100.0 % 19 19
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Branches: 54.5 % 264 144

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * parallel.c
       4                 :             :  *        Infrastructure for launching parallel workers
       5                 :             :  *
       6                 :             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7                 :             :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :             :  *
       9                 :             :  * IDENTIFICATION
      10                 :             :  *        src/backend/access/transam/parallel.c
      11                 :             :  *
      12                 :             :  *-------------------------------------------------------------------------
      13                 :             :  */
      14                 :             : 
      15                 :             : #include "postgres.h"
      16                 :             : 
      17                 :             : #include "access/brin.h"
      18                 :             : #include "access/gin.h"
      19                 :             : #include "access/nbtree.h"
      20                 :             : #include "access/parallel.h"
      21                 :             : #include "access/session.h"
      22                 :             : #include "access/xact.h"
      23                 :             : #include "access/xlog.h"
      24                 :             : #include "catalog/index.h"
      25                 :             : #include "catalog/namespace.h"
      26                 :             : #include "catalog/pg_enum.h"
      27                 :             : #include "catalog/storage.h"
      28                 :             : #include "commands/async.h"
      29                 :             : #include "commands/vacuum.h"
      30                 :             : #include "executor/execParallel.h"
      31                 :             : #include "libpq/libpq.h"
      32                 :             : #include "libpq/pqformat.h"
      33                 :             : #include "libpq/pqmq.h"
      34                 :             : #include "miscadmin.h"
      35                 :             : #include "optimizer/optimizer.h"
      36                 :             : #include "pgstat.h"
      37                 :             : #include "storage/ipc.h"
      38                 :             : #include "storage/predicate.h"
      39                 :             : #include "storage/spin.h"
      40                 :             : #include "tcop/tcopprot.h"
      41                 :             : #include "utils/combocid.h"
      42                 :             : #include "utils/guc.h"
      43                 :             : #include "utils/inval.h"
      44                 :             : #include "utils/memutils.h"
      45                 :             : #include "utils/relmapper.h"
      46                 :             : #include "utils/snapmgr.h"
      47                 :             : 
      48                 :             : /*
      49                 :             :  * We don't want to waste a lot of memory on an error queue which, most of
      50                 :             :  * the time, will process only a handful of small messages.  However, it is
      51                 :             :  * desirable to make it large enough that a typical ErrorResponse can be sent
      52                 :             :  * without blocking.  That way, a worker that errors out can write the whole
      53                 :             :  * message into the queue and terminate without waiting for the user backend.
      54                 :             :  */
      55                 :             : #define PARALLEL_ERROR_QUEUE_SIZE                       16384
      56                 :             : 
      57                 :             : /* Magic number for parallel context TOC. */
      58                 :             : #define PARALLEL_MAGIC                                          0x50477c7c
      59                 :             : 
      60                 :             : /*
      61                 :             :  * Magic numbers for per-context parallel state sharing.  Higher-level code
      62                 :             :  * should use smaller values, leaving these very large ones for use by this
      63                 :             :  * module.
      64                 :             :  */
      65                 :             : #define PARALLEL_KEY_FIXED                                      UINT64CONST(0xFFFFFFFFFFFF0001)
      66                 :             : #define PARALLEL_KEY_ERROR_QUEUE                        UINT64CONST(0xFFFFFFFFFFFF0002)
      67                 :             : #define PARALLEL_KEY_LIBRARY                            UINT64CONST(0xFFFFFFFFFFFF0003)
      68                 :             : #define PARALLEL_KEY_GUC                                        UINT64CONST(0xFFFFFFFFFFFF0004)
      69                 :             : #define PARALLEL_KEY_COMBO_CID                          UINT64CONST(0xFFFFFFFFFFFF0005)
      70                 :             : #define PARALLEL_KEY_TRANSACTION_SNAPSHOT       UINT64CONST(0xFFFFFFFFFFFF0006)
      71                 :             : #define PARALLEL_KEY_ACTIVE_SNAPSHOT            UINT64CONST(0xFFFFFFFFFFFF0007)
      72                 :             : #define PARALLEL_KEY_TRANSACTION_STATE          UINT64CONST(0xFFFFFFFFFFFF0008)
      73                 :             : #define PARALLEL_KEY_ENTRYPOINT                         UINT64CONST(0xFFFFFFFFFFFF0009)
      74                 :             : #define PARALLEL_KEY_SESSION_DSM                        UINT64CONST(0xFFFFFFFFFFFF000A)
      75                 :             : #define PARALLEL_KEY_PENDING_SYNCS                      UINT64CONST(0xFFFFFFFFFFFF000B)
      76                 :             : #define PARALLEL_KEY_REINDEX_STATE                      UINT64CONST(0xFFFFFFFFFFFF000C)
      77                 :             : #define PARALLEL_KEY_RELMAPPER_STATE            UINT64CONST(0xFFFFFFFFFFFF000D)
      78                 :             : #define PARALLEL_KEY_UNCOMMITTEDENUMS           UINT64CONST(0xFFFFFFFFFFFF000E)
      79                 :             : #define PARALLEL_KEY_CLIENTCONNINFO                     UINT64CONST(0xFFFFFFFFFFFF000F)
      80                 :             : 
      81                 :             : /* Fixed-size parallel state. */
      82                 :             : typedef struct FixedParallelState
      83                 :             : {
      84                 :             :         /* Fixed-size state that workers must restore. */
      85                 :             :         Oid                     database_id;
      86                 :             :         Oid                     authenticated_user_id;
      87                 :             :         Oid                     session_user_id;
      88                 :             :         Oid                     outer_user_id;
      89                 :             :         Oid                     current_user_id;
      90                 :             :         Oid                     temp_namespace_id;
      91                 :             :         Oid                     temp_toast_namespace_id;
      92                 :             :         int                     sec_context;
      93                 :             :         bool            session_user_is_superuser;
      94                 :             :         bool            role_is_superuser;
      95                 :             :         PGPROC     *parallel_leader_pgproc;
      96                 :             :         pid_t           parallel_leader_pid;
      97                 :             :         ProcNumber      parallel_leader_proc_number;
      98                 :             :         TimestampTz xact_ts;
      99                 :             :         TimestampTz stmt_ts;
     100                 :             :         SerializableXactHandle serializable_xact_handle;
     101                 :             : 
     102                 :             :         /* Mutex protects remaining fields. */
     103                 :             :         slock_t         mutex;
     104                 :             : 
     105                 :             :         /* Maximum XactLastRecEnd of any worker. */
     106                 :             :         XLogRecPtr      last_xlog_end;
     107                 :             : } FixedParallelState;
     108                 :             : 
     109                 :             : /*
     110                 :             :  * Our parallel worker number.  We initialize this to -1, meaning that we are
     111                 :             :  * not a parallel worker.  In parallel workers, it will be set to a value >= 0
     112                 :             :  * and < the number of workers before any user code is invoked; each parallel
     113                 :             :  * worker will get a different parallel worker number.
     114                 :             :  */
     115                 :             : int                     ParallelWorkerNumber = -1;
     116                 :             : 
     117                 :             : /* Is there a parallel message pending which we need to receive? */
     118                 :             : volatile sig_atomic_t ParallelMessagePending = false;
     119                 :             : 
     120                 :             : /* Are we initializing a parallel worker? */
     121                 :             : bool            InitializingParallelWorker = false;
     122                 :             : 
     123                 :             : /* Pointer to our fixed parallel state. */
     124                 :             : static FixedParallelState *MyFixedParallelState;
     125                 :             : 
     126                 :             : /* List of active parallel contexts. */
     127                 :             : static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
     128                 :             : 
     129                 :             : /* Backend-local copy of data from FixedParallelState. */
     130                 :             : static pid_t ParallelLeaderPid;
     131                 :             : 
     132                 :             : /*
     133                 :             :  * List of internal parallel worker entry points.  We need this for
     134                 :             :  * reasons explained in LookupParallelWorkerFunction(), below.
     135                 :             :  */
     136                 :             : static const struct
     137                 :             : {
     138                 :             :         const char *fn_name;
     139                 :             :         parallel_worker_main_type fn_addr;
     140                 :             : }                       InternalParallelWorkers[] =
     141                 :             : 
     142                 :             : {
     143                 :             :         {
     144                 :             :                 "ParallelQueryMain", ParallelQueryMain
     145                 :             :         },
     146                 :             :         {
     147                 :             :                 "_bt_parallel_build_main", _bt_parallel_build_main
     148                 :             :         },
     149                 :             :         {
     150                 :             :                 "_brin_parallel_build_main", _brin_parallel_build_main
     151                 :             :         },
     152                 :             :         {
     153                 :             :                 "_gin_parallel_build_main", _gin_parallel_build_main
     154                 :             :         },
     155                 :             :         {
     156                 :             :                 "parallel_vacuum_main", parallel_vacuum_main
     157                 :             :         }
     158                 :             : };
     159                 :             : 
     160                 :             : /* Private functions. */
     161                 :             : static void ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
     162                 :             : static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
     163                 :             : static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
     164                 :             : static void ParallelWorkerShutdown(int code, Datum arg);
     165                 :             : 
     166                 :             : 
     167                 :             : /*
     168                 :             :  * Establish a new parallel context.  This should be done after entering
     169                 :             :  * parallel mode, and (unless there is an error) the context should be
     170                 :             :  * destroyed before exiting the current subtransaction.
     171                 :             :  */
     172                 :             : ParallelContext *
     173                 :         155 : CreateParallelContext(const char *library_name, const char *function_name,
     174                 :             :                                           int nworkers)
     175                 :             : {
     176                 :         155 :         MemoryContext oldcontext;
     177                 :         155 :         ParallelContext *pcxt;
     178                 :             : 
     179                 :             :         /* It is unsafe to create a parallel context if not in parallel mode. */
     180         [ +  - ]:         155 :         Assert(IsInParallelMode());
     181                 :             : 
     182                 :             :         /* Number of workers should be non-negative. */
     183         [ +  - ]:         155 :         Assert(nworkers >= 0);
     184                 :             : 
     185                 :             :         /* We might be running in a short-lived memory context. */
     186                 :         155 :         oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     187                 :             : 
     188                 :             :         /* Initialize a new ParallelContext. */
     189                 :         155 :         pcxt = palloc0_object(ParallelContext);
     190                 :         155 :         pcxt->subid = GetCurrentSubTransactionId();
     191                 :         155 :         pcxt->nworkers = nworkers;
     192                 :         155 :         pcxt->nworkers_to_launch = nworkers;
     193                 :         155 :         pcxt->library_name = pstrdup(library_name);
     194                 :         155 :         pcxt->function_name = pstrdup(function_name);
     195                 :         155 :         pcxt->error_context_stack = error_context_stack;
     196                 :         155 :         shm_toc_initialize_estimator(&pcxt->estimator);
     197                 :         155 :         dlist_push_head(&pcxt_list, &pcxt->node);
     198                 :             : 
     199                 :             :         /* Restore previous memory context. */
     200                 :         155 :         MemoryContextSwitchTo(oldcontext);
     201                 :             : 
     202                 :         310 :         return pcxt;
     203                 :         155 : }
     204                 :             : 
     205                 :             : /*
     206                 :             :  * Establish the dynamic shared memory segment for a parallel context and
     207                 :             :  * copy state and other bookkeeping information that will be needed by
     208                 :             :  * parallel workers into it.
     209                 :             :  */
     210                 :             : void
     211                 :         155 : InitializeParallelDSM(ParallelContext *pcxt)
     212                 :             : {
     213                 :         155 :         MemoryContext oldcontext;
     214                 :         155 :         Size            library_len = 0;
     215                 :         155 :         Size            guc_len = 0;
     216                 :         155 :         Size            combocidlen = 0;
     217                 :         155 :         Size            tsnaplen = 0;
     218                 :         155 :         Size            asnaplen = 0;
     219                 :         155 :         Size            tstatelen = 0;
     220                 :         155 :         Size            pendingsyncslen = 0;
     221                 :         155 :         Size            reindexlen = 0;
     222                 :         155 :         Size            relmapperlen = 0;
     223                 :         155 :         Size            uncommittedenumslen = 0;
     224                 :         155 :         Size            clientconninfolen = 0;
     225                 :         155 :         Size            segsize = 0;
     226                 :         155 :         int                     i;
     227                 :         155 :         FixedParallelState *fps;
     228                 :         155 :         dsm_handle      session_dsm_handle = DSM_HANDLE_INVALID;
     229                 :         155 :         Snapshot        transaction_snapshot = GetTransactionSnapshot();
     230                 :         155 :         Snapshot        active_snapshot = GetActiveSnapshot();
     231                 :             : 
     232                 :             :         /* We might be running in a very short-lived memory context. */
     233                 :         155 :         oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     234                 :             : 
     235                 :             :         /* Allow space to store the fixed-size parallel state. */
     236                 :         155 :         shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
     237                 :         155 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     238                 :             : 
     239                 :             :         /*
     240                 :             :          * If we manage to reach here while non-interruptible, it's unsafe to
     241                 :             :          * launch any workers: we would fail to process interrupts sent by them.
     242                 :             :          * We can deal with that edge case by pretending no workers were
     243                 :             :          * requested.
     244                 :             :          */
     245   [ +  -  +  -  :         155 :         if (!INTERRUPTS_CAN_BE_PROCESSED())
                   -  + ]
     246                 :           0 :                 pcxt->nworkers = 0;
     247                 :             : 
     248                 :             :         /*
     249                 :             :          * Normally, the user will have requested at least one worker process, but
     250                 :             :          * if by chance they have not, we can skip a bunch of things here.
     251                 :             :          */
     252         [ +  - ]:         155 :         if (pcxt->nworkers > 0)
     253                 :             :         {
     254                 :             :                 /* Get (or create) the per-session DSM segment's handle. */
     255                 :         155 :                 session_dsm_handle = GetSessionDsmHandle();
     256                 :             : 
     257                 :             :                 /*
     258                 :             :                  * If we weren't able to create a per-session DSM segment, then we can
     259                 :             :                  * continue but we can't safely launch any workers because their
     260                 :             :                  * record typmods would be incompatible so they couldn't exchange
     261                 :             :                  * tuples.
     262                 :             :                  */
     263         [ +  - ]:         155 :                 if (session_dsm_handle == DSM_HANDLE_INVALID)
     264                 :           0 :                         pcxt->nworkers = 0;
     265                 :         155 :         }
     266                 :             : 
     267         [ +  - ]:         155 :         if (pcxt->nworkers > 0)
     268                 :             :         {
     269                 :             :                 StaticAssertDecl(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
     270                 :             :                                                  PARALLEL_ERROR_QUEUE_SIZE,
     271                 :             :                                                  "parallel error queue size not buffer-aligned");
     272                 :             : 
     273                 :             :                 /* Estimate space for various kinds of state sharing. */
     274                 :         155 :                 library_len = EstimateLibraryStateSpace();
     275                 :         155 :                 shm_toc_estimate_chunk(&pcxt->estimator, library_len);
     276                 :         155 :                 guc_len = EstimateGUCStateSpace();
     277                 :         155 :                 shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
     278                 :         155 :                 combocidlen = EstimateComboCIDStateSpace();
     279                 :         155 :                 shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
     280         [ +  - ]:         155 :                 if (IsolationUsesXactSnapshot())
     281                 :             :                 {
     282                 :           0 :                         tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
     283                 :           0 :                         shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
     284                 :           0 :                 }
     285                 :         155 :                 asnaplen = EstimateSnapshotSpace(active_snapshot);
     286                 :         155 :                 shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
     287                 :         155 :                 tstatelen = EstimateTransactionStateSpace();
     288                 :         155 :                 shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
     289                 :         155 :                 shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
     290                 :         155 :                 pendingsyncslen = EstimatePendingSyncsSpace();
     291                 :         155 :                 shm_toc_estimate_chunk(&pcxt->estimator, pendingsyncslen);
     292                 :         155 :                 reindexlen = EstimateReindexStateSpace();
     293                 :         155 :                 shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
     294                 :         155 :                 relmapperlen = EstimateRelationMapSpace();
     295                 :         155 :                 shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
     296                 :         155 :                 uncommittedenumslen = EstimateUncommittedEnumsSpace();
     297                 :         155 :                 shm_toc_estimate_chunk(&pcxt->estimator, uncommittedenumslen);
     298                 :         155 :                 clientconninfolen = EstimateClientConnectionInfoSpace();
     299                 :         155 :                 shm_toc_estimate_chunk(&pcxt->estimator, clientconninfolen);
     300                 :             :                 /* If you add more chunks here, you probably need to add keys. */
     301                 :         155 :                 shm_toc_estimate_keys(&pcxt->estimator, 12);
     302                 :             : 
     303                 :             :                 /* Estimate space need for error queues. */
     304                 :         155 :                 shm_toc_estimate_chunk(&pcxt->estimator,
     305                 :             :                                                            mul_size(PARALLEL_ERROR_QUEUE_SIZE,
     306                 :             :                                                                                 pcxt->nworkers));
     307                 :         155 :                 shm_toc_estimate_keys(&pcxt->estimator, 1);
     308                 :             : 
     309                 :             :                 /* Estimate how much we'll need for the entrypoint info. */
     310                 :         155 :                 shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
     311                 :             :                                                            strlen(pcxt->function_name) + 2);
     312                 :         155 :                 shm_toc_estimate_keys(&pcxt->estimator, 1);
     313                 :         155 :         }
     314                 :             : 
     315                 :             :         /*
     316                 :             :          * Create DSM and initialize with new table of contents.  But if the user
     317                 :             :          * didn't request any workers, then don't bother creating a dynamic shared
     318                 :             :          * memory segment; instead, just use backend-private memory.
     319                 :             :          *
     320                 :             :          * Also, if we can't create a dynamic shared memory segment because the
     321                 :             :          * maximum number of segments have already been created, then fall back to
     322                 :             :          * backend-private memory, and plan not to use any workers.  We hope this
     323                 :             :          * won't happen very often, but it's better to abandon the use of
     324                 :             :          * parallelism than to fail outright.
     325                 :             :          */
     326                 :         155 :         segsize = shm_toc_estimate(&pcxt->estimator);
     327         [ +  - ]:         155 :         if (pcxt->nworkers > 0)
     328                 :         155 :                 pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
     329         [ +  - ]:         155 :         if (pcxt->seg != NULL)
     330                 :         155 :                 pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
     331                 :         155 :                                                                    dsm_segment_address(pcxt->seg),
     332                 :         155 :                                                                    segsize);
     333                 :             :         else
     334                 :             :         {
     335                 :           0 :                 pcxt->nworkers = 0;
     336                 :           0 :                 pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
     337                 :           0 :                 pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
     338                 :           0 :                                                                    segsize);
     339                 :             :         }
     340                 :             : 
     341                 :             :         /* Initialize fixed-size state in shared memory. */
     342                 :         155 :         fps = (FixedParallelState *)
     343                 :         155 :                 shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
     344                 :         155 :         fps->database_id = MyDatabaseId;
     345                 :         155 :         fps->authenticated_user_id = GetAuthenticatedUserId();
     346                 :         155 :         fps->session_user_id = GetSessionUserId();
     347                 :         155 :         fps->outer_user_id = GetCurrentRoleId();
     348                 :         155 :         GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
     349                 :         155 :         fps->session_user_is_superuser = GetSessionUserIsSuperuser();
     350                 :         155 :         fps->role_is_superuser = current_role_is_superuser;
     351                 :         310 :         GetTempNamespaceState(&fps->temp_namespace_id,
     352                 :         155 :                                                   &fps->temp_toast_namespace_id);
     353                 :         155 :         fps->parallel_leader_pgproc = MyProc;
     354                 :         155 :         fps->parallel_leader_pid = MyProcPid;
     355                 :         155 :         fps->parallel_leader_proc_number = MyProcNumber;
     356                 :         155 :         fps->xact_ts = GetCurrentTransactionStartTimestamp();
     357                 :         155 :         fps->stmt_ts = GetCurrentStatementStartTimestamp();
     358                 :         155 :         fps->serializable_xact_handle = ShareSerializableXact();
     359                 :         155 :         SpinLockInit(&fps->mutex);
     360                 :         155 :         fps->last_xlog_end = 0;
     361                 :         155 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
     362                 :             : 
     363                 :             :         /* We can skip the rest of this if we're not budgeting for any workers. */
     364         [ +  - ]:         155 :         if (pcxt->nworkers > 0)
     365                 :             :         {
     366                 :         155 :                 char       *libraryspace;
     367                 :         155 :                 char       *gucspace;
     368                 :         155 :                 char       *combocidspace;
     369                 :         155 :                 char       *tsnapspace;
     370                 :         155 :                 char       *asnapspace;
     371                 :         155 :                 char       *tstatespace;
     372                 :         155 :                 char       *pendingsyncsspace;
     373                 :         155 :                 char       *reindexspace;
     374                 :         155 :                 char       *relmapperspace;
     375                 :         155 :                 char       *error_queue_space;
     376                 :         155 :                 char       *session_dsm_handle_space;
     377                 :         155 :                 char       *entrypointstate;
     378                 :         155 :                 char       *uncommittedenumsspace;
     379                 :         155 :                 char       *clientconninfospace;
     380                 :         155 :                 Size            lnamelen;
     381                 :             : 
     382                 :             :                 /* Serialize shared libraries we have loaded. */
     383                 :         155 :                 libraryspace = shm_toc_allocate(pcxt->toc, library_len);
     384                 :         155 :                 SerializeLibraryState(library_len, libraryspace);
     385                 :         155 :                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);
     386                 :             : 
     387                 :             :                 /* Serialize GUC settings. */
     388                 :         155 :                 gucspace = shm_toc_allocate(pcxt->toc, guc_len);
     389                 :         155 :                 SerializeGUCState(guc_len, gucspace);
     390                 :         155 :                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);
     391                 :             : 
     392                 :             :                 /* Serialize combo CID state. */
     393                 :         155 :                 combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
     394                 :         155 :                 SerializeComboCIDState(combocidlen, combocidspace);
     395                 :         155 :                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
     396                 :             : 
     397                 :             :                 /*
     398                 :             :                  * Serialize the transaction snapshot if the transaction isolation
     399                 :             :                  * level uses a transaction snapshot.
     400                 :             :                  */
     401         [ +  - ]:         155 :                 if (IsolationUsesXactSnapshot())
     402                 :             :                 {
     403                 :           0 :                         tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
     404                 :           0 :                         SerializeSnapshot(transaction_snapshot, tsnapspace);
     405                 :           0 :                         shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
     406                 :           0 :                                                    tsnapspace);
     407                 :           0 :                 }
     408                 :             : 
     409                 :             :                 /* Serialize the active snapshot. */
     410                 :         155 :                 asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
     411                 :         155 :                 SerializeSnapshot(active_snapshot, asnapspace);
     412                 :         155 :                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
     413                 :             : 
     414                 :             :                 /* Provide the handle for per-session segment. */
     415                 :         155 :                 session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
     416                 :             :                                                                                                         sizeof(dsm_handle));
     417                 :         155 :                 *(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
     418                 :         310 :                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
     419                 :         155 :                                            session_dsm_handle_space);
     420                 :             : 
     421                 :             :                 /* Serialize transaction state. */
     422                 :         155 :                 tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
     423                 :         155 :                 SerializeTransactionState(tstatelen, tstatespace);
     424                 :         155 :                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
     425                 :             : 
     426                 :             :                 /* Serialize pending syncs. */
     427                 :         155 :                 pendingsyncsspace = shm_toc_allocate(pcxt->toc, pendingsyncslen);
     428                 :         155 :                 SerializePendingSyncs(pendingsyncslen, pendingsyncsspace);
     429                 :         310 :                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_PENDING_SYNCS,
     430                 :         155 :                                            pendingsyncsspace);
     431                 :             : 
     432                 :             :                 /* Serialize reindex state. */
     433                 :         155 :                 reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
     434                 :         155 :                 SerializeReindexState(reindexlen, reindexspace);
     435                 :         155 :                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
     436                 :             : 
     437                 :             :                 /* Serialize relmapper state. */
     438                 :         155 :                 relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
     439                 :         155 :                 SerializeRelationMap(relmapperlen, relmapperspace);
     440                 :         310 :                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
     441                 :         155 :                                            relmapperspace);
     442                 :             : 
     443                 :             :                 /* Serialize uncommitted enum state. */
     444                 :         310 :                 uncommittedenumsspace = shm_toc_allocate(pcxt->toc,
     445                 :         155 :                                                                                                  uncommittedenumslen);
     446                 :         155 :                 SerializeUncommittedEnums(uncommittedenumsspace, uncommittedenumslen);
     447                 :         310 :                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
     448                 :         155 :                                            uncommittedenumsspace);
     449                 :             : 
     450                 :             :                 /* Serialize our ClientConnectionInfo. */
     451                 :         155 :                 clientconninfospace = shm_toc_allocate(pcxt->toc, clientconninfolen);
     452                 :         155 :                 SerializeClientConnectionInfo(clientconninfolen, clientconninfospace);
     453                 :         310 :                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_CLIENTCONNINFO,
     454                 :         155 :                                            clientconninfospace);
     455                 :             : 
     456                 :             :                 /* Allocate space for worker information. */
     457                 :         155 :                 pcxt->worker = palloc0_array(ParallelWorkerInfo, pcxt->nworkers);
     458                 :             : 
     459                 :             :                 /*
     460                 :             :                  * Establish error queues in dynamic shared memory.
     461                 :             :                  *
     462                 :             :                  * These queues should be used only for transmitting ErrorResponse,
     463                 :             :                  * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
     464                 :             :                  * should be transmitted via separate (possibly larger?) queues.
     465                 :             :                  */
     466                 :         155 :                 error_queue_space =
     467                 :         310 :                         shm_toc_allocate(pcxt->toc,
     468                 :         155 :                                                          mul_size(PARALLEL_ERROR_QUEUE_SIZE,
     469                 :         155 :                                                                           pcxt->nworkers));
     470         [ +  + ]:         503 :                 for (i = 0; i < pcxt->nworkers; ++i)
     471                 :             :                 {
     472                 :         348 :                         char       *start;
     473                 :         348 :                         shm_mq     *mq;
     474                 :             : 
     475                 :         348 :                         start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
     476                 :         348 :                         mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
     477                 :         348 :                         shm_mq_set_receiver(mq, MyProc);
     478                 :         348 :                         pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
     479                 :         348 :                 }
     480                 :         155 :                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
     481                 :             : 
     482                 :             :                 /*
     483                 :             :                  * Serialize entrypoint information.  It's unsafe to pass function
     484                 :             :                  * pointers across processes, as the function pointer may be different
     485                 :             :                  * in each process in EXEC_BACKEND builds, so we always pass library
     486                 :             :                  * and function name.  (We use library name "postgres" for functions
     487                 :             :                  * in the core backend.)
     488                 :             :                  */
     489                 :         155 :                 lnamelen = strlen(pcxt->library_name);
     490                 :         465 :                 entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
     491                 :         310 :                                                                                    strlen(pcxt->function_name) + 2);
     492                 :         155 :                 strcpy(entrypointstate, pcxt->library_name);
     493                 :         155 :                 strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
     494                 :         155 :                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
     495                 :         155 :         }
     496                 :             : 
     497                 :             :         /* Update nworkers_to_launch, in case we changed nworkers above. */
     498                 :         155 :         pcxt->nworkers_to_launch = pcxt->nworkers;
     499                 :             : 
     500                 :             :         /* Restore previous memory context. */
     501                 :         155 :         MemoryContextSwitchTo(oldcontext);
     502                 :         155 : }
     503                 :             : 
     504                 :             : /*
     505                 :             :  * Reinitialize the dynamic shared memory segment for a parallel context such
     506                 :             :  * that we could launch workers for it again.
     507                 :             :  */
     508                 :             : void
     509                 :          48 : ReinitializeParallelDSM(ParallelContext *pcxt)
     510                 :             : {
     511                 :          48 :         MemoryContext oldcontext;
     512                 :          48 :         FixedParallelState *fps;
     513                 :             : 
     514                 :             :         /* We might be running in a very short-lived memory context. */
     515                 :          48 :         oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     516                 :             : 
     517                 :             :         /* Wait for any old workers to exit. */
     518         [ +  - ]:          48 :         if (pcxt->nworkers_launched > 0)
     519                 :             :         {
     520                 :          48 :                 WaitForParallelWorkersToFinish(pcxt);
     521                 :          48 :                 WaitForParallelWorkersToExit(pcxt);
     522                 :          48 :                 pcxt->nworkers_launched = 0;
     523         [ -  + ]:          48 :                 if (pcxt->known_attached_workers)
     524                 :             :                 {
     525                 :          48 :                         pfree(pcxt->known_attached_workers);
     526                 :          48 :                         pcxt->known_attached_workers = NULL;
     527                 :          48 :                         pcxt->nknown_attached_workers = 0;
     528                 :          48 :                 }
     529                 :          48 :         }
     530                 :             : 
     531                 :             :         /* Reset a few bits of fixed parallel state to a clean state. */
     532                 :          48 :         fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
     533                 :          48 :         fps->last_xlog_end = 0;
     534                 :             : 
     535                 :             :         /* Recreate error queues (if they exist). */
     536         [ +  - ]:          48 :         if (pcxt->nworkers > 0)
     537                 :             :         {
     538                 :          48 :                 char       *error_queue_space;
     539                 :          48 :                 int                     i;
     540                 :             : 
     541                 :          48 :                 error_queue_space =
     542                 :          48 :                         shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
     543         [ +  + ]:         191 :                 for (i = 0; i < pcxt->nworkers; ++i)
     544                 :             :                 {
     545                 :         143 :                         char       *start;
     546                 :         143 :                         shm_mq     *mq;
     547                 :             : 
     548                 :         143 :                         start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
     549                 :         143 :                         mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
     550                 :         143 :                         shm_mq_set_receiver(mq, MyProc);
     551                 :         143 :                         pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
     552                 :         143 :                 }
     553                 :          48 :         }
     554                 :             : 
     555                 :             :         /* Restore previous memory context. */
     556                 :          48 :         MemoryContextSwitchTo(oldcontext);
     557                 :          48 : }
     558                 :             : 
     559                 :             : /*
     560                 :             :  * Reinitialize parallel workers for a parallel context such that we could
     561                 :             :  * launch a different number of workers.  This is required for cases where
     562                 :             :  * we need to reuse the same DSM segment, but the number of workers can
     563                 :             :  * vary from run-to-run.
     564                 :             :  */
     565                 :             : void
     566                 :          11 : ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
     567                 :             : {
     568                 :             :         /*
     569                 :             :          * The number of workers that need to be launched must be less than the
     570                 :             :          * number of workers with which the parallel context is initialized.  But
     571                 :             :          * the caller might not know that InitializeParallelDSM reduced nworkers,
     572                 :             :          * so just silently trim the request.
     573                 :             :          */
     574         [ -  + ]:          11 :         pcxt->nworkers_to_launch = Min(pcxt->nworkers, nworkers_to_launch);
     575                 :          11 : }
     576                 :             : 
     577                 :             : /*
     578                 :             :  * Launch parallel workers.
     579                 :             :  */
     580                 :             : void
     581                 :         203 : LaunchParallelWorkers(ParallelContext *pcxt)
     582                 :             : {
     583                 :         203 :         MemoryContext oldcontext;
     584                 :         203 :         BackgroundWorker worker;
     585                 :         203 :         int                     i;
     586                 :         203 :         bool            any_registrations_failed = false;
     587                 :             : 
     588                 :             :         /* Skip this if we have no workers. */
     589   [ +  -  -  + ]:         203 :         if (pcxt->nworkers == 0 || pcxt->nworkers_to_launch == 0)
     590                 :           0 :                 return;
     591                 :             : 
     592                 :             :         /* We need to be a lock group leader. */
     593                 :         203 :         BecomeLockGroupLeader();
     594                 :             : 
     595                 :             :         /* If we do have workers, we'd better have a DSM segment. */
     596         [ +  - ]:         203 :         Assert(pcxt->seg != NULL);
     597                 :             : 
     598                 :             :         /* We might be running in a short-lived memory context. */
     599                 :         203 :         oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     600                 :             : 
     601                 :             :         /* Configure a worker. */
     602                 :         203 :         memset(&worker, 0, sizeof(worker));
     603                 :         406 :         snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
     604                 :         203 :                          MyProcPid);
     605                 :         203 :         snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
     606                 :         203 :         worker.bgw_flags =
     607                 :             :                 BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
     608                 :             :                 | BGWORKER_CLASS_PARALLEL;
     609                 :         203 :         worker.bgw_start_time = BgWorkerStart_ConsistentState;
     610                 :         203 :         worker.bgw_restart_time = BGW_NEVER_RESTART;
     611                 :         203 :         sprintf(worker.bgw_library_name, "postgres");
     612                 :         203 :         sprintf(worker.bgw_function_name, "ParallelWorkerMain");
     613                 :         203 :         worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
     614                 :         203 :         worker.bgw_notify_pid = MyProcPid;
     615                 :             : 
     616                 :             :         /*
     617                 :             :          * Start workers.
     618                 :             :          *
     619                 :             :          * The caller must be able to tolerate ending up with fewer workers than
     620                 :             :          * expected, so there is no need to throw an error here if registration
     621                 :             :          * fails.  It wouldn't help much anyway, because registering the worker in
     622                 :             :          * no way guarantees that it will start up and initialize successfully.
     623                 :             :          */
     624         [ +  + ]:         693 :         for (i = 0; i < pcxt->nworkers_to_launch; ++i)
     625                 :             :         {
     626                 :         490 :                 memcpy(worker.bgw_extra, &i, sizeof(int));
     627   [ +  +  +  + ]:         490 :                 if (!any_registrations_failed &&
     628                 :         481 :                         RegisterDynamicBackgroundWorker(&worker,
     629                 :         481 :                                                                                         &pcxt->worker[i].bgwhandle))
     630                 :             :                 {
     631                 :         954 :                         shm_mq_set_handle(pcxt->worker[i].error_mqh,
     632                 :         477 :                                                           pcxt->worker[i].bgwhandle);
     633                 :         477 :                         pcxt->nworkers_launched++;
     634                 :         477 :                 }
     635                 :             :                 else
     636                 :             :                 {
     637                 :             :                         /*
     638                 :             :                          * If we weren't able to register the worker, then we've bumped up
     639                 :             :                          * against the max_worker_processes limit, and future
     640                 :             :                          * registrations will probably fail too, so arrange to skip them.
     641                 :             :                          * But we still have to execute this code for the remaining slots
     642                 :             :                          * to make sure that we forget about the error queues we budgeted
     643                 :             :                          * for those workers.  Otherwise, we'll wait for them to start,
     644                 :             :                          * but they never will.
     645                 :             :                          */
     646                 :          13 :                         any_registrations_failed = true;
     647                 :          13 :                         pcxt->worker[i].bgwhandle = NULL;
     648                 :          13 :                         shm_mq_detach(pcxt->worker[i].error_mqh);
     649                 :          13 :                         pcxt->worker[i].error_mqh = NULL;
     650                 :             :                 }
     651                 :         490 :         }
     652                 :             : 
     653                 :             :         /*
     654                 :             :          * Now that nworkers_launched has taken its final value, we can initialize
     655                 :             :          * known_attached_workers.
     656                 :             :          */
     657         [ +  + ]:         203 :         if (pcxt->nworkers_launched > 0)
     658                 :             :         {
     659                 :         200 :                 pcxt->known_attached_workers = palloc0_array(bool, pcxt->nworkers_launched);
     660                 :         200 :                 pcxt->nknown_attached_workers = 0;
     661                 :         200 :         }
     662                 :             : 
     663                 :             :         /* Restore previous memory context. */
     664                 :         203 :         MemoryContextSwitchTo(oldcontext);
     665         [ -  + ]:         203 : }
     666                 :             : 
     667                 :             : /*
     668                 :             :  * Wait for all workers to attach to their error queues, and throw an error if
     669                 :             :  * any worker fails to do this.
     670                 :             :  *
     671                 :             :  * Callers can assume that if this function returns successfully, then the
     672                 :             :  * number of workers given by pcxt->nworkers_launched have initialized and
     673                 :             :  * attached to their error queues.  Whether or not these workers are guaranteed
     674                 :             :  * to still be running depends on what code the caller asked them to run;
     675                 :             :  * this function does not guarantee that they have not exited.  However, it
     676                 :             :  * does guarantee that any workers which exited must have done so cleanly and
     677                 :             :  * after successfully performing the work with which they were tasked.
     678                 :             :  *
     679                 :             :  * If this function is not called, then some of the workers that were launched
     680                 :             :  * may not have been started due to a fork() failure, or may have exited during
     681                 :             :  * early startup prior to attaching to the error queue, so nworkers_launched
     682                 :             :  * cannot be viewed as completely reliable.  It will never be less than the
     683                 :             :  * number of workers which actually started, but it might be more.  Any workers
     684                 :             :  * that failed to start will still be discovered by
     685                 :             :  * WaitForParallelWorkersToFinish and an error will be thrown at that time,
     686                 :             :  * provided that function is eventually reached.
     687                 :             :  *
     688                 :             :  * In general, the leader process should do as much work as possible before
     689                 :             :  * calling this function.  fork() failures and other early-startup failures
     690                 :             :  * are very uncommon, and having the leader sit idle when it could be doing
     691                 :             :  * useful work is undesirable.  However, if the leader needs to wait for
     692                 :             :  * all of its workers or for a specific worker, it may want to call this
     693                 :             :  * function before doing so.  If not, it must make some other provision for
     694                 :             :  * the failure-to-start case, lest it wait forever.  On the other hand, a
     695                 :             :  * leader which never waits for a worker that might not be started yet, or
     696                 :             :  * at least never does so prior to WaitForParallelWorkersToFinish(), need not
     697                 :             :  * call this function at all.
     698                 :             :  */
     699                 :             : void
     700                 :          28 : WaitForParallelWorkersToAttach(ParallelContext *pcxt)
     701                 :             : {
     702                 :          28 :         int                     i;
     703                 :             : 
     704                 :             :         /* Skip this if we have no launched workers. */
     705         [ -  + ]:          28 :         if (pcxt->nworkers_launched == 0)
     706                 :           0 :                 return;
     707                 :             : 
     708                 :      823456 :         for (;;)
     709                 :             :         {
     710                 :             :                 /*
     711                 :             :                  * This will process any parallel messages that are pending and it may
     712                 :             :                  * also throw an error propagated from a worker.
     713                 :             :                  */
     714         [ +  + ]:      823456 :                 CHECK_FOR_INTERRUPTS();
     715                 :             : 
     716         [ +  + ]:     1646912 :                 for (i = 0; i < pcxt->nworkers_launched; ++i)
     717                 :             :                 {
     718                 :      823456 :                         BgwHandleStatus status;
     719                 :      823456 :                         shm_mq     *mq;
     720                 :      823456 :                         int                     rc;
     721                 :      823456 :                         pid_t           pid;
     722                 :             : 
     723         [ +  + ]:      823456 :                         if (pcxt->known_attached_workers[i])
     724                 :           4 :                                 continue;
     725                 :             : 
     726                 :             :                         /*
     727                 :             :                          * If error_mqh is NULL, then the worker has already exited
     728                 :             :                          * cleanly.
     729                 :             :                          */
     730         [ +  - ]:      823452 :                         if (pcxt->worker[i].error_mqh == NULL)
     731                 :             :                         {
     732                 :           0 :                                 pcxt->known_attached_workers[i] = true;
     733                 :           0 :                                 ++pcxt->nknown_attached_workers;
     734                 :           0 :                                 continue;
     735                 :             :                         }
     736                 :             : 
     737                 :      823452 :                         status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
     738         [ +  + ]:      823452 :                         if (status == BGWH_STARTED)
     739                 :             :                         {
     740                 :             :                                 /* Has the worker attached to the error queue? */
     741                 :      823436 :                                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
     742         [ +  + ]:      823436 :                                 if (shm_mq_get_sender(mq) != NULL)
     743                 :             :                                 {
     744                 :             :                                         /* Yes, so it is known to be attached. */
     745                 :          24 :                                         pcxt->known_attached_workers[i] = true;
     746                 :          24 :                                         ++pcxt->nknown_attached_workers;
     747                 :          24 :                                 }
     748                 :      823436 :                         }
     749         [ -  + ]:          16 :                         else if (status == BGWH_STOPPED)
     750                 :             :                         {
     751                 :             :                                 /*
     752                 :             :                                  * If the worker stopped without attaching to the error queue,
     753                 :             :                                  * throw an error.
     754                 :             :                                  */
     755                 :           0 :                                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
     756         [ #  # ]:           0 :                                 if (shm_mq_get_sender(mq) == NULL)
     757   [ #  #  #  # ]:           0 :                                         ereport(ERROR,
     758                 :             :                                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     759                 :             :                                                          errmsg("parallel worker failed to initialize"),
     760                 :             :                                                          errhint("More details may be available in the server log.")));
     761                 :             : 
     762                 :           0 :                                 pcxt->known_attached_workers[i] = true;
     763                 :           0 :                                 ++pcxt->nknown_attached_workers;
     764                 :           0 :                         }
     765                 :             :                         else
     766                 :             :                         {
     767                 :             :                                 /*
     768                 :             :                                  * Worker not yet started, so we must wait.  The postmaster
     769                 :             :                                  * will notify us if the worker's state changes.  Our latch
     770                 :             :                                  * might also get set for some other reason, but if so we'll
     771                 :             :                                  * just end up waiting for the same worker again.
     772                 :             :                                  */
     773                 :          16 :                                 rc = WaitLatch(MyLatch,
     774                 :             :                                                            WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
     775                 :             :                                                            -1, WAIT_EVENT_BGWORKER_STARTUP);
     776                 :             : 
     777         [ -  + ]:          16 :                                 if (rc & WL_LATCH_SET)
     778                 :          16 :                                         ResetLatch(MyLatch);
     779                 :             :                         }
     780         [ +  + ]:      823456 :                 }
     781                 :             : 
     782                 :             :                 /* If all workers are known to have started, we're done. */
     783         [ +  + ]:      823456 :                 if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
     784                 :             :                 {
     785         [ +  - ]:          28 :                         Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
     786                 :          28 :                         break;
     787                 :             :                 }
     788                 :             :         }
     789                 :          28 : }
     790                 :             : 
     791                 :             : /*
     792                 :             :  * Wait for all workers to finish computing.
     793                 :             :  *
     794                 :             :  * Even if the parallel operation seems to have completed successfully, it's
     795                 :             :  * important to call this function afterwards.  We must not miss any errors
     796                 :             :  * the workers may have thrown during the parallel operation, or any that they
     797                 :             :  * may yet throw while shutting down.
     798                 :             :  *
     799                 :             :  * Also, we want to update our notion of XactLastRecEnd based on worker
     800                 :             :  * feedback.
     801                 :             :  */
     802                 :             : void
     803                 :         249 : WaitForParallelWorkersToFinish(ParallelContext *pcxt)
     804                 :             : {
     805                 :         378 :         for (;;)
     806                 :             :         {
     807                 :         378 :                 bool            anyone_alive = false;
     808                 :         378 :                 int                     nfinished = 0;
     809                 :         378 :                 int                     i;
     810                 :             : 
     811                 :             :                 /*
     812                 :             :                  * This will process any parallel messages that are pending, which may
     813                 :             :                  * change the outcome of the loop that follows.  It may also throw an
     814                 :             :                  * error propagated from a worker.
     815                 :             :                  */
     816         [ +  + ]:         378 :                 CHECK_FOR_INTERRUPTS();
     817                 :             : 
     818         [ +  + ]:        1296 :                 for (i = 0; i < pcxt->nworkers_launched; ++i)
     819                 :             :                 {
     820                 :             :                         /*
     821                 :             :                          * If error_mqh is NULL, then the worker has already exited
     822                 :             :                          * cleanly.  If we have received a message through error_mqh from
     823                 :             :                          * the worker, we know it started up cleanly, and therefore we're
     824                 :             :                          * certain to be notified when it exits.
     825                 :             :                          */
     826         [ +  + ]:         919 :                         if (pcxt->worker[i].error_mqh == NULL)
     827                 :         767 :                                 ++nfinished;
     828         [ +  + ]:         152 :                         else if (pcxt->known_attached_workers[i])
     829                 :             :                         {
     830                 :           1 :                                 anyone_alive = true;
     831                 :           1 :                                 break;
     832                 :             :                         }
     833                 :         918 :                 }
     834                 :             : 
     835         [ +  + ]:         378 :                 if (!anyone_alive)
     836                 :             :                 {
     837                 :             :                         /* If all workers are known to have finished, we're done. */
     838         [ +  + ]:         377 :                         if (nfinished >= pcxt->nworkers_launched)
     839                 :             :                         {
     840         [ -  + ]:         249 :                                 Assert(nfinished == pcxt->nworkers_launched);
     841                 :         249 :                                 break;
     842                 :             :                         }
     843                 :             : 
     844                 :             :                         /*
     845                 :             :                          * We didn't detect any living workers, but not all workers are
     846                 :             :                          * known to have exited cleanly.  Either not all workers have
     847                 :             :                          * launched yet, or maybe some of them failed to start or
     848                 :             :                          * terminated abnormally.
     849                 :             :                          */
     850         [ +  + ]:         429 :                         for (i = 0; i < pcxt->nworkers_launched; ++i)
     851                 :             :                         {
     852                 :         301 :                                 pid_t           pid;
     853                 :         301 :                                 shm_mq     *mq;
     854                 :             : 
     855                 :             :                                 /*
     856                 :             :                                  * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
     857                 :             :                                  * should just keep waiting.  If it is BGWH_STOPPED, then
     858                 :             :                                  * further investigation is needed.
     859                 :             :                                  */
     860         [ +  + ]:         301 :                                 if (pcxt->worker[i].error_mqh == NULL ||
     861   [ +  -  +  - ]:         151 :                                         pcxt->worker[i].bgwhandle == NULL ||
     862                 :         151 :                                         GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
     863                 :         151 :                                                                                    &pid) != BGWH_STOPPED)
     864                 :         301 :                                         continue;
     865                 :             : 
     866                 :             :                                 /*
     867                 :             :                                  * Check whether the worker ended up stopped without ever
     868                 :             :                                  * attaching to the error queue.  If so, the postmaster was
     869                 :             :                                  * unable to fork the worker or it exited without initializing
     870                 :             :                                  * properly.  We must throw an error, since the caller may
     871                 :             :                                  * have been expecting the worker to do some work before
     872                 :             :                                  * exiting.
     873                 :             :                                  */
     874                 :           0 :                                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
     875         [ #  # ]:           0 :                                 if (shm_mq_get_sender(mq) == NULL)
     876   [ #  #  #  # ]:           0 :                                         ereport(ERROR,
     877                 :             :                                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     878                 :             :                                                          errmsg("parallel worker failed to initialize"),
     879                 :             :                                                          errhint("More details may be available in the server log.")));
     880                 :             : 
     881                 :             :                                 /*
     882                 :             :                                  * The worker is stopped, but is attached to the error queue.
     883                 :             :                                  * Unless there's a bug somewhere, this will only happen when
     884                 :             :                                  * the worker writes messages and terminates after the
     885                 :             :                                  * CHECK_FOR_INTERRUPTS() near the top of this function and
     886                 :             :                                  * before the call to GetBackgroundWorkerPid().  In that case,
     887                 :             :                                  * our latch should have been set as well and the right things
     888                 :             :                                  * will happen on the next pass through the loop.
     889                 :             :                                  */
     890         [ +  - ]:         301 :                         }
     891                 :         128 :                 }
     892                 :             : 
     893                 :         129 :                 (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
     894                 :             :                                                  WAIT_EVENT_PARALLEL_FINISH);
     895                 :         129 :                 ResetLatch(MyLatch);
     896         [ +  + ]:         378 :         }
     897                 :             : 
     898         [ -  + ]:         249 :         if (pcxt->toc != NULL)
     899                 :             :         {
     900                 :         249 :                 FixedParallelState *fps;
     901                 :             : 
     902                 :         249 :                 fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
     903         [ +  + ]:         249 :                 if (fps->last_xlog_end > XactLastRecEnd)
     904                 :           5 :                         XactLastRecEnd = fps->last_xlog_end;
     905                 :         249 :         }
     906                 :         249 : }
     907                 :             : 
     908                 :             : /*
     909                 :             :  * Wait for all workers to exit.
     910                 :             :  *
     911                 :             :  * This function ensures that workers have been completely shutdown.  The
     912                 :             :  * difference between WaitForParallelWorkersToFinish and this function is
     913                 :             :  * that the former just ensures that last message sent by a worker backend is
     914                 :             :  * received by the leader backend whereas this ensures the complete shutdown.
     915                 :             :  */
     916                 :             : static void
     917                 :         203 : WaitForParallelWorkersToExit(ParallelContext *pcxt)
     918                 :             : {
     919                 :         203 :         int                     i;
     920                 :             : 
     921                 :             :         /* Wait until the workers actually die. */
     922         [ +  + ]:         680 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
     923                 :             :         {
     924                 :         477 :                 BgwHandleStatus status;
     925                 :             : 
     926   [ +  -  -  + ]:         477 :                 if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
     927                 :           0 :                         continue;
     928                 :             : 
     929                 :         477 :                 status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
     930                 :             : 
     931                 :             :                 /*
     932                 :             :                  * If the postmaster kicked the bucket, we have no chance of cleaning
     933                 :             :                  * up safely -- we won't be able to tell when our workers are actually
     934                 :             :                  * dead.  This doesn't necessitate a PANIC since they will all abort
     935                 :             :                  * eventually, but we can't safely continue this session.
     936                 :             :                  */
     937         [ +  - ]:         477 :                 if (status == BGWH_POSTMASTER_DIED)
     938   [ #  #  #  # ]:           0 :                         ereport(FATAL,
     939                 :             :                                         (errcode(ERRCODE_ADMIN_SHUTDOWN),
     940                 :             :                                          errmsg("postmaster exited during a parallel transaction")));
     941                 :             : 
     942                 :             :                 /* Release memory. */
     943                 :         477 :                 pfree(pcxt->worker[i].bgwhandle);
     944                 :         477 :                 pcxt->worker[i].bgwhandle = NULL;
     945      [ -  -  + ]:         477 :         }
     946                 :         203 : }
     947                 :             : 
     948                 :             : /*
     949                 :             :  * Destroy a parallel context.
     950                 :             :  *
     951                 :             :  * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
     952                 :             :  * first, before calling this function.  When this function is invoked, any
     953                 :             :  * remaining workers are forcibly killed; the dynamic shared memory segment
     954                 :             :  * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
     955                 :             :  */
     956                 :             : void
     957                 :         155 : DestroyParallelContext(ParallelContext *pcxt)
     958                 :             : {
     959                 :         155 :         int                     i;
     960                 :             : 
     961                 :             :         /*
     962                 :             :          * Be careful about order of operations here!  We remove the parallel
     963                 :             :          * context from the list before we do anything else; otherwise, if an
     964                 :             :          * error occurs during a subsequent step, we might try to nuke it again
     965                 :             :          * from AtEOXact_Parallel or AtEOSubXact_Parallel.
     966                 :             :          */
     967                 :         155 :         dlist_delete(&pcxt->node);
     968                 :             : 
     969                 :             :         /* Kill each worker in turn, and forget their error queues. */
     970         [ +  - ]:         155 :         if (pcxt->worker != NULL)
     971                 :             :         {
     972         [ +  + ]:         490 :                 for (i = 0; i < pcxt->nworkers_launched; ++i)
     973                 :             :                 {
     974         [ +  + ]:         335 :                         if (pcxt->worker[i].error_mqh != NULL)
     975                 :             :                         {
     976                 :           2 :                                 TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
     977                 :             : 
     978                 :           2 :                                 shm_mq_detach(pcxt->worker[i].error_mqh);
     979                 :           2 :                                 pcxt->worker[i].error_mqh = NULL;
     980                 :           2 :                         }
     981                 :         335 :                 }
     982                 :         155 :         }
     983                 :             : 
     984                 :             :         /*
     985                 :             :          * If we have allocated a shared memory segment, detach it.  This will
     986                 :             :          * implicitly detach the error queues, and any other shared memory queues,
     987                 :             :          * stored there.
     988                 :             :          */
     989         [ +  - ]:         155 :         if (pcxt->seg != NULL)
     990                 :             :         {
     991                 :         155 :                 dsm_detach(pcxt->seg);
     992                 :         155 :                 pcxt->seg = NULL;
     993                 :         155 :         }
     994                 :             : 
     995                 :             :         /*
     996                 :             :          * If this parallel context is actually in backend-private memory rather
     997                 :             :          * than shared memory, free that memory instead.
     998                 :             :          */
     999         [ -  + ]:         155 :         if (pcxt->private_memory != NULL)
    1000                 :             :         {
    1001                 :           0 :                 pfree(pcxt->private_memory);
    1002                 :           0 :                 pcxt->private_memory = NULL;
    1003                 :           0 :         }
    1004                 :             : 
    1005                 :             :         /*
    1006                 :             :          * We can't finish transaction commit or abort until all of the workers
    1007                 :             :          * have exited.  This means, in particular, that we can't respond to
    1008                 :             :          * interrupts at this stage.
    1009                 :             :          */
    1010                 :         155 :         HOLD_INTERRUPTS();
    1011                 :         155 :         WaitForParallelWorkersToExit(pcxt);
    1012         [ +  - ]:         155 :         RESUME_INTERRUPTS();
    1013                 :             : 
    1014                 :             :         /* Free the worker array itself. */
    1015         [ -  + ]:         155 :         if (pcxt->worker != NULL)
    1016                 :             :         {
    1017                 :         155 :                 pfree(pcxt->worker);
    1018                 :         155 :                 pcxt->worker = NULL;
    1019                 :         155 :         }
    1020                 :             : 
    1021                 :             :         /* Free memory. */
    1022                 :         155 :         pfree(pcxt->library_name);
    1023                 :         155 :         pfree(pcxt->function_name);
    1024                 :         155 :         pfree(pcxt);
    1025                 :         155 : }
    1026                 :             : 
    1027                 :             : /*
    1028                 :             :  * Are there any parallel contexts currently active?
    1029                 :             :  */
    1030                 :             : bool
    1031                 :       57110 : ParallelContextActive(void)
    1032                 :             : {
    1033                 :       57110 :         return !dlist_is_empty(&pcxt_list);
    1034                 :             : }
    1035                 :             : 
    1036                 :             : /*
    1037                 :             :  * Handle receipt of an interrupt indicating a parallel worker message.
    1038                 :             :  *
    1039                 :             :  * Note: this is called within a signal handler!  All we can do is set
    1040                 :             :  * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
    1041                 :             :  * ProcessParallelMessages().
    1042                 :             :  */
    1043                 :             : void
    1044                 :         416 : HandleParallelMessageInterrupt(void)
    1045                 :             : {
    1046                 :         416 :         InterruptPending = true;
    1047                 :         416 :         ParallelMessagePending = true;
    1048                 :         416 :         SetLatch(MyLatch);
    1049                 :         416 : }
    1050                 :             : 
    1051                 :             : /*
    1052                 :             :  * Process any queued protocol messages received from parallel workers.
    1053                 :             :  */
    1054                 :             : void
    1055                 :         408 : ProcessParallelMessages(void)
    1056                 :             : {
    1057                 :         408 :         dlist_iter      iter;
    1058                 :         408 :         MemoryContext oldcontext;
    1059                 :             : 
    1060                 :             :         static MemoryContext hpm_context = NULL;
    1061                 :             : 
    1062                 :             :         /*
    1063                 :             :          * This is invoked from ProcessInterrupts(), and since some of the
    1064                 :             :          * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
    1065                 :             :          * for recursive calls if more signals are received while this runs.  It's
    1066                 :             :          * unclear that recursive entry would be safe, and it doesn't seem useful
    1067                 :             :          * even if it is safe, so let's block interrupts until done.
    1068                 :             :          */
    1069                 :         408 :         HOLD_INTERRUPTS();
    1070                 :             : 
    1071                 :             :         /*
    1072                 :             :          * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
    1073                 :             :          * don't want to risk leaking data into long-lived contexts, so let's do
    1074                 :             :          * our work here in a private context that we can reset on each use.
    1075                 :             :          */
    1076         [ +  + ]:         408 :         if (hpm_context == NULL)        /* first time through? */
    1077                 :          22 :                 hpm_context = AllocSetContextCreate(TopMemoryContext,
    1078                 :             :                                                                                         "ProcessParallelMessages",
    1079                 :             :                                                                                         ALLOCSET_DEFAULT_SIZES);
    1080                 :             :         else
    1081                 :         386 :                 MemoryContextReset(hpm_context);
    1082                 :             : 
    1083                 :         408 :         oldcontext = MemoryContextSwitchTo(hpm_context);
    1084                 :             : 
    1085                 :             :         /* OK to process messages.  Reset the flag saying there are more to do. */
    1086                 :         408 :         ParallelMessagePending = false;
    1087                 :             : 
    1088   [ +  -  +  + ]:         855 :         dlist_foreach(iter, &pcxt_list)
    1089                 :             :         {
    1090                 :         447 :                 ParallelContext *pcxt;
    1091                 :         447 :                 int                     i;
    1092                 :             : 
    1093                 :         447 :                 pcxt = dlist_container(ParallelContext, node, iter.cur);
    1094         [ -  + ]:         447 :                 if (pcxt->worker == NULL)
    1095                 :           0 :                         continue;
    1096                 :             : 
    1097         [ +  + ]:        1761 :                 for (i = 0; i < pcxt->nworkers_launched; ++i)
    1098                 :             :                 {
    1099                 :             :                         /*
    1100                 :             :                          * Read as many messages as we can from each worker, but stop when
    1101                 :             :                          * either (1) the worker's error queue goes away, which can happen
    1102                 :             :                          * if we receive a Terminate message from the worker; or (2) no
    1103                 :             :                          * more messages can be read from the worker without blocking.
    1104                 :             :                          */
    1105         [ +  + ]:        1789 :                         while (pcxt->worker[i].error_mqh != NULL)
    1106                 :             :                         {
    1107                 :         785 :                                 shm_mq_result res;
    1108                 :         785 :                                 Size            nbytes;
    1109                 :         785 :                                 void       *data;
    1110                 :             : 
    1111                 :         785 :                                 res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
    1112                 :             :                                                                          &data, true);
    1113         [ +  + ]:         785 :                                 if (res == SHM_MQ_WOULD_BLOCK)
    1114                 :         310 :                                         break;
    1115         [ +  - ]:         475 :                                 else if (res == SHM_MQ_SUCCESS)
    1116                 :             :                                 {
    1117                 :         475 :                                         StringInfoData msg;
    1118                 :             : 
    1119                 :         475 :                                         initStringInfo(&msg);
    1120                 :         475 :                                         appendBinaryStringInfo(&msg, data, nbytes);
    1121                 :         475 :                                         ProcessParallelMessage(pcxt, i, &msg);
    1122                 :         475 :                                         pfree(msg.data);
    1123                 :         475 :                                 }
    1124                 :             :                                 else
    1125   [ #  #  #  # ]:           0 :                                         ereport(ERROR,
    1126                 :             :                                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1127                 :             :                                                          errmsg("lost connection to parallel worker")));
    1128         [ +  + ]:         785 :                         }
    1129                 :        1314 :                 }
    1130         [ -  + ]:         447 :         }
    1131                 :             : 
    1132                 :         408 :         MemoryContextSwitchTo(oldcontext);
    1133                 :             : 
    1134                 :             :         /* Might as well clear the context on our way out */
    1135                 :         408 :         MemoryContextReset(hpm_context);
    1136                 :             : 
    1137         [ +  - ]:         408 :         RESUME_INTERRUPTS();
    1138                 :         408 : }
    1139                 :             : 
    1140                 :             : /*
    1141                 :             :  * Process a single protocol message received from a single parallel worker.
    1142                 :             :  */
    1143                 :             : static void
    1144                 :         477 : ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
    1145                 :             : {
    1146                 :         477 :         char            msgtype;
    1147                 :             : 
    1148   [ +  -  +  + ]:         477 :         if (pcxt->known_attached_workers != NULL &&
    1149                 :         477 :                 !pcxt->known_attached_workers[i])
    1150                 :             :         {
    1151                 :         453 :                 pcxt->known_attached_workers[i] = true;
    1152                 :         453 :                 pcxt->nknown_attached_workers++;
    1153                 :         453 :         }
    1154                 :             : 
    1155                 :         477 :         msgtype = pq_getmsgbyte(msg);
    1156                 :             : 
    1157   [ +  -  -  +  :         477 :         switch (msgtype)
                      - ]
    1158                 :             :         {
    1159                 :             :                 case PqMsg_ErrorResponse:
    1160                 :             :                 case PqMsg_NoticeResponse:
    1161                 :             :                         {
    1162                 :           2 :                                 ErrorData       edata;
    1163                 :           2 :                                 ErrorContextCallback *save_error_context_stack;
    1164                 :             : 
    1165                 :             :                                 /* Parse ErrorResponse or NoticeResponse. */
    1166                 :           2 :                                 pq_parse_errornotice(msg, &edata);
    1167                 :             : 
    1168                 :             :                                 /* Death of a worker isn't enough justification for suicide. */
    1169         [ -  + ]:           2 :                                 edata.elevel = Min(edata.elevel, ERROR);
    1170                 :             : 
    1171                 :             :                                 /*
    1172                 :             :                                  * If desired, add a context line to show that this is a
    1173                 :             :                                  * message propagated from a parallel worker.  Otherwise, it
    1174                 :             :                                  * can sometimes be confusing to understand what actually
    1175                 :             :                                  * happened.  (We don't do this in DEBUG_PARALLEL_REGRESS mode
    1176                 :             :                                  * because it causes test-result instability depending on
    1177                 :             :                                  * whether a parallel worker is actually used or not.)
    1178                 :             :                                  */
    1179         [ -  + ]:           2 :                                 if (debug_parallel_query != DEBUG_PARALLEL_REGRESS)
    1180                 :             :                                 {
    1181         [ +  + ]:           2 :                                         if (edata.context)
    1182                 :           2 :                                                 edata.context = psprintf("%s\n%s", edata.context,
    1183                 :           1 :                                                                                                  _("parallel worker"));
    1184                 :             :                                         else
    1185                 :           1 :                                                 edata.context = pstrdup(_("parallel worker"));
    1186                 :           2 :                                 }
    1187                 :             : 
    1188                 :             :                                 /*
    1189                 :             :                                  * Context beyond that should use the error context callbacks
    1190                 :             :                                  * that were in effect when the ParallelContext was created,
    1191                 :             :                                  * not the current ones.
    1192                 :             :                                  */
    1193                 :           2 :                                 save_error_context_stack = error_context_stack;
    1194                 :           2 :                                 error_context_stack = pcxt->error_context_stack;
    1195                 :             : 
    1196                 :             :                                 /* Rethrow error or print notice. */
    1197                 :           2 :                                 ThrowErrorData(&edata);
    1198                 :             : 
    1199                 :             :                                 /* Not an error, so restore previous context stack. */
    1200                 :           2 :                                 error_context_stack = save_error_context_stack;
    1201                 :             : 
    1202                 :             :                                 break;
    1203                 :           2 :                         }
    1204                 :             : 
    1205                 :             :                 case PqMsg_NotificationResponse:
    1206                 :             :                         {
    1207                 :             :                                 /* Propagate NotifyResponse. */
    1208                 :           0 :                                 int32           pid;
    1209                 :           0 :                                 const char *channel;
    1210                 :           0 :                                 const char *payload;
    1211                 :             : 
    1212                 :           0 :                                 pid = pq_getmsgint(msg, 4);
    1213                 :           0 :                                 channel = pq_getmsgrawstring(msg);
    1214                 :           0 :                                 payload = pq_getmsgrawstring(msg);
    1215                 :           0 :                                 pq_endmessage(msg);
    1216                 :             : 
    1217                 :           0 :                                 NotifyMyFrontEnd(channel, payload, pid);
    1218                 :             : 
    1219                 :             :                                 break;
    1220                 :           0 :                         }
    1221                 :             : 
    1222                 :             :                 case PqMsg_Progress:
    1223                 :             :                         {
    1224                 :             :                                 /*
    1225                 :             :                                  * Only incremental progress reporting is currently supported.
    1226                 :             :                                  * However, it's possible to add more fields to the message to
    1227                 :             :                                  * allow for handling of other backend progress APIs.
    1228                 :             :                                  */
    1229                 :           0 :                                 int                     index = pq_getmsgint(msg, 4);
    1230                 :           0 :                                 int64           incr = pq_getmsgint64(msg);
    1231                 :             : 
    1232                 :           0 :                                 pq_getmsgend(msg);
    1233                 :             : 
    1234                 :           0 :                                 pgstat_progress_incr_param(index, incr);
    1235                 :             : 
    1236                 :             :                                 break;
    1237                 :           0 :                         }
    1238                 :             : 
    1239                 :             :                 case PqMsg_Terminate:
    1240                 :             :                         {
    1241                 :         475 :                                 shm_mq_detach(pcxt->worker[i].error_mqh);
    1242                 :         475 :                                 pcxt->worker[i].error_mqh = NULL;
    1243                 :         475 :                                 break;
    1244                 :             :                         }
    1245                 :             : 
    1246                 :             :                 default:
    1247                 :             :                         {
    1248   [ #  #  #  # ]:           0 :                                 elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
    1249                 :             :                                          msgtype, msg->len);
    1250                 :             :                         }
    1251                 :           0 :         }
    1252                 :         477 : }
    1253                 :             : 
    1254                 :             : /*
    1255                 :             :  * End-of-subtransaction cleanup for parallel contexts.
    1256                 :             :  *
    1257                 :             :  * Here we remove only parallel contexts initiated within the current
    1258                 :             :  * subtransaction.
    1259                 :             :  */
    1260                 :             : void
    1261                 :        1665 : AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
    1262                 :             : {
    1263         [ +  + ]:        1666 :         while (!dlist_is_empty(&pcxt_list))
    1264                 :             :         {
    1265                 :           1 :                 ParallelContext *pcxt;
    1266                 :             : 
    1267                 :           1 :                 pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
    1268         [ -  + ]:           1 :                 if (pcxt->subid != mySubId)
    1269                 :           0 :                         break;
    1270         [ +  - ]:           1 :                 if (isCommit)
    1271   [ #  #  #  # ]:           0 :                         elog(WARNING, "leaked parallel context");
    1272                 :           1 :                 DestroyParallelContext(pcxt);
    1273      [ -  -  + ]:           1 :         }
    1274                 :        1665 : }
    1275                 :             : 
    1276                 :             : /*
    1277                 :             :  * End-of-transaction cleanup for parallel contexts.
    1278                 :             :  *
    1279                 :             :  * We nuke all remaining parallel contexts.
    1280                 :             :  */
    1281                 :             : void
    1282                 :       57918 : AtEOXact_Parallel(bool isCommit)
    1283                 :             : {
    1284         [ +  + ]:       57919 :         while (!dlist_is_empty(&pcxt_list))
    1285                 :             :         {
    1286                 :           1 :                 ParallelContext *pcxt;
    1287                 :             : 
    1288                 :           1 :                 pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
    1289         [ +  - ]:           1 :                 if (isCommit)
    1290   [ #  #  #  # ]:           0 :                         elog(WARNING, "leaked parallel context");
    1291                 :           1 :                 DestroyParallelContext(pcxt);
    1292                 :           1 :         }
    1293                 :       57918 : }
    1294                 :             : 
    1295                 :             : /*
    1296                 :             :  * Main entrypoint for parallel workers.
    1297                 :             :  */
    1298                 :             : void
    1299                 :         477 : ParallelWorkerMain(Datum main_arg)
    1300                 :             : {
    1301                 :         477 :         dsm_segment *seg;
    1302                 :         477 :         shm_toc    *toc;
    1303                 :         477 :         FixedParallelState *fps;
    1304                 :         477 :         char       *error_queue_space;
    1305                 :         477 :         shm_mq     *mq;
    1306                 :         477 :         shm_mq_handle *mqh;
    1307                 :         477 :         char       *libraryspace;
    1308                 :         477 :         char       *entrypointstate;
    1309                 :         477 :         char       *library_name;
    1310                 :         477 :         char       *function_name;
    1311                 :         477 :         parallel_worker_main_type entrypt;
    1312                 :         477 :         char       *gucspace;
    1313                 :         477 :         char       *combocidspace;
    1314                 :         477 :         char       *tsnapspace;
    1315                 :         477 :         char       *asnapspace;
    1316                 :         477 :         char       *tstatespace;
    1317                 :         477 :         char       *pendingsyncsspace;
    1318                 :         477 :         char       *reindexspace;
    1319                 :         477 :         char       *relmapperspace;
    1320                 :         477 :         char       *uncommittedenumsspace;
    1321                 :         477 :         char       *clientconninfospace;
    1322                 :         477 :         char       *session_dsm_handle_space;
    1323                 :         477 :         Snapshot        tsnapshot;
    1324                 :         477 :         Snapshot        asnapshot;
    1325                 :             : 
    1326                 :             :         /* Set flag to indicate that we're initializing a parallel worker. */
    1327                 :         477 :         InitializingParallelWorker = true;
    1328                 :             : 
    1329                 :             :         /* Establish signal handlers. */
    1330                 :         477 :         pqsignal(SIGTERM, die);
    1331                 :         477 :         BackgroundWorkerUnblockSignals();
    1332                 :             : 
    1333                 :             :         /* Determine and set our parallel worker number. */
    1334         [ +  - ]:         477 :         Assert(ParallelWorkerNumber == -1);
    1335                 :         477 :         memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
    1336                 :             : 
    1337                 :             :         /* Set up a memory context to work in, just for cleanliness. */
    1338                 :         477 :         CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
    1339                 :             :                                                                                                  "Parallel worker",
    1340                 :             :                                                                                                  ALLOCSET_DEFAULT_SIZES);
    1341                 :             : 
    1342                 :             :         /*
    1343                 :             :          * Attach to the dynamic shared memory segment for the parallel query, and
    1344                 :             :          * find its table of contents.
    1345                 :             :          *
    1346                 :             :          * Note: at this point, we have not created any ResourceOwner in this
    1347                 :             :          * process.  This will result in our DSM mapping surviving until process
    1348                 :             :          * exit, which is fine.  If there were a ResourceOwner, it would acquire
    1349                 :             :          * ownership of the mapping, but we have no need for that.
    1350                 :             :          */
    1351                 :         477 :         seg = dsm_attach(DatumGetUInt32(main_arg));
    1352         [ +  - ]:         477 :         if (seg == NULL)
    1353   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1354                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1355                 :             :                                  errmsg("could not map dynamic shared memory segment")));
    1356                 :         477 :         toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
    1357         [ +  - ]:         477 :         if (toc == NULL)
    1358   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1359                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1360                 :             :                                  errmsg("invalid magic number in dynamic shared memory segment")));
    1361                 :             : 
    1362                 :             :         /* Look up fixed parallel state. */
    1363                 :         477 :         fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
    1364                 :         477 :         MyFixedParallelState = fps;
    1365                 :             : 
    1366                 :             :         /* Arrange to signal the leader if we exit. */
    1367                 :         477 :         ParallelLeaderPid = fps->parallel_leader_pid;
    1368                 :         477 :         ParallelLeaderProcNumber = fps->parallel_leader_proc_number;
    1369                 :         477 :         before_shmem_exit(ParallelWorkerShutdown, PointerGetDatum(seg));
    1370                 :             : 
    1371                 :             :         /*
    1372                 :             :          * Now we can find and attach to the error queue provided for us.  That's
    1373                 :             :          * good, because until we do that, any errors that happen here will not be
    1374                 :             :          * reported back to the process that requested that this worker be
    1375                 :             :          * launched.
    1376                 :             :          */
    1377                 :         477 :         error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
    1378                 :         954 :         mq = (shm_mq *) (error_queue_space +
    1379                 :         477 :                                          ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
    1380                 :         477 :         shm_mq_set_sender(mq, MyProc);
    1381                 :         477 :         mqh = shm_mq_attach(mq, seg, NULL);
    1382                 :         477 :         pq_redirect_to_shm_mq(seg, mqh);
    1383                 :         954 :         pq_set_parallel_leader(fps->parallel_leader_pid,
    1384                 :         477 :                                                    fps->parallel_leader_proc_number);
    1385                 :             : 
    1386                 :             :         /*
    1387                 :             :          * Hooray! Primary initialization is complete.  Now, we need to set up our
    1388                 :             :          * backend-local state to match the original backend.
    1389                 :             :          */
    1390                 :             : 
    1391                 :             :         /*
    1392                 :             :          * Join locking group.  We must do this before anything that could try to
    1393                 :             :          * acquire a heavyweight lock, because any heavyweight locks acquired to
    1394                 :             :          * this point could block either directly against the parallel group
    1395                 :             :          * leader or against some process which in turn waits for a lock that
    1396                 :             :          * conflicts with the parallel group leader, causing an undetected
    1397                 :             :          * deadlock.  (If we can't join the lock group, the leader has gone away,
    1398                 :             :          * so just exit quietly.)
    1399                 :             :          */
    1400   [ +  -  +  - ]:         954 :         if (!BecomeLockGroupMember(fps->parallel_leader_pgproc,
    1401                 :         477 :                                                            fps->parallel_leader_pid))
    1402                 :           0 :                 return;
    1403                 :             : 
    1404                 :             :         /*
    1405                 :             :          * Restore transaction and statement start-time timestamps.  This must
    1406                 :             :          * happen before anything that would start a transaction, else asserts in
    1407                 :             :          * xact.c will fire.
    1408                 :             :          */
    1409                 :         477 :         SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);
    1410                 :             : 
    1411                 :             :         /*
    1412                 :             :          * Identify the entry point to be called.  In theory this could result in
    1413                 :             :          * loading an additional library, though most likely the entry point is in
    1414                 :             :          * the core backend or in a library we just loaded.
    1415                 :             :          */
    1416                 :         477 :         entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
    1417                 :         477 :         library_name = entrypointstate;
    1418                 :         477 :         function_name = entrypointstate + strlen(library_name) + 1;
    1419                 :             : 
    1420                 :         477 :         entrypt = LookupParallelWorkerFunction(library_name, function_name);
    1421                 :             : 
    1422                 :             :         /*
    1423                 :             :          * Restore current session authorization and role id.  No verification
    1424                 :             :          * happens here, we just blindly adopt the leader's state.  Note that this
    1425                 :             :          * has to happen before InitPostgres, since InitializeSessionUserId will
    1426                 :             :          * not set these variables.
    1427                 :             :          */
    1428                 :         477 :         SetAuthenticatedUserId(fps->authenticated_user_id);
    1429                 :         954 :         SetSessionAuthorization(fps->session_user_id,
    1430                 :         477 :                                                         fps->session_user_is_superuser);
    1431                 :         477 :         SetCurrentRoleId(fps->outer_user_id, fps->role_is_superuser);
    1432                 :             : 
    1433                 :             :         /*
    1434                 :             :          * Restore database connection.  We skip connection authorization checks,
    1435                 :             :          * reasoning that (a) the leader checked these things when it started, and
    1436                 :             :          * (b) we do not want parallel mode to cause these failures, because that
    1437                 :             :          * would make use of parallel query plans not transparent to applications.
    1438                 :             :          */
    1439                 :         954 :         BackgroundWorkerInitializeConnectionByOid(fps->database_id,
    1440                 :         477 :                                                                                           fps->authenticated_user_id,
    1441                 :             :                                                                                           BGWORKER_BYPASS_ALLOWCONN |
    1442                 :             :                                                                                           BGWORKER_BYPASS_ROLELOGINCHECK);
    1443                 :             : 
    1444                 :             :         /*
    1445                 :             :          * Set the client encoding to the database encoding, since that is what
    1446                 :             :          * the leader will expect.  (We're cheating a bit by not calling
    1447                 :             :          * PrepareClientEncoding first.  It's okay because this call will always
    1448                 :             :          * result in installing a no-op conversion.  No error should be possible,
    1449                 :             :          * but check anyway.)
    1450                 :             :          */
    1451         [ +  - ]:         477 :         if (SetClientEncoding(GetDatabaseEncoding()) < 0)
    1452   [ #  #  #  # ]:           0 :                 elog(ERROR, "SetClientEncoding(%d) failed", GetDatabaseEncoding());
    1453                 :             : 
    1454                 :             :         /*
    1455                 :             :          * Load libraries that were loaded by original backend.  We want to do
    1456                 :             :          * this before restoring GUCs, because the libraries might define custom
    1457                 :             :          * variables.
    1458                 :             :          */
    1459                 :         477 :         libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
    1460                 :         477 :         StartTransactionCommand();
    1461                 :         477 :         RestoreLibraryState(libraryspace);
    1462                 :         477 :         CommitTransactionCommand();
    1463                 :             : 
    1464                 :             :         /* Crank up a transaction state appropriate to a parallel worker. */
    1465                 :         477 :         tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
    1466                 :         477 :         StartParallelWorkerTransaction(tstatespace);
    1467                 :             : 
    1468                 :             :         /*
    1469                 :             :          * Restore state that affects catalog access.  Ideally we'd do this even
    1470                 :             :          * before calling InitPostgres, but that has order-of-initialization
    1471                 :             :          * problems, and also the relmapper would get confused during the
    1472                 :             :          * CommitTransactionCommand call above.
    1473                 :             :          */
    1474                 :         477 :         pendingsyncsspace = shm_toc_lookup(toc, PARALLEL_KEY_PENDING_SYNCS,
    1475                 :             :                                                                            false);
    1476                 :         477 :         RestorePendingSyncs(pendingsyncsspace);
    1477                 :         477 :         relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
    1478                 :         477 :         RestoreRelationMap(relmapperspace);
    1479                 :         477 :         reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
    1480                 :         477 :         RestoreReindexState(reindexspace);
    1481                 :         477 :         combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
    1482                 :         477 :         RestoreComboCIDState(combocidspace);
    1483                 :             : 
    1484                 :             :         /* Attach to the per-session DSM segment and contained objects. */
    1485                 :         477 :         session_dsm_handle_space =
    1486                 :         477 :                 shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
    1487                 :         477 :         AttachSession(*(dsm_handle *) session_dsm_handle_space);
    1488                 :             : 
    1489                 :             :         /*
    1490                 :             :          * If the transaction isolation level is REPEATABLE READ or SERIALIZABLE,
    1491                 :             :          * the leader has serialized the transaction snapshot and we must restore
    1492                 :             :          * it. At lower isolation levels, there is no transaction-lifetime
    1493                 :             :          * snapshot, but we need TransactionXmin to get set to a value which is
    1494                 :             :          * less than or equal to the xmin of every snapshot that will be used by
    1495                 :             :          * this worker. The easiest way to accomplish that is to install the
    1496                 :             :          * active snapshot as the transaction snapshot. Code running in this
    1497                 :             :          * parallel worker might take new snapshots via GetTransactionSnapshot()
    1498                 :             :          * or GetLatestSnapshot(), but it shouldn't have any way of acquiring a
    1499                 :             :          * snapshot older than the active snapshot.
    1500                 :             :          */
    1501                 :         477 :         asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
    1502                 :         477 :         tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, true);
    1503                 :         477 :         asnapshot = RestoreSnapshot(asnapspace);
    1504         [ -  + ]:         477 :         tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot;
    1505                 :         954 :         RestoreTransactionSnapshot(tsnapshot,
    1506                 :         477 :                                                            fps->parallel_leader_pgproc);
    1507                 :         477 :         PushActiveSnapshot(asnapshot);
    1508                 :             : 
    1509                 :             :         /*
    1510                 :             :          * We've changed which tuples we can see, and must therefore invalidate
    1511                 :             :          * system caches.
    1512                 :             :          */
    1513                 :         477 :         InvalidateSystemCaches();
    1514                 :             : 
    1515                 :             :         /*
    1516                 :             :          * Restore GUC values from launching backend.  We can't do this earlier,
    1517                 :             :          * because GUC check hooks that do catalog lookups need to see the same
    1518                 :             :          * database state as the leader.  Also, the check hooks for
    1519                 :             :          * session_authorization and role assume we already set the correct role
    1520                 :             :          * OIDs.
    1521                 :             :          */
    1522                 :         477 :         gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
    1523                 :         477 :         RestoreGUCState(gucspace);
    1524                 :             : 
    1525                 :             :         /*
    1526                 :             :          * Restore current user ID and security context.  No verification happens
    1527                 :             :          * here, we just blindly adopt the leader's state.  We can't do this till
    1528                 :             :          * after restoring GUCs, else we'll get complaints about restoring
    1529                 :             :          * session_authorization and role.  (In effect, we're assuming that all
    1530                 :             :          * the restored values are okay to set, even if we are now inside a
    1531                 :             :          * restricted context.)
    1532                 :             :          */
    1533                 :         477 :         SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
    1534                 :             : 
    1535                 :             :         /* Restore temp-namespace state to ensure search path matches leader's. */
    1536                 :         954 :         SetTempNamespaceState(fps->temp_namespace_id,
    1537                 :         477 :                                                   fps->temp_toast_namespace_id);
    1538                 :             : 
    1539                 :             :         /* Restore uncommitted enums. */
    1540                 :         477 :         uncommittedenumsspace = shm_toc_lookup(toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
    1541                 :             :                                                                                    false);
    1542                 :         477 :         RestoreUncommittedEnums(uncommittedenumsspace);
    1543                 :             : 
    1544                 :             :         /* Restore the ClientConnectionInfo. */
    1545                 :         477 :         clientconninfospace = shm_toc_lookup(toc, PARALLEL_KEY_CLIENTCONNINFO,
    1546                 :             :                                                                                  false);
    1547                 :         477 :         RestoreClientConnectionInfo(clientconninfospace);
    1548                 :             : 
    1549                 :             :         /*
    1550                 :             :          * Initialize SystemUser now that MyClientConnectionInfo is restored. Also
    1551                 :             :          * ensure that auth_method is actually valid, aka authn_id is not NULL.
    1552                 :             :          */
    1553         [ +  - ]:         477 :         if (MyClientConnectionInfo.authn_id)
    1554                 :           0 :                 InitializeSystemUser(MyClientConnectionInfo.authn_id,
    1555                 :           0 :                                                          hba_authname(MyClientConnectionInfo.auth_method));
    1556                 :             : 
    1557                 :             :         /* Attach to the leader's serializable transaction, if SERIALIZABLE. */
    1558                 :         477 :         AttachSerializableXact(fps->serializable_xact_handle);
    1559                 :             : 
    1560                 :             :         /*
    1561                 :             :          * We've initialized all of our state now; nothing should change
    1562                 :             :          * hereafter.
    1563                 :             :          */
    1564                 :         477 :         InitializingParallelWorker = false;
    1565                 :         477 :         EnterParallelMode();
    1566                 :             : 
    1567                 :             :         /*
    1568                 :             :          * Time to do the real work: invoke the caller-supplied code.
    1569                 :             :          */
    1570                 :         477 :         entrypt(seg, toc);
    1571                 :             : 
    1572                 :             :         /* Must exit parallel mode to pop active snapshot. */
    1573                 :         477 :         ExitParallelMode();
    1574                 :             : 
    1575                 :             :         /* Must pop active snapshot so snapmgr.c doesn't complain. */
    1576                 :         477 :         PopActiveSnapshot();
    1577                 :             : 
    1578                 :             :         /* Shut down the parallel-worker transaction. */
    1579                 :         477 :         EndParallelWorkerTransaction();
    1580                 :             : 
    1581                 :             :         /* Detach from the per-session DSM segment. */
    1582                 :         477 :         DetachSession();
    1583                 :             : 
    1584                 :             :         /* Report success. */
    1585                 :         477 :         pq_putmessage(PqMsg_Terminate, NULL, 0);
    1586         [ -  + ]:         477 : }
    1587                 :             : 
    1588                 :             : /*
    1589                 :             :  * Update shared memory with the ending location of the last WAL record we
    1590                 :             :  * wrote, if it's greater than the value already stored there.
    1591                 :             :  */
    1592                 :             : void
    1593                 :         475 : ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
    1594                 :             : {
    1595                 :         475 :         FixedParallelState *fps = MyFixedParallelState;
    1596                 :             : 
    1597         [ +  - ]:         475 :         Assert(fps != NULL);
    1598         [ -  + ]:         475 :         SpinLockAcquire(&fps->mutex);
    1599         [ +  + ]:         475 :         if (fps->last_xlog_end < last_xlog_end)
    1600                 :           7 :                 fps->last_xlog_end = last_xlog_end;
    1601                 :         475 :         SpinLockRelease(&fps->mutex);
    1602                 :         475 : }
    1603                 :             : 
    1604                 :             : /*
    1605                 :             :  * Make sure the leader tries to read from our error queue one more time.
    1606                 :             :  * This guards against the case where we exit uncleanly without sending an
    1607                 :             :  * ErrorResponse to the leader, for example because some code calls proc_exit
    1608                 :             :  * directly.
    1609                 :             :  *
    1610                 :             :  * Also explicitly detach from dsm segment so that subsystems using
    1611                 :             :  * on_dsm_detach() have a chance to send stats before the stats subsystem is
    1612                 :             :  * shut down as part of a before_shmem_exit() hook.
    1613                 :             :  *
    1614                 :             :  * One might think this could instead be solved by carefully ordering the
    1615                 :             :  * attaching to dsm segments, so that the pgstats segments get detached from
    1616                 :             :  * later than the parallel query one. That turns out to not work because the
    1617                 :             :  * stats hash might need to grow which can cause new segments to be allocated,
    1618                 :             :  * which then will be detached from earlier.
    1619                 :             :  */
    1620                 :             : static void
    1621                 :         477 : ParallelWorkerShutdown(int code, Datum arg)
    1622                 :             : {
    1623                 :         954 :         SendProcSignal(ParallelLeaderPid,
    1624                 :             :                                    PROCSIG_PARALLEL_MESSAGE,
    1625                 :         477 :                                    ParallelLeaderProcNumber);
    1626                 :             : 
    1627                 :         477 :         dsm_detach((dsm_segment *) DatumGetPointer(arg));
    1628                 :         477 : }
    1629                 :             : 
    1630                 :             : /*
    1631                 :             :  * Look up (and possibly load) a parallel worker entry point function.
    1632                 :             :  *
    1633                 :             :  * For functions contained in the core code, we use library name "postgres"
    1634                 :             :  * and consult the InternalParallelWorkers array.  External functions are
    1635                 :             :  * looked up, and loaded if necessary, using load_external_function().
    1636                 :             :  *
    1637                 :             :  * The point of this is to pass function names as strings across process
    1638                 :             :  * boundaries.  We can't pass actual function addresses because of the
    1639                 :             :  * possibility that the function has been loaded at a different address
    1640                 :             :  * in a different process.  This is obviously a hazard for functions in
    1641                 :             :  * loadable libraries, but it can happen even for functions in the core code
    1642                 :             :  * on platforms using EXEC_BACKEND (e.g., Windows).
    1643                 :             :  *
    1644                 :             :  * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
    1645                 :             :  * in favor of applying load_external_function() for core functions too;
    1646                 :             :  * but that raises portability issues that are not worth addressing now.
    1647                 :             :  */
    1648                 :             : static parallel_worker_main_type
    1649                 :         477 : LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
    1650                 :             : {
    1651                 :             :         /*
    1652                 :             :          * If the function is to be loaded from postgres itself, search the
    1653                 :             :          * InternalParallelWorkers array.
    1654                 :             :          */
    1655         [ -  + ]:         477 :         if (strcmp(libraryname, "postgres") == 0)
    1656                 :             :         {
    1657                 :         477 :                 int                     i;
    1658                 :             : 
    1659         [ +  - ]:         558 :                 for (i = 0; i < lengthof(InternalParallelWorkers); i++)
    1660                 :             :                 {
    1661         [ +  + ]:         558 :                         if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
    1662                 :         477 :                                 return InternalParallelWorkers[i].fn_addr;
    1663                 :          81 :                 }
    1664                 :             : 
    1665                 :             :                 /* We can only reach this by programming error. */
    1666   [ #  #  #  # ]:           0 :                 elog(ERROR, "internal function \"%s\" not found", funcname);
    1667      [ -  +  - ]:         477 :         }
    1668                 :             : 
    1669                 :             :         /* Otherwise load from external library. */
    1670                 :           0 :         return (parallel_worker_main_type)
    1671                 :           0 :                 load_external_function(libraryname, funcname, true, NULL);
    1672                 :         477 : }
        

Generated by: LCOV version 2.3.2-1