LCOV - code coverage report
Current view: top level - src/backend/replication/logical - sequencesync.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 307 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 9 0
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Branches: 0.0 % 183 0

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  * sequencesync.c
       3                 :             :  *        PostgreSQL logical replication: sequence synchronization
       4                 :             :  *
       5                 :             :  * Copyright (c) 2025-2026, PostgreSQL Global Development Group
       6                 :             :  *
       7                 :             :  * IDENTIFICATION
       8                 :             :  *        src/backend/replication/logical/sequencesync.c
       9                 :             :  *
      10                 :             :  * NOTES
      11                 :             :  *        This file contains code for sequence synchronization for
      12                 :             :  *        logical replication.
      13                 :             :  *
      14                 :             :  * Sequences requiring synchronization are tracked in the pg_subscription_rel
      15                 :             :  * catalog.
      16                 :             :  *
      17                 :             :  * Sequences to be synchronized will be added with state INIT when either of
      18                 :             :  * the following commands is executed:
      19                 :             :  * CREATE SUBSCRIPTION
      20                 :             :  * ALTER SUBSCRIPTION ... REFRESH PUBLICATION
      21                 :             :  *
      22                 :             :  * Executing the following command resets all sequences in the subscription to
      23                 :             :  * state INIT, triggering re-synchronization:
      24                 :             :  * ALTER SUBSCRIPTION ... REFRESH SEQUENCES
      25                 :             :  *
      26                 :             :  * The apply worker periodically scans pg_subscription_rel for sequences in
      27                 :             :  * INIT state. When such sequences are found, it spawns a sequencesync worker
      28                 :             :  * to handle synchronization.
      29                 :             :  *
      30                 :             :  * A single sequencesync worker is responsible for synchronizing all sequences.
      31                 :             :  * It begins by retrieving the list of sequences that are flagged for
      32                 :             :  * synchronization, i.e., those in the INIT state. These sequences are then
      33                 :             :  * processed in batches, allowing multiple entries to be synchronized within a
      34                 :             :  * single transaction. The worker fetches the current sequence values and page
      35                 :             :  * LSNs from the remote publisher, updates the corresponding sequences on the
      36                 :             :  * local subscriber, and finally marks each sequence as READY upon successful
      37                 :             :  * synchronization.
      38                 :             :  *
      39                 :             :  * Sequence state transitions follow this pattern:
      40                 :             :  *   INIT -> READY
      41                 :             :  *
      42                 :             :  * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
      43                 :             :  * sequences are synchronized per transaction. The locks on the sequence
      44                 :             :  * relation will be periodically released at each transaction commit.
      45                 :             :  *
      46                 :             :  * XXX: The launcher process was not chosen to start the sequencesync worker
      47                 :             :  * because it has no database connection, and so cannot read from the
      48                 :             :  * pg_subscription_rel system catalog the sequences that need to be synchronized.
      49                 :             :  *-------------------------------------------------------------------------
      50                 :             :  */
      51                 :             : 
      52                 :             : #include "postgres.h"
      53                 :             : 
      54                 :             : #include "access/genam.h"
      55                 :             : #include "access/table.h"
      56                 :             : #include "catalog/pg_sequence.h"
      57                 :             : #include "catalog/pg_subscription_rel.h"
      58                 :             : #include "commands/sequence.h"
      59                 :             : #include "pgstat.h"
      60                 :             : #include "postmaster/interrupt.h"
      61                 :             : #include "replication/logicalworker.h"
      62                 :             : #include "replication/worker_internal.h"
      63                 :             : #include "utils/acl.h"
      64                 :             : #include "utils/builtins.h"
      65                 :             : #include "utils/fmgroids.h"
      66                 :             : #include "utils/guc.h"
      67                 :             : #include "utils/inval.h"
      68                 :             : #include "utils/lsyscache.h"
      69                 :             : #include "utils/memutils.h"
      70                 :             : #include "utils/pg_lsn.h"
      71                 :             : #include "utils/syscache.h"
      72                 :             : #include "utils/usercontext.h"
      73                 :             : 
      74                 :             : #define REMOTE_SEQ_COL_COUNT 10
      75                 :             : 
      76                 :             : typedef enum CopySeqResult
      77                 :             : {
      78                 :             :         COPYSEQ_SUCCESS,
      79                 :             :         COPYSEQ_MISMATCH,
      80                 :             :         COPYSEQ_INSUFFICIENT_PERM,
      81                 :             :         COPYSEQ_SKIPPED
      82                 :             : } CopySeqResult;
      83                 :             : 
      84                 :             : static List *seqinfos = NIL;
      85                 :             : 
      86                 :             : /*
      87                 :             :  * Apply worker determines if sequence synchronization is needed.
      88                 :             :  *
      89                 :             :  * Start a sequencesync worker if one is not already running. The active
      90                 :             :  * sequencesync worker will handle all pending sequence synchronization. If any
      91                 :             :  * sequences remain unsynchronized after it exits, a new worker can be started
      92                 :             :  * in the next iteration.
      93                 :             :  */
      94                 :             : void
      95                 :           0 : ProcessSequencesForSync(void)
      96                 :             : {
      97                 :           0 :         LogicalRepWorker *sequencesync_worker;
      98                 :           0 :         int                     nsyncworkers;
      99                 :           0 :         bool            has_pending_sequences;
     100                 :           0 :         bool            started_tx;
     101                 :             : 
     102                 :           0 :         FetchRelationStates(NULL, &has_pending_sequences, &started_tx);
     103                 :             : 
     104         [ #  # ]:           0 :         if (started_tx)
     105                 :             :         {
     106                 :           0 :                 CommitTransactionCommand();
     107                 :           0 :                 pgstat_report_stat(true);
     108                 :           0 :         }
     109                 :             : 
     110         [ #  # ]:           0 :         if (!has_pending_sequences)
     111                 :           0 :                 return;
     112                 :             : 
     113                 :           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     114                 :             : 
     115                 :             :         /* Is a sequencesync worker already running for this subscription? */
     116                 :           0 :         sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC,
     117                 :           0 :                                                                                                  MyLogicalRepWorker->subid,
     118                 :             :                                                                                                  InvalidOid, true);
     119         [ #  # ]:           0 :         if (sequencesync_worker)
     120                 :             :         {
     121                 :           0 :                 LWLockRelease(LogicalRepWorkerLock);
     122                 :           0 :                 return;
     123                 :             :         }
     124                 :             : 
     125                 :             :         /*
     126                 :             :          * Count running sync workers for this subscription, while we have the
     127                 :             :          * lock.
     128                 :             :          */
     129                 :           0 :         nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
     130                 :           0 :         LWLockRelease(LogicalRepWorkerLock);
     131                 :             : 
     132                 :             :         /*
     133                 :             :          * It is okay to read/update last_seqsync_start_time here in apply worker
     134                 :             :          * as we have already ensured that sync worker doesn't exist.
     135                 :             :          */
     136                 :           0 :         launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid,
     137                 :           0 :                                            &MyLogicalRepWorker->last_seqsync_start_time);
     138         [ #  # ]:           0 : }
     139                 :             : 
     140                 :             : /*
     141                 :             :  * get_sequences_string
     142                 :             :  *
     143                 :             :  * Build a comma-separated string of schema-qualified sequence names
     144                 :             :  * for the given list of sequence indexes.
     145                 :             :  */
     146                 :             : static void
     147                 :           0 : get_sequences_string(List *seqindexes, StringInfo buf)
     148                 :             : {
     149                 :           0 :         resetStringInfo(buf);
     150   [ #  #  #  #  :           0 :         foreach_int(seqidx, seqindexes)
             #  #  #  # ]
     151                 :             :         {
     152                 :           0 :                 LogicalRepSequenceInfo *seqinfo =
     153                 :           0 :                         (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx);
     154                 :             : 
     155         [ #  # ]:           0 :                 if (buf->len > 0)
     156                 :           0 :                         appendStringInfoString(buf, ", ");
     157                 :             : 
     158                 :           0 :                 appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname);
     159                 :           0 :         }
     160                 :           0 : }
     161                 :             : 
     162                 :             : /*
     163                 :             :  * report_sequence_errors
     164                 :             :  *
     165                 :             :  * Report discrepancies found during sequence synchronization between
     166                 :             :  * the publisher and subscriber. Emits warnings for:
     167                 :             :  * a) mismatched definitions or concurrent rename
     168                 :             :  * b) insufficient privileges
     169                 :             :  * c) sequences missing on the publisher
     170                 :             :  * Then raises an ERROR to indicate synchronization failure.
     171                 :             :  */
     172                 :             : static void
     173                 :           0 : report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
     174                 :             :                                            List *missing_seqs_idx)
     175                 :             : {
     176                 :           0 :         StringInfo      seqstr;
     177                 :             : 
     178                 :             :         /* Quick exit if there are no errors to report */
     179         [ #  # ]:           0 :         if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx)
     180                 :           0 :                 return;
     181                 :             : 
     182                 :           0 :         seqstr = makeStringInfo();
     183                 :             : 
     184         [ #  # ]:           0 :         if (mismatched_seqs_idx)
     185                 :             :         {
     186                 :           0 :                 get_sequences_string(mismatched_seqs_idx, seqstr);
     187   [ #  #  #  # ]:           0 :                 ereport(WARNING,
     188                 :             :                                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     189                 :             :                                 errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
     190                 :             :                                                           "mismatched or renamed sequences on subscriber (%s)",
     191                 :             :                                                           list_length(mismatched_seqs_idx),
     192                 :             :                                                           seqstr->data));
     193                 :           0 :         }
     194                 :             : 
     195         [ #  # ]:           0 :         if (insuffperm_seqs_idx)
     196                 :             :         {
     197                 :           0 :                 get_sequences_string(insuffperm_seqs_idx, seqstr);
     198   [ #  #  #  # ]:           0 :                 ereport(WARNING,
     199                 :             :                                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     200                 :             :                                 errmsg_plural("insufficient privileges on sequence (%s)",
     201                 :             :                                                           "insufficient privileges on sequences (%s)",
     202                 :             :                                                           list_length(insuffperm_seqs_idx),
     203                 :             :                                                           seqstr->data));
     204                 :           0 :         }
     205                 :             : 
     206         [ #  # ]:           0 :         if (missing_seqs_idx)
     207                 :             :         {
     208                 :           0 :                 get_sequences_string(missing_seqs_idx, seqstr);
     209   [ #  #  #  # ]:           0 :                 ereport(WARNING,
     210                 :             :                                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     211                 :             :                                 errmsg_plural("missing sequence on publisher (%s)",
     212                 :             :                                                           "missing sequences on publisher (%s)",
     213                 :             :                                                           list_length(missing_seqs_idx),
     214                 :             :                                                           seqstr->data));
     215                 :           0 :         }
     216                 :             : 
     217   [ #  #  #  # ]:           0 :         ereport(ERROR,
     218                 :             :                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     219                 :             :                         errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
     220                 :             :                                    MySubscription->name));
     221         [ #  # ]:           0 : }
     222                 :             : 
     223                 :             : /*
     224                 :             :  * get_and_validate_seq_info
     225                 :             :  *
     226                 :             :  * Extracts remote sequence information from the tuple slot received from the
     227                 :             :  * publisher, and validates it against the corresponding local sequence
     228                 :             :  * definition.
     229                 :             :  */
     230                 :             : static CopySeqResult
     231                 :           0 : get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
     232                 :             :                                                   LogicalRepSequenceInfo **seqinfo, int *seqidx)
     233                 :             : {
     234                 :           0 :         bool            isnull;
     235                 :           0 :         int                     col = 0;
     236                 :           0 :         Datum           datum;
     237                 :           0 :         Oid                     remote_typid;
     238                 :           0 :         int64           remote_start;
     239                 :           0 :         int64           remote_increment;
     240                 :           0 :         int64           remote_min;
     241                 :           0 :         int64           remote_max;
     242                 :           0 :         bool            remote_cycle;
     243                 :           0 :         CopySeqResult result = COPYSEQ_SUCCESS;
     244                 :           0 :         HeapTuple       tup;
     245                 :           0 :         Form_pg_sequence local_seq;
     246                 :           0 :         LogicalRepSequenceInfo *seqinfo_local;
     247                 :             : 
     248                 :           0 :         *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
     249         [ #  # ]:           0 :         Assert(!isnull);
     250                 :             : 
     251                 :             :         /* Identify the corresponding local sequence for the given index. */
     252                 :           0 :         *seqinfo = seqinfo_local =
     253                 :           0 :                 (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx);
     254                 :             : 
     255                 :             :         /*
     256                 :             :          * last_value can be NULL if the sequence was dropped concurrently (see
     257                 :             :          * pg_get_sequence_data()).
     258                 :             :          */
     259                 :           0 :         datum = slot_getattr(slot, ++col, &isnull);
     260         [ #  # ]:           0 :         if (isnull)
     261                 :           0 :                 return COPYSEQ_SKIPPED;
     262                 :           0 :         seqinfo_local->last_value = DatumGetInt64(datum);
     263                 :             : 
     264                 :           0 :         seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
     265         [ #  # ]:           0 :         Assert(!isnull);
     266                 :             : 
     267                 :           0 :         seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
     268         [ #  # ]:           0 :         Assert(!isnull);
     269                 :             : 
     270                 :           0 :         remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
     271         [ #  # ]:           0 :         Assert(!isnull);
     272                 :             : 
     273                 :           0 :         remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     274         [ #  # ]:           0 :         Assert(!isnull);
     275                 :             : 
     276                 :           0 :         remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     277         [ #  # ]:           0 :         Assert(!isnull);
     278                 :             : 
     279                 :           0 :         remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     280         [ #  # ]:           0 :         Assert(!isnull);
     281                 :             : 
     282                 :           0 :         remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     283         [ #  # ]:           0 :         Assert(!isnull);
     284                 :             : 
     285                 :           0 :         remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
     286         [ #  # ]:           0 :         Assert(!isnull);
     287                 :             : 
     288                 :             :         /* Sanity check */
     289         [ #  # ]:           0 :         Assert(col == REMOTE_SEQ_COL_COUNT);
     290                 :             : 
     291                 :           0 :         seqinfo_local->found_on_pub = true;
     292                 :             : 
     293                 :           0 :         *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock);
     294                 :             : 
     295                 :             :         /* Sequence was concurrently dropped? */
     296         [ #  # ]:           0 :         if (!*sequence_rel)
     297                 :           0 :                 return COPYSEQ_SKIPPED;
     298                 :             : 
     299                 :           0 :         tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid));
     300                 :             : 
     301                 :             :         /* No pg_sequence entry found for the opened relation: raise an error */
     302         [ #  # ]:           0 :         if (!HeapTupleIsValid(tup))
     303   [ #  #  #  # ]:           0 :                 elog(ERROR, "cache lookup failed for sequence %u",
     304                 :             :                          seqinfo_local->localrelid);
     305                 :             : 
     306                 :           0 :         local_seq = (Form_pg_sequence) GETSTRUCT(tup);
     307                 :             : 
     308                 :             :         /* Sequence parameters for remote/local are the same? */
     309         [ #  # ]:           0 :         if (local_seq->seqtypid != remote_typid ||
     310         [ #  # ]:           0 :                 local_seq->seqstart != remote_start ||
     311         [ #  # ]:           0 :                 local_seq->seqincrement != remote_increment ||
     312         [ #  # ]:           0 :                 local_seq->seqmin != remote_min ||
     313   [ #  #  #  # ]:           0 :                 local_seq->seqmax != remote_max ||
     314                 :           0 :                 local_seq->seqcycle != remote_cycle)
     315                 :           0 :                 result = COPYSEQ_MISMATCH;
     316                 :             : 
     317                 :             :         /* Sequence was concurrently renamed? */
     318                 :           0 :         if (strcmp(seqinfo_local->nspname,
     319   [ #  #  #  #  :           0 :                            get_namespace_name(RelationGetNamespace(*sequence_rel))) ||
                   #  # ]
     320                 :           0 :                 strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel)))
     321                 :           0 :                 result = COPYSEQ_MISMATCH;
     322                 :             : 
     323                 :           0 :         ReleaseSysCache(tup);
     324                 :           0 :         return result;
     325                 :           0 : }
     326                 :             : 
     327                 :             : /*
     328                 :             :  * Apply remote sequence state to local sequence and mark it as
     329                 :             :  * synchronized (READY).
     330                 :             :  */
     331                 :             : static CopySeqResult
     332                 :           0 : copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
     333                 :             : {
     334                 :           0 :         UserContext ucxt;
     335                 :           0 :         AclResult       aclresult;
     336                 :           0 :         bool            run_as_owner = MySubscription->runasowner;
     337                 :           0 :         Oid                     seqoid = seqinfo->localrelid;
     338                 :             : 
     339                 :             :         /*
     340                 :             :          * If the user did not opt to run as the owner of the subscription
     341                 :             :          * ('run_as_owner'), then copy the sequence as the owner of the sequence.
     342                 :             :          */
     343         [ #  # ]:           0 :         if (!run_as_owner)
     344                 :           0 :                 SwitchToUntrustedUser(seqowner, &ucxt);
     345                 :             : 
     346                 :           0 :         aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE);
     347                 :             : 
     348         [ #  # ]:           0 :         if (aclresult != ACLCHECK_OK)
     349                 :             :         {
     350         [ #  # ]:           0 :                 if (!run_as_owner)
     351                 :           0 :                         RestoreUserContext(&ucxt);
     352                 :             : 
     353                 :           0 :                 return COPYSEQ_INSUFFICIENT_PERM;
     354                 :             :         }
     355                 :             : 
     356                 :             :         /*
     357                 :             :          * The log counter (log_cnt) tracks how many sequence values are still
     358                 :             :          * unused locally. It is only relevant to the local node and managed
     359                 :             :          * internally by nextval() when allocating new ranges. Since log_cnt does
     360                 :             :          * not affect the visible sequence state (like last_value or is_called)
     361                 :             :          * and is only used for local caching, it need not be copied to the
     362                 :             :          * subscriber during synchronization.
     363                 :             :          */
     364                 :           0 :         SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called);
     365                 :             : 
     366         [ #  # ]:           0 :         if (!run_as_owner)
     367                 :           0 :                 RestoreUserContext(&ucxt);
     368                 :             : 
     369                 :             :         /*
     370                 :             :          * Record the remote sequence's LSN in pg_subscription_rel and mark the
     371                 :             :          * sequence as READY.
     372                 :             :          */
     373                 :           0 :         UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
     374                 :           0 :                                                            seqinfo->page_lsn, false);
     375                 :             : 
     376                 :           0 :         return COPYSEQ_SUCCESS;
     377                 :           0 : }
     378                 :             : 
     379                 :             : /*
     380                 :             :  * Copy existing data of sequences from the publisher.
     381                 :             :  */
     382                 :             : static void
     383                 :           0 : copy_sequences(WalReceiverConn *conn)
     384                 :             : {
                                                 /*
                                                  * Works through the file-level "seqinfos" list in batches of at most
                                                  * MAX_SEQUENCES_SYNC_PER_BATCH entries.  Each batch runs in its own
                                                  * transaction: build a VALUES list naming the batch's sequences, query
                                                  * the publisher over "conn", process every returned row, then commit.
                                                  * Indexes of sequences that mismatched, lacked privileges, or were not
                                                  * returned by the publisher are accumulated across batches and passed
                                                  * to report_sequence_errors() once all batches are done.
                                                  */
     385                 :           0 :         int                     cur_batch_base_index = 0;
     386                 :           0 :         int                     n_seqinfos = list_length(seqinfos);
     387                 :           0 :         List       *mismatched_seqs_idx = NIL;
     388                 :           0 :         List       *missing_seqs_idx = NIL;
     389                 :           0 :         List       *insuffperm_seqs_idx = NIL;
     390                 :           0 :         StringInfo      seqstr = makeStringInfo();
     391                 :           0 :         StringInfo      cmd = makeStringInfo();
     392                 :           0 :         MemoryContext oldctx;
     393                 :             : 
     394                 :             : #define MAX_SEQUENCES_SYNC_PER_BATCH 100
     395                 :             : 
     396   [ #  #  #  # ]:           0 :         elog(DEBUG1,
     397                 :             :                  "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
     398                 :             :                  MySubscription->name, n_seqinfos);
     399                 :             : 
     400         [ #  # ]:           0 :         while (cur_batch_base_index < n_seqinfos)
     401                 :             :         {
                                                         /*
                                                          * Result column types handed to walrcv_exec() for the publisher
                                                          * query built below.
                                                          */
     402                 :           0 :                 Oid                     seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
     403                 :             :                 BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
     404                 :           0 :                 int                     batch_size = 0;
     405                 :           0 :                 int                     batch_succeeded_count = 0;
     406                 :           0 :                 int                     batch_mismatched_count = 0;
     407                 :           0 :                 int                     batch_skipped_count = 0;
     408                 :           0 :                 int                     batch_insuffperm_count = 0;
     409                 :           0 :                 int                     batch_missing_count;
     410                 :           0 :                 Relation        sequence_rel = NULL;
     411                 :             : 
     412                 :           0 :                 WalRcvExecResult *res;
     413                 :           0 :                 TupleTableSlot *slot;
     414                 :             : 
     415                 :           0 :                 StartTransactionCommand();
     416                 :             : 
                                                         /*
                                                          * Build the VALUES list for this batch: one "(schema, name, idx)"
                                                          * entry per sequence, with both names quoted as SQL literals.  The
                                                          * loop stops at the end of seqinfos or once the batch is full.
                                                          */
     417         [ #  # ]:           0 :                 for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
     418                 :             :                 {
     419                 :           0 :                         char       *nspname_literal;
     420                 :           0 :                         char       *seqname_literal;
     421                 :             : 
     422                 :           0 :                         LogicalRepSequenceInfo *seqinfo =
     423                 :           0 :                                 (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
     424                 :             : 
     425         [ #  # ]:           0 :                         if (seqstr->len > 0)
     426                 :           0 :                                 appendStringInfoString(seqstr, ", ");
     427                 :             : 
     428                 :           0 :                         nspname_literal = quote_literal_cstr(seqinfo->nspname);
     429                 :           0 :                         seqname_literal = quote_literal_cstr(seqinfo->seqname);
     430                 :             : 
     431                 :           0 :                         appendStringInfo(seqstr, "(%s, %s, %d)",
     432                 :           0 :                                                          nspname_literal, seqname_literal, idx);
     433                 :             : 
     434         [ #  # ]:           0 :                         if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
     435                 :           0 :                                 break;
     436         [ #  # ]:           0 :                 }
     437                 :             : 
     438                 :             :                 /*
     439                 :             :                  * We deliberately avoid acquiring a local lock on the sequence before
     440                 :             :                  * querying the publisher to prevent potential distributed deadlocks
     441                 :             :                  * in bi-directional replication setups.
     442                 :             :                  *
     443                 :             :                  * Example scenario:
     444                 :             :                  *
     445                 :             :                  * - On each node, a background worker acquires a lock on a sequence
     446                 :             :                  * as part of a sync operation.
     447                 :             :                  *
     448                 :             :                  * - Concurrently, a user transaction attempts to alter the same
     449                 :             :                  * sequence, waiting on the background worker's lock.
     450                 :             :                  *
     451                 :             :                  * - Meanwhile, a query from the other node tries to access metadata
     452                 :             :                  * that depends on the completion of the alter operation.
     453                 :             :                  *
     454                 :             :                  * - This creates a circular wait across nodes:
     455                 :             :                  *
     456                 :             :                  * Node-1: Query -> waits on Alter -> waits on Sync Worker
     457                 :             :                  *
     458                 :             :                  * Node-2: Query -> waits on Alter -> waits on Sync Worker
     459                 :             :                  *
     460                 :             :                  * Since each node only sees part of the wait graph, the deadlock may
     461                 :             :                  * go undetected, leading to indefinite blocking.
     462                 :             :                  *
     463                 :             :                  * Note: Each entry in VALUES includes an index 'seqidx' that
     464                 :             :                  * represents the sequence's position in the local 'seqinfos' list.
     465                 :             :                  * This index is propagated to the query results and later used to
     466                 :             :                  * directly map the fetched publisher sequence rows back to their
     467                 :             :                  * corresponding local entries without relying on result order or name
     468                 :             :                  * matching.
     469                 :             :                  */
     470                 :           0 :                 appendStringInfo(cmd,
     471                 :             :                                                  "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
     472                 :             :                                                  "       seq.seqstart, seq.seqincrement, seq.seqmin,\n"
     473                 :             :                                                  "       seq.seqmax, seq.seqcycle\n"
     474                 :             :                                                  "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
     475                 :             :                                                  "JOIN pg_namespace n ON n.nspname = s.schname\n"
     476                 :             :                                                  "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
     477                 :             :                                                  "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
     478                 :             :                                                  "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
     479                 :           0 :                                                  seqstr->data);
     480                 :             : 
                                                         /*
                                                          * Run the query on the publisher; any status other than a tuple
                                                          * result is raised as a connection-failure error.
                                                          */
     481                 :           0 :                 res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow);
     482         [ #  # ]:           0 :                 if (res->status != WALRCV_OK_TUPLES)
     483   [ #  #  #  # ]:           0 :                         ereport(ERROR,
     484                 :             :                                         errcode(ERRCODE_CONNECTION_FAILURE),
     485                 :             :                                         errmsg("could not fetch sequence information from the publisher: %s",
     486                 :             :                                                    res->err));
     487                 :             : 
                                                         /* Process each row the publisher returned for this batch. */
     488                 :           0 :                 slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     489         [ #  # ]:           0 :                 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     490                 :             :                 {
     491                 :           0 :                         CopySeqResult sync_status;
     492                 :           0 :                         LogicalRepSequenceInfo *seqinfo;
     493                 :           0 :                         int                     seqidx;
     494                 :             : 
     495         [ #  # ]:           0 :                         CHECK_FOR_INTERRUPTS();
     496                 :             : 
                                                                 /* Re-read the configuration file if a reload was requested. */
     497         [ #  # ]:           0 :                         if (ConfigReloadPending)
     498                 :             :                         {
     499                 :           0 :                                 ConfigReloadPending = false;
     500                 :           0 :                                 ProcessConfigFile(PGC_SIGHUP);
     501                 :           0 :                         }
     502                 :             : 
                                                                 /*
                                                                  * seqinfo, seqidx and sequence_rel are expected to be set by
                                                                  * get_and_validate_seq_info().  copy_sequence() runs only when
                                                                  * that call reports COPYSEQ_SUCCESS, and its result then
                                                                  * replaces sync_status.
                                                                  */
     503                 :           0 :                         sync_status = get_and_validate_seq_info(slot, &sequence_rel,
     504                 :             :                                                                                                         &seqinfo, &seqidx);
     505         [ #  # ]:           0 :                         if (sync_status == COPYSEQ_SUCCESS)
     506                 :           0 :                                 sync_status = copy_sequence(seqinfo,
     507                 :           0 :                                                                                         sequence_rel->rd_rel->relowner);
     508                 :             : 
     509   [ #  #  #  #  :           0 :                         switch (sync_status)
                      # ]
     510                 :             :                         {
     511                 :             :                                 case COPYSEQ_SUCCESS:
     512   [ #  #  #  # ]:           0 :                                         elog(DEBUG1,
     513                 :             :                                                  "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
     514                 :             :                                                  MySubscription->name, seqinfo->nspname,
     515                 :             :                                                  seqinfo->seqname);
     516                 :           0 :                                         batch_succeeded_count++;
     517                 :           0 :                                         break;
     518                 :             :                                 case COPYSEQ_MISMATCH:
     519                 :             : 
     520                 :             :                                         /*
     521                 :             :                                          * Remember mismatched sequences in a long-lived memory
     522                 :             :                                          * context since these will be used after the transaction
     523                 :             :                                          * is committed.
     524                 :             :                                          */
     525                 :           0 :                                         oldctx = MemoryContextSwitchTo(ApplyContext);
     526                 :           0 :                                         mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
     527                 :           0 :                                                                                                           seqidx);
     528                 :           0 :                                         MemoryContextSwitchTo(oldctx);
     529                 :           0 :                                         batch_mismatched_count++;
     530                 :           0 :                                         break;
     531                 :             :                                 case COPYSEQ_INSUFFICIENT_PERM:
     532                 :             : 
     533                 :             :                                         /*
     534                 :             :                                          * Remember sequences with insufficient privileges in a
     535                 :             :                                          * long-lived memory context since these will be used
     536                 :             :                                          * after the transaction is committed.
     537                 :             :                                          */
     538                 :           0 :                                         oldctx = MemoryContextSwitchTo(ApplyContext);
     539                 :           0 :                                         insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,
     540                 :           0 :                                                                                                           seqidx);
     541                 :           0 :                                         MemoryContextSwitchTo(oldctx);
     542                 :           0 :                                         batch_insuffperm_count++;
     543                 :           0 :                                         break;
     544                 :             :                                 case COPYSEQ_SKIPPED:
     545                 :             : 
     546                 :             :                                         /*
     547                 :             :                                          * Concurrent removal of a sequence on the subscriber is
     548                 :             :                                          * treated as success, since the only viable action is to
     549                 :             :                                          * skip the corresponding sequence data. Missing sequences
     550                 :             :                                          * on the publisher are treated as ERROR.
     551                 :             :                                          */
     552         [ #  # ]:           0 :                                         if (seqinfo->found_on_pub)
     553                 :             :                                         {
     554   [ #  #  #  # ]:           0 :                                                 ereport(LOG,
     555                 :             :                                                                 errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
     556                 :             :                                                                            seqinfo->nspname,
     557                 :             :                                                                            seqinfo->seqname));
     558                 :           0 :                                                 batch_skipped_count++;
     559                 :           0 :                                         }
     560                 :           0 :                                         break;
     561                 :             :                         }
     562                 :             : 
                                                                 /*
                                                                  * Close the relation; NoLock means no lock is released by this
                                                                  * call.
                                                                  *
                                                                  * NOTE(review): sequence_rel is not reset to NULL afterwards, so
                                                                  * this relies on get_and_validate_seq_info() assigning it on
                                                                  * every call -- confirm.
                                                                  */
     563         [ #  # ]:           0 :                         if (sequence_rel)
     564                 :           0 :                                 table_close(sequence_rel, NoLock);
     565                 :           0 :                 }
     566                 :             : 
                                                         /* Release per-batch resources and empty the reusable buffers. */
     567                 :           0 :                 ExecDropSingleTupleTableSlot(slot);
     568                 :           0 :                 walrcv_clear_result(res);
     569                 :           0 :                 resetStringInfo(seqstr);
     570                 :           0 :                 resetStringInfo(cmd);
     571                 :             : 
                                                         /*
                                                          * Entries of this batch not accounted for by any of the counters
                                                          * above are reported as missing from the publisher.
                                                          */
     572                 :           0 :                 batch_missing_count = batch_size - (batch_succeeded_count +
     573                 :           0 :                                                                                         batch_mismatched_count +
     574                 :           0 :                                                                                         batch_insuffperm_count +
     575                 :           0 :                                                                                         batch_skipped_count);
     576                 :             : 
     577   [ #  #  #  # ]:           0 :                 elog(DEBUG1,
     578                 :             :                          "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
     579                 :             :                          MySubscription->name,
     580                 :             :                          (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
     581                 :             :                          batch_size, batch_succeeded_count, batch_mismatched_count,
     582                 :             :                          batch_insuffperm_count, batch_missing_count, batch_skipped_count);
     583                 :             : 
     584                 :             :                 /* Commit this batch, and prepare for next batch */
     585                 :           0 :                 CommitTransactionCommand();
     586                 :             : 
                                                         /*
                                                          * Record the list position of every entry in this batch whose
                                                          * found_on_pub flag is unset (presumably the flag is set by
                                                          * get_and_validate_seq_info() when a row arrives -- confirm).
                                                          *
                                                          * NOTE(review): unlike the mismatch and permission lists, this
                                                          * append happens after the commit and without an explicit switch to
                                                          * ApplyContext -- confirm the list is allocated in a context that
                                                          * outlives the following batches.
                                                          */
     587         [ #  # ]:           0 :                 if (batch_missing_count)
     588                 :             :                 {
     589         [ #  # ]:           0 :                         for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
     590                 :             :                         {
     591                 :           0 :                                 LogicalRepSequenceInfo *seqinfo =
     592                 :           0 :                                         (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
     593                 :             : 
     594                 :             :                                 /* If the sequence was not found on publisher, record it */
     595         [ #  # ]:           0 :                                 if (!seqinfo->found_on_pub)
     596                 :           0 :                                         missing_seqs_idx = lappend_int(missing_seqs_idx, idx);
     597                 :           0 :                         }
     598                 :           0 :                 }
     599                 :             : 
     600                 :             :                 /*
     601                 :             :                  * cur_batch_base_index is not incremented sequentially because some
     602                 :             :                  * sequences may be missing, and the number of fetched rows may not
     603                 :             :                  * match the batch size.
     604                 :             :                  */
     605                 :           0 :                 cur_batch_base_index += batch_size;
     606                 :           0 :         }
     607                 :             : 
     608                 :             :         /* Report mismatches, permission issues, or missing sequences */
     609                 :           0 :         report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
     610                 :           0 :                                                    missing_seqs_idx);
     611                 :           0 : }
     612                 :             : 
     613                 :             : /*
     614                 :             :  * Identifies sequences that require synchronization and initiates the
     615                 :             :  * synchronization process.
     616                 :             :  */
     617                 :             : static void
     618                 :           0 : LogicalRepSyncSequences(void)
     619                 :             : {
                                                 /*
                                                  * Collects this subscription's pg_subscription_rel entries that are in
                                                  * INIT state and refer to sequences into the file-level "seqinfos"
                                                  * list, then connects to the publisher and hands the connection to
                                                  * copy_sequences().  Returns without connecting if nothing was
                                                  * collected.
                                                  */
     620                 :           0 :         char       *err;
     621                 :           0 :         bool            must_use_password;
     622                 :           0 :         Relation        rel;
     623                 :           0 :         HeapTuple       tup;
     624                 :           0 :         ScanKeyData skey[2];
     625                 :           0 :         SysScanDesc scan;
     626                 :           0 :         Oid                     subid = MyLogicalRepWorker->subid;
     627                 :           0 :         StringInfoData app_name;
     628                 :             : 
     629                 :           0 :         StartTransactionCommand();
     630                 :             : 
     631                 :           0 :         rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     632                 :             : 
                                                 /* Scan keys: srsubid equals this subscription, srsubstate equals INIT. */
     633                 :           0 :         ScanKeyInit(&skey[0],
     634                 :             :                                 Anum_pg_subscription_rel_srsubid,
     635                 :             :                                 BTEqualStrategyNumber, F_OIDEQ,
     636                 :           0 :                                 ObjectIdGetDatum(subid));
     637                 :             : 
     638                 :           0 :         ScanKeyInit(&skey[1],
     639                 :             :                                 Anum_pg_subscription_rel_srsubstate,
     640                 :             :                                 BTEqualStrategyNumber, F_CHAREQ,
     641                 :           0 :                                 CharGetDatum(SUBREL_STATE_INIT));
     642                 :             : 
                                                 /* No index is named for the scan (InvalidOid, false). */
     643                 :           0 :         scan = systable_beginscan(rel, InvalidOid, false,
     644                 :           0 :                                                           NULL, 2, skey);
     645         [ #  # ]:           0 :         while (HeapTupleIsValid(tup = systable_getnext(scan)))
     646                 :             :         {
     647                 :           0 :                 Form_pg_subscription_rel subrel;
     648                 :           0 :                 LogicalRepSequenceInfo *seq;
     649                 :           0 :                 Relation        sequence_rel;
     650                 :           0 :                 MemoryContext oldctx;
     651                 :             : 
     652         [ #  # ]:           0 :                 CHECK_FOR_INTERRUPTS();
     653                 :             : 
     654                 :           0 :                 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     655                 :             : 
     656                 :           0 :                 sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock);
     657                 :             : 
     658                 :             :                 /* Skip if sequence was dropped concurrently */
     659         [ #  # ]:           0 :                 if (!sequence_rel)
     660                 :           0 :                         continue;
     661                 :             : 
     662                 :             :                 /* Skip if the relation is not a sequence */
     663         [ #  # ]:           0 :                 if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
     664                 :             :                 {
     665                 :           0 :                         table_close(sequence_rel, NoLock);
     666                 :           0 :                         continue;
     667                 :             :                 }
     668                 :             : 
     669                 :             :                 /*
     670                 :             :                  * Worker needs to process sequences across transaction boundary, so
     671                 :             :                  * allocate them under long-lived context.
     672                 :             :                  */
     673                 :           0 :                 oldctx = MemoryContextSwitchTo(ApplyContext);
     674                 :             : 
                                                         /*
                                                          * Remember the local relation OID together with the schema and
                                                          * sequence names; copy_sequences() later uses the names to build
                                                          * its publisher query.
                                                          */
     675                 :           0 :                 seq = palloc0_object(LogicalRepSequenceInfo);
     676                 :           0 :                 seq->localrelid = subrel->srrelid;
     677                 :           0 :                 seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
     678                 :           0 :                 seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
     679                 :           0 :                 seqinfos = lappend(seqinfos, seq);
     680                 :             : 
     681                 :           0 :                 MemoryContextSwitchTo(oldctx);
     682                 :             : 
                                                         /* NoLock: this call does not release the lock taken above. */
     683                 :           0 :                 table_close(sequence_rel, NoLock);
     684         [ #  # ]:           0 :         }
     685                 :             : 
     686                 :             :         /* Cleanup */
     687                 :           0 :         systable_endscan(scan);
     688                 :           0 :         table_close(rel, AccessShareLock);
     689                 :             : 
     690                 :           0 :         CommitTransactionCommand();
     691                 :             : 
     692                 :             :         /*
     693                 :             :          * Exit early if no catalog entries found, likely due to concurrent drops.
     694                 :             :          */
     695         [ #  # ]:           0 :         if (!seqinfos)
     696                 :           0 :                 return;
     697                 :             : 
     698                 :             :         /* Is the use of a password mandatory? */
     699         [ #  # ]:           0 :         must_use_password = MySubscription->passwordrequired &&
     700                 :           0 :                 !MySubscription->ownersuperuser;
     701                 :             : 
                                                 /*
                                                  * Application name for the connection:
                                                  * "pg_<subscription oid>_sequence_sync_<system identifier>".
                                                  */
     702                 :           0 :         initStringInfo(&app_name);
     703                 :           0 :         appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
     704                 :           0 :                                          MySubscription->oid, GetSystemIdentifier());
     705                 :             : 
     706                 :             :         /*
     707                 :             :          * Establish the connection to the publisher for sequence synchronization.
     708                 :             :          */
     709                 :           0 :         LogRepWorkerWalRcvConn =
     710                 :           0 :                 walrcv_connect(MySubscription->conninfo, true, true,
     711                 :             :                                            must_use_password,
     712                 :             :                                            app_name.data, &err);
                                                 /* A NULL connection is a failure; err is included in the error message. */
     713         [ #  # ]:           0 :         if (LogRepWorkerWalRcvConn == NULL)
     714   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     715                 :             :                                 errcode(ERRCODE_CONNECTION_FAILURE),
     716                 :             :                                 errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
     717                 :             :                                            MySubscription->name, err));
     718                 :             : 
     719                 :           0 :         pfree(app_name.data);
     720                 :             : 
                                                 /* Synchronize everything collected in seqinfos over the new connection. */
     721                 :           0 :         copy_sequences(LogRepWorkerWalRcvConn);
     722                 :           0 : }
     723                 :             : 
     724                 :             : /*
     725                 :             :  * Execute the initial sync with error handling. Disable the subscription,
     726                 :             :  * if required.
     727                 :             :  *
     728                 :             :  * Note that we don't handle FATAL errors which are probably because of system
     729                 :             :  * resource error and are not repeatable.
     730                 :             :  */
     731                 :             : static void
     732                 :           0 : start_sequence_sync(void)
     733                 :             : {
     734         [ #  # ]:           0 :         Assert(am_sequencesync_worker());
     735                 :             : 
     736         [ #  # ]:           0 :         PG_TRY();
     737                 :             :         {
     738                 :             :                 /* Call initial sync. */
     739                 :           0 :                 LogicalRepSyncSequences();
     740                 :             :         }
     741                 :           0 :         PG_CATCH();
     742                 :             :         {
                                                         /*
                                                          * When the subscription's disableonerr flag is set, disable the
                                                          * subscription instead of re-throwing.  Judging by its name,
                                                          * DisableSubscriptionAndExit() is expected not to return -- TODO
                                                          * confirm.
                                                          */
     743         [ #  # ]:           0 :                 if (MySubscription->disableonerr)
     744                 :           0 :                         DisableSubscriptionAndExit();
     745                 :             :                 else
     746                 :             :                 {
     747                 :             :                         /*
     748                 :             :                          * Report the worker failed during sequence synchronization. Abort
     749                 :             :                          * the current transaction so that the stats message is sent in an
     750                 :             :                          * idle state.
     751                 :             :                          */
     752                 :           0 :                         AbortOutOfAnyTransaction();
     753                 :           0 :                         pgstat_report_subscription_error(MySubscription->oid,
     754                 :             :                                                                                          WORKERTYPE_SEQUENCESYNC);
     755                 :             : 
                                                                 /* Hand the original error on to the caller. */
     756                 :           0 :                         PG_RE_THROW();
     757                 :             :                 }
     758                 :             :         }
     759         [ #  # ]:           0 :         PG_END_TRY();
     760                 :           0 : }
     761                 :             : 
     762                 :             : /* Logical Replication sequencesync worker entry point */
     763                 :             : void
     764                 :           0 : SequenceSyncWorkerMain(Datum main_arg)
     765                 :             : {
                                                 /* main_arg is decoded as an int32 and used as the worker slot. */
     766                 :           0 :         int                     worker_slot = DatumGetInt32(main_arg);
     767                 :             : 
                                                 /* Set up the worker for the given slot before doing any work. */
     768                 :           0 :         SetupApplyOrSyncWorker(worker_slot);
     769                 :             : 
                                                 /* Run the synchronization inside its error-handling wrapper. */
     770                 :           0 :         start_sequence_sync();
     771                 :             : 
                                                 /* Reached only when start_sequence_sync() returns normally. */
     772                 :           0 :         FinishSyncWorker();
     773                 :             : }
        

Generated by: LCOV version 2.3.2-1