LCOV - code coverage report
Current view: top level - src/backend/replication/logical - slotsync.c
Test: Code coverage
Test Date: 2026-01-26 10:56:24

              Coverage   Total   Hit
Lines:           4.1 %     651    27
Functions:      13.3 %      30     4
Branches:        1.0 %     525     5

Legend: Lines: hit / not hit
        Branches: + taken, - not taken, # not executed

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  * slotsync.c
       3                 :             :  *         Functionality for synchronizing slots to a standby server from the
       4                 :             :  *         primary server.
       5                 :             :  *
       6                 :             :  * Copyright (c) 2024-2026, PostgreSQL Global Development Group
       7                 :             :  *
       8                 :             :  * IDENTIFICATION
       9                 :             :  *        src/backend/replication/logical/slotsync.c
      10                 :             :  *
      11                 :             :  * This file contains the code for slot synchronization on a physical standby
      12                 :             :  * to fetch logical failover slots information from the primary server, create
      13                 :             :  * the slots on the standby and synchronize them periodically.
      14                 :             :  *
      15                 :             :  * Slot synchronization can be performed either automatically by enabling slot
      16                 :             :  * sync worker or manually by calling SQL function pg_sync_replication_slots().
      17                 :             :  *
      18                 :             :  * If the WAL corresponding to the remote's restart_lsn is not available on the
      19                 :             :  * physical standby or the remote's catalog_xmin precedes the oldest xid for
      20                 :             :  * which it is guaranteed that rows wouldn't have been removed then we cannot
      21                 :             :  * create the local standby slot because that would mean moving the local slot
      22                 :             :  * backward and decoding won't be possible via such a slot. In this case, the
      23                 :             :  * slot will be marked as RS_TEMPORARY. Once the primary server catches up,
      24                 :             :  * the slot will be marked as RS_PERSISTENT (which means sync-ready) after
      25                 :             :  * which slot sync worker can perform the sync periodically or user can call
      26                 :             :  * pg_sync_replication_slots() periodically to perform the syncs.
      27                 :             :  *
      28                 :             :  * If synchronized slots fail to build a consistent snapshot from the
      29                 :             :  * restart_lsn before reaching confirmed_flush_lsn, they would become
      30                 :             :  * unreliable after promotion due to potential data loss from changes
      31                 :             :  * before reaching a consistent point. This can happen because the slots can
      32                 :             :  * be synced at some random time and we may not reach the consistent point
      33                 :             :  * at the same WAL location as the primary. So, we mark such slots as
      34                 :             :  * RS_TEMPORARY. Once the decoding from corresponding LSNs can reach a
      35                 :             :  * consistent point, they will be marked as RS_PERSISTENT.
      36                 :             :  *
      37                 :             :  * The slot sync worker waits for some time before the next synchronization,
      38                 :             :  * with the duration varying based on whether any slots were updated during
      39                 :             :  * the last cycle. Refer to the comments above wait_for_slot_activity() for
      40                 :             :  * more details.
      41                 :             :  *
      42                 :             :  * If the SQL function pg_sync_replication_slots() is used to sync the slots,
      43                 :             :  * and if the slots are not ready to be synced and are marked as RS_TEMPORARY
      44                 :             :  * because of any of the reasons mentioned above, then the SQL function also
      45                 :             :  * waits and retries until the slots are marked as RS_PERSISTENT (which means
      46                 :             :  * sync-ready). Refer to the comments in SyncReplicationSlots() for more
      47                 :             :  * details.
      48                 :             :  *
      49                 :             :  * Any standby synchronized slots will be dropped if they no longer need
      50                 :             :  * to be synchronized. See comment atop drop_local_obsolete_slots() for more
      51                 :             :  * details.
      52                 :             :  *---------------------------------------------------------------------------
      53                 :             :  */
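The header comment describes a slot that stays RS_TEMPORARY until the primary "catches up" and decoding can reach a consistent point, after which it becomes RS_PERSISTENT (sync-ready). The sketch below models only that transition over repeated sync cycles. It is an illustration under stated assumptions, not the PostgreSQL implementation: `Persistency`, `SyncAttempt` and `after_sync_cycle()` are hypothetical names, and the two booleans stand in for the real restart_lsn/catalog_xmin checks and the snapshot-builder state.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for RS_TEMPORARY / RS_PERSISTENT. */
typedef enum { SKETCH_TEMPORARY, SKETCH_PERSISTENT } Persistency;

/* Hypothetical outcome of one sync attempt. */
typedef struct
{
    bool remote_not_behind;   /* remote restart_lsn/catalog_xmin not behind local */
    bool consistent_snapshot; /* decoding reached a consistent point */
} SyncAttempt;

/* Persistency of the local slot after one sync cycle (sketch only). */
static Persistency
after_sync_cycle(Persistency current, SyncAttempt a)
{
    if (current == SKETCH_PERSISTENT)
        return SKETCH_PERSISTENT;   /* already sync-ready in this sketch */
    if (a.remote_not_behind && a.consistent_snapshot)
        return SKETCH_PERSISTENT;   /* primary caught up: mark sync-ready */
    return SKETCH_TEMPORARY;        /* keep the slot temporary and retry */
}
```

A slot therefore needs both conditions in the same cycle before it is promoted; either one alone leaves it temporary.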
      54                 :             : 
      55                 :             : #include "postgres.h"
      56                 :             : 
      57                 :             : #include <time.h>
      58                 :             : 
      59                 :             : #include "access/xlog_internal.h"
      60                 :             : #include "access/xlogrecovery.h"
      61                 :             : #include "catalog/pg_database.h"
      62                 :             : #include "libpq/pqsignal.h"
      63                 :             : #include "pgstat.h"
      64                 :             : #include "postmaster/interrupt.h"
      65                 :             : #include "replication/logical.h"
      66                 :             : #include "replication/slotsync.h"
      67                 :             : #include "replication/snapbuild.h"
      68                 :             : #include "storage/ipc.h"
      69                 :             : #include "storage/lmgr.h"
      70                 :             : #include "storage/proc.h"
      71                 :             : #include "storage/procarray.h"
      72                 :             : #include "tcop/tcopprot.h"
      73                 :             : #include "utils/builtins.h"
      74                 :             : #include "utils/memutils.h"
      75                 :             : #include "utils/pg_lsn.h"
      76                 :             : #include "utils/ps_status.h"
      77                 :             : #include "utils/timeout.h"
      78                 :             : 
      79                 :             : /*
      80                 :             :  * Struct for sharing information to control slot synchronization.
      81                 :             :  *
      82                 :             :  * The 'pid' is either the slot sync worker's pid or the backend's pid running
      83                 :             :  * the SQL function pg_sync_replication_slots(). When the startup process sets
      84                 :             :  * 'stopSignaled' during promotion, it uses this 'pid' to wake up the currently
      85                 :             :  * synchronizing process so that the process can immediately stop its
      86                 :             :  * synchronizing work on seeing 'stopSignaled' set.
      87                 :             :  * Setting 'stopSignaled' is also used to handle the race condition when the
      88                 :             :  * postmaster has not noticed the promotion yet and thus may end up restarting
      89                 :             :  * the slot sync worker. If 'stopSignaled' is set, the worker will exit in such a
      90                 :             :  * case. The SQL function pg_sync_replication_slots() will also error out if
      91                 :             :  * this flag is set. Note that we don't need to reset this variable as after
      92                 :             :  * promotion the slot sync worker won't be restarted because the pmState
      93                 :             :  * changes to PM_RUN from PM_HOT_STANDBY and we don't support demoting
      94                 :             :  * primary without restarting the server. See LaunchMissingBackgroundProcesses.
      95                 :             :  *
      96                 :             :  * The 'syncing' flag is needed to prevent concurrent slot syncs to avoid slot
      97                 :             :  * overwrites.
      98                 :             :  *
      99                 :             :  * The 'last_start_time' is needed by postmaster to start the slot sync worker
     100                 :             :  * once per SLOTSYNC_RESTART_INTERVAL_SEC. In cases where an immediate restart
     101                 :             :  * is expected (e.g., slot sync GUCs change), slot sync worker will reset
     102                 :             :  * last_start_time before exiting, so that postmaster can start the worker
     103                 :             :  * without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
     104                 :             :  */
     105                 :             : typedef struct SlotSyncCtxStruct
     106                 :             : {
     107                 :             :         pid_t           pid;
     108                 :             :         bool            stopSignaled;
     109                 :             :         bool            syncing;
     110                 :             :         time_t          last_start_time;
     111                 :             :         slock_t         mutex;
     112                 :             : } SlotSyncCtxStruct;
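The struct comment explains that the startup process sets 'stopSignaled' during promotion and uses 'pid' to wake the current syncer, and that any later sync attempt must see the flag and stop. A minimal sketch of that handshake follows, assuming a C11 `atomic_flag` spin loop as a stand-in for PostgreSQL's `slock_t` and returning the pid instead of actually signalling it. `SyncCtxSketch`, `signal_stop()` and `may_start_sync()` are hypothetical names.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <sys/types.h>

/* Simplified, hypothetical stand-in for SlotSyncCtxStruct. */
typedef struct
{
    pid_t       pid;          /* current syncing process, or -1 if none */
    bool        stopSignaled;
    atomic_flag lock;         /* stand-in for the slock_t spinlock */
} SyncCtxSketch;

static void spin_acquire(atomic_flag *f) { while (atomic_flag_test_and_set(f)) { /* spin */ } }
static void spin_release(atomic_flag *f) { atomic_flag_clear(f); }

/* Startup process during promotion: set the flag and return the pid to
 * wake (the real code wakes that process; this sketch only returns it). */
static pid_t
signal_stop(SyncCtxSketch *ctx)
{
    pid_t target;

    spin_acquire(&ctx->lock);
    ctx->stopSignaled = true;
    target = ctx->pid;
    spin_release(&ctx->lock);
    return target;
}

/* Checked before syncing: if false, a worker would exit and the SQL
 * function would raise an error. */
static bool
may_start_sync(SyncCtxSketch *ctx)
{
    bool ok;

    spin_acquire(&ctx->lock);
    ok = !ctx->stopSignaled;
    spin_release(&ctx->lock);
    return ok;
}
```

Because the flag is never reset, a worker that the postmaster restarts after promotion sees it immediately and exits, which is the race the comment describes.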
     113                 :             : 
     114                 :             : static SlotSyncCtxStruct *SlotSyncCtx = NULL;
     115                 :             : 
     116                 :             : /* GUC variable */
     117                 :             : bool            sync_replication_slots = false;
     118                 :             : 
     119                 :             : /*
     120                 :             :  * The sleep time (ms) between slot-sync cycles varies dynamically
     121                 :             :  * (within a MIN/MAX range) according to slot activity. See
     122                 :             :  * wait_for_slot_activity() for details.
     123                 :             :  */
     124                 :             : #define MIN_SLOTSYNC_WORKER_NAPTIME_MS  200
     125                 :             : #define MAX_SLOTSYNC_WORKER_NAPTIME_MS  30000   /* 30s */
     126                 :             : 
     127                 :             : static long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
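The sleep time between cycles moves within the MIN/MAX range above according to slot activity; the exact rule lives in wait_for_slot_activity(), which is not part of this excerpt. The sketch below assumes a doubling backoff: reset to the minimum when some slot was updated, otherwise double the current value and cap it at the maximum. `next_sleep_ms()` is a hypothetical helper, not a PostgreSQL function.

```c
#include <assert.h>
#include <stdbool.h>

#define MIN_SLOTSYNC_WORKER_NAPTIME_MS  200
#define MAX_SLOTSYNC_WORKER_NAPTIME_MS  30000   /* 30s */

/* Assumed policy: reset on activity, otherwise double up to the cap. */
static long
next_sleep_ms(long sleep_ms, bool some_slot_updated)
{
    long doubled;

    if (some_slot_updated)
        return MIN_SLOTSYNC_WORKER_NAPTIME_MS;

    doubled = sleep_ms * 2;
    return doubled > MAX_SLOTSYNC_WORKER_NAPTIME_MS
        ? MAX_SLOTSYNC_WORKER_NAPTIME_MS
        : doubled;
}
```

Under this assumption an idle standby goes from 200 ms to the 30 s cap after eight quiet cycles (200, 400, ..., 25600, 30000), while any update snaps it back to 200 ms.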
     128                 :             : 
     129                 :             : /* The restart interval for slot sync work used by postmaster */
     130                 :             : #define SLOTSYNC_RESTART_INTERVAL_SEC 10
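Together with 'last_start_time' in SlotSyncCtxStruct, this interval rate-limits worker launches, and a worker that wants an immediate restart resets 'last_start_time' before exiting. A minimal sketch of the postmaster-side check follows, assuming the reset value is 0 and that the launch time is recorded whenever a start is allowed. `worker_can_restart()` is a hypothetical name.

```c
#include <assert.h>
#include <stdbool.h>
#include <time.h>

#define SLOTSYNC_RESTART_INTERVAL_SEC 10

/* Hypothetical check: allow a start at most once per interval and
 * remember the launch time. A reset value of 0 lets the next start
 * through at once, since any real 'now' is far past the interval. */
static bool
worker_can_restart(time_t *last_start_time, time_t now)
{
    if (now - *last_start_time < SLOTSYNC_RESTART_INTERVAL_SEC)
        return false;

    *last_start_time = now;
    return true;
}
```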
     131                 :             : 
     132                 :             : /*
     133                 :             :  * Flag to tell if we are syncing replication slots. Unlike the 'syncing' flag
     134                 :             :  * in SlotSyncCtxStruct, this flag is true only if the current process is
     135                 :             :  * performing slot synchronization.
     136                 :             :  */
     137                 :             : static bool syncing_slots = false;
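The two flags play different roles: the shared 'syncing' flag stops two processes from syncing at once, while the process-local `syncing_slots` records whether *this* process owns that claim, so cleanup code only resets the shared flag it set itself. The sketch below illustrates that pairing, again assuming a C11 `atomic_flag` as a stand-in for `slock_t`. `SharedSyncState`, `try_begin_sync()` and `end_sync_if_owner()` are hypothetical names.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

typedef struct
{
    bool        syncing;   /* shared: some process is syncing */
    atomic_flag lock;      /* stand-in for slock_t */
} SharedSyncState;

static bool syncing_slots = false;  /* local: this process is syncing */

static void spin_acquire(atomic_flag *f) { while (atomic_flag_test_and_set(f)) { /* spin */ } }
static void spin_release(atomic_flag *f) { atomic_flag_clear(f); }

/* Claim the shared flag; fails if another process already holds it. */
static bool
try_begin_sync(SharedSyncState *s)
{
    spin_acquire(&s->lock);
    if (s->syncing)
    {
        spin_release(&s->lock);
        return false;
    }
    s->syncing = true;
    spin_release(&s->lock);

    syncing_slots = true;
    return true;
}

/* Cleanup (e.g. on error): release only a claim this process made. */
static void
end_sync_if_owner(SharedSyncState *s)
{
    if (!syncing_slots)
        return;

    spin_acquire(&s->lock);
    s->syncing = false;
    spin_release(&s->lock);
    syncing_slots = false;
}
```

Without the local flag, an error path in a process that never acquired the claim could clear another process's 'syncing' flag and allow concurrent, overwriting syncs.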
     138                 :             : 
     139                 :             : /*
     140                 :             :  * Structure to hold information fetched from the primary server about a logical
     141                 :             :  * replication slot.
     142                 :             :  */
     143                 :             : typedef struct RemoteSlot
     144                 :             : {
     145                 :             :         char       *name;
     146                 :             :         char       *plugin;
     147                 :             :         char       *database;
     148                 :             :         bool            two_phase;
     149                 :             :         bool            failover;
     150                 :             :         XLogRecPtr      restart_lsn;
     151                 :             :         XLogRecPtr      confirmed_lsn;
     152                 :             :         XLogRecPtr      two_phase_at;
     153                 :             :         TransactionId catalog_xmin;
     154                 :             : 
     155                 :             :         /* RS_INVAL_NONE if valid, or the reason for invalidation */
     156                 :             :         ReplicationSlotInvalidationCause invalidated;
     157                 :             : } RemoteSlot;
     158                 :             : 
     159                 :             : static void slotsync_failure_callback(int code, Datum arg);
     160                 :             : static void update_synced_slots_inactive_since(void);
     161                 :             : 
     162                 :             : /*
     163                 :             :  * Update slot sync skip stats. This function requires the caller to acquire
     164                 :             :  * the slot.
     165                 :             :  */
     166                 :             : static void
     167                 :           0 : update_slotsync_skip_stats(SlotSyncSkipReason skip_reason)
     168                 :             : {
     169                 :           0 :         ReplicationSlot *slot;
     170                 :             : 
     171         [ #  # ]:           0 :         Assert(MyReplicationSlot);
     172                 :             : 
     173                 :           0 :         slot = MyReplicationSlot;
     174                 :             : 
     175                 :             :         /*
     176                 :             :          * Update the slot sync related stats in pg_stat_replication_slots when a
     177                 :             :          * slot sync is skipped
     178                 :             :          */
     179         [ #  # ]:           0 :         if (skip_reason != SS_SKIP_NONE)
     180                 :           0 :                 pgstat_report_replslotsync(slot);
     181                 :             : 
     182                 :             :         /* Update the slot sync skip reason */
     183         [ #  # ]:           0 :         if (slot->slotsync_skip_reason != skip_reason)
     184                 :             :         {
     185         [ #  # ]:           0 :                 SpinLockAcquire(&slot->mutex);
     186                 :           0 :                 slot->slotsync_skip_reason = skip_reason;
     187                 :           0 :                 SpinLockRelease(&slot->mutex);
     188                 :           0 :         }
     189                 :           0 : }
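update_local_synced_slot(), which follows, makes two ordered comparisons between the remote and local positions: if the remote restart_lsn or catalog_xmin is *behind* the local slot, the sync is skipped; otherwise, if the remote confirmed_lsn, restart_lsn or catalog_xmin is *ahead*, the slot is advanced. The sketch below isolates that decision. The typedefs mirror the widths of `XLogRecPtr` (64-bit) and `TransactionId` (32-bit); `SyncAction` and `classify()` are hypothetical, and the xid comparisons are the modulo-2^32 form used for normal xids only (PostgreSQL's TransactionIdPrecedes also special-cases permanent xids).

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
typedef uint32_t TransactionId;

/* Wraparound-aware comparisons for normal xids (assumption noted above). */
static bool xid_precedes(TransactionId a, TransactionId b) { return (int32_t) (a - b) < 0; }
static bool xid_follows(TransactionId a, TransactionId b)  { return (int32_t) (a - b) > 0; }

typedef enum { SYNC_SKIP, SYNC_ADVANCE, SYNC_NOOP } SyncAction;

static SyncAction
classify(XLogRecPtr r_restart, XLogRecPtr r_confirmed, TransactionId r_xmin,
         XLogRecPtr l_restart, XLogRecPtr l_confirmed, TransactionId l_xmin)
{
    /* Remote needs older WAL or catalog rows: syncing would move us back. */
    if (r_restart < l_restart || xid_precedes(r_xmin, l_xmin))
        return SYNC_SKIP;

    /* Remote is ahead on at least one position: advance the local slot. */
    if (r_confirmed > l_confirmed || r_restart > l_restart ||
        xid_follows(r_xmin, l_xmin))
        return SYNC_ADVANCE;

    return SYNC_NOOP;
}
```

The skip check must come first: a remote slot can be ahead on confirmed_lsn yet behind on restart_lsn, and copying it would move the local slot backward.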
     190                 :             : 
     191                 :             : /*
     192                 :             :  * If necessary, update the local synced slot's metadata based on the data
     193                 :             :  * from the remote slot.
     194                 :             :  *
     195                 :             :  * If no update was needed (the data of the remote slot is the same as the
     196                 :             :  * local slot) return false, otherwise true.
     197                 :             :  *
     198                 :             :  * *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
     199                 :             :  * modified, and decoding from the corresponding LSNs can reach a
     200                 :             :  * consistent snapshot.
     201                 :             :  *
     202                 :             :  * *remote_slot_precedes will be true if the remote slot's LSN or xmin
     203                 :             :  * precedes locally reserved position.
     204                 :             :  */
     205                 :             : static bool
     206                 :           0 : update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
     207                 :             :                                                  bool *found_consistent_snapshot,
     208                 :             :                                                  bool *remote_slot_precedes)
     209                 :             : {
     210                 :           0 :         ReplicationSlot *slot = MyReplicationSlot;
     211                 :           0 :         bool            updated_xmin_or_lsn = false;
     212                 :           0 :         bool            updated_config = false;
     213                 :           0 :         SlotSyncSkipReason skip_reason = SS_SKIP_NONE;
     214                 :             : 
     215         [ #  # ]:           0 :         Assert(slot->data.invalidated == RS_INVAL_NONE);
     216                 :             : 
     217         [ #  # ]:           0 :         if (found_consistent_snapshot)
     218                 :           0 :                 *found_consistent_snapshot = false;
     219                 :             : 
     220         [ #  # ]:           0 :         if (remote_slot_precedes)
     221                 :           0 :                 *remote_slot_precedes = false;
     222                 :             : 
     223                 :             :         /*
     224                 :             :          * Don't overwrite if we already have a newer catalog_xmin and
     225                 :             :          * restart_lsn.
     226                 :             :          */
     227   [ #  #  #  # ]:           0 :         if (remote_slot->restart_lsn < slot->data.restart_lsn ||
     228                 :           0 :                 TransactionIdPrecedes(remote_slot->catalog_xmin,
     229                 :           0 :                                                           slot->data.catalog_xmin))
     230                 :             :         {
     231                 :             :                 /* Update slot sync skip stats */
     232                 :           0 :                 update_slotsync_skip_stats(SS_SKIP_WAL_OR_ROWS_REMOVED);
     233                 :             : 
     234                 :             :                 /*
     235                 :             :                  * This can happen in the following situations:
     236                 :             :                  *
     237                 :             :                  * If the slot is temporary, it means either the initial WAL location
     238                 :             :                  * reserved for the local slot is ahead of the remote slot's
     239                 :             :                  * restart_lsn or the initial xmin_horizon computed for the local slot
     240                 :             :                  * is ahead of the remote slot.
     241                 :             :                  *
     242                 :             :                  * If the slot is persistent, both restart_lsn and catalog_xmin of the
     243                 :             :                  * synced slot could still be ahead of the remote slot. Since we use
     244                 :             :                  * slot advance functionality to keep snapbuild/slot updated, it is
     245                 :             :                  * possible that the restart_lsn and catalog_xmin are advanced to a
     246                 :             :                  * later position than it has on the primary. This can happen when
     247                 :             :                  * slot advancing machinery finds running xacts record after reaching
     248                 :             :                  * the consistent state at a later point than the primary where it
     249                 :             :                  * serializes the snapshot and updates the restart_lsn.
     250                 :             :                  *
     251                 :             :                  * We LOG the message if the slot is temporary as it can help the user
     252                 :             :                  * to understand why the slot is not sync-ready. In the case of a
     253                 :             :                  * persistent slot, it would be a more common case and won't directly
     254                 :             :                  * impact the users, so we used DEBUG1 level to log the message.
     255                 :             :                  */
     256   [ #  #  #  #  #  #  #  #  #  # ]:           0 :                 ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1,
     257                 :             :                                 errmsg("could not synchronize replication slot \"%s\"",
     258                 :             :                                            remote_slot->name),
     259                 :             :                                 errdetail("Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.",
     260                 :             :                                                   LSN_FORMAT_ARGS(remote_slot->restart_lsn),
     261                 :             :                                                   remote_slot->catalog_xmin,
     262                 :             :                                                   LSN_FORMAT_ARGS(slot->data.restart_lsn),
     263                 :             :                                                   slot->data.catalog_xmin));
     264                 :             : 
     265         [ #  # ]:           0 :                 if (remote_slot_precedes)
     266                 :           0 :                         *remote_slot_precedes = true;
     267                 :             : 
     268                 :             :                 /*
     269                 :             :                  * Skip updating the configuration. This is required to avoid syncing
     270                 :             :                  * two_phase_at without syncing confirmed_lsn. Otherwise, the prepared
     271                 :             :                  * transaction between old confirmed_lsn and two_phase_at will
     272                 :             :                  * unexpectedly get decoded and sent to the downstream after
     273                 :             :                  * promotion. See comments in ReorderBufferFinishPrepared.
     274                 :             :                  */
     275                 :           0 :                 return false;
     276                 :             :         }
     277                 :             : 
     278                 :             :         /*
     279                 :             :          * Attempt to sync LSNs and xmins only if remote slot is ahead of local
     280                 :             :          * slot.
     281                 :             :          */
     282         [ #  # ]:           0 :         if (remote_slot->confirmed_lsn > slot->data.confirmed_flush ||
     283   [ #  #  #  # ]:           0 :                 remote_slot->restart_lsn > slot->data.restart_lsn ||
     284                 :           0 :                 TransactionIdFollows(remote_slot->catalog_xmin,
     285                 :           0 :                                                          slot->data.catalog_xmin))
     286                 :             :         {
     287                 :             :                 /*
     288                 :             :                  * We can't directly copy the remote slot's LSN or xmin unless there
     289                 :             :                  * exists a consistent snapshot at that point. Otherwise, after
     290                 :             :                  * promotion, the slots may not reach a consistent point before the
     291                 :             :                  * confirmed_flush_lsn which can lead to a data loss. To avoid data
     292                 :             :                  * loss, we let slot machinery advance the slot which ensures that
     293                 :             :                  * snapbuilder/slot statuses are updated properly.
     294                 :             :                  */
     295         [ #  # ]:           0 :                 if (SnapBuildSnapshotExists(remote_slot->restart_lsn))
     296                 :             :                 {
     297                 :             :                         /*
     298                 :             :                          * Update the slot info directly if there is a serialized snapshot
     299                 :             :                          * at the restart_lsn, as the slot can quickly reach consistency
     300                 :             :                          * at restart_lsn by restoring the snapshot.
     301                 :             :                          */
     302         [ #  # ]:           0 :                         SpinLockAcquire(&slot->mutex);
     303                 :           0 :                         slot->data.restart_lsn = remote_slot->restart_lsn;
     304                 :           0 :                         slot->data.confirmed_flush = remote_slot->confirmed_lsn;
     305                 :           0 :                         slot->data.catalog_xmin = remote_slot->catalog_xmin;
     306                 :           0 :                         SpinLockRelease(&slot->mutex);
     307                 :             : 
     308         [ #  # ]:           0 :                         if (found_consistent_snapshot)
     309                 :           0 :                                 *found_consistent_snapshot = true;
     310                 :           0 :                 }
     311                 :             :                 else
     312                 :             :                 {
     313                 :           0 :                         LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
     314                 :           0 :                                                                                                 found_consistent_snapshot);
     315                 :             : 
     316                 :             :                         /* Sanity check */
     317         [ #  # ]:           0 :                         if (slot->data.confirmed_flush != remote_slot->confirmed_lsn)
     318   [ #  #  #  # ]:           0 :                                 ereport(ERROR,
     319                 :             :                                                 errmsg_internal("synchronized confirmed_flush for slot \"%s\" differs from remote slot",
     320                 :             :                                                                                 remote_slot->name),
     321                 :             :                                                 errdetail_internal("Remote slot has LSN %X/%08X but local slot has LSN %X/%08X.",
     322                 :             :                                                                                    LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
     323                 :             :                                                                                    LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
     324                 :             : 
     325                 :             :                         /*
     326                 :             :                          * If we can't reach a consistent snapshot, the slot won't be
     327                 :             :                          * persisted. See update_and_persist_local_synced_slot().
     328                 :             :                          */
     329   [ #  #  #  # ]:           0 :                         if (found_consistent_snapshot && !(*found_consistent_snapshot))
     330                 :           0 :                                 skip_reason = SS_SKIP_NO_CONSISTENT_SNAPSHOT;
     331                 :             :                 }
     332                 :             : 
     333                 :           0 :                 updated_xmin_or_lsn = true;
     334                 :           0 :         }
     335                 :             : 
     336                 :             :         /* Update slot sync skip stats */
     337                 :           0 :         update_slotsync_skip_stats(skip_reason);
     338                 :             : 
     339         [ #  # ]:           0 :         if (remote_dbid != slot->data.database ||
     340         [ #  # ]:           0 :                 remote_slot->two_phase != slot->data.two_phase ||
     341         [ #  # ]:           0 :                 remote_slot->failover != slot->data.failover ||
     342   [ #  #  #  # ]:           0 :                 strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0 ||
     343                 :           0 :                 remote_slot->two_phase_at != slot->data.two_phase_at)
     344                 :             :         {
     345                 :           0 :                 NameData        plugin_name;
     346                 :             : 
     347                 :             :                 /* Avoid expensive operations while holding a spinlock. */
     348                 :           0 :                 namestrcpy(&plugin_name, remote_slot->plugin);
     349                 :             : 
     350         [ #  # ]:           0 :                 SpinLockAcquire(&slot->mutex);
     351                 :           0 :                 slot->data.plugin = plugin_name;
     352                 :           0 :                 slot->data.database = remote_dbid;
     353                 :           0 :                 slot->data.two_phase = remote_slot->two_phase;
     354                 :           0 :                 slot->data.two_phase_at = remote_slot->two_phase_at;
     355                 :           0 :                 slot->data.failover = remote_slot->failover;
     356                 :           0 :                 SpinLockRelease(&slot->mutex);
     357                 :             : 
     358                 :           0 :                 updated_config = true;
     359                 :             : 
     360                 :             :                 /*
     361                 :             :                  * Ensure that there is no risk of sending prepared transactions
     362                 :             :                  * unexpectedly after the promotion.
     363                 :             :                  */
     364         [ #  # ]:           0 :                 Assert(slot->data.two_phase_at <= slot->data.confirmed_flush);
     365                 :           0 :         }
     366                 :             : 
     367                 :             :         /*
     368                 :             :          * We have to write the changed xmin to disk *before* we change the
     369                 :             :          * in-memory value, otherwise after a crash we wouldn't know that some
     370                 :             :          * catalog tuples might have been removed already.
     371                 :             :          */
     372   [ #  #  #  # ]:           0 :         if (updated_config || updated_xmin_or_lsn)
     373                 :             :         {
     374                 :           0 :                 ReplicationSlotMarkDirty();
     375                 :           0 :                 ReplicationSlotSave();
     376                 :           0 :         }
     377                 :             : 
     378                 :             :         /*
     379                 :             :          * Now the new xmin is safely on disk, we can let the global value
     380                 :             :          * advance. We do not take ProcArrayLock or similar since we only advance
     381                 :             :          * xmin here and there's not much harm done by a concurrent computation
     382                 :             :          * missing that.
     383                 :             :          */
     384         [ #  # ]:           0 :         if (updated_xmin_or_lsn)
     385                 :             :         {
     386         [ #  # ]:           0 :                 SpinLockAcquire(&slot->mutex);
     387                 :           0 :                 slot->effective_catalog_xmin = remote_slot->catalog_xmin;
     388                 :           0 :                 SpinLockRelease(&slot->mutex);
     389                 :             : 
     390                 :           0 :                 ReplicationSlotsComputeRequiredXmin(false);
     391                 :           0 :                 ReplicationSlotsComputeRequiredLSN();
     392                 :           0 :         }
     393                 :             : 
     394         [ #  # ]:           0 :         return updated_config || updated_xmin_or_lsn;
     395                 :           0 : }
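The ordering rule in `update_local_synced_slot()` above can be shown with a standalone toy: make the new `catalog_xmin` durable, then advance `effective_catalog_xmin`, then recompute the global horizon. This is a minimal sketch, not PostgreSQL code. `ToySlot`, `persist_slot()` (standing in for `ReplicationSlotMarkDirty()` plus `ReplicationSlotSave()`) and `compute_required_xmin()` (loosely modelled on `ReplicationSlotsComputeRequiredXmin()`) are hypothetical. XIDs are plain `uint32_t` with wraparound ignored, xmin is assumed only to advance, and `0` plays the role of `InvalidTransactionId`.

```c
#include <assert.h>
#include <stdint.h>

/* Toy slot (hypothetical): 'persisted_xmin' models the on-disk copy and
 * 'effective_catalog_xmin' is what the in-memory horizon computation uses. */
typedef struct ToySlot
{
    uint32_t    catalog_xmin;
    uint32_t    persisted_xmin;
    uint32_t    effective_catalog_xmin;
} ToySlot;

/* Stand-in for ReplicationSlotMarkDirty() + ReplicationSlotSave(). */
void
persist_slot(ToySlot *slot)
{
    slot->persisted_xmin = slot->catalog_xmin;
}

/* Advance catalog_xmin in the safe order: disk first, then memory. */
void
advance_catalog_xmin(ToySlot *slot, uint32_t new_xmin)
{
    slot->catalog_xmin = new_xmin;
    persist_slot(slot);

    /* A crash here is harmless: the durable value is already at least the
     * in-memory one, so nothing the slot still needs can have been removed. */
    assert(slot->effective_catalog_xmin <= slot->persisted_xmin);

    slot->effective_catalog_xmin = new_xmin;
    assert(slot->effective_catalog_xmin <= slot->persisted_xmin);
}

/* The global horizon is the minimum effective xmin over all slots;
 * 0 means "this slot holds back nothing". */
uint32_t
compute_required_xmin(const ToySlot *slots, int n)
{
    uint32_t    result = 0;

    for (int i = 0; i < n; i++)
    {
        uint32_t    x = slots[i].effective_catalog_xmin;

        if (x != 0 && (result == 0 || x < result))
            result = x;
    }
    return result;
}
```

The invariant checked by the `assert()` calls, that the in-memory horizon never runs ahead of the durable value, is what makes a crash between the two steps safe.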
     396                 :             : 
     397                 :             : /*
     398                 :             :  * Get the list of local logical slots that are synchronized from the
     399                 :             :  * primary server.
     400                 :             :  */
     401                 :             : static List *
     402                 :           0 : get_local_synced_slots(void)
     403                 :             : {
     404                 :           0 :         List       *local_slots = NIL;
     405                 :             : 
     406                 :           0 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     407                 :             : 
     408         [ #  # ]:           0 :         for (int i = 0; i < max_replication_slots; i++)
     409                 :             :         {
     410                 :           0 :                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     411                 :             : 
     412                 :             :                 /* Check if it is a synchronized slot */
     413   [ #  #  #  # ]:           0 :                 if (s->in_use && s->data.synced)
     414                 :             :                 {
     415         [ #  # ]:           0 :                         Assert(SlotIsLogical(s));
     416                 :           0 :                         local_slots = lappend(local_slots, s);
     417                 :           0 :                 }
     418                 :           0 :         }
     419                 :             : 
     420                 :           0 :         LWLockRelease(ReplicationSlotControlLock);
     421                 :             : 
     422                 :           0 :         return local_slots;
     423                 :           0 : }
     424                 :             : 
     425                 :             : /*
     426                 :             :  * Helper function to check if local_slot is required to be retained.
     427                 :             :  *
     428                 :             :  * Return false if local_slot either does not exist in the remote_slots list
     429                 :             :  * or is invalidated while the corresponding remote slot is still valid,
     430                 :             :  * otherwise true.
     431                 :             :  */
     432                 :             : static bool
     433                 :           0 : local_sync_slot_required(ReplicationSlot *local_slot, List *remote_slots)
     434                 :             : {
     435                 :           0 :         bool            remote_exists = false;
     436                 :           0 :         bool            locally_invalidated = false;
     437                 :             : 
     438   [ #  #  #  #  :           0 :         foreach_ptr(RemoteSlot, remote_slot, remote_slots)
             #  #  #  # ]
     439                 :             :         {
     440         [ #  # ]:           0 :                 if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0)
     441                 :             :                 {
     442                 :           0 :                         remote_exists = true;
     443                 :             : 
     444                 :             :                         /*
     445                 :             :                          * If remote slot is not invalidated but local slot is marked as
     446                 :             :                          * invalidated, then set locally_invalidated flag.
     447                 :             :                          */
     448         [ #  # ]:           0 :                         SpinLockAcquire(&local_slot->mutex);
     449                 :           0 :                         locally_invalidated =
     450         [ #  # ]:           0 :                                 (remote_slot->invalidated == RS_INVAL_NONE) &&
     451                 :           0 :                                 (local_slot->data.invalidated != RS_INVAL_NONE);
     452                 :           0 :                         SpinLockRelease(&local_slot->mutex);
     453                 :             : 
     454                 :           0 :                         break;
     455                 :             :                 }
     456                 :           0 :         }
     457                 :             : 
     458         [ #  # ]:           0 :         return (remote_exists && !locally_invalidated);
     459                 :           0 : }
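`local_sync_slot_required()` keeps a synced slot only if the primary still has a slot of that name and the local copy was not invalidated while the remote one is still valid. A minimal standalone sketch of that rule, assuming hypothetical `ToyLocalSlot`/`ToyRemoteSlot` types and a two-value stand-in for the invalidation cause; the spinlock and `List` handling of the real function are omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical stand-in for the invalidation cause; only "none" versus
 * "some cause" matters for this check. */
typedef enum
{
    TOY_INVAL_NONE = 0,
    TOY_INVAL_WAL_REMOVED
} ToyInval;

typedef struct
{
    const char *name;
    ToyInval    invalidated;
} ToyRemoteSlot;

typedef struct
{
    const char *name;
    ToyInval    invalidated;
} ToyLocalSlot;

/* Retain the local slot only if a remote slot with the same name exists and
 * the local copy is not invalidated while the remote one is still valid. */
bool
toy_local_slot_required(const ToyLocalSlot *local,
                        const ToyRemoteSlot *remote, int nremote)
{
    for (int i = 0; i < nremote; i++)
    {
        if (strcmp(remote[i].name, local->name) == 0)
        {
            bool        locally_invalidated =
                remote[i].invalidated == TOY_INVAL_NONE &&
                local->invalidated != TOY_INVAL_NONE;

            return !locally_invalidated;
        }
    }
    return false;               /* no longer present on the primary */
}
```

A slot invalidated on both sides is retained by this rule; it is later skipped during synchronization rather than dropped.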
     460                 :             : 
     461                 :             : /*
     462                 :             :  * Drop local obsolete slots.
     463                 :             :  *
     464                 :             :  * Drop the local slots that no longer need to be synced i.e. these either do
     465                 :             :  * not exist on the primary or are no longer enabled for failover.
     466                 :             :  *
     467                 :             :  * Additionally, drop any slots that are valid on the primary but got
     468                 :             :  * invalidated on the standby. This situation may occur due to the following
     469                 :             :  * reasons:
     470                 :             :  * - The 'max_slot_wal_keep_size' on the standby is insufficient to retain WAL
     471                 :             :  *   records from the restart_lsn of the slot.
     472                 :             :  * - 'primary_slot_name' is temporarily reset to null and the physical slot is
     473                 :             :  *   removed.
     474                 :             :  * These dropped slots will get recreated in the next sync cycle, and it is okay to
     475                 :             :  * drop and recreate such slots as long as these are not consumable on the
     476                 :             :  * standby (which is the case currently).
     477                 :             :  *
     478                 :             :  * Note: Change of 'wal_level' on the primary server to a level lower than
     479                 :             :  * logical may also result in slot invalidation and removal on the standby.
     480                 :             :  * This is because such 'wal_level' change is only possible if the logical
     481                 :             :  * slots are removed on the primary server, so it's expected to see the
     482                 :             :  * slots being invalidated and removed on the standby too (and re-created
     483                 :             :  * if they are re-created on the primary server).
     484                 :             :  */
     485                 :             : static void
     486                 :           0 : drop_local_obsolete_slots(List *remote_slot_list)
     487                 :             : {
     488                 :           0 :         List       *local_slots = get_local_synced_slots();
     489                 :             : 
     490   [ #  #  #  #  :           0 :         foreach_ptr(ReplicationSlot, local_slot, local_slots)
             #  #  #  # ]
     491                 :             :         {
     492                 :             :                 /* Drop the local slot if it is not required to be retained. */
     493         [ #  # ]:           0 :                 if (!local_sync_slot_required(local_slot, remote_slot_list))
     494                 :             :                 {
     495                 :           0 :                         bool            synced_slot;
     496                 :             : 
     497                 :             :                         /*
     498                 :             :                          * Use shared lock to prevent a conflict with
     499                 :             :                          * ReplicationSlotsDropDBSlots(), trying to drop the same slot
     500                 :             :                          * during a drop-database operation.
     501                 :             :                          */
     502                 :           0 :                         LockSharedObject(DatabaseRelationId, local_slot->data.database,
     503                 :             :                                                          0, AccessShareLock);
     504                 :             : 
     505                 :             :                         /*
     506                 :             :                          * In the small window between getting the slot to drop and
     507                 :             :                          * locking the database, there is a possibility of a parallel
     508                 :             :                          * database drop by the startup process and the creation of a new
     509                 :             :                          * slot by the user. This new user-created slot may end up using
     510                 :             :                          * the same shared memory as that of 'local_slot'. Thus check if
     511                 :             :                          * local_slot is still the synced one before performing actual
     512                 :             :                          * drop.
     513                 :             :                          */
     514         [ #  # ]:           0 :                         SpinLockAcquire(&local_slot->mutex);
     515         [ #  # ]:           0 :                         synced_slot = local_slot->in_use && local_slot->data.synced;
     516                 :           0 :                         SpinLockRelease(&local_slot->mutex);
     517                 :             : 
     518         [ #  # ]:           0 :                         if (synced_slot)
     519                 :             :                         {
     520                 :           0 :                                 ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
     521                 :           0 :                                 ReplicationSlotDropAcquired();
     522                 :           0 :                         }
     523                 :             : 
     524                 :           0 :                         UnlockSharedObject(DatabaseRelationId, local_slot->data.database,
     525                 :             :                                                            0, AccessShareLock);
     526                 :             : 
     527   [ #  #  #  # ]:           0 :                         ereport(LOG,
     528                 :             :                                         errmsg("dropped replication slot \"%s\" of database with OID %u",
     529                 :             :                                                    NameStr(local_slot->data.name),
     530                 :             :                                                    local_slot->data.database));
     531                 :           0 :                 }
     532                 :           0 :         }
     533                 :           0 : }
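The re-check in `drop_local_obsolete_slots()` matters because the candidate list is built before the database lock is taken, so a shared-memory entry may be dropped and reused by a user-created slot in between. A single-threaded sketch of that two-phase pattern, assuming a hypothetical `ToySlot` array, a hypothetical hook `toy_reuse_slot1()` that simulates the concurrent reuse, and a name-only obsolescence test (the invalidation condition of `local_sync_slot_required()` is ignored here). Setting `in_use = false` stands in for `ReplicationSlotAcquire()` plus `ReplicationSlotDropAcquired()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define TOY_MAX_SLOTS 16

typedef struct ToySlot
{
    bool        in_use;
    bool        synced;
    const char *name;
} ToySlot;

/* Hypothetical hook modelling what another process may do in the window
 * between choosing a slot to drop and locking its database. */
typedef void (*ToyConcurrentHook) (ToySlot *slots);

bool
toy_on_primary(const char *name, const char *const *remote, int nremote)
{
    for (int i = 0; i < nremote; i++)
        if (strcmp(name, remote[i]) == 0)
            return true;
    return false;
}

int
toy_drop_obsolete(ToySlot *slots, int nslots,
                  const char *const *remote, int nremote,
                  ToyConcurrentHook concurrent)
{
    int         candidates[TOY_MAX_SLOTS];
    int         ncand = 0;
    int         dropped = 0;

    assert(nslots <= TOY_MAX_SLOTS);

    /* Phase 1: pick synced slots that the primary no longer has. */
    for (int i = 0; i < nslots; i++)
        if (slots[i].in_use && slots[i].synced &&
            !toy_on_primary(slots[i].name, remote, nremote))
            candidates[ncand++] = i;

    /* The window in which an entry may be dropped and reused. */
    if (concurrent != NULL)
        concurrent(slots);

    /* Phase 2: re-check each entry before dropping it. */
    for (int k = 0; k < ncand; k++)
    {
        ToySlot    *s = &slots[candidates[k]];

        if (s->in_use && s->synced)
        {
            s->in_use = false;  /* stands in for acquire + drop */
            dropped++;
        }
    }
    return dropped;
}

/* Example event: entry 1 is reused by a user-created (non-synced) slot. */
void
toy_reuse_slot1(ToySlot *slots)
{
    slots[1].in_use = true;
    slots[1].synced = false;
    slots[1].name = "user_slot";
}
```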
     534                 :             : 
     535                 :             : /*
     536                 :             :  * Reserve WAL for the currently active local slot using the specified WAL
     537                 :             :  * location (restart_lsn).
     538                 :             :  *
     539                 :             :  * If the given WAL location has been removed, reserve WAL using the oldest
     540                 :             :  * existing WAL segment.
     541                 :             :  */
     542                 :             : static void
     543                 :           0 : reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
     544                 :             : {
     545                 :           0 :         XLogSegNo       oldest_segno;
     546                 :           0 :         XLogSegNo       segno;
     547                 :           0 :         ReplicationSlot *slot = MyReplicationSlot;
     548                 :             : 
     549         [ #  # ]:           0 :         Assert(slot != NULL);
     550         [ #  # ]:           0 :         Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
     551                 :             : 
     552                 :           0 :         while (true)
     553                 :             :         {
     554         [ #  # ]:           0 :                 SpinLockAcquire(&slot->mutex);
     555                 :           0 :                 slot->data.restart_lsn = restart_lsn;
     556                 :           0 :                 SpinLockRelease(&slot->mutex);
     557                 :             : 
     558                 :             :                 /* Prevent WAL removal as fast as possible */
     559                 :           0 :                 ReplicationSlotsComputeRequiredLSN();
     560                 :             : 
     561                 :           0 :                 XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
     562                 :             : 
     563                 :             :                 /*
     564                 :             :                  * Find the oldest existing WAL segment file.
     565                 :             :                  *
     566                 :             :                  * Normally, we can determine it by using the last removed segment
     567                 :             :                  * number. However, if no WAL segment files have been removed by a
     568                 :             :                  * checkpoint since startup, we need to search for the oldest segment
     569                 :             :                  * file from the current timeline existing in XLOGDIR.
     570                 :             :                  *
     571                 :             :                  * XXX: Currently, we are searching for the oldest segment in the
     572                 :             :                  * current timeline as there is less chance of the slot's restart_lsn
     573                 :             :                  * being on some prior timeline, and even if it happens, in the
     574                 :             :                  * worst case, we will wait to sync until the slot's restart_lsn moves
     575                 :             :                  * to the current timeline.
     576                 :             :                  */
     577                 :           0 :                 oldest_segno = XLogGetLastRemovedSegno() + 1;
     578                 :             : 
     579         [ #  # ]:           0 :                 if (oldest_segno == 1)
     580                 :             :                 {
     581                 :           0 :                         TimeLineID      cur_timeline;
     582                 :             : 
     583                 :           0 :                         GetWalRcvFlushRecPtr(NULL, &cur_timeline);
     584                 :           0 :                         oldest_segno = XLogGetOldestSegno(cur_timeline);
     585                 :           0 :                 }
     586                 :             : 
     587   [ #  #  #  # ]:           0 :                 elog(DEBUG1, "segno: " UINT64_FORMAT " of purposed restart_lsn for the synced slot, oldest_segno: " UINT64_FORMAT " available",
     588                 :             :                          segno, oldest_segno);
     589                 :             : 
     590                 :             :                 /*
     591                 :             :                  * If all required WAL is still there, great, otherwise retry. The
     592                 :             :                  * slot should prevent further removal of WAL, unless there's a
     593                 :             :                  * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
     594                 :             :                  * the new restart_lsn above, so normally we should never need to loop
     595                 :             :                  * more than twice.
     596                 :             :                  */
     597         [ #  # ]:           0 :                 if (segno >= oldest_segno)
     598                 :           0 :                         break;
     599                 :             : 
     600                 :             :                 /* Retry using the location of the oldest wal segment */
     601                 :           0 :                 XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, restart_lsn);
     602                 :             :         }
     603                 :           0 : }
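`reserve_wal_for_local_slot()` converts the proposed `restart_lsn` to a segment number and, if that segment is already gone, retries from the start of the oldest available segment. The arithmetic can be sketched as follows, assuming `segno = lsn / wal_segment_size` (what `XLByteToSeg()` computes) and `lsn = segno * wal_segment_size` for offset 0 (what `XLogSegNoOffsetToRecPtr()` computes). `last_removed` and `oldest_on_disk` are hypothetical parameters standing in for the results of `XLogGetLastRemovedSegno()` and `XLogGetOldestSegno()`; with no concurrent WAL removal modelled, the loop runs at most twice.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t ToyRecPtr;     /* hypothetical stand-in for XLogRecPtr */
typedef uint64_t ToySegNo;      /* hypothetical stand-in for XLogSegNo */

ToyRecPtr
toy_reserve_wal(ToyRecPtr restart_lsn, uint64_t seg_size,
                ToySegNo last_removed, ToySegNo oldest_on_disk)
{
    assert(seg_size > 0);

    for (;;)
    {
        ToySegNo    segno = restart_lsn / seg_size;    /* like XLByteToSeg() */
        ToySegNo    oldest = last_removed + 1;

        /* Nothing removed since startup: fall back to scanning the WAL dir. */
        if (oldest == 1)
            oldest = oldest_on_disk;

        /* All required WAL is still present. */
        if (segno >= oldest)
            return restart_lsn;

        /* Retry from the start of the oldest existing segment. */
        restart_lsn = oldest * seg_size;
    }
}
```

With 16 MiB segments, a proposed position in segment 3 is kept as-is if segment 3 still exists, and is moved to the first byte of segment 5 if segments up to 4 have already been removed.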
     604                 :             : 
     605                 :             : /*
     606                 :             :  * If the remote restart_lsn and catalog_xmin have caught up with the
     607                 :             :  * local ones, then update the LSNs and persist the local synced slot for
     608                 :             :  * future synchronization; otherwise, do nothing.
     609                 :             :  *
     610                 :             :  * *slot_persistence_pending is set to true if any of the slots fail to
     611                 :             :  * persist.
     612                 :             :  *
     613                 :             :  * Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise
     614                 :             :  * false.
     615                 :             :  */
     616                 :             : static bool
     617                 :           0 : update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
     618                 :             :                                                                          bool *slot_persistence_pending)
     619                 :             : {
     620                 :           0 :         ReplicationSlot *slot = MyReplicationSlot;
     621                 :           0 :         bool            found_consistent_snapshot = false;
     622                 :           0 :         bool            remote_slot_precedes = false;
     623                 :             : 
     624                 :             :         /* Slotsync skip stats are handled in function update_local_synced_slot() */
     625                 :           0 :         (void) update_local_synced_slot(remote_slot, remote_dbid,
     626                 :             :                                                                         &found_consistent_snapshot,
     627                 :             :                                                                         &remote_slot_precedes);
     628                 :             : 
     629                 :             :         /*
     630                 :             :          * Check if the primary server has caught up. Refer to the comment atop
     631                 :             :          * the file for details on this check.
     632                 :             :          */
     633         [ #  # ]:           0 :         if (remote_slot_precedes)
     634                 :             :         {
     635                 :             :                 /*
     636                 :             :                  * The remote slot didn't catch up to the locally reserved position.
     637                 :             :                  *
     638                 :             :                  * We do not drop the slot because the restart_lsn can be ahead of the
     639                 :             :                  * current location when recreating the slot in the next cycle. It may
     640                 :             :                  * take more time to create such a slot. Therefore, we keep this slot
     641                 :             :                  * and attempt the synchronization in the next cycle.
     642                 :             :                  *
     643                 :             :                  * We also update the slot_persistence_pending parameter, so the SQL
     644                 :             :                  * function can retry.
     645                 :             :                  */
     646         [ #  # ]:           0 :                 if (slot_persistence_pending)
     647                 :           0 :                         *slot_persistence_pending = true;
     648                 :             : 
     649                 :           0 :                 return false;
     650                 :             :         }
     651                 :             : 
     652                 :             :         /*
     653                 :             :          * Don't persist the slot if it cannot reach the consistent point from the
     654                 :             :          * restart_lsn. See comments atop this file.
     655                 :             :          */
     656         [ #  # ]:           0 :         if (!found_consistent_snapshot)
     657                 :             :         {
     658   [ #  #  #  # ]:           0 :                 ereport(LOG,
     659                 :             :                                 errmsg("could not synchronize replication slot \"%s\"", remote_slot->name),
     660                 :             :                                 errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
     661                 :             :                                                   LSN_FORMAT_ARGS(slot->data.restart_lsn)));
     662                 :             : 
     663                 :             :                 /* Set this, so that SQL function can retry */
     664         [ #  # ]:           0 :                 if (slot_persistence_pending)
     665                 :           0 :                         *slot_persistence_pending = true;
     666                 :             : 
     667                 :           0 :                 return false;
     668                 :             :         }
     669                 :             : 
     670                 :           0 :         ReplicationSlotPersist();
     671                 :             : 
     672   [ #  #  #  # ]:           0 :         ereport(LOG,
     673                 :             :                         errmsg("newly created replication slot \"%s\" is sync-ready now",
     674                 :             :                                    remote_slot->name));
     675                 :             : 
     676                 :           0 :         return true;
     677                 :           0 : }
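The decision in `update_and_persist_local_synced_slot()` reduces to two checks in a fixed order: first whether the remote slot still lags the locally reserved position, then whether a consistent snapshot was found. A minimal sketch of that control flow, with a hypothetical `ToyOutcome` enum replacing the `bool` return and the log messages; as the `NULL` checks in the code above allow, `slot_persistence_pending` may be `NULL`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum
{
    TOY_PERSISTED,              /* ReplicationSlotPersist() would be called */
    TOY_REMOTE_BEHIND,          /* remote slot precedes the local position */
    TOY_NO_SNAPSHOT             /* no consistent snapshot from restart_lsn */
} ToyOutcome;

ToyOutcome
toy_try_persist(bool remote_slot_precedes, bool found_consistent_snapshot,
                bool *slot_persistence_pending)
{
    if (remote_slot_precedes)
    {
        /* Keep the temporary slot and let a later attempt retry. */
        if (slot_persistence_pending)
            *slot_persistence_pending = true;
        return TOY_REMOTE_BEHIND;
    }

    if (!found_consistent_snapshot)
    {
        if (slot_persistence_pending)
            *slot_persistence_pending = true;
        return TOY_NO_SNAPSHOT;
    }

    return TOY_PERSISTED;
}
```

Only the `TOY_PERSISTED` path corresponds to persisting the slot; both failure paths leave it temporary so that the next cycle, or a retry of the SQL function, can try again.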
     678                 :             : 
     679                 :             : /*
     680                 :             :  * Synchronize a single slot to the given position.
     681                 :             :  *
     682                 :             :  * This creates a new slot if there is no existing one and updates the
     683                 :             :  * metadata of the slot as per the data received from the primary server.
     684                 :             :  *
     685                 :             :  * The slot is created as a temporary slot and stays in the same state until the
     686                 :             :  * remote_slot catches up with the locally reserved position and the local slot
     687                 :             :  * is updated. The slot is then persisted and is considered sync-ready for
     688                 :             :  * periodic syncs.
     689                 :             :  *
     690                 :             :  * *slot_persistence_pending is set to true if any of the slots fail to
     691                 :             :  * persist.
     692                 :             :  *
     693                 :             :  * Returns TRUE if the local slot is updated.
     694                 :             :  */
     695                 :             : static bool
     696                 :           0 : synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid,
     697                 :             :                                          bool *slot_persistence_pending)
     698                 :             : {
     699                 :           0 :         ReplicationSlot *slot;
     700                 :           0 :         XLogRecPtr      latestFlushPtr = GetStandbyFlushRecPtr(NULL);
     701                 :           0 :         bool            slot_updated = false;
     702                 :             : 
     703                 :             :         /* Search for the named slot */
     704         [ #  # ]:           0 :         if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
     705                 :             :         {
     706                 :           0 :                 bool            synced;
     707                 :             : 
     708         [ #  # ]:           0 :                 SpinLockAcquire(&slot->mutex);
     709                 :           0 :                 synced = slot->data.synced;
     710                 :           0 :                 SpinLockRelease(&slot->mutex);
     711                 :             : 
     712                 :             :                 /* User-created slot with the same name exists, raise ERROR. */
     713         [ #  # ]:           0 :                 if (!synced)
     714   [ #  #  #  # ]:           0 :                         ereport(ERROR,
     715                 :             :                                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     716                 :             :                                         errmsg("exiting from slot synchronization because same"
     717                 :             :                                                    " name slot \"%s\" already exists on the standby",
     718                 :             :                                                    remote_slot->name));
     719                 :             : 
     720                 :             :                 /*
     721                 :             :                  * The slot has been synchronized before.
     722                 :             :                  *
     723                 :             :                  * It is important to acquire the slot here before checking
     724                 :             :                  * invalidation. If we don't acquire the slot first, there could be a
     725                 :             :                  * race condition that the local slot could be invalidated just after
     726                 :             :                  * checking the 'invalidated' flag here and we could end up
     727                 :             :                  * overwriting 'invalidated' flag to remote_slot's value. See
     728                 :             :                  * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
     729                 :             :                  * if the slot is not acquired by other processes.
     730                 :             :                  *
     731                 :             :                  * XXX: If it ever turns out that slot acquire/release is costly for
     732                 :             :                  * cases when none of the slot properties is changed then we can do a
     733                 :             :                  * pre-check to ensure that at least one of the slot properties is
     734                 :             :                  * changed before acquiring the slot.
     735                 :             :                  */
     736                 :           0 :                 ReplicationSlotAcquire(remote_slot->name, true, false);
     737                 :             : 
     738         [ #  # ]:           0 :                 Assert(slot == MyReplicationSlot);
     739                 :             : 
     740                 :             :                 /*
     741                 :             :                  * Copy the invalidation cause from remote only if local slot is not
     742                 :             :                  * invalidated locally, we don't want to overwrite existing one.
     743                 :             :                  */
     744   [ #  #  #  # ]:           0 :                 if (slot->data.invalidated == RS_INVAL_NONE &&
     745                 :           0 :                         remote_slot->invalidated != RS_INVAL_NONE)
     746                 :             :                 {
     747         [ #  # ]:           0 :                         SpinLockAcquire(&slot->mutex);
     748                 :           0 :                         slot->data.invalidated = remote_slot->invalidated;
     749                 :           0 :                         SpinLockRelease(&slot->mutex);
     750                 :             : 
     751                 :             :                         /* Make sure the invalidated state persists across server restart */
     752                 :           0 :                         ReplicationSlotMarkDirty();
     753                 :           0 :                         ReplicationSlotSave();
     754                 :             : 
     755                 :           0 :                         slot_updated = true;
     756                 :           0 :                 }
     757                 :             : 
     758                 :             :                 /* Skip the sync of an invalidated slot */
     759         [ #  # ]:           0 :                 if (slot->data.invalidated != RS_INVAL_NONE)
     760                 :             :                 {
     761                 :           0 :                         update_slotsync_skip_stats(SS_SKIP_INVALID);
     762                 :             : 
     763                 :           0 :                         ReplicationSlotRelease();
     764                 :           0 :                         return slot_updated;
     765                 :             :                 }
     766                 :             : 
     767                 :             :                 /*
     768                 :             :                  * Make sure that the relevant WAL is received and flushed before
     769                 :             :                  * syncing the slot to the target LSN received from the primary server.
     770                 :             :                  *
     771                 :             :                  * Report statistics only after the slot has been acquired, ensuring
     772                 :             :                  * it cannot be dropped during the reporting process.
     773                 :             :                  */
     774         [ #  # ]:           0 :                 if (remote_slot->confirmed_lsn > latestFlushPtr)
     775                 :             :                 {
     776                 :           0 :                         update_slotsync_skip_stats(SS_SKIP_WAL_NOT_FLUSHED);
     777                 :             : 
     778                 :             :                         /*
     779                 :             :                          * Can get here only if GUC 'synchronized_standby_slots' on the
     780                 :             :                          * primary server was not configured correctly.
     781                 :             :                          */
     782   [ #  #  #  #  :           0 :                         ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
          #  #  #  #  #  
                      # ]
     783                 :             :                                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     784                 :             :                                         errmsg("skipping slot synchronization because the received slot sync"
     785                 :             :                                                    " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
     786                 :             :                                                    LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
     787                 :             :                                                    remote_slot->name,
     788                 :             :                                                    LSN_FORMAT_ARGS(latestFlushPtr)));
     789                 :             : 
     790                 :           0 :                         ReplicationSlotRelease();
     791                 :             : 
     792                 :           0 :                         return slot_updated;
     793                 :             :                 }
     794                 :             : 
     795                 :             :                 /* Slot not ready yet, let's attempt to make it sync-ready now. */
     796         [ #  # ]:           0 :                 if (slot->data.persistency == RS_TEMPORARY)
     797                 :             :                 {
     798                 :           0 :                         slot_updated = update_and_persist_local_synced_slot(remote_slot,
     799                 :           0 :                                                                                                                                 remote_dbid,
     800                 :           0 :                                                                                                                                 slot_persistence_pending);
     801                 :           0 :                 }
     802                 :             : 
     803                 :             :                 /* Slot ready for sync, so sync it. */
     804                 :             :                 else
     805                 :             :                 {
     806                 :             :                         /*
     807                 :             :                          * Sanity check: As long as the invalidations are handled
     808                 :             :                          * appropriately as above, this should never happen.
     809                 :             :                          *
     810                 :             :                          * We don't need to check restart_lsn here. See the comments in
     811                 :             :                          * update_local_synced_slot() for details.
     812                 :             :                          */
     813         [ #  # ]:           0 :                         if (remote_slot->confirmed_lsn < slot->data.confirmed_flush)
     814   [ #  #  #  # ]:           0 :                                 ereport(ERROR,
     815                 :             :                                                 errmsg_internal("cannot synchronize local slot \"%s\"",
     816                 :             :                                                                                 remote_slot->name),
     817                 :             :                                                 errdetail_internal("Local slot's start streaming location LSN(%X/%08X) is ahead of remote slot's LSN(%X/%08X).",
     818                 :             :                                                                                    LSN_FORMAT_ARGS(slot->data.confirmed_flush),
     819                 :             :                                                                                    LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
     820                 :             : 
     821                 :           0 :                         slot_updated = update_local_synced_slot(remote_slot, remote_dbid,
     822                 :             :                                                                                                         NULL, NULL);
     823                 :             :                 }
     824         [ #  # ]:           0 :         }
     825                 :             :         /* Otherwise create the slot first. */
     826                 :             :         else
     827                 :             :         {
     828                 :           0 :                 NameData        plugin_name;
     829                 :           0 :                 TransactionId xmin_horizon = InvalidTransactionId;
     830                 :             : 
     831                 :             :                 /* Skip creating the local slot if remote_slot is invalidated already */
     832         [ #  # ]:           0 :                 if (remote_slot->invalidated != RS_INVAL_NONE)
     833                 :           0 :                         return false;
     834                 :             : 
     835                 :             :                 /*
     836                 :             :                  * We create temporary slots instead of ephemeral slots here because
     837                 :             :                  * we want the slots to survive after releasing them. This is done to
     838                 :             :                  * avoid dropping and re-creating the slots in each synchronization
     839                 :             :                  * cycle if the restart_lsn or catalog_xmin of the remote slot has not
     840                 :             :                  * caught up.
     841                 :             :                  */
     842                 :           0 :                 ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
     843                 :           0 :                                                           remote_slot->two_phase,
     844                 :           0 :                                                           remote_slot->failover,
     845                 :             :                                                           true);
     846                 :             : 
     847                 :             :                 /* For shorter lines. */
     848                 :           0 :                 slot = MyReplicationSlot;
     849                 :             : 
     850                 :             :                 /* Avoid expensive operations while holding a spinlock. */
     851                 :           0 :                 namestrcpy(&plugin_name, remote_slot->plugin);
     852                 :             : 
     853         [ #  # ]:           0 :                 SpinLockAcquire(&slot->mutex);
     854                 :           0 :                 slot->data.database = remote_dbid;
     855                 :           0 :                 slot->data.plugin = plugin_name;
     856                 :           0 :                 SpinLockRelease(&slot->mutex);
     857                 :             : 
     858                 :           0 :                 reserve_wal_for_local_slot(remote_slot->restart_lsn);
     859                 :             : 
     860                 :           0 :                 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     861                 :           0 :                 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     862                 :           0 :                 xmin_horizon = GetOldestSafeDecodingTransactionId(true);
     863         [ #  # ]:           0 :                 SpinLockAcquire(&slot->mutex);
     864                 :           0 :                 slot->effective_catalog_xmin = xmin_horizon;
     865                 :           0 :                 slot->data.catalog_xmin = xmin_horizon;
     866                 :           0 :                 SpinLockRelease(&slot->mutex);
     867                 :           0 :                 ReplicationSlotsComputeRequiredXmin(true);
     868                 :           0 :                 LWLockRelease(ProcArrayLock);
     869                 :           0 :                 LWLockRelease(ReplicationSlotControlLock);
     870                 :             : 
     871                 :             :                 /*
     872                 :             :                  * Make sure that the relevant WAL is received and flushed before
     873                 :             :                  * syncing the slot to the target LSN received from the primary server.
     874                 :             :                  *
     875                 :             :                  * Report statistics only after the slot has been acquired, ensuring
     876                 :             :                  * it cannot be dropped during the reporting process.
     877                 :             :                  */
     878         [ #  # ]:           0 :                 if (remote_slot->confirmed_lsn > latestFlushPtr)
     879                 :             :                 {
     880                 :           0 :                         update_slotsync_skip_stats(SS_SKIP_WAL_NOT_FLUSHED);
     881                 :             : 
     882                 :             :                         /*
     883                 :             :                          * Can get here only if GUC 'synchronized_standby_slots' on the
     884                 :             :                          * primary server was not configured correctly.
     885                 :             :                          */
     886   [ #  #  #  #  :           0 :                         ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
          #  #  #  #  #  
                      # ]
     887                 :             :                                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     888                 :             :                                         errmsg("skipping slot synchronization because the received slot sync"
     889                 :             :                                                    " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
     890                 :             :                                                    LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
     891                 :             :                                                    remote_slot->name,
     892                 :             :                                                    LSN_FORMAT_ARGS(latestFlushPtr)));
     893                 :             : 
     894                 :           0 :                         ReplicationSlotRelease();
     895                 :             : 
     896                 :           0 :                         return false;
     897                 :             :                 }
     898                 :             : 
     899                 :           0 :                 update_and_persist_local_synced_slot(remote_slot, remote_dbid,
     900                 :           0 :                                                                                          slot_persistence_pending);
     901                 :             : 
     902                 :           0 :                 slot_updated = true;
     903         [ #  # ]:           0 :         }
     904                 :             : 
     905                 :           0 :         ReplicationSlotRelease();
     906                 :             : 
     907                 :           0 :         return slot_updated;
     908                 :           0 : }
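As a reading aid, the branches of synchronize_one_slot() shown in this excerpt can be condensed into a single decision function. This is a minimal sketch, not PostgreSQL code. The types `LocalSlot`, `RemoteSlotInfo`, `SyncAction` and the function `decide_sync_action()` are hypothetical. Only one invalidation cause is modeled, and locking, statistics reporting and persistence are omitted. The ERROR-level sanity check is reduced to `assert()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;           /* stand-in for XLogRecPtr */

/* Hypothetical, simplified subsets of the real enums. */
typedef enum { INVAL_NONE, INVAL_WAL_REMOVED } InvalCause;
typedef enum { PERSIST_TEMPORARY, PERSIST_PERSISTENT } Persistency;

typedef struct
{
    bool        exists;          /* local slot already present? */
    InvalCause  invalidated;
    Persistency persistency;
    Lsn         confirmed_flush;
} LocalSlot;

typedef struct
{
    InvalCause invalidated;
    Lsn        confirmed_lsn;
} RemoteSlotInfo;

typedef enum
{
    ACT_SKIP_INVALID,            /* local or remote slot invalidated */
    ACT_SKIP_WAL_NOT_FLUSHED,    /* remote position ahead of standby flush */
    ACT_CREATE_AND_PERSIST,      /* new slot: created temporary, then persisted */
    ACT_PERSIST,                 /* existing temporary slot: make it sync-ready */
    ACT_UPDATE                   /* existing persistent slot: advance it */
} SyncAction;

static SyncAction
decide_sync_action(LocalSlot *local, const RemoteSlotInfo *remote,
                   Lsn latest_flush)
{
    if (!local->exists)
    {
        /* Never create a local copy of an already-invalidated remote slot. */
        if (remote->invalidated != INVAL_NONE)
            return ACT_SKIP_INVALID;
        /*
         * The real code creates the slot as RS_TEMPORARY, reserves WAL and
         * sets catalog_xmin before this check, so a skipped slot survives
         * release and is retried in the next cycle.
         */
        if (remote->confirmed_lsn > latest_flush)
            return ACT_SKIP_WAL_NOT_FLUSHED;
        return ACT_CREATE_AND_PERSIST;
    }

    /* Copy the remote cause only if the local slot is not already invalid. */
    if (local->invalidated == INVAL_NONE && remote->invalidated != INVAL_NONE)
        local->invalidated = remote->invalidated;

    if (local->invalidated != INVAL_NONE)
        return ACT_SKIP_INVALID;

    if (remote->confirmed_lsn > latest_flush)
        return ACT_SKIP_WAL_NOT_FLUSHED;

    if (local->persistency == PERSIST_TEMPORARY)
        return ACT_PERSIST;

    /* Sanity check: the local slot must not be ahead of the remote one. */
    assert(remote->confirmed_lsn >= local->confirmed_flush);
    return ACT_UPDATE;
}
```

The ordering mirrors the source. The invalidation cause is copied before the skip test, so a slot invalidated on the primary is marked invalid locally even though nothing else about it is synchronized.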
     909                 :             : 
     910                 :             : /*
     911                 :             :  * Fetch remote slots.
     912                 :             :  *
     913                 :             :  * If slot_names is NIL, fetches all failover logical slots from the
     914                 :             :  * primary server, otherwise fetches only the ones with names in slot_names.
     915                 :             :  *
     916                 :             :  * Returns a list of remote slot information structures, or NIL if none
     917                 :             :  * are found.
     918                 :             :  */
     919                 :             : static List *
     920                 :           0 : fetch_remote_slots(WalReceiverConn *wrconn, List *slot_names)
     921                 :             : {
     922                 :             : #define SLOTSYNC_COLUMN_COUNT 10
     923                 :           0 :         Oid                     slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
     924                 :             :         LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID};
     925                 :             : 
     926                 :           0 :         WalRcvExecResult *res;
     927                 :           0 :         TupleTableSlot *tupslot;
     928                 :           0 :         List       *remote_slot_list = NIL;
     929                 :           0 :         StringInfoData query;
     930                 :             : 
     931                 :           0 :         initStringInfo(&query);
     932                 :           0 :         appendStringInfoString(&query,
     933                 :             :                                                    "SELECT slot_name, plugin, confirmed_flush_lsn,"
     934                 :             :                                                    " restart_lsn, catalog_xmin, two_phase,"
     935                 :             :                                                    " two_phase_at, failover,"
     936                 :             :                                                    " database, invalidation_reason"
     937                 :             :                                                    " FROM pg_catalog.pg_replication_slots"
     938                 :             :                                                    " WHERE failover and NOT temporary");
     939                 :             : 
     940         [ #  # ]:           0 :         if (slot_names != NIL)
     941                 :             :         {
     942                 :           0 :                 bool            first_slot = true;
     943                 :             : 
     944                 :             :                 /*
     945                 :             :                  * Construct the query to fetch only the specified slots
     946                 :             :                  */
     947                 :           0 :                 appendStringInfoString(&query, " AND slot_name IN (");
     948                 :             : 
     949   [ #  #  #  #  :           0 :                 foreach_ptr(char, slot_name, slot_names)
             #  #  #  # ]
     950                 :             :                 {
     951         [ #  # ]:           0 :                         if (!first_slot)
     952                 :           0 :                                 appendStringInfoString(&query, ", ");
     953                 :             : 
     954                 :           0 :                         appendStringInfo(&query, "%s", quote_literal_cstr(slot_name));
     955                 :           0 :                         first_slot = false;
     956                 :           0 :                 }
     957                 :           0 :                 appendStringInfoChar(&query, ')');
     958                 :           0 :         }
     959                 :             : 
     960                 :             :         /* Execute the query */
     961                 :           0 :         res = walrcv_exec(wrconn, query.data, SLOTSYNC_COLUMN_COUNT, slotRow);
     962                 :           0 :         pfree(query.data);
     963         [ #  # ]:           0 :         if (res->status != WALRCV_OK_TUPLES)
     964   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     965                 :             :                                 errmsg("could not fetch failover logical slots info from the primary server: %s",
     966                 :             :                                            res->err));
     967                 :             : 
     968                 :           0 :         tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     969         [ #  # ]:           0 :         while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
     970                 :             :         {
     971                 :           0 :                 bool            isnull;
     972                 :           0 :                 RemoteSlot *remote_slot = palloc0_object(RemoteSlot);
     973                 :           0 :                 Datum           d;
     974                 :           0 :                 int                     col = 0;
     975                 :             : 
     976                 :           0 :                 remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, ++col,
     977                 :             :                                                                                                                          &isnull));
     978         [ #  # ]:           0 :                 Assert(!isnull);
     979                 :             : 
     980                 :           0 :                 remote_slot->plugin = TextDatumGetCString(slot_getattr(tupslot, ++col,
     981                 :             :                                                                                                                            &isnull));
     982         [ #  # ]:           0 :                 Assert(!isnull);
     983                 :             : 
     984                 :             :                 /*
     985                 :             :                  * It is possible to get null values for LSN and Xmin if the slot
     986                 :             :                  * is invalidated on the primary server, so handle them accordingly.
     987                 :             :                  */
     988                 :           0 :                 d = slot_getattr(tupslot, ++col, &isnull);
     989         [ #  # ]:           0 :                 remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr :
     990                 :           0 :                         DatumGetLSN(d);
     991                 :             : 
     992                 :           0 :                 d = slot_getattr(tupslot, ++col, &isnull);
     993         [ #  # ]:           0 :                 remote_slot->restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
     994                 :             : 
     995                 :           0 :                 d = slot_getattr(tupslot, ++col, &isnull);
     996         [ #  # ]:           0 :                 remote_slot->catalog_xmin = isnull ? InvalidTransactionId :
     997                 :           0 :                         DatumGetTransactionId(d);
     998                 :             : 
     999                 :           0 :                 remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, ++col,
    1000                 :             :                                                                                                                    &isnull));
    1001         [ #  # ]:           0 :                 Assert(!isnull);
    1002                 :             : 
    1003                 :           0 :                 d = slot_getattr(tupslot, ++col, &isnull);
    1004         [ #  # ]:           0 :                 remote_slot->two_phase_at = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
    1005                 :             : 
    1006                 :           0 :                 remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
    1007                 :             :                                                                                                                   &isnull));
    1008         [ #  # ]:           0 :                 Assert(!isnull);
    1009                 :             : 
    1010                 :           0 :                 remote_slot->database = TextDatumGetCString(slot_getattr(tupslot,
    1011                 :             :                                                                                                                                  ++col, &isnull));
    1012         [ #  # ]:           0 :                 Assert(!isnull);
    1013                 :             : 
    1014                 :           0 :                 d = slot_getattr(tupslot, ++col, &isnull);
    1015         [ #  # ]:           0 :                 remote_slot->invalidated = isnull ? RS_INVAL_NONE :
    1016                 :           0 :                         GetSlotInvalidationCause(TextDatumGetCString(d));
    1017                 :             : 
    1018                 :             :                 /* Sanity check */
    1019         [ #  # ]:           0 :                 Assert(col == SLOTSYNC_COLUMN_COUNT);
    1020                 :             : 
    1021                 :             :                 /*
    1022                 :             :                  * If restart_lsn, confirmed_lsn or catalog_xmin is invalid but the
    1023                 :             :                  * slot is valid, that means we have fetched the remote_slot in its
    1024                 :             :                  * RS_EPHEMERAL state. In such a case, don't sync it; we can always
    1025                 :             :                  * sync it in the next sync cycle when the remote_slot is persisted
    1026                 :             :                  * and has valid lsn(s) and xmin values.
    1027                 :             :                  *
    1028                 :             :                  * XXX: In future, if we plan to expose 'slot->data.persistency' in
    1029                 :             :                  * pg_replication_slots view, then we can avoid fetching RS_EPHEMERAL
    1030                 :             :                  * slots in the first place.
    1031                 :             :                  */
    1032         [ #  # ]:           0 :                 if ((!XLogRecPtrIsValid(remote_slot->restart_lsn) ||
    1033         [ #  # ]:           0 :                          !XLogRecPtrIsValid(remote_slot->confirmed_lsn) ||
    1034         [ #  # ]:           0 :                          !TransactionIdIsValid(remote_slot->catalog_xmin)) &&
    1035                 :           0 :                         remote_slot->invalidated == RS_INVAL_NONE)
    1036                 :           0 :                         pfree(remote_slot);
    1037                 :             :                 else
    1038                 :             :                         /* Create list of remote slots */
    1039                 :           0 :                         remote_slot_list = lappend(remote_slot_list, remote_slot);
    1040                 :             : 
    1041                 :           0 :                 ExecClearTuple(tupslot);
    1042                 :           0 :         }
    1043                 :             : 
    1044                 :           0 :         walrcv_clear_result(res);
    1045                 :             : 
    1046                 :           0 :         return remote_slot_list;
    1047                 :           0 : }
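fetch_remote_slots() does two things worth isolating: it builds an optional `IN (...)` filter from quoted slot names, and it discards rows that look like a still-ephemeral remote slot. The following is a sketch under stated assumptions. `append_quoted()` is a hypothetical helper that imitates quote_literal_cstr() by doubling quotes and backslashes and adding an `E` prefix when a backslash is present. It writes into a fixed-size buffer rather than a StringInfo. `append_str()`, `build_slot_filter()` and `should_keep_remote_slot()` are also hypothetical. Invalid LSN and XID values are represented by 0, as InvalidXLogRecPtr and InvalidTransactionId are in PostgreSQL.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

typedef uint64_t Lsn;           /* stand-in for XLogRecPtr */
typedef uint32_t Xid;           /* stand-in for TransactionId */
#define INVALID_LSN ((Lsn) 0)   /* InvalidXLogRecPtr is 0 */
#define INVALID_XID ((Xid) 0)   /* InvalidTransactionId is 0 */

/* Append s to the NUL-terminated buf of capacity cap; false on overflow. */
static bool
append_str(char *buf, size_t cap, const char *s)
{
    size_t len = strlen(buf);
    size_t add = strlen(s);

    if (len + add + 1 > cap)
        return false;
    memcpy(buf + len, s, add + 1);
    return true;
}

/* Hypothetical imitation of quote_literal_cstr(): double ' and \, and add
 * an E prefix if the value contains a backslash. */
static bool
append_quoted(char *buf, size_t cap, const char *lit)
{
    size_t len = strlen(buf);

    /* Worst case: E + ' + every character doubled + ' + NUL. */
    if (len + 2 * strlen(lit) + 4 > cap)
        return false;
    if (strchr(lit, '\\') != NULL)
        buf[len++] = 'E';
    buf[len++] = '\'';
    for (const char *p = lit; *p != '\0'; p++)
    {
        if (*p == '\'' || *p == '\\')
            buf[len++] = *p;
        buf[len++] = *p;
    }
    buf[len++] = '\'';
    buf[len] = '\0';
    return true;
}

/* Build the " AND slot_name IN (...)" suffix; an empty list adds nothing. */
static bool
build_slot_filter(char *buf, size_t cap, const char *const *names, size_t n)
{
    if (cap == 0)
        return false;
    buf[0] = '\0';
    if (n == 0)
        return true;
    if (!append_str(buf, cap, " AND slot_name IN ("))
        return false;
    for (size_t i = 0; i < n; i++)
    {
        if (i > 0 && !append_str(buf, cap, ", "))
            return false;
        if (!append_quoted(buf, cap, names[i]))
            return false;
    }
    return append_str(buf, cap, ")");
}

/* Drop rows that look like a still-ephemeral but valid remote slot. */
static bool
should_keep_remote_slot(Lsn restart_lsn, Lsn confirmed_lsn, Xid catalog_xmin,
                        bool invalidated)
{
    bool incomplete = restart_lsn == INVALID_LSN ||
        confirmed_lsn == INVALID_LSN ||
        catalog_xmin == INVALID_XID;

    return invalidated || !incomplete;
}
```

An empty name list produces an empty suffix, which corresponds to the NIL case in which every failover slot is fetched. An invalidated row is always kept, even with null LSNs, so that the standby can learn the invalidation cause.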
    1048                 :             : 
    1049                 :             : /*
    1050                 :             :  * Synchronize slots.
    1051                 :             :  *
    1052                 :             :  * This function takes a list of remote slots and synchronizes them locally. It
    1053                 :             :  * creates the slots if not present on the standby and updates existing ones.
    1054                 :             :  *
    1055                 :             :  * If slot_persistence_pending is not NULL, it will be set to true if one or
    1056                 :             :  * more slots could not be persisted. This allows callers such as
    1057                 :             :  * SyncReplicationSlots() to retry those slots.
    1058                 :             :  *
    1059                 :             :  * Returns TRUE if any of the slots gets updated in this sync-cycle.
    1060                 :             :  */
    1061                 :             : static bool
    1062                 :           0 : synchronize_slots(WalReceiverConn *wrconn, List *remote_slot_list,
    1063                 :             :                                   bool *slot_persistence_pending)
    1064                 :             : {
    1065                 :           0 :         bool            some_slot_updated = false;
    1066                 :             : 
    1067                 :             :         /* Drop local slots that no longer need to be synced. */
    1068                 :           0 :         drop_local_obsolete_slots(remote_slot_list);
    1069                 :             : 
    1070                 :             :         /* Now sync the slots locally */
    1071   [ #  #  #  #  :           0 :         foreach_ptr(RemoteSlot, remote_slot, remote_slot_list)
             #  #  #  # ]
    1072                 :             :         {
    1073                 :           0 :                 Oid                     remote_dbid = get_database_oid(remote_slot->database, false);
    1074                 :             : 
    1075                 :             :                 /*
    1076                 :             :                  * Use a shared lock to prevent a conflict with
    1077                 :             :                  * ReplicationSlotsDropDBSlots(), which may try to drop the same slot
    1078                 :             :                  * during a drop-database operation.
    1079                 :             :                  */
    1080                 :           0 :                 LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
    1081                 :             : 
    1082                 :           0 :                 some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid,
    1083                 :           0 :                                                                                                   slot_persistence_pending);
    1084                 :             : 
    1085                 :           0 :                 UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
    1086                 :           0 :         }
    1087                 :             : 
    1088                 :           0 :         return some_slot_updated;
    1089                 :           0 : }
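The loop in synchronize_slots() accumulates its result with `|=` rather than `||`. With `||`, short-circuit evaluation would skip synchronize_one_slot() for every slot after the first one that reported an update. In the sketch below, a hypothetical `DbLock` counter stands in for LockSharedObject()/UnlockSharedObject(), and the `sync_all_slots()` and `demo_sync_one()` functions are illustrative names, not PostgreSQL APIs. The sketch shows that every slot is still visited and that each database's shared lock is released before the next slot is processed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a shared lock on one database OID. */
typedef struct
{
    int share_holders;
} DbLock;

typedef bool (*SyncOneFn) (size_t slot_index);

/* Sync every slot, holding its database's shared lock only for that slot. */
static bool
sync_all_slots(DbLock *locks, const size_t *slot_db, size_t nslots,
               SyncOneFn sync_one)
{
    bool some_updated = false;

    for (size_t i = 0; i < nslots; i++)
    {
        DbLock *lock = &locks[slot_db[i]];

        lock->share_holders++;           /* models LockSharedObject() */
        some_updated |= sync_one(i);     /* |= always evaluates the call */
        lock->share_holders--;           /* models UnlockSharedObject() */
    }
    return some_updated;
}

/* Hypothetical callback: counts calls and reports an update only for slot 0. */
static int demo_calls = 0;

static bool
demo_sync_one(size_t slot_index)
{
    demo_calls++;
    return slot_index == 0;
}
```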
    1090                 :             : 
    1091                 :             : /*
    1092                 :             :  * Checks the remote server info.
    1093                 :             :  *
    1094                 :             :  * We ensure that the physical slot named by 'primary_slot_name' exists on the
    1095                 :             :  * remote server and that the remote server is not a standby node.
    1096                 :             :  */
    1097                 :             : static void
    1098                 :           0 : validate_remote_info(WalReceiverConn *wrconn)
    1099                 :             : {
    1100                 :             : #define PRIMARY_INFO_OUTPUT_COL_COUNT 2
    1101                 :           0 :         WalRcvExecResult *res;
    1102                 :           0 :         Oid                     slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID, BOOLOID};
    1103                 :           0 :         StringInfoData cmd;
    1104                 :           0 :         bool            isnull;
    1105                 :           0 :         TupleTableSlot *tupslot;
    1106                 :           0 :         bool            remote_in_recovery;
    1107                 :           0 :         bool            primary_slot_valid;
    1108                 :           0 :         bool            started_tx = false;
    1109                 :             : 
    1110                 :           0 :         initStringInfo(&cmd);
    1111                 :           0 :         appendStringInfo(&cmd,
    1112                 :             :                                          "SELECT pg_is_in_recovery(), count(*) = 1"
    1113                 :             :                                          " FROM pg_catalog.pg_replication_slots"
    1114                 :             :                                          " WHERE slot_type='physical' AND slot_name=%s",
    1115                 :           0 :                                          quote_literal_cstr(PrimarySlotName));
    1116                 :             : 
    1117                 :             :         /* The syscache access in walrcv_exec() needs a transaction env. */
    1118         [ #  # ]:           0 :         if (!IsTransactionState())
    1119                 :             :         {
    1120                 :           0 :                 StartTransactionCommand();
    1121                 :           0 :                 started_tx = true;
    1122                 :           0 :         }
    1123                 :             : 
    1124                 :           0 :         res = walrcv_exec(wrconn, cmd.data, PRIMARY_INFO_OUTPUT_COL_COUNT, slotRow);
    1125                 :           0 :         pfree(cmd.data);
    1126                 :             : 
    1127         [ #  # ]:           0 :         if (res->status != WALRCV_OK_TUPLES)
    1128   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1129                 :             :                                 errmsg("could not fetch primary slot name \"%s\" info from the primary server: %s",
    1130                 :             :                                            PrimarySlotName, res->err),
    1131                 :             :                                 errhint("Check if \"primary_slot_name\" is configured correctly."));
    1132                 :             : 
    1133                 :           0 :         tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    1134         [ #  # ]:           0 :         if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
    1135   [ #  #  #  # ]:           0 :                 elog(ERROR,
    1136                 :             :                          "failed to fetch tuple for the primary server slot specified by \"primary_slot_name\"");
    1137                 :             : 
    1138                 :           0 :         remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull));
    1139         [ #  # ]:           0 :         Assert(!isnull);
    1140                 :             : 
    1141                 :             :         /*
     1142                 :             :          * Slot sync is currently not supported on a cascading standby. If we
     1143                 :             :          * allowed it, the primary server would need to wait for all the
     1144                 :             :          * cascading standbys; otherwise, logical subscribers could still be ahead
     1145                 :             :          * of one of the cascading standbys that we plan to promote. To avoid this
     1146                 :             :          * additional complexity, we restrict it for the time being.
    1147                 :             :          */
    1148         [ #  # ]:           0 :         if (remote_in_recovery)
    1149   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1150                 :             :                                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1151                 :             :                                 errmsg("cannot synchronize replication slots from a standby server"));
    1152                 :             : 
    1153                 :           0 :         primary_slot_valid = DatumGetBool(slot_getattr(tupslot, 2, &isnull));
    1154         [ #  # ]:           0 :         Assert(!isnull);
    1155                 :             : 
    1156         [ #  # ]:           0 :         if (!primary_slot_valid)
    1157   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1158                 :             :                                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1159                 :             :                 /* translator: second %s is a GUC variable name */
    1160                 :             :                                 errmsg("replication slot \"%s\" specified by \"%s\" does not exist on primary server",
    1161                 :             :                                            PrimarySlotName, "primary_slot_name"));
    1162                 :             : 
    1163                 :           0 :         ExecClearTuple(tupslot);
    1164                 :           0 :         walrcv_clear_result(res);
    1165                 :             : 
    1166         [ #  # ]:           0 :         if (started_tx)
    1167                 :           0 :                 CommitTransactionCommand();
    1168                 :           0 : }
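The check above sends a single query in which the slot name is embedded as a SQL literal through quote_literal_cstr(). As a rough, standalone illustration (not PostgreSQL code), the sketch below re-creates that quoting rule as we understand it, doubling every single quote and backslash and adding an E prefix when a backslash is present, and then formats the same query text. `quote_literal_sketch()` and `build_primary_info_query()` are hypothetical helpers; both return malloc'd strings that the caller must free.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Same query text that validate_remote_info() builds. */
#define PRIMARY_INFO_QUERY_FMT \
	"SELECT pg_is_in_recovery(), count(*) = 1" \
	" FROM pg_catalog.pg_replication_slots" \
	" WHERE slot_type='physical' AND slot_name=%s"

/*
 * Hypothetical re-creation of the quote_literal_cstr() rule: wrap the value
 * in single quotes, double any ' or \, and prefix E if a backslash occurs.
 * Returns a malloc'd string, or NULL if allocation fails.
 */
static char *
quote_literal_sketch(const char *raw)
{
	size_t		len;
	char	   *out;
	char	   *dst;

	assert(raw != NULL);
	len = strlen(raw);
	/* worst case: E + quote + every char doubled + quote + NUL */
	out = malloc(2 * len + 4);
	if (out == NULL)
		return NULL;

	dst = out;
	if (strchr(raw, '\\') != NULL)
		*dst++ = 'E';
	*dst++ = '\'';
	for (const char *s = raw; *s; s++)
	{
		if (*s == '\'' || *s == '\\')
			*dst++ = *s;		/* emit the special character twice */
		*dst++ = *s;
	}
	*dst++ = '\'';
	*dst = '\0';
	return out;
}

/* Format the two-column remote-info query for the given slot name. */
static char *
build_primary_info_query(const char *slot_name)
{
	char	   *lit = quote_literal_sketch(slot_name);
	char	   *query;
	int			n;

	if (lit == NULL)
		return NULL;
	n = snprintf(NULL, 0, PRIMARY_INFO_QUERY_FMT, lit);	/* measure first */
	query = malloc((size_t) n + 1);
	if (query != NULL)
		snprintf(query, (size_t) n + 1, PRIMARY_INFO_QUERY_FMT, lit);
	free(lit);
	return query;
}
```

The server is then expected to return exactly one row of two booleans: `pg_is_in_recovery()` must be false and the slot-existence test must be true; otherwise validate_remote_info() raises one of the errors shown above.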
    1169                 :             : 
    1170                 :             : /*
    1171                 :             :  * Checks if dbname is specified in 'primary_conninfo'.
    1172                 :             :  *
     1173                 :             :  * Error out if not specified; otherwise, return it.
    1174                 :             :  */
    1175                 :             : char *
    1176                 :           0 : CheckAndGetDbnameFromConninfo(void)
    1177                 :             : {
    1178                 :           0 :         char       *dbname;
    1179                 :             : 
    1180                 :             :         /*
    1181                 :             :          * The slot synchronization needs a database connection for walrcv_exec to
    1182                 :             :          * work.
    1183                 :             :          */
    1184                 :           0 :         dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
    1185         [ #  # ]:           0 :         if (dbname == NULL)
    1186   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1187                 :             :                                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1188                 :             : 
    1189                 :             :                 /*
    1190                 :             :                  * translator: first %s is a connection option; second %s is a GUC
    1191                 :             :                  * variable name
    1192                 :             :                  */
    1193                 :             :                                 errmsg("replication slot synchronization requires \"%s\" to be specified in \"%s\"",
    1194                 :             :                                            "dbname", "primary_conninfo"));
    1195                 :           0 :         return dbname;
    1196                 :           0 : }
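CheckAndGetDbnameFromConninfo() leaves the actual parsing to walrcv_get_dbname_from_conninfo(). To make the requirement concrete, here is a deliberately simplified, hypothetical scanner (`get_dbname_sketch()`) that only understands whitespace-separated key=value pairs; it ignores quoting, escapes, spaces around '=', and the URI form, all of which a real libpq conninfo parser must handle. It returns a malloc'd copy of the last non-empty dbname value, or NULL, which is the case in which the function above errors out.

```c
#include <assert.h>
#include <ctype.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical, simplified conninfo scan: handles only "key=value" tokens
 * separated by whitespace. Returns a malloc'd copy of the last non-empty
 * dbname value, or NULL if there is none.
 */
static char *
get_dbname_sketch(const char *conninfo)
{
	const char *p;
	char	   *result = NULL;

	assert(conninfo != NULL);
	p = conninfo;
	while (*p)
	{
		const char *key;
		const char *val;
		size_t		keylen;
		size_t		vallen;

		while (isspace((unsigned char) *p))
			p++;
		if (*p == '\0')
			break;

		/* keyword runs up to '=' or whitespace */
		key = p;
		while (*p && *p != '=' && !isspace((unsigned char) *p))
			p++;
		keylen = (size_t) (p - key);
		if (*p != '=')
			continue;			/* token without '=': skip it */
		p++;					/* step over '=' */

		/* value runs up to the next whitespace */
		val = p;
		while (*p && !isspace((unsigned char) *p))
			p++;
		vallen = (size_t) (p - val);

		if (keylen == 6 && strncmp(key, "dbname", 6) == 0 && vallen > 0)
		{
			char	   *copy = malloc(vallen + 1);

			if (copy == NULL)
				break;
			memcpy(copy, val, vallen);
			copy[vallen] = '\0';
			free(result);		/* a later dbname overrides an earlier one */
			result = copy;
		}
	}
	return result;
}
```

For example, a primary_conninfo of `host=primary user=repl` yields NULL and would trigger the "dbname" error above, while adding `dbname=postgres` satisfies the check.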
    1197                 :             : 
    1198                 :             : /*
    1199                 :             :  * Return true if all necessary GUCs for slot synchronization are set
    1200                 :             :  * appropriately, otherwise, return false.
    1201                 :             :  */
    1202                 :             : bool
    1203                 :           0 : ValidateSlotSyncParams(int elevel)
    1204                 :             : {
    1205                 :             :         /*
    1206                 :             :          * Logical slot sync/creation requires logical decoding to be enabled.
    1207                 :             :          */
    1208         [ #  # ]:           0 :         if (!IsLogicalDecodingEnabled())
    1209                 :             :         {
     1210   [ #  #  #  #  #  #  #  #  #  # ]:           0 :                 ereport(elevel,
    1211                 :             :                                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1212                 :             :                                 errmsg("replication slot synchronization requires \"effective_wal_level\" >= \"logical\" on the primary"),
    1213                 :             :                                 errhint("To enable logical decoding on primary, set \"wal_level\" >= \"logical\" or create at least one logical slot when \"wal_level\" = \"replica\"."));
    1214                 :             : 
    1215                 :           0 :                 return false;
    1216                 :             :         }
    1217                 :             : 
    1218                 :             :         /*
     1219                 :             :          * A physical replication slot (primary_slot_name) is required on the
    1220                 :             :          * primary to ensure that the rows needed by the standby are not removed
    1221                 :             :          * after restarting, so that the synchronized slot on the standby will not
    1222                 :             :          * be invalidated.
    1223                 :             :          */
    1224   [ #  #  #  # ]:           0 :         if (PrimarySlotName == NULL || *PrimarySlotName == '\0')
    1225                 :             :         {
     1226   [ #  #  #  #  #  #  #  #  #  # ]:           0 :                 ereport(elevel,
    1227                 :             :                                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1228                 :             :                 /* translator: %s is a GUC variable name */
    1229                 :             :                                 errmsg("replication slot synchronization requires \"%s\" to be set", "primary_slot_name"));
    1230                 :           0 :                 return false;
    1231                 :             :         }
    1232                 :             : 
    1233                 :             :         /*
    1234                 :             :          * hot_standby_feedback must be enabled to cooperate with the physical
    1235                 :             :          * replication slot, which allows informing the primary about the xmin and
    1236                 :             :          * catalog_xmin values on the standby.
    1237                 :             :          */
    1238         [ #  # ]:           0 :         if (!hot_standby_feedback)
    1239                 :             :         {
     1240   [ #  #  #  #  #  #  #  #  #  # ]:           0 :                 ereport(elevel,
    1241                 :             :                                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1242                 :             :                 /* translator: %s is a GUC variable name */
    1243                 :             :                                 errmsg("replication slot synchronization requires \"%s\" to be enabled",
    1244                 :             :                                            "hot_standby_feedback"));
    1245                 :           0 :                 return false;
    1246                 :             :         }
    1247                 :             : 
    1248                 :             :         /*
     1249                 :             :          * The primary_conninfo is required to connect to the primary for
     1250                 :             :          * fetching slot information.
    1251                 :             :          */
    1252   [ #  #  #  # ]:           0 :         if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0')
    1253                 :             :         {
     1254   [ #  #  #  #  #  #  #  #  #  # ]:           0 :                 ereport(elevel,
    1255                 :             :                                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1256                 :             :                 /* translator: %s is a GUC variable name */
    1257                 :             :                                 errmsg("replication slot synchronization requires \"%s\" to be set",
    1258                 :             :                                            "primary_conninfo"));
    1259                 :           0 :                 return false;
    1260                 :             :         }
    1261                 :             : 
    1262                 :           0 :         return true;
    1263                 :           0 : }
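ValidateSlotSyncParams() is an ordered list of prerequisites that stops at the first failure. A hypothetical, self-contained model of that ordering follows; the `SlotSyncSettings` struct and `first_missing_prerequisite()` are illustrative names only, and the real function reports the failure via ereport() at the caller-supplied elevel and returns false instead of returning a name.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical snapshot of the settings ValidateSlotSyncParams() inspects. */
typedef struct SlotSyncSettings
{
	bool		logical_decoding_enabled;
	const char *primary_slot_name;
	bool		hot_standby_feedback;
	const char *primary_conninfo;
} SlotSyncSettings;

/* A string GUC counts as unset when it is NULL or empty. */
static bool
is_unset(const char *s)
{
	return s == NULL || *s == '\0';
}

/*
 * Return NULL if every prerequisite holds; otherwise return the name of the
 * first failing one, checked in the same order as the function above.
 */
static const char *
first_missing_prerequisite(const SlotSyncSettings *s)
{
	assert(s != NULL);
	if (!s->logical_decoding_enabled)
		return "effective_wal_level";
	if (is_unset(s->primary_slot_name))
		return "primary_slot_name";
	if (!s->hot_standby_feedback)
		return "hot_standby_feedback";
	if (is_unset(s->primary_conninfo))
		return "primary_conninfo";
	return NULL;
}
```

Because the checks are ordered, a standby missing both primary_slot_name and hot_standby_feedback reports only the slot name first; fixing it surfaces the next problem on the following attempt.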
    1264                 :             : 
    1265                 :             : /*
    1266                 :             :  * Re-read the config file for slot synchronization.
    1267                 :             :  *
     1268                 :             :  * If relevant GUCs have changed, exit when called from the slot sync worker,
     1269                 :             :  * or throw an error when called from the SQL function pg_sync_replication_slots().
    1270                 :             :  */
    1271                 :             : static void
    1272                 :           0 : slotsync_reread_config(void)
    1273                 :             : {
    1274                 :           0 :         char       *old_primary_conninfo = pstrdup(PrimaryConnInfo);
    1275                 :           0 :         char       *old_primary_slotname = pstrdup(PrimarySlotName);
    1276                 :           0 :         bool            old_sync_replication_slots = sync_replication_slots;
    1277                 :           0 :         bool            old_hot_standby_feedback = hot_standby_feedback;
    1278                 :           0 :         bool            conninfo_changed;
    1279                 :           0 :         bool            primary_slotname_changed;
    1280                 :           0 :         bool            is_slotsync_worker = AmLogicalSlotSyncWorkerProcess();
    1281                 :           0 :         bool            parameter_changed = false;
    1282                 :             : 
    1283         [ #  # ]:           0 :         if (is_slotsync_worker)
    1284         [ #  # ]:           0 :                 Assert(sync_replication_slots);
    1285                 :             : 
    1286                 :           0 :         ConfigReloadPending = false;
    1287                 :           0 :         ProcessConfigFile(PGC_SIGHUP);
    1288                 :             : 
    1289                 :           0 :         conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0;
    1290                 :           0 :         primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0;
    1291                 :           0 :         pfree(old_primary_conninfo);
    1292                 :           0 :         pfree(old_primary_slotname);
    1293                 :             : 
    1294         [ #  # ]:           0 :         if (old_sync_replication_slots != sync_replication_slots)
    1295                 :             :         {
    1296         [ #  # ]:           0 :                 if (is_slotsync_worker)
    1297                 :             :                 {
    1298   [ #  #  #  # ]:           0 :                         ereport(LOG,
    1299                 :             :                         /* translator: %s is a GUC variable name */
    1300                 :             :                                         errmsg("replication slot synchronization worker will stop because \"%s\" is disabled",
    1301                 :             :                                                    "sync_replication_slots"));
    1302                 :             : 
    1303                 :           0 :                         proc_exit(0);
    1304                 :             :                 }
    1305                 :             : 
    1306                 :           0 :                 parameter_changed = true;
    1307                 :           0 :         }
    1308                 :             :         else
    1309                 :             :         {
    1310         [ #  # ]:           0 :                 if (conninfo_changed ||
    1311   [ #  #  #  # ]:           0 :                         primary_slotname_changed ||
    1312                 :           0 :                         (old_hot_standby_feedback != hot_standby_feedback))
    1313                 :             :                 {
    1314                 :             : 
    1315         [ #  # ]:           0 :                         if (is_slotsync_worker)
    1316                 :             :                         {
    1317   [ #  #  #  # ]:           0 :                                 ereport(LOG,
    1318                 :             :                                                 errmsg("replication slot synchronization worker will restart because of a parameter change"));
    1319                 :             : 
    1320                 :             :                                 /*
    1321                 :             :                                  * Reset the last-start time for this worker so that the
    1322                 :             :                                  * postmaster can restart it without waiting for
    1323                 :             :                                  * SLOTSYNC_RESTART_INTERVAL_SEC.
    1324                 :             :                                  */
    1325                 :           0 :                                 SlotSyncCtx->last_start_time = 0;
    1326                 :             : 
    1327                 :           0 :                                 proc_exit(0);
    1328                 :             :                         }
    1329                 :             : 
    1330                 :           0 :                         parameter_changed = true;
    1331                 :           0 :                 }
    1332                 :             :         }
    1333                 :             : 
    1334                 :             :         /*
     1335                 :             :          * If we have reached here with a parameter change, we must be running in
     1336                 :             :          * the SQL function; emit an error in that case.
    1337                 :             :          */
    1338         [ #  # ]:           0 :         if (parameter_changed)
    1339                 :             :         {
    1340         [ #  # ]:           0 :                 Assert(!is_slotsync_worker);
    1341   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1342                 :             :                                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1343                 :             :                                 errmsg("replication slot synchronization will stop because of a parameter change"));
    1344                 :           0 :         }
    1345                 :             : 
    1346                 :           0 : }
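The decision table in slotsync_reread_config() depends on which GUC changed and on who is running the sync. The sketch below (hypothetical `SyncGucs`, `ReloadAction`, and `reload_action()`) encodes that table as a pure function so the four outcomes are easy to see; it deliberately leaves out the side effects (logging, resetting last_start_time, proc_exit(), ereport()).

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical values of the GUCs that slotsync_reread_config() compares. */
typedef struct SyncGucs
{
	const char *primary_conninfo;
	const char *primary_slot_name;
	bool		sync_replication_slots;
	bool		hot_standby_feedback;
} SyncGucs;

typedef enum ReloadAction
{
	RELOAD_CONTINUE,			/* nothing relevant changed */
	RELOAD_WORKER_STOP,			/* worker exits because sync was disabled */
	RELOAD_WORKER_RESTART,		/* worker exits so postmaster can restart it */
	RELOAD_SQL_ERROR			/* pg_sync_replication_slots() errors out */
} ReloadAction;

static ReloadAction
reload_action(const SyncGucs *old, const SyncGucs *cur, bool is_worker)
{
	bool		changed;

	/* The worker only runs while sync_replication_slots is on. */
	if (is_worker)
		assert(old->sync_replication_slots);

	if (old->sync_replication_slots != cur->sync_replication_slots)
		return is_worker ? RELOAD_WORKER_STOP : RELOAD_SQL_ERROR;

	changed = strcmp(old->primary_conninfo, cur->primary_conninfo) != 0 ||
		strcmp(old->primary_slot_name, cur->primary_slot_name) != 0 ||
		old->hot_standby_feedback != cur->hot_standby_feedback;
	if (!changed)
		return RELOAD_CONTINUE;
	return is_worker ? RELOAD_WORKER_RESTART : RELOAD_SQL_ERROR;
}
```

The asymmetry is intentional in the original: a background worker can simply exit and be relaunched with fresh settings, whereas a backend running the SQL function has no such restart path, so it reports an error instead.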
    1347                 :             : 
    1348                 :             : /*
     1349                 :             :  * Interrupt handler for the process performing slot synchronization.
    1350                 :             :  */
    1351                 :             : static void
    1352                 :           0 : ProcessSlotSyncInterrupts(void)
    1353                 :             : {
    1354         [ #  # ]:           0 :         CHECK_FOR_INTERRUPTS();
    1355                 :             : 
    1356         [ #  # ]:           0 :         if (SlotSyncCtx->stopSignaled)
    1357                 :             :         {
    1358         [ #  # ]:           0 :                 if (AmLogicalSlotSyncWorkerProcess())
    1359                 :             :                 {
    1360   [ #  #  #  # ]:           0 :                         ereport(LOG,
    1361                 :             :                                         errmsg("replication slot synchronization worker will stop because promotion is triggered"));
    1362                 :             : 
    1363                 :           0 :                         proc_exit(0);
    1364                 :             :                 }
    1365                 :             :                 else
    1366                 :             :                 {
    1367                 :             :                         /*
    1368                 :             :                          * For the backend executing SQL function
    1369                 :             :                          * pg_sync_replication_slots().
    1370                 :             :                          */
    1371   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    1372                 :             :                                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1373                 :             :                                         errmsg("replication slot synchronization will stop because promotion is triggered"));
    1374                 :             :                 }
    1375                 :           0 :         }
    1376                 :             : 
    1377         [ #  # ]:           0 :         if (ConfigReloadPending)
    1378                 :           0 :                 slotsync_reread_config();
    1379                 :           0 : }
    1380                 :             : 
    1381                 :             : /*
    1382                 :             :  * Connection cleanup function for slotsync worker.
    1383                 :             :  *
    1384                 :             :  * Called on slotsync worker exit.
    1385                 :             :  */
    1386                 :             : static void
    1387                 :           0 : slotsync_worker_disconnect(int code, Datum arg)
    1388                 :             : {
    1389                 :           0 :         WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
    1390                 :             : 
    1391                 :           0 :         walrcv_disconnect(wrconn);
    1392                 :           0 : }
    1393                 :             : 
    1394                 :             : /*
    1395                 :             :  * Cleanup function for slotsync worker.
    1396                 :             :  *
    1397                 :             :  * Called on slotsync worker exit.
    1398                 :             :  */
    1399                 :             : static void
    1400                 :           0 : slotsync_worker_onexit(int code, Datum arg)
    1401                 :             : {
    1402                 :             :         /*
     1403                 :             :          * We need to clean up slots here, just like WalSndErrorCleanup() does.
     1404                 :             :          *
     1405                 :             :          * During promotion, the startup process invokes ShutDownSlotSync(),
     1406                 :             :          * which waits for slot sync to finish by checking the 'syncing' flag.
     1407                 :             :          * Thus, the slot sync worker must finish releasing and cleaning up its
     1408                 :             :          * slots before it marks itself as no longer syncing; otherwise, dangling
     1409                 :             :          * temporary slots or still-active slots could be left behind.
    1410                 :             :          */
    1411                 :             : 
    1412                 :             :         /* Make sure active replication slots are released */
    1413         [ #  # ]:           0 :         if (MyReplicationSlot != NULL)
    1414                 :           0 :                 ReplicationSlotRelease();
    1415                 :             : 
    1416                 :             :         /* Also cleanup the temporary slots. */
    1417                 :           0 :         ReplicationSlotCleanup(false);
    1418                 :             : 
    1419         [ #  # ]:           0 :         SpinLockAcquire(&SlotSyncCtx->mutex);
    1420                 :             : 
    1421                 :           0 :         SlotSyncCtx->pid = InvalidPid;
    1422                 :             : 
    1423                 :             :         /*
    1424                 :             :          * If syncing_slots is true, it indicates that the process errored out
    1425                 :             :          * without resetting the flag. So, we need to clean up shared memory and
    1426                 :             :          * reset the flag here.
    1427                 :             :          */
    1428         [ #  # ]:           0 :         if (syncing_slots)
    1429                 :             :         {
    1430                 :           0 :                 SlotSyncCtx->syncing = false;
    1431                 :           0 :                 syncing_slots = false;
    1432                 :           0 :         }
    1433                 :             : 
    1434                 :           0 :         SpinLockRelease(&SlotSyncCtx->mutex);
    1435                 :           0 : }
    1436                 :             : 
    1437                 :             : /*
     1438                 :             :  * Sleep for long enough that we believe it's likely that the slots on the
     1439                 :             :  * primary get updated.
     1440                 :             :  *
     1441                 :             :  * If there is no slot activity, the wait time between sync cycles doubles
     1442                 :             :  * (up to a maximum of 30s). If there is some slot activity, the wait time
     1443                 :             :  * between sync cycles is reset to the minimum (200ms).
    1444                 :             :  */
    1445                 :             : static void
    1446                 :           0 : wait_for_slot_activity(bool some_slot_updated)
    1447                 :             : {
    1448                 :           0 :         int                     rc;
    1449                 :             : 
    1450         [ #  # ]:           0 :         if (!some_slot_updated)
    1451                 :             :         {
    1452                 :             :                 /*
    1453                 :             :                  * No slots were updated, so double the sleep time, but not beyond the
    1454                 :             :                  * maximum allowable value.
    1455                 :             :                  */
    1456         [ #  # ]:           0 :                 sleep_ms = Min(sleep_ms * 2, MAX_SLOTSYNC_WORKER_NAPTIME_MS);
    1457                 :           0 :         }
    1458                 :             :         else
    1459                 :             :         {
    1460                 :             :                 /*
    1461                 :             :                  * Some slots were updated since the last sleep, so reset the sleep
    1462                 :             :                  * time.
    1463                 :             :                  */
    1464                 :           0 :                 sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
    1465                 :             :         }
    1466                 :             : 
    1467                 :           0 :         rc = WaitLatch(MyLatch,
    1468                 :             :                                    WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1469                 :           0 :                                    sleep_ms,
    1470                 :             :                                    WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
    1471                 :             : 
    1472         [ #  # ]:           0 :         if (rc & WL_LATCH_SET)
    1473                 :           0 :                 ResetLatch(MyLatch);
    1474                 :           0 : }
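The back-off in wait_for_slot_activity() is a capped doubling that snaps back to the minimum on any activity. A minimal model of just that arithmetic follows; `MIN_NAPTIME_MS`, `MAX_NAPTIME_MS`, and `next_naptime_ms()` are local stand-ins whose values are taken from the header comment (200 ms and 30 s), not read from the actual MIN_SLOTSYNC_WORKER_NAPTIME_MS/MAX_SLOTSYNC_WORKER_NAPTIME_MS definitions.

```c
#include <assert.h>
#include <stdbool.h>

/* Bounds assumed from the comment above: 200 ms minimum, 30 s maximum. */
#define MIN_NAPTIME_MS 200L
#define MAX_NAPTIME_MS 30000L

/* Next sleep interval, following the rule in wait_for_slot_activity(). */
static long
next_naptime_ms(long current_ms, bool some_slot_updated)
{
	assert(current_ms > 0);
	if (some_slot_updated)
		return MIN_NAPTIME_MS;	/* activity seen: poll again quickly */

	current_ms *= 2;			/* idle: back off exponentially */
	return current_ms < MAX_NAPTIME_MS ? current_ms : MAX_NAPTIME_MS;
}
```

Starting from 200 ms with no activity, the interval goes 400, 800, and so on up to 25600 ms, then stays at 30000 ms. In the real worker the wait can also end early, because WaitLatch() returns as soon as the process latch is set.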
    1475                 :             : 
    1476                 :             : /*
    1477                 :             :  * Emit an error if a concurrent sync call is in progress.
    1478                 :             :  * Otherwise, advertise that a sync is in progress.
    1479                 :             :  */
    1480                 :             : static void
    1481                 :           0 : check_and_set_sync_info(pid_t sync_process_pid)
    1482                 :             : {
    1483         [ #  # ]:           0 :         SpinLockAcquire(&SlotSyncCtx->mutex);
    1484                 :             : 
    1485         [ #  # ]:           0 :         if (SlotSyncCtx->syncing)
    1486                 :             :         {
    1487                 :           0 :                 SpinLockRelease(&SlotSyncCtx->mutex);
    1488   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1489                 :             :                                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1490                 :             :                                 errmsg("cannot synchronize replication slots concurrently"));
    1491                 :           0 :         }
    1492                 :             : 
     1493                 :             :         /* The pid must not already be assigned in SlotSyncCtx */
    1494         [ #  # ]:           0 :         Assert(SlotSyncCtx->pid == InvalidPid);
    1495                 :             : 
    1496                 :           0 :         SlotSyncCtx->syncing = true;
    1497                 :             : 
    1498                 :             :         /*
    1499                 :             :          * Advertise the required PID so that the startup process can kill the
    1500                 :             :          * slot sync process on promotion.
    1501                 :             :          */
    1502                 :           0 :         SlotSyncCtx->pid = sync_process_pid;
    1503                 :             : 
    1504                 :           0 :         SpinLockRelease(&SlotSyncCtx->mutex);
    1505                 :             : 
    1506                 :           0 :         syncing_slots = true;
    1507                 :           0 : }
    1508                 :             : 
    1509                 :             : /*
    1510                 :             :  * Reset syncing flag.
    1511                 :             :  */
    1512                 :             : static void
    1513                 :           0 : reset_syncing_flag(void)
    1514                 :             : {
    1515         [ #  # ]:           0 :         SpinLockAcquire(&SlotSyncCtx->mutex);
    1516                 :           0 :         SlotSyncCtx->syncing = false;
    1517                 :           0 :         SlotSyncCtx->pid = InvalidPid;
    1518                 :           0 :         SpinLockRelease(&SlotSyncCtx->mutex);
    1519                 :             : 
    1520                 :           0 :         syncing_slots = false;
    1521                 :           0 : }
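check_and_set_sync_info() and reset_syncing_flag() implement a small claim/release protocol on SlotSyncCtx, guarded by a spinlock. The model below is a hypothetical sketch: a C11 `atomic_flag` test-and-set loop stands in for SpinLockAcquire()/SpinLockRelease(), `INVALID_PID` is an assumed -1 sentinel playing the role of InvalidPid, and `try_claim_sync()` returns false where the real code raises "cannot synchronize replication slots concurrently".

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <sys/types.h>

#define INVALID_PID ((pid_t) -1)	/* assumed sentinel, like InvalidPid */

/* Hypothetical stand-in for the SlotSyncCtx fields used above. */
typedef struct SyncCtxModel
{
	atomic_flag lock;			/* toy spinlock */
	bool		syncing;
	pid_t		pid;
} SyncCtxModel;

static void
spin_acquire(atomic_flag *lock)
{
	while (atomic_flag_test_and_set_explicit(lock, memory_order_acquire))
		;						/* busy-wait: only for tiny critical sections */
}

static void
spin_release(atomic_flag *lock)
{
	atomic_flag_clear_explicit(lock, memory_order_release);
}

static void
sync_ctx_init(SyncCtxModel *ctx)
{
	atomic_flag_clear(&ctx->lock);
	ctx->syncing = false;
	ctx->pid = INVALID_PID;
}

/* Like check_and_set_sync_info(), but reports failure by returning false. */
static bool
try_claim_sync(SyncCtxModel *ctx, pid_t pid)
{
	spin_acquire(&ctx->lock);
	if (ctx->syncing)
	{
		spin_release(&ctx->lock);	/* drop the lock before reporting */
		return false;
	}
	assert(ctx->pid == INVALID_PID);
	ctx->syncing = true;
	ctx->pid = pid;				/* lets a promoting process find the syncer */
	spin_release(&ctx->lock);
	return true;
}

/* Like reset_syncing_flag(): clear both fields under the same lock. */
static void
release_sync(SyncCtxModel *ctx)
{
	spin_acquire(&ctx->lock);
	ctx->syncing = false;
	ctx->pid = INVALID_PID;
	spin_release(&ctx->lock);
}
```

As in the original, the lock is released before the concurrent-sync failure is reported; as we understand PostgreSQL's rules, an error exit would not release a held spinlock, so the code must never raise an error while holding one.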
    1522                 :             : 
    1523                 :             : /*
    1524                 :             :  * The main loop of our worker process.
    1525                 :             :  *
     1526                 :             :  * It connects to the primary server and periodically fetches logical
     1527                 :             :  * failover slot information in order to create and sync the slots.
    1528                 :             :  *
    1529                 :             :  * Note: If any changes are made here, check if the corresponding SQL
    1530                 :             :  * function logic in SyncReplicationSlots() also needs to be changed.
    1531                 :             :  */
    1532                 :             : void
    1533                 :           0 : ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
    1534                 :             : {
    1535                 :           0 :         WalReceiverConn *wrconn = NULL;
    1536                 :           0 :         char       *dbname;
    1537                 :           0 :         char       *err;
    1538                 :           0 :         sigjmp_buf      local_sigjmp_buf;
    1539                 :           0 :         StringInfoData app_name;
    1540                 :             : 
    1541         [ #  # ]:           0 :         Assert(startup_data_len == 0);
    1542                 :             : 
    1543                 :           0 :         MyBackendType = B_SLOTSYNC_WORKER;
    1544                 :             : 
    1545                 :           0 :         init_ps_display(NULL);
    1546                 :             : 
    1547         [ #  # ]:           0 :         Assert(GetProcessingMode() == InitProcessing);
    1548                 :             : 
    1549                 :             :         /*
    1550                 :             :          * Create a per-backend PGPROC struct in shared memory.  We must do this
    1551                 :             :          * before we access any shared memory.
    1552                 :             :          */
    1553                 :           0 :         InitProcess();
    1554                 :             : 
    1555                 :             :         /*
    1556                 :             :          * Early initialization.
    1557                 :             :          */
    1558                 :           0 :         BaseInit();
    1559                 :             : 
    1560         [ #  # ]:           0 :         Assert(SlotSyncCtx != NULL);
    1561                 :             : 
    1562                 :             :         /*
    1563                 :             :          * If an exception is encountered, processing resumes here.
    1564                 :             :          *
    1565                 :             :          * We just need to clean up, report the error, and go away.
    1566                 :             :          *
    1567                 :             :          * If we do not have this handling here, then since this worker process
    1568                 :             :          * operates at the bottom of the exception stack, ERRORs turn into FATALs.
    1569                 :             :          * Therefore, we create our own exception handler to catch ERRORs.
    1570                 :             :          */
    1571         [ #  # ]:           0 :         if (sigsetjmp(local_sigjmp_buf, 1) != 0)
    1572                 :             :         {
    1573                 :             :                 /* since not using PG_TRY, must reset error stack by hand */
    1574                 :           0 :                 error_context_stack = NULL;
    1575                 :             : 
    1576                 :             :                 /* Prevent interrupts while cleaning up */
    1577                 :           0 :                 HOLD_INTERRUPTS();
    1578                 :             : 
    1579                 :             :                 /* Report the error to the server log */
    1580                 :           0 :                 EmitErrorReport();
    1581                 :             : 
    1582                 :             :                 /*
    1583                 :             :                  * We can now go away.  Note that because we called InitProcess, a
    1584                 :             :                  * callback was registered to do ProcKill, which will clean up
    1585                 :             :                  * necessary state.
    1586                 :             :                  */
    1587                 :           0 :                 proc_exit(0);
    1588                 :             :         }
    1589                 :             : 
    1590                 :             :         /* We can now handle ereport(ERROR) */
    1591                 :           0 :         PG_exception_stack = &local_sigjmp_buf;
    1592                 :             : 
    1593                 :             :         /* Setup signal handling */
    1594                 :           0 :         pqsignal(SIGHUP, SignalHandlerForConfigReload);
    1595                 :           0 :         pqsignal(SIGINT, StatementCancelHandler);
    1596                 :           0 :         pqsignal(SIGTERM, die);
    1597                 :           0 :         pqsignal(SIGFPE, FloatExceptionHandler);
    1598                 :           0 :         pqsignal(SIGUSR1, procsignal_sigusr1_handler);
    1599                 :           0 :         pqsignal(SIGUSR2, SIG_IGN);
    1600                 :           0 :         pqsignal(SIGPIPE, SIG_IGN);
    1601                 :           0 :         pqsignal(SIGCHLD, SIG_DFL);
    1602                 :             : 
    1603                 :           0 :         check_and_set_sync_info(MyProcPid);
    1604                 :             : 
    1605   [ #  #  #  # ]:           0 :         ereport(LOG, errmsg("slot sync worker started"));
    1606                 :             : 
    1607                 :             :         /* Register it as soon as SlotSyncCtx->pid is initialized. */
    1608                 :           0 :         before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
    1609                 :             : 
    1610                 :             :         /*
    1611                 :             :          * Establish the SIGALRM handler and initialize the timeout module. This is
    1612                 :             :          * needed by InitPostgres to register different timeouts.
    1613                 :             :          */
    1614                 :           0 :         InitializeTimeouts();
    1615                 :             : 
    1616                 :             :         /* Load the libpq-specific functions */
    1617                 :           0 :         load_file("libpqwalreceiver", false);
    1618                 :             : 
    1619                 :             :         /*
    1620                 :             :          * Unblock signals (they were blocked when the postmaster forked us)
    1621                 :             :          */
    1622                 :           0 :         sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
    1623                 :             : 
    1624                 :             :         /*
    1625                 :             :          * Set always-secure search path, so malicious users can't redirect user
    1626                 :             :          * code (e.g. operators).
    1627                 :             :          *
    1628                 :             :          * It's not strictly necessary since we won't be scanning or writing to
    1629                 :             :          * any user table locally, but it's good to retain it here for added
    1630                 :             :          * precaution.
    1631                 :             :          */
    1632                 :           0 :         SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
    1633                 :             : 
    1634                 :           0 :         dbname = CheckAndGetDbnameFromConninfo();
    1635                 :             : 
    1636                 :             :         /*
    1637                 :             :          * Connect to the database specified by the user in primary_conninfo. We
    1638                 :             :          * need a database connection for walrcv_exec to work which we use to
    1639                 :             :          * fetch slot information from the remote node. See comments atop
    1640                 :             :          * libpqrcv_exec.
    1641                 :             :          *
    1642                 :             :          * We do not specify a user here since the slot sync worker will
    1643                 :             :          * operate as a superuser. This is safe because the slot sync worker does
    1644                 :             :          * not interact with user tables, eliminating the risk of executing
    1645                 :             :          * arbitrary code within triggers.
    1646                 :             :          */
    1647                 :           0 :         InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL);
    1648                 :             : 
    1649                 :           0 :         SetProcessingMode(NormalProcessing);
    1650                 :             : 
    1651                 :           0 :         initStringInfo(&app_name);
    1652         [ #  # ]:           0 :         if (cluster_name[0])
    1653                 :           0 :                 appendStringInfo(&app_name, "%s_%s", cluster_name, "slotsync worker");
    1654                 :             :         else
    1655                 :           0 :                 appendStringInfoString(&app_name, "slotsync worker");
    1656                 :             : 
    1657                 :             :         /*
    1658                 :             :          * Establish the connection to the primary server for slot
    1659                 :             :          * synchronization.
    1660                 :             :          */
    1661                 :           0 :         wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
    1662                 :             :                                                         app_name.data, &err);
    1663                 :             : 
    1664         [ #  # ]:           0 :         if (!wrconn)
    1665   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1666                 :             :                                 errcode(ERRCODE_CONNECTION_FAILURE),
    1667                 :             :                                 errmsg("synchronization worker \"%s\" could not connect to the primary server: %s",
    1668                 :             :                                            app_name.data, err));
    1669                 :             : 
    1670                 :           0 :         pfree(app_name.data);
    1671                 :             : 
    1672                 :             :         /*
    1673                 :             :          * Register the disconnection callback.
    1674                 :             :          *
    1675                 :             :          * XXX: This can be combined with previous cleanup registration of
    1676                 :             :          * slotsync_worker_onexit() but that will need the connection to be made
    1677                 :             :          * global and we want to avoid introducing global for this purpose.
    1678                 :             :          */
    1679                 :           0 :         before_shmem_exit(slotsync_worker_disconnect, PointerGetDatum(wrconn));
    1680                 :             : 
    1681                 :             :         /*
    1682                 :             :          * Using the specified primary server connection, check that we are not a
    1683                 :             :          * cascading standby and that the slot configured in 'primary_slot_name'
    1684                 :             :          * exists on the primary server.
    1685                 :             :          */
    1686                 :           0 :         validate_remote_info(wrconn);
    1687                 :             : 
    1688                 :             :         /* Main loop to synchronize slots */
    1689                 :           0 :         for (;;)
    1690                 :             :         {
    1691                 :           0 :                 bool            some_slot_updated = false;
    1692                 :           0 :                 bool            started_tx = false;
    1693                 :           0 :                 List       *remote_slots;
    1694                 :             : 
    1695                 :           0 :                 ProcessSlotSyncInterrupts();
    1696                 :             : 
    1697                 :             :                 /*
    1698                 :             :                  * The syscache access in fetch_remote_slots() needs a transaction
    1699                 :             :                  * env.
    1700                 :             :                  */
    1701         [ #  # ]:           0 :                 if (!IsTransactionState())
    1702                 :             :                 {
    1703                 :           0 :                         StartTransactionCommand();
    1704                 :           0 :                         started_tx = true;
    1705                 :           0 :                 }
    1706                 :             : 
    1707                 :           0 :                 remote_slots = fetch_remote_slots(wrconn, NIL);
    1708                 :           0 :                 some_slot_updated = synchronize_slots(wrconn, remote_slots, NULL);
    1709                 :           0 :                 list_free_deep(remote_slots);
    1710                 :             : 
    1711         [ #  # ]:           0 :                 if (started_tx)
    1712                 :           0 :                         CommitTransactionCommand();
    1713                 :             : 
    1714                 :           0 :                 wait_for_slot_activity(some_slot_updated);
    1715                 :           0 :         }
    1716                 :             : 
    1717                 :             :         /*
    1718                 :             :          * The slot sync worker can't get here because it will only stop when it
    1719                 :             :          * receives a stop request from the startup process, or when there is an
    1720                 :             :          * error.
    1721                 :             :          */
    1722                 :             :         Assert(false);
    1723                 :             : }
    1724                 :             : 
    1725                 :             : /*
    1726                 :             :  * Update the inactive_since property for synced slots.
    1727                 :             :  *
    1728                 :             :  * Note that this function is currently called when we shut down the slot
    1729                 :             :  * sync machinery.
    1730                 :             :  */
    1731                 :             : static void
    1732                 :           4 : update_synced_slots_inactive_since(void)
    1733                 :             : {
    1734                 :           4 :         TimestampTz now = 0;
    1735                 :             : 
    1736                 :             :         /*
    1737                 :             :          * We need to update inactive_since only when we are promoting standby to
    1738                 :             :          * correctly interpret the inactive_since if the standby gets promoted
    1739                 :             :          * without a restart. We don't want the slots to appear inactive for a
    1740                 :             :          * long time after promotion if they haven't been synchronized recently.
    1741                 :             :          * Whoever acquires the slot, i.e., makes the slot active, will reset it.
    1742                 :             :          */
    1743         [ -  + ]:           4 :         if (!StandbyMode)
    1744                 :           4 :                 return;
    1745                 :             : 
    1746                 :             :         /* The slot sync worker or the SQL function mustn't be running by now */
    1747         [ #  # ]:           0 :         Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
    1748                 :             : 
    1749                 :           0 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1750                 :             : 
    1751         [ #  # ]:           0 :         for (int i = 0; i < max_replication_slots; i++)
    1752                 :             :         {
    1753                 :           0 :                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1754                 :             : 
    1755                 :             :                 /* Check if it is a synchronized slot */
    1756   [ #  #  #  # ]:           0 :                 if (s->in_use && s->data.synced)
    1757                 :             :                 {
    1758         [ #  # ]:           0 :                         Assert(SlotIsLogical(s));
    1759                 :             : 
    1760                 :             :                         /* The slot must not be acquired by any process */
    1761         [ #  # ]:           0 :                         Assert(s->active_pid == 0);
    1762                 :             : 
    1763                 :             :                         /* Use the same inactive_since time for all the slots. */
    1764         [ #  # ]:           0 :                         if (now == 0)
    1765                 :           0 :                                 now = GetCurrentTimestamp();
    1766                 :             : 
    1767                 :           0 :                         ReplicationSlotSetInactiveSince(s, now, true);
    1768                 :           0 :                 }
    1769                 :           0 :         }
    1770                 :             : 
    1771                 :           0 :         LWLockRelease(ReplicationSlotControlLock);
    1772         [ -  + ]:           4 : }
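update_synced_slots_inactive_since() reads the clock lazily. 'now' stays 0 until the first synced slot is found, and every synced slot then receives that same value. Below is a minimal sketch of the loop over a plain array. It assumes simplified Slot and TimestampTz types and a hypothetical fake_now() clock that counts its calls. Locking with ReplicationSlotControlLock is omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef long long TimestampTz;          /* simplified stand-in */

typedef struct
{
    bool        in_use;
    bool        synced;
    TimestampTz inactive_since;
} Slot;

/* Hypothetical clock; counts calls so the test can check it is read once. */
static int clock_calls = 0;

static TimestampTz fake_now(void)
{
    clock_calls++;
    return 1000 + clock_calls;
}

static void stamp_synced_slots(Slot *slots, size_t n)
{
    TimestampTz now = 0;                /* 0 means "clock not read yet" */

    assert(slots != NULL || n == 0);

    for (size_t i = 0; i < n; i++)
    {
        if (!(slots[i].in_use && slots[i].synced))
            continue;

        /* Read the clock at most once, and only if some slot needs it. */
        if (now == 0)
            now = fake_now();

        slots[i].inactive_since = now;
    }
}
```

Sharing one timestamp means every synced slot reports the same inactive_since after promotion. When no synced slots exist, the clock is never read at all.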
    1773                 :             : 
    1774                 :             : /*
    1775                 :             :  * Shut down slot synchronization.
    1776                 :             :  *
    1777                 :             :  * This function sets stopSignaled=true and wakes up the slot sync process
    1778                 :             :  * (either worker or backend running the SQL function pg_sync_replication_slots())
    1779                 :             :  * so that the worker can exit or the SQL function pg_sync_replication_slots()
    1780                 :             :  * can finish. It also waits until the slot sync worker has exited or
    1781                 :             :  * pg_sync_replication_slots() has finished.
    1782                 :             :  */
    1783                 :             : void
    1784                 :           4 : ShutDownSlotSync(void)
    1785                 :             : {
    1786                 :           4 :         pid_t           sync_process_pid;
    1787                 :             : 
    1788         [ -  + ]:           4 :         SpinLockAcquire(&SlotSyncCtx->mutex);
    1789                 :             : 
    1790                 :           4 :         SlotSyncCtx->stopSignaled = true;
    1791                 :             : 
    1792                 :             :         /*
    1793                 :             :          * Return if neither the slot sync worker is running nor the function
    1794                 :             :          * pg_sync_replication_slots() is executing.
    1795                 :             :          */
    1796         [ -  + ]:           4 :         if (!SlotSyncCtx->syncing)
    1797                 :             :         {
    1798                 :           4 :                 SpinLockRelease(&SlotSyncCtx->mutex);
    1799                 :           4 :                 update_synced_slots_inactive_since();
    1800                 :           4 :                 return;
    1801                 :             :         }
    1802                 :             : 
    1803                 :           0 :         sync_process_pid = SlotSyncCtx->pid;
    1804                 :             : 
    1805                 :           0 :         SpinLockRelease(&SlotSyncCtx->mutex);
    1806                 :             : 
    1807                 :             :         /*
    1808                 :             :          * Signal process doing slotsync, if any. The process will stop upon
    1809                 :             :          * detecting that the stopSignaled flag is set to true.
    1810                 :             :          */
    1811         [ #  # ]:           0 :         if (sync_process_pid != InvalidPid)
    1812                 :           0 :                 kill(sync_process_pid, SIGUSR1);
    1813                 :             : 
    1814                 :             :         /* Wait for slot sync to end */
    1815                 :           0 :         for (;;)
    1816                 :             :         {
    1817                 :           0 :                 int                     rc;
    1818                 :             : 
    1819                 :             :                 /* Wait a bit, we don't expect to have to wait long */
    1820                 :           0 :                 rc = WaitLatch(MyLatch,
    1821                 :             :                                            WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1822                 :             :                                            10L, WAIT_EVENT_REPLICATION_SLOTSYNC_SHUTDOWN);
    1823                 :             : 
    1824         [ #  # ]:           0 :                 if (rc & WL_LATCH_SET)
    1825                 :             :                 {
    1826                 :           0 :                         ResetLatch(MyLatch);
    1827         [ #  # ]:           0 :                         CHECK_FOR_INTERRUPTS();
    1828                 :           0 :                 }
    1829                 :             : 
    1830         [ #  # ]:           0 :                 SpinLockAcquire(&SlotSyncCtx->mutex);
    1831                 :             : 
    1832                 :             :                 /* Ensure that no process is syncing the slots. */
    1833         [ #  # ]:           0 :                 if (!SlotSyncCtx->syncing)
    1834                 :           0 :                         break;
    1835                 :             : 
    1836                 :           0 :                 SpinLockRelease(&SlotSyncCtx->mutex);
    1837         [ #  # ]:           0 :         }
    1838                 :             : 
    1839                 :           0 :         SpinLockRelease(&SlotSyncCtx->mutex);
    1840                 :             : 
    1841                 :           0 :         update_synced_slots_inactive_since();
    1842                 :           4 : }
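ShutDownSlotSync() and the syncing process follow a simple handshake. The startup process sets stopSignaled, wakes the syncer, and then polls every 10 ms until the syncer clears 'syncing'. Below is a sketch of the same handshake between two POSIX threads. It assumes a pthread mutex in place of the spinlock and a polling worker in place of the SIGUSR1 wakeup and WaitLatch(). ShutdownCtx, sync_worker() and shut_down_sync() are hypothetical names. Compile with -pthread.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <time.h>

/* Hypothetical shared state; a mutex stands in for the spinlock. */
typedef struct
{
    pthread_mutex_t mutex;
    bool            syncing;
    bool            stop_signaled;
} ShutdownCtx;

static void sleep_ms(long ms)
{
    struct timespec ts = {ms / 1000, (ms % 1000) * 1000000L};

    nanosleep(&ts, NULL);
}

/* Worker: sync until it observes stop_signaled, then clear syncing. */
static void *sync_worker(void *arg)
{
    ShutdownCtx *ctx = arg;

    for (;;)
    {
        bool stop;

        pthread_mutex_lock(&ctx->mutex);
        stop = ctx->stop_signaled;
        pthread_mutex_unlock(&ctx->mutex);
        if (stop)
            break;
        sleep_ms(1);            /* stands in for one sync cycle */
    }

    pthread_mutex_lock(&ctx->mutex);
    ctx->syncing = false;       /* last step: tell the waiter we are done */
    pthread_mutex_unlock(&ctx->mutex);
    return NULL;
}

/* Shutdown: set the flag, then poll every 10 ms until syncing is false. */
static void shut_down_sync(ShutdownCtx *ctx)
{
    pthread_mutex_lock(&ctx->mutex);
    ctx->stop_signaled = true;
    if (!ctx->syncing)
    {
        pthread_mutex_unlock(&ctx->mutex);
        return;
    }
    pthread_mutex_unlock(&ctx->mutex);

    for (;;)
    {
        sleep_ms(10);           /* cf. WaitLatch(..., 10L, ...) */
        pthread_mutex_lock(&ctx->mutex);
        if (!ctx->syncing)
            break;              /* leave the loop still holding the lock */
        pthread_mutex_unlock(&ctx->mutex);
    }
    pthread_mutex_unlock(&ctx->mutex);
}
```

The test marks 'syncing' as true before starting the thread. This mirrors the real syncer, which sets the flag in check_and_set_sync_info() before doing any work. Otherwise shut_down_sync() could see 'syncing' as false and return before the worker had started.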
    1843                 :             : 
    1844                 :             : /*
    1845                 :             :  * SlotSyncWorkerCanRestart
    1846                 :             :  *
    1847                 :             :  * Return true, indicating that the worker is allowed to restart, if at least
    1848                 :             :  * SLOTSYNC_RESTART_INTERVAL_SEC seconds have passed since it was last launched.
    1849                 :             :  * Otherwise return false.
    1850                 :             :  *
    1851                 :             :  * This is a safety valve to protect against continuous respawn attempts if the
    1852                 :             :  * worker is dying immediately at launch. Note that since we will retry to
    1853                 :             :  * launch the worker from the postmaster main loop, we will get another
    1854                 :             :  * chance later.
    1855                 :             :  */
    1856                 :             : bool
    1857                 :           0 : SlotSyncWorkerCanRestart(void)
    1858                 :             : {
    1859                 :           0 :         time_t          curtime = time(NULL);
    1860                 :             : 
    1861                 :             :         /*
    1862                 :             :          * If first time through, or time somehow went backwards, always update
    1863                 :             :          * last_start_time to match the current clock and allow worker start.
    1864                 :             :          * Otherwise allow it only once enough time has elapsed.
    1865                 :             :          */
    1866         [ #  # ]:           0 :         if (SlotSyncCtx->last_start_time == 0 ||
    1867   [ #  #  #  # ]:           0 :                 curtime < SlotSyncCtx->last_start_time ||
    1868                 :           0 :                 curtime - SlotSyncCtx->last_start_time >= SLOTSYNC_RESTART_INTERVAL_SEC)
    1869                 :             :         {
    1870                 :           0 :                 SlotSyncCtx->last_start_time = curtime;
    1871                 :           0 :                 return true;
    1872                 :             :         }
    1873                 :           0 :         return false;
    1874                 :           0 : }
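SlotSyncWorkerCanRestart() reads the wall clock directly, which makes it awkward to exercise. Below is a sketch of the same decision with the clock value and the saved launch time passed in. It assumes a 10-second interval for SLOTSYNC_RESTART_INTERVAL_SEC, whose real definition is not shown in this section. can_restart() is a hypothetical name.

```c
#include <assert.h>
#include <stdbool.h>
#include <time.h>

/* Assumed value; the real SLOTSYNC_RESTART_INTERVAL_SEC is defined elsewhere. */
#define RESTART_INTERVAL_SEC 10

/*
 * Same decision as SlotSyncWorkerCanRestart(), with the current time and
 * the saved launch time passed in so it can be tested deterministically.
 */
static bool can_restart(time_t *last_start_time, time_t curtime)
{
    assert(last_start_time != NULL);

    /* First launch, clock went backwards, or the interval has elapsed. */
    if (*last_start_time == 0 ||
        curtime < *last_start_time ||
        curtime - *last_start_time >= RESTART_INTERVAL_SEC)
    {
        *last_start_time = curtime;
        return true;
    }
    return false;
}
```

Passing curtime in keeps the function free of side effects apart from the update of *last_start_time. That makes the "clock went backwards" branch easy to test.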
    1875                 :             : 
    1876                 :             : /*
    1877                 :             :  * Is current process syncing replication slots?
    1878                 :             :  *
    1879                 :             :  * Could be either backend executing SQL function or slot sync worker.
    1880                 :             :  */
    1881                 :             : bool
    1882                 :           0 : IsSyncingReplicationSlots(void)
    1883                 :             : {
    1884                 :           0 :         return syncing_slots;
    1885                 :             : }
    1886                 :             : 
    1887                 :             : /*
    1888                 :             :  * Amount of shared memory required for slot synchronization.
    1889                 :             :  */
    1890                 :             : Size
    1891                 :          15 : SlotSyncShmemSize(void)
    1892                 :             : {
    1893                 :          15 :         return sizeof(SlotSyncCtxStruct);
    1894                 :             : }
    1895                 :             : 
    1896                 :             : /*
    1897                 :             :  * Allocate and initialize the shared memory of slot synchronization.
    1898                 :             :  */
    1899                 :             : void
    1900                 :           6 : SlotSyncShmemInit(void)
    1901                 :             : {
    1902                 :           6 :         Size            size = SlotSyncShmemSize();
    1903                 :           6 :         bool            found;
    1904                 :             : 
    1905                 :           6 :         SlotSyncCtx = (SlotSyncCtxStruct *)
    1906                 :           6 :                 ShmemInitStruct("Slot Sync Data", size, &found);
    1907                 :             : 
    1908         [ -  + ]:           6 :         if (!found)
    1909                 :             :         {
    1910                 :           6 :                 memset(SlotSyncCtx, 0, size);
    1911                 :           6 :                 SlotSyncCtx->pid = InvalidPid;
    1912                 :           6 :                 SpinLockInit(&SlotSyncCtx->mutex);
    1913                 :           6 :         }
    1914                 :           6 : }
    1915                 :             : 
    1916                 :             : /*
    1917                 :             :  * Error cleanup callback for slot sync SQL function.
    1918                 :             :  */
    1919                 :             : static void
    1920                 :           0 : slotsync_failure_callback(int code, Datum arg)
    1921                 :             : {
    1922                 :           0 :         WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
    1923                 :             : 
    1924                 :             :         /*
    1925                 :             :          * We need to do slots cleanup here just like WalSndErrorCleanup() does.
    1926                 :             :          *
    1927                 :             :          * The startup process during promotion invokes ShutDownSlotSync() which
    1928                 :             :          * waits for slot sync to finish and it does that by checking the
    1929                 :             :          * 'syncing' flag. Thus the SQL function must release and clean up its
    1930                 :             :          * slots before it marks itself as finished syncing, so that no temporary
    1931                 :             :          * or active slots are left dangling.
    1932                 :             :          */
    1933                 :             : 
    1934                 :             :         /* Make sure active replication slots are released */
    1935         [ #  # ]:           0 :         if (MyReplicationSlot != NULL)
    1936                 :           0 :                 ReplicationSlotRelease();
    1937                 :             : 
    1938                 :             :         /* Also clean up the synced temporary slots. */
    1939                 :           0 :         ReplicationSlotCleanup(true);
    1940                 :             : 
    1941                 :             :         /*
    1942                 :             :          * If syncing_slots is still set, the process errored out without
    1943                 :             :          * resetting the flag, so we need to clean up the shared memory state and
    1944                 :             :          * reset the flag here.
    1945                 :             :          */
    1946         [ #  # ]:           0 :         if (syncing_slots)
    1947                 :           0 :                 reset_syncing_flag();
    1948                 :             : 
    1949                 :           0 :         walrcv_disconnect(wrconn);
    1950                 :           0 : }
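Several cleanup callbacks in this file depend on exit-time ordering. ReplSlotSyncWorkerMain() registers slotsync_worker_onexit() first and slotsync_worker_disconnect() second with before_shmem_exit(). PG_ENSURE_ERROR_CLEANUP arranges for slotsync_failure_callback() to run if the SQL function fails. Our understanding is that PostgreSQL runs before_shmem_exit() callbacks in reverse order of registration, so the connection would be closed before the shared state is reset. Below is a minimal sketch of such a last-in, first-out registry. register_exit_cb(), run_exit_cbs() and record_cb() are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_EXIT_CALLBACKS 8

typedef void (*exit_cb) (int code, uintptr_t arg);

/* Hypothetical registry modelled on before_shmem_exit(). */
static struct
{
    exit_cb     fn;
    uintptr_t   arg;
}           exit_list[MAX_EXIT_CALLBACKS];
static int  exit_count = 0;

static void register_exit_cb(exit_cb fn, uintptr_t arg)
{
    assert(exit_count < MAX_EXIT_CALLBACKS);
    exit_list[exit_count].fn = fn;
    exit_list[exit_count].arg = arg;
    exit_count++;
}

/* Run callbacks newest-first, so later-acquired resources are released first. */
static void run_exit_cbs(int code)
{
    while (--exit_count >= 0)
        exit_list[exit_count].fn(code, exit_list[exit_count].arg);
    exit_count = 0;
}

/* Example callback that records the order in which callbacks ran. */
static char order[MAX_EXIT_CALLBACKS];
static int  order_len = 0;

static void record_cb(int code, uintptr_t arg)
{
    (void) code;
    order[order_len++] = (char) arg;
}
```

In the test, 'o' stands for the shared-state cleanup registered first (cf. slotsync_worker_onexit) and 'd' for the disconnect registered second (cf. slotsync_worker_disconnect). The disconnect runs first.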
    1951                 :             : 
    1952                 :             : /*
    1953                 :             :  * Helper function to extract slot names from a list of remote slots.
    1954                 :             :  */
    1955                 :             : static List *
    1956                 :           0 : extract_slot_names(List *remote_slots)
    1957                 :             : {
    1958                 :           0 :         List       *slot_names = NIL;
    1959                 :             : 
    1960   [ #  #  #  #  :           0 :         foreach_ptr(RemoteSlot, remote_slot, remote_slots)
             #  #  #  # ]
    1961                 :             :         {
    1962                 :           0 :                 char       *slot_name;
    1963                 :             : 
    1964                 :           0 :                 slot_name = pstrdup(remote_slot->name);
    1965                 :           0 :                 slot_names = lappend(slot_names, slot_name);
    1966                 :           0 :         }
    1967                 :             : 
    1968                 :           0 :         return slot_names;
    1969                 :           0 : }
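extract_slot_names() copies each name with pstrdup() so that the resulting list does not depend on the RemoteSlot entries, which SyncReplicationSlots() releases with list_free_deep(remote_slots) at the end of every iteration. A plain C analogue of that ownership rule is sketched below; `RemoteSlot` is simplified to a single field, and `copy_string()`, `extract_names()` and `free_names()` are hypothetical helpers (malloc/free in place of palloc/pfree and memory contexts).

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical, simplified stand-in for the RemoteSlot struct. */
typedef struct RemoteSlot
{
    char *name;
    /* other fields (restart_lsn, catalog_xmin, ...) omitted */
} RemoteSlot;

/* Portable string copy, playing the role of pstrdup(). */
static char *
copy_string(const char *s)
{
    size_t len = strlen(s) + 1;
    char  *copy = malloc(len);

    if (copy != NULL)
        memcpy(copy, s, len);
    return copy;
}

/*
 * Analogue of extract_slot_names(): returns a new array of independent
 * name copies that stays valid after the RemoteSlot array is freed.
 */
static char **
extract_names(const RemoteSlot *slots, size_t n)
{
    char **names = malloc(n * sizeof(char *));

    assert(names != NULL || n == 0);
    for (size_t i = 0; i < n; i++)
    {
        names[i] = copy_string(slots[i].name);
        assert(names[i] != NULL);
    }
    return names;
}

/* Counterpart of list_free_deep(slot_names). */
static void
free_names(char **names, size_t n)
{
    for (size_t i = 0; i < n; i++)
        free(names[i]);
    free(names);
}
```

Because the copies are owned by the returned array, the caller may free the slots (and, in this analogue, their names) right after extraction.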
    1970                 :             : 
    1971                 :             : /*
    1972                 :             :  * Synchronize the failover-enabled replication slots using the specified
    1973                 :             :  * primary server connection.
    1974                 :             :  *
    1975                 :             :  * Repeatedly fetches and updates replication slot information from the
    1976                 :             :  * primary until all slots are at least "sync ready".
    1977                 :             :  *
    1978                 :             :  * Exits early if promotion is triggered or certain critical
    1979                 :             :  * configuration parameters have changed.
    1980                 :             :  */
    1981                 :             : void
    1982                 :           0 : SyncReplicationSlots(WalReceiverConn *wrconn)
    1983                 :             : {
    1984         [ #  # ]:           0 :         PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
    1985                 :             :         {
    1986                 :           0 :                 List       *remote_slots = NIL;
    1987                 :           0 :                 List       *slot_names = NIL;   /* List of slot names to track */
    1988                 :             : 
    1989                 :           0 :                 check_and_set_sync_info(MyProcPid);
    1990                 :             : 
    1991                 :             :                 /* Check for interrupts and config changes */
    1992                 :           0 :                 ProcessSlotSyncInterrupts();
    1993                 :             : 
    1994                 :           0 :                 validate_remote_info(wrconn);
    1995                 :             : 
    1996                 :             :                 /* Retry until all the slots are sync-ready */
    1997                 :           0 :                 for (;;)
    1998                 :             :                 {
    1999                 :           0 :                         bool            slot_persistence_pending = false;
    2000                 :           0 :                         bool            some_slot_updated = false;
    2001                 :             : 
    2002                 :             :                         /* Check for interrupts and config changes */
    2003                 :           0 :                         ProcessSlotSyncInterrupts();
    2004                 :             : 
    2005                 :             :                         /* We must be in a valid transaction state */
    2006         [ #  # ]:           0 :                         Assert(IsTransactionState());
    2007                 :             : 
    2008                 :             :                         /*
    2009                 :             :                          * Fetch remote slot info for the given slot_names. If slot_names
    2010                 :             :                          * is NIL, fetch all failover-enabled slots. Note that we reuse
    2011                 :             :                          * slot_names from the first iteration; re-fetching all failover
    2012                 :             :                          * slots each time could cause an endless loop. Instead of
    2013                 :             :                          * reprocessing only the pending slots in each iteration, it's
    2014                 :             :                          * better to process all the slots received in the first
    2015                 :             :                          * iteration. This ensures that by the time we're done, all slots
    2016                 :             :                          * reflect the latest values.
    2017                 :             :                          */
    2018                 :           0 :                         remote_slots = fetch_remote_slots(wrconn, slot_names);
    2019                 :             : 
    2020                 :             :                         /* Attempt to synchronize slots */
    2021                 :           0 :                         some_slot_updated = synchronize_slots(wrconn, remote_slots,
    2022                 :             :                                                                                                   &slot_persistence_pending);
    2023                 :             : 
    2024                 :             :                         /*
    2025                 :             :                          * If slot_persistence_pending is true, extract slot names for
    2026                 :             :                          * future iterations (only needed if we haven't done it yet).
    2027                 :             :                          */
    2028   [ #  #  #  # ]:           0 :                         if (slot_names == NIL && slot_persistence_pending)
    2029                 :           0 :                                 slot_names = extract_slot_names(remote_slots);
    2030                 :             : 
    2031                 :             :                         /* Free the current remote_slots list */
    2032                 :           0 :                         list_free_deep(remote_slots);
    2033                 :             : 
    2034                 :             :                         /* Done if all slots are persisted, i.e., sync-ready */
    2035         [ #  # ]:           0 :                         if (!slot_persistence_pending)
    2036                 :           0 :                                 break;
    2037                 :             : 
    2038                 :             :                         /* Wait before retrying */
    2039                 :           0 :                         wait_for_slot_activity(some_slot_updated);
    2040      [ #  #  # ]:           0 :                 }
    2041                 :             : 
    2042         [ #  # ]:           0 :                 if (slot_names)
    2043                 :           0 :                         list_free_deep(slot_names);
    2044                 :             : 
    2045                 :             :                 /* Clean up the synced temporary slots */
    2046                 :           0 :                 ReplicationSlotCleanup(true);
    2047                 :             : 
    2048                 :             :                 /* We are done with sync, so reset sync flag */
    2049                 :           0 :                 reset_syncing_flag();
    2050                 :           0 :         }
    2051         [ #  # ]:           0 :         PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
    2052                 :           0 : }
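The retry strategy described in the loop's comment (fetch all failover slots on the first pass, then keep re-fetching only those names) can be checked with a small simulation. Everything below is hypothetical: the simulated primary creates one new, not-yet-ready slot every round, which is the situation in which re-fetching all slots on every pass would never converge. `num_slots_on_primary()`, `is_sync_ready()` and `sync_until_ready()` are invented names, and wait_for_slot_activity() is reduced to a comment.

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_SLOTS 256

/* Hypothetical primary: slot ids 0 .. 3 + round exist at a given round. */
static int
num_slots_on_primary(int round)
{
    return 4 + round;
}

/*
 * Hypothetical readiness rule: slots 0..2 become sync-ready at round
 * id + 1; every later slot (created at round id - 3) needs five rounds.
 */
static bool
is_sync_ready(int id, int round)
{
    if (id < 3)
        return round >= id + 1;
    return round >= (id - 3) + 5;
}

/*
 * Analogue of the retry loop in SyncReplicationSlots(). Returns the number
 * of rounds needed, or -1 if max_rounds passed without converging.
 */
static int
sync_until_ready(bool reuse_first_names, int max_rounds)
{
    int names[MAX_SLOTS];
    int nnames = 0;                 /* 0 plays the role of NIL: fetch all */

    assert(max_rounds >= 0 && max_rounds + 4 <= MAX_SLOTS);

    for (int round = 0; round < max_rounds; round++)
    {
        int  fetched[MAX_SLOTS];
        int  nfetched = 0;
        bool pending = false;

        /* fetch_remote_slots(): every slot, or only the remembered names */
        if (nnames == 0)
            for (int id = 0; id < num_slots_on_primary(round); id++)
                fetched[nfetched++] = id;
        else
            for (int i = 0; i < nnames; i++)
                fetched[nfetched++] = names[i];

        /* synchronize_slots(): any slot that is not ready keeps us looping */
        for (int i = 0; i < nfetched; i++)
            if (!is_sync_ready(fetched[i], round))
                pending = true;

        /* extract_slot_names(): remember the first pass's slots only */
        if (reuse_first_names && nnames == 0 && pending)
            for (int i = 0; i < nfetched; i++)
                names[nnames++] = fetched[i];

        if (!pending)
            return round + 1;

        /* wait_for_slot_activity() would sleep here before retrying */
    }
    return -1;
}
```

Under these assumptions, sync_until_ready(true, 50) finishes after 6 rounds because it keeps tracking slots 0..3 only, while sync_until_ready(false, 50) returns -1: each pass sees a freshly created slot that is not yet ready.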
        

Generated by: LCOV version 2.3.2-1