Branch data Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * slot.c
4 : : * Replication slot management.
5 : : *
6 : : *
7 : : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
8 : : *
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/replication/slot.c
12 : : *
13 : : * NOTES
14 : : *
15 : : * Replication slots are used to keep state about replication streams
16 : : * originating from this cluster. Their primary purpose is to prevent the
17 : : * premature removal of WAL or of old tuple versions in a manner that would
18 : : * interfere with replication; they are also useful for monitoring purposes.
19 : : * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 : : * on standbys (to support cascading setups). The requirement that slots be
21 : : * usable on standbys precludes storing them in the system catalogs.
22 : : *
23 : : * Each replication slot gets its own directory inside the directory
24 : : * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 : : * contain the slot's own data. Additional data can be stored alongside that
26 : : * file if required. While the server is running, the state data is also
27 : : * cached in memory for efficiency.
28 : : *
29 : : * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 : : * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 : : * to iterate over the slots, and in exclusive mode to change the in_use flag
32 : : * of a slot. The remaining data in each slot is protected by its mutex.
33 : : *
34 : : *-------------------------------------------------------------------------
35 : : */
36 : :
37 : : #include "postgres.h"
38 : :
39 : : #include <unistd.h>
40 : : #include <sys/stat.h>
41 : :
42 : : #include "access/transam.h"
43 : : #include "access/xlog_internal.h"
44 : : #include "access/xlogrecovery.h"
45 : : #include "common/file_utils.h"
46 : : #include "common/string.h"
47 : : #include "miscadmin.h"
48 : : #include "pgstat.h"
49 : : #include "postmaster/interrupt.h"
50 : : #include "replication/logicallauncher.h"
51 : : #include "replication/slotsync.h"
52 : : #include "replication/slot.h"
53 : : #include "replication/walsender_private.h"
54 : : #include "storage/fd.h"
55 : : #include "storage/ipc.h"
56 : : #include "storage/proc.h"
57 : : #include "storage/procarray.h"
58 : : #include "utils/builtins.h"
59 : : #include "utils/guc_hooks.h"
60 : : #include "utils/injection_point.h"
61 : : #include "utils/varlena.h"
62 : :
63 : : /*
64 : : * Replication slot on-disk data structure.
65 : : */
66 : : typedef struct ReplicationSlotOnDisk
67 : : {
68 : : /* first part of this struct needs to be version independent */
69 : :
70 : : /* data not covered by checksum */
71 : : uint32 magic;
72 : : pg_crc32c checksum;
73 : :
74 : : /* data covered by checksum */
75 : : uint32 version;
76 : : uint32 length;
77 : :
78 : : /*
79 : : * The actual data in the slot that follows can differ based on the above
80 : : * 'version'.
81 : : */
82 : :
83 : : ReplicationSlotPersistentData slotdata;
84 : : } ReplicationSlotOnDisk;
85 : :
86 : : /*
87 : : * Struct for the configuration of synchronized_standby_slots.
88 : : *
89 : : * Note: this must be a flat representation that can be held in a single chunk
90 : : * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
91 : : * synchronized_standby_slots GUC.
92 : : */
93 : : typedef struct
94 : : {
95 : : /* Number of slot names in the slot_names[] */
96 : : int nslotnames;
97 : :
98 : : /*
99 : : * slot_names contains 'nslotnames' consecutive null-terminated C strings.
100 : : */
101 : : char slot_names[FLEXIBLE_ARRAY_MEMBER];
102 : : } SyncStandbySlotsConfigData;
103 : :
104 : : /*
105 : : * Lookup table for slot invalidation causes.
106 : : */
107 : : typedef struct SlotInvalidationCauseMap
108 : : {
109 : : ReplicationSlotInvalidationCause cause;
110 : : const char *cause_name;
111 : : } SlotInvalidationCauseMap;
112 : :
113 : : static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
114 : : {RS_INVAL_NONE, "none"},
115 : : {RS_INVAL_WAL_REMOVED, "wal_removed"},
116 : : {RS_INVAL_HORIZON, "rows_removed"},
117 : : {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
118 : : {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
119 : : };
120 : :
121 : : /*
122 : : * Ensure that the lookup table is up-to-date with the enums defined in
123 : : * ReplicationSlotInvalidationCause.
124 : : */
125 : : StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
126 : : "array length mismatch");
127 : :
/*
 * Sizes of the different parts of ReplicationSlotOnDisk.
 *
 * Every replacement list is fully parenthesized so that the macros can be
 * used safely inside larger expressions (e.g. multiplied or negated).
 */

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	(offsetof(ReplicationSlotOnDisk, slotdata))
/* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskNotChecksummedSize  \
	(offsetof(ReplicationSlotOnDisk, version))
/* size of the part covered by the checksum */
#define ReplicationSlotOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	5		/* version for new files */
143 : :
144 : : /* Control array for replication slot management */
145 : : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
146 : :
147 : : /* My backend's replication slot in the shared memory array */
148 : : ReplicationSlot *MyReplicationSlot = NULL;
149 : :
150 : : /* GUC variables */
151 : : int max_replication_slots = 10; /* the maximum number of replication
152 : : * slots */
153 : :
154 : : /*
155 : : * Invalidate replication slots that have remained idle longer than this
156 : : * duration; '0' disables it.
157 : : */
158 : : int idle_replication_slot_timeout_secs = 0;
159 : :
160 : : /*
161 : : * This GUC lists streaming replication standby server slot names that
162 : : * logical WAL sender processes will wait for.
163 : : */
164 : : char *synchronized_standby_slots;
165 : :
166 : : /* This is the parsed and cached configuration for synchronized_standby_slots */
167 : : static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
168 : :
169 : : /*
170 : : * Oldest LSN that has been confirmed to be flushed to the standbys
171 : : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
172 : : */
173 : : static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
174 : :
175 : : static void ReplicationSlotShmemExit(int code, Datum arg);
176 : : static bool IsSlotForConflictCheck(const char *name);
177 : : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
178 : :
179 : : /* internal persistency functions */
180 : : static void RestoreSlotFromDisk(const char *name);
181 : : static void CreateSlotOnDisk(ReplicationSlot *slot);
182 : : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
183 : :
184 : : /*
185 : : * Report shared-memory space needed by ReplicationSlotsShmemInit.
186 : : */
187 : : Size
188 : 21 : ReplicationSlotsShmemSize(void)
189 : : {
190 : 21 : Size size = 0;
191 : :
192 [ + - ]: 21 : if (max_replication_slots == 0)
193 : 0 : return size;
194 : :
195 : 21 : size = offsetof(ReplicationSlotCtlData, replication_slots);
196 : 42 : size = add_size(size,
197 : 21 : mul_size(max_replication_slots, sizeof(ReplicationSlot)));
198 : :
199 : 21 : return size;
200 : 21 : }
201 : :
202 : : /*
203 : : * Allocate and initialize shared memory for replication slots.
204 : : */
205 : : void
206 : 6 : ReplicationSlotsShmemInit(void)
207 : : {
208 : 6 : bool found;
209 : :
210 [ + - ]: 6 : if (max_replication_slots == 0)
211 : 0 : return;
212 : :
213 : 6 : ReplicationSlotCtl = (ReplicationSlotCtlData *)
214 : 6 : ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
215 : : &found);
216 : :
217 [ - + ]: 6 : if (!found)
218 : : {
219 : 6 : int i;
220 : :
221 : : /* First time through, so initialize */
222 [ + - + - : 6 : MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
+ - + - #
# ]
223 : :
224 [ + + ]: 66 : for (i = 0; i < max_replication_slots; i++)
225 : : {
226 : 60 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
227 : :
228 : : /* everything else is zeroed by the memset above */
229 : 60 : SpinLockInit(&slot->mutex);
230 : 60 : LWLockInitialize(&slot->io_in_progress_lock,
231 : : LWTRANCHE_REPLICATION_SLOT_IO);
232 : 60 : ConditionVariableInit(&slot->active_cv);
233 : 60 : }
234 : 6 : }
235 [ - + ]: 6 : }
236 : :
237 : : /*
238 : : * Register the callback for replication slot cleanup and releasing.
239 : : */
240 : : void
241 : 806 : ReplicationSlotInitialize(void)
242 : : {
243 : 806 : before_shmem_exit(ReplicationSlotShmemExit, 0);
244 : 806 : }
245 : :
246 : : /*
247 : : * Release and cleanup replication slots.
248 : : */
249 : : static void
250 : 806 : ReplicationSlotShmemExit(int code, Datum arg)
251 : : {
252 : : /* Make sure active replication slots are released */
253 [ + - ]: 806 : if (MyReplicationSlot != NULL)
254 : 0 : ReplicationSlotRelease();
255 : :
256 : : /* Also cleanup all the temporary slots. */
257 : 806 : ReplicationSlotCleanup(false);
258 : 806 : }
259 : :
260 : : /*
261 : : * Check whether the passed slot name is valid and report errors at elevel.
262 : : *
263 : : * See comments for ReplicationSlotValidateNameInternal().
264 : : */
265 : : bool
266 : 3 : ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
267 : : int elevel)
268 : : {
269 : 3 : int err_code;
270 : 3 : char *err_msg = NULL;
271 : 3 : char *err_hint = NULL;
272 : :
273 [ + + ]: 3 : if (!ReplicationSlotValidateNameInternal(name, allow_reserved_name,
274 : : &err_code, &err_msg, &err_hint))
275 : : {
276 : : /*
277 : : * Use errmsg_internal() and errhint_internal() instead of errmsg()
278 : : * and errhint(), since the messages from
279 : : * ReplicationSlotValidateNameInternal() are already translated. This
280 : : * avoids double translation.
281 : : */
282 [ - + # # : 2 : ereport(elevel,
+ + - + #
# # # ]
283 : : errcode(err_code),
284 : : errmsg_internal("%s", err_msg),
285 : : (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
286 : :
287 : 0 : pfree(err_msg);
288 [ # # ]: 0 : if (err_hint != NULL)
289 : 0 : pfree(err_hint);
290 : 0 : return false;
291 : : }
292 : :
293 : 1 : return true;
294 : 1 : }
295 : :
296 : : /*
297 : : * Check whether the passed slot name is valid.
298 : : *
299 : : * An error will be reported for a reserved replication slot name if
300 : : * allow_reserved_name is set to false.
301 : : *
302 : : * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
303 : : * the name to be used as a directory name on every supported OS.
304 : : *
305 : : * Returns true if the slot name is valid. Otherwise, returns false and stores
306 : : * the error code, error message, and optional hint in err_code, err_msg, and
307 : : * err_hint, respectively. The caller is responsible for freeing err_msg and
308 : : * err_hint, which are palloc'd.
309 : : */
310 : : bool
311 : 2 : ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name,
312 : : int *err_code, char **err_msg, char **err_hint)
313 : : {
314 : 2 : const char *cp;
315 : :
316 [ + + ]: 2 : if (strlen(name) == 0)
317 : : {
318 : 1 : *err_code = ERRCODE_INVALID_NAME;
319 : 1 : *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
320 : 1 : *err_hint = NULL;
321 : 1 : return false;
322 : : }
323 : :
324 [ - + ]: 1 : if (strlen(name) >= NAMEDATALEN)
325 : : {
326 : 0 : *err_code = ERRCODE_NAME_TOO_LONG;
327 : 0 : *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
328 : 0 : *err_hint = NULL;
329 : 0 : return false;
330 : : }
331 : :
332 [ + + ]: 8 : for (cp = name; *cp; cp++)
333 : : {
334 [ + - # # ]: 7 : if (!((*cp >= 'a' && *cp <= 'z')
335 [ # # ]: 7 : || (*cp >= '0' && *cp <= '9')
336 : 0 : || (*cp == '_')))
337 : : {
338 : 0 : *err_code = ERRCODE_INVALID_NAME;
339 : 0 : *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
340 : 0 : *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
341 : 0 : return false;
342 : : }
343 : 7 : }
344 : :
345 [ + - + - ]: 1 : if (!allow_reserved_name && IsSlotForConflictCheck(name))
346 : : {
347 : 0 : *err_code = ERRCODE_RESERVED_NAME;
348 : 0 : *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
349 : 0 : *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
350 : : CONFLICT_DETECTION_SLOT);
351 : 0 : return false;
352 : : }
353 : :
354 : 1 : return true;
355 : 2 : }
356 : :
357 : : /*
358 : : * Return true if the replication slot name is "pg_conflict_detection".
359 : : */
360 : : static bool
361 : 1 : IsSlotForConflictCheck(const char *name)
362 : : {
363 : 1 : return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
364 : : }
365 : :
366 : : /*
367 : : * Create a new replication slot and mark it as used by this backend.
368 : : *
369 : : * name: Name of the slot
370 : : * db_specific: logical decoding is db specific; if the slot is going to
371 : : * be used for that pass true, otherwise false.
372 : : * two_phase: If enabled, allows decoding of prepared transactions.
373 : : * failover: If enabled, allows the slot to be synced to standbys so
374 : : * that logical replication can be resumed after failover.
375 : : * synced: True if the slot is synchronized from the primary server.
376 : : */
377 : : void
378 : 0 : ReplicationSlotCreate(const char *name, bool db_specific,
379 : : ReplicationSlotPersistency persistency,
380 : : bool two_phase, bool failover, bool synced)
381 : : {
382 : 0 : ReplicationSlot *slot = NULL;
383 : 0 : int i;
384 : :
385 [ # # ]: 0 : Assert(MyReplicationSlot == NULL);
386 : :
387 : : /*
388 : : * The logical launcher or pg_upgrade may create or migrate an internal
389 : : * slot, so using a reserved name is allowed in these cases.
390 : : */
391 [ # # ]: 0 : ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
392 : : ERROR);
393 : :
394 [ # # ]: 0 : if (failover)
395 : : {
396 : : /*
397 : : * Do not allow users to create the failover enabled slots on the
398 : : * standby as we do not support sync to the cascading standby.
399 : : *
400 : : * However, failover enabled slots can be created during slot
401 : : * synchronization because we need to retain the same values as the
402 : : * remote slot.
403 : : */
404 [ # # # # ]: 0 : if (RecoveryInProgress() && !IsSyncingReplicationSlots())
405 [ # # # # ]: 0 : ereport(ERROR,
406 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
407 : : errmsg("cannot enable failover for a replication slot created on the standby"));
408 : :
409 : : /*
410 : : * Do not allow users to create failover enabled temporary slots,
411 : : * because temporary slots will not be synced to the standby.
412 : : *
413 : : * However, failover enabled temporary slots can be created during
414 : : * slot synchronization. See the comments atop slotsync.c for details.
415 : : */
416 [ # # # # ]: 0 : if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
417 [ # # # # ]: 0 : ereport(ERROR,
418 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
419 : : errmsg("cannot enable failover for a temporary replication slot"));
420 : 0 : }
421 : :
422 : : /*
423 : : * If some other backend ran this code concurrently with us, we'd likely
424 : : * both allocate the same slot, and that would be bad. We'd also be at
425 : : * risk of missing a name collision. Also, we don't want to try to create
426 : : * a new slot while somebody's busy cleaning up an old one, because we
427 : : * might both be monkeying with the same directory.
428 : : */
429 : 0 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
430 : :
431 : : /*
432 : : * Check for name collision, and identify an allocatable slot. We need to
433 : : * hold ReplicationSlotControlLock in shared mode for this, so that nobody
434 : : * else can change the in_use flags while we're looking at them.
435 : : */
436 : 0 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
437 [ # # ]: 0 : for (i = 0; i < max_replication_slots; i++)
438 : : {
439 : 0 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
440 : :
441 [ # # # # ]: 0 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
442 [ # # # # ]: 0 : ereport(ERROR,
443 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
444 : : errmsg("replication slot \"%s\" already exists", name)));
445 [ # # # # ]: 0 : if (!s->in_use && slot == NULL)
446 : 0 : slot = s;
447 : 0 : }
448 : 0 : LWLockRelease(ReplicationSlotControlLock);
449 : :
450 : : /* If all slots are in use, we're out of luck. */
451 [ # # ]: 0 : if (slot == NULL)
452 [ # # # # ]: 0 : ereport(ERROR,
453 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
454 : : errmsg("all replication slots are in use"),
455 : : errhint("Free one or increase \"max_replication_slots\".")));
456 : :
457 : : /*
458 : : * Since this slot is not in use, nobody should be looking at any part of
459 : : * it other than the in_use field unless they're trying to allocate it.
460 : : * And since we hold ReplicationSlotAllocationLock, nobody except us can
461 : : * be doing that. So it's safe to initialize the slot.
462 : : */
463 [ # # ]: 0 : Assert(!slot->in_use);
464 [ # # ]: 0 : Assert(slot->active_pid == 0);
465 : :
466 : : /* first initialize persistent data */
467 : 0 : memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
468 : 0 : namestrcpy(&slot->data.name, name);
469 [ # # ]: 0 : slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
470 : 0 : slot->data.persistency = persistency;
471 : 0 : slot->data.two_phase = two_phase;
472 : 0 : slot->data.two_phase_at = InvalidXLogRecPtr;
473 : 0 : slot->data.failover = failover;
474 : 0 : slot->data.synced = synced;
475 : :
476 : : /* and then data only present in shared memory */
477 : 0 : slot->just_dirtied = false;
478 : 0 : slot->dirty = false;
479 : 0 : slot->effective_xmin = InvalidTransactionId;
480 : 0 : slot->effective_catalog_xmin = InvalidTransactionId;
481 : 0 : slot->candidate_catalog_xmin = InvalidTransactionId;
482 : 0 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
483 : 0 : slot->candidate_restart_valid = InvalidXLogRecPtr;
484 : 0 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
485 : 0 : slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
486 : 0 : slot->last_saved_restart_lsn = InvalidXLogRecPtr;
487 : 0 : slot->inactive_since = 0;
488 : 0 : slot->slotsync_skip_reason = SS_SKIP_NONE;
489 : :
490 : : /*
491 : : * Create the slot on disk. We haven't actually marked the slot allocated
492 : : * yet, so no special cleanup is required if this errors out.
493 : : */
494 : 0 : CreateSlotOnDisk(slot);
495 : :
496 : : /*
497 : : * We need to briefly prevent any other backend from iterating over the
498 : : * slots while we flip the in_use flag. We also need to set the active
499 : : * flag while holding the ControlLock as otherwise a concurrent
500 : : * ReplicationSlotAcquire() could acquire the slot as well.
501 : : */
502 : 0 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
503 : :
504 : 0 : slot->in_use = true;
505 : :
506 : : /* We can now mark the slot active, and that makes it our slot. */
507 [ # # ]: 0 : SpinLockAcquire(&slot->mutex);
508 [ # # ]: 0 : Assert(slot->active_pid == 0);
509 : 0 : slot->active_pid = MyProcPid;
510 : 0 : SpinLockRelease(&slot->mutex);
511 : 0 : MyReplicationSlot = slot;
512 : :
513 : 0 : LWLockRelease(ReplicationSlotControlLock);
514 : :
515 : : /*
516 : : * Create statistics entry for the new logical slot. We don't collect any
517 : : * stats for physical slots, so no need to create an entry for the same.
518 : : * See ReplicationSlotDropPtr for why we need to do this before releasing
519 : : * ReplicationSlotAllocationLock.
520 : : */
521 [ # # ]: 0 : if (SlotIsLogical(slot))
522 : 0 : pgstat_create_replslot(slot);
523 : :
524 : : /*
525 : : * Now that the slot has been marked as in_use and active, it's safe to
526 : : * let somebody else try to allocate a slot.
527 : : */
528 : 0 : LWLockRelease(ReplicationSlotAllocationLock);
529 : :
530 : : /* Let everybody know we've modified this slot */
531 : 0 : ConditionVariableBroadcast(&slot->active_cv);
532 : 0 : }
533 : :
534 : : /*
535 : : * Search for the named replication slot.
536 : : *
537 : : * Return the replication slot if found, otherwise NULL.
538 : : */
539 : : ReplicationSlot *
540 : 1 : SearchNamedReplicationSlot(const char *name, bool need_lock)
541 : : {
542 : 1 : int i;
543 : 1 : ReplicationSlot *slot = NULL;
544 : :
545 [ - + ]: 1 : if (need_lock)
546 : 1 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
547 : :
548 [ + + ]: 11 : for (i = 0; i < max_replication_slots; i++)
549 : : {
550 : 10 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
551 : :
552 [ - + # # ]: 10 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
553 : : {
554 : 0 : slot = s;
555 : 0 : break;
556 : : }
557 [ - - + ]: 10 : }
558 : :
559 [ - + ]: 1 : if (need_lock)
560 : 1 : LWLockRelease(ReplicationSlotControlLock);
561 : :
562 : 2 : return slot;
563 : 1 : }
564 : :
565 : : /*
566 : : * Return the index of the replication slot in
567 : : * ReplicationSlotCtl->replication_slots.
568 : : *
569 : : * This is mainly useful to have an efficient key for storing replication slot
570 : : * stats.
571 : : */
572 : : int
573 : 0 : ReplicationSlotIndex(ReplicationSlot *slot)
574 : : {
575 [ # # ]: 0 : Assert(slot >= ReplicationSlotCtl->replication_slots &&
576 : : slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
577 : :
578 : 0 : return slot - ReplicationSlotCtl->replication_slots;
579 : : }
580 : :
581 : : /*
582 : : * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
583 : : * the slot's name and true is returned.
584 : : *
585 : : * This likely is only useful for pgstat_replslot.c during shutdown, in other
586 : : * cases there are obvious TOCTOU issues.
587 : : */
588 : : bool
589 : 0 : ReplicationSlotName(int index, Name name)
590 : : {
591 : 0 : ReplicationSlot *slot;
592 : 0 : bool found;
593 : :
594 : 0 : slot = &ReplicationSlotCtl->replication_slots[index];
595 : :
596 : : /*
597 : : * Ensure that the slot cannot be dropped while we copy the name. Don't
598 : : * need the spinlock as the name of an existing slot cannot change.
599 : : */
600 : 0 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
601 : 0 : found = slot->in_use;
602 [ # # ]: 0 : if (slot->in_use)
603 : 0 : namestrcpy(name, NameStr(slot->data.name));
604 : 0 : LWLockRelease(ReplicationSlotControlLock);
605 : :
606 : 0 : return found;
607 : 0 : }
608 : :
609 : : /*
610 : : * Find a previously created slot and mark it as used by this process.
611 : : *
612 : : * An error is raised if nowait is true and the slot is currently in use. If
613 : : * nowait is false, we sleep until the slot is released by the owning process.
614 : : *
615 : : * An error is raised if error_if_invalid is true and the slot is found to
616 : : * be invalid. It should always be set to true, except when we are temporarily
617 : : * acquiring the slot and don't intend to change it.
618 : : */
619 : : void
620 : 0 : ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
621 : : {
622 : 0 : ReplicationSlot *s;
623 : 0 : int active_pid;
624 : :
625 [ # # ]: 0 : Assert(name != NULL);
626 : :
627 : : retry:
628 [ # # ]: 0 : Assert(MyReplicationSlot == NULL);
629 : :
630 : 0 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
631 : :
632 : : /* Check if the slot exists with the given name. */
633 : 0 : s = SearchNamedReplicationSlot(name, false);
634 [ # # ]: 0 : if (s == NULL || !s->in_use)
635 : : {
636 : 0 : LWLockRelease(ReplicationSlotControlLock);
637 : :
638 [ # # # # ]: 0 : ereport(ERROR,
639 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
640 : : errmsg("replication slot \"%s\" does not exist",
641 : : name)));
642 : 0 : }
643 : :
644 : : /*
645 : : * Do not allow users to acquire the reserved slot. This scenario may
646 : : * occur if the launcher that owns the slot has terminated unexpectedly
647 : : * due to an error, and a backend process attempts to reuse the slot.
648 : : */
649 [ # # # # ]: 0 : if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
650 [ # # # # ]: 0 : ereport(ERROR,
651 : : errcode(ERRCODE_UNDEFINED_OBJECT),
652 : : errmsg("cannot acquire replication slot \"%s\"", name),
653 : : errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
654 : :
655 : : /*
656 : : * This is the slot we want; check if it's active under some other
657 : : * process. In single user mode, we don't need this check.
658 : : */
659 [ # # ]: 0 : if (IsUnderPostmaster)
660 : : {
661 : : /*
662 : : * Get ready to sleep on the slot in case it is active. (We may end
663 : : * up not sleeping, but we don't want to do this while holding the
664 : : * spinlock.)
665 : : */
666 [ # # ]: 0 : if (!nowait)
667 : 0 : ConditionVariablePrepareToSleep(&s->active_cv);
668 : :
669 : : /*
670 : : * It is important to reset the inactive_since under spinlock here to
671 : : * avoid race conditions with slot invalidation. See comments related
672 : : * to inactive_since in InvalidatePossiblyObsoleteSlot.
673 : : */
674 [ # # ]: 0 : SpinLockAcquire(&s->mutex);
675 [ # # ]: 0 : if (s->active_pid == 0)
676 : 0 : s->active_pid = MyProcPid;
677 : 0 : active_pid = s->active_pid;
678 : 0 : ReplicationSlotSetInactiveSince(s, 0, false);
679 : 0 : SpinLockRelease(&s->mutex);
680 : 0 : }
681 : : else
682 : : {
683 : 0 : s->active_pid = active_pid = MyProcPid;
684 : 0 : ReplicationSlotSetInactiveSince(s, 0, true);
685 : : }
686 : 0 : LWLockRelease(ReplicationSlotControlLock);
687 : :
688 : : /*
689 : : * If we found the slot but it's already active in another process, we
690 : : * wait until the owning process signals us that it's been released, or
691 : : * error out.
692 : : */
693 [ # # ]: 0 : if (active_pid != MyProcPid)
694 : : {
695 [ # # ]: 0 : if (!nowait)
696 : : {
697 : : /* Wait here until we get signaled, and then restart */
698 : 0 : ConditionVariableSleep(&s->active_cv,
699 : : WAIT_EVENT_REPLICATION_SLOT_DROP);
700 : 0 : ConditionVariableCancelSleep();
701 : 0 : goto retry;
702 : : }
703 : :
704 [ # # # # ]: 0 : ereport(ERROR,
705 : : (errcode(ERRCODE_OBJECT_IN_USE),
706 : : errmsg("replication slot \"%s\" is active for PID %d",
707 : : NameStr(s->data.name), active_pid)));
708 : 0 : }
709 [ # # ]: 0 : else if (!nowait)
710 : 0 : ConditionVariableCancelSleep(); /* no sleep needed after all */
711 : :
712 : : /* We made this slot active, so it's ours now. */
713 : 0 : MyReplicationSlot = s;
714 : :
715 : : /*
716 : : * We need to check for invalidation after making the slot ours to avoid
717 : : * the possible race condition with the checkpointer that can otherwise
718 : : * invalidate the slot immediately after the check.
719 : : */
720 [ # # # # ]: 0 : if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
721 [ # # # # ]: 0 : ereport(ERROR,
722 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
723 : : errmsg("can no longer access replication slot \"%s\"",
724 : : NameStr(s->data.name)),
725 : : errdetail("This replication slot has been invalidated due to \"%s\".",
726 : : GetSlotInvalidationCauseName(s->data.invalidated)));
727 : :
728 : : /* Let everybody know we've modified this slot */
729 : 0 : ConditionVariableBroadcast(&s->active_cv);
730 : :
731 : : /*
732 : : * The call to pgstat_acquire_replslot() protects against stats for a
733 : : * different slot, from before a restart or such, being present during
734 : : * pgstat_report_replslot().
735 : : */
736 [ # # ]: 0 : if (SlotIsLogical(s))
737 : 0 : pgstat_acquire_replslot(s);
738 : :
739 : :
740 [ # # ]: 0 : if (am_walsender)
741 : : {
742 [ # # # # : 0 : ereport(log_replication_commands ? LOG : DEBUG1,
# # # # #
# # # ]
743 : : SlotIsLogical(s)
744 : : ? errmsg("acquired logical replication slot \"%s\"",
745 : : NameStr(s->data.name))
746 : : : errmsg("acquired physical replication slot \"%s\"",
747 : : NameStr(s->data.name)));
748 : 0 : }
749 : 0 : }
750 : :
751 : : /*
752 : : * Release the replication slot that this backend considers to own.
753 : : *
754 : : * This or another backend can re-acquire the slot later.
755 : : * Resources this slot requires will be preserved.
756 : : */
757 : : void
758 : 0 : ReplicationSlotRelease(void)
759 : : {
760 : 0 : ReplicationSlot *slot = MyReplicationSlot;
761 : 0 : char *slotname = NULL; /* keep compiler quiet */
762 : 0 : bool is_logical;
763 : 0 : TimestampTz now = 0;
764 : :
765 [ # # ]: 0 : Assert(slot != NULL && slot->active_pid != 0);
766 : :
767 : 0 : is_logical = SlotIsLogical(slot);
768 : :
769 [ # # ]: 0 : if (am_walsender)
770 : 0 : slotname = pstrdup(NameStr(slot->data.name));
771 : :
772 [ # # ]: 0 : if (slot->data.persistency == RS_EPHEMERAL)
773 : : {
774 : : /*
775 : : * Delete the slot. There is no !PANIC case where this is allowed to
776 : : * fail, all that may happen is an incomplete cleanup of the on-disk
777 : : * data.
778 : : */
779 : 0 : ReplicationSlotDropAcquired();
780 : :
781 : : /*
782 : : * Request to disable logical decoding, even though this slot may not
783 : : * have been the last logical slot. The checkpointer will verify if
784 : : * logical decoding should actually be disabled.
785 : : */
786 [ # # ]: 0 : if (is_logical)
787 : 0 : RequestDisableLogicalDecoding();
788 : 0 : }
789 : :
790 : : /*
791 : : * If slot needed to temporarily restrain both data and catalog xmin to
792 : : * create the catalog snapshot, remove that temporary constraint.
793 : : * Snapshots can only be exported while the initial snapshot is still
794 : : * acquired.
795 : : */
796 [ # # # # ]: 0 : if (!TransactionIdIsValid(slot->data.xmin) &&
797 : 0 : TransactionIdIsValid(slot->effective_xmin))
798 : : {
799 [ # # ]: 0 : SpinLockAcquire(&slot->mutex);
800 : 0 : slot->effective_xmin = InvalidTransactionId;
801 : 0 : SpinLockRelease(&slot->mutex);
802 : 0 : ReplicationSlotsComputeRequiredXmin(false);
803 : 0 : }
804 : :
805 : : /*
806 : : * Set the time since the slot has become inactive. We get the current
807 : : * time beforehand to avoid system call while holding the spinlock.
808 : : */
809 : 0 : now = GetCurrentTimestamp();
810 : :
811 [ # # ]: 0 : if (slot->data.persistency == RS_PERSISTENT)
812 : : {
813 : : /*
814 : : * Mark persistent slot inactive. We're not freeing it, just
815 : : * disconnecting, but wake up others that may be waiting for it.
816 : : */
817 [ # # ]: 0 : SpinLockAcquire(&slot->mutex);
818 : 0 : slot->active_pid = 0;
819 : 0 : ReplicationSlotSetInactiveSince(slot, now, false);
820 : 0 : SpinLockRelease(&slot->mutex);
821 : 0 : ConditionVariableBroadcast(&slot->active_cv);
822 : 0 : }
823 : : else
824 : 0 : ReplicationSlotSetInactiveSince(slot, now, true);
825 : :
826 : 0 : MyReplicationSlot = NULL;
827 : :
828 : : /* might not have been set when we've been a plain slot */
829 : 0 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
830 : 0 : MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
831 : 0 : ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
832 : 0 : LWLockRelease(ProcArrayLock);
833 : :
834 [ # # ]: 0 : if (am_walsender)
835 : : {
836 [ # # # # : 0 : ereport(log_replication_commands ? LOG : DEBUG1,
# # # # #
# # # ]
837 : : is_logical
838 : : ? errmsg("released logical replication slot \"%s\"",
839 : : slotname)
840 : : : errmsg("released physical replication slot \"%s\"",
841 : : slotname));
842 : :
843 : 0 : pfree(slotname);
844 : 0 : }
845 : 0 : }
846 : :
847 : : /*
848 : : * Cleanup temporary slots created in current session.
849 : : *
850 : : * Cleanup only synced temporary slots if 'synced_only' is true, else
851 : : * cleanup all temporary slots.
852 : : *
853 : : * If it drops the last logical slot in the cluster, requests to disable
854 : : * logical decoding.
855 : : */
856 : : void
857 : 7569 : ReplicationSlotCleanup(bool synced_only)
858 : : {
859 : 7569 : int i;
860 : 7569 : bool found_valid_logicalslot;
861 : 7569 : bool dropped_logical = false;
862 : :
863 [ + - ]: 7569 : Assert(MyReplicationSlot == NULL);
864 : :
865 : : restart:
866 : 7569 : found_valid_logicalslot = false;
867 : 7569 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
868 [ + + ]: 83259 : for (i = 0; i < max_replication_slots; i++)
869 : : {
870 : 75690 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
871 : :
872 [ - + ]: 75690 : if (!s->in_use)
873 : 75690 : continue;
874 : :
875 [ # # ]: 0 : SpinLockAcquire(&s->mutex);
876 : :
877 : 0 : found_valid_logicalslot |=
878 [ # # ]: 0 : (SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE);
879 : :
880 [ # # # # ]: 0 : if ((s->active_pid == MyProcPid &&
881 [ # # ]: 0 : (!synced_only || s->data.synced)))
882 : : {
883 [ # # ]: 0 : Assert(s->data.persistency == RS_TEMPORARY);
884 : 0 : SpinLockRelease(&s->mutex);
885 : 0 : LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
886 : :
887 [ # # ]: 0 : if (SlotIsLogical(s))
888 : 0 : dropped_logical = true;
889 : :
890 : 0 : ReplicationSlotDropPtr(s);
891 : :
892 : 0 : ConditionVariableBroadcast(&s->active_cv);
893 : 0 : goto restart;
894 : : }
895 : : else
896 : 0 : SpinLockRelease(&s->mutex);
897 [ - + - - ]: 75690 : }
898 : :
899 : 7569 : LWLockRelease(ReplicationSlotControlLock);
900 : :
901 [ - + # # ]: 7569 : if (dropped_logical && !found_valid_logicalslot)
902 : 0 : RequestDisableLogicalDecoding();
903 : 7569 : }
904 : :
905 : : /*
906 : : * Permanently drop replication slot identified by the passed in name.
907 : : */
908 : : void
909 : 0 : ReplicationSlotDrop(const char *name, bool nowait)
910 : : {
911 : 0 : bool is_logical;
912 : :
913 [ # # ]: 0 : Assert(MyReplicationSlot == NULL);
914 : :
915 : 0 : ReplicationSlotAcquire(name, nowait, false);
916 : :
917 : : /*
918 : : * Do not allow users to drop the slots which are currently being synced
919 : : * from the primary to the standby.
920 : : */
921 [ # # # # ]: 0 : if (RecoveryInProgress() && MyReplicationSlot->data.synced)
922 [ # # # # ]: 0 : ereport(ERROR,
923 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
924 : : errmsg("cannot drop replication slot \"%s\"", name),
925 : : errdetail("This replication slot is being synchronized from the primary server."));
926 : :
927 : 0 : is_logical = SlotIsLogical(MyReplicationSlot);
928 : :
929 : 0 : ReplicationSlotDropAcquired();
930 : :
931 [ # # ]: 0 : if (is_logical)
932 : 0 : RequestDisableLogicalDecoding();
933 : 0 : }
934 : :
935 : : /*
936 : : * Change the definition of the slot identified by the specified name.
937 : : *
938 : : * Altering the two_phase property of a slot requires caution on the
939 : : * client-side. Enabling it at any random point during decoding has the
940 : : * risk that transactions prepared before this change may be skipped by
941 : : * the decoder, leading to missing prepare records on the client. So, we
942 : : * enable it for subscription related slots only once the initial tablesync
943 : : * is finished. See comments atop worker.c. Disabling it is safe only when
944 : : * there are no pending prepared transaction, otherwise, the changes of
945 : : * already prepared transactions can be replicated again along with their
946 : : * corresponding commit leading to duplicate data or errors.
947 : : */
948 : : void
949 : 0 : ReplicationSlotAlter(const char *name, const bool *failover,
950 : : const bool *two_phase)
951 : : {
952 : 0 : bool update_slot = false;
953 : :
954 [ # # ]: 0 : Assert(MyReplicationSlot == NULL);
955 [ # # # # ]: 0 : Assert(failover || two_phase);
956 : :
957 : 0 : ReplicationSlotAcquire(name, false, true);
958 : :
959 [ # # ]: 0 : if (SlotIsPhysical(MyReplicationSlot))
960 [ # # # # ]: 0 : ereport(ERROR,
961 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
962 : : errmsg("cannot use %s with a physical replication slot",
963 : : "ALTER_REPLICATION_SLOT"));
964 : :
965 [ # # ]: 0 : if (RecoveryInProgress())
966 : : {
967 : : /*
968 : : * Do not allow users to alter the slots which are currently being
969 : : * synced from the primary to the standby.
970 : : */
971 [ # # ]: 0 : if (MyReplicationSlot->data.synced)
972 [ # # # # ]: 0 : ereport(ERROR,
973 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
974 : : errmsg("cannot alter replication slot \"%s\"", name),
975 : : errdetail("This replication slot is being synchronized from the primary server."));
976 : :
977 : : /*
978 : : * Do not allow users to enable failover on the standby as we do not
979 : : * support sync to the cascading standby.
980 : : */
981 [ # # # # ]: 0 : if (failover && *failover)
982 [ # # # # ]: 0 : ereport(ERROR,
983 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
984 : : errmsg("cannot enable failover for a replication slot"
985 : : " on the standby"));
986 : 0 : }
987 : :
988 [ # # ]: 0 : if (failover)
989 : : {
990 : : /*
991 : : * Do not allow users to enable failover for temporary slots as we do
992 : : * not support syncing temporary slots to the standby.
993 : : */
994 [ # # # # ]: 0 : if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
995 [ # # # # ]: 0 : ereport(ERROR,
996 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
997 : : errmsg("cannot enable failover for a temporary replication slot"));
998 : :
999 [ # # ]: 0 : if (MyReplicationSlot->data.failover != *failover)
1000 : : {
1001 [ # # ]: 0 : SpinLockAcquire(&MyReplicationSlot->mutex);
1002 : 0 : MyReplicationSlot->data.failover = *failover;
1003 : 0 : SpinLockRelease(&MyReplicationSlot->mutex);
1004 : :
1005 : 0 : update_slot = true;
1006 : 0 : }
1007 : 0 : }
1008 : :
1009 [ # # # # ]: 0 : if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
1010 : : {
1011 [ # # ]: 0 : SpinLockAcquire(&MyReplicationSlot->mutex);
1012 : 0 : MyReplicationSlot->data.two_phase = *two_phase;
1013 : 0 : SpinLockRelease(&MyReplicationSlot->mutex);
1014 : :
1015 : 0 : update_slot = true;
1016 : 0 : }
1017 : :
1018 [ # # ]: 0 : if (update_slot)
1019 : : {
1020 : 0 : ReplicationSlotMarkDirty();
1021 : 0 : ReplicationSlotSave();
1022 : 0 : }
1023 : :
1024 : 0 : ReplicationSlotRelease();
1025 : 0 : }
1026 : :
1027 : : /*
1028 : : * Permanently drop the currently acquired replication slot.
1029 : : */
1030 : : void
1031 : 0 : ReplicationSlotDropAcquired(void)
1032 : : {
1033 : 0 : ReplicationSlot *slot = MyReplicationSlot;
1034 : :
1035 [ # # ]: 0 : Assert(MyReplicationSlot != NULL);
1036 : :
1037 : : /* slot isn't acquired anymore */
1038 : 0 : MyReplicationSlot = NULL;
1039 : :
1040 : 0 : ReplicationSlotDropPtr(slot);
1041 : 0 : }
1042 : :
1043 : : /*
1044 : : * Permanently drop the replication slot which will be released by the point
1045 : : * this function returns.
1046 : : */
1047 : : static void
1048 : 0 : ReplicationSlotDropPtr(ReplicationSlot *slot)
1049 : : {
1050 : 0 : char path[MAXPGPATH];
1051 : 0 : char tmppath[MAXPGPATH];
1052 : :
1053 : : /*
1054 : : * If some other backend ran this code concurrently with us, we might try
1055 : : * to delete a slot with a certain name while someone else was trying to
1056 : : * create a slot with the same name.
1057 : : */
1058 : 0 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
1059 : :
1060 : : /* Generate pathnames. */
1061 : 0 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1062 : 0 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1063 : :
1064 : : /*
1065 : : * Rename the slot directory on disk, so that we'll no longer recognize
1066 : : * this as a valid slot. Note that if this fails, we've got to mark the
1067 : : * slot inactive before bailing out. If we're dropping an ephemeral or a
1068 : : * temporary slot, we better never fail hard as the caller won't expect
1069 : : * the slot to survive and this might get called during error handling.
1070 : : */
1071 [ # # ]: 0 : if (rename(path, tmppath) == 0)
1072 : : {
1073 : : /*
1074 : : * We need to fsync() the directory we just renamed and its parent to
1075 : : * make sure that our changes are on disk in a crash-safe fashion. If
1076 : : * fsync() fails, we can't be sure whether the changes are on disk or
1077 : : * not. For now, we handle that by panicking;
1078 : : * StartupReplicationSlots() will try to straighten it out after
1079 : : * restart.
1080 : : */
1081 : 0 : START_CRIT_SECTION();
1082 : 0 : fsync_fname(tmppath, true);
1083 : 0 : fsync_fname(PG_REPLSLOT_DIR, true);
1084 [ # # ]: 0 : END_CRIT_SECTION();
1085 : 0 : }
1086 : : else
1087 : : {
1088 : 0 : bool fail_softly = slot->data.persistency != RS_PERSISTENT;
1089 : :
1090 [ # # ]: 0 : SpinLockAcquire(&slot->mutex);
1091 : 0 : slot->active_pid = 0;
1092 : 0 : SpinLockRelease(&slot->mutex);
1093 : :
1094 : : /* wake up anyone waiting on this slot */
1095 : 0 : ConditionVariableBroadcast(&slot->active_cv);
1096 : :
1097 [ # # # # : 0 : ereport(fail_softly ? WARNING : ERROR,
# # # # #
# ]
1098 : : (errcode_for_file_access(),
1099 : : errmsg("could not rename file \"%s\" to \"%s\": %m",
1100 : : path, tmppath)));
1101 : 0 : }
1102 : :
1103 : : /*
1104 : : * The slot is definitely gone. Lock out concurrent scans of the array
1105 : : * long enough to kill it. It's OK to clear the active PID here without
1106 : : * grabbing the mutex because nobody else can be scanning the array here,
1107 : : * and nobody can be attached to this slot and thus access it without
1108 : : * scanning the array.
1109 : : *
1110 : : * Also wake up processes waiting for it.
1111 : : */
1112 : 0 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
1113 : 0 : slot->active_pid = 0;
1114 : 0 : slot->in_use = false;
1115 : 0 : LWLockRelease(ReplicationSlotControlLock);
1116 : 0 : ConditionVariableBroadcast(&slot->active_cv);
1117 : :
1118 : : /*
1119 : : * Slot is dead and doesn't prevent resource removal anymore, recompute
1120 : : * limits.
1121 : : */
1122 : 0 : ReplicationSlotsComputeRequiredXmin(false);
1123 : 0 : ReplicationSlotsComputeRequiredLSN();
1124 : :
1125 : : /*
1126 : : * If removing the directory fails, the worst thing that will happen is
1127 : : * that the user won't be able to create a new slot with the same name
1128 : : * until the next server restart. We warn about it, but that's all.
1129 : : */
1130 [ # # ]: 0 : if (!rmtree(tmppath, true))
1131 [ # # # # ]: 0 : ereport(WARNING,
1132 : : (errmsg("could not remove directory \"%s\"", tmppath)));
1133 : :
1134 : : /*
1135 : : * Drop the statistics entry for the replication slot. Do this while
1136 : : * holding ReplicationSlotAllocationLock so that we don't drop a
1137 : : * statistics entry for another slot with the same name just created in
1138 : : * another session.
1139 : : */
1140 [ # # ]: 0 : if (SlotIsLogical(slot))
1141 : 0 : pgstat_drop_replslot(slot);
1142 : :
1143 : : /*
1144 : : * We release this at the very end, so that nobody starts trying to create
1145 : : * a slot while we're still cleaning up the detritus of the old one.
1146 : : */
1147 : 0 : LWLockRelease(ReplicationSlotAllocationLock);
1148 : 0 : }
1149 : :
1150 : : /*
1151 : : * Serialize the currently acquired slot's state from memory to disk, thereby
1152 : : * guaranteeing the current state will survive a crash.
1153 : : */
1154 : : void
1155 : 0 : ReplicationSlotSave(void)
1156 : : {
1157 : 0 : char path[MAXPGPATH];
1158 : :
1159 [ # # ]: 0 : Assert(MyReplicationSlot != NULL);
1160 : :
1161 : 0 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
1162 : 0 : SaveSlotToPath(MyReplicationSlot, path, ERROR);
1163 : 0 : }
1164 : :
1165 : : /*
1166 : : * Signal that it would be useful if the currently acquired slot would be
1167 : : * flushed out to disk.
1168 : : *
1169 : : * Note that the actual flush to disk can be delayed for a long time, if
1170 : : * required for correctness explicitly do a ReplicationSlotSave().
1171 : : */
1172 : : void
1173 : 0 : ReplicationSlotMarkDirty(void)
1174 : : {
1175 : 0 : ReplicationSlot *slot = MyReplicationSlot;
1176 : :
1177 [ # # ]: 0 : Assert(MyReplicationSlot != NULL);
1178 : :
1179 [ # # ]: 0 : SpinLockAcquire(&slot->mutex);
1180 : 0 : MyReplicationSlot->just_dirtied = true;
1181 : 0 : MyReplicationSlot->dirty = true;
1182 : 0 : SpinLockRelease(&slot->mutex);
1183 : 0 : }
1184 : :
1185 : : /*
1186 : : * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1187 : : * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1188 : : */
1189 : : void
1190 : 0 : ReplicationSlotPersist(void)
1191 : : {
1192 : 0 : ReplicationSlot *slot = MyReplicationSlot;
1193 : :
1194 [ # # ]: 0 : Assert(slot != NULL);
1195 [ # # ]: 0 : Assert(slot->data.persistency != RS_PERSISTENT);
1196 : :
1197 [ # # ]: 0 : SpinLockAcquire(&slot->mutex);
1198 : 0 : slot->data.persistency = RS_PERSISTENT;
1199 : 0 : SpinLockRelease(&slot->mutex);
1200 : :
1201 : 0 : ReplicationSlotMarkDirty();
1202 : 0 : ReplicationSlotSave();
1203 : 0 : }
1204 : :
1205 : : /*
1206 : : * Compute the oldest xmin across all slots and store it in the ProcArray.
1207 : : *
1208 : : * If already_locked is true, both the ReplicationSlotControlLock and the
1209 : : * ProcArrayLock have already been acquired exclusively. It is crucial that the
1210 : : * caller first acquires the ReplicationSlotControlLock, followed by the
1211 : : * ProcArrayLock, to prevent any undetectable deadlocks since this function
1212 : : * acquires them in that order.
1213 : : */
1214 : : void
1215 : 4 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
1216 : : {
1217 : 4 : int i;
1218 : 4 : TransactionId agg_xmin = InvalidTransactionId;
1219 : 4 : TransactionId agg_catalog_xmin = InvalidTransactionId;
1220 : :
1221 [ + - ]: 4 : Assert(ReplicationSlotCtl != NULL);
1222 [ + - # # ]: 4 : Assert(!already_locked ||
1223 : : (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
1224 : : LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
1225 : :
1226 : : /*
1227 : : * Hold the ReplicationSlotControlLock until after updating the slot xmin
1228 : : * values, so no backend updates the initial xmin for newly created slot
1229 : : * concurrently. A shared lock is used here to minimize lock contention,
1230 : : * especially when many slots exist and advancements occur frequently.
1231 : : * This is safe since an exclusive lock is taken during initial slot xmin
1232 : : * update in slot creation.
1233 : : *
1234 : : * One might think that we can hold the ProcArrayLock exclusively and
1235 : : * update the slot xmin values, but it could increase lock contention on
1236 : : * the ProcArrayLock, which is not great since this function can be called
1237 : : * at non-negligible frequency.
1238 : : *
1239 : : * Concurrent invocation of this function may cause the computed slot xmin
1240 : : * to regress. However, this is harmless because tuples prior to the most
1241 : : * recent xmin are no longer useful once advancement occurs (see
1242 : : * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
1243 : : * before updating the effective_xmin). Thus, such regression merely
1244 : : * prevents VACUUM from prematurely removing tuples without causing the
1245 : : * early deletion of required data.
1246 : : */
1247 [ - + ]: 4 : if (!already_locked)
1248 : 4 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1249 : :
1250 [ + + ]: 44 : for (i = 0; i < max_replication_slots; i++)
1251 : : {
1252 : 40 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1253 : 40 : TransactionId effective_xmin;
1254 : 40 : TransactionId effective_catalog_xmin;
1255 : 40 : bool invalidated;
1256 : :
1257 [ - + ]: 40 : if (!s->in_use)
1258 : 40 : continue;
1259 : :
1260 [ # # ]: 0 : SpinLockAcquire(&s->mutex);
1261 : 0 : effective_xmin = s->effective_xmin;
1262 : 0 : effective_catalog_xmin = s->effective_catalog_xmin;
1263 : 0 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1264 : 0 : SpinLockRelease(&s->mutex);
1265 : :
1266 : : /* invalidated slots need not apply */
1267 [ # # ]: 0 : if (invalidated)
1268 : 0 : continue;
1269 : :
1270 : : /* check the data xmin */
1271 [ # # # # ]: 0 : if (TransactionIdIsValid(effective_xmin) &&
1272 [ # # ]: 0 : (!TransactionIdIsValid(agg_xmin) ||
1273 : 0 : TransactionIdPrecedes(effective_xmin, agg_xmin)))
1274 : 0 : agg_xmin = effective_xmin;
1275 : :
1276 : : /* check the catalog xmin */
1277 [ # # # # ]: 0 : if (TransactionIdIsValid(effective_catalog_xmin) &&
1278 [ # # ]: 0 : (!TransactionIdIsValid(agg_catalog_xmin) ||
1279 : 0 : TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1280 : 0 : agg_catalog_xmin = effective_catalog_xmin;
1281 [ - + - ]: 40 : }
1282 : :
1283 : 4 : ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1284 : :
1285 [ - + ]: 4 : if (!already_locked)
1286 : 4 : LWLockRelease(ReplicationSlotControlLock);
1287 : 4 : }
1288 : :
1289 : : /*
1290 : : * Compute the oldest restart LSN across all slots and inform xlog module.
1291 : : *
1292 : : * Note: while max_slot_wal_keep_size is theoretically relevant for this
1293 : : * purpose, we don't try to account for that, because this module doesn't
1294 : : * know what to compare against.
1295 : : */
1296 : : void
1297 : 4 : ReplicationSlotsComputeRequiredLSN(void)
1298 : : {
1299 : 4 : int i;
1300 : 4 : XLogRecPtr min_required = InvalidXLogRecPtr;
1301 : :
1302 [ + - ]: 4 : Assert(ReplicationSlotCtl != NULL);
1303 : :
1304 : 4 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1305 [ + + ]: 44 : for (i = 0; i < max_replication_slots; i++)
1306 : : {
1307 : 40 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1308 : 40 : XLogRecPtr restart_lsn;
1309 : 40 : XLogRecPtr last_saved_restart_lsn;
1310 : 40 : bool invalidated;
1311 : 40 : ReplicationSlotPersistency persistency;
1312 : :
1313 [ - + ]: 40 : if (!s->in_use)
1314 : 40 : continue;
1315 : :
1316 [ # # ]: 0 : SpinLockAcquire(&s->mutex);
1317 : 0 : persistency = s->data.persistency;
1318 : 0 : restart_lsn = s->data.restart_lsn;
1319 : 0 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1320 : 0 : last_saved_restart_lsn = s->last_saved_restart_lsn;
1321 : 0 : SpinLockRelease(&s->mutex);
1322 : :
1323 : : /* invalidated slots need not apply */
1324 [ # # ]: 0 : if (invalidated)
1325 : 0 : continue;
1326 : :
1327 : : /*
1328 : : * For persistent slot use last_saved_restart_lsn to compute the
1329 : : * oldest LSN for removal of WAL segments. The segments between
1330 : : * last_saved_restart_lsn and restart_lsn might be needed by a
1331 : : * persistent slot in the case of database crash. Non-persistent
1332 : : * slots can't survive the database crash, so we don't care about
1333 : : * last_saved_restart_lsn for them.
1334 : : */
1335 [ # # ]: 0 : if (persistency == RS_PERSISTENT)
1336 : : {
1337 [ # # # # ]: 0 : if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1338 : 0 : restart_lsn > last_saved_restart_lsn)
1339 : : {
1340 : 0 : restart_lsn = last_saved_restart_lsn;
1341 : 0 : }
1342 : 0 : }
1343 : :
1344 [ # # # # ]: 0 : if (XLogRecPtrIsValid(restart_lsn) &&
1345 [ # # ]: 0 : (!XLogRecPtrIsValid(min_required) ||
1346 : 0 : restart_lsn < min_required))
1347 : 0 : min_required = restart_lsn;
1348 [ - + - ]: 40 : }
1349 : 4 : LWLockRelease(ReplicationSlotControlLock);
1350 : :
1351 : 4 : XLogSetReplicationSlotMinimumLSN(min_required);
1352 : 4 : }
1353 : :
1354 : : /*
1355 : : * Compute the oldest WAL LSN required by *logical* decoding slots..
1356 : : *
1357 : : * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1358 : : * slots exist.
1359 : : *
1360 : : * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1361 : : * ignores physical replication slots.
1362 : : *
1363 : : * The results aren't required frequently, so we don't maintain a precomputed
1364 : : * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1365 : : */
1366 : : XLogRecPtr
1367 : 14 : ReplicationSlotsComputeLogicalRestartLSN(void)
1368 : : {
1369 : 14 : XLogRecPtr result = InvalidXLogRecPtr;
1370 : 14 : int i;
1371 : :
1372 [ - + ]: 14 : if (max_replication_slots <= 0)
1373 : 0 : return InvalidXLogRecPtr;
1374 : :
1375 : 14 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1376 : :
1377 [ + + ]: 154 : for (i = 0; i < max_replication_slots; i++)
1378 : : {
1379 : 140 : ReplicationSlot *s;
1380 : 140 : XLogRecPtr restart_lsn;
1381 : 140 : XLogRecPtr last_saved_restart_lsn;
1382 : 140 : bool invalidated;
1383 : 140 : ReplicationSlotPersistency persistency;
1384 : :
1385 : 140 : s = &ReplicationSlotCtl->replication_slots[i];
1386 : :
1387 : : /* cannot change while ReplicationSlotCtlLock is held */
1388 [ - + ]: 140 : if (!s->in_use)
1389 : 140 : continue;
1390 : :
1391 : : /* we're only interested in logical slots */
1392 [ # # ]: 0 : if (!SlotIsLogical(s))
1393 : 0 : continue;
1394 : :
1395 : : /* read once, it's ok if it increases while we're checking */
1396 [ # # ]: 0 : SpinLockAcquire(&s->mutex);
1397 : 0 : persistency = s->data.persistency;
1398 : 0 : restart_lsn = s->data.restart_lsn;
1399 : 0 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1400 : 0 : last_saved_restart_lsn = s->last_saved_restart_lsn;
1401 : 0 : SpinLockRelease(&s->mutex);
1402 : :
1403 : : /* invalidated slots need not apply */
1404 [ # # ]: 0 : if (invalidated)
1405 : 0 : continue;
1406 : :
1407 : : /*
1408 : : * For persistent slot use last_saved_restart_lsn to compute the
1409 : : * oldest LSN for removal of WAL segments. The segments between
1410 : : * last_saved_restart_lsn and restart_lsn might be needed by a
1411 : : * persistent slot in the case of database crash. Non-persistent
1412 : : * slots can't survive the database crash, so we don't care about
1413 : : * last_saved_restart_lsn for them.
1414 : : */
1415 [ # # ]: 0 : if (persistency == RS_PERSISTENT)
1416 : : {
1417 [ # # # # ]: 0 : if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1418 : 0 : restart_lsn > last_saved_restart_lsn)
1419 : : {
1420 : 0 : restart_lsn = last_saved_restart_lsn;
1421 : 0 : }
1422 : 0 : }
1423 : :
1424 [ # # ]: 0 : if (!XLogRecPtrIsValid(restart_lsn))
1425 : 0 : continue;
1426 : :
1427 [ # # # # ]: 0 : if (!XLogRecPtrIsValid(result) ||
1428 : 0 : restart_lsn < result)
1429 : 0 : result = restart_lsn;
1430 [ - + - ]: 140 : }
1431 : :
1432 : 14 : LWLockRelease(ReplicationSlotControlLock);
1433 : :
1434 : 14 : return result;
1435 : 14 : }
1436 : :
1437 : : /*
1438 : : * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1439 : : * passed database oid.
1440 : : *
1441 : : * Returns true if there are any slots referencing the database. *nslots will
1442 : : * be set to the absolute number of slots in the database, *nactive to ones
1443 : : * currently active.
1444 : : */
1445 : : bool
1446 : 1 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1447 : : {
1448 : 1 : int i;
1449 : :
1450 : 1 : *nslots = *nactive = 0;
1451 : :
1452 [ - + ]: 1 : if (max_replication_slots <= 0)
1453 : 0 : return false;
1454 : :
1455 : 1 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1456 [ + + ]: 11 : for (i = 0; i < max_replication_slots; i++)
1457 : : {
1458 : 10 : ReplicationSlot *s;
1459 : :
1460 : 10 : s = &ReplicationSlotCtl->replication_slots[i];
1461 : :
1462 : : /* cannot change while ReplicationSlotCtlLock is held */
1463 [ - + ]: 10 : if (!s->in_use)
1464 : 10 : continue;
1465 : :
1466 : : /* only logical slots are database specific, skip */
1467 [ # # ]: 0 : if (!SlotIsLogical(s))
1468 : 0 : continue;
1469 : :
1470 : : /* not our database, skip */
1471 [ # # ]: 0 : if (s->data.database != dboid)
1472 : 0 : continue;
1473 : :
1474 : : /* NB: intentionally counting invalidated slots */
1475 : :
1476 : : /* count slots with spinlock held */
1477 [ # # ]: 0 : SpinLockAcquire(&s->mutex);
1478 : 0 : (*nslots)++;
1479 [ # # ]: 0 : if (s->active_pid != 0)
1480 : 0 : (*nactive)++;
1481 : 0 : SpinLockRelease(&s->mutex);
1482 [ - + - ]: 10 : }
1483 : 1 : LWLockRelease(ReplicationSlotControlLock);
1484 : :
1485 [ - + ]: 1 : if (*nslots > 0)
1486 : 0 : return true;
1487 : 1 : return false;
1488 : 1 : }
1489 : :
1490 : : /*
1491 : : * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1492 : : * passed database oid. The caller should hold an exclusive lock on the
1493 : : * pg_database oid for the database to prevent creation of new slots on the db
1494 : : * or replay from existing slots.
1495 : : *
1496 : : * Another session that concurrently acquires an existing slot on the target DB
1497 : : * (most likely to drop it) may cause this function to ERROR. If that happens
1498 : : * it may have dropped some but not all slots.
1499 : : *
1500 : : * This routine isn't as efficient as it could be - but we don't drop
1501 : : * databases often, especially databases with lots of slots.
1502 : : *
1503 : : * If it drops the last logical slot in the cluster, it requests to disable
1504 : : * logical decoding.
1505 : : */
1506 : : void
1507 : 1 : ReplicationSlotsDropDBSlots(Oid dboid)
1508 : : {
1509 : 1 : int i;
1510 : 1 : bool found_valid_logicalslot;
1511 : 1 : bool dropped = false;
1512 : :
1513 [ - + ]: 1 : if (max_replication_slots <= 0)
1514 : 0 : return;
1515 : :
1516 : : restart:
1517 : 1 : found_valid_logicalslot = false;
1518 : 1 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1519 [ + + ]: 11 : for (i = 0; i < max_replication_slots; i++)
1520 : : {
1521 : 10 : ReplicationSlot *s;
1522 : 10 : char *slotname;
1523 : 10 : int active_pid;
1524 : :
1525 : 10 : s = &ReplicationSlotCtl->replication_slots[i];
1526 : :
1527 : : /* cannot change while ReplicationSlotCtlLock is held */
1528 [ - + ]: 10 : if (!s->in_use)
1529 : 10 : continue;
1530 : :
1531 : : /* only logical slots are database specific, skip */
1532 [ # # ]: 0 : if (!SlotIsLogical(s))
1533 : 0 : continue;
1534 : :
1535 : : /*
1536 : : * Check logical slots on other databases too so we can disable
1537 : : * logical decoding only if no slots in the cluster.
1538 : : */
1539 [ # # ]: 0 : SpinLockAcquire(&s->mutex);
1540 : 0 : found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
1541 : 0 : SpinLockRelease(&s->mutex);
1542 : :
1543 : : /* not our database, skip */
1544 [ # # ]: 0 : if (s->data.database != dboid)
1545 : 0 : continue;
1546 : :
1547 : : /* NB: intentionally including invalidated slots to drop */
1548 : :
1549 : : /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1550 [ # # ]: 0 : SpinLockAcquire(&s->mutex);
1551 : : /* can't change while ReplicationSlotControlLock is held */
1552 : 0 : slotname = NameStr(s->data.name);
1553 : 0 : active_pid = s->active_pid;
1554 [ # # ]: 0 : if (active_pid == 0)
1555 : : {
1556 : 0 : MyReplicationSlot = s;
1557 : 0 : s->active_pid = MyProcPid;
1558 : 0 : }
1559 : 0 : SpinLockRelease(&s->mutex);
1560 : :
1561 : : /*
1562 : : * Even though we hold an exclusive lock on the database object a
1563 : : * logical slot for that DB can still be active, e.g. if it's
1564 : : * concurrently being dropped by a backend connected to another DB.
1565 : : *
1566 : : * That's fairly unlikely in practice, so we'll just bail out.
1567 : : *
1568 : : * The slot sync worker holds a shared lock on the database before
1569 : : * operating on synced logical slots to avoid conflict with the drop
1570 : : * happening here. The persistent synced slots are thus safe but there
1571 : : * is a possibility that the slot sync worker has created a temporary
1572 : : * slot (which stays active even on release) and we are trying to drop
1573 : : * that here. In practice, the chances of hitting this scenario are
1574 : : * less as during slot synchronization, the temporary slot is
1575 : : * immediately converted to persistent and thus is safe due to the
1576 : : * shared lock taken on the database. So, we'll just bail out in such
1577 : : * a case.
1578 : : *
1579 : : * XXX: We can consider shutting down the slot sync worker before
1580 : : * trying to drop synced temporary slots here.
1581 : : */
1582 [ # # ]: 0 : if (active_pid)
1583 [ # # # # ]: 0 : ereport(ERROR,
1584 : : (errcode(ERRCODE_OBJECT_IN_USE),
1585 : : errmsg("replication slot \"%s\" is active for PID %d",
1586 : : slotname, active_pid)));
1587 : :
1588 : : /*
1589 : : * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1590 : : * holding ReplicationSlotControlLock over filesystem operations,
1591 : : * release ReplicationSlotControlLock and use
1592 : : * ReplicationSlotDropAcquired.
1593 : : *
1594 : : * As that means the set of slots could change, restart scan from the
1595 : : * beginning each time we release the lock.
1596 : : */
1597 : 0 : LWLockRelease(ReplicationSlotControlLock);
1598 : 0 : ReplicationSlotDropAcquired();
1599 : 0 : dropped = true;
1600 : 0 : goto restart;
1601 [ - + ]: 10 : }
1602 : 1 : LWLockRelease(ReplicationSlotControlLock);
1603 : :
1604 [ - + # # ]: 1 : if (dropped && !found_valid_logicalslot)
1605 : 0 : RequestDisableLogicalDecoding();
1606 : 1 : }
1607 : :
1608 : : /*
1609 : : * Returns true if there is at least one in-use valid logical replication slot.
1610 : : */
1611 : : bool
1612 : 3 : CheckLogicalSlotExists(void)
1613 : : {
1614 : 3 : bool found = false;
1615 : :
1616 [ - + ]: 3 : if (max_replication_slots <= 0)
1617 : 0 : return false;
1618 : :
1619 : 3 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1620 [ + + ]: 33 : for (int i = 0; i < max_replication_slots; i++)
1621 : : {
1622 : 30 : ReplicationSlot *s;
1623 : 30 : bool invalidated;
1624 : :
1625 : 30 : s = &ReplicationSlotCtl->replication_slots[i];
1626 : :
1627 : : /* cannot change while ReplicationSlotCtlLock is held */
1628 [ - + ]: 30 : if (!s->in_use)
1629 : 30 : continue;
1630 : :
1631 [ # # ]: 0 : if (SlotIsPhysical(s))
1632 : 0 : continue;
1633 : :
1634 [ # # ]: 0 : SpinLockAcquire(&s->mutex);
1635 : 0 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1636 : 0 : SpinLockRelease(&s->mutex);
1637 : :
1638 [ # # ]: 0 : if (invalidated)
1639 : 0 : continue;
1640 : :
1641 : 0 : found = true;
1642 : 0 : break;
1643 [ - + ]: 30 : }
1644 : 3 : LWLockRelease(ReplicationSlotControlLock);
1645 : :
1646 : 3 : return found;
1647 : 3 : }
1648 : :
1649 : : /*
1650 : : * Check whether the server's configuration supports using replication
1651 : : * slots.
1652 : : */
1653 : : void
1654 : 0 : CheckSlotRequirements(void)
1655 : : {
1656 : : /*
1657 : : * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1658 : : * needs the same check.
1659 : : */
1660 : :
1661 [ # # ]: 0 : if (max_replication_slots == 0)
1662 [ # # # # ]: 0 : ereport(ERROR,
1663 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1664 : : errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
1665 : :
1666 [ # # ]: 0 : if (wal_level < WAL_LEVEL_REPLICA)
1667 [ # # # # ]: 0 : ereport(ERROR,
1668 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1669 : : errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1670 : 0 : }
1671 : :
1672 : : /*
1673 : : * Check whether the user has privilege to use replication slots.
1674 : : */
1675 : : void
1676 : 0 : CheckSlotPermissions(void)
1677 : : {
1678 [ # # ]: 0 : if (!has_rolreplication(GetUserId()))
1679 [ # # # # ]: 0 : ereport(ERROR,
1680 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1681 : : errmsg("permission denied to use replication slots"),
1682 : : errdetail("Only roles with the %s attribute may use replication slots.",
1683 : : "REPLICATION")));
1684 : 0 : }
1685 : :
1686 : : /*
1687 : : * Reserve WAL for the currently active slot.
1688 : : *
1689 : : * Compute and set restart_lsn in a manner that's appropriate for the type of
1690 : : * the slot and concurrency safe.
1691 : : */
1692 : : void
1693 : 0 : ReplicationSlotReserveWal(void)
1694 : : {
1695 : 0 : ReplicationSlot *slot = MyReplicationSlot;
1696 : 0 : XLogSegNo segno;
1697 : 0 : XLogRecPtr restart_lsn;
1698 : :
1699 [ # # ]: 0 : Assert(slot != NULL);
1700 [ # # ]: 0 : Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
1701 [ # # ]: 0 : Assert(!XLogRecPtrIsValid(slot->last_saved_restart_lsn));
1702 : :
1703 : : /*
1704 : : * The replication slot mechanism is used to prevent the removal of
1705 : : * required WAL.
1706 : : *
1707 : : * Acquire an exclusive lock to prevent the checkpoint process from
1708 : : * concurrently computing the minimum slot LSN (see
1709 : : * CheckPointReplicationSlots). This ensures that the WAL reserved for
1710 : : * replication cannot be removed during a checkpoint.
1711 : : *
1712 : : * The mechanism is reliable because if WAL reservation occurs first, the
1713 : : * checkpoint must wait for the restart_lsn update before determining the
1714 : : * minimum non-removable LSN. On the other hand, if the checkpoint happens
1715 : : * first, subsequent WAL reservations will select positions at or beyond
1716 : : * the redo pointer of that checkpoint.
1717 : : */
1718 : 0 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
1719 : :
1720 : : /*
1721 : : * For logical slots log a standby snapshot and start logical decoding at
1722 : : * exactly that position. That allows the slot to start up more quickly.
1723 : : * But on a standby we cannot do WAL writes, so just use the replay
1724 : : * pointer; effectively, an attempt to create a logical slot on standby
1725 : : * will cause it to wait for an xl_running_xact record to be logged
1726 : : * independently on the primary, so that a snapshot can be built using the
1727 : : * record.
1728 : : *
1729 : : * None of this is needed (or indeed helpful) for physical slots as
1730 : : * they'll start replay at the last logged checkpoint anyway. Instead,
1731 : : * return the location of the last redo LSN, where a base backup has to
1732 : : * start replay at.
1733 : : */
1734 [ # # ]: 0 : if (SlotIsPhysical(slot))
1735 : 0 : restart_lsn = GetRedoRecPtr();
1736 [ # # ]: 0 : else if (RecoveryInProgress())
1737 : 0 : restart_lsn = GetXLogReplayRecPtr(NULL);
1738 : : else
1739 : 0 : restart_lsn = GetXLogInsertRecPtr();
1740 : :
1741 [ # # ]: 0 : SpinLockAcquire(&slot->mutex);
1742 : 0 : slot->data.restart_lsn = restart_lsn;
1743 : 0 : SpinLockRelease(&slot->mutex);
1744 : :
1745 : : /* prevent WAL removal as fast as possible */
1746 : 0 : ReplicationSlotsComputeRequiredLSN();
1747 : :
1748 : : /* Checkpoint shouldn't remove the required WAL. */
1749 : 0 : XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
1750 [ # # ]: 0 : if (XLogGetLastRemovedSegno() >= segno)
1751 [ # # # # ]: 0 : elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
1752 : : NameStr(slot->data.name));
1753 : :
1754 : 0 : LWLockRelease(ReplicationSlotAllocationLock);
1755 : :
1756 [ # # # # ]: 0 : if (!RecoveryInProgress() && SlotIsLogical(slot))
1757 : : {
1758 : 0 : XLogRecPtr flushptr;
1759 : :
1760 : : /* make sure we have enough information to start */
1761 : 0 : flushptr = LogStandbySnapshot();
1762 : :
1763 : : /* and make sure it's fsynced to disk */
1764 : 0 : XLogFlush(flushptr);
1765 : 0 : }
1766 : 0 : }
1767 : :
1768 : : /*
1769 : : * Report that replication slot needs to be invalidated
1770 : : */
1771 : : static void
1772 : 0 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
1773 : : bool terminating,
1774 : : int pid,
1775 : : NameData slotname,
1776 : : XLogRecPtr restart_lsn,
1777 : : XLogRecPtr oldestLSN,
1778 : : TransactionId snapshotConflictHorizon,
1779 : : long slot_idle_seconds)
1780 : : {
1781 : 0 : StringInfoData err_detail;
1782 : 0 : StringInfoData err_hint;
1783 : :
1784 : 0 : initStringInfo(&err_detail);
1785 : 0 : initStringInfo(&err_hint);
1786 : :
1787 [ # # # # : 0 : switch (cause)
# # ]
1788 : : {
1789 : : case RS_INVAL_WAL_REMOVED:
1790 : : {
1791 : 0 : uint64 ex = oldestLSN - restart_lsn;
1792 : :
1793 : 0 : appendStringInfo(&err_detail,
1794 : 0 : ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1795 : : "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1796 : 0 : ex),
1797 : 0 : LSN_FORMAT_ARGS(restart_lsn),
1798 : 0 : ex);
1799 : : /* translator: %s is a GUC variable name */
1800 : 0 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1801 : : "max_slot_wal_keep_size");
1802 : : break;
1803 : 0 : }
1804 : : case RS_INVAL_HORIZON:
1805 : 0 : appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1806 : 0 : snapshotConflictHorizon);
1807 : 0 : break;
1808 : :
1809 : : case RS_INVAL_WAL_LEVEL:
1810 : 0 : appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
1811 : 0 : break;
1812 : :
1813 : : case RS_INVAL_IDLE_TIMEOUT:
1814 : : {
1815 : : /* translator: %s is a GUC variable name */
1816 : 0 : appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
1817 : 0 : slot_idle_seconds, "idle_replication_slot_timeout",
1818 : 0 : idle_replication_slot_timeout_secs);
1819 : : /* translator: %s is a GUC variable name */
1820 : 0 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1821 : : "idle_replication_slot_timeout");
1822 : 0 : break;
1823 : : }
1824 : : case RS_INVAL_NONE:
1825 : 0 : pg_unreachable();
1826 : : }
1827 : :
1828 [ # # # # : 0 : ereport(LOG,
# # # # ]
1829 : : terminating ?
1830 : : errmsg("terminating process %d to release replication slot \"%s\"",
1831 : : pid, NameStr(slotname)) :
1832 : : errmsg("invalidating obsolete replication slot \"%s\"",
1833 : : NameStr(slotname)),
1834 : : errdetail_internal("%s", err_detail.data),
1835 : : err_hint.len ? errhint("%s", err_hint.data) : 0);
1836 : :
1837 : 0 : pfree(err_detail.data);
1838 : 0 : pfree(err_hint.data);
1839 : 0 : }
1840 : :
1841 : : /*
1842 : : * Can we invalidate an idle replication slot?
1843 : : *
1844 : : * Idle timeout invalidation is allowed only when:
1845 : : *
1846 : : * 1. Idle timeout is set
1847 : : * 2. Slot has reserved WAL
1848 : : * 3. Slot is inactive
1849 : : * 4. The slot is not being synced from the primary while the server is in
1850 : : * recovery. This is because synced slots are always considered to be
1851 : : * inactive because they don't perform logical decoding to produce changes.
1852 : : */
1853 : : static inline bool
1854 : 0 : CanInvalidateIdleSlot(ReplicationSlot *s)
1855 : : {
1856 [ # # ]: 0 : return (idle_replication_slot_timeout_secs != 0 &&
1857 [ # # ]: 0 : XLogRecPtrIsValid(s->data.restart_lsn) &&
1858 [ # # ]: 0 : s->inactive_since > 0 &&
1859 [ # # ]: 0 : !(RecoveryInProgress() && s->data.synced));
1860 : : }
1861 : :
1862 : : /*
1863 : : * DetermineSlotInvalidationCause - Determine the cause for which a slot
1864 : : * becomes invalid among the given possible causes.
1865 : : *
1866 : : * This function sequentially checks all possible invalidation causes and
1867 : : * returns the first one for which the slot is eligible for invalidation.
1868 : : */
1869 : : static ReplicationSlotInvalidationCause
1870 : 0 : DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
1871 : : XLogRecPtr oldestLSN, Oid dboid,
1872 : : TransactionId snapshotConflictHorizon,
1873 : : TimestampTz *inactive_since, TimestampTz now)
1874 : : {
1875 [ # # ]: 0 : Assert(possible_causes != RS_INVAL_NONE);
1876 : :
1877 [ # # ]: 0 : if (possible_causes & RS_INVAL_WAL_REMOVED)
1878 : : {
1879 : 0 : XLogRecPtr restart_lsn = s->data.restart_lsn;
1880 : :
1881 [ # # # # ]: 0 : if (XLogRecPtrIsValid(restart_lsn) &&
1882 : 0 : restart_lsn < oldestLSN)
1883 : 0 : return RS_INVAL_WAL_REMOVED;
1884 [ # # ]: 0 : }
1885 : :
1886 [ # # ]: 0 : if (possible_causes & RS_INVAL_HORIZON)
1887 : : {
1888 : : /* invalid DB oid signals a shared relation */
1889 [ # # # # ]: 0 : if (SlotIsLogical(s) &&
1890 [ # # ]: 0 : (dboid == InvalidOid || dboid == s->data.database))
1891 : : {
1892 : 0 : TransactionId effective_xmin = s->effective_xmin;
1893 : 0 : TransactionId catalog_effective_xmin = s->effective_catalog_xmin;
1894 : :
1895 [ # # # # ]: 0 : if (TransactionIdIsValid(effective_xmin) &&
1896 : 0 : TransactionIdPrecedesOrEquals(effective_xmin,
1897 : 0 : snapshotConflictHorizon))
1898 : 0 : return RS_INVAL_HORIZON;
1899 [ # # # # ]: 0 : else if (TransactionIdIsValid(catalog_effective_xmin) &&
1900 : 0 : TransactionIdPrecedesOrEquals(catalog_effective_xmin,
1901 : 0 : snapshotConflictHorizon))
1902 : 0 : return RS_INVAL_HORIZON;
1903 [ # # ]: 0 : }
1904 : 0 : }
1905 : :
1906 [ # # ]: 0 : if (possible_causes & RS_INVAL_WAL_LEVEL)
1907 : : {
1908 [ # # ]: 0 : if (SlotIsLogical(s))
1909 : 0 : return RS_INVAL_WAL_LEVEL;
1910 : 0 : }
1911 : :
1912 [ # # ]: 0 : if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1913 : : {
1914 [ # # ]: 0 : Assert(now > 0);
1915 : :
1916 [ # # ]: 0 : if (CanInvalidateIdleSlot(s))
1917 : : {
1918 : : /*
1919 : : * Simulate the invalidation due to idle_timeout to test the
1920 : : * timeout behavior promptly, without waiting for it to trigger
1921 : : * naturally.
1922 : : */
1923 : : #ifdef USE_INJECTION_POINTS
1924 : : if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1925 : : {
1926 : : *inactive_since = 0; /* since the beginning of time */
1927 : : return RS_INVAL_IDLE_TIMEOUT;
1928 : : }
1929 : : #endif
1930 : :
1931 : : /*
1932 : : * Check if the slot needs to be invalidated due to
1933 : : * idle_replication_slot_timeout GUC.
1934 : : */
1935 [ # # # # ]: 0 : if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
1936 : 0 : idle_replication_slot_timeout_secs))
1937 : : {
1938 : 0 : *inactive_since = s->inactive_since;
1939 : 0 : return RS_INVAL_IDLE_TIMEOUT;
1940 : : }
1941 : 0 : }
1942 : 0 : }
1943 : :
1944 : 0 : return RS_INVAL_NONE;
1945 : 0 : }
1946 : :
/*
 * Helper for InvalidateObsoleteReplicationSlots
 *
 * Acquires the given slot and marks it invalid, if necessary and possible.
 * If the slot is currently held by another process, that process is
 * signalled and we wait on the slot's condition variable before trying
 * again.
 *
 * Parameters:
 *	possible_causes - bitmask of invalidation causes to consider
 *	s - the slot to examine
 *	oldestLSN, dboid, snapshotConflictHorizon - passed through to
 *		DetermineSlotInvalidationCause() and to the log message
 *	released_lock_out - output, see below
 *
 * Returns true if the slot was invalidated by this call.
 *
 * The caller must hold ReplicationSlotControlLock in shared mode on entry.
 * *released_lock_out is set to true if ReplicationSlotControlLock was
 * released in the interim (and in that case we're not holding the lock at
 * return, otherwise we are).
 *
 * This is inherently racy, because we release the LWLock
 * for syscalls, so caller must restart if we return true.
 */
static bool
InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
							   ReplicationSlot *s,
							   XLogRecPtr oldestLSN,
							   Oid dboid, TransactionId snapshotConflictHorizon,
							   bool *released_lock_out)
{
	/* PID we last sent a signal to, so we don't signal it twice */
	int			last_signaled_pid = 0;
	/* true once we have let go of ReplicationSlotControlLock at least once */
	bool		released_lock = false;
	bool		invalidated = false;
	/* filled in by DetermineSlotInvalidationCause() for idle timeouts */
	TimestampTz inactive_since = 0;

	for (;;)
	{
		XLogRecPtr	restart_lsn;
		NameData	slotname;
		int			active_pid = 0;
		ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
		TimestampTz now = 0;
		long		slot_idle_secs = 0;

		/*
		 * Held on entry by the caller, and re-acquired at the bottom of the
		 * loop before every "continue".
		 */
		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));

		/*
		 * The slot is no longer in use, so there is nothing to invalidate.
		 * If we had released the lock earlier, drop the re-acquired lock so
		 * that the "released => not held" contract holds at return.
		 */
		if (!s->in_use)
		{
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
		{
			/*
			 * Assign the current time here to avoid system call overhead
			 * while holding the spinlock in subsequent code.
			 */
			now = GetCurrentTimestamp();
		}

		/*
		 * Check if the slot needs to be invalidated. If it needs to be
		 * invalidated, and is not currently acquired, acquire it and mark it
		 * as having been invalidated.  We do this with the spinlock held to
		 * avoid race conditions -- for example the restart_lsn could move
		 * forward, or the slot could be dropped.
		 */
		SpinLockAcquire(&s->mutex);

		/* remembered for the log message; the slot's copy may be reset below */
		restart_lsn = s->data.restart_lsn;

		/* we do nothing if the slot is already invalid */
		if (s->data.invalidated == RS_INVAL_NONE)
			invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
																s, oldestLSN,
																dboid,
																snapshotConflictHorizon,
																&inactive_since,
																now);

		/* if there's no invalidation, we're done */
		if (invalidation_cause == RS_INVAL_NONE)
		{
			SpinLockRelease(&s->mutex);
			/* same lock bookkeeping as in the !in_use exit above */
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		/* copy what we need for reporting while still holding the spinlock */
		slotname = s->data.name;
		active_pid = s->active_pid;

		/*
		 * If the slot can be acquired, do so and mark it invalidated
		 * immediately.  Otherwise we'll signal the owning process, below, and
		 * retry.
		 *
		 * Note: Unlike other slot attributes, slot's inactive_since can't be
		 * changed until the acquired slot is released or the owning process
		 * is terminated. So, the inactive slot can only be invalidated
		 * immediately without being terminated.
		 */
		if (active_pid == 0)
		{
			/* take ownership of the slot on behalf of this process */
			MyReplicationSlot = s;
			s->active_pid = MyProcPid;
			s->data.invalidated = invalidation_cause;

			/*
			 * XXX: We should consider not overwriting restart_lsn and instead
			 * just rely on .invalidated.
			 */
			if (invalidation_cause == RS_INVAL_WAL_REMOVED)
			{
				s->data.restart_lsn = InvalidXLogRecPtr;
				s->last_saved_restart_lsn = InvalidXLogRecPtr;
			}

			/* Let caller know */
			invalidated = true;
		}

		SpinLockRelease(&s->mutex);

		/*
		 * Calculate the idle time duration of the slot if the cause is
		 * RS_INVAL_IDLE_TIMEOUT; it is only used in the log message.
		 */
		if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
		{
			int			slot_idle_usecs;

			TimestampDifference(inactive_since, now, &slot_idle_secs,
								&slot_idle_usecs);
		}

		if (active_pid != 0)
		{
			/*
			 * Another process holds the slot.
			 *
			 * Prepare the sleep on the slot's condition variable before
			 * releasing the lock, to close a possible race condition if the
			 * slot is released before the sleep below.
			 */
			ConditionVariablePrepareToSleep(&s->active_cv);

			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/*
			 * Signal to terminate the process that owns the slot, if we
			 * haven't already signalled it.  (Avoidance of repeated
			 * signalling is the only reason for there to be a loop in this
			 * routine; otherwise we could rely on caller's restart loop.)
			 *
			 * There is the race condition that other process may own the slot
			 * after its current owner process is terminated and before this
			 * process owns it. To handle that, we signal only if the PID of
			 * the owning process has changed from the previous time. (This
			 * logic assumes that the same PID is not reused very quickly.)
			 */
			if (last_signaled_pid != active_pid)
			{
				ReportSlotInvalidation(invalidation_cause, true, active_pid,
									   slotname, restart_lsn,
									   oldestLSN, snapshotConflictHorizon,
									   slot_idle_secs);

				/*
				 * The startup process sends a recovery-conflict procsignal;
				 * every other caller sends SIGTERM.
				 */
				if (MyBackendType == B_STARTUP)
					(void) SendProcSignal(active_pid,
										  PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
										  INVALID_PROC_NUMBER);
				else
					(void) kill(active_pid, SIGTERM);

				last_signaled_pid = active_pid;
			}

			/* Wait until the slot is released. */
			ConditionVariableSleep(&s->active_cv,
								   WAIT_EVENT_REPLICATION_SLOT_DROP);

			/*
			 * Re-acquire lock and start over; we expect to invalidate the
			 * slot next time (unless another process acquires the slot in the
			 * meantime).
			 *
			 * Note: It is possible for a slot to advance its restart_lsn or
			 * xmin values sufficiently between when we release the mutex and
			 * when we recheck, moving from a conflicting state to a non
			 * conflicting state.  This is intentional and safe: if the slot
			 * has caught up while we're busy here, the resources we were
			 * concerned about (WAL segments or tuples) have not yet been
			 * removed, and there's no reason to invalidate the slot.
			 */
			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
			continue;
		}
		else
		{
			/*
			 * We hold the slot now and have already invalidated it; flush it
			 * to ensure that state persists.
			 *
			 * Don't want to hold ReplicationSlotControlLock across file
			 * system operations, so release it now but be sure to tell caller
			 * to restart from scratch.
			 */
			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/* Make sure the invalidated state persists across server restart */
			ReplicationSlotMarkDirty();
			ReplicationSlotSave();
			ReplicationSlotRelease();

			/* active_pid is 0 here; it is not used in the non-terminating message */
			ReportSlotInvalidation(invalidation_cause, false, active_pid,
								   slotname, restart_lsn,
								   oldestLSN, snapshotConflictHorizon,
								   slot_idle_secs);

			/* done with this slot for now */
			break;
		}
	}

	/* Every exit path above leaves the lock held iff we never released it. */
	Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));

	*released_lock_out = released_lock;
	return invalidated;
}
2170 : :
2171 : : /*
2172 : : * Invalidate slots that require resources about to be removed.
2173 : : *
2174 : : * Returns true when any slot have got invalidated.
2175 : : *
2176 : : * Whether a slot needs to be invalidated depends on the invalidation cause.
2177 : : * A slot is invalidated if it:
2178 : : * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
2179 : : * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
2180 : : * db; dboid may be InvalidOid for shared relations
2181 : : * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not
2182 : : * logical.
2183 : : * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
2184 : : * "idle_replication_slot_timeout" duration.
2185 : : *
2186 : : * Note: This function attempts to invalidate the slot for multiple possible
2187 : : * causes in a single pass, minimizing redundant iterations. The "cause"
2188 : : * parameter can be a MASK representing one or more of the defined causes.
2189 : : *
2190 : : * If it invalidates the last logical slot in the cluster, it requests to
2191 : : * disable logical decoding.
2192 : : *
2193 : : * NB - this runs as part of checkpoint, so avoid raising errors if possible.
2194 : : */
2195 : : bool
2196 : 7 : InvalidateObsoleteReplicationSlots(uint32 possible_causes,
2197 : : XLogSegNo oldestSegno, Oid dboid,
2198 : : TransactionId snapshotConflictHorizon)
2199 : : {
2200 : 7 : XLogRecPtr oldestLSN;
2201 : 7 : bool invalidated = false;
2202 : 7 : bool invalidated_logical = false;
2203 : 7 : bool found_valid_logicalslot;
2204 : :
2205 [ - + # # ]: 7 : Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2206 [ + - + - ]: 7 : Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
2207 [ + - ]: 7 : Assert(possible_causes != RS_INVAL_NONE);
2208 : :
2209 [ + - ]: 7 : if (max_replication_slots == 0)
2210 : 0 : return invalidated;
2211 : :
2212 : 7 : XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
2213 : :
2214 : : restart:
2215 : 7 : found_valid_logicalslot = false;
2216 : 7 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2217 [ + + - - : 77 : for (int i = 0; i < max_replication_slots; i++)
+ ]
2218 : : {
2219 : 70 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2220 : 70 : bool released_lock = false;
2221 : :
2222 [ - + ]: 70 : if (!s->in_use)
2223 : 70 : continue;
2224 : :
2225 : : /* Prevent invalidation of logical slots during binary upgrade */
2226 [ # # # # ]: 0 : if (SlotIsLogical(s) && IsBinaryUpgrade)
2227 : : {
2228 [ # # ]: 0 : SpinLockAcquire(&s->mutex);
2229 : 0 : found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
2230 : 0 : SpinLockRelease(&s->mutex);
2231 : :
2232 : 0 : continue;
2233 : : }
2234 : :
2235 [ # # # # ]: 0 : if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN,
2236 : 0 : dboid, snapshotConflictHorizon,
2237 : : &released_lock))
2238 : : {
2239 [ # # ]: 0 : Assert(released_lock);
2240 : :
2241 : : /* Remember we have invalidated a physical or logical slot */
2242 : 0 : invalidated = true;
2243 : :
2244 : : /*
2245 : : * Additionally, remember we have invalidated a logical slot as we
2246 : : * can request disabling logical decoding later.
2247 : : */
2248 [ # # ]: 0 : if (SlotIsLogical(s))
2249 : 0 : invalidated_logical = true;
2250 : 0 : }
2251 : : else
2252 : : {
2253 : : /*
2254 : : * We need to check if the slot is invalidated here since
2255 : : * InvalidatePossiblyObsoleteSlot() returns false also if the slot
2256 : : * is already invalidated.
2257 : : */
2258 [ # # ]: 0 : SpinLockAcquire(&s->mutex);
2259 : 0 : found_valid_logicalslot |=
2260 [ # # ]: 0 : (SlotIsLogical(s) && (s->data.invalidated == RS_INVAL_NONE));
2261 : 0 : SpinLockRelease(&s->mutex);
2262 : : }
2263 : :
2264 : : /* if the lock was released, start from scratch */
2265 [ # # ]: 0 : if (released_lock)
2266 : 0 : goto restart;
2267 [ + - - ]: 70 : }
2268 : 7 : LWLockRelease(ReplicationSlotControlLock);
2269 : :
2270 : : /*
2271 : : * If any slots have been invalidated, recalculate the resource limits.
2272 : : */
2273 [ + - ]: 7 : if (invalidated)
2274 : : {
2275 : 0 : ReplicationSlotsComputeRequiredXmin(false);
2276 : 0 : ReplicationSlotsComputeRequiredLSN();
2277 : 0 : }
2278 : :
2279 : : /*
2280 : : * Request the checkpointer to disable logical decoding if no valid
2281 : : * logical slots remain. If called by the checkpointer during a
2282 : : * checkpoint, only the request is initiated; actual deactivation is
2283 : : * deferred until after the checkpoint completes.
2284 : : */
2285 [ - + # # ]: 7 : if (invalidated_logical && !found_valid_logicalslot)
2286 : 0 : RequestDisableLogicalDecoding();
2287 : :
2288 : 7 : return invalidated;
2289 : 7 : }
2290 : :
2291 : : /*
2292 : : * Flush all replication slots to disk.
2293 : : *
2294 : : * It is convenient to flush dirty replication slots at the time of checkpoint.
2295 : : * Additionally, in case of a shutdown checkpoint, we also identify the slots
2296 : : * for which the confirmed_flush LSN has been updated since the last time it
2297 : : * was saved and flush them.
2298 : : */
2299 : : void
2300 : 7 : CheckPointReplicationSlots(bool is_shutdown)
2301 : : {
2302 : 7 : int i;
2303 : 7 : bool last_saved_restart_lsn_updated = false;
2304 : :
2305 [ - + - + ]: 7 : elog(DEBUG1, "performing replication slot checkpoint");
2306 : :
2307 : : /*
2308 : : * Prevent any slot from being created/dropped while we're active. As we
2309 : : * explicitly do *not* want to block iterating over replication_slots or
2310 : : * acquiring a slot we cannot take the control lock - but that's OK,
2311 : : * because holding ReplicationSlotAllocationLock is strictly stronger, and
2312 : : * enough to guarantee that nobody can change the in_use bits on us.
2313 : : *
2314 : : * Additionally, acquiring the Allocation lock is necessary to serialize
2315 : : * the slot flush process with concurrent slot WAL reservation. This
2316 : : * ensures that the WAL position being reserved is either flushed to disk
2317 : : * or is beyond or equal to the redo pointer of the current checkpoint
2318 : : * (See ReplicationSlotReserveWal for details).
2319 : : */
2320 : 7 : LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
2321 : :
2322 [ + + ]: 77 : for (i = 0; i < max_replication_slots; i++)
2323 : : {
2324 : 70 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2325 : 70 : char path[MAXPGPATH];
2326 : :
2327 [ - + ]: 70 : if (!s->in_use)
2328 : 70 : continue;
2329 : :
2330 : : /* save the slot to disk, locking is handled in SaveSlotToPath() */
2331 : 0 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2332 : :
2333 : : /*
2334 : : * Slot's data is not flushed each time the confirmed_flush LSN is
2335 : : * updated as that could lead to frequent writes. However, we decide
2336 : : * to force a flush of all logical slot's data at the time of shutdown
2337 : : * if the confirmed_flush LSN is changed since we last flushed it to
2338 : : * disk. This helps in avoiding an unnecessary retreat of the
2339 : : * confirmed_flush LSN after restart.
2340 : : */
2341 [ # # # # ]: 0 : if (is_shutdown && SlotIsLogical(s))
2342 : : {
2343 [ # # ]: 0 : SpinLockAcquire(&s->mutex);
2344 : :
2345 [ # # # # ]: 0 : if (s->data.invalidated == RS_INVAL_NONE &&
2346 : 0 : s->data.confirmed_flush > s->last_saved_confirmed_flush)
2347 : : {
2348 : 0 : s->just_dirtied = true;
2349 : 0 : s->dirty = true;
2350 : 0 : }
2351 : 0 : SpinLockRelease(&s->mutex);
2352 : 0 : }
2353 : :
2354 : : /*
2355 : : * Track if we're going to update slot's last_saved_restart_lsn. We
2356 : : * need this to know if we need to recompute the required LSN.
2357 : : */
2358 [ # # ]: 0 : if (s->last_saved_restart_lsn != s->data.restart_lsn)
2359 : 0 : last_saved_restart_lsn_updated = true;
2360 : :
2361 : 0 : SaveSlotToPath(s, path, LOG);
2362 [ - + - ]: 70 : }
2363 : 7 : LWLockRelease(ReplicationSlotAllocationLock);
2364 : :
2365 : : /*
2366 : : * Recompute the required LSN if SaveSlotToPath() updated
2367 : : * last_saved_restart_lsn for any slot.
2368 : : */
2369 [ + - ]: 7 : if (last_saved_restart_lsn_updated)
2370 : 0 : ReplicationSlotsComputeRequiredLSN();
2371 : 7 : }
2372 : :
2373 : : /*
2374 : : * Load all replication slots from disk into memory at server startup. This
2375 : : * needs to be run before we start crash recovery.
2376 : : */
2377 : : void
2378 : 4 : StartupReplicationSlots(void)
2379 : : {
2380 : 4 : DIR *replication_dir;
2381 : 4 : struct dirent *replication_de;
2382 : :
2383 [ - + - + ]: 4 : elog(DEBUG1, "starting up replication slots");
2384 : :
2385 : : /* restore all slots by iterating over all on-disk entries */
2386 : 4 : replication_dir = AllocateDir(PG_REPLSLOT_DIR);
2387 [ + + ]: 12 : while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
2388 : : {
2389 : 8 : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2390 : 8 : PGFileType de_type;
2391 : :
2392 [ + + + - ]: 8 : if (strcmp(replication_de->d_name, ".") == 0 ||
2393 : 4 : strcmp(replication_de->d_name, "..") == 0)
2394 : 8 : continue;
2395 : :
2396 : 0 : snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2397 : 0 : de_type = get_dirent_type(path, replication_de, false, DEBUG1);
2398 : :
2399 : : /* we're only creating directories here, skip if it's not our's */
2400 [ # # # # ]: 0 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
2401 : 0 : continue;
2402 : :
2403 : : /* we crashed while a slot was being setup or deleted, clean up */
2404 [ # # ]: 0 : if (pg_str_endswith(replication_de->d_name, ".tmp"))
2405 : : {
2406 [ # # ]: 0 : if (!rmtree(path, true))
2407 : : {
2408 [ # # # # ]: 0 : ereport(WARNING,
2409 : : (errmsg("could not remove directory \"%s\"",
2410 : : path)));
2411 : 0 : continue;
2412 : : }
2413 : 0 : fsync_fname(PG_REPLSLOT_DIR, true);
2414 : 0 : continue;
2415 : : }
2416 : :
2417 : : /* looks like a slot in a normal state, restore */
2418 : 0 : RestoreSlotFromDisk(replication_de->d_name);
2419 [ + - ]: 8 : }
2420 : 4 : FreeDir(replication_dir);
2421 : :
2422 : : /* currently no slots exist, we're done. */
2423 [ - + ]: 4 : if (max_replication_slots <= 0)
2424 : 0 : return;
2425 : :
2426 : : /* Now that we have recovered all the data, compute replication xmin */
2427 : 4 : ReplicationSlotsComputeRequiredXmin(false);
2428 : 4 : ReplicationSlotsComputeRequiredLSN();
2429 : 4 : }
2430 : :
2431 : : /* ----
2432 : : * Manipulation of on-disk state of replication slots
2433 : : *
2434 : : * NB: none of the routines below should take any notice whether a slot is the
2435 : : * current one or not, that's all handled a layer above.
2436 : : * ----
2437 : : */
2438 : : static void
2439 : 0 : CreateSlotOnDisk(ReplicationSlot *slot)
2440 : : {
2441 : 0 : char tmppath[MAXPGPATH];
2442 : 0 : char path[MAXPGPATH];
2443 : 0 : struct stat st;
2444 : :
2445 : : /*
2446 : : * No need to take out the io_in_progress_lock, nobody else can see this
2447 : : * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2448 : : * takes out the lock, if we'd take the lock here, we'd deadlock.
2449 : : */
2450 : :
2451 : 0 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2452 : 0 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2453 : :
2454 : : /*
2455 : : * It's just barely possible that some previous effort to create or drop a
2456 : : * slot with this name left a temp directory lying around. If that seems
2457 : : * to be the case, try to remove it. If the rmtree() fails, we'll error
2458 : : * out at the MakePGDirectory() below, so we don't bother checking
2459 : : * success.
2460 : : */
2461 [ # # # # ]: 0 : if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2462 : 0 : rmtree(tmppath, true);
2463 : :
2464 : : /* Create and fsync the temporary slot directory. */
2465 [ # # ]: 0 : if (MakePGDirectory(tmppath) < 0)
2466 [ # # # # ]: 0 : ereport(ERROR,
2467 : : (errcode_for_file_access(),
2468 : : errmsg("could not create directory \"%s\": %m",
2469 : : tmppath)));
2470 : 0 : fsync_fname(tmppath, true);
2471 : :
2472 : : /* Write the actual state file. */
2473 : 0 : slot->dirty = true; /* signal that we really need to write */
2474 : 0 : SaveSlotToPath(slot, tmppath, ERROR);
2475 : :
2476 : : /* Rename the directory into place. */
2477 [ # # ]: 0 : if (rename(tmppath, path) != 0)
2478 [ # # # # ]: 0 : ereport(ERROR,
2479 : : (errcode_for_file_access(),
2480 : : errmsg("could not rename file \"%s\" to \"%s\": %m",
2481 : : tmppath, path)));
2482 : :
2483 : : /*
2484 : : * If we'd now fail - really unlikely - we wouldn't know whether this slot
2485 : : * would persist after an OS crash or not - so, force a restart. The
2486 : : * restart would try to fsync this again till it works.
2487 : : */
2488 : 0 : START_CRIT_SECTION();
2489 : :
2490 : 0 : fsync_fname(path, true);
2491 : 0 : fsync_fname(PG_REPLSLOT_DIR, true);
2492 : :
2493 [ # # ]: 0 : END_CRIT_SECTION();
2494 : 0 : }
2495 : :
2496 : : /*
2497 : : * Shared functionality between saving and creating a replication slot.
2498 : : */
2499 : : static void
2500 : 0 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2501 : : {
2502 : 0 : char tmppath[MAXPGPATH];
2503 : 0 : char path[MAXPGPATH];
2504 : 0 : int fd;
2505 : 0 : ReplicationSlotOnDisk cp;
2506 : 0 : bool was_dirty;
2507 : :
2508 : : /* first check whether there's something to write out */
2509 [ # # ]: 0 : SpinLockAcquire(&slot->mutex);
2510 : 0 : was_dirty = slot->dirty;
2511 : 0 : slot->just_dirtied = false;
2512 : 0 : SpinLockRelease(&slot->mutex);
2513 : :
2514 : : /* and don't do anything if there's nothing to write */
2515 [ # # ]: 0 : if (!was_dirty)
2516 : 0 : return;
2517 : :
2518 : 0 : LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
2519 : :
2520 : : /* silence valgrind :( */
2521 : 0 : memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2522 : :
2523 : 0 : sprintf(tmppath, "%s/state.tmp", dir);
2524 : 0 : sprintf(path, "%s/state", dir);
2525 : :
2526 : 0 : fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
2527 [ # # ]: 0 : if (fd < 0)
2528 : : {
2529 : : /*
2530 : : * If not an ERROR, then release the lock before returning. In case
2531 : : * of an ERROR, the error recovery path automatically releases the
2532 : : * lock, but no harm in explicitly releasing even in that case. Note
2533 : : * that LWLockRelease() could affect errno.
2534 : : */
2535 : 0 : int save_errno = errno;
2536 : :
2537 : 0 : LWLockRelease(&slot->io_in_progress_lock);
2538 : 0 : errno = save_errno;
2539 [ # # # # : 0 : ereport(elevel,
# # # # #
# ]
2540 : : (errcode_for_file_access(),
2541 : : errmsg("could not create file \"%s\": %m",
2542 : : tmppath)));
2543 : : return;
2544 : 0 : }
2545 : :
2546 : 0 : cp.magic = SLOT_MAGIC;
2547 : 0 : INIT_CRC32C(cp.checksum);
2548 : 0 : cp.version = SLOT_VERSION;
2549 : 0 : cp.length = ReplicationSlotOnDiskV2Size;
2550 : :
2551 [ # # ]: 0 : SpinLockAcquire(&slot->mutex);
2552 : :
2553 : 0 : memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2554 : :
2555 : 0 : SpinLockRelease(&slot->mutex);
2556 : :
2557 : 0 : COMP_CRC32C(cp.checksum,
2558 : : (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
2559 : : ReplicationSlotOnDiskChecksummedSize);
2560 : 0 : FIN_CRC32C(cp.checksum);
2561 : :
2562 : 0 : errno = 0;
2563 : 0 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
2564 [ # # ]: 0 : if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2565 : : {
2566 : 0 : int save_errno = errno;
2567 : :
2568 : 0 : pgstat_report_wait_end();
2569 : 0 : CloseTransientFile(fd);
2570 : 0 : unlink(tmppath);
2571 : 0 : LWLockRelease(&slot->io_in_progress_lock);
2572 : :
2573 : : /* if write didn't set errno, assume problem is no disk space */
2574 [ # # ]: 0 : errno = save_errno ? save_errno : ENOSPC;
2575 [ # # # # : 0 : ereport(elevel,
# # # # #
# ]
2576 : : (errcode_for_file_access(),
2577 : : errmsg("could not write to file \"%s\": %m",
2578 : : tmppath)));
2579 : : return;
2580 : 0 : }
2581 : 0 : pgstat_report_wait_end();
2582 : :
2583 : : /* fsync the temporary file */
2584 : 0 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
2585 [ # # ]: 0 : if (pg_fsync(fd) != 0)
2586 : : {
2587 : 0 : int save_errno = errno;
2588 : :
2589 : 0 : pgstat_report_wait_end();
2590 : 0 : CloseTransientFile(fd);
2591 : 0 : unlink(tmppath);
2592 : 0 : LWLockRelease(&slot->io_in_progress_lock);
2593 : :
2594 : 0 : errno = save_errno;
2595 [ # # # # : 0 : ereport(elevel,
# # # # #
# ]
2596 : : (errcode_for_file_access(),
2597 : : errmsg("could not fsync file \"%s\": %m",
2598 : : tmppath)));
2599 : : return;
2600 : 0 : }
2601 : 0 : pgstat_report_wait_end();
2602 : :
2603 [ # # ]: 0 : if (CloseTransientFile(fd) != 0)
2604 : : {
2605 : 0 : int save_errno = errno;
2606 : :
2607 : 0 : unlink(tmppath);
2608 : 0 : LWLockRelease(&slot->io_in_progress_lock);
2609 : :
2610 : 0 : errno = save_errno;
2611 [ # # # # : 0 : ereport(elevel,
# # # # #
# ]
2612 : : (errcode_for_file_access(),
2613 : : errmsg("could not close file \"%s\": %m",
2614 : : tmppath)));
2615 : : return;
2616 : 0 : }
2617 : :
2618 : : /* rename to permanent file, fsync file and directory */
2619 [ # # ]: 0 : if (rename(tmppath, path) != 0)
2620 : : {
2621 : 0 : int save_errno = errno;
2622 : :
2623 : 0 : unlink(tmppath);
2624 : 0 : LWLockRelease(&slot->io_in_progress_lock);
2625 : :
2626 : 0 : errno = save_errno;
2627 [ # # # # : 0 : ereport(elevel,
# # # # #
# ]
2628 : : (errcode_for_file_access(),
2629 : : errmsg("could not rename file \"%s\" to \"%s\": %m",
2630 : : tmppath, path)));
2631 : : return;
2632 : 0 : }
2633 : :
2634 : : /*
2635 : : * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2636 : : */
2637 : 0 : START_CRIT_SECTION();
2638 : :
2639 : 0 : fsync_fname(path, false);
2640 : 0 : fsync_fname(dir, true);
2641 : 0 : fsync_fname(PG_REPLSLOT_DIR, true);
2642 : :
2643 [ # # ]: 0 : END_CRIT_SECTION();
2644 : :
2645 : : /*
2646 : : * Successfully wrote, unset dirty bit, unless somebody dirtied again
2647 : : * already and remember the confirmed_flush LSN value.
2648 : : */
2649 [ # # ]: 0 : SpinLockAcquire(&slot->mutex);
2650 [ # # ]: 0 : if (!slot->just_dirtied)
2651 : 0 : slot->dirty = false;
2652 : 0 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2653 : 0 : slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2654 : 0 : SpinLockRelease(&slot->mutex);
2655 : :
2656 : 0 : LWLockRelease(&slot->io_in_progress_lock);
2657 [ # # ]: 0 : }
2658 : :
2659 : : /*
2660 : : * Load a single slot from disk into memory.
2661 : : */
2662 : : static void
2663 : 0 : RestoreSlotFromDisk(const char *name)
2664 : : {
2665 : 0 : ReplicationSlotOnDisk cp;
2666 : 0 : int i;
2667 : 0 : char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2668 : 0 : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2669 : 0 : int fd;
2670 : 0 : bool restored = false;
2671 : 0 : int readBytes;
2672 : 0 : pg_crc32c checksum;
2673 : 0 : TimestampTz now = 0;
2674 : :
2675 : : /* no need to lock here, no concurrent access allowed yet */
2676 : :
2677 : : /* delete temp file if it exists */
2678 : 0 : sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2679 : 0 : sprintf(path, "%s/state.tmp", slotdir);
2680 [ # # # # ]: 0 : if (unlink(path) < 0 && errno != ENOENT)
2681 [ # # # # ]: 0 : ereport(PANIC,
2682 : : (errcode_for_file_access(),
2683 : : errmsg("could not remove file \"%s\": %m", path)));
2684 : :
2685 : 0 : sprintf(path, "%s/state", slotdir);
2686 : :
2687 [ # # # # ]: 0 : elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2688 : :
2689 : : /* on some operating systems fsyncing a file requires O_RDWR */
2690 : 0 : fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2691 : :
2692 : : /*
2693 : : * We do not need to handle this as we are rename()ing the directory into
2694 : : * place only after we fsync()ed the state file.
2695 : : */
2696 [ # # ]: 0 : if (fd < 0)
2697 [ # # # # ]: 0 : ereport(PANIC,
2698 : : (errcode_for_file_access(),
2699 : : errmsg("could not open file \"%s\": %m", path)));
2700 : :
2701 : : /*
2702 : : * Sync state file before we're reading from it. We might have crashed
2703 : : * while it wasn't synced yet and we shouldn't continue on that basis.
2704 : : */
2705 : 0 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
2706 [ # # ]: 0 : if (pg_fsync(fd) != 0)
2707 [ # # # # ]: 0 : ereport(PANIC,
2708 : : (errcode_for_file_access(),
2709 : : errmsg("could not fsync file \"%s\": %m",
2710 : : path)));
2711 : 0 : pgstat_report_wait_end();
2712 : :
2713 : : /* Also sync the parent directory */
2714 : 0 : START_CRIT_SECTION();
2715 : 0 : fsync_fname(slotdir, true);
2716 [ # # ]: 0 : END_CRIT_SECTION();
2717 : :
2718 : : /* read part of statefile that's guaranteed to be version independent */
2719 : 0 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2720 : 0 : readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
2721 : 0 : pgstat_report_wait_end();
2722 [ # # ]: 0 : if (readBytes != ReplicationSlotOnDiskConstantSize)
2723 : : {
2724 [ # # ]: 0 : if (readBytes < 0)
2725 [ # # # # ]: 0 : ereport(PANIC,
2726 : : (errcode_for_file_access(),
2727 : : errmsg("could not read file \"%s\": %m", path)));
2728 : : else
2729 [ # # # # ]: 0 : ereport(PANIC,
2730 : : (errcode(ERRCODE_DATA_CORRUPTED),
2731 : : errmsg("could not read file \"%s\": read %d of %zu",
2732 : : path, readBytes,
2733 : : (Size) ReplicationSlotOnDiskConstantSize)));
2734 : 0 : }
2735 : :
2736 : : /* verify magic */
2737 [ # # ]: 0 : if (cp.magic != SLOT_MAGIC)
2738 [ # # # # ]: 0 : ereport(PANIC,
2739 : : (errcode(ERRCODE_DATA_CORRUPTED),
2740 : : errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2741 : : path, cp.magic, SLOT_MAGIC)));
2742 : :
2743 : : /* verify version */
2744 [ # # ]: 0 : if (cp.version != SLOT_VERSION)
2745 [ # # # # ]: 0 : ereport(PANIC,
2746 : : (errcode(ERRCODE_DATA_CORRUPTED),
2747 : : errmsg("replication slot file \"%s\" has unsupported version %u",
2748 : : path, cp.version)));
2749 : :
2750 : : /* boundary check on length */
2751 [ # # ]: 0 : if (cp.length != ReplicationSlotOnDiskV2Size)
2752 [ # # # # ]: 0 : ereport(PANIC,
2753 : : (errcode(ERRCODE_DATA_CORRUPTED),
2754 : : errmsg("replication slot file \"%s\" has corrupted length %u",
2755 : : path, cp.length)));
2756 : :
2757 : : /* Now that we know the size, read the entire file */
2758 : 0 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2759 : 0 : readBytes = read(fd,
2760 : 0 : (char *) &cp + ReplicationSlotOnDiskConstantSize,
2761 : 0 : cp.length);
2762 : 0 : pgstat_report_wait_end();
2763 [ # # ]: 0 : if (readBytes != cp.length)
2764 : : {
2765 [ # # ]: 0 : if (readBytes < 0)
2766 [ # # # # ]: 0 : ereport(PANIC,
2767 : : (errcode_for_file_access(),
2768 : : errmsg("could not read file \"%s\": %m", path)));
2769 : : else
2770 [ # # # # ]: 0 : ereport(PANIC,
2771 : : (errcode(ERRCODE_DATA_CORRUPTED),
2772 : : errmsg("could not read file \"%s\": read %d of %zu",
2773 : : path, readBytes, (Size) cp.length)));
2774 : 0 : }
2775 : :
2776 [ # # ]: 0 : if (CloseTransientFile(fd) != 0)
2777 [ # # # # ]: 0 : ereport(PANIC,
2778 : : (errcode_for_file_access(),
2779 : : errmsg("could not close file \"%s\": %m", path)));
2780 : :
2781 : : /* now verify the CRC */
2782 : 0 : INIT_CRC32C(checksum);
2783 : 0 : COMP_CRC32C(checksum,
2784 : : (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
2785 : : ReplicationSlotOnDiskChecksummedSize);
2786 : 0 : FIN_CRC32C(checksum);
2787 : :
2788 [ # # ]: 0 : if (!EQ_CRC32C(checksum, cp.checksum))
2789 [ # # # # ]: 0 : ereport(PANIC,
2790 : : (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2791 : : path, checksum, cp.checksum)));
2792 : :
2793 : : /*
2794 : : * If we crashed with an ephemeral slot active, don't restore but delete
2795 : : * it.
2796 : : */
2797 [ # # ]: 0 : if (cp.slotdata.persistency != RS_PERSISTENT)
2798 : : {
2799 [ # # ]: 0 : if (!rmtree(slotdir, true))
2800 : : {
2801 [ # # # # ]: 0 : ereport(WARNING,
2802 : : (errmsg("could not remove directory \"%s\"",
2803 : : slotdir)));
2804 : 0 : }
2805 : 0 : fsync_fname(PG_REPLSLOT_DIR, true);
2806 : 0 : return;
2807 : : }
2808 : :
2809 : : /*
2810 : : * Verify that requirements for the specific slot type are met. That's
2811 : : * important because if these aren't met we're not guaranteed to retain
2812 : : * all the necessary resources for the slot.
2813 : : *
2814 : : * NB: We have to do so *after* the above checks for ephemeral slots,
2815 : : * because otherwise a slot that shouldn't exist anymore could prevent
2816 : : * restarts.
2817 : : *
2818 : : * NB: Changing the requirements here also requires adapting
2819 : : * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2820 : : */
2821 [ # # ]: 0 : if (cp.slotdata.database != InvalidOid)
2822 : : {
2823 [ # # ]: 0 : if (wal_level < WAL_LEVEL_REPLICA)
2824 [ # # # # ]: 0 : ereport(FATAL,
2825 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2826 : : errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2827 : : NameStr(cp.slotdata.name)),
2828 : : errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2829 : :
2830 : : /*
2831 : : * In standby mode, the hot standby must be enabled. This check is
2832 : : * necessary to ensure logical slots are invalidated when they become
2833 : : * incompatible due to insufficient wal_level. Otherwise, if the
2834 : : * primary reduces effective_wal_level < logical while hot standby is
2835 : : * disabled, primary disable logical decoding while hot standby is
2836 : : * disabled, logical slots would remain valid even after promotion.
2837 : : */
2838 [ # # # # ]: 0 : if (StandbyMode && !EnableHotStandby)
2839 [ # # # # ]: 0 : ereport(FATAL,
2840 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2841 : : errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2842 : : NameStr(cp.slotdata.name)),
2843 : : errhint("Change \"hot_standby\" to be \"on\".")));
2844 : 0 : }
2845 [ # # ]: 0 : else if (wal_level < WAL_LEVEL_REPLICA)
2846 [ # # # # ]: 0 : ereport(FATAL,
2847 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2848 : : errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2849 : : NameStr(cp.slotdata.name)),
2850 : : errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2851 : :
2852 : : /* nothing can be active yet, don't lock anything */
2853 [ # # ]: 0 : for (i = 0; i < max_replication_slots; i++)
2854 : : {
2855 : 0 : ReplicationSlot *slot;
2856 : :
2857 : 0 : slot = &ReplicationSlotCtl->replication_slots[i];
2858 : :
2859 [ # # ]: 0 : if (slot->in_use)
2860 : 0 : continue;
2861 : :
2862 : : /* restore the entire set of persistent data */
2863 : 0 : memcpy(&slot->data, &cp.slotdata,
2864 : : sizeof(ReplicationSlotPersistentData));
2865 : :
2866 : : /* initialize in memory state */
2867 : 0 : slot->effective_xmin = cp.slotdata.xmin;
2868 : 0 : slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
2869 : 0 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2870 : 0 : slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2871 : :
2872 : 0 : slot->candidate_catalog_xmin = InvalidTransactionId;
2873 : 0 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
2874 : 0 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
2875 : 0 : slot->candidate_restart_valid = InvalidXLogRecPtr;
2876 : :
2877 : 0 : slot->in_use = true;
2878 : 0 : slot->active_pid = 0;
2879 : :
2880 : : /*
2881 : : * Set the time since the slot has become inactive after loading the
2882 : : * slot from the disk into memory. Whoever acquires the slot i.e.
2883 : : * makes the slot active will reset it. Use the same inactive_since
2884 : : * time for all the slots.
2885 : : */
2886 [ # # ]: 0 : if (now == 0)
2887 : 0 : now = GetCurrentTimestamp();
2888 : :
2889 : 0 : ReplicationSlotSetInactiveSince(slot, now, false);
2890 : :
2891 : 0 : restored = true;
2892 : 0 : break;
2893 [ # # ]: 0 : }
2894 : :
2895 [ # # ]: 0 : if (!restored)
2896 [ # # # # ]: 0 : ereport(FATAL,
2897 : : (errmsg("too many replication slots active before shutdown"),
2898 : : errhint("Increase \"max_replication_slots\" and try again.")));
2899 : 0 : }
2900 : :
2901 : : /*
2902 : : * Maps an invalidation reason for a replication slot to
2903 : : * ReplicationSlotInvalidationCause.
2904 : : */
2905 : : ReplicationSlotInvalidationCause
2906 : 0 : GetSlotInvalidationCause(const char *cause_name)
2907 : : {
2908 [ # # ]: 0 : Assert(cause_name);
2909 : :
2910 : : /* Search lookup table for the cause having this name */
2911 [ # # # # : 0 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
# ]
2912 : : {
2913 [ # # ]: 0 : if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2914 : 0 : return SlotInvalidationCauses[i].cause;
2915 : 0 : }
2916 : :
2917 : 0 : Assert(false);
2918 : 0 : return RS_INVAL_NONE; /* to keep compiler quiet */
2919 : 0 : }
2920 : :
2921 : : /*
2922 : : * Maps a ReplicationSlotInvalidationCause to the invalidation
2923 : : * reason for a replication slot.
2924 : : */
2925 : : const char *
2926 : 0 : GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
2927 : : {
2928 : : /* Search lookup table for the name of this cause */
2929 [ # # # # : 0 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
# ]
2930 : : {
2931 [ # # ]: 0 : if (SlotInvalidationCauses[i].cause == cause)
2932 : 0 : return SlotInvalidationCauses[i].cause_name;
2933 : 0 : }
2934 : :
2935 : 0 : Assert(false);
2936 : 0 : return "none"; /* to keep compiler quiet */
2937 : 0 : }
2938 : :
2939 : : /*
2940 : : * A helper function to validate slots specified in GUC synchronized_standby_slots.
2941 : : *
2942 : : * The rawname will be parsed, and the result will be saved into *elemlist.
2943 : : */
2944 : : static bool
2945 : 0 : validate_sync_standby_slots(char *rawname, List **elemlist)
2946 : : {
2947 : : /* Verify syntax and parse string into a list of identifiers */
2948 [ # # ]: 0 : if (!SplitIdentifierString(rawname, ',', elemlist))
2949 : : {
2950 : 0 : GUC_check_errdetail("List syntax is invalid.");
2951 : 0 : return false;
2952 : : }
2953 : :
2954 : : /* Iterate the list to validate each slot name */
2955 [ # # # # : 0 : foreach_ptr(char, name, *elemlist)
# # # # #
# # # # ]
2956 : : {
2957 : 0 : int err_code;
2958 : 0 : char *err_msg = NULL;
2959 : 0 : char *err_hint = NULL;
2960 : :
2961 [ # # ]: 0 : if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
2962 : : &err_msg, &err_hint))
2963 : : {
2964 : 0 : GUC_check_errcode(err_code);
2965 : 0 : GUC_check_errdetail("%s", err_msg);
2966 [ # # ]: 0 : if (err_hint != NULL)
2967 : 0 : GUC_check_errhint("%s", err_hint);
2968 : 0 : return false;
2969 : : }
2970 [ # # ]: 0 : }
2971 : :
2972 : 0 : return true;
2973 : 0 : }
2974 : :
2975 : : /*
2976 : : * GUC check_hook for synchronized_standby_slots
2977 : : */
2978 : : bool
2979 : 6 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
2980 : : {
2981 : 6 : char *rawname;
2982 : 6 : char *ptr;
2983 : 6 : List *elemlist;
2984 : 6 : int size;
2985 : 6 : bool ok;
2986 : 6 : SyncStandbySlotsConfigData *config;
2987 : :
2988 [ - + ]: 6 : if ((*newval)[0] == '\0')
2989 : 6 : return true;
2990 : :
2991 : : /* Need a modifiable copy of the GUC string */
2992 : 0 : rawname = pstrdup(*newval);
2993 : :
2994 : : /* Now verify if the specified slots exist and have correct type */
2995 : 0 : ok = validate_sync_standby_slots(rawname, &elemlist);
2996 : :
2997 [ # # # # ]: 0 : if (!ok || elemlist == NIL)
2998 : : {
2999 : 0 : pfree(rawname);
3000 : 0 : list_free(elemlist);
3001 : 0 : return ok;
3002 : : }
3003 : :
3004 : : /* Compute the size required for the SyncStandbySlotsConfigData struct */
3005 : 0 : size = offsetof(SyncStandbySlotsConfigData, slot_names);
3006 [ # # # # : 0 : foreach_ptr(char, slot_name, elemlist)
# # # # ]
3007 : 0 : size += strlen(slot_name) + 1;
3008 : :
3009 : : /* GUC extra value must be guc_malloc'd, not palloc'd */
3010 : 0 : config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
3011 [ # # ]: 0 : if (!config)
3012 : 0 : return false;
3013 : :
3014 : : /* Transform the data into SyncStandbySlotsConfigData */
3015 : 0 : config->nslotnames = list_length(elemlist);
3016 : :
3017 : 0 : ptr = config->slot_names;
3018 [ # # # # : 0 : foreach_ptr(char, slot_name, elemlist)
# # # # ]
3019 : : {
3020 : 0 : strcpy(ptr, slot_name);
3021 : 0 : ptr += strlen(slot_name) + 1;
3022 : 0 : }
3023 : :
3024 : 0 : *extra = config;
3025 : :
3026 : 0 : pfree(rawname);
3027 : 0 : list_free(elemlist);
3028 : 0 : return true;
3029 : 6 : }
3030 : :
3031 : : /*
3032 : : * GUC assign_hook for synchronized_standby_slots
3033 : : */
3034 : : void
3035 : 6 : assign_synchronized_standby_slots(const char *newval, void *extra)
3036 : : {
3037 : : /*
3038 : : * The standby slots may have changed, so we must recompute the oldest
3039 : : * LSN.
3040 : : */
3041 : 6 : ss_oldest_flush_lsn = InvalidXLogRecPtr;
3042 : :
3043 : 6 : synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
3044 : 6 : }
3045 : :
3046 : : /*
3047 : : * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
3048 : : */
3049 : : bool
3050 : 0 : SlotExistsInSyncStandbySlots(const char *slot_name)
3051 : : {
3052 : 0 : const char *standby_slot_name;
3053 : :
3054 : : /* Return false if there is no value in synchronized_standby_slots */
3055 [ # # ]: 0 : if (synchronized_standby_slots_config == NULL)
3056 : 0 : return false;
3057 : :
3058 : : /*
3059 : : * XXX: We are not expecting this list to be long so a linear search
3060 : : * shouldn't hurt but if that turns out not to be true then we can cache
3061 : : * this information for each WalSender as well.
3062 : : */
3063 : 0 : standby_slot_name = synchronized_standby_slots_config->slot_names;
3064 [ # # # # ]: 0 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3065 : : {
3066 [ # # ]: 0 : if (strcmp(standby_slot_name, slot_name) == 0)
3067 : 0 : return true;
3068 : :
3069 : 0 : standby_slot_name += strlen(standby_slot_name) + 1;
3070 : 0 : }
3071 : :
3072 : 0 : return false;
3073 : 0 : }
3074 : :
3075 : : /*
3076 : : * Return true if the slots specified in synchronized_standby_slots have caught up to
3077 : : * the given WAL location, false otherwise.
3078 : : *
3079 : : * The elevel parameter specifies the error level used for logging messages
3080 : : * related to slots that do not exist, are invalidated, or are inactive.
3081 : : */
3082 : : bool
3083 : 0 : StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
3084 : : {
3085 : 0 : const char *name;
3086 : 0 : int caught_up_slot_num = 0;
3087 : 0 : XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
3088 : :
3089 : : /*
3090 : : * Don't need to wait for the standbys to catch up if there is no value in
3091 : : * synchronized_standby_slots.
3092 : : */
3093 [ # # ]: 0 : if (synchronized_standby_slots_config == NULL)
3094 : 0 : return true;
3095 : :
3096 : : /*
3097 : : * Don't need to wait for the standbys to catch up if we are on a standby
3098 : : * server, since we do not support syncing slots to cascading standbys.
3099 : : */
3100 [ # # ]: 0 : if (RecoveryInProgress())
3101 : 0 : return true;
3102 : :
3103 : : /*
3104 : : * Don't need to wait for the standbys to catch up if they are already
3105 : : * beyond the specified WAL location.
3106 : : */
3107 [ # # # # ]: 0 : if (XLogRecPtrIsValid(ss_oldest_flush_lsn) &&
3108 : 0 : ss_oldest_flush_lsn >= wait_for_lsn)
3109 : 0 : return true;
3110 : :
3111 : : /*
3112 : : * To prevent concurrent slot dropping and creation while filtering the
3113 : : * slots, take the ReplicationSlotControlLock outside of the loop.
3114 : : */
3115 : 0 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
3116 : :
3117 : 0 : name = synchronized_standby_slots_config->slot_names;
3118 [ # # ]: 0 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3119 : : {
3120 : 0 : XLogRecPtr restart_lsn;
3121 : 0 : bool invalidated;
3122 : 0 : bool inactive;
3123 : 0 : ReplicationSlot *slot;
3124 : :
3125 : 0 : slot = SearchNamedReplicationSlot(name, false);
3126 : :
3127 : : /*
3128 : : * If a slot name provided in synchronized_standby_slots does not
3129 : : * exist, report a message and exit the loop.
3130 : : */
3131 [ # # ]: 0 : if (!slot)
3132 : : {
3133 [ # # # # : 0 : ereport(elevel,
# # # # #
# ]
3134 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3135 : : errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
3136 : : name, "synchronized_standby_slots"),
3137 : : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3138 : : name),
3139 : : errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
3140 : : name, "synchronized_standby_slots"));
3141 : 0 : break;
3142 : : }
3143 : :
3144 : : /* Same as above: if a slot is not physical, exit the loop. */
3145 [ # # ]: 0 : if (SlotIsLogical(slot))
3146 : : {
3147 [ # # # # : 0 : ereport(elevel,
# # # # #
# ]
3148 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3149 : : errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
3150 : : name, "synchronized_standby_slots"),
3151 : : errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
3152 : : name),
3153 : : errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
3154 : : name, "synchronized_standby_slots"));
3155 : 0 : break;
3156 : : }
3157 : :
3158 [ # # ]: 0 : SpinLockAcquire(&slot->mutex);
3159 : 0 : restart_lsn = slot->data.restart_lsn;
3160 : 0 : invalidated = slot->data.invalidated != RS_INVAL_NONE;
3161 : 0 : inactive = slot->active_pid == 0;
3162 : 0 : SpinLockRelease(&slot->mutex);
3163 : :
3164 [ # # ]: 0 : if (invalidated)
3165 : : {
3166 : : /* Specified physical slot has been invalidated */
3167 [ # # # # : 0 : ereport(elevel,
# # # # #
# ]
3168 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3169 : : errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
3170 : : name, "synchronized_standby_slots"),
3171 : : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3172 : : name),
3173 : : errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
3174 : : name, "synchronized_standby_slots"));
3175 : 0 : break;
3176 : : }
3177 : :
3178 [ # # # # ]: 0 : if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
3179 : : {
3180 : : /* Log a message if no active_pid for this physical slot */
3181 [ # # ]: 0 : if (inactive)
3182 [ # # # # : 0 : ereport(elevel,
# # # # #
# ]
3183 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3184 : : errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3185 : : name, "synchronized_standby_slots"),
3186 : : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3187 : : name),
3188 : : errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3189 : : name, "synchronized_standby_slots"));
3190 : :
3191 : : /* Continue if the current slot hasn't caught up. */
3192 : 0 : break;
3193 : : }
3194 : :
3195 [ # # ]: 0 : Assert(restart_lsn >= wait_for_lsn);
3196 : :
3197 [ # # # # ]: 0 : if (!XLogRecPtrIsValid(min_restart_lsn) ||
3198 : 0 : min_restart_lsn > restart_lsn)
3199 : 0 : min_restart_lsn = restart_lsn;
3200 : :
3201 : 0 : caught_up_slot_num++;
3202 : :
3203 : 0 : name += strlen(name) + 1;
3204 [ # # ]: 0 : }
3205 : :
3206 : 0 : LWLockRelease(ReplicationSlotControlLock);
3207 : :
3208 : : /*
3209 : : * Return false if not all the standbys have caught up to the specified
3210 : : * WAL location.
3211 : : */
3212 [ # # ]: 0 : if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
3213 : 0 : return false;
3214 : :
3215 : : /* The ss_oldest_flush_lsn must not retreat. */
3216 [ # # # # ]: 0 : Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) ||
3217 : : min_restart_lsn >= ss_oldest_flush_lsn);
3218 : :
3219 : 0 : ss_oldest_flush_lsn = min_restart_lsn;
3220 : :
3221 : 0 : return true;
3222 : 0 : }
3223 : :
3224 : : /*
3225 : : * Wait for physical standbys to confirm receiving the given lsn.
3226 : : *
3227 : : * Used by logical decoding SQL functions. It waits for physical standbys
3228 : : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
3229 : : */
3230 : : void
3231 : 0 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
3232 : : {
3233 : : /*
3234 : : * Don't need to wait for the standby to catch up if the current acquired
3235 : : * slot is not a logical failover slot, or there is no value in
3236 : : * synchronized_standby_slots.
3237 : : */
3238 [ # # # # ]: 0 : if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
3239 : 0 : return;
3240 : :
3241 : 0 : ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
3242 : :
3243 : 0 : for (;;)
3244 : : {
3245 [ # # ]: 0 : CHECK_FOR_INTERRUPTS();
3246 : :
3247 [ # # ]: 0 : if (ConfigReloadPending)
3248 : : {
3249 : 0 : ConfigReloadPending = false;
3250 : 0 : ProcessConfigFile(PGC_SIGHUP);
3251 : 0 : }
3252 : :
3253 : : /* Exit if done waiting for every slot. */
3254 [ # # ]: 0 : if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
3255 : 0 : break;
3256 : :
3257 : : /*
3258 : : * Wait for the slots in the synchronized_standby_slots to catch up,
3259 : : * but use a timeout (1s) so we can also check if the
3260 : : * synchronized_standby_slots has been changed.
3261 : : */
3262 : 0 : ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
3263 : : WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
3264 : : }
3265 : :
3266 : 0 : ConditionVariableCancelSleep();
3267 : 0 : }
|