LCOV - code coverage report
Current view: top level - src/backend/replication/logical - origin.c
Test: Code coverage
Test Date: 2026-01-26 10:56:24

              Coverage    Total    Hit
Lines:          33.1 %      602    199
Functions:      37.5 %       32     12
Branches:       14.9 %      464     69

Legend: Lines: hit, not hit
        Branches: + taken, - not taken, # not executed

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * origin.c
       4                 :             :  *        Logical replication progress tracking support.
       5                 :             :  *
       6                 :             :  * Copyright (c) 2013-2026, PostgreSQL Global Development Group
       7                 :             :  *
       8                 :             :  * IDENTIFICATION
       9                 :             :  *        src/backend/replication/logical/origin.c
      10                 :             :  *
      11                 :             :  * NOTES
      12                 :             :  *
      13                 :             :  * This file provides the following:
      14                 :             :  * * An infrastructure to name nodes in a replication setup
      15                 :             :  * * A facility to store and persist replication progress in an
      16                 :             :  *       efficient and durable manner.
      17                 :             :  *
      18                 :             :  * A replication origin consists of a descriptive, user-defined, external
      19                 :             :  * name and a short, thus space-efficient, internal 2-byte one. This split
      20                 :             :  * exists because replication origins have to be stored in WAL and shared
      21                 :             :  * memory, where long descriptors would be inefficient.  For now we only use
      22                 :             :  * 2 bytes for the internal id of a replication origin, as it seems unlikely
      23                 :             :  * that there will soon be more than 65k nodes in one replication setup, and
      24                 :             :  * using only two bytes allows us to be more space efficient.
      25                 :             :  *
      26                 :             :  * Replication progress is tracked in a shared memory table
      27                 :             :  * (ReplicationState) that's dumped to disk every checkpoint. Entries
      28                 :             :  * ('slots') in this table are identified by the internal id. That's the case
      29                 :             :  * because it allows us to advance replication progress during crash
      30                 :             :  * recovery. To allow doing so we store the original LSN (from the originating
      31                 :             :  * system) of a transaction in the commit record. That allows us to recover the
      32                 :             :  * precise replayed state after crash recovery, without requiring synchronous
      33                 :             :  * commits. Allowing logical replication to use asynchronous commit is
      34                 :             :  * generally good for performance, but especially important as it allows a
      35                 :             :  * single-threaded replay process to keep up with a source that has multiple
      36                 :             :  * backends generating changes concurrently.  For efficiency and simplicity
      37                 :             :  * reasons a backend can set up one replication origin, which is then used as
      38                 :             :  * the source of all changes the backend produces until it is reset.
      39                 :             :  *
      40                 :             :  * This infrastructure is intended to be used in cooperation with logical
      41                 :             :  * decoding. When replaying from a remote system the configured origin is
      42                 :             :  * provided to output plugins, allowing prevention of replication loops and
      43                 :             :  * other filtering.
      44                 :             :  *
      45                 :             :  * There are several levels of locking at work:
      46                 :             :  *
      47                 :             :  * * To create and drop replication origins an exclusive lock on
      48                 :             :  *       pg_replication_origin is required for the duration. That allows us to
      49                 :             :  *       safely assign new origins without conflicts, using a dirty snapshot.
      50                 :             :  *
      51                 :             :  * * When creating an in-memory replication progress slot the ReplicationOrigin
      52                 :             :  *       LWLock has to be held exclusively. When iterating over the replication
      53                 :             :  *       progress a shared lock has to be held; the same applies when advancing
      54                 :             :  *       the replication progress of an origin that has not been set up as the
      55                 :             :  *       session's replication origin.
      56                 :             :  *
      57                 :             :  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
      58                 :             :  *       replication progress slot, that slot's LWLock has to be held. That's
      59                 :             :  *       primarily because we do not assume that 8-byte writes (the LSN) are
      60                 :             :  *       atomic on all our platforms, but it also simplifies memory ordering
      61                 :             :  *       concerns between the remote and local LSN. We use an LWLock instead of a spinlock
      62                 :             :  *       so it's less harmful to hold the lock over a WAL write
      63                 :             :  *       (cf. AdvanceReplicationProgress).
      64                 :             :  *
      65                 :             :  * ---------------------------------------------------------------------------
      66                 :             :  */
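The last locking rule above can be illustrated with a small standalone sketch. It assumes POSIX threads and uses a pthread_mutex_t as a stand-in for the per-slot LWLock (both are sleeping locks, unlike a spinlock); the names progress_slot, slot_init, slot_advance and slot_read are hypothetical and not part of origin.c. Both LSNs are written and copied only under the lock, so a reader always sees them as one consistent pair, even on a platform where 64-bit stores are not atomic.

```c
#include <assert.h>
#include <pthread.h>
#include <stdint.h>

/* Hypothetical stand-in for XLogRecPtr: a 64-bit WAL position. */
typedef uint64_t lsn_t;

/*
 * Hypothetical progress slot. Both LSNs are read and written only while
 * holding 'lock', so a reader can never observe a torn 64-bit value or a
 * remote_lsn paired with a local_lsn from a different update.
 */
typedef struct progress_slot
{
	pthread_mutex_t lock;		/* sleeping lock, playing the LWLock's role */
	lsn_t		remote_lsn;
	lsn_t		local_lsn;
} progress_slot;

void
slot_init(progress_slot *slot)
{
	int			rc = pthread_mutex_init(&slot->lock, NULL);

	assert(rc == 0);
	(void) rc;
	slot->remote_lsn = 0;
	slot->local_lsn = 0;
}

/* Record a newly replayed commit; this sketch never moves progress back. */
void
slot_advance(progress_slot *slot, lsn_t remote_lsn, lsn_t local_lsn)
{
	pthread_mutex_lock(&slot->lock);
	if (remote_lsn > slot->remote_lsn)
	{
		slot->remote_lsn = remote_lsn;
		slot->local_lsn = local_lsn;
	}
	pthread_mutex_unlock(&slot->lock);
}

/* Copy out both positions as one consistent pair. */
void
slot_read(progress_slot *slot, lsn_t *remote_lsn, lsn_t *local_lsn)
{
	pthread_mutex_lock(&slot->lock);
	*remote_lsn = slot->remote_lsn;
	*local_lsn = slot->local_lsn;
	pthread_mutex_unlock(&slot->lock);
}
```

The monotonic check in slot_advance is a simplification chosen for the sketch; a sleeping lock is used because, as the comment notes, the real lock may be held across a WAL write.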
      67                 :             : 
      68                 :             : #include "postgres.h"
      69                 :             : 
      70                 :             : #include <unistd.h>
      71                 :             : #include <sys/stat.h>
      72                 :             : 
      73                 :             : #include "access/genam.h"
      74                 :             : #include "access/htup_details.h"
      75                 :             : #include "access/table.h"
      76                 :             : #include "access/xact.h"
      77                 :             : #include "access/xloginsert.h"
      78                 :             : #include "catalog/catalog.h"
      79                 :             : #include "catalog/indexing.h"
      80                 :             : #include "catalog/pg_subscription.h"
      81                 :             : #include "funcapi.h"
      82                 :             : #include "miscadmin.h"
      83                 :             : #include "nodes/execnodes.h"
      84                 :             : #include "pgstat.h"
      85                 :             : #include "replication/origin.h"
      86                 :             : #include "replication/slot.h"
      87                 :             : #include "storage/condition_variable.h"
      88                 :             : #include "storage/fd.h"
      89                 :             : #include "storage/ipc.h"
      90                 :             : #include "storage/lmgr.h"
      91                 :             : #include "utils/builtins.h"
      92                 :             : #include "utils/fmgroids.h"
      93                 :             : #include "utils/guc.h"
      94                 :             : #include "utils/pg_lsn.h"
      95                 :             : #include "utils/rel.h"
      96                 :             : #include "utils/snapmgr.h"
      97                 :             : #include "utils/syscache.h"
      98                 :             : 
      99                 :             : /* paths for replication origin checkpoint files */
     100                 :             : #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
     101                 :             : #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
     102                 :             : 
     103                 :             : /* GUC variables */
     104                 :             : int                     max_active_replication_origins = 10;
     105                 :             : 
     106                 :             : /*
     107                 :             :  * Replay progress of a single remote node.
     108                 :             :  */
     109                 :             : typedef struct ReplicationState
     110                 :             : {
     111                 :             :         /*
     112                 :             :          * Local identifier for the remote node.
     113                 :             :          */
     114                 :             :         RepOriginId roident;
     115                 :             : 
     116                 :             :         /*
     117                 :             :          * Location of the latest commit from the remote side.
     118                 :             :          */
     119                 :             :         XLogRecPtr      remote_lsn;
     120                 :             : 
     121                 :             :         /*
     122                 :             :          * Remember the local lsn of the commit record so we can XLogFlush() to it
      123                 :             :          * during a checkpoint, so that we know the commit record is actually
      124                 :             :          * safe on disk.
     125                 :             :          */
     126                 :             :         XLogRecPtr      local_lsn;
     127                 :             : 
     128                 :             :         /*
     129                 :             :          * PID of backend that's acquired slot, or 0 if none.
     130                 :             :          */
     131                 :             :         int                     acquired_by;
     132                 :             : 
     133                 :             :         /* Count of processes that are currently using this origin. */
     134                 :             :         int                     refcount;
     135                 :             : 
     136                 :             :         /*
     137                 :             :          * Condition variable that's signaled when acquired_by changes.
     138                 :             :          */
     139                 :             :         ConditionVariable origin_cv;
     140                 :             : 
     141                 :             :         /*
     142                 :             :          * Lock protecting remote_lsn and local_lsn.
     143                 :             :          */
     144                 :             :         LWLock          lock;
     145                 :             : } ReplicationState;
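The refcount and origin_cv fields support the wait-and-retry loop in replorigin_state_clear further down: a caller that finds the origin in use either errors out (nowait) or sleeps on the condition variable and rechecks. The sketch below mimics that pattern with a POSIX mutex and condition variable; origin_slot and its functions are hypothetical, and returning EBUSY stands in for the ERRCODE_OBJECT_IN_USE error.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>

/*
 * Hypothetical analogue of a slot's refcount and origin_cv: 'refcount'
 * counts current users, and 'released' is broadcast whenever one lets go.
 */
typedef struct origin_slot
{
	pthread_mutex_t mutex;
	pthread_cond_t released;
	int			refcount;
	bool		active;			/* false once the slot has been cleared */
} origin_slot;

void
origin_slot_init(origin_slot *slot)
{
	int			rc1 = pthread_mutex_init(&slot->mutex, NULL);
	int			rc2 = pthread_cond_init(&slot->released, NULL);

	assert(rc1 == 0 && rc2 == 0);
	(void) rc1;
	(void) rc2;
	slot->refcount = 0;
	slot->active = true;
}

void
origin_slot_acquire(origin_slot *slot)
{
	pthread_mutex_lock(&slot->mutex);
	slot->refcount++;
	pthread_mutex_unlock(&slot->mutex);
}

void
origin_slot_release(origin_slot *slot)
{
	pthread_mutex_lock(&slot->mutex);
	assert(slot->refcount > 0);
	slot->refcount--;
	pthread_cond_broadcast(&slot->released);	/* wake any waiting dropper */
	pthread_mutex_unlock(&slot->mutex);
}

/*
 * Clear the slot once nobody uses it. With nowait, return EBUSY instead of
 * sleeping. pthread_cond_wait() releases the mutex and sleeps atomically,
 * so a release happening in between cannot be missed; the loop rechecks
 * the condition after every wakeup.
 */
int
origin_slot_clear(origin_slot *slot, bool nowait)
{
	pthread_mutex_lock(&slot->mutex);
	while (slot->refcount > 0)
	{
		if (nowait)
		{
			pthread_mutex_unlock(&slot->mutex);
			return EBUSY;
		}
		pthread_cond_wait(&slot->released, &slot->mutex);
	}
	slot->active = false;
	pthread_mutex_unlock(&slot->mutex);
	return 0;
}
```

The atomic unlock-and-sleep of pthread_cond_wait() is what the comment in replorigin_state_clear has to work around by hand, since it cannot prepare to sleep before releasing ReplicationOriginLock.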
     146                 :             : 
     147                 :             : /*
     148                 :             :  * On disk version of ReplicationState.
     149                 :             :  */
     150                 :             : typedef struct ReplicationStateOnDisk
     151                 :             : {
     152                 :             :         RepOriginId roident;
     153                 :             :         XLogRecPtr      remote_lsn;
     154                 :             : } ReplicationStateOnDisk;
     155                 :             : 
     156                 :             : 
     157                 :             : typedef struct ReplicationStateCtl
     158                 :             : {
     159                 :             :         /* Tranche to use for per-origin LWLocks */
     160                 :             :         int                     tranche_id;
     161                 :             :         /* Array of length max_active_replication_origins */
     162                 :             :         ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
     163                 :             : } ReplicationStateCtl;
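ReplicationStateCtl ends in a flexible array member, so the control header and all max_active_replication_origins entries live in one contiguous block. A sketch of how such a block can be sized and allocated, assuming C99 flexible array members and using calloc() where the server would use shared memory; state_ctl, state_entry and the helper functions are hypothetical, trimmed-down names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical, trimmed-down per-origin entry. */
typedef struct state_entry
{
	uint16_t	roident;		/* 0 marks an unused entry */
	uint64_t	remote_lsn;
} state_entry;

/* Fixed header followed directly by a variable-length array of entries. */
typedef struct state_ctl
{
	int			tranche_id;
	state_entry states[];		/* C99 flexible array member */
} state_ctl;

/* Bytes needed for a control block with room for n entries. */
size_t
state_ctl_size(size_t n)
{
	/* guard against size_t overflow in the multiplication below */
	assert(n <= (SIZE_MAX - offsetof(state_ctl, states)) / sizeof(state_entry));
	return offsetof(state_ctl, states) + n * sizeof(state_entry);
}

/*
 * Allocate header and entries in one piece. calloc() zero-fills, so every
 * entry starts out unused (roident == 0).
 */
state_ctl *
state_ctl_create(size_t n)
{
	return calloc(1, state_ctl_size(n));
}
```

Sizing with offsetof() rather than sizeof(state_ctl) avoids counting any trailing padding of the header twice.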
     164                 :             : 
     165                 :             : /* external variables */
     166                 :             : RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
     167                 :             : XLogRecPtr      replorigin_session_origin_lsn = InvalidXLogRecPtr;
     168                 :             : TimestampTz replorigin_session_origin_timestamp = 0;
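The header comment explains that the originating system's LSN of each replicated transaction is stored in its commit record, which is what allows progress to be rebuilt after a crash without synchronous commit. The sketch below replays a list of hypothetical commit records on top of the progress restored from the last checkpoint. commit_rec, replay_progress and MAX_ORIGINS are illustrative names, and indexing progress directly by origin id is a simplification; origin.c looks entries up by roident in replication_states.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint16_t origin_id;		/* like RepOriginId; 0 = invalid/local */
typedef uint64_t lsn_t;			/* like XLogRecPtr */

/* Hypothetical commit record: which origin produced it, and where. */
typedef struct commit_rec
{
	origin_id	origin;			/* 0 for local, non-replicated commits */
	lsn_t		origin_lsn;		/* commit LSN on the originating system */
} commit_rec;

#define MAX_ORIGINS 8

/*
 * Starting from the progress saved at the last checkpoint, redo the commit
 * records written after it. Each record carries its origin LSN, so the
 * latest replayed position of every origin can be reconstructed.
 */
void
replay_progress(lsn_t progress[MAX_ORIGINS],
				const commit_rec *recs, size_t nrecs)
{
	for (size_t i = 0; i < nrecs; i++)
	{
		const commit_rec *rec = &recs[i];

		if (rec->origin == 0)
			continue;			/* not a replicated transaction */
		assert(rec->origin < MAX_ORIGINS);
		if (rec->origin_lsn > progress[rec->origin])
			progress[rec->origin] = rec->origin_lsn;
	}
}
```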
     169                 :             : 
     170                 :             : /*
     171                 :             :  * Base address into a shared memory array of replication states of size
     172                 :             :  * max_active_replication_origins.
     173                 :             :  */
     174                 :             : static ReplicationState *replication_states;
     175                 :             : 
     176                 :             : /*
     177                 :             :  * Actual shared memory block (replication_states[] is now part of this).
     178                 :             :  */
     179                 :             : static ReplicationStateCtl *replication_states_ctl;
     180                 :             : 
     181                 :             : /*
     182                 :             :  * We keep a pointer to this backend's ReplicationState to avoid having to
     183                 :             :  * search the replication_states array in replorigin_session_advance for each
     184                 :             :  * remote commit.  (Ownership of a backend's own entry can only be changed by
     185                 :             :  * that backend.)
     186                 :             :  */
     187                 :             : static ReplicationState *session_replication_state = NULL;
     188                 :             : 
     189                 :             : /* Magic for on disk files. */
     190                 :             : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
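Only roident and remote_lsn are persisted (ReplicationStateOnDisk), and REPLICATION_STATE_MAGIC tags the file. A minimal sketch of a checkpoint file built from those pieces follows: a magic number followed by fixed-size entries, validated on read. The layout and the names disk_entry, write_checkpoint and read_checkpoint are hypothetical, not the server's actual file format, and writing raw structs ties the file to one platform's byte order and padding. The separate ".tmp" file name defined earlier suggests the usual write-then-rename step for atomic replacement, which this sketch omits.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Same value as REPLICATION_STATE_MAGIC above. */
#define STATE_MAGIC ((uint32_t) 0x1257DADE)

/* Hypothetical mirror of ReplicationStateOnDisk. */
typedef struct disk_entry
{
	uint16_t	roident;
	uint64_t	remote_lsn;
} disk_entry;

/* Write the magic, then n entries. Returns 0 on success, -1 on error. */
int
write_checkpoint(FILE *fp, const disk_entry *entries, size_t n)
{
	uint32_t	magic = STATE_MAGIC;

	assert(fp != NULL);
	if (fwrite(&magic, sizeof(magic), 1, fp) != 1)
		return -1;
	if (n > 0 && fwrite(entries, sizeof(disk_entry), n, fp) != n)
		return -1;
	return fflush(fp) == 0 ? 0 : -1;
}

/* Returns the number of entries read, or -1 if the magic does not match. */
long
read_checkpoint(FILE *fp, disk_entry *entries, size_t max_entries)
{
	uint32_t	magic;
	size_t		n = 0;

	assert(fp != NULL);
	if (fread(&magic, sizeof(magic), 1, fp) != 1 || magic != STATE_MAGIC)
		return -1;
	while (n < max_entries &&
		   fread(&entries[n], sizeof(disk_entry), 1, fp) == 1)
		n++;
	return (long) n;
}
```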
     191                 :             : 
     192                 :             : static void
     193                 :           1 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
     194                 :             : {
     195   [ -  +  #  # ]:           1 :         if (check_origins && max_active_replication_origins == 0)
     196   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     197                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     198                 :             :                                  errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
     199                 :             : 
     200   [ +  -  +  - ]:           1 :         if (!recoveryOK && RecoveryInProgress())
     201   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     202                 :             :                                 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
     203                 :             :                                  errmsg("cannot manipulate replication origins during recovery")));
     204                 :           1 : }
     205                 :             : 
     206                 :             : 
     207                 :             : /*
     208                 :             :  * IsReservedOriginName
     209                 :             :  *              True iff name is either "none" or "any".
     210                 :             :  */
     211                 :             : static bool
     212                 :           1 : IsReservedOriginName(const char *name)
     213                 :             : {
     214         [ -  + ]:           1 :         return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
     215                 :           1 :                         (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
     216                 :             : }
     217                 :             : 
     218                 :             : /* ---------------------------------------------------------------------------
     219                 :             :  * Functions for working with replication origins themselves.
     220                 :             :  * ---------------------------------------------------------------------------
     221                 :             :  */
     222                 :             : 
     223                 :             : /*
     224                 :             :  * Check for a persistent replication origin identified by name.
     225                 :             :  *
     226                 :             :  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
     227                 :             :  */
     228                 :             : RepOriginId
     229                 :          14 : replorigin_by_name(const char *roname, bool missing_ok)
     230                 :             : {
     231                 :          14 :         Form_pg_replication_origin ident;
     232                 :          14 :         Oid                     roident = InvalidOid;
     233                 :          14 :         HeapTuple       tuple;
     234                 :          14 :         Datum           roname_d;
     235                 :             : 
     236                 :          14 :         roname_d = CStringGetTextDatum(roname);
     237                 :             : 
     238                 :          14 :         tuple = SearchSysCache1(REPLORIGNAME, roname_d);
     239         [ +  - ]:          14 :         if (HeapTupleIsValid(tuple))
     240                 :             :         {
     241                 :          14 :                 ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
     242                 :          14 :                 roident = ident->roident;
     243                 :          14 :                 ReleaseSysCache(tuple);
     244                 :          14 :         }
     245         [ #  # ]:           0 :         else if (!missing_ok)
     246   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     247                 :             :                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     248                 :             :                                  errmsg("replication origin \"%s\" does not exist",
     249                 :             :                                                 roname)));
     250                 :             : 
     251                 :          28 :         return roident;
     252                 :          14 : }
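replorigin_by_name has a three-way contract: it returns the id when the name exists, returns InvalidOid when the name is unknown and missing_ok is true, and raises an error otherwise. A sketch of that contract over a hypothetical in-memory table (origin_row, origin_by_name), where an error flag stands in for ereport(ERROR) and 0 plays the role of the invalid id; the real function looks the name up through the REPLORIGNAME syscache instead of scanning an array.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef uint16_t origin_id;
#define INVALID_ORIGIN_ID ((origin_id) 0)

/* Hypothetical stand-in for a pg_replication_origin row. */
typedef struct origin_row
{
	origin_id	roident;
	const char *roname;
} origin_row;

/*
 * Return the id for 'name'. If it is absent, return INVALID_ORIGIN_ID and,
 * unless missing_ok, set *error (where the real code calls ereport).
 */
origin_id
origin_by_name(const origin_row *rows, size_t nrows, const char *name,
			   bool missing_ok, bool *error)
{
	assert(name != NULL && error != NULL);
	*error = false;
	for (size_t i = 0; i < nrows; i++)
	{
		if (strcmp(rows[i].roname, name) == 0)
			return rows[i].roident;
	}
	if (!missing_ok)
		*error = true;
	return INVALID_ORIGIN_ID;
}
```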
     253                 :             : 
     254                 :             : /*
     255                 :             :  * Create a replication origin.
     256                 :             :  *
     257                 :             :  * Needs to be called in a transaction.
     258                 :             :  */
     259                 :             : RepOriginId
     260                 :          15 : replorigin_create(const char *roname)
     261                 :             : {
     262                 :          15 :         Oid                     roident;
     263                 :          15 :         HeapTuple       tuple = NULL;
     264                 :          15 :         Relation        rel;
     265                 :          15 :         Datum           roname_d;
     266                 :          15 :         SnapshotData SnapshotDirty;
     267                 :          15 :         SysScanDesc scan;
     268                 :          15 :         ScanKeyData key;
     269                 :             : 
     270                 :             :         /*
     271                 :             :          * To avoid needing a TOAST table for pg_replication_origin, we limit
     272                 :             :          * replication origin names to 512 bytes.  This should be more than enough
     273                 :             :          * for all practical use.
     274                 :             :          */
     275         [ +  + ]:          15 :         if (strlen(roname) > MAX_RONAME_LEN)
     276   [ +  -  +  - ]:           1 :                 ereport(ERROR,
     277                 :             :                                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     278                 :             :                                  errmsg("replication origin name is too long"),
     279                 :             :                                  errdetail("Replication origin names must be no longer than %d bytes.",
     280                 :             :                                                    MAX_RONAME_LEN)));
     281                 :             : 
     282                 :          14 :         roname_d = CStringGetTextDatum(roname);
     283                 :             : 
     284         [ +  - ]:          14 :         Assert(IsTransactionState());
     285                 :             : 
     286                 :             :         /*
      287                 :             :          * We need the numeric replication origin ID to be 16 bits wide, so we cannot
     288                 :             :          * rely on the normal oid allocation. Instead we simply scan
     289                 :             :          * pg_replication_origin for the first unused id. That's not particularly
     290                 :             :          * efficient, but this should be a fairly infrequent operation - we can
     291                 :             :          * easily spend a bit more code on this when it turns out it needs to be
     292                 :             :          * faster.
     293                 :             :          *
     294                 :             :          * We handle concurrency by taking an exclusive lock (allowing reads!)
     295                 :             :          * over the table for the duration of the search. Because we use a "dirty
     296                 :             :          * snapshot" we can read rows that other in-progress sessions have
     297                 :             :          * written, even though they would be invisible with normal snapshots. Due
     298                 :             :          * to the exclusive lock there's no danger that new rows can appear while
     299                 :             :          * we're checking.
     300                 :             :          */
     301                 :          14 :         InitDirtySnapshot(SnapshotDirty);
     302                 :             : 
     303                 :          14 :         rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
     304                 :             : 
     305                 :             :         /*
     306                 :             :          * We want to be able to access pg_replication_origin without setting up a
     307                 :             :          * snapshot.  To make that safe, it needs to not have a TOAST table, since
     308                 :             :          * TOASTed data cannot be fetched without a snapshot.  As of this writing,
     309                 :             :          * its only varlena column is roname, which we limit to 512 bytes to avoid
     310                 :             :          * needing out-of-line storage.  If you add a TOAST table to this catalog,
     311                 :             :          * be sure to set up a snapshot everywhere it might be needed.  For more
     312                 :             :          * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
     313                 :             :          */
     314         [ +  - ]:          14 :         Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
     315                 :             : 
     316         [ -  + ]:          18 :         for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
     317                 :             :         {
     318                 :          18 :                 bool            nulls[Natts_pg_replication_origin];
     319                 :          18 :                 Datum           values[Natts_pg_replication_origin];
     320                 :          18 :                 bool            collides;
     321                 :             : 
     322         [ +  - ]:          18 :                 CHECK_FOR_INTERRUPTS();
     323                 :             : 
     324                 :          18 :                 ScanKeyInit(&key,
     325                 :             :                                         Anum_pg_replication_origin_roident,
     326                 :             :                                         BTEqualStrategyNumber, F_OIDEQ,
     327                 :          18 :                                         ObjectIdGetDatum(roident));
     328                 :             : 
     329                 :          18 :                 scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
     330                 :             :                                                                   true /* indexOK */ ,
     331                 :             :                                                                   &SnapshotDirty,
     332                 :             :                                                                   1, &key);
     333                 :             : 
     334                 :          18 :                 collides = HeapTupleIsValid(systable_getnext(scan));
     335                 :             : 
     336                 :          18 :                 systable_endscan(scan);
     337                 :             : 
     338         [ +  + ]:          18 :                 if (!collides)
     339                 :             :                 {
     340                 :             :                         /*
     341                 :             :                          * Ok, found an unused roident, insert the new row and do a CCI,
     342                 :             :                          * so our callers can look it up if they want to.
     343                 :             :                          */
     344                 :          14 :                         memset(&nulls, 0, sizeof(nulls));
     345                 :             : 
     346                 :          14 :                         values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
     347                 :          14 :                         values[Anum_pg_replication_origin_roname - 1] = roname_d;
     348                 :             : 
     349                 :          14 :                         tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     350                 :          14 :                         CatalogTupleInsert(rel, tuple);
     351                 :          14 :                         CommandCounterIncrement();
     352                 :          14 :                         break;
     353                 :             :                 }
     354      [ -  +  + ]:          18 :         }
     355                 :             : 
      356                 :             :         /* now release the lock again */
     357                 :          14 :         table_close(rel, ExclusiveLock);
     358                 :             : 
     359         [ +  - ]:          14 :         if (tuple == NULL)
     360   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     361                 :             :                                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     362                 :             :                                  errmsg("could not find free replication origin ID")));
     363                 :             : 
     364                 :          14 :         heap_freetuple(tuple);
     365                 :          28 :         return roident;
     366                 :          14 : }
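The ID search in replorigin_create is a linear first-fit scan over the 16-bit space. The sketch below reproduces its bounds (ids 1 through 65534, since the loop stops before PG_UINT16_MAX) and the 512-byte name check against a hypothetical in-memory used[] array; allocate_origin_id and MAX_NAME_LEN are illustrative names. It deliberately leaves out the ExclusiveLock and dirty-snapshot handling that make the real scan safe under concurrency.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define MAX_NAME_LEN 512		/* same limit as described above */

/*
 * Hypothetical stand-in for the catalog scan: used[id] says whether an id
 * is already taken. Candidates run from 1 up to, but not including,
 * UINT16_MAX, like the loop above.
 * Returns the new id, -1 if the name is too long, or 0 if no id is free.
 */
int
allocate_origin_id(bool used[UINT16_MAX], const char *name)
{
	assert(name != NULL);
	if (strlen(name) > MAX_NAME_LEN)
		return -1;
	for (uint32_t id = 1; id < UINT16_MAX; id++)
	{
		if (!used[id])
		{
			used[id] = true;	/* corresponds to inserting the new row */
			return (int) id;
		}
	}
	return 0;
}
```

Separate return codes keep the two failure modes apart, matching the two distinct errors replorigin_create raises.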
     367                 :             : 
     368                 :             : /*
     369                 :             :  * Helper function to drop a replication origin.
     370                 :             :  */
     371                 :             : static void
     372                 :          13 : replorigin_state_clear(RepOriginId roident, bool nowait)
     373                 :             : {
     374                 :          13 :         int                     i;
     375                 :             : 
     376                 :             :         /*
     377                 :             :          * Clean up the slot state info, if there is any matching slot.
     378                 :             :          */
     379                 :             : restart:
     380                 :          13 :         LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     381                 :             : 
     382         [ +  + ]:         143 :         for (i = 0; i < max_active_replication_origins; i++)
     383                 :             :         {
     384                 :         130 :                 ReplicationState *state = &replication_states[i];
     385                 :             : 
     386         [ -  + ]:         130 :                 if (state->roident == roident)
     387                 :             :                 {
     388                 :             :                         /* found our slot, is it busy? */
     389         [ #  # ]:           0 :                         if (state->refcount > 0)
     390                 :             :                         {
     391                 :           0 :                                 ConditionVariable *cv;
     392                 :             : 
     393         [ #  # ]:           0 :                                 if (nowait)
      394   [ #  #  #  #  #  # ]:           0 :                                         ereport(ERROR,
     395                 :             :                                                         (errcode(ERRCODE_OBJECT_IN_USE),
     396                 :             :                                                          (state->acquired_by != 0)
     397                 :             :                                                          ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
     398                 :             :                                                                           state->roident,
     399                 :             :                                                                           state->acquired_by)
     400                 :             :                                                          : errmsg("could not drop replication origin with ID %d, in use by another process",
     401                 :             :                                                                           state->roident)));
     402                 :             : 
     403                 :             :                                 /*
     404                 :             :                                  * We must wait and then retry.  Since we don't know which CV
     405                 :             :                                  * to wait on until here, we can't readily use
     406                 :             :                                  * ConditionVariablePrepareToSleep (calling it here would be
     407                 :             :                                  * wrong, since we could miss the signal if we did so); just
     408                 :             :                                  * use ConditionVariableSleep directly.
     409                 :             :                                  */
     410                 :           0 :                                 cv = &state->origin_cv;
     411                 :             : 
     412                 :           0 :                                 LWLockRelease(ReplicationOriginLock);
     413                 :             : 
     414                 :           0 :                                 ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
     415                 :             :                                 goto restart;
     416                 :           0 :                         }
     417                 :             : 
     418                 :             :                         /* first make a WAL log entry */
     419                 :             :                         {
     420                 :           0 :                                 xl_replorigin_drop xlrec;
     421                 :             : 
     422                 :           0 :                                 xlrec.node_id = roident;
     423                 :           0 :                                 XLogBeginInsert();
     424                 :           0 :                                 XLogRegisterData(&xlrec, sizeof(xlrec));
     425                 :           0 :                                 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
     426                 :           0 :                         }
     427                 :             : 
     428                 :             :                         /* then clear the in-memory slot */
     429                 :           0 :                         state->roident = InvalidRepOriginId;
     430                 :           0 :                         state->remote_lsn = InvalidXLogRecPtr;
     431                 :           0 :                         state->local_lsn = InvalidXLogRecPtr;
     432                 :           0 :                         break;
     433                 :             :                 }
     434   [ -  -  -  + ]:         130 :         }
     435                 :          13 :         LWLockRelease(ReplicationOriginLock);
     436                 :          13 :         ConditionVariableCancelSleep();
     437                 :          13 : }
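replorigin_state_clear() above scans the slots under ReplicationOriginLock. If the matching slot is busy, it either fails immediately (nowait) or releases the lock, sleeps on the slot's condition variable, and rescans from `restart`. The sketch below reproduces that control flow with POSIX threads. `slot_t`, `origin_table_t`, `slot_clear()`, `slot_release()` and the demo are hypothetical stand-ins, and a single pthread mutex plus one condition variable replace ReplicationOriginLock and the per-slot origin_cv. One difference: pthread_cond_wait() unlocks and sleeps atomically, whereas the code above releases the LWLock explicitly before calling ConditionVariableSleep(). The WAL record for the drop is omitted.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdint.h>

#define NSLOTS 4

/* Hypothetical stand-in for ReplicationState; roident 0 means "unused". */
typedef struct
{
    uint16_t roident;
    int      refcount;      /* > 0 while some process has the origin acquired */
} slot_t;

/* One mutex and one condition variable stand in for ReplicationOriginLock
 * and the per-slot origin_cv. */
typedef struct
{
    pthread_mutex_t lock;
    pthread_cond_t  released;
    slot_t          slots[NSLOTS];
} origin_table_t;

/* Clear the slot for roident. Returns 0 on success (or if no slot matches),
 * EBUSY if the slot is in use and nowait is set. */
static int
slot_clear(origin_table_t *t, uint16_t roident, int nowait)
{
    pthread_mutex_lock(&t->lock);
restart:
    for (int i = 0; i < NSLOTS; i++)
    {
        slot_t *s = &t->slots[i];

        if (s->roident != roident)
            continue;
        if (s->refcount > 0)
        {
            if (nowait)
            {
                pthread_mutex_unlock(&t->lock);
                return EBUSY;
            }
            /* Unlocks, sleeps, and re-locks before returning. */
            pthread_cond_wait(&t->released, &t->lock);
            goto restart;   /* the slot may have changed while we slept */
        }
        /* The real code WAL-logs the drop here; omitted in this sketch. */
        s->roident = 0;
        break;
    }
    pthread_mutex_unlock(&t->lock);
    return 0;
}

/* Called by the holder when it is done with slot i. */
static void
slot_release(origin_table_t *t, int i)
{
    pthread_mutex_lock(&t->lock);
    t->slots[i].refcount--;
    assert(t->slots[i].refcount >= 0);
    if (t->slots[i].refcount == 0)
        pthread_cond_broadcast(&t->released);
    pthread_mutex_unlock(&t->lock);
}

static void *
holder_thread(void *arg)
{
    slot_release(arg, 0);
    return NULL;
}

/* Usage: a nowait drop fails while slot 0 is held; a waiting drop succeeds
 * once another thread releases it. Returns 0 if both behave as expected. */
static int
demo_drop_while_busy(void)
{
    origin_table_t t = {.slots = {{.roident = 7, .refcount = 1}}};
    pthread_t holder;
    int rc_nowait, rc_wait;

    pthread_mutex_init(&t.lock, NULL);
    pthread_cond_init(&t.released, NULL);
    rc_nowait = slot_clear(&t, 7, 1);
    pthread_create(&holder, NULL, holder_thread, &t);
    rc_wait = slot_clear(&t, 7, 0);
    pthread_join(holder, NULL);
    pthread_cond_destroy(&t.released);
    pthread_mutex_destroy(&t.lock);
    return (rc_nowait == EBUSY && rc_wait == 0 && t.slots[0].roident == 0) ? 0 : -1;
}
```

After waking, the sketch jumps back to a full rescan rather than trusting the slot pointer it held before sleeping, for the same reason the original code uses `goto restart`: the slot may have been dropped or reused in the meantime.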
     438                 :             : 
     439                 :             : /*
     440                 :             :  * Drop replication origin (by name).
     441                 :             :  *
     442                 :             :  * Needs to be called in a transaction.
     443                 :             :  */
     444                 :             : void
     445                 :          13 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
     446                 :             : {
     447                 :          13 :         RepOriginId roident;
     448                 :          13 :         Relation        rel;
     449                 :          13 :         HeapTuple       tuple;
     450                 :             : 
     451         [ +  - ]:          13 :         Assert(IsTransactionState());
     452                 :             : 
     453                 :          13 :         rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
     454                 :             : 
     455                 :          13 :         roident = replorigin_by_name(name, missing_ok);
     456                 :             : 
     457                 :             :         /* Lock the origin to prevent concurrent drops. */
     458                 :          13 :         LockSharedObject(ReplicationOriginRelationId, roident, 0,
     459                 :             :                                          AccessExclusiveLock);
     460                 :             : 
     461                 :          13 :         tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
     462         [ +  - ]:          13 :         if (!HeapTupleIsValid(tuple))
     463                 :             :         {
     464         [ #  # ]:           0 :                 if (!missing_ok)
     465   [ #  #  #  # ]:           0 :                         elog(ERROR, "cache lookup failed for replication origin with ID %d",
     466                 :             :                                  roident);
     467                 :             : 
     468                 :             :                 /*
     469                 :             :                  * We don't need to retain the locks if the origin is already dropped.
     470                 :             :                  */
     471                 :           0 :                 UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
     472                 :             :                                                    AccessExclusiveLock);
     473                 :           0 :                 table_close(rel, RowExclusiveLock);
     474                 :           0 :                 return;
     475                 :             :         }
     476                 :             : 
     477                 :          13 :         replorigin_state_clear(roident, nowait);
     478                 :             : 
     479                 :             :         /*
     480                 :             :          * Now, we can delete the catalog entry.
     481                 :             :          */
     482                 :          13 :         CatalogTupleDelete(rel, &tuple->t_self);
     483                 :          13 :         ReleaseSysCache(tuple);
     484                 :             : 
     485                 :          13 :         CommandCounterIncrement();
     486                 :             : 
     487                 :             :         /* We keep the lock on pg_replication_origin until commit */
     488                 :          13 :         table_close(rel, NoLock);
     489         [ -  + ]:          13 : }
     490                 :             : 
     491                 :             : /*
     492                 :             :  * Look up a replication origin by its OID and return its name.
     493                 :             :  *
     494                 :             :  * The external name is palloc'd in the calling context.
     495                 :             :  *
     496                 :             :  * Returns true if the origin is known, false otherwise.
     497                 :             :  */
     498                 :             : bool
     499                 :           0 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
     500                 :             : {
     501                 :           0 :         HeapTuple       tuple;
     502                 :           0 :         Form_pg_replication_origin ric;
     503                 :             : 
     504         [ #  # ]:           0 :         Assert(OidIsValid((Oid) roident));
     505         [ #  # ]:           0 :         Assert(roident != InvalidRepOriginId);
     506         [ #  # ]:           0 :         Assert(roident != DoNotReplicateId);
     507                 :             : 
     508                 :           0 :         tuple = SearchSysCache1(REPLORIGIDENT,
     509                 :           0 :                                                         ObjectIdGetDatum((Oid) roident));
     510                 :             : 
     511         [ #  # ]:           0 :         if (HeapTupleIsValid(tuple))
     512                 :             :         {
     513                 :           0 :                 ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
     514                 :           0 :                 *roname = text_to_cstring(&ric->roname);
     515                 :           0 :                 ReleaseSysCache(tuple);
     516                 :             : 
     517                 :           0 :                 return true;
     518                 :             :         }
     519                 :             :         else
     520                 :             :         {
     521                 :           0 :                 *roname = NULL;
     522                 :             : 
     523         [ #  # ]:           0 :                 if (!missing_ok)
     524   [ #  #  #  # ]:           0 :                         ereport(ERROR,
     525                 :             :                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
     526                 :             :                                          errmsg("replication origin with ID %d does not exist",
     527                 :             :                                                         roident)));
     528                 :             : 
     529                 :           0 :                 return false;
     530                 :             :         }
     531                 :           0 : }
     532                 :             : 
     533                 :             : 
     534                 :             : /* ---------------------------------------------------------------------------
     535                 :             :  * Functions for handling replication progress.
     536                 :             :  * ---------------------------------------------------------------------------
     537                 :             :  */
     538                 :             : 
     539                 :             : Size
     540                 :          21 : ReplicationOriginShmemSize(void)
     541                 :             : {
     542                 :          21 :         Size            size = 0;
     543                 :             : 
     544         [ +  - ]:          21 :         if (max_active_replication_origins == 0)
     545                 :           0 :                 return size;
     546                 :             : 
     547                 :          21 :         size = add_size(size, offsetof(ReplicationStateCtl, states));
     548                 :             : 
     549                 :          42 :         size = add_size(size,
     550                 :          21 :                                         mul_size(max_active_replication_origins, sizeof(ReplicationState)));
     551                 :          21 :         return size;
     552                 :          21 : }
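ReplicationOriginShmemSize() sizes a control header followed by one ReplicationState per allowed origin. It measures the header with offsetof() at the array member, which gives the exact byte position where the array starts, and it combines the pieces with overflow-checked add_size()/mul_size(). The sketch below mirrors that computation. The struct layouts and the `checked_add()`, `checked_mul()` and `origin_shmem_size()` helpers are hypothetical; unlike add_size()/mul_size(), they report overflow through a return value instead of raising an error.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for ReplicationState and ReplicationStateCtl. */
typedef struct
{
    uint16_t roident;
    uint64_t remote_lsn;
    uint64_t local_lsn;
} state_t;

typedef struct
{
    int     tranche_id;
    state_t states[];       /* flexible array member, length chosen at startup */
} state_ctl_t;

/* Overflow-checked arithmetic in the spirit of add_size()/mul_size(). */
static bool
checked_add(size_t a, size_t b, size_t *out)
{
    if (a > SIZE_MAX - b)
        return false;
    *out = a + b;
    return true;
}

static bool
checked_mul(size_t a, size_t b, size_t *out)
{
    if (a != 0 && b > SIZE_MAX / a)
        return false;
    *out = a * b;
    return true;
}

/* Bytes for the header plus n array elements; 0 when n == 0, mirroring the
 * early return when max_active_replication_origins is 0. */
static bool
origin_shmem_size(size_t n, size_t *out)
{
    size_t array_bytes;

    if (n == 0)
    {
        *out = 0;
        return true;
    }
    if (!checked_mul(n, sizeof(state_t), &array_bytes))
        return false;
    return checked_add(offsetof(state_ctl_t, states), array_bytes, out);
}
```

With this layout, `origin_shmem_size(10, &sz)` yields the header offset plus ten `state_t` elements, and an absurd count such as `SIZE_MAX` is rejected instead of wrapping around to a small allocation.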
     553                 :             : 
     554                 :             : void
     555                 :           6 : ReplicationOriginShmemInit(void)
     556                 :             : {
     557                 :           6 :         bool            found;
     558                 :             : 
     559         [ +  - ]:           6 :         if (max_active_replication_origins == 0)
     560                 :           0 :                 return;
     561                 :             : 
     562                 :           6 :         replication_states_ctl = (ReplicationStateCtl *)
     563                 :           6 :                 ShmemInitStruct("ReplicationOriginState",
     564                 :           6 :                                                 ReplicationOriginShmemSize(),
     565                 :             :                                                 &found);
     566                 :           6 :         replication_states = replication_states_ctl->states;
     567                 :             : 
     568         [ -  + ]:           6 :         if (!found)
     569                 :             :         {
     570                 :           6 :                 int                     i;
     571                 :             : 
     572   [ +  -  +  -  +  -  -  +  +  + ]:         492 :                 MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
     573                 :             : 
     574                 :           6 :                 replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
     575                 :             : 
     576         [ +  + ]:          66 :                 for (i = 0; i < max_active_replication_origins; i++)
     577                 :             :                 {
     578                 :         120 :                         LWLockInitialize(&replication_states[i].lock,
     579                 :          60 :                                                          replication_states_ctl->tranche_id);
     580                 :          60 :                         ConditionVariableInit(&replication_states[i].origin_cv);
     581                 :          60 :                 }
     582                 :           6 :         }
     583         [ -  + ]:           6 : }
     584                 :             : 
     585                 :             : /* ---------------------------------------------------------------------------
     586                 :             :  * Perform a checkpoint of each replication origin's progress with respect to
     587                 :             :  * the replayed remote_lsn. Make sure that all transactions we refer to in the
     588                 :             :  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
     589                 :             :  * if the transactions were originally committed asynchronously.
     590                 :             :  *
     591                 :             :  * We store checkpoints in the following format:
     592                 :             :  * +-------+------------------------+------------------+-----+--------+
     593                 :             :  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
     594                 :             :  * +-------+------------------------+------------------+-----+--------+
     595                 :             :  *
     596                 :             :  * So it's just the magic, followed by the statically sized
     597                 :             :  * ReplicationStateOnDisk structs and the CRC32C. The maximum number of
     598                 :             :  * ReplicationState entries is determined by max_active_replication_origins.
     599                 :             :  * ---------------------------------------------------------------------------
     600                 :             :  */
     601                 :             : void
     602                 :           7 : CheckPointReplicationOrigin(void)
     603                 :             : {
     604                 :           7 :         const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
     605                 :           7 :         const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
     606                 :           7 :         int                     tmpfd;
     607                 :           7 :         int                     i;
     608                 :           7 :         uint32          magic = REPLICATION_STATE_MAGIC;
     609                 :           7 :         pg_crc32c       crc;
     610                 :             : 
     611         [ +  - ]:           7 :         if (max_active_replication_origins == 0)
     612                 :           0 :                 return;
     613                 :             : 
     614                 :           7 :         INIT_CRC32C(crc);
     615                 :             : 
     616                 :             :         /* make sure no old temp file is remaining */
     617   [ +  -  +  - ]:           7 :         if (unlink(tmppath) < 0 && errno != ENOENT)
     618   [ #  #  #  # ]:           0 :                 ereport(PANIC,
     619                 :             :                                 (errcode_for_file_access(),
     620                 :             :                                  errmsg("could not remove file \"%s\": %m",
     621                 :             :                                                 tmppath)));
     622                 :             : 
     623                 :             :         /*
     624                 :             :          * no other backend can perform this at the same time; only one checkpoint
     625                 :             :          * can happen at a time.
     626                 :             :          */
     627                 :           7 :         tmpfd = OpenTransientFile(tmppath,
     628                 :             :                                                           O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
     629         [ +  - ]:           7 :         if (tmpfd < 0)
     630   [ #  #  #  # ]:           0 :                 ereport(PANIC,
     631                 :             :                                 (errcode_for_file_access(),
     632                 :             :                                  errmsg("could not create file \"%s\": %m",
     633                 :             :                                                 tmppath)));
     634                 :             : 
     635                 :             :         /* write magic */
     636                 :           7 :         errno = 0;
     637         [ +  - ]:           7 :         if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
     638                 :             :         {
     639                 :             :                 /* if write didn't set errno, assume problem is no disk space */
     640         [ #  # ]:           0 :                 if (errno == 0)
     641                 :           0 :                         errno = ENOSPC;
     642   [ #  #  #  # ]:           0 :                 ereport(PANIC,
     643                 :             :                                 (errcode_for_file_access(),
     644                 :             :                                  errmsg("could not write to file \"%s\": %m",
     645                 :             :                                                 tmppath)));
     646                 :           0 :         }
     647                 :           7 :         COMP_CRC32C(crc, &magic, sizeof(magic));
     648                 :             : 
     649                 :             :         /* prevent concurrent creations/drops */
     650                 :           7 :         LWLockAcquire(ReplicationOriginLock, LW_SHARED);
     651                 :             : 
     652                 :             :         /* write actual data */
     653         [ +  + ]:          77 :         for (i = 0; i < max_active_replication_origins; i++)
     654                 :             :         {
     655                 :          70 :                 ReplicationStateOnDisk disk_state;
     656                 :          70 :                 ReplicationState *curstate = &replication_states[i];
     657                 :          70 :                 XLogRecPtr      local_lsn;
     658                 :             : 
     659         [ -  + ]:          70 :                 if (curstate->roident == InvalidRepOriginId)
     660                 :          70 :                         continue;
     661                 :             : 
     662                 :             :                 /* zero, to avoid uninitialized padding bytes */
     663                 :           0 :                 memset(&disk_state, 0, sizeof(disk_state));
     664                 :             : 
     665                 :           0 :                 LWLockAcquire(&curstate->lock, LW_SHARED);
     666                 :             : 
     667                 :           0 :                 disk_state.roident = curstate->roident;
     668                 :             : 
     669                 :           0 :                 disk_state.remote_lsn = curstate->remote_lsn;
     670                 :           0 :                 local_lsn = curstate->local_lsn;
     671                 :             : 
     672                 :           0 :                 LWLockRelease(&curstate->lock);
     673                 :             : 
     674                 :             :                 /* make sure we only write out a commit that's persistent */
     675                 :           0 :                 XLogFlush(local_lsn);
     676                 :             : 
     677                 :           0 :                 errno = 0;
     678         [ #  # ]:           0 :                 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
     679                 :             :                         sizeof(disk_state))
     680                 :             :                 {
     681                 :             :                         /* if write didn't set errno, assume problem is no disk space */
     682         [ #  # ]:           0 :                         if (errno == 0)
     683                 :           0 :                                 errno = ENOSPC;
     684   [ #  #  #  # ]:           0 :                         ereport(PANIC,
     685                 :             :                                         (errcode_for_file_access(),
     686                 :             :                                          errmsg("could not write to file \"%s\": %m",
     687                 :             :                                                         tmppath)));
     688                 :           0 :                 }
     689                 :             : 
     690                 :           0 :                 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     691         [ +  - ]:          70 :         }
     692                 :             : 
     693                 :           7 :         LWLockRelease(ReplicationOriginLock);
     694                 :             : 
     695                 :             :         /* write out the CRC */
     696                 :           7 :         FIN_CRC32C(crc);
     697                 :           7 :         errno = 0;
     698         [ +  - ]:           7 :         if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
     699                 :             :         {
     700                 :             :                 /* if write didn't set errno, assume problem is no disk space */
     701         [ #  # ]:           0 :                 if (errno == 0)
     702                 :           0 :                         errno = ENOSPC;
     703   [ #  #  #  # ]:           0 :                 ereport(PANIC,
     704                 :             :                                 (errcode_for_file_access(),
     705                 :             :                                  errmsg("could not write to file \"%s\": %m",
     706                 :             :                                                 tmppath)));
     707                 :           0 :         }
     708                 :             : 
     709         [ +  - ]:           7 :         if (CloseTransientFile(tmpfd) != 0)
     710   [ #  #  #  # ]:           0 :                 ereport(PANIC,
     711                 :             :                                 (errcode_for_file_access(),
     712                 :             :                                  errmsg("could not close file \"%s\": %m",
     713                 :             :                                                 tmppath)));
     714                 :             : 
     715                 :             :         /* fsync, rename to permanent file, fsync file and directory */
     716                 :           7 :         durable_rename(tmppath, path, PANIC);
     717                 :           7 : }
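The checkpoint file written by CheckPointReplicationOrigin() is the magic, then fixed-size records, then a CRC-32C over all preceding bytes; StartupReplicationOrigin() below reads record-sized chunks until a read returns exactly the size of the CRC. The sketch round-trips that layout through an in-memory buffer rather than a file. The record layout, STATE_MAGIC, and the helper names are hypothetical; crc32c_update() is a plain bitwise CRC-32C, not PostgreSQL's INIT_CRC32C/COMP_CRC32C/FIN_CRC32C macros, though it uses the usual CRC-32C initial value and final XOR. The temporary file, XLogFlush() and durable_rename() steps are left out.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define STATE_MAGIC 0x1257DADEu     /* hypothetical magic value */

/* Hypothetical on-disk record; fixed size, stored in native byte order. */
typedef struct
{
    uint64_t remote_lsn;
    uint16_t roident;
} disk_state_t;

/* Bitwise CRC-32C (Castagnoli, reflected polynomial 0x82F63B78). */
static uint32_t
crc32c_update(uint32_t crc, const void *data, size_t len)
{
    const unsigned char *p = data;

    while (len--)
    {
        crc ^= *p++;
        for (int k = 0; k < 8; k++)
            crc = (crc >> 1) ^ (0x82F63B78u & (0u - (crc & 1u)));
    }
    return crc;
}

/* Write magic, n records, and a CRC over everything before it into buf.
 * Returns bytes written, or 0 if buf is too small. */
static size_t
checkpoint_write(unsigned char *buf, size_t cap, const disk_state_t *st, size_t n)
{
    uint32_t magic = STATE_MAGIC, crc = 0xFFFFFFFFu;
    size_t   off = sizeof magic;

    if (cap < sizeof magic + n * sizeof(disk_state_t) + sizeof crc)
        return 0;
    memcpy(buf, &magic, sizeof magic);
    crc = crc32c_update(crc, &magic, sizeof magic);
    for (size_t i = 0; i < n; i++)
    {
        disk_state_t d;

        memset(&d, 0, sizeof d);    /* zero padding bytes before hashing */
        d.roident = st[i].roident;
        d.remote_lsn = st[i].remote_lsn;
        memcpy(buf + off, &d, sizeof d);
        crc = crc32c_update(crc, &d, sizeof d);
        off += sizeof d;
    }
    crc ^= 0xFFFFFFFFu;
    memcpy(buf + off, &crc, sizeof crc);
    return off + sizeof crc;
}

/* Read records until only a CRC-sized tail remains. Returns the record
 * count, or -1 on bad magic, truncation, too many records, or CRC mismatch. */
static int
checkpoint_read(const unsigned char *buf, size_t len, disk_state_t *out, size_t max)
{
    uint32_t magic, file_crc, crc = 0xFFFFFFFFu;
    size_t   off = sizeof magic;
    size_t   n = 0;

    if (len < sizeof magic)
        return -1;
    memcpy(&magic, buf, sizeof magic);
    if (magic != STATE_MAGIC)
        return -1;
    crc = crc32c_update(crc, &magic, sizeof magic);
    for (;;)
    {
        size_t left = len - off;

        if (left == sizeof file_crc)    /* only the stored CRC is left */
        {
            memcpy(&file_crc, buf + off, sizeof file_crc);
            break;
        }
        if (left < sizeof(disk_state_t) || n == max)
            return -1;                  /* truncated, or no free slot */
        memcpy(&out[n], buf + off, sizeof(disk_state_t));
        crc = crc32c_update(crc, &out[n], sizeof(disk_state_t));
        off += sizeof(disk_state_t);
        n++;
    }
    crc ^= 0xFFFFFFFFu;
    return crc == file_crc ? (int) n : -1;
}
```

Zeroing each record before hashing matters for the same reason the original memsets disk_state: otherwise uninitialized padding bytes would end up in both the file and the CRC.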
     718                 :             : 
     719                 :             : /*
     720                 :             :  * Recover replication replay status from checkpoint data saved earlier by
     721                 :             :  * CheckPointReplicationOrigin.
     722                 :             :  *
     723                 :             :  * This only needs to be called at startup and *not* during every checkpoint
     724                 :             :  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
     725                 :             :  * state thereafter can be recovered by looking at commit records.
     726                 :             :  */
     727                 :             : void
     728                 :           4 : StartupReplicationOrigin(void)
     729                 :             : {
     730                 :           4 :         const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
     731                 :           4 :         int                     fd;
     732                 :           4 :         int                     readBytes;
     733                 :           4 :         uint32          magic = REPLICATION_STATE_MAGIC;
     734                 :           4 :         int                     last_state = 0;
     735                 :           4 :         pg_crc32c       file_crc;
     736                 :           4 :         pg_crc32c       crc;
     737                 :             : 
     738                 :             :         /* don't want to overwrite already existing state */
     739                 :             : #ifdef USE_ASSERT_CHECKING
     740                 :             :         static bool already_started = false;
     741                 :             : 
     742         [ +  - ]:           4 :         Assert(!already_started);
     743                 :           4 :         already_started = true;
     744                 :             : #endif
     745                 :             : 
     746         [ +  - ]:           4 :         if (max_active_replication_origins == 0)
     747                 :           0 :                 return;
     748                 :             : 
     749                 :           4 :         INIT_CRC32C(crc);
     750                 :             : 
     751   [ -  +  -  + ]:           4 :         elog(DEBUG2, "starting up replication origin progress state");
     752                 :             : 
     753                 :           4 :         fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
     754                 :             : 
     755                 :             :         /*
     756                 :             :          * might have had max_active_replication_origins == 0 last run, or we just
     757                 :             :          * brought up a standby.
     758                 :             :          */
     759   [ +  +  -  + ]:           4 :         if (fd < 0 && errno == ENOENT)
     760                 :           1 :                 return;
     761         [ +  - ]:           3 :         else if (fd < 0)
     762   [ #  #  #  # ]:           0 :                 ereport(PANIC,
     763                 :             :                                 (errcode_for_file_access(),
     764                 :             :                                  errmsg("could not open file \"%s\": %m",
     765                 :             :                                                 path)));
     766                 :             : 
     767                 :             :         /* verify magic, which is written even if nothing was active */
     768                 :           3 :         readBytes = read(fd, &magic, sizeof(magic));
     769         [ +  - ]:           3 :         if (readBytes != sizeof(magic))
     770                 :             :         {
     771         [ #  # ]:           0 :                 if (readBytes < 0)
     772   [ #  #  #  # ]:           0 :                         ereport(PANIC,
     773                 :             :                                         (errcode_for_file_access(),
     774                 :             :                                          errmsg("could not read file \"%s\": %m",
     775                 :             :                                                         path)));
     776                 :             :                 else
     777   [ #  #  #  # ]:           0 :                         ereport(PANIC,
     778                 :             :                                         (errcode(ERRCODE_DATA_CORRUPTED),
     779                 :             :                                          errmsg("could not read file \"%s\": read %d of %zu",
     780                 :             :                                                         path, readBytes, sizeof(magic))));
     781                 :           0 :         }
     782                 :           3 :         COMP_CRC32C(crc, &magic, sizeof(magic));
     783                 :             : 
     784         [ +  - ]:           3 :         if (magic != REPLICATION_STATE_MAGIC)
     785   [ #  #  #  # ]:           0 :                 ereport(PANIC,
     786                 :             :                                 (errmsg("replication checkpoint has wrong magic %u instead of %u",
     787                 :             :                                                 magic, REPLICATION_STATE_MAGIC)));
     788                 :             : 
     789                 :             :         /* we can skip locking here, no other access is possible */
     790                 :             : 
     791                 :             :         /* recover individual states, until there are no more to be found */
     792                 :           3 :         while (true)
     793                 :             :         {
     794                 :           3 :                 ReplicationStateOnDisk disk_state;
     795                 :             : 
     796                 :           3 :                 readBytes = read(fd, &disk_state, sizeof(disk_state));
     797                 :             : 
     798         [ +  - ]:           3 :                 if (readBytes < 0)
     799                 :             :                 {
     800   [ #  #  #  # ]:           0 :                         ereport(PANIC,
     801                 :             :                                         (errcode_for_file_access(),
     802                 :             :                                          errmsg("could not read file \"%s\": %m",
     803                 :             :                                                         path)));
     804                 :           0 :                 }
     805                 :             : 
     806                 :             :                 /* no further data */
     807         [ +  - ]:           3 :                 if (readBytes == sizeof(crc))
     808                 :             :                 {
     809                 :           3 :                         memcpy(&file_crc, &disk_state, sizeof(file_crc));
     810                 :           3 :                         break;
     811                 :             :                 }
     812                 :             : 
     813         [ #  # ]:           0 :                 if (readBytes != sizeof(disk_state))
     814                 :             :                 {
     815   [ #  #  #  # ]:           0 :                         ereport(PANIC,
     816                 :             :                                         (errcode_for_file_access(),
     817                 :             :                                          errmsg("could not read file \"%s\": read %d of %zu",
     818                 :             :                                                         path, readBytes, sizeof(disk_state))));
     819                 :           0 :                 }
     820                 :             : 
     821                 :           0 :                 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     822                 :             : 
     823         [ #  # ]:           0 :                 if (last_state == max_active_replication_origins)
     824   [ #  #  #  # ]:           0 :                         ereport(PANIC,
     825                 :             :                                         (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     826                 :             :                                          errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
     827                 :             : 
     828                 :             :                 /* copy data to shared memory */
     829                 :           0 :                 replication_states[last_state].roident = disk_state.roident;
     830                 :           0 :                 replication_states[last_state].remote_lsn = disk_state.remote_lsn;
     831                 :           0 :                 last_state++;
     832                 :             : 
     833   [ #  #  #  # ]:           0 :                 ereport(LOG,
     834                 :             :                                 errmsg("recovered replication state of node %d to %X/%08X",
     835                 :             :                                            disk_state.roident,
     836                 :             :                                            LSN_FORMAT_ARGS(disk_state.remote_lsn)));
     837         [ -  + ]:           3 :         }
     838                 :             : 
     839                 :             :         /* now check checksum */
     840                 :           3 :         FIN_CRC32C(crc);
     841         [ +  - ]:           3 :         if (file_crc != crc)
     842   [ #  #  #  # ]:           0 :                 ereport(PANIC,
     843                 :             :                                 (errcode(ERRCODE_DATA_CORRUPTED),
     844                 :             :                                  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
     845                 :             :                                                 crc, file_crc)));
     846                 :             : 
     847         [ +  - ]:           3 :         if (CloseTransientFile(fd) != 0)
     848   [ #  #  #  # ]:           0 :                 ereport(PANIC,
     849                 :             :                                 (errcode_for_file_access(),
     850                 :             :                                  errmsg("could not close file \"%s\": %m",
     851                 :             :                                                 path)));
     852                 :           4 : }
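The restore loop above relies on a simple file layout: a magic number, then a sequence of fixed-size state records, then a trailing CRC-32C computed over everything before it. A record is larger than the checksum, so a short read of exactly sizeof(crc) bytes marks the end of the data. The standalone C sketch below illustrates that parsing scheme. It reads from an in-memory buffer instead of a file. All names (DemoStateOnDisk, DEMO_STATE_MAGIC, demo_read, demo_write, demo_restore) are hypothetical, and the magic value and record layout are illustrative assumptions, not PostgreSQL's on-disk format. Where the real code raises PANIC, the sketch returns -1.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical magic and capacity, chosen only for this sketch. */
#define DEMO_STATE_MAGIC 0x1257DADEu
#define DEMO_MAX_STATES 4

/* Illustrative record; the explicit filler leaves no implicit padding. */
typedef struct DemoStateOnDisk
{
    uint64_t remote_lsn;
    uint16_t roident;
    uint16_t reserved[3];
} DemoStateOnDisk;

/* Bitwise CRC-32C (Castagnoli, reflected polynomial 0x82F63B78), no final XOR. */
static uint32_t crc32c_update(uint32_t crc, const void *data, size_t len)
{
    const unsigned char *p = data;

    while (len--)
    {
        crc ^= *p++;
        for (int k = 0; k < 8; k++)
            crc = (crc & 1u) ? (crc >> 1) ^ 0x82F63B78u : crc >> 1;
    }
    return crc;
}

/* Emulates read(): copies up to n bytes and returns how many were available. */
static size_t demo_read(const unsigned char *buf, size_t len, size_t *pos, void *dst, size_t n)
{
    size_t avail = len - *pos;
    size_t got = n < avail ? n : avail;

    memcpy(dst, buf + *pos, got);
    *pos += got;
    return got;
}

/* Writes magic, n records, then the finalized CRC; returns bytes written. */
static size_t demo_write(unsigned char *buf, const DemoStateOnDisk *states, int n)
{
    uint32_t magic = DEMO_STATE_MAGIC, crc = 0xFFFFFFFFu;
    size_t pos = sizeof(magic);

    assert(n <= DEMO_MAX_STATES);
    memcpy(buf, &magic, sizeof(magic));
    crc = crc32c_update(crc, &magic, sizeof(magic));
    for (int i = 0; i < n; i++)
    {
        DemoStateOnDisk d;

        memset(&d, 0, sizeof(d));   /* reserved bytes are checksummed too */
        d.roident = states[i].roident;
        d.remote_lsn = states[i].remote_lsn;
        memcpy(buf + pos, &d, sizeof(d));
        pos += sizeof(d);
        crc = crc32c_update(crc, &d, sizeof(d));
    }
    crc ^= 0xFFFFFFFFu;             /* finalize the checksum */
    memcpy(buf + pos, &crc, sizeof(crc));
    return pos + sizeof(crc);
}

/* Returns the number of records recovered, or -1 on any error. */
static int demo_restore(const unsigned char *buf, size_t len, DemoStateOnDisk *out)
{
    size_t pos = 0;
    uint32_t magic, file_crc = 0, crc = 0xFFFFFFFFu;
    int n = 0;

    if (demo_read(buf, len, &pos, &magic, sizeof(magic)) != sizeof(magic))
        return -1;
    crc = crc32c_update(crc, &magic, sizeof(magic));
    if (magic != DEMO_STATE_MAGIC)
        return -1;

    for (;;)
    {
        DemoStateOnDisk disk_state;
        size_t got = demo_read(buf, len, &pos, &disk_state, sizeof(disk_state));

        if (got == sizeof(file_crc))    /* only the trailing CRC is left */
        {
            memcpy(&file_crc, &disk_state, sizeof(file_crc));
            break;
        }
        if (got != sizeof(disk_state))  /* truncated record */
            return -1;
        crc = crc32c_update(crc, &disk_state, sizeof(disk_state));
        if (n == DEMO_MAX_STATES)       /* more records than slots */
            return -1;
        out[n++] = disk_state;
    }
    crc ^= 0xFFFFFFFFu;
    return crc == file_crc ? n : -1;
}
```

As in the loop above, the checksum is accumulated over each record before the record is accepted, but it is compared only after the trailer is found. A corrupted record is therefore detected only at the end of the file, not while it is being read.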
     853                 :             : 
     854                 :             : void
     855                 :           0 : replorigin_redo(XLogReaderState *record)
     856                 :             : {
     857                 :           0 :         uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     858                 :             : 
     859      [ #  #  # ]:           0 :         switch (info)
     860                 :             :         {
     861                 :             :                 case XLOG_REPLORIGIN_SET:
     862                 :             :                         {
     863                 :           0 :                                 xl_replorigin_set *xlrec =
     864                 :           0 :                                         (xl_replorigin_set *) XLogRecGetData(record);
     865                 :             : 
     866                 :           0 :                                 replorigin_advance(xlrec->node_id,
     867                 :           0 :                                                                    xlrec->remote_lsn, record->EndRecPtr,
     868                 :           0 :                                                                    xlrec->force /* backward */ ,
     869                 :             :                                                                    false /* WAL log */ );
     870                 :             :                                 break;
     871                 :           0 :                         }
     872                 :             :                 case XLOG_REPLORIGIN_DROP:
     873                 :             :                         {
     874                 :           0 :                                 xl_replorigin_drop *xlrec;
     875                 :           0 :                                 int                     i;
     876                 :             : 
     877                 :           0 :                                 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
     878                 :             : 
     879         [ #  # ]:           0 :                                 for (i = 0; i < max_active_replication_origins; i++)
     880                 :             :                                 {
     881                 :           0 :                                         ReplicationState *state = &replication_states[i];
     882                 :             : 
     883                 :             :                                         /* found our slot */
     884         [ #  # ]:           0 :                                         if (state->roident == xlrec->node_id)
     885                 :             :                                         {
     886                 :             :                                                 /* reset entry */
     887                 :           0 :                                                 state->roident = InvalidRepOriginId;
     888                 :           0 :                                                 state->remote_lsn = InvalidXLogRecPtr;
     889                 :           0 :                                                 state->local_lsn = InvalidXLogRecPtr;
     890                 :           0 :                                                 break;
     891                 :             :                                         }
     892      [ #  #  # ]:           0 :                                 }
     893                 :             :                                 break;
     894                 :           0 :                         }
     895                 :             :                 default:
     896   [ #  #  #  # ]:           0 :                         elog(PANIC, "replorigin_redo: unknown op code %u", info);
     897                 :           0 :         }
     898                 :           0 : }
     899                 :             : 
     900                 :             : 
     901                 :             : /*
     902                 :             :  * Tell the replication origin progress machinery that a commit from 'node'
     903                 :             :  * that originated at the LSN remote_commit on the remote node was replayed
     904                 :             :  * successfully and that we don't need to do so again. In combination with
     905                 :             :  * setting up replorigin_session_origin_lsn and replorigin_session_origin
     906                 :             :  * that ensures we won't lose knowledge about that after a crash if the
     907                 :             :  * transaction had a persistent effect (think of asynchronous commits).
     908                 :             :  *
     909                 :             :  * local_commit needs to be a local LSN of the commit so that we can make sure
     910                 :             :  * upon a checkpoint that enough WAL has been persisted to disk.
     911                 :             :  *
     912                 :             :  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
     913                 :             :  * unless running in recovery.
     914                 :             :  */
     915                 :             : void
     916                 :           0 : replorigin_advance(RepOriginId node,
     917                 :             :                                    XLogRecPtr remote_commit, XLogRecPtr local_commit,
     918                 :             :                                    bool go_backward, bool wal_log)
     919                 :             : {
     920                 :           0 :         int                     i;
     921                 :           0 :         ReplicationState *replication_state = NULL;
     922                 :           0 :         ReplicationState *free_state = NULL;
     923                 :             : 
     924         [ #  # ]:           0 :         Assert(node != InvalidRepOriginId);
     925                 :             : 
     926                 :             :         /* we don't track DoNotReplicateId */
     927         [ #  # ]:           0 :         if (node == DoNotReplicateId)
     928                 :           0 :                 return;
     929                 :             : 
     930                 :             :         /*
     931                 :             :          * XXX: For the case where this is called by WAL replay, it'd be more
     932                 :             :          * efficient to restore into a backend local hashtable and only dump into
     933                 :             :          * shmem after recovery is finished. Let's defer implementing that
     934                 :             :          * until it's shown to be a measurable expense.
     935                 :             :          */
     936                 :             : 
     937                 :             :         /* Lock exclusively, as we may have to create a new table entry. */
     938                 :           0 :         LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     939                 :             : 
     940                 :             :         /*
     941                 :             :          * Search for either an existing slot for the origin, or a free one we can
     942                 :             :          * use.
     943                 :             :          */
     944         [ #  # ]:           0 :         for (i = 0; i < max_active_replication_origins; i++)
     945                 :             :         {
     946                 :           0 :                 ReplicationState *curstate = &replication_states[i];
     947                 :             : 
     948                 :             :                 /* remember where to insert if necessary */
     949   [ #  #  #  # ]:           0 :                 if (curstate->roident == InvalidRepOriginId &&
     950                 :           0 :                         free_state == NULL)
     951                 :             :                 {
     952                 :           0 :                         free_state = curstate;
     953                 :           0 :                         continue;
     954                 :             :                 }
     955                 :             : 
     956                 :             :                 /* not our slot */
     957         [ #  # ]:           0 :                 if (curstate->roident != node)
     958                 :             :                 {
     959                 :           0 :                         continue;
     960                 :             :                 }
     961                 :             : 
     962                 :             :                 /* ok, found slot */
     963                 :           0 :                 replication_state = curstate;
     964                 :             : 
     965                 :           0 :                 LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
     966                 :             : 
     967                 :             :                 /* Make sure it's not used by somebody else */
     968         [ #  # ]:           0 :                 if (replication_state->refcount > 0)
     969                 :             :                 {
     970   [ #  #  #  #  :           0 :                         ereport(ERROR,
                   #  # ]
     971                 :             :                                         (errcode(ERRCODE_OBJECT_IN_USE),
     972                 :             :                                          (replication_state->acquired_by != 0)
     973                 :             :                                          ? errmsg("replication origin with ID %d is already active for PID %d",
     974                 :             :                                                           replication_state->roident,
     975                 :             :                                                           replication_state->acquired_by)
     976                 :             :                                          : errmsg("replication origin with ID %d is already active in another process",
     977                 :             :                                                           replication_state->roident)));
     978                 :           0 :                 }
     979                 :             : 
     980                 :           0 :                 break;
     981         [ #  # ]:           0 :         }
     982                 :             : 
     983   [ #  #  #  # ]:           0 :         if (replication_state == NULL && free_state == NULL)
     984   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     985                 :             :                                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     986                 :             :                                  errmsg("could not find free replication state slot for replication origin with ID %d",
     987                 :             :                                                 node),
     988                 :             :                                  errhint("Increase \"max_active_replication_origins\" and try again.")));
     989                 :             : 
     990         [ #  # ]:           0 :         if (replication_state == NULL)
     991                 :             :         {
     992                 :             :                 /* initialize new slot */
     993                 :           0 :                 LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
     994                 :           0 :                 replication_state = free_state;
     995         [ #  # ]:           0 :                 Assert(!XLogRecPtrIsValid(replication_state->remote_lsn));
     996         [ #  # ]:           0 :                 Assert(!XLogRecPtrIsValid(replication_state->local_lsn));
     997                 :           0 :                 replication_state->roident = node;
     998                 :           0 :         }
     999                 :             : 
    1000         [ #  # ]:           0 :         Assert(replication_state->roident != InvalidRepOriginId);
    1001                 :             : 
    1002                 :             :         /*
    1003                 :             :          * If somebody "forcefully" sets this slot, WAL log it, so it's durable
    1004                 :             :          * and the standby gets the message. Primarily this will be called during
    1005                 :             :          * WAL replay (of commit records) where no WAL logging is necessary.
    1006                 :             :          */
    1007         [ #  # ]:           0 :         if (wal_log)
    1008                 :             :         {
    1009                 :           0 :                 xl_replorigin_set xlrec;
    1010                 :             : 
    1011                 :           0 :                 xlrec.remote_lsn = remote_commit;
    1012                 :           0 :                 xlrec.node_id = node;
    1013                 :           0 :                 xlrec.force = go_backward;
    1014                 :             : 
    1015                 :           0 :                 XLogBeginInsert();
    1016                 :           0 :                 XLogRegisterData(&xlrec, sizeof(xlrec));
    1017                 :             : 
    1018                 :           0 :                 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
    1019                 :           0 :         }
    1020                 :             : 
    1021                 :             :         /*
    1022                 :             :          * Due to - harmless - race conditions during a checkpoint we could see
    1023                 :             :          * values here that are older than the ones we already have in memory. We
    1024                 :             :          * could also see older values for prepared transactions when the prepare
     1025                 :             :          * is sent at a later point in time along with commit prepared and there
     1026                 :             :          * are other transaction commits between prepare and commit prepared. See
    1027                 :             :          * ReorderBufferFinishPrepared. Don't overwrite those.
    1028                 :             :          */
    1029   [ #  #  #  # ]:           0 :         if (go_backward || replication_state->remote_lsn < remote_commit)
    1030                 :           0 :                 replication_state->remote_lsn = remote_commit;
    1031   [ #  #  #  # ]:           0 :         if (XLogRecPtrIsValid(local_commit) &&
    1032         [ #  # ]:           0 :                 (go_backward || replication_state->local_lsn < local_commit))
    1033                 :           0 :                 replication_state->local_lsn = local_commit;
    1034                 :           0 :         LWLockRelease(&replication_state->lock);
    1035                 :             : 
    1036                 :             :         /*
    1037                 :             :          * Release *after* changing the LSNs, slot isn't acquired and thus could
    1038                 :             :          * otherwise be dropped anytime.
    1039                 :             :          */
    1040                 :           0 :         LWLockRelease(ReplicationOriginLock);
    1041                 :           0 : }
    1042                 :             : 
    1043                 :             : 
    1044                 :             : XLogRecPtr
    1045                 :           1 : replorigin_get_progress(RepOriginId node, bool flush)
    1046                 :             : {
    1047                 :           1 :         int                     i;
    1048                 :           1 :         XLogRecPtr      local_lsn = InvalidXLogRecPtr;
    1049                 :           1 :         XLogRecPtr      remote_lsn = InvalidXLogRecPtr;
    1050                 :             : 
    1051                 :             :         /* prevent slots from being concurrently dropped */
    1052                 :           1 :         LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1053                 :             : 
    1054         [ +  + ]:          11 :         for (i = 0; i < max_active_replication_origins; i++)
    1055                 :             :         {
    1056                 :          10 :                 ReplicationState *state;
    1057                 :             : 
    1058                 :          10 :                 state = &replication_states[i];
    1059                 :             : 
    1060         [ -  + ]:          10 :                 if (state->roident == node)
    1061                 :             :                 {
    1062                 :           0 :                         LWLockAcquire(&state->lock, LW_SHARED);
    1063                 :             : 
    1064                 :           0 :                         remote_lsn = state->remote_lsn;
    1065                 :           0 :                         local_lsn = state->local_lsn;
    1066                 :             : 
    1067                 :           0 :                         LWLockRelease(&state->lock);
    1068                 :             : 
    1069                 :           0 :                         break;
    1070                 :             :                 }
    1071      [ -  -  + ]:          10 :         }
    1072                 :             : 
    1073                 :           1 :         LWLockRelease(ReplicationOriginLock);
    1074                 :             : 
    1075   [ -  +  #  # ]:           1 :         if (flush && XLogRecPtrIsValid(local_lsn))
    1076                 :           0 :                 XLogFlush(local_lsn);
    1077                 :             : 
    1078                 :           2 :         return remote_lsn;
    1079                 :           1 : }
    1080                 :             : 
    1081                 :             : /* Helper function to reset the session replication origin */
    1082                 :             : static void
    1083                 :           0 : replorigin_session_reset_internal(void)
    1084                 :             : {
    1085                 :           0 :         ConditionVariable *cv;
    1086                 :             : 
    1087         [ #  # ]:           0 :         Assert(session_replication_state != NULL);
    1088                 :             : 
    1089                 :           0 :         LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1090                 :             : 
    1091                 :             :         /* The origin must be held by at least one process at this point. */
    1092         [ #  # ]:           0 :         Assert(session_replication_state->refcount > 0);
    1093                 :             : 
    1094                 :             :         /*
    1095                 :             :          * Reset the PID only if the current session is the first to set up this
    1096                 :             :          * origin. This avoids clearing the first process's PID when any other
    1097                 :             :          * session releases the origin.
    1098                 :             :          */
    1099         [ #  # ]:           0 :         if (session_replication_state->acquired_by == MyProcPid)
    1100                 :           0 :                 session_replication_state->acquired_by = 0;
    1101                 :             : 
    1102                 :           0 :         session_replication_state->refcount--;
    1103                 :             : 
    1104                 :           0 :         cv = &session_replication_state->origin_cv;
    1105                 :           0 :         session_replication_state = NULL;
    1106                 :             : 
    1107                 :           0 :         LWLockRelease(ReplicationOriginLock);
    1108                 :             : 
    1109                 :           0 :         ConditionVariableBroadcast(cv);
    1110                 :           0 : }
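The release path above changes two counters. acquired_by is cleared only by the session whose PID it records, while refcount is decremented by every session. The sketch below models just that bookkeeping on a hypothetical DemoSharedOrigin struct, using a my_pid parameter in place of MyProcPid. It omits ReplicationOriginLock and the condition-variable broadcast.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical shared-memory view of one origin slot. */
typedef struct DemoSharedOrigin
{
    int refcount;       /* sessions currently using the origin */
    int acquired_by;    /* PID of the first session to set it up, or 0 */
} DemoSharedOrigin;

/* Detach the calling session (my_pid) from its origin. */
static void demo_session_reset(DemoSharedOrigin **session_state, int my_pid)
{
    DemoSharedOrigin *s = *session_state;

    assert(s != NULL);
    assert(s->refcount > 0);

    /* Only the first acquirer clears the recorded PID. */
    if (s->acquired_by == my_pid)
        s->acquired_by = 0;
    s->refcount--;
    *session_state = NULL;
}
```

If the first session releases before the others, the slot is left with refcount > 0 and acquired_by == 0. This is the "in use, but PID is not recorded" state that replorigin_session_setup() refuses to hand out.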
    1111                 :             : 
    1112                 :             : /*
    1113                 :             :  * Tear down a (possibly) configured session replication origin during process
    1114                 :             :  * exit.
    1115                 :             :  */
    1116                 :             : static void
    1117                 :           0 : ReplicationOriginExitCleanup(int code, Datum arg)
    1118                 :             : {
    1119         [ #  # ]:           0 :         if (session_replication_state == NULL)
    1120                 :           0 :                 return;
    1121                 :             : 
    1122                 :           0 :         replorigin_session_reset_internal();
    1123                 :           0 : }
    1124                 :             : 
    1125                 :             : /*
     1126                 :             :  * Set up a replication origin in the shared memory struct if it doesn't
     1127                 :             :  * already exist and cache access to the specific ReplicationState so the
    1128                 :             :  * array doesn't have to be searched when calling
    1129                 :             :  * replorigin_session_advance().
    1130                 :             :  *
    1131                 :             :  * Normally only one such cached origin can exist per process so the cached
    1132                 :             :  * value can only be set again after the previous value is torn down with
    1133                 :             :  * replorigin_session_reset(). For this normal case pass acquired_by = 0
    1134                 :             :  * (meaning the slot is not allowed to be already acquired by another process).
    1135                 :             :  *
    1136                 :             :  * However, sometimes multiple processes can safely re-use the same origin slot
    1137                 :             :  * (for example, multiple parallel apply processes can safely use the same
    1138                 :             :  * origin, provided they maintain commit order by allowing only one process to
    1139                 :             :  * commit at a time). For this case the first process must pass acquired_by =
    1140                 :             :  * 0, and then the other processes sharing that same origin can pass
    1141                 :             :  * acquired_by = PID of the first process.
    1142                 :             :  */
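The acquired_by rules described above reduce to a short chain of checks against a slot that already belongs to the requested node. The sketch below reproduces the order of those checks, using hypothetical names (DemoOriginSlot, DemoSetupResult, demo_check, demo_acquire). The bookkeeping after a successful check is an assumption, because this excerpt does not show it: an exclusive caller records its own PID, and every caller increments refcount.

```c
#include <assert.h>

/* Hypothetical view of an existing slot that already matches the node. */
typedef struct DemoOriginSlot
{
    int refcount;       /* sessions currently using the origin */
    int acquired_by;    /* PID of the first session, or 0 */
} DemoOriginSlot;

typedef enum DemoSetupResult
{
    DEMO_SETUP_OK,
    DEMO_SETUP_ACTIVE_FOR_PID,  /* exclusive request, but another PID holds it */
    DEMO_SETUP_PID_MISMATCH,    /* caller named a PID that is not the holder */
    DEMO_SETUP_ACTIVE_NO_PID    /* still referenced, but no PID is recorded */
} DemoSetupResult;

/* Same order of checks as the else-if chain in replorigin_session_setup(). */
static DemoSetupResult demo_check(const DemoOriginSlot *slot, int acquired_by)
{
    if (slot->acquired_by != 0 && acquired_by == 0)
        return DEMO_SETUP_ACTIVE_FOR_PID;
    if (slot->acquired_by != acquired_by)
        return DEMO_SETUP_PID_MISMATCH;
    if (slot->acquired_by == 0 && slot->refcount > 0)
        return DEMO_SETUP_ACTIVE_NO_PID;
    return DEMO_SETUP_OK;
}

/* ASSUMED bookkeeping after a successful check (not shown in this excerpt). */
static DemoSetupResult demo_acquire(DemoOriginSlot *slot, int acquired_by, int my_pid)
{
    DemoSetupResult r = demo_check(slot, acquired_by);

    assert(my_pid != 0);
    if (r == DEMO_SETUP_OK)
    {
        if (acquired_by == 0)
            slot->acquired_by = my_pid;   /* first process records itself */
        slot->refcount++;
    }
    return r;
}
```

In this model, a parallel apply worker passes the leader's PID and shares the slot. A second process that asks for exclusive use (acquired_by = 0) is rejected for as long as the leader's PID is recorded.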
    1143                 :             : void
    1144                 :           0 : replorigin_session_setup(RepOriginId node, int acquired_by)
    1145                 :             : {
    1146                 :             :         static bool registered_cleanup;
    1147                 :           0 :         int                     i;
    1148                 :           0 :         int                     free_slot = -1;
    1149                 :             : 
    1150         [ #  # ]:           0 :         if (!registered_cleanup)
    1151                 :             :         {
    1152                 :           0 :                 on_shmem_exit(ReplicationOriginExitCleanup, 0);
    1153                 :           0 :                 registered_cleanup = true;
    1154                 :           0 :         }
    1155                 :             : 
    1156         [ #  # ]:           0 :         Assert(max_active_replication_origins > 0);
    1157                 :             : 
    1158         [ #  # ]:           0 :         if (session_replication_state != NULL)
    1159   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1160                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1161                 :             :                                  errmsg("cannot setup replication origin when one is already setup")));
    1162                 :             : 
    1163                 :             :         /* Lock exclusively, as we may have to create a new table entry. */
    1164                 :           0 :         LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1165                 :             : 
    1166                 :             :         /*
    1167                 :             :          * Search for either an existing slot for the origin, or a free one we can
    1168                 :             :          * use.
    1169                 :             :          */
    1170         [ #  # ]:           0 :         for (i = 0; i < max_active_replication_origins; i++)
    1171                 :             :         {
    1172                 :           0 :                 ReplicationState *curstate = &replication_states[i];
    1173                 :             : 
    1174                 :             :                 /* remember where to insert if necessary */
    1175   [ #  #  #  # ]:           0 :                 if (curstate->roident == InvalidRepOriginId &&
    1176                 :           0 :                         free_slot == -1)
    1177                 :             :                 {
    1178                 :           0 :                         free_slot = i;
    1179                 :           0 :                         continue;
    1180                 :             :                 }
    1181                 :             : 
    1182                 :             :                 /* not our slot */
    1183         [ #  # ]:           0 :                 if (curstate->roident != node)
    1184                 :           0 :                         continue;
    1185                 :             : 
    1186   [ #  #  #  # ]:           0 :                 else if (curstate->acquired_by != 0 && acquired_by == 0)
    1187                 :             :                 {
    1188   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    1189                 :             :                                         (errcode(ERRCODE_OBJECT_IN_USE),
    1190                 :             :                                          errmsg("replication origin with ID %d is already active for PID %d",
    1191                 :             :                                                         curstate->roident, curstate->acquired_by)));
    1192                 :           0 :                 }
    1193                 :             : 
    1194         [ #  # ]:           0 :                 else if (curstate->acquired_by != acquired_by)
    1195                 :             :                 {
    1196   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    1197                 :             :                                         (errcode(ERRCODE_OBJECT_IN_USE),
    1198                 :             :                                          errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
    1199                 :             :                                                         node, acquired_by)));
    1200                 :           0 :                 }
    1201                 :             : 
    1202                 :             :                 /*
    1203                 :             :                  * The origin is in use, but PID is not recorded. This can happen if
    1204                 :             :                  * the process that originally acquired the origin exited without
    1205                 :             :                  * releasing it. To ensure correctness, other processes cannot acquire
    1206                 :             :                  * the origin until all processes currently using it have released it.
    1207                 :             :                  */
    1208   [ #  #  #  # ]:           0 :                 else if (curstate->acquired_by == 0 && curstate->refcount > 0)
    1209   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    1210                 :             :                                         (errcode(ERRCODE_OBJECT_IN_USE),
    1211                 :             :                                          errmsg("replication origin with ID %d is already active in another process",
    1212                 :             :                                                         curstate->roident)));
    1213                 :             : 
    1214                 :             :                 /* ok, found slot */
    1215                 :           0 :                 session_replication_state = curstate;
    1216                 :           0 :                 break;
    1217      [ #  #  # ]:           0 :         }
    1218                 :             : 
    1219                 :             : 
    1220   [ #  #  #  # ]:           0 :         if (session_replication_state == NULL && free_slot == -1)
    1221   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1222                 :             :                                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
    1223                 :             :                                  errmsg("could not find free replication state slot for replication origin with ID %d",
    1224                 :             :                                                 node),
    1225                 :             :                                  errhint("Increase \"max_active_replication_origins\" and try again.")));
    1226         [ #  # ]:           0 :         else if (session_replication_state == NULL)
    1227                 :             :         {
    1228         [ #  # ]:           0 :                 if (acquired_by)
    1229   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    1230                 :             :                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1231                 :             :                                          errmsg("cannot use PID %d for inactive replication origin with ID %d",
    1232                 :             :                                                         acquired_by, node)));
    1233                 :             : 
    1234                 :             :                 /* initialize new slot */
    1235                 :           0 :                 session_replication_state = &replication_states[free_slot];
    1236         [ #  # ]:           0 :                 Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
    1237         [ #  # ]:           0 :                 Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));
    1238                 :           0 :                 session_replication_state->roident = node;
    1239                 :           0 :         }
    1240                 :             : 
    1241                 :             : 
    1242         [ #  # ]:           0 :         Assert(session_replication_state->roident != InvalidRepOriginId);
    1243                 :             : 
    1244         [ #  # ]:           0 :         if (acquired_by == 0)
    1245                 :             :         {
    1246                 :           0 :                 session_replication_state->acquired_by = MyProcPid;
    1247         [ #  # ]:           0 :                 Assert(session_replication_state->refcount == 0);
    1248                 :           0 :         }
    1249                 :             :         else
    1250                 :             :         {
    1251                 :             :                 /*
    1252                 :             :                  * Sanity check: the origin must already be acquired by the process
    1253                 :             :                  * passed as input, and at least one process must be using it.
    1254                 :             :                  */
    1255         [ #  # ]:           0 :                 Assert(session_replication_state->acquired_by == acquired_by);
    1256         [ #  # ]:           0 :                 Assert(session_replication_state->refcount > 0);
    1257                 :             :         }
    1258                 :             : 
    1259                 :           0 :         session_replication_state->refcount++;
    1260                 :             : 
    1261                 :           0 :         LWLockRelease(ReplicationOriginLock);
    1262                 :             : 
    1263                 :             :         /* probably this one is pointless */
    1264                 :           0 :         ConditionVariableBroadcast(&session_replication_state->origin_cv);
    1265                 :           0 : }
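The slot-acquisition rules in replorigin_session_setup() can be modeled outside the server. This is a minimal sketch, not PostgreSQL code. SketchSlot, the SETUP_* codes and sketch_session_setup() are hypothetical names. Negative return codes stand in for the ereport(ERROR) calls. Locking, the condition variable and the LSN asserts are omitted. The sketch also assumes that the part of the scan not shown in this excerpt remembers the first unused slot (roident 0) as the free slot.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for ReplicationState (no LSNs, no lock). */
typedef struct SketchSlot
{
    uint16_t roident;     /* 0 plays the role of InvalidRepOriginId */
    int      acquired_by; /* PID of the first acquirer, 0 if none */
    int      refcount;    /* number of sessions using this origin */
} SketchSlot;

/* Negative results standing in for the ereport(ERROR) calls. */
enum
{
    SETUP_IN_USE = -1,       /* ERRCODE_OBJECT_IN_USE */
    SETUP_NO_FREE_SLOT = -2, /* ERRCODE_CONFIGURATION_LIMIT_EXCEEDED */
    SETUP_BAD_PID = -3       /* ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE */
};

/*
 * Returns the index of the slot now used by the session, or a SETUP_* code.
 * acquired_by == 0 claims the origin for my_pid. A nonzero acquired_by joins
 * an origin that this PID already holds.
 */
static int
sketch_session_setup(SketchSlot *slots, int nslots, uint16_t node,
                     int acquired_by, int my_pid)
{
    int free_slot = -1;
    int found = -1;

    assert(node != 0);
    for (int i = 0; i < nslots; i++)
    {
        SketchSlot *cur = &slots[i];

        if (cur->roident == 0)
        {
            if (free_slot == -1)
                free_slot = i;        /* assumed: remember first unused slot */
            continue;
        }
        if (cur->roident != node)
            continue;                 /* not our slot */
        if (cur->acquired_by != 0 && acquired_by == 0)
            return SETUP_IN_USE;      /* already active for another PID */
        if (cur->acquired_by != acquired_by)
            return SETUP_IN_USE;      /* not held by the PID passed in */
        if (cur->acquired_by == 0 && cur->refcount > 0)
            return SETUP_IN_USE;      /* first owner gone, sharers remain */
        found = i;
        break;
    }

    if (found == -1 && free_slot == -1)
        return SETUP_NO_FREE_SLOT;
    if (found == -1)
    {
        if (acquired_by != 0)
            return SETUP_BAD_PID;     /* cannot join an inactive origin */
        found = free_slot;
        slots[found].roident = node;
    }

    if (acquired_by == 0)
    {
        assert(slots[found].refcount == 0);
        slots[found].acquired_by = my_pid;
    }
    else
    {
        assert(slots[found].acquired_by == acquired_by);
        assert(slots[found].refcount > 0);
    }
    slots[found].refcount++;
    return found;
}
```

Passing 0 claims the origin exclusively for the caller's PID. Passing a PID joins a session that process already holds and only bumps the refcount. That is why the function rejects joining an origin nobody has acquired.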
    1266                 :             : 
    1267                 :             : /*
    1268                 :             :  * Reset replay state previously set up in this session.
    1269                 :             :  *
    1270                 :             :  * This function may only be called if an origin was set up with
    1271                 :             :  * replorigin_session_setup().
    1272                 :             :  */
    1273                 :             : void
    1274                 :           0 : replorigin_session_reset(void)
    1275                 :             : {
    1276         [ #  # ]:           0 :         Assert(max_active_replication_origins != 0);
    1277                 :             : 
    1278         [ #  # ]:           0 :         if (session_replication_state == NULL)
    1279   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1280                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1281                 :             :                                  errmsg("no replication origin is configured")));
    1282                 :             : 
    1283                 :             :         /*
    1284                 :             :          * Restrict explicit resetting of the replication origin if it was first
    1285                 :             :          * acquired by this process and others are still using it. While the
    1286                 :             :          * system handles this safely (as happens if the first session exits
    1287                 :             :          * without calling reset), it is best to avoid doing so.
    1288                 :             :          */
    1289   [ #  #  #  # ]:           0 :         if (session_replication_state->acquired_by == MyProcPid &&
    1290                 :           0 :                 session_replication_state->refcount > 1)
    1291   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1292                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1293                 :             :                                  errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
    1294                 :             :                                                 session_replication_state->roident),
    1295                 :             :                                  errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
    1296                 :             :                                  errhint("Reset the replication origin in all other processes before retrying.")));
    1297                 :             : 
    1298                 :           0 :         replorigin_session_reset_internal();
    1299                 :           0 : }
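The guard in replorigin_session_reset() refuses an explicit reset only in one case: the caller is the first acquirer and the refcount shows other sharers. Below is a minimal sketch of just those two checks, with hypothetical names (SketchState, ResetCheck, sketch_reset_check()). The actual release performed by replorigin_session_reset_internal() is not modeled, because its body is not part of this excerpt.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the ReplicationState fields used here. */
typedef struct SketchState
{
    int acquired_by; /* PID of the first acquirer, 0 if none */
    int refcount;    /* number of sessions sharing the origin */
} SketchState;

typedef enum
{
    RESET_OK,             /* reset may proceed */
    RESET_NOT_CONFIGURED, /* "no replication origin is configured" */
    RESET_STILL_SHARED    /* first acquirer while others still use it */
} ResetCheck;

/* Mirrors only the checks that precede replorigin_session_reset_internal(). */
static ResetCheck
sketch_reset_check(const SketchState *session_state, int my_pid)
{
    assert(my_pid != 0); /* a real backend PID is never 0 */

    if (session_state == NULL)
        return RESET_NOT_CONFIGURED;
    if (session_state->acquired_by == my_pid && session_state->refcount > 1)
        return RESET_STILL_SHARED;
    return RESET_OK;
}
```

A sharing session (one whose PID is not acquired_by) may always reset. The first acquirer must wait until it is the last user.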
    1300                 :             : 
    1301                 :             : /*
    1302                 :             :  * Do the same work replorigin_advance() does, just on the session's
    1303                 :             :  * configured origin.
    1304                 :             :  *
    1305                 :             :  * This is noticeably cheaper than using replorigin_advance().
    1306                 :             :  */
    1307                 :             : void
    1308                 :           0 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
    1309                 :             : {
    1310         [ #  # ]:           0 :         Assert(session_replication_state != NULL);
    1311         [ #  # ]:           0 :         Assert(session_replication_state->roident != InvalidRepOriginId);
    1312                 :             : 
    1313                 :           0 :         LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
    1314         [ #  # ]:           0 :         if (session_replication_state->local_lsn < local_commit)
    1315                 :           0 :                 session_replication_state->local_lsn = local_commit;
    1316         [ #  # ]:           0 :         if (session_replication_state->remote_lsn < remote_commit)
    1317                 :           0 :                 session_replication_state->remote_lsn = remote_commit;
    1318                 :           0 :         LWLockRelease(&session_replication_state->lock);
    1319                 :           0 : }
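replorigin_session_advance() never moves either LSN backward, so a commit older than the recorded position is ignored. The sketch below models that rule with a plain 64-bit integer standing in for XLogRecPtr. SketchProgress and sketch_session_advance() are hypothetical. The per-origin LWLock that the real function takes in exclusive mode is deliberately left out.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t SketchLSN; /* stand-in for XLogRecPtr */

/* Hypothetical stand-in for the LSN fields of ReplicationState. */
typedef struct SketchProgress
{
    SketchLSN remote_lsn; /* last remote commit replayed */
    SketchLSN local_lsn;  /* local commit that replayed it */
} SketchProgress;

/*
 * Forward-only update, as in replorigin_session_advance(). The real
 * function holds the per-origin LWLock in exclusive mode around this.
 */
static void
sketch_session_advance(SketchProgress *p, SketchLSN remote_commit,
                       SketchLSN local_commit)
{
    assert(p != NULL);
    if (p->local_lsn < local_commit)
        p->local_lsn = local_commit;
    if (p->remote_lsn < remote_commit)
        p->remote_lsn = remote_commit;
}
```

Each field is updated independently, so a call can raise local_lsn while leaving an already newer remote_lsn unchanged.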
    1320                 :             : 
    1321                 :             : /*
    1322                 :             :  * Ask the machinery about the point up to which we successfully replayed
    1323                 :             :  * changes from an already set up replication origin.
    1324                 :             :  */
    1325                 :             : XLogRecPtr
    1326                 :           0 : replorigin_session_get_progress(bool flush)
    1327                 :             : {
    1328                 :           0 :         XLogRecPtr      remote_lsn;
    1329                 :           0 :         XLogRecPtr      local_lsn;
    1330                 :             : 
    1331         [ #  # ]:           0 :         Assert(session_replication_state != NULL);
    1332                 :             : 
    1333                 :           0 :         LWLockAcquire(&session_replication_state->lock, LW_SHARED);
    1334                 :           0 :         remote_lsn = session_replication_state->remote_lsn;
    1335                 :           0 :         local_lsn = session_replication_state->local_lsn;
    1336                 :           0 :         LWLockRelease(&session_replication_state->lock);
    1337                 :             : 
    1338   [ #  #  #  # ]:           0 :         if (flush && XLogRecPtrIsValid(local_lsn))
    1339                 :           0 :                 XLogFlush(local_lsn);
    1340                 :             : 
    1341                 :           0 :         return remote_lsn;
    1342                 :           0 : }
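replorigin_session_get_progress() copies both LSNs under a shared lock. If flush is true and the local LSN is valid, it calls XLogFlush() on the local LSN before returning the remote one. The sketch below reproduces that control flow. The flush is replaced by a hypothetical sketch_xlog_flush() that only records the highest requested position, and 0 plays the role of InvalidXLogRecPtr. The other names are likewise hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t SketchLSN;                /* stand-in for XLogRecPtr */
#define SKETCH_INVALID_LSN ((SketchLSN) 0) /* like InvalidXLogRecPtr */

typedef struct SketchProgress
{
    SketchLSN remote_lsn;
    SketchLSN local_lsn;
} SketchProgress;

/* Hypothetical stand-in for XLogFlush(): remembers how far we "flushed". */
static SketchLSN flushed_up_to = SKETCH_INVALID_LSN;

static void
sketch_xlog_flush(SketchLSN upto)
{
    if (upto > flushed_up_to)
        flushed_up_to = upto;
}

static SketchLSN
sketch_get_progress(const SketchProgress *p, bool flush)
{
    SketchLSN remote_lsn;
    SketchLSN local_lsn;

    assert(p != NULL);
    /* The real code copies both values under a shared LWLock. */
    remote_lsn = p->remote_lsn;
    local_lsn = p->local_lsn;

    /* Flushing happens after the lock would have been released. */
    if (flush && local_lsn != SKETCH_INVALID_LSN)
        sketch_xlog_flush(local_lsn);

    return remote_lsn;
}
```

Flushing local_lsn first means the returned remote position is only reported once the local transaction that replayed it is durable. This matters when replay uses asynchronous commit.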
    1343                 :             : 
    1344                 :             : 
    1345                 :             : 
    1346                 :             : /* ---------------------------------------------------------------------------
    1347                 :             :  * SQL functions for working with replication origin.
    1348                 :             :  *
    1349                 :             :  * These mostly should be fairly short wrappers around more generic functions.
    1350                 :             :  * ---------------------------------------------------------------------------
    1351                 :             :  */
    1352                 :             : 
    1353                 :             : /*
    1354                 :             :  * Create replication origin for the passed in name, and return the assigned
    1355                 :             :  * oid.
    1356                 :             :  */
    1357                 :             : Datum
    1358                 :           1 : pg_replication_origin_create(PG_FUNCTION_ARGS)
    1359                 :             : {
    1360                 :           1 :         char       *name;
    1361                 :           1 :         RepOriginId roident;
    1362                 :             : 
    1363                 :           1 :         replorigin_check_prerequisites(false, false);
    1364                 :             : 
    1365                 :           1 :         name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1366                 :             : 
    1367                 :             :         /*
    1368                 :             :          * Replication origins "any" and "none" are reserved for system options.
    1369                 :             :          * The origins "pg_xxx" are reserved for internal use.
    1370                 :             :          */
    1371         [ +  - ]:           1 :         if (IsReservedName(name) || IsReservedOriginName(name))
    1372   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1373                 :             :                                 (errcode(ERRCODE_RESERVED_NAME),
    1374                 :             :                                  errmsg("replication origin name \"%s\" is reserved",
    1375                 :             :                                                 name),
    1376                 :             :                                  errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
    1377                 :             :                                                    LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
    1378                 :             : 
    1379                 :             :         /*
    1380                 :             :          * If built with appropriate switch, whine when regression-testing
    1381                 :             :          * conventions for replication origin names are violated.
    1382                 :             :          */
    1383                 :             : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1384                 :             :         if (strncmp(name, "regress_", 8) != 0)
    1385                 :             :                 elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
    1386                 :             : #endif
    1387                 :             : 
    1388                 :           1 :         roident = replorigin_create(name);
    1389                 :             : 
    1390                 :           1 :         pfree(name);
    1391                 :             : 
    1392                 :           2 :         PG_RETURN_OID(roident);
    1393                 :           1 : }
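The reserved-name check in pg_replication_origin_create() rejects "any", "none", and any name starting with "pg_". Below is a self-contained approximation using hypothetical helpers in place of IsReservedName() and IsReservedOriginName(). The "pg_" prefix test is case-sensitive. Comparing "any" and "none" case-insensitively is an assumption about IsReservedOriginName() and should be checked against its definition.

```c
#include <assert.h>
#include <ctype.h>
#include <stdbool.h>
#include <string.h>

/* ASCII case-insensitive equality (assumed matching rule, see lead-in). */
static bool
sketch_equal_ci(const char *a, const char *b)
{
    for (; *a && *b; a++, b++)
        if (tolower((unsigned char) *a) != tolower((unsigned char) *b))
            return false;
    return *a == *b; /* equal only if both strings ended together */
}

/* Hypothetical combination of the two reserved-name tests. */
static bool
sketch_is_reserved_origin_name(const char *name)
{
    assert(name != NULL);
    return strncmp(name, "pg_", 3) == 0 ||
           sketch_equal_ci(name, "any") ||
           sketch_equal_ci(name, "none");
}
```

Names such as "anything" or "pg" pass the check, because only exact matches of the two keywords and the full "pg_" prefix are reserved.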
    1394                 :             : 
    1395                 :             : /*
    1396                 :             :  * Drop replication origin.
    1397                 :             :  */
    1398                 :             : Datum
    1399                 :           0 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
    1400                 :             : {
    1401                 :           0 :         char       *name;
    1402                 :             : 
    1403                 :           0 :         replorigin_check_prerequisites(false, false);
    1404                 :             : 
    1405                 :           0 :         name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1406                 :             : 
    1407                 :           0 :         replorigin_drop_by_name(name, false, true);
    1408                 :             : 
    1409                 :           0 :         pfree(name);
    1410                 :             : 
    1411                 :           0 :         PG_RETURN_VOID();
    1412                 :           0 : }
    1413                 :             : 
    1414                 :             : /*
    1415                 :             :  * Return oid of a replication origin.
    1416                 :             :  */
    1417                 :             : Datum
    1418                 :           0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
    1419                 :             : {
    1420                 :           0 :         char       *name;
    1421                 :           0 :         RepOriginId roident;
    1422                 :             : 
    1423                 :           0 :         replorigin_check_prerequisites(false, false);
    1424                 :             : 
    1425                 :           0 :         name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1426                 :           0 :         roident = replorigin_by_name(name, true);
    1427                 :             : 
    1428                 :           0 :         pfree(name);
    1429                 :             : 
    1430         [ #  # ]:           0 :         if (OidIsValid(roident))
    1431                 :           0 :                 PG_RETURN_OID(roident);
    1432                 :           0 :         PG_RETURN_NULL();
    1433         [ #  # ]:           0 : }
    1434                 :             : 
    1435                 :             : /*
    1436                 :             :  * Set up a replication origin for this session.
    1437                 :             :  */
    1438                 :             : Datum
    1439                 :           0 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
    1440                 :             : {
    1441                 :           0 :         char       *name;
    1442                 :           0 :         RepOriginId origin;
    1443                 :           0 :         int                     pid;
    1444                 :             : 
    1445                 :           0 :         replorigin_check_prerequisites(true, false);
    1446                 :             : 
    1447                 :           0 :         name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1448                 :           0 :         origin = replorigin_by_name(name, false);
    1449                 :           0 :         pid = PG_GETARG_INT32(1);
    1450                 :           0 :         replorigin_session_setup(origin, pid);
    1451                 :             : 
    1452                 :           0 :         replorigin_session_origin = origin;
    1453                 :             : 
    1454                 :           0 :         pfree(name);
    1455                 :             : 
    1456                 :           0 :         PG_RETURN_VOID();
    1457                 :           0 : }
    1458                 :             : 
    1459                 :             : /*
    1460                 :             :  * Reset the origin previously set up in this session.
    1461                 :             :  */
    1462                 :             : Datum
    1463                 :           0 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
    1464                 :             : {
    1465                 :           0 :         replorigin_check_prerequisites(true, false);
    1466                 :             : 
    1467                 :           0 :         replorigin_session_reset();
    1468                 :             : 
    1469                 :           0 :         replorigin_session_origin = InvalidRepOriginId;
    1470                 :           0 :         replorigin_session_origin_lsn = InvalidXLogRecPtr;
    1471                 :           0 :         replorigin_session_origin_timestamp = 0;
    1472                 :             : 
    1473                 :           0 :         PG_RETURN_VOID();
    1474                 :             : }
    1475                 :             : 
    1476                 :             : /*
    1477                 :             :  * Has a replication origin been set up for this session?
    1478                 :             :  */
    1479                 :             : Datum
    1480                 :           0 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
    1481                 :             : {
    1482                 :           0 :         replorigin_check_prerequisites(false, false);
    1483                 :             : 
    1484                 :           0 :         PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
    1485                 :             : }
    1486                 :             : 
    1487                 :             : 
    1488                 :             : /*
    1489                 :             :  * Return the replication progress for the origin set up in the current session.
    1490                 :             :  *
    1491                 :             :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1492                 :             :  * to a local transaction that has been flushed. This is useful if asynchronous
    1493                 :             :  * commits are used when replaying replicated transactions.
    1494                 :             :  */
    1495                 :             : Datum
    1496                 :           0 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
    1497                 :             : {
    1498                 :           0 :         XLogRecPtr      remote_lsn = InvalidXLogRecPtr;
    1499                 :           0 :         bool            flush = PG_GETARG_BOOL(0);
    1500                 :             : 
    1501                 :           0 :         replorigin_check_prerequisites(true, false);
    1502                 :             : 
    1503         [ #  # ]:           0 :         if (session_replication_state == NULL)
    1504   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1505                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1506                 :             :                                  errmsg("no replication origin is configured")));
    1507                 :             : 
    1508                 :           0 :         remote_lsn = replorigin_session_get_progress(flush);
    1509                 :             : 
    1510         [ #  # ]:           0 :         if (!XLogRecPtrIsValid(remote_lsn))
    1511                 :           0 :                 PG_RETURN_NULL();
    1512                 :             : 
    1513                 :           0 :         PG_RETURN_LSN(remote_lsn);
    1514                 :           0 : }
    1515                 :             : 
    1516                 :             : Datum
    1517                 :           0 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
    1518                 :             : {
    1519                 :           0 :         XLogRecPtr      location = PG_GETARG_LSN(0);
    1520                 :             : 
    1521                 :           0 :         replorigin_check_prerequisites(true, false);
    1522                 :             : 
    1523         [ #  # ]:           0 :         if (session_replication_state == NULL)
    1524   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1525                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1526                 :             :                                  errmsg("no replication origin is configured")));
    1527                 :             : 
    1528                 :           0 :         replorigin_session_origin_lsn = location;
    1529                 :           0 :         replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
    1530                 :             : 
    1531                 :           0 :         PG_RETURN_VOID();
    1532                 :           0 : }
    1533                 :             : 
    1534                 :             : Datum
    1535                 :           0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
    1536                 :             : {
    1537                 :           0 :         replorigin_check_prerequisites(true, false);
    1538                 :             : 
    1539                 :           0 :         replorigin_session_origin_lsn = InvalidXLogRecPtr;
    1540                 :           0 :         replorigin_session_origin_timestamp = 0;
    1541                 :             : 
    1542                 :           0 :         PG_RETURN_VOID();
    1543                 :             : }
    1544                 :             : 
    1545                 :             : 
    1546                 :             : Datum
    1547                 :           0 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
    1548                 :             : {
    1549                 :           0 :         text       *name = PG_GETARG_TEXT_PP(0);
    1550                 :           0 :         XLogRecPtr      remote_commit = PG_GETARG_LSN(1);
    1551                 :           0 :         RepOriginId node;
    1552                 :             : 
    1553                 :           0 :         replorigin_check_prerequisites(true, false);
    1554                 :             : 
    1555                 :             :         /* lock to prevent the replication origin from vanishing */
    1556                 :           0 :         LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1557                 :             : 
    1558                 :           0 :         node = replorigin_by_name(text_to_cstring(name), false);
    1559                 :             : 
    1560                 :             :         /*
    1561                 :             :          * Can't sensibly pass a local commit to be flushed at checkpoint - this
    1562                 :             :          * xact hasn't committed yet. This is why this function should be used to
    1563                 :             :          * set up the initial replication state, but not for replay.
    1564                 :             :          */
    1565                 :           0 :         replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
    1566                 :             :                                            true /* go backward */ , true /* WAL log */ );
    1567                 :             : 
    1568                 :           0 :         UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1569                 :             : 
    1570                 :           0 :         PG_RETURN_VOID();
    1571                 :           0 : }
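pg_replication_origin_advance() passes go_backward = true and an invalid local LSN to replorigin_advance(). This allows an origin to be moved to an earlier position when seeding its initial state. The sketch below shows the assumed update rule; replorigin_advance() is defined elsewhere in origin.c and is not part of this excerpt. With go_backward set, the remote LSN is overwritten unconditionally. Otherwise only a newer value is accepted. An invalid local commit leaves local_lsn untouched. All names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t SketchLSN;                /* stand-in for XLogRecPtr */
#define SKETCH_INVALID_LSN ((SketchLSN) 0) /* like InvalidXLogRecPtr */

typedef struct SketchProgress
{
    SketchLSN remote_lsn;
    SketchLSN local_lsn;
} SketchProgress;

/* Assumed update rule of replorigin_advance(); locking and WAL logging omitted. */
static void
sketch_advance(SketchProgress *p, SketchLSN remote_commit,
               SketchLSN local_commit, bool go_backward)
{
    assert(p != NULL);
    if (go_backward || p->remote_lsn < remote_commit)
        p->remote_lsn = remote_commit;
    if (local_commit != SKETCH_INVALID_LSN &&
        (go_backward || p->local_lsn < local_commit))
        p->local_lsn = local_commit;
}
```

Without go_backward, the rewind attempt below is ignored. With it, the remote position moves back, while the local position survives because no valid local commit is supplied.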
    1572                 :             : 
    1573                 :             : 
    1574                 :             : /*
    1575                 :             :  * Return the replication progress for an individual replication origin.
    1576                 :             :  *
    1577                 :             :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1578                 :             :  * to a local transaction that has been flushed. This is useful if asynchronous
    1579                 :             :  * commits are used when replaying replicated transactions.
    1580                 :             :  */
    1581                 :             : Datum
    1582                 :           0 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
    1583                 :             : {
    1584                 :           0 :         char       *name;
    1585                 :           0 :         bool            flush;
    1586                 :           0 :         RepOriginId roident;
    1587                 :           0 :         XLogRecPtr      remote_lsn = InvalidXLogRecPtr;
    1588                 :             : 
    1589                 :           0 :         replorigin_check_prerequisites(true, true);
    1590                 :             : 
    1591                 :           0 :         name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1592                 :           0 :         flush = PG_GETARG_BOOL(1);
    1593                 :             : 
    1594                 :           0 :         roident = replorigin_by_name(name, false);
    1595         [ #  # ]:           0 :         Assert(OidIsValid(roident));
    1596                 :             : 
    1597                 :           0 :         remote_lsn = replorigin_get_progress(roident, flush);
    1598                 :             : 
    1599         [ #  # ]:           0 :         if (!XLogRecPtrIsValid(remote_lsn))
    1600                 :           0 :                 PG_RETURN_NULL();
    1601                 :             : 
    1602                 :           0 :         PG_RETURN_LSN(remote_lsn);
    1603                 :           0 : }
    1604                 :             : 
    1605                 :             : 
    1606                 :             : Datum
    1607                 :           0 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
    1608                 :             : {
    1609                 :           0 :         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1610                 :           0 :         int                     i;
    1611                 :             : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
    1612                 :             : 
    1613                 :             :         /* we want to return 0 rows if max_active_replication_origins is zero */
    1614                 :           0 :         replorigin_check_prerequisites(false, true);
    1615                 :             : 
    1616                 :           0 :         InitMaterializedSRF(fcinfo, 0);
    1617                 :             : 
    1618                 :             :         /* prevent slots from being concurrently dropped */
    1619                 :           0 :         LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1620                 :             : 
    1621                 :             :         /*
    1622                 :             :          * Iterate through all possible replication_states and display those that
    1623                 :             :          * are in use. ReplicationOriginLock is held in shared mode throughout, and
    1624                 :             :          * each slot's own lock is taken while its LSNs are read.
    1625                 :             :          */
    1626         [ #  # ]:           0 :         for (i = 0; i < max_active_replication_origins; i++)
    1627                 :             :         {
    1628                 :           0 :                 ReplicationState *state;
    1629                 :           0 :                 Datum           values[REPLICATION_ORIGIN_PROGRESS_COLS];
    1630                 :           0 :                 bool            nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
    1631                 :           0 :                 char       *roname;
    1632                 :             : 
    1633                 :           0 :                 state = &replication_states[i];
    1634                 :             : 
    1635                 :             :                 /* unused slot, nothing to display */
    1636         [ #  # ]:           0 :                 if (state->roident == InvalidRepOriginId)
    1637                 :           0 :                         continue;
    1638                 :             : 
    1639                 :           0 :                 memset(values, 0, sizeof(values));
    1640                 :           0 :                 memset(nulls, 1, sizeof(nulls));
    1641                 :             : 
    1642                 :           0 :                 values[0] = ObjectIdGetDatum(state->roident);
    1643                 :           0 :                 nulls[0] = false;
    1644                 :             : 
    1645                 :             :                 /*
    1646                 :             :                  * We're not preventing the origin from being dropped concurrently, so
    1647                 :             :                  * silently accept that it might be gone.
    1648                 :             :                  */
    1649         [ #  # ]:           0 :                 if (replorigin_by_oid(state->roident, true,
    1650                 :             :                                                           &roname))
    1651                 :             :                 {
    1652                 :           0 :                         values[1] = CStringGetTextDatum(roname);
    1653                 :           0 :                         nulls[1] = false;
    1654                 :           0 :                 }
    1655                 :             : 
    1656                 :           0 :                 LWLockAcquire(&state->lock, LW_SHARED);
    1657                 :             : 
    1658                 :           0 :                 values[2] = LSNGetDatum(state->remote_lsn);
    1659                 :           0 :                 nulls[2] = false;
    1660                 :             : 
    1661                 :           0 :                 values[3] = LSNGetDatum(state->local_lsn);
    1662                 :           0 :                 nulls[3] = false;
    1663                 :             : 
    1664                 :           0 :                 LWLockRelease(&state->lock);
    1665                 :             : 
    1666                 :           0 :                 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1667                 :           0 :                                                          values, nulls);
    1668      [ #  #  # ]:           0 :         }
    1669                 :             : 
    1670                 :           0 :         LWLockRelease(ReplicationOriginLock);
    1671                 :             : 
    1672                 :             : #undef REPLICATION_ORIGIN_PROGRESS_COLS
    1673                 :             : 
    1674                 :           0 :         return (Datum) 0;
    1675                 :           0 : }
        

Generated by: LCOV version 2.3.2-1