Branch data Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * slotfuncs.c
4 : : * Support functions for replication slots
5 : : *
6 : : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/replication/slotfuncs.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : : #include "postgres.h"
14 : :
15 : : #include "access/htup_details.h"
16 : : #include "access/xlog_internal.h"
17 : : #include "access/xlogrecovery.h"
18 : : #include "access/xlogutils.h"
19 : : #include "funcapi.h"
20 : : #include "replication/logical.h"
21 : : #include "replication/slot.h"
22 : : #include "replication/slotsync.h"
23 : : #include "utils/builtins.h"
24 : : #include "utils/guc.h"
25 : : #include "utils/pg_lsn.h"
26 : :
27 : : /*
28 : : * Map SlotSyncSkipReason enum values to human-readable names.
29 : : */
30 : : static const char *SlotSyncSkipReasonNames[] = {
31 : : [SS_SKIP_NONE] = "none",
32 : : [SS_SKIP_WAL_NOT_FLUSHED] = "wal_not_flushed",
33 : : [SS_SKIP_WAL_OR_ROWS_REMOVED] = "wal_or_rows_removed",
34 : : [SS_SKIP_NO_CONSISTENT_SNAPSHOT] = "no_consistent_snapshot",
35 : : [SS_SKIP_INVALID] = "slot_invalidated"
36 : : };
37 : :
38 : : /*
39 : : * Helper function for creating a new physical replication slot with
40 : : * given arguments. Note that this function doesn't release the created
41 : : * slot.
42 : : *
43 : : * If restart_lsn is a valid value, we use it without WAL reservation
44 : : * routine. So the caller must guarantee that WAL is available.
45 : : */
46 : : static void
47 : 0 : create_physical_replication_slot(char *name, bool immediately_reserve,
48 : : bool temporary, XLogRecPtr restart_lsn)
49 : : {
50 [ # # ]: 0 : Assert(!MyReplicationSlot);
51 : :
52 : : /* acquire replication slot, this will check for conflicting names */
53 : 0 : ReplicationSlotCreate(name, false,
54 : 0 : temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
55 : : false, false);
56 : :
57 [ # # ]: 0 : if (immediately_reserve)
58 : : {
59 : : /* Reserve WAL as the user asked for it */
60 [ # # ]: 0 : if (!XLogRecPtrIsValid(restart_lsn))
61 : 0 : ReplicationSlotReserveWal();
62 : : else
63 : 0 : MyReplicationSlot->data.restart_lsn = restart_lsn;
64 : :
65 : : /* Write this slot to disk */
66 : 0 : ReplicationSlotMarkDirty();
67 : 0 : ReplicationSlotSave();
68 : 0 : }
69 : 0 : }
70 : :
71 : : /*
72 : : * SQL function for creating a new physical (streaming replication)
73 : : * replication slot.
74 : : */
75 : : Datum
76 : 0 : pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
77 : : {
78 : 0 : Name name = PG_GETARG_NAME(0);
79 : 0 : bool immediately_reserve = PG_GETARG_BOOL(1);
80 : 0 : bool temporary = PG_GETARG_BOOL(2);
81 : 0 : Datum values[2];
82 : 0 : bool nulls[2];
83 : 0 : TupleDesc tupdesc;
84 : 0 : HeapTuple tuple;
85 : 0 : Datum result;
86 : :
87 [ # # ]: 0 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
88 [ # # # # ]: 0 : elog(ERROR, "return type must be a row type");
89 : :
90 : 0 : CheckSlotPermissions();
91 : :
92 : 0 : CheckSlotRequirements();
93 : :
94 : 0 : create_physical_replication_slot(NameStr(*name),
95 : 0 : immediately_reserve,
96 : 0 : temporary,
97 : : InvalidXLogRecPtr);
98 : :
99 : 0 : values[0] = NameGetDatum(&MyReplicationSlot->data.name);
100 : 0 : nulls[0] = false;
101 : :
102 [ # # ]: 0 : if (immediately_reserve)
103 : : {
104 : 0 : values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
105 : 0 : nulls[1] = false;
106 : 0 : }
107 : : else
108 : 0 : nulls[1] = true;
109 : :
110 : 0 : tuple = heap_form_tuple(tupdesc, values, nulls);
111 : 0 : result = HeapTupleGetDatum(tuple);
112 : :
113 : 0 : ReplicationSlotRelease();
114 : :
115 : 0 : PG_RETURN_DATUM(result);
116 : 0 : }
117 : :
118 : :
119 : : /*
120 : : * Helper function for creating a new logical replication slot with
121 : : * given arguments. Note that this function doesn't release the created
122 : : * slot.
123 : : *
124 : : * When find_startpoint is false, the slot's confirmed_flush is not set; it's
125 : : * caller's responsibility to ensure it's set to something sensible.
126 : : */
127 : : static void
128 : 0 : create_logical_replication_slot(char *name, char *plugin,
129 : : bool temporary, bool two_phase,
130 : : bool failover,
131 : : XLogRecPtr restart_lsn,
132 : : bool find_startpoint)
133 : : {
134 : 0 : LogicalDecodingContext *ctx = NULL;
135 : :
136 [ # # ]: 0 : Assert(!MyReplicationSlot);
137 : :
138 : : /*
139 : : * Acquire a logical decoding slot, this will check for conflicting names.
140 : : * Initially create persistent slot as ephemeral - that allows us to
141 : : * nicely handle errors during initialization because it'll get dropped if
142 : : * this transaction fails. We'll make it persistent at the end. Temporary
143 : : * slots can be created as temporary from beginning as they get dropped on
144 : : * error as well.
145 : : */
146 : 0 : ReplicationSlotCreate(name, true,
147 : 0 : temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
148 : 0 : failover, false);
149 : :
150 : : /*
151 : : * Ensure the logical decoding is enabled before initializing the logical
152 : : * decoding context.
153 : : */
154 : 0 : EnsureLogicalDecodingEnabled();
155 [ # # ]: 0 : Assert(IsLogicalDecodingEnabled());
156 : :
157 : : /*
158 : : * Create logical decoding context to find start point or, if we don't
159 : : * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
160 : : *
161 : : * Note: when !find_startpoint this is still important, because it's at
162 : : * this point that the output plugin is validated.
163 : : */
164 : 0 : ctx = CreateInitDecodingContext(plugin, NIL,
165 : : false, /* just catalogs is OK */
166 : 0 : restart_lsn,
167 : 0 : XL_ROUTINE(.page_read = read_local_xlog_page,
168 : : .segment_open = wal_segment_open,
169 : : .segment_close = wal_segment_close),
170 : : NULL, NULL, NULL);
171 : :
172 : : /*
173 : : * If caller needs us to determine the decoding start point, do so now.
174 : : * This might take a while.
175 : : */
176 [ # # ]: 0 : if (find_startpoint)
177 : 0 : DecodingContextFindStartpoint(ctx);
178 : :
179 : : /* don't need the decoding context anymore */
180 : 0 : FreeDecodingContext(ctx);
181 : 0 : }
182 : :
183 : : /*
184 : : * SQL function for creating a new logical replication slot.
185 : : */
186 : : Datum
187 : 0 : pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
188 : : {
189 : 0 : Name name = PG_GETARG_NAME(0);
190 : 0 : Name plugin = PG_GETARG_NAME(1);
191 : 0 : bool temporary = PG_GETARG_BOOL(2);
192 : 0 : bool two_phase = PG_GETARG_BOOL(3);
193 : 0 : bool failover = PG_GETARG_BOOL(4);
194 : 0 : Datum result;
195 : 0 : TupleDesc tupdesc;
196 : 0 : HeapTuple tuple;
197 : 0 : Datum values[2];
198 : 0 : bool nulls[2];
199 : :
200 [ # # ]: 0 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
201 [ # # # # ]: 0 : elog(ERROR, "return type must be a row type");
202 : :
203 : 0 : CheckSlotPermissions();
204 : :
205 : 0 : CheckLogicalDecodingRequirements();
206 : :
207 : 0 : create_logical_replication_slot(NameStr(*name),
208 : 0 : NameStr(*plugin),
209 : 0 : temporary,
210 : 0 : two_phase,
211 : 0 : failover,
212 : : InvalidXLogRecPtr,
213 : : true);
214 : :
215 : 0 : values[0] = NameGetDatum(&MyReplicationSlot->data.name);
216 : 0 : values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
217 : :
218 : 0 : memset(nulls, 0, sizeof(nulls));
219 : :
220 : 0 : tuple = heap_form_tuple(tupdesc, values, nulls);
221 : 0 : result = HeapTupleGetDatum(tuple);
222 : :
223 : : /* ok, slot is now fully created, mark it as persistent if needed */
224 [ # # ]: 0 : if (!temporary)
225 : 0 : ReplicationSlotPersist();
226 : 0 : ReplicationSlotRelease();
227 : :
228 : 0 : PG_RETURN_DATUM(result);
229 : 0 : }
230 : :
231 : :
232 : : /*
233 : : * SQL function for dropping a replication slot.
234 : : */
235 : : Datum
236 : 0 : pg_drop_replication_slot(PG_FUNCTION_ARGS)
237 : : {
238 : 0 : Name name = PG_GETARG_NAME(0);
239 : :
240 : 0 : CheckSlotPermissions();
241 : :
242 : 0 : CheckSlotRequirements();
243 : :
244 : 0 : ReplicationSlotDrop(NameStr(*name), true);
245 : :
246 : 0 : PG_RETURN_VOID();
247 : 0 : }
248 : :
249 : : /*
250 : : * pg_get_replication_slots - SQL SRF showing all replication slots
251 : : * that currently exist on the database cluster.
252 : : */
253 : : Datum
254 : 0 : pg_get_replication_slots(PG_FUNCTION_ARGS)
255 : : {
256 : : #define PG_GET_REPLICATION_SLOTS_COLS 21
257 : 0 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
258 : 0 : XLogRecPtr currlsn;
259 : 0 : int slotno;
260 : :
261 : : /*
262 : : * We don't require any special permission to see this function's data
263 : : * because nothing should be sensitive. The most critical being the slot
264 : : * name, which shouldn't contain anything particularly sensitive.
265 : : */
266 : :
267 : 0 : InitMaterializedSRF(fcinfo, 0);
268 : :
269 : 0 : currlsn = GetXLogWriteRecPtr();
270 : :
271 : 0 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
272 [ # # ]: 0 : for (slotno = 0; slotno < max_replication_slots; slotno++)
273 : : {
274 : 0 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
275 : 0 : ReplicationSlot slot_contents;
276 : 0 : Datum values[PG_GET_REPLICATION_SLOTS_COLS];
277 : 0 : bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
278 : 0 : WALAvailability walstate;
279 : 0 : int i;
280 : 0 : ReplicationSlotInvalidationCause cause;
281 : :
282 [ # # ]: 0 : if (!slot->in_use)
283 : 0 : continue;
284 : :
285 : : /* Copy slot contents while holding spinlock, then examine at leisure */
286 [ # # ]: 0 : SpinLockAcquire(&slot->mutex);
287 : 0 : slot_contents = *slot;
288 : 0 : SpinLockRelease(&slot->mutex);
289 : :
290 : 0 : memset(values, 0, sizeof(values));
291 : 0 : memset(nulls, 0, sizeof(nulls));
292 : :
293 : 0 : i = 0;
294 : 0 : values[i++] = NameGetDatum(&slot_contents.data.name);
295 : :
296 [ # # ]: 0 : if (slot_contents.data.database == InvalidOid)
297 : 0 : nulls[i++] = true;
298 : : else
299 : 0 : values[i++] = NameGetDatum(&slot_contents.data.plugin);
300 : :
301 [ # # ]: 0 : if (slot_contents.data.database == InvalidOid)
302 : 0 : values[i++] = CStringGetTextDatum("physical");
303 : : else
304 : 0 : values[i++] = CStringGetTextDatum("logical");
305 : :
306 [ # # ]: 0 : if (slot_contents.data.database == InvalidOid)
307 : 0 : nulls[i++] = true;
308 : : else
309 : 0 : values[i++] = ObjectIdGetDatum(slot_contents.data.database);
310 : :
311 : 0 : values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
312 : 0 : values[i++] = BoolGetDatum(slot_contents.active_pid != 0);
313 : :
314 [ # # ]: 0 : if (slot_contents.active_pid != 0)
315 : 0 : values[i++] = Int32GetDatum(slot_contents.active_pid);
316 : : else
317 : 0 : nulls[i++] = true;
318 : :
319 [ # # ]: 0 : if (slot_contents.data.xmin != InvalidTransactionId)
320 : 0 : values[i++] = TransactionIdGetDatum(slot_contents.data.xmin);
321 : : else
322 : 0 : nulls[i++] = true;
323 : :
324 [ # # ]: 0 : if (slot_contents.data.catalog_xmin != InvalidTransactionId)
325 : 0 : values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin);
326 : : else
327 : 0 : nulls[i++] = true;
328 : :
329 [ # # ]: 0 : if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
330 : 0 : values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
331 : : else
332 : 0 : nulls[i++] = true;
333 : :
334 [ # # ]: 0 : if (XLogRecPtrIsValid(slot_contents.data.confirmed_flush))
335 : 0 : values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
336 : : else
337 : 0 : nulls[i++] = true;
338 : :
339 : : /*
340 : : * If the slot has not been invalidated, test availability from
341 : : * restart_lsn.
342 : : */
343 [ # # ]: 0 : if (slot_contents.data.invalidated != RS_INVAL_NONE)
344 : 0 : walstate = WALAVAIL_REMOVED;
345 : : else
346 : 0 : walstate = GetWALAvailability(slot_contents.data.restart_lsn);
347 : :
348 [ # # # # : 0 : switch (walstate)
# # ]
349 : : {
350 : : case WALAVAIL_INVALID_LSN:
351 : 0 : nulls[i++] = true;
352 : 0 : break;
353 : :
354 : : case WALAVAIL_RESERVED:
355 : 0 : values[i++] = CStringGetTextDatum("reserved");
356 : 0 : break;
357 : :
358 : : case WALAVAIL_EXTENDED:
359 : 0 : values[i++] = CStringGetTextDatum("extended");
360 : 0 : break;
361 : :
362 : : case WALAVAIL_UNRESERVED:
363 : 0 : values[i++] = CStringGetTextDatum("unreserved");
364 : 0 : break;
365 : :
366 : : case WALAVAIL_REMOVED:
367 : :
368 : : /*
369 : : * If we read the restart_lsn long enough ago, maybe that file
370 : : * has been removed by now. However, the walsender could have
371 : : * moved forward enough that it jumped to another file after
372 : : * we looked. If checkpointer signalled the process to
373 : : * termination, then it's definitely lost; but if a process is
374 : : * still alive, then "unreserved" seems more appropriate.
375 : : *
376 : : * If we do change it, save the state for safe_wal_size below.
377 : : */
378 [ # # ]: 0 : if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
379 : : {
380 : 0 : int pid;
381 : :
382 [ # # ]: 0 : SpinLockAcquire(&slot->mutex);
383 : 0 : pid = slot->active_pid;
384 : 0 : slot_contents.data.restart_lsn = slot->data.restart_lsn;
385 : 0 : SpinLockRelease(&slot->mutex);
386 [ # # ]: 0 : if (pid != 0)
387 : : {
388 : 0 : values[i++] = CStringGetTextDatum("unreserved");
389 : 0 : walstate = WALAVAIL_UNRESERVED;
390 : 0 : break;
391 : : }
392 [ # # ]: 0 : }
393 : 0 : values[i++] = CStringGetTextDatum("lost");
394 : 0 : break;
395 : : }
396 : :
397 : : /*
398 : : * safe_wal_size is only computed for slots that have not been lost,
399 : : * and only if there's a configured maximum size.
400 : : */
401 [ # # # # ]: 0 : if (walstate == WALAVAIL_REMOVED || max_slot_wal_keep_size_mb < 0)
402 : 0 : nulls[i++] = true;
403 : : else
404 : : {
405 : 0 : XLogSegNo targetSeg;
406 : 0 : uint64 slotKeepSegs;
407 : 0 : uint64 keepSegs;
408 : 0 : XLogSegNo failSeg;
409 : 0 : XLogRecPtr failLSN;
410 : :
411 : 0 : XLByteToSeg(slot_contents.data.restart_lsn, targetSeg, wal_segment_size);
412 : :
413 : : /* determine how many segments can be kept by slots */
414 : 0 : slotKeepSegs = XLogMBVarToSegs(max_slot_wal_keep_size_mb, wal_segment_size);
415 : : /* ditto for wal_keep_size */
416 : 0 : keepSegs = XLogMBVarToSegs(wal_keep_size_mb, wal_segment_size);
417 : :
418 : : /* if currpos reaches failLSN, we lose our segment */
419 [ # # ]: 0 : failSeg = targetSeg + Max(slotKeepSegs, keepSegs) + 1;
420 : 0 : XLogSegNoOffsetToRecPtr(failSeg, 0, wal_segment_size, failLSN);
421 : :
422 : 0 : values[i++] = Int64GetDatum(failLSN - currlsn);
423 : 0 : }
424 : :
425 : 0 : values[i++] = BoolGetDatum(slot_contents.data.two_phase);
426 : :
427 [ # # # # ]: 0 : if (slot_contents.data.two_phase &&
428 : 0 : XLogRecPtrIsValid(slot_contents.data.two_phase_at))
429 : 0 : values[i++] = LSNGetDatum(slot_contents.data.two_phase_at);
430 : : else
431 : 0 : nulls[i++] = true;
432 : :
433 [ # # ]: 0 : if (slot_contents.inactive_since > 0)
434 : 0 : values[i++] = TimestampTzGetDatum(slot_contents.inactive_since);
435 : : else
436 : 0 : nulls[i++] = true;
437 : :
438 : 0 : cause = slot_contents.data.invalidated;
439 : :
440 [ # # ]: 0 : if (SlotIsPhysical(&slot_contents))
441 : 0 : nulls[i++] = true;
442 : : else
443 : : {
444 : : /*
445 : : * rows_removed and wal_level_insufficient are the only two
446 : : * reasons for the logical slot's conflict with recovery.
447 : : */
448 [ # # # # ]: 0 : if (cause == RS_INVAL_HORIZON ||
449 : 0 : cause == RS_INVAL_WAL_LEVEL)
450 : 0 : values[i++] = BoolGetDatum(true);
451 : : else
452 : 0 : values[i++] = BoolGetDatum(false);
453 : : }
454 : :
455 [ # # ]: 0 : if (cause == RS_INVAL_NONE)
456 : 0 : nulls[i++] = true;
457 : : else
458 : 0 : values[i++] = CStringGetTextDatum(GetSlotInvalidationCauseName(cause));
459 : :
460 : 0 : values[i++] = BoolGetDatum(slot_contents.data.failover);
461 : :
462 : 0 : values[i++] = BoolGetDatum(slot_contents.data.synced);
463 : :
464 [ # # ]: 0 : if (slot_contents.slotsync_skip_reason == SS_SKIP_NONE)
465 : 0 : nulls[i++] = true;
466 : : else
467 : 0 : values[i++] = CStringGetTextDatum(SlotSyncSkipReasonNames[slot_contents.slotsync_skip_reason]);
468 : :
469 [ # # ]: 0 : Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
470 : :
471 : 0 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
472 : 0 : values, nulls);
473 [ # # ]: 0 : }
474 : :
475 : 0 : LWLockRelease(ReplicationSlotControlLock);
476 : :
477 : 0 : return (Datum) 0;
478 : 0 : }
479 : :
480 : : /*
481 : : * Helper function for advancing our physical replication slot forward.
482 : : *
483 : : * The LSN position to move to is compared simply to the slot's restart_lsn,
484 : : * knowing that any position older than that would be removed by successive
485 : : * checkpoints.
486 : : */
487 : : static XLogRecPtr
488 : 0 : pg_physical_replication_slot_advance(XLogRecPtr moveto)
489 : : {
490 : 0 : XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn;
491 : 0 : XLogRecPtr retlsn = startlsn;
492 : :
493 [ # # ]: 0 : Assert(XLogRecPtrIsValid(moveto));
494 : :
495 [ # # ]: 0 : if (startlsn < moveto)
496 : : {
497 [ # # ]: 0 : SpinLockAcquire(&MyReplicationSlot->mutex);
498 : 0 : MyReplicationSlot->data.restart_lsn = moveto;
499 : 0 : SpinLockRelease(&MyReplicationSlot->mutex);
500 : 0 : retlsn = moveto;
501 : :
502 : : /*
503 : : * Dirty the slot so as it is written out at the next checkpoint. Note
504 : : * that the LSN position advanced may still be lost in the event of a
505 : : * crash, but this makes the data consistent after a clean shutdown.
506 : : */
507 : 0 : ReplicationSlotMarkDirty();
508 : :
509 : : /*
510 : : * Wake up logical walsenders holding logical failover slots after
511 : : * updating the restart_lsn of the physical slot.
512 : : */
513 : 0 : PhysicalWakeupLogicalWalSnd();
514 : 0 : }
515 : :
516 : 0 : return retlsn;
517 : 0 : }
518 : :
519 : : /*
520 : : * Advance our logical replication slot forward. See
521 : : * LogicalSlotAdvanceAndCheckSnapState for details.
522 : : */
523 : : static XLogRecPtr
524 : 0 : pg_logical_replication_slot_advance(XLogRecPtr moveto)
525 : : {
526 : 0 : return LogicalSlotAdvanceAndCheckSnapState(moveto, NULL);
527 : : }
528 : :
529 : : /*
530 : : * SQL function for moving the position in a replication slot.
531 : : */
532 : : Datum
533 : 0 : pg_replication_slot_advance(PG_FUNCTION_ARGS)
534 : : {
535 : 0 : Name slotname = PG_GETARG_NAME(0);
536 : 0 : XLogRecPtr moveto = PG_GETARG_LSN(1);
537 : 0 : XLogRecPtr endlsn;
538 : 0 : XLogRecPtr minlsn;
539 : 0 : TupleDesc tupdesc;
540 : 0 : Datum values[2];
541 : 0 : bool nulls[2];
542 : 0 : HeapTuple tuple;
543 : 0 : Datum result;
544 : :
545 [ # # ]: 0 : Assert(!MyReplicationSlot);
546 : :
547 : 0 : CheckSlotPermissions();
548 : :
549 [ # # ]: 0 : if (!XLogRecPtrIsValid(moveto))
550 [ # # # # ]: 0 : ereport(ERROR,
551 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
552 : : errmsg("invalid target WAL LSN")));
553 : :
554 : : /* Build a tuple descriptor for our result type */
555 [ # # ]: 0 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
556 [ # # # # ]: 0 : elog(ERROR, "return type must be a row type");
557 : :
558 : : /*
559 : : * We can't move slot past what's been flushed/replayed so clamp the
560 : : * target position accordingly.
561 : : */
562 [ # # ]: 0 : if (!RecoveryInProgress())
563 [ # # ]: 0 : moveto = Min(moveto, GetFlushRecPtr(NULL));
564 : : else
565 [ # # ]: 0 : moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
566 : :
567 : : /* Acquire the slot so we "own" it */
568 : 0 : ReplicationSlotAcquire(NameStr(*slotname), true, true);
569 : :
570 : : /* A slot whose restart_lsn has never been reserved cannot be advanced */
571 [ # # ]: 0 : if (!XLogRecPtrIsValid(MyReplicationSlot->data.restart_lsn))
572 [ # # # # ]: 0 : ereport(ERROR,
573 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
574 : : errmsg("replication slot \"%s\" cannot be advanced",
575 : : NameStr(*slotname)),
576 : : errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
577 : :
578 : : /*
579 : : * Check if the slot is not moving backwards. Physical slots rely simply
580 : : * on restart_lsn as a minimum point, while logical slots have confirmed
581 : : * consumption up to confirmed_flush, meaning that in both cases data
582 : : * older than that is not available anymore.
583 : : */
584 [ # # ]: 0 : if (OidIsValid(MyReplicationSlot->data.database))
585 : 0 : minlsn = MyReplicationSlot->data.confirmed_flush;
586 : : else
587 : 0 : minlsn = MyReplicationSlot->data.restart_lsn;
588 : :
589 [ # # ]: 0 : if (moveto < minlsn)
590 [ # # # # ]: 0 : ereport(ERROR,
591 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
592 : : errmsg("cannot advance replication slot to %X/%08X, minimum is %X/%08X",
593 : : LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn))));
594 : :
595 : : /* Do the actual slot update, depending on the slot type */
596 [ # # ]: 0 : if (OidIsValid(MyReplicationSlot->data.database))
597 : 0 : endlsn = pg_logical_replication_slot_advance(moveto);
598 : : else
599 : 0 : endlsn = pg_physical_replication_slot_advance(moveto);
600 : :
601 : 0 : values[0] = NameGetDatum(&MyReplicationSlot->data.name);
602 : 0 : nulls[0] = false;
603 : :
604 : : /*
605 : : * Recompute the minimum LSN and xmin across all slots to adjust with the
606 : : * advancing potentially done.
607 : : */
608 : 0 : ReplicationSlotsComputeRequiredXmin(false);
609 : 0 : ReplicationSlotsComputeRequiredLSN();
610 : :
611 : 0 : ReplicationSlotRelease();
612 : :
613 : : /* Return the reached position. */
614 : 0 : values[1] = LSNGetDatum(endlsn);
615 : 0 : nulls[1] = false;
616 : :
617 : 0 : tuple = heap_form_tuple(tupdesc, values, nulls);
618 : 0 : result = HeapTupleGetDatum(tuple);
619 : :
620 : 0 : PG_RETURN_DATUM(result);
621 : 0 : }
622 : :
623 : : /*
624 : : * Helper function of copying a replication slot.
625 : : */
626 : : static Datum
627 : 0 : copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
628 : : {
629 : 0 : Name src_name = PG_GETARG_NAME(0);
630 : 0 : Name dst_name = PG_GETARG_NAME(1);
631 : 0 : ReplicationSlot *src = NULL;
632 : 0 : ReplicationSlot first_slot_contents;
633 : 0 : ReplicationSlot second_slot_contents;
634 : 0 : XLogRecPtr src_restart_lsn;
635 : 0 : bool src_islogical;
636 : 0 : bool temporary;
637 : 0 : char *plugin;
638 : 0 : Datum values[2];
639 : 0 : bool nulls[2];
640 : 0 : Datum result;
641 : 0 : TupleDesc tupdesc;
642 : 0 : HeapTuple tuple;
643 : :
644 [ # # ]: 0 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
645 [ # # # # ]: 0 : elog(ERROR, "return type must be a row type");
646 : :
647 : 0 : CheckSlotPermissions();
648 : :
649 [ # # ]: 0 : if (logical_slot)
650 : 0 : CheckLogicalDecodingRequirements();
651 : : else
652 : 0 : CheckSlotRequirements();
653 : :
654 : 0 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
655 : :
656 : : /*
657 : : * We need to prevent the source slot's reserved WAL from being removed,
658 : : * but we don't want to lock that slot for very long, and it can advance
659 : : * in the meantime. So obtain the source slot's data, and create a new
660 : : * slot using its restart_lsn. Afterwards we lock the source slot again
661 : : * and verify that the data we copied (name, type) has not changed
662 : : * incompatibly. No inconvenient WAL removal can occur once the new slot
663 : : * is created -- but since WAL removal could have occurred before we
664 : : * managed to create the new slot, we advance the new slot's restart_lsn
665 : : * to the source slot's updated restart_lsn the second time we lock it.
666 : : */
667 [ # # ]: 0 : for (int i = 0; i < max_replication_slots; i++)
668 : : {
669 : 0 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
670 : :
671 [ # # # # ]: 0 : if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
672 : : {
673 : : /* Copy the slot contents while holding spinlock */
674 [ # # ]: 0 : SpinLockAcquire(&s->mutex);
675 : 0 : first_slot_contents = *s;
676 : 0 : SpinLockRelease(&s->mutex);
677 : 0 : src = s;
678 : 0 : break;
679 : : }
680 [ # # ]: 0 : }
681 : :
682 : 0 : LWLockRelease(ReplicationSlotControlLock);
683 : :
684 [ # # ]: 0 : if (src == NULL)
685 [ # # # # ]: 0 : ereport(ERROR,
686 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
687 : : errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
688 : :
689 : 0 : src_islogical = SlotIsLogical(&first_slot_contents);
690 : 0 : src_restart_lsn = first_slot_contents.data.restart_lsn;
691 : 0 : temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
692 [ # # ]: 0 : plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
693 : :
694 : : /* Check type of replication slot */
695 [ # # ]: 0 : if (src_islogical != logical_slot)
696 [ # # # # : 0 : ereport(ERROR,
# # ]
697 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
698 : : src_islogical ?
699 : : errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
700 : : NameStr(*src_name)) :
701 : : errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
702 : : NameStr(*src_name))));
703 : :
704 : : /* Copying non-reserved slot doesn't make sense */
705 [ # # ]: 0 : if (!XLogRecPtrIsValid(src_restart_lsn))
706 [ # # # # ]: 0 : ereport(ERROR,
707 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
708 : : errmsg("cannot copy a replication slot that doesn't reserve WAL")));
709 : :
710 : : /* Cannot copy an invalidated replication slot */
711 [ # # ]: 0 : if (first_slot_contents.data.invalidated != RS_INVAL_NONE)
712 [ # # # # ]: 0 : ereport(ERROR,
713 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
714 : : errmsg("cannot copy invalidated replication slot \"%s\"",
715 : : NameStr(*src_name)));
716 : :
717 : : /* Overwrite params from optional arguments */
718 [ # # ]: 0 : if (PG_NARGS() >= 3)
719 : 0 : temporary = PG_GETARG_BOOL(2);
720 [ # # ]: 0 : if (PG_NARGS() >= 4)
721 : : {
722 [ # # ]: 0 : Assert(logical_slot);
723 : 0 : plugin = NameStr(*(PG_GETARG_NAME(3)));
724 : 0 : }
725 : :
726 : : /* Create new slot and acquire it */
727 [ # # ]: 0 : if (logical_slot)
728 : : {
729 : : /*
730 : : * We must not try to read WAL, since we haven't reserved it yet --
731 : : * hence pass find_startpoint false. confirmed_flush will be set
732 : : * below, by copying from the source slot.
733 : : *
734 : : * We don't copy the failover option to prevent potential issues with
735 : : * slot synchronization. For instance, if a slot was synchronized to
736 : : * the standby, then dropped on the primary, and immediately recreated
737 : : * by copying from another existing slot with much earlier restart_lsn
738 : : * and confirmed_flush_lsn, the slot synchronization would only
739 : : * observe the LSN of the same slot moving backward. As slot
740 : : * synchronization does not copy the restart_lsn and
741 : : * confirmed_flush_lsn backward (see update_local_synced_slot() for
742 : : * details), if a failover happens before the primary's slot catches
743 : : * up, logical replication cannot continue using the synchronized slot
744 : : * on the promoted standby because the slot retains the restart_lsn
745 : : * and confirmed_flush_lsn that are much later than expected.
746 : : */
747 : 0 : create_logical_replication_slot(NameStr(*dst_name),
748 : 0 : plugin,
749 : 0 : temporary,
750 : : false,
751 : : false,
752 : 0 : src_restart_lsn,
753 : : false);
754 : 0 : }
755 : : else
756 : 0 : create_physical_replication_slot(NameStr(*dst_name),
757 : : true,
758 : 0 : temporary,
759 : 0 : src_restart_lsn);
760 : :
761 : : /*
762 : : * Update the destination slot to current values of the source slot;
763 : : * recheck that the source slot is still the one we saw previously.
764 : : */
765 : : {
766 : 0 : TransactionId copy_effective_xmin;
767 : 0 : TransactionId copy_effective_catalog_xmin;
768 : 0 : TransactionId copy_xmin;
769 : 0 : TransactionId copy_catalog_xmin;
770 : 0 : XLogRecPtr copy_restart_lsn;
771 : 0 : XLogRecPtr copy_confirmed_flush;
772 : 0 : bool copy_islogical;
773 : 0 : char *copy_name;
774 : :
775 : : /* Copy data of source slot again */
776 [ # # ]: 0 : SpinLockAcquire(&src->mutex);
777 : 0 : second_slot_contents = *src;
778 : 0 : SpinLockRelease(&src->mutex);
779 : :
780 : 0 : copy_effective_xmin = second_slot_contents.effective_xmin;
781 : 0 : copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin;
782 : :
783 : 0 : copy_xmin = second_slot_contents.data.xmin;
784 : 0 : copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
785 : 0 : copy_restart_lsn = second_slot_contents.data.restart_lsn;
786 : 0 : copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
787 : :
788 : : /* for existence check */
789 : 0 : copy_name = NameStr(second_slot_contents.data.name);
790 : 0 : copy_islogical = SlotIsLogical(&second_slot_contents);
791 : :
792 : : /*
793 : : * Check if the source slot still exists and is valid. We regard it as
794 : : * invalid if the type of replication slot or name has been changed,
795 : : * or the restart_lsn either is invalid or has gone backward. (The
796 : : * restart_lsn could go backwards if the source slot is dropped and
797 : : * copied from an older slot during installation.)
798 : : *
799 : : * Since erroring out will release and drop the destination slot we
800 : : * don't need to release it here.
801 : : */
802 [ # # ]: 0 : if (copy_restart_lsn < src_restart_lsn ||
803 : 0 : src_islogical != copy_islogical ||
804 : 0 : strcmp(copy_name, NameStr(*src_name)) != 0)
805 [ # # # # ]: 0 : ereport(ERROR,
806 : : (errmsg("could not copy replication slot \"%s\"",
807 : : NameStr(*src_name)),
808 : : errdetail("The source replication slot was modified incompatibly during the copy operation.")));
809 : :
810 : : /* The source slot must have a consistent snapshot */
811 [ # # # # ]: 0 : if (src_islogical && !XLogRecPtrIsValid(copy_confirmed_flush))
812 [ # # # # ]: 0 : ereport(ERROR,
813 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
814 : : errmsg("cannot copy unfinished logical replication slot \"%s\"",
815 : : NameStr(*src_name)),
816 : : errhint("Retry when the source replication slot's confirmed_flush_lsn is valid.")));
817 : :
818 : : /*
819 : : * Copying an invalid slot doesn't make sense. Note that the source
820 : : * slot can become invalid after we create the new slot and copy the
821 : : * data of source slot. This is possible because the operations in
822 : : * InvalidateObsoleteReplicationSlots() are not serialized with this
823 : : * function. Even though we can't detect such a case here, the copied
824 : : * slot will become invalid in the next checkpoint cycle.
825 : : */
826 [ # # ]: 0 : if (second_slot_contents.data.invalidated != RS_INVAL_NONE)
827 [ # # # # ]: 0 : ereport(ERROR,
828 : : errmsg("cannot copy replication slot \"%s\"",
829 : : NameStr(*src_name)),
830 : : errdetail("The source replication slot was invalidated during the copy operation."));
831 : :
832 : : /* Install copied values again */
833 [ # # ]: 0 : SpinLockAcquire(&MyReplicationSlot->mutex);
834 : 0 : MyReplicationSlot->effective_xmin = copy_effective_xmin;
835 : 0 : MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
836 : :
837 : 0 : MyReplicationSlot->data.xmin = copy_xmin;
838 : 0 : MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
839 : 0 : MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
840 : 0 : MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
841 : 0 : SpinLockRelease(&MyReplicationSlot->mutex);
842 : :
843 : 0 : ReplicationSlotMarkDirty();
844 : 0 : ReplicationSlotsComputeRequiredXmin(false);
845 : 0 : ReplicationSlotsComputeRequiredLSN();
846 : 0 : ReplicationSlotSave();
847 : :
848 : : #ifdef USE_ASSERT_CHECKING
849 : : /* Check that the restart_lsn is available */
850 : : {
851 : 0 : XLogSegNo segno;
852 : :
853 : 0 : XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
854 [ # # ]: 0 : Assert(XLogGetLastRemovedSegno() < segno);
855 : 0 : }
856 : : #endif
857 : 0 : }
858 : :
859 : : /* target slot fully created, mark as persistent if needed */
860 [ # # # # ]: 0 : if (logical_slot && !temporary)
861 : 0 : ReplicationSlotPersist();
862 : :
863 : : /* All done. Set up the return values */
864 : 0 : values[0] = NameGetDatum(dst_name);
865 : 0 : nulls[0] = false;
866 [ # # ]: 0 : if (XLogRecPtrIsValid(MyReplicationSlot->data.confirmed_flush))
867 : : {
868 : 0 : values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
869 : 0 : nulls[1] = false;
870 : 0 : }
871 : : else
872 : 0 : nulls[1] = true;
873 : :
874 : 0 : tuple = heap_form_tuple(tupdesc, values, nulls);
875 : 0 : result = HeapTupleGetDatum(tuple);
876 : :
877 : 0 : ReplicationSlotRelease();
878 : :
879 : 0 : PG_RETURN_DATUM(result);
880 : 0 : }
881 : :
882 : : /* The wrappers below are all to appease opr_sanity */
883 : : Datum
884 : 0 : pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
885 : : {
886 : 0 : return copy_replication_slot(fcinfo, true);
887 : : }
888 : :
889 : : Datum
890 : 0 : pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
891 : : {
892 : 0 : return copy_replication_slot(fcinfo, true);
893 : : }
894 : :
895 : : Datum
896 : 0 : pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
897 : : {
898 : 0 : return copy_replication_slot(fcinfo, true);
899 : : }
900 : :
901 : : Datum
902 : 0 : pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
903 : : {
904 : 0 : return copy_replication_slot(fcinfo, false);
905 : : }
906 : :
907 : : Datum
908 : 0 : pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
909 : : {
910 : 0 : return copy_replication_slot(fcinfo, false);
911 : : }
912 : :
913 : : /*
914 : : * Synchronize failover enabled replication slots to a standby server
915 : : * from the primary server.
916 : : */
917 : : Datum
918 : 0 : pg_sync_replication_slots(PG_FUNCTION_ARGS)
919 : : {
920 : 0 : WalReceiverConn *wrconn;
921 : 0 : char *err;
922 : 0 : StringInfoData app_name;
923 : :
924 : 0 : CheckSlotPermissions();
925 : :
926 [ # # ]: 0 : if (!RecoveryInProgress())
927 [ # # # # ]: 0 : ereport(ERROR,
928 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
929 : : errmsg("replication slots can only be synchronized to a standby server"));
930 : :
931 : 0 : ValidateSlotSyncParams(ERROR);
932 : :
933 : : /* Load the libpq-specific functions */
934 : 0 : load_file("libpqwalreceiver", false);
935 : :
936 : 0 : (void) CheckAndGetDbnameFromConninfo();
937 : :
938 : 0 : initStringInfo(&app_name);
939 [ # # ]: 0 : if (cluster_name[0])
940 : 0 : appendStringInfo(&app_name, "%s_slotsync", cluster_name);
941 : : else
942 : 0 : appendStringInfoString(&app_name, "slotsync");
943 : :
944 : : /* Connect to the primary server. */
945 : 0 : wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
946 : : app_name.data, &err);
947 : :
948 [ # # ]: 0 : if (!wrconn)
949 [ # # # # ]: 0 : ereport(ERROR,
950 : : errcode(ERRCODE_CONNECTION_FAILURE),
951 : : errmsg("synchronization worker \"%s\" could not connect to the primary server: %s",
952 : : app_name.data, err));
953 : :
954 : 0 : pfree(app_name.data);
955 : :
956 : 0 : SyncReplicationSlots(wrconn);
957 : :
958 : 0 : walrcv_disconnect(wrconn);
959 : :
960 : 0 : PG_RETURN_VOID();
961 : 0 : }
|