LCOV - code coverage report
Current view: top level - src/backend/replication - slot.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 20.7 % 1228 254
Test Date: 2026-01-26 10:56:24 Functions: 41.7 % 48 20
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Branches: 9.2 % 1102 101

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * slot.c
       4                 :             :  *         Replication slot management.
       5                 :             :  *
       6                 :             :  *
       7                 :             :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
       8                 :             :  *
       9                 :             :  *
      10                 :             :  * IDENTIFICATION
      11                 :             :  *        src/backend/replication/slot.c
      12                 :             :  *
      13                 :             :  * NOTES
      14                 :             :  *
      15                 :             :  * Replication slots are used to keep state about replication streams
      16                 :             :  * originating from this cluster.  Their primary purpose is to prevent the
      17                 :             :  * premature removal of WAL or of old tuple versions in a manner that would
      18                 :             :  * interfere with replication; they are also useful for monitoring purposes.
      19                 :             :  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
      20                 :             :  * on standbys (to support cascading setups).  The requirement that slots be
      21                 :             :  * usable on standbys precludes storing them in the system catalogs.
      22                 :             :  *
      23                 :             :  * Each replication slot gets its own directory inside the directory
      24                 :             :  * $PGDATA / PG_REPLSLOT_DIR.  Inside that directory the state file will
      25                 :             :  * contain the slot's own data.  Additional data can be stored alongside that
      26                 :             :  * file if required.  While the server is running, the state data is also
      27                 :             :  * cached in memory for efficiency.
      28                 :             :  *
      29                 :             :  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
      30                 :             :  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
      31                 :             :  * to iterate over the slots, and in exclusive mode to change the in_use flag
      32                 :             :  * of a slot.  The remaining data in each slot is protected by its mutex.
      33                 :             :  *
      34                 :             :  *-------------------------------------------------------------------------
      35                 :             :  */
      36                 :             : 
      37                 :             : #include "postgres.h"
      38                 :             : 
      39                 :             : #include <unistd.h>
      40                 :             : #include <sys/stat.h>
      41                 :             : 
      42                 :             : #include "access/transam.h"
      43                 :             : #include "access/xlog_internal.h"
      44                 :             : #include "access/xlogrecovery.h"
      45                 :             : #include "common/file_utils.h"
      46                 :             : #include "common/string.h"
      47                 :             : #include "miscadmin.h"
      48                 :             : #include "pgstat.h"
      49                 :             : #include "postmaster/interrupt.h"
      50                 :             : #include "replication/logicallauncher.h"
      51                 :             : #include "replication/slotsync.h"
      52                 :             : #include "replication/slot.h"
      53                 :             : #include "replication/walsender_private.h"
      54                 :             : #include "storage/fd.h"
      55                 :             : #include "storage/ipc.h"
      56                 :             : #include "storage/proc.h"
      57                 :             : #include "storage/procarray.h"
      58                 :             : #include "utils/builtins.h"
      59                 :             : #include "utils/guc_hooks.h"
      60                 :             : #include "utils/injection_point.h"
      61                 :             : #include "utils/varlena.h"
      62                 :             : 
      63                 :             : /*
      64                 :             :  * Replication slot on-disk data structure.
      65                 :             :  */
      66                 :             : typedef struct ReplicationSlotOnDisk
      67                 :             : {
      68                 :             :         /* first part of this struct needs to be version independent */
      69                 :             : 
      70                 :             :         /* data not covered by checksum */
      71                 :             :         uint32          magic;
      72                 :             :         pg_crc32c       checksum;
      73                 :             : 
      74                 :             :         /* data covered by checksum */
      75                 :             :         uint32          version;
      76                 :             :         uint32          length;
      77                 :             : 
      78                 :             :         /*
      79                 :             :          * The actual data in the slot that follows can differ based on the above
      80                 :             :          * 'version'.
      81                 :             :          */
      82                 :             : 
      83                 :             :         ReplicationSlotPersistentData slotdata;
      84                 :             : } ReplicationSlotOnDisk;
      85                 :             : 
      86                 :             : /*
      87                 :             :  * Struct for the configuration of synchronized_standby_slots.
      88                 :             :  *
      89                 :             :  * Note: this must be a flat representation that can be held in a single chunk
      90                 :             :  * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
      91                 :             :  * synchronized_standby_slots GUC.
      92                 :             :  */
      93                 :             : typedef struct
      94                 :             : {
      95                 :             :         /* Number of slot names in the slot_names[] */
      96                 :             :         int                     nslotnames;
      97                 :             : 
      98                 :             :         /*
      99                 :             :          * slot_names contains 'nslotnames' consecutive null-terminated C strings.
     100                 :             :          */
     101                 :             :         char            slot_names[FLEXIBLE_ARRAY_MEMBER];
     102                 :             : } SyncStandbySlotsConfigData;
     103                 :             : 
     104                 :             : /*
     105                 :             :  * Lookup table for slot invalidation causes.
     106                 :             :  */
     107                 :             : typedef struct SlotInvalidationCauseMap
     108                 :             : {
     109                 :             :         ReplicationSlotInvalidationCause cause;
     110                 :             :         const char *cause_name;
     111                 :             : } SlotInvalidationCauseMap;
     112                 :             : 
     113                 :             : static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
     114                 :             :         {RS_INVAL_NONE, "none"},
     115                 :             :         {RS_INVAL_WAL_REMOVED, "wal_removed"},
     116                 :             :         {RS_INVAL_HORIZON, "rows_removed"},
     117                 :             :         {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
     118                 :             :         {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
     119                 :             : };
     120                 :             : 
     121                 :             : /*
     122                 :             :  * Ensure that the lookup table is up-to-date with the enums defined in
     123                 :             :  * ReplicationSlotInvalidationCause.
     124                 :             :  */
     125                 :             : StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
     126                 :             :                                  "array length mismatch");
     127                 :             : 
     128                 :             : /* size of version independent data */
     129                 :             : #define ReplicationSlotOnDiskConstantSize \
     130                 :             :         offsetof(ReplicationSlotOnDisk, slotdata)
     131                 :             : /* size of the part of the slot not covered by the checksum */
     132                 :             : #define ReplicationSlotOnDiskNotChecksummedSize  \
     133                 :             :         offsetof(ReplicationSlotOnDisk, version)
     134                 :             : /* size of the part covered by the checksum */
     135                 :             : #define ReplicationSlotOnDiskChecksummedSize \
     136                 :             :         sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
     137                 :             : /* size of the slot data that is version dependent */
     138                 :             : #define ReplicationSlotOnDiskV2Size \
     139                 :             :         sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
     140                 :             : 
     141                 :             : #define SLOT_MAGIC              0x1051CA1       /* format identifier */
     142                 :             : #define SLOT_VERSION    5               /* version for new files */
     143                 :             : 
     144                 :             : /* Control array for replication slot management */
     145                 :             : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
     146                 :             : 
     147                 :             : /* My backend's replication slot in the shared memory array */
     148                 :             : ReplicationSlot *MyReplicationSlot = NULL;
     149                 :             : 
     150                 :             : /* GUC variables */
     151                 :             : int                     max_replication_slots = 10; /* the maximum number of replication
     152                 :             :                                                                                  * slots */
     153                 :             : 
     154                 :             : /*
     155                 :             :  * Invalidate replication slots that have remained idle longer than this
     156                 :             :  * duration; '0' disables it.
     157                 :             :  */
     158                 :             : int                     idle_replication_slot_timeout_secs = 0;
     159                 :             : 
     160                 :             : /*
     161                 :             :  * This GUC lists streaming replication standby server slot names that
     162                 :             :  * logical WAL sender processes will wait for.
     163                 :             :  */
     164                 :             : char       *synchronized_standby_slots;
     165                 :             : 
     166                 :             : /* This is the parsed and cached configuration for synchronized_standby_slots */
     167                 :             : static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
     168                 :             : 
     169                 :             : /*
     170                 :             :  * Oldest LSN that has been confirmed to be flushed to the standbys
     171                 :             :  * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
     172                 :             :  */
     173                 :             : static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
     174                 :             : 
     175                 :             : static void ReplicationSlotShmemExit(int code, Datum arg);
     176                 :             : static bool IsSlotForConflictCheck(const char *name);
     177                 :             : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
     178                 :             : 
     179                 :             : /* internal persistency functions */
     180                 :             : static void RestoreSlotFromDisk(const char *name);
     181                 :             : static void CreateSlotOnDisk(ReplicationSlot *slot);
     182                 :             : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
     183                 :             : 
     184                 :             : /*
     185                 :             :  * Report shared-memory space needed by ReplicationSlotsShmemInit.
     186                 :             :  */
     187                 :             : Size
     188                 :          21 : ReplicationSlotsShmemSize(void)
     189                 :             : {
     190                 :          21 :         Size            size = 0;
     191                 :             : 
     192         [ +  - ]:          21 :         if (max_replication_slots == 0)
     193                 :           0 :                 return size;
     194                 :             : 
     195                 :          21 :         size = offsetof(ReplicationSlotCtlData, replication_slots);
     196                 :          42 :         size = add_size(size,
     197                 :          21 :                                         mul_size(max_replication_slots, sizeof(ReplicationSlot)));
     198                 :             : 
     199                 :          21 :         return size;
     200                 :          21 : }
     201                 :             : 
     202                 :             : /*
     203                 :             :  * Allocate and initialize shared memory for replication slots.
     204                 :             :  */
     205                 :             : void
     206                 :           6 : ReplicationSlotsShmemInit(void)
     207                 :             : {
     208                 :           6 :         bool            found;
     209                 :             : 
     210         [ +  - ]:           6 :         if (max_replication_slots == 0)
     211                 :           0 :                 return;
     212                 :             : 
     213                 :           6 :         ReplicationSlotCtl = (ReplicationSlotCtlData *)
     214                 :           6 :                 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
     215                 :             :                                                 &found);
     216                 :             : 
     217         [ -  + ]:           6 :         if (!found)
     218                 :             :         {
     219                 :           6 :                 int                     i;
     220                 :             : 
     221                 :             :                 /* First time through, so initialize */
     222   [ +  -  +  -  :           6 :                 MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
          +  -  +  -  #  
                      # ]
     223                 :             : 
     224         [ +  + ]:          66 :                 for (i = 0; i < max_replication_slots; i++)
     225                 :             :                 {
     226                 :          60 :                         ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
     227                 :             : 
     228                 :             :                         /* everything else is zeroed by the memset above */
     229                 :          60 :                         SpinLockInit(&slot->mutex);
     230                 :          60 :                         LWLockInitialize(&slot->io_in_progress_lock,
     231                 :             :                                                          LWTRANCHE_REPLICATION_SLOT_IO);
     232                 :          60 :                         ConditionVariableInit(&slot->active_cv);
     233                 :          60 :                 }
     234                 :           6 :         }
     235         [ -  + ]:           6 : }
     236                 :             : 
     237                 :             : /*
     238                 :             :  * Register the callback for replication slot cleanup and releasing.
     239                 :             :  */
     240                 :             : void
     241                 :         806 : ReplicationSlotInitialize(void)
     242                 :             : {
     243                 :         806 :         before_shmem_exit(ReplicationSlotShmemExit, 0);
     244                 :         806 : }
     245                 :             : 
     246                 :             : /*
     247                 :             :  * Release and cleanup replication slots.
     248                 :             :  */
     249                 :             : static void
     250                 :         806 : ReplicationSlotShmemExit(int code, Datum arg)
     251                 :             : {
     252                 :             :         /* Make sure active replication slots are released */
     253         [ +  - ]:         806 :         if (MyReplicationSlot != NULL)
     254                 :           0 :                 ReplicationSlotRelease();
     255                 :             : 
     256                 :             :         /* Also cleanup all the temporary slots. */
     257                 :         806 :         ReplicationSlotCleanup(false);
     258                 :         806 : }
     259                 :             : 
     260                 :             : /*
     261                 :             :  * Check whether the passed slot name is valid and report errors at elevel.
     262                 :             :  *
     263                 :             :  * See comments for ReplicationSlotValidateNameInternal().
     264                 :             :  */
     265                 :             : bool
     266                 :           3 : ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
     267                 :             :                                                         int elevel)
     268                 :             : {
     269                 :           3 :         int                     err_code;
     270                 :           3 :         char       *err_msg = NULL;
     271                 :           3 :         char       *err_hint = NULL;
     272                 :             : 
     273         [ +  + ]:           3 :         if (!ReplicationSlotValidateNameInternal(name, allow_reserved_name,
     274                 :             :                                                                                          &err_code, &err_msg, &err_hint))
     275                 :             :         {
     276                 :             :                 /*
     277                 :             :                  * Use errmsg_internal() and errhint_internal() instead of errmsg()
     278                 :             :                  * and errhint(), since the messages from
     279                 :             :                  * ReplicationSlotValidateNameInternal() are already translated. This
     280                 :             :                  * avoids double translation.
     281                 :             :                  */
     282   [ -  +  #  #  :           2 :                 ereport(elevel,
          +  +  -  +  #  
                #  #  # ]
     283                 :             :                                 errcode(err_code),
     284                 :             :                                 errmsg_internal("%s", err_msg),
     285                 :             :                                 (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
     286                 :             : 
     287                 :           0 :                 pfree(err_msg);
     288         [ #  # ]:           0 :                 if (err_hint != NULL)
     289                 :           0 :                         pfree(err_hint);
     290                 :           0 :                 return false;
     291                 :             :         }
     292                 :             : 
     293                 :           1 :         return true;
     294                 :           1 : }
     295                 :             : 
     296                 :             : /*
     297                 :             :  * Check whether the passed slot name is valid.
     298                 :             :  *
     299                 :             :  * An error will be reported for a reserved replication slot name if
     300                 :             :  * allow_reserved_name is set to false.
     301                 :             :  *
     302                 :             :  * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
     303                 :             :  * the name to be used as a directory name on every supported OS.
     304                 :             :  *
     305                 :             :  * Returns true if the slot name is valid. Otherwise, returns false and stores
     306                 :             :  * the error code, error message, and optional hint in err_code, err_msg, and
     307                 :             :  * err_hint, respectively. The caller is responsible for freeing err_msg and
     308                 :             :  * err_hint, which are palloc'd.
     309                 :             :  */
     310                 :             : bool
     311                 :           2 : ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name,
     312                 :             :                                                                         int *err_code, char **err_msg, char **err_hint)
     313                 :             : {
     314                 :           2 :         const char *cp;
     315                 :             : 
     316         [ +  + ]:           2 :         if (strlen(name) == 0)
     317                 :             :         {
     318                 :           1 :                 *err_code = ERRCODE_INVALID_NAME;
     319                 :           1 :                 *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
     320                 :           1 :                 *err_hint = NULL;
     321                 :           1 :                 return false;
     322                 :             :         }
     323                 :             : 
     324         [ -  + ]:           1 :         if (strlen(name) >= NAMEDATALEN)
     325                 :             :         {
     326                 :           0 :                 *err_code = ERRCODE_NAME_TOO_LONG;
     327                 :           0 :                 *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
     328                 :           0 :                 *err_hint = NULL;
     329                 :           0 :                 return false;
     330                 :             :         }
     331                 :             : 
     332         [ +  + ]:           8 :         for (cp = name; *cp; cp++)
     333                 :             :         {
     334   [ +  -  #  # ]:           7 :                 if (!((*cp >= 'a' && *cp <= 'z')
     335         [ #  # ]:           7 :                           || (*cp >= '0' && *cp <= '9')
     336                 :           0 :                           || (*cp == '_')))
     337                 :             :                 {
     338                 :           0 :                         *err_code = ERRCODE_INVALID_NAME;
     339                 :           0 :                         *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
     340                 :           0 :                         *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
     341                 :           0 :                         return false;
     342                 :             :                 }
     343                 :           7 :         }
     344                 :             : 
     345   [ +  -  +  - ]:           1 :         if (!allow_reserved_name && IsSlotForConflictCheck(name))
     346                 :             :         {
     347                 :           0 :                 *err_code = ERRCODE_RESERVED_NAME;
     348                 :           0 :                 *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
     349                 :           0 :                 *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
     350                 :             :                                                          CONFLICT_DETECTION_SLOT);
     351                 :           0 :                 return false;
     352                 :             :         }
     353                 :             : 
     354                 :           1 :         return true;
     355                 :           2 : }
     356                 :             : 
     357                 :             : /*
     358                 :             :  * Return true if the replication slot name is "pg_conflict_detection".
     359                 :             :  */
     360                 :             : static bool
     361                 :           1 : IsSlotForConflictCheck(const char *name)
     362                 :             : {
     363                 :           1 :         return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
     364                 :             : }
     365                 :             : 
     366                 :             : /*
     367                 :             :  * Create a new replication slot and mark it as used by this backend.
     368                 :             :  *
     369                 :             :  * name: Name of the slot
     370                 :             :  * db_specific: logical decoding is db specific; if the slot is going to
     371                 :             :  *         be used for that pass true, otherwise false.
     372                 :             :  * two_phase: If enabled, allows decoding of prepared transactions.
     373                 :             :  * failover: If enabled, allows the slot to be synced to standbys so
     374                 :             :  *     that logical replication can be resumed after failover.
     375                 :             :  * synced: True if the slot is synchronized from the primary server.
     376                 :             :  */
     377                 :             : void
     378                 :           0 : ReplicationSlotCreate(const char *name, bool db_specific,
     379                 :             :                                           ReplicationSlotPersistency persistency,
     380                 :             :                                           bool two_phase, bool failover, bool synced)
     381                 :             : {
     382                 :           0 :         ReplicationSlot *slot = NULL;
     383                 :           0 :         int                     i;
     384                 :             : 
     385         [ #  # ]:           0 :         Assert(MyReplicationSlot == NULL);
     386                 :             : 
     387                 :             :         /*
     388                 :             :          * The logical launcher or pg_upgrade may create or migrate an internal
     389                 :             :          * slot, so using a reserved name is allowed in these cases.
     390                 :             :          */
     391         [ #  # ]:           0 :         ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
     392                 :             :                                                                 ERROR);
     393                 :             : 
     394         [ #  # ]:           0 :         if (failover)
     395                 :             :         {
     396                 :             :                 /*
     397                 :             :                  * Do not allow users to create the failover enabled slots on the
     398                 :             :                  * standby as we do not support sync to the cascading standby.
     399                 :             :                  *
     400                 :             :                  * However, failover enabled slots can be created during slot
     401                 :             :                  * synchronization because we need to retain the same values as the
     402                 :             :                  * remote slot.
     403                 :             :                  */
     404   [ #  #  #  # ]:           0 :                 if (RecoveryInProgress() && !IsSyncingReplicationSlots())
     405   [ #  #  #  # ]:           0 :                         ereport(ERROR,
     406                 :             :                                         errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     407                 :             :                                         errmsg("cannot enable failover for a replication slot created on the standby"));
     408                 :             : 
     409                 :             :                 /*
     410                 :             :                  * Do not allow users to create failover enabled temporary slots,
     411                 :             :                  * because temporary slots will not be synced to the standby.
     412                 :             :                  *
     413                 :             :                  * However, failover enabled temporary slots can be created during
     414                 :             :                  * slot synchronization. See the comments atop slotsync.c for details.
     415                 :             :                  */
     416   [ #  #  #  # ]:           0 :                 if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
     417   [ #  #  #  # ]:           0 :                         ereport(ERROR,
     418                 :             :                                         errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     419                 :             :                                         errmsg("cannot enable failover for a temporary replication slot"));
     420                 :           0 :         }
     421                 :             : 
     422                 :             :         /*
     423                 :             :          * If some other backend ran this code concurrently with us, we'd likely
     424                 :             :          * both allocate the same slot, and that would be bad.  We'd also be at
     425                 :             :          * risk of missing a name collision.  Also, we don't want to try to create
     426                 :             :          * a new slot while somebody's busy cleaning up an old one, because we
     427                 :             :          * might both be monkeying with the same directory.
     428                 :             :          */
     429                 :           0 :         LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
     430                 :             : 
     431                 :             :         /*
     432                 :             :          * Check for name collision, and identify an allocatable slot.  We need to
     433                 :             :          * hold ReplicationSlotControlLock in shared mode for this, so that nobody
     434                 :             :          * else can change the in_use flags while we're looking at them.
     435                 :             :          */
     436                 :           0 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     437         [ #  # ]:           0 :         for (i = 0; i < max_replication_slots; i++)
     438                 :             :         {
     439                 :           0 :                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     440                 :             : 
     441   [ #  #  #  # ]:           0 :                 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     442   [ #  #  #  # ]:           0 :                         ereport(ERROR,
     443                 :             :                                         (errcode(ERRCODE_DUPLICATE_OBJECT),
     444                 :             :                                          errmsg("replication slot \"%s\" already exists", name)));
     445   [ #  #  #  # ]:           0 :                 if (!s->in_use && slot == NULL)
     446                 :           0 :                         slot = s;
     447                 :           0 :         }
     448                 :           0 :         LWLockRelease(ReplicationSlotControlLock);
     449                 :             : 
     450                 :             :         /* If all slots are in use, we're out of luck. */
     451         [ #  # ]:           0 :         if (slot == NULL)
     452   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     453                 :             :                                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     454                 :             :                                  errmsg("all replication slots are in use"),
     455                 :             :                                  errhint("Free one or increase \"max_replication_slots\".")));
     456                 :             : 
     457                 :             :         /*
     458                 :             :          * Since this slot is not in use, nobody should be looking at any part of
     459                 :             :          * it other than the in_use field unless they're trying to allocate it.
     460                 :             :          * And since we hold ReplicationSlotAllocationLock, nobody except us can
     461                 :             :          * be doing that.  So it's safe to initialize the slot.
     462                 :             :          */
     463         [ #  # ]:           0 :         Assert(!slot->in_use);
     464         [ #  # ]:           0 :         Assert(slot->active_pid == 0);
     465                 :             : 
     466                 :             :         /* first initialize persistent data */
     467                 :           0 :         memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
     468                 :           0 :         namestrcpy(&slot->data.name, name);
     469         [ #  # ]:           0 :         slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
     470                 :           0 :         slot->data.persistency = persistency;
     471                 :           0 :         slot->data.two_phase = two_phase;
     472                 :           0 :         slot->data.two_phase_at = InvalidXLogRecPtr;
     473                 :           0 :         slot->data.failover = failover;
     474                 :           0 :         slot->data.synced = synced;
     475                 :             : 
     476                 :             :         /* and then data only present in shared memory */
     477                 :           0 :         slot->just_dirtied = false;
     478                 :           0 :         slot->dirty = false;
     479                 :           0 :         slot->effective_xmin = InvalidTransactionId;
     480                 :           0 :         slot->effective_catalog_xmin = InvalidTransactionId;
     481                 :           0 :         slot->candidate_catalog_xmin = InvalidTransactionId;
     482                 :           0 :         slot->candidate_xmin_lsn = InvalidXLogRecPtr;
     483                 :           0 :         slot->candidate_restart_valid = InvalidXLogRecPtr;
     484                 :           0 :         slot->candidate_restart_lsn = InvalidXLogRecPtr;
     485                 :           0 :         slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
     486                 :           0 :         slot->last_saved_restart_lsn = InvalidXLogRecPtr;
     487                 :           0 :         slot->inactive_since = 0;
     488                 :           0 :         slot->slotsync_skip_reason = SS_SKIP_NONE;
     489                 :             : 
     490                 :             :         /*
     491                 :             :          * Create the slot on disk.  We haven't actually marked the slot allocated
     492                 :             :          * yet, so no special cleanup is required if this errors out.
     493                 :             :          */
     494                 :           0 :         CreateSlotOnDisk(slot);
     495                 :             : 
     496                 :             :         /*
     497                 :             :          * We need to briefly prevent any other backend from iterating over the
     498                 :             :          * slots while we flip the in_use flag. We also need to set the active
     499                 :             :          * flag while holding the ControlLock as otherwise a concurrent
     500                 :             :          * ReplicationSlotAcquire() could acquire the slot as well.
     501                 :             :          */
     502                 :           0 :         LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     503                 :             : 
     504                 :           0 :         slot->in_use = true;
     505                 :             : 
     506                 :             :         /* We can now mark the slot active, and that makes it our slot. */
     507         [ #  # ]:           0 :         SpinLockAcquire(&slot->mutex);
     508         [ #  # ]:           0 :         Assert(slot->active_pid == 0);
     509                 :           0 :         slot->active_pid = MyProcPid;
     510                 :           0 :         SpinLockRelease(&slot->mutex);
     511                 :           0 :         MyReplicationSlot = slot;
     512                 :             : 
     513                 :           0 :         LWLockRelease(ReplicationSlotControlLock);
     514                 :             : 
     515                 :             :         /*
     516                 :             :          * Create statistics entry for the new logical slot. We don't collect any
     517                 :             :          * stats for physical slots, so no need to create an entry for the same.
     518                 :             :          * See ReplicationSlotDropPtr for why we need to do this before releasing
     519                 :             :          * ReplicationSlotAllocationLock.
     520                 :             :          */
     521         [ #  # ]:           0 :         if (SlotIsLogical(slot))
     522                 :           0 :                 pgstat_create_replslot(slot);
     523                 :             : 
     524                 :             :         /*
     525                 :             :          * Now that the slot has been marked as in_use and active, it's safe to
     526                 :             :          * let somebody else try to allocate a slot.
     527                 :             :          */
     528                 :           0 :         LWLockRelease(ReplicationSlotAllocationLock);
     529                 :             : 
     530                 :             :         /* Let everybody know we've modified this slot */
     531                 :           0 :         ConditionVariableBroadcast(&slot->active_cv);
     532                 :           0 : }
     533                 :             : 
     534                 :             : /*
     535                 :             :  * Search for the named replication slot.
     536                 :             :  *
     537                 :             :  * Return the replication slot if found, otherwise NULL.
     538                 :             :  */
     539                 :             : ReplicationSlot *
     540                 :           1 : SearchNamedReplicationSlot(const char *name, bool need_lock)
     541                 :             : {
     542                 :           1 :         int                     i;
     543                 :           1 :         ReplicationSlot *slot = NULL;
     544                 :             : 
     545         [ -  + ]:           1 :         if (need_lock)
     546                 :           1 :                 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     547                 :             : 
     548         [ +  + ]:          11 :         for (i = 0; i < max_replication_slots; i++)
     549                 :             :         {
     550                 :          10 :                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     551                 :             : 
     552   [ -  +  #  # ]:          10 :                 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     553                 :             :                 {
     554                 :           0 :                         slot = s;
     555                 :           0 :                         break;
     556                 :             :                 }
     557      [ -  -  + ]:          10 :         }
     558                 :             : 
     559         [ -  + ]:           1 :         if (need_lock)
     560                 :           1 :                 LWLockRelease(ReplicationSlotControlLock);
     561                 :             : 
     562                 :           2 :         return slot;
     563                 :           1 : }
     564                 :             : 
     565                 :             : /*
     566                 :             :  * Return the index of the replication slot in
     567                 :             :  * ReplicationSlotCtl->replication_slots.
     568                 :             :  *
     569                 :             :  * This is mainly useful to have an efficient key for storing replication slot
     570                 :             :  * stats.
     571                 :             :  */
     572                 :             : int
     573                 :           0 : ReplicationSlotIndex(ReplicationSlot *slot)
     574                 :             : {
     575         [ #  # ]:           0 :         Assert(slot >= ReplicationSlotCtl->replication_slots &&
     576                 :             :                    slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
     577                 :             : 
     578                 :           0 :         return slot - ReplicationSlotCtl->replication_slots;
     579                 :             : }
     580                 :             : 
     581                 :             : /*
     582                 :             :  * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
     583                 :             :  * the slot's name and true is returned.
     584                 :             :  *
     585                 :             :  * This likely is only useful for pgstat_replslot.c during shutdown, in other
     586                 :             :  * cases there are obvious TOCTOU issues.
     587                 :             :  */
     588                 :             : bool
     589                 :           0 : ReplicationSlotName(int index, Name name)
     590                 :             : {
     591                 :           0 :         ReplicationSlot *slot;
     592                 :           0 :         bool            found;
     593                 :             : 
     594                 :           0 :         slot = &ReplicationSlotCtl->replication_slots[index];
     595                 :             : 
     596                 :             :         /*
     597                 :             :          * Ensure that the slot cannot be dropped while we copy the name. Don't
     598                 :             :          * need the spinlock as the name of an existing slot cannot change.
     599                 :             :          */
     600                 :           0 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     601                 :           0 :         found = slot->in_use;
     602         [ #  # ]:           0 :         if (slot->in_use)
     603                 :           0 :                 namestrcpy(name, NameStr(slot->data.name));
     604                 :           0 :         LWLockRelease(ReplicationSlotControlLock);
     605                 :             : 
     606                 :           0 :         return found;
     607                 :           0 : }
     608                 :             : 
     609                 :             : /*
     610                 :             :  * Find a previously created slot and mark it as used by this process.
     611                 :             :  *
     612                 :             :  * An error is raised if nowait is true and the slot is currently in use. If
     613                 :             :  * nowait is false, we sleep until the slot is released by the owning process.
     614                 :             :  *
     615                 :             :  * An error is raised if error_if_invalid is true and the slot is found to
     616                 :             :  * be invalid. It should always be set to true, except when we are temporarily
     617                 :             :  * acquiring the slot and don't intend to change it.
     618                 :             :  */
     619                 :             : void
     620                 :           0 : ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
     621                 :             : {
     622                 :           0 :         ReplicationSlot *s;
     623                 :           0 :         int                     active_pid;
     624                 :             : 
     625         [ #  # ]:           0 :         Assert(name != NULL);
     626                 :             : 
     627                 :             : retry:
     628         [ #  # ]:           0 :         Assert(MyReplicationSlot == NULL);
     629                 :             : 
     630                 :           0 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     631                 :             : 
     632                 :             :         /* Check if the slot exists with the given name. */
     633                 :           0 :         s = SearchNamedReplicationSlot(name, false);
     634         [ #  # ]:           0 :         if (s == NULL || !s->in_use)
     635                 :             :         {
     636                 :           0 :                 LWLockRelease(ReplicationSlotControlLock);
     637                 :             : 
     638   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     639                 :             :                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     640                 :             :                                  errmsg("replication slot \"%s\" does not exist",
     641                 :             :                                                 name)));
     642                 :           0 :         }
     643                 :             : 
     644                 :             :         /*
     645                 :             :          * Do not allow users to acquire the reserved slot. This scenario may
     646                 :             :          * occur if the launcher that owns the slot has terminated unexpectedly
     647                 :             :          * due to an error, and a backend process attempts to reuse the slot.
     648                 :             :          */
     649   [ #  #  #  # ]:           0 :         if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
     650   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     651                 :             :                                 errcode(ERRCODE_UNDEFINED_OBJECT),
     652                 :             :                                 errmsg("cannot acquire replication slot \"%s\"", name),
     653                 :             :                                 errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
     654                 :             : 
     655                 :             :         /*
     656                 :             :          * This is the slot we want; check if it's active under some other
     657                 :             :          * process.  In single user mode, we don't need this check.
     658                 :             :          */
     659         [ #  # ]:           0 :         if (IsUnderPostmaster)
     660                 :             :         {
     661                 :             :                 /*
     662                 :             :                  * Get ready to sleep on the slot in case it is active.  (We may end
     663                 :             :                  * up not sleeping, but we don't want to do this while holding the
     664                 :             :                  * spinlock.)
     665                 :             :                  */
     666         [ #  # ]:           0 :                 if (!nowait)
     667                 :           0 :                         ConditionVariablePrepareToSleep(&s->active_cv);
     668                 :             : 
     669                 :             :                 /*
     670                 :             :                  * It is important to reset the inactive_since under spinlock here to
     671                 :             :                  * avoid race conditions with slot invalidation. See comments related
     672                 :             :                  * to inactive_since in InvalidatePossiblyObsoleteSlot.
     673                 :             :                  */
     674         [ #  # ]:           0 :                 SpinLockAcquire(&s->mutex);
     675         [ #  # ]:           0 :                 if (s->active_pid == 0)
     676                 :           0 :                         s->active_pid = MyProcPid;
     677                 :           0 :                 active_pid = s->active_pid;
     678                 :           0 :                 ReplicationSlotSetInactiveSince(s, 0, false);
     679                 :           0 :                 SpinLockRelease(&s->mutex);
     680                 :           0 :         }
     681                 :             :         else
     682                 :             :         {
     683                 :           0 :                 s->active_pid = active_pid = MyProcPid;
     684                 :           0 :                 ReplicationSlotSetInactiveSince(s, 0, true);
     685                 :             :         }
     686                 :           0 :         LWLockRelease(ReplicationSlotControlLock);
     687                 :             : 
     688                 :             :         /*
     689                 :             :          * If we found the slot but it's already active in another process, we
     690                 :             :          * wait until the owning process signals us that it's been released, or
     691                 :             :          * error out.
     692                 :             :          */
     693         [ #  # ]:           0 :         if (active_pid != MyProcPid)
     694                 :             :         {
     695         [ #  # ]:           0 :                 if (!nowait)
     696                 :             :                 {
     697                 :             :                         /* Wait here until we get signaled, and then restart */
     698                 :           0 :                         ConditionVariableSleep(&s->active_cv,
     699                 :             :                                                                    WAIT_EVENT_REPLICATION_SLOT_DROP);
     700                 :           0 :                         ConditionVariableCancelSleep();
     701                 :           0 :                         goto retry;
     702                 :             :                 }
     703                 :             : 
     704   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     705                 :             :                                 (errcode(ERRCODE_OBJECT_IN_USE),
     706                 :             :                                  errmsg("replication slot \"%s\" is active for PID %d",
     707                 :             :                                                 NameStr(s->data.name), active_pid)));
     708                 :           0 :         }
     709         [ #  # ]:           0 :         else if (!nowait)
     710                 :           0 :                 ConditionVariableCancelSleep(); /* no sleep needed after all */
     711                 :             : 
     712                 :             :         /* We made this slot active, so it's ours now. */
     713                 :           0 :         MyReplicationSlot = s;
     714                 :             : 
     715                 :             :         /*
     716                 :             :          * We need to check for invalidation after making the slot ours to avoid
     717                 :             :          * the possible race condition with the checkpointer that can otherwise
     718                 :             :          * invalidate the slot immediately after the check.
     719                 :             :          */
     720   [ #  #  #  # ]:           0 :         if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
     721   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     722                 :             :                                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     723                 :             :                                 errmsg("can no longer access replication slot \"%s\"",
     724                 :             :                                            NameStr(s->data.name)),
     725                 :             :                                 errdetail("This replication slot has been invalidated due to \"%s\".",
     726                 :             :                                                   GetSlotInvalidationCauseName(s->data.invalidated)));
     727                 :             : 
     728                 :             :         /* Let everybody know we've modified this slot */
     729                 :           0 :         ConditionVariableBroadcast(&s->active_cv);
     730                 :             : 
     731                 :             :         /*
     732                 :             :          * The call to pgstat_acquire_replslot() protects against stats for a
     733                 :             :          * different slot, from before a restart or such, being present during
     734                 :             :          * pgstat_report_replslot().
     735                 :             :          */
     736         [ #  # ]:           0 :         if (SlotIsLogical(s))
     737                 :           0 :                 pgstat_acquire_replslot(s);
     738                 :             : 
     739                 :             : 
     740         [ #  # ]:           0 :         if (am_walsender)
     741                 :             :         {
     742   [ #  #  #  #  :           0 :                 ereport(log_replication_commands ? LOG : DEBUG1,
          #  #  #  #  #  
                #  #  # ]
     743                 :             :                                 SlotIsLogical(s)
     744                 :             :                                 ? errmsg("acquired logical replication slot \"%s\"",
     745                 :             :                                                  NameStr(s->data.name))
     746                 :             :                                 : errmsg("acquired physical replication slot \"%s\"",
     747                 :             :                                                  NameStr(s->data.name)));
     748                 :           0 :         }
     749                 :           0 : }
     750                 :             : 
     751                 :             : /*
     752                 :             :  * Release the replication slot that this backend considers to own.
     753                 :             :  *
     754                 :             :  * This or another backend can re-acquire the slot later.
     755                 :             :  * Resources this slot requires will be preserved.
     756                 :             :  */
     757                 :             : void
     758                 :           0 : ReplicationSlotRelease(void)
     759                 :             : {
     760                 :           0 :         ReplicationSlot *slot = MyReplicationSlot;
     761                 :           0 :         char       *slotname = NULL;    /* keep compiler quiet */
     762                 :           0 :         bool            is_logical;
     763                 :           0 :         TimestampTz now = 0;
     764                 :             : 
     765         [ #  # ]:           0 :         Assert(slot != NULL && slot->active_pid != 0);
     766                 :             : 
     767                 :           0 :         is_logical = SlotIsLogical(slot);
     768                 :             : 
     769         [ #  # ]:           0 :         if (am_walsender)
     770                 :           0 :                 slotname = pstrdup(NameStr(slot->data.name));
     771                 :             : 
     772         [ #  # ]:           0 :         if (slot->data.persistency == RS_EPHEMERAL)
     773                 :             :         {
     774                 :             :                 /*
     775                 :             :                  * Delete the slot. There is no !PANIC case where this is allowed to
     776                 :             :                  * fail, all that may happen is an incomplete cleanup of the on-disk
     777                 :             :                  * data.
     778                 :             :                  */
     779                 :           0 :                 ReplicationSlotDropAcquired();
     780                 :             : 
     781                 :             :                 /*
     782                 :             :                  * Request to disable logical decoding, even though this slot may not
     783                 :             :                  * have been the last logical slot. The checkpointer will verify if
     784                 :             :                  * logical decoding should actually be disabled.
     785                 :             :                  */
     786         [ #  # ]:           0 :                 if (is_logical)
     787                 :           0 :                         RequestDisableLogicalDecoding();
     788                 :           0 :         }
     789                 :             : 
     790                 :             :         /*
     791                 :             :          * If slot needed to temporarily restrain both data and catalog xmin to
     792                 :             :          * create the catalog snapshot, remove that temporary constraint.
     793                 :             :          * Snapshots can only be exported while the initial snapshot is still
     794                 :             :          * acquired.
     795                 :             :          */
     796   [ #  #  #  # ]:           0 :         if (!TransactionIdIsValid(slot->data.xmin) &&
     797                 :           0 :                 TransactionIdIsValid(slot->effective_xmin))
     798                 :             :         {
     799         [ #  # ]:           0 :                 SpinLockAcquire(&slot->mutex);
     800                 :           0 :                 slot->effective_xmin = InvalidTransactionId;
     801                 :           0 :                 SpinLockRelease(&slot->mutex);
     802                 :           0 :                 ReplicationSlotsComputeRequiredXmin(false);
     803                 :           0 :         }
     804                 :             : 
     805                 :             :         /*
     806                 :             :          * Set the time since the slot has become inactive. We get the current
     807                 :             :          * time beforehand to avoid system call while holding the spinlock.
     808                 :             :          */
     809                 :           0 :         now = GetCurrentTimestamp();
     810                 :             : 
     811         [ #  # ]:           0 :         if (slot->data.persistency == RS_PERSISTENT)
     812                 :             :         {
     813                 :             :                 /*
     814                 :             :                  * Mark persistent slot inactive.  We're not freeing it, just
     815                 :             :                  * disconnecting, but wake up others that may be waiting for it.
     816                 :             :                  */
     817         [ #  # ]:           0 :                 SpinLockAcquire(&slot->mutex);
     818                 :           0 :                 slot->active_pid = 0;
     819                 :           0 :                 ReplicationSlotSetInactiveSince(slot, now, false);
     820                 :           0 :                 SpinLockRelease(&slot->mutex);
     821                 :           0 :                 ConditionVariableBroadcast(&slot->active_cv);
     822                 :           0 :         }
     823                 :             :         else
     824                 :           0 :                 ReplicationSlotSetInactiveSince(slot, now, true);
     825                 :             : 
     826                 :           0 :         MyReplicationSlot = NULL;
     827                 :             : 
     828                 :             :         /* might not have been set when we've been a plain slot */
     829                 :           0 :         LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     830                 :           0 :         MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
     831                 :           0 :         ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
     832                 :           0 :         LWLockRelease(ProcArrayLock);
     833                 :             : 
     834         [ #  # ]:           0 :         if (am_walsender)
     835                 :             :         {
     836   [ #  #  #  #  :           0 :                 ereport(log_replication_commands ? LOG : DEBUG1,
          #  #  #  #  #  
                #  #  # ]
     837                 :             :                                 is_logical
     838                 :             :                                 ? errmsg("released logical replication slot \"%s\"",
     839                 :             :                                                  slotname)
     840                 :             :                                 : errmsg("released physical replication slot \"%s\"",
     841                 :             :                                                  slotname));
     842                 :             : 
     843                 :           0 :                 pfree(slotname);
     844                 :           0 :         }
     845                 :           0 : }
     846                 :             : 
     847                 :             : /*
     848                 :             :  * Cleanup temporary slots created in current session.
     849                 :             :  *
     850                 :             :  * Cleanup only synced temporary slots if 'synced_only' is true, else
     851                 :             :  * cleanup all temporary slots.
     852                 :             :  *
     853                 :             :  * If it drops the last logical slot in the cluster, requests to disable
     854                 :             :  * logical decoding.
     855                 :             :  */
     856                 :             : void
     857                 :        7569 : ReplicationSlotCleanup(bool synced_only)
     858                 :             : {
     859                 :        7569 :         int                     i;
     860                 :        7569 :         bool            found_valid_logicalslot;
     861                 :        7569 :         bool            dropped_logical = false;
     862                 :             : 
     863         [ +  - ]:        7569 :         Assert(MyReplicationSlot == NULL);
     864                 :             : 
     865                 :             : restart:
     866                 :        7569 :         found_valid_logicalslot = false;
     867                 :        7569 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     868         [ +  + ]:       83259 :         for (i = 0; i < max_replication_slots; i++)
     869                 :             :         {
     870                 :       75690 :                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     871                 :             : 
     872         [ -  + ]:       75690 :                 if (!s->in_use)
     873                 :       75690 :                         continue;
     874                 :             : 
     875         [ #  # ]:           0 :                 SpinLockAcquire(&s->mutex);
     876                 :             : 
     877                 :           0 :                 found_valid_logicalslot |=
     878         [ #  # ]:           0 :                         (SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE);
     879                 :             : 
     880   [ #  #  #  # ]:           0 :                 if ((s->active_pid == MyProcPid &&
     881         [ #  # ]:           0 :                          (!synced_only || s->data.synced)))
     882                 :             :                 {
     883         [ #  # ]:           0 :                         Assert(s->data.persistency == RS_TEMPORARY);
     884                 :           0 :                         SpinLockRelease(&s->mutex);
     885                 :           0 :                         LWLockRelease(ReplicationSlotControlLock);      /* avoid deadlock */
     886                 :             : 
     887         [ #  # ]:           0 :                         if (SlotIsLogical(s))
     888                 :           0 :                                 dropped_logical = true;
     889                 :             : 
     890                 :           0 :                         ReplicationSlotDropPtr(s);
     891                 :             : 
     892                 :           0 :                         ConditionVariableBroadcast(&s->active_cv);
     893                 :           0 :                         goto restart;
     894                 :             :                 }
     895                 :             :                 else
     896                 :           0 :                         SpinLockRelease(&s->mutex);
     897   [ -  +  -  - ]:       75690 :         }
     898                 :             : 
     899                 :        7569 :         LWLockRelease(ReplicationSlotControlLock);
     900                 :             : 
     901   [ -  +  #  # ]:        7569 :         if (dropped_logical && !found_valid_logicalslot)
     902                 :           0 :                 RequestDisableLogicalDecoding();
     903                 :        7569 : }
     904                 :             : 
     905                 :             : /*
     906                 :             :  * Permanently drop replication slot identified by the passed in name.
     907                 :             :  */
     908                 :             : void
     909                 :           0 : ReplicationSlotDrop(const char *name, bool nowait)
     910                 :             : {
     911                 :           0 :         bool            is_logical;
     912                 :             : 
     913         [ #  # ]:           0 :         Assert(MyReplicationSlot == NULL);
     914                 :             : 
     915                 :           0 :         ReplicationSlotAcquire(name, nowait, false);
     916                 :             : 
     917                 :             :         /*
     918                 :             :          * Do not allow users to drop the slots which are currently being synced
     919                 :             :          * from the primary to the standby.
     920                 :             :          */
     921   [ #  #  #  # ]:           0 :         if (RecoveryInProgress() && MyReplicationSlot->data.synced)
     922   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     923                 :             :                                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     924                 :             :                                 errmsg("cannot drop replication slot \"%s\"", name),
     925                 :             :                                 errdetail("This replication slot is being synchronized from the primary server."));
     926                 :             : 
     927                 :           0 :         is_logical = SlotIsLogical(MyReplicationSlot);
     928                 :             : 
     929                 :           0 :         ReplicationSlotDropAcquired();
     930                 :             : 
     931         [ #  # ]:           0 :         if (is_logical)
     932                 :           0 :                 RequestDisableLogicalDecoding();
     933                 :           0 : }
     934                 :             : 
     935                 :             : /*
     936                 :             :  * Change the definition of the slot identified by the specified name.
     937                 :             :  *
     938                 :             :  * Altering the two_phase property of a slot requires caution on the
     939                 :             :  * client-side. Enabling it at any random point during decoding has the
     940                 :             :  * risk that transactions prepared before this change may be skipped by
     941                 :             :  * the decoder, leading to missing prepare records on the client. So, we
     942                 :             :  * enable it for subscription related slots only once the initial tablesync
     943                 :             :  * is finished. See comments atop worker.c. Disabling it is safe only when
     944                 :             :  * there are no pending prepared transaction, otherwise, the changes of
     945                 :             :  * already prepared transactions can be replicated again along with their
     946                 :             :  * corresponding commit leading to duplicate data or errors.
     947                 :             :  */
     948                 :             : void
     949                 :           0 : ReplicationSlotAlter(const char *name, const bool *failover,
     950                 :             :                                          const bool *two_phase)
     951                 :             : {
     952                 :           0 :         bool            update_slot = false;
     953                 :             : 
     954         [ #  # ]:           0 :         Assert(MyReplicationSlot == NULL);
     955   [ #  #  #  # ]:           0 :         Assert(failover || two_phase);
     956                 :             : 
     957                 :           0 :         ReplicationSlotAcquire(name, false, true);
     958                 :             : 
     959         [ #  # ]:           0 :         if (SlotIsPhysical(MyReplicationSlot))
     960   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     961                 :             :                                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     962                 :             :                                 errmsg("cannot use %s with a physical replication slot",
     963                 :             :                                            "ALTER_REPLICATION_SLOT"));
     964                 :             : 
     965         [ #  # ]:           0 :         if (RecoveryInProgress())
     966                 :             :         {
     967                 :             :                 /*
     968                 :             :                  * Do not allow users to alter the slots which are currently being
     969                 :             :                  * synced from the primary to the standby.
     970                 :             :                  */
     971         [ #  # ]:           0 :                 if (MyReplicationSlot->data.synced)
     972   [ #  #  #  # ]:           0 :                         ereport(ERROR,
     973                 :             :                                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     974                 :             :                                         errmsg("cannot alter replication slot \"%s\"", name),
     975                 :             :                                         errdetail("This replication slot is being synchronized from the primary server."));
     976                 :             : 
     977                 :             :                 /*
     978                 :             :                  * Do not allow users to enable failover on the standby as we do not
     979                 :             :                  * support sync to the cascading standby.
     980                 :             :                  */
     981   [ #  #  #  # ]:           0 :                 if (failover && *failover)
     982   [ #  #  #  # ]:           0 :                         ereport(ERROR,
     983                 :             :                                         errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     984                 :             :                                         errmsg("cannot enable failover for a replication slot"
     985                 :             :                                                    " on the standby"));
     986                 :           0 :         }
     987                 :             : 
     988         [ #  # ]:           0 :         if (failover)
     989                 :             :         {
     990                 :             :                 /*
     991                 :             :                  * Do not allow users to enable failover for temporary slots as we do
     992                 :             :                  * not support syncing temporary slots to the standby.
     993                 :             :                  */
     994   [ #  #  #  # ]:           0 :                 if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
     995   [ #  #  #  # ]:           0 :                         ereport(ERROR,
     996                 :             :                                         errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     997                 :             :                                         errmsg("cannot enable failover for a temporary replication slot"));
     998                 :             : 
     999         [ #  # ]:           0 :                 if (MyReplicationSlot->data.failover != *failover)
    1000                 :             :                 {
    1001         [ #  # ]:           0 :                         SpinLockAcquire(&MyReplicationSlot->mutex);
    1002                 :           0 :                         MyReplicationSlot->data.failover = *failover;
    1003                 :           0 :                         SpinLockRelease(&MyReplicationSlot->mutex);
    1004                 :             : 
    1005                 :           0 :                         update_slot = true;
    1006                 :           0 :                 }
    1007                 :           0 :         }
    1008                 :             : 
    1009   [ #  #  #  # ]:           0 :         if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
    1010                 :             :         {
    1011         [ #  # ]:           0 :                 SpinLockAcquire(&MyReplicationSlot->mutex);
    1012                 :           0 :                 MyReplicationSlot->data.two_phase = *two_phase;
    1013                 :           0 :                 SpinLockRelease(&MyReplicationSlot->mutex);
    1014                 :             : 
    1015                 :           0 :                 update_slot = true;
    1016                 :           0 :         }
    1017                 :             : 
    1018         [ #  # ]:           0 :         if (update_slot)
    1019                 :             :         {
    1020                 :           0 :                 ReplicationSlotMarkDirty();
    1021                 :           0 :                 ReplicationSlotSave();
    1022                 :           0 :         }
    1023                 :             : 
    1024                 :           0 :         ReplicationSlotRelease();
    1025                 :           0 : }
    1026                 :             : 
    1027                 :             : /*
    1028                 :             :  * Permanently drop the currently acquired replication slot.
    1029                 :             :  */
    1030                 :             : void
    1031                 :           0 : ReplicationSlotDropAcquired(void)
    1032                 :             : {
    1033                 :           0 :         ReplicationSlot *slot = MyReplicationSlot;
    1034                 :             : 
    1035         [ #  # ]:           0 :         Assert(MyReplicationSlot != NULL);
    1036                 :             : 
    1037                 :             :         /* slot isn't acquired anymore */
    1038                 :           0 :         MyReplicationSlot = NULL;
    1039                 :             : 
    1040                 :           0 :         ReplicationSlotDropPtr(slot);
    1041                 :           0 : }
    1042                 :             : 
    1043                 :             : /*
    1044                 :             :  * Permanently drop the replication slot which will be released by the point
    1045                 :             :  * this function returns.
    1046                 :             :  */
    1047                 :             : static void
    1048                 :           0 : ReplicationSlotDropPtr(ReplicationSlot *slot)
    1049                 :             : {
    1050                 :           0 :         char            path[MAXPGPATH];
    1051                 :           0 :         char            tmppath[MAXPGPATH];
    1052                 :             : 
    1053                 :             :         /*
    1054                 :             :          * If some other backend ran this code concurrently with us, we might try
    1055                 :             :          * to delete a slot with a certain name while someone else was trying to
    1056                 :             :          * create a slot with the same name.
    1057                 :             :          */
    1058                 :           0 :         LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
    1059                 :             : 
    1060                 :             :         /* Generate pathnames. */
    1061                 :           0 :         sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    1062                 :           0 :         sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    1063                 :             : 
    1064                 :             :         /*
    1065                 :             :          * Rename the slot directory on disk, so that we'll no longer recognize
    1066                 :             :          * this as a valid slot.  Note that if this fails, we've got to mark the
    1067                 :             :          * slot inactive before bailing out.  If we're dropping an ephemeral or a
    1068                 :             :          * temporary slot, we better never fail hard as the caller won't expect
    1069                 :             :          * the slot to survive and this might get called during error handling.
    1070                 :             :          */
    1071         [ #  # ]:           0 :         if (rename(path, tmppath) == 0)
    1072                 :             :         {
    1073                 :             :                 /*
    1074                 :             :                  * We need to fsync() the directory we just renamed and its parent to
    1075                 :             :                  * make sure that our changes are on disk in a crash-safe fashion.  If
    1076                 :             :                  * fsync() fails, we can't be sure whether the changes are on disk or
    1077                 :             :                  * not.  For now, we handle that by panicking;
    1078                 :             :                  * StartupReplicationSlots() will try to straighten it out after
    1079                 :             :                  * restart.
    1080                 :             :                  */
    1081                 :           0 :                 START_CRIT_SECTION();
    1082                 :           0 :                 fsync_fname(tmppath, true);
    1083                 :           0 :                 fsync_fname(PG_REPLSLOT_DIR, true);
    1084         [ #  # ]:           0 :                 END_CRIT_SECTION();
    1085                 :           0 :         }
    1086                 :             :         else
    1087                 :             :         {
    1088                 :           0 :                 bool            fail_softly = slot->data.persistency != RS_PERSISTENT;
    1089                 :             : 
    1090         [ #  # ]:           0 :                 SpinLockAcquire(&slot->mutex);
    1091                 :           0 :                 slot->active_pid = 0;
    1092                 :           0 :                 SpinLockRelease(&slot->mutex);
    1093                 :             : 
    1094                 :             :                 /* wake up anyone waiting on this slot */
    1095                 :           0 :                 ConditionVariableBroadcast(&slot->active_cv);
    1096                 :             : 
    1097   [ #  #  #  #  :           0 :                 ereport(fail_softly ? WARNING : ERROR,
          #  #  #  #  #  
                      # ]
    1098                 :             :                                 (errcode_for_file_access(),
    1099                 :             :                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1100                 :             :                                                 path, tmppath)));
    1101                 :           0 :         }
    1102                 :             : 
    1103                 :             :         /*
    1104                 :             :          * The slot is definitely gone.  Lock out concurrent scans of the array
    1105                 :             :          * long enough to kill it.  It's OK to clear the active PID here without
    1106                 :             :          * grabbing the mutex because nobody else can be scanning the array here,
    1107                 :             :          * and nobody can be attached to this slot and thus access it without
    1108                 :             :          * scanning the array.
    1109                 :             :          *
    1110                 :             :          * Also wake up processes waiting for it.
    1111                 :             :          */
    1112                 :           0 :         LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
    1113                 :           0 :         slot->active_pid = 0;
    1114                 :           0 :         slot->in_use = false;
    1115                 :           0 :         LWLockRelease(ReplicationSlotControlLock);
    1116                 :           0 :         ConditionVariableBroadcast(&slot->active_cv);
    1117                 :             : 
    1118                 :             :         /*
    1119                 :             :          * Slot is dead and doesn't prevent resource removal anymore, recompute
    1120                 :             :          * limits.
    1121                 :             :          */
    1122                 :           0 :         ReplicationSlotsComputeRequiredXmin(false);
    1123                 :           0 :         ReplicationSlotsComputeRequiredLSN();
    1124                 :             : 
    1125                 :             :         /*
    1126                 :             :          * If removing the directory fails, the worst thing that will happen is
    1127                 :             :          * that the user won't be able to create a new slot with the same name
    1128                 :             :          * until the next server restart.  We warn about it, but that's all.
    1129                 :             :          */
    1130         [ #  # ]:           0 :         if (!rmtree(tmppath, true))
    1131   [ #  #  #  # ]:           0 :                 ereport(WARNING,
    1132                 :             :                                 (errmsg("could not remove directory \"%s\"", tmppath)));
    1133                 :             : 
    1134                 :             :         /*
    1135                 :             :          * Drop the statistics entry for the replication slot.  Do this while
    1136                 :             :          * holding ReplicationSlotAllocationLock so that we don't drop a
    1137                 :             :          * statistics entry for another slot with the same name just created in
    1138                 :             :          * another session.
    1139                 :             :          */
    1140         [ #  # ]:           0 :         if (SlotIsLogical(slot))
    1141                 :           0 :                 pgstat_drop_replslot(slot);
    1142                 :             : 
    1143                 :             :         /*
    1144                 :             :          * We release this at the very end, so that nobody starts trying to create
    1145                 :             :          * a slot while we're still cleaning up the detritus of the old one.
    1146                 :             :          */
    1147                 :           0 :         LWLockRelease(ReplicationSlotAllocationLock);
    1148                 :           0 : }
    1149                 :             : 
    1150                 :             : /*
    1151                 :             :  * Serialize the currently acquired slot's state from memory to disk, thereby
    1152                 :             :  * guaranteeing the current state will survive a crash.
    1153                 :             :  */
    1154                 :             : void
    1155                 :           0 : ReplicationSlotSave(void)
    1156                 :             : {
    1157                 :           0 :         char            path[MAXPGPATH];
    1158                 :             : 
    1159         [ #  # ]:           0 :         Assert(MyReplicationSlot != NULL);
    1160                 :             : 
    1161                 :           0 :         sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
    1162                 :           0 :         SaveSlotToPath(MyReplicationSlot, path, ERROR);
    1163                 :           0 : }
    1164                 :             : 
    1165                 :             : /*
    1166                 :             :  * Signal that it would be useful if the currently acquired slot would be
    1167                 :             :  * flushed out to disk.
    1168                 :             :  *
    1169                 :             :  * Note that the actual flush to disk can be delayed for a long time, if
    1170                 :             :  * required for correctness explicitly do a ReplicationSlotSave().
    1171                 :             :  */
    1172                 :             : void
    1173                 :           0 : ReplicationSlotMarkDirty(void)
    1174                 :             : {
    1175                 :           0 :         ReplicationSlot *slot = MyReplicationSlot;
    1176                 :             : 
    1177         [ #  # ]:           0 :         Assert(MyReplicationSlot != NULL);
    1178                 :             : 
    1179         [ #  # ]:           0 :         SpinLockAcquire(&slot->mutex);
    1180                 :           0 :         MyReplicationSlot->just_dirtied = true;
    1181                 :           0 :         MyReplicationSlot->dirty = true;
    1182                 :           0 :         SpinLockRelease(&slot->mutex);
    1183                 :           0 : }
    1184                 :             : 
    1185                 :             : /*
    1186                 :             :  * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
    1187                 :             :  * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
    1188                 :             :  */
    1189                 :             : void
    1190                 :           0 : ReplicationSlotPersist(void)
    1191                 :             : {
    1192                 :           0 :         ReplicationSlot *slot = MyReplicationSlot;
    1193                 :             : 
    1194         [ #  # ]:           0 :         Assert(slot != NULL);
    1195         [ #  # ]:           0 :         Assert(slot->data.persistency != RS_PERSISTENT);
    1196                 :             : 
    1197         [ #  # ]:           0 :         SpinLockAcquire(&slot->mutex);
    1198                 :           0 :         slot->data.persistency = RS_PERSISTENT;
    1199                 :           0 :         SpinLockRelease(&slot->mutex);
    1200                 :             : 
    1201                 :           0 :         ReplicationSlotMarkDirty();
    1202                 :           0 :         ReplicationSlotSave();
    1203                 :           0 : }
    1204                 :             : 
    1205                 :             : /*
    1206                 :             :  * Compute the oldest xmin across all slots and store it in the ProcArray.
    1207                 :             :  *
    1208                 :             :  * If already_locked is true, both the ReplicationSlotControlLock and the
    1209                 :             :  * ProcArrayLock have already been acquired exclusively. It is crucial that the
    1210                 :             :  * caller first acquires the ReplicationSlotControlLock, followed by the
    1211                 :             :  * ProcArrayLock, to prevent any undetectable deadlocks since this function
    1212                 :             :  * acquires them in that order.
    1213                 :             :  */
    1214                 :             : void
    1215                 :           4 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
    1216                 :             : {
    1217                 :           4 :         int                     i;
    1218                 :           4 :         TransactionId agg_xmin = InvalidTransactionId;
    1219                 :           4 :         TransactionId agg_catalog_xmin = InvalidTransactionId;
    1220                 :             : 
    1221         [ +  - ]:           4 :         Assert(ReplicationSlotCtl != NULL);
    1222   [ +  -  #  # ]:           4 :         Assert(!already_locked ||
    1223                 :             :                    (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
    1224                 :             :                         LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
    1225                 :             : 
    1226                 :             :         /*
    1227                 :             :          * Hold the ReplicationSlotControlLock until after updating the slot xmin
    1228                 :             :          * values, so no backend updates the initial xmin for newly created slot
    1229                 :             :          * concurrently. A shared lock is used here to minimize lock contention,
    1230                 :             :          * especially when many slots exist and advancements occur frequently.
    1231                 :             :          * This is safe since an exclusive lock is taken during initial slot xmin
    1232                 :             :          * update in slot creation.
    1233                 :             :          *
    1234                 :             :          * One might think that we can hold the ProcArrayLock exclusively and
    1235                 :             :          * update the slot xmin values, but it could increase lock contention on
    1236                 :             :          * the ProcArrayLock, which is not great since this function can be called
    1237                 :             :          * at non-negligible frequency.
    1238                 :             :          *
    1239                 :             :          * Concurrent invocation of this function may cause the computed slot xmin
    1240                 :             :          * to regress. However, this is harmless because tuples prior to the most
    1241                 :             :          * recent xmin are no longer useful once advancement occurs (see
    1242                 :             :          * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
    1243                 :             :          * before updating the effective_xmin). Thus, such regression merely
    1244                 :             :          * prevents VACUUM from prematurely removing tuples without causing the
    1245                 :             :          * early deletion of required data.
    1246                 :             :          */
    1247         [ -  + ]:           4 :         if (!already_locked)
    1248                 :           4 :                 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1249                 :             : 
    1250         [ +  + ]:          44 :         for (i = 0; i < max_replication_slots; i++)
    1251                 :             :         {
    1252                 :          40 :                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1253                 :          40 :                 TransactionId effective_xmin;
    1254                 :          40 :                 TransactionId effective_catalog_xmin;
    1255                 :          40 :                 bool            invalidated;
    1256                 :             : 
    1257         [ -  + ]:          40 :                 if (!s->in_use)
    1258                 :          40 :                         continue;
    1259                 :             : 
    1260         [ #  # ]:           0 :                 SpinLockAcquire(&s->mutex);
    1261                 :           0 :                 effective_xmin = s->effective_xmin;
    1262                 :           0 :                 effective_catalog_xmin = s->effective_catalog_xmin;
    1263                 :           0 :                 invalidated = s->data.invalidated != RS_INVAL_NONE;
    1264                 :           0 :                 SpinLockRelease(&s->mutex);
    1265                 :             : 
    1266                 :             :                 /* invalidated slots need not apply */
    1267         [ #  # ]:           0 :                 if (invalidated)
    1268                 :           0 :                         continue;
    1269                 :             : 
    1270                 :             :                 /* check the data xmin */
    1271   [ #  #  #  # ]:           0 :                 if (TransactionIdIsValid(effective_xmin) &&
    1272         [ #  # ]:           0 :                         (!TransactionIdIsValid(agg_xmin) ||
    1273                 :           0 :                          TransactionIdPrecedes(effective_xmin, agg_xmin)))
    1274                 :           0 :                         agg_xmin = effective_xmin;
    1275                 :             : 
    1276                 :             :                 /* check the catalog xmin */
    1277   [ #  #  #  # ]:           0 :                 if (TransactionIdIsValid(effective_catalog_xmin) &&
    1278         [ #  # ]:           0 :                         (!TransactionIdIsValid(agg_catalog_xmin) ||
    1279                 :           0 :                          TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
    1280                 :           0 :                         agg_catalog_xmin = effective_catalog_xmin;
    1281      [ -  +  - ]:          40 :         }
    1282                 :             : 
    1283                 :           4 :         ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
    1284                 :             : 
    1285         [ -  + ]:           4 :         if (!already_locked)
    1286                 :           4 :                 LWLockRelease(ReplicationSlotControlLock);
    1287                 :           4 : }
    1288                 :             : 
    1289                 :             : /*
    1290                 :             :  * Compute the oldest restart LSN across all slots and inform xlog module.
    1291                 :             :  *
    1292                 :             :  * Note: while max_slot_wal_keep_size is theoretically relevant for this
    1293                 :             :  * purpose, we don't try to account for that, because this module doesn't
    1294                 :             :  * know what to compare against.
    1295                 :             :  */
    1296                 :             : void
    1297                 :           4 : ReplicationSlotsComputeRequiredLSN(void)
    1298                 :             : {
    1299                 :           4 :         int                     i;
    1300                 :           4 :         XLogRecPtr      min_required = InvalidXLogRecPtr;
    1301                 :             : 
    1302         [ +  - ]:           4 :         Assert(ReplicationSlotCtl != NULL);
    1303                 :             : 
    1304                 :           4 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1305         [ +  + ]:          44 :         for (i = 0; i < max_replication_slots; i++)
    1306                 :             :         {
    1307                 :          40 :                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1308                 :          40 :                 XLogRecPtr      restart_lsn;
    1309                 :          40 :                 XLogRecPtr      last_saved_restart_lsn;
    1310                 :          40 :                 bool            invalidated;
    1311                 :          40 :                 ReplicationSlotPersistency persistency;
    1312                 :             : 
    1313         [ -  + ]:          40 :                 if (!s->in_use)
    1314                 :          40 :                         continue;
    1315                 :             : 
    1316         [ #  # ]:           0 :                 SpinLockAcquire(&s->mutex);
    1317                 :           0 :                 persistency = s->data.persistency;
    1318                 :           0 :                 restart_lsn = s->data.restart_lsn;
    1319                 :           0 :                 invalidated = s->data.invalidated != RS_INVAL_NONE;
    1320                 :           0 :                 last_saved_restart_lsn = s->last_saved_restart_lsn;
    1321                 :           0 :                 SpinLockRelease(&s->mutex);
    1322                 :             : 
    1323                 :             :                 /* invalidated slots need not apply */
    1324         [ #  # ]:           0 :                 if (invalidated)
    1325                 :           0 :                         continue;
    1326                 :             : 
    1327                 :             :                 /*
    1328                 :             :                  * For persistent slot use last_saved_restart_lsn to compute the
    1329                 :             :                  * oldest LSN for removal of WAL segments.  The segments between
    1330                 :             :                  * last_saved_restart_lsn and restart_lsn might be needed by a
    1331                 :             :                  * persistent slot in the case of database crash.  Non-persistent
    1332                 :             :                  * slots can't survive the database crash, so we don't care about
    1333                 :             :                  * last_saved_restart_lsn for them.
    1334                 :             :                  */
    1335         [ #  # ]:           0 :                 if (persistency == RS_PERSISTENT)
    1336                 :             :                 {
    1337   [ #  #  #  # ]:           0 :                         if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
    1338                 :           0 :                                 restart_lsn > last_saved_restart_lsn)
    1339                 :             :                         {
    1340                 :           0 :                                 restart_lsn = last_saved_restart_lsn;
    1341                 :           0 :                         }
    1342                 :           0 :                 }
    1343                 :             : 
    1344   [ #  #  #  # ]:           0 :                 if (XLogRecPtrIsValid(restart_lsn) &&
    1345         [ #  # ]:           0 :                         (!XLogRecPtrIsValid(min_required) ||
    1346                 :           0 :                          restart_lsn < min_required))
    1347                 :           0 :                         min_required = restart_lsn;
    1348      [ -  +  - ]:          40 :         }
    1349                 :           4 :         LWLockRelease(ReplicationSlotControlLock);
    1350                 :             : 
    1351                 :           4 :         XLogSetReplicationSlotMinimumLSN(min_required);
    1352                 :           4 : }
    1353                 :             : 
    1354                 :             : /*
    1355                 :             :  * Compute the oldest WAL LSN required by *logical* decoding slots..
    1356                 :             :  *
    1357                 :             :  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
    1358                 :             :  * slots exist.
    1359                 :             :  *
    1360                 :             :  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
    1361                 :             :  * ignores physical replication slots.
    1362                 :             :  *
    1363                 :             :  * The results aren't required frequently, so we don't maintain a precomputed
    1364                 :             :  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
    1365                 :             :  */
    1366                 :             : XLogRecPtr
    1367                 :          14 : ReplicationSlotsComputeLogicalRestartLSN(void)
    1368                 :             : {
    1369                 :          14 :         XLogRecPtr      result = InvalidXLogRecPtr;
    1370                 :          14 :         int                     i;
    1371                 :             : 
    1372         [ -  + ]:          14 :         if (max_replication_slots <= 0)
    1373                 :           0 :                 return InvalidXLogRecPtr;
    1374                 :             : 
    1375                 :          14 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1376                 :             : 
    1377         [ +  + ]:         154 :         for (i = 0; i < max_replication_slots; i++)
    1378                 :             :         {
    1379                 :         140 :                 ReplicationSlot *s;
    1380                 :         140 :                 XLogRecPtr      restart_lsn;
    1381                 :         140 :                 XLogRecPtr      last_saved_restart_lsn;
    1382                 :         140 :                 bool            invalidated;
    1383                 :         140 :                 ReplicationSlotPersistency persistency;
    1384                 :             : 
    1385                 :         140 :                 s = &ReplicationSlotCtl->replication_slots[i];
    1386                 :             : 
    1387                 :             :                 /* cannot change while ReplicationSlotCtlLock is held */
    1388         [ -  + ]:         140 :                 if (!s->in_use)
    1389                 :         140 :                         continue;
    1390                 :             : 
    1391                 :             :                 /* we're only interested in logical slots */
    1392         [ #  # ]:           0 :                 if (!SlotIsLogical(s))
    1393                 :           0 :                         continue;
    1394                 :             : 
    1395                 :             :                 /* read once, it's ok if it increases while we're checking */
    1396         [ #  # ]:           0 :                 SpinLockAcquire(&s->mutex);
    1397                 :           0 :                 persistency = s->data.persistency;
    1398                 :           0 :                 restart_lsn = s->data.restart_lsn;
    1399                 :           0 :                 invalidated = s->data.invalidated != RS_INVAL_NONE;
    1400                 :           0 :                 last_saved_restart_lsn = s->last_saved_restart_lsn;
    1401                 :           0 :                 SpinLockRelease(&s->mutex);
    1402                 :             : 
    1403                 :             :                 /* invalidated slots need not apply */
    1404         [ #  # ]:           0 :                 if (invalidated)
    1405                 :           0 :                         continue;
    1406                 :             : 
    1407                 :             :                 /*
    1408                 :             :                  * For persistent slot use last_saved_restart_lsn to compute the
    1409                 :             :                  * oldest LSN for removal of WAL segments.  The segments between
    1410                 :             :                  * last_saved_restart_lsn and restart_lsn might be needed by a
    1411                 :             :                  * persistent slot in the case of database crash.  Non-persistent
    1412                 :             :                  * slots can't survive the database crash, so we don't care about
    1413                 :             :                  * last_saved_restart_lsn for them.
    1414                 :             :                  */
    1415         [ #  # ]:           0 :                 if (persistency == RS_PERSISTENT)
    1416                 :             :                 {
    1417   [ #  #  #  # ]:           0 :                         if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
    1418                 :           0 :                                 restart_lsn > last_saved_restart_lsn)
    1419                 :             :                         {
    1420                 :           0 :                                 restart_lsn = last_saved_restart_lsn;
    1421                 :           0 :                         }
    1422                 :           0 :                 }
    1423                 :             : 
    1424         [ #  # ]:           0 :                 if (!XLogRecPtrIsValid(restart_lsn))
    1425                 :           0 :                         continue;
    1426                 :             : 
    1427   [ #  #  #  # ]:           0 :                 if (!XLogRecPtrIsValid(result) ||
    1428                 :           0 :                         restart_lsn < result)
    1429                 :           0 :                         result = restart_lsn;
    1430      [ -  +  - ]:         140 :         }
    1431                 :             : 
    1432                 :          14 :         LWLockRelease(ReplicationSlotControlLock);
    1433                 :             : 
    1434                 :          14 :         return result;
    1435                 :          14 : }
    1436                 :             : 
    1437                 :             : /*
    1438                 :             :  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
    1439                 :             :  * passed database oid.
    1440                 :             :  *
    1441                 :             :  * Returns true if there are any slots referencing the database. *nslots will
    1442                 :             :  * be set to the absolute number of slots in the database, *nactive to ones
    1443                 :             :  * currently active.
    1444                 :             :  */
    1445                 :             : bool
    1446                 :           1 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
    1447                 :             : {
    1448                 :           1 :         int                     i;
    1449                 :             : 
    1450                 :           1 :         *nslots = *nactive = 0;
    1451                 :             : 
    1452         [ -  + ]:           1 :         if (max_replication_slots <= 0)
    1453                 :           0 :                 return false;
    1454                 :             : 
    1455                 :           1 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1456         [ +  + ]:          11 :         for (i = 0; i < max_replication_slots; i++)
    1457                 :             :         {
    1458                 :          10 :                 ReplicationSlot *s;
    1459                 :             : 
    1460                 :          10 :                 s = &ReplicationSlotCtl->replication_slots[i];
    1461                 :             : 
    1462                 :             :                 /* cannot change while ReplicationSlotCtlLock is held */
    1463         [ -  + ]:          10 :                 if (!s->in_use)
    1464                 :          10 :                         continue;
    1465                 :             : 
    1466                 :             :                 /* only logical slots are database specific, skip */
    1467         [ #  # ]:           0 :                 if (!SlotIsLogical(s))
    1468                 :           0 :                         continue;
    1469                 :             : 
    1470                 :             :                 /* not our database, skip */
    1471         [ #  # ]:           0 :                 if (s->data.database != dboid)
    1472                 :           0 :                         continue;
    1473                 :             : 
    1474                 :             :                 /* NB: intentionally counting invalidated slots */
    1475                 :             : 
    1476                 :             :                 /* count slots with spinlock held */
    1477         [ #  # ]:           0 :                 SpinLockAcquire(&s->mutex);
    1478                 :           0 :                 (*nslots)++;
    1479         [ #  # ]:           0 :                 if (s->active_pid != 0)
    1480                 :           0 :                         (*nactive)++;
    1481                 :           0 :                 SpinLockRelease(&s->mutex);
    1482      [ -  +  - ]:          10 :         }
    1483                 :           1 :         LWLockRelease(ReplicationSlotControlLock);
    1484                 :             : 
    1485         [ -  + ]:           1 :         if (*nslots > 0)
    1486                 :           0 :                 return true;
    1487                 :           1 :         return false;
    1488                 :           1 : }
    1489                 :             : 
    1490                 :             : /*
    1491                 :             :  * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
    1492                 :             :  * passed database oid. The caller should hold an exclusive lock on the
    1493                 :             :  * pg_database oid for the database to prevent creation of new slots on the db
    1494                 :             :  * or replay from existing slots.
    1495                 :             :  *
    1496                 :             :  * Another session that concurrently acquires an existing slot on the target DB
    1497                 :             :  * (most likely to drop it) may cause this function to ERROR. If that happens
    1498                 :             :  * it may have dropped some but not all slots.
    1499                 :             :  *
    1500                 :             :  * This routine isn't as efficient as it could be - but we don't drop
    1501                 :             :  * databases often, especially databases with lots of slots.
    1502                 :             :  *
    1503                 :             :  * If it drops the last logical slot in the cluster, it requests to disable
    1504                 :             :  * logical decoding.
    1505                 :             :  */
    1506                 :             : void
    1507                 :           1 : ReplicationSlotsDropDBSlots(Oid dboid)
    1508                 :             : {
    1509                 :           1 :         int                     i;
    1510                 :           1 :         bool            found_valid_logicalslot;
    1511                 :           1 :         bool            dropped = false;
    1512                 :             : 
    1513         [ -  + ]:           1 :         if (max_replication_slots <= 0)
    1514                 :           0 :                 return;
    1515                 :             : 
    1516                 :             : restart:
    1517                 :           1 :         found_valid_logicalslot = false;
    1518                 :           1 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1519         [ +  + ]:          11 :         for (i = 0; i < max_replication_slots; i++)
    1520                 :             :         {
    1521                 :          10 :                 ReplicationSlot *s;
    1522                 :          10 :                 char       *slotname;
    1523                 :          10 :                 int                     active_pid;
    1524                 :             : 
    1525                 :          10 :                 s = &ReplicationSlotCtl->replication_slots[i];
    1526                 :             : 
    1527                 :             :                 /* cannot change while ReplicationSlotCtlLock is held */
    1528         [ -  + ]:          10 :                 if (!s->in_use)
    1529                 :          10 :                         continue;
    1530                 :             : 
    1531                 :             :                 /* only logical slots are database specific, skip */
    1532         [ #  # ]:           0 :                 if (!SlotIsLogical(s))
    1533                 :           0 :                         continue;
    1534                 :             : 
    1535                 :             :                 /*
    1536                 :             :                  * Check logical slots on other databases too so we can disable
    1537                 :             :                  * logical decoding only if no slots in the cluster.
    1538                 :             :                  */
    1539         [ #  # ]:           0 :                 SpinLockAcquire(&s->mutex);
    1540                 :           0 :                 found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
    1541                 :           0 :                 SpinLockRelease(&s->mutex);
    1542                 :             : 
    1543                 :             :                 /* not our database, skip */
    1544         [ #  # ]:           0 :                 if (s->data.database != dboid)
    1545                 :           0 :                         continue;
    1546                 :             : 
    1547                 :             :                 /* NB: intentionally including invalidated slots to drop */
    1548                 :             : 
    1549                 :             :                 /* acquire slot, so ReplicationSlotDropAcquired can be reused  */
    1550         [ #  # ]:           0 :                 SpinLockAcquire(&s->mutex);
    1551                 :             :                 /* can't change while ReplicationSlotControlLock is held */
    1552                 :           0 :                 slotname = NameStr(s->data.name);
    1553                 :           0 :                 active_pid = s->active_pid;
    1554         [ #  # ]:           0 :                 if (active_pid == 0)
    1555                 :             :                 {
    1556                 :           0 :                         MyReplicationSlot = s;
    1557                 :           0 :                         s->active_pid = MyProcPid;
    1558                 :           0 :                 }
    1559                 :           0 :                 SpinLockRelease(&s->mutex);
    1560                 :             : 
    1561                 :             :                 /*
    1562                 :             :                  * Even though we hold an exclusive lock on the database object a
    1563                 :             :                  * logical slot for that DB can still be active, e.g. if it's
    1564                 :             :                  * concurrently being dropped by a backend connected to another DB.
    1565                 :             :                  *
    1566                 :             :                  * That's fairly unlikely in practice, so we'll just bail out.
    1567                 :             :                  *
    1568                 :             :                  * The slot sync worker holds a shared lock on the database before
    1569                 :             :                  * operating on synced logical slots to avoid conflict with the drop
    1570                 :             :                  * happening here. The persistent synced slots are thus safe but there
    1571                 :             :                  * is a possibility that the slot sync worker has created a temporary
    1572                 :             :                  * slot (which stays active even on release) and we are trying to drop
    1573                 :             :                  * that here. In practice, the chances of hitting this scenario are
    1574                 :             :                  * less as during slot synchronization, the temporary slot is
    1575                 :             :                  * immediately converted to persistent and thus is safe due to the
    1576                 :             :                  * shared lock taken on the database. So, we'll just bail out in such
    1577                 :             :                  * a case.
    1578                 :             :                  *
    1579                 :             :                  * XXX: We can consider shutting down the slot sync worker before
    1580                 :             :                  * trying to drop synced temporary slots here.
    1581                 :             :                  */
    1582         [ #  # ]:           0 :                 if (active_pid)
    1583   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    1584                 :             :                                         (errcode(ERRCODE_OBJECT_IN_USE),
    1585                 :             :                                          errmsg("replication slot \"%s\" is active for PID %d",
    1586                 :             :                                                         slotname, active_pid)));
    1587                 :             : 
    1588                 :             :                 /*
    1589                 :             :                  * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
    1590                 :             :                  * holding ReplicationSlotControlLock over filesystem operations,
    1591                 :             :                  * release ReplicationSlotControlLock and use
    1592                 :             :                  * ReplicationSlotDropAcquired.
    1593                 :             :                  *
    1594                 :             :                  * As that means the set of slots could change, restart scan from the
    1595                 :             :                  * beginning each time we release the lock.
    1596                 :             :                  */
    1597                 :           0 :                 LWLockRelease(ReplicationSlotControlLock);
    1598                 :           0 :                 ReplicationSlotDropAcquired();
    1599                 :           0 :                 dropped = true;
    1600                 :           0 :                 goto restart;
    1601         [ -  + ]:          10 :         }
    1602                 :           1 :         LWLockRelease(ReplicationSlotControlLock);
    1603                 :             : 
    1604   [ -  +  #  # ]:           1 :         if (dropped && !found_valid_logicalslot)
    1605                 :           0 :                 RequestDisableLogicalDecoding();
    1606                 :           1 : }
    1607                 :             : 
    1608                 :             : /*
    1609                 :             :  * Returns true if there is at least one in-use valid logical replication slot.
    1610                 :             :  */
    1611                 :             : bool
    1612                 :           3 : CheckLogicalSlotExists(void)
    1613                 :             : {
    1614                 :           3 :         bool            found = false;
    1615                 :             : 
    1616         [ -  + ]:           3 :         if (max_replication_slots <= 0)
    1617                 :           0 :                 return false;
    1618                 :             : 
    1619                 :           3 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1620         [ +  + ]:          33 :         for (int i = 0; i < max_replication_slots; i++)
    1621                 :             :         {
    1622                 :          30 :                 ReplicationSlot *s;
    1623                 :          30 :                 bool            invalidated;
    1624                 :             : 
    1625                 :          30 :                 s = &ReplicationSlotCtl->replication_slots[i];
    1626                 :             : 
    1627                 :             :                 /* cannot change while ReplicationSlotCtlLock is held */
    1628         [ -  + ]:          30 :                 if (!s->in_use)
    1629                 :          30 :                         continue;
    1630                 :             : 
    1631         [ #  # ]:           0 :                 if (SlotIsPhysical(s))
    1632                 :           0 :                         continue;
    1633                 :             : 
    1634         [ #  # ]:           0 :                 SpinLockAcquire(&s->mutex);
    1635                 :           0 :                 invalidated = s->data.invalidated != RS_INVAL_NONE;
    1636                 :           0 :                 SpinLockRelease(&s->mutex);
    1637                 :             : 
    1638         [ #  # ]:           0 :                 if (invalidated)
    1639                 :           0 :                         continue;
    1640                 :             : 
    1641                 :           0 :                 found = true;
    1642                 :           0 :                 break;
    1643         [ -  + ]:          30 :         }
    1644                 :           3 :         LWLockRelease(ReplicationSlotControlLock);
    1645                 :             : 
    1646                 :           3 :         return found;
    1647                 :           3 : }
    1648                 :             : 
    1649                 :             : /*
    1650                 :             :  * Check whether the server's configuration supports using replication
    1651                 :             :  * slots.
    1652                 :             :  */
    1653                 :             : void
    1654                 :           0 : CheckSlotRequirements(void)
    1655                 :             : {
    1656                 :             :         /*
    1657                 :             :          * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
    1658                 :             :          * needs the same check.
    1659                 :             :          */
    1660                 :             : 
    1661         [ #  # ]:           0 :         if (max_replication_slots == 0)
    1662   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1663                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1664                 :             :                                  errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
    1665                 :             : 
    1666         [ #  # ]:           0 :         if (wal_level < WAL_LEVEL_REPLICA)
    1667   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1668                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1669                 :             :                                  errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
    1670                 :           0 : }
    1671                 :             : 
    1672                 :             : /*
    1673                 :             :  * Check whether the user has privilege to use replication slots.
    1674                 :             :  */
    1675                 :             : void
    1676                 :           0 : CheckSlotPermissions(void)
    1677                 :             : {
    1678         [ #  # ]:           0 :         if (!has_rolreplication(GetUserId()))
    1679   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1680                 :             :                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1681                 :             :                                  errmsg("permission denied to use replication slots"),
    1682                 :             :                                  errdetail("Only roles with the %s attribute may use replication slots.",
    1683                 :             :                                                    "REPLICATION")));
    1684                 :           0 : }
    1685                 :             : 
/*
 * Reserve WAL for the currently active slot.
 *
 * Compute and set restart_lsn in a manner that's appropriate for the type of
 * the slot and concurrency safe.
 *
 * Operates on MyReplicationSlot.  The asserts require that the slot is set
 * and that neither restart_lsn nor last_saved_restart_lsn is valid yet, i.e.
 * WAL has not been reserved for this slot before.
 *
 * Raises an ERROR if the WAL segment that contains the chosen restart_lsn has
 * already been removed.  For a logical slot on a server not in recovery, it
 * also logs a standby snapshot and flushes WAL up to that record.
 */
void
ReplicationSlotReserveWal(void)
{
	ReplicationSlot *slot = MyReplicationSlot;
	XLogSegNo	segno;
	XLogRecPtr	restart_lsn;

	Assert(slot != NULL);
	Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
	Assert(!XLogRecPtrIsValid(slot->last_saved_restart_lsn));

	/*
	 * The replication slot mechanism is used to prevent the removal of
	 * required WAL.
	 *
	 * Acquire an exclusive lock to prevent the checkpoint process from
	 * concurrently computing the minimum slot LSN (see
	 * CheckPointReplicationSlots). This ensures that the WAL reserved for
	 * replication cannot be removed during a checkpoint.
	 *
	 * The mechanism is reliable because if WAL reservation occurs first, the
	 * checkpoint must wait for the restart_lsn update before determining the
	 * minimum non-removable LSN. On the other hand, if the checkpoint happens
	 * first, subsequent WAL reservations will select positions at or beyond
	 * the redo pointer of that checkpoint.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * For logical slots log a standby snapshot and start logical decoding at
	 * exactly that position. That allows the slot to start up more quickly.
	 * But on a standby we cannot do WAL writes, so just use the replay
	 * pointer; effectively, an attempt to create a logical slot on standby
	 * will cause it to wait for an xl_running_xact record to be logged
	 * independently on the primary, so that a snapshot can be built using the
	 * record.
	 *
	 * None of this is needed (or indeed helpful) for physical slots as
	 * they'll start replay at the last logged checkpoint anyway. Instead,
	 * return the location of the last redo LSN, where a base backup has to
	 * start replay at.
	 */
	if (SlotIsPhysical(slot))
		restart_lsn = GetRedoRecPtr();
	else if (RecoveryInProgress())
		restart_lsn = GetXLogReplayRecPtr(NULL);
	else
		restart_lsn = GetXLogInsertRecPtr();

	/* publish the new restart_lsn under the slot's spinlock */
	SpinLockAcquire(&slot->mutex);
	slot->data.restart_lsn = restart_lsn;
	SpinLockRelease(&slot->mutex);

	/* prevent WAL removal as fast as possible */
	ReplicationSlotsComputeRequiredLSN();

	/*
	 * Checkpoint shouldn't remove the required WAL.  If the segment that
	 * holds restart_lsn (or a later one) has already been removed, the
	 * reservation came too late and cannot be honored.
	 */
	XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
	if (XLogGetLastRemovedSegno() >= segno)
		elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
			 NameStr(slot->data.name));

	LWLockRelease(ReplicationSlotAllocationLock);

	/*
	 * Logical slot on a server not in recovery: write the standby snapshot
	 * described above.  This runs after ReplicationSlotAllocationLock has
	 * been released.  NOTE(review): presumably this avoids holding that lock
	 * across WAL insertion and flush; confirm.
	 */
	if (!RecoveryInProgress() && SlotIsLogical(slot))
	{
		XLogRecPtr	flushptr;

		/* make sure we have enough information to start */
		flushptr = LogStandbySnapshot();

		/* and make sure it's fsynced to disk */
		XLogFlush(flushptr);
	}
}
    1767                 :             : 
    1768                 :             : /*
    1769                 :             :  * Report that replication slot needs to be invalidated
    1770                 :             :  */
    1771                 :             : static void
    1772                 :           0 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
    1773                 :             :                                            bool terminating,
    1774                 :             :                                            int pid,
    1775                 :             :                                            NameData slotname,
    1776                 :             :                                            XLogRecPtr restart_lsn,
    1777                 :             :                                            XLogRecPtr oldestLSN,
    1778                 :             :                                            TransactionId snapshotConflictHorizon,
    1779                 :             :                                            long slot_idle_seconds)
    1780                 :             : {
    1781                 :           0 :         StringInfoData err_detail;
    1782                 :           0 :         StringInfoData err_hint;
    1783                 :             : 
    1784                 :           0 :         initStringInfo(&err_detail);
    1785                 :           0 :         initStringInfo(&err_hint);
    1786                 :             : 
    1787   [ #  #  #  #  :           0 :         switch (cause)
                   #  # ]
    1788                 :             :         {
    1789                 :             :                 case RS_INVAL_WAL_REMOVED:
    1790                 :             :                         {
    1791                 :           0 :                                 uint64          ex = oldestLSN - restart_lsn;
    1792                 :             : 
    1793                 :           0 :                                 appendStringInfo(&err_detail,
    1794                 :           0 :                                                                  ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
    1795                 :             :                                                                                   "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
    1796                 :           0 :                                                                                   ex),
    1797                 :           0 :                                                                  LSN_FORMAT_ARGS(restart_lsn),
    1798                 :           0 :                                                                  ex);
    1799                 :             :                                 /* translator: %s is a GUC variable name */
    1800                 :           0 :                                 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
    1801                 :             :                                                                  "max_slot_wal_keep_size");
    1802                 :             :                                 break;
    1803                 :           0 :                         }
    1804                 :             :                 case RS_INVAL_HORIZON:
    1805                 :           0 :                         appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
    1806                 :           0 :                                                          snapshotConflictHorizon);
    1807                 :           0 :                         break;
    1808                 :             : 
    1809                 :             :                 case RS_INVAL_WAL_LEVEL:
    1810                 :           0 :                         appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
    1811                 :           0 :                         break;
    1812                 :             : 
    1813                 :             :                 case RS_INVAL_IDLE_TIMEOUT:
    1814                 :             :                         {
    1815                 :             :                                 /* translator: %s is a GUC variable name */
    1816                 :           0 :                                 appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
    1817                 :           0 :                                                                  slot_idle_seconds, "idle_replication_slot_timeout",
    1818                 :           0 :                                                                  idle_replication_slot_timeout_secs);
    1819                 :             :                                 /* translator: %s is a GUC variable name */
    1820                 :           0 :                                 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
    1821                 :             :                                                                  "idle_replication_slot_timeout");
    1822                 :           0 :                                 break;
    1823                 :             :                         }
    1824                 :             :                 case RS_INVAL_NONE:
    1825                 :           0 :                         pg_unreachable();
    1826                 :             :         }
    1827                 :             : 
    1828   [ #  #  #  #  :           0 :         ereport(LOG,
             #  #  #  # ]
    1829                 :             :                         terminating ?
    1830                 :             :                         errmsg("terminating process %d to release replication slot \"%s\"",
    1831                 :             :                                    pid, NameStr(slotname)) :
    1832                 :             :                         errmsg("invalidating obsolete replication slot \"%s\"",
    1833                 :             :                                    NameStr(slotname)),
    1834                 :             :                         errdetail_internal("%s", err_detail.data),
    1835                 :             :                         err_hint.len ? errhint("%s", err_hint.data) : 0);
    1836                 :             : 
    1837                 :           0 :         pfree(err_detail.data);
    1838                 :           0 :         pfree(err_hint.data);
    1839                 :           0 : }
    1840                 :             : 
    1841                 :             : /*
    1842                 :             :  * Can we invalidate an idle replication slot?
    1843                 :             :  *
    1844                 :             :  * Idle timeout invalidation is allowed only when:
    1845                 :             :  *
    1846                 :             :  * 1. Idle timeout is set
    1847                 :             :  * 2. Slot has reserved WAL
    1848                 :             :  * 3. Slot is inactive
    1849                 :             :  * 4. The slot is not being synced from the primary while the server is in
    1850                 :             :  *        recovery. This is because synced slots are always considered to be
    1851                 :             :  *        inactive because they don't perform logical decoding to produce changes.
    1852                 :             :  */
    1853                 :             : static inline bool
    1854                 :           0 : CanInvalidateIdleSlot(ReplicationSlot *s)
    1855                 :             : {
    1856         [ #  # ]:           0 :         return (idle_replication_slot_timeout_secs != 0 &&
    1857         [ #  # ]:           0 :                         XLogRecPtrIsValid(s->data.restart_lsn) &&
    1858         [ #  # ]:           0 :                         s->inactive_since > 0 &&
    1859         [ #  # ]:           0 :                         !(RecoveryInProgress() && s->data.synced));
    1860                 :             : }
    1861                 :             : 
    1862                 :             : /*
    1863                 :             :  * DetermineSlotInvalidationCause - Determine the cause for which a slot
    1864                 :             :  * becomes invalid among the given possible causes.
    1865                 :             :  *
    1866                 :             :  * This function sequentially checks all possible invalidation causes and
    1867                 :             :  * returns the first one for which the slot is eligible for invalidation.
    1868                 :             :  */
    1869                 :             : static ReplicationSlotInvalidationCause
    1870                 :           0 : DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
    1871                 :             :                                                            XLogRecPtr oldestLSN, Oid dboid,
    1872                 :             :                                                            TransactionId snapshotConflictHorizon,
    1873                 :             :                                                            TimestampTz *inactive_since, TimestampTz now)
    1874                 :             : {
    1875         [ #  # ]:           0 :         Assert(possible_causes != RS_INVAL_NONE);
    1876                 :             : 
    1877         [ #  # ]:           0 :         if (possible_causes & RS_INVAL_WAL_REMOVED)
    1878                 :             :         {
    1879                 :           0 :                 XLogRecPtr      restart_lsn = s->data.restart_lsn;
    1880                 :             : 
    1881   [ #  #  #  # ]:           0 :                 if (XLogRecPtrIsValid(restart_lsn) &&
    1882                 :           0 :                         restart_lsn < oldestLSN)
    1883                 :           0 :                         return RS_INVAL_WAL_REMOVED;
    1884         [ #  # ]:           0 :         }
    1885                 :             : 
    1886         [ #  # ]:           0 :         if (possible_causes & RS_INVAL_HORIZON)
    1887                 :             :         {
    1888                 :             :                 /* invalid DB oid signals a shared relation */
    1889   [ #  #  #  # ]:           0 :                 if (SlotIsLogical(s) &&
    1890         [ #  # ]:           0 :                         (dboid == InvalidOid || dboid == s->data.database))
    1891                 :             :                 {
    1892                 :           0 :                         TransactionId effective_xmin = s->effective_xmin;
    1893                 :           0 :                         TransactionId catalog_effective_xmin = s->effective_catalog_xmin;
    1894                 :             : 
    1895   [ #  #  #  # ]:           0 :                         if (TransactionIdIsValid(effective_xmin) &&
    1896                 :           0 :                                 TransactionIdPrecedesOrEquals(effective_xmin,
    1897                 :           0 :                                                                                           snapshotConflictHorizon))
    1898                 :           0 :                                 return RS_INVAL_HORIZON;
    1899   [ #  #  #  # ]:           0 :                         else if (TransactionIdIsValid(catalog_effective_xmin) &&
    1900                 :           0 :                                          TransactionIdPrecedesOrEquals(catalog_effective_xmin,
    1901                 :           0 :                                                                                                    snapshotConflictHorizon))
    1902                 :           0 :                                 return RS_INVAL_HORIZON;
    1903         [ #  # ]:           0 :                 }
    1904                 :           0 :         }
    1905                 :             : 
    1906         [ #  # ]:           0 :         if (possible_causes & RS_INVAL_WAL_LEVEL)
    1907                 :             :         {
    1908         [ #  # ]:           0 :                 if (SlotIsLogical(s))
    1909                 :           0 :                         return RS_INVAL_WAL_LEVEL;
    1910                 :           0 :         }
    1911                 :             : 
    1912         [ #  # ]:           0 :         if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
    1913                 :             :         {
    1914         [ #  # ]:           0 :                 Assert(now > 0);
    1915                 :             : 
    1916         [ #  # ]:           0 :                 if (CanInvalidateIdleSlot(s))
    1917                 :             :                 {
    1918                 :             :                         /*
    1919                 :             :                          * Simulate the invalidation due to idle_timeout to test the
    1920                 :             :                          * timeout behavior promptly, without waiting for it to trigger
    1921                 :             :                          * naturally.
    1922                 :             :                          */
    1923                 :             : #ifdef USE_INJECTION_POINTS
    1924                 :             :                         if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
    1925                 :             :                         {
    1926                 :             :                                 *inactive_since = 0;    /* since the beginning of time */
    1927                 :             :                                 return RS_INVAL_IDLE_TIMEOUT;
    1928                 :             :                         }
    1929                 :             : #endif
    1930                 :             : 
    1931                 :             :                         /*
    1932                 :             :                          * Check if the slot needs to be invalidated due to
    1933                 :             :                          * idle_replication_slot_timeout GUC.
    1934                 :             :                          */
    1935   [ #  #  #  # ]:           0 :                         if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
    1936                 :           0 :                                                                                                   idle_replication_slot_timeout_secs))
    1937                 :             :                         {
    1938                 :           0 :                                 *inactive_since = s->inactive_since;
    1939                 :           0 :                                 return RS_INVAL_IDLE_TIMEOUT;
    1940                 :             :                         }
    1941                 :           0 :                 }
    1942                 :           0 :         }
    1943                 :             : 
    1944                 :           0 :         return RS_INVAL_NONE;
    1945                 :           0 : }
    1946                 :             : 
    1947                 :             : /*
    1948                 :             :  * Helper for InvalidateObsoleteReplicationSlots
    1949                 :             :  *
    1950                 :             :  * Acquires the given slot and mark it invalid, if necessary and possible.
    1951                 :             :  *
    1952                 :             :  * Returns true if the slot was invalidated.
    1953                 :             :  *
    1954                 :             :  * Set *released_lock_out if ReplicationSlotControlLock was released in the
    1955                 :             :  * interim (and in that case we're not holding the lock at return, otherwise
    1956                 :             :  * we are).
    1957                 :             :  *
    1958                 :             :  * This is inherently racy, because we release the LWLock
    1959                 :             :  * for syscalls, so caller must restart if we return true.
    1960                 :             :  */
    1961                 :             : static bool
    1962                 :           0 : InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
    1963                 :             :                                                            ReplicationSlot *s,
    1964                 :             :                                                            XLogRecPtr oldestLSN,
    1965                 :             :                                                            Oid dboid, TransactionId snapshotConflictHorizon,
    1966                 :             :                                                            bool *released_lock_out)
    1967                 :             : {
    1968                 :           0 :         int                     last_signaled_pid = 0;
    1969                 :           0 :         bool            released_lock = false;
    1970                 :           0 :         bool            invalidated = false;
    1971                 :           0 :         TimestampTz inactive_since = 0;
    1972                 :             : 
    1973                 :           0 :         for (;;)
    1974                 :             :         {
    1975                 :           0 :                 XLogRecPtr      restart_lsn;
    1976                 :           0 :                 NameData        slotname;
    1977                 :           0 :                 int                     active_pid = 0;
    1978                 :           0 :                 ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
    1979                 :           0 :                 TimestampTz now = 0;
    1980                 :           0 :                 long            slot_idle_secs = 0;
    1981                 :             : 
    1982         [ #  # ]:           0 :                 Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
    1983                 :             : 
    1984         [ #  # ]:           0 :                 if (!s->in_use)
    1985                 :             :                 {
    1986         [ #  # ]:           0 :                         if (released_lock)
    1987                 :           0 :                                 LWLockRelease(ReplicationSlotControlLock);
    1988                 :           0 :                         break;
    1989                 :             :                 }
    1990                 :             : 
    1991         [ #  # ]:           0 :                 if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
    1992                 :             :                 {
    1993                 :             :                         /*
    1994                 :             :                          * Assign the current time here to avoid system call overhead
    1995                 :             :                          * while holding the spinlock in subsequent code.
    1996                 :             :                          */
    1997                 :           0 :                         now = GetCurrentTimestamp();
    1998                 :           0 :                 }
    1999                 :             : 
    2000                 :             :                 /*
    2001                 :             :                  * Check if the slot needs to be invalidated. If it needs to be
    2002                 :             :                  * invalidated, and is not currently acquired, acquire it and mark it
    2003                 :             :                  * as having been invalidated.  We do this with the spinlock held to
    2004                 :             :                  * avoid race conditions -- for example the restart_lsn could move
    2005                 :             :                  * forward, or the slot could be dropped.
    2006                 :             :                  */
    2007         [ #  # ]:           0 :                 SpinLockAcquire(&s->mutex);
    2008                 :             : 
    2009                 :           0 :                 restart_lsn = s->data.restart_lsn;
    2010                 :             : 
    2011                 :             :                 /* we do nothing if the slot is already invalid */
    2012         [ #  # ]:           0 :                 if (s->data.invalidated == RS_INVAL_NONE)
    2013                 :           0 :                         invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
    2014                 :           0 :                                                                                                                                 s, oldestLSN,
    2015                 :           0 :                                                                                                                                 dboid,
    2016                 :           0 :                                                                                                                                 snapshotConflictHorizon,
    2017                 :             :                                                                                                                                 &inactive_since,
    2018                 :           0 :                                                                                                                                 now);
    2019                 :             : 
    2020                 :             :                 /* if there's no invalidation, we're done */
    2021         [ #  # ]:           0 :                 if (invalidation_cause == RS_INVAL_NONE)
    2022                 :             :                 {
    2023                 :           0 :                         SpinLockRelease(&s->mutex);
    2024         [ #  # ]:           0 :                         if (released_lock)
    2025                 :           0 :                                 LWLockRelease(ReplicationSlotControlLock);
    2026                 :           0 :                         break;
    2027                 :             :                 }
    2028                 :             : 
    2029                 :           0 :                 slotname = s->data.name;
    2030                 :           0 :                 active_pid = s->active_pid;
    2031                 :             : 
    2032                 :             :                 /*
    2033                 :             :                  * If the slot can be acquired, do so and mark it invalidated
    2034                 :             :                  * immediately.  Otherwise we'll signal the owning process, below, and
    2035                 :             :                  * retry.
    2036                 :             :                  *
    2037                 :             :                  * Note: Unlike other slot attributes, slot's inactive_since can't be
    2038                 :             :                  * changed until the acquired slot is released or the owning process
    2039                 :             :                  * is terminated. So, the inactive slot can only be invalidated
    2040                 :             :                  * immediately without being terminated.
    2041                 :             :                  */
    2042         [ #  # ]:           0 :                 if (active_pid == 0)
    2043                 :             :                 {
    2044                 :           0 :                         MyReplicationSlot = s;
    2045                 :           0 :                         s->active_pid = MyProcPid;
    2046                 :           0 :                         s->data.invalidated = invalidation_cause;
    2047                 :             : 
    2048                 :             :                         /*
    2049                 :             :                          * XXX: We should consider not overwriting restart_lsn and instead
    2050                 :             :                          * just rely on .invalidated.
    2051                 :             :                          */
    2052         [ #  # ]:           0 :                         if (invalidation_cause == RS_INVAL_WAL_REMOVED)
    2053                 :             :                         {
    2054                 :           0 :                                 s->data.restart_lsn = InvalidXLogRecPtr;
    2055                 :           0 :                                 s->last_saved_restart_lsn = InvalidXLogRecPtr;
    2056                 :           0 :                         }
    2057                 :             : 
    2058                 :             :                         /* Let caller know */
    2059                 :           0 :                         invalidated = true;
    2060                 :           0 :                 }
    2061                 :             : 
    2062                 :           0 :                 SpinLockRelease(&s->mutex);
    2063                 :             : 
    2064                 :             :                 /*
    2065                 :             :                  * Calculate the idle time duration of the slot if slot is marked
    2066                 :             :                  * invalidated with RS_INVAL_IDLE_TIMEOUT.
    2067                 :             :                  */
    2068         [ #  # ]:           0 :                 if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
    2069                 :             :                 {
    2070                 :           0 :                         int                     slot_idle_usecs;
    2071                 :             : 
    2072                 :           0 :                         TimestampDifference(inactive_since, now, &slot_idle_secs,
    2073                 :             :                                                                 &slot_idle_usecs);
    2074                 :           0 :                 }
    2075                 :             : 
    2076         [ #  # ]:           0 :                 if (active_pid != 0)
    2077                 :             :                 {
    2078                 :             :                         /*
    2079                 :             :                          * Prepare the sleep on the slot's condition variable before
    2080                 :             :                          * releasing the lock, to close a possible race condition if the
    2081                 :             :                          * slot is released before the sleep below.
    2082                 :             :                          */
    2083                 :           0 :                         ConditionVariablePrepareToSleep(&s->active_cv);
    2084                 :             : 
    2085                 :           0 :                         LWLockRelease(ReplicationSlotControlLock);
    2086                 :           0 :                         released_lock = true;
    2087                 :             : 
    2088                 :             :                         /*
    2089                 :             :                          * Signal to terminate the process that owns the slot, if we
    2090                 :             :                          * haven't already signalled it.  (Avoidance of repeated
    2091                 :             :                          * signalling is the only reason for there to be a loop in this
    2092                 :             :                          * routine; otherwise we could rely on caller's restart loop.)
    2093                 :             :                          *
    2094                 :             :                          * There is the race condition that other process may own the slot
    2095                 :             :                          * after its current owner process is terminated and before this
    2096                 :             :                          * process owns it. To handle that, we signal only if the PID of
    2097                 :             :                          * the owning process has changed from the previous time. (This
    2098                 :             :                          * logic assumes that the same PID is not reused very quickly.)
    2099                 :             :                          */
    2100         [ #  # ]:           0 :                         if (last_signaled_pid != active_pid)
    2101                 :             :                         {
    2102                 :           0 :                                 ReportSlotInvalidation(invalidation_cause, true, active_pid,
    2103                 :           0 :                                                                            slotname, restart_lsn,
    2104                 :           0 :                                                                            oldestLSN, snapshotConflictHorizon,
    2105                 :           0 :                                                                            slot_idle_secs);
    2106                 :             : 
    2107         [ #  # ]:           0 :                                 if (MyBackendType == B_STARTUP)
    2108                 :           0 :                                         (void) SendProcSignal(active_pid,
    2109                 :             :                                                                                   PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
    2110                 :             :                                                                                   INVALID_PROC_NUMBER);
    2111                 :             :                                 else
    2112                 :           0 :                                         (void) kill(active_pid, SIGTERM);
    2113                 :             : 
    2114                 :           0 :                                 last_signaled_pid = active_pid;
    2115                 :           0 :                         }
    2116                 :             : 
    2117                 :             :                         /* Wait until the slot is released. */
    2118                 :           0 :                         ConditionVariableSleep(&s->active_cv,
    2119                 :             :                                                                    WAIT_EVENT_REPLICATION_SLOT_DROP);
    2120                 :             : 
    2121                 :             :                         /*
    2122                 :             :                          * Re-acquire lock and start over; we expect to invalidate the
    2123                 :             :                          * slot next time (unless another process acquires the slot in the
    2124                 :             :                          * meantime).
    2125                 :             :                          *
    2126                 :             :                          * Note: It is possible for a slot to advance its restart_lsn or
    2127                 :             :                          * xmin values sufficiently between when we release the mutex and
    2128                 :             :                          * when we recheck, moving from a conflicting state to a non
    2129                 :             :                          * conflicting state.  This is intentional and safe: if the slot
    2130                 :             :                          * has caught up while we're busy here, the resources we were
    2131                 :             :                          * concerned about (WAL segments or tuples) have not yet been
    2132                 :             :                          * removed, and there's no reason to invalidate the slot.
    2133                 :             :                          */
    2134                 :           0 :                         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    2135                 :           0 :                         continue;
    2136                 :             :                 }
    2137                 :             :                 else
    2138                 :             :                 {
    2139                 :             :                         /*
    2140                 :             :                          * We hold the slot now and have already invalidated it; flush it
    2141                 :             :                          * to ensure that state persists.
    2142                 :             :                          *
    2143                 :             :                          * Don't want to hold ReplicationSlotControlLock across file
    2144                 :             :                          * system operations, so release it now but be sure to tell caller
    2145                 :             :                          * to restart from scratch.
    2146                 :             :                          */
    2147                 :           0 :                         LWLockRelease(ReplicationSlotControlLock);
    2148                 :           0 :                         released_lock = true;
    2149                 :             : 
    2150                 :             :                         /* Make sure the invalidated state persists across server restart */
    2151                 :           0 :                         ReplicationSlotMarkDirty();
    2152                 :           0 :                         ReplicationSlotSave();
    2153                 :           0 :                         ReplicationSlotRelease();
    2154                 :             : 
    2155                 :           0 :                         ReportSlotInvalidation(invalidation_cause, false, active_pid,
    2156                 :           0 :                                                                    slotname, restart_lsn,
    2157                 :           0 :                                                                    oldestLSN, snapshotConflictHorizon,
    2158                 :           0 :                                                                    slot_idle_secs);
    2159                 :             : 
    2160                 :             :                         /* done with this slot for now */
    2161                 :           0 :                         break;
    2162                 :             :                 }
    2163      [ #  #  # ]:           0 :         }
    2164                 :             : 
    2165         [ #  # ]:           0 :         Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
    2166                 :             : 
    2167                 :           0 :         *released_lock_out = released_lock;
    2168                 :           0 :         return invalidated;
    2169                 :           0 : }
    2170                 :             : 
    2171                 :             : /*
    2172                 :             :  * Invalidate slots that require resources about to be removed.
    2173                 :             :  *
    2174                 :             :  * Returns true when any slot have got invalidated.
    2175                 :             :  *
    2176                 :             :  * Whether a slot needs to be invalidated depends on the invalidation cause.
    2177                 :             :  * A slot is invalidated if it:
    2178                 :             :  * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
    2179                 :             :  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
    2180                 :             :  *   db; dboid may be InvalidOid for shared relations
    2181                 :             :  * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not
    2182                 :             :  *   logical.
    2183                 :             :  * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
    2184                 :             :  *   "idle_replication_slot_timeout" duration.
    2185                 :             :  *
    2186                 :             :  * Note: This function attempts to invalidate the slot for multiple possible
    2187                 :             :  * causes in a single pass, minimizing redundant iterations. The "cause"
    2188                 :             :  * parameter can be a MASK representing one or more of the defined causes.
    2189                 :             :  *
    2190                 :             :  * If it invalidates the last logical slot in the cluster, it requests to
    2191                 :             :  * disable logical decoding.
    2192                 :             :  *
    2193                 :             :  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
    2194                 :             :  */
    2195                 :             : bool
    2196                 :           7 : InvalidateObsoleteReplicationSlots(uint32 possible_causes,
    2197                 :             :                                                                    XLogSegNo oldestSegno, Oid dboid,
    2198                 :             :                                                                    TransactionId snapshotConflictHorizon)
    2199                 :             : {
    2200                 :           7 :         XLogRecPtr      oldestLSN;
    2201                 :           7 :         bool            invalidated = false;
    2202                 :           7 :         bool            invalidated_logical = false;
    2203                 :           7 :         bool            found_valid_logicalslot;
    2204                 :             : 
    2205   [ -  +  #  # ]:           7 :         Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
    2206   [ +  -  +  - ]:           7 :         Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
    2207         [ +  - ]:           7 :         Assert(possible_causes != RS_INVAL_NONE);
    2208                 :             : 
    2209         [ +  - ]:           7 :         if (max_replication_slots == 0)
    2210                 :           0 :                 return invalidated;
    2211                 :             : 
    2212                 :           7 :         XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
    2213                 :             : 
    2214                 :             : restart:
    2215                 :           7 :         found_valid_logicalslot = false;
    2216                 :           7 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    2217   [ +  +  -  -  :          77 :         for (int i = 0; i < max_replication_slots; i++)
                      + ]
    2218                 :             :         {
    2219                 :          70 :                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    2220                 :          70 :                 bool            released_lock = false;
    2221                 :             : 
    2222         [ -  + ]:          70 :                 if (!s->in_use)
    2223                 :          70 :                         continue;
    2224                 :             : 
    2225                 :             :                 /* Prevent invalidation of logical slots during binary upgrade */
    2226   [ #  #  #  # ]:           0 :                 if (SlotIsLogical(s) && IsBinaryUpgrade)
    2227                 :             :                 {
    2228         [ #  # ]:           0 :                         SpinLockAcquire(&s->mutex);
    2229                 :           0 :                         found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
    2230                 :           0 :                         SpinLockRelease(&s->mutex);
    2231                 :             : 
    2232                 :           0 :                         continue;
    2233                 :             :                 }
    2234                 :             : 
    2235   [ #  #  #  # ]:           0 :                 if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN,
    2236                 :           0 :                                                                                    dboid, snapshotConflictHorizon,
    2237                 :             :                                                                                    &released_lock))
    2238                 :             :                 {
    2239         [ #  # ]:           0 :                         Assert(released_lock);
    2240                 :             : 
    2241                 :             :                         /* Remember we have invalidated a physical or logical slot */
    2242                 :           0 :                         invalidated = true;
    2243                 :             : 
    2244                 :             :                         /*
    2245                 :             :                          * Additionally, remember we have invalidated a logical slot as we
    2246                 :             :                          * can request disabling logical decoding later.
    2247                 :             :                          */
    2248         [ #  # ]:           0 :                         if (SlotIsLogical(s))
    2249                 :           0 :                                 invalidated_logical = true;
    2250                 :           0 :                 }
    2251                 :             :                 else
    2252                 :             :                 {
    2253                 :             :                         /*
    2254                 :             :                          * We need to check if the slot is invalidated here since
    2255                 :             :                          * InvalidatePossiblyObsoleteSlot() returns false also if the slot
    2256                 :             :                          * is already invalidated.
    2257                 :             :                          */
    2258         [ #  # ]:           0 :                         SpinLockAcquire(&s->mutex);
    2259                 :           0 :                         found_valid_logicalslot |=
    2260         [ #  # ]:           0 :                                 (SlotIsLogical(s) && (s->data.invalidated == RS_INVAL_NONE));
    2261                 :           0 :                         SpinLockRelease(&s->mutex);
    2262                 :             :                 }
    2263                 :             : 
    2264                 :             :                 /* if the lock was released, start from scratch */
    2265         [ #  # ]:           0 :                 if (released_lock)
    2266                 :           0 :                         goto restart;
    2267      [ +  -  - ]:          70 :         }
    2268                 :           7 :         LWLockRelease(ReplicationSlotControlLock);
    2269                 :             : 
    2270                 :             :         /*
    2271                 :             :          * If any slots have been invalidated, recalculate the resource limits.
    2272                 :             :          */
    2273         [ +  - ]:           7 :         if (invalidated)
    2274                 :             :         {
    2275                 :           0 :                 ReplicationSlotsComputeRequiredXmin(false);
    2276                 :           0 :                 ReplicationSlotsComputeRequiredLSN();
    2277                 :           0 :         }
    2278                 :             : 
    2279                 :             :         /*
    2280                 :             :          * Request the checkpointer to disable logical decoding if no valid
    2281                 :             :          * logical slots remain. If called by the checkpointer during a
    2282                 :             :          * checkpoint, only the request is initiated; actual deactivation is
    2283                 :             :          * deferred until after the checkpoint completes.
    2284                 :             :          */
    2285   [ -  +  #  # ]:           7 :         if (invalidated_logical && !found_valid_logicalslot)
    2286                 :           0 :                 RequestDisableLogicalDecoding();
    2287                 :             : 
    2288                 :           7 :         return invalidated;
    2289                 :           7 : }
    2290                 :             : 
    2291                 :             : /*
    2292                 :             :  * Flush all replication slots to disk.
    2293                 :             :  *
    2294                 :             :  * It is convenient to flush dirty replication slots at the time of checkpoint.
    2295                 :             :  * Additionally, in case of a shutdown checkpoint, we also identify the slots
    2296                 :             :  * for which the confirmed_flush LSN has been updated since the last time it
    2297                 :             :  * was saved and flush them.
    2298                 :             :  */
    2299                 :             : void
    2300                 :           7 : CheckPointReplicationSlots(bool is_shutdown)
    2301                 :             : {
    2302                 :           7 :         int                     i;
    2303                 :           7 :         bool            last_saved_restart_lsn_updated = false;
    2304                 :             : 
    2305   [ -  +  -  + ]:           7 :         elog(DEBUG1, "performing replication slot checkpoint");
    2306                 :             : 
    2307                 :             :         /*
    2308                 :             :          * Prevent any slot from being created/dropped while we're active. As we
    2309                 :             :          * explicitly do *not* want to block iterating over replication_slots or
    2310                 :             :          * acquiring a slot we cannot take the control lock - but that's OK,
    2311                 :             :          * because holding ReplicationSlotAllocationLock is strictly stronger, and
    2312                 :             :          * enough to guarantee that nobody can change the in_use bits on us.
    2313                 :             :          *
    2314                 :             :          * Additionally, acquiring the Allocation lock is necessary to serialize
    2315                 :             :          * the slot flush process with concurrent slot WAL reservation. This
    2316                 :             :          * ensures that the WAL position being reserved is either flushed to disk
    2317                 :             :          * or is beyond or equal to the redo pointer of the current checkpoint
    2318                 :             :          * (See ReplicationSlotReserveWal for details).
    2319                 :             :          */
    2320                 :           7 :         LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
    2321                 :             : 
    2322         [ +  + ]:          77 :         for (i = 0; i < max_replication_slots; i++)
    2323                 :             :         {
    2324                 :          70 :                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    2325                 :          70 :                 char            path[MAXPGPATH];
    2326                 :             : 
    2327         [ -  + ]:          70 :                 if (!s->in_use)
    2328                 :          70 :                         continue;
    2329                 :             : 
    2330                 :             :                 /* save the slot to disk, locking is handled in SaveSlotToPath() */
    2331                 :           0 :                 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
    2332                 :             : 
    2333                 :             :                 /*
    2334                 :             :                  * Slot's data is not flushed each time the confirmed_flush LSN is
    2335                 :             :                  * updated as that could lead to frequent writes.  However, we decide
    2336                 :             :                  * to force a flush of all logical slot's data at the time of shutdown
    2337                 :             :                  * if the confirmed_flush LSN is changed since we last flushed it to
    2338                 :             :                  * disk.  This helps in avoiding an unnecessary retreat of the
    2339                 :             :                  * confirmed_flush LSN after restart.
    2340                 :             :                  */
    2341   [ #  #  #  # ]:           0 :                 if (is_shutdown && SlotIsLogical(s))
    2342                 :             :                 {
    2343         [ #  # ]:           0 :                         SpinLockAcquire(&s->mutex);
    2344                 :             : 
    2345   [ #  #  #  # ]:           0 :                         if (s->data.invalidated == RS_INVAL_NONE &&
    2346                 :           0 :                                 s->data.confirmed_flush > s->last_saved_confirmed_flush)
    2347                 :             :                         {
    2348                 :           0 :                                 s->just_dirtied = true;
    2349                 :           0 :                                 s->dirty = true;
    2350                 :           0 :                         }
    2351                 :           0 :                         SpinLockRelease(&s->mutex);
    2352                 :           0 :                 }
    2353                 :             : 
    2354                 :             :                 /*
    2355                 :             :                  * Track if we're going to update slot's last_saved_restart_lsn. We
    2356                 :             :                  * need this to know if we need to recompute the required LSN.
    2357                 :             :                  */
    2358         [ #  # ]:           0 :                 if (s->last_saved_restart_lsn != s->data.restart_lsn)
    2359                 :           0 :                         last_saved_restart_lsn_updated = true;
    2360                 :             : 
    2361                 :           0 :                 SaveSlotToPath(s, path, LOG);
    2362      [ -  +  - ]:          70 :         }
    2363                 :           7 :         LWLockRelease(ReplicationSlotAllocationLock);
    2364                 :             : 
    2365                 :             :         /*
    2366                 :             :          * Recompute the required LSN if SaveSlotToPath() updated
    2367                 :             :          * last_saved_restart_lsn for any slot.
    2368                 :             :          */
    2369         [ +  - ]:           7 :         if (last_saved_restart_lsn_updated)
    2370                 :           0 :                 ReplicationSlotsComputeRequiredLSN();
    2371                 :           7 : }
    2372                 :             : 
    2373                 :             : /*
    2374                 :             :  * Load all replication slots from disk into memory at server startup. This
    2375                 :             :  * needs to be run before we start crash recovery.
    2376                 :             :  */
    2377                 :             : void
    2378                 :           4 : StartupReplicationSlots(void)
    2379                 :             : {
    2380                 :           4 :         DIR                *replication_dir;
    2381                 :           4 :         struct dirent *replication_de;
    2382                 :             : 
    2383   [ -  +  -  + ]:           4 :         elog(DEBUG1, "starting up replication slots");
    2384                 :             : 
    2385                 :             :         /* restore all slots by iterating over all on-disk entries */
    2386                 :           4 :         replication_dir = AllocateDir(PG_REPLSLOT_DIR);
    2387         [ +  + ]:          12 :         while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
    2388                 :             :         {
    2389                 :           8 :                 char            path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
    2390                 :           8 :                 PGFileType      de_type;
    2391                 :             : 
    2392   [ +  +  +  - ]:           8 :                 if (strcmp(replication_de->d_name, ".") == 0 ||
    2393                 :           4 :                         strcmp(replication_de->d_name, "..") == 0)
    2394                 :           8 :                         continue;
    2395                 :             : 
    2396                 :           0 :                 snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
    2397                 :           0 :                 de_type = get_dirent_type(path, replication_de, false, DEBUG1);
    2398                 :             : 
    2399                 :             :                 /* we're only creating directories here, skip if it's not our's */
    2400   [ #  #  #  # ]:           0 :                 if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
    2401                 :           0 :                         continue;
    2402                 :             : 
    2403                 :             :                 /* we crashed while a slot was being setup or deleted, clean up */
    2404         [ #  # ]:           0 :                 if (pg_str_endswith(replication_de->d_name, ".tmp"))
    2405                 :             :                 {
    2406         [ #  # ]:           0 :                         if (!rmtree(path, true))
    2407                 :             :                         {
    2408   [ #  #  #  # ]:           0 :                                 ereport(WARNING,
    2409                 :             :                                                 (errmsg("could not remove directory \"%s\"",
    2410                 :             :                                                                 path)));
    2411                 :           0 :                                 continue;
    2412                 :             :                         }
    2413                 :           0 :                         fsync_fname(PG_REPLSLOT_DIR, true);
    2414                 :           0 :                         continue;
    2415                 :             :                 }
    2416                 :             : 
    2417                 :             :                 /* looks like a slot in a normal state, restore */
    2418                 :           0 :                 RestoreSlotFromDisk(replication_de->d_name);
    2419         [ +  - ]:           8 :         }
    2420                 :           4 :         FreeDir(replication_dir);
    2421                 :             : 
    2422                 :             :         /* currently no slots exist, we're done. */
    2423         [ -  + ]:           4 :         if (max_replication_slots <= 0)
    2424                 :           0 :                 return;
    2425                 :             : 
    2426                 :             :         /* Now that we have recovered all the data, compute replication xmin */
    2427                 :           4 :         ReplicationSlotsComputeRequiredXmin(false);
    2428                 :           4 :         ReplicationSlotsComputeRequiredLSN();
    2429                 :           4 : }
    2430                 :             : 
    2431                 :             : /* ----
    2432                 :             :  * Manipulation of on-disk state of replication slots
    2433                 :             :  *
    2434                 :             :  * NB: none of the routines below should take any notice whether a slot is the
    2435                 :             :  * current one or not, that's all handled a layer above.
    2436                 :             :  * ----
    2437                 :             :  */
    2438                 :             : static void
    2439                 :           0 : CreateSlotOnDisk(ReplicationSlot *slot)
    2440                 :             : {
    2441                 :           0 :         char            tmppath[MAXPGPATH];
    2442                 :           0 :         char            path[MAXPGPATH];
    2443                 :           0 :         struct stat st;
    2444                 :             : 
    2445                 :             :         /*
    2446                 :             :          * No need to take out the io_in_progress_lock, nobody else can see this
    2447                 :             :          * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
    2448                 :             :          * takes out the lock, if we'd take the lock here, we'd deadlock.
    2449                 :             :          */
    2450                 :             : 
    2451                 :           0 :         sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    2452                 :           0 :         sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    2453                 :             : 
    2454                 :             :         /*
    2455                 :             :          * It's just barely possible that some previous effort to create or drop a
    2456                 :             :          * slot with this name left a temp directory lying around. If that seems
    2457                 :             :          * to be the case, try to remove it.  If the rmtree() fails, we'll error
    2458                 :             :          * out at the MakePGDirectory() below, so we don't bother checking
    2459                 :             :          * success.
    2460                 :             :          */
    2461   [ #  #  #  # ]:           0 :         if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
    2462                 :           0 :                 rmtree(tmppath, true);
    2463                 :             : 
    2464                 :             :         /* Create and fsync the temporary slot directory. */
    2465         [ #  # ]:           0 :         if (MakePGDirectory(tmppath) < 0)
    2466   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    2467                 :             :                                 (errcode_for_file_access(),
    2468                 :             :                                  errmsg("could not create directory \"%s\": %m",
    2469                 :             :                                                 tmppath)));
    2470                 :           0 :         fsync_fname(tmppath, true);
    2471                 :             : 
    2472                 :             :         /* Write the actual state file. */
    2473                 :           0 :         slot->dirty = true;                  /* signal that we really need to write */
    2474                 :           0 :         SaveSlotToPath(slot, tmppath, ERROR);
    2475                 :             : 
    2476                 :             :         /* Rename the directory into place. */
    2477         [ #  # ]:           0 :         if (rename(tmppath, path) != 0)
    2478   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    2479                 :             :                                 (errcode_for_file_access(),
    2480                 :             :                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    2481                 :             :                                                 tmppath, path)));
    2482                 :             : 
    2483                 :             :         /*
    2484                 :             :          * If we'd now fail - really unlikely - we wouldn't know whether this slot
    2485                 :             :          * would persist after an OS crash or not - so, force a restart. The
    2486                 :             :          * restart would try to fsync this again till it works.
    2487                 :             :          */
    2488                 :           0 :         START_CRIT_SECTION();
    2489                 :             : 
    2490                 :           0 :         fsync_fname(path, true);
    2491                 :           0 :         fsync_fname(PG_REPLSLOT_DIR, true);
    2492                 :             : 
    2493         [ #  # ]:           0 :         END_CRIT_SECTION();
    2494                 :           0 : }
    2495                 :             : 
    2496                 :             : /*
    2497                 :             :  * Shared functionality between saving and creating a replication slot.
    2498                 :             :  */
    2499                 :             : static void
    2500                 :           0 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
    2501                 :             : {
    2502                 :           0 :         char            tmppath[MAXPGPATH];
    2503                 :           0 :         char            path[MAXPGPATH];
    2504                 :           0 :         int                     fd;
    2505                 :           0 :         ReplicationSlotOnDisk cp;
    2506                 :           0 :         bool            was_dirty;
    2507                 :             : 
    2508                 :             :         /* first check whether there's something to write out */
    2509         [ #  # ]:           0 :         SpinLockAcquire(&slot->mutex);
    2510                 :           0 :         was_dirty = slot->dirty;
    2511                 :           0 :         slot->just_dirtied = false;
    2512                 :           0 :         SpinLockRelease(&slot->mutex);
    2513                 :             : 
    2514                 :             :         /* and don't do anything if there's nothing to write */
    2515         [ #  # ]:           0 :         if (!was_dirty)
    2516                 :           0 :                 return;
    2517                 :             : 
    2518                 :           0 :         LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
    2519                 :             : 
    2520                 :             :         /* silence valgrind :( */
    2521                 :           0 :         memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
    2522                 :             : 
    2523                 :           0 :         sprintf(tmppath, "%s/state.tmp", dir);
    2524                 :           0 :         sprintf(path, "%s/state", dir);
    2525                 :             : 
    2526                 :           0 :         fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    2527         [ #  # ]:           0 :         if (fd < 0)
    2528                 :             :         {
    2529                 :             :                 /*
    2530                 :             :                  * If not an ERROR, then release the lock before returning.  In case
    2531                 :             :                  * of an ERROR, the error recovery path automatically releases the
    2532                 :             :                  * lock, but no harm in explicitly releasing even in that case.  Note
    2533                 :             :                  * that LWLockRelease() could affect errno.
    2534                 :             :                  */
    2535                 :           0 :                 int                     save_errno = errno;
    2536                 :             : 
    2537                 :           0 :                 LWLockRelease(&slot->io_in_progress_lock);
    2538                 :           0 :                 errno = save_errno;
    2539   [ #  #  #  #  :           0 :                 ereport(elevel,
          #  #  #  #  #  
                      # ]
    2540                 :             :                                 (errcode_for_file_access(),
    2541                 :             :                                  errmsg("could not create file \"%s\": %m",
    2542                 :             :                                                 tmppath)));
    2543                 :             :                 return;
    2544                 :           0 :         }
    2545                 :             : 
    2546                 :           0 :         cp.magic = SLOT_MAGIC;
    2547                 :           0 :         INIT_CRC32C(cp.checksum);
    2548                 :           0 :         cp.version = SLOT_VERSION;
    2549                 :           0 :         cp.length = ReplicationSlotOnDiskV2Size;
    2550                 :             : 
    2551         [ #  # ]:           0 :         SpinLockAcquire(&slot->mutex);
    2552                 :             : 
    2553                 :           0 :         memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
    2554                 :             : 
    2555                 :           0 :         SpinLockRelease(&slot->mutex);
    2556                 :             : 
    2557                 :           0 :         COMP_CRC32C(cp.checksum,
    2558                 :             :                                 (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
    2559                 :             :                                 ReplicationSlotOnDiskChecksummedSize);
    2560                 :           0 :         FIN_CRC32C(cp.checksum);
    2561                 :             : 
    2562                 :           0 :         errno = 0;
    2563                 :           0 :         pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
    2564         [ #  # ]:           0 :         if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
    2565                 :             :         {
    2566                 :           0 :                 int                     save_errno = errno;
    2567                 :             : 
    2568                 :           0 :                 pgstat_report_wait_end();
    2569                 :           0 :                 CloseTransientFile(fd);
    2570                 :           0 :                 unlink(tmppath);
    2571                 :           0 :                 LWLockRelease(&slot->io_in_progress_lock);
    2572                 :             : 
    2573                 :             :                 /* if write didn't set errno, assume problem is no disk space */
    2574         [ #  # ]:           0 :                 errno = save_errno ? save_errno : ENOSPC;
    2575   [ #  #  #  #  :           0 :                 ereport(elevel,
          #  #  #  #  #  
                      # ]
    2576                 :             :                                 (errcode_for_file_access(),
    2577                 :             :                                  errmsg("could not write to file \"%s\": %m",
    2578                 :             :                                                 tmppath)));
    2579                 :             :                 return;
    2580                 :           0 :         }
    2581                 :           0 :         pgstat_report_wait_end();
    2582                 :             : 
    2583                 :             :         /* fsync the temporary file */
    2584                 :           0 :         pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
    2585         [ #  # ]:           0 :         if (pg_fsync(fd) != 0)
    2586                 :             :         {
    2587                 :           0 :                 int                     save_errno = errno;
    2588                 :             : 
    2589                 :           0 :                 pgstat_report_wait_end();
    2590                 :           0 :                 CloseTransientFile(fd);
    2591                 :           0 :                 unlink(tmppath);
    2592                 :           0 :                 LWLockRelease(&slot->io_in_progress_lock);
    2593                 :             : 
    2594                 :           0 :                 errno = save_errno;
    2595   [ #  #  #  #  :           0 :                 ereport(elevel,
          #  #  #  #  #  
                      # ]
    2596                 :             :                                 (errcode_for_file_access(),
    2597                 :             :                                  errmsg("could not fsync file \"%s\": %m",
    2598                 :             :                                                 tmppath)));
    2599                 :             :                 return;
    2600                 :           0 :         }
    2601                 :           0 :         pgstat_report_wait_end();
    2602                 :             : 
    2603         [ #  # ]:           0 :         if (CloseTransientFile(fd) != 0)
    2604                 :             :         {
    2605                 :           0 :                 int                     save_errno = errno;
    2606                 :             : 
    2607                 :           0 :                 unlink(tmppath);
    2608                 :           0 :                 LWLockRelease(&slot->io_in_progress_lock);
    2609                 :             : 
    2610                 :           0 :                 errno = save_errno;
    2611   [ #  #  #  #  :           0 :                 ereport(elevel,
          #  #  #  #  #  
                      # ]
    2612                 :             :                                 (errcode_for_file_access(),
    2613                 :             :                                  errmsg("could not close file \"%s\": %m",
    2614                 :             :                                                 tmppath)));
    2615                 :             :                 return;
    2616                 :           0 :         }
    2617                 :             : 
    2618                 :             :         /* rename to permanent file, fsync file and directory */
    2619         [ #  # ]:           0 :         if (rename(tmppath, path) != 0)
    2620                 :             :         {
    2621                 :           0 :                 int                     save_errno = errno;
    2622                 :             : 
    2623                 :           0 :                 unlink(tmppath);
    2624                 :           0 :                 LWLockRelease(&slot->io_in_progress_lock);
    2625                 :             : 
    2626                 :           0 :                 errno = save_errno;
    2627   [ #  #  #  #  :           0 :                 ereport(elevel,
          #  #  #  #  #  
                      # ]
    2628                 :             :                                 (errcode_for_file_access(),
    2629                 :             :                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    2630                 :             :                                                 tmppath, path)));
    2631                 :             :                 return;
    2632                 :           0 :         }
    2633                 :             : 
    2634                 :             :         /*
    2635                 :             :          * Check CreateSlotOnDisk() for the reasoning of using a critical section.
    2636                 :             :          */
    2637                 :           0 :         START_CRIT_SECTION();
    2638                 :             : 
    2639                 :           0 :         fsync_fname(path, false);
    2640                 :           0 :         fsync_fname(dir, true);
    2641                 :           0 :         fsync_fname(PG_REPLSLOT_DIR, true);
    2642                 :             : 
    2643         [ #  # ]:           0 :         END_CRIT_SECTION();
    2644                 :             : 
    2645                 :             :         /*
    2646                 :             :          * Successfully wrote, unset dirty bit, unless somebody dirtied again
    2647                 :             :          * already and remember the confirmed_flush LSN value.
    2648                 :             :          */
    2649         [ #  # ]:           0 :         SpinLockAcquire(&slot->mutex);
    2650         [ #  # ]:           0 :         if (!slot->just_dirtied)
    2651                 :           0 :                 slot->dirty = false;
    2652                 :           0 :         slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
    2653                 :           0 :         slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
    2654                 :           0 :         SpinLockRelease(&slot->mutex);
    2655                 :             : 
    2656                 :           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2657         [ #  # ]:           0 : }
    2658                 :             : 
    2659                 :             : /*
    2660                 :             :  * Load a single slot from disk into memory.
    2661                 :             :  */
    2662                 :             : static void
    2663                 :           0 : RestoreSlotFromDisk(const char *name)
    2664                 :             : {
    2665                 :           0 :         ReplicationSlotOnDisk cp;
    2666                 :           0 :         int                     i;
    2667                 :           0 :         char            slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
    2668                 :           0 :         char            path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
    2669                 :           0 :         int                     fd;
    2670                 :           0 :         bool            restored = false;
    2671                 :           0 :         int                     readBytes;
    2672                 :           0 :         pg_crc32c       checksum;
    2673                 :           0 :         TimestampTz now = 0;
    2674                 :             : 
    2675                 :             :         /* no need to lock here, no concurrent access allowed yet */
    2676                 :             : 
    2677                 :             :         /* delete temp file if it exists */
    2678                 :           0 :         sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
    2679                 :           0 :         sprintf(path, "%s/state.tmp", slotdir);
    2680   [ #  #  #  # ]:           0 :         if (unlink(path) < 0 && errno != ENOENT)
    2681   [ #  #  #  # ]:           0 :                 ereport(PANIC,
    2682                 :             :                                 (errcode_for_file_access(),
    2683                 :             :                                  errmsg("could not remove file \"%s\": %m", path)));
    2684                 :             : 
    2685                 :           0 :         sprintf(path, "%s/state", slotdir);
    2686                 :             : 
    2687   [ #  #  #  # ]:           0 :         elog(DEBUG1, "restoring replication slot from \"%s\"", path);
    2688                 :             : 
    2689                 :             :         /* on some operating systems fsyncing a file requires O_RDWR */
    2690                 :           0 :         fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
    2691                 :             : 
    2692                 :             :         /*
    2693                 :             :          * We do not need to handle this as we are rename()ing the directory into
    2694                 :             :          * place only after we fsync()ed the state file.
    2695                 :             :          */
    2696         [ #  # ]:           0 :         if (fd < 0)
    2697   [ #  #  #  # ]:           0 :                 ereport(PANIC,
    2698                 :             :                                 (errcode_for_file_access(),
    2699                 :             :                                  errmsg("could not open file \"%s\": %m", path)));
    2700                 :             : 
    2701                 :             :         /*
    2702                 :             :          * Sync state file before we're reading from it. We might have crashed
    2703                 :             :          * while it wasn't synced yet and we shouldn't continue on that basis.
    2704                 :             :          */
    2705                 :           0 :         pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
    2706         [ #  # ]:           0 :         if (pg_fsync(fd) != 0)
    2707   [ #  #  #  # ]:           0 :                 ereport(PANIC,
    2708                 :             :                                 (errcode_for_file_access(),
    2709                 :             :                                  errmsg("could not fsync file \"%s\": %m",
    2710                 :             :                                                 path)));
    2711                 :           0 :         pgstat_report_wait_end();
    2712                 :             : 
    2713                 :             :         /* Also sync the parent directory */
    2714                 :           0 :         START_CRIT_SECTION();
    2715                 :           0 :         fsync_fname(slotdir, true);
    2716         [ #  # ]:           0 :         END_CRIT_SECTION();
    2717                 :             : 
    2718                 :             :         /* read part of statefile that's guaranteed to be version independent */
    2719                 :           0 :         pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    2720                 :           0 :         readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
    2721                 :           0 :         pgstat_report_wait_end();
    2722         [ #  # ]:           0 :         if (readBytes != ReplicationSlotOnDiskConstantSize)
    2723                 :             :         {
    2724         [ #  # ]:           0 :                 if (readBytes < 0)
    2725   [ #  #  #  # ]:           0 :                         ereport(PANIC,
    2726                 :             :                                         (errcode_for_file_access(),
    2727                 :             :                                          errmsg("could not read file \"%s\": %m", path)));
    2728                 :             :                 else
    2729   [ #  #  #  # ]:           0 :                         ereport(PANIC,
    2730                 :             :                                         (errcode(ERRCODE_DATA_CORRUPTED),
    2731                 :             :                                          errmsg("could not read file \"%s\": read %d of %zu",
    2732                 :             :                                                         path, readBytes,
    2733                 :             :                                                         (Size) ReplicationSlotOnDiskConstantSize)));
    2734                 :           0 :         }
    2735                 :             : 
    2736                 :             :         /* verify magic */
    2737         [ #  # ]:           0 :         if (cp.magic != SLOT_MAGIC)
    2738   [ #  #  #  # ]:           0 :                 ereport(PANIC,
    2739                 :             :                                 (errcode(ERRCODE_DATA_CORRUPTED),
    2740                 :             :                                  errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
    2741                 :             :                                                 path, cp.magic, SLOT_MAGIC)));
    2742                 :             : 
    2743                 :             :         /* verify version */
    2744         [ #  # ]:           0 :         if (cp.version != SLOT_VERSION)
    2745   [ #  #  #  # ]:           0 :                 ereport(PANIC,
    2746                 :             :                                 (errcode(ERRCODE_DATA_CORRUPTED),
    2747                 :             :                                  errmsg("replication slot file \"%s\" has unsupported version %u",
    2748                 :             :                                                 path, cp.version)));
    2749                 :             : 
    2750                 :             :         /* boundary check on length */
    2751         [ #  # ]:           0 :         if (cp.length != ReplicationSlotOnDiskV2Size)
    2752   [ #  #  #  # ]:           0 :                 ereport(PANIC,
    2753                 :             :                                 (errcode(ERRCODE_DATA_CORRUPTED),
    2754                 :             :                                  errmsg("replication slot file \"%s\" has corrupted length %u",
    2755                 :             :                                                 path, cp.length)));
    2756                 :             : 
    2757                 :             :         /* Now that we know the size, read the entire file */
    2758                 :           0 :         pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    2759                 :           0 :         readBytes = read(fd,
    2760                 :           0 :                                          (char *) &cp + ReplicationSlotOnDiskConstantSize,
    2761                 :           0 :                                          cp.length);
    2762                 :           0 :         pgstat_report_wait_end();
    2763         [ #  # ]:           0 :         if (readBytes != cp.length)
    2764                 :             :         {
    2765         [ #  # ]:           0 :                 if (readBytes < 0)
    2766   [ #  #  #  # ]:           0 :                         ereport(PANIC,
    2767                 :             :                                         (errcode_for_file_access(),
    2768                 :             :                                          errmsg("could not read file \"%s\": %m", path)));
    2769                 :             :                 else
    2770   [ #  #  #  # ]:           0 :                         ereport(PANIC,
    2771                 :             :                                         (errcode(ERRCODE_DATA_CORRUPTED),
    2772                 :             :                                          errmsg("could not read file \"%s\": read %d of %zu",
    2773                 :             :                                                         path, readBytes, (Size) cp.length)));
    2774                 :           0 :         }
    2775                 :             : 
    2776         [ #  # ]:           0 :         if (CloseTransientFile(fd) != 0)
    2777   [ #  #  #  # ]:           0 :                 ereport(PANIC,
    2778                 :             :                                 (errcode_for_file_access(),
    2779                 :             :                                  errmsg("could not close file \"%s\": %m", path)));
    2780                 :             : 
    2781                 :             :         /* now verify the CRC */
    2782                 :           0 :         INIT_CRC32C(checksum);
    2783                 :           0 :         COMP_CRC32C(checksum,
    2784                 :             :                                 (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
    2785                 :             :                                 ReplicationSlotOnDiskChecksummedSize);
    2786                 :           0 :         FIN_CRC32C(checksum);
    2787                 :             : 
    2788         [ #  # ]:           0 :         if (!EQ_CRC32C(checksum, cp.checksum))
    2789   [ #  #  #  # ]:           0 :                 ereport(PANIC,
    2790                 :             :                                 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
    2791                 :             :                                                 path, checksum, cp.checksum)));
    2792                 :             : 
    2793                 :             :         /*
    2794                 :             :          * If we crashed with an ephemeral slot active, don't restore but delete
    2795                 :             :          * it.
    2796                 :             :          */
    2797         [ #  # ]:           0 :         if (cp.slotdata.persistency != RS_PERSISTENT)
    2798                 :             :         {
    2799         [ #  # ]:           0 :                 if (!rmtree(slotdir, true))
    2800                 :             :                 {
    2801   [ #  #  #  # ]:           0 :                         ereport(WARNING,
    2802                 :             :                                         (errmsg("could not remove directory \"%s\"",
    2803                 :             :                                                         slotdir)));
    2804                 :           0 :                 }
    2805                 :           0 :                 fsync_fname(PG_REPLSLOT_DIR, true);
    2806                 :           0 :                 return;
    2807                 :             :         }
    2808                 :             : 
    2809                 :             :         /*
    2810                 :             :          * Verify that requirements for the specific slot type are met. That's
    2811                 :             :          * important because if these aren't met we're not guaranteed to retain
    2812                 :             :          * all the necessary resources for the slot.
    2813                 :             :          *
    2814                 :             :          * NB: We have to do so *after* the above checks for ephemeral slots,
    2815                 :             :          * because otherwise a slot that shouldn't exist anymore could prevent
    2816                 :             :          * restarts.
    2817                 :             :          *
    2818                 :             :          * NB: Changing the requirements here also requires adapting
    2819                 :             :          * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
    2820                 :             :          */
    2821         [ #  # ]:           0 :         if (cp.slotdata.database != InvalidOid)
    2822                 :             :         {
    2823         [ #  # ]:           0 :                 if (wal_level < WAL_LEVEL_REPLICA)
    2824   [ #  #  #  # ]:           0 :                         ereport(FATAL,
    2825                 :             :                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2826                 :             :                                          errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
    2827                 :             :                                                         NameStr(cp.slotdata.name)),
    2828                 :             :                                          errhint("Change \"wal_level\" to be \"replica\" or higher.")));
    2829                 :             : 
    2830                 :             :                 /*
    2831                 :             :                  * In standby mode, the hot standby must be enabled. This check is
    2832                 :             :                  * necessary to ensure logical slots are invalidated when they become
    2833                 :             :                  * incompatible due to insufficient wal_level. Otherwise, if the
    2834                 :             :                  * primary reduces effective_wal_level < logical while hot standby is
    2835                 :             :                  * disabled, the logical slots on this standby would remain valid even
    2836                 :             :                  * after promotion.
    2837                 :             :                  */
    2838   [ #  #  #  # ]:           0 :                 if (StandbyMode && !EnableHotStandby)
    2839   [ #  #  #  # ]:           0 :                         ereport(FATAL,
    2840                 :             :                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2841                 :             :                                          errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
    2842                 :             :                                                         NameStr(cp.slotdata.name)),
    2843                 :             :                                          errhint("Change \"hot_standby\" to be \"on\".")));
    2844                 :           0 :         }
    2845         [ #  # ]:           0 :         else if (wal_level < WAL_LEVEL_REPLICA)
    2846   [ #  #  #  # ]:           0 :                 ereport(FATAL,
    2847                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2848                 :             :                                  errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
    2849                 :             :                                                 NameStr(cp.slotdata.name)),
    2850                 :             :                                  errhint("Change \"wal_level\" to be \"replica\" or higher.")));
    2851                 :             : 
    2852                 :             :         /* nothing can be active yet, don't lock anything */
    2853         [ #  # ]:           0 :         for (i = 0; i < max_replication_slots; i++)
    2854                 :             :         {
    2855                 :           0 :                 ReplicationSlot *slot;
    2856                 :             : 
    2857                 :           0 :                 slot = &ReplicationSlotCtl->replication_slots[i];
    2858                 :             : 
    2859         [ #  # ]:           0 :                 if (slot->in_use)
    2860                 :           0 :                         continue;
    2861                 :             : 
    2862                 :             :                 /* restore the entire set of persistent data */
    2863                 :           0 :                 memcpy(&slot->data, &cp.slotdata,
    2864                 :             :                            sizeof(ReplicationSlotPersistentData));
    2865                 :             : 
    2866                 :             :                 /* initialize in memory state */
    2867                 :           0 :                 slot->effective_xmin = cp.slotdata.xmin;
    2868                 :           0 :                 slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
    2869                 :           0 :                 slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
    2870                 :           0 :                 slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
    2871                 :             : 
    2872                 :           0 :                 slot->candidate_catalog_xmin = InvalidTransactionId;
    2873                 :           0 :                 slot->candidate_xmin_lsn = InvalidXLogRecPtr;
    2874                 :           0 :                 slot->candidate_restart_lsn = InvalidXLogRecPtr;
    2875                 :           0 :                 slot->candidate_restart_valid = InvalidXLogRecPtr;
    2876                 :             : 
    2877                 :           0 :                 slot->in_use = true;
    2878                 :           0 :                 slot->active_pid = 0;
    2879                 :             : 
    2880                 :             :                 /*
    2881                 :             :                  * Set the time since the slot has become inactive after loading the
    2882                 :             :                  * slot from the disk into memory. Whoever acquires the slot i.e.
    2883                 :             :                  * makes the slot active will reset it. Use the same inactive_since
    2884                 :             :                  * time for all the slots.
    2885                 :             :                  */
    2886         [ #  # ]:           0 :                 if (now == 0)
    2887                 :           0 :                         now = GetCurrentTimestamp();
    2888                 :             : 
    2889                 :           0 :                 ReplicationSlotSetInactiveSince(slot, now, false);
    2890                 :             : 
    2891                 :           0 :                 restored = true;
    2892                 :           0 :                 break;
    2893         [ #  # ]:           0 :         }
    2894                 :             : 
    2895         [ #  # ]:           0 :         if (!restored)
    2896   [ #  #  #  # ]:           0 :                 ereport(FATAL,
    2897                 :             :                                 (errmsg("too many replication slots active before shutdown"),
    2898                 :             :                                  errhint("Increase \"max_replication_slots\" and try again.")));
    2899                 :           0 : }
    2900                 :             : 
    2901                 :             : /*
    2902                 :             :  * Maps an invalidation reason for a replication slot to
    2903                 :             :  * ReplicationSlotInvalidationCause.
    2904                 :             :  */
    2905                 :             : ReplicationSlotInvalidationCause
    2906                 :           0 : GetSlotInvalidationCause(const char *cause_name)
    2907                 :             : {
    2908         [ #  # ]:           0 :         Assert(cause_name);
    2909                 :             : 
    2910                 :             :         /* Search lookup table for the cause having this name */
    2911   [ #  #  #  #  :           0 :         for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
                      # ]
    2912                 :             :         {
    2913         [ #  # ]:           0 :                 if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
    2914                 :           0 :                         return SlotInvalidationCauses[i].cause;
    2915                 :           0 :         }
    2916                 :             : 
    2917                 :           0 :         Assert(false);
    2918                 :           0 :         return RS_INVAL_NONE;           /* to keep compiler quiet */
    2919                 :           0 : }
    2920                 :             : 
    2921                 :             : /*
    2922                 :             :  * Maps a ReplicationSlotInvalidationCause to the invalidation
    2923                 :             :  * reason for a replication slot.
    2924                 :             :  */
    2925                 :             : const char *
    2926                 :           0 : GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
    2927                 :             : {
    2928                 :             :         /* Search lookup table for the name of this cause */
    2929   [ #  #  #  #  :           0 :         for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
                      # ]
    2930                 :             :         {
    2931         [ #  # ]:           0 :                 if (SlotInvalidationCauses[i].cause == cause)
    2932                 :           0 :                         return SlotInvalidationCauses[i].cause_name;
    2933                 :           0 :         }
    2934                 :             : 
    2935                 :           0 :         Assert(false);
    2936                 :           0 :         return "none";                                /* to keep compiler quiet */
    2937                 :           0 : }
    2938                 :             : 
    2939                 :             : /*
    2940                 :             :  * A helper function to validate slots specified in GUC synchronized_standby_slots.
    2941                 :             :  *
    2942                 :             :  * The rawname will be parsed, and the result will be saved into *elemlist.
    2943                 :             :  */
    2944                 :             : static bool
    2945                 :           0 : validate_sync_standby_slots(char *rawname, List **elemlist)
    2946                 :             : {
    2947                 :             :         /* Verify syntax and parse string into a list of identifiers */
    2948         [ #  # ]:           0 :         if (!SplitIdentifierString(rawname, ',', elemlist))
    2949                 :             :         {
    2950                 :           0 :                 GUC_check_errdetail("List syntax is invalid.");
    2951                 :           0 :                 return false;
    2952                 :             :         }
    2953                 :             : 
    2954                 :             :         /* Iterate the list to validate each slot name */
    2955   [ #  #  #  #  :           0 :         foreach_ptr(char, name, *elemlist)
          #  #  #  #  #  
             #  #  #  # ]
    2956                 :             :         {
    2957                 :           0 :                 int                     err_code;
    2958                 :           0 :                 char       *err_msg = NULL;
    2959                 :           0 :                 char       *err_hint = NULL;
    2960                 :             : 
    2961         [ #  # ]:           0 :                 if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
    2962                 :             :                                                                                                  &err_msg, &err_hint))
    2963                 :             :                 {
    2964                 :           0 :                         GUC_check_errcode(err_code);
    2965                 :           0 :                         GUC_check_errdetail("%s", err_msg);
    2966         [ #  # ]:           0 :                         if (err_hint != NULL)
    2967                 :           0 :                                 GUC_check_errhint("%s", err_hint);
    2968                 :           0 :                         return false;
    2969                 :             :                 }
    2970         [ #  # ]:           0 :         }
    2971                 :             : 
    2972                 :           0 :         return true;
    2973                 :           0 : }
    2974                 :             : 
    2975                 :             : /*
    2976                 :             :  * GUC check_hook for synchronized_standby_slots
    2977                 :             :  */
    2978                 :             : bool
    2979                 :           6 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
    2980                 :             : {
    2981                 :           6 :         char       *rawname;
    2982                 :           6 :         char       *ptr;
    2983                 :           6 :         List       *elemlist;
    2984                 :           6 :         int                     size;
    2985                 :           6 :         bool            ok;
    2986                 :           6 :         SyncStandbySlotsConfigData *config;
    2987                 :             : 
    2988         [ -  + ]:           6 :         if ((*newval)[0] == '\0')
    2989                 :           6 :                 return true;
    2990                 :             : 
    2991                 :             :         /* Need a modifiable copy of the GUC string */
    2992                 :           0 :         rawname = pstrdup(*newval);
    2993                 :             : 
    2994                 :             :         /* Now verify if the specified slots exist and have correct type */
    2995                 :           0 :         ok = validate_sync_standby_slots(rawname, &elemlist);
    2996                 :             : 
    2997   [ #  #  #  # ]:           0 :         if (!ok || elemlist == NIL)
    2998                 :             :         {
    2999                 :           0 :                 pfree(rawname);
    3000                 :           0 :                 list_free(elemlist);
    3001                 :           0 :                 return ok;
    3002                 :             :         }
    3003                 :             : 
    3004                 :             :         /* Compute the size required for the SyncStandbySlotsConfigData struct */
    3005                 :           0 :         size = offsetof(SyncStandbySlotsConfigData, slot_names);
    3006   [ #  #  #  #  :           0 :         foreach_ptr(char, slot_name, elemlist)
             #  #  #  # ]
    3007                 :           0 :                 size += strlen(slot_name) + 1;
    3008                 :             : 
    3009                 :             :         /* GUC extra value must be guc_malloc'd, not palloc'd */
    3010                 :           0 :         config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
    3011         [ #  # ]:           0 :         if (!config)
    3012                 :           0 :                 return false;
    3013                 :             : 
    3014                 :             :         /* Transform the data into SyncStandbySlotsConfigData */
    3015                 :           0 :         config->nslotnames = list_length(elemlist);
    3016                 :             : 
    3017                 :           0 :         ptr = config->slot_names;
    3018   [ #  #  #  #  :           0 :         foreach_ptr(char, slot_name, elemlist)
             #  #  #  # ]
    3019                 :             :         {
    3020                 :           0 :                 strcpy(ptr, slot_name);
    3021                 :           0 :                 ptr += strlen(slot_name) + 1;
    3022                 :           0 :         }
    3023                 :             : 
    3024                 :           0 :         *extra = config;
    3025                 :             : 
    3026                 :           0 :         pfree(rawname);
    3027                 :           0 :         list_free(elemlist);
    3028                 :           0 :         return true;
    3029                 :           6 : }
    3030                 :             : 
    3031                 :             : /*
    3032                 :             :  * GUC assign_hook for synchronized_standby_slots
    3033                 :             :  */
    3034                 :             : void
    3035                 :           6 : assign_synchronized_standby_slots(const char *newval, void *extra)
    3036                 :             : {
    3037                 :             :         /*
    3038                 :             :          * The standby slots may have changed, so we must recompute the oldest
    3039                 :             :          * LSN.
    3040                 :             :          */
    3041                 :           6 :         ss_oldest_flush_lsn = InvalidXLogRecPtr;
    3042                 :             : 
    3043                 :           6 :         synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
    3044                 :           6 : }
    3045                 :             : 
    3046                 :             : /*
    3047                 :             :  * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
    3048                 :             :  */
    3049                 :             : bool
    3050                 :           0 : SlotExistsInSyncStandbySlots(const char *slot_name)
    3051                 :             : {
    3052                 :           0 :         const char *standby_slot_name;
    3053                 :             : 
    3054                 :             :         /* Return false if there is no value in synchronized_standby_slots */
    3055         [ #  # ]:           0 :         if (synchronized_standby_slots_config == NULL)
    3056                 :           0 :                 return false;
    3057                 :             : 
    3058                 :             :         /*
    3059                 :             :          * XXX: We are not expecting this list to be long so a linear search
    3060                 :             :          * shouldn't hurt but if that turns out not to be true then we can cache
    3061                 :             :          * this information for each WalSender as well.
    3062                 :             :          */
    3063                 :           0 :         standby_slot_name = synchronized_standby_slots_config->slot_names;
    3064   [ #  #  #  # ]:           0 :         for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
    3065                 :             :         {
    3066         [ #  # ]:           0 :                 if (strcmp(standby_slot_name, slot_name) == 0)
    3067                 :           0 :                         return true;
    3068                 :             : 
    3069                 :           0 :                 standby_slot_name += strlen(standby_slot_name) + 1;
    3070                 :           0 :         }
    3071                 :             : 
    3072                 :           0 :         return false;
    3073                 :           0 : }
    3074                 :             : 
    3075                 :             : /*
    3076                 :             :  * Return true if the slots specified in synchronized_standby_slots have caught up to
    3077                 :             :  * the given WAL location, false otherwise.
    3078                 :             :  *
    3079                 :             :  * The elevel parameter specifies the error level used for logging messages
    3080                 :             :  * related to slots that do not exist, are invalidated, or are inactive.
    3081                 :             :  */
    3082                 :             : bool
    3083                 :           0 : StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
    3084                 :             : {
    3085                 :           0 :         const char *name;
    3086                 :           0 :         int                     caught_up_slot_num = 0;
    3087                 :           0 :         XLogRecPtr      min_restart_lsn = InvalidXLogRecPtr;
    3088                 :             : 
    3089                 :             :         /*
    3090                 :             :          * Don't need to wait for the standbys to catch up if there is no value in
    3091                 :             :          * synchronized_standby_slots.
    3092                 :             :          */
    3093         [ #  # ]:           0 :         if (synchronized_standby_slots_config == NULL)
    3094                 :           0 :                 return true;
    3095                 :             : 
    3096                 :             :         /*
    3097                 :             :          * Don't need to wait for the standbys to catch up if we are on a standby
    3098                 :             :          * server, since we do not support syncing slots to cascading standbys.
    3099                 :             :          */
    3100         [ #  # ]:           0 :         if (RecoveryInProgress())
    3101                 :           0 :                 return true;
    3102                 :             : 
    3103                 :             :         /*
    3104                 :             :          * Don't need to wait for the standbys to catch up if they are already
    3105                 :             :          * beyond the specified WAL location.
    3106                 :             :          */
    3107   [ #  #  #  # ]:           0 :         if (XLogRecPtrIsValid(ss_oldest_flush_lsn) &&
    3108                 :           0 :                 ss_oldest_flush_lsn >= wait_for_lsn)
    3109                 :           0 :                 return true;
    3110                 :             : 
    3111                 :             :         /*
    3112                 :             :          * To prevent concurrent slot dropping and creation while filtering the
    3113                 :             :          * slots, take the ReplicationSlotControlLock outside of the loop.
    3114                 :             :          */
    3115                 :           0 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    3116                 :             : 
    3117                 :           0 :         name = synchronized_standby_slots_config->slot_names;
    3118         [ #  # ]:           0 :         for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
    3119                 :             :         {
    3120                 :           0 :                 XLogRecPtr      restart_lsn;
    3121                 :           0 :                 bool            invalidated;
    3122                 :           0 :                 bool            inactive;
    3123                 :           0 :                 ReplicationSlot *slot;
    3124                 :             : 
    3125                 :           0 :                 slot = SearchNamedReplicationSlot(name, false);
    3126                 :             : 
    3127                 :             :                 /*
    3128                 :             :                  * If a slot name provided in synchronized_standby_slots does not
    3129                 :             :                  * exist, report a message and exit the loop.
    3130                 :             :                  */
    3131         [ #  # ]:           0 :                 if (!slot)
    3132                 :             :                 {
    3133   [ #  #  #  #  :           0 :                         ereport(elevel,
          #  #  #  #  #  
                      # ]
    3134                 :             :                                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3135                 :             :                                         errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
    3136                 :             :                                                    name, "synchronized_standby_slots"),
    3137                 :             :                                         errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    3138                 :             :                                                           name),
    3139                 :             :                                         errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
    3140                 :             :                                                         name, "synchronized_standby_slots"));
    3141                 :           0 :                         break;
    3142                 :             :                 }
    3143                 :             : 
    3144                 :             :                 /* Same as above: if a slot is not physical, exit the loop. */
    3145         [ #  # ]:           0 :                 if (SlotIsLogical(slot))
    3146                 :             :                 {
    3147   [ #  #  #  #  :           0 :                         ereport(elevel,
          #  #  #  #  #  
                      # ]
    3148                 :             :                                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3149                 :             :                                         errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
    3150                 :             :                                                    name, "synchronized_standby_slots"),
    3151                 :             :                                         errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
    3152                 :             :                                                           name),
    3153                 :             :                                         errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
    3154                 :             :                                                         name, "synchronized_standby_slots"));
    3155                 :           0 :                         break;
    3156                 :             :                 }
    3157                 :             : 
    3158         [ #  # ]:           0 :                 SpinLockAcquire(&slot->mutex);
    3159                 :           0 :                 restart_lsn = slot->data.restart_lsn;
    3160                 :           0 :                 invalidated = slot->data.invalidated != RS_INVAL_NONE;
    3161                 :           0 :                 inactive = slot->active_pid == 0;
    3162                 :           0 :                 SpinLockRelease(&slot->mutex);
    3163                 :             : 
    3164         [ #  # ]:           0 :                 if (invalidated)
    3165                 :             :                 {
    3166                 :             :                         /* Specified physical slot has been invalidated */
    3167   [ #  #  #  #  :           0 :                         ereport(elevel,
          #  #  #  #  #  
                      # ]
    3168                 :             :                                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3169                 :             :                                         errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
    3170                 :             :                                                    name, "synchronized_standby_slots"),
    3171                 :             :                                         errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    3172                 :             :                                                           name),
    3173                 :             :                                         errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
    3174                 :             :                                                         name, "synchronized_standby_slots"));
    3175                 :           0 :                         break;
    3176                 :             :                 }
    3177                 :             : 
    3178   [ #  #  #  # ]:           0 :                 if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
    3179                 :             :                 {
    3180                 :             :                         /* Log a message if no active_pid for this physical slot */
    3181         [ #  # ]:           0 :                         if (inactive)
    3182   [ #  #  #  #  :           0 :                                 ereport(elevel,
          #  #  #  #  #  
                      # ]
    3183                 :             :                                                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3184                 :             :                                                 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
    3185                 :             :                                                            name, "synchronized_standby_slots"),
    3186                 :             :                                                 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    3187                 :             :                                                                   name),
    3188                 :             :                                                 errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
    3189                 :             :                                                                 name, "synchronized_standby_slots"));
    3190                 :             : 
    3191                 :             :                         /* Continue if the current slot hasn't caught up. */
    3192                 :           0 :                         break;
    3193                 :             :                 }
    3194                 :             : 
    3195         [ #  # ]:           0 :                 Assert(restart_lsn >= wait_for_lsn);
    3196                 :             : 
    3197   [ #  #  #  # ]:           0 :                 if (!XLogRecPtrIsValid(min_restart_lsn) ||
    3198                 :           0 :                         min_restart_lsn > restart_lsn)
    3199                 :           0 :                         min_restart_lsn = restart_lsn;
    3200                 :             : 
    3201                 :           0 :                 caught_up_slot_num++;
    3202                 :             : 
    3203                 :           0 :                 name += strlen(name) + 1;
    3204         [ #  # ]:           0 :         }
    3205                 :             : 
    3206                 :           0 :         LWLockRelease(ReplicationSlotControlLock);
    3207                 :             : 
    3208                 :             :         /*
    3209                 :             :          * Return false if not all the standbys have caught up to the specified
    3210                 :             :          * WAL location.
    3211                 :             :          */
    3212         [ #  # ]:           0 :         if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
    3213                 :           0 :                 return false;
    3214                 :             : 
    3215                 :             :         /* The ss_oldest_flush_lsn must not retreat. */
    3216   [ #  #  #  # ]:           0 :         Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) ||
    3217                 :             :                    min_restart_lsn >= ss_oldest_flush_lsn);
    3218                 :             : 
    3219                 :           0 :         ss_oldest_flush_lsn = min_restart_lsn;
    3220                 :             : 
    3221                 :           0 :         return true;
    3222                 :           0 : }
    3223                 :             : 
    3224                 :             : /*
    3225                 :             :  * Wait for physical standbys to confirm receiving the given lsn.
    3226                 :             :  *
    3227                 :             :  * Used by logical decoding SQL functions. It waits for physical standbys
    3228                 :             :  * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
    3229                 :             :  */
    3230                 :             : void
    3231                 :           0 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
    3232                 :             : {
    3233                 :             :         /*
    3234                 :             :          * Don't need to wait for the standby to catch up if the current acquired
    3235                 :             :          * slot is not a logical failover slot, or there is no value in
    3236                 :             :          * synchronized_standby_slots.
    3237                 :             :          */
    3238   [ #  #  #  # ]:           0 :         if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
    3239                 :           0 :                 return;
    3240                 :             : 
    3241                 :           0 :         ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
    3242                 :             : 
    3243                 :           0 :         for (;;)
    3244                 :             :         {
    3245         [ #  # ]:           0 :                 CHECK_FOR_INTERRUPTS();
    3246                 :             : 
    3247         [ #  # ]:           0 :                 if (ConfigReloadPending)
    3248                 :             :                 {
    3249                 :           0 :                         ConfigReloadPending = false;
    3250                 :           0 :                         ProcessConfigFile(PGC_SIGHUP);
    3251                 :           0 :                 }
    3252                 :             : 
    3253                 :             :                 /* Exit if done waiting for every slot. */
    3254         [ #  # ]:           0 :                 if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
    3255                 :           0 :                         break;
    3256                 :             : 
    3257                 :             :                 /*
    3258                 :             :                  * Wait for the slots in the synchronized_standby_slots to catch up,
    3259                 :             :                  * but use a timeout (1s) so we can also check if the
    3260                 :             :                  * synchronized_standby_slots has been changed.
    3261                 :             :                  */
    3262                 :           0 :                 ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
    3263                 :             :                                                                         WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
    3264                 :             :         }
    3265                 :             : 
    3266                 :           0 :         ConditionVariableCancelSleep();
    3267                 :           0 : }
        

Generated by: LCOV version 2.3.2-1