Branch data Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : * sequencesync.c
3 : : * PostgreSQL logical replication: sequence synchronization
4 : : *
5 : : * Copyright (c) 2025-2026, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/backend/replication/logical/sequencesync.c
9 : : *
10 : : * NOTES
11 : : * This file contains code for sequence synchronization for
12 : : * logical replication.
13 : : *
14 : : * Sequences requiring synchronization are tracked in the pg_subscription_rel
15 : : * catalog.
16 : : *
17 : : * Sequences to be synchronized will be added with state INIT when either of
18 : : * the following commands is executed:
19 : : * CREATE SUBSCRIPTION
20 : : * ALTER SUBSCRIPTION ... REFRESH PUBLICATION
21 : : *
22 : : * Executing the following command resets all sequences in the subscription to
23 : : * state INIT, triggering re-synchronization:
24 : : * ALTER SUBSCRIPTION ... REFRESH SEQUENCES
25 : : *
26 : : * The apply worker periodically scans pg_subscription_rel for sequences in
27 : : * INIT state. When such sequences are found, it spawns a sequencesync worker
28 : : * to handle synchronization.
29 : : *
30 : : * A single sequencesync worker is responsible for synchronizing all sequences.
31 : : * It begins by retrieving the list of sequences that are flagged for
32 : : * synchronization, i.e., those in the INIT state. These sequences are then
33 : : * processed in batches, allowing multiple entries to be synchronized within a
34 : : * single transaction. The worker fetches the current sequence values and page
35 : : * LSNs from the remote publisher, updates the corresponding sequences on the
36 : : * local subscriber, and finally marks each sequence as READY upon successful
37 : : * synchronization.
38 : : *
39 : : * Sequence state transitions follow this pattern:
40 : : * INIT -> READY
41 : : *
42 : : * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
43 : : * sequences are synchronized per transaction. The locks on the sequence
44 : : * relation will be periodically released at each transaction commit.
45 : : *
46 : : * XXX: The launcher process is not used to start the sequencesync worker
47 : : * because it has no database connection and therefore cannot read the
48 : : * pg_subscription_rel system catalog to find the sequences to synchronize.
49 : : *-------------------------------------------------------------------------
50 : : */
51 : :
52 : : #include "postgres.h"
53 : :
54 : : #include "access/genam.h"
55 : : #include "access/table.h"
56 : : #include "catalog/pg_sequence.h"
57 : : #include "catalog/pg_subscription_rel.h"
58 : : #include "commands/sequence.h"
59 : : #include "pgstat.h"
60 : : #include "postmaster/interrupt.h"
61 : : #include "replication/logicalworker.h"
62 : : #include "replication/worker_internal.h"
63 : : #include "utils/acl.h"
64 : : #include "utils/builtins.h"
65 : : #include "utils/fmgroids.h"
66 : : #include "utils/guc.h"
67 : : #include "utils/inval.h"
68 : : #include "utils/lsyscache.h"
69 : : #include "utils/memutils.h"
70 : : #include "utils/pg_lsn.h"
71 : : #include "utils/syscache.h"
72 : : #include "utils/usercontext.h"
73 : :
74 : : #define REMOTE_SEQ_COL_COUNT 10
75 : :
76 : : typedef enum CopySeqResult
77 : : {
78 : : COPYSEQ_SUCCESS,
79 : : COPYSEQ_MISMATCH,
80 : : COPYSEQ_INSUFFICIENT_PERM,
81 : : COPYSEQ_SKIPPED
82 : : } CopySeqResult;
83 : :
84 : : static List *seqinfos = NIL;
85 : :
86 : : /*
87 : : * Apply worker determines if sequence synchronization is needed.
88 : : *
89 : : * Start a sequencesync worker if one is not already running. The active
90 : : * sequencesync worker will handle all pending sequence synchronization. If any
91 : : * sequences remain unsynchronized after it exits, a new worker can be started
92 : : * in the next iteration.
93 : : */
94 : : void
95 : 0 : ProcessSequencesForSync(void)
96 : : {
97 : 0 : LogicalRepWorker *sequencesync_worker;
98 : 0 : int nsyncworkers;
99 : 0 : bool has_pending_sequences;
100 : 0 : bool started_tx;
101 : :
102 : 0 : FetchRelationStates(NULL, &has_pending_sequences, &started_tx);
103 : :
104 [ # # ]: 0 : if (started_tx)
105 : : {
106 : 0 : CommitTransactionCommand();
107 : 0 : pgstat_report_stat(true);
108 : 0 : }
109 : :
110 [ # # ]: 0 : if (!has_pending_sequences)
111 : 0 : return;
112 : :
113 : 0 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
114 : :
115 : : /* Check if there is a sequencesync worker already running? */
116 : 0 : sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC,
117 : 0 : MyLogicalRepWorker->subid,
118 : : InvalidOid, true);
119 [ # # ]: 0 : if (sequencesync_worker)
120 : : {
121 : 0 : LWLockRelease(LogicalRepWorkerLock);
122 : 0 : return;
123 : : }
124 : :
125 : : /*
126 : : * Count running sync workers for this subscription, while we have the
127 : : * lock.
128 : : */
129 : 0 : nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
130 : 0 : LWLockRelease(LogicalRepWorkerLock);
131 : :
132 : : /*
133 : : * It is okay to read/update last_seqsync_start_time here in apply worker
134 : : * as we have already ensured that sync worker doesn't exist.
135 : : */
136 : 0 : launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid,
137 : 0 : &MyLogicalRepWorker->last_seqsync_start_time);
138 [ # # ]: 0 : }
139 : :
140 : : /*
141 : : * get_sequences_string
142 : : *
143 : : * Build a comma-separated string of schema-qualified sequence names
144 : : * for the given list of sequence indexes.
145 : : */
146 : : static void
147 : 0 : get_sequences_string(List *seqindexes, StringInfo buf)
148 : : {
149 : 0 : resetStringInfo(buf);
150 [ # # # # : 0 : foreach_int(seqidx, seqindexes)
# # # # ]
151 : : {
152 : 0 : LogicalRepSequenceInfo *seqinfo =
153 : 0 : (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx);
154 : :
155 [ # # ]: 0 : if (buf->len > 0)
156 : 0 : appendStringInfoString(buf, ", ");
157 : :
158 : 0 : appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname);
159 : 0 : }
160 : 0 : }
161 : :
162 : : /*
163 : : * report_sequence_errors
164 : : *
165 : : * Report discrepancies found during sequence synchronization between
166 : : * the publisher and subscriber. Emits warnings for:
167 : : * a) mismatched definitions or concurrent rename
168 : : * b) insufficient privileges
169 : : * c) missing sequences on the subscriber
170 : : * Then raises an ERROR to indicate synchronization failure.
171 : : */
172 : : static void
173 : 0 : report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
174 : : List *missing_seqs_idx)
175 : : {
176 : 0 : StringInfo seqstr;
177 : :
178 : : /* Quick exit if there are no errors to report */
179 [ # # ]: 0 : if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx)
180 : 0 : return;
181 : :
182 : 0 : seqstr = makeStringInfo();
183 : :
184 [ # # ]: 0 : if (mismatched_seqs_idx)
185 : : {
186 : 0 : get_sequences_string(mismatched_seqs_idx, seqstr);
187 [ # # # # ]: 0 : ereport(WARNING,
188 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
189 : : errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
190 : : "mismatched or renamed sequences on subscriber (%s)",
191 : : list_length(mismatched_seqs_idx),
192 : : seqstr->data));
193 : 0 : }
194 : :
195 [ # # ]: 0 : if (insuffperm_seqs_idx)
196 : : {
197 : 0 : get_sequences_string(insuffperm_seqs_idx, seqstr);
198 [ # # # # ]: 0 : ereport(WARNING,
199 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
200 : : errmsg_plural("insufficient privileges on sequence (%s)",
201 : : "insufficient privileges on sequences (%s)",
202 : : list_length(insuffperm_seqs_idx),
203 : : seqstr->data));
204 : 0 : }
205 : :
206 [ # # ]: 0 : if (missing_seqs_idx)
207 : : {
208 : 0 : get_sequences_string(missing_seqs_idx, seqstr);
209 [ # # # # ]: 0 : ereport(WARNING,
210 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
211 : : errmsg_plural("missing sequence on publisher (%s)",
212 : : "missing sequences on publisher (%s)",
213 : : list_length(missing_seqs_idx),
214 : : seqstr->data));
215 : 0 : }
216 : :
217 [ # # # # ]: 0 : ereport(ERROR,
218 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
219 : : errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
220 : : MySubscription->name));
221 [ # # ]: 0 : }
222 : :
223 : : /*
224 : : * get_and_validate_seq_info
225 : : *
226 : : * Extracts remote sequence information from the tuple slot received from the
227 : : * publisher, and validates it against the corresponding local sequence
228 : : * definition.
229 : : */
230 : : static CopySeqResult
231 : 0 : get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
232 : : LogicalRepSequenceInfo **seqinfo, int *seqidx)
233 : : {
234 : 0 : bool isnull;
235 : 0 : int col = 0;
236 : 0 : Datum datum;
237 : 0 : Oid remote_typid;
238 : 0 : int64 remote_start;
239 : 0 : int64 remote_increment;
240 : 0 : int64 remote_min;
241 : 0 : int64 remote_max;
242 : 0 : bool remote_cycle;
243 : 0 : CopySeqResult result = COPYSEQ_SUCCESS;
244 : 0 : HeapTuple tup;
245 : 0 : Form_pg_sequence local_seq;
246 : 0 : LogicalRepSequenceInfo *seqinfo_local;
247 : :
248 : 0 : *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
249 [ # # ]: 0 : Assert(!isnull);
250 : :
251 : : /* Identify the corresponding local sequence for the given index. */
252 : 0 : *seqinfo = seqinfo_local =
253 : 0 : (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx);
254 : :
255 : : /*
256 : : * last_value can be NULL if the sequence was dropped concurrently (see
257 : : * pg_get_sequence_data()).
258 : : */
259 : 0 : datum = slot_getattr(slot, ++col, &isnull);
260 [ # # ]: 0 : if (isnull)
261 : 0 : return COPYSEQ_SKIPPED;
262 : 0 : seqinfo_local->last_value = DatumGetInt64(datum);
263 : :
264 : 0 : seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
265 [ # # ]: 0 : Assert(!isnull);
266 : :
267 : 0 : seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
268 [ # # ]: 0 : Assert(!isnull);
269 : :
270 : 0 : remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
271 [ # # ]: 0 : Assert(!isnull);
272 : :
273 : 0 : remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
274 [ # # ]: 0 : Assert(!isnull);
275 : :
276 : 0 : remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
277 [ # # ]: 0 : Assert(!isnull);
278 : :
279 : 0 : remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
280 [ # # ]: 0 : Assert(!isnull);
281 : :
282 : 0 : remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
283 [ # # ]: 0 : Assert(!isnull);
284 : :
285 : 0 : remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
286 [ # # ]: 0 : Assert(!isnull);
287 : :
288 : : /* Sanity check */
289 [ # # ]: 0 : Assert(col == REMOTE_SEQ_COL_COUNT);
290 : :
291 : 0 : seqinfo_local->found_on_pub = true;
292 : :
293 : 0 : *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock);
294 : :
295 : : /* Sequence was concurrently dropped? */
296 [ # # ]: 0 : if (!*sequence_rel)
297 : 0 : return COPYSEQ_SKIPPED;
298 : :
299 : 0 : tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid));
300 : :
301 : : /* Sequence was concurrently dropped? */
302 [ # # ]: 0 : if (!HeapTupleIsValid(tup))
303 [ # # # # ]: 0 : elog(ERROR, "cache lookup failed for sequence %u",
304 : : seqinfo_local->localrelid);
305 : :
306 : 0 : local_seq = (Form_pg_sequence) GETSTRUCT(tup);
307 : :
308 : : /* Sequence parameters for remote/local are the same? */
309 [ # # ]: 0 : if (local_seq->seqtypid != remote_typid ||
310 [ # # ]: 0 : local_seq->seqstart != remote_start ||
311 [ # # ]: 0 : local_seq->seqincrement != remote_increment ||
312 [ # # ]: 0 : local_seq->seqmin != remote_min ||
313 [ # # # # ]: 0 : local_seq->seqmax != remote_max ||
314 : 0 : local_seq->seqcycle != remote_cycle)
315 : 0 : result = COPYSEQ_MISMATCH;
316 : :
317 : : /* Sequence was concurrently renamed? */
318 : 0 : if (strcmp(seqinfo_local->nspname,
319 [ # # # # : 0 : get_namespace_name(RelationGetNamespace(*sequence_rel))) ||
# # ]
320 : 0 : strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel)))
321 : 0 : result = COPYSEQ_MISMATCH;
322 : :
323 : 0 : ReleaseSysCache(tup);
324 : 0 : return result;
325 : 0 : }
326 : :
327 : : /*
328 : : * Apply remote sequence state to local sequence and mark it as
329 : : * synchronized (READY).
330 : : */
331 : : static CopySeqResult
332 : 0 : copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
333 : : {
334 : 0 : UserContext ucxt;
335 : 0 : AclResult aclresult;
336 : 0 : bool run_as_owner = MySubscription->runasowner;
337 : 0 : Oid seqoid = seqinfo->localrelid;
338 : :
339 : : /*
340 : : * If the user did not opt to run as the owner of the subscription
341 : : * ('run_as_owner'), then copy the sequence as the owner of the sequence.
342 : : */
343 [ # # ]: 0 : if (!run_as_owner)
344 : 0 : SwitchToUntrustedUser(seqowner, &ucxt);
345 : :
346 : 0 : aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE);
347 : :
348 [ # # ]: 0 : if (aclresult != ACLCHECK_OK)
349 : : {
350 [ # # ]: 0 : if (!run_as_owner)
351 : 0 : RestoreUserContext(&ucxt);
352 : :
353 : 0 : return COPYSEQ_INSUFFICIENT_PERM;
354 : : }
355 : :
356 : : /*
357 : : * The log counter (log_cnt) tracks how many sequence values are still
358 : : * unused locally. It is only relevant to the local node and managed
359 : : * internally by nextval() when allocating new ranges. Since log_cnt does
360 : : * not affect the visible sequence state (like last_value or is_called)
361 : : * and is only used for local caching, it need not be copied to the
362 : : * subscriber during synchronization.
363 : : */
364 : 0 : SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called);
365 : :
366 [ # # ]: 0 : if (!run_as_owner)
367 : 0 : RestoreUserContext(&ucxt);
368 : :
369 : : /*
370 : : * Record the remote sequence's LSN in pg_subscription_rel and mark the
371 : : * sequence as READY.
372 : : */
373 : 0 : UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
374 : 0 : seqinfo->page_lsn, false);
375 : :
376 : 0 : return COPYSEQ_SUCCESS;
377 : 0 : }
378 : :
379 : : /*
380 : : * Copy existing data of sequences from the publisher.
381 : : */
382 : : static void
383 : 0 : copy_sequences(WalReceiverConn *conn)
384 : : {
385 : 0 : int cur_batch_base_index = 0;
386 : 0 : int n_seqinfos = list_length(seqinfos);
387 : 0 : List *mismatched_seqs_idx = NIL;
388 : 0 : List *missing_seqs_idx = NIL;
389 : 0 : List *insuffperm_seqs_idx = NIL;
390 : 0 : StringInfo seqstr = makeStringInfo();
391 : 0 : StringInfo cmd = makeStringInfo();
392 : 0 : MemoryContext oldctx;
393 : :
394 : : #define MAX_SEQUENCES_SYNC_PER_BATCH 100
395 : :
396 [ # # # # ]: 0 : elog(DEBUG1,
397 : : "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
398 : : MySubscription->name, n_seqinfos);
399 : :
400 [ # # ]: 0 : while (cur_batch_base_index < n_seqinfos)
401 : : {
402 : 0 : Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
403 : : BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
404 : 0 : int batch_size = 0;
405 : 0 : int batch_succeeded_count = 0;
406 : 0 : int batch_mismatched_count = 0;
407 : 0 : int batch_skipped_count = 0;
408 : 0 : int batch_insuffperm_count = 0;
409 : 0 : int batch_missing_count;
410 : 0 : Relation sequence_rel = NULL;
411 : :
412 : 0 : WalRcvExecResult *res;
413 : 0 : TupleTableSlot *slot;
414 : :
415 : 0 : StartTransactionCommand();
416 : :
417 [ # # ]: 0 : for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
418 : : {
419 : 0 : char *nspname_literal;
420 : 0 : char *seqname_literal;
421 : :
422 : 0 : LogicalRepSequenceInfo *seqinfo =
423 : 0 : (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
424 : :
425 [ # # ]: 0 : if (seqstr->len > 0)
426 : 0 : appendStringInfoString(seqstr, ", ");
427 : :
428 : 0 : nspname_literal = quote_literal_cstr(seqinfo->nspname);
429 : 0 : seqname_literal = quote_literal_cstr(seqinfo->seqname);
430 : :
431 : 0 : appendStringInfo(seqstr, "(%s, %s, %d)",
432 : 0 : nspname_literal, seqname_literal, idx);
433 : :
434 [ # # ]: 0 : if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
435 : 0 : break;
436 [ # # ]: 0 : }
437 : :
438 : : /*
439 : : * We deliberately avoid acquiring a local lock on the sequence before
440 : : * querying the publisher to prevent potential distributed deadlocks
441 : : * in bi-directional replication setups.
442 : : *
443 : : * Example scenario:
444 : : *
445 : : * - On each node, a background worker acquires a lock on a sequence
446 : : * as part of a sync operation.
447 : : *
448 : : * - Concurrently, a user transaction attempts to alter the same
449 : : * sequence, waiting on the background worker's lock.
450 : : *
451 : : * - Meanwhile, a query from the other node tries to access metadata
452 : : * that depends on the completion of the alter operation.
453 : : *
454 : : * - This creates a circular wait across nodes:
455 : : *
456 : : * Node-1: Query -> waits on Alter -> waits on Sync Worker
457 : : *
458 : : * Node-2: Query -> waits on Alter -> waits on Sync Worker
459 : : *
460 : : * Since each node only sees part of the wait graph, the deadlock may
461 : : * go undetected, leading to indefinite blocking.
462 : : *
463 : : * Note: Each entry in VALUES includes an index 'seqidx' that
464 : : * represents the sequence's position in the local 'seqinfos' list.
465 : : * This index is propagated to the query results and later used to
466 : : * directly map the fetched publisher sequence rows back to their
467 : : * corresponding local entries without relying on result order or name
468 : : * matching.
469 : : */
470 : 0 : appendStringInfo(cmd,
471 : : "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
472 : : " seq.seqstart, seq.seqincrement, seq.seqmin,\n"
473 : : " seq.seqmax, seq.seqcycle\n"
474 : : "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
475 : : "JOIN pg_namespace n ON n.nspname = s.schname\n"
476 : : "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
477 : : "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
478 : : "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
479 : 0 : seqstr->data);
480 : :
481 : 0 : res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow);
482 [ # # ]: 0 : if (res->status != WALRCV_OK_TUPLES)
483 [ # # # # ]: 0 : ereport(ERROR,
484 : : errcode(ERRCODE_CONNECTION_FAILURE),
485 : : errmsg("could not fetch sequence information from the publisher: %s",
486 : : res->err));
487 : :
488 : 0 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
489 [ # # ]: 0 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
490 : : {
491 : 0 : CopySeqResult sync_status;
492 : 0 : LogicalRepSequenceInfo *seqinfo;
493 : 0 : int seqidx;
494 : :
495 [ # # ]: 0 : CHECK_FOR_INTERRUPTS();
496 : :
497 [ # # ]: 0 : if (ConfigReloadPending)
498 : : {
499 : 0 : ConfigReloadPending = false;
500 : 0 : ProcessConfigFile(PGC_SIGHUP);
501 : 0 : }
502 : :
503 : 0 : sync_status = get_and_validate_seq_info(slot, &sequence_rel,
504 : : &seqinfo, &seqidx);
505 [ # # ]: 0 : if (sync_status == COPYSEQ_SUCCESS)
506 : 0 : sync_status = copy_sequence(seqinfo,
507 : 0 : sequence_rel->rd_rel->relowner);
508 : :
509 [ # # # # : 0 : switch (sync_status)
# ]
510 : : {
511 : : case COPYSEQ_SUCCESS:
512 [ # # # # ]: 0 : elog(DEBUG1,
513 : : "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
514 : : MySubscription->name, seqinfo->nspname,
515 : : seqinfo->seqname);
516 : 0 : batch_succeeded_count++;
517 : 0 : break;
518 : : case COPYSEQ_MISMATCH:
519 : :
520 : : /*
521 : : * Remember mismatched sequences in a long-lived memory
522 : : * context since these will be used after the transaction
523 : : * is committed.
524 : : */
525 : 0 : oldctx = MemoryContextSwitchTo(ApplyContext);
526 : 0 : mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
527 : 0 : seqidx);
528 : 0 : MemoryContextSwitchTo(oldctx);
529 : 0 : batch_mismatched_count++;
530 : 0 : break;
531 : : case COPYSEQ_INSUFFICIENT_PERM:
532 : :
533 : : /*
534 : : * Remember sequences with insufficient privileges in a
535 : : * long-lived memory context since these will be used
536 : : * after the transaction is committed.
537 : : */
538 : 0 : oldctx = MemoryContextSwitchTo(ApplyContext);
539 : 0 : insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,
540 : 0 : seqidx);
541 : 0 : MemoryContextSwitchTo(oldctx);
542 : 0 : batch_insuffperm_count++;
543 : 0 : break;
544 : : case COPYSEQ_SKIPPED:
545 : :
546 : : /*
547 : : * Concurrent removal of a sequence on the subscriber is
548 : : * treated as success, since the only viable action is to
549 : : * skip the corresponding sequence data. Missing sequences
550 : : * on the publisher are treated as ERROR.
551 : : */
552 [ # # ]: 0 : if (seqinfo->found_on_pub)
553 : : {
554 [ # # # # ]: 0 : ereport(LOG,
555 : : errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
556 : : seqinfo->nspname,
557 : : seqinfo->seqname));
558 : 0 : batch_skipped_count++;
559 : 0 : }
560 : 0 : break;
561 : : }
562 : :
563 [ # # ]: 0 : if (sequence_rel)
564 : 0 : table_close(sequence_rel, NoLock);
565 : 0 : }
566 : :
567 : 0 : ExecDropSingleTupleTableSlot(slot);
568 : 0 : walrcv_clear_result(res);
569 : 0 : resetStringInfo(seqstr);
570 : 0 : resetStringInfo(cmd);
571 : :
572 : 0 : batch_missing_count = batch_size - (batch_succeeded_count +
573 : 0 : batch_mismatched_count +
574 : 0 : batch_insuffperm_count +
575 : 0 : batch_skipped_count);
576 : :
577 [ # # # # ]: 0 : elog(DEBUG1,
578 : : "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
579 : : MySubscription->name,
580 : : (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
581 : : batch_size, batch_succeeded_count, batch_mismatched_count,
582 : : batch_insuffperm_count, batch_missing_count, batch_skipped_count);
583 : :
584 : : /* Commit this batch, and prepare for next batch */
585 : 0 : CommitTransactionCommand();
586 : :
587 [ # # ]: 0 : if (batch_missing_count)
588 : : {
589 [ # # ]: 0 : for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
590 : : {
591 : 0 : LogicalRepSequenceInfo *seqinfo =
592 : 0 : (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
593 : :
594 : : /* If the sequence was not found on publisher, record it */
595 [ # # ]: 0 : if (!seqinfo->found_on_pub)
596 : 0 : missing_seqs_idx = lappend_int(missing_seqs_idx, idx);
597 : 0 : }
598 : 0 : }
599 : :
600 : : /*
601 : : * cur_batch_base_index is not incremented sequentially because some
602 : : * sequences may be missing, and the number of fetched rows may not
603 : : * match the batch size.
604 : : */
605 : 0 : cur_batch_base_index += batch_size;
606 : 0 : }
607 : :
608 : : /* Report mismatches, permission issues, or missing sequences */
609 : 0 : report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
610 : 0 : missing_seqs_idx);
611 : 0 : }
612 : :
613 : : /*
614 : : * Identifies sequences that require synchronization and initiates the
615 : : * synchronization process.
616 : : */
617 : : static void
618 : 0 : LogicalRepSyncSequences(void)
619 : : {
620 : 0 : char *err;
621 : 0 : bool must_use_password;
622 : 0 : Relation rel;
623 : 0 : HeapTuple tup;
624 : 0 : ScanKeyData skey[2];
625 : 0 : SysScanDesc scan;
626 : 0 : Oid subid = MyLogicalRepWorker->subid;
627 : 0 : StringInfoData app_name;
628 : :
629 : 0 : StartTransactionCommand();
630 : :
631 : 0 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
632 : :
633 : 0 : ScanKeyInit(&skey[0],
634 : : Anum_pg_subscription_rel_srsubid,
635 : : BTEqualStrategyNumber, F_OIDEQ,
636 : 0 : ObjectIdGetDatum(subid));
637 : :
638 : 0 : ScanKeyInit(&skey[1],
639 : : Anum_pg_subscription_rel_srsubstate,
640 : : BTEqualStrategyNumber, F_CHAREQ,
641 : 0 : CharGetDatum(SUBREL_STATE_INIT));
642 : :
643 : 0 : scan = systable_beginscan(rel, InvalidOid, false,
644 : 0 : NULL, 2, skey);
645 [ # # ]: 0 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
646 : : {
647 : 0 : Form_pg_subscription_rel subrel;
648 : 0 : LogicalRepSequenceInfo *seq;
649 : 0 : Relation sequence_rel;
650 : 0 : MemoryContext oldctx;
651 : :
652 [ # # ]: 0 : CHECK_FOR_INTERRUPTS();
653 : :
654 : 0 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
655 : :
656 : 0 : sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock);
657 : :
658 : : /* Skip if sequence was dropped concurrently */
659 [ # # ]: 0 : if (!sequence_rel)
660 : 0 : continue;
661 : :
662 : : /* Skip if the relation is not a sequence */
663 [ # # ]: 0 : if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
664 : : {
665 : 0 : table_close(sequence_rel, NoLock);
666 : 0 : continue;
667 : : }
668 : :
669 : : /*
670 : : * Worker needs to process sequences across transaction boundary, so
671 : : * allocate them under long-lived context.
672 : : */
673 : 0 : oldctx = MemoryContextSwitchTo(ApplyContext);
674 : :
675 : 0 : seq = palloc0_object(LogicalRepSequenceInfo);
676 : 0 : seq->localrelid = subrel->srrelid;
677 : 0 : seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
678 : 0 : seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
679 : 0 : seqinfos = lappend(seqinfos, seq);
680 : :
681 : 0 : MemoryContextSwitchTo(oldctx);
682 : :
683 : 0 : table_close(sequence_rel, NoLock);
684 [ # # ]: 0 : }
685 : :
686 : : /* Cleanup */
687 : 0 : systable_endscan(scan);
688 : 0 : table_close(rel, AccessShareLock);
689 : :
690 : 0 : CommitTransactionCommand();
691 : :
692 : : /*
693 : : * Exit early if no catalog entries found, likely due to concurrent drops.
694 : : */
695 [ # # ]: 0 : if (!seqinfos)
696 : 0 : return;
697 : :
698 : : /* Is the use of a password mandatory? */
699 [ # # ]: 0 : must_use_password = MySubscription->passwordrequired &&
700 : 0 : !MySubscription->ownersuperuser;
701 : :
702 : 0 : initStringInfo(&app_name);
703 : 0 : appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
704 : 0 : MySubscription->oid, GetSystemIdentifier());
705 : :
706 : : /*
707 : : * Establish the connection to the publisher for sequence synchronization.
708 : : */
709 : 0 : LogRepWorkerWalRcvConn =
710 : 0 : walrcv_connect(MySubscription->conninfo, true, true,
711 : : must_use_password,
712 : : app_name.data, &err);
713 [ # # ]: 0 : if (LogRepWorkerWalRcvConn == NULL)
714 [ # # # # ]: 0 : ereport(ERROR,
715 : : errcode(ERRCODE_CONNECTION_FAILURE),
716 : : errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
717 : : MySubscription->name, err));
718 : :
719 : 0 : pfree(app_name.data);
720 : :
721 : 0 : copy_sequences(LogRepWorkerWalRcvConn);
722 : 0 : }
723 : :
724 : : /*
725 : : * Execute the initial sync with error handling. Disable the subscription,
726 : : * if required.
727 : : *
728 : : * Note that we don't handle FATAL errors which are probably because of system
729 : : * resource error and are not repeatable.
730 : : */
731 : : static void
732 : 0 : start_sequence_sync(void)
733 : : {
734 [ # # ]: 0 : Assert(am_sequencesync_worker());
735 : :
736 [ # # ]: 0 : PG_TRY();
737 : : {
738 : : /* Call initial sync. */
739 : 0 : LogicalRepSyncSequences();
740 : : }
741 : 0 : PG_CATCH();
742 : : {
743 [ # # ]: 0 : if (MySubscription->disableonerr)
744 : 0 : DisableSubscriptionAndExit();
745 : : else
746 : : {
747 : : /*
748 : : * Report the worker failed during sequence synchronization. Abort
749 : : * the current transaction so that the stats message is sent in an
750 : : * idle state.
751 : : */
752 : 0 : AbortOutOfAnyTransaction();
753 : 0 : pgstat_report_subscription_error(MySubscription->oid,
754 : : WORKERTYPE_SEQUENCESYNC);
755 : :
756 : 0 : PG_RE_THROW();
757 : : }
758 : : }
759 [ # # ]: 0 : PG_END_TRY();
760 : 0 : }
761 : :
762 : : /* Logical Replication sequencesync worker entry point */
763 : : void
764 : 0 : SequenceSyncWorkerMain(Datum main_arg)
765 : : {
766 : 0 : int worker_slot = DatumGetInt32(main_arg);
767 : :
768 : 0 : SetupApplyOrSyncWorker(worker_slot);
769 : :
770 : 0 : start_sequence_sync();
771 : :
772 : 0 : FinishSyncWorker();
773 : : }
|