LCOV - code coverage report
Current view: top level - src/backend/replication/logical - launcher.c (source / functions)
Test:         Code coverage
Test Date:    2026-01-26 10:56:24

                Coverage    Total    Hit
    Lines:      37.1 %        703    261
    Functions:  59.5 %         37     22
    Branches:   21.4 %        448     96

Legend:  Lines:     hit | not hit
         Branches:  + taken    - not taken    # not executed

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  * launcher.c
       3                 :             :  *         PostgreSQL logical replication worker launcher process
       4                 :             :  *
       5                 :             :  * Copyright (c) 2016-2026, PostgreSQL Global Development Group
       6                 :             :  *
       7                 :             :  * IDENTIFICATION
       8                 :             :  *        src/backend/replication/logical/launcher.c
       9                 :             :  *
      10                 :             :  * NOTES
      11                 :             :  *        This module contains the logical replication worker launcher which
      12                 :             :  *        uses the background worker infrastructure to start the logical
      13                 :             :  *        replication workers for every enabled subscription.
      14                 :             :  *
      15                 :             :  *-------------------------------------------------------------------------
      16                 :             :  */
      17                 :             : 
      18                 :             : #include "postgres.h"
      19                 :             : 
      20                 :             : #include "access/heapam.h"
      21                 :             : #include "access/htup.h"
      22                 :             : #include "access/htup_details.h"
      23                 :             : #include "access/tableam.h"
      24                 :             : #include "access/xact.h"
      25                 :             : #include "catalog/pg_subscription.h"
      26                 :             : #include "catalog/pg_subscription_rel.h"
      27                 :             : #include "funcapi.h"
      28                 :             : #include "lib/dshash.h"
      29                 :             : #include "miscadmin.h"
      30                 :             : #include "pgstat.h"
      31                 :             : #include "postmaster/bgworker.h"
      32                 :             : #include "postmaster/interrupt.h"
      33                 :             : #include "replication/logicallauncher.h"
      34                 :             : #include "replication/origin.h"
      35                 :             : #include "replication/slot.h"
      36                 :             : #include "replication/walreceiver.h"
      37                 :             : #include "replication/worker_internal.h"
      38                 :             : #include "storage/ipc.h"
      39                 :             : #include "storage/proc.h"
      40                 :             : #include "storage/procarray.h"
      41                 :             : #include "tcop/tcopprot.h"
      42                 :             : #include "utils/builtins.h"
      43                 :             : #include "utils/memutils.h"
      44                 :             : #include "utils/pg_lsn.h"
      45                 :             : #include "utils/snapmgr.h"
      46                 :             : #include "utils/syscache.h"
      47                 :             : 
      48                 :             : /* max sleep time between cycles (3min) */
      49                 :             : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
      50                 :             : 
      51                 :             : /* GUC variables */
      52                 :             : int                     max_logical_replication_workers = 4;
      53                 :             : int                     max_sync_workers_per_subscription = 2;
      54                 :             : int                     max_parallel_apply_workers_per_subscription = 2;
      55                 :             : 
      56                 :             : LogicalRepWorker *MyLogicalRepWorker = NULL;
      57                 :             : 
      58                 :             : typedef struct LogicalRepCtxStruct
      59                 :             : {
      60                 :             :         /* Supervisor process. */
      61                 :             :         pid_t           launcher_pid;
      62                 :             : 
      63                 :             :         /* Hash table holding last start times of subscriptions' apply workers. */
      64                 :             :         dsa_handle      last_start_dsa;
      65                 :             :         dshash_table_handle last_start_dsh;
      66                 :             : 
      67                 :             :         /* Background workers. */
      68                 :             :         LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
      69                 :             : } LogicalRepCtxStruct;
      70                 :             : 
      71                 :             : static LogicalRepCtxStruct *LogicalRepCtx;
      72                 :             : 
      73                 :             : /* an entry in the last-start-times shared hash table */
      74                 :             : typedef struct LauncherLastStartTimesEntry
      75                 :             : {
      76                 :             :         Oid                     subid;                  /* OID of logrep subscription (hash key) */
      77                 :             :         TimestampTz last_start_time;    /* last time its apply worker was started */
      78                 :             : } LauncherLastStartTimesEntry;
      79                 :             : 
      80                 :             : /* parameters for the last-start-times shared hash table */
      81                 :             : static const dshash_parameters dsh_params = {
      82                 :             :         sizeof(Oid),
      83                 :             :         sizeof(LauncherLastStartTimesEntry),
      84                 :             :         dshash_memcmp,
      85                 :             :         dshash_memhash,
      86                 :             :         dshash_memcpy,
      87                 :             :         LWTRANCHE_LAUNCHER_HASH
      88                 :             : };
      89                 :             : 
      90                 :             : static dsa_area *last_start_times_dsa = NULL;
      91                 :             : static dshash_table *last_start_times = NULL;
      92                 :             : 
      93                 :             : static bool on_commit_launcher_wakeup = false;
      94                 :             : 
      95                 :             : 
      96                 :             : static void logicalrep_launcher_onexit(int code, Datum arg);
      97                 :             : static void logicalrep_worker_onexit(int code, Datum arg);
      98                 :             : static void logicalrep_worker_detach(void);
      99                 :             : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
     100                 :             : static int      logicalrep_pa_worker_count(Oid subid);
     101                 :             : static void logicalrep_launcher_attach_dshmem(void);
     102                 :             : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
     103                 :             : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
     104                 :             : static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
     105                 :             : static bool acquire_conflict_slot_if_exists(void);
     106                 :             : static void update_conflict_slot_xmin(TransactionId new_xmin);
     107                 :             : static void init_conflict_slot_xmin(void);
     108                 :             : 
     109                 :             : 
     110                 :             : /*
     111                 :             :  * Load the list of subscriptions.
     112                 :             :  *
     113                 :             :  * Only the fields interesting for worker start/stop functions are filled for
     114                 :             :  * each subscription.
     115                 :             :  */
     116                 :             : static List *
     117                 :           3 : get_subscription_list(void)
     118                 :             : {
     119                 :           3 :         List       *res = NIL;
     120                 :           3 :         Relation        rel;
     121                 :           3 :         TableScanDesc scan;
     122                 :           3 :         HeapTuple       tup;
     123                 :           3 :         MemoryContext resultcxt;
     124                 :             : 
     125                 :             :         /* This is the context that we will allocate our output data in */
     126                 :           3 :         resultcxt = CurrentMemoryContext;
     127                 :             : 
     128                 :             :         /*
     129                 :             :          * Start a transaction so we can access pg_subscription.
     130                 :             :          */
     131                 :           3 :         StartTransactionCommand();
     132                 :             : 
     133                 :           3 :         rel = table_open(SubscriptionRelationId, AccessShareLock);
     134                 :           3 :         scan = table_beginscan_catalog(rel, 0, NULL);
     135                 :             : 
     136         [ -  + ]:           3 :         while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     137                 :             :         {
     138                 :           0 :                 Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
     139                 :           0 :                 Subscription *sub;
     140                 :           0 :                 MemoryContext oldcxt;
     141                 :             : 
     142                 :             :                 /*
     143                 :             :                  * Allocate our results in the caller's context, not the
     144                 :             :                  * transaction's. We do this inside the loop, and restore the original
     145                 :             :                  * context at the end, so that leaky things like heap_getnext() are
     146                 :             :                  * not called in a potentially long-lived context.
     147                 :             :                  */
     148                 :           0 :                 oldcxt = MemoryContextSwitchTo(resultcxt);
     149                 :             : 
     150                 :           0 :                 sub = palloc0_object(Subscription);
     151                 :           0 :                 sub->oid = subform->oid;
     152                 :           0 :                 sub->dbid = subform->subdbid;
     153                 :           0 :                 sub->owner = subform->subowner;
     154                 :           0 :                 sub->enabled = subform->subenabled;
     155                 :           0 :                 sub->name = pstrdup(NameStr(subform->subname));
     156                 :           0 :                 sub->retaindeadtuples = subform->subretaindeadtuples;
     157                 :           0 :                 sub->retentionactive = subform->subretentionactive;
     158                 :             :                 /* We don't fill fields we are not interested in. */
     159                 :             : 
     160                 :           0 :                 res = lappend(res, sub);
     161                 :           0 :                 MemoryContextSwitchTo(oldcxt);
     162                 :           0 :         }
     163                 :             : 
     164                 :           3 :         table_endscan(scan);
     165                 :           3 :         table_close(rel, AccessShareLock);
     166                 :             : 
     167                 :           3 :         CommitTransactionCommand();
     168                 :             : 
     169                 :           6 :         return res;
     170                 :           3 : }
     171                 :             : 
     172                 :             : /*
     173                 :             :  * Wait for a background worker to start up and attach to the shmem context.
     174                 :             :  *
     175                 :             :  * This is only needed for cleaning up the shared memory in case the worker
     176                 :             :  * fails to attach.
     177                 :             :  *
     178                 :             :  * Returns whether the attach was successful.
     179                 :             :  */
     180                 :             : static bool
     181                 :           0 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
     182                 :             :                                                            uint16 generation,
     183                 :             :                                                            BackgroundWorkerHandle *handle)
     184                 :             : {
     185                 :           0 :         bool            result = false;
     186                 :           0 :         bool            dropped_latch = false;
     187                 :             : 
     188                 :           0 :         for (;;)
     189                 :             :         {
     190                 :           0 :                 BgwHandleStatus status;
     191                 :           0 :                 pid_t           pid;
     192                 :           0 :                 int                     rc;
     193                 :             : 
     194         [ #  # ]:           0 :                 CHECK_FOR_INTERRUPTS();
     195                 :             : 
     196                 :           0 :                 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     197                 :             : 
     198                 :             :                 /* Worker either died or has started. Return false if died. */
     199   [ #  #  #  # ]:           0 :                 if (!worker->in_use || worker->proc)
     200                 :             :                 {
     201                 :           0 :                         result = worker->in_use;
     202                 :           0 :                         LWLockRelease(LogicalRepWorkerLock);
     203                 :           0 :                         break;
     204                 :             :                 }
     205                 :             : 
     206                 :           0 :                 LWLockRelease(LogicalRepWorkerLock);
     207                 :             : 
     208                 :             :                 /* Check if worker has died before attaching, and clean up after it. */
     209                 :           0 :                 status = GetBackgroundWorkerPid(handle, &pid);
     210                 :             : 
     211         [ #  # ]:           0 :                 if (status == BGWH_STOPPED)
     212                 :             :                 {
     213                 :           0 :                         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     214                 :             :                         /* Ensure that this was indeed the worker we waited for. */
     215         [ #  # ]:           0 :                         if (generation == worker->generation)
     216                 :           0 :                                 logicalrep_worker_cleanup(worker);
     217                 :           0 :                         LWLockRelease(LogicalRepWorkerLock);
     218                 :           0 :                         break;                          /* result is already false */
     219                 :             :                 }
     220                 :             : 
     221                 :             :                 /*
      222                 :             :                  * We need a timeout because we generally don't get notified via latch
     223                 :             :                  * about the worker attach.  But we don't expect to have to wait long.
     224                 :             :                  */
     225                 :           0 :                 rc = WaitLatch(MyLatch,
     226                 :             :                                            WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     227                 :             :                                            10L, WAIT_EVENT_BGWORKER_STARTUP);
     228                 :             : 
     229         [ #  # ]:           0 :                 if (rc & WL_LATCH_SET)
     230                 :             :                 {
     231                 :           0 :                         ResetLatch(MyLatch);
     232         [ #  # ]:           0 :                         CHECK_FOR_INTERRUPTS();
     233                 :           0 :                         dropped_latch = true;
     234                 :           0 :                 }
     235      [ #  #  # ]:           0 :         }
     236                 :             : 
     237                 :             :         /*
     238                 :             :          * If we had to clear a latch event in order to wait, be sure to restore
     239                 :             :          * it before exiting.  Otherwise caller may miss events.
     240                 :             :          */
     241         [ #  # ]:           0 :         if (dropped_latch)
     242                 :           0 :                 SetLatch(MyLatch);
     243                 :             : 
     244                 :           0 :         return result;
     245                 :           0 : }
     246                 :             : 
     247                 :             : /*
      248                 :             :  * Walks the workers array and searches for one that matches the given worker type,
     249                 :             :  * subscription id, and relation id.
     250                 :             :  *
     251                 :             :  * For both apply workers and sequencesync workers, the relid should be set to
     252                 :             :  * InvalidOid, as these workers handle changes across all tables and sequences
     253                 :             :  * respectively, rather than targeting a specific relation. For tablesync
     254                 :             :  * workers, the relid should be set to the OID of the relation being
     255                 :             :  * synchronized.
     256                 :             :  */
     257                 :             : LogicalRepWorker *
     258                 :           1 : logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid,
     259                 :             :                                            bool only_running)
     260                 :             : {
     261                 :           1 :         int                     i;
     262                 :           1 :         LogicalRepWorker *res = NULL;
     263                 :             : 
     264                 :             :         /* relid must be valid only for table sync workers */
     265         [ +  - ]:           1 :         Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
     266         [ +  - ]:           1 :         Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     267                 :             : 
     268                 :             :         /* Search for an attached worker that matches the specified criteria. */
     269         [ -  + ]:           1 :         for (i = 0; i < max_logical_replication_workers; i++)
     270                 :             :         {
     271                 :           1 :                 LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     272                 :             : 
     273                 :             :                 /* Skip parallel apply workers. */
     274   [ +  -  +  - ]:           1 :                 if (isParallelApplyWorker(w))
     275                 :           0 :                         continue;
     276                 :             : 
     277   [ +  -  +  -  :           1 :                 if (w->in_use && w->subid == subid && w->relid == relid &&
                   +  - ]
     278   [ +  -  -  +  :           1 :                         w->type == wtype && (!only_running || w->proc))
                   #  # ]
     279                 :             :                 {
     280                 :           1 :                         res = w;
     281                 :           1 :                         break;
     282                 :             :                 }
     283   [ -  -  +  - ]:           1 :         }
     284                 :             : 
     285                 :           2 :         return res;
     286                 :           1 : }
     287                 :             : 
     288                 :             : /*
     289                 :             :  * Similar to logicalrep_worker_find(), but returns a list of all workers for
     290                 :             :  * the subscription, instead of just one.
     291                 :             :  */
     292                 :             : List *
     293                 :          51 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
     294                 :             : {
     295                 :          51 :         int                     i;
     296                 :          51 :         List       *res = NIL;
     297                 :             : 
     298         [ +  + ]:          51 :         if (acquire_lock)
     299                 :          13 :                 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     300                 :             : 
     301         [ +  - ]:          51 :         Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     302                 :             : 
     303                 :             :         /* Search for attached worker for a given subscription id. */
     304         [ +  + ]:         255 :         for (i = 0; i < max_logical_replication_workers; i++)
     305                 :             :         {
     306                 :         204 :                 LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     307                 :             : 
     308   [ +  +  +  -  :         204 :                 if (w->in_use && w->subid == subid && (!only_running || w->proc))
             +  +  +  + ]
     309                 :           2 :                         res = lappend(res, w);
     310                 :         204 :         }
     311                 :             : 
     312         [ +  + ]:          51 :         if (acquire_lock)
     313                 :          13 :                 LWLockRelease(LogicalRepWorkerLock);
     314                 :             : 
     315                 :         102 :         return res;
     316                 :          51 : }
     317                 :             : 
     318                 :             : /*
     319                 :             :  * Start new logical replication background worker, if possible.
     320                 :             :  *
     321                 :             :  * Returns true on success, false on failure.
     322                 :             :  */
     323                 :             : bool
     324                 :           0 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
     325                 :             :                                                  Oid dbid, Oid subid, const char *subname, Oid userid,
     326                 :             :                                                  Oid relid, dsm_handle subworker_dsm,
     327                 :             :                                                  bool retain_dead_tuples)
     328                 :             : {
     329                 :           0 :         BackgroundWorker bgw;
     330                 :           0 :         BackgroundWorkerHandle *bgw_handle;
     331                 :           0 :         uint16          generation;
     332                 :           0 :         int                     i;
     333                 :           0 :         int                     slot = 0;
     334                 :           0 :         LogicalRepWorker *worker = NULL;
     335                 :           0 :         int                     nsyncworkers;
     336                 :           0 :         int                     nparallelapplyworkers;
     337                 :           0 :         TimestampTz now;
     338                 :           0 :         bool            is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
     339                 :           0 :         bool            is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC);
     340                 :           0 :         bool            is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
     341                 :             : 
     342                 :             :         /*----------
     343                 :             :          * Sanity checks:
     344                 :             :          * - must be valid worker type
      345                 :             :          * - tablesync workers are the only ones to have relid
     346                 :             :          * - parallel apply worker is the only kind of subworker
     347                 :             :          * - The replication slot used in conflict detection is created when
     348                 :             :          *   retain_dead_tuples is enabled
     349                 :             :          */
     350         [ #  # ]:           0 :         Assert(wtype != WORKERTYPE_UNKNOWN);
     351         [ #  # ]:           0 :         Assert(is_tablesync_worker == OidIsValid(relid));
     352         [ #  # ]:           0 :         Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
     353   [ #  #  #  # ]:           0 :         Assert(!retain_dead_tuples || MyReplicationSlot);
     354                 :             : 
     355   [ #  #  #  # ]:           0 :         ereport(DEBUG1,
     356                 :             :                         (errmsg_internal("starting logical replication worker for subscription \"%s\"",
     357                 :             :                                                          subname)));
     358                 :             : 
     359                 :             :         /* Report this after the initial starting message for consistency. */
     360         [ #  # ]:           0 :         if (max_active_replication_origins == 0)
     361   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     362                 :             :                                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     363                 :             :                                  errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
     364                 :             : 
     365                 :             :         /*
     366                 :             :          * We need to do the modification of the shared memory under lock so that
      367                 :             :          * we have a consistent view.
     368                 :             :          */
     369                 :           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     370                 :             : 
     371                 :             : retry:
     372                 :             :         /* Find unused worker slot. */
     373         [ #  # ]:           0 :         for (i = 0; i < max_logical_replication_workers; i++)
     374                 :             :         {
     375                 :           0 :                 LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     376                 :             : 
     377         [ #  # ]:           0 :                 if (!w->in_use)
     378                 :             :                 {
     379                 :           0 :                         worker = w;
     380                 :           0 :                         slot = i;
     381                 :           0 :                         break;
     382                 :             :                 }
     383         [ #  # ]:           0 :         }
     384                 :             : 
     385                 :           0 :         nsyncworkers = logicalrep_sync_worker_count(subid);
     386                 :             : 
     387                 :           0 :         now = GetCurrentTimestamp();
     388                 :             : 
     389                 :             :         /*
     390                 :             :          * If we didn't find a free slot, try to do garbage collection.  The
     391                 :             :          * reason we do this is because if some worker failed to start up and its
     392                 :             :          * parent has crashed while waiting, the in_use state was never cleared.
     393                 :             :          */
     394   [ #  #  #  # ]:           0 :         if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
     395                 :             :         {
     396                 :           0 :                 bool            did_cleanup = false;
     397                 :             : 
     398         [ #  # ]:           0 :                 for (i = 0; i < max_logical_replication_workers; i++)
     399                 :             :                 {
     400                 :           0 :                         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     401                 :             : 
     402                 :             :                         /*
     403                 :             :                          * If the worker was marked in use but didn't manage to attach in
     404                 :             :                          * time, clean it up.
     405                 :             :                          */
     406   [ #  #  #  #  :           0 :                         if (w->in_use && !w->proc &&
                   #  # ]
     407                 :           0 :                                 TimestampDifferenceExceeds(w->launch_time, now,
     408                 :           0 :                                                                                    wal_receiver_timeout))
     409                 :             :                         {
     410   [ #  #  #  # ]:           0 :                                 elog(WARNING,
     411                 :             :                                          "logical replication worker for subscription %u took too long to start; canceled",
     412                 :             :                                          w->subid);
     413                 :             : 
     414                 :           0 :                                 logicalrep_worker_cleanup(w);
     415                 :           0 :                                 did_cleanup = true;
     416                 :           0 :                         }
     417                 :           0 :                 }
     418                 :             : 
     419         [ #  # ]:           0 :                 if (did_cleanup)
     420                 :           0 :                         goto retry;
     421         [ #  # ]:           0 :         }
     422                 :             : 
     423                 :             :         /*
      424                 :             :          * We don't allow invoking more sync workers once we have reached the
     425                 :             :          * sync worker limit per subscription. So, just return silently as we
     426                 :             :          * might get here because of an otherwise harmless race condition.
     427                 :             :          */
     428   [ #  #  #  # ]:           0 :         if ((is_tablesync_worker || is_sequencesync_worker) &&
     429                 :           0 :                 nsyncworkers >= max_sync_workers_per_subscription)
     430                 :             :         {
     431                 :           0 :                 LWLockRelease(LogicalRepWorkerLock);
     432                 :           0 :                 return false;
     433                 :             :         }
     434                 :             : 
     435                 :           0 :         nparallelapplyworkers = logicalrep_pa_worker_count(subid);
     436                 :             : 
     437                 :             :         /*
     438                 :             :          * Return false if the number of parallel apply workers reached the limit
     439                 :             :          * per subscription.
     440                 :             :          */
     441   [ #  #  #  # ]:           0 :         if (is_parallel_apply_worker &&
     442                 :           0 :                 nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
     443                 :             :         {
     444                 :           0 :                 LWLockRelease(LogicalRepWorkerLock);
     445                 :           0 :                 return false;
     446                 :             :         }
     447                 :             : 
     448                 :             :         /*
      449                 :             :          * However, if there are no more free worker slots, inform the user about it
     450                 :             :          * before exiting.
     451                 :             :          */
     452         [ #  # ]:           0 :         if (worker == NULL)
     453                 :             :         {
     454                 :           0 :                 LWLockRelease(LogicalRepWorkerLock);
     455   [ #  #  #  # ]:           0 :                 ereport(WARNING,
     456                 :             :                                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     457                 :             :                                  errmsg("out of logical replication worker slots"),
     458                 :             :                                  errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
     459                 :           0 :                 return false;
     460                 :             :         }
     461                 :             : 
     462                 :             :         /* Prepare the worker slot. */
     463                 :           0 :         worker->type = wtype;
     464                 :           0 :         worker->launch_time = now;
     465                 :           0 :         worker->in_use = true;
     466                 :           0 :         worker->generation++;
     467                 :           0 :         worker->proc = NULL;
     468                 :           0 :         worker->dbid = dbid;
     469                 :           0 :         worker->userid = userid;
     470                 :           0 :         worker->subid = subid;
     471                 :           0 :         worker->relid = relid;
     472                 :           0 :         worker->relstate = SUBREL_STATE_UNKNOWN;
     473                 :           0 :         worker->relstate_lsn = InvalidXLogRecPtr;
     474                 :           0 :         worker->stream_fileset = NULL;
     475         [ #  # ]:           0 :         worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
     476                 :           0 :         worker->parallel_apply = is_parallel_apply_worker;
     477         [ #  # ]:           0 :         worker->oldest_nonremovable_xid = retain_dead_tuples
     478                 :           0 :                 ? MyReplicationSlot->data.xmin
     479                 :             :                 : InvalidTransactionId;
     480                 :           0 :         worker->last_lsn = InvalidXLogRecPtr;
     481                 :           0 :         TIMESTAMP_NOBEGIN(worker->last_send_time);
     482                 :           0 :         TIMESTAMP_NOBEGIN(worker->last_recv_time);
     483                 :           0 :         worker->reply_lsn = InvalidXLogRecPtr;
     484                 :           0 :         TIMESTAMP_NOBEGIN(worker->reply_time);
     485                 :           0 :         worker->last_seqsync_start_time = 0;
     486                 :             : 
     487                 :             :         /* Before releasing lock, remember generation for future identification. */
     488                 :           0 :         generation = worker->generation;
     489                 :             : 
     490                 :           0 :         LWLockRelease(LogicalRepWorkerLock);
     491                 :             : 
     492                 :             :         /* Register the new dynamic worker. */
     493                 :           0 :         memset(&bgw, 0, sizeof(bgw));
     494                 :           0 :         bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     495                 :             :                 BGWORKER_BACKEND_DATABASE_CONNECTION;
     496                 :           0 :         bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     497                 :           0 :         snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
     498                 :             : 
     499   [ #  #  #  #  :           0 :         switch (worker->type)
                   #  # ]
     500                 :             :         {
     501                 :             :                 case WORKERTYPE_APPLY:
     502                 :           0 :                         snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
     503                 :           0 :                         snprintf(bgw.bgw_name, BGW_MAXLEN,
     504                 :             :                                          "logical replication apply worker for subscription %u",
     505                 :           0 :                                          subid);
     506                 :           0 :                         snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
     507                 :           0 :                         break;
     508                 :             : 
     509                 :             :                 case WORKERTYPE_PARALLEL_APPLY:
     510                 :           0 :                         snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
     511                 :           0 :                         snprintf(bgw.bgw_name, BGW_MAXLEN,
     512                 :             :                                          "logical replication parallel apply worker for subscription %u",
     513                 :           0 :                                          subid);
     514                 :           0 :                         snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
     515                 :             : 
     516                 :           0 :                         memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
     517                 :           0 :                         break;
     518                 :             : 
     519                 :             :                 case WORKERTYPE_SEQUENCESYNC:
     520                 :           0 :                         snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain");
     521                 :           0 :                         snprintf(bgw.bgw_name, BGW_MAXLEN,
     522                 :             :                                          "logical replication sequencesync worker for subscription %u",
     523                 :           0 :                                          subid);
     524                 :           0 :                         snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker");
     525                 :           0 :                         break;
     526                 :             : 
     527                 :             :                 case WORKERTYPE_TABLESYNC:
     528                 :           0 :                         snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain");
     529                 :           0 :                         snprintf(bgw.bgw_name, BGW_MAXLEN,
     530                 :             :                                          "logical replication tablesync worker for subscription %u sync %u",
     531                 :           0 :                                          subid,
     532                 :           0 :                                          relid);
     533                 :           0 :                         snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
     534                 :           0 :                         break;
     535                 :             : 
     536                 :             :                 case WORKERTYPE_UNKNOWN:
     537                 :             :                         /* Should never happen. */
     538   [ #  #  #  # ]:           0 :                         elog(ERROR, "unknown worker type");
     539                 :           0 :         }
     540                 :             : 
     541                 :           0 :         bgw.bgw_restart_time = BGW_NEVER_RESTART;
     542                 :           0 :         bgw.bgw_notify_pid = MyProcPid;
     543                 :           0 :         bgw.bgw_main_arg = Int32GetDatum(slot);
     544                 :             : 
     545         [ #  # ]:           0 :         if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
     546                 :             :         {
     547                 :             :                 /* Failed to start worker, so clean up the worker slot. */
     548                 :           0 :                 LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     549         [ #  # ]:           0 :                 Assert(generation == worker->generation);
     550                 :           0 :                 logicalrep_worker_cleanup(worker);
     551                 :           0 :                 LWLockRelease(LogicalRepWorkerLock);
     552                 :             : 
     553   [ #  #  #  # ]:           0 :                 ereport(WARNING,
     554                 :             :                                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     555                 :             :                                  errmsg("out of background worker slots"),
     556                 :             :                                  errhint("You might need to increase \"%s\".", "max_worker_processes")));
     557                 :           0 :                 return false;
     558                 :             :         }
     559                 :             : 
     560                 :             :         /* Now wait until it attaches. */
     561                 :           0 :         return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
     562                 :           0 : }
     563                 :             : 
     564                 :             : /*
     565                 :             :  * Internal function to stop the worker and wait until it detaches from the
     566                 :             :  * slot.
     567                 :             :  */
     568                 :             : static void
     569                 :           1 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
     570                 :             : {
     571                 :           1 :         uint16          generation;
     572                 :             : 
     573         [ +  - ]:           1 :         Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
     574                 :             : 
     575                 :             :         /*
      576                 :             :          * Remember which generation our worker was so we can check if what we see
     577                 :             :          * is still the same one.
     578                 :             :          */
     579                 :           1 :         generation = worker->generation;
     580                 :             : 
     581                 :             :         /*
     582                 :             :          * If we found a worker but it does not have proc set then it is still
     583                 :             :          * starting up; wait for it to finish starting and then kill it.
     584                 :             :          */
     585   [ -  +  -  + ]:           1 :         while (worker->in_use && !worker->proc)
     586                 :             :         {
     587                 :           1 :                 int                     rc;
     588                 :             : 
     589                 :           1 :                 LWLockRelease(LogicalRepWorkerLock);
     590                 :             : 
     591                 :             :                 /* Wait a bit --- we don't expect to have to wait long. */
     592                 :           1 :                 rc = WaitLatch(MyLatch,
     593                 :             :                                            WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     594                 :             :                                            10L, WAIT_EVENT_BGWORKER_STARTUP);
     595                 :             : 
     596         [ +  - ]:           1 :                 if (rc & WL_LATCH_SET)
     597                 :             :                 {
     598                 :           0 :                         ResetLatch(MyLatch);
     599         [ #  # ]:           0 :                         CHECK_FOR_INTERRUPTS();
     600                 :           0 :                 }
     601                 :             : 
     602                 :             :                 /* Recheck worker status. */
     603                 :           1 :                 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     604                 :             : 
     605                 :             :                 /*
     606                 :             :                  * Check whether the worker slot is no longer used, which would mean
     607                 :             :                  * that the worker has exited, or whether the worker generation is
     608                 :             :                  * different, meaning that a different worker has taken the slot.
     609                 :             :                  */
     610   [ +  -  -  + ]:           1 :                 if (!worker->in_use || worker->generation != generation)
     611                 :           0 :                         return;
     612                 :             : 
     613                 :             :                 /* Worker has assigned proc, so it has started. */
     614         [ +  - ]:           1 :                 if (worker->proc)
     615                 :           1 :                         break;
     616      [ -  +  - ]:           1 :         }
     617                 :             : 
     618                 :             :         /* Now terminate the worker ... */
     619                 :           1 :         kill(worker->proc->pid, signo);
     620                 :             : 
     621                 :             :         /* ... and wait for it to die. */
     622                 :           2 :         for (;;)
     623                 :             :         {
     624                 :           2 :                 int                     rc;
     625                 :             : 
     626                 :             :                 /* is it gone? */
     627   [ +  +  -  + ]:           2 :                 if (!worker->proc || worker->generation != generation)
     628                 :           1 :                         break;
     629                 :             : 
     630                 :           1 :                 LWLockRelease(LogicalRepWorkerLock);
     631                 :             : 
     632                 :             :                 /* Wait a bit --- we don't expect to have to wait long. */
     633                 :           1 :                 rc = WaitLatch(MyLatch,
     634                 :             :                                            WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     635                 :             :                                            10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
     636                 :             : 
     637         [ +  - ]:           1 :                 if (rc & WL_LATCH_SET)
     638                 :             :                 {
     639                 :           0 :                         ResetLatch(MyLatch);
     640         [ #  # ]:           0 :                         CHECK_FOR_INTERRUPTS();
     641                 :           0 :                 }
     642                 :             : 
     643                 :           1 :                 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     644         [ +  + ]:           2 :         }
     645                 :           1 : }
     646                 :             : 
     647                 :             : /*
     648                 :             :  * Stop the logical replication worker that matches the specified worker type,
     649                 :             :  * subscription id, and relation id.
     650                 :             :  */
     651                 :             : void
     652                 :           1 : logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
     653                 :             : {
     654                 :           1 :         LogicalRepWorker *worker;
     655                 :             : 
     656                 :             :         /* relid must be valid only for table sync workers */
     657         [ +  - ]:           1 :         Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
     658                 :             : 
     659                 :           1 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     660                 :             : 
     661                 :           1 :         worker = logicalrep_worker_find(wtype, subid, relid, false);
     662                 :             : 
     663         [ -  + ]:           1 :         if (worker)
     664                 :             :         {
     665   [ +  -  +  - ]:           1 :                 Assert(!isParallelApplyWorker(worker));
     666                 :           1 :                 logicalrep_worker_stop_internal(worker, SIGTERM);
     667                 :           1 :         }
     668                 :             : 
     669                 :           1 :         LWLockRelease(LogicalRepWorkerLock);
     670                 :           1 : }
     671                 :             : 
     672                 :             : /*
     673                 :             :  * Stop the given logical replication parallel apply worker.
     674                 :             :  *
      675                 :             :  * Note that the function sends SIGUSR2 instead of SIGTERM to the parallel apply
     676                 :             :  * worker so that the worker exits cleanly.
     677                 :             :  */
     678                 :             : void
     679                 :           0 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
     680                 :             : {
     681                 :           0 :         int                     slot_no;
     682                 :           0 :         uint16          generation;
     683                 :           0 :         LogicalRepWorker *worker;
     684                 :             : 
     685         [ #  # ]:           0 :         SpinLockAcquire(&winfo->shared->mutex);
     686                 :           0 :         generation = winfo->shared->logicalrep_worker_generation;
     687                 :           0 :         slot_no = winfo->shared->logicalrep_worker_slot_no;
     688                 :           0 :         SpinLockRelease(&winfo->shared->mutex);
     689                 :             : 
     690         [ #  # ]:           0 :         Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
     691                 :             : 
     692                 :             :         /*
     693                 :             :          * Detach from the error_mq_handle for the parallel apply worker before
     694                 :             :          * stopping it. This prevents the leader apply worker from trying to
     695                 :             :          * receive the message from the error queue that might already be detached
     696                 :             :          * by the parallel apply worker.
     697                 :             :          */
     698         [ #  # ]:           0 :         if (winfo->error_mq_handle)
     699                 :             :         {
     700                 :           0 :                 shm_mq_detach(winfo->error_mq_handle);
     701                 :           0 :                 winfo->error_mq_handle = NULL;
     702                 :           0 :         }
     703                 :             : 
     704                 :           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     705                 :             : 
     706                 :           0 :         worker = &LogicalRepCtx->workers[slot_no];
     707         [ #  # ]:           0 :         Assert(isParallelApplyWorker(worker));
     708                 :             : 
     709                 :             :         /*
     710                 :             :          * Only stop the worker if the generation matches and the worker is alive.
     711                 :             :          */
     712   [ #  #  #  # ]:           0 :         if (worker->generation == generation && worker->proc)
     713                 :           0 :                 logicalrep_worker_stop_internal(worker, SIGUSR2);
     714                 :             : 
     715                 :           0 :         LWLockRelease(LogicalRepWorkerLock);
     716                 :           0 : }
     717                 :             : 
     718                 :             : /*
     719                 :             :  * Wake up (using latch) any logical replication worker that matches the
     720                 :             :  * specified worker type, subscription id, and relation id.
     721                 :             :  */
     722                 :             : void
     723                 :           0 : logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
     724                 :             : {
     725                 :           0 :         LogicalRepWorker *worker;
     726                 :             : 
     727                 :             :         /* relid must be valid only for table sync workers */
     728         [ #  # ]:           0 :         Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
     729                 :             : 
     730                 :           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     731                 :             : 
     732                 :           0 :         worker = logicalrep_worker_find(wtype, subid, relid, true);
     733                 :             : 
     734         [ #  # ]:           0 :         if (worker)
     735                 :           0 :                 logicalrep_worker_wakeup_ptr(worker);
     736                 :             : 
     737                 :           0 :         LWLockRelease(LogicalRepWorkerLock);
     738                 :           0 : }
     739                 :             : 
     740                 :             : /*
     741                 :             :  * Wake up (using latch) the specified logical replication worker.
     742                 :             :  *
     743                 :             :  * Caller must hold lock, else worker->proc could change under us.
     744                 :             :  */
     745                 :             : void
     746                 :           0 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
     747                 :             : {
     748         [ #  # ]:           0 :         Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     749                 :             : 
     750                 :           0 :         SetLatch(&worker->proc->procLatch);
     751                 :           0 : }
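
As a hedged illustration of the locking contract above (the helper name and its caller are hypothetical), a routine that already holds LogicalRepWorkerLock would wake a worker through logicalrep_worker_wakeup_ptr() directly, rather than through logicalrep_worker_wakeup(), which acquires the lock itself:

#include "postgres.h"
#include "replication/worker_internal.h"
#include "storage/lwlock.h"

/*
 * Hypothetical helper: wake the apply worker of a subscription while the
 * caller already holds LogicalRepWorkerLock (in any mode).
 */
static void
wakeup_apply_worker_locked(Oid subid)
{
	LogicalRepWorker *worker;

	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	/* Ask for a running worker only; a dead worker has no latch to set. */
	worker = logicalrep_worker_find(WORKERTYPE_APPLY, subid, InvalidOid, true);
	if (worker)
		logicalrep_worker_wakeup_ptr(worker);
}
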
     752                 :             : 
     753                 :             : /*
     754                 :             :  * Attach to a slot.
     755                 :             :  */
     756                 :             : void
     757                 :           1 : logicalrep_worker_attach(int slot)
     758                 :             : {
     759                 :             :         /* Block concurrent access. */
     760                 :           1 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     761                 :             : 
     762         [ +  - ]:           1 :         Assert(slot >= 0 && slot < max_logical_replication_workers);
     763                 :           1 :         MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
     764                 :             : 
     765         [ +  - ]:           1 :         if (!MyLogicalRepWorker->in_use)
     766                 :             :         {
     767                 :           0 :                 LWLockRelease(LogicalRepWorkerLock);
     768   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     769                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     770                 :             :                                  errmsg("logical replication worker slot %d is empty, cannot attach",
     771                 :             :                                                 slot)));
     772                 :           0 :         }
     773                 :             : 
     774         [ +  - ]:           1 :         if (MyLogicalRepWorker->proc)
     775                 :             :         {
     776                 :           0 :                 LWLockRelease(LogicalRepWorkerLock);
     777   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     778                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     779                 :             :                                  errmsg("logical replication worker slot %d is already used by "
     780                 :             :                                                 "another worker, cannot attach", slot)));
     781                 :           0 :         }
     782                 :             : 
     783                 :           1 :         MyLogicalRepWorker->proc = MyProc;
     784                 :           1 :         before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
     785                 :             : 
     786                 :           1 :         LWLockRelease(LogicalRepWorkerLock);
     787                 :           1 : }
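
To make the attach protocol concrete, here is a hedged sketch of a worker entry point; the function name is hypothetical, and it assumes the launcher passes the slot index through bgw_main_arg when registering the worker:

#include "postgres.h"
#include "postmaster/bgworker.h"
#include "replication/worker_internal.h"

/*
 * Hypothetical bgworker entry point: attach to the LogicalRepCtx worker
 * slot whose index was handed to us in bgw_main_arg.  If the slot is
 * empty or already taken, logicalrep_worker_attach() errors out.
 */
void
ExampleLogicalWorkerMain(Datum main_arg)
{
	int			worker_slot = DatumGetInt32(main_arg);

	logicalrep_worker_attach(worker_slot);

	/* ... connection setup and the worker's main loop would follow ... */
}
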
     788                 :             : 
     789                 :             : /*
     790                 :             :  * Stop the parallel apply workers if any, and detach the leader apply worker
     791                 :             :  * (cleans up the worker info).
     792                 :             :  */
     793                 :             : static void
     794                 :           1 : logicalrep_worker_detach(void)
     795                 :             : {
     796                 :             :         /* Stop the parallel apply workers. */
     797         [ -  + ]:           1 :         if (am_leader_apply_worker())
     798                 :             :         {
     799                 :           1 :                 List       *workers;
     800                 :           1 :                 ListCell   *lc;
     801                 :             : 
     802                 :             :                 /*
     803                 :             :                  * Detach from the error_mq_handle for all parallel apply workers
     804                 :             :                  * before terminating them. This prevents the leader apply worker from
     805                 :             :                  * receiving the worker termination message and sending it to logs
     806                 :             :                  * when the same is already done by the parallel worker.
     807                 :             :                  */
     808                 :           1 :                 pa_detach_all_error_mq();
     809                 :             : 
     810                 :           1 :                 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     811                 :             : 
     812                 :           1 :                 workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
     813   [ +  -  +  +  :           2 :                 foreach(lc, workers)
                   +  + ]
     814                 :             :                 {
     815                 :           1 :                         LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
     816                 :             : 
     817   [ +  -  +  - ]:           1 :                         if (isParallelApplyWorker(w))
     818                 :           0 :                                 logicalrep_worker_stop_internal(w, SIGTERM);
     819                 :           1 :                 }
     820                 :             : 
     821                 :           1 :                 LWLockRelease(LogicalRepWorkerLock);
     822                 :             : 
     823                 :           1 :                 list_free(workers);
     824                 :           1 :         }
     825                 :             : 
     826                 :             :         /* Block concurrent access. */
     827                 :           1 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     828                 :             : 
     829                 :           1 :         logicalrep_worker_cleanup(MyLogicalRepWorker);
     830                 :             : 
     831                 :           1 :         LWLockRelease(LogicalRepWorkerLock);
     832                 :           1 : }
     833                 :             : 
     834                 :             : /*
     835                 :             :  * Clean up worker info.
     836                 :             :  */
     837                 :             : static void
     838                 :           1 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
     839                 :             : {
     840         [ +  - ]:           1 :         Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
     841                 :             : 
     842                 :           1 :         worker->type = WORKERTYPE_UNKNOWN;
     843                 :           1 :         worker->in_use = false;
     844                 :           1 :         worker->proc = NULL;
     845                 :           1 :         worker->dbid = InvalidOid;
     846                 :           1 :         worker->userid = InvalidOid;
     847                 :           1 :         worker->subid = InvalidOid;
     848                 :           1 :         worker->relid = InvalidOid;
     849                 :           1 :         worker->leader_pid = InvalidPid;
     850                 :           1 :         worker->parallel_apply = false;
     851                 :           1 : }
     852                 :             : 
     853                 :             : /*
     854                 :             :  * Cleanup function for logical replication launcher.
     855                 :             :  *
     856                 :             :  * Called on logical replication launcher exit.
     857                 :             :  */
     858                 :             : static void
     859                 :           1 : logicalrep_launcher_onexit(int code, Datum arg)
     860                 :             : {
     861                 :           1 :         LogicalRepCtx->launcher_pid = 0;
     862                 :           1 : }
     863                 :             : 
     864                 :             : /*
     865                 :             :  * Reset the last_seqsync_start_time of the sequencesync worker in the
     866                 :             :  * subscription's apply worker.
     867                 :             :  *
     868                 :             :  * Note that this value is not stored in the sequencesync worker, because that
     869                 :             :  * has finished already and is about to exit.
     870                 :             :  */
     871                 :             : void
     872                 :           0 : logicalrep_reset_seqsync_start_time(void)
     873                 :             : {
     874                 :           0 :         LogicalRepWorker *worker;
     875                 :             : 
     876                 :             :         /*
     877                 :             :          * The apply worker can't access last_seqsync_start_time concurrently, so
     878                 :             :          * it is okay to use SHARED lock here. See ProcessSequencesForSync().
     879                 :             :          */
     880                 :           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     881                 :             : 
     882                 :           0 :         worker = logicalrep_worker_find(WORKERTYPE_APPLY,
     883                 :           0 :                                                                         MyLogicalRepWorker->subid, InvalidOid,
     884                 :             :                                                                         true);
     885         [ #  # ]:           0 :         if (worker)
     886                 :           0 :                 worker->last_seqsync_start_time = 0;
     887                 :             : 
     888                 :           0 :         LWLockRelease(LogicalRepWorkerLock);
     889                 :           0 : }
     890                 :             : 
     891                 :             : /*
     892                 :             :  * Cleanup function.
     893                 :             :  *
     894                 :             :  * Called on logical replication worker exit.
     895                 :             :  */
     896                 :             : static void
     897                 :           1 : logicalrep_worker_onexit(int code, Datum arg)
     898                 :             : {
     899                 :             :         /* Disconnect gracefully from the remote side. */
     900         [ +  - ]:           1 :         if (LogRepWorkerWalRcvConn)
     901                 :           0 :                 walrcv_disconnect(LogRepWorkerWalRcvConn);
     902                 :             : 
     903                 :           1 :         logicalrep_worker_detach();
     904                 :             : 
     905                 :             :         /* Cleanup fileset used for streaming transactions. */
     906         [ +  - ]:           1 :         if (MyLogicalRepWorker->stream_fileset != NULL)
     907                 :           0 :                 FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
     908                 :             : 
     909                 :             :         /*
     910                 :             :          * Session level locks may be acquired outside of a transaction in
     911                 :             :          * parallel apply mode and will not be released when the worker
     912                 :             :          * terminates, so manually release all locks before the worker exits.
     913                 :             :          *
     914                 :             :          * The locks will be acquired once the worker is initialized.
     915                 :             :          */
     916         [ +  - ]:           1 :         if (!InitializingApplyWorker)
     917                 :           0 :                 LockReleaseAll(DEFAULT_LOCKMETHOD, true);
     918                 :             : 
     919                 :           1 :         ApplyLauncherWakeup();
     920                 :           1 : }
     921                 :             : 
     922                 :             : /*
     923                 :             :  * Count the number of registered (not necessarily running) sync workers
     924                 :             :  * for a subscription.
     925                 :             :  */
     926                 :             : int
     927                 :           0 : logicalrep_sync_worker_count(Oid subid)
     928                 :             : {
     929                 :           0 :         int                     i;
     930                 :           0 :         int                     res = 0;
     931                 :             : 
     932         [ #  # ]:           0 :         Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     933                 :             : 
     934                 :             :         /* Search for attached worker for a given subscription id. */
     935         [ #  # ]:           0 :         for (i = 0; i < max_logical_replication_workers; i++)
     936                 :             :         {
     937                 :           0 :                 LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     938                 :             : 
     939   [ #  #  #  #  :           0 :                 if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w)))
             #  #  #  # ]
     940                 :           0 :                         res++;
     941                 :           0 :         }
     942                 :             : 
     943                 :           0 :         return res;
     944                 :           0 : }
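
As a hedged sketch of how such a count is typically consulted before launching another sync worker (the helper is hypothetical; it assumes the max_sync_workers_per_subscription GUC is the per-subscription cap):

#include "postgres.h"
#include "replication/logicallauncher.h"
#include "replication/worker_internal.h"

/*
 * Hypothetical helper: may one more sync worker be launched for this
 * subscription?  Caller must hold LogicalRepWorkerLock, as
 * logicalrep_sync_worker_count() asserts.
 */
static bool
can_launch_sync_worker(Oid subid)
{
	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	return logicalrep_sync_worker_count(subid) <
		max_sync_workers_per_subscription;
}
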
     945                 :             : 
     946                 :             : /*
     947                 :             :  * Count the number of registered (but not necessarily running) parallel apply
     948                 :             :  * workers for a subscription.
     949                 :             :  */
     950                 :             : static int
     951                 :           0 : logicalrep_pa_worker_count(Oid subid)
     952                 :             : {
     953                 :           0 :         int                     i;
     954                 :           0 :         int                     res = 0;
     955                 :             : 
     956         [ #  # ]:           0 :         Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     957                 :             : 
     958                 :             :         /*
     959                 :             :          * Scan all attached parallel apply workers, only counting those which
     960                 :             :          * have the given subscription id.
     961                 :             :          */
     962         [ #  # ]:           0 :         for (i = 0; i < max_logical_replication_workers; i++)
     963                 :             :         {
     964                 :           0 :                 LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     965                 :             : 
     966   [ #  #  #  #  :           0 :                 if (isParallelApplyWorker(w) && w->subid == subid)
                   #  # ]
     967                 :           0 :                         res++;
     968                 :           0 :         }
     969                 :             : 
     970                 :           0 :         return res;
     971                 :           0 : }
     972                 :             : 
     973                 :             : /*
     974                 :             :  * ApplyLauncherShmemSize
     975                 :             :  *              Compute space needed for replication launcher shared memory
     976                 :             :  */
     977                 :             : Size
     978                 :          21 : ApplyLauncherShmemSize(void)
     979                 :             : {
     980                 :          21 :         Size            size;
     981                 :             : 
     982                 :             :         /*
     983                 :             :          * Need the fixed struct and the array of LogicalRepWorker.
     984                 :             :          */
     985                 :          21 :         size = sizeof(LogicalRepCtxStruct);
     986                 :          21 :         size = MAXALIGN(size);
     987                 :          21 :         size = add_size(size, mul_size(max_logical_replication_workers,
     988                 :             :                                                                    sizeof(LogicalRepWorker)));
     989                 :          42 :         return size;
     990                 :          21 : }
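
For illustration only, assuming max_logical_replication_workers = 4, the computation reduces to the MAXALIGN'ed fixed struct plus four worker slots; add_size() and mul_size() are the overflow-checked counterparts of + and * used for shared-memory sizing:

/*
 * Illustration only (assumed worker count); LogicalRepCtxStruct is this
 * file's private shared-memory header type.
 */
static Size
example_launcher_shmem_size_for_four_workers(void)
{
	Size		size = MAXALIGN(sizeof(LogicalRepCtxStruct));	/* fixed header */

	return add_size(size, mul_size(4, sizeof(LogicalRepWorker)));	/* 4 slots */
}
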
     991                 :             : 
     992                 :             : /*
     993                 :             :  * ApplyLauncherRegister
     994                 :             :  *              Register a background worker running the logical replication launcher.
     995                 :             :  */
     996                 :             : void
     997                 :           2 : ApplyLauncherRegister(void)
     998                 :             : {
     999                 :           2 :         BackgroundWorker bgw;
    1000                 :             : 
    1001                 :             :         /*
    1002                 :             :          * The logical replication launcher is disabled during binary upgrades, to
    1003                 :             :          * prevent logical replication workers from running on the source cluster.
    1004                 :             :          * That could cause replication origins to move forward after having been
    1005                 :             :          * copied to the target cluster, potentially creating conflicts with the
    1006                 :             :          * copied data files.
    1007                 :             :          */
    1008   [ +  -  -  + ]:           2 :         if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
    1009                 :           0 :                 return;
    1010                 :             : 
    1011                 :           2 :         memset(&bgw, 0, sizeof(bgw));
    1012                 :           2 :         bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
    1013                 :             :                 BGWORKER_BACKEND_DATABASE_CONNECTION;
    1014                 :           2 :         bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
    1015                 :           2 :         snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
    1016                 :           2 :         snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
    1017                 :           2 :         snprintf(bgw.bgw_name, BGW_MAXLEN,
    1018                 :             :                          "logical replication launcher");
    1019                 :           2 :         snprintf(bgw.bgw_type, BGW_MAXLEN,
    1020                 :             :                          "logical replication launcher");
    1021                 :           2 :         bgw.bgw_restart_time = 5;
    1022                 :           2 :         bgw.bgw_notify_pid = 0;
    1023                 :           2 :         bgw.bgw_main_arg = (Datum) 0;
    1024                 :             : 
    1025                 :           2 :         RegisterBackgroundWorker(&bgw);
    1026         [ -  + ]:           2 : }
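
The same BackgroundWorker registration pattern is what an extension would typically use from its _PG_init(); the sketch below is hypothetical (the library name "my_extension" and entry point "my_worker_main" are assumptions) and only takes effect when the library is loaded via shared_preload_libraries, since static background workers must be registered at postmaster startup:

#include "postgres.h"
#include "fmgr.h"
#include "postmaster/bgworker.h"

PG_MODULE_MAGIC;

void		_PG_init(void);

void
_PG_init(void)
{
	BackgroundWorker bgw;

	memset(&bgw, 0, sizeof(bgw));
	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
		BGWORKER_BACKEND_DATABASE_CONNECTION;
	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
	snprintf(bgw.bgw_library_name, MAXPGPATH, "my_extension");	/* hypothetical */
	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "my_worker_main");	/* hypothetical */
	snprintf(bgw.bgw_name, BGW_MAXLEN, "my extension worker");
	snprintf(bgw.bgw_type, BGW_MAXLEN, "my extension worker");
	bgw.bgw_restart_time = 5;	/* restart five seconds after a crash */
	bgw.bgw_notify_pid = 0;		/* no backend to notify on start/stop */
	bgw.bgw_main_arg = (Datum) 0;

	RegisterBackgroundWorker(&bgw);
}
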
    1027                 :             : 
    1028                 :             : /*
    1029                 :             :  * ApplyLauncherShmemInit
    1030                 :             :  *              Allocate and initialize replication launcher shared memory
    1031                 :             :  */
    1032                 :             : void
    1033                 :           6 : ApplyLauncherShmemInit(void)
    1034                 :             : {
    1035                 :           6 :         bool            found;
    1036                 :             : 
    1037                 :           6 :         LogicalRepCtx = (LogicalRepCtxStruct *)
    1038                 :           6 :                 ShmemInitStruct("Logical Replication Launcher Data",
    1039                 :           6 :                                                 ApplyLauncherShmemSize(),
    1040                 :             :                                                 &found);
    1041                 :             : 
    1042         [ -  + ]:           6 :         if (!found)
    1043                 :             :         {
    1044                 :           6 :                 int                     slot;
    1045                 :             : 
    1046                 :           6 :                 memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
    1047                 :             : 
    1048                 :           6 :                 LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
    1049                 :           6 :                 LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
    1050                 :             : 
    1051                 :             :                 /* Initialize memory and spin locks for each worker slot. */
    1052         [ +  + ]:          30 :                 for (slot = 0; slot < max_logical_replication_workers; slot++)
    1053                 :             :                 {
    1054                 :          24 :                         LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
    1055                 :             : 
    1056                 :          24 :                         memset(worker, 0, sizeof(LogicalRepWorker));
    1057                 :          24 :                         SpinLockInit(&worker->relmutex);
    1058                 :          24 :                 }
    1059                 :           6 :         }
    1060                 :           6 : }
    1061                 :             : 
    1062                 :             : /*
    1063                 :             :  * Initialize or attach to the dynamic shared hash table that stores the
    1064                 :             :  * last-start times, if not already done.
    1065                 :             :  * This must be called before accessing the table.
    1066                 :             :  */
    1067                 :             : static void
    1068                 :          13 : logicalrep_launcher_attach_dshmem(void)
    1069                 :             : {
    1070                 :          13 :         MemoryContext oldcontext;
    1071                 :             : 
    1072                 :             :         /* Quick exit if we already did this. */
    1073   [ +  +  +  + ]:          13 :         if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
    1074                 :          12 :                 last_start_times != NULL)
    1075                 :          11 :                 return;
    1076                 :             : 
    1077                 :             :         /* Otherwise, use a lock to ensure only one process creates the table. */
    1078                 :           2 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
    1079                 :             : 
    1080                 :             :         /* Be sure any local memory allocated by DSA routines is persistent. */
    1081                 :           2 :         oldcontext = MemoryContextSwitchTo(TopMemoryContext);
    1082                 :             : 
    1083         [ +  + ]:           2 :         if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
    1084                 :             :         {
    1085                 :             :                 /* Initialize dynamic shared hash table for last-start times. */
    1086                 :           1 :                 last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
    1087                 :           1 :                 dsa_pin(last_start_times_dsa);
    1088                 :           1 :                 dsa_pin_mapping(last_start_times_dsa);
    1089                 :           1 :                 last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
    1090                 :             : 
    1091                 :             :                 /* Store handles in shared memory for other backends to use. */
    1092                 :           1 :                 LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
    1093                 :           1 :                 LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
    1094                 :           1 :         }
    1095         [ -  + ]:           1 :         else if (!last_start_times)
    1096                 :             :         {
    1097                 :             :                 /* Attach to existing dynamic shared hash table. */
    1098                 :           1 :                 last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
    1099                 :           1 :                 dsa_pin_mapping(last_start_times_dsa);
    1100                 :           2 :                 last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
    1101                 :           1 :                                                                                  LogicalRepCtx->last_start_dsh, NULL);
    1102                 :           1 :         }
    1103                 :             : 
    1104                 :           2 :         MemoryContextSwitchTo(oldcontext);
    1105                 :           2 :         LWLockRelease(LogicalRepWorkerLock);
    1106         [ -  + ]:          13 : }
    1107                 :             : 
    1108                 :             : /*
    1109                 :             :  * Set the last-start time for the subscription.
    1110                 :             :  */
    1111                 :             : static void
    1112                 :           0 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
    1113                 :             : {
    1114                 :           0 :         LauncherLastStartTimesEntry *entry;
    1115                 :           0 :         bool            found;
    1116                 :             : 
    1117                 :           0 :         logicalrep_launcher_attach_dshmem();
    1118                 :             : 
    1119                 :           0 :         entry = dshash_find_or_insert(last_start_times, &subid, &found);
    1120                 :           0 :         entry->last_start_time = start_time;
    1121                 :           0 :         dshash_release_lock(last_start_times, entry);
    1122                 :           0 : }
    1123                 :             : 
    1124                 :             : /*
    1125                 :             :  * Return the last-start time for the subscription, or 0 if there isn't one.
    1126                 :             :  */
    1127                 :             : static TimestampTz
    1128                 :           0 : ApplyLauncherGetWorkerStartTime(Oid subid)
    1129                 :             : {
    1130                 :           0 :         LauncherLastStartTimesEntry *entry;
    1131                 :           0 :         TimestampTz ret;
    1132                 :             : 
    1133                 :           0 :         logicalrep_launcher_attach_dshmem();
    1134                 :             : 
    1135                 :           0 :         entry = dshash_find(last_start_times, &subid, false);
    1136         [ #  # ]:           0 :         if (entry == NULL)
    1137                 :           0 :                 return 0;
    1138                 :             : 
    1139                 :           0 :         ret = entry->last_start_time;
    1140                 :           0 :         dshash_release_lock(last_start_times, entry);
    1141                 :             : 
    1142                 :           0 :         return ret;
    1143                 :           0 : }
    1144                 :             : 
    1145                 :             : /*
    1146                 :             :  * Remove the last-start-time entry for the subscription, if one exists.
    1147                 :             :  *
    1148                 :             :  * This has two use-cases: to remove the entry related to a subscription
    1149                 :             :  * that's been deleted or disabled (just to avoid leaking shared memory),
    1150                 :             :  * and to allow immediate restart of an apply worker that has exited
    1151                 :             :  * due to subscription parameter changes.
    1152                 :             :  */
    1153                 :             : void
    1154                 :          13 : ApplyLauncherForgetWorkerStartTime(Oid subid)
    1155                 :             : {
    1156                 :          13 :         logicalrep_launcher_attach_dshmem();
    1157                 :             : 
    1158                 :          13 :         (void) dshash_delete_key(last_start_times, &subid);
    1159                 :          13 : }
    1160                 :             : 
    1161                 :             : /*
    1162                 :             :  * Wakeup the launcher on commit if requested.
    1163                 :             :  */
    1164                 :             : void
    1165                 :       57914 : AtEOXact_ApplyLauncher(bool isCommit)
    1166                 :             : {
    1167         [ +  + ]:       57914 :         if (isCommit)
    1168                 :             :         {
    1169         [ +  + ]:       50898 :                 if (on_commit_launcher_wakeup)
    1170                 :           3 :                         ApplyLauncherWakeup();
    1171                 :       50898 :         }
    1172                 :             : 
    1173                 :       57914 :         on_commit_launcher_wakeup = false;
    1174                 :       57914 : }
    1175                 :             : 
    1176                 :             : /*
    1177                 :             :  * Request wakeup of the launcher on commit of the transaction.
    1178                 :             :  *
    1179                 :             :  * This is used to signal the launcher to stop sleeping and to process
    1180                 :             :  * the subscriptions when the current transaction commits. It should be
    1181                 :             :  * used when a new tuple has been added to the pg_subscription catalog.
    1182                 :             :  */
    1183                 :             : void
    1184                 :           3 : ApplyLauncherWakeupAtCommit(void)
    1185                 :             : {
    1186         [ -  + ]:           3 :         if (!on_commit_launcher_wakeup)
    1187                 :           3 :                 on_commit_launcher_wakeup = true;
    1188                 :           3 : }
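
A hedged sketch of how the two functions above fit together (the caller is hypothetical): subscription DDL that adds a pg_subscription tuple requests the wakeup, and the signal itself is only sent at commit time, when AtEOXact_ApplyLauncher() calls ApplyLauncherWakeup():

#include "postgres.h"
#include "replication/logicallauncher.h"

/*
 * Hypothetical caller, e.g. somewhere in subscription DDL: after the new
 * pg_subscription tuple has been inserted, ask for the launcher to be
 * woken at commit.  Nothing is signalled yet; if the transaction aborts,
 * AtEOXact_ApplyLauncher(false) simply clears the request.
 */
static void
note_new_subscription_created(void)
{
	ApplyLauncherWakeupAtCommit();
}
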
    1189                 :             : 
    1190                 :             : /*
    1191                 :             :  * Wakeup the launcher immediately.
    1192                 :             :  */
    1193                 :             : void
    1194                 :           4 : ApplyLauncherWakeup(void)
    1195                 :             : {
    1196         [ -  + ]:           4 :         if (LogicalRepCtx->launcher_pid != 0)
    1197                 :           4 :                 kill(LogicalRepCtx->launcher_pid, SIGUSR1);
    1198                 :           4 : }
    1199                 :             : 
    1200                 :             : /*
    1201                 :             :  * Main loop for the apply launcher process.
    1202                 :             :  */
    1203                 :             : void
    1204                 :           1 : ApplyLauncherMain(Datum main_arg)
    1205                 :             : {
    1206   [ -  +  +  + ]:           1 :         ereport(DEBUG1,
    1207                 :             :                         (errmsg_internal("logical replication launcher started")));
    1208                 :             : 
    1209                 :           1 :         before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
    1210                 :             : 
    1211         [ +  - ]:           1 :         Assert(LogicalRepCtx->launcher_pid == 0);
    1212                 :           0 :         LogicalRepCtx->launcher_pid = MyProcPid;
    1213                 :             : 
    1214                 :             :         /* Establish signal handlers. */
    1215                 :           0 :         pqsignal(SIGHUP, SignalHandlerForConfigReload);
    1216                 :           0 :         pqsignal(SIGTERM, die);
    1217                 :           0 :         BackgroundWorkerUnblockSignals();
    1218                 :             : 
    1219                 :             :         /*
    1220                 :             :          * Establish connection to nailed catalogs (we only ever access
    1221                 :             :          * pg_subscription).
    1222                 :             :          */
    1223                 :           0 :         BackgroundWorkerInitializeConnection(NULL, NULL, 0);
    1224                 :             : 
    1225                 :             :         /*
    1226                 :             :          * Acquire the conflict detection slot at startup to ensure it can be
    1227                 :             :          * dropped if no longer needed after a restart.
    1228                 :             :          */
    1229                 :           0 :         acquire_conflict_slot_if_exists();
    1230                 :             : 
    1231                 :             :         /* Enter main loop */
    1232                 :           2 :         for (;;)
    1233                 :             :         {
    1234                 :           2 :                 int                     rc;
    1235                 :           2 :                 List       *sublist;
    1236                 :           2 :                 ListCell   *lc;
    1237                 :           2 :                 MemoryContext subctx;
    1238                 :           2 :                 MemoryContext oldctx;
    1239                 :           2 :                 long            wait_time = DEFAULT_NAPTIME_PER_CYCLE;
    1240                 :           2 :                 bool            can_update_xmin = true;
    1241                 :           2 :                 bool            retain_dead_tuples = false;
    1242                 :           2 :                 TransactionId xmin = InvalidTransactionId;
    1243                 :             : 
    1244         [ +  - ]:           2 :                 CHECK_FOR_INTERRUPTS();
    1245                 :             : 
    1246                 :             :                 /* Use temporary context to avoid leaking memory across cycles. */
    1247                 :           2 :                 subctx = AllocSetContextCreate(TopMemoryContext,
    1248                 :             :                                                                            "Logical Replication Launcher sublist",
    1249                 :             :                                                                            ALLOCSET_DEFAULT_SIZES);
    1250                 :           2 :                 oldctx = MemoryContextSwitchTo(subctx);
    1251                 :             : 
    1252                 :             :                 /*
    1253                 :             :                  * Start any missing workers for enabled subscriptions.
    1254                 :             :                  *
    1255                 :             :                  * Also, while iterating through all subscriptions, we compute the
    1256                 :             :                  * minimum XID required to protect deleted tuples for conflict
    1257                 :             :                  * detection if at least one subscription enables the
    1258                 :             :                  * retain_dead_tuples option.
    1259                 :             :                  */
    1260                 :           2 :                 sublist = get_subscription_list();
    1261   [ -  +  #  #  :           2 :                 foreach(lc, sublist)
                   -  + ]
    1262                 :             :                 {
    1263                 :           0 :                         Subscription *sub = (Subscription *) lfirst(lc);
    1264                 :           0 :                         LogicalRepWorker *w;
    1265                 :           0 :                         TimestampTz last_start;
    1266                 :           0 :                         TimestampTz now;
    1267                 :           0 :                         long            elapsed;
    1268                 :             : 
    1269         [ #  # ]:           0 :                         if (sub->retaindeadtuples)
    1270                 :             :                         {
    1271                 :           0 :                                 retain_dead_tuples = true;
    1272                 :             : 
    1273                 :             :                                 /*
    1274                 :             :                                  * Create a replication slot to retain information necessary
    1275                 :             :                                  * for conflict detection such as dead tuples, commit
    1276                 :             :                                  * timestamps, and origins.
    1277                 :             :                                  *
    1278                 :             :                                  * The slot is created before starting the apply worker to
    1279                 :             :                                  * prevent it from unnecessarily maintaining its
    1280                 :             :                                  * oldest_nonremovable_xid.
    1281                 :             :                                  *
    1282                 :             :                                  * The slot is created even for a disabled subscription to
    1283                 :             :                                  * ensure that conflict-related information is available when
    1284                 :             :                                  * applying remote changes that occurred before the
    1285                 :             :                                  * subscription was enabled.
    1286                 :             :                                  */
    1287                 :           0 :                                 CreateConflictDetectionSlot();
    1288                 :             : 
    1289         [ #  # ]:           0 :                                 if (sub->retentionactive)
    1290                 :             :                                 {
    1291                 :             :                                         /*
    1292                 :             :                                          * Can't advance xmin of the slot unless all the
    1293                 :             :                                          * subscriptions actively retaining dead tuples are
    1294                 :             :                                          * enabled. This is required to ensure that we don't
    1295                 :             :                                          * advance the xmin of CONFLICT_DETECTION_SLOT if one of
    1296                 :             :                                          * the subscriptions is not enabled. Otherwise, we won't
    1297                 :             :                                          * be able to detect conflicts reliably for such a
    1298                 :             :                                          * subscription even though it has set the
    1299                 :             :                                          * retain_dead_tuples option.
    1300                 :             :                                          */
    1301                 :           0 :                                         can_update_xmin &= sub->enabled;
    1302                 :             : 
    1303                 :             :                                         /*
    1304                 :             :                                          * Initialize the slot once the subscription activates
    1305                 :             :                                          * retention.
    1306                 :             :                                          */
    1307         [ #  # ]:           0 :                                         if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
    1308                 :           0 :                                                 init_conflict_slot_xmin();
    1309                 :           0 :                                 }
    1310                 :           0 :                         }
    1311                 :             : 
    1312         [ #  # ]:           0 :                         if (!sub->enabled)
    1313                 :           0 :                                 continue;
    1314                 :             : 
    1315                 :           0 :                         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1316                 :           0 :                         w = logicalrep_worker_find(WORKERTYPE_APPLY, sub->oid, InvalidOid,
    1317                 :             :                                                                            false);
    1318                 :             : 
    1319         [ #  # ]:           0 :                         if (w != NULL)
    1320                 :             :                         {
    1321                 :             :                                 /*
    1322                 :             :                                  * Compute the minimum xmin required to protect dead tuples
    1323                 :             :                                  * required for conflict detection among all running apply
    1324                 :             :                                  * workers. This computation is performed while holding
    1325                 :             :                                  * LogicalRepWorkerLock to prevent accessing invalid worker
    1326                 :             :                                  * data, in scenarios where a worker might exit and reset its
    1327                 :             :                                  * state concurrently.
    1328                 :             :                                  */
    1329         [ #  # ]:           0 :                                 if (sub->retaindeadtuples &&
    1330   [ #  #  #  # ]:           0 :                                         sub->retentionactive &&
    1331                 :           0 :                                         can_update_xmin)
    1332                 :           0 :                                         compute_min_nonremovable_xid(w, &xmin);
    1333                 :             : 
    1334                 :           0 :                                 LWLockRelease(LogicalRepWorkerLock);
    1335                 :             : 
    1336                 :             :                                 /* worker is running already */
    1337                 :           0 :                                 continue;
    1338                 :             :                         }
    1339                 :             : 
    1340                 :           0 :                         LWLockRelease(LogicalRepWorkerLock);
    1341                 :             : 
    1342                 :             :                         /*
    1343                 :             :                          * Can't advance xmin of the slot unless all the workers
    1344                 :             :                          * corresponding to subscriptions actively retaining dead tuples
    1345                 :             :                          * are running, so disable further computation of the minimum
    1346                 :             :                          * nonremovable xid.
    1347                 :             :                          */
    1348   [ #  #  #  # ]:           0 :                         if (sub->retaindeadtuples && sub->retentionactive)
    1349                 :           0 :                                 can_update_xmin = false;
    1350                 :             : 
    1351                 :             :                         /*
    1352                 :             :                          * If the worker is eligible to start now, launch it.  Otherwise,
    1353                 :             :                          * adjust wait_time so that we'll wake up as soon as it can be
    1354                 :             :                          * started.
    1355                 :             :                          *
    1356                 :             :                          * Each subscription's apply worker can only be restarted once per
    1357                 :             :                          * wal_retrieve_retry_interval, so that errors do not cause us to
    1358                 :             :                          * repeatedly restart the worker as fast as possible.  In cases
    1359                 :             :                          * where a restart is expected (e.g., subscription parameter
    1360                 :             :                          * changes), another process should remove the last-start entry
    1361                 :             :                          * for the subscription so that the worker can be restarted
    1362                 :             :                          * without waiting for wal_retrieve_retry_interval to elapse.
    1363                 :             :                          */
    1364                 :           0 :                         last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
    1365                 :           0 :                         now = GetCurrentTimestamp();
    1366   [ #  #  #  # ]:           0 :                         if (last_start == 0 ||
    1367                 :           0 :                                 (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
    1368                 :             :                         {
    1369                 :           0 :                                 ApplyLauncherSetWorkerStartTime(sub->oid, now);
    1370         [ #  # ]:           0 :                                 if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
    1371                 :           0 :                                                                                           sub->dbid, sub->oid, sub->name,
    1372                 :           0 :                                                                                           sub->owner, InvalidOid,
    1373                 :             :                                                                                           DSM_HANDLE_INVALID,
    1374         [ #  # ]:           0 :                                                                                           sub->retaindeadtuples &&
    1375                 :           0 :                                                                                           sub->retentionactive))
    1376                 :             :                                 {
    1377                 :             :                                         /*
    1378                 :             :                                          * We get here either if we failed to launch a worker
    1379                 :             :                                          * (perhaps for resource-exhaustion reasons) or if we
    1380                 :             :                                          * launched one but it immediately quit.  Either way, it
    1381                 :             :                                          * seems appropriate to try again after
    1382                 :             :                                          * wal_retrieve_retry_interval.
    1383                 :             :                                          */
    1384         [ #  # ]:           0 :                                         wait_time = Min(wait_time,
    1385                 :             :                                                                         wal_retrieve_retry_interval);
    1386                 :           0 :                                 }
    1387                 :           0 :                         }
    1388                 :             :                         else
    1389                 :             :                         {
    1390         [ #  # ]:           0 :                                 wait_time = Min(wait_time,
    1391                 :             :                                                                 wal_retrieve_retry_interval - elapsed);
    1392                 :             :                         }
    1393      [ #  #  # ]:           0 :                 }
    1394                 :             : 
    1395                 :             :                 /*
    1396                 :             :                  * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
    1397                 :             :                  * that requires us to retain dead tuples. Otherwise, if required,
    1398                 :             :                  * advance the slot's xmin to protect dead tuples required for the
    1399                 :             :                  * conflict detection.
    1400                 :             :                  *
    1401                 :             :                  * Additionally, if all apply workers for subscriptions with
    1402                 :             :                  * retain_dead_tuples enabled have requested to stop retention, the
    1403                 :             :                  * slot's xmin will be set to InvalidTransactionId, allowing the
    1404                 :             :                  * removal of dead tuples.
    1405                 :             :                  */
    1406         [ +  - ]:           2 :                 if (MyReplicationSlot)
    1407                 :             :                 {
    1408         [ #  # ]:           0 :                         if (!retain_dead_tuples)
    1409                 :           0 :                                 ReplicationSlotDropAcquired();
    1410         [ #  # ]:           0 :                         else if (can_update_xmin)
    1411                 :           0 :                                 update_conflict_slot_xmin(xmin);
    1412                 :           0 :                 }
    1413                 :             : 
    1414                 :             :                 /* Switch back to original memory context. */
    1415                 :           2 :                 MemoryContextSwitchTo(oldctx);
    1416                 :             :                 /* Clean the temporary memory. */
    1417                 :           2 :                 MemoryContextDelete(subctx);
    1418                 :             : 
    1419                 :             :                 /* Wait for more work. */
    1420                 :           4 :                 rc = WaitLatch(MyLatch,
    1421                 :             :                                            WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1422                 :           2 :                                            wait_time,
    1423                 :             :                                            WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
    1424                 :             : 
    1425         [ -  + ]:           2 :                 if (rc & WL_LATCH_SET)
    1426                 :             :                 {
    1427                 :           2 :                         ResetLatch(MyLatch);
    1428         [ +  + ]:           2 :                         CHECK_FOR_INTERRUPTS();
    1429                 :           2 :                 }
    1430                 :             : 
    1431         [ +  - ]:           2 :                 if (ConfigReloadPending)
    1432                 :             :                 {
    1433                 :           0 :                         ConfigReloadPending = false;
    1434                 :           0 :                         ProcessConfigFile(PGC_SIGHUP);
    1435                 :           0 :                 }
    1436                 :           2 :         }
    1437                 :             : 
    1438                 :             :         /* Not reachable */
    1439                 :             : }
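
To make the restart rate limiting in the loop above concrete, here is an illustration-only fragment with assumed values (a wal_retrieve_retry_interval of 5000 ms and a worker last started 2000 ms ago): the launcher skips the relaunch and instead shortens its sleep so it wakes up exactly when the worker becomes eligible again.

/*
 * Illustration only (assumed values): the restart rate limiting for one
 * subscription's apply worker, reduced to plain arithmetic.
 * DEFAULT_NAPTIME_PER_CYCLE is this file's default launcher nap time.
 */
static long
example_launcher_wait_time(void)
{
	long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;	/* launcher's default nap */
	long		retry_interval = 5000;	/* assumed wal_retrieve_retry_interval, ms */
	long		elapsed = 2000; /* assumed ms since the worker's last start */

	/* Too soon to relaunch: sleep only until the worker becomes eligible. */
	if (elapsed < retry_interval)
		wait_time = Min(wait_time, retry_interval - elapsed);	/* 3000 ms */

	return wait_time;
}
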
    1440                 :             : 
    1441                 :             : /*
    1442                 :             :  * Determine the minimum non-removable transaction ID across all apply workers
    1443                 :             :  * for subscriptions that have retain_dead_tuples enabled. Store the result
    1444                 :             :  * in *xmin.
    1445                 :             :  */
    1446                 :             : static void
    1447                 :           0 : compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
    1448                 :             : {
    1449                 :           0 :         TransactionId nonremovable_xid;
    1450                 :             : 
    1451         [ #  # ]:           0 :         Assert(worker != NULL);
    1452                 :             : 
    1453                 :             :         /*
    1454                 :             :          * The replication slot for conflict detection must be created before the
    1455                 :             :          * worker starts.
    1456                 :             :          */
    1457         [ #  # ]:           0 :         Assert(MyReplicationSlot);
    1458                 :             : 
    1459         [ #  # ]:           0 :         SpinLockAcquire(&worker->relmutex);
    1460                 :           0 :         nonremovable_xid = worker->oldest_nonremovable_xid;
    1461                 :           0 :         SpinLockRelease(&worker->relmutex);
    1462                 :             : 
    1463                 :             :         /*
    1464                 :             :          * Return if the apply worker has stopped retention concurrently.
    1465                 :             :          *
    1466                 :             :          * Although this function is invoked only when retentionactive is true,
    1467                 :             :          * the apply worker might stop retention after the launcher fetches the
    1468                 :             :          * retentionactive flag.
    1469                 :             :          */
    1470         [ #  # ]:           0 :         if (!TransactionIdIsValid(nonremovable_xid))
    1471                 :           0 :                 return;
    1472                 :             : 
    1473   [ #  #  #  # ]:           0 :         if (!TransactionIdIsValid(*xmin) ||
    1474                 :           0 :                 TransactionIdPrecedes(nonremovable_xid, *xmin))
    1475                 :           0 :                 *xmin = nonremovable_xid;
    1476         [ #  # ]:           0 : }
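
The fold above keeps the smallest valid XID seen so far, using TransactionIdPrecedes() so the comparison respects the circular transaction ID space. Here is a self-contained sketch of the same idiom, with a hypothetical example_min_xid() helper that works on a plain array instead of shared LogicalRepWorker entries.

#include "postgres.h"

#include "access/transam.h"

/* Return the oldest valid XID in the array, or InvalidTransactionId. */
static TransactionId
example_min_xid(const TransactionId *xids, int nxids)
{
    TransactionId result = InvalidTransactionId;

    for (int i = 0; i < nxids; i++)
    {
        /* Skip invalid entries (retention stopped concurrently). */
        if (!TransactionIdIsValid(xids[i]))
            continue;

        if (!TransactionIdIsValid(result) ||
            TransactionIdPrecedes(xids[i], result))
            result = xids[i];
    }

    return result;
}
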
    1477                 :             : 
    1478                 :             : /*
    1479                 :             :  * Acquire the replication slot used to retain information for conflict
    1480                 :             :  * detection, if it exists.
    1481                 :             :  *
    1482                 :             :  * Return true if successfully acquired, otherwise return false.
    1483                 :             :  */
    1484                 :             : static bool
    1485                 :           1 : acquire_conflict_slot_if_exists(void)
    1486                 :             : {
    1487         [ -  + ]:           1 :         if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
    1488                 :           1 :                 return false;
    1489                 :             : 
    1490                 :           0 :         ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
    1491                 :           0 :         return true;
    1492                 :           1 : }
    1493                 :             : 
    1494                 :             : /*
    1495                 :             :  * Update the xmin of the replication slot used to retain information required
    1496                 :             :  * for conflict detection.
    1497                 :             :  */
    1498                 :             : static void
    1499                 :           0 : update_conflict_slot_xmin(TransactionId new_xmin)
    1500                 :             : {
    1501         [ #  # ]:           0 :         Assert(MyReplicationSlot);
    1502   [ #  #  #  # ]:           0 :         Assert(!TransactionIdIsValid(new_xmin) ||
    1503                 :             :                    TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
    1504                 :             : 
    1505                 :             :         /* Return if the xmin value of the slot does not need to be updated */
    1506         [ #  # ]:           0 :         if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
    1507                 :           0 :                 return;
    1508                 :             : 
    1509         [ #  # ]:           0 :         SpinLockAcquire(&MyReplicationSlot->mutex);
    1510                 :           0 :         MyReplicationSlot->effective_xmin = new_xmin;
    1511                 :           0 :         MyReplicationSlot->data.xmin = new_xmin;
    1512                 :           0 :         SpinLockRelease(&MyReplicationSlot->mutex);
    1513                 :             : 
    1514   [ #  #  #  # ]:           0 :         elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
    1515                 :             : 
    1516                 :           0 :         ReplicationSlotMarkDirty();
    1517                 :           0 :         ReplicationSlotsComputeRequiredXmin(false);
    1518                 :             : 
    1519                 :             :         /*
    1520                 :             :          * Like PhysicalConfirmReceivedLocation(), do not save slot information
    1521                 :             :          * each time. This is acceptable because all concurrent transactions on
    1522                 :             :          * the publisher that require the data preceding the slot's xmin should
    1523                 :             :          * have already been applied and flushed on the subscriber before the xmin
    1524                 :             :          * is advanced. So, even if the slot's xmin regresses after a restart, it
    1525                 :             :          * will be advanced again in the next cycle. Therefore, no data required
    1526                 :             :          * for conflict detection will be prematurely removed.
    1527                 :             :          */
    1528                 :           0 :         return;
    1529                 :           0 : }
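
The comment above explains why the slot is only marked dirty here: a crash may roll the on-disk xmin back, but it will simply be advanced again in the next cycle, so nothing needed for conflict detection is removed early. For contrast, a hypothetical variant that did need the new xmin to be durable before relying on it would flush the slot right away, much as init_conflict_slot_xmin() below does after initialization. example_update_xmin_durably() is a sketch, not part of launcher.c.

#include "postgres.h"

#include "replication/slot.h"
#include "storage/spin.h"

/* Hypothetical: advance the slot's xmin and flush it to disk immediately. */
static void
example_update_xmin_durably(TransactionId new_xmin)
{
    SpinLockAcquire(&MyReplicationSlot->mutex);
    MyReplicationSlot->effective_xmin = new_xmin;
    MyReplicationSlot->data.xmin = new_xmin;
    SpinLockRelease(&MyReplicationSlot->mutex);

    ReplicationSlotMarkDirty();
    ReplicationSlotsComputeRequiredXmin(false);

    /* Unlike update_conflict_slot_xmin(), persist the change right away. */
    ReplicationSlotSave();
}
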
    1530                 :             : 
    1531                 :             : /*
    1532                 :             :  * Initialize the xmin for the conflict detection slot.
    1533                 :             :  */
    1534                 :             : static void
    1535                 :           0 : init_conflict_slot_xmin(void)
    1536                 :             : {
    1537                 :           0 :         TransactionId xmin_horizon;
    1538                 :             : 
    1539                 :             :         /* Replication slot must exist but shouldn't be initialized. */
    1540         [ #  # ]:           0 :         Assert(MyReplicationSlot &&
    1541                 :             :                    !TransactionIdIsValid(MyReplicationSlot->data.xmin));
    1542                 :             : 
    1543                 :           0 :         LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
    1544                 :           0 :         LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
    1545                 :             : 
    1546                 :           0 :         xmin_horizon = GetOldestSafeDecodingTransactionId(false);
    1547                 :             : 
    1548         [ #  # ]:           0 :         SpinLockAcquire(&MyReplicationSlot->mutex);
    1549                 :           0 :         MyReplicationSlot->effective_xmin = xmin_horizon;
    1550                 :           0 :         MyReplicationSlot->data.xmin = xmin_horizon;
    1551                 :           0 :         SpinLockRelease(&MyReplicationSlot->mutex);
    1552                 :             : 
    1553                 :           0 :         ReplicationSlotsComputeRequiredXmin(true);
    1554                 :             : 
    1555                 :           0 :         LWLockRelease(ProcArrayLock);
    1556                 :           0 :         LWLockRelease(ReplicationSlotControlLock);
    1557                 :             : 
    1558                 :             :         /* Write this slot to disk */
    1559                 :           0 :         ReplicationSlotMarkDirty();
    1560                 :           0 :         ReplicationSlotSave();
    1561                 :           0 : }
    1562                 :             : 
    1563                 :             : /*
    1564                 :             :  * Create and acquire the replication slot used to retain information for
    1565                 :             :  * conflict detection, if that has not been done yet.
    1566                 :             :  */
    1567                 :             : void
    1568                 :           0 : CreateConflictDetectionSlot(void)
    1569                 :             : {
    1570                 :             :         /* Exit early if the replication slot is already created and acquired */
    1571         [ #  # ]:           0 :         if (MyReplicationSlot)
    1572                 :           0 :                 return;
    1573                 :             : 
    1574   [ #  #  #  # ]:           0 :         ereport(LOG,
    1575                 :             :                         errmsg("creating replication conflict detection slot"));
    1576                 :             : 
    1577                 :           0 :         ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
    1578                 :             :                                                   false, false);
    1579                 :             : 
    1580                 :           0 :         init_conflict_slot_xmin();
    1581                 :           0 : }
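
Together with acquire_conflict_slot_if_exists() above, this gives the launcher an acquire-or-create path for the conflict detection slot. The hypothetical helper example_ensure_conflict_slot() sketches that combined flow using only calls launcher.c itself makes (SearchNamedReplicationSlot(), ReplicationSlotAcquire(), CreateConflictDetectionSlot()); the exact header set below is an assumption about where those declarations live.

#include "postgres.h"

#include "replication/logicallauncher.h"
#include "replication/slot.h"
#include "replication/worker_internal.h"

/* Hypothetical: make sure this process owns the conflict detection slot. */
static void
example_ensure_conflict_slot(void)
{
    /* Nothing to do if a slot is already created or acquired. */
    if (MyReplicationSlot)
        return;

    if (SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
        ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
    else
        CreateConflictDetectionSlot();
}
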
    1582                 :             : 
    1583                 :             : /*
    1584                 :             :  * Is the current process the logical replication launcher?
    1585                 :             :  */
    1586                 :             : bool
    1587                 :           1 : IsLogicalLauncher(void)
    1588                 :             : {
    1589                 :           1 :         return LogicalRepCtx->launcher_pid == MyProcPid;
    1590                 :             : }
    1591                 :             : 
    1592                 :             : /*
    1593                 :             :  * Return the pid of the leader apply worker if the given pid is the pid of a
    1594                 :             :  * parallel apply worker; otherwise, return InvalidPid.
    1595                 :             :  */
    1596                 :             : pid_t
    1597                 :           5 : GetLeaderApplyWorkerPid(pid_t pid)
    1598                 :             : {
    1599                 :           5 :         int                     leader_pid = InvalidPid;
    1600                 :           5 :         int                     i;
    1601                 :             : 
    1602                 :           5 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1603                 :             : 
    1604         [ +  + ]:          25 :         for (i = 0; i < max_logical_replication_workers; i++)
    1605                 :             :         {
    1606                 :          20 :                 LogicalRepWorker *w = &LogicalRepCtx->workers[i];
    1607                 :             : 
    1608   [ -  +  #  #  :          20 :                 if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
             #  #  #  # ]
    1609                 :             :                 {
    1610                 :           0 :                         leader_pid = w->leader_pid;
    1611                 :           0 :                         break;
    1612                 :             :                 }
    1613      [ -  -  + ]:          20 :         }
    1614                 :             : 
    1615                 :           5 :         LWLockRelease(LogicalRepWorkerLock);
    1616                 :             : 
    1617                 :          10 :         return leader_pid;
    1618                 :           5 : }
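
A brief usage sketch for GetLeaderApplyWorkerPid(): the helper example_report_leader() below is hypothetical and only logs whether a pid belongs to a parallel apply worker and, if so, which leader apply worker it serves. It assumes the declaration is visible via replication/logicallauncher.h.

#include "postgres.h"

#include "miscadmin.h"
#include "replication/logicallauncher.h"

/* Hypothetical: log the leader of a parallel apply worker, if any. */
static void
example_report_leader(pid_t pid)
{
    pid_t       leader_pid = GetLeaderApplyWorkerPid(pid);

    if (leader_pid != InvalidPid)
        elog(LOG, "pid %d is a parallel apply worker led by pid %d",
             (int) pid, (int) leader_pid);
    else
        elog(LOG, "pid %d is not a parallel apply worker", (int) pid);
}
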
    1619                 :             : 
    1620                 :             : /*
    1621                 :             :  * Returns the state of the subscriptions.
    1622                 :             :  */
    1623                 :             : Datum
    1624                 :           0 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
    1625                 :             : {
    1626                 :             : #define PG_STAT_GET_SUBSCRIPTION_COLS   10
    1627         [ #  # ]:           0 :         Oid                     subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
    1628                 :           0 :         int                     i;
    1629                 :           0 :         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1630                 :             : 
    1631                 :           0 :         InitMaterializedSRF(fcinfo, 0);
    1632                 :             : 
    1633                 :             :         /* Make sure we get consistent view of the workers. */
    1634                 :           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1635                 :             : 
    1636         [ #  # ]:           0 :         for (i = 0; i < max_logical_replication_workers; i++)
    1637                 :             :         {
    1638                 :             :                 /* for each row */
    1639                 :           0 :                 Datum           values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1640                 :           0 :                 bool            nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1641                 :           0 :                 int                     worker_pid;
    1642                 :           0 :                 LogicalRepWorker worker;
    1643                 :             : 
    1644                 :           0 :                 memcpy(&worker, &LogicalRepCtx->workers[i],
    1645                 :             :                            sizeof(LogicalRepWorker));
    1646   [ #  #  #  # ]:           0 :                 if (!worker.proc || !IsBackendPid(worker.proc->pid))
    1647                 :           0 :                         continue;
    1648                 :             : 
    1649   [ #  #  #  # ]:           0 :                 if (OidIsValid(subid) && worker.subid != subid)
    1650                 :           0 :                         continue;
    1651                 :             : 
    1652                 :           0 :                 worker_pid = worker.proc->pid;
    1653                 :             : 
    1654                 :           0 :                 values[0] = ObjectIdGetDatum(worker.subid);
    1655   [ #  #  #  # ]:           0 :                 if (isTableSyncWorker(&worker))
    1656                 :           0 :                         values[1] = ObjectIdGetDatum(worker.relid);
    1657                 :             :                 else
    1658                 :           0 :                         nulls[1] = true;
    1659                 :           0 :                 values[2] = Int32GetDatum(worker_pid);
    1660                 :             : 
    1661   [ #  #  #  # ]:           0 :                 if (isParallelApplyWorker(&worker))
    1662                 :           0 :                         values[3] = Int32GetDatum(worker.leader_pid);
    1663                 :             :                 else
    1664                 :           0 :                         nulls[3] = true;
    1665                 :             : 
    1666         [ #  # ]:           0 :                 if (!XLogRecPtrIsValid(worker.last_lsn))
    1667                 :           0 :                         nulls[4] = true;
    1668                 :             :                 else
    1669                 :           0 :                         values[4] = LSNGetDatum(worker.last_lsn);
    1670         [ #  # ]:           0 :                 if (worker.last_send_time == 0)
    1671                 :           0 :                         nulls[5] = true;
    1672                 :             :                 else
    1673                 :           0 :                         values[5] = TimestampTzGetDatum(worker.last_send_time);
    1674         [ #  # ]:           0 :                 if (worker.last_recv_time == 0)
    1675                 :           0 :                         nulls[6] = true;
    1676                 :             :                 else
    1677                 :           0 :                         values[6] = TimestampTzGetDatum(worker.last_recv_time);
    1678         [ #  # ]:           0 :                 if (!XLogRecPtrIsValid(worker.reply_lsn))
    1679                 :           0 :                         nulls[7] = true;
    1680                 :             :                 else
    1681                 :           0 :                         values[7] = LSNGetDatum(worker.reply_lsn);
    1682         [ #  # ]:           0 :                 if (worker.reply_time == 0)
    1683                 :           0 :                         nulls[8] = true;
    1684                 :             :                 else
    1685                 :           0 :                         values[8] = TimestampTzGetDatum(worker.reply_time);
    1686                 :             : 
    1687   [ #  #  #  #  :           0 :                 switch (worker.type)
                   #  # ]
    1688                 :             :                 {
    1689                 :             :                         case WORKERTYPE_APPLY:
    1690                 :           0 :                                 values[9] = CStringGetTextDatum("apply");
    1691                 :           0 :                                 break;
    1692                 :             :                         case WORKERTYPE_PARALLEL_APPLY:
    1693                 :           0 :                                 values[9] = CStringGetTextDatum("parallel apply");
    1694                 :           0 :                                 break;
    1695                 :             :                         case WORKERTYPE_SEQUENCESYNC:
    1696                 :           0 :                                 values[9] = CStringGetTextDatum("sequence synchronization");
    1697                 :           0 :                                 break;
    1698                 :             :                         case WORKERTYPE_TABLESYNC:
    1699                 :           0 :                                 values[9] = CStringGetTextDatum("table synchronization");
    1700                 :           0 :                                 break;
    1701                 :             :                         case WORKERTYPE_UNKNOWN:
    1702                 :             :                                 /* Should never happen. */
    1703   [ #  #  #  # ]:           0 :                                 elog(ERROR, "unknown worker type");
    1704                 :           0 :                 }
    1705                 :             : 
    1706                 :           0 :                 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1707                 :           0 :                                                          values, nulls);
    1708                 :             : 
    1709                 :             :                 /*
    1710                 :             :                  * If only a single subscription was requested, and we found it,
    1711                 :             :                  * break.
    1712                 :             :                  */
    1713         [ #  # ]:           0 :                 if (OidIsValid(subid))
    1714                 :           0 :                         break;
    1715   [ #  #  #  # ]:           0 :         }
    1716                 :             : 
    1717                 :           0 :         LWLockRelease(LogicalRepWorkerLock);
    1718                 :             : 
    1719                 :           0 :         return (Datum) 0;
    1720                 :           0 : }
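
pg_stat_get_subscription() follows the materialized set-returning function pattern: InitMaterializedSRF() sets up a tuplestore behind rsinfo, and each row is appended with tuplestore_putvalues() from parallel values/nulls arrays. A stand-alone sketch of that pattern follows; example_list_rows and its two columns are hypothetical, and at the SQL level the function would need a matching CREATE FUNCTION ... RETURNS TABLE (id int, label text) declaration.

#include "postgres.h"

#include "funcapi.h"
#include "nodes/execnodes.h"
#include "utils/builtins.h"
#include "utils/tuplestore.h"

PG_FUNCTION_INFO_V1(example_list_rows);

Datum
example_list_rows(PG_FUNCTION_ARGS)
{
#define EXAMPLE_LIST_ROWS_COLS 2
    ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;

    /* Let the SRF machinery build the tuplestore and tuple descriptor. */
    InitMaterializedSRF(fcinfo, 0);

    for (int i = 0; i < 3; i++)
    {
        Datum       values[EXAMPLE_LIST_ROWS_COLS] = {0};
        bool        nulls[EXAMPLE_LIST_ROWS_COLS] = {0};

        values[0] = Int32GetDatum(i);
        if (i == 0)
            nulls[1] = true;    /* demonstrate a NULL column */
        else
            values[1] = CStringGetTextDatum("example");

        tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
                             values, nulls);
    }

    /* Materialized SRFs return their rows via rsinfo, not via the Datum. */
    return (Datum) 0;
}
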
        

Generated by: LCOV version 2.3.2-1