LCOV - code coverage report
Current view: top level - src/include/replication - worker_internal.h (source / functions)
Test: Code coverage
Test Date: 2026-01-26 10:56:24
Legend: Lines: hit | not hit
Legend: Branches: + taken, - not taken, # not executed
Coverage summary (Coverage / Total / Hit):
  Lines:     70.0 % / 10 / 7
  Functions: 75.0 % /  4 / 3
  Branches:  30.0 % / 10 / 3

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * worker_internal.h
       4                 :             :  *        Internal headers shared by logical replication workers.
       5                 :             :  *
       6                 :             :  * Portions Copyright (c) 2016-2026, PostgreSQL Global Development Group
       7                 :             :  *
       8                 :             :  * src/include/replication/worker_internal.h
       9                 :             :  *
      10                 :             :  *-------------------------------------------------------------------------
      11                 :             :  */
      12                 :             : #ifndef WORKER_INTERNAL_H
      13                 :             : #define WORKER_INTERNAL_H
      14                 :             : 
      15                 :             : #include "access/xlogdefs.h"
      16                 :             : #include "catalog/pg_subscription.h"
      17                 :             : #include "datatype/timestamp.h"
      18                 :             : #include "miscadmin.h"
      19                 :             : #include "replication/logicalrelation.h"
      20                 :             : #include "replication/walreceiver.h"
      21                 :             : #include "storage/buffile.h"
      22                 :             : #include "storage/fileset.h"
      23                 :             : #include "storage/lock.h"
      24                 :             : #include "storage/shm_mq.h"
      25                 :             : #include "storage/shm_toc.h"
      26                 :             : #include "storage/spin.h"
      27                 :             : 
      28                 :             : /* Different types of worker */
      29                 :             : typedef enum LogicalRepWorkerType
      30                 :             : {
      31                 :             :         WORKERTYPE_UNKNOWN = 0,
      32                 :             :         WORKERTYPE_TABLESYNC,
      33                 :             :         WORKERTYPE_SEQUENCESYNC,
      34                 :             :         WORKERTYPE_APPLY,
      35                 :             :         WORKERTYPE_PARALLEL_APPLY,
      36                 :             : } LogicalRepWorkerType;
      37                 :             : 
      38                 :             : typedef struct LogicalRepWorker
      39                 :             : {
      40                 :             :         /* What type of worker is this? */
      41                 :             :         LogicalRepWorkerType type;
      42                 :             : 
      43                 :             :         /* Time at which this worker was launched. */
      44                 :             :         TimestampTz launch_time;
      45                 :             : 
      46                 :             :         /* Indicates if this slot is used or free. */
      47                 :             :         bool            in_use;
      48                 :             : 
      49                 :             :         /* Increased every time the slot is taken by a new worker. */
      50                 :             :         uint16          generation;
      51                 :             : 
      52                 :             :         /* Pointer to this worker's PGPROC. NULL if not running. */
      53                 :             :         PGPROC     *proc;
      54                 :             : 
      55                 :             :         /* Database id to connect to. */
      56                 :             :         Oid                     dbid;
      57                 :             : 
      58                 :             :         /* User to use for connection (will be same as owner of subscription). */
      59                 :             :         Oid                     userid;
      60                 :             : 
      61                 :             :         /* Subscription id for the worker. */
      62                 :             :         Oid                     subid;
      63                 :             : 
      64                 :             :         /* Used for initial table synchronization. */
      65                 :             :         Oid                     relid;
      66                 :             :         char            relstate;
      67                 :             :         XLogRecPtr      relstate_lsn;
      68                 :             :         slock_t         relmutex;
      69                 :             : 
      70                 :             :         /*
      71                 :             :          * Used to create the changes and subxact files for streaming
      72                 :             :          * transactions.  The fileset is initialized when the first streaming
      73                 :             :          * transaction arrives, or the first time the leader apply worker times
      74                 :             :          * out while sending changes to the parallel apply worker, and it is
      75                 :             :          * deleted when the worker exits.  Within the fileset, separate
      76                 :             :          * buffiles are created for each transaction and are deleted once that
      77                 :             :          * transaction has finished.
      78                 :             :          */
      79                 :             :         FileSet    *stream_fileset;
      80                 :             : 
      81                 :             :         /*
      82                 :             :          * PID of leader apply worker if this slot is used for a parallel apply
      83                 :             :          * worker, InvalidPid otherwise.
      84                 :             :          */
      85                 :             :         pid_t           leader_pid;
      86                 :             : 
      87                 :             :         /* Indicates whether apply can be performed in parallel. */
      88                 :             :         bool            parallel_apply;
      89                 :             : 
      90                 :             :         /*
      91                 :             :          * Changes made by this transaction and subsequent ones must be preserved.
      92                 :             :          * This ensures that update_deleted conflicts can be accurately detected
      93                 :             :          * during the apply phase of logical replication by this worker.
      94                 :             :          *
      95                 :             :          * The logical replication launcher manages an internal replication slot
      96                 :             :          * named "pg_conflict_detection". It asynchronously collects this ID to
      97                 :             :          * decide when to advance the xmin value of the slot.
      98                 :             :          *
      99                 :             :          * This ID is set to InvalidTransactionId when the apply worker stops
     100                 :             :          * retaining information needed for conflict detection.
     101                 :             :          */
     102                 :             :         TransactionId oldest_nonremovable_xid;
     103                 :             : 
     104                 :             :         /* Stats. */
     105                 :             :         XLogRecPtr      last_lsn;
     106                 :             :         TimestampTz last_send_time;
     107                 :             :         TimestampTz last_recv_time;
     108                 :             :         XLogRecPtr      reply_lsn;
     109                 :             :         TimestampTz reply_time;
     110                 :             : 
     111                 :             :         TimestampTz last_seqsync_start_time;
     112                 :             : } LogicalRepWorker;
     113                 :             : 
     114                 :             : /*
     115                 :             :  * State of the transaction in parallel apply worker.
     116                 :             :  *
     117                 :             :  * The enum values must have the same order as the transaction state
     118                 :             :  * transitions.
     119                 :             :  */
     120                 :             : typedef enum ParallelTransState
     121                 :             : {
     122                 :             :         PARALLEL_TRANS_UNKNOWN,
     123                 :             :         PARALLEL_TRANS_STARTED,
     124                 :             :         PARALLEL_TRANS_FINISHED,
     125                 :             : } ParallelTransState;
     126                 :             : 
     127                 :             : /*
     128                 :             :  * State of fileset used to communicate changes from leader to parallel
     129                 :             :  * apply worker.
     130                 :             :  *
     131                 :             :  * FS_EMPTY indicates an initial state where the leader doesn't need to use
     132                 :             :  * the file to communicate with the parallel apply worker.
     133                 :             :  *
     134                 :             :  * FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes
     135                 :             :  * to the file.
     136                 :             :  *
     137                 :             :  * FS_SERIALIZE_DONE indicates that the leader has serialized all changes to
     138                 :             :  * the file.
     139                 :             :  *
     140                 :             :  * FS_READY indicates that it is now ok for a parallel apply worker to
     141                 :             :  * read the file.
     142                 :             :  */
     143                 :             : typedef enum PartialFileSetState
     144                 :             : {
     145                 :             :         FS_EMPTY,
     146                 :             :         FS_SERIALIZE_IN_PROGRESS,
     147                 :             :         FS_SERIALIZE_DONE,
     148                 :             :         FS_READY,
     149                 :             : } PartialFileSetState;
     150                 :             : 
     151                 :             : /*
     152                 :             :  * Struct for sharing information between leader apply worker and parallel
     153                 :             :  * apply workers.
     154                 :             :  */
     155                 :             : typedef struct ParallelApplyWorkerShared
     156                 :             : {
     157                 :             :         slock_t         mutex;
     158                 :             : 
     159                 :             :         TransactionId xid;
     160                 :             : 
     161                 :             :         /*
     162                 :             :          * State used to ensure commit ordering.
     163                 :             :          *
     164                 :             :          * The parallel apply worker will set it to PARALLEL_TRANS_FINISHED after
     165                 :             :          * handling the transaction finish commands while the apply leader will
     166                 :             :          * wait for it to become PARALLEL_TRANS_FINISHED before proceeding in
     167                 :             :          * transaction finish commands (e.g. STREAM_COMMIT/STREAM_PREPARE/
     168                 :             :          * STREAM_ABORT).
     169                 :             :          */
     170                 :             :         ParallelTransState xact_state;
     171                 :             : 
     172                 :             :         /* Information from the corresponding LogicalRepWorker slot. */
     173                 :             :         uint16          logicalrep_worker_generation;
     174                 :             :         int                     logicalrep_worker_slot_no;
     175                 :             : 
     176                 :             :         /*
     177                 :             :          * Counts the pending streaming blocks in the queue. The parallel apply
     178                 :             :          * worker will check it before starting to wait.
     179                 :             :          */
     180                 :             :         pg_atomic_uint32 pending_stream_count;
     181                 :             : 
     182                 :             :         /*
     183                 :             :          * XactLastCommitEnd from the parallel apply worker. This is required by
     184                 :             :          * the leader worker so it can update the lsn_mappings.
     185                 :             :          */
     186                 :             :         XLogRecPtr      last_commit_end;
     187                 :             : 
     188                 :             :         /*
     189                 :             :          * After entering PARTIAL_SERIALIZE mode, the leader apply worker will
     190                 :             :          * serialize changes to the file, and share the fileset with the parallel
     191                 :             :          * apply worker when processing the transaction finish command. Then the
     192                 :             :          * parallel apply worker will apply all the spooled messages.
     193                 :             :          *
     194                 :             :          * FileSet is used here instead of SharedFileSet because we need it to
     195                 :             :          * survive after releasing the shared memory so that the leader apply
     196                 :             :          * worker can re-use the same fileset for the next streaming transaction.
     197                 :             :          */
     198                 :             :         PartialFileSetState fileset_state;
     199                 :             :         FileSet         fileset;
     200                 :             : } ParallelApplyWorkerShared;
     201                 :             : 
     202                 :             : /*
     203                 :             :  * Information which is used to manage the parallel apply worker.
     204                 :             :  */
     205                 :             : typedef struct ParallelApplyWorkerInfo
     206                 :             : {
     207                 :             :         /*
     208                 :             :          * This queue is used to send changes from the leader apply worker to the
     209                 :             :          * parallel apply worker.
     210                 :             :          */
     211                 :             :         shm_mq_handle *mq_handle;
     212                 :             : 
     213                 :             :         /*
     214                 :             :          * This queue is used to transfer error messages from the parallel apply
     215                 :             :          * worker to the leader apply worker.
     216                 :             :          */
     217                 :             :         shm_mq_handle *error_mq_handle;
     218                 :             : 
     219                 :             :         dsm_segment *dsm_seg;
     220                 :             : 
     221                 :             :         /*
     222                 :             :          * Indicates whether the leader apply worker needs to serialize the
     223                 :             :          * remaining changes to a file due to timeout when attempting to send data
     224                 :             :          * to the parallel apply worker via shared memory.
     225                 :             :          */
     226                 :             :         bool            serialize_changes;
     227                 :             : 
     228                 :             :         /*
     229                 :             :          * True if the worker is being used to process a parallel apply
     230                 :             :          * transaction. False indicates this worker is available for re-use.
     231                 :             :          */
     232                 :             :         bool            in_use;
     233                 :             : 
     234                 :             :         ParallelApplyWorkerShared *shared;
     235                 :             : } ParallelApplyWorkerInfo;
     236                 :             : 
     237                 :             : /* Main memory context for apply worker. Permanent during worker lifetime. */
     238                 :             : extern PGDLLIMPORT MemoryContext ApplyContext;
     239                 :             : 
     240                 :             : extern PGDLLIMPORT MemoryContext ApplyMessageContext;
     241                 :             : 
     242                 :             : extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack;
     243                 :             : 
     244                 :             : extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared;
     245                 :             : 
     246                 :             : /* libpqreceiver connection */
     247                 :             : extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;
     248                 :             : 
     249                 :             : /* Worker and subscription objects. */
     250                 :             : extern PGDLLIMPORT Subscription *MySubscription;
     251                 :             : extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
     252                 :             : 
     253                 :             : extern PGDLLIMPORT bool in_remote_transaction;
     254                 :             : 
     255                 :             : extern PGDLLIMPORT bool InitializingApplyWorker;
     256                 :             : 
     257                 :             : extern PGDLLIMPORT List *table_states_not_ready;
     258                 :             : 
     259                 :             : extern void logicalrep_worker_attach(int slot);
     260                 :             : extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype,
     261                 :             :                                                                                                 Oid subid, Oid relid,
     262                 :             :                                                                                                 bool only_running);
     263                 :             : extern List *logicalrep_workers_find(Oid subid, bool only_running,
     264                 :             :                                                                          bool acquire_lock);
     265                 :             : extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
     266                 :             :                                                                          Oid dbid, Oid subid, const char *subname,
     267                 :             :                                                                          Oid userid, Oid relid,
     268                 :             :                                                                          dsm_handle subworker_dsm,
     269                 :             :                                                                          bool retain_dead_tuples);
     270                 :             : extern void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid,
     271                 :             :                                                                    Oid relid);
     272                 :             : extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
     273                 :             : extern void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid,
     274                 :             :                                                                          Oid relid);
     275                 :             : extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
     276                 :             : 
     277                 :             : extern void logicalrep_reset_seqsync_start_time(void);
     278                 :             : extern int      logicalrep_sync_worker_count(Oid subid);
     279                 :             : 
     280                 :             : extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
     281                 :             :                                                                                            char *originname, Size szoriginname);
     282                 :             : 
     283                 :             : extern bool AllTablesyncsReady(void);
     284                 :             : extern bool HasSubscriptionTablesCached(void);
     285                 :             : extern void UpdateTwoPhaseState(Oid suboid, char new_state);
     286                 :             : 
     287                 :             : extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn);
     288                 :             : extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn);
     289                 :             : extern void ProcessSequencesForSync(void);
     290                 :             : 
     291                 :             : pg_noreturn extern void FinishSyncWorker(void);
     292                 :             : extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue);
     293                 :             : extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers,
     294                 :             :                                                            Oid relid, TimestampTz *last_start_time);
     295                 :             : extern void ProcessSyncingRelations(XLogRecPtr current_lsn);
     296                 :             : extern void FetchRelationStates(bool *has_pending_subtables,
     297                 :             :                                                                 bool *has_pending_subsequences, bool *started_tx);
     298                 :             : 
     299                 :             : extern void stream_start_internal(TransactionId xid, bool first_segment);
     300                 :             : extern void stream_stop_internal(TransactionId xid);
     301                 :             : 
     302                 :             : /* Common streaming function to apply all the spooled messages */
     303                 :             : extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
     304                 :             :                                                                    XLogRecPtr lsn);
     305                 :             : 
     306                 :             : extern void apply_dispatch(StringInfo s);
     307                 :             : 
     308                 :             : extern void maybe_reread_subscription(void);
     309                 :             : 
     310                 :             : extern void stream_cleanup_files(Oid subid, TransactionId xid);
     311                 :             : 
     312                 :             : extern void set_stream_options(WalRcvStreamOptions *options,
     313                 :             :                                                            char *slotname,
     314                 :             :                                                            XLogRecPtr *origin_startpos);
     315                 :             : 
     316                 :             : extern void start_apply(XLogRecPtr origin_startpos);
     317                 :             : 
     318                 :             : extern void InitializeLogRepWorker(void);
     319                 :             : 
     320                 :             : extern void SetupApplyOrSyncWorker(int worker_slot);
     321                 :             : 
     322                 :             : extern void DisableSubscriptionAndExit(void);
     323                 :             : 
     324                 :             : extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
     325                 :             : 
     326                 :             : /* Function for apply error callback */
     327                 :             : extern void apply_error_callback(void *arg);
     328                 :             : extern void set_apply_error_context_origin(char *originname);
     329                 :             : 
     330                 :             : /* Parallel apply worker setup and interactions */
     331                 :             : extern void pa_allocate_worker(TransactionId xid);
     332                 :             : extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
     333                 :             : extern void pa_detach_all_error_mq(void);
     334                 :             : 
     335                 :             : extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
     336                 :             :                                                  const void *data);
     337                 :             : extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
     338                 :             :                                                                                    bool stream_locked);
     339                 :             : 
     340                 :             : extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared,
     341                 :             :                                                           ParallelTransState xact_state);
     342                 :             : extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo);
     343                 :             : 
     344                 :             : extern void pa_start_subtrans(TransactionId current_xid,
     345                 :             :                                                           TransactionId top_xid);
     346                 :             : extern void pa_reset_subtrans(void);
     347                 :             : extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data);
     348                 :             : extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
     349                 :             :                                                                  PartialFileSetState fileset_state);
     350                 :             : 
     351                 :             : extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode);
     352                 :             : extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode);
     353                 :             : 
     354                 :             : extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode);
     355                 :             : extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode);
     356                 :             : 
     357                 :             : extern void pa_decr_and_wait_stream_block(void);
     358                 :             : 
     359                 :             : extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
     360                 :             :                                                    XLogRecPtr remote_lsn);
     361                 :             : 
     362                 :             : #define isParallelApplyWorker(worker) ((worker)->in_use && \
     363                 :             :                                                                            (worker)->type == WORKERTYPE_PARALLEL_APPLY)
     364                 :             : #define isTableSyncWorker(worker) ((worker)->in_use && \
     365                 :             :                                                                    (worker)->type == WORKERTYPE_TABLESYNC)
     366                 :             : #define isSequenceSyncWorker(worker) ((worker)->in_use && \
     367                 :             :                                                                           (worker)->type == WORKERTYPE_SEQUENCESYNC)
     368                 :             : 
     369                 :             : static inline bool
     370                 :           1 : am_tablesync_worker(void)
     371                 :             : {
     372         [ -  + ]:           1 :         return isTableSyncWorker(MyLogicalRepWorker);
     373                 :             : }
     374                 :             : 
     375                 :             : static inline bool
     376                 :           1 : am_sequencesync_worker(void)
     377                 :             : {
     378         [ -  + ]:           1 :         return isSequenceSyncWorker(MyLogicalRepWorker);
     379                 :             : }
     380                 :             : 
     381                 :             : static inline bool
     382                 :           2 : am_leader_apply_worker(void)
     383                 :             : {
     384         [ +  - ]:           2 :         Assert(MyLogicalRepWorker->in_use);
     385                 :           2 :         return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
     386                 :             : }
     387                 :             : 
     388                 :             : static inline bool
     389                 :           0 : am_parallel_apply_worker(void)
     390                 :             : {
     391         [ #  # ]:           0 :         Assert(MyLogicalRepWorker->in_use);
     392         [ #  # ]:           0 :         return isParallelApplyWorker(MyLogicalRepWorker);
     393                 :             : }
     394                 :             : 
     395                 :             : #endif                                                  /* WORKER_INTERNAL_H */
        

Generated by: LCOV version 2.3.2-1