LCOV - code coverage report
Current view: top level - src/backend/replication/logical - tablesync.c
Test: Code coverage
Test Date: 2026-01-26 10:56:24

             Coverage   Total   Hit
Lines:         0.0 %      627     0
Functions:     0.0 %       16     0
Branches:      0.0 %      332     0

Legend: Lines: hit / not hit
        Branches: + taken, - not taken, # not executed

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  * tablesync.c
       3                 :             :  *        PostgreSQL logical replication: initial table data synchronization
       4                 :             :  *
       5                 :             :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
       6                 :             :  *
       7                 :             :  * IDENTIFICATION
       8                 :             :  *        src/backend/replication/logical/tablesync.c
       9                 :             :  *
      10                 :             :  * NOTES
      11                 :             :  *        This file contains code for initial table data synchronization for
      12                 :             :  *        logical replication.
      13                 :             :  *
      14                 :             :  *        The initial data synchronization is done separately for each table,
      15                 :             :  *        in a separate apply worker that only fetches the initial snapshot data
      16                 :             :  *        from the publisher and then synchronizes the position in the stream with
      17                 :             :  *        the leader apply worker.
      18                 :             :  *
      19                 :             :  *        There are several reasons for doing the synchronization this way:
      20                 :             :  *         - It allows us to parallelize the initial data synchronization
      21                 :             :  *               which lowers the time needed for it to happen.
      22                 :             :  *         - The initial synchronization does not have to hold the xid and LSN
      23                 :             :  *               for the time it takes to copy data of all tables, causing less
      24                 :             :  *               bloat and lower disk consumption compared to doing the
      25                 :             :  *               synchronization in a single process for the whole database.
      26                 :             :  *         - It allows us to synchronize any tables added after the initial
      27                 :             :  *               synchronization has finished.
      28                 :             :  *
      29                 :             :  *        The stream position synchronization works in multiple steps:
      30                 :             :  *         - Apply worker requests a tablesync worker to start, setting the new
      31                 :             :  *               table state to INIT.
      32                 :             :  *         - Tablesync worker starts; changes table state from INIT to DATASYNC while
      33                 :             :  *               copying.
      34                 :             :  *         - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
      35                 :             :  *               worker specific) state to indicate when the copy phase has completed, so
      36                 :             :  *               if the worker crashes with this (non-memory) state then the copy will not
      37                 :             :  *               be re-attempted.
      38                 :             :  *         - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
      39                 :             :  *         - Apply worker periodically checks for tables in SYNCWAIT state.  When
      40                 :             :  *               any appear, it sets the table state to CATCHUP and starts loop-waiting
      41                 :             :  *               until either the table state is set to SYNCDONE or the sync worker
      42                 :             :  *               exits.
      43                 :             :  *         - After the sync worker has seen the state change to CATCHUP, it will
      44                 :             :  *               read the stream and apply changes (acting like an apply worker) until
      45                 :             :  *               it catches up to the specified stream position.  Then it sets the
      46                 :             :  *               state to SYNCDONE.  There might be zero changes applied between
      47                 :             :  *               CATCHUP and SYNCDONE, because the sync worker might be ahead of the
      48                 :             :  *               apply worker.
      49                 :             :  *         - Once the state is set to SYNCDONE, the apply worker will continue tracking
      50                 :             :  *               the table until it reaches the SYNCDONE stream position, at which
      51                 :             :  *               point it sets state to READY and stops tracking.  Again, there might
      52                 :             :  *               be zero changes in between.
      53                 :             :  *
      54                 :             :  *        So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
      55                 :             :  *        -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
      56                 :             :  *
      57                 :             :  *        The catalog pg_subscription_rel is used to keep information about
      58                 :             :  *        subscribed tables and their state.  The catalog holds all states
      59                 :             :  *        except SYNCWAIT and CATCHUP, which are only in shared memory.
      60                 :             :  *
      61                 :             :  *        Example flows look like this:
      62                 :             :  *         - Apply is in front:
      63                 :             :  *                sync:8
      64                 :             :  *                      -> set in catalog FINISHEDCOPY
      65                 :             :  *                      -> set in memory SYNCWAIT
      66                 :             :  *                apply:10
      67                 :             :  *                      -> set in memory CATCHUP
      68                 :             :  *                      -> enter wait-loop
      69                 :             :  *                sync:10
      70                 :             :  *                      -> set in catalog SYNCDONE
      71                 :             :  *                      -> exit
      72                 :             :  *                apply:10
      73                 :             :  *                      -> exit wait-loop
      74                 :             :  *                      -> continue rep
      75                 :             :  *                apply:11
      76                 :             :  *                      -> set in catalog READY
      77                 :             :  *
      78                 :             :  *         - Sync is in front:
      79                 :             :  *                sync:10
      80                 :             :  *                      -> set in catalog FINISHEDCOPY
      81                 :             :  *                      -> set in memory SYNCWAIT
      82                 :             :  *                apply:8
      83                 :             :  *                      -> set in memory CATCHUP
      84                 :             :  *                      -> continue per-table filtering
      85                 :             :  *                sync:10
      86                 :             :  *                      -> set in catalog SYNCDONE
      87                 :             :  *                      -> exit
      88                 :             :  *                apply:10
      89                 :             :  *                      -> set in catalog READY
      90                 :             :  *                      -> stop per-table filtering
      91                 :             :  *                      -> continue rep
      92                 :             :  *-------------------------------------------------------------------------
      93                 :             :  */
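A minimal, self-contained C sketch of the state progression described in the header comment above. It uses a hypothetical SyncState enum rather than PostgreSQL's own SUBREL_STATE_* codes, and it encodes only two rules taken from the comment: each step advances to the next state in INIT -> DATASYNC -> FINISHEDCOPY -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY, and every state except SYNCWAIT and CATCHUP is persisted in pg_subscription_rel. Worker restarts and crash recovery are deliberately not modeled.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of the per-table sync states (not PostgreSQL's codes). */
typedef enum SyncState
{
    SYNC_INIT,
    SYNC_DATASYNC,
    SYNC_FINISHEDCOPY,
    SYNC_SYNCWAIT,
    SYNC_CATCHUP,
    SYNC_SYNCDONE,
    SYNC_READY
} SyncState;

/* True if 'to' directly follows 'from' in the progression; READY is final. */
bool
is_valid_transition(SyncState from, SyncState to)
{
    return from != SYNC_READY && to == (SyncState) (from + 1);
}

/* SYNCWAIT and CATCHUP live only in shared memory; the rest are persisted. */
bool
is_persisted_in_catalog(SyncState state)
{
    return state != SYNC_SYNCWAIT && state != SYNC_CATCHUP;
}

/* Walk INIT .. READY, checking every step; returns the number of steps. */
int
walk_progression(void)
{
    int         steps = 0;

    for (SyncState s = SYNC_INIT; s != SYNC_READY; s = (SyncState) (s + 1))
    {
        assert(is_valid_transition(s, (SyncState) (s + 1)));
        steps++;
    }
    return steps;
}
```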
      94                 :             : 
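The two example flows in the header comment ("apply is in front" and "sync is in front") can be reproduced with simple LSN comparisons. The sketch below assumes, consistently with both flows but without the comment spelling it out, that when the apply worker moves a table to CATCHUP it records the later of its own position and the sync worker's position as the catch-up target. The sync worker may then set SYNCDONE once it reaches that target, and the apply worker may set READY once it reaches the SYNCDONE position. LSNs are plain uint64_t values, and all function names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;           /* stand-in for XLogRecPtr */

/* Assumed catch-up target: the later of the two stream positions. */
Lsn
catchup_target(Lsn sync_lsn, Lsn apply_lsn)
{
    return sync_lsn > apply_lsn ? sync_lsn : apply_lsn;
}

/* The sync worker may set SYNCDONE once its position reaches the target. */
bool
sync_can_finish(Lsn sync_pos, Lsn target)
{
    return sync_pos >= target;
}

/* The apply worker may set READY once it reaches the SYNCDONE position. */
bool
apply_can_mark_ready(Lsn apply_pos, Lsn syncdone_lsn)
{
    return apply_pos >= syncdone_lsn;
}
```

When the sync worker is in front, sync_can_finish() is already true at the moment CATCHUP is set, which matches the comment's remark that zero changes may be applied between CATCHUP and SYNCDONE.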
      95                 :             : #include "postgres.h"
      96                 :             : 
      97                 :             : #include "access/table.h"
      98                 :             : #include "access/xact.h"
      99                 :             : #include "catalog/indexing.h"
     100                 :             : #include "catalog/pg_subscription_rel.h"
     101                 :             : #include "catalog/pg_type.h"
     102                 :             : #include "commands/copy.h"
     103                 :             : #include "miscadmin.h"
     104                 :             : #include "nodes/makefuncs.h"
     105                 :             : #include "parser/parse_relation.h"
     106                 :             : #include "pgstat.h"
     107                 :             : #include "replication/logicallauncher.h"
     108                 :             : #include "replication/logicalrelation.h"
     109                 :             : #include "replication/logicalworker.h"
     110                 :             : #include "replication/origin.h"
     111                 :             : #include "replication/slot.h"
     112                 :             : #include "replication/walreceiver.h"
     113                 :             : #include "replication/worker_internal.h"
     114                 :             : #include "storage/ipc.h"
     115                 :             : #include "storage/lmgr.h"
     116                 :             : #include "utils/acl.h"
     117                 :             : #include "utils/array.h"
     118                 :             : #include "utils/builtins.h"
     119                 :             : #include "utils/lsyscache.h"
     120                 :             : #include "utils/rls.h"
     121                 :             : #include "utils/snapmgr.h"
     122                 :             : #include "utils/syscache.h"
     123                 :             : #include "utils/usercontext.h"
     124                 :             : 
     125                 :             : List       *table_states_not_ready = NIL;
     126                 :             : 
     127                 :             : static StringInfo copybuf = NULL;
     128                 :             : 
     129                 :             : /*
     130                 :             :  * Wait until the relation sync state is set in the catalog to the expected
     131                 :             :  * one; return true when it happens.
     132                 :             :  *
     133                 :             :  * Returns false if the table sync worker or the table itself has
     134                 :             :  * disappeared, or the table state has been reset.
     135                 :             :  *
     136                 :             :  * Currently, this is used in the apply worker when transitioning from
     137                 :             :  * CATCHUP state to SYNCDONE.
     138                 :             :  */
     139                 :             : static bool
     140                 :           0 : wait_for_table_state_change(Oid relid, char expected_state)
     141                 :             : {
     142                 :           0 :         char            state;
     143                 :             : 
     144                 :           0 :         for (;;)
     145                 :             :         {
     146                 :           0 :                 LogicalRepWorker *worker;
     147                 :           0 :                 XLogRecPtr      statelsn;
     148                 :             : 
     149         [ #  # ]:           0 :                 CHECK_FOR_INTERRUPTS();
     150                 :             : 
     151                 :           0 :                 InvalidateCatalogSnapshot();
     152                 :           0 :                 state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
     153                 :           0 :                                                                                 relid, &statelsn);
     154                 :             : 
     155         [ #  # ]:           0 :                 if (state == SUBREL_STATE_UNKNOWN)
     156                 :           0 :                         break;
     157                 :             : 
     158         [ #  # ]:           0 :                 if (state == expected_state)
     159                 :           0 :                         return true;
     160                 :             : 
     161                 :             :                 /* Check if the sync worker is still running and bail if not. */
     162                 :           0 :                 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     163                 :           0 :                 worker = logicalrep_worker_find(WORKERTYPE_TABLESYNC,
     164                 :           0 :                                                                                 MyLogicalRepWorker->subid, relid,
     165                 :             :                                                                                 false);
     166                 :           0 :                 LWLockRelease(LogicalRepWorkerLock);
     167         [ #  # ]:           0 :                 if (!worker)
     168                 :           0 :                         break;
     169                 :             : 
     170                 :           0 :                 (void) WaitLatch(MyLatch,
     171                 :             :                                                  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     172                 :             :                                                  1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
     173                 :             : 
     174                 :           0 :                 ResetLatch(MyLatch);
     175      [ #  #  # ]:           0 :         }
     176                 :             : 
     177                 :           0 :         return false;
     178                 :           0 : }
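The exit conditions of wait_for_table_state_change() can be exercised without a server. The sketch below replaces the catalog lookup and the logicalrep_worker_find() liveness check with a hypothetical scripted environment (FakeEnv); the state characters used in the checks are arbitrary, and the one-second WaitLatch() sleep is reduced to a comment. It checks the same conditions in the same order as the function above: the "unknown" state returns false, the expected state returns true, and a vanished sync worker returns false. Unlike the original, the sketch always terminates, because the script eventually yields the unknown state.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define STATE_UNKNOWN '\0'      /* stands in for SUBREL_STATE_UNKNOWN */

/*
 * Hypothetical scripted environment: 'states' lists what the catalog lookup
 * returns on successive polls (its NUL terminator acts as "table gone"), and
 * the sync worker counts as alive for the first 'alive_polls' polls.
 */
typedef struct FakeEnv
{
    const char *states;
    int         alive_polls;
    int         polls;          /* polls performed so far */
} FakeEnv;

/* Mirrors the exit conditions of wait_for_table_state_change(). */
bool
wait_for_state(FakeEnv *env, char expected)
{
    for (;;)
    {
        char        state;

        assert(env->polls <= (int) strlen(env->states));
        state = env->states[env->polls];
        env->polls++;

        if (state == STATE_UNKNOWN)
            return false;       /* table removed or its state reset */
        if (state == expected)
            return true;
        if (env->polls > env->alive_polls)
            return false;       /* sync worker no longer running */
        /* real code: WaitLatch(..., 1000L, ...) and ResetLatch() here */
    }
}
```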
     179                 :             : 
     180                 :             : /*
     181                 :             :  * Wait until the apply worker changes the state of our synchronization
     182                 :             :  * worker to the expected one.
     183                 :             :  *
     184                 :             :  * Used when transitioning from SYNCWAIT state to CATCHUP.
     185                 :             :  *
     186                 :             :  * Returns false if the apply worker has disappeared.
     187                 :             :  */
     188                 :             : static bool
     189                 :           0 : wait_for_worker_state_change(char expected_state)
     190                 :             : {
     191                 :           0 :         int                     rc;
     192                 :             : 
     193                 :           0 :         for (;;)
     194                 :             :         {
     195                 :           0 :                 LogicalRepWorker *worker;
     196                 :             : 
     197         [ #  # ]:           0 :                 CHECK_FOR_INTERRUPTS();
     198                 :             : 
     199                 :             :                 /*
     200                 :             :                  * Done if already in correct state.  (We assume this fetch is atomic
     201                 :             :                  * enough to not give a misleading answer if we do it with no lock.)
     202                 :             :                  */
     203         [ #  # ]:           0 :                 if (MyLogicalRepWorker->relstate == expected_state)
     204                 :           0 :                         return true;
     205                 :             : 
     206                 :             :                 /*
     207                 :             :                  * Bail out if the apply worker has died, else signal it that we're
     208                 :             :                  * waiting.
     209                 :             :                  */
     210                 :           0 :                 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     211                 :           0 :                 worker = logicalrep_worker_find(WORKERTYPE_APPLY,
     212                 :           0 :                                                                                 MyLogicalRepWorker->subid, InvalidOid,
     213                 :             :                                                                                 false);
     214   [ #  #  #  # ]:           0 :                 if (worker && worker->proc)
     215                 :           0 :                         logicalrep_worker_wakeup_ptr(worker);
     216                 :           0 :                 LWLockRelease(LogicalRepWorkerLock);
     217         [ #  # ]:           0 :                 if (!worker)
     218                 :           0 :                         break;
     219                 :             : 
     220                 :             :                 /*
     221                 :             :                  * Wait.  We expect to get a latch signal back from the apply worker,
     222                 :             :                  * but use a timeout in case it dies without sending one.
     223                 :             :                  */
     224                 :           0 :                 rc = WaitLatch(MyLatch,
     225                 :             :                                            WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     226                 :             :                                            1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
     227                 :             : 
     228         [ #  # ]:           0 :                 if (rc & WL_LATCH_SET)
     229                 :           0 :                         ResetLatch(MyLatch);
     230      [ #  #  # ]:           0 :         }
     231                 :             : 
     232                 :           0 :         return false;
     233                 :           0 : }
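wait_for_worker_state_change() relies on a latch wakeup from the apply worker but caps each wait with a 1000 ms timeout in case that worker dies without signalling. A rough POSIX-threads analogue of that pattern is sketched below, assuming a condition variable in place of the latch and a mutex-protected field in place of the unlocked relstate read. SharedState, shared_state_init, apply_side and demo_state_change are hypothetical names, and the sketch omits the wakeup the real function sends to the apply worker on every iteration. With older glibc versions it may need to be compiled with -pthread.

```c
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <time.h>

/* Hypothetical stand-in for the shared per-worker slot. */
typedef struct SharedState
{
    pthread_mutex_t lock;
    pthread_cond_t  changed;    /* plays the role of the process latch */
    char            relstate;   /* state the "apply worker" assigns to us */
    bool            peer_alive; /* false once the "apply worker" is gone */
} SharedState;

void
shared_state_init(SharedState *s, char relstate, bool peer_alive)
{
    pthread_mutex_init(&s->lock, NULL);
    pthread_cond_init(&s->changed, NULL);
    s->relstate = relstate;
    s->peer_alive = peer_alive;
}

/*
 * Wait until relstate == expected (true) or the peer is gone (false).  Each
 * wait is capped at one second, like the 1000L WaitLatch() timeout, so a
 * peer that exits without signalling cannot block us forever.
 */
bool
wait_for_state_change(SharedState *s, char expected)
{
    bool        result;

    pthread_mutex_lock(&s->lock);
    for (;;)
    {
        struct timespec deadline;

        if (s->relstate == expected)
        {
            result = true;
            break;
        }
        if (!s->peer_alive)
        {
            result = false;
            break;
        }
        clock_gettime(CLOCK_REALTIME, &deadline);
        deadline.tv_sec += 1;
        /* A timeout simply means: loop around and re-check. */
        (void) pthread_cond_timedwait(&s->changed, &s->lock, &deadline);
    }
    pthread_mutex_unlock(&s->lock);
    return result;
}

/* Peer thread: assigns the new state and wakes the waiter. */
static void *
apply_side(void *arg)
{
    SharedState *s = arg;

    pthread_mutex_lock(&s->lock);
    s->relstate = 'c';
    pthread_cond_signal(&s->changed);
    pthread_mutex_unlock(&s->lock);
    return NULL;
}

/* Runs one waiter/peer exchange and reports what the waiter observed. */
bool
demo_state_change(void)
{
    SharedState s;
    pthread_t   peer;
    bool        ok;

    shared_state_init(&s, 'w', true);
    if (pthread_create(&peer, NULL, apply_side, &s) != 0)
        return false;
    ok = wait_for_state_change(&s, 'c');
    pthread_join(peer, NULL);
    pthread_cond_destroy(&s.changed);
    pthread_mutex_destroy(&s.lock);
    return ok;
}
```

Spurious or timed-out wakeups are harmless here because the loop re-checks both exit conditions before waiting again, which is also why the original code tolerates a missed latch signal.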
     234                 :             : 
     235                 :             : /*
     236                 :             :  * Handle table synchronization cooperation from the synchronization
     237                 :             :  * worker.
     238                 :             :  *
     239                 :             :  * If the sync worker is in CATCHUP state and reached (or passed) the
     240                 :             :  * predetermined synchronization point in the WAL stream, mark the table as
     241                 :             :  * SYNCDONE and finish.
     242                 :             :  */
     243                 :             : void
     244                 :           0 : ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
     245                 :             : {
     246         [ #  # ]:           0 :         SpinLockAcquire(&MyLogicalRepWorker->relmutex);
     247                 :             : 
     248   [ #  #  #  # ]:           0 :         if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
     249                 :           0 :                 current_lsn >= MyLogicalRepWorker->relstate_lsn)
     250                 :             :         {
     251                 :           0 :                 TimeLineID      tli;
     252                 :           0 :                 char            syncslotname[NAMEDATALEN] = {0};
     253                 :           0 :                 char            originname[NAMEDATALEN] = {0};
     254                 :             : 
     255                 :           0 :                 MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
     256                 :           0 :                 MyLogicalRepWorker->relstate_lsn = current_lsn;
     257                 :             : 
     258                 :           0 :                 SpinLockRelease(&MyLogicalRepWorker->relmutex);
     259                 :             : 
     260                 :             :                 /*
     261                 :             :                  * UpdateSubscriptionRelState must be called within a transaction.
     262                 :             :                  */
     263         [ #  # ]:           0 :                 if (!IsTransactionState())
     264                 :           0 :                         StartTransactionCommand();
     265                 :             : 
     266                 :           0 :                 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
     267                 :           0 :                                                                    MyLogicalRepWorker->relid,
     268                 :           0 :                                                                    MyLogicalRepWorker->relstate,
     269                 :           0 :                                                                    MyLogicalRepWorker->relstate_lsn,
     270                 :             :                                                                    false);
     271                 :             : 
     272                 :             :                 /*
     273                 :             :                  * End streaming so that LogRepWorkerWalRcvConn can be used to drop
     274                 :             :                  * the slot.
     275                 :             :                  */
     276                 :           0 :                 walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
     277                 :             : 
     278                 :             :                 /*
     279                 :             :                  * Clean up the tablesync slot.
     280                 :             :                  *
     281                 :             :                  * This has to be done after updating the state because otherwise if
     282                 :             :                  * there is an error while doing the database operations we won't be
     283                 :             :                  * able to roll back the dropped slot.
     284                 :             :                  */
     285                 :           0 :                 ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
     286                 :           0 :                                                                                 MyLogicalRepWorker->relid,
     287                 :           0 :                                                                                 syncslotname,
     288                 :             :                                                                                 sizeof(syncslotname));
     289                 :             : 
     290                 :             :                 /*
     291                 :             :                  * It is important to give an error if we are unable to drop the slot;
     292                 :             :                  * otherwise, it won't be dropped until the corresponding subscription
     293                 :             :                  * is dropped.  So pass missing_ok = false.
     294                 :             :                  */
     295                 :           0 :                 ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
     296                 :             : 
     297                 :           0 :                 CommitTransactionCommand();
     298                 :           0 :                 pgstat_report_stat(false);
     299                 :             : 
     300                 :             :                 /*
     301                 :             :                  * Start a new transaction to clean up the tablesync origin tracking.
     302                 :             :                  * This transaction will be ended within FinishSyncWorker().  Even if
     303                 :             :                  * we fail to remove the origin here, the apply worker will ensure it
     304                 :             :                  * is cleaned up afterward.
     305                 :             :                  *
     306                 :             :                  * We need to do this after the table state is set to SYNCDONE.
     307                 :             :                  * Otherwise, if an error occurs while performing the database
     308                 :             :                  * operation, the worker will be restarted and the in-memory state of
     309                 :             :                  * replication progress (remote_lsn) won't be rolled back, which would
     310                 :             :                  * have been cleared before restart. So, the restarted worker will use
     311                 :             :                  * invalid replication progress state resulting in replay of
     312                 :             :                  * transactions that have already been applied.
     313                 :             :                  */
     314                 :           0 :                 StartTransactionCommand();
     315                 :             : 
     316                 :           0 :                 ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
     317                 :           0 :                                                                                    MyLogicalRepWorker->relid,
     318                 :           0 :                                                                                    originname,
     319                 :             :                                                                                    sizeof(originname));
     320                 :             : 
     321                 :             :                 /*
     322                 :             :                  * Resetting the origin session removes the ownership of the slot.
     323                 :             :                  * This is needed to allow the origin to be dropped.
     324                 :             :                  */
     325                 :           0 :                 replorigin_session_reset();
     326                 :           0 :                 replorigin_session_origin = InvalidRepOriginId;
     327                 :           0 :                 replorigin_session_origin_lsn = InvalidXLogRecPtr;
     328                 :           0 :                 replorigin_session_origin_timestamp = 0;
     329                 :             : 
     330                 :             :                 /*
     331                 :             :                  * Drop the tablesync's origin tracking if it exists.
     332                 :             :                  *
     333                 :             :                  * There is a chance that the user is concurrently performing a refresh
     334                 :             :                  * of the subscription, which removes the table state and its origin,
     335                 :             :                  * or that the apply worker has already removed this origin.  So pass
     336                 :             :                  * missing_ok = true.
     337                 :             :                  */
     338                 :           0 :                 replorigin_drop_by_name(originname, true, false);
     339                 :             : 
     340                 :           0 :                 FinishSyncWorker();
     341                 :             :         }
     342                 :             :         else
     343                 :           0 :                 SpinLockRelease(&MyLogicalRepWorker->relmutex);
     344                 :           0 : }
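The transition at the top of ProcessSyncingTablesForSync() is a check-and-set performed under relmutex: only a worker in CATCHUP whose stream position has reached relstate_lsn moves to SYNCDONE, and relstate_lsn is then set to the current position. The sketch below isolates that step, using a pthread mutex as a stand-in for the spinlock and arbitrary state characters; WorkerSlot, worker_slot_init and try_finish_catchup are hypothetical names. As in the original, the lock covers only the in-memory update, and the catalog update, slot drop and origin drop would all happen after it is released.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t Lsn;           /* stand-in for XLogRecPtr */

#define ST_CATCHUP  'c'         /* arbitrary state codes for this sketch */
#define ST_SYNCDONE 's'

typedef struct WorkerSlot
{
    pthread_mutex_t relmutex;   /* the real code uses a spinlock */
    char            relstate;
    Lsn             relstate_lsn;
} WorkerSlot;

void
worker_slot_init(WorkerSlot *w, char relstate, Lsn relstate_lsn)
{
    pthread_mutex_init(&w->relmutex, NULL);
    w->relstate = relstate;
    w->relstate_lsn = relstate_lsn;
}

/*
 * If the worker is in CATCHUP and current_lsn has reached relstate_lsn,
 * switch to SYNCDONE at current_lsn and return true; the caller would then
 * persist the state and clean up outside the lock.  Otherwise change nothing.
 */
bool
try_finish_catchup(WorkerSlot *w, Lsn current_lsn)
{
    bool        done = false;

    assert(w != NULL);
    pthread_mutex_lock(&w->relmutex);
    if (w->relstate == ST_CATCHUP && current_lsn >= w->relstate_lsn)
    {
        w->relstate = ST_SYNCDONE;
        w->relstate_lsn = current_lsn;
        done = true;
    }
    pthread_mutex_unlock(&w->relmutex);
    return done;
}
```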
     345                 :             : 
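The next function, ProcessSyncingTablesForApply(), remembers when each table's sync worker was last started (struct tablesync_start_time_mapping, kept in an HTAB) so that a failing worker is restarted at most once per wal_retrieve_retry_interval. The sketch below models only that throttling decision. It swaps the hash table for a small linear array, uses plain millisecond integers instead of TimestampTz, and ignores the per-subscription limit on concurrent sync workers; StartTimes and may_start_sync_worker are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t Oid;
typedef int64_t TimestampMs;    /* hypothetical: milliseconds, not TimestampTz */

#define MAX_TRACKED 256

/* Linear table standing in for the HTAB of last start times. */
typedef struct StartTimes
{
    Oid         relid[MAX_TRACKED];
    TimestampMs last_start[MAX_TRACKED];
    int         n;
} StartTimes;

/*
 * Return true (and record 'now') if a sync worker for 'relid' may start:
 * either it has never been started, or at least retry_interval_ms has passed
 * since the last start.  Otherwise return false and leave the entry alone.
 */
bool
may_start_sync_worker(StartTimes *t, Oid relid, TimestampMs now,
                      TimestampMs retry_interval_ms)
{
    for (int i = 0; i < t->n; i++)
    {
        if (t->relid[i] != relid)
            continue;
        if (now - t->last_start[i] < retry_interval_ms)
            return false;       /* started too recently; try again later */
        t->last_start[i] = now;
        return true;
    }

    assert(t->n < MAX_TRACKED);
    t->relid[t->n] = relid;
    t->last_start[t->n] = now;
    t->n++;
    return true;
}
```

A hash table keyed by relation OID, as the original uses, keeps the lookup cheap when many tables are unsynchronized; the linear scan here is only for brevity.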
     346                 :             : /*
     347                 :             :  * Handle table synchronization cooperation from the apply worker.
     348                 :             :  *
     349                 :             :  * Walk over all subscription tables that are individually tracked by the
     350                 :             :  * apply process (currently, all that have state other than
     351                 :             :  * SUBREL_STATE_READY) and manage synchronization for them.
     352                 :             :  *
     353                 :             :  * If there are tables that need synchronizing and are not being synchronized
     354                 :             :  * yet, start sync workers for them (if there are free slots for sync
     355                 :             :  * workers).  To prevent starting the sync worker for the same relation at a
     356                 :             :  * high frequency after a failure, we store its last start time with each sync
     357                 :             :  * state info.  We start the sync worker for the same relation after waiting
     358                 :             :  * at least wal_retrieve_retry_interval.
     359                 :             :  *
     360                 :             :  * For tables that are being synchronized already, check if sync workers
     361                 :             :  * either need action from the apply worker or have finished.  This is the
     362                 :             :  * SYNCWAIT to CATCHUP transition.
     363                 :             :  *
     364                 :             :  * If the synchronization position is reached (SYNCDONE), then the table can
     365                 :             :  * be marked as READY and is no longer tracked.
     366                 :             :  */
     367                 :             : void
     368                 :           0 : ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
     369                 :             : {
     370                 :             :         struct tablesync_start_time_mapping
     371                 :             :         {
     372                 :             :                 Oid                     relid;
     373                 :             :                 TimestampTz last_start_time;
     374                 :             :         };
     375                 :             :         static HTAB *last_start_times = NULL;
     376                 :           0 :         ListCell   *lc;
     377                 :           0 :         bool            started_tx;
     378                 :           0 :         bool            should_exit = false;
     379                 :           0 :         Relation        rel = NULL;
     380                 :             : 
     381         [ #  # ]:           0 :         Assert(!IsTransactionState());
     382                 :             : 
     383                 :             :         /* We need up-to-date sync state info for subscription tables here. */
     384                 :           0 :         FetchRelationStates(NULL, NULL, &started_tx);
     385                 :             : 
     386                 :             :         /*
     387                 :             :          * Prepare a hash table for tracking last start times of workers, to avoid
     388                 :             :          * immediate restarts.  We don't need it if there are no tables that need
     389                 :             :          * syncing.
     390                 :             :          */
     391   [ #  #  #  # ]:           0 :         if (table_states_not_ready != NIL && !last_start_times)
     392                 :             :         {
     393                 :           0 :                 HASHCTL         ctl;
     394                 :             : 
     395                 :           0 :                 ctl.keysize = sizeof(Oid);
     396                 :           0 :                 ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
     397                 :           0 :                 last_start_times = hash_create("Logical replication table sync worker start times",
     398                 :             :                                                                            256, &ctl, HASH_ELEM | HASH_BLOBS);
     399                 :           0 :         }
     400                 :             : 
     401                 :             :         /*
     402                 :             :          * Clean up the hash table when we're done with all tables (just to
     403                 :             :          * release the bit of memory).
     404                 :             :          */
     405   [ #  #  #  # ]:           0 :         else if (table_states_not_ready == NIL && last_start_times)
     406                 :             :         {
     407                 :           0 :                 hash_destroy(last_start_times);
     408                 :           0 :                 last_start_times = NULL;
     409                 :           0 :         }
     410                 :             : 
     411                 :             :         /*
     412                 :             :          * Process all tables that are being synchronized.
     413                 :             :          */
     414   [ #  #  #  #  :           0 :         foreach(lc, table_states_not_ready)
                   #  # ]
     415                 :             :         {
     416                 :           0 :                 SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
     417                 :             : 
     418         [ #  # ]:           0 :                 if (!started_tx)
     419                 :             :                 {
     420                 :           0 :                         StartTransactionCommand();
     421                 :           0 :                         started_tx = true;
     422                 :           0 :                 }
     423                 :             : 
     424         [ #  # ]:           0 :                 Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE);
     425                 :             : 
     426         [ #  # ]:           0 :                 if (rstate->state == SUBREL_STATE_SYNCDONE)
     427                 :             :                 {
     428                 :             :                         /*
     429                 :             :                          * Apply has caught up to the position where the table sync has
     430                 :             :                          * finished.  Mark the table as ready so that the apply worker will
     431                 :             :                          * just continue to replicate it normally.
     432                 :             :                          */
     433         [ #  # ]:           0 :                         if (current_lsn >= rstate->lsn)
     434                 :             :                         {
     435                 :           0 :                                 char            originname[NAMEDATALEN];
     436                 :             : 
     437                 :           0 :                                 rstate->state = SUBREL_STATE_READY;
     438                 :           0 :                                 rstate->lsn = current_lsn;
     439                 :             : 
     440                 :             :                                 /*
     441                 :             :                                  * Remove the tablesync origin tracking if it exists.
     442                 :             :                                  *
     443                 :             :                                  * There is a chance that the user is concurrently performing a
     444                 :             :                                  * refresh of the subscription, which removes the table state and
     445                 :             :                                  * its origin, or that the tablesync worker has already removed
     446                 :             :                                  * this origin. We can't rely on the tablesync worker to remove
     447                 :             :                                  * the origin tracking: if there is any error while dropping it,
     448                 :             :                                  * we won't restart the worker to drop the origin. Hence, pass
     449                 :             :                                  * missing_ok = true.
     450                 :             :                                  *
     451                 :             :                                  * Lock the subscription and origin in the same order as we
     452                 :             :                                  * are doing during DDL commands to avoid deadlocks. See
     453                 :             :                                  * AlterSubscription_refresh.
     454                 :             :                                  */
     455                 :           0 :                                 LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
     456                 :             :                                                                  0, AccessShareLock);
     457                 :             : 
     458         [ #  # ]:           0 :                                 if (!rel)
     459                 :           0 :                                         rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     460                 :             : 
     461                 :           0 :                                 ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
     462                 :           0 :                                                                                                    rstate->relid,
     463                 :           0 :                                                                                                    originname,
     464                 :             :                                                                                                    sizeof(originname));
     465                 :           0 :                                 replorigin_drop_by_name(originname, true, false);
     466                 :             : 
     467                 :             :                                 /*
     468                 :             :                                  * Update the state to READY only after the origin cleanup.
     469                 :             :                                  */
     470                 :           0 :                                 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
     471                 :           0 :                                                                                    rstate->relid, rstate->state,
     472                 :           0 :                                                                                    rstate->lsn, true);
     473                 :           0 :                         }
     474                 :           0 :                 }
     475                 :             :                 else
     476                 :             :                 {
     477                 :           0 :                         LogicalRepWorker *syncworker;
     478                 :             : 
     479                 :             :                         /*
     480                 :             :                          * Look for a sync worker for this relation.
     481                 :             :                          */
     482                 :           0 :                         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     483                 :             : 
     484                 :           0 :                         syncworker = logicalrep_worker_find(WORKERTYPE_TABLESYNC,
     485                 :           0 :                                                                                                 MyLogicalRepWorker->subid,
     486                 :           0 :                                                                                                 rstate->relid, false);
     487                 :             : 
     488         [ #  # ]:           0 :                         if (syncworker)
     489                 :             :                         {
     490                 :             :                                 /* Found one, update our copy of its state */
     491         [ #  # ]:           0 :                                 SpinLockAcquire(&syncworker->relmutex);
     492                 :           0 :                                 rstate->state = syncworker->relstate;
     493                 :           0 :                                 rstate->lsn = syncworker->relstate_lsn;
     494         [ #  # ]:           0 :                                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
     495                 :             :                                 {
     496                 :             :                                         /*
     497                 :             :                                          * The sync worker is waiting for apply.  Tell the sync worker
     498                 :             :                                          * it can catch up now.
     499                 :             :                                          */
     500                 :           0 :                                         syncworker->relstate = SUBREL_STATE_CATCHUP;
     501                 :           0 :                                         syncworker->relstate_lsn =
     502         [ #  # ]:           0 :                                                 Max(syncworker->relstate_lsn, current_lsn);
     503                 :           0 :                                 }
     504                 :           0 :                                 SpinLockRelease(&syncworker->relmutex);
     505                 :             : 
     506                 :             :                                 /* If we told worker to catch up, wait for it. */
     507         [ #  # ]:           0 :                                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
     508                 :             :                                 {
     509                 :             :                                         /* Signal the sync worker, as it may be waiting for us. */
     510         [ #  # ]:           0 :                                         if (syncworker->proc)
     511                 :           0 :                                                 logicalrep_worker_wakeup_ptr(syncworker);
     512                 :             : 
     513                 :             :                                         /* Now safe to release the LWLock */
     514                 :           0 :                                         LWLockRelease(LogicalRepWorkerLock);
     515                 :             : 
     516         [ #  # ]:           0 :                                         if (started_tx)
     517                 :             :                                         {
     518                 :             :                                                 /*
     519                 :             :                                                  * We must commit the existing transaction to release
     520                 :             :                                                  * the existing locks before entering a busy loop.
     521                 :             :                                                  * This is required to avoid undetected deadlocks
     522                 :             :                                                  * caused by those locks, as the deadlock detector
     523                 :             :                                                  * cannot detect waits on the latch.
     524                 :             :                                                  *
     525                 :             :                                                  * Also close any tables prior to the commit.
     526                 :             :                                                  */
     527         [ #  # ]:           0 :                                                 if (rel)
     528                 :             :                                                 {
     529                 :           0 :                                                         table_close(rel, NoLock);
     530                 :           0 :                                                         rel = NULL;
     531                 :           0 :                                                 }
     532                 :           0 :                                                 CommitTransactionCommand();
     533                 :           0 :                                                 pgstat_report_stat(false);
     534                 :           0 :                                         }
     535                 :             : 
     536                 :             :                                         /*
     537                 :             :                                          * Enter busy loop and wait for synchronization worker to
     538                 :             :                                          * reach expected state (or die trying).
     539                 :             :                                          */
     540                 :           0 :                                         StartTransactionCommand();
     541                 :           0 :                                         started_tx = true;
     542                 :             : 
     543                 :           0 :                                         wait_for_table_state_change(rstate->relid,
     544                 :             :                                                                                                 SUBREL_STATE_SYNCDONE);
     545                 :           0 :                                 }
     546                 :             :                                 else
     547                 :           0 :                                         LWLockRelease(LogicalRepWorkerLock);
     548                 :           0 :                         }
     549                 :             :                         else
     550                 :             :                         {
     551                 :             :                                 /*
     552                 :             :                                  * If there is no sync worker for this table yet, count
     553                 :             :                                  * running sync workers for this subscription, while we have
     554                 :             :                                  * the lock.
     555                 :             :                                  */
     556                 :           0 :                                 int                     nsyncworkers =
     557                 :           0 :                                         logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
     558                 :           0 :                                 struct tablesync_start_time_mapping *hentry;
     559                 :           0 :                                 bool            found;
     560                 :             : 
     561                 :             :                                 /* Now safe to release the LWLock */
     562                 :           0 :                                 LWLockRelease(LogicalRepWorkerLock);
     563                 :             : 
     564                 :           0 :                                 hentry = hash_search(last_start_times, &rstate->relid,
     565                 :             :                                                                          HASH_ENTER, &found);
     566         [ #  # ]:           0 :                                 if (!found)
     567                 :           0 :                                         hentry->last_start_time = 0;
     568                 :             : 
     569                 :           0 :                                 launch_sync_worker(WORKERTYPE_TABLESYNC, nsyncworkers,
     570                 :           0 :                                                                    rstate->relid, &hentry->last_start_time);
     571                 :           0 :                         }
     572                 :           0 :                 }
     573                 :           0 :         }
     574                 :             : 
     575                 :             :         /* Close table if opened */
     576         [ #  # ]:           0 :         if (rel)
     577                 :           0 :                 table_close(rel, NoLock);
     578                 :             : 
     579                 :             : 
     580         [ #  # ]:           0 :         if (started_tx)
     581                 :             :         {
     582                 :             :                 /*
     583                 :             :                  * Even when the two_phase mode is requested by the user, it remains
     584                 :             :                  * as 'pending' until all tablesyncs have reached READY state.
     585                 :             :                  *
     586                 :             :                  * When this happens, we restart the apply worker and (if the
     587                 :             :                  * conditions are still ok) then the two_phase tri-state will become
     588                 :             :                  * 'enabled' at that time.
     589                 :             :                  *
     590                 :             :                  * Note: If the subscription has no tables then leave the state as
     591                 :             :                  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
     592                 :             :                  * work.
     593                 :             :                  */
     594         [ #  # ]:           0 :                 if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
     595                 :             :                 {
     596                 :           0 :                         CommandCounterIncrement();      /* make updates visible */
     597         [ #  # ]:           0 :                         if (AllTablesyncsReady())
     598                 :             :                         {
     599   [ #  #  #  # ]:           0 :                                 ereport(LOG,
     600                 :             :                                                 (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
     601                 :             :                                                                 MySubscription->name)));
     602                 :           0 :                                 should_exit = true;
     603                 :           0 :                         }
     604                 :           0 :                 }
     605                 :             : 
     606                 :           0 :                 CommitTransactionCommand();
     607                 :           0 :                 pgstat_report_stat(true);
     608                 :           0 :         }
     609                 :             : 
     610         [ #  # ]:           0 :         if (should_exit)
     611                 :             :         {
     612                 :             :                 /*
     613                 :             :                  * Reset the last-start time for this worker so that the launcher will
     614                 :             :                  * restart it without waiting for wal_retrieve_retry_interval.
     615                 :             :                  */
     616                 :           0 :                 ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
     617                 :             : 
     618                 :           0 :                 proc_exit(0);
     619                 :             :         }
     620                 :           0 : }
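The per-table bookkeeping in ProcessSyncingTablesForApply() can be summarized as a small state machine plus a restart throttle. The C sketch below is a simplified model, not PostgreSQL code: the names `SyncState`, `TableSyncState`, `advance_table_state()` and `may_start_sync_worker()`, the millisecond timestamps, and the `max_sync_workers` parameter (assumed to correspond to the max_sync_workers_per_subscription limit) are all hypothetical. It also collapses the apply worker's local copy of the state and the sync worker's shared slot into a single value.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for XLogRecPtr, TimestampTz and SUBREL_STATE_*. */
typedef uint64_t Lsn;
typedef int64_t TimestampMs;

typedef enum
{
	STATE_SYNCWAIT,				/* sync worker waits for the apply worker */
	STATE_CATCHUP,				/* apply worker told the sync worker to catch up */
	STATE_SYNCDONE,				/* sync worker finished at some LSN */
	STATE_READY					/* table is replicated normally by apply */
} SyncState;

typedef struct
{
	SyncState	state;
	Lsn			lsn;
} TableSyncState;

/*
 * One bookkeeping step for a single table (simplified model):
 *  - SYNCDONE becomes READY once apply has reached the sync position;
 *  - SYNCWAIT becomes CATCHUP, with the LSN raised to at least current_lsn.
 * Returns true if the state changed.
 */
static bool
advance_table_state(TableSyncState *t, Lsn current_lsn)
{
	if (t->state == STATE_SYNCDONE && current_lsn >= t->lsn)
	{
		t->state = STATE_READY;
		t->lsn = current_lsn;
		return true;
	}
	if (t->state == STATE_SYNCWAIT)
	{
		t->state = STATE_CATCHUP;
		if (current_lsn > t->lsn)
			t->lsn = current_lsn;	/* like Max(relstate_lsn, current_lsn) */
		return true;
	}
	return false;
}

/*
 * Restart throttle (hypothetical): allow a launch only if a worker slot is
 * free and at least retry_interval_ms has passed since the last start
 * (0 means "never started").  Records the start time on success.
 */
static bool
may_start_sync_worker(TimestampMs now, TimestampMs *last_start,
					  int nsyncworkers, int max_sync_workers,
					  TimestampMs retry_interval_ms)
{
	assert(retry_interval_ms >= 0);

	if (nsyncworkers >= max_sync_workers)
		return false;
	if (*last_start == 0 || now - *last_start >= retry_interval_ms)
	{
		*last_start = now;
		return true;
	}
	return false;
}
```

Passing the last start time by pointer mirrors how the listing hands `&hentry->last_start_time` to launch_sync_worker(), so the caller's hash entry is updated in place. For example, with a 5000 ms interval, a table first launched at t = 1000 cannot be relaunched at t = 3000 but can be at t = 6000.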
     621                 :             : 
     622                 :             : /*
     623                 :             :  * Create a list of columns for COPY based on the logical relation mapping.
     624                 :             :  */
     625                 :             : static List *
     626                 :           0 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
     627                 :             : {
     628                 :           0 :         List       *attnamelist = NIL;
     629                 :           0 :         int                     i;
     630                 :             : 
     631         [ #  # ]:           0 :         for (i = 0; i < rel->remoterel.natts; i++)
     632                 :             :         {
     633                 :           0 :                 attnamelist = lappend(attnamelist,
     634                 :           0 :                                                           makeString(rel->remoterel.attnames[i]));
     635                 :           0 :         }
     636                 :             : 
     637                 :             : 
     638                 :           0 :         return attnamelist;
     639                 :           0 : }
     640                 :             : 
     641                 :             : /*
     642                 :             :  * Data source callback for the COPY FROM, which reads from the remote
     643                 :             :  * connection and passes the data back to our local COPY.
     644                 :             :  */
     645                 :             : static int
     646                 :           0 : copy_read_data(void *outbuf, int minread, int maxread)
     647                 :             : {
     648                 :           0 :         int                     bytesread = 0;
     649                 :           0 :         int                     avail;
     650                 :             : 
     651                 :             :         /* If there is leftover data from the previous read, use it. */
     652                 :           0 :         avail = copybuf->len - copybuf->cursor;
     653         [ #  # ]:           0 :         if (avail)
     654                 :             :         {
     655         [ #  # ]:           0 :                 if (avail > maxread)
     656                 :           0 :                         avail = maxread;
     657                 :           0 :                 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
     658                 :           0 :                 copybuf->cursor += avail;
     659                 :           0 :                 maxread -= avail;
     660                 :           0 :                 bytesread += avail;
     661                 :           0 :         }
     662                 :             : 
     663   [ #  #  #  # ]:           0 :         while (maxread > 0 && bytesread < minread)
     664                 :             :         {
     665                 :           0 :                 pgsocket        fd = PGINVALID_SOCKET;
     666                 :           0 :                 int                     len;
     667                 :           0 :                 char       *buf = NULL;
     668                 :             : 
     669                 :           0 :                 for (;;)
     670                 :             :                 {
     671                 :             :                         /* Try to read the data. */
     672                 :           0 :                         len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
     673                 :             : 
     674         [ #  # ]:           0 :                         CHECK_FOR_INTERRUPTS();
     675                 :             : 
     676         [ #  # ]:           0 :                         if (len == 0)
     677                 :           0 :                                 break;
     678         [ #  # ]:           0 :                         else if (len < 0)
     679                 :           0 :                                 return bytesread;
     680                 :             :                         else
     681                 :             :                         {
     682                 :             :                                 /* Process the data */
     683                 :           0 :                                 copybuf->data = buf;
     684                 :           0 :                                 copybuf->len = len;
     685                 :           0 :                                 copybuf->cursor = 0;
     686                 :             : 
     687                 :           0 :                                 avail = copybuf->len - copybuf->cursor;
     688         [ #  # ]:           0 :                                 if (avail > maxread)
     689                 :           0 :                                         avail = maxread;
     690                 :           0 :                                 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
     691                 :           0 :                                 outbuf = (char *) outbuf + avail;
     692                 :           0 :                                 copybuf->cursor += avail;
     693                 :           0 :                                 maxread -= avail;
     694                 :           0 :                                 bytesread += avail;
     695                 :             :                         }
     696                 :             : 
     697   [ #  #  #  # ]:           0 :                         if (maxread <= 0 || bytesread >= minread)
     698                 :           0 :                                 return bytesread;
     699                 :             :                 }
     700                 :             : 
     701                 :             :                 /*
     702                 :             :                  * Wait for more data or latch.
     703                 :             :                  */
     704                 :           0 :                 (void) WaitLatchOrSocket(MyLatch,
     705                 :             :                                                                  WL_SOCKET_READABLE | WL_LATCH_SET |
     706                 :             :                                                                  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     707                 :           0 :                                                                  fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
     708                 :             : 
     709                 :           0 :                 ResetLatch(MyLatch);
     710         [ #  # ]:           0 :         }
     711                 :             : 
     712                 :           0 :         return bytesread;
     713                 :           0 : }
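copy_read_data() follows a common buffered-reader contract: serve leftover bytes from the previous network chunk first, then keep receiving chunks until at least minread bytes are available or the caller's buffer (maxread) is full, keeping any unconsumed tail for the next call. Below is a minimal sketch of that contract under stated assumptions: `ChunkBuffer`, `ChunkSource`, `source_receive()` and `read_data()` are hypothetical, `source_receive()` stands in for walrcv_receive() by delivering NUL-terminated strings and returning -1 at end of stream, and the no-data-yet case (len == 0), which the listing handles by waiting on the latch or socket, is not modeled. The sketch also advances the destination pointer after copying leftover bytes, so it stays correct even if minread exceeds the leftover amount.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical stand-in for the pending-data buffer (copybuf above). */
typedef struct
{
	const char *data;
	int			len;
	int			cursor;			/* first byte not yet handed to the caller */
} ChunkBuffer;

/* Hypothetical stand-in for the replication connection. */
typedef struct
{
	const char **chunks;
	int			nchunks;
	int			next;
} ChunkSource;

/* Returns the next chunk's length, or -1 at end of stream. */
static int
source_receive(ChunkSource *src, const char **buf)
{
	if (src->next >= src->nchunks)
		return -1;
	*buf = src->chunks[src->next++];
	return (int) strlen(*buf);
}

static int
read_data(ChunkBuffer *pending, ChunkSource *src,
		  char *outbuf, int minread, int maxread)
{
	int			bytesread = 0;
	int			avail = pending->len - pending->cursor;

	assert(minread <= maxread);

	/* Serve leftover bytes from the previous chunk first. */
	if (avail > 0)
	{
		if (avail > maxread)
			avail = maxread;
		memcpy(outbuf, pending->data + pending->cursor, (size_t) avail);
		outbuf += avail;
		pending->cursor += avail;
		maxread -= avail;
		bytesread += avail;
	}

	/* Pull new chunks until minread is met or the output buffer is full. */
	while (maxread > 0 && bytesread < minread)
	{
		const char *buf;
		int			len = source_receive(src, &buf);

		if (len < 0)
			break;				/* end of stream: return what we have */

		pending->data = buf;
		pending->len = len;
		avail = (len > maxread) ? maxread : len;
		memcpy(outbuf, buf, (size_t) avail);
		pending->cursor = avail;	/* any remaining bytes stay pending */
		outbuf += avail;
		maxread -= avail;
		bytesread += avail;
	}
	return bytesread;
}
```

For example, with chunks "hello" and "world", a call with minread = 1 and maxread = 3 returns "hel" and keeps "lo" pending; a following call with minread = 4 and maxread = 8 returns the 7 bytes "loworld".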
     714                 :             : 
     715                 :             : 
     716                 :             : /*
     717                 :             :  * Get information about the remote relation, in a similar fashion to what
     718                 :             :  * the RELATION message provides during replication.
     719                 :             :  *
     720                 :             :  * This function also returns (a) the relation qualifications to be used in
     721                 :             :  * the COPY command, and (b) whether the remote relation has published any
     722                 :             :  * generated columns.
     723                 :             :  */
     724                 :             : static void
     725                 :           0 : fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
     726                 :             :                                                 List **qual, bool *gencol_published)
     727                 :             : {
     728                 :           0 :         WalRcvExecResult *res;
     729                 :           0 :         StringInfoData cmd;
     730                 :           0 :         TupleTableSlot *slot;
     731                 :           0 :         Oid                     tableRow[] = {OIDOID, CHAROID, CHAROID};
     732                 :           0 :         Oid                     attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
     733                 :           0 :         Oid                     qualRow[] = {TEXTOID};
     734                 :           0 :         bool            isnull;
     735                 :           0 :         int                     natt;
     736                 :           0 :         StringInfo      pub_names = NULL;
     737                 :           0 :         Bitmapset  *included_cols = NULL;
     738                 :           0 :         int                     server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
     739                 :             : 
     740                 :           0 :         lrel->nspname = nspname;
     741                 :           0 :         lrel->relname = relname;
     742                 :             : 
     743                 :             :         /* First fetch Oid and replica identity. */
     744                 :           0 :         initStringInfo(&cmd);
     745                 :           0 :         appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
     746                 :             :                                          "  FROM pg_catalog.pg_class c"
     747                 :             :                                          "  INNER JOIN pg_catalog.pg_namespace n"
     748                 :             :                                          "        ON (c.relnamespace = n.oid)"
     749                 :             :                                          " WHERE n.nspname = %s"
     750                 :             :                                          "   AND c.relname = %s",
     751                 :           0 :                                          quote_literal_cstr(nspname),
     752                 :           0 :                                          quote_literal_cstr(relname));
     753                 :           0 :         res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
     754                 :             :                                           lengthof(tableRow), tableRow);
     755                 :             : 
     756         [ #  # ]:           0 :         if (res->status != WALRCV_OK_TUPLES)
     757   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     758                 :             :                                 (errcode(ERRCODE_CONNECTION_FAILURE),
     759                 :             :                                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
     760                 :             :                                                 nspname, relname, res->err)));
     761                 :             : 
     762                 :           0 :         slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     763         [ #  # ]:           0 :         if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     764   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     765                 :             :                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     766                 :             :                                  errmsg("table \"%s.%s\" not found on publisher",
     767                 :             :                                                 nspname, relname)));
     768                 :             : 
     769                 :           0 :         lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
     770         [ #  # ]:           0 :         Assert(!isnull);
     771                 :           0 :         lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
     772         [ #  # ]:           0 :         Assert(!isnull);
     773                 :           0 :         lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
     774         [ #  # ]:           0 :         Assert(!isnull);
     775                 :             : 
     776                 :           0 :         ExecDropSingleTupleTableSlot(slot);
     777                 :           0 :         walrcv_clear_result(res);
     778                 :             : 
     779                 :             : 
     780                 :             :         /*
     781                 :             :          * Get column lists for each relation.
     782                 :             :          *
     783                 :             :          * We need to do this before fetching info about column names and types,
     784                 :             :          * so that we can skip columns that should not be replicated.
     785                 :             :          */
     786         [ #  # ]:           0 :         if (server_version >= 150000)
     787                 :             :         {
     788                 :           0 :                 WalRcvExecResult *pubres;
     789                 :           0 :                 TupleTableSlot *tslot;
     790                 :           0 :                 Oid                     attrsRow[] = {INT2VECTOROID};
     791                 :             : 
     792                 :             :                 /* Build the pub_names comma-separated string. */
     793                 :           0 :                 pub_names = makeStringInfo();
     794                 :           0 :                 GetPublicationsStr(MySubscription->publications, pub_names, true);
     795                 :             : 
     796                 :             :                 /*
     797                 :             :                  * Fetch info about column lists for the relation (from all the
     798                 :             :                  * publications).
     799                 :             :                  */
     800                 :           0 :                 resetStringInfo(&cmd);
     801                 :           0 :                 appendStringInfo(&cmd,
     802                 :             :                                                  "SELECT DISTINCT"
     803                 :             :                                                  "  (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
     804                 :             :                                                  "   THEN NULL ELSE gpt.attrs END)"
     805                 :             :                                                  "  FROM pg_publication p,"
     806                 :             :                                                  "  LATERAL pg_get_publication_tables(p.pubname) gpt,"
     807                 :             :                                                  "  pg_class c"
     808                 :             :                                                  " WHERE gpt.relid = %u AND c.oid = gpt.relid"
     809                 :             :                                                  "   AND p.pubname IN ( %s )",
     810                 :           0 :                                                  lrel->remoteid,
     811                 :           0 :                                                  pub_names->data);
     812                 :             : 
     813                 :           0 :                 pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
     814                 :             :                                                          lengthof(attrsRow), attrsRow);
     815                 :             : 
     816         [ #  # ]:           0 :                 if (pubres->status != WALRCV_OK_TUPLES)
     817   [ #  #  #  # ]:           0 :                         ereport(ERROR,
     818                 :             :                                         (errcode(ERRCODE_CONNECTION_FAILURE),
     819                 :             :                                          errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
     820                 :             :                                                         nspname, relname, pubres->err)));
     821                 :             : 
     822                 :             :                 /*
     823                 :             :                  * We don't support the case where the column list is different for
     824                 :             :                  * the same table when combining publications. See comments atop
     825                 :             :                  * fetch_relation_list. So there should be only one row returned.
     826                 :             :                  * Although we already checked this when creating the subscription, we
     827                 :             :                  * still need to check here in case the column list was changed after
     828                 :             :                  * creating the subscription and before the sync worker is started.
     829                 :             :                  */
     830         [ #  # ]:           0 :                 if (tuplestore_tuple_count(pubres->tuplestore) > 1)
     831   [ #  #  #  # ]:           0 :                         ereport(ERROR,
     832                 :             :                                         errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     833                 :             :                                         errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
     834                 :             :                                                    nspname, relname));
     835                 :             : 
     836                 :             :                 /*
     837                 :             :                  * Get the column list and build a single bitmap with the attnums.
     838                 :             :                  *
     839                 :             :                  * If we find a NULL value, it means all the columns should be
     840                 :             :                  * replicated.
     841                 :             :                  */
     842                 :           0 :                 tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
     843         [ #  # ]:           0 :                 if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
     844                 :             :                 {
     845                 :           0 :                         Datum           cfval = slot_getattr(tslot, 1, &isnull);
     846                 :             : 
     847         [ #  # ]:           0 :                         if (!isnull)
     848                 :             :                         {
     849                 :           0 :                                 ArrayType  *arr;
     850                 :           0 :                                 int                     nelems;
     851                 :           0 :                                 int16      *elems;
     852                 :             : 
     853                 :           0 :                                 arr = DatumGetArrayTypeP(cfval);
     854                 :           0 :                                 nelems = ARR_DIMS(arr)[0];
     855         [ #  # ]:           0 :                                 elems = (int16 *) ARR_DATA_PTR(arr);
     856                 :             : 
     857         [ #  # ]:           0 :                                 for (natt = 0; natt < nelems; natt++)
     858                 :           0 :                                         included_cols = bms_add_member(included_cols, elems[natt]);
     859                 :           0 :                         }
     860                 :             : 
     861                 :           0 :                         ExecClearTuple(tslot);
     862                 :           0 :                 }
     863                 :           0 :                 ExecDropSingleTupleTableSlot(tslot);
     864                 :             : 
     865                 :           0 :                 walrcv_clear_result(pubres);
     866                 :           0 :         }
     867                 :             : 
     868                 :             :         /*
     869                 :             :          * Now fetch column names and types.
     870                 :             :          */
     871                 :           0 :         resetStringInfo(&cmd);
     872                 :           0 :         appendStringInfoString(&cmd,
     873                 :             :                                                    "SELECT a.attnum,"
     874                 :             :                                                    "       a.attname,"
     875                 :             :                                                    "       a.atttypid,"
     876                 :             :                                                    "       a.attnum = ANY(i.indkey)");
     877                 :             : 
     878                 :             :         /* Generated columns can be replicated since version 18. */
     879         [ #  # ]:           0 :         if (server_version >= 180000)
     880                 :           0 :                 appendStringInfoString(&cmd, ", a.attgenerated != ''");
     881                 :             : 
     882                 :           0 :         appendStringInfo(&cmd,
     883                 :             :                                          "  FROM pg_catalog.pg_attribute a"
     884                 :             :                                          "  LEFT JOIN pg_catalog.pg_index i"
     885                 :             :                                          "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
     886                 :             :                                          " WHERE a.attnum > 0::pg_catalog.int2"
     887                 :             :                                          "   AND NOT a.attisdropped %s"
     888                 :             :                                          "   AND a.attrelid = %u"
     889                 :             :                                          " ORDER BY a.attnum",
     890                 :           0 :                                          lrel->remoteid,
     891         [ #  # ]:           0 :                                          (server_version >= 120000 && server_version < 180000 ?
     892                 :             :                                           "AND a.attgenerated = ''" : ""),
     893                 :           0 :                                          lrel->remoteid);
     894                 :           0 :         res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
     895                 :             :                                           server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
     896                 :             : 
     897         [ #  # ]:           0 :         if (res->status != WALRCV_OK_TUPLES)
     898   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     899                 :             :                                 (errcode(ERRCODE_CONNECTION_FAILURE),
     900                 :             :                                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
     901                 :             :                                                 nspname, relname, res->err)));
     902                 :             : 
     903                 :             :         /* We don't know the number of rows coming, so allocate enough space. */
     904                 :           0 :         lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
     905                 :           0 :         lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
     906                 :           0 :         lrel->attkeys = NULL;
     907                 :             : 
     908                 :             :         /*
     909                 :             :          * Store the columns as a list of names.  Ignore those that are not
     910                 :             :          * present in the column list, if there is one.
     911                 :             :          */
     912                 :           0 :         natt = 0;
     913                 :           0 :         slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     914         [ #  # ]:           0 :         while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     915                 :             :         {
     916                 :           0 :                 char       *rel_colname;
     917                 :           0 :                 AttrNumber      attnum;
     918                 :             : 
     919                 :           0 :                 attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
     920         [ #  # ]:           0 :                 Assert(!isnull);
     921                 :             : 
     922                 :             :                 /* If the column is not in the column list, skip it. */
     923   [ #  #  #  # ]:           0 :                 if (included_cols != NULL && !bms_is_member(attnum, included_cols))
     924                 :             :                 {
     925                 :           0 :                         ExecClearTuple(slot);
     926                 :           0 :                         continue;
     927                 :             :                 }
     928                 :             : 
     929                 :           0 :                 rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
     930         [ #  # ]:           0 :                 Assert(!isnull);
     931                 :             : 
     932                 :           0 :                 lrel->attnames[natt] = rel_colname;
     933                 :           0 :                 lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
     934         [ #  # ]:           0 :                 Assert(!isnull);
     935                 :             : 
     936         [ #  # ]:           0 :                 if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
     937                 :           0 :                         lrel->attkeys = bms_add_member(lrel->attkeys, natt);
     938                 :             : 
     939                 :             :                 /* Remember if the remote table has published any generated column. */
     940   [ #  #  #  # ]:           0 :                 if (server_version >= 180000 && !(*gencol_published))
     941                 :             :                 {
     942                 :           0 :                         *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
     943         [ #  # ]:           0 :                         Assert(!isnull);
     944                 :           0 :                 }
     945                 :             : 
     946                 :             :                 /* Should never happen. */
     947         [ #  # ]:           0 :                 if (++natt >= MaxTupleAttributeNumber)
     948   [ #  #  #  # ]:           0 :                         elog(ERROR, "too many columns in remote table \"%s.%s\"",
     949                 :             :                                  nspname, relname);
     950                 :             : 
     951                 :           0 :                 ExecClearTuple(slot);
     952         [ #  # ]:           0 :         }
     953                 :           0 :         ExecDropSingleTupleTableSlot(slot);
     954                 :             : 
     955                 :           0 :         lrel->natts = natt;
     956                 :             : 
     957                 :           0 :         walrcv_clear_result(res);
     958                 :             : 
     959                 :             :         /*
     960                 :             :          * Get relation's row filter expressions. DISTINCT prevents the same
     961                 :             :          * expression of a table in multiple publications from being included
     962                 :             :          * multiple times in the final expression.
     963                 :             :          *
     964                 :             :          * We need to copy the row even if it matches just one of the
     965                 :             :          * publications, so we later combine all the quals with OR.
     966                 :             :          *
     967                 :             :          * For initial synchronization, row filtering can be ignored in the following
     968                 :             :          * cases:
     969                 :             :          *
     970                 :             :          * 1) one of the subscribed publications for the table hasn't specified
     971                 :             :          * any row filter
     972                 :             :          *
     973                 :             :          * 2) one of the subscribed publications has puballtables set to true
     974                 :             :          *
     975                 :             :          * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
     976                 :             :          * that includes this relation
     977                 :             :          */
     978         [ #  # ]:           0 :         if (server_version >= 150000)
     979                 :             :         {
     980                 :             :                 /* Reuse the already-built pub_names. */
     981         [ #  # ]:           0 :                 Assert(pub_names != NULL);
     982                 :             : 
     983                 :             :                 /* Check for row filters. */
     984                 :           0 :                 resetStringInfo(&cmd);
     985                 :           0 :                 appendStringInfo(&cmd,
     986                 :             :                                                  "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
     987                 :             :                                                  "  FROM pg_publication p,"
     988                 :             :                                                  "  LATERAL pg_get_publication_tables(p.pubname) gpt"
     989                 :             :                                                  " WHERE gpt.relid = %u"
     990                 :             :                                                  "   AND p.pubname IN ( %s )",
     991                 :           0 :                                                  lrel->remoteid,
     992                 :           0 :                                                  pub_names->data);
     993                 :             : 
     994                 :           0 :                 res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
     995                 :             : 
     996         [ #  # ]:           0 :                 if (res->status != WALRCV_OK_TUPLES)
     997   [ #  #  #  # ]:           0 :                         ereport(ERROR,
     998                 :             :                                         (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
     999                 :             :                                                         nspname, relname, res->err)));
    1000                 :             : 
    1001                 :             :                 /*
    1002                 :             :                  * Multiple row filter expressions for the same table will be combined
    1003                 :             :                  * by COPY using OR. If any of the filter expressions for this table
    1004                 :             :                  * are null, it means the whole table will be copied. In this case it
    1005                 :             :                  * is not necessary to construct a unified row filter expression at
    1006                 :             :                  * all.
    1007                 :             :                  */
    1008                 :           0 :                 slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    1009         [ #  # ]:           0 :                 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    1010                 :             :                 {
    1011                 :           0 :                         Datum           rf = slot_getattr(slot, 1, &isnull);
    1012                 :             : 
    1013         [ #  # ]:           0 :                         if (!isnull)
    1014                 :           0 :                                 *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
    1015                 :             :                         else
    1016                 :             :                         {
    1017                 :             :                                 /* Ignore filters and clean up as necessary. */
    1018         [ #  # ]:           0 :                                 if (*qual)
    1019                 :             :                                 {
    1020                 :           0 :                                         list_free_deep(*qual);
    1021                 :           0 :                                         *qual = NIL;
    1022                 :           0 :                                 }
    1023                 :           0 :                                 break;
    1024                 :             :                         }
    1025                 :             : 
    1026                 :           0 :                         ExecClearTuple(slot);
    1027         [ #  # ]:           0 :                 }
    1028                 :           0 :                 ExecDropSingleTupleTableSlot(slot);
    1029                 :             : 
    1030                 :           0 :                 walrcv_clear_result(res);
    1031                 :           0 :                 destroyStringInfo(pub_names);
    1032                 :           0 :         }
    1033                 :             : 
    1034                 :           0 :         pfree(cmd.data);
    1035                 :           0 : }
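In fetch_remote_table_info a NULL column list means "no restriction": the CASE in the column-list query returns NULL when the list covers all `relnatts` columns, `included_cols` stays NULL for servers older than 15, and the attribute loop skips a column only when `included_cols` is non-NULL and lacks that attnum. The standalone C sketch below illustrates that rule outside the server. It is a simplified stand-in, not PostgreSQL code: `ColumnFilter`, `column_filter_init`, `column_filter_includes`, and `SKETCH_MAX_ATTNUM` are hypothetical names, and a fixed array of 64-bit words replaces the `Bitmapset` calls (`bms_add_member`, `bms_is_member`) used above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Assumed upper bound on attribute numbers, for this sketch only. */
#define SKETCH_MAX_ATTNUM 1664

/* Hypothetical stand-in for a Bitmapset of published column attnums. */
typedef struct ColumnFilter
{
    bool        all_columns;    /* publisher sent NULL: no column restriction */
    uint64_t    words[SKETCH_MAX_ATTNUM / 64 + 1];
} ColumnFilter;

/* Record the published attnums; attnums == NULL means "all columns". */
static void
column_filter_init(ColumnFilter *cf, const int16_t *attnums, int nattnums)
{
    assert(cf != NULL && nattnums >= 0);
    memset(cf, 0, sizeof(*cf));

    if (attnums == NULL)
    {
        cf->all_columns = true;
        return;
    }

    for (int i = 0; i < nattnums; i++)
    {
        int         a = attnums[i];

        /* User columns have positive attnums; ignore anything out of range. */
        if (a > 0 && a <= SKETCH_MAX_ATTNUM)
            cf->words[a / 64] |= UINT64_C(1) << (a % 64);
    }
}

/* Should the column with this attnum be copied? */
static bool
column_filter_includes(const ColumnFilter *cf, int attnum)
{
    if (cf->all_columns)
        return true;
    if (attnum <= 0 || attnum > SKETCH_MAX_ATTNUM)
        return false;
    return ((cf->words[attnum / 64] >> (attnum % 64)) & 1) != 0;
}
```

For example, a column list of attnums {1, 3} admits columns 1 and 3 and rejects column 2, while initializing with a NULL list admits every column, which corresponds to the `included_cols == NULL` case in the attribute loop.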
    1036                 :             : 
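The row-filter loop in fetch_remote_table_info applies the same convention with a stronger effect: filters from different publications are OR'ed together, so a single NULL filter (a publication that publishes every row) makes any WHERE clause pointless, and the loop discards whatever it has collected. The standalone C sketch below reproduces that decision and then builds a ` WHERE ... OR ...` suffix of the kind copy_table appends to its `COPY (SELECT ...)` command ("list of OR'ed filters"). It rests on assumptions: `build_row_filter_clause` is a hypothetical name, plain C strings stand in for the `List` of `String` nodes, and wrapping each expression in parentheses is a defensive choice of this sketch, not something the source shown here does.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: turn the distinct row filters returned by the
 * publisher (NULL = that publication has no filter) into the text appended
 * after "COPY (SELECT ... FROM tbl".  On success buf holds either "" (copy
 * every row) or " WHERE (f1) OR (f2) ...".  Returns false, leaving buf
 * empty, if buf is too small.
 */
static bool
build_row_filter_clause(const char *const *filters, size_t nfilters,
                        char *buf, size_t bufsize)
{
    size_t      used = 0;

    assert(buf != NULL && bufsize > 0);
    buf[0] = '\0';

    /* Filters are OR'ed, so one unfiltered publication means no WHERE. */
    for (size_t i = 0; i < nfilters; i++)
    {
        if (filters[i] == NULL)
            return true;
    }

    for (size_t i = 0; i < nfilters; i++)
    {
        int         n = snprintf(buf + used, bufsize - used, "%s(%s)",
                                 i == 0 ? " WHERE " : " OR ", filters[i]);

        /* snprintf returns the length it needed; >= space left means truncation. */
        if (n < 0 || (size_t) n >= bufsize - used)
        {
            buf[0] = '\0';
            return false;
        }
        used += (size_t) n;
    }
    return true;
}
```

With filters `a > 1` and `b = 2` the sketch produces ` WHERE (a > 1) OR (b = 2)`; if either entry is NULL, or there are no filters at all, it produces an empty string, meaning the whole table is copied.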
    1037                 :             : /*
    1038                 :             :  * Copy existing data of a table from publisher.
    1039                 :             :  *
    1040                 :             :  * Caller is responsible for locking the local relation.
    1041                 :             :  */
    1042                 :             : static void
    1043                 :           0 : copy_table(Relation rel)
    1044                 :             : {
    1045                 :           0 :         LogicalRepRelMapEntry *relmapentry;
    1046                 :           0 :         LogicalRepRelation lrel;
    1047                 :           0 :         List       *qual = NIL;
    1048                 :           0 :         WalRcvExecResult *res;
    1049                 :           0 :         StringInfoData cmd;
    1050                 :           0 :         CopyFromState cstate;
    1051                 :           0 :         List       *attnamelist;
    1052                 :           0 :         ParseState *pstate;
    1053                 :           0 :         List       *options = NIL;
    1054                 :           0 :         bool            gencol_published = false;
    1055                 :             : 
    1056                 :             :         /* Get the publisher relation info. */
    1057                 :           0 :         fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
    1058                 :           0 :                                                         RelationGetRelationName(rel), &lrel, &qual,
    1059                 :             :                                                         &gencol_published);
    1060                 :             : 
    1061                 :             :         /* Put the relation into relmap. */
    1062                 :           0 :         logicalrep_relmap_update(&lrel);
    1063                 :             : 
    1064                 :             :         /* Map the publisher relation to local one. */
    1065                 :           0 :         relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
    1066         [ #  # ]:           0 :         Assert(rel == relmapentry->localrel);
    1067                 :             : 
    1068                 :             :         /* Start copy on the publisher. */
    1069                 :           0 :         initStringInfo(&cmd);
    1070                 :             : 
    1071                 :             :         /* Regular or partitioned table with no row filter or generated columns */
    1072         [ #  # ]:           0 :         if ((lrel.relkind == RELKIND_RELATION || lrel.relkind == RELKIND_PARTITIONED_TABLE)
    1073   [ #  #  #  # ]:           0 :                 && qual == NIL && !gencol_published)
    1074                 :             :         {
    1075                 :           0 :                 appendStringInfo(&cmd, "COPY %s",
    1076                 :           0 :                                                  quote_qualified_identifier(lrel.nspname, lrel.relname));
    1077                 :             : 
    1078                 :             :                 /* If the table has columns, then specify the columns */
    1079         [ #  # ]:           0 :                 if (lrel.natts)
    1080                 :             :                 {
    1081                 :           0 :                         appendStringInfoString(&cmd, " (");
    1082                 :             : 
    1083                 :             :                         /*
    1084                 :             :                          * XXX Do we need to list the columns in all cases? Maybe we're
    1085                 :             :                          * replicating all columns?
    1086                 :             :                          */
    1087         [ #  # ]:           0 :                         for (int i = 0; i < lrel.natts; i++)
    1088                 :             :                         {
    1089         [ #  # ]:           0 :                                 if (i > 0)
    1090                 :           0 :                                         appendStringInfoString(&cmd, ", ");
    1091                 :             : 
    1092                 :           0 :                                 appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
    1093                 :           0 :                         }
    1094                 :             : 
    1095                 :           0 :                         appendStringInfoChar(&cmd, ')');
    1096                 :           0 :                 }
    1097                 :             : 
    1098                 :           0 :                 appendStringInfoString(&cmd, " TO STDOUT");
    1099                 :           0 :         }
    1100                 :             :         else
    1101                 :             :         {
    1102                 :             :                 /*
    1103                 :             :                  * For non-tables and tables with row filters, we need to do COPY
    1104                 :             :                  * (SELECT ...), but we can't just do SELECT * because we may need to
    1105                 :             :                  * copy only a subset of columns, including generated columns. For tables
    1106                 :             :                  * with any row filters, build a SELECT query with OR'ed row filters
    1107                 :             :                  * for COPY.
    1108                 :             :                  *
    1109                 :             :                  * We also need to use this same COPY (SELECT ...) syntax when
    1110                 :             :                  * generated columns are published, because the normal COPY does not
    1111                 :             :                  * support copying generated columns.
    1112                 :             :                  */
    1113                 :           0 :                 appendStringInfoString(&cmd, "COPY (SELECT ");
    1114         [ #  # ]:           0 :                 for (int i = 0; i < lrel.natts; i++)
    1115                 :             :                 {
    1116                 :           0 :                         appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
    1117         [ #  # ]:           0 :                         if (i < lrel.natts - 1)
    1118                 :           0 :                                 appendStringInfoString(&cmd, ", ");
    1119                 :           0 :                 }
    1120                 :             : 
    1121                 :           0 :                 appendStringInfoString(&cmd, " FROM ");
    1122                 :             : 
    1123                 :             :                 /*
    1124                 :             :                  * For regular tables, make sure we don't copy data from children that
    1125                 :             :                  * inherit the named table, as those are copied separately.
    1126                 :             :                  */
    1127         [ #  # ]:           0 :                 if (lrel.relkind == RELKIND_RELATION)
    1128                 :           0 :                         appendStringInfoString(&cmd, "ONLY ");
    1129                 :             : 
    1130                 :           0 :                 appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
    1131                 :             :                 /* list of OR'ed filters */
    1132         [ #  # ]:           0 :                 if (qual != NIL)
    1133                 :             :                 {
    1134                 :           0 :                         ListCell   *lc;
    1135                 :           0 :                         char       *q = strVal(linitial(qual));
    1136                 :             : 
    1137                 :           0 :                         appendStringInfo(&cmd, " WHERE %s", q);
    1138   [ #  #  #  #  :           0 :                         for_each_from(lc, qual, 1)
                   #  # ]
    1139                 :             :                         {
    1140                 :           0 :                                 q = strVal(lfirst(lc));
    1141                 :           0 :                                 appendStringInfo(&cmd, " OR %s", q);
    1142                 :           0 :                         }
    1143                 :           0 :                         list_free_deep(qual);
    1144                 :           0 :                 }
    1145                 :             : 
    1146                 :           0 :                 appendStringInfoString(&cmd, ") TO STDOUT");
    1147                 :             :         }
    1148                 :             : 
    1149                 :             :         /*
    1150                 :             :          * If the publisher is older than v16, initial table synchronization uses
    1151                 :             :          * text format even if the binary option is enabled for the subscription.
    1152                 :             :          */
    1153   [ #  #  #  # ]:           0 :         if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
    1154                 :           0 :                 MySubscription->binary)
    1155                 :             :         {
    1156                 :           0 :                 appendStringInfoString(&cmd, " WITH (FORMAT binary)");
    1157                 :           0 :                 options = list_make1(makeDefElem("format",
    1158                 :             :                                                                                  (Node *) makeString("binary"), -1));
    1159                 :           0 :         }
    1160                 :             : 
    1161                 :           0 :         res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
    1162                 :           0 :         pfree(cmd.data);
    1163         [ #  # ]:           0 :         if (res->status != WALRCV_OK_COPY_OUT)
    1164   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1165                 :             :                                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1166                 :             :                                  errmsg("could not start initial contents copy for table \"%s.%s\": %s",
    1167                 :             :                                                 lrel.nspname, lrel.relname, res->err)));
    1168                 :           0 :         walrcv_clear_result(res);
    1169                 :             : 
    1170                 :           0 :         copybuf = makeStringInfo();
    1171                 :             : 
    1172                 :           0 :         pstate = make_parsestate(NULL);
    1173                 :           0 :         (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
    1174                 :             :                                                                                  NULL, false, false);
    1175                 :             : 
    1176                 :           0 :         attnamelist = make_copy_attnamelist(relmapentry);
    1177                 :           0 :         cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
    1178                 :             : 
    1179                 :             :         /* Do the copy */
    1180                 :           0 :         (void) CopyFrom(cstate);
    1181                 :             : 
    1182                 :           0 :         logicalrep_rel_close(relmapentry, NoLock);
    1183                 :           0 : }
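The `COPY (SELECT ...)` text assembled in `copy_table()` above can be reproduced outside the server as a minimal sketch. `CmdBuf`, `cmd_append()`, `cmd_append_ident()` and `build_copy_select()` are hypothetical names; the real code uses `StringInfo`, `quote_identifier()` and `quote_qualified_identifier()`. Two simplifications are assumed: identifiers are always double-quoted (the real `quote_identifier()` quotes only when needed), and row-filter strings are appended verbatim as complete expressions, as the original does with `qual`. The v16 check that decides whether `WITH (FORMAT binary)` is appended is folded in here, although in the real code it also applies to the plain `COPY table` branch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical fixed-size command buffer; the server uses StringInfo. */
typedef struct
{
    char        data[1024];
    size_t      len;
} CmdBuf;

/* Append s, truncating instead of overflowing the fixed buffer. */
static void
cmd_append(CmdBuf *buf, const char *s)
{
    size_t      n = strlen(s);

    if (buf->len + n >= sizeof(buf->data))
        n = sizeof(buf->data) - buf->len - 1;
    memcpy(buf->data + buf->len, s, n);
    buf->len += n;
    buf->data[buf->len] = '\0';
}

/*
 * Simplified quoting: always wrap in double quotes and double any embedded
 * quote. Unlike quote_identifier(), this quotes even when not required.
 */
static void
cmd_append_ident(CmdBuf *buf, const char *ident)
{
    cmd_append(buf, "\"");
    for (const char *p = ident; *p != '\0'; p++)
    {
        char        c[2] = {*p, '\0'};

        cmd_append(buf, c);
        if (*p == '"')
            cmd_append(buf, "\"");
    }
    cmd_append(buf, "\"");
}

static void
build_copy_select(CmdBuf *buf, const char *const *cols, int ncols,
                  const char *nspname, const char *relname,
                  bool plain_table, const char *const *filters, int nfilters,
                  int server_version, bool binary)
{
    /* The SELECT form always lists the columns explicitly. */
    assert(ncols > 0);

    buf->len = 0;
    buf->data[0] = '\0';

    cmd_append(buf, "COPY (SELECT ");
    for (int i = 0; i < ncols; i++)
    {
        cmd_append_ident(buf, cols[i]);
        if (i < ncols - 1)
            cmd_append(buf, ", ");
    }

    cmd_append(buf, " FROM ");
    /* Inheritance children of a plain table are synchronized separately. */
    if (plain_table)
        cmd_append(buf, "ONLY ");
    cmd_append_ident(buf, nspname);
    cmd_append(buf, ".");
    cmd_append_ident(buf, relname);

    /* Row filters are OR'ed together behind a single WHERE. */
    for (int i = 0; i < nfilters; i++)
    {
        cmd_append(buf, i == 0 ? " WHERE " : " OR ");
        cmd_append(buf, filters[i]);
    }

    cmd_append(buf, ") TO STDOUT");

    /* Binary COPY is requested only from v16+ publishers. */
    if (server_version >= 160000 && binary)
        cmd_append(buf, " WITH (FORMAT binary)");
}
```

For example, columns `id` and `total` of `public.orders` with the filters `(total > 100)` and `(id < 5)`, a server version of 170000 and binary enabled yield `COPY (SELECT "id", "total" FROM ONLY "public"."orders" WHERE (total > 100) OR (id < 5)) TO STDOUT WITH (FORMAT binary)`.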
    1184                 :             : 
    1185                 :             : /*
    1186                 :             :  * Determine the tablesync slot name.
    1187                 :             :  *
    1188                 :             :  * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
    1189                 :             :  * on slot name length. We append system_identifier to avoid slot name
    1190                 :             :  * collisions with subscriptions in other clusters. With the current scheme
    1191                 :             :  * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 1 + 20 characters), the
    1192                 :             :  * maximum length of slot_name is 50, plus the terminating '\0'.
    1193                 :             :  *
    1194                 :             :  * The returned slot name is stored in the supplied buffer (syncslotname) with
    1195                 :             :  * the given size.
    1196                 :             :  *
    1197                 :             :  * Note: We don't use the subscription slot name as part of tablesync slot name
    1198                 :             :  * because we are responsible for cleaning up these slots and it could become
    1199                 :             :  * impossible to recalculate what name to clean up if the subscription slot name
    1200                 :             :  * had changed.
    1201                 :             :  */
    1202                 :             : void
    1203                 :           0 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
    1204                 :             :                                                                 char *syncslotname, Size szslot)
    1205                 :             : {
    1206                 :           0 :         snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
    1207                 :           0 :                          relid, GetSystemIdentifier());
    1208                 :           0 : }
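As a sketch of the length bound described in the comment above `ReplicationSlotNameForTablesync()`, the following reproduces its format string with standard C types. `tablesync_slot_name()` and `SKETCH_NAMEDATALEN` are hypothetical names; 64 is assumed because it is PostgreSQL's default `NAMEDATALEN`, and the system identifier is passed in rather than read via `GetSystemIdentifier()`.

```c
#include <assert.h>
#include <inttypes.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Assumption: PostgreSQL's default NAMEDATALEN (64 bytes, including '\0'). */
#define SKETCH_NAMEDATALEN 64

/*
 * Hypothetical stand-in for ReplicationSlotNameForTablesync(): same
 * "pg_<suboid>_sync_<relid>_<sysid>" layout, but the system identifier is
 * passed in. Returns the untruncated length reported by snprintf().
 */
static int
tablesync_slot_name(uint32_t suboid, uint32_t relid, uint64_t sysid,
                    char *buf, size_t bufsz)
{
    /* The caller in tablesync.c passes a NAMEDATALEN-sized buffer. */
    assert(bufsz >= SKETCH_NAMEDATALEN);

    return snprintf(buf, bufsz, "pg_%" PRIu32 "_sync_%" PRIu32 "_%" PRIu64,
                    suboid, relid, sysid);
}
```

With all three inputs at their type maxima (`UINT32_MAX`, `UINT32_MAX`, `UINT64_MAX`), `snprintf()` reports 50 characters, which leaves headroom under the NAMEDATALEN - 1 limit of 63.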
    1209                 :             : 
    1210                 :             : /*
    1211                 :             :  * Start syncing the table in the sync worker.
    1212                 :             :  *
    1213                 :             :  * If nothing needs to be done to sync the table, we exit the worker without
    1214                 :             :  * any further action.
    1215                 :             :  *
    1216                 :             :  * The returned slot name is palloc'ed in the current memory context.
    1217                 :             :  */
    1218                 :             : static char *
    1219                 :           0 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
    1220                 :             : {
    1221                 :           0 :         char       *slotname;
    1222                 :           0 :         char       *err;
    1223                 :           0 :         char            relstate;
    1224                 :           0 :         XLogRecPtr      relstate_lsn;
    1225                 :           0 :         Relation        rel;
    1226                 :           0 :         AclResult       aclresult;
    1227                 :           0 :         WalRcvExecResult *res;
    1228                 :           0 :         char            originname[NAMEDATALEN];
    1229                 :           0 :         RepOriginId originid;
    1230                 :           0 :         UserContext ucxt;
    1231                 :           0 :         bool            must_use_password;
    1232                 :           0 :         bool            run_as_owner;
    1233                 :             : 
    1234                 :             :         /* Check the state of the table synchronization. */
    1235                 :           0 :         StartTransactionCommand();
    1236                 :           0 :         relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
    1237                 :           0 :                                                                            MyLogicalRepWorker->relid,
    1238                 :             :                                                                            &relstate_lsn);
    1239                 :           0 :         CommitTransactionCommand();
    1240                 :             : 
    1241                 :             :         /* Is the use of a password mandatory? */
    1242         [ #  # ]:           0 :         must_use_password = MySubscription->passwordrequired &&
    1243                 :           0 :                 !MySubscription->ownersuperuser;
    1244                 :             : 
    1245         [ #  # ]:           0 :         SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    1246                 :           0 :         MyLogicalRepWorker->relstate = relstate;
    1247                 :           0 :         MyLogicalRepWorker->relstate_lsn = relstate_lsn;
    1248                 :           0 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
    1249                 :             : 
    1250                 :             :         /*
    1251                 :             :          * If synchronization is already done or no longer necessary, exit now
    1252                 :             :          * that we've updated shared memory state.
    1253                 :             :          */
    1254         [ #  # ]:           0 :         switch (relstate)
    1255                 :             :         {
    1256                 :             :                 case SUBREL_STATE_SYNCDONE:
    1257                 :             :                 case SUBREL_STATE_READY:
    1258                 :             :                 case SUBREL_STATE_UNKNOWN:
    1259                 :           0 :                         FinishSyncWorker(); /* doesn't return */
    1260                 :             :         }
    1261                 :             : 
    1262                 :             :         /* Calculate the name of the tablesync slot. */
    1263                 :           0 :         slotname = (char *) palloc(NAMEDATALEN);
    1264                 :           0 :         ReplicationSlotNameForTablesync(MySubscription->oid,
    1265                 :           0 :                                                                         MyLogicalRepWorker->relid,
    1266                 :           0 :                                                                         slotname,
    1267                 :             :                                                                         NAMEDATALEN);
    1268                 :             : 
    1269                 :             :         /*
    1270                 :             :          * Here we use the slot name instead of the subscription name as the
    1271                 :             :          * application_name, so that it differs from the leader apply worker's
    1272                 :             :          * and synchronous replication can distinguish the two.
    1273                 :             :          */
    1274                 :           0 :         LogRepWorkerWalRcvConn =
    1275                 :           0 :                 walrcv_connect(MySubscription->conninfo, true, true,
    1276                 :             :                                            must_use_password,
    1277                 :             :                                            slotname, &err);
    1278         [ #  # ]:           0 :         if (LogRepWorkerWalRcvConn == NULL)
    1279   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1280                 :             :                                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1281                 :             :                                  errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
    1282                 :             :                                                 MySubscription->name, err)));
    1283                 :             : 
    1284   [ #  #  #  #  :           0 :         Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
                   #  # ]
    1285                 :             :                    MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
    1286                 :             :                    MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
    1287                 :             : 
    1288                 :             :         /* Assign the origin tracking record name. */
    1289                 :           0 :         ReplicationOriginNameForLogicalRep(MySubscription->oid,
    1290                 :           0 :                                                                            MyLogicalRepWorker->relid,
    1291                 :           0 :                                                                            originname,
    1292                 :             :                                                                            sizeof(originname));
    1293                 :             : 
    1294         [ #  # ]:           0 :         if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
    1295                 :             :         {
    1296                 :             :                 /*
    1297                 :             :                  * We previously errored out before finishing the copy, so the
    1298                 :             :                  * replication slot might exist. Remove the slot if it already
    1299                 :             :                  * exists and proceed.
    1300                 :             :                  *
    1301                 :             :                  * XXX We could instead try to drop the slot at the time we failed,
    1302                 :             :                  * but for that we might need to clean up the copy state, as it might
    1303                 :             :                  * be in the middle of fetching rows. Also, if the failure was a
    1304                 :             :                  * network breakdown, the drop wouldn't have succeeded anyway, so
    1305                 :             :                  * trying it next time seems like a better bet.
    1306                 :             :                  */
    1307                 :           0 :                 ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
    1308                 :           0 :         }
    1309         [ #  # ]:           0 :         else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
    1310                 :             :         {
    1311                 :             :                 /*
    1312                 :             :                  * The COPY phase was previously done, but tablesync then crashed
    1313                 :             :                  * before it was able to finish normally.
    1314                 :             :                  */
    1315                 :           0 :                 StartTransactionCommand();
    1316                 :             : 
    1317                 :             :                 /*
    1318                 :             :                  * The origin tracking name must already exist. It was created the
    1319                 :             :                  * first time this tablesync was launched.
    1320                 :             :                  */
    1321                 :           0 :                 originid = replorigin_by_name(originname, false);
    1322                 :           0 :                 replorigin_session_setup(originid, 0);
    1323                 :           0 :                 replorigin_session_origin = originid;
    1324                 :           0 :                 *origin_startpos = replorigin_session_get_progress(false);
    1325                 :             : 
    1326                 :           0 :                 CommitTransactionCommand();
    1327                 :             : 
    1328                 :           0 :                 goto copy_table_done;
    1329                 :             :         }
    1330                 :             : 
    1331         [ #  # ]:           0 :         SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    1332                 :           0 :         MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
    1333                 :           0 :         MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
    1334                 :           0 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
    1335                 :             : 
    1336                 :             :         /*
    1337                 :             :          * Update the state, create the replication origin, and make them visible
    1338                 :             :          * to others.
    1339                 :             :          */
    1340                 :           0 :         StartTransactionCommand();
    1341                 :           0 :         UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
    1342                 :           0 :                                                            MyLogicalRepWorker->relid,
    1343                 :           0 :                                                            MyLogicalRepWorker->relstate,
    1344                 :           0 :                                                            MyLogicalRepWorker->relstate_lsn,
    1345                 :             :                                                            false);
    1346                 :             : 
    1347                 :             :         /*
    1348                 :             :          * Create the replication origin in a separate transaction from the one
    1349                 :             :          * that sets up the origin in shared memory. This avoids the risk that
    1350                 :             :          * changes to the origin in shared memory cannot be rolled back if the
    1351                 :             :          * transaction aborts.
    1352                 :             :          */
    1353                 :           0 :         originid = replorigin_by_name(originname, true);
    1354         [ #  # ]:           0 :         if (!OidIsValid(originid))
    1355                 :           0 :                 originid = replorigin_create(originname);
    1356                 :             : 
    1357                 :           0 :         CommitTransactionCommand();
    1358                 :           0 :         pgstat_report_stat(true);
    1359                 :             : 
    1360                 :           0 :         StartTransactionCommand();
    1361                 :             : 
    1362                 :             :         /*
    1363                 :             :          * Use a standard write lock here. It might be better to disallow access
    1364                 :             :          * to the table while it's being synchronized, but we don't want to block
    1365                 :             :          * the leader apply worker, which has to open the relation with
    1366                 :             :          * RowExclusiveLock when mapping the remote relation ID to the local one.
    1367                 :             :          */
    1368                 :           0 :         rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
    1369                 :             : 
    1370                 :             :         /*
    1371                 :             :          * Start a transaction in the remote node in REPEATABLE READ mode.  This
    1372                 :             :          * ensures that both the replication slot we create (see below) and the
    1373                 :             :          * COPY are consistent with each other.
    1374                 :             :          */
    1375                 :           0 :         res = walrcv_exec(LogRepWorkerWalRcvConn,
    1376                 :             :                                           "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
    1377                 :             :                                           0, NULL);
    1378         [ #  # ]:           0 :         if (res->status != WALRCV_OK_COMMAND)
    1379   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1380                 :             :                                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1381                 :             :                                  errmsg("table copy could not start transaction on publisher: %s",
    1382                 :             :                                                 res->err)));
    1383                 :           0 :         walrcv_clear_result(res);
    1384                 :             : 
    1385                 :             :         /*
    1386                 :             :          * Create a new permanent logical decoding slot. This slot will be used
    1387                 :             :          * for the catchup phase after COPY is done, so tell it to use the
    1388                 :             :          * snapshot to make the final data consistent.
    1389                 :             :          */
    1390                 :           0 :         walrcv_create_slot(LogRepWorkerWalRcvConn,
    1391                 :             :                                            slotname, false /* permanent */ , false /* two_phase */ ,
    1392                 :             :                                            MySubscription->failover,
    1393                 :             :                                            CRS_USE_SNAPSHOT, origin_startpos);
    1394                 :             : 
    1395                 :             :         /*
    1396                 :             :          * Advance the origin to the LSN obtained from walrcv_create_slot, then
    1397                 :             :          * set up the origin. The advancement is WAL-logged for the purpose of
    1398                 :             :          * recovery. The lock prevents the replication origin from vanishing
    1399                 :             :          * while it is being advanced.
    1400                 :             :          *
    1401                 :             :          * We do these steps before the copy so that an error in advancing or
    1402                 :             :          * setting up origin tracking does not force the copy to be redone.
    1403                 :             :          */
    1404                 :           0 :         LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1405                 :           0 :         replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
    1406                 :             :                                            true /* go backward */ , true /* WAL log */ );
    1407                 :           0 :         UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1408                 :             : 
    1409                 :           0 :         replorigin_session_setup(originid, 0);
    1410                 :           0 :         replorigin_session_origin = originid;
    1411                 :             : 
    1412                 :             :         /*
    1413                 :             :          * If the user did not opt to run as the owner of the subscription
    1414                 :             :          * ('run_as_owner'), then copy the table as the owner of the table.
    1415                 :             :          */
    1416                 :           0 :         run_as_owner = MySubscription->runasowner;
    1417         [ #  # ]:           0 :         if (!run_as_owner)
    1418                 :           0 :                 SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
    1419                 :             : 
    1420                 :             :         /*
    1421                 :             :          * Check that our table sync worker has permission to insert into the
    1422                 :             :          * target table.
    1423                 :             :          */
    1424                 :           0 :         aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
    1425                 :             :                                                                   ACL_INSERT);
    1426         [ #  # ]:           0 :         if (aclresult != ACLCHECK_OK)
    1427                 :           0 :                 aclcheck_error(aclresult,
    1428                 :           0 :                                            get_relkind_objtype(rel->rd_rel->relkind),
    1429                 :           0 :                                            RelationGetRelationName(rel));
    1430                 :             : 
    1431                 :             :         /*
    1432                 :             :          * COPY FROM does not honor RLS policies.  That is not a problem for
    1433                 :             :          * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
    1434                 :             :          * who has it implicitly), but other roles should not be able to
    1435                 :             :          * circumvent RLS.  Disallow logical replication into RLS enabled
    1436                 :             :          * relations for such roles.
    1437                 :             :          */
    1438         [ #  # ]:           0 :         if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
    1439   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1440                 :             :                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1441                 :             :                                  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
    1442                 :             :                                                 GetUserNameFromId(GetUserId(), true),
    1443                 :             :                                                 RelationGetRelationName(rel))));
    1444                 :             : 
    1445                 :             :         /* Now do the initial data copy */
    1446                 :           0 :         PushActiveSnapshot(GetTransactionSnapshot());
    1447                 :           0 :         copy_table(rel);
    1448                 :           0 :         PopActiveSnapshot();
    1449                 :             : 
    1450                 :           0 :         res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
    1451         [ #  # ]:           0 :         if (res->status != WALRCV_OK_COMMAND)
    1452   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1453                 :             :                                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1454                 :             :                                  errmsg("table copy could not finish transaction on publisher: %s",
    1455                 :             :                                                 res->err)));
    1456                 :           0 :         walrcv_clear_result(res);
    1457                 :             : 
    1458         [ #  # ]:           0 :         if (!run_as_owner)
    1459                 :           0 :                 RestoreUserContext(&ucxt);
    1460                 :             : 
    1461                 :           0 :         table_close(rel, NoLock);
    1462                 :             : 
    1463                 :             :         /* Make the copy visible. */
    1464                 :           0 :         CommandCounterIncrement();
    1465                 :             : 
    1466                 :             :         /*
    1467                 :             :          * Update the persisted state to indicate the COPY phase is done; make it
    1468                 :             :          * visible to others.
    1469                 :             :          */
    1470                 :           0 :         UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
    1471                 :           0 :                                                            MyLogicalRepWorker->relid,
    1472                 :             :                                                            SUBREL_STATE_FINISHEDCOPY,
    1473                 :           0 :                                                            MyLogicalRepWorker->relstate_lsn,
    1474                 :             :                                                            false);
    1475                 :             : 
    1476                 :           0 :         CommitTransactionCommand();
    1477                 :             : 
    1478                 :             : copy_table_done:
    1479                 :             : 
    1480   [ #  #  #  # ]:           0 :         elog(DEBUG1,
    1481                 :             :                  "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
    1482                 :             :                  originname, LSN_FORMAT_ARGS(*origin_startpos));
    1483                 :             : 
    1484                 :             :         /*
    1485                 :             :          * We are done with the initial data synchronization, update the state.
    1486                 :             :          */
    1487         [ #  # ]:           0 :         SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    1488                 :           0 :         MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
    1489                 :           0 :         MyLogicalRepWorker->relstate_lsn = *origin_startpos;
    1490                 :           0 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
    1491                 :             : 
    1492                 :             :         /*
    1493                 :             :          * Finally, wait until the leader apply worker tells us to catch up and
    1494                 :             :          * then return to let LogicalRepApplyLoop do it.
    1495                 :             :          */
    1496                 :           0 :         wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
    1497                 :           0 :         return slotname;
    1498                 :           0 : }
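The tail of LogicalRepSyncTableStart() is a handoff with the leader apply worker. The sync worker first persists FINISHEDCOPY. It then publishes SYNCWAIT together with its origin start position under relmutex, and waits until the leader moves it to CATCHUP. The sketch below is a single-threaded model of that handoff, not PostgreSQL code. A plain struct stands in for the shared worker entry, and the enum values are arbitrary. leader_observe() is a hypothetical stand-in for the leader's side. Its rule of keeping the larger of the two LSNs is an assumption, labeled as such in the code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical mirror of the sync states used by tablesync.c; the numeric
 * values are arbitrary, not PostgreSQL's catalog codes. */
typedef enum
{
    ST_INIT,
    ST_DATASYNC,
    ST_FINISHEDCOPY,
    ST_SYNCWAIT,
    ST_CATCHUP,
    ST_SYNCDONE,
    ST_READY
} SyncState;

/* Stand-in for the relstate/relstate_lsn pair that the real worker guards
 * with relmutex; this single-threaded sketch needs no lock. */
typedef struct
{
    SyncState   relstate;
    uint64_t    relstate_lsn;
} WorkerShared;

/* Sync worker: after the copy is committed, publish SYNCWAIT and the
 * origin start position as one unit. */
static void
worker_enter_syncwait(WorkerShared *w, uint64_t origin_startpos)
{
    assert(w->relstate == ST_FINISHEDCOPY);
    w->relstate = ST_SYNCWAIT;
    w->relstate_lsn = origin_startpos;
}

/* Hypothetical leader step: on seeing SYNCWAIT, move the worker to CATCHUP
 * and (assumption) keep the larger of the two LSNs as the target. */
static bool
leader_observe(WorkerShared *w, uint64_t leader_lsn)
{
    if (w->relstate != ST_SYNCWAIT)
        return false;
    w->relstate = ST_CATCHUP;
    if (leader_lsn > w->relstate_lsn)
        w->relstate_lsn = leader_lsn;
    return true;
}

/* Sync worker: the condition wait_for_worker_state_change() waits for. */
static bool
worker_may_catch_up(const WorkerShared *w)
{
    return w->relstate == ST_CATCHUP;
}
```

In the real code the two sides run in different processes. That is why the state and its LSN are always read and written together under the spinlock.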
    1499                 :             : 
    1500                 :             : /*
    1501                 :             :  * Execute the initial sync with error handling, disabling the subscription
    1502                 :             :  * if required.
    1503                 :             :  *
    1504                 :             :  * On return, the slot name is allocated in a long-lived context. Note that we
    1505                 :             :  * don't handle FATAL errors, which are probably caused by system resource
    1506                 :             :  * errors and are not repeatable.
    1507                 :             :  */
    1508                 :             : static void
    1509                 :           0 : start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
    1510                 :             : {
    1511                 :           0 :         char       *sync_slotname = NULL;
    1512                 :             : 
    1513         [ #  # ]:           0 :         Assert(am_tablesync_worker());
    1514                 :             : 
    1515         [ #  # ]:           0 :         PG_TRY();
    1516                 :             :         {
    1517                 :             :                 /* Call initial sync. */
    1518                 :           0 :                 sync_slotname = LogicalRepSyncTableStart(origin_startpos);
    1519                 :             :         }
    1520                 :           0 :         PG_CATCH();
    1521                 :             :         {
    1522         [ #  # ]:           0 :                 if (MySubscription->disableonerr)
    1523                 :           0 :                         DisableSubscriptionAndExit();
    1524                 :             :                 else
    1525                 :             :                 {
    1526                 :             :                         /*
    1527                 :             :                          * Report that the worker failed during table synchronization. Abort
    1528                 :             :                          * the current transaction so that the stats message is sent in an
    1529                 :             :                          * idle state.
    1530                 :             :                          */
    1531                 :           0 :                         AbortOutOfAnyTransaction();
    1532                 :           0 :                         pgstat_report_subscription_error(MySubscription->oid,
    1533                 :             :                                                                                          WORKERTYPE_TABLESYNC);
    1534                 :             : 
    1535                 :           0 :                         PG_RE_THROW();
    1536                 :             :                 }
    1537                 :             :         }
    1538         [ #  # ]:           0 :         PG_END_TRY();
    1539                 :             : 
    1540                 :             :         /* allocate slot name in long-lived context */
    1541                 :           0 :         *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
    1542                 :           0 :         pfree(sync_slotname);
    1543                 :           0 : }
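start_table_sync() wraps the initial sync in PG_TRY()/PG_CATCH(), and PostgreSQL builds those macros on sigsetjmp/siglongjmp. The plain C sketch below mimics the same control flow with standard setjmp/longjmp. On an error it either takes the disable path, or it records the failure and re-throws to an outer handler. On success it copies the slot name into longer-lived storage and frees the temporary. All names here are hypothetical (sync_table, throw_error, exception_stack, errors_reported). The disable path returns a flag instead of exiting the process as DisableSubscriptionAndExit() does.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

static jmp_buf *exception_stack;    /* innermost handler, like PG_exception_stack */
static int  errors_reported;        /* stands in for pgstat_report_subscription_error() */

/* Hypothetical ereport(ERROR): jump to the innermost handler. */
static void
throw_error(void)
{
    longjmp(*exception_stack, 1);
}

/* Hypothetical initial sync: returns a malloc'd slot name or throws. */
static char *
sync_table(bool fail)
{
    static const char name[] = "sync_slot";
    char       *copy;

    if (fail)
        throw_error();
    copy = malloc(sizeof(name));
    if (copy == NULL)
        abort();
    memcpy(copy, name, sizeof(name));
    return copy;
}

/* Returns 0 on success and 1 if the "disable" path ran; otherwise the
 * error is re-thrown to the caller's handler. */
static int
start_table_sync_sketch(bool fail, bool disable_on_error, char **slotname)
{
    jmp_buf     local;
    jmp_buf    *saved = exception_stack;
    char       *volatile sync_slotname = NULL;
    size_t      len;

    if (setjmp(local) == 0)
    {
        exception_stack = &local;           /* like PG_TRY() */
        sync_slotname = sync_table(fail);
        exception_stack = saved;            /* like PG_END_TRY() on success */
    }
    else
    {
        exception_stack = saved;            /* like PG_CATCH(): restore outer handler */
        if (disable_on_error)
            return 1;                       /* real code: DisableSubscriptionAndExit() */
        errors_reported++;                  /* report the failure ... */
        throw_error();                      /* ... then re-throw, like PG_RE_THROW() */
    }

    /* Copy into "long-lived" storage and free the temporary. */
    len = strlen(sync_slotname) + 1;
    *slotname = malloc(len);
    if (*slotname == NULL)
        abort();
    memcpy(*slotname, sync_slotname, len);
    free(sync_slotname);
    return 0;
}
```

The handler is restored before any further error can be raised in the catch branch. Otherwise a re-throw would jump back into the same handler.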
    1544                 :             : 
    1545                 :             : /*
    1546                 :             :  * Runs the tablesync worker.
    1547                 :             :  *
    1548                 :             :  * It starts syncing the table. After a successful sync, it sets the streaming
    1549                 :             :  * options and starts streaming to catch up with the leader apply worker.
    1550                 :             :  */
    1551                 :             : static void
    1552                 :           0 : run_tablesync_worker(void)
    1553                 :             : {
    1554                 :           0 :         char            originname[NAMEDATALEN];
    1555                 :           0 :         XLogRecPtr      origin_startpos = InvalidXLogRecPtr;
    1556                 :           0 :         char       *slotname = NULL;
    1557                 :           0 :         WalRcvStreamOptions options;
    1558                 :             : 
    1559                 :           0 :         start_table_sync(&origin_startpos, &slotname);
    1560                 :             : 
    1561                 :           0 :         ReplicationOriginNameForLogicalRep(MySubscription->oid,
    1562                 :           0 :                                                                            MyLogicalRepWorker->relid,
    1563                 :           0 :                                                                            originname,
    1564                 :             :                                                                            sizeof(originname));
    1565                 :             : 
    1566                 :           0 :         set_apply_error_context_origin(originname);
    1567                 :             : 
    1568                 :           0 :         set_stream_options(&options, slotname, &origin_startpos);
    1569                 :             : 
    1570                 :           0 :         walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
    1571                 :             : 
    1572                 :             :         /* Apply the changes until we catch up with the leader apply worker. */
    1573                 :           0 :         start_apply(origin_startpos);
    1574                 :           0 : }
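Before streaming, run_tablesync_worker() derives a per-table replication origin name with ReplicationOriginNameForLogicalRep() and passes it to set_apply_error_context_origin(). The sketch below reproduces that naming under an assumption: the format is pg_<suboid>_<relid> for a tablesync worker and pg_<suboid> when no relation is given, as for the leader apply worker. Check this against worker.c before relying on it. origin_name_sketch and the local Oid/InvalidOid definitions are illustrative stand-ins only.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

#define NAMEDATALEN 64              /* PostgreSQL's default name buffer size */
typedef unsigned int Oid;           /* local stand-in for PostgreSQL's Oid */
#define InvalidOid ((Oid) 0)

/* Hypothetical re-creation of the origin naming scheme (assumed format). */
static void
origin_name_sketch(Oid suboid, Oid relid, char *buf, size_t bufsz)
{
    if (relid != InvalidOid)
        snprintf(buf, bufsz, "pg_%u_%u", suboid, relid);   /* tablesync worker */
    else
        snprintf(buf, bufsz, "pg_%u", suboid);             /* no relation: apply worker */
}
```

Because the name depends only on the two OIDs, a given subscription and table always map to the same origin name.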
    1575                 :             : 
    1576                 :             : /* Logical Replication Tablesync worker entry point */
    1577                 :             : void
    1578                 :           0 : TableSyncWorkerMain(Datum main_arg)
    1579                 :             : {
    1580                 :           0 :         int                     worker_slot = DatumGetInt32(main_arg);
    1581                 :             : 
    1582                 :           0 :         SetupApplyOrSyncWorker(worker_slot);
    1583                 :             : 
    1584                 :           0 :         run_tablesync_worker();
    1585                 :             : 
    1586                 :           0 :         FinishSyncWorker();
    1587                 :             : }
    1588                 :             : 
    1589                 :             : /*
    1590                 :             :  * If the subscription has no tables, return false.
    1591                 :             :  *
    1592                 :             :  * Otherwise, return true only if all tablesyncs are READY.
    1593                 :             :  *
    1594                 :             :  * Note: This function is not suitable to be called from outside of apply or
    1595                 :             :  * tablesync workers because MySubscription needs to be already initialized.
    1596                 :             :  */
    1597                 :             : bool
    1598                 :           0 : AllTablesyncsReady(void)
    1599                 :             : {
    1600                 :           0 :         bool            started_tx;
    1601                 :           0 :         bool            has_tables;
    1602                 :             : 
    1603                 :             :         /* We need up-to-date sync state info for subscription tables here. */
    1604                 :           0 :         FetchRelationStates(&has_tables, NULL, &started_tx);
    1605                 :             : 
    1606         [ #  # ]:           0 :         if (started_tx)
    1607                 :             :         {
    1608                 :           0 :                 CommitTransactionCommand();
    1609                 :           0 :                 pgstat_report_stat(true);
    1610                 :           0 :         }
    1611                 :             : 
    1612                 :             :         /*
    1613                 :             :          * Return false when there are no tables in the subscription or not all
    1614                 :             :          * tables are in the ready state; true otherwise.
    1615                 :             :          */
    1616         [ #  # ]:           0 :         return has_tables && (table_states_not_ready == NIL);
    1617                 :           0 : }
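AllTablesyncsReady() returns true only when the subscription has at least one table and the not-ready list is empty. An empty subscription therefore reports false. The sketch below computes the same predicate over a plain array of per-table states. It uses a hypothetical TableState enum in place of the cached table_states_not_ready list.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical per-table sync states for illustration only. */
typedef enum { TS_INIT, TS_DATASYNC, TS_SYNCDONE, TS_READY } TableState;

static bool
all_tablesyncs_ready_sketch(const TableState *states, size_t ntables)
{
    if (ntables == 0)
        return false;               /* no tables: never "all ready" */
    for (size_t i = 0; i < ntables; i++)
        if (states[i] != TS_READY)
            return false;           /* like a non-empty not-ready list */
    return true;
}
```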
    1618                 :             : 
    1619                 :             : /*
    1620                 :             :  * Return whether the subscription currently has any tables.
    1621                 :             :  *
    1622                 :             :  * Note: Unlike HasSubscriptionTables(), this function relies on cached
    1623                 :             :  * information for subscription tables. Additionally, it should not be
    1624                 :             :  * invoked outside of apply or tablesync workers, as MySubscription must be
    1625                 :             :  * initialized first.
    1626                 :             :  */
    1627                 :             : bool
    1628                 :           0 : HasSubscriptionTablesCached(void)
    1629                 :             : {
    1630                 :           0 :         bool            started_tx;
    1631                 :           0 :         bool            has_tables;
    1632                 :             : 
    1633                 :             :         /* We need up-to-date subscription tables info here */
    1634                 :           0 :         FetchRelationStates(&has_tables, NULL, &started_tx);
    1635                 :             : 
    1636         [ #  # ]:           0 :         if (started_tx)
    1637                 :             :         {
    1638                 :           0 :                 CommitTransactionCommand();
    1639                 :           0 :                 pgstat_report_stat(true);
    1640                 :           0 :         }
    1641                 :             : 
    1642                 :           0 :         return has_tables;
    1643                 :           0 : }
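Per the comment above, HasSubscriptionTablesCached() differs from HasSubscriptionTables() by answering from cached information about the subscription's tables, which FetchRelationStates() keeps up to date. The sketch below shows the generic validity-flag cache pattern. It assumes a hypothetical catalog_count_tables() as the expensive lookup and an invalidate_cache() hook standing in for a catalog invalidation callback. It does not model transactions or the started_tx handling.

```c
#include <assert.h>
#include <stdbool.h>

static int  catalog_tables = 0;     /* pretend catalog contents */
static int  catalog_lookups = 0;    /* counts expensive lookups */

static bool cache_valid = false;
static bool cached_has_tables = false;

/* Hypothetical expensive catalog scan. */
static int
catalog_count_tables(void)
{
    catalog_lookups++;
    return catalog_tables;
}

/* Hypothetical invalidation hook: called when the catalog changes. */
static void
invalidate_cache(void)
{
    cache_valid = false;
}

/* Answer from the cache, rebuilding it only when it is invalid. */
static bool
has_tables_cached(void)
{
    if (!cache_valid)
    {
        cached_has_tables = catalog_count_tables() > 0;
        cache_valid = true;
    }
    return cached_has_tables;
}
```

The cached answer is only as fresh as the last invalidation. A catalog change that has not yet been signaled still returns the old result.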
    1644                 :             : 
    1645                 :             : /*
    1646                 :             :  * Update the two_phase state of the specified subscription in pg_subscription.
    1647                 :             :  */
    1648                 :             : void
    1649                 :           0 : UpdateTwoPhaseState(Oid suboid, char new_state)
    1650                 :             : {
    1651                 :           0 :         Relation        rel;
    1652                 :           0 :         HeapTuple       tup;
    1653                 :           0 :         bool            nulls[Natts_pg_subscription];
    1654                 :           0 :         bool            replaces[Natts_pg_subscription];
    1655                 :           0 :         Datum           values[Natts_pg_subscription];
    1656                 :             : 
    1657   [ #  #  #  #  #  # ]:           0 :         Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
    1658                 :             :                    new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
    1659                 :             :                    new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
    1660                 :             : 
    1661                 :           0 :         rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    1662                 :           0 :         tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
    1663         [ #  # ]:           0 :         if (!HeapTupleIsValid(tup))
    1664   [ #  #  #  # ]:           0 :                 elog(ERROR,
    1665                 :             :                          "cache lookup failed for subscription oid %u",
    1666                 :             :                          suboid);
    1667                 :             : 
    1668                 :             :         /* Form a new tuple. */
    1669                 :           0 :         memset(values, 0, sizeof(values));
    1670                 :           0 :         memset(nulls, false, sizeof(nulls));
    1671                 :           0 :         memset(replaces, false, sizeof(replaces));
    1672                 :             : 
    1673                 :             :         /* And update/set two_phase state */
    1674                 :           0 :         values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
    1675                 :           0 :         replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
    1676                 :             : 
    1677                 :           0 :         tup = heap_modify_tuple(tup, RelationGetDescr(rel),
    1678                 :           0 :                                                         values, nulls, replaces);
    1679                 :           0 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
    1680                 :             : 
    1681                 :           0 :         heap_freetuple(tup);
    1682                 :           0 :         table_close(rel, RowExclusiveLock);
    1683                 :           0 : }
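UpdateTwoPhaseState() follows the usual catalog-update idiom. It zeroes the values/nulls/replaces arrays and sets only the entries for the changed column, indexed by its 1-based attribute number minus one. heap_modify_tuple() then takes every other column from the old tuple. The sketch below models that merge rule over plain int columns. It assumes a hypothetical fixed-width Row in place of HeapTuple and a hypothetical ANUM_STATE attribute number. It ignores tuple headers, TOAST and index maintenance (CatalogTupleUpdate()).

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>                 /* memset, used when preparing the arrays */

#define NATTS      4                /* hypothetical column count */
#define ANUM_STATE 3                /* hypothetical 1-based attribute number */

typedef struct
{
    int         values[NATTS];
    bool        isnull[NATTS];
} Row;

/* Build a new row: replaced columns come from values/nulls, all others
 * are copied unchanged from the old row. */
static Row
modify_row_sketch(const Row *old, const int *values,
                  const bool *nulls, const bool *replaces)
{
    Row         new_row = *old;

    for (int i = 0; i < NATTS; i++)
    {
        if (replaces[i])
        {
            new_row.values[i] = values[i];
            new_row.isnull[i] = nulls[i];
        }
    }
    return new_row;
}
```

A caller mirrors the original: memset the three arrays, set values[ANUM_STATE - 1] and replaces[ANUM_STATE - 1], then call modify_row_sketch(). The old row is left untouched.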
        

Generated by: LCOV version 2.3.2-1