Branch data Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * origin.c
4 : : * Logical replication progress tracking support.
5 : : *
6 : : * Copyright (c) 2013-2026, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/replication/logical/origin.c
10 : : *
11 : : * NOTES
12 : : *
13 : : * This file provides the following:
14 : : * * An infrastructure to name nodes in a replication setup
15 : : * * A facility to store and persist replication progress in an
16 : : * efficient and durable manner.
17 : : *
18 : : * Replication origin consists of a descriptive, user defined, external
19 : : * name and a short, thus space efficient, internal 2 byte one. This split
20 : : * exists because replication origins have to be stored in WAL and shared
21 : : * memory and long descriptors would be inefficient. For now only use 2 bytes
22 : : * for the internal id of a replication origin as it seems unlikely that there
23 : : * soon will be more than 65k nodes in one replication setup; and using only
24 : : * two bytes allows us to be more space efficient.
25 : : *
26 : : * Replication progress is tracked in a shared memory table
27 : : * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 : : * ('slots') in this table are identified by the internal id. That's the case
29 : : * because it allows replication progress to be advanced during crash
30 : : * recovery. To allow doing so we store the original LSN (from the originating
31 : : * system) of a transaction in the commit record. That allows recovering the
32 : : * precise replayed state after crash recovery, without requiring synchronous
33 : : * commits. Allowing logical replication to use asynchronous commit is
34 : : * generally good for performance, but especially important as it allows a
35 : : * single threaded replay process to keep up with a source that has multiple
36 : : * backends generating changes concurrently. For efficiency and simplicity
37 : : * reasons a backend can set up one replication origin that's from then on used
38 : : * as the source of changes produced by the backend, until reset again.
39 : : *
40 : : * This infrastructure is intended to be used in cooperation with logical
41 : : * decoding. When replaying from a remote system the configured origin is
42 : : * provided to output plugins, allowing prevention of replication loops and
43 : : * other filtering.
44 : : *
45 : : * There are several levels of locking at work:
46 : : *
47 : : * * To create and drop replication origins an exclusive lock on
48 : : * pg_replication_origin is required for the duration. That allows us to
49 : : * safely and conflict free assign new origins using a dirty snapshot.
50 : : *
51 : : * * When creating an in-memory replication progress slot the ReplicationOrigin
52 : : * LWLock has to be held exclusively; when iterating over the replication
53 : : * progress a shared lock has to be held, the same when advancing the
54 : : * replication progress of an individual backend that has not been set up as the
55 : : * session's replication origin.
56 : : *
57 : : * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 : : * replication progress slot that slot's lwlock has to be held. That's
59 : : * primarily because we do not assume 8 byte writes (the LSN) are atomic on
60 : : * all our platforms, but it also simplifies memory ordering concerns
61 : : * between the remote and local lsn. We use a lwlock instead of a spinlock
62 : : * so it's less harmful to hold the lock over a WAL write
63 : : * (cf. AdvanceReplicationProgress).
64 : : *
65 : : * ---------------------------------------------------------------------------
66 : : */
67 : :
68 : : #include "postgres.h"
69 : :
70 : : #include <unistd.h>
71 : : #include <sys/stat.h>
72 : :
73 : : #include "access/genam.h"
74 : : #include "access/htup_details.h"
75 : : #include "access/table.h"
76 : : #include "access/xact.h"
77 : : #include "access/xloginsert.h"
78 : : #include "catalog/catalog.h"
79 : : #include "catalog/indexing.h"
80 : : #include "catalog/pg_subscription.h"
81 : : #include "funcapi.h"
82 : : #include "miscadmin.h"
83 : : #include "nodes/execnodes.h"
84 : : #include "pgstat.h"
85 : : #include "replication/origin.h"
86 : : #include "replication/slot.h"
87 : : #include "storage/condition_variable.h"
88 : : #include "storage/fd.h"
89 : : #include "storage/ipc.h"
90 : : #include "storage/lmgr.h"
91 : : #include "utils/builtins.h"
92 : : #include "utils/fmgroids.h"
93 : : #include "utils/guc.h"
94 : : #include "utils/pg_lsn.h"
95 : : #include "utils/rel.h"
96 : : #include "utils/snapmgr.h"
97 : : #include "utils/syscache.h"
98 : :
99 : : /* paths for replication origin checkpoint files */
100 : : #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
101 : : #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
102 : :
103 : : /* GUC variables */
104 : : int max_active_replication_origins = 10;
105 : :
106 : : /*
107 : : * Replay progress of a single remote node.
108 : : */
109 : : typedef struct ReplicationState
110 : : {
111 : : /*
112 : : * Local identifier for the remote node.
113 : : */
114 : : RepOriginId roident;
115 : :
116 : : /*
117 : : * Location of the latest commit from the remote side.
118 : : */
119 : : XLogRecPtr remote_lsn;
120 : :
121 : : /*
122 : : * Remember the local lsn of the commit record so we can XLogFlush() to it
123 : : * during a checkpoint so we know the commit record actually is safe on
124 : : * disk.
125 : : */
126 : : XLogRecPtr local_lsn;
127 : :
128 : : /*
129 : : * PID of the backend that has acquired this slot, or 0 if none.
130 : : */
131 : : int acquired_by;
132 : :
133 : : /* Count of processes that are currently using this origin. */
134 : : int refcount;
135 : :
136 : : /*
137 : : * Condition variable that's signaled when acquired_by changes.
138 : : */
139 : : ConditionVariable origin_cv;
140 : :
141 : : /*
142 : : * Lock protecting remote_lsn and local_lsn.
143 : : */
144 : : LWLock lock;
145 : : } ReplicationState;
146 : :
147 : : /*
148 : : * On disk version of ReplicationState.
149 : : */
150 : : typedef struct ReplicationStateOnDisk
151 : : {
152 : : RepOriginId roident;
153 : : XLogRecPtr remote_lsn;
154 : : } ReplicationStateOnDisk;
155 : :
156 : :
157 : : typedef struct ReplicationStateCtl
158 : : {
159 : : /* Tranche to use for per-origin LWLocks */
160 : : int tranche_id;
161 : : /* Array of length max_active_replication_origins */
162 : : ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
163 : : } ReplicationStateCtl;
164 : :
165 : : /* external variables */
166 : : RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
167 : : XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr;
168 : : TimestampTz replorigin_session_origin_timestamp = 0;
169 : :
170 : : /*
171 : : * Base address into a shared memory array of replication states of size
172 : : * max_active_replication_origins.
173 : : */
174 : : static ReplicationState *replication_states;
175 : :
176 : : /*
177 : : * Actual shared memory block (replication_states[] is now part of this).
178 : : */
179 : : static ReplicationStateCtl *replication_states_ctl;
180 : :
181 : : /*
182 : : * We keep a pointer to this backend's ReplicationState to avoid having to
183 : : * search the replication_states array in replorigin_session_advance for each
184 : : * remote commit. (Ownership of a backend's own entry can only be changed by
185 : : * that backend.)
186 : : */
187 : : static ReplicationState *session_replication_state = NULL;
188 : :
189 : : /* Magic for on disk files. */
190 : : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
191 : :
192 : : static void
193 : 1 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
194 : : {
195 [ - + # # ]: 1 : if (check_origins && max_active_replication_origins == 0)
196 [ # # # # ]: 0 : ereport(ERROR,
197 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
198 : : errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
199 : :
200 [ + - + - ]: 1 : if (!recoveryOK && RecoveryInProgress())
201 [ # # # # ]: 0 : ereport(ERROR,
202 : : (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
203 : : errmsg("cannot manipulate replication origins during recovery")));
204 : 1 : }
205 : :
206 : :
207 : : /*
208 : : * IsReservedOriginName
209 : : * True iff name is either "none" or "any".
210 : : */
211 : : static bool
212 : 1 : IsReservedOriginName(const char *name)
213 : : {
214 [ - + ]: 1 : return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
215 : 1 : (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
216 : : }
217 : :
218 : : /* ---------------------------------------------------------------------------
219 : : * Functions for working with replication origins themselves.
220 : : * ---------------------------------------------------------------------------
221 : : */
222 : :
223 : : /*
224 : : * Check for a persistent replication origin identified by name.
225 : : *
226 : : * Returns InvalidOid if the node isn't known yet and missing_ok is true.
227 : : */
228 : : RepOriginId
229 : 14 : replorigin_by_name(const char *roname, bool missing_ok)
230 : : {
231 : 14 : Form_pg_replication_origin ident;
232 : 14 : Oid roident = InvalidOid;
233 : 14 : HeapTuple tuple;
234 : 14 : Datum roname_d;
235 : :
236 : 14 : roname_d = CStringGetTextDatum(roname);
237 : :
238 : 14 : tuple = SearchSysCache1(REPLORIGNAME, roname_d);
239 [ + - ]: 14 : if (HeapTupleIsValid(tuple))
240 : : {
241 : 14 : ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
242 : 14 : roident = ident->roident;
243 : 14 : ReleaseSysCache(tuple);
244 : 14 : }
245 [ # # ]: 0 : else if (!missing_ok)
246 [ # # # # ]: 0 : ereport(ERROR,
247 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
248 : : errmsg("replication origin \"%s\" does not exist",
249 : : roname)));
250 : :
251 : 28 : return roident;
252 : 14 : }
253 : :
254 : : /*
255 : : * Create a replication origin.
256 : : *
257 : : * Needs to be called in a transaction.
258 : : */
259 : : RepOriginId
260 : 15 : replorigin_create(const char *roname)
261 : : {
262 : 15 : Oid roident;
263 : 15 : HeapTuple tuple = NULL;
264 : 15 : Relation rel;
265 : 15 : Datum roname_d;
266 : 15 : SnapshotData SnapshotDirty;
267 : 15 : SysScanDesc scan;
268 : 15 : ScanKeyData key;
269 : :
270 : : /*
271 : : * To avoid needing a TOAST table for pg_replication_origin, we limit
272 : : * replication origin names to 512 bytes. This should be more than enough
273 : : * for all practical use.
274 : : */
275 [ + + ]: 15 : if (strlen(roname) > MAX_RONAME_LEN)
276 [ + - + - ]: 1 : ereport(ERROR,
277 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
278 : : errmsg("replication origin name is too long"),
279 : : errdetail("Replication origin names must be no longer than %d bytes.",
280 : : MAX_RONAME_LEN)));
281 : :
282 : 14 : roname_d = CStringGetTextDatum(roname);
283 : :
284 [ + - ]: 14 : Assert(IsTransactionState());
285 : :
286 : : /*
287 : : * We need the numeric replication origin to be 16bit wide, so we cannot
288 : : * rely on the normal oid allocation. Instead we simply scan
289 : : * pg_replication_origin for the first unused id. That's not particularly
290 : : * efficient, but this should be a fairly infrequent operation - we can
291 : : * easily spend a bit more code on this when it turns out it needs to be
292 : : * faster.
293 : : *
294 : : * We handle concurrency by taking an exclusive lock (allowing reads!)
295 : : * over the table for the duration of the search. Because we use a "dirty
296 : : * snapshot" we can read rows that other in-progress sessions have
297 : : * written, even though they would be invisible with normal snapshots. Due
298 : : * to the exclusive lock there's no danger that new rows can appear while
299 : : * we're checking.
300 : : */
301 : 14 : InitDirtySnapshot(SnapshotDirty);
302 : :
303 : 14 : rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
304 : :
305 : : /*
306 : : * We want to be able to access pg_replication_origin without setting up a
307 : : * snapshot. To make that safe, it needs to not have a TOAST table, since
308 : : * TOASTed data cannot be fetched without a snapshot. As of this writing,
309 : : * its only varlena column is roname, which we limit to 512 bytes to avoid
310 : : * needing out-of-line storage. If you add a TOAST table to this catalog,
311 : : * be sure to set up a snapshot everywhere it might be needed. For more
312 : : * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
313 : : */
314 [ + - ]: 14 : Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
315 : :
316 [ - + ]: 18 : for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
317 : : {
318 : 18 : bool nulls[Natts_pg_replication_origin];
319 : 18 : Datum values[Natts_pg_replication_origin];
320 : 18 : bool collides;
321 : :
322 [ + - ]: 18 : CHECK_FOR_INTERRUPTS();
323 : :
324 : 18 : ScanKeyInit(&key,
325 : : Anum_pg_replication_origin_roident,
326 : : BTEqualStrategyNumber, F_OIDEQ,
327 : 18 : ObjectIdGetDatum(roident));
328 : :
329 : 18 : scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
330 : : true /* indexOK */ ,
331 : : &SnapshotDirty,
332 : : 1, &key);
333 : :
334 : 18 : collides = HeapTupleIsValid(systable_getnext(scan));
335 : :
336 : 18 : systable_endscan(scan);
337 : :
338 [ + + ]: 18 : if (!collides)
339 : : {
340 : : /*
341 : : * Ok, found an unused roident, insert the new row and do a CCI,
342 : : * so our callers can look it up if they want to.
343 : : */
344 : 14 : memset(&nulls, 0, sizeof(nulls));
345 : :
346 : 14 : values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
347 : 14 : values[Anum_pg_replication_origin_roname - 1] = roname_d;
348 : :
349 : 14 : tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
350 : 14 : CatalogTupleInsert(rel, tuple);
351 : 14 : CommandCounterIncrement();
352 : 14 : break;
353 : : }
354 [ - + + ]: 18 : }
355 : :
356 : : /* now release the lock again */
357 : 14 : table_close(rel, ExclusiveLock);
358 : :
359 [ + - ]: 14 : if (tuple == NULL)
360 [ # # # # ]: 0 : ereport(ERROR,
361 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
362 : : errmsg("could not find free replication origin ID")));
363 : :
364 : 14 : heap_freetuple(tuple);
365 : 28 : return roident;
366 : 14 : }
367 : :
368 : : /*
369 : : * Helper function to drop a replication origin.
370 : : */
371 : : static void
372 : 13 : replorigin_state_clear(RepOriginId roident, bool nowait)
373 : : {
374 : 13 : int i;
375 : :
376 : : /*
377 : : * Clean up the slot state info, if there is any matching slot.
378 : : */
379 : : restart:
380 : 13 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
381 : :
382 [ + + ]: 143 : for (i = 0; i < max_active_replication_origins; i++)
383 : : {
384 : 130 : ReplicationState *state = &replication_states[i];
385 : :
386 [ - + ]: 130 : if (state->roident == roident)
387 : : {
388 : : /* found our slot, is it busy? */
389 [ # # ]: 0 : if (state->refcount > 0)
390 : : {
391 : 0 : ConditionVariable *cv;
392 : :
393 [ # # ]: 0 : if (nowait)
394 [ # # # # : 0 : ereport(ERROR,
# # ]
395 : : (errcode(ERRCODE_OBJECT_IN_USE),
396 : : (state->acquired_by != 0)
397 : : ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
398 : : state->roident,
399 : : state->acquired_by)
400 : : : errmsg("could not drop replication origin with ID %d, in use by another process",
401 : : state->roident)));
402 : :
403 : : /*
404 : : * We must wait and then retry. Since we don't know which CV
405 : : * to wait on until here, we can't readily use
406 : : * ConditionVariablePrepareToSleep (calling it here would be
407 : : * wrong, since we could miss the signal if we did so); just
408 : : * use ConditionVariableSleep directly.
409 : : */
410 : 0 : cv = &state->origin_cv;
411 : :
412 : 0 : LWLockRelease(ReplicationOriginLock);
413 : :
414 : 0 : ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
415 : : goto restart;
416 : 0 : }
417 : :
418 : : /* first make a WAL log entry */
419 : : {
420 : 0 : xl_replorigin_drop xlrec;
421 : :
422 : 0 : xlrec.node_id = roident;
423 : 0 : XLogBeginInsert();
424 : 0 : XLogRegisterData(&xlrec, sizeof(xlrec));
425 : 0 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
426 : 0 : }
427 : :
428 : : /* then clear the in-memory slot */
429 : 0 : state->roident = InvalidRepOriginId;
430 : 0 : state->remote_lsn = InvalidXLogRecPtr;
431 : 0 : state->local_lsn = InvalidXLogRecPtr;
432 : 0 : break;
433 : : }
434 [ - - - + ]: 130 : }
435 : 13 : LWLockRelease(ReplicationOriginLock);
436 : 13 : ConditionVariableCancelSleep();
437 : 13 : }
438 : :
439 : : /*
440 : : * Drop replication origin (by name).
441 : : *
442 : : * Needs to be called in a transaction.
443 : : */
444 : : void
445 : 13 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
446 : : {
447 : 13 : RepOriginId roident;
448 : 13 : Relation rel;
449 : 13 : HeapTuple tuple;
450 : :
451 [ + - ]: 13 : Assert(IsTransactionState());
452 : :
453 : 13 : rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
454 : :
455 : 13 : roident = replorigin_by_name(name, missing_ok);
456 : :
457 : : /* Lock the origin to prevent concurrent drops. */
458 : 13 : LockSharedObject(ReplicationOriginRelationId, roident, 0,
459 : : AccessExclusiveLock);
460 : :
461 : 13 : tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
462 [ + - ]: 13 : if (!HeapTupleIsValid(tuple))
463 : : {
464 [ # # ]: 0 : if (!missing_ok)
465 [ # # # # ]: 0 : elog(ERROR, "cache lookup failed for replication origin with ID %d",
466 : : roident);
467 : :
468 : : /*
469 : : * We don't need to retain the locks if the origin is already dropped.
470 : : */
471 : 0 : UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
472 : : AccessExclusiveLock);
473 : 0 : table_close(rel, RowExclusiveLock);
474 : 0 : return;
475 : : }
476 : :
477 : 13 : replorigin_state_clear(roident, nowait);
478 : :
479 : : /*
480 : : * Now, we can delete the catalog entry.
481 : : */
482 : 13 : CatalogTupleDelete(rel, &tuple->t_self);
483 : 13 : ReleaseSysCache(tuple);
484 : :
485 : 13 : CommandCounterIncrement();
486 : :
487 : : /* We keep the lock on pg_replication_origin until commit */
488 : 13 : table_close(rel, NoLock);
489 [ - + ]: 13 : }
490 : :
491 : : /*
492 : : * Look up replication origin via its oid and return the name.
493 : : *
494 : : * The external name is palloc'd in the calling context.
495 : : *
496 : : * Returns true if the origin is known, false otherwise.
497 : : */
498 : : bool
499 : 0 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
500 : : {
501 : 0 : HeapTuple tuple;
502 : 0 : Form_pg_replication_origin ric;
503 : :
504 [ # # ]: 0 : Assert(OidIsValid((Oid) roident));
505 [ # # ]: 0 : Assert(roident != InvalidRepOriginId);
506 [ # # ]: 0 : Assert(roident != DoNotReplicateId);
507 : :
508 : 0 : tuple = SearchSysCache1(REPLORIGIDENT,
509 : 0 : ObjectIdGetDatum((Oid) roident));
510 : :
511 [ # # ]: 0 : if (HeapTupleIsValid(tuple))
512 : : {
513 : 0 : ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
514 : 0 : *roname = text_to_cstring(&ric->roname);
515 : 0 : ReleaseSysCache(tuple);
516 : :
517 : 0 : return true;
518 : : }
519 : : else
520 : : {
521 : 0 : *roname = NULL;
522 : :
523 [ # # ]: 0 : if (!missing_ok)
524 [ # # # # ]: 0 : ereport(ERROR,
525 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
526 : : errmsg("replication origin with ID %d does not exist",
527 : : roident)));
528 : :
529 : 0 : return false;
530 : : }
531 : 0 : }
532 : :
533 : :
534 : : /* ---------------------------------------------------------------------------
535 : : * Functions for handling replication progress.
536 : : * ---------------------------------------------------------------------------
537 : : */
538 : :
539 : : Size
540 : 21 : ReplicationOriginShmemSize(void)
541 : : {
542 : 21 : Size size = 0;
543 : :
544 [ + - ]: 21 : if (max_active_replication_origins == 0)
545 : 0 : return size;
546 : :
547 : 21 : size = add_size(size, offsetof(ReplicationStateCtl, states));
548 : :
549 : 42 : size = add_size(size,
550 : 21 : mul_size(max_active_replication_origins, sizeof(ReplicationState)));
551 : 21 : return size;
552 : 21 : }
553 : :
554 : : void
555 : 6 : ReplicationOriginShmemInit(void)
556 : : {
557 : 6 : bool found;
558 : :
559 [ + - ]: 6 : if (max_active_replication_origins == 0)
560 : 0 : return;
561 : :
562 : 6 : replication_states_ctl = (ReplicationStateCtl *)
563 : 6 : ShmemInitStruct("ReplicationOriginState",
564 : 6 : ReplicationOriginShmemSize(),
565 : : &found);
566 : 6 : replication_states = replication_states_ctl->states;
567 : :
568 [ - + ]: 6 : if (!found)
569 : : {
570 : 6 : int i;
571 : :
572 [ + - + - : 492 : MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
+ - - + +
+ ]
573 : :
574 : 6 : replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
575 : :
576 [ + + ]: 66 : for (i = 0; i < max_active_replication_origins; i++)
577 : : {
578 : 120 : LWLockInitialize(&replication_states[i].lock,
579 : 60 : replication_states_ctl->tranche_id);
580 : 60 : ConditionVariableInit(&replication_states[i].origin_cv);
581 : 60 : }
582 : 6 : }
583 [ - + ]: 6 : }
584 : :
585 : : /* ---------------------------------------------------------------------------
586 : : * Perform a checkpoint of each replication origin's progress with respect to
587 : : * the replayed remote_lsn. Make sure that all transactions we refer to in the
588 : : * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
589 : : * if the transactions were originally committed asynchronously.
590 : : *
591 : : * We store checkpoints in the following format:
592 : : * +-------+------------------------+------------------+-----+--------+
593 : : * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
594 : : * +-------+------------------------+------------------+-----+--------+
595 : : *
596 : : * So it's just the magic, followed by the statically sized
597 : : * ReplicationStateOnDisk structs. Note that the maximum number of
598 : : * ReplicationState is determined by max_active_replication_origins.
599 : : * ---------------------------------------------------------------------------
600 : : */
601 : : void
602 : 7 : CheckPointReplicationOrigin(void)
603 : : {
604 : 7 : const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
605 : 7 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
606 : 7 : int tmpfd;
607 : 7 : int i;
608 : 7 : uint32 magic = REPLICATION_STATE_MAGIC;
609 : 7 : pg_crc32c crc;
610 : :
611 [ + - ]: 7 : if (max_active_replication_origins == 0)
612 : 0 : return;
613 : :
614 : 7 : INIT_CRC32C(crc);
615 : :
616 : : /* make sure no old temp file is remaining */
617 [ + - + - ]: 7 : if (unlink(tmppath) < 0 && errno != ENOENT)
618 [ # # # # ]: 0 : ereport(PANIC,
619 : : (errcode_for_file_access(),
620 : : errmsg("could not remove file \"%s\": %m",
621 : : tmppath)));
622 : :
623 : : /*
624 : : * no other backend can perform this at the same time; only one checkpoint
625 : : * can happen at a time.
626 : : */
627 : 7 : tmpfd = OpenTransientFile(tmppath,
628 : : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
629 [ + - ]: 7 : if (tmpfd < 0)
630 [ # # # # ]: 0 : ereport(PANIC,
631 : : (errcode_for_file_access(),
632 : : errmsg("could not create file \"%s\": %m",
633 : : tmppath)));
634 : :
635 : : /* write magic */
636 : 7 : errno = 0;
637 [ + - ]: 7 : if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
638 : : {
639 : : /* if write didn't set errno, assume problem is no disk space */
640 [ # # ]: 0 : if (errno == 0)
641 : 0 : errno = ENOSPC;
642 [ # # # # ]: 0 : ereport(PANIC,
643 : : (errcode_for_file_access(),
644 : : errmsg("could not write to file \"%s\": %m",
645 : : tmppath)));
646 : 0 : }
647 : 7 : COMP_CRC32C(crc, &magic, sizeof(magic));
648 : :
649 : : /* prevent concurrent creations/drops */
650 : 7 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
651 : :
652 : : /* write actual data */
653 [ + + ]: 77 : for (i = 0; i < max_active_replication_origins; i++)
654 : : {
655 : 70 : ReplicationStateOnDisk disk_state;
656 : 70 : ReplicationState *curstate = &replication_states[i];
657 : 70 : XLogRecPtr local_lsn;
658 : :
659 [ - + ]: 70 : if (curstate->roident == InvalidRepOriginId)
660 : 70 : continue;
661 : :
662 : : /* zero, to avoid uninitialized padding bytes */
663 : 0 : memset(&disk_state, 0, sizeof(disk_state));
664 : :
665 : 0 : LWLockAcquire(&curstate->lock, LW_SHARED);
666 : :
667 : 0 : disk_state.roident = curstate->roident;
668 : :
669 : 0 : disk_state.remote_lsn = curstate->remote_lsn;
670 : 0 : local_lsn = curstate->local_lsn;
671 : :
672 : 0 : LWLockRelease(&curstate->lock);
673 : :
674 : : /* make sure we only write out a commit that's persistent */
675 : 0 : XLogFlush(local_lsn);
676 : :
677 : 0 : errno = 0;
678 [ # # ]: 0 : if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
679 : : sizeof(disk_state))
680 : : {
681 : : /* if write didn't set errno, assume problem is no disk space */
682 [ # # ]: 0 : if (errno == 0)
683 : 0 : errno = ENOSPC;
684 [ # # # # ]: 0 : ereport(PANIC,
685 : : (errcode_for_file_access(),
686 : : errmsg("could not write to file \"%s\": %m",
687 : : tmppath)));
688 : 0 : }
689 : :
690 : 0 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
691 [ + - ]: 70 : }
692 : :
693 : 7 : LWLockRelease(ReplicationOriginLock);
694 : :
695 : : /* write out the CRC */
696 : 7 : FIN_CRC32C(crc);
697 : 7 : errno = 0;
698 [ + - ]: 7 : if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
699 : : {
700 : : /* if write didn't set errno, assume problem is no disk space */
701 [ # # ]: 0 : if (errno == 0)
702 : 0 : errno = ENOSPC;
703 [ # # # # ]: 0 : ereport(PANIC,
704 : : (errcode_for_file_access(),
705 : : errmsg("could not write to file \"%s\": %m",
706 : : tmppath)));
707 : 0 : }
708 : :
709 [ + - ]: 7 : if (CloseTransientFile(tmpfd) != 0)
710 [ # # # # ]: 0 : ereport(PANIC,
711 : : (errcode_for_file_access(),
712 : : errmsg("could not close file \"%s\": %m",
713 : : tmppath)));
714 : :
715 : : /* fsync, rename to permanent file, fsync file and directory */
716 : 7 : durable_rename(tmppath, path, PANIC);
717 : 7 : }
718 : :
719 : : /*
720 : : * Recover replication replay status from checkpoint data saved earlier by
721 : : * CheckPointReplicationOrigin.
722 : : *
723 : : * This only needs to be called at startup and *not* during every checkpoint
724 : : * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
725 : : * state thereafter can be recovered by looking at commit records.
726 : : */
727 : : void
728 : 4 : StartupReplicationOrigin(void)
729 : : {
730 : 4 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
731 : 4 : int fd;
732 : 4 : int readBytes;
733 : 4 : uint32 magic = REPLICATION_STATE_MAGIC;
734 : 4 : int last_state = 0;
735 : 4 : pg_crc32c file_crc;
736 : 4 : pg_crc32c crc;
737 : :
738 : : /* don't want to overwrite already existing state */
739 : : #ifdef USE_ASSERT_CHECKING
740 : : static bool already_started = false;
741 : :
742 [ + - ]: 4 : Assert(!already_started);
743 : 4 : already_started = true;
744 : : #endif
745 : :
746 [ + - ]: 4 : if (max_active_replication_origins == 0)
747 : 0 : return;
748 : :
749 : 4 : INIT_CRC32C(crc);
750 : :
751 [ - + - + ]: 4 : elog(DEBUG2, "starting up replication origin progress state");
752 : :
753 : 4 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
754 : :
755 : : /*
756 : : * might have had max_active_replication_origins == 0 last run, or we just
757 : : * brought up a standby.
758 : : */
759 [ + + - + ]: 4 : if (fd < 0 && errno == ENOENT)
760 : 1 : return;
761 [ + - ]: 3 : else if (fd < 0)
762 [ # # # # ]: 0 : ereport(PANIC,
763 : : (errcode_for_file_access(),
764 : : errmsg("could not open file \"%s\": %m",
765 : : path)));
766 : :
767 : : /* verify magic, that is written even if nothing was active */
768 : 3 : readBytes = read(fd, &magic, sizeof(magic));
769 [ + - ]: 3 : if (readBytes != sizeof(magic))
770 : : {
771 [ # # ]: 0 : if (readBytes < 0)
772 [ # # # # ]: 0 : ereport(PANIC,
773 : : (errcode_for_file_access(),
774 : : errmsg("could not read file \"%s\": %m",
775 : : path)));
776 : : else
777 [ # # # # ]: 0 : ereport(PANIC,
778 : : (errcode(ERRCODE_DATA_CORRUPTED),
779 : : errmsg("could not read file \"%s\": read %d of %zu",
780 : : path, readBytes, sizeof(magic))));
781 : 0 : }
782 : 3 : COMP_CRC32C(crc, &magic, sizeof(magic));
783 : :
784 [ + - ]: 3 : if (magic != REPLICATION_STATE_MAGIC)
785 [ # # # # ]: 0 : ereport(PANIC,
786 : : (errmsg("replication checkpoint has wrong magic %u instead of %u",
787 : : magic, REPLICATION_STATE_MAGIC)));
788 : :
789 : : /* we can skip locking here, no other access is possible */
790 : :
791 : : /* recover individual states, until there are no more to be found */
792 : 3 : while (true)
793 : : {
794 : 3 : ReplicationStateOnDisk disk_state;
795 : :
796 : 3 : readBytes = read(fd, &disk_state, sizeof(disk_state));
797 : :
798 [ + - ]: 3 : if (readBytes < 0)
799 : : {
800 [ # # # # ]: 0 : ereport(PANIC,
801 : : (errcode_for_file_access(),
802 : : errmsg("could not read file \"%s\": %m",
803 : : path)));
804 : 0 : }
805 : :
806 : : /* no further data */
807 [ + - ]: 3 : if (readBytes == sizeof(crc))
808 : : {
809 : 3 : memcpy(&file_crc, &disk_state, sizeof(file_crc));
810 : 3 : break;
811 : : }
812 : :
813 [ # # ]: 0 : if (readBytes != sizeof(disk_state))
814 : : {
815 [ # # # # ]: 0 : ereport(PANIC,
816 : : (errcode_for_file_access(),
817 : : errmsg("could not read file \"%s\": read %d of %zu",
818 : : path, readBytes, sizeof(disk_state))));
819 : 0 : }
820 : :
821 : 0 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
822 : :
823 [ # # ]: 0 : if (last_state == max_active_replication_origins)
824 [ # # # # ]: 0 : ereport(PANIC,
825 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
826 : : errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
827 : :
828 : : /* copy data to shared memory */
829 : 0 : replication_states[last_state].roident = disk_state.roident;
830 : 0 : replication_states[last_state].remote_lsn = disk_state.remote_lsn;
831 : 0 : last_state++;
832 : :
833 [ # # # # ]: 0 : ereport(LOG,
834 : : errmsg("recovered replication state of node %d to %X/%08X",
835 : : disk_state.roident,
836 : : LSN_FORMAT_ARGS(disk_state.remote_lsn)));
837 [ - + ]: 3 : }
838 : :
839 : : /* now check checksum */
840 : 3 : FIN_CRC32C(crc);
841 [ + - ]: 3 : if (file_crc != crc)
842 [ # # # # ]: 0 : ereport(PANIC,
843 : : (errcode(ERRCODE_DATA_CORRUPTED),
844 : : errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
845 : : crc, file_crc)));
846 : :
847 [ + - ]: 3 : if (CloseTransientFile(fd) != 0)
848 [ # # # # ]: 0 : ereport(PANIC,
849 : : (errcode_for_file_access(),
850 : : errmsg("could not close file \"%s\": %m",
851 : : path)));
852 : 4 : }
853 : :
854 : : void
855 : 0 : replorigin_redo(XLogReaderState *record)
856 : : {
857 : 0 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
858 : :
859 [ # # # ]: 0 : switch (info)
860 : : {
861 : : case XLOG_REPLORIGIN_SET:
862 : : {
863 : 0 : xl_replorigin_set *xlrec =
864 : 0 : (xl_replorigin_set *) XLogRecGetData(record);
865 : :
866 : 0 : replorigin_advance(xlrec->node_id,
867 : 0 : xlrec->remote_lsn, record->EndRecPtr,
868 : 0 : xlrec->force /* backward */ ,
869 : : false /* WAL log */ );
870 : : break;
871 : 0 : }
872 : : case XLOG_REPLORIGIN_DROP:
873 : : {
874 : 0 : xl_replorigin_drop *xlrec;
875 : 0 : int i;
876 : :
877 : 0 : xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
878 : :
879 [ # # ]: 0 : for (i = 0; i < max_active_replication_origins; i++)
880 : : {
881 : 0 : ReplicationState *state = &replication_states[i];
882 : :
883 : : /* found our slot */
884 [ # # ]: 0 : if (state->roident == xlrec->node_id)
885 : : {
886 : : /* reset entry */
887 : 0 : state->roident = InvalidRepOriginId;
888 : 0 : state->remote_lsn = InvalidXLogRecPtr;
889 : 0 : state->local_lsn = InvalidXLogRecPtr;
890 : 0 : break;
891 : : }
892 [ # # # ]: 0 : }
893 : : break;
894 : 0 : }
895 : : default:
896 [ # # # # ]: 0 : elog(PANIC, "replorigin_redo: unknown op code %u", info);
897 : 0 : }
898 : 0 : }
899 : :
900 : :
901 : : /*
902 : : * Tell the replication origin progress machinery that a commit from 'node'
903 : : * that originated at the LSN remote_commit on the remote node was replayed
904 : : * successfully and that we don't need to do so again. In combination with
905 : : * setting up replorigin_session_origin_lsn and replorigin_session_origin
906 : : * that ensures we won't lose knowledge about that after a crash if the
907 : : * transaction had a persistent effect (think of asynchronous commits).
908 : : *
909 : : * local_commit needs to be a local LSN of the commit so that we can make sure
910 : : * upon a checkpoint that enough WAL has been persisted to disk.
911 : : *
912 : : * Needs to be called with a RowExclusiveLock on pg_replication_origin,
913 : : * unless running in recovery.
914 : : */
915 : : void
916 : 0 : replorigin_advance(RepOriginId node,
917 : : XLogRecPtr remote_commit, XLogRecPtr local_commit,
918 : : bool go_backward, bool wal_log)
919 : : {
920 : 0 : int i;
921 : 0 : ReplicationState *replication_state = NULL;
922 : 0 : ReplicationState *free_state = NULL;
923 : :
924 [ # # ]: 0 : Assert(node != InvalidRepOriginId);
925 : :
926 : : /* we don't track DoNotReplicateId */
927 [ # # ]: 0 : if (node == DoNotReplicateId)
928 : 0 : return;
929 : :
930 : : /*
931 : : * XXX: For the case where this is called by WAL replay, it'd be more
932 : : * efficient to restore into a backend local hashtable and only dump into
933 : : * shmem after recovery is finished. Let's wait with implementing that
934 : : * till it's shown to be a measurable expense
935 : : */
936 : :
937 : : /* Lock exclusively, as we may have to create a new table entry. */
938 : 0 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
939 : :
940 : : /*
941 : : * Search for either an existing slot for the origin, or a free one we can
942 : : * use.
943 : : */
944 [ # # ]: 0 : for (i = 0; i < max_active_replication_origins; i++)
945 : : {
946 : 0 : ReplicationState *curstate = &replication_states[i];
947 : :
948 : : /* remember where to insert if necessary */
949 [ # # # # ]: 0 : if (curstate->roident == InvalidRepOriginId &&
950 : 0 : free_state == NULL)
951 : : {
952 : 0 : free_state = curstate;
953 : 0 : continue;
954 : : }
955 : :
956 : : /* not our slot */
957 [ # # ]: 0 : if (curstate->roident != node)
958 : : {
959 : 0 : continue;
960 : : }
961 : :
962 : : /* ok, found slot */
963 : 0 : replication_state = curstate;
964 : :
965 : 0 : LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
966 : :
967 : : /* Make sure it's not used by somebody else */
968 [ # # ]: 0 : if (replication_state->refcount > 0)
969 : : {
970 [ # # # # : 0 : ereport(ERROR,
# # ]
971 : : (errcode(ERRCODE_OBJECT_IN_USE),
972 : : (replication_state->acquired_by != 0)
973 : : ? errmsg("replication origin with ID %d is already active for PID %d",
974 : : replication_state->roident,
975 : : replication_state->acquired_by)
976 : : : errmsg("replication origin with ID %d is already active in another process",
977 : : replication_state->roident)));
978 : 0 : }
979 : :
980 : 0 : break;
981 [ # # ]: 0 : }
982 : :
983 [ # # # # ]: 0 : if (replication_state == NULL && free_state == NULL)
984 [ # # # # ]: 0 : ereport(ERROR,
985 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
986 : : errmsg("could not find free replication state slot for replication origin with ID %d",
987 : : node),
988 : : errhint("Increase \"max_active_replication_origins\" and try again.")));
989 : :
990 [ # # ]: 0 : if (replication_state == NULL)
991 : : {
992 : : /* initialize new slot */
993 : 0 : LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
994 : 0 : replication_state = free_state;
995 [ # # ]: 0 : Assert(!XLogRecPtrIsValid(replication_state->remote_lsn));
996 [ # # ]: 0 : Assert(!XLogRecPtrIsValid(replication_state->local_lsn));
997 : 0 : replication_state->roident = node;
998 : 0 : }
999 : :
1000 [ # # ]: 0 : Assert(replication_state->roident != InvalidRepOriginId);
1001 : :
1002 : : /*
1003 : : * If somebody "forcefully" sets this slot, WAL log it, so it's durable
1004 : : * and the standby gets the message. Primarily this will be called during
1005 : : * WAL replay (of commit records) where no WAL logging is necessary.
1006 : : */
1007 [ # # ]: 0 : if (wal_log)
1008 : : {
1009 : 0 : xl_replorigin_set xlrec;
1010 : :
1011 : 0 : xlrec.remote_lsn = remote_commit;
1012 : 0 : xlrec.node_id = node;
1013 : 0 : xlrec.force = go_backward;
1014 : :
1015 : 0 : XLogBeginInsert();
1016 : 0 : XLogRegisterData(&xlrec, sizeof(xlrec));
1017 : :
1018 : 0 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
1019 : 0 : }
1020 : :
1021 : : /*
1022 : : * Due to - harmless - race conditions during a checkpoint we could see
1023 : : * values here that are older than the ones we already have in memory. We
1024 : : * could also see older values for prepared transactions when the prepare
1025 : : * is sent at a later point of time along with commit prepared and there
1026 : : * are other transactions commits between prepare and commit prepared. See
1027 : : * ReorderBufferFinishPrepared. Don't overwrite those.
1028 : : */
1029 [ # # # # ]: 0 : if (go_backward || replication_state->remote_lsn < remote_commit)
1030 : 0 : replication_state->remote_lsn = remote_commit;
1031 [ # # # # ]: 0 : if (XLogRecPtrIsValid(local_commit) &&
1032 [ # # ]: 0 : (go_backward || replication_state->local_lsn < local_commit))
1033 : 0 : replication_state->local_lsn = local_commit;
1034 : 0 : LWLockRelease(&replication_state->lock);
1035 : :
1036 : : /*
1037 : : * Release *after* changing the LSNs, slot isn't acquired and thus could
1038 : : * otherwise be dropped anytime.
1039 : : */
1040 : 0 : LWLockRelease(ReplicationOriginLock);
1041 : 0 : }
1042 : :
1043 : :
1044 : : XLogRecPtr
1045 : 1 : replorigin_get_progress(RepOriginId node, bool flush)
1046 : : {
1047 : 1 : int i;
1048 : 1 : XLogRecPtr local_lsn = InvalidXLogRecPtr;
1049 : 1 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1050 : :
1051 : : /* prevent slots from being concurrently dropped */
1052 : 1 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1053 : :
1054 [ + + ]: 11 : for (i = 0; i < max_active_replication_origins; i++)
1055 : : {
1056 : 10 : ReplicationState *state;
1057 : :
1058 : 10 : state = &replication_states[i];
1059 : :
1060 [ - + ]: 10 : if (state->roident == node)
1061 : : {
1062 : 0 : LWLockAcquire(&state->lock, LW_SHARED);
1063 : :
1064 : 0 : remote_lsn = state->remote_lsn;
1065 : 0 : local_lsn = state->local_lsn;
1066 : :
1067 : 0 : LWLockRelease(&state->lock);
1068 : :
1069 : 0 : break;
1070 : : }
1071 [ - - + ]: 10 : }
1072 : :
1073 : 1 : LWLockRelease(ReplicationOriginLock);
1074 : :
1075 [ - + # # ]: 1 : if (flush && XLogRecPtrIsValid(local_lsn))
1076 : 0 : XLogFlush(local_lsn);
1077 : :
1078 : 2 : return remote_lsn;
1079 : 1 : }
1080 : :
1081 : : /* Helper function to reset the session replication origin */
1082 : : static void
1083 : 0 : replorigin_session_reset_internal(void)
1084 : : {
1085 : 0 : ConditionVariable *cv;
1086 : :
1087 [ # # ]: 0 : Assert(session_replication_state != NULL);
1088 : :
1089 : 0 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1090 : :
1091 : : /* The origin must be held by at least one process at this point. */
1092 [ # # ]: 0 : Assert(session_replication_state->refcount > 0);
1093 : :
1094 : : /*
1095 : : * Reset the PID only if the current session is the first to set up this
1096 : : * origin. This avoids clearing the first process's PID when any other
1097 : : * session releases the origin.
1098 : : */
1099 [ # # ]: 0 : if (session_replication_state->acquired_by == MyProcPid)
1100 : 0 : session_replication_state->acquired_by = 0;
1101 : :
1102 : 0 : session_replication_state->refcount--;
1103 : :
1104 : 0 : cv = &session_replication_state->origin_cv;
1105 : 0 : session_replication_state = NULL;
1106 : :
1107 : 0 : LWLockRelease(ReplicationOriginLock);
1108 : :
1109 : 0 : ConditionVariableBroadcast(cv);
1110 : 0 : }
1111 : :
1112 : : /*
1113 : : * Tear down a (possibly) configured session replication origin during process
1114 : : * exit.
1115 : : */
1116 : : static void
1117 : 0 : ReplicationOriginExitCleanup(int code, Datum arg)
1118 : : {
1119 [ # # ]: 0 : if (session_replication_state == NULL)
1120 : 0 : return;
1121 : :
1122 : 0 : replorigin_session_reset_internal();
1123 : 0 : }
1124 : :
1125 : : /*
1126 : : * Setup a replication origin in the shared memory struct if it doesn't
1127 : : * already exist and cache access to the specific ReplicationSlot so the
1128 : : * array doesn't have to be searched when calling
1129 : : * replorigin_session_advance().
1130 : : *
1131 : : * Normally only one such cached origin can exist per process so the cached
1132 : : * value can only be set again after the previous value is torn down with
1133 : : * replorigin_session_reset(). For this normal case pass acquired_by = 0
1134 : : * (meaning the slot is not allowed to be already acquired by another process).
1135 : : *
1136 : : * However, sometimes multiple processes can safely re-use the same origin slot
1137 : : * (for example, multiple parallel apply processes can safely use the same
1138 : : * origin, provided they maintain commit order by allowing only one process to
1139 : : * commit at a time). For this case the first process must pass acquired_by =
1140 : : * 0, and then the other processes sharing that same origin can pass
1141 : : * acquired_by = PID of the first process.
1142 : : */
1143 : : void
1144 : 0 : replorigin_session_setup(RepOriginId node, int acquired_by)
1145 : : {
1146 : : static bool registered_cleanup;
1147 : 0 : int i;
1148 : 0 : int free_slot = -1;
1149 : :
1150 [ # # ]: 0 : if (!registered_cleanup)
1151 : : {
1152 : 0 : on_shmem_exit(ReplicationOriginExitCleanup, 0);
1153 : 0 : registered_cleanup = true;
1154 : 0 : }
1155 : :
1156 [ # # ]: 0 : Assert(max_active_replication_origins > 0);
1157 : :
1158 [ # # ]: 0 : if (session_replication_state != NULL)
1159 [ # # # # ]: 0 : ereport(ERROR,
1160 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1161 : : errmsg("cannot setup replication origin when one is already setup")));
1162 : :
1163 : : /* Lock exclusively, as we may have to create a new table entry. */
1164 : 0 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1165 : :
1166 : : /*
1167 : : * Search for either an existing slot for the origin, or a free one we can
1168 : : * use.
1169 : : */
1170 [ # # ]: 0 : for (i = 0; i < max_active_replication_origins; i++)
1171 : : {
1172 : 0 : ReplicationState *curstate = &replication_states[i];
1173 : :
1174 : : /* remember where to insert if necessary */
1175 [ # # # # ]: 0 : if (curstate->roident == InvalidRepOriginId &&
1176 : 0 : free_slot == -1)
1177 : : {
1178 : 0 : free_slot = i;
1179 : 0 : continue;
1180 : : }
1181 : :
1182 : : /* not our slot */
1183 [ # # ]: 0 : if (curstate->roident != node)
1184 : 0 : continue;
1185 : :
1186 [ # # # # ]: 0 : else if (curstate->acquired_by != 0 && acquired_by == 0)
1187 : : {
1188 [ # # # # ]: 0 : ereport(ERROR,
1189 : : (errcode(ERRCODE_OBJECT_IN_USE),
1190 : : errmsg("replication origin with ID %d is already active for PID %d",
1191 : : curstate->roident, curstate->acquired_by)));
1192 : 0 : }
1193 : :
1194 [ # # ]: 0 : else if (curstate->acquired_by != acquired_by)
1195 : : {
1196 [ # # # # ]: 0 : ereport(ERROR,
1197 : : (errcode(ERRCODE_OBJECT_IN_USE),
1198 : : errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1199 : : node, acquired_by)));
1200 : 0 : }
1201 : :
1202 : : /*
1203 : : * The origin is in use, but PID is not recorded. This can happen if
1204 : : * the process that originally acquired the origin exited without
1205 : : * releasing it. To ensure correctness, other processes cannot acquire
1206 : : * the origin until all processes currently using it have released it.
1207 : : */
1208 [ # # # # ]: 0 : else if (curstate->acquired_by == 0 && curstate->refcount > 0)
1209 [ # # # # ]: 0 : ereport(ERROR,
1210 : : (errcode(ERRCODE_OBJECT_IN_USE),
1211 : : errmsg("replication origin with ID %d is already active in another process",
1212 : : curstate->roident)));
1213 : :
1214 : : /* ok, found slot */
1215 : 0 : session_replication_state = curstate;
1216 : 0 : break;
1217 [ # # # ]: 0 : }
1218 : :
1219 : :
1220 [ # # # # ]: 0 : if (session_replication_state == NULL && free_slot == -1)
1221 [ # # # # ]: 0 : ereport(ERROR,
1222 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1223 : : errmsg("could not find free replication state slot for replication origin with ID %d",
1224 : : node),
1225 : : errhint("Increase \"max_active_replication_origins\" and try again.")));
1226 [ # # ]: 0 : else if (session_replication_state == NULL)
1227 : : {
1228 [ # # ]: 0 : if (acquired_by)
1229 [ # # # # ]: 0 : ereport(ERROR,
1230 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1231 : : errmsg("cannot use PID %d for inactive replication origin with ID %d",
1232 : : acquired_by, node)));
1233 : :
1234 : : /* initialize new slot */
1235 : 0 : session_replication_state = &replication_states[free_slot];
1236 [ # # ]: 0 : Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
1237 [ # # ]: 0 : Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));
1238 : 0 : session_replication_state->roident = node;
1239 : 0 : }
1240 : :
1241 : :
1242 [ # # ]: 0 : Assert(session_replication_state->roident != InvalidRepOriginId);
1243 : :
1244 [ # # ]: 0 : if (acquired_by == 0)
1245 : : {
1246 : 0 : session_replication_state->acquired_by = MyProcPid;
1247 [ # # ]: 0 : Assert(session_replication_state->refcount == 0);
1248 : 0 : }
1249 : : else
1250 : : {
1251 : : /*
1252 : : * Sanity check: the origin must already be acquired by the process
1253 : : * passed as input, and at least one process must be using it.
1254 : : */
1255 [ # # ]: 0 : Assert(session_replication_state->acquired_by == acquired_by);
1256 [ # # ]: 0 : Assert(session_replication_state->refcount > 0);
1257 : : }
1258 : :
1259 : 0 : session_replication_state->refcount++;
1260 : :
1261 : 0 : LWLockRelease(ReplicationOriginLock);
1262 : :
1263 : : /* probably this one is pointless */
1264 : 0 : ConditionVariableBroadcast(&session_replication_state->origin_cv);
1265 : 0 : }
1266 : :
1267 : : /*
1268 : : * Reset replay state previously setup in this session.
1269 : : *
1270 : : * This function may only be called if an origin was setup with
1271 : : * replorigin_session_setup().
1272 : : */
1273 : : void
1274 : 0 : replorigin_session_reset(void)
1275 : : {
1276 [ # # ]: 0 : Assert(max_active_replication_origins != 0);
1277 : :
1278 [ # # ]: 0 : if (session_replication_state == NULL)
1279 [ # # # # ]: 0 : ereport(ERROR,
1280 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1281 : : errmsg("no replication origin is configured")));
1282 : :
1283 : : /*
1284 : : * Restrict explicit resetting of the replication origin if it was first
1285 : : * acquired by this process and others are still using it. While the
1286 : : * system handles this safely (as happens if the first session exits
1287 : : * without calling reset), it is best to avoid doing so.
1288 : : */
1289 [ # # # # ]: 0 : if (session_replication_state->acquired_by == MyProcPid &&
1290 : 0 : session_replication_state->refcount > 1)
1291 [ # # # # ]: 0 : ereport(ERROR,
1292 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1293 : : errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
1294 : : session_replication_state->roident),
1295 : : errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
1296 : : errhint("Reset the replication origin in all other processes before retrying.")));
1297 : :
1298 : 0 : replorigin_session_reset_internal();
1299 : 0 : }
1300 : :
1301 : : /*
1302 : : * Do the same work replorigin_advance() does, just on the session's
1303 : : * configured origin.
1304 : : *
1305 : : * This is noticeably cheaper than using replorigin_advance().
1306 : : */
1307 : : void
1308 : 0 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1309 : : {
1310 [ # # ]: 0 : Assert(session_replication_state != NULL);
1311 [ # # ]: 0 : Assert(session_replication_state->roident != InvalidRepOriginId);
1312 : :
1313 : 0 : LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1314 [ # # ]: 0 : if (session_replication_state->local_lsn < local_commit)
1315 : 0 : session_replication_state->local_lsn = local_commit;
1316 [ # # ]: 0 : if (session_replication_state->remote_lsn < remote_commit)
1317 : 0 : session_replication_state->remote_lsn = remote_commit;
1318 : 0 : LWLockRelease(&session_replication_state->lock);
1319 : 0 : }
1320 : :
1321 : : /*
1322 : : * Ask the machinery about the point up to which we successfully replayed
1323 : : * changes from an already setup replication origin.
1324 : : */
1325 : : XLogRecPtr
1326 : 0 : replorigin_session_get_progress(bool flush)
1327 : : {
1328 : 0 : XLogRecPtr remote_lsn;
1329 : 0 : XLogRecPtr local_lsn;
1330 : :
1331 [ # # ]: 0 : Assert(session_replication_state != NULL);
1332 : :
1333 : 0 : LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1334 : 0 : remote_lsn = session_replication_state->remote_lsn;
1335 : 0 : local_lsn = session_replication_state->local_lsn;
1336 : 0 : LWLockRelease(&session_replication_state->lock);
1337 : :
1338 [ # # # # ]: 0 : if (flush && XLogRecPtrIsValid(local_lsn))
1339 : 0 : XLogFlush(local_lsn);
1340 : :
1341 : 0 : return remote_lsn;
1342 : 0 : }
1343 : :
1344 : :
1345 : :
1346 : : /* ---------------------------------------------------------------------------
1347 : : * SQL functions for working with replication origin.
1348 : : *
1349 : : * These mostly should be fairly short wrappers around more generic functions.
1350 : : * ---------------------------------------------------------------------------
1351 : : */
1352 : :
1353 : : /*
1354 : : * Create replication origin for the passed in name, and return the assigned
1355 : : * oid.
1356 : : */
1357 : : Datum
1358 : 1 : pg_replication_origin_create(PG_FUNCTION_ARGS)
1359 : : {
1360 : 1 : char *name;
1361 : 1 : RepOriginId roident;
1362 : :
1363 : 1 : replorigin_check_prerequisites(false, false);
1364 : :
1365 : 1 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1366 : :
1367 : : /*
1368 : : * Replication origins "any and "none" are reserved for system options.
1369 : : * The origins "pg_xxx" are reserved for internal use.
1370 : : */
1371 [ + - ]: 1 : if (IsReservedName(name) || IsReservedOriginName(name))
1372 [ # # # # ]: 0 : ereport(ERROR,
1373 : : (errcode(ERRCODE_RESERVED_NAME),
1374 : : errmsg("replication origin name \"%s\" is reserved",
1375 : : name),
1376 : : errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1377 : : LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1378 : :
1379 : : /*
1380 : : * If built with appropriate switch, whine when regression-testing
1381 : : * conventions for replication origin names are violated.
1382 : : */
1383 : : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1384 : : if (strncmp(name, "regress_", 8) != 0)
1385 : : elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1386 : : #endif
1387 : :
1388 : 1 : roident = replorigin_create(name);
1389 : :
1390 : 1 : pfree(name);
1391 : :
1392 : 2 : PG_RETURN_OID(roident);
1393 : 1 : }
1394 : :
1395 : : /*
1396 : : * Drop replication origin.
1397 : : */
1398 : : Datum
1399 : 0 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
1400 : : {
1401 : 0 : char *name;
1402 : :
1403 : 0 : replorigin_check_prerequisites(false, false);
1404 : :
1405 : 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1406 : :
1407 : 0 : replorigin_drop_by_name(name, false, true);
1408 : :
1409 : 0 : pfree(name);
1410 : :
1411 : 0 : PG_RETURN_VOID();
1412 : 0 : }
1413 : :
1414 : : /*
1415 : : * Return oid of a replication origin.
1416 : : */
1417 : : Datum
1418 : 0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
1419 : : {
1420 : 0 : char *name;
1421 : 0 : RepOriginId roident;
1422 : :
1423 : 0 : replorigin_check_prerequisites(false, false);
1424 : :
1425 : 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1426 : 0 : roident = replorigin_by_name(name, true);
1427 : :
1428 : 0 : pfree(name);
1429 : :
1430 [ # # ]: 0 : if (OidIsValid(roident))
1431 : 0 : PG_RETURN_OID(roident);
1432 : 0 : PG_RETURN_NULL();
1433 [ # # ]: 0 : }
1434 : :
1435 : : /*
1436 : : * Setup a replication origin for this session.
1437 : : */
1438 : : Datum
1439 : 0 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1440 : : {
1441 : 0 : char *name;
1442 : 0 : RepOriginId origin;
1443 : 0 : int pid;
1444 : :
1445 : 0 : replorigin_check_prerequisites(true, false);
1446 : :
1447 : 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1448 : 0 : origin = replorigin_by_name(name, false);
1449 : 0 : pid = PG_GETARG_INT32(1);
1450 : 0 : replorigin_session_setup(origin, pid);
1451 : :
1452 : 0 : replorigin_session_origin = origin;
1453 : :
1454 : 0 : pfree(name);
1455 : :
1456 : 0 : PG_RETURN_VOID();
1457 : 0 : }
1458 : :
1459 : : /*
1460 : : * Reset previously setup origin in this session
1461 : : */
1462 : : Datum
1463 : 0 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1464 : : {
1465 : 0 : replorigin_check_prerequisites(true, false);
1466 : :
1467 : 0 : replorigin_session_reset();
1468 : :
1469 : 0 : replorigin_session_origin = InvalidRepOriginId;
1470 : 0 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1471 : 0 : replorigin_session_origin_timestamp = 0;
1472 : :
1473 : 0 : PG_RETURN_VOID();
1474 : : }
1475 : :
1476 : : /*
1477 : : * Has a replication origin been setup for this session.
1478 : : */
1479 : : Datum
1480 : 0 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1481 : : {
1482 : 0 : replorigin_check_prerequisites(false, false);
1483 : :
1484 : 0 : PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1485 : : }
1486 : :
1487 : :
1488 : : /*
1489 : : * Return the replication progress for origin setup in the current session.
1490 : : *
1491 : : * If 'flush' is set to true it is ensured that the returned value corresponds
1492 : : * to a local transaction that has been flushed. This is useful if asynchronous
1493 : : * commits are used when replaying replicated transactions.
1494 : : */
1495 : : Datum
1496 : 0 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1497 : : {
1498 : 0 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1499 : 0 : bool flush = PG_GETARG_BOOL(0);
1500 : :
1501 : 0 : replorigin_check_prerequisites(true, false);
1502 : :
1503 [ # # ]: 0 : if (session_replication_state == NULL)
1504 [ # # # # ]: 0 : ereport(ERROR,
1505 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1506 : : errmsg("no replication origin is configured")));
1507 : :
1508 : 0 : remote_lsn = replorigin_session_get_progress(flush);
1509 : :
1510 [ # # ]: 0 : if (!XLogRecPtrIsValid(remote_lsn))
1511 : 0 : PG_RETURN_NULL();
1512 : :
1513 : 0 : PG_RETURN_LSN(remote_lsn);
1514 : 0 : }
1515 : :
1516 : : Datum
1517 : 0 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1518 : : {
1519 : 0 : XLogRecPtr location = PG_GETARG_LSN(0);
1520 : :
1521 : 0 : replorigin_check_prerequisites(true, false);
1522 : :
1523 [ # # ]: 0 : if (session_replication_state == NULL)
1524 [ # # # # ]: 0 : ereport(ERROR,
1525 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1526 : : errmsg("no replication origin is configured")));
1527 : :
1528 : 0 : replorigin_session_origin_lsn = location;
1529 : 0 : replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1530 : :
1531 : 0 : PG_RETURN_VOID();
1532 : 0 : }
1533 : :
1534 : : Datum
1535 : 0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1536 : : {
1537 : 0 : replorigin_check_prerequisites(true, false);
1538 : :
1539 : 0 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1540 : 0 : replorigin_session_origin_timestamp = 0;
1541 : :
1542 : 0 : PG_RETURN_VOID();
1543 : : }
1544 : :
1545 : :
1546 : : Datum
1547 : 0 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
1548 : : {
1549 : 0 : text *name = PG_GETARG_TEXT_PP(0);
1550 : 0 : XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1551 : 0 : RepOriginId node;
1552 : :
1553 : 0 : replorigin_check_prerequisites(true, false);
1554 : :
1555 : : /* lock to prevent the replication origin from vanishing */
1556 : 0 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1557 : :
1558 : 0 : node = replorigin_by_name(text_to_cstring(name), false);
1559 : :
1560 : : /*
1561 : : * Can't sensibly pass a local commit to be flushed at checkpoint - this
1562 : : * xact hasn't committed yet. This is why this function should be used to
1563 : : * set up the initial replication state, but not for replay.
1564 : : */
1565 : 0 : replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1566 : : true /* go backward */ , true /* WAL log */ );
1567 : :
1568 : 0 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1569 : :
1570 : 0 : PG_RETURN_VOID();
1571 : 0 : }
1572 : :
1573 : :
1574 : : /*
1575 : : * Return the replication progress for an individual replication origin.
1576 : : *
1577 : : * If 'flush' is set to true it is ensured that the returned value corresponds
1578 : : * to a local transaction that has been flushed. This is useful if asynchronous
1579 : : * commits are used when replaying replicated transactions.
1580 : : */
1581 : : Datum
1582 : 0 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
1583 : : {
1584 : 0 : char *name;
1585 : 0 : bool flush;
1586 : 0 : RepOriginId roident;
1587 : 0 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1588 : :
1589 : 0 : replorigin_check_prerequisites(true, true);
1590 : :
1591 : 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1592 : 0 : flush = PG_GETARG_BOOL(1);
1593 : :
1594 : 0 : roident = replorigin_by_name(name, false);
1595 [ # # ]: 0 : Assert(OidIsValid(roident));
1596 : :
1597 : 0 : remote_lsn = replorigin_get_progress(roident, flush);
1598 : :
1599 [ # # ]: 0 : if (!XLogRecPtrIsValid(remote_lsn))
1600 : 0 : PG_RETURN_NULL();
1601 : :
1602 : 0 : PG_RETURN_LSN(remote_lsn);
1603 : 0 : }
1604 : :
1605 : :
1606 : : Datum
1607 : 0 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1608 : : {
1609 : 0 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1610 : 0 : int i;
1611 : : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1612 : :
1613 : : /* we want to return 0 rows if slot is set to zero */
1614 : 0 : replorigin_check_prerequisites(false, true);
1615 : :
1616 : 0 : InitMaterializedSRF(fcinfo, 0);
1617 : :
1618 : : /* prevent slots from being concurrently dropped */
1619 : 0 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1620 : :
1621 : : /*
1622 : : * Iterate through all possible replication_states, display if they are
1623 : : * filled. Note that we do not take any locks, so slightly corrupted/out
1624 : : * of date values are a possibility.
1625 : : */
1626 [ # # ]: 0 : for (i = 0; i < max_active_replication_origins; i++)
1627 : : {
1628 : 0 : ReplicationState *state;
1629 : 0 : Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1630 : 0 : bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1631 : 0 : char *roname;
1632 : :
1633 : 0 : state = &replication_states[i];
1634 : :
1635 : : /* unused slot, nothing to display */
1636 [ # # ]: 0 : if (state->roident == InvalidRepOriginId)
1637 : 0 : continue;
1638 : :
1639 : 0 : memset(values, 0, sizeof(values));
1640 : 0 : memset(nulls, 1, sizeof(nulls));
1641 : :
1642 : 0 : values[0] = ObjectIdGetDatum(state->roident);
1643 : 0 : nulls[0] = false;
1644 : :
1645 : : /*
1646 : : * We're not preventing the origin to be dropped concurrently, so
1647 : : * silently accept that it might be gone.
1648 : : */
1649 [ # # ]: 0 : if (replorigin_by_oid(state->roident, true,
1650 : : &roname))
1651 : : {
1652 : 0 : values[1] = CStringGetTextDatum(roname);
1653 : 0 : nulls[1] = false;
1654 : 0 : }
1655 : :
1656 : 0 : LWLockAcquire(&state->lock, LW_SHARED);
1657 : :
1658 : 0 : values[2] = LSNGetDatum(state->remote_lsn);
1659 : 0 : nulls[2] = false;
1660 : :
1661 : 0 : values[3] = LSNGetDatum(state->local_lsn);
1662 : 0 : nulls[3] = false;
1663 : :
1664 : 0 : LWLockRelease(&state->lock);
1665 : :
1666 : 0 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1667 : 0 : values, nulls);
1668 [ # # # ]: 0 : }
1669 : :
1670 : 0 : LWLockRelease(ReplicationOriginLock);
1671 : :
1672 : : #undef REPLICATION_ORIGIN_PROGRESS_COLS
1673 : :
1674 : 0 : return (Datum) 0;
1675 : 0 : }
|