LCOV - code coverage report

Current view: top level - src/backend/replication/logical - applyparallelworker.c (source / functions)
Test:      Code coverage
Test Date: 2026-01-26 10:56:24

                Coverage    Total    Hit
  Lines:           0.8 %      524      4
  Functions:       2.8 %       36      1
  Branches:        0.7 %      287      2

Legend: Lines:    hit  not hit
        Branches: + taken  - not taken  # not executed

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  * applyparallelworker.c
       3                 :             :  *         Support routines for applying xact by parallel apply worker
       4                 :             :  *
       5                 :             :  * Copyright (c) 2023-2026, PostgreSQL Global Development Group
       6                 :             :  *
       7                 :             :  * IDENTIFICATION
       8                 :             :  *        src/backend/replication/logical/applyparallelworker.c
       9                 :             :  *
       10                 :             :  * This file contains the code to launch, set up, and tear down a parallel
       11                 :             :  * apply worker, which receives changes from the leader worker and invokes
       12                 :             :  * routines to apply those changes on the subscriber database. Additionally,
       13                 :             :  * this file contains routines to support setting up, using, and tearing down
       14                 :             :  * a ParallelApplyWorkerInfo, which is required so that the leader worker and
       15                 :             :  * parallel apply workers can communicate with each other.
      16                 :             :  *
       17                 :             :  * A parallel apply worker is assigned (if available) as soon as an xact's
       18                 :             :  * first stream is received for subscriptions that have set their 'streaming'
       19                 :             :  * option to parallel. The leader apply worker will send changes to this new
       20                 :             :  * worker via shared memory. We keep this worker assigned until the transaction
       21                 :             :  * commit is received and also wait for the worker to finish at commit. This
       22                 :             :  * preserves commit ordering and avoids file I/O in most cases, although we
       23                 :             :  * still need to spill to a file if there is no worker available. See comments
       24                 :             :  * atop logical/worker.c to learn more about streamed xacts whose changes are
      25                 :             :  * spilled to disk. It is important to maintain commit order to avoid failures
      26                 :             :  * due to: (a) transaction dependencies - say if we insert a row in the first
      27                 :             :  * transaction and update it in the second transaction on publisher then
      28                 :             :  * allowing the subscriber to apply both in parallel can lead to failure in the
      29                 :             :  * update; (b) deadlocks - allowing transactions that update the same set of
      30                 :             :  * rows/tables in the opposite order to be applied in parallel can lead to
      31                 :             :  * deadlocks.
      32                 :             :  *
      33                 :             :  * A worker pool is used to avoid restarting workers for each streaming
      34                 :             :  * transaction. We maintain each worker's information (ParallelApplyWorkerInfo)
      35                 :             :  * in the ParallelApplyWorkerPool. After successfully launching a new worker,
      36                 :             :  * its information is added to the ParallelApplyWorkerPool. Once the worker
      37                 :             :  * finishes applying the transaction, it is marked as available for re-use.
       38                 :             :  * Before starting a new worker to apply a streaming transaction, we first
       39                 :             :  * check the list for an available worker. Note that we retain at most half of
       40                 :             :  * max_parallel_apply_workers_per_subscription workers in the pool; beyond
       41                 :             :  * that, we simply exit the worker after it has applied its transaction.
      42                 :             :  *
      43                 :             :  * XXX This worker pool threshold is arbitrary and we can provide a GUC
      44                 :             :  * variable for this in the future if required.
      45                 :             :  *
       46                 :             :  * The leader apply worker creates a separate dynamic shared memory segment
       47                 :             :  * for each parallel apply worker it starts. The reason for this design is that
      48                 :             :  * we cannot predict how many workers will be needed. It may be possible to
      49                 :             :  * allocate enough shared memory in one segment based on the maximum number of
      50                 :             :  * parallel apply workers (max_parallel_apply_workers_per_subscription), but
      51                 :             :  * this would waste memory if no process is actually started.
      52                 :             :  *
      53                 :             :  * The dynamic shared memory segment contains: (a) a shm_mq that is used to
      54                 :             :  * send changes in the transaction from leader apply worker to parallel apply
      55                 :             :  * worker; (b) another shm_mq that is used to send errors (and other messages
      56                 :             :  * reported via elog/ereport) from the parallel apply worker to leader apply
      57                 :             :  * worker; (c) necessary information to be shared among parallel apply workers
      58                 :             :  * and the leader apply worker (i.e. members of ParallelApplyWorkerShared).
      59                 :             :  *
      60                 :             :  * Locking Considerations
      61                 :             :  * ----------------------
      62                 :             :  * We have a risk of deadlock due to concurrently applying the transactions in
      63                 :             :  * parallel mode that were independent on the publisher side but became
      64                 :             :  * dependent on the subscriber side due to the different database structures
      65                 :             :  * (like schema of subscription tables, constraints, etc.) on each side. This
      66                 :             :  * can happen even without parallel mode when there are concurrent operations
       67                 :             :  * on the subscriber. In order to detect deadlocks among the leader (LA) and
       68                 :             :  * parallel apply (PA) workers, we use lmgr locks when the PA waits for the
      69                 :             :  * next stream (set of changes) and LA waits for PA to finish the transaction.
      70                 :             :  * An alternative approach could be to not allow parallelism when the schema of
      71                 :             :  * tables is different between the publisher and subscriber but that would be
      72                 :             :  * too restrictive and would require the publisher to send much more
      73                 :             :  * information than it is currently sending.
      74                 :             :  *
      75                 :             :  * Consider a case where the subscribed table does not have a unique key on the
      76                 :             :  * publisher and has a unique key on the subscriber. The deadlock can happen in
      77                 :             :  * the following ways:
      78                 :             :  *
      79                 :             :  * 1) Deadlock between the leader apply worker and a parallel apply worker
      80                 :             :  *
      81                 :             :  * Consider that the parallel apply worker (PA) is executing TX-1 and the
      82                 :             :  * leader apply worker (LA) is executing TX-2 concurrently on the subscriber.
      83                 :             :  * Now, LA is waiting for PA because of the unique key constraint of the
      84                 :             :  * subscribed table while PA is waiting for LA to send the next stream of
      85                 :             :  * changes or transaction finish command message.
      86                 :             :  *
      87                 :             :  * In order for lmgr to detect this, we have LA acquire a session lock on the
      88                 :             :  * remote transaction (by pa_lock_stream()) and have PA wait on the lock before
      89                 :             :  * trying to receive the next stream of changes. Specifically, LA will acquire
      90                 :             :  * the lock in AccessExclusive mode before sending the STREAM_STOP and will
      91                 :             :  * release it if already acquired after sending the STREAM_START, STREAM_ABORT
       92                 :             :  * release it, if already acquired, after sending the STREAM_START, STREAM_ABORT
       93                 :             :  * (for a toplevel transaction), STREAM_PREPARE, and STREAM_COMMIT. The PA will
       94                 :             :  * acquire the lock in AccessShare mode after processing STREAM_STOP and
       95                 :             :  * STREAM_ABORT (for a subtransaction) and then release the lock immediately
      96                 :             :  *
      97                 :             :  * The lock graph for the above example will look as follows:
      98                 :             :  * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
      99                 :             :  * acquire the stream lock) -> LA
     100                 :             :  *
     101                 :             :  * This way, when PA is waiting for LA for the next stream of changes, we can
     102                 :             :  * have a wait-edge from PA to LA in lmgr, which will make us detect the
     103                 :             :  * deadlock between LA and PA.
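To make the protocol concrete, here is a minimal sketch of the locking sequence using this file's pa_lock_stream()/pa_unlock_stream() helpers (illustrative only; the real call sites are spread across this file and logical/worker.c):

    /* Leader apply worker (LA), per stream: acquire before STREAM_STOP... */
    pa_lock_stream(xid, AccessExclusiveLock);
    /* ... send STREAM_STOP ... */

    /* ... and release after sending STREAM_START, STREAM_PREPARE,
     * STREAM_COMMIT, or a toplevel STREAM_ABORT. */
    pa_unlock_stream(xid, AccessExclusiveLock);

    /* Parallel apply worker (PA), after processing STREAM_STOP (or a
     * subtransaction STREAM_ABORT): block until LA releases the lock, then
     * release it immediately; this creates the PA -> LA wait-edge in lmgr. */
    pa_lock_stream(MyParallelShared->xid, AccessShareLock);
    pa_unlock_stream(MyParallelShared->xid, AccessShareLock);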
     104                 :             :  *
     105                 :             :  * 2) Deadlock between the leader apply worker and parallel apply workers
     106                 :             :  *
     107                 :             :  * This scenario is similar to the first case but TX-1 and TX-2 are executed by
     108                 :             :  * two parallel apply workers (PA-1 and PA-2 respectively). In this scenario,
     109                 :             :  * PA-2 is waiting for PA-1 to complete its transaction while PA-1 is waiting
     110                 :             :  * for subsequent input from LA. Also, LA is waiting for PA-2 to complete its
     111                 :             :  * transaction in order to preserve the commit order. There is a deadlock among
     112                 :             :  * the three processes.
     113                 :             :  *
     114                 :             :  * In order for lmgr to detect this, we have PA acquire a session lock (this is
      115                 :             :  * a different lock than the one referred to in the previous case, see
     116                 :             :  * pa_lock_transaction()) on the transaction being applied and have LA wait on
     117                 :             :  * the lock before proceeding in the transaction finish commands. Specifically,
     118                 :             :  * PA will acquire this lock in AccessExclusive mode before executing the first
     119                 :             :  * message of the transaction and release it at the xact end. LA will acquire
     120                 :             :  * this lock in AccessShare mode at transaction finish commands and release it
     121                 :             :  * immediately.
     122                 :             :  *
     123                 :             :  * The lock graph for the above example will look as follows:
     124                 :             :  * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
     125                 :             :  * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
     126                 :             :  * lock) -> LA
     127                 :             :  *
      128                 :             :  * This way, when LA waits at a transaction finish command to preserve the
      129                 :             :  * commit order, we will be able to detect a deadlock, if any.
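The transaction lock follows the mirrored discipline; a sketch under the same caveats:

    /* Parallel apply worker (PA): acquire before applying the first message
     * of the transaction and hold it until the transaction ends. */
    pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
    /* ... apply all changes, then commit or prepare ... */
    pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);

    /* Leader apply worker (LA): at a transaction finish command, wait for PA
     * by acquiring the same lock in a weaker mode, then release it at once. */
    pa_lock_transaction(winfo->shared->xid, AccessShareLock);
    pa_unlock_transaction(winfo->shared->xid, AccessShareLock);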
     130                 :             :  *
     131                 :             :  * One might think we can use XactLockTableWait(), but XactLockTableWait()
      132                 :             :  * considers a PREPARED TRANSACTION as still in progress, which means the lock
     133                 :             :  * won't be released even after the parallel apply worker has prepared the
     134                 :             :  * transaction.
     135                 :             :  *
     136                 :             :  * 3) Deadlock when the shm_mq buffer is full
     137                 :             :  *
      138                 :             :  * In the previous scenario (i.e., PA-1 and PA-2 are executing transactions
     139                 :             :  * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to
     140                 :             :  * wait to send messages, and this wait doesn't appear in lmgr.
     141                 :             :  *
     142                 :             :  * To avoid this wait, we use a non-blocking write and wait with a timeout. If
     143                 :             :  * the timeout is exceeded, the LA will serialize all the pending messages to
      144                 :             :  * a file and indicate to PA-2 that it needs to read that file for the
      145                 :             :  * remaining messages. Then LA will start waiting for commit as in the
      146                 :             :  * previous case, which will detect a deadlock, if any. See pa_send_data() and
     147                 :             :  * enum TransApplyAction.
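On the leader side, the resulting pattern looks roughly like this (a sketch; pa_send_data() and pa_switch_to_partial_serialize() are defined later in this file, beyond this excerpt):

    /* Non-blocking write with an internal timeout, instead of a blocking
     * send that lmgr could not see. */
    if (!pa_send_data(winfo, len, data))
    {
        /* Timed out: serialize this transaction's pending and subsequent
         * messages to a file that PA will be told to read. */
        pa_switch_to_partial_serialize(winfo, true);
        /* ... write the message to the fileset instead ... */
    }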
     148                 :             :  *
     149                 :             :  * Lock types
     150                 :             :  * ----------
     151                 :             :  * Both the stream lock and the transaction lock mentioned above are
     152                 :             :  * session-level locks because both locks could be acquired outside the
     153                 :             :  * transaction, and the stream lock in the leader needs to persist across
      154                 :             :  * transaction boundaries, i.e., until the end of the streaming transaction.
     155                 :             :  *-------------------------------------------------------------------------
     156                 :             :  */
     157                 :             : 
     158                 :             : #include "postgres.h"
     159                 :             : 
     160                 :             : #include "libpq/pqformat.h"
     161                 :             : #include "libpq/pqmq.h"
     162                 :             : #include "pgstat.h"
     163                 :             : #include "postmaster/interrupt.h"
     164                 :             : #include "replication/logicallauncher.h"
     165                 :             : #include "replication/logicalworker.h"
     166                 :             : #include "replication/origin.h"
     167                 :             : #include "replication/worker_internal.h"
     168                 :             : #include "storage/ipc.h"
     169                 :             : #include "storage/lmgr.h"
     170                 :             : #include "tcop/tcopprot.h"
     171                 :             : #include "utils/inval.h"
     172                 :             : #include "utils/memutils.h"
     173                 :             : #include "utils/syscache.h"
     174                 :             : 
     175                 :             : #define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
     176                 :             : 
     177                 :             : /*
     178                 :             :  * DSM keys for parallel apply worker. Unlike other parallel execution code,
      179                 :             :  * since we don't need to worry about DSM keys conflicting with plan_node_id,
      180                 :             :  * we can use small integers.
     181                 :             :  */
     182                 :             : #define PARALLEL_APPLY_KEY_SHARED               1
     183                 :             : #define PARALLEL_APPLY_KEY_MQ                   2
     184                 :             : #define PARALLEL_APPLY_KEY_ERROR_QUEUE  3
     185                 :             : 
     186                 :             : /* Queue size of DSM, 16 MB for now. */
     187                 :             : #define DSM_QUEUE_SIZE  (16 * 1024 * 1024)
     188                 :             : 
     189                 :             : /*
     190                 :             :  * Error queue size of DSM. It is desirable to make it large enough that a
     191                 :             :  * typical ErrorResponse can be sent without blocking. That way, a worker that
     192                 :             :  * errors out can write the whole message into the queue and terminate without
      193                 :             :  * waiting for the leader apply worker to read it.
     194                 :             :  */
     195                 :             : #define DSM_ERROR_QUEUE_SIZE                    (16 * 1024)
     196                 :             : 
     197                 :             : /*
     198                 :             :  * There are three fields in each message received by the parallel apply
      199                 :             :  * worker: start_lsn, end_lsn and send_time. Because these statistics have
      200                 :             :  * already been updated in the leader apply worker, we can ignore these fields
      201                 :             :  * in the parallel apply worker (see function LogicalRepApplyLoop).
     202                 :             :  */
     203                 :             : #define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
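For illustration, the parallel apply worker only needs to advance its read cursor past these fields before dispatching the message; a minimal sketch, assuming data and len hold a message just received from the shm_mq:

    StringInfoData s;

    /* Wrap the raw bytes received from the queue. */
    initReadOnlyStringInfo(&s, (char *) data, len);

    /* Skip start_lsn, end_lsn and send_time; the leader apply worker has
     * already accounted for them. */
    s.cursor += SIZE_STATS_MESSAGE;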
     204                 :             : 
     205                 :             : /*
     206                 :             :  * The type of session-level lock on a transaction being applied on a logical
     207                 :             :  * replication subscriber.
     208                 :             :  */
     209                 :             : #define PARALLEL_APPLY_LOCK_STREAM      0
     210                 :             : #define PARALLEL_APPLY_LOCK_XACT        1
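Both values feed the objid field of the lock tag, so the stream lock and the transaction lock can be held independently on the same remote xid. A sketch of the expected mapping, via lmgr's LockApplyTransactionForSession():

    /* The two lock types differ only in the objid part of the lock tag. */
    LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
                                   PARALLEL_APPLY_LOCK_STREAM, lockmode);
    LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
                                   PARALLEL_APPLY_LOCK_XACT, lockmode);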
     211                 :             : 
     212                 :             : /*
     213                 :             :  * Hash table entry to map xid to the parallel apply worker state.
     214                 :             :  */
     215                 :             : typedef struct ParallelApplyWorkerEntry
     216                 :             : {
     217                 :             :         TransactionId xid;                      /* Hash key -- must be first */
     218                 :             :         ParallelApplyWorkerInfo *winfo;
     219                 :             : } ParallelApplyWorkerEntry;
     220                 :             : 
     221                 :             : /*
     222                 :             :  * A hash table used to cache the state of streaming transactions being applied
     223                 :             :  * by the parallel apply workers.
     224                 :             :  */
     225                 :             : static HTAB *ParallelApplyTxnHash = NULL;
     226                 :             : 
     227                 :             : /*
      228                 :             :  * A list (pool) of active parallel apply workers. The information for
      229                 :             :  * the new worker is added to the list after successfully launching it. The
      230                 :             :  * list entry is removed if there are already enough workers in the worker
      231                 :             :  * pool at the end of the transaction. For more information about the worker
      232                 :             :  * pool, see comments atop this file.
     233                 :             :  */
     234                 :             : static List *ParallelApplyWorkerPool = NIL;
     235                 :             : 
     236                 :             : /*
     237                 :             :  * Information shared between leader apply worker and parallel apply worker.
     238                 :             :  */
     239                 :             : ParallelApplyWorkerShared *MyParallelShared = NULL;
     240                 :             : 
     241                 :             : /*
     242                 :             :  * Is there a message sent by a parallel apply worker that the leader apply
     243                 :             :  * worker needs to receive?
     244                 :             :  */
     245                 :             : volatile sig_atomic_t ParallelApplyMessagePending = false;
     246                 :             : 
     247                 :             : /*
     248                 :             :  * Cache the parallel apply worker information required for applying the
     249                 :             :  * current streaming transaction. It is used to save the cost of searching the
     250                 :             :  * hash table when applying the changes between STREAM_START and STREAM_STOP.
     251                 :             :  */
     252                 :             : static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
     253                 :             : 
     254                 :             : /* A list to maintain subtransactions, if any. */
     255                 :             : static List *subxactlist = NIL;
     256                 :             : 
     257                 :             : static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
     258                 :             : static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
     259                 :             : static PartialFileSetState pa_get_fileset_state(void);
     260                 :             : 
     261                 :             : /*
     262                 :             :  * Returns true if it is OK to start a parallel apply worker, false otherwise.
     263                 :             :  */
     264                 :             : static bool
     265                 :           0 : pa_can_start(void)
     266                 :             : {
     267                 :             :         /* Only leader apply workers can start parallel apply workers. */
     268         [ #  # ]:           0 :         if (!am_leader_apply_worker())
     269                 :           0 :                 return false;
     270                 :             : 
     271                 :             :         /*
      272                 :             :          * Check for any change in the subscription parameters to avoid the
      273                 :             :          * case where a change doesn't get reflected for a very long time.
      274                 :             :          * This can happen when there is a constant flow of streaming
      275                 :             :          * transactions that are handled by parallel apply workers.
      276                 :             :          *
      277                 :             :          * It is better to do this before the checks below so that the latest
      278                 :             :          * subscription values can be used for them.
     279                 :             :          */
     280                 :           0 :         maybe_reread_subscription();
     281                 :             : 
     282                 :             :         /*
     283                 :             :          * Don't start a new parallel apply worker if the subscription is not
     284                 :             :          * using parallel streaming mode, or if the publisher does not support
     285                 :             :          * parallel apply.
     286                 :             :          */
     287         [ #  # ]:           0 :         if (!MyLogicalRepWorker->parallel_apply)
     288                 :           0 :                 return false;
     289                 :             : 
     290                 :             :         /*
      291                 :             :          * Don't start a new parallel worker if the user has set skiplsn, as it's
     292                 :             :          * possible that they want to skip the streaming transaction. For
     293                 :             :          * streaming transactions, we need to serialize the transaction to a file
     294                 :             :          * so that we can get the last LSN of the transaction to judge whether to
     295                 :             :          * skip before starting to apply the change.
     296                 :             :          *
     297                 :             :          * One might think that we could allow parallelism if the first lsn of the
     298                 :             :          * transaction is greater than skiplsn, but we don't send it with the
     299                 :             :          * STREAM START message, and it doesn't seem worth sending the extra eight
     300                 :             :          * bytes with the STREAM START to enable parallelism for this case.
     301                 :             :          */
     302         [ #  # ]:           0 :         if (XLogRecPtrIsValid(MySubscription->skiplsn))
     303                 :           0 :                 return false;
     304                 :             : 
     305                 :             :         /*
     306                 :             :          * For streaming transactions that are being applied using a parallel
     307                 :             :          * apply worker, we cannot decide whether to apply the change for a
     308                 :             :          * relation that is not in the READY state (see
     309                 :             :          * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
     310                 :             :          * time. So, we don't start the new parallel apply worker in this case.
     311                 :             :          */
     312         [ #  # ]:           0 :         if (!AllTablesyncsReady())
     313                 :           0 :                 return false;
     314                 :             : 
     315                 :           0 :         return true;
     316                 :           0 : }
     317                 :             : 
     318                 :             : /*
     319                 :             :  * Set up a dynamic shared memory segment.
     320                 :             :  *
     321                 :             :  * We set up a control region that contains a fixed-size worker info
     322                 :             :  * (ParallelApplyWorkerShared), a message queue, and an error queue.
     323                 :             :  *
     324                 :             :  * Returns true on success, false on failure.
     325                 :             :  */
     326                 :             : static bool
     327                 :           0 : pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
     328                 :             : {
     329                 :           0 :         shm_toc_estimator e;
     330                 :           0 :         Size            segsize;
     331                 :           0 :         dsm_segment *seg;
     332                 :           0 :         shm_toc    *toc;
     333                 :           0 :         ParallelApplyWorkerShared *shared;
     334                 :           0 :         shm_mq     *mq;
     335                 :           0 :         Size            queue_size = DSM_QUEUE_SIZE;
     336                 :           0 :         Size            error_queue_size = DSM_ERROR_QUEUE_SIZE;
     337                 :             : 
     338                 :             :         /*
     339                 :             :          * Estimate how much shared memory we need.
     340                 :             :          *
     341                 :             :          * Because the TOC machinery may choose to insert padding of oddly-sized
     342                 :             :          * requests, we must estimate each chunk separately.
     343                 :             :          *
     344                 :             :          * We need one key to register the location of the header, and two other
     345                 :             :          * keys to track the locations of the message queue and the error message
     346                 :             :          * queue.
     347                 :             :          */
     348                 :           0 :         shm_toc_initialize_estimator(&e);
     349                 :           0 :         shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
     350                 :           0 :         shm_toc_estimate_chunk(&e, queue_size);
     351                 :           0 :         shm_toc_estimate_chunk(&e, error_queue_size);
     352                 :             : 
     353                 :           0 :         shm_toc_estimate_keys(&e, 3);
     354                 :           0 :         segsize = shm_toc_estimate(&e);
     355                 :             : 
     356                 :             :         /* Create the shared memory segment and establish a table of contents. */
     357                 :           0 :         seg = dsm_create(shm_toc_estimate(&e), 0);
     358         [ #  # ]:           0 :         if (!seg)
     359                 :           0 :                 return false;
     360                 :             : 
     361                 :           0 :         toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg),
     362                 :           0 :                                                  segsize);
     363                 :             : 
     364                 :             :         /* Set up the header region. */
     365                 :           0 :         shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
     366                 :           0 :         SpinLockInit(&shared->mutex);
     367                 :             : 
     368                 :           0 :         shared->xact_state = PARALLEL_TRANS_UNKNOWN;
     369                 :           0 :         pg_atomic_init_u32(&(shared->pending_stream_count), 0);
     370                 :           0 :         shared->last_commit_end = InvalidXLogRecPtr;
     371                 :           0 :         shared->fileset_state = FS_EMPTY;
     372                 :             : 
     373                 :           0 :         shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);
     374                 :             : 
     375                 :             :         /* Set up message queue for the worker. */
     376                 :           0 :         mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
     377                 :           0 :         shm_toc_insert(toc, PARALLEL_APPLY_KEY_MQ, mq);
     378                 :           0 :         shm_mq_set_sender(mq, MyProc);
     379                 :             : 
     380                 :             :         /* Attach the queue. */
     381                 :           0 :         winfo->mq_handle = shm_mq_attach(mq, seg, NULL);
     382                 :             : 
     383                 :             :         /* Set up error queue for the worker. */
     384                 :           0 :         mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size),
     385                 :           0 :                                            error_queue_size);
     386                 :           0 :         shm_toc_insert(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, mq);
     387                 :           0 :         shm_mq_set_receiver(mq, MyProc);
     388                 :             : 
     389                 :             :         /* Attach the queue. */
     390                 :           0 :         winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);
     391                 :             : 
     392                 :             :         /* Return results to caller. */
     393                 :           0 :         winfo->dsm_seg = seg;
     394                 :           0 :         winfo->shared = shared;
     395                 :             : 
     396                 :           0 :         return true;
     397                 :           0 : }
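The attach-side counterpart runs in the parallel apply worker and is not part of this excerpt; a simplified sketch of what it must do with the DSM handle passed at launch (error handling omitted, handle assumed to come from the worker's main argument):

    /* Map the leader's segment and locate the shared structures using the
     * same TOC keys as pa_setup_dsm(). */
    dsm_segment *seg = dsm_attach(handle);
    shm_toc    *toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC,
                                     dsm_segment_address(seg));
    ParallelApplyWorkerShared *shared =
        shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);

    /* Roles are mirrored: the worker receives on the change queue and
     * sends on the error queue. */
    shm_mq     *mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
    shm_mq_set_receiver(mq, MyProc);
    shm_mq_handle *mqh = shm_mq_attach(mq, seg, NULL);

    shm_mq     *emq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false);
    shm_mq_set_sender(emq, MyProc);
    shm_mq_handle *error_mqh = shm_mq_attach(emq, seg, NULL);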
     398                 :             : 
     399                 :             : /*
     400                 :             :  * Try to get a parallel apply worker from the pool. If none is available then
     401                 :             :  * start a new one.
     402                 :             :  */
     403                 :             : static ParallelApplyWorkerInfo *
     404                 :           0 : pa_launch_parallel_worker(void)
     405                 :             : {
     406                 :           0 :         MemoryContext oldcontext;
     407                 :           0 :         bool            launched;
     408                 :           0 :         ParallelApplyWorkerInfo *winfo;
     409                 :           0 :         ListCell   *lc;
     410                 :             : 
     411                 :             :         /* Try to get an available parallel apply worker from the worker pool. */
     412   [ #  #  #  #  :           0 :         foreach(lc, ParallelApplyWorkerPool)
             #  #  #  # ]
     413                 :             :         {
     414                 :           0 :                 winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
     415                 :             : 
     416         [ #  # ]:           0 :                 if (!winfo->in_use)
     417                 :           0 :                         return winfo;
     418                 :           0 :         }
     419                 :             : 
     420                 :             :         /*
     421                 :             :          * Start a new parallel apply worker.
     422                 :             :          *
     423                 :             :          * The worker info can be used for the lifetime of the worker process, so
     424                 :             :          * create it in a permanent context.
     425                 :             :          */
     426                 :           0 :         oldcontext = MemoryContextSwitchTo(ApplyContext);
     427                 :             : 
     428                 :           0 :         winfo = palloc0_object(ParallelApplyWorkerInfo);
     429                 :             : 
     430                 :             :         /* Setup shared memory. */
     431         [ #  # ]:           0 :         if (!pa_setup_dsm(winfo))
     432                 :             :         {
     433                 :           0 :                 MemoryContextSwitchTo(oldcontext);
     434                 :           0 :                 pfree(winfo);
     435                 :           0 :                 return NULL;
     436                 :             :         }
     437                 :             : 
     438                 :           0 :         launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
     439                 :           0 :                                                                                 MyLogicalRepWorker->dbid,
     440                 :           0 :                                                                                 MySubscription->oid,
     441                 :           0 :                                                                                 MySubscription->name,
     442                 :           0 :                                                                                 MyLogicalRepWorker->userid,
     443                 :             :                                                                                 InvalidOid,
     444                 :           0 :                                                                                 dsm_segment_handle(winfo->dsm_seg),
     445                 :             :                                                                                 false);
     446                 :             : 
     447         [ #  # ]:           0 :         if (launched)
     448                 :             :         {
     449                 :           0 :                 ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
     450                 :           0 :         }
     451                 :             :         else
     452                 :             :         {
     453                 :           0 :                 pa_free_worker_info(winfo);
     454                 :           0 :                 winfo = NULL;
     455                 :             :         }
     456                 :             : 
     457                 :           0 :         MemoryContextSwitchTo(oldcontext);
     458                 :             : 
     459                 :           0 :         return winfo;
     460                 :           0 : }
     461                 :             : 
     462                 :             : /*
     463                 :             :  * Allocate a parallel apply worker that will be used for the specified xid.
     464                 :             :  *
      465                 :             :  * We first try to get an available worker from the pool, if any, and only
      466                 :             :  * then try to launch a new worker. On successful allocation, remember the worker
     467                 :             :  * information in the hash table so that we can get it later for processing the
     468                 :             :  * streaming changes.
     469                 :             :  */
     470                 :             : void
     471                 :           0 : pa_allocate_worker(TransactionId xid)
     472                 :             : {
     473                 :           0 :         bool            found;
     474                 :           0 :         ParallelApplyWorkerInfo *winfo = NULL;
     475                 :           0 :         ParallelApplyWorkerEntry *entry;
     476                 :             : 
     477         [ #  # ]:           0 :         if (!pa_can_start())
     478                 :           0 :                 return;
     479                 :             : 
     480                 :           0 :         winfo = pa_launch_parallel_worker();
     481         [ #  # ]:           0 :         if (!winfo)
     482                 :           0 :                 return;
     483                 :             : 
     484                 :             :         /* First time through, initialize parallel apply worker state hashtable. */
     485         [ #  # ]:           0 :         if (!ParallelApplyTxnHash)
     486                 :             :         {
     487                 :           0 :                 HASHCTL         ctl;
     488                 :             : 
     489   [ #  #  #  #  :           0 :                 MemSet(&ctl, 0, sizeof(ctl));
          #  #  #  #  #  
                      # ]
     490                 :           0 :                 ctl.keysize = sizeof(TransactionId);
     491                 :           0 :                 ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
     492                 :           0 :                 ctl.hcxt = ApplyContext;
     493                 :             : 
     494                 :           0 :                 ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
     495                 :             :                                                                                    16, &ctl,
     496                 :             :                                                                                    HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     497                 :           0 :         }
     498                 :             : 
     499                 :             :         /* Create an entry for the requested transaction. */
     500                 :           0 :         entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
     501         [ #  # ]:           0 :         if (found)
     502   [ #  #  #  # ]:           0 :                 elog(ERROR, "hash table corrupted");
     503                 :             : 
     504                 :             :         /* Update the transaction information in shared memory. */
     505         [ #  # ]:           0 :         SpinLockAcquire(&winfo->shared->mutex);
     506                 :           0 :         winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
     507                 :           0 :         winfo->shared->xid = xid;
     508                 :           0 :         SpinLockRelease(&winfo->shared->mutex);
     509                 :             : 
     510                 :           0 :         winfo->in_use = true;
     511                 :           0 :         winfo->serialize_changes = false;
     512                 :           0 :         entry->winfo = winfo;
     513         [ #  # ]:           0 : }
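A usage sketch for context: the expected caller is the leader's STREAM_START handling, roughly as below (first_segment and stream_xid are illustrative names, not part of this excerpt):

    /* On the first stream segment of a remote transaction, try to assign a
     * parallel apply worker; if allocation fails, the leader falls back to
     * serializing the changes to a file. */
    if (first_segment)
        pa_allocate_worker(stream_xid);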
     514                 :             : 
     515                 :             : /*
     516                 :             :  * Find the assigned worker for the given transaction, if any.
     517                 :             :  */
     518                 :             : ParallelApplyWorkerInfo *
     519                 :           0 : pa_find_worker(TransactionId xid)
     520                 :             : {
     521                 :           0 :         bool            found;
     522                 :           0 :         ParallelApplyWorkerEntry *entry;
     523                 :             : 
     524         [ #  # ]:           0 :         if (!TransactionIdIsValid(xid))
     525                 :           0 :                 return NULL;
     526                 :             : 
     527         [ #  # ]:           0 :         if (!ParallelApplyTxnHash)
     528                 :           0 :                 return NULL;
     529                 :             : 
     530                 :             :         /* Return the cached parallel apply worker if valid. */
     531         [ #  # ]:           0 :         if (stream_apply_worker)
     532                 :           0 :                 return stream_apply_worker;
     533                 :             : 
     534                 :             :         /* Find an entry for the requested transaction. */
     535                 :           0 :         entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
     536         [ #  # ]:           0 :         if (found)
     537                 :             :         {
     538                 :             :                 /* The worker must not have exited.  */
     539         [ #  # ]:           0 :                 Assert(entry->winfo->in_use);
     540                 :           0 :                 return entry->winfo;
     541                 :             :         }
     542                 :             : 
     543                 :           0 :         return NULL;
     544                 :           0 : }
     545                 :             : 
     546                 :             : /*
     547                 :             :  * Makes the worker available for reuse.
     548                 :             :  *
     549                 :             :  * This removes the parallel apply worker entry from the hash table so that it
     550                 :             :  * can't be used. If there are enough workers in the pool, it stops the worker
     551                 :             :  * and frees the corresponding info. Otherwise it just marks the worker as
     552                 :             :  * available for reuse.
     553                 :             :  *
     554                 :             :  * For more information about the worker pool, see comments atop this file.
     555                 :             :  */
     556                 :             : static void
     557                 :           0 : pa_free_worker(ParallelApplyWorkerInfo *winfo)
     558                 :             : {
     559         [ #  # ]:           0 :         Assert(!am_parallel_apply_worker());
     560         [ #  # ]:           0 :         Assert(winfo->in_use);
     561         [ #  # ]:           0 :         Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
     562                 :             : 
     563         [ #  # ]:           0 :         if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
     564   [ #  #  #  # ]:           0 :                 elog(ERROR, "hash table corrupted");
     565                 :             : 
     566                 :             :         /*
     567                 :             :          * Stop the worker if there are enough workers in the pool.
     568                 :             :          *
     569                 :             :          * XXX Additionally, we also stop the worker if the leader apply worker
      570                 :             :          * serialized part of the transaction data due to a send timeout. This is
      571                 :             :          * because the message could be partially written to the queue and there
      572                 :             :          * is no way to clean the queue other than resending the message until it
      573                 :             :          * succeeds. Instead of trying to send the data (which would have been
      574                 :             :          * serialized anyway) and then letting the parallel apply worker deal
      575                 :             :          * with the spurious message, we stop the worker.
     576                 :             :          */
     577   [ #  #  #  # ]:           0 :         if (winfo->serialize_changes ||
     578                 :           0 :                 list_length(ParallelApplyWorkerPool) >
     579                 :           0 :                 (max_parallel_apply_workers_per_subscription / 2))
     580                 :             :         {
     581                 :           0 :                 logicalrep_pa_worker_stop(winfo);
     582                 :           0 :                 pa_free_worker_info(winfo);
     583                 :             : 
     584                 :           0 :                 return;
     585                 :             :         }
     586                 :             : 
     587                 :           0 :         winfo->in_use = false;
     588                 :           0 :         winfo->serialize_changes = false;
     589                 :           0 : }
     590                 :             : 
     591                 :             : /*
     592                 :             :  * Free the parallel apply worker information and unlink the files with
     593                 :             :  * serialized changes if any.
     594                 :             :  */
     595                 :             : static void
     596                 :           0 : pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
     597                 :             : {
     598         [ #  # ]:           0 :         Assert(winfo);
     599                 :             : 
     600         [ #  # ]:           0 :         if (winfo->mq_handle)
     601                 :           0 :                 shm_mq_detach(winfo->mq_handle);
     602                 :             : 
     603         [ #  # ]:           0 :         if (winfo->error_mq_handle)
     604                 :           0 :                 shm_mq_detach(winfo->error_mq_handle);
     605                 :             : 
     606                 :             :         /* Unlink the files with serialized changes. */
     607         [ #  # ]:           0 :         if (winfo->serialize_changes)
     608                 :           0 :                 stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
     609                 :             : 
     610         [ #  # ]:           0 :         if (winfo->dsm_seg)
     611                 :           0 :                 dsm_detach(winfo->dsm_seg);
     612                 :             : 
     613                 :             :         /* Remove from the worker pool. */
     614                 :           0 :         ParallelApplyWorkerPool = list_delete_ptr(ParallelApplyWorkerPool, winfo);
     615                 :             : 
     616                 :           0 :         pfree(winfo);
     617                 :           0 : }
     618                 :             : 
     619                 :             : /*
     620                 :             :  * Detach the error queue for all parallel apply workers.
     621                 :             :  */
     622                 :             : void
     623                 :           1 : pa_detach_all_error_mq(void)
     624                 :             : {
     625                 :           1 :         ListCell   *lc;
     626                 :             : 
     627   [ -  +  #  #  :           1 :         foreach(lc, ParallelApplyWorkerPool)
                   +  - ]
     628                 :             :         {
     629                 :           0 :                 ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
     630                 :             : 
     631         [ #  # ]:           0 :                 if (winfo->error_mq_handle)
     632                 :             :                 {
     633                 :           0 :                         shm_mq_detach(winfo->error_mq_handle);
     634                 :           0 :                         winfo->error_mq_handle = NULL;
     635                 :           0 :                 }
     636                 :           0 :         }
     637                 :           1 : }
     638                 :             : 
     639                 :             : /*
     640                 :             :  * Check if there are any pending spooled messages.
     641                 :             :  */
     642                 :             : static bool
     643                 :           0 : pa_has_spooled_message_pending(void)
     644                 :             : {
     645                 :           0 :         PartialFileSetState fileset_state;
     646                 :             : 
     647                 :           0 :         fileset_state = pa_get_fileset_state();
     648                 :             : 
     649                 :           0 :         return (fileset_state != FS_EMPTY);
     650                 :           0 : }
     651                 :             : 
     652                 :             : /*
     653                 :             :  * Replay the spooled messages once the leader apply worker has finished
     654                 :             :  * serializing changes to the file.
     655                 :             :  *
     656                 :             :  * Returns false if there aren't any pending spooled messages, true otherwise.
     657                 :             :  */
     658                 :             : static bool
     659                 :           0 : pa_process_spooled_messages_if_required(void)
     660                 :             : {
     661                 :           0 :         PartialFileSetState fileset_state;
     662                 :             : 
     663                 :           0 :         fileset_state = pa_get_fileset_state();
     664                 :             : 
     665         [ #  # ]:           0 :         if (fileset_state == FS_EMPTY)
     666                 :           0 :                 return false;
     667                 :             : 
     668                 :             :         /*
     669                 :             :          * If the leader apply worker is busy serializing the partial changes then
     670                 :             :          * acquire the stream lock now and wait for the leader worker to finish
     671                 :             :          * serializing the changes. Otherwise, the parallel apply worker won't get
     672                 :             :          * a chance to receive a STREAM_STOP (and acquire the stream lock) until
      673                 :             :          * the leader has serialized all changes, which can lead to an
      674                 :             :          * undetected deadlock.
     675                 :             :          *
     676                 :             :          * Note that the fileset state can be FS_SERIALIZE_DONE once the leader
     677                 :             :          * worker has finished serializing the changes.
     678                 :             :          */
     679         [ #  # ]:           0 :         if (fileset_state == FS_SERIALIZE_IN_PROGRESS)
     680                 :             :         {
     681                 :           0 :                 pa_lock_stream(MyParallelShared->xid, AccessShareLock);
     682                 :           0 :                 pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
     683                 :             : 
     684                 :           0 :                 fileset_state = pa_get_fileset_state();
     685                 :           0 :         }
     686                 :             : 
     687                 :             :         /*
     688                 :             :          * We cannot read the file immediately after the leader has serialized all
     689                 :             :          * changes to the file because there may still be messages in the memory
     690                 :             :          * queue. We will apply all spooled messages the next time we call this
     691                 :             :          * function and that will ensure there are no messages left in the memory
     692                 :             :          * queue.
     693                 :             :          */
     694         [ #  # ]:           0 :         if (fileset_state == FS_SERIALIZE_DONE)
     695                 :             :         {
     696                 :           0 :                 pa_set_fileset_state(MyParallelShared, FS_READY);
     697                 :           0 :         }
     698         [ #  # ]:           0 :         else if (fileset_state == FS_READY)
     699                 :             :         {
     700                 :           0 :                 apply_spooled_messages(&MyParallelShared->fileset,
     701                 :           0 :                                                            MyParallelShared->xid,
     702                 :             :                                                            InvalidXLogRecPtr);
     703                 :           0 :                 pa_set_fileset_state(MyParallelShared, FS_EMPTY);
     704                 :           0 :         }
     705                 :             : 
     706                 :           0 :         return true;
     707                 :           0 : }
     708                 :             : 
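                                :             : /*
                                :             :  * Illustrative note (not part of the original source): the fileset state
                                :             :  * checked above moves through a simple cycle driven by the leader and the
                                :             :  * parallel apply worker:
                                :             :  *
                                :             :  *   FS_EMPTY -> FS_SERIALIZE_IN_PROGRESS -> FS_SERIALIZE_DONE
                                :             :  *            -> FS_READY -> FS_EMPTY
                                :             :  *
                                :             :  * The leader performs the first two transitions while spilling changes to
                                :             :  * the file; the parallel apply worker performs the last two, first letting
                                :             :  * the memory queue drain and then applying the spooled messages.
                                :             :  */
                                :             : 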
     709                 :             : /*
     710                 :             :  * Interrupt handler for main loop of parallel apply worker.
     711                 :             :  */
     712                 :             : static void
     713                 :           0 : ProcessParallelApplyInterrupts(void)
     714                 :             : {
     715         [ #  # ]:           0 :         CHECK_FOR_INTERRUPTS();
     716                 :             : 
     717         [ #  # ]:           0 :         if (ShutdownRequestPending)
     718                 :             :         {
     719   [ #  #  #  # ]:           0 :                 ereport(LOG,
     720                 :             :                                 (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
     721                 :             :                                                 MySubscription->name)));
     722                 :             : 
     723                 :           0 :                 proc_exit(0);
     724                 :             :         }
     725                 :             : 
     726         [ #  # ]:           0 :         if (ConfigReloadPending)
     727                 :             :         {
     728                 :           0 :                 ConfigReloadPending = false;
     729                 :           0 :                 ProcessConfigFile(PGC_SIGHUP);
     730                 :           0 :         }
     731                 :           0 : }
     732                 :             : 
     733                 :             : /* Parallel apply worker main loop. */
     734                 :             : static void
     735                 :           0 : LogicalParallelApplyLoop(shm_mq_handle *mqh)
     736                 :             : {
     737                 :           0 :         shm_mq_result shmq_res;
     738                 :           0 :         ErrorContextCallback errcallback;
     739                 :           0 :         MemoryContext oldcxt = CurrentMemoryContext;
     740                 :             : 
     741                 :             :         /*
     742                 :             :          * Init the ApplyMessageContext, which we clean up after each replication
     743                 :             :          * protocol message.
     744                 :             :          */
     745                 :           0 :         ApplyMessageContext = AllocSetContextCreate(ApplyContext,
     746                 :             :                                                                                                 "ApplyMessageContext",
     747                 :             :                                                                                                 ALLOCSET_DEFAULT_SIZES);
     748                 :             : 
     749                 :             :         /*
     750                 :             :          * Push apply error context callback. Fields will be filled while applying
     751                 :             :          * a change.
     752                 :             :          */
     753                 :           0 :         errcallback.callback = apply_error_callback;
     754                 :           0 :         errcallback.previous = error_context_stack;
     755                 :           0 :         error_context_stack = &errcallback;
     756                 :             : 
     757                 :           0 :         for (;;)
     758                 :             :         {
     759                 :           0 :                 void       *data;
     760                 :           0 :                 Size            len;
     761                 :             : 
     762                 :           0 :                 ProcessParallelApplyInterrupts();
     763                 :             : 
     764                 :             :                 /* Ensure we are reading the data into our memory context. */
     765                 :           0 :                 MemoryContextSwitchTo(ApplyMessageContext);
     766                 :             : 
     767                 :           0 :                 shmq_res = shm_mq_receive(mqh, &len, &data, true);
     768                 :             : 
     769         [ #  # ]:           0 :                 if (shmq_res == SHM_MQ_SUCCESS)
     770                 :             :                 {
     771                 :           0 :                         StringInfoData s;
     772                 :           0 :                         int                     c;
     773                 :             : 
     774         [ #  # ]:           0 :                         if (len == 0)
     775   [ #  #  #  # ]:           0 :                                 elog(ERROR, "invalid message length");
     776                 :             : 
     777                 :           0 :                         initReadOnlyStringInfo(&s, data, len);
     778                 :             : 
     779                 :             :                         /*
      780                 :             :                          * The first byte of messages sent from the leader apply worker to
     781                 :             :                          * parallel apply workers can only be PqReplMsg_WALData.
     782                 :             :                          */
     783                 :           0 :                         c = pq_getmsgbyte(&s);
     784         [ #  # ]:           0 :                         if (c != PqReplMsg_WALData)
     785   [ #  #  #  # ]:           0 :                                 elog(ERROR, "unexpected message \"%c\"", c);
     786                 :             : 
     787                 :             :                         /*
     788                 :             :                          * Ignore statistics fields that have been updated by the leader
     789                 :             :                          * apply worker.
     790                 :             :                          *
      791                 :             :                          * XXX We could avoid sending the statistics fields from the leader
      792                 :             :                          * apply worker, but for that it needs to rebuild the entire
      793                 :             :                          * message by removing these fields, which could be more work than
      794                 :             :                          * simply ignoring them in the parallel apply worker.
     795                 :             :                          */
     796                 :           0 :                         s.cursor += SIZE_STATS_MESSAGE;
     797                 :             : 
     798                 :           0 :                         apply_dispatch(&s);
     799                 :           0 :                 }
     800         [ #  # ]:           0 :                 else if (shmq_res == SHM_MQ_WOULD_BLOCK)
     801                 :             :                 {
     802                 :             :                         /* Replay the changes from the file, if any. */
     803         [ #  # ]:           0 :                         if (!pa_process_spooled_messages_if_required())
     804                 :             :                         {
     805                 :           0 :                                 int                     rc;
     806                 :             : 
     807                 :             :                                 /* Wait for more work. */
     808                 :           0 :                                 rc = WaitLatch(MyLatch,
     809                 :             :                                                            WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     810                 :             :                                                            1000L,
     811                 :             :                                                            WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
     812                 :             : 
     813         [ #  # ]:           0 :                                 if (rc & WL_LATCH_SET)
     814                 :           0 :                                         ResetLatch(MyLatch);
     815                 :           0 :                         }
     816                 :           0 :                 }
     817                 :             :                 else
     818                 :             :                 {
     819         [ #  # ]:           0 :                         Assert(shmq_res == SHM_MQ_DETACHED);
     820                 :             : 
     821   [ #  #  #  # ]:           0 :                         ereport(ERROR,
     822                 :             :                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     823                 :             :                                          errmsg("lost connection to the logical replication apply worker")));
     824                 :             :                 }
     825                 :             : 
     826                 :           0 :                 MemoryContextReset(ApplyMessageContext);
     827                 :           0 :                 MemoryContextSwitchTo(oldcxt);
     828                 :           0 :         }
     829                 :             : 
     830                 :             :         /* Pop the error context stack. */
     831                 :             :         error_context_stack = errcallback.previous;
     832                 :             : 
     833                 :             :         MemoryContextSwitchTo(oldcxt);
     834                 :             : }
     835                 :             : 
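                                :             : /*
                                :             :  * Illustrative sketch (not part of the original source): each message the
                                :             :  * loop above reads from the shared memory queue is laid out roughly as
                                :             :  * follows, per the framing checks in the code:
                                :             :  *
                                :             :  *   +-------------------+----------------------+------------------------+
                                :             :  *   | PqReplMsg_WALData | statistics fields    | logical replication    |
                                :             :  *   | (1 byte)          | (SIZE_STATS_MESSAGE) | protocol message       |
                                :             :  *   +-------------------+----------------------+------------------------+
                                :             :  *
                                :             :  * The first byte is validated, the statistics fields are skipped by
                                :             :  * advancing s.cursor, and the remainder is handed to apply_dispatch().
                                :             :  */
                                :             : 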
     836                 :             : /*
     837                 :             :  * Make sure the leader apply worker tries to read from our error queue one more
     838                 :             :  * time. This guards against the case where we exit uncleanly without sending
     839                 :             :  * an ErrorResponse, for example because some code calls proc_exit directly.
     840                 :             :  *
     841                 :             :  * Also explicitly detach from dsm segment to invoke on_dsm_detach callbacks,
     842                 :             :  * if any. See ParallelWorkerShutdown for details.
     843                 :             :  */
     844                 :             : static void
     845                 :           0 : pa_shutdown(int code, Datum arg)
     846                 :             : {
     847                 :           0 :         SendProcSignal(MyLogicalRepWorker->leader_pid,
     848                 :             :                                    PROCSIG_PARALLEL_APPLY_MESSAGE,
     849                 :             :                                    INVALID_PROC_NUMBER);
     850                 :             : 
     851                 :           0 :         dsm_detach((dsm_segment *) DatumGetPointer(arg));
     852                 :           0 : }
     853                 :             : 
     854                 :             : /*
     855                 :             :  * Parallel apply worker entry point.
     856                 :             :  */
     857                 :             : void
     858                 :           0 : ParallelApplyWorkerMain(Datum main_arg)
     859                 :             : {
     860                 :           0 :         ParallelApplyWorkerShared *shared;
     861                 :           0 :         dsm_handle      handle;
     862                 :           0 :         dsm_segment *seg;
     863                 :           0 :         shm_toc    *toc;
     864                 :           0 :         shm_mq     *mq;
     865                 :           0 :         shm_mq_handle *mqh;
     866                 :           0 :         shm_mq_handle *error_mqh;
     867                 :           0 :         RepOriginId originid;
     868                 :           0 :         int                     worker_slot = DatumGetInt32(main_arg);
     869                 :           0 :         char            originname[NAMEDATALEN];
     870                 :             : 
     871                 :           0 :         InitializingApplyWorker = true;
     872                 :             : 
     873                 :             :         /*
     874                 :             :          * Setup signal handling.
     875                 :             :          *
      876                 :             :          * Note: We intentionally use SIGUSR2 to trigger a graceful shutdown
     877                 :             :          * initiated by the leader apply worker. This helps to differentiate it
     878                 :             :          * from the case where we abort the current transaction and exit on
     879                 :             :          * receiving SIGTERM.
     880                 :             :          */
     881                 :           0 :         pqsignal(SIGHUP, SignalHandlerForConfigReload);
     882                 :           0 :         pqsignal(SIGTERM, die);
     883                 :           0 :         pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
     884                 :           0 :         BackgroundWorkerUnblockSignals();
     885                 :             : 
     886                 :             :         /*
     887                 :             :          * Attach to the dynamic shared memory segment for the parallel apply, and
     888                 :             :          * find its table of contents.
     889                 :             :          *
      890                 :             :          * As with parallel query, we don't need a resource owner at this point. See
     891                 :             :          * ParallelWorkerMain.
     892                 :             :          */
     893                 :           0 :         memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
     894                 :           0 :         seg = dsm_attach(handle);
     895         [ #  # ]:           0 :         if (!seg)
     896   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     897                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     898                 :             :                                  errmsg("could not map dynamic shared memory segment")));
     899                 :             : 
     900                 :           0 :         toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg));
     901         [ #  # ]:           0 :         if (!toc)
     902   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     903                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     904                 :             :                                  errmsg("invalid magic number in dynamic shared memory segment")));
     905                 :             : 
     906                 :             :         /* Look up the shared information. */
     907                 :           0 :         shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
     908                 :           0 :         MyParallelShared = shared;
     909                 :             : 
     910                 :             :         /*
     911                 :             :          * Attach to the message queue.
     912                 :             :          */
     913                 :           0 :         mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
     914                 :           0 :         shm_mq_set_receiver(mq, MyProc);
     915                 :           0 :         mqh = shm_mq_attach(mq, seg, NULL);
     916                 :             : 
     917                 :             :         /*
     918                 :             :          * Primary initialization is complete. Now, we can attach to our slot.
     919                 :             :          * This is to ensure that the leader apply worker does not write data to
     920                 :             :          * the uninitialized memory queue.
     921                 :             :          */
     922                 :           0 :         logicalrep_worker_attach(worker_slot);
     923                 :             : 
     924                 :             :         /*
     925                 :             :          * Register the shutdown callback after we are attached to the worker
     926                 :             :          * slot. This is to ensure that MyLogicalRepWorker remains valid when this
     927                 :             :          * callback is invoked.
     928                 :             :          */
     929                 :           0 :         before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
     930                 :             : 
     931         [ #  # ]:           0 :         SpinLockAcquire(&MyParallelShared->mutex);
     932                 :           0 :         MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
     933                 :           0 :         MyParallelShared->logicalrep_worker_slot_no = worker_slot;
     934                 :           0 :         SpinLockRelease(&MyParallelShared->mutex);
     935                 :             : 
     936                 :             :         /*
     937                 :             :          * Attach to the error queue.
     938                 :             :          */
     939                 :           0 :         mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false);
     940                 :           0 :         shm_mq_set_sender(mq, MyProc);
     941                 :           0 :         error_mqh = shm_mq_attach(mq, seg, NULL);
     942                 :             : 
     943                 :           0 :         pq_redirect_to_shm_mq(seg, error_mqh);
     944                 :           0 :         pq_set_parallel_leader(MyLogicalRepWorker->leader_pid,
     945                 :             :                                                    INVALID_PROC_NUMBER);
     946                 :             : 
     947                 :           0 :         MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
     948                 :           0 :                 MyLogicalRepWorker->reply_time = 0;
     949                 :             : 
     950                 :           0 :         InitializeLogRepWorker();
     951                 :             : 
     952                 :           0 :         InitializingApplyWorker = false;
     953                 :             : 
     954                 :             :         /* Setup replication origin tracking. */
     955                 :           0 :         StartTransactionCommand();
     956                 :           0 :         ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
     957                 :           0 :                                                                            originname, sizeof(originname));
     958                 :           0 :         originid = replorigin_by_name(originname, false);
     959                 :             : 
     960                 :             :         /*
     961                 :             :          * The parallel apply worker doesn't need to monopolize this replication
      962                 :             :          * origin, which was already acquired by its leader process.
     963                 :             :          */
     964                 :           0 :         replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
     965                 :           0 :         replorigin_session_origin = originid;
     966                 :           0 :         CommitTransactionCommand();
     967                 :             : 
     968                 :             :         /*
     969                 :             :          * Setup callback for syscache so that we know when something changes in
     970                 :             :          * the subscription relation state.
     971                 :             :          */
     972                 :           0 :         CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
     973                 :             :                                                                   InvalidateSyncingRelStates,
     974                 :             :                                                                   (Datum) 0);
     975                 :             : 
     976                 :           0 :         set_apply_error_context_origin(originname);
     977                 :             : 
     978                 :           0 :         LogicalParallelApplyLoop(mqh);
     979                 :             : 
     980                 :             :         /*
      981                 :             :          * The parallel apply worker must not get here, because it will only
      982                 :             :          * stop when it receives SIGTERM or SIGUSR2 from the leader, SIGINT
      983                 :             :          * from itself, or when there is an error. None of these cases allows
      984                 :             :          * the code to reach this point.
     985                 :             :          */
     986                 :           0 :         Assert(false);
     987                 :           0 : }
     988                 :             : 
     989                 :             : /*
     990                 :             :  * Handle receipt of an interrupt indicating a parallel apply worker message.
     991                 :             :  *
     992                 :             :  * Note: this is called within a signal handler! All we can do is set a flag
     993                 :             :  * that will cause the next CHECK_FOR_INTERRUPTS() to invoke
     994                 :             :  * ProcessParallelApplyMessages().
     995                 :             :  */
     996                 :             : void
     997                 :           0 : HandleParallelApplyMessageInterrupt(void)
     998                 :             : {
     999                 :           0 :         InterruptPending = true;
    1000                 :           0 :         ParallelApplyMessagePending = true;
    1001                 :           0 :         SetLatch(MyLatch);
    1002                 :           0 : }
    1003                 :             : 
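                                :             : /*
                                :             :  * Illustrative sketch (not part of the original source): the flag set
                                :             :  * above is consumed at the next CHECK_FOR_INTERRUPTS(), roughly as below;
                                :             :  * in PostgreSQL this check lives in ProcessInterrupts(). The guard macro
                                :             :  * PA_EXAMPLE_ONLY is hypothetical.
                                :             :  */
                                :             : #ifdef PA_EXAMPLE_ONLY
                                :             : static void
                                :             : pa_example_service_interrupts(void)
                                :             : {
                                :             :         if (ParallelApplyMessagePending)
                                :             :                 ProcessParallelApplyMessages();
                                :             : }
                                :             : #endif
                                :             : 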
    1004                 :             : /*
    1005                 :             :  * Process a single protocol message received from a single parallel apply
    1006                 :             :  * worker.
    1007                 :             :  */
    1008                 :             : static void
    1009                 :           0 : ProcessParallelApplyMessage(StringInfo msg)
    1010                 :             : {
    1011                 :           0 :         char            msgtype;
    1012                 :             : 
    1013                 :           0 :         msgtype = pq_getmsgbyte(msg);
    1014                 :             : 
    1015      [ #  #  # ]:           0 :         switch (msgtype)
    1016                 :             :         {
    1017                 :             :                 case PqMsg_ErrorResponse:
    1018                 :             :                         {
    1019                 :           0 :                                 ErrorData       edata;
    1020                 :             : 
    1021                 :             :                                 /* Parse ErrorResponse. */
    1022                 :           0 :                                 pq_parse_errornotice(msg, &edata);
    1023                 :             : 
    1024                 :             :                                 /*
    1025                 :             :                                  * If desired, add a context line to show that this is a
    1026                 :             :                                  * message propagated from a parallel apply worker. Otherwise,
    1027                 :             :                                  * it can sometimes be confusing to understand what actually
    1028                 :             :                                  * happened.
    1029                 :             :                                  */
    1030         [ #  # ]:           0 :                                 if (edata.context)
    1031                 :           0 :                                         edata.context = psprintf("%s\n%s", edata.context,
    1032                 :           0 :                                                                                          _("logical replication parallel apply worker"));
    1033                 :             :                                 else
    1034                 :           0 :                                         edata.context = pstrdup(_("logical replication parallel apply worker"));
    1035                 :             : 
    1036                 :             :                                 /*
    1037                 :             :                                  * Context beyond that should use the error context callbacks
    1038                 :             :                                  * that were in effect in LogicalRepApplyLoop().
    1039                 :             :                                  */
    1040                 :           0 :                                 error_context_stack = apply_error_context_stack;
    1041                 :             : 
    1042                 :             :                                 /*
    1043                 :             :                                  * The actual error must have been reported by the parallel
    1044                 :             :                                  * apply worker.
    1045                 :             :                                  */
    1046   [ #  #  #  # ]:           0 :                                 ereport(ERROR,
    1047                 :             :                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1048                 :             :                                                  errmsg("logical replication parallel apply worker exited due to error"),
    1049                 :             :                                                  errcontext("%s", edata.context)));
    1050                 :           0 :                         }
    1051                 :             : 
    1052                 :             :                         /*
    1053                 :             :                          * Don't need to do anything about NoticeResponse and
    1054                 :             :                          * NotificationResponse as the logical replication worker doesn't
    1055                 :             :                          * need to send messages to the client.
    1056                 :             :                          */
    1057                 :             :                 case PqMsg_NoticeResponse:
    1058                 :             :                 case PqMsg_NotificationResponse:
    1059                 :           0 :                         break;
    1060                 :             : 
    1061                 :             :                 default:
    1062   [ #  #  #  # ]:           0 :                         elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
    1063                 :             :                                  msgtype, msg->len);
    1064                 :           0 :         }
    1065                 :           0 : }
    1066                 :             : 
    1067                 :             : /*
    1068                 :             :  * Handle any queued protocol messages received from parallel apply workers.
    1069                 :             :  */
    1070                 :             : void
    1071                 :           0 : ProcessParallelApplyMessages(void)
    1072                 :             : {
    1073                 :           0 :         ListCell   *lc;
    1074                 :           0 :         MemoryContext oldcontext;
    1075                 :             : 
    1076                 :             :         static MemoryContext hpam_context = NULL;
    1077                 :             : 
    1078                 :             :         /*
    1079                 :             :          * This is invoked from ProcessInterrupts(), and since some of the
    1080                 :             :          * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
    1081                 :             :          * for recursive calls if more signals are received while this runs. It's
    1082                 :             :          * unclear that recursive entry would be safe, and it doesn't seem useful
    1083                 :             :          * even if it is safe, so let's block interrupts until done.
    1084                 :             :          */
    1085                 :           0 :         HOLD_INTERRUPTS();
    1086                 :             : 
    1087                 :             :         /*
    1088                 :             :          * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
    1089                 :             :          * don't want to risk leaking data into long-lived contexts, so let's do
    1090                 :             :          * our work here in a private context that we can reset on each use.
    1091                 :             :          */
    1092         [ #  # ]:           0 :         if (!hpam_context)                      /* first time through? */
    1093                 :           0 :                 hpam_context = AllocSetContextCreate(TopMemoryContext,
    1094                 :             :                                                                                          "ProcessParallelApplyMessages",
    1095                 :             :                                                                                          ALLOCSET_DEFAULT_SIZES);
    1096                 :             :         else
    1097                 :           0 :                 MemoryContextReset(hpam_context);
    1098                 :             : 
    1099                 :           0 :         oldcontext = MemoryContextSwitchTo(hpam_context);
    1100                 :             : 
    1101                 :           0 :         ParallelApplyMessagePending = false;
    1102                 :             : 
    1103   [ #  #  #  #  :           0 :         foreach(lc, ParallelApplyWorkerPool)
                   #  # ]
    1104                 :             :         {
    1105                 :           0 :                 shm_mq_result res;
    1106                 :           0 :                 Size            nbytes;
    1107                 :           0 :                 void       *data;
    1108                 :           0 :                 ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
    1109                 :             : 
    1110                 :             :                 /*
    1111                 :             :                  * The leader will detach from the error queue and set it to NULL
    1112                 :             :                  * before preparing to stop all parallel apply workers, so we don't
    1113                 :             :                  * need to handle error messages anymore. See
    1114                 :             :                  * logicalrep_worker_detach.
    1115                 :             :                  */
    1116         [ #  # ]:           0 :                 if (!winfo->error_mq_handle)
    1117                 :           0 :                         continue;
    1118                 :             : 
    1119                 :           0 :                 res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
    1120                 :             : 
    1121         [ #  # ]:           0 :                 if (res == SHM_MQ_WOULD_BLOCK)
    1122                 :           0 :                         continue;
    1123         [ #  # ]:           0 :                 else if (res == SHM_MQ_SUCCESS)
    1124                 :             :                 {
    1125                 :           0 :                         StringInfoData msg;
    1126                 :             : 
    1127                 :           0 :                         initStringInfo(&msg);
    1128                 :           0 :                         appendBinaryStringInfo(&msg, data, nbytes);
    1129                 :           0 :                         ProcessParallelApplyMessage(&msg);
    1130                 :           0 :                         pfree(msg.data);
    1131                 :           0 :                 }
    1132                 :             :                 else
    1133   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    1134                 :             :                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1135                 :             :                                          errmsg("lost connection to the logical replication parallel apply worker")));
    1136      [ #  #  # ]:           0 :         }
    1137                 :             : 
    1138                 :           0 :         MemoryContextSwitchTo(oldcontext);
    1139                 :             : 
    1140                 :             :         /* Might as well clear the context on our way out */
    1141                 :           0 :         MemoryContextReset(hpam_context);
    1142                 :             : 
    1143         [ #  # ]:           0 :         RESUME_INTERRUPTS();
    1144                 :           0 : }
    1145                 :             : 
    1146                 :             : /*
    1147                 :             :  * Send the data to the specified parallel apply worker via shared-memory
    1148                 :             :  * queue.
    1149                 :             :  *
    1150                 :             :  * Returns false if the attempt to send data via shared memory times out, true
    1151                 :             :  * otherwise.
    1152                 :             :  */
    1153                 :             : bool
    1154                 :           0 : pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
    1155                 :             : {
    1156                 :           0 :         int                     rc;
    1157                 :           0 :         shm_mq_result result;
    1158                 :           0 :         TimestampTz startTime = 0;
    1159                 :             : 
    1160         [ #  # ]:           0 :         Assert(!IsTransactionState());
    1161         [ #  # ]:           0 :         Assert(!winfo->serialize_changes);
    1162                 :             : 
    1163                 :             :         /*
     1164                 :             :          * We don't try to send data to the parallel worker in 'immediate' mode.
     1165                 :             :          * This mode is primarily used for testing purposes.
    1166                 :             :          */
    1167         [ #  # ]:           0 :         if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE))
    1168                 :           0 :                 return false;
    1169                 :             : 
    1170                 :             : /*
    1171                 :             :  * This timeout is a bit arbitrary but testing revealed that it is sufficient
    1172                 :             :  * to send the message unless the parallel apply worker is waiting on some
    1173                 :             :  * lock or there is a serious resource crunch. See the comments atop this file
     1174                 :             :  * to see why we use a non-blocking way to send the message.
    1175                 :             :  */
    1176                 :             : #define SHM_SEND_RETRY_INTERVAL_MS 1000
    1177                 :             : #define SHM_SEND_TIMEOUT_MS             (10000 - SHM_SEND_RETRY_INTERVAL_MS)
    1178                 :             : 
    1179                 :           0 :         for (;;)
    1180                 :             :         {
    1181                 :           0 :                 result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
    1182                 :             : 
    1183         [ #  # ]:           0 :                 if (result == SHM_MQ_SUCCESS)
    1184                 :           0 :                         return true;
    1185         [ #  # ]:           0 :                 else if (result == SHM_MQ_DETACHED)
    1186   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    1187                 :             :                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1188                 :             :                                          errmsg("could not send data to shared-memory queue")));
    1189                 :             : 
    1190         [ #  # ]:           0 :                 Assert(result == SHM_MQ_WOULD_BLOCK);
    1191                 :             : 
    1192                 :             :                 /* Wait before retrying. */
    1193                 :           0 :                 rc = WaitLatch(MyLatch,
    1194                 :             :                                            WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1195                 :             :                                            SHM_SEND_RETRY_INTERVAL_MS,
    1196                 :             :                                            WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
    1197                 :             : 
    1198         [ #  # ]:           0 :                 if (rc & WL_LATCH_SET)
    1199                 :             :                 {
    1200                 :           0 :                         ResetLatch(MyLatch);
    1201         [ #  # ]:           0 :                         CHECK_FOR_INTERRUPTS();
    1202                 :           0 :                 }
    1203                 :             : 
    1204         [ #  # ]:           0 :                 if (startTime == 0)
    1205                 :           0 :                         startTime = GetCurrentTimestamp();
    1206         [ #  # ]:           0 :                 else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
    1207                 :             :                                                                                         SHM_SEND_TIMEOUT_MS))
    1208                 :           0 :                         return false;
    1209                 :             :         }
    1210                 :           0 : }
    1211                 :             : 
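                                :             : /*
                                :             :  * Illustrative sketch (not part of the original source): a leader-side
                                :             :  * caller is expected to fall back to partial serialization when the send
                                :             :  * times out, along these lines (winfo and s are hypothetical locals, and
                                :             :  * the PA_EXAMPLE_ONLY guard is likewise hypothetical):
                                :             :  */
                                :             : #ifdef PA_EXAMPLE_ONLY
                                :             :         if (!pa_send_data(winfo, s->len, s->data))
                                :             :                 pa_switch_to_partial_serialize(winfo, false);
                                :             : #endif
                                :             : 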
    1212                 :             : /*
    1213                 :             :  * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
    1214                 :             :  * that the current data and any subsequent data for this transaction will be
    1215                 :             :  * serialized to a file. This is done to prevent possible deadlocks with
    1216                 :             :  * another parallel apply worker (refer to the comments atop this file).
    1217                 :             :  */
    1218                 :             : void
    1219                 :           0 : pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
    1220                 :             :                                                            bool stream_locked)
    1221                 :             : {
    1222   [ #  #  #  # ]:           0 :         ereport(LOG,
    1223                 :             :                         (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
    1224                 :             :                                         winfo->shared->xid)));
    1225                 :             : 
    1226                 :             :         /*
    1227                 :             :          * The parallel apply worker could be stuck for some reason (say waiting
    1228                 :             :          * on some lock by other backend), so stop trying to send data directly to
    1229                 :             :          * it and start serializing data to the file instead.
    1230                 :             :          */
    1231                 :           0 :         winfo->serialize_changes = true;
    1232                 :             : 
    1233                 :             :         /* Initialize the stream fileset. */
    1234                 :           0 :         stream_start_internal(winfo->shared->xid, true);
    1235                 :             : 
    1236                 :             :         /*
     1237                 :             :          * Acquire the stream lock if we don't hold it already, to make sure that
     1238                 :             :          * the parallel apply worker will wait for the leader to release the
     1239                 :             :          * stream lock until the end of the transaction.
    1240                 :             :          */
    1241         [ #  # ]:           0 :         if (!stream_locked)
    1242                 :           0 :                 pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
    1243                 :             : 
    1244                 :           0 :         pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS);
    1245                 :           0 : }
    1246                 :             : 
    1247                 :             : /*
    1248                 :             :  * Wait until the parallel apply worker's transaction state has reached or
    1249                 :             :  * exceeded the given xact_state.
    1250                 :             :  */
    1251                 :             : static void
    1252                 :           0 : pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo,
    1253                 :             :                                            ParallelTransState xact_state)
    1254                 :             : {
    1255                 :           0 :         for (;;)
    1256                 :             :         {
    1257                 :             :                 /*
    1258                 :             :                  * Stop if the transaction state has reached or exceeded the given
    1259                 :             :                  * xact_state.
    1260                 :             :                  */
    1261         [ #  # ]:           0 :                 if (pa_get_xact_state(winfo->shared) >= xact_state)
    1262                 :           0 :                         break;
    1263                 :             : 
    1264                 :             :                 /* Wait to be signalled. */
    1265                 :           0 :                 (void) WaitLatch(MyLatch,
    1266                 :             :                                                  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1267                 :             :                                                  10L,
    1268                 :             :                                                  WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
    1269                 :             : 
    1270                 :             :                 /* Reset the latch so we don't spin. */
    1271                 :           0 :                 ResetLatch(MyLatch);
    1272                 :             : 
    1273                 :             :                 /* An interrupt may have occurred while we were waiting. */
    1274         [ #  # ]:           0 :                 CHECK_FOR_INTERRUPTS();
    1275                 :             :         }
    1276                 :           0 : }
    1277                 :             : 
    1278                 :             : /*
    1279                 :             :  * Wait until the parallel apply worker's transaction finishes.
    1280                 :             :  */
    1281                 :             : static void
    1282                 :           0 : pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
    1283                 :             : {
    1284                 :             :         /*
     1285                 :             :          * Wait until the parallel apply worker sets the state to
     1286                 :             :          * PARALLEL_TRANS_STARTED, which means it has acquired the transaction
     1287                 :             :          * lock. This is to prevent the leader apply worker from acquiring the
     1288                 :             :          * transaction lock before the parallel apply worker does.
    1289                 :             :          */
    1290                 :           0 :         pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED);
    1291                 :             : 
    1292                 :             :         /*
    1293                 :             :          * Wait for the transaction lock to be released. This is required to
    1294                 :             :          * detect deadlock among leader and parallel apply workers. Refer to the
    1295                 :             :          * comments atop this file.
    1296                 :             :          */
    1297                 :           0 :         pa_lock_transaction(winfo->shared->xid, AccessShareLock);
    1298                 :           0 :         pa_unlock_transaction(winfo->shared->xid, AccessShareLock);
    1299                 :             : 
    1300                 :             :         /*
     1301                 :             :          * Check whether the state has become PARALLEL_TRANS_FINISHED, in case the
     1302                 :             :          * parallel apply worker failed while applying changes, causing the lock
     1303                 :             :          * to be released.
    1304                 :             :          */
    1305         [ #  # ]:           0 :         if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
    1306   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1307                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1308                 :             :                                  errmsg("lost connection to the logical replication parallel apply worker")));
    1309                 :           0 : }
    1310                 :             : 
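                                :             : /*
                                :             :  * Illustrative note (not part of the original source): the handshake above
                                :             :  * relies on lock ordering rather than polling alone. The parallel apply
                                :             :  * worker takes the transaction lock in AccessExclusiveLock mode before
                                :             :  * advertising PARALLEL_TRANS_STARTED, so the leader's AccessShareLock
                                :             :  * request blocks until the worker releases the lock at commit or abort;
                                :             :  * going through the lock manager also lets the deadlock detector see any
                                :             :  * cycle between the two processes.
                                :             :  */
                                :             : 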
    1311                 :             : /*
    1312                 :             :  * Set the transaction state for a given parallel apply worker.
    1313                 :             :  */
    1314                 :             : void
    1315                 :           0 : pa_set_xact_state(ParallelApplyWorkerShared *wshared,
    1316                 :             :                                   ParallelTransState xact_state)
    1317                 :             : {
    1318         [ #  # ]:           0 :         SpinLockAcquire(&wshared->mutex);
    1319                 :           0 :         wshared->xact_state = xact_state;
    1320                 :           0 :         SpinLockRelease(&wshared->mutex);
    1321                 :           0 : }
    1322                 :             : 
    1323                 :             : /*
    1324                 :             :  * Get the transaction state for a given parallel apply worker.
    1325                 :             :  */
    1326                 :             : static ParallelTransState
    1327                 :           0 : pa_get_xact_state(ParallelApplyWorkerShared *wshared)
    1328                 :             : {
    1329                 :           0 :         ParallelTransState xact_state;
    1330                 :             : 
    1331         [ #  # ]:           0 :         SpinLockAcquire(&wshared->mutex);
    1332                 :           0 :         xact_state = wshared->xact_state;
    1333                 :           0 :         SpinLockRelease(&wshared->mutex);
    1334                 :             : 
    1335                 :           0 :         return xact_state;
    1336                 :           0 : }
    1337                 :             : 
    1338                 :             : /*
    1339                 :             :  * Cache the parallel apply worker information.
    1340                 :             :  */
    1341                 :             : void
    1342                 :           0 : pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
    1343                 :             : {
    1344                 :           0 :         stream_apply_worker = winfo;
    1345                 :           0 : }
    1346                 :             : 
    1347                 :             : /*
    1348                 :             :  * Form a unique savepoint name for the streaming transaction.
    1349                 :             :  *
    1350                 :             :  * Note that different subscriptions for publications on different nodes can
     1351                 :             :  * receive the same remote xid, so we need to use the subscription id along with it.
    1352                 :             :  *
    1353                 :             :  * Returns the name in the supplied buffer.
    1354                 :             :  */
    1355                 :             : static void
    1356                 :           0 : pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
    1357                 :             : {
    1358                 :           0 :         snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
    1359                 :           0 : }
    1360                 :             : 
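                                :             : /*
                                :             :  * Illustrative note (not part of the original source): for a hypothetical
                                :             :  * subscription OID 16394 and remote xid 752, the generated name would be
                                :             :  * "pg_sp_16394_752".
                                :             :  */
                                :             : 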
    1361                 :             : /*
    1362                 :             :  * Define a savepoint for a subxact in parallel apply worker if needed.
    1363                 :             :  *
    1364                 :             :  * The parallel apply worker can figure out if a new subtransaction was
    1365                 :             :  * started by checking if the new change arrived with a different xid. In that
     1366                 :             :  * case, define a named savepoint so that we are able to roll back to it
    1367                 :             :  * if required.
    1368                 :             :  */
    1369                 :             : void
    1370                 :           0 : pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
    1371                 :             : {
    1372   [ #  #  #  # ]:           0 :         if (current_xid != top_xid &&
    1373                 :           0 :                 !list_member_xid(subxactlist, current_xid))
    1374                 :             :         {
    1375                 :           0 :                 MemoryContext oldctx;
    1376                 :           0 :                 char            spname[NAMEDATALEN];
    1377                 :             : 
    1378                 :           0 :                 pa_savepoint_name(MySubscription->oid, current_xid,
    1379                 :           0 :                                                   spname, sizeof(spname));
    1380                 :             : 
    1381   [ #  #  #  # ]:           0 :                 elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
    1382                 :             : 
    1383                 :             :                 /* We must be in transaction block to define the SAVEPOINT. */
    1384         [ #  # ]:           0 :                 if (!IsTransactionBlock())
    1385                 :             :                 {
    1386         [ #  # ]:           0 :                         if (!IsTransactionState())
    1387                 :           0 :                                 StartTransactionCommand();
    1388                 :             : 
    1389                 :           0 :                         BeginTransactionBlock();
    1390                 :           0 :                         CommitTransactionCommand();
    1391                 :           0 :                 }
    1392                 :             : 
    1393                 :           0 :                 DefineSavepoint(spname);
    1394                 :             : 
    1395                 :             :                 /*
    1396                 :             :                  * CommitTransactionCommand is needed to start a subtransaction after
    1397                 :             :                  * issuing a SAVEPOINT inside a transaction block (see
    1398                 :             :                  * StartSubTransaction()).
    1399                 :             :                  */
    1400                 :           0 :                 CommitTransactionCommand();
    1401                 :             : 
    1402                 :           0 :                 oldctx = MemoryContextSwitchTo(TopTransactionContext);
    1403                 :           0 :                 subxactlist = lappend_xid(subxactlist, current_xid);
    1404                 :           0 :                 MemoryContextSwitchTo(oldctx);
    1405                 :           0 :         }
    1406                 :           0 : }
    1407                 :             : 
    1408                 :             : /* Reset the list that maintains subtransactions. */
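                                :             : /*
                                :             :  * Illustrative note (not part of the original source): if the leader
                                :             :  * streams a transaction with top-level xid 700 and subtransactions 701
                                :             :  * and 702, the first change carrying xid 701 defines a savepoint
                                :             :  * (hypothetically "pg_sp_<suboid>_701"), and likewise for 702, leaving
                                :             :  * subxactlist as [701, 702]; a later STREAM ABORT of either subxact can
                                :             :  * then roll back to the matching savepoint.
                                :             :  */
                                :             : 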
    1409                 :             : void
    1410                 :           0 : pa_reset_subtrans(void)
    1411                 :             : {
    1412                 :             :         /*
    1413                 :             :          * We don't need to free this explicitly as the allocated memory will be
    1414                 :             :          * freed at the transaction end.
    1415                 :             :          */
    1416                 :           0 :         subxactlist = NIL;
    1417                 :           0 : }
    1418                 :             : 
    1419                 :             : /*
    1420                 :             :  * Handle STREAM ABORT message when the transaction was applied in a parallel
    1421                 :             :  * apply worker.
    1422                 :             :  */
    1423                 :             : void
    1424                 :           0 : pa_stream_abort(LogicalRepStreamAbortData *abort_data)
    1425                 :             : {
    1426                 :           0 :         TransactionId xid = abort_data->xid;
    1427                 :           0 :         TransactionId subxid = abort_data->subxid;
    1428                 :             : 
    1429                 :             :         /*
    1430                 :             :          * Update the origin state so we can restart streaming from the correct
    1431                 :             :          * position in case of a crash.
    1432                 :             :          */
    1433                 :           0 :         replorigin_session_origin_lsn = abort_data->abort_lsn;
    1434                 :           0 :         replorigin_session_origin_timestamp = abort_data->abort_time;
    1435                 :             : 
    1436                 :             :         /*
    1437                 :             :          * If the two XIDs are the same, it's in fact an abort of the toplevel
    1438                 :             :          * xact, so just free the subxactlist.
    1439                 :             :          */
    1440         [ #  # ]:           0 :         if (subxid == xid)
    1441                 :             :         {
    1442                 :           0 :                 pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
    1443                 :             : 
    1444                 :             :                 /*
    1445                 :             :                  * Release the lock, as we might be processing an empty streaming
    1446                 :             :                  * transaction, in which case the lock won't be released during
    1447                 :             :                  * transaction rollback.
    1448                 :             :                  *
    1449                 :             :                  * Note that it's ok to release the transaction lock before aborting
    1450                 :             :                  * the transaction because even if the parallel apply worker dies due
    1451                 :             :                  * to crash or some other reason, such a transaction would still be
    1452                 :             :                  * considered aborted.
    1453                 :             :                  */
    1454                 :           0 :                 pa_unlock_transaction(xid, AccessExclusiveLock);
    1455                 :             : 
    1456                 :           0 :                 AbortCurrentTransaction();
    1457                 :             : 
    1458         [ #  # ]:           0 :                 if (IsTransactionBlock())
    1459                 :             :                 {
    1460                 :           0 :                         EndTransactionBlock(false);
    1461                 :           0 :                         CommitTransactionCommand();
    1462                 :           0 :                 }
    1463                 :             : 
    1464                 :           0 :                 pa_reset_subtrans();
    1465                 :             : 
    1466                 :           0 :                 pgstat_report_activity(STATE_IDLE, NULL);
    1467                 :           0 :         }
    1468                 :             :         else
    1469                 :             :         {
    1470                 :             :                 /* OK, so it's a subxact. Rollback to the savepoint. */
    1471                 :           0 :                 int                     i;
    1472                 :           0 :                 char            spname[NAMEDATALEN];
    1473                 :             : 
    1474                 :           0 :                 pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
    1475                 :             : 
    1476   [ #  #  #  # ]:           0 :                 elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
    1477                 :             : 
    1478                 :             :                 /*
    1479                 :             :                  * Search the subxactlist, determine the offset tracked for the
    1480                 :             :                  * subxact, and truncate the list.
    1481                 :             :                  *
    1482                 :             :                  * Note that for an empty sub-transaction we won't find the subxid
    1483                 :             :                  * here.
    1484                 :             :                  */
    1485         [ #  # ]:           0 :                 for (i = list_length(subxactlist) - 1; i >= 0; i--)
    1486                 :             :                 {
    1487                 :           0 :                         TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i));
    1488                 :             : 
    1489         [ #  # ]:           0 :                         if (xid_tmp == subxid)
    1490                 :             :                         {
    1491                 :           0 :                                 RollbackToSavepoint(spname);
    1492                 :           0 :                                 CommitTransactionCommand();
    1493                 :           0 :                                 subxactlist = list_truncate(subxactlist, i);
    1494                 :           0 :                                 break;
    1495                 :             :                         }
    1496      [ #  #  # ]:           0 :                 }
    1497                 :           0 :         }
    1498                 :           0 : }
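
/*
 * Editor's note: for reference, the STREAM ABORT payload consumed above.  A
 * sketch of the fields, assuming the LogicalRepStreamAbortData layout in
 * logicalproto.h (the name below is suffixed to mark it as illustrative):
 */
typedef struct LogicalRepStreamAbortDataSketch
{
	TransactionId xid;			/* top-level remote transaction */
	TransactionId subxid;		/* aborted (sub)xact; equals xid for toplevel */
	XLogRecPtr	abort_lsn;		/* end LSN of the abort record */
	TimestampTz abort_time;		/* timestamp of the abort */
} LogicalRepStreamAbortDataSketch;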
    1499                 :             : 
    1500                 :             : /*
    1501                 :             :  * Set the fileset state for a particular parallel apply worker. The fileset
    1502                 :             :  * will be set once the leader worker has serialized all changes to the file
    1503                 :             :  * so that it can be used by the parallel apply worker.
    1504                 :             :  */
    1505                 :             : void
    1506                 :           0 : pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
    1507                 :             :                                          PartialFileSetState fileset_state)
    1508                 :             : {
    1509         [ #  # ]:           0 :         SpinLockAcquire(&wshared->mutex);
    1510                 :           0 :         wshared->fileset_state = fileset_state;
    1511                 :             : 
    1512         [ #  # ]:           0 :         if (fileset_state == FS_SERIALIZE_DONE)
    1513                 :             :         {
    1514         [ #  # ]:           0 :                 Assert(am_leader_apply_worker());
    1515         [ #  # ]:           0 :                 Assert(MyLogicalRepWorker->stream_fileset);
    1516                 :           0 :                 wshared->fileset = *MyLogicalRepWorker->stream_fileset;
    1517                 :           0 :         }
    1518                 :             : 
    1519                 :           0 :         SpinLockRelease(&wshared->mutex);
    1520                 :           0 : }
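
/*
 * Editor's sketch (illustrative only, based on how worker.c drives this
 * API): the order in which the leader advances the fileset state while
 * spilling the remainder of a transaction, assuming 'winfo' is the leader's
 * ParallelApplyWorkerInfo for that transaction.
 */
static void
leader_serialize_sequence_sketch(ParallelApplyWorkerInfo *winfo)
{
	/* Announce that serialization to the file has begun. */
	pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS);

	/* ... write the remaining changes to the stream fileset ... */

	/* Publish the fileset so the parallel apply worker can read it. */
	pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
}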
    1521                 :             : 
    1522                 :             : /*
    1523                 :             :  * Get the fileset state for the current parallel apply worker.
    1524                 :             :  */
    1525                 :             : static PartialFileSetState
    1526                 :           0 : pa_get_fileset_state(void)
    1527                 :             : {
    1528                 :           0 :         PartialFileSetState fileset_state;
    1529                 :             : 
    1530         [ #  # ]:           0 :         Assert(am_parallel_apply_worker());
    1531                 :             : 
    1532         [ #  # ]:           0 :         SpinLockAcquire(&MyParallelShared->mutex);
    1533                 :           0 :         fileset_state = MyParallelShared->fileset_state;
    1534                 :           0 :         SpinLockRelease(&MyParallelShared->mutex);
    1535                 :             : 
    1536                 :           0 :         return fileset_state;
    1537                 :           0 : }
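
/*
 * Editor's note: the typical consumer of this state.  A minimal sketch of
 * pa_has_spooled_message_pending() (defined earlier in this file), which
 * treats any state other than FS_EMPTY as "spooled messages may be pending":
 */
static bool
pa_has_spooled_message_pending_sketch(void)
{
	return (pa_get_fileset_state() != FS_EMPTY);
}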
    1538                 :             : 
    1539                 :             : /*
    1540                 :             :  * Helper functions to acquire and release a lock for each stream block.
    1541                 :             :  *
    1542                 :             :  * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a
    1543                 :             :  * stream lock.
    1544                 :             :  *
    1545                 :             :  * Refer to the comments atop this file to see how the stream lock is used.
    1546                 :             :  */
    1547                 :             : void
    1548                 :           0 : pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
    1549                 :             : {
    1550                 :           0 :         LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1551                 :           0 :                                                                    PARALLEL_APPLY_LOCK_STREAM, lockmode);
    1552                 :           0 : }
    1553                 :             : 
    1554                 :             : void
    1555                 :           0 : pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
    1556                 :             : {
    1557                 :           0 :         UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1558                 :           0 :                                                                          PARALLEL_APPLY_LOCK_STREAM, lockmode);
    1559                 :           0 : }
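
/*
 * Editor's sketch (illustrative only): the hand-off these helpers enable.
 * The leader holds the stream lock in AccessExclusiveLock mode between
 * stream chunks, so a parallel apply worker that has consumed every pending
 * chunk waits for more data by briefly acquiring the same lock in
 * AccessShareLock mode; 'xid' is the streamed transaction's remote ID.
 */
static void
wait_for_next_stream_chunk_sketch(TransactionId xid)
{
	/* Blocks until the leader releases the lock at the next stream start. */
	pa_lock_stream(xid, AccessShareLock);
	pa_unlock_stream(xid, AccessShareLock);
}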
    1560                 :             : 
    1561                 :             : /*
    1562                 :             :  * Helper functions to acquire and release a lock for each local transaction
    1563                 :             :  * apply.
    1564                 :             :  *
    1565                 :             :  * Set locktag_field4 to PARALLEL_APPLY_LOCK_XACT to indicate that it's a
    1566                 :             :  * transaction lock.
    1567                 :             :  *
    1568                 :             :  * Note that all callers must pass the remote transaction ID as xid, not the
    1569                 :             :  * local transaction ID. This is because the local transaction ID is assigned
    1570                 :             :  * only while the first change is being applied, and that first change may
    1571                 :             :  * itself be blocked by a concurrently executing transaction in another
    1572                 :             :  * parallel apply worker. Since the local transaction ID can be communicated
    1573                 :             :  * to the leader only after the first change has been applied, the leader
    1574                 :             :  * would not be able to use such a lock to wait after sending the xact
    1575                 :             :  * finish command.
    1576                 :             :  *
    1577                 :             :  * Refer to the comments atop this file to see how the transaction lock is
    1578                 :             :  * used.
    1579                 :             :  */
    1580                 :             : void
    1581                 :           0 : pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
    1582                 :             : {
    1583                 :           0 :         LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1584                 :           0 :                                                                    PARALLEL_APPLY_LOCK_XACT, lockmode);
    1585                 :           0 : }
    1586                 :             : 
    1587                 :             : void
    1588                 :           0 : pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
    1589                 :             : {
    1590                 :           0 :         UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1591                 :           0 :                                                                          PARALLEL_APPLY_LOCK_XACT, lockmode);
    1592                 :           0 : }
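
/*
 * Editor's sketch (illustrative only): how the transaction lock preserves
 * commit ordering.  The parallel apply worker holds the lock exclusively for
 * the lifetime of the remote xact; the leader waits for the commit by
 * briefly acquiring the same lock in share mode.
 */
static void
transaction_lock_usage_sketch(TransactionId remote_xid, bool am_leader)
{
	if (!am_leader)
	{
		/* Worker: taken when the transaction is started ... */
		pa_lock_transaction(remote_xid, AccessExclusiveLock);
		/* ... apply the streamed changes, commit, then ... */
		pa_unlock_transaction(remote_xid, AccessExclusiveLock);
	}
	else
	{
		/* Leader: blocks until the worker releases its exclusive lock. */
		pa_lock_transaction(remote_xid, AccessShareLock);
		pa_unlock_transaction(remote_xid, AccessShareLock);
	}
}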
    1593                 :             : 
    1594                 :             : /*
    1595                 :             :  * Decrement the number of pending streaming blocks and wait on the stream lock
    1596                 :             :  * if there is no pending block available.
    1597                 :             :  */
    1598                 :             : void
    1599                 :           0 : pa_decr_and_wait_stream_block(void)
    1600                 :             : {
    1601         [ #  # ]:           0 :         Assert(am_parallel_apply_worker());
    1602                 :             : 
    1603                 :             :         /*
    1604                 :             :          * The only case in which there are no pending stream chunks is when
    1605                 :             :          * we are applying spooled messages.
    1606                 :             :          */
    1607         [ #  # ]:           0 :         if (pg_atomic_read_u32(&MyParallelShared->pending_stream_count) == 0)
    1608                 :             :         {
    1609         [ #  # ]:           0 :                 if (pa_has_spooled_message_pending())
    1610                 :           0 :                         return;
    1611                 :             : 
    1612   [ #  #  #  # ]:           0 :                 elog(ERROR, "invalid pending streaming chunk 0");
    1613                 :           0 :         }
    1614                 :             : 
    1615         [ #  # ]:           0 :         if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count, 1) == 0)
    1616                 :             :         {
    1617                 :           0 :                 pa_lock_stream(MyParallelShared->xid, AccessShareLock);
    1618                 :           0 :                 pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
    1619                 :           0 :         }
    1620                 :           0 : }
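
/*
 * Editor's sketch (illustrative only): the leader-side counterpart of the
 * function above.  Before forwarding each new stream chunk, the leader bumps
 * the shared counter that the parallel apply worker decrements, assuming
 * 'winfo' is the leader's ParallelApplyWorkerInfo for the transaction.
 */
static void
leader_announce_stream_chunk_sketch(ParallelApplyWorkerInfo *winfo)
{
	pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
}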
    1621                 :             : 
    1622                 :             : /*
    1623                 :             :  * Finish processing the streaming transaction in the leader apply worker.
    1624                 :             :  */
    1625                 :             : void
    1626                 :           0 : pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
    1627                 :             : {
    1628         [ #  # ]:           0 :         Assert(am_leader_apply_worker());
    1629                 :             : 
    1630                 :             :         /*
    1631                 :             :          * Unlock the shared object lock so that the parallel apply worker can
    1632                 :             :          * continue to receive and apply changes.
    1633                 :             :          */
    1634                 :           0 :         pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
    1635                 :             : 
    1636                 :             :         /*
    1637                 :             :          * Wait for that worker to finish. This is necessary to maintain commit
    1638                 :             :          * order which avoids failures due to transaction dependencies and
    1639                 :             :          * deadlocks.
    1640                 :             :          */
    1641                 :           0 :         pa_wait_for_xact_finish(winfo);
    1642                 :             : 
    1643         [ #  # ]:           0 :         if (XLogRecPtrIsValid(remote_lsn))
    1644                 :           0 :                 store_flush_position(remote_lsn, winfo->shared->last_commit_end);
    1645                 :             : 
    1646                 :           0 :         pa_free_worker(winfo);
    1647                 :           0 : }
        
