LCOV - code coverage report
Current view: top level - src/backend/replication/logical - snapbuild.c (source / functions)
Test: Code coverage
Test Date: 2026-01-26 10:56:24

             Coverage   Total   Hit
Lines:          3.5 %     714    25
Functions:      6.2 %      32     2
Branches:       1.2 %     582     7

Legend: Lines: hit, not hit
Legend: Branches: + taken, - not taken, # not executed

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * snapbuild.c
       4                 :             :  *
       5                 :             :  *        Infrastructure for building historic catalog snapshots based on contents
       6                 :             :  *        of the WAL, for the purpose of decoding heapam.c style values in the
       7                 :             :  *        WAL.
       8                 :             :  *
       9                 :             :  * NOTES:
      10                 :             :  *
      11                 :             :  * We build snapshots which can *only* be used to read catalog contents and we
      12                 :             :  * do so by reading and interpreting the WAL stream. The aim is to build a
      13                 :             :  * snapshot that behaves the same as a freshly taken MVCC snapshot would have
      14                 :             :  * at the time the XLogRecord was generated.
      15                 :             :  *
      16                 :             :  * To build the snapshots we reuse the infrastructure built for Hot
      17                 :             :  * Standby. The in-memory snapshots we build look different than HS' because
      18                 :             :  * we have different needs. To successfully decode data from the WAL we only
      19                 :             :  * need to access catalog tables and (sys|rel|cat)cache, not the actual user
      20                 :             :  * tables since the data we decode is wholly contained in the WAL
      21                 :             :  * records. Also, our snapshots need to be different in comparison to normal
      22                 :             :  * MVCC ones because in contrast to those we cannot fully rely on the clog and
      23                 :             :  * pg_subtrans for information about committed transactions because they might
      24                 :             :  * commit in the future from the POV of the WAL entry we're currently
      25                 :             :  * decoding. This definition has the advantage that we only need to prevent
       26                 :             :  * removal of catalog rows, while normal tables' rows can still be
      27                 :             :  * removed. This is achieved by using the replication slot mechanism.
      28                 :             :  *
      29                 :             :  * As the percentage of transactions modifying the catalog normally is fairly
       30                 :             :  * small in comparison to ones only manipulating user data, we keep track of
      31                 :             :  * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
      32                 :             :  * track of all running transactions like it's done in a normal snapshot. Note
      33                 :             :  * that we're generally only looking at transactions that have acquired an
      34                 :             :  * xid. That is we keep a list of transactions between snapshot->(xmin, xmax)
      35                 :             :  * that we consider committed, everything else is considered aborted/in
      36                 :             :  * progress. That also allows us not to care about subtransactions before they
      37                 :             :  * have committed which means this module, in contrast to HS, doesn't have to
      38                 :             :  * care about suboverflowed subtransactions and similar.
      39                 :             :  *
      40                 :             :  * One complexity of doing this is that to e.g. handle mixed DDL/DML
      41                 :             :  * transactions we need Snapshots that see intermediate versions of the
      42                 :             :  * catalog in a transaction. During normal operation this is achieved by using
      43                 :             :  * CommandIds/cmin/cmax. The problem with that however is that for space
      44                 :             :  * efficiency reasons, the cmin and cmax are not included in WAL records. We
      45                 :             :  * cannot read the cmin/cmax from the tuple itself, either, because it is
      46                 :             :  * reset on crash recovery. Even if we could, we could not decode combocids
      47                 :             :  * which are only tracked in the original backend's memory. To work around
      48                 :             :  * that, heapam writes an extra WAL record (XLOG_HEAP2_NEW_CID) every time a
      49                 :             :  * catalog row is modified, which includes the cmin and cmax of the
      50                 :             :  * tuple. During decoding, we insert the ctid->(cmin,cmax) mappings into the
      51                 :             :  * reorder buffer, and use them at visibility checks instead of the cmin/cmax
      52                 :             :  * on the tuple itself. Check the reorderbuffer.c's comment above
      53                 :             :  * ResolveCminCmaxDuringDecoding() for details.
      54                 :             :  *
      55                 :             :  * To facilitate all this we need our own visibility routine, as the normal
       56                 :             :  * ones are optimized for different use cases.
      57                 :             :  *
      58                 :             :  * To replace the normal catalog snapshots with decoding ones use the
      59                 :             :  * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
      60                 :             :  *
      61                 :             :  *
      62                 :             :  *
      63                 :             :  * The snapbuild machinery is starting up in several stages, as illustrated
      64                 :             :  * by the following graph describing the SnapBuild->state transitions:
      65                 :             :  *
       66                 :             :  *         +-------------------------+
       67                 :             :  *    +----|         START           |-------------+
       68                 :             :  *    |    +-------------------------+             |
       69                 :             :  *    |                 |                          |
       70                 :             :  *    |                 |                          |
       71                 :             :  *    |          running_xacts #1                  |
       72                 :             :  *    |                 |                          |
       73                 :             :  *    |                 |                          |
       74                 :             :  *    |                 v                          |
       75                 :             :  *    |    +-------------------------+             v
       76                 :             :  *    |    |   BUILDING_SNAPSHOT     |------------>|
       77                 :             :  *    |    +-------------------------+             |
       78                 :             :  *    |                 |                          |
       79                 :             :  *    |                 |                          |
       80                 :             :  *    | running_xacts #2, xacts from #1 finished   |
       81                 :             :  *    |                 |                          |
       82                 :             :  *    |                 |                          |
       83                 :             :  *    |                 v                          |
       84                 :             :  *    |    +-------------------------+             v
       85                 :             :  *    |    |      FULL_SNAPSHOT      |------------>|
       86                 :             :  *    |    +-------------------------+             |
       87                 :             :  *    |                 |                          |
       88                 :             :  * running_xacts        |                   saved snapshot
       89                 :             :  * with zero xacts      |               at running_xacts's lsn
       90                 :             :  *    |                 |                          |
       91                 :             :  *    | running_xacts with xacts from #2 finished  |
       92                 :             :  *    |                 |                          |
       93                 :             :  *    |                 v                          |
       94                 :             :  *    |    +-------------------------+             |
       95                 :             :  *    +--->|  SNAPBUILD_CONSISTENT   |<------------+
       96                 :             :  *         +-------------------------+
      97                 :             :  *
      98                 :             :  * Initially the machinery is in the START stage. When an xl_running_xacts
      99                 :             :  * record is read that is sufficiently new (above the safe xmin horizon),
     100                 :             :  * there's a state transition. If there were no running xacts when the
     101                 :             :  * xl_running_xacts record was generated, we'll directly go into CONSISTENT
     102                 :             :  * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full
     103                 :             :  * snapshot means that all transactions that start henceforth can be decoded
     104                 :             :  * in their entirety, but transactions that started previously can't. In
     105                 :             :  * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
     106                 :             :  * running transactions have committed or aborted.
     107                 :             :  *
     108                 :             :  * Only transactions that commit after CONSISTENT state has been reached will
     109                 :             :  * be replayed, even though they might have started while still in
     110                 :             :  * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
      111                 :             :  * changes have been exported, but all the following ones will be. That point
     112                 :             :  * is a convenient point to initialize replication from, which is why we
     113                 :             :  * export a snapshot at that point, which *can* be used to read normal data.
     114                 :             :  *
     115                 :             :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
     116                 :             :  *
     117                 :             :  * IDENTIFICATION
     118                 :             :  *        src/backend/replication/logical/snapbuild.c
     119                 :             :  *
     120                 :             :  *-------------------------------------------------------------------------
     121                 :             :  */
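
The state transitions described in the header comment above can be sketched as a small pure function. This is only an illustration under stated assumptions, not the logic in this file: the `Sketch*` names, the `running_count` and `earlier_finished` fields, and `sketch_next_state()` are hypothetical stand-ins. The sketch also leaves out the "sufficiently new" xmin horizon check and the saved-snapshot shortcut shown on the right-hand side of the diagram.

```c
#include <stdbool.h>

/* Hypothetical stand-ins for illustration; not PostgreSQL's real types. */
typedef enum
{
    SKETCH_START,
    SKETCH_BUILDING_SNAPSHOT,
    SKETCH_FULL_SNAPSHOT,
    SKETCH_CONSISTENT
} SketchState;

typedef struct
{
    int     running_count;      /* xacts running when the record was logged */
    bool    earlier_finished;   /* have all xacts seen running in the
                                 * previous record ended by now? */
} SketchRunningXacts;

/*
 * Return the state reached after reading one running-xacts record.
 * Assumption: every record passed in is already "sufficiently new".
 */
static SketchState
sketch_next_state(SketchState cur, const SketchRunningXacts *rec)
{
    switch (cur)
    {
        case SKETCH_START:
            /* nothing was running: jump straight to CONSISTENT */
            if (rec->running_count == 0)
                return SKETCH_CONSISTENT;
            return SKETCH_BUILDING_SNAPSHOT;

        case SKETCH_BUILDING_SNAPSHOT:
            /* xacts from record #1 finished: snapshot is now full */
            return rec->earlier_finished ? SKETCH_FULL_SNAPSHOT : cur;

        case SKETCH_FULL_SNAPSHOT:
            /* xacts from record #2 finished: decoding is consistent */
            return rec->earlier_finished ? SKETCH_CONSISTENT : cur;

        case SKETCH_CONSISTENT:
            break;              /* terminal state */
    }
    return cur;
}
```

Feeding the function a record with `running_count == 0` while in `SKETCH_START` yields `SKETCH_CONSISTENT` directly; otherwise it takes one record to reach each later stage, and only once the transactions from the previous record have finished.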
     122                 :             : 
     123                 :             : #include "postgres.h"
     124                 :             : 
     125                 :             : #include <sys/stat.h>
     126                 :             : #include <unistd.h>
     127                 :             : 
     128                 :             : #include "access/heapam_xlog.h"
     129                 :             : #include "access/transam.h"
     130                 :             : #include "access/xact.h"
     131                 :             : #include "common/file_utils.h"
     132                 :             : #include "miscadmin.h"
     133                 :             : #include "pgstat.h"
     134                 :             : #include "replication/logical.h"
     135                 :             : #include "replication/reorderbuffer.h"
     136                 :             : #include "replication/snapbuild.h"
     137                 :             : #include "replication/snapbuild_internal.h"
     138                 :             : #include "storage/fd.h"
     139                 :             : #include "storage/lmgr.h"
     140                 :             : #include "storage/proc.h"
     141                 :             : #include "storage/procarray.h"
     142                 :             : #include "storage/standby.h"
     143                 :             : #include "utils/builtins.h"
     144                 :             : #include "utils/memutils.h"
     145                 :             : #include "utils/snapmgr.h"
     146                 :             : #include "utils/snapshot.h"
     147                 :             : /*
     148                 :             :  * Starting a transaction -- which we need to do while exporting a snapshot --
     149                 :             :  * removes knowledge about the previously used resowner, so we save it here.
     150                 :             :  */
     151                 :             : static ResourceOwner SavedResourceOwnerDuringExport = NULL;
     152                 :             : static bool ExportInProgress = false;
     153                 :             : 
     154                 :             : /* ->committed and ->catchange manipulation */
     155                 :             : static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
     156                 :             : 
     157                 :             : /* snapshot building/manipulation/distribution functions */
     158                 :             : static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
     159                 :             : 
     160                 :             : static void SnapBuildFreeSnapshot(Snapshot snap);
     161                 :             : 
     162                 :             : static void SnapBuildSnapIncRefcount(Snapshot snap);
     163                 :             : 
     164                 :             : static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
     165                 :             : 
     166                 :             : static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
     167                 :             :                                                                                                  uint32 xinfo);
     168                 :             : 
     169                 :             : /* xlog reading helper functions for SnapBuildProcessRunningXacts */
     170                 :             : static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
     171                 :             : static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
     172                 :             : 
     173                 :             : /* serialization functions */
     174                 :             : static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
     175                 :             : static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
     176                 :             : static void SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path);
     177                 :             : 
     178                 :             : /*
     179                 :             :  * Allocate a new snapshot builder.
     180                 :             :  *
     181                 :             :  * xmin_horizon is the xid >= which we can be sure no catalog rows have been
      182                 :             :  * removed; start_lsn is the LSN >= which we want to replay commits.
     183                 :             :  */
     184                 :             : SnapBuild *
     185                 :           0 : AllocateSnapshotBuilder(ReorderBuffer *reorder,
     186                 :             :                                                 TransactionId xmin_horizon,
     187                 :             :                                                 XLogRecPtr start_lsn,
     188                 :             :                                                 bool need_full_snapshot,
     189                 :             :                                                 bool in_slot_creation,
     190                 :             :                                                 XLogRecPtr two_phase_at)
     191                 :             : {
     192                 :           0 :         MemoryContext context;
     193                 :           0 :         MemoryContext oldcontext;
     194                 :           0 :         SnapBuild  *builder;
     195                 :             : 
     196                 :             :         /* allocate memory in own context, to have better accountability */
     197                 :           0 :         context = AllocSetContextCreate(CurrentMemoryContext,
     198                 :             :                                                                         "snapshot builder context",
     199                 :             :                                                                         ALLOCSET_DEFAULT_SIZES);
     200                 :           0 :         oldcontext = MemoryContextSwitchTo(context);
     201                 :             : 
     202                 :           0 :         builder = palloc0_object(SnapBuild);
     203                 :             : 
     204                 :           0 :         builder->state = SNAPBUILD_START;
     205                 :           0 :         builder->context = context;
     206                 :           0 :         builder->reorder = reorder;
     207                 :             :         /* Other struct members initialized by zeroing via palloc0 above */
     208                 :             : 
     209                 :           0 :         builder->committed.xcnt = 0;
     210                 :           0 :         builder->committed.xcnt_space = 128; /* arbitrary number */
     211                 :           0 :         builder->committed.xip =
     212                 :           0 :                 palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
     213                 :           0 :         builder->committed.includes_all_transactions = true;
     214                 :             : 
     215                 :           0 :         builder->catchange.xcnt = 0;
     216                 :           0 :         builder->catchange.xip = NULL;
     217                 :             : 
     218                 :           0 :         builder->initial_xmin_horizon = xmin_horizon;
     219                 :           0 :         builder->start_decoding_at = start_lsn;
     220                 :           0 :         builder->in_slot_creation = in_slot_creation;
     221                 :           0 :         builder->building_full_snapshot = need_full_snapshot;
     222                 :           0 :         builder->two_phase_at = two_phase_at;
     223                 :             : 
     224                 :           0 :         MemoryContextSwitchTo(oldcontext);
     225                 :             : 
     226                 :           0 :         return builder;
     227                 :           0 : }
     228                 :             : 
     229                 :             : /*
     230                 :             :  * Free a snapshot builder.
     231                 :             :  */
     232                 :             : void
     233                 :           0 : FreeSnapshotBuilder(SnapBuild *builder)
     234                 :             : {
     235                 :           0 :         MemoryContext context = builder->context;
     236                 :             : 
     237                 :             :         /* free snapshot explicitly, that contains some error checking */
     238         [ #  # ]:           0 :         if (builder->snapshot != NULL)
     239                 :             :         {
     240                 :           0 :                 SnapBuildSnapDecRefcount(builder->snapshot);
     241                 :           0 :                 builder->snapshot = NULL;
     242                 :           0 :         }
     243                 :             : 
      244                 :             :         /* other resources are deallocated via memory context deletion */
     245                 :           0 :         MemoryContextDelete(context);
     246                 :           0 : }
     247                 :             : 
     248                 :             : /*
     249                 :             :  * Free an unreferenced snapshot that has previously been built by us.
     250                 :             :  */
     251                 :             : static void
     252                 :           0 : SnapBuildFreeSnapshot(Snapshot snap)
     253                 :             : {
     254                 :             :         /* make sure we don't get passed an external snapshot */
     255         [ #  # ]:           0 :         Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
     256                 :             : 
     257                 :             :         /* make sure nobody modified our snapshot */
     258         [ #  # ]:           0 :         Assert(snap->curcid == FirstCommandId);
     259         [ #  # ]:           0 :         Assert(!snap->suboverflowed);
     260         [ #  # ]:           0 :         Assert(!snap->takenDuringRecovery);
     261         [ #  # ]:           0 :         Assert(snap->regd_count == 0);
     262                 :             : 
      263                 :             :         /* slightly more likely, so it's checked even without casserts */
     264         [ #  # ]:           0 :         if (snap->copied)
     265   [ #  #  #  # ]:           0 :                 elog(ERROR, "cannot free a copied snapshot");
     266                 :             : 
     267         [ #  # ]:           0 :         if (snap->active_count)
     268   [ #  #  #  # ]:           0 :                 elog(ERROR, "cannot free an active snapshot");
     269                 :             : 
     270                 :           0 :         pfree(snap);
     271                 :           0 : }
     272                 :             : 
     273                 :             : /*
     274                 :             :  * In which state of snapshot building are we?
     275                 :             :  */
     276                 :             : SnapBuildState
     277                 :           0 : SnapBuildCurrentState(SnapBuild *builder)
     278                 :             : {
     279                 :           0 :         return builder->state;
     280                 :             : }
     281                 :             : 
     282                 :             : /*
     283                 :             :  * Return the LSN at which the two-phase decoding was first enabled.
     284                 :             :  */
     285                 :             : XLogRecPtr
     286                 :           0 : SnapBuildGetTwoPhaseAt(SnapBuild *builder)
     287                 :             : {
     288                 :           0 :         return builder->two_phase_at;
     289                 :             : }
     290                 :             : 
     291                 :             : /*
     292                 :             :  * Set the LSN at which two-phase decoding is enabled.
     293                 :             :  */
     294                 :             : void
     295                 :           0 : SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
     296                 :             : {
     297                 :           0 :         builder->two_phase_at = ptr;
     298                 :           0 : }
     299                 :             : 
     300                 :             : /*
      301                 :             :  * Should the transaction ending at 'ptr' be skipped rather than decoded?
     302                 :             :  */
     303                 :             : bool
     304                 :           0 : SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
     305                 :             : {
     306                 :           0 :         return ptr < builder->start_decoding_at;
     307                 :             : }
     308                 :             : 
     309                 :             : /*
     310                 :             :  * Increase refcount of a snapshot.
     311                 :             :  *
     312                 :             :  * This is used when handing out a snapshot to some external resource or when
     313                 :             :  * adding a Snapshot as builder->snapshot.
     314                 :             :  */
     315                 :             : static void
     316                 :           0 : SnapBuildSnapIncRefcount(Snapshot snap)
     317                 :             : {
     318                 :           0 :         snap->active_count++;
     319                 :           0 : }
     320                 :             : 
     321                 :             : /*
     322                 :             :  * Decrease refcount of a snapshot and free if the refcount reaches zero.
     323                 :             :  *
     324                 :             :  * Externally visible, so that external resources that have been handed an
     325                 :             :  * IncRef'ed Snapshot can adjust its refcount easily.
     326                 :             :  */
     327                 :             : void
     328                 :           0 : SnapBuildSnapDecRefcount(Snapshot snap)
     329                 :             : {
     330                 :             :         /* make sure we don't get passed an external snapshot */
     331         [ #  # ]:           0 :         Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
     332                 :             : 
     333                 :             :         /* make sure nobody modified our snapshot */
     334         [ #  # ]:           0 :         Assert(snap->curcid == FirstCommandId);
     335         [ #  # ]:           0 :         Assert(!snap->suboverflowed);
     336         [ #  # ]:           0 :         Assert(!snap->takenDuringRecovery);
     337                 :             : 
     338         [ #  # ]:           0 :         Assert(snap->regd_count == 0);
     339                 :             : 
     340         [ #  # ]:           0 :         Assert(snap->active_count > 0);
     341                 :             : 
     342                 :             :         /* slightly more likely, so it's checked even without casserts */
     343         [ #  # ]:           0 :         if (snap->copied)
     344   [ #  #  #  # ]:           0 :                 elog(ERROR, "cannot free a copied snapshot");
     345                 :             : 
     346                 :           0 :         snap->active_count--;
     347         [ #  # ]:           0 :         if (snap->active_count == 0)
     348                 :           0 :                 SnapBuildFreeSnapshot(snap);
     349                 :           0 : }
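
The increment/decrement pair above follows a common reference-counting pattern: each holder takes one reference, and whoever drops the last one frees the object. A minimal standalone sketch of that pattern follows. It assumes plain `calloc()`/`free()` in place of PostgreSQL memory contexts, and `SketchSnapshot`, `sketch_frees` and the `sketch_*` functions are hypothetical names, not part of snapbuild.c.

```c
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for a refcounted snapshot. */
typedef struct SketchSnapshot
{
    unsigned    active_count;   /* number of current holders */
} SketchSnapshot;

/* Counts frees so callers (or tests) can observe them. */
static int  sketch_frees = 0;

/* Allocate a zeroed snapshot with no holders; returns NULL on failure. */
static SketchSnapshot *
sketch_snapshot_new(void)
{
    return calloc(1, sizeof(SketchSnapshot));
}

/* Take one reference. */
static void
sketch_inc_refcount(SketchSnapshot *snap)
{
    snap->active_count++;
}

/*
 * Drop one reference and free the snapshot when the count reaches zero.
 * Returns true if the snapshot was freed; the pointer is then invalid.
 */
static bool
sketch_dec_refcount(SketchSnapshot *snap)
{
    /* dropping a reference nobody holds is a caller bug */
    if (snap->active_count == 0)
        abort();

    snap->active_count--;
    if (snap->active_count == 0)
    {
        free(snap);
        sketch_frees++;
        return true;
    }
    return false;
}
```

With two holders, the first `sketch_dec_refcount()` call returns false and leaves the object alive; the second returns true and frees it.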
     350                 :             : 
     351                 :             : /*
     352                 :             :  * Build a new snapshot, based on currently committed catalog-modifying
     353                 :             :  * transactions.
     354                 :             :  *
     355                 :             :  * In-progress transactions with catalog access are *not* allowed to modify
     356                 :             :  * these snapshots; they have to copy them and fill in appropriate ->curcid
     357                 :             :  * and ->subxip/subxcnt values.
     358                 :             :  */
     359                 :             : static Snapshot
     360                 :           0 : SnapBuildBuildSnapshot(SnapBuild *builder)
     361                 :             : {
     362                 :           0 :         Snapshot        snapshot;
     363                 :           0 :         Size            ssize;
     364                 :             : 
     365         [ #  # ]:           0 :         Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
     366                 :             : 
     367                 :           0 :         ssize = sizeof(SnapshotData)
     368                 :           0 :                 + sizeof(TransactionId) * builder->committed.xcnt
     369                 :           0 :                 + sizeof(TransactionId) * 1 /* toplevel xid */ ;
     370                 :             : 
     371                 :           0 :         snapshot = MemoryContextAllocZero(builder->context, ssize);
     372                 :             : 
     373                 :           0 :         snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
     374                 :             : 
     375                 :             :         /*
     376                 :             :          * We misuse the original meaning of SnapshotData's xip and subxip fields
      377                 :             :          * to make them more fitting for our needs.
     378                 :             :          *
     379                 :             :          * In the 'xip' array we store transactions that have to be treated as
     380                 :             :          * committed. Since we will only ever look at tuples from transactions
     381                 :             :          * that have modified the catalog it's more efficient to store those few
     382                 :             :          * that exist between xmin and xmax (frequently there are none).
     383                 :             :          *
     384                 :             :          * Snapshots that are used in transactions that have modified the catalog
     385                 :             :          * also use the 'subxip' array to store their toplevel xid and all the
     386                 :             :          * subtransaction xids so we can recognize when we need to treat rows as
     387                 :             :          * visible that are not in xip but still need to be visible. Subxip only
     388                 :             :          * gets filled when the transaction is copied into the context of a
     389                 :             :          * catalog modifying transaction since we otherwise share a snapshot
     390                 :             :          * between transactions. As long as a txn hasn't modified the catalog it
     391                 :             :          * doesn't need to treat any uncommitted rows as visible, so there is no
     392                 :             :          * need for those xids.
     393                 :             :          *
     394                 :             :          * Both arrays are qsort'ed so that we can use bsearch() on them.
     395                 :             :          */
     396         [ #  # ]:           0 :         Assert(TransactionIdIsNormal(builder->xmin));
     397         [ #  # ]:           0 :         Assert(TransactionIdIsNormal(builder->xmax));
     398                 :             : 
     399                 :           0 :         snapshot->xmin = builder->xmin;
     400                 :           0 :         snapshot->xmax = builder->xmax;
     401                 :             : 
     402                 :             :         /* store all transactions to be treated as committed by this snapshot */
     403                 :           0 :         snapshot->xip =
     404                 :           0 :                 (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
     405                 :           0 :         snapshot->xcnt = builder->committed.xcnt;
     406                 :           0 :         memcpy(snapshot->xip,
     407                 :             :                    builder->committed.xip,
     408                 :             :                    builder->committed.xcnt * sizeof(TransactionId));
     409                 :             : 
     410                 :             :         /* sort so we can bsearch() */
     411                 :           0 :         qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
     412                 :             : 
     413                 :             :         /*
     414                 :             :          * Initially, subxip is empty, i.e. it's a snapshot to be used by
     415                 :             :          * transactions that don't modify the catalog. Will be filled by
     416                 :             :          * ReorderBufferCopySnap() if necessary.
     417                 :             :          */
     418                 :           0 :         snapshot->subxcnt = 0;
     419                 :           0 :         snapshot->subxip = NULL;
     420                 :             : 
     421                 :           0 :         snapshot->suboverflowed = false;
     422                 :           0 :         snapshot->takenDuringRecovery = false;
     423                 :           0 :         snapshot->copied = false;
     424                 :           0 :         snapshot->curcid = FirstCommandId;
     425                 :           0 :         snapshot->active_count = 0;
     426                 :           0 :         snapshot->regd_count = 0;
     427                 :           0 :         snapshot->snapXactCompletionCount = 0;
     428                 :             : 
     429                 :           0 :         return snapshot;
     430                 :           0 : }
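
The single-allocation layout used by SnapBuildBuildSnapshot() can be illustrated outside PostgreSQL. The following is a minimal sketch, not the real implementation: `MiniSnapshot`, `Xid`, `xid_cmp` and `build_mini_snapshot` are hypothetical stand-ins, `calloc` replaces MemoryContextAllocZero, and xid wraparound is ignored. It shows the header and the committed-xid array sharing one zeroed allocation, with the array sorted so it can later be searched with bsearch().

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef uint32_t Xid;			/* hypothetical stand-in for TransactionId */

/* Hypothetical, heavily reduced stand-in for SnapshotData. */
typedef struct MiniSnapshot
{
	Xid			xmin;
	Xid			xmax;
	Xid		   *xip;			/* points just past the struct itself */
	uint32_t	xcnt;
} MiniSnapshot;

/* Three-way comparator; plays the role xidComparator plays in the source. */
static int
xid_cmp(const void *a, const void *b)
{
	Xid			xa = *(const Xid *) a;
	Xid			xb = *(const Xid *) b;

	return (xa > xb) - (xa < xb);
}

/*
 * Allocate header and xid array as one zeroed chunk, copy the committed
 * xids behind the header, and sort them. Returns NULL on allocation failure.
 */
static MiniSnapshot *
build_mini_snapshot(Xid xmin, Xid xmax, const Xid *committed,
					uint32_t ncommitted)
{
	size_t		size = sizeof(MiniSnapshot) + sizeof(Xid) * ncommitted;
	MiniSnapshot *snap = calloc(1, size);

	if (snap == NULL)
		return NULL;

	assert(xmin <= xmax);
	snap->xmin = xmin;
	snap->xmax = xmax;
	snap->xip = (Xid *) ((char *) snap + sizeof(MiniSnapshot));
	snap->xcnt = ncommitted;

	if (ncommitted > 0)
	{
		memcpy(snap->xip, committed, ncommitted * sizeof(Xid));
		qsort(snap->xip, snap->xcnt, sizeof(Xid), xid_cmp);
	}

	return snap;
}
```

Because the array lives inside the same chunk, a single `free(snap)` releases both the header and the xids in this sketch.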
     431                 :             : 
     432                 :             : /*
     433                 :             :  * Build the initial slot snapshot and convert it to a normal snapshot that
     434                 :             :  * is understood by HeapTupleSatisfiesMVCC.
     435                 :             :  *
     436                 :             :  * The snapshot will be usable directly in the current transaction or exported
     437                 :             :  * for loading in a different transaction.
     438                 :             :  */
     439                 :             : Snapshot
     440                 :           0 : SnapBuildInitialSnapshot(SnapBuild *builder)
     441                 :             : {
     442                 :           0 :         Snapshot        snap;
     443                 :           0 :         TransactionId xid;
     444                 :           0 :         TransactionId safeXid;
     445                 :           0 :         TransactionId *newxip;
     446                 :           0 :         int                     newxcnt = 0;
     447                 :             : 
     448         [ #  # ]:           0 :         Assert(XactIsoLevel == XACT_REPEATABLE_READ);
     449         [ #  # ]:           0 :         Assert(builder->building_full_snapshot);
     450                 :             : 
     451                 :             :         /* don't allow older snapshots */
     452                 :           0 :         InvalidateCatalogSnapshot();    /* about to overwrite MyProc->xmin */
     453         [ #  # ]:           0 :         if (HaveRegisteredOrActiveSnapshot())
     454   [ #  #  #  # ]:           0 :                 elog(ERROR, "cannot build an initial slot snapshot when snapshots exist");
     455         [ #  # ]:           0 :         Assert(!HistoricSnapshotActive());
     456                 :             : 
     457         [ #  # ]:           0 :         if (builder->state != SNAPBUILD_CONSISTENT)
     458   [ #  #  #  # ]:           0 :                 elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
     459                 :             : 
     460         [ #  # ]:           0 :         if (!builder->committed.includes_all_transactions)
     461   [ #  #  #  # ]:           0 :                 elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
     462                 :             : 
     463                 :             :         /* so we don't overwrite the existing value */
     464         [ #  # ]:           0 :         if (TransactionIdIsValid(MyProc->xmin))
     465   [ #  #  #  # ]:           0 :                 elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
     466                 :             : 
     467                 :           0 :         snap = SnapBuildBuildSnapshot(builder);
     468                 :             : 
     469                 :             :         /*
     470                 :             :          * We know that snap->xmin is alive, enforced by the logical xmin
     471                 :             :          * mechanism. Due to that we can do this without locks, we're only
     472                 :             :          * changing our own value.
     473                 :             :          *
     474                 :             :          * Building an initial snapshot is expensive and an unenforced xmin
     475                 :             :          * horizon would have bad consequences, therefore always double-check that
     476                 :             :          * the horizon is enforced.
     477                 :             :          */
     478                 :           0 :         LWLockAcquire(ProcArrayLock, LW_SHARED);
     479                 :           0 :         safeXid = GetOldestSafeDecodingTransactionId(false);
     480                 :           0 :         LWLockRelease(ProcArrayLock);
     481                 :             : 
     482         [ #  # ]:           0 :         if (TransactionIdFollows(safeXid, snap->xmin))
     483   [ #  #  #  # ]:           0 :                 elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
     484                 :             :                          safeXid, snap->xmin);
     485                 :             : 
     486                 :           0 :         MyProc->xmin = snap->xmin;
     487                 :             : 
     488                 :             :         /* allocate in transaction context */
     489                 :           0 :         newxip = palloc_array(TransactionId, GetMaxSnapshotXidCount());
     490                 :             : 
     491                 :             :         /*
     492                 :             :          * snapbuild.c builds transactions in an "inverted" manner, which means it
     493                 :             :          * stores committed transactions in ->xip, not ones in progress. Build a
     494                 :             :          * classical snapshot by marking all non-committed transactions as
     495                 :             :          * in-progress. This can be expensive.
     496                 :             :          */
     497   [ #  #  #  # ]:           0 :         for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
     498                 :             :         {
     499                 :           0 :                 void       *test;
     500                 :             : 
     501                 :             :                 /*
     502                 :             :                  * Check whether transaction committed using the decoding snapshot
     503                 :             :                  * meaning of ->xip.
     504                 :             :                  */
     505                 :           0 :                 test = bsearch(&xid, snap->xip, snap->xcnt,
     506                 :             :                                            sizeof(TransactionId), xidComparator);
     507                 :             : 
     508         [ #  # ]:           0 :                 if (test == NULL)
     509                 :             :                 {
     510         [ #  # ]:           0 :                         if (newxcnt >= GetMaxSnapshotXidCount())
     511   [ #  #  #  # ]:           0 :                                 ereport(ERROR,
     512                 :             :                                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     513                 :             :                                                  errmsg("initial slot snapshot too large")));
     514                 :             : 
     515                 :           0 :                         newxip[newxcnt++] = xid;
     516                 :           0 :                 }
     517                 :             : 
     518         [ #  # ]:           0 :                 TransactionIdAdvance(xid);
     519                 :           0 :         }
     520                 :             : 
     521                 :             :         /* adjust remaining snapshot fields as needed */
     522                 :           0 :         snap->snapshot_type = SNAPSHOT_MVCC;
     523                 :           0 :         snap->xcnt = newxcnt;
     524                 :           0 :         snap->xip = newxip;
     525                 :             : 
     526                 :           0 :         return snap;
     527                 :           0 : }
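
The "inverted" to "classical" conversion performed by the loop in SnapBuildInitialSnapshot() can be sketched in isolation. This is an illustration under stated assumptions, not the PostgreSQL code: `Xid`, `xid_cmp` and `invert_committed` are hypothetical names, xids are treated as plain increasing integers (no wraparound, no special xids), and an overflow is reported through a return value rather than ereport().

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t Xid;			/* hypothetical stand-in for TransactionId */

/* Three-way comparator; plays the role xidComparator plays in the source. */
static int
xid_cmp(const void *a, const void *b)
{
	Xid			xa = *(const Xid *) a;
	Xid			xb = *(const Xid *) b;

	return (xa > xb) - (xa < xb);
}

/*
 * "committed" must be sorted ascending. Every xid in [xmin, xmax) that is
 * not found in it is written to out[] as in-progress. Returns the number
 * of xids written, or -1 if out[] (capacity out_cap) would overflow.
 */
static int
invert_committed(const Xid *committed, size_t ncommitted,
				 Xid xmin, Xid xmax, Xid *out, size_t out_cap)
{
	size_t		n = 0;

	assert(xmin <= xmax);

	for (Xid xid = xmin; xid < xmax; xid++)
	{
		if (bsearch(&xid, committed, ncommitted,
					sizeof(Xid), xid_cmp) == NULL)
		{
			if (n >= out_cap)
				return -1;		/* the source raises an ERROR here */
			out[n++] = xid;
		}
	}

	return (int) n;
}
```

With committed xids {102, 105} and the range [100, 106), the sketch reports 100, 101, 103 and 104 as in-progress. The cost grows with the width of the xid range, which matches the source comment's warning that this step can be expensive.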
     528                 :             : 
     529                 :             : /*
     530                 :             :  * Export a snapshot so it can be set in another session with SET TRANSACTION
     531                 :             :  * SNAPSHOT.
     532                 :             :  *
     533                 :             :  * For that we need to start a transaction in the current backend as the
     534                 :             :  * importing side checks whether the source transaction is still open to make
     535                 :             :  * sure the xmin horizon hasn't advanced since then.
     536                 :             :  */
     537                 :             : const char *
     538                 :           0 : SnapBuildExportSnapshot(SnapBuild *builder)
     539                 :             : {
     540                 :           0 :         Snapshot        snap;
     541                 :           0 :         char       *snapname;
     542                 :             : 
     543         [ #  # ]:           0 :         if (IsTransactionOrTransactionBlock())
     544   [ #  #  #  # ]:           0 :                 elog(ERROR, "cannot export a snapshot from within a transaction");
     545                 :             : 
     546         [ #  # ]:           0 :         if (SavedResourceOwnerDuringExport)
     547   [ #  #  #  # ]:           0 :                 elog(ERROR, "can only export one snapshot at a time");
     548                 :             : 
     549                 :           0 :         SavedResourceOwnerDuringExport = CurrentResourceOwner;
     550                 :           0 :         ExportInProgress = true;
     551                 :             : 
     552                 :           0 :         StartTransactionCommand();
     553                 :             : 
     554                 :             :         /* There doesn't seem to be a nice API to set these */
     555                 :           0 :         XactIsoLevel = XACT_REPEATABLE_READ;
     556                 :           0 :         XactReadOnly = true;
     557                 :             : 
     558                 :           0 :         snap = SnapBuildInitialSnapshot(builder);
     559                 :             : 
     560                 :             :         /*
     561                 :             :          * now that we've built a plain snapshot, make it active and use the
     562                 :             :          * normal mechanisms for exporting it
     563                 :             :          */
     564                 :           0 :         snapname = ExportSnapshot(snap);
     565                 :             : 
     566   [ #  #  #  # ]:           0 :         ereport(LOG,
     567                 :             :                         (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
     568                 :             :                                                    "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
     569                 :             :                                                    snap->xcnt,
     570                 :             :                                                    snapname, snap->xcnt)));
     571                 :           0 :         return snapname;
     572                 :           0 : }
     573                 :             : 
     574                 :             : /*
     575                 :             :  * Ensure there is a snapshot and, if not, build one for the current transaction.
     576                 :             :  */
     577                 :             : Snapshot
     578                 :           0 : SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
     579                 :             : {
     580         [ #  # ]:           0 :         Assert(builder->state == SNAPBUILD_CONSISTENT);
     581                 :             : 
     582                 :             :         /* only build a new snapshot if we don't have a prebuilt one */
     583         [ #  # ]:           0 :         if (builder->snapshot == NULL)
     584                 :             :         {
     585                 :           0 :                 builder->snapshot = SnapBuildBuildSnapshot(builder);
     586                 :             :                 /* increase refcount for the snapshot builder */
     587                 :           0 :                 SnapBuildSnapIncRefcount(builder->snapshot);
     588                 :           0 :         }
     589                 :             : 
     590                 :           0 :         return builder->snapshot;
     591                 :             : }
     592                 :             : 
     593                 :             : /*
     594                 :             :  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
     595                 :             :  * any. Aborts the previously started transaction and resets the resource
     596                 :             :  * owner back to its original value.
     597                 :             :  */
     598                 :             : void
     599                 :           0 : SnapBuildClearExportedSnapshot(void)
     600                 :             : {
     601                 :           0 :         ResourceOwner tmpResOwner;
     602                 :             : 
     603                 :             :         /* nothing exported, that is the usual case */
     604         [ #  # ]:           0 :         if (!ExportInProgress)
     605                 :           0 :                 return;
     606                 :             : 
     607         [ #  # ]:           0 :         if (!IsTransactionState())
     608   [ #  #  #  # ]:           0 :                 elog(ERROR, "clearing exported snapshot in wrong transaction state");
     609                 :             : 
     610                 :             :         /*
     611                 :             :          * AbortCurrentTransaction() takes care of resetting the snapshot state,
     612                 :             :          * so remember SavedResourceOwnerDuringExport.
     613                 :             :          */
     614                 :           0 :         tmpResOwner = SavedResourceOwnerDuringExport;
     615                 :             : 
     616                 :             :         /* make sure nothing could have ever happened */
     617                 :           0 :         AbortCurrentTransaction();
     618                 :             : 
     619                 :           0 :         CurrentResourceOwner = tmpResOwner;
     620         [ #  # ]:           0 : }
     621                 :             : 
     622                 :             : /*
     623                 :             :  * Clear snapshot export state during transaction abort.
     624                 :             :  */
     625                 :             : void
     626                 :        7016 : SnapBuildResetExportedSnapshotState(void)
     627                 :             : {
     628                 :        7016 :         SavedResourceOwnerDuringExport = NULL;
     629                 :        7016 :         ExportInProgress = false;
     630                 :        7016 : }
     631                 :             : 
     632                 :             : /*
     633                 :             :  * Handle the effects of a single heap change, appropriate to the current state
     634                 :             :  * of the snapshot builder, and return whether changes made at (xid, lsn) can
     635                 :             :  * be decoded.
     636                 :             :  */
     637                 :             : bool
     638                 :           0 : SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
     639                 :             : {
     640                 :             :         /*
     641                 :             :          * We can't handle data in transactions if we haven't built a snapshot
     642                 :             :          * yet, so don't store them.
     643                 :             :          */
     644         [ #  # ]:           0 :         if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
     645                 :           0 :                 return false;
     646                 :             : 
     647                 :             :         /*
     648                 :             :          * No point in keeping track of changes in transactions that we don't have
     649                 :             :          * enough information about to decode. This means that they started before
     650                 :             :          * we got into the SNAPBUILD_FULL_SNAPSHOT state.
     651                 :             :          */
     652   [ #  #  #  # ]:           0 :         if (builder->state < SNAPBUILD_CONSISTENT &&
     653                 :           0 :                 TransactionIdPrecedes(xid, builder->next_phase_at))
     654                 :           0 :                 return false;
     655                 :             : 
     656                 :             :         /*
     657                 :             :          * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
     658                 :             :          * be needed to decode the change we're currently processing.
     659                 :             :          */
     660         [ #  # ]:           0 :         if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
     661                 :             :         {
     662                 :             :                 /* only build a new snapshot if we don't have a prebuilt one */
     663         [ #  # ]:           0 :                 if (builder->snapshot == NULL)
     664                 :             :                 {
     665                 :           0 :                         builder->snapshot = SnapBuildBuildSnapshot(builder);
     666                 :             :                         /* increase refcount for the snapshot builder */
     667                 :           0 :                         SnapBuildSnapIncRefcount(builder->snapshot);
     668                 :           0 :                 }
     669                 :             : 
     670                 :             :                 /*
     671                 :             :                  * Increase refcount for the transaction we're handing the snapshot
     672                 :             :                  * out to.
     673                 :             :                  */
     674                 :           0 :                 SnapBuildSnapIncRefcount(builder->snapshot);
     675                 :           0 :                 ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
     676                 :           0 :                                                                          builder->snapshot);
     677                 :           0 :         }
     678                 :             : 
     679                 :           0 :         return true;
     680                 :           0 : }
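
The two early-exit checks at the top of SnapBuildProcessChange() can be sketched as a pure function. This is only an illustration: `BuildState`, its numeric values, `xid_precedes` and `can_decode_change` are hypothetical stand-ins, and the comparison assumes the usual modulo-2^32 ordering for normal xids (special xids are ignored).

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t Xid;			/* hypothetical stand-in for TransactionId */

/* Hypothetical stand-in for the builder states; only the ordering matters. */
typedef enum BuildState
{
	BUILD_START = -1,
	BUILD_BUILDING_SNAPSHOT = 0,
	BUILD_FULL_SNAPSHOT = 1,
	BUILD_CONSISTENT = 2
} BuildState;

/* Wraparound-aware "a is older than b" for normal xids (assumption). */
static bool
xid_precedes(Xid a, Xid b)
{
	return (int32_t) (a - b) < 0;
}

/*
 * Mirrors the gating logic only: no snapshot is built or handed out here.
 */
static bool
can_decode_change(BuildState state, Xid xid, Xid next_phase_at)
{
	/* no snapshot yet, so the change cannot be interpreted */
	if (state < BUILD_FULL_SNAPSHOT)
		return false;

	/* transaction started before the full snapshot was reached */
	if (state < BUILD_CONSISTENT && xid_precedes(xid, next_phase_at))
		return false;

	return true;
}
```

Once the builder is consistent, the xid check no longer applies and every change is accepted in this sketch.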
     681                 :             : 
     682                 :             : /*
     683                 :             :  * Do CommandId/combo CID handling after reading an xl_heap_new_cid record.
     684                 :             :  * This implies that a transaction has done some form of write to system
     685                 :             :  * catalogs.
     686                 :             :  */
     687                 :             : void
     688                 :           0 : SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
     689                 :             :                                            XLogRecPtr lsn, xl_heap_new_cid *xlrec)
     690                 :             : {
     691                 :           0 :         CommandId       cid;
     692                 :             : 
     693                 :             :         /*
     694                 :             :          * we only log new_cid's if a catalog tuple was modified, so mark the
     695                 :             :          * transaction as containing catalog modifications
     696                 :             :          */
     697                 :           0 :         ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
     698                 :             : 
     699                 :           0 :         ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
     700                 :           0 :                                                                  xlrec->target_locator, xlrec->target_tid,
     701                 :           0 :                                                                  xlrec->cmin, xlrec->cmax,
     702                 :           0 :                                                                  xlrec->combocid);
     703                 :             : 
     704                 :             :         /* figure out new command id */
     705   [ #  #  #  # ]:           0 :         if (xlrec->cmin != InvalidCommandId &&
     706                 :           0 :                 xlrec->cmax != InvalidCommandId)
     707         [ #  # ]:           0 :                 cid = Max(xlrec->cmin, xlrec->cmax);
     708         [ #  # ]:           0 :         else if (xlrec->cmax != InvalidCommandId)
     709                 :           0 :                 cid = xlrec->cmax;
     710         [ #  # ]:           0 :         else if (xlrec->cmin != InvalidCommandId)
     711                 :           0 :                 cid = xlrec->cmin;
     712                 :             :         else
     713                 :             :         {
     714                 :           0 :                 cid = InvalidCommandId; /* silence compiler */
     715   [ #  #  #  # ]:           0 :                 elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
     716                 :             :         }
     717                 :             : 
     718                 :           0 :         ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
     719                 :           0 : }
     720                 :             : 
     721                 :             : /*
     722                 :             :  * Add a new Snapshot and invalidation messages to all transactions we're
     723                 :             :  * decoding that currently are in-progress so they can see new catalog contents
     724                 :             :  * made by the transaction that just committed. This is necessary because those
     725                 :             :  * in-progress transactions will use the new catalog's contents from here on
     726                 :             :  * (at the very least everything they do needs to be compatible with newer
     727                 :             :  * catalog contents).
     728                 :             :  */
     729                 :             : static void
     730                 :           0 : SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
     731                 :             : {
     732                 :           0 :         dlist_iter      txn_i;
     733                 :           0 :         ReorderBufferTXN *txn;
     734                 :             : 
     735                 :             :         /*
     736                 :             :          * Iterate through all toplevel transactions. This can include
     737                 :             :          * subtransactions which we just don't yet know to be that, but that's
     738                 :             :          * fine, they will just get an unnecessary snapshot and invalidations
     739                 :             :          * queued.
     740                 :             :          */
     741   [ #  #  #  # ]:           0 :         dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
     742                 :             :         {
     743                 :           0 :                 txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
     744                 :             : 
     745         [ #  # ]:           0 :                 Assert(TransactionIdIsValid(txn->xid));
     746                 :             : 
     747                 :             :                 /*
     748                 :             :                  * If we don't have a base snapshot yet, there are no changes in this
     749                 :             :                  * transaction which in turn implies we don't yet need a snapshot at
     750                 :             :                  * all. We'll add a snapshot when the first change gets queued.
     751                 :             :                  *
     752                 :             :                  * Similarly, we don't need to add invalidations to a transaction
     753                 :             :                  * whose base snapshot is not yet set. Once a base snapshot is built,
     754                 :             :                  * it will include the xids of committed transactions that have
     755                 :             :                  * modified the catalog, thus reflecting the new catalog contents. The
     756                 :             :                  * existing catalog cache will have already been invalidated after
     757                 :             :                  * processing the invalidations in the transaction that modified
     758                 :             :                  * catalogs, ensuring that a fresh cache is constructed during
     759                 :             :                  * decoding.
     760                 :             :                  *
     761                 :             :                  * NB: This works correctly even for subtransactions because
     762                 :             :                  * ReorderBufferAssignChild() takes care to transfer the base snapshot
     763                 :             :                  * to the top-level transaction, and while iterating the changequeue
     764                 :             :                  * we'll get the change from the subtxn.
     765                 :             :                  */
     766         [ #  # ]:           0 :                 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
     767                 :           0 :                         continue;
     768                 :             : 
     769                 :             :                 /*
     770                 :             :                  * We don't need to add snapshot or invalidations to prepared
     771                 :             :                  * transactions as they should not see the new catalog contents.
     772                 :             :                  */
     773         [ #  # ]:           0 :                 if (rbtxn_is_prepared(txn))
     774                 :           0 :                         continue;
     775                 :             : 
     776   [ #  #  #  # ]:           0 :                 elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%08X",
     777                 :             :                          txn->xid, LSN_FORMAT_ARGS(lsn));
     778                 :             : 
     779                 :             :                 /*
     780                 :             :                  * increase the snapshot's refcount for the transaction we are handing
     781                 :             :                  * it out to
     782                 :             :                  */
     783                 :           0 :                 SnapBuildSnapIncRefcount(builder->snapshot);
     784                 :           0 :                 ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
     785                 :           0 :                                                                  builder->snapshot);
     786                 :             : 
     787                 :             :                 /*
     788                 :             :                  * Add invalidation messages to the reorder buffer of in-progress
     789                 :             :                  * transactions except the current committed transaction, for which we
     790                 :             :                  * will execute invalidations at the end.
     791                 :             :                  *
     792                 :             :                  * This is required; otherwise, we would end up using stale catcache
     793                 :             :                  * contents built by the current transaction even after its decoding,
     794                 :             :                  * which should have been invalidated due to a concurrent catalog
     795                 :             :                  * changing transaction.
     796                 :             :                  *
     797                 :             :                  * Distribute only the invalidation messages generated by the current
     798                 :             :                  * committed transaction. Invalidation messages received from other
     799                 :             :                  * transactions would have already been propagated to the relevant
     800                 :             :                  * in-progress transactions. This transaction would have processed
     801                 :             :                  * those invalidations, ensuring that subsequent transactions observe
     802                 :             :                  * a consistent cache state.
     803                 :             :                  */
     804         [ #  # ]:           0 :                 if (txn->xid != xid)
     805                 :             :                 {
     806                 :           0 :                         uint32          ninvalidations;
     807                 :           0 :                         SharedInvalidationMessage *msgs = NULL;
     808                 :             : 
     809                 :           0 :                         ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
     810                 :           0 :                                                                                                                    xid, &msgs);
     811                 :             : 
     812         [ #  # ]:           0 :                         if (ninvalidations > 0)
     813                 :             :                         {
     814         [ #  # ]:           0 :                                 Assert(msgs != NULL);
     815                 :             : 
     816                 :           0 :                                 ReorderBufferAddDistributedInvalidations(builder->reorder,
     817                 :           0 :                                                                                                                  txn->xid, lsn,
     818                 :           0 :                                                                                                                  ninvalidations, msgs);
     819                 :           0 :                         }
     820                 :           0 :                 }
     821                 :           0 :         }
     822                 :           0 : }
     823                 :             : 
     824                 :             : /*
     825                 :             :  * Keep track of a new catalog changing transaction that has committed.
     826                 :             :  */
     827                 :             : static void
     828                 :           0 : SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
     829                 :             : {
     830         [ #  # ]:           0 :         Assert(TransactionIdIsValid(xid));
     831                 :             : 
     832         [ #  # ]:           0 :         if (builder->committed.xcnt == builder->committed.xcnt_space)
     833                 :             :         {
     834                 :           0 :                 builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
     835                 :             : 
     836   [ #  #  #  # ]:           0 :                 elog(DEBUG1, "increasing space for committed transactions to %u",
     837                 :             :                          (uint32) builder->committed.xcnt_space);
     838                 :             : 
     839                 :           0 :                 builder->committed.xip = repalloc(builder->committed.xip,
     840                 :           0 :                                                                                   builder->committed.xcnt_space * sizeof(TransactionId));
     841                 :           0 :         }
     842                 :             : 
     843                 :             :         /*
     844                 :             :          * TODO: It might make sense to keep the array sorted here instead of
     845                 :             :          * doing it every time we build a new snapshot. On the other hand this
     846                 :             :          * gets called repeatedly when a transaction with subtransactions commits.
     847                 :             :          */
     848                 :           0 :         builder->committed.xip[builder->committed.xcnt++] = xid;
     849                 :           0 : }
     850                 :             : 
     851                 :             : /*
     852                 :             :  * Remove knowledge about transactions we treat as committed or containing catalog
     853                 :             :  * changes that are smaller than ->xmin. Those won't ever get checked via
     854                 :             :  * the ->committed or ->catchange array, respectively. The committed xids will
     855                 :             :  * get checked via the clog machinery.
     856                 :             :  *
     857                 :             :  * Ideally we would remove a transaction from the catchange array once it is
     858                 :             :  * finished (committed/aborted), but that could be costly as we need to maintain
     859                 :             :  * the xid order in the array.
     860                 :             :  */
     861                 :             : static void
     862                 :           0 : SnapBuildPurgeOlderTxn(SnapBuild *builder)
     863                 :             : {
     864                 :           0 :         int                     off;
     865                 :           0 :         TransactionId *workspace;
     866                 :           0 :         int                     surviving_xids = 0;
     867                 :             : 
     868                 :             :         /* not ready yet */
     869         [ #  # ]:           0 :         if (!TransactionIdIsNormal(builder->xmin))
     870                 :           0 :                 return;
     871                 :             : 
     872                 :             :         /* TODO: Neater algorithm than just copying and iterating? */
     873                 :           0 :         workspace =
     874                 :           0 :                 MemoryContextAlloc(builder->context,
     875                 :           0 :                                                    builder->committed.xcnt * sizeof(TransactionId));
     876                 :             : 
     877                 :             :         /* copy xids that are still interesting to workspace */
     878         [ #  # ]:           0 :         for (off = 0; off < builder->committed.xcnt; off++)
     879                 :             :         {
     880   [ #  #  #  # ]:           0 :                 if (NormalTransactionIdPrecedes(builder->committed.xip[off],
     881                 :             :                                                                                 builder->xmin))
     882                 :             :                         ;                                       /* remove */
     883                 :             :                 else
     884                 :           0 :                         workspace[surviving_xids++] = builder->committed.xip[off];
     885                 :           0 :         }
     886                 :             : 
     887                 :             :         /* copy workspace back to persistent state */
     888                 :           0 :         memcpy(builder->committed.xip, workspace,
     889                 :             :                    surviving_xids * sizeof(TransactionId));
     890                 :             : 
     891   [ #  #  #  # ]:           0 :         elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
     892                 :             :                  (uint32) builder->committed.xcnt, (uint32) surviving_xids,
     893                 :             :                  builder->xmin, builder->xmax);
     894                 :           0 :         builder->committed.xcnt = surviving_xids;
     895                 :             : 
     896                 :           0 :         pfree(workspace);
     897                 :             : 
     898                 :             :         /*
     899                 :             :          * Purge xids in ->catchange as well. The purged array must also be sorted
     900                 :             :          * in xidComparator order.
     901                 :             :          */
     902         [ #  # ]:           0 :         if (builder->catchange.xcnt > 0)
     903                 :             :         {
     904                 :             :                 /*
     905                 :             :                  * Since catchange.xip is sorted, we find the lower bound of xids that
     906                 :             :                  * are still interesting.
     907                 :             :                  */
     908         [ #  # ]:           0 :                 for (off = 0; off < builder->catchange.xcnt; off++)
     909                 :             :                 {
     910   [ #  #  #  # ]:           0 :                         if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
     911                 :           0 :                                                                                          builder->xmin))
     912                 :           0 :                                 break;
     913                 :           0 :                 }
     914                 :             : 
     915                 :           0 :                 surviving_xids = builder->catchange.xcnt - off;
     916                 :             : 
     917         [ #  # ]:           0 :                 if (surviving_xids > 0)
     918                 :             :                 {
     919                 :           0 :                         memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
     920                 :             :                                         surviving_xids * sizeof(TransactionId));
     921                 :           0 :                 }
     922                 :             :                 else
     923                 :             :                 {
     924                 :           0 :                         pfree(builder->catchange.xip);
     925                 :           0 :                         builder->catchange.xip = NULL;
     926                 :             :                 }
     927                 :             : 
     928   [ #  #  #  # ]:           0 :                 elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
     929                 :             :                          (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
     930                 :             :                          builder->xmin, builder->xmax);
     931                 :           0 :                 builder->catchange.xcnt = surviving_xids;
     932                 :           0 :         }
     933         [ #  # ]:           0 : }
     934                 :             : 
     935                 :             : /*
     936                 :             :  * Handle everything that needs to be done when a transaction commits
     937                 :             :  */
     938                 :             : void
     939                 :           0 : SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
     940                 :             :                                    int nsubxacts, TransactionId *subxacts, uint32 xinfo)
     941                 :             : {
     942                 :           0 :         int                     nxact;
     943                 :             : 
     944                 :           0 :         bool            needs_snapshot = false;
     945                 :           0 :         bool            needs_timetravel = false;
     946                 :           0 :         bool            sub_needs_timetravel = false;
     947                 :             : 
     948                 :           0 :         TransactionId xmax = xid;
     949                 :             : 
     950                 :             :         /*
     951                 :             :          * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
     952                 :             :          * will they be part of a snapshot.  So we don't need to record anything.
     953                 :             :          */
     954   [ #  #  #  # ]:           0 :         if (builder->state == SNAPBUILD_START ||
     955         [ #  # ]:           0 :                 (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
     956                 :           0 :                  TransactionIdPrecedes(xid, builder->next_phase_at)))
     957                 :             :         {
     958                 :             :                 /* ensure that only commits after this are getting replayed */
     959         [ #  # ]:           0 :                 if (builder->start_decoding_at <= lsn)
     960                 :           0 :                         builder->start_decoding_at = lsn + 1;
     961                 :           0 :                 return;
     962                 :             :         }
     963                 :             : 
     964         [ #  # ]:           0 :         if (builder->state < SNAPBUILD_CONSISTENT)
     965                 :             :         {
     966                 :             :                 /* ensure that only commits after this are getting replayed */
     967         [ #  # ]:           0 :                 if (builder->start_decoding_at <= lsn)
     968                 :           0 :                         builder->start_decoding_at = lsn + 1;
     969                 :             : 
     970                 :             :                 /*
     971                 :             :                  * If building an exportable snapshot, force xid to be tracked, even
     972                 :             :                  * if the transaction didn't modify the catalog.
     973                 :             :                  */
     974         [ #  # ]:           0 :                 if (builder->building_full_snapshot)
     975                 :             :                 {
     976                 :           0 :                         needs_timetravel = true;
     977                 :           0 :                 }
     978                 :           0 :         }
     979                 :             : 
     980         [ #  # ]:           0 :         for (nxact = 0; nxact < nsubxacts; nxact++)
     981                 :             :         {
     982                 :           0 :                 TransactionId subxid = subxacts[nxact];
     983                 :             : 
     984                 :             :                 /*
     985                 :             :                  * Add subtransaction to base snapshot if catalog modifying; we don't
     986                 :             :                  * distinguish it from toplevel transactions there.
     987                 :             :                  */
     988         [ #  # ]:           0 :                 if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
     989                 :             :                 {
     990                 :           0 :                         sub_needs_timetravel = true;
     991                 :           0 :                         needs_snapshot = true;
     992                 :             : 
     993   [ #  #  #  # ]:           0 :                         elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
     994                 :             :                                  xid, subxid);
     995                 :             : 
     996                 :           0 :                         SnapBuildAddCommittedTxn(builder, subxid);
     997                 :             : 
     998   [ #  #  #  # ]:           0 :                         if (NormalTransactionIdFollows(subxid, xmax))
     999                 :           0 :                                 xmax = subxid;
    1000                 :           0 :                 }
    1001                 :             : 
    1002                 :             :                 /*
    1003                 :             :                  * If we're forcing timetravel we also need visibility information
    1004                 :             :                  * about subtransaction, so keep track of subtransaction's state, even
    1005                 :             :                  * if not catalog modifying.  Don't need to distribute a snapshot in
    1006                 :             :                  * that case.
    1007                 :             :                  */
    1008         [ #  # ]:           0 :                 else if (needs_timetravel)
    1009                 :             :                 {
    1010                 :           0 :                         SnapBuildAddCommittedTxn(builder, subxid);
    1011   [ #  #  #  # ]:           0 :                         if (NormalTransactionIdFollows(subxid, xmax))
    1012                 :           0 :                                 xmax = subxid;
    1013                 :           0 :                 }
    1014                 :           0 :         }
    1015                 :             : 
    1016                 :             :         /* if top-level modified catalog, it'll need a snapshot */
    1017         [ #  # ]:           0 :         if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
    1018                 :             :         {
    1019   [ #  #  #  # ]:           0 :                 elog(DEBUG2, "found top level transaction %u, with catalog changes",
    1020                 :             :                          xid);
    1021                 :           0 :                 needs_snapshot = true;
    1022                 :           0 :                 needs_timetravel = true;
    1023                 :           0 :                 SnapBuildAddCommittedTxn(builder, xid);
    1024                 :           0 :         }
    1025         [ #  # ]:           0 :         else if (sub_needs_timetravel)
    1026                 :             :         {
    1027                 :             :                 /* track toplevel txn as well, subxact alone isn't meaningful */
    1028   [ #  #  #  # ]:           0 :                 elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
    1029                 :             :                          xid);
    1030                 :           0 :                 needs_timetravel = true;
    1031                 :           0 :                 SnapBuildAddCommittedTxn(builder, xid);
    1032                 :           0 :         }
    1033         [ #  # ]:           0 :         else if (needs_timetravel)
    1034                 :             :         {
    1035   [ #  #  #  # ]:           0 :                 elog(DEBUG2, "forced transaction %u to do timetravel", xid);
    1036                 :             : 
    1037                 :           0 :                 SnapBuildAddCommittedTxn(builder, xid);
    1038                 :           0 :         }
    1039                 :             : 
    1040         [ #  # ]:           0 :         if (!needs_timetravel)
    1041                 :             :         {
    1042                 :             :                 /* record that we cannot export a general snapshot anymore */
    1043                 :           0 :                 builder->committed.includes_all_transactions = false;
    1044                 :           0 :         }
    1045                 :             : 
    1046   [ #  #  #  # ]:           0 :         Assert(!needs_snapshot || needs_timetravel);
    1047                 :             : 
    1048                 :             :         /*
    1049                 :             :          * Adjust xmax of the snapshot builder. We only do that for committed,
    1050                 :             :          * catalog modifying transactions; everything else isn't interesting for
    1051                 :             :          * us since we'll never look at the respective rows.
    1052                 :             :          */
    1053   [ #  #  #  # ]:           0 :         if (needs_timetravel &&
    1054         [ #  # ]:           0 :                 (!TransactionIdIsValid(builder->xmax) ||
    1055                 :           0 :                  TransactionIdFollowsOrEquals(xmax, builder->xmax)))
    1056                 :             :         {
    1057                 :           0 :                 builder->xmax = xmax;
    1058         [ #  # ]:           0 :                 TransactionIdAdvance(builder->xmax);
    1059                 :           0 :         }
    1060                 :             : 
    1061                 :             :         /* if there's any reason to build a historic snapshot, do so now */
    1062         [ #  # ]:           0 :         if (needs_snapshot)
    1063                 :             :         {
    1064                 :             :                 /*
    1065                 :             :                  * If we haven't built a complete snapshot yet there's no need to hand
    1066                 :             :                  * it out; it wouldn't (and couldn't) be used anyway.
    1067                 :             :                  */
    1068         [ #  # ]:           0 :                 if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
    1069                 :           0 :                         return;
    1070                 :             : 
    1071                 :             :                 /*
    1072                 :             :                  * Decrease the snapshot builder's refcount of the old snapshot. Note
    1073                 :             :                  * that it will still be used if it has been handed out to the
    1074                 :             :                  * reorderbuffer earlier.
    1075                 :             :                  */
    1076         [ #  # ]:           0 :                 if (builder->snapshot)
    1077                 :           0 :                         SnapBuildSnapDecRefcount(builder->snapshot);
    1078                 :             : 
    1079                 :           0 :                 builder->snapshot = SnapBuildBuildSnapshot(builder);
    1080                 :             : 
    1081                 :             :                 /* we might need to execute invalidations, add snapshot */
    1082         [ #  # ]:           0 :                 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
    1083                 :             :                 {
    1084                 :           0 :                         SnapBuildSnapIncRefcount(builder->snapshot);
    1085                 :           0 :                         ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
    1086                 :           0 :                                                                                  builder->snapshot);
    1087                 :           0 :                 }
    1088                 :             : 
    1089                 :             :                 /* refcount of the snapshot builder for the new snapshot */
    1090                 :           0 :                 SnapBuildSnapIncRefcount(builder->snapshot);
    1091                 :             : 
    1092                 :             :                 /*
    1093                 :             :                  * Add a new catalog snapshot and invalidation messages to all
    1094                 :             :                  * currently running transactions.
    1095                 :             :                  */
    1096                 :           0 :                 SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
    1097                 :           0 :         }
    1098         [ #  # ]:           0 : }
    1099                 :             : 
    1100                 :             : /*
    1101                 :             :  * Check the reorder buffer and the snapshot to see if the given transaction has
    1102                 :             :  * modified catalogs.
    1103                 :             :  */
    1104                 :             : static inline bool
    1105                 :           0 : SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
    1106                 :             :                                                           uint32 xinfo)
    1107                 :             : {
    1108         [ #  # ]:           0 :         if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
    1109                 :           0 :                 return true;
    1110                 :             : 
    1111                 :             :         /*
    1112                 :             :          * The transactions that have changed catalogs must have invalidation
    1113                 :             :          * info.
    1114                 :             :          */
    1115         [ #  # ]:           0 :         if (!(xinfo & XACT_XINFO_HAS_INVALS))
    1116                 :           0 :                 return false;
    1117                 :             : 
    1118                 :             :         /* Check the catchange XID array */
    1119         [ #  # ]:           0 :         return ((builder->catchange.xcnt > 0) &&
    1120                 :           0 :                         (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
    1121                 :           0 :                                          sizeof(TransactionId), xidComparator) != NULL));
    1122                 :           0 : }
    1123                 :             : 
    1124                 :             : /* -----------------------------------
    1125                 :             :  * Snapshot building functions dealing with xlog records
    1126                 :             :  * -----------------------------------
    1127                 :             :  */
    1128                 :             : 
    1129                 :             : /*
    1130                 :             :  * Process a running xacts record, and use its information to first build a
    1131                 :             :  * historic snapshot and later to release resources that aren't needed
    1132                 :             :  * anymore.
    1133                 :             :  */
    1134                 :             : void
    1135                 :           0 : SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
    1136                 :             : {
    1137                 :           0 :         ReorderBufferTXN *txn;
    1138                 :           0 :         TransactionId xmin;
    1139                 :             : 
    1140                 :             :         /*
    1141                 :             :          * If we're not consistent yet, inspect the record to see whether it
    1142                 :             :          * allows us to get closer to being consistent. If we are consistent, dump
    1143                 :             :          * our snapshot so others or we, after a restart, can use it.
    1144                 :             :          */
    1145         [ #  # ]:           0 :         if (builder->state < SNAPBUILD_CONSISTENT)
    1146                 :             :         {
    1147                 :             :                 /* returns false if there's no point in performing cleanup just yet */
    1148         [ #  # ]:           0 :                 if (!SnapBuildFindSnapshot(builder, lsn, running))
    1149                 :           0 :                         return;
    1150                 :           0 :         }
    1151                 :             :         else
    1152                 :           0 :                 SnapBuildSerialize(builder, lsn);
    1153                 :             : 
    1154                 :             :         /*
    1155                 :             :          * Update range of interesting xids based on the running xacts
    1156                 :             :          * information. We don't increase ->xmax using it, because once we are in
    1157                 :             :          * a consistent state we can do that ourselves and much more efficiently
    1158                 :             :          * so, because we only need to do it for catalog transactions since we
    1159                 :             :          * only ever look at those.
    1160                 :             :          *
    1161                 :             :          * NB: We only increase xmax when a catalog modifying transaction commits
    1162                 :             :          * (see SnapBuildCommitTxn).  Because of this, xmax can be lower than
    1163                 :             :          * xmin, which looks odd but is correct and actually more efficient, since
    1164                 :             :          * we hit fast paths in heapam_visibility.c.
    1165                 :             :          */
    1166                 :           0 :         builder->xmin = running->oldestRunningXid;
    1167                 :             : 
    1168                 :             :         /* Remove transactions we don't need to keep track of anymore */
    1169                 :           0 :         SnapBuildPurgeOlderTxn(builder);
    1170                 :             : 
    1171                 :             :         /*
    1172                 :             :          * Advance the xmin limit for the current replication slot, to allow
    1173                 :             :          * vacuum to clean up the tuples this slot has been protecting.
    1174                 :             :          *
    1175                 :             :          * The reorderbuffer might have an xmin among the currently running
    1176                 :             :          * snapshots; use it if so.  If not, we need only consider the snapshots
    1177                 :             :          * we'll produce later, which can't be less than the oldest running xid in
    1178                 :             :          * the record we're reading now.
    1179                 :             :          */
    1180                 :           0 :         xmin = ReorderBufferGetOldestXmin(builder->reorder);
    1181         [ #  # ]:           0 :         if (xmin == InvalidTransactionId)
    1182                 :           0 :                 xmin = running->oldestRunningXid;
    1183   [ #  #  #  # ]:           0 :         elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
    1184                 :             :                  builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
    1185                 :           0 :         LogicalIncreaseXminForSlot(lsn, xmin);
    1186                 :             : 
    1187                 :             :         /*
    1188                 :             :          * Also tell the slot where we can restart decoding from. We don't want to
    1189                 :             :          * do that after every commit because changing that implies an fsync of
    1190                 :             :          * the logical slot's state file, so we only do it every time we see a
    1191                 :             :          * running xacts record.
    1192                 :             :          *
    1193                 :             :          * Do so by looking for the oldest in progress transaction (determined by
    1194                 :             :          * the first LSN of any of its relevant records). Every transaction
    1195                 :             :          * remembers the last location we stored the snapshot to disk before its
    1196                 :             :          * beginning. That point is where we can restart from.
    1197                 :             :          */
    1198                 :             : 
    1199                 :             :         /*
    1200                 :             :          * Can't know about a serialized snapshot's location if we're not
    1201                 :             :          * consistent.
    1202                 :             :          */
    1203         [ #  # ]:           0 :         if (builder->state < SNAPBUILD_CONSISTENT)
    1204                 :           0 :                 return;
    1205                 :             : 
    1206                 :           0 :         txn = ReorderBufferGetOldestTXN(builder->reorder);
    1207                 :             : 
    1208                 :             :         /*
    1209                 :             :          * oldest ongoing txn might have started when we didn't yet serialize
    1210                 :             :          * anything because we hadn't reached a consistent state yet.
    1211                 :             :          */
    1212   [ #  #  #  # ]:           0 :         if (txn != NULL && XLogRecPtrIsValid(txn->restart_decoding_lsn))
    1213                 :           0 :                 LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
    1214                 :             : 
    1215                 :             :         /*
    1216                 :             :          * No in-progress transaction, can reuse the last serialized snapshot if
    1217                 :             :          * we have one.
    1218                 :             :          */
    1219         [ #  # ]:           0 :         else if (txn == NULL &&
    1220   [ #  #  #  # ]:           0 :                          XLogRecPtrIsValid(builder->reorder->current_restart_decoding_lsn) &&
    1221                 :           0 :                          XLogRecPtrIsValid(builder->last_serialized_snapshot))
    1222                 :           0 :                 LogicalIncreaseRestartDecodingForSlot(lsn,
    1223                 :           0 :                                                                                           builder->last_serialized_snapshot);
    1224         [ #  # ]:           0 : }
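
The restart-point selection at the end of the function above can be condensed into a small decision function. This is only a sketch under stated assumptions: `SketchLsn`, `SKETCH_INVALID_LSN`, and `sketch_pick_restart_lsn` are hypothetical names that do not exist in PostgreSQL, the boolean and LSN parameters stand in for `builder->state`, `ReorderBufferGetOldestTXN()`, `txn->restart_decoding_lsn`, `builder->reorder->current_restart_decoding_lsn`, and `builder->last_serialized_snapshot`, and a return value of 0 means "leave the slot's restart point alone".

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for XLogRecPtr and InvalidXLogRecPtr. */
typedef uint64_t SketchLsn;
#define SKETCH_INVALID_LSN ((SketchLsn) 0)

/*
 * Return the LSN a slot could restart decoding from, or SKETCH_INVALID_LSN
 * if the restart point should not be advanced.
 */
static SketchLsn
sketch_pick_restart_lsn(bool consistent,
                        bool have_oldest_txn,
                        SketchLsn txn_restart_lsn,
                        SketchLsn current_restart_lsn,
                        SketchLsn last_serialized)
{
    /* Before consistency, no serialized snapshot location is known. */
    if (!consistent)
        return SKETCH_INVALID_LSN;

    /* Oldest in-progress transaction remembers a usable restart point. */
    if (have_oldest_txn && txn_restart_lsn != SKETCH_INVALID_LSN)
        return txn_restart_lsn;

    /* Nothing in progress: reuse the last serialized snapshot, if any. */
    if (!have_oldest_txn &&
        current_restart_lsn != SKETCH_INVALID_LSN &&
        last_serialized != SKETCH_INVALID_LSN)
        return last_serialized;

    /*
     * Remaining case: the oldest transaction started before anything was
     * serialized, so there is no safe point to advance to yet.
     */
    return SKETCH_INVALID_LSN;
}
```

Note that an in-progress transaction without a valid restart LSN blocks advancement entirely; the sketch does not fall back to the last serialized snapshot in that case, mirroring the `else if (txn == NULL && ...)` condition in the listing.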
    1225                 :             : 
    1226                 :             : 
    1227                 :             : /*
    1228                 :             :  * Build the start of a snapshot that's capable of decoding the catalog.
    1229                 :             :  *
    1230                 :             :  * Helper function for SnapBuildProcessRunningXacts() while we're not yet
    1231                 :             :  * consistent.
    1232                 :             :  *
    1233                 :             :  * Returns true if there is a point in performing internal maintenance/cleanup
    1234                 :             :  * using the xl_running_xacts record.
    1235                 :             :  */
    1236                 :             : static bool
    1237                 :           0 : SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
    1238                 :             : {
    1239                 :             :         /* ---
    1240                 :             :          * Build catalog decoding snapshot incrementally using information about
    1241                 :             :          * the currently running transactions. There are several ways to do that:
    1242                 :             :          *
    1243                 :             :          * a) There were no running transactions when the xl_running_xacts record
    1244                 :             :          *        was inserted, jump to CONSISTENT immediately. We might find such a
    1245                 :             :          *        state while waiting on c)'s sub-states.
    1246                 :             :          *
    1247                 :             :          * b) This (in a previous run) or another decoding slot serialized a
    1248                 :             :          *        snapshot to disk that we can use. Can't use this method while finding
    1249                 :             :          *        the start point for decoding changes as the restart LSN would be an
    1250                 :             :          *        arbitrary LSN but we need to find the start point to extract changes
    1251                 :             :          *        where we won't see the data for partial transactions. Also, we cannot
    1252                 :             :          *        use this method when a slot needs a full snapshot for export or direct
    1253                 :             :          *        use, as that snapshot will only contain catalog modifying transactions.
    1254                 :             :          *
    1255                 :             :          * c) First incrementally build a snapshot for catalog tuples
    1256                 :             :          *        (BUILDING_SNAPSHOT), that requires all, already in-progress,
    1257                 :             :          *        transactions to finish.  Every transaction starting after that
    1258                 :             :          *        (FULL_SNAPSHOT state), has enough information to be decoded.  But
    1259                 :             :          *        for older running transactions no viable snapshot exists yet, so
    1260                 :             :          *        CONSISTENT will only be reached once all of those have finished.
    1261                 :             :          * ---
    1262                 :             :          */
    1263                 :             : 
    1264                 :             :         /*
    1265                 :             :          * xl_running_xacts record is older than what we can use, we might not
    1266                 :             :          * have all necessary catalog rows anymore.
    1267                 :             :          */
    1268   [ #  #  #  # ]:           0 :         if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
    1269         [ #  # ]:           0 :                 NormalTransactionIdPrecedes(running->oldestRunningXid,
    1270                 :             :                                                                         builder->initial_xmin_horizon))
    1271                 :             :         {
    1272   [ #  #  #  # ]:           0 :                 ereport(DEBUG1,
    1273                 :             :                                 errmsg_internal("skipping snapshot at %X/%08X while building logical decoding snapshot, xmin horizon too low",
    1274                 :             :                                                                 LSN_FORMAT_ARGS(lsn)),
    1275                 :             :                                 errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
    1276                 :             :                                                                    builder->initial_xmin_horizon, running->oldestRunningXid));
    1277                 :             : 
    1278                 :             : 
    1279                 :           0 :                 SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
    1280                 :             : 
    1281                 :           0 :                 return true;
    1282                 :             :         }
    1283                 :             : 
    1284                 :             :         /*
    1285                 :             :          * a) No transactions were running, we can jump to consistent.
    1286                 :             :          *
    1287                 :             :          * This is not affected by races around xl_running_xacts, because we can
    1288                 :             :          * miss transaction commits, but currently not transactions starting.
    1289                 :             :          *
    1290                 :             :          * NB: We might have already started to incrementally assemble a snapshot,
    1291                 :             :          * so we need to be careful to deal with that.
    1292                 :             :          */
    1293         [ #  # ]:           0 :         if (running->oldestRunningXid == running->nextXid)
    1294                 :             :         {
    1295   [ #  #  #  # ]:           0 :                 if (!XLogRecPtrIsValid(builder->start_decoding_at) ||
    1296                 :           0 :                         builder->start_decoding_at <= lsn)
    1297                 :             :                         /* can decode everything after this */
    1298                 :           0 :                         builder->start_decoding_at = lsn + 1;
    1299                 :             : 
    1300                 :             :                 /* As no transactions were running xmin/xmax can be trivially set. */
    1301                 :           0 :                 builder->xmin = running->nextXid; /* < are finished */
    1302                 :           0 :                 builder->xmax = running->nextXid; /* >= are running */
    1303                 :             : 
    1304                 :             :                 /* so we can safely use the faster comparisons */
    1305         [ #  # ]:           0 :                 Assert(TransactionIdIsNormal(builder->xmin));
    1306         [ #  # ]:           0 :                 Assert(TransactionIdIsNormal(builder->xmax));
    1307                 :             : 
    1308                 :           0 :                 builder->state = SNAPBUILD_CONSISTENT;
    1309                 :           0 :                 builder->next_phase_at = InvalidTransactionId;
    1310                 :             : 
    1311   [ #  #  #  # ]:           0 :                 ereport(LOG,
    1312                 :             :                                 errmsg("logical decoding found consistent point at %X/%08X",
    1313                 :             :                                            LSN_FORMAT_ARGS(lsn)),
    1314                 :             :                                 errdetail("There are no running transactions."));
    1315                 :             : 
    1316                 :           0 :                 return false;
    1317                 :             :         }
    1318                 :             : 
    1319                 :             :         /*
    1320                 :             :          * b) valid on disk state and while neither building full snapshot nor
    1321                 :             :          * creating a slot.
    1322                 :             :          */
    1323         [ #  # ]:           0 :         else if (!builder->building_full_snapshot &&
    1324   [ #  #  #  # ]:           0 :                          !builder->in_slot_creation &&
    1325                 :           0 :                          SnapBuildRestore(builder, lsn))
    1326                 :             :         {
    1327                 :             :                 /* there won't be any state to cleanup */
    1328                 :           0 :                 return false;
    1329                 :             :         }
    1330                 :             : 
    1331                 :             :         /*
    1332                 :             :          * c) transition from START to BUILDING_SNAPSHOT.
    1333                 :             :          *
    1334                 :             :          * In START state, and a xl_running_xacts record with running xacts is
    1335                 :             :          * encountered.  In that case, switch to BUILDING_SNAPSHOT state, and
    1336                 :             :          * record xl_running_xacts->nextXid.  Once all running xacts have finished
    1337                 :             :          * (i.e. they're all >= nextXid), we have a complete catalog snapshot.  It
    1338                 :             :          * might look as if we could use xl_running_xacts's ->xids information to
    1339                 :             :          * get there quicker, but that is problematic because transactions marked
    1340                 :             :          * as running, might already have inserted their commit record - it's
    1341                 :             :          * infeasible to change that with locking.
    1342                 :             :          */
    1343         [ #  # ]:           0 :         else if (builder->state == SNAPBUILD_START)
    1344                 :             :         {
    1345                 :           0 :                 builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
    1346                 :           0 :                 builder->next_phase_at = running->nextXid;
    1347                 :             : 
    1348                 :             :                 /*
    1349                 :             :                  * Start with an xmin/xmax that's correct for future, when all the
    1350                 :             :                  * currently running transactions have finished. We'll update both
    1351                 :             :                  * while waiting for the pending transactions to finish.
    1352                 :             :                  */
    1353                 :           0 :                 builder->xmin = running->nextXid; /* < are finished */
    1354                 :           0 :                 builder->xmax = running->nextXid; /* >= are running */
    1355                 :             : 
    1356                 :             :                 /* so we can safely use the faster comparisons */
    1357         [ #  # ]:           0 :                 Assert(TransactionIdIsNormal(builder->xmin));
    1358         [ #  # ]:           0 :                 Assert(TransactionIdIsNormal(builder->xmax));
    1359                 :             : 
    1360   [ #  #  #  # ]:           0 :                 ereport(LOG,
    1361                 :             :                                 errmsg("logical decoding found initial starting point at %X/%08X",
    1362                 :             :                                            LSN_FORMAT_ARGS(lsn)),
    1363                 :             :                                 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
    1364                 :             :                                                   running->xcnt, running->nextXid));
    1365                 :             : 
    1366                 :           0 :                 SnapBuildWaitSnapshot(running, running->nextXid);
    1367                 :           0 :         }
    1368                 :             : 
    1369                 :             :         /*
    1370                 :             :          * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
    1371                 :             :          *
    1372                 :             :          * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
    1373                 :             :          * is >= nextXid from when we switched to BUILDING_SNAPSHOT.  This
    1374                 :             :          * means all transactions starting afterwards have enough information to
    1375                 :             :          * be decoded.  Switch to FULL_SNAPSHOT.
    1376                 :             :          */
    1377   [ #  #  #  # ]:           0 :         else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
    1378                 :           0 :                          TransactionIdPrecedesOrEquals(builder->next_phase_at,
    1379                 :           0 :                                                                                    running->oldestRunningXid))
    1380                 :             :         {
    1381                 :           0 :                 builder->state = SNAPBUILD_FULL_SNAPSHOT;
    1382                 :           0 :                 builder->next_phase_at = running->nextXid;
    1383                 :             : 
    1384   [ #  #  #  # ]:           0 :                 ereport(LOG,
    1385                 :             :                                 errmsg("logical decoding found initial consistent point at %X/%08X",
    1386                 :             :                                            LSN_FORMAT_ARGS(lsn)),
    1387                 :             :                                 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
    1388                 :             :                                                   running->xcnt, running->nextXid));
    1389                 :             : 
    1390                 :           0 :                 SnapBuildWaitSnapshot(running, running->nextXid);
    1391                 :           0 :         }
    1392                 :             : 
    1393                 :             :         /*
    1394                 :             :          * c) transition from FULL_SNAPSHOT to CONSISTENT.
    1395                 :             :          *
    1396                 :             :          * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
    1397                 :             :          * >= nextXid from when we switched to FULL_SNAPSHOT.  This means all
    1398                 :             :          * transactions that are currently in progress have a catalog snapshot,
    1399                 :             :          * and all their changes have been collected.  Switch to CONSISTENT.
    1400                 :             :          */
    1401   [ #  #  #  # ]:           0 :         else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
    1402                 :           0 :                          TransactionIdPrecedesOrEquals(builder->next_phase_at,
    1403                 :           0 :                                                                                    running->oldestRunningXid))
    1404                 :             :         {
    1405                 :           0 :                 builder->state = SNAPBUILD_CONSISTENT;
    1406                 :           0 :                 builder->next_phase_at = InvalidTransactionId;
    1407                 :             : 
    1408   [ #  #  #  # ]:           0 :                 ereport(LOG,
    1409                 :             :                                 errmsg("logical decoding found consistent point at %X/%08X",
    1410                 :             :                                            LSN_FORMAT_ARGS(lsn)),
    1411                 :             :                                 errdetail("There are no old transactions anymore."));
    1412                 :           0 :         }
    1413                 :             : 
    1414                 :             :         /*
    1415                 :             :          * We already started to track running xacts and need to wait for all
    1416                 :             :          * in-progress ones to finish. We fall through to the normal processing of
    1417                 :             :          * records so incremental cleanup can be performed.
    1418                 :             :          */
    1419                 :           0 :         return true;
    1420                 :           0 : }
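
The a) and c) paths of the function above form a small state machine driven by two fields of each xl_running_xacts record. The following is a minimal sketch of just those transitions, not the actual implementation: all `Sketch*`/`sketch_*` names are hypothetical, path b) (restoring a serialized snapshot), the initial xmin horizon check, xmin/xmax bookkeeping, logging, and waiting are all omitted, and the comparison helper assumes normal (non-special) transaction IDs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t SketchXid;     /* hypothetical stand-in for TransactionId */

typedef enum
{
    SKETCH_START,
    SKETCH_BUILDING_SNAPSHOT,
    SKETCH_FULL_SNAPSHOT,
    SKETCH_CONSISTENT
} SketchState;

typedef struct
{
    SketchState state;
    SketchXid   next_phase_at;  /* xid that must become "oldest running" */
} SketchBuilder;

/* Wraparound-aware "a <= b"; assumes both are normal xids. */
static bool
sketch_xid_precedes_or_equals(SketchXid a, SketchXid b)
{
    return (int32_t) (a - b) <= 0;
}

/* Apply one running-xacts record (oldest running xid, next xid). */
static void
sketch_advance(SketchBuilder *b, SketchXid oldest_running, SketchXid next_xid)
{
    if (b->state == SKETCH_CONSISTENT)
        return;                 /* only used while not yet consistent */

    if (oldest_running == next_xid)
    {
        /* a) nothing was running: jump straight to CONSISTENT */
        b->state = SKETCH_CONSISTENT;
        b->next_phase_at = 0;
    }
    else if (b->state == SKETCH_START)
    {
        /* c) START -> BUILDING_SNAPSHOT */
        b->state = SKETCH_BUILDING_SNAPSHOT;
        b->next_phase_at = next_xid;
    }
    else if (b->state == SKETCH_BUILDING_SNAPSHOT &&
             sketch_xid_precedes_or_equals(b->next_phase_at, oldest_running))
    {
        /* c) BUILDING_SNAPSHOT -> FULL_SNAPSHOT */
        b->state = SKETCH_FULL_SNAPSHOT;
        b->next_phase_at = next_xid;
    }
    else if (b->state == SKETCH_FULL_SNAPSHOT &&
             sketch_xid_precedes_or_equals(b->next_phase_at, oldest_running))
    {
        /* c) FULL_SNAPSHOT -> CONSISTENT */
        b->state = SKETCH_CONSISTENT;
        b->next_phase_at = 0;
    }
}
```

Feeding the sketch the records (oldest 100, next 105), (103, 110), (105, 112), (112, 120) in order moves a builder from START to BUILDING_SNAPSHOT, leaves it there, then advances it to FULL_SNAPSHOT and finally CONSISTENT, so at least three records are needed when transactions are running throughout.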
    1421                 :             : 
    1422                 :             : /* ---
    1423                 :             :  * Iterate through xids in record, wait for all older than the cutoff to
    1424                 :             :  * finish.  Then, if possible, log a new xl_running_xacts record.
    1425                 :             :  *
    1426                 :             :  * This isn't required for the correctness of decoding, but to:
    1427                 :             :  * a) allow isolationtester to notice that we're currently waiting for
    1428                 :             :  *        something.
    1429                 :             :  * b) log a new xl_running_xacts record where it'd be helpful, without having
    1430                 :             :  *        to wait for bgwriter or checkpointer.
    1431                 :             :  * ---
    1432                 :             :  */
    1433                 :             : static void
    1434                 :           0 : SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
    1435                 :             : {
    1436                 :           0 :         int                     off;
    1437                 :             : 
    1438         [ #  # ]:           0 :         for (off = 0; off < running->xcnt; off++)
    1439                 :             :         {
    1440                 :           0 :                 TransactionId xid = running->xids[off];
    1441                 :             : 
    1442                 :             :                 /*
    1443                 :             :                  * Upper layers should ensure that we never need to wait on ourselves.
    1444                 :             :                  * Check anyway, since failing to do so would either result in an
    1445                 :             :                  * endless wait or an Assert() failure.
    1446                 :             :                  */
    1447         [ #  # ]:           0 :                 if (TransactionIdIsCurrentTransactionId(xid))
    1448   [ #  #  #  # ]:           0 :                         elog(ERROR, "waiting for ourselves");
    1449                 :             : 
    1450         [ #  # ]:           0 :                 if (TransactionIdFollows(xid, cutoff))
    1451                 :           0 :                         continue;
    1452                 :             : 
    1453                 :           0 :                 XactLockTableWait(xid, NULL, NULL, XLTW_None);
    1454      [ #  #  # ]:           0 :         }
    1455                 :             : 
    1456                 :             :         /*
    1457                 :             :          * All transactions we needed to wait for have finished - try to ensure there is
    1458                 :             :          * another xl_running_xacts record in a timely manner, without having to
    1459                 :             :          * wait for bgwriter or checkpointer to log one.  During recovery we can't
    1460                 :             :          * enforce that, so we'll have to wait.
    1461                 :             :          */
    1462         [ #  # ]:           0 :         if (!RecoveryInProgress())
    1463                 :             :         {
    1464                 :           0 :                 LogStandbySnapshot();
    1465                 :           0 :         }
    1466                 :           0 : }
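
The cutoff filter in the loop above skips only xids that strictly follow the cutoff, so an xid equal to the cutoff is still waited for, and the comparison is circular rather than numeric. A minimal sketch of that filter, assuming normal transaction IDs: `sketch_xid_follows` and `sketch_collect_waits` are hypothetical helpers that collect the xids instead of calling XactLockTableWait() on them.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Wraparound-aware "a > b"; assumes both are normal xids. */
static bool
sketch_xid_follows(uint32_t a, uint32_t b)
{
    return (int32_t) (a - b) > 0;
}

/*
 * Copy into 'out' every xid that is not newer than 'cutoff', i.e. the ones
 * the loop above would wait on.  'out' must have room for 'xcnt' entries.
 * Returns the number of xids copied.
 */
static size_t
sketch_collect_waits(const uint32_t *xids, size_t xcnt,
                     uint32_t cutoff, uint32_t *out)
{
    size_t      n = 0;

    for (size_t i = 0; i < xcnt; i++)
    {
        if (sketch_xid_follows(xids[i], cutoff))
            continue;           /* newer than the cutoff: no need to wait */
        out[n++] = xids[i];
    }
    return n;
}
```

With a cutoff of 100, the xid 4294967290 counts as older than the cutoff because the comparison is done modulo 2^32, so it is kept in the wait list even though it is numerically larger.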
    1467                 :             : 
    1468                 :             : #define SnapBuildOnDiskConstantSize \
    1469                 :             :         offsetof(SnapBuildOnDisk, builder)
    1470                 :             : #define SnapBuildOnDiskNotChecksummedSize \
    1471                 :             :         offsetof(SnapBuildOnDisk, version)
    1472                 :             : 
    1473                 :             : #define SNAPBUILD_MAGIC 0x51A1E001
    1474                 :             : #define SNAPBUILD_VERSION 6
    1475                 :             : 
    1476                 :             : /*
    1477                 :             :  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
    1478                 :             :  *
    1479                 :             :  * Supposed to be used by external (i.e. not snapbuild.c) code that just read
    1480                 :             :  * a record that's a potential location for a serialized snapshot.
    1481                 :             :  */
    1482                 :             : void
    1483                 :           0 : SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
    1484                 :             : {
    1485         [ #  # ]:           0 :         if (builder->state < SNAPBUILD_CONSISTENT)
    1486                 :           0 :                 SnapBuildRestore(builder, lsn);
    1487                 :             :         else
    1488                 :           0 :                 SnapBuildSerialize(builder, lsn);
    1489                 :           0 : }
    1490                 :             : 
    1491                 :             : /*
    1492                 :             :  * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
    1493                 :             :  * been done by another decoding process.
    1494                 :             :  */
    1495                 :             : static void
    1496                 :           0 : SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
    1497                 :             : {
    1498                 :           0 :         Size            needed_length;
    1499                 :           0 :         SnapBuildOnDisk *ondisk = NULL;
    1500                 :           0 :         TransactionId *catchange_xip = NULL;
    1501                 :           0 :         MemoryContext old_ctx;
    1502                 :           0 :         size_t          catchange_xcnt;
    1503                 :           0 :         char       *ondisk_c;
    1504                 :           0 :         int                     fd;
    1505                 :           0 :         char            tmppath[MAXPGPATH];
    1506                 :           0 :         char            path[MAXPGPATH];
    1507                 :           0 :         int                     ret;
    1508                 :           0 :         struct stat stat_buf;
    1509                 :           0 :         Size            sz;
    1510                 :             : 
    1511         [ #  # ]:           0 :         Assert(XLogRecPtrIsValid(lsn));
    1512   [ #  #  #  # ]:           0 :         Assert(!XLogRecPtrIsValid(builder->last_serialized_snapshot) ||
    1513                 :             :                    builder->last_serialized_snapshot <= lsn);
    1514                 :             : 
    1515                 :             :         /*
    1516                 :             :          * no point in serializing if we cannot continue to work immediately after
    1517                 :             :          * restoring the snapshot
    1518                 :             :          */
    1519         [ #  # ]:           0 :         if (builder->state < SNAPBUILD_CONSISTENT)
    1520                 :           0 :                 return;
    1521                 :             : 
    1522                 :             :         /* consistent snapshots have no next phase */
    1523         [ #  # ]:           0 :         Assert(builder->next_phase_at == InvalidTransactionId);
    1524                 :             : 
    1525                 :             :         /*
    1526                 :             :          * We identify snapshots by the LSN they are valid for. We don't need to
    1527                 :             :          * include timelines in the name as each LSN maps to exactly one timeline
    1528                 :             :          * unless the user used pg_resetwal or similar. If a user did so, there's
    1529                 :             :          * no hope of continuing to decode anyway.
    1530                 :             :          */
    1531                 :           0 :         sprintf(path, "%s/%X-%X.snap",
    1532                 :             :                         PG_LOGICAL_SNAPSHOTS_DIR,
    1533                 :           0 :                         LSN_FORMAT_ARGS(lsn));
    1534                 :             : 
    1535                 :             :         /*
    1536                 :             :          * first check whether some other backend already has written the snapshot
    1537                 :             :          * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
    1538                 :             :          * as a valid state. Everything else is an unexpected error.
    1539                 :             :          */
    1540                 :           0 :         ret = stat(path, &stat_buf);
    1541                 :             : 
    1542   [ #  #  #  # ]:           0 :         if (ret != 0 && errno != ENOENT)
    1543   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1544                 :             :                                 (errcode_for_file_access(),
    1545                 :             :                                  errmsg("could not stat file \"%s\": %m", path)));
    1546                 :             : 
    1547         [ #  # ]:           0 :         else if (ret == 0)
    1548                 :             :         {
    1549                 :             :                 /*
    1550                 :             :                  * somebody else has already serialized to this point, don't overwrite
    1551                 :             :                  * but remember location, so we don't need to read old data again.
    1552                 :             :                  *
    1553                 :             :                  * To be sure it has been synced to disk after the rename() from the
    1554                 :             :                  * tempfile filename to the real filename, we just repeat the fsync.
    1555                 :             :                  * That ought to be cheap because in most scenarios it should already
    1556                 :             :                  * be safely on disk.
    1557                 :             :                  */
    1558                 :           0 :                 fsync_fname(path, false);
    1559                 :           0 :                 fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
    1560                 :             : 
    1561                 :           0 :                 builder->last_serialized_snapshot = lsn;
    1562                 :           0 :                 goto out;
    1563                 :             :         }
    1564                 :             : 
    1565                 :             :         /*
    1566                 :             :          * there is an obvious race condition here between the time we stat(2) the
    1567                 :             :          * file and us writing the file. But we rename the file into place
    1568                 :             :          * atomically and all files created need to contain the same data anyway,
    1569                 :             :          * so this is perfectly fine, although a bit of a resource waste. Locking
    1570                 :             :          * seems like pointless complication.
    1571                 :             :          */
    1572   [ #  #  #  # ]:           0 :         elog(DEBUG1, "serializing snapshot to %s", path);
    1573                 :             : 
    1574                 :             :         /* to make sure only we will write to this tempfile, include pid */
    1575                 :           0 :         sprintf(tmppath, "%s/%X-%X.snap.%d.tmp",
    1576                 :             :                         PG_LOGICAL_SNAPSHOTS_DIR,
    1577                 :           0 :                         LSN_FORMAT_ARGS(lsn), MyProcPid);
    1578                 :             : 
    1579                 :             :         /*
    1580                 :             :          * Unlink the temporary file if it already exists. Any such file must be
    1581                 :             :          * left over from a crash/error, since we won't enter this function twice
    1582                 :             :          * from within a single decoding slot/backend and the temporary file name
    1583                 :             :          * contains the pid of the current process.
    1584                 :             :          */
    1585   [ #  #  #  # ]:           0 :         if (unlink(tmppath) != 0 && errno != ENOENT)
    1586   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1587                 :             :                                 (errcode_for_file_access(),
    1588                 :             :                                  errmsg("could not remove file \"%s\": %m", tmppath)));
    1589                 :             : 
    1590                 :           0 :         old_ctx = MemoryContextSwitchTo(builder->context);
    1591                 :             : 
    1592                 :             :         /* Get the catalog modifying transactions that are not yet committed */
    1593                 :           0 :         catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
    1594                 :           0 :         catchange_xcnt = dclist_count(&builder->reorder->catchange_txns);
    1595                 :             : 
    1596                 :           0 :         needed_length = sizeof(SnapBuildOnDisk) +
    1597                 :           0 :                 sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
    1598                 :             : 
    1599                 :           0 :         ondisk_c = palloc0(needed_length);
    1600                 :           0 :         ondisk = (SnapBuildOnDisk *) ondisk_c;
    1601                 :           0 :         ondisk->magic = SNAPBUILD_MAGIC;
    1602                 :           0 :         ondisk->version = SNAPBUILD_VERSION;
    1603                 :           0 :         ondisk->length = needed_length;
    1604                 :           0 :         INIT_CRC32C(ondisk->checksum);
    1605                 :           0 :         COMP_CRC32C(ondisk->checksum,
    1606                 :             :                                 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
    1607                 :             :                                 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
    1608                 :           0 :         ondisk_c += sizeof(SnapBuildOnDisk);
    1609                 :             : 
    1610                 :           0 :         memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
    1611                 :             :         /* NULL-ify memory-only data */
    1612                 :           0 :         ondisk->builder.context = NULL;
    1613                 :           0 :         ondisk->builder.snapshot = NULL;
    1614                 :           0 :         ondisk->builder.reorder = NULL;
    1615                 :           0 :         ondisk->builder.committed.xip = NULL;
    1616                 :           0 :         ondisk->builder.catchange.xip = NULL;
    1617                 :             :         /* update catchange only in the on-disk data */
    1618                 :           0 :         ondisk->builder.catchange.xcnt = catchange_xcnt;
    1619                 :             : 
    1620                 :           0 :         COMP_CRC32C(ondisk->checksum,
    1621                 :             :                                 &ondisk->builder,
    1622                 :             :                                 sizeof(SnapBuild));
    1623                 :             : 
    1624                 :             :         /* copy committed xacts */
    1625         [ #  # ]:           0 :         if (builder->committed.xcnt > 0)
    1626                 :             :         {
    1627                 :           0 :                 sz = sizeof(TransactionId) * builder->committed.xcnt;
    1628                 :           0 :                 memcpy(ondisk_c, builder->committed.xip, sz);
    1629                 :           0 :                 COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
    1630                 :           0 :                 ondisk_c += sz;
    1631                 :           0 :         }
    1632                 :             : 
    1633                 :             :         /* copy catalog modifying xacts */
    1634         [ #  # ]:           0 :         if (catchange_xcnt > 0)
    1635                 :             :         {
    1636                 :           0 :                 sz = sizeof(TransactionId) * catchange_xcnt;
    1637                 :           0 :                 memcpy(ondisk_c, catchange_xip, sz);
    1638                 :           0 :                 COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
    1639                 :           0 :                 ondisk_c += sz;
    1640                 :           0 :         }
    1641                 :             : 
    1642                 :           0 :         FIN_CRC32C(ondisk->checksum);
    1643                 :             : 
    1644                 :             :         /* we have valid data now, open tempfile and write it there */
    1645                 :           0 :         fd = OpenTransientFile(tmppath,
    1646                 :             :                                                    O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    1647         [ #  # ]:           0 :         if (fd < 0)
    1648   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1649                 :             :                                 (errcode_for_file_access(),
    1650                 :             :                                  errmsg("could not open file \"%s\": %m", tmppath)));
    1651                 :             : 
    1652                 :           0 :         errno = 0;
    1653                 :           0 :         pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
    1654         [ #  # ]:           0 :         if ((write(fd, ondisk, needed_length)) != needed_length)
    1655                 :             :         {
    1656                 :           0 :                 int                     save_errno = errno;
    1657                 :             : 
    1658                 :           0 :                 CloseTransientFile(fd);
    1659                 :             : 
    1660                 :             :                 /* if write didn't set errno, assume problem is no disk space */
    1661         [ #  # ]:           0 :                 errno = save_errno ? save_errno : ENOSPC;
    1662   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1663                 :             :                                 (errcode_for_file_access(),
    1664                 :             :                                  errmsg("could not write to file \"%s\": %m", tmppath)));
    1665                 :           0 :         }
    1666                 :           0 :         pgstat_report_wait_end();
    1667                 :             : 
    1668                 :             :         /*
    1669                 :             :          * fsync the file before renaming so that even if we crash after this we
    1670                 :             :          * have either a fully valid file or nothing.
    1671                 :             :          *
    1672                 :             :          * It's safe to just ERROR on fsync() here because we'll retry the whole
    1673                 :             :          * operation including the writes.
    1674                 :             :          *
    1675                 :             :          * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
    1676                 :             :          * some noticeable overhead since it's performed synchronously during
    1677                 :             :          * decoding?
    1678                 :             :          */
    1679                 :           0 :         pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
    1680         [ #  # ]:           0 :         if (pg_fsync(fd) != 0)
    1681                 :             :         {
    1682                 :           0 :                 int                     save_errno = errno;
    1683                 :             : 
    1684                 :           0 :                 CloseTransientFile(fd);
    1685                 :           0 :                 errno = save_errno;
    1686   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1687                 :             :                                 (errcode_for_file_access(),
    1688                 :             :                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
    1689                 :           0 :         }
    1690                 :           0 :         pgstat_report_wait_end();
    1691                 :             : 
    1692         [ #  # ]:           0 :         if (CloseTransientFile(fd) != 0)
    1693   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1694                 :             :                                 (errcode_for_file_access(),
    1695                 :             :                                  errmsg("could not close file \"%s\": %m", tmppath)));
    1696                 :             : 
    1697                 :           0 :         fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
    1698                 :             : 
    1699                 :             :         /*
    1700                 :             :          * We may overwrite the work from some other backend, but that's ok, our
    1701                 :             :          * snapshot is valid as well, we'll just have done some superfluous work.
    1702                 :             :          */
    1703         [ #  # ]:           0 :         if (rename(tmppath, path) != 0)
    1704                 :             :         {
    1705   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1706                 :             :                                 (errcode_for_file_access(),
    1707                 :             :                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1708                 :             :                                                 tmppath, path)));
    1709                 :           0 :         }
    1710                 :             : 
    1711                 :             :         /* make sure we persist */
    1712                 :           0 :         fsync_fname(path, false);
    1713                 :           0 :         fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
    1714                 :             : 
    1715                 :             :         /*
    1716                 :             :          * Now there's no way we can lose the dumped state anymore, remember this
    1717                 :             :          * as a serialization point.
    1718                 :             :          */
    1719                 :           0 :         builder->last_serialized_snapshot = lsn;
    1720                 :             : 
    1721                 :           0 :         MemoryContextSwitchTo(old_ctx);
    1722                 :             : 
    1723                 :             : out:
    1724                 :           0 :         ReorderBufferSetRestartPoint(builder->reorder,
    1725                 :           0 :                                                                  builder->last_serialized_snapshot);
    1726                 :             :         /* be tidy */
    1727         [ #  # ]:           0 :         if (ondisk)
    1728                 :           0 :                 pfree(ondisk);
    1729         [ #  # ]:           0 :         if (catchange_xip)
    1730                 :           0 :                 pfree(catchange_xip);
    1731         [ #  # ]:           0 : }
    1732                 :             : 
    1733                 :             : /*
    1734                 :             :  * Restore the logical snapshot file contents to 'ondisk'.
    1735                 :             :  *
    1736                 :             :  * 'context' is the memory context where the catalog modifying/committed xids
    1737                 :             :  * will live.
    1738                 :             :  * If 'missing_ok' is true, will not throw an error if the file is not found.
    1739                 :             :  */
    1740                 :             : bool
    1741                 :           0 : SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, XLogRecPtr lsn,
    1742                 :             :                                                  MemoryContext context, bool missing_ok)
    1743                 :             : {
    1744                 :           0 :         int                     fd;
    1745                 :           0 :         pg_crc32c       checksum;
    1746                 :           0 :         Size            sz;
    1747                 :           0 :         char            path[MAXPGPATH];
    1748                 :             : 
    1749                 :           0 :         sprintf(path, "%s/%X-%X.snap",
    1750                 :             :                         PG_LOGICAL_SNAPSHOTS_DIR,
    1751                 :           0 :                         LSN_FORMAT_ARGS(lsn));
    1752                 :             : 
    1753                 :           0 :         fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
    1754                 :             : 
    1755         [ #  # ]:           0 :         if (fd < 0)
    1756                 :             :         {
    1757         [ #  # ]:           0 :                 if (missing_ok && errno == ENOENT)
    1758                 :           0 :                         return false;
    1759                 :             : 
    1760   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1761                 :             :                                 (errcode_for_file_access(),
    1762                 :             :                                  errmsg("could not open file \"%s\": %m", path)));
    1763                 :           0 :         }
    1764                 :             : 
    1765                 :             :         /* ----
    1766                 :             :          * Make sure the snapshot has been stored safely to disk; that's normally
    1767                 :             :          * cheap.
    1768                 :             :          * Note that we do not need PANIC here, nobody will be able to use the
    1769                 :             :          * slot without fsyncing, and saving it won't succeed without an fsync()
    1770                 :             :          * either...
    1771                 :             :          * ----
    1772                 :             :          */
    1773                 :           0 :         fsync_fname(path, false);
    1774                 :           0 :         fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
    1775                 :             : 
    1776                 :             :         /* read statically sized portion of snapshot */
    1777                 :           0 :         SnapBuildRestoreContents(fd, ondisk, SnapBuildOnDiskConstantSize, path);
    1778                 :             : 
    1779         [ #  # ]:           0 :         if (ondisk->magic != SNAPBUILD_MAGIC)
    1780   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1781                 :             :                                 (errcode(ERRCODE_DATA_CORRUPTED),
    1782                 :             :                                  errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
    1783                 :             :                                                 path, ondisk->magic, SNAPBUILD_MAGIC)));
    1784                 :             : 
    1785         [ #  # ]:           0 :         if (ondisk->version != SNAPBUILD_VERSION)
    1786   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1787                 :             :                                 (errcode(ERRCODE_DATA_CORRUPTED),
    1788                 :             :                                  errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
    1789                 :             :                                                 path, ondisk->version, SNAPBUILD_VERSION)));
    1790                 :             : 
    1791                 :           0 :         INIT_CRC32C(checksum);
    1792                 :           0 :         COMP_CRC32C(checksum,
    1793                 :             :                                 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
    1794                 :             :                                 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
    1795                 :             : 
    1796                 :             :         /* read SnapBuild */
    1797                 :           0 :         SnapBuildRestoreContents(fd, &ondisk->builder, sizeof(SnapBuild), path);
    1798                 :           0 :         COMP_CRC32C(checksum, &ondisk->builder, sizeof(SnapBuild));
    1799                 :             : 
    1800                 :             :         /* restore committed xacts information */
    1801         [ #  # ]:           0 :         if (ondisk->builder.committed.xcnt > 0)
    1802                 :             :         {
    1803                 :           0 :                 sz = sizeof(TransactionId) * ondisk->builder.committed.xcnt;
    1804                 :           0 :                 ondisk->builder.committed.xip = MemoryContextAllocZero(context, sz);
    1805                 :           0 :                 SnapBuildRestoreContents(fd, ondisk->builder.committed.xip, sz, path);
    1806                 :           0 :                 COMP_CRC32C(checksum, ondisk->builder.committed.xip, sz);
    1807                 :           0 :         }
    1808                 :             : 
    1809                 :             :         /* restore catalog modifying xacts information */
    1810         [ #  # ]:           0 :         if (ondisk->builder.catchange.xcnt > 0)
    1811                 :             :         {
    1812                 :           0 :                 sz = sizeof(TransactionId) * ondisk->builder.catchange.xcnt;
    1813                 :           0 :                 ondisk->builder.catchange.xip = MemoryContextAllocZero(context, sz);
    1814                 :           0 :                 SnapBuildRestoreContents(fd, ondisk->builder.catchange.xip, sz, path);
    1815                 :           0 :                 COMP_CRC32C(checksum, ondisk->builder.catchange.xip, sz);
    1816                 :           0 :         }
    1817                 :             : 
    1818         [ #  # ]:           0 :         if (CloseTransientFile(fd) != 0)
    1819   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1820                 :             :                                 (errcode_for_file_access(),
    1821                 :             :                                  errmsg("could not close file \"%s\": %m", path)));
    1822                 :             : 
    1823                 :           0 :         FIN_CRC32C(checksum);
    1824                 :             : 
    1825                 :             :         /* verify checksum of what we've read */
    1826         [ #  # ]:           0 :         if (!EQ_CRC32C(checksum, ondisk->checksum))
    1827   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1828                 :             :                                 (errcode(ERRCODE_DATA_CORRUPTED),
    1829                 :             :                                  errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
    1830                 :             :                                                 path, checksum, ondisk->checksum)));
    1831                 :             : 
    1832                 :           0 :         return true;
    1833                 :           0 : }
    1834                 :             : 
    1835                 :             : /*
    1836                 :             :  * Restore a snapshot into 'builder' if one has previously been stored at the
    1837                 :             :  * location indicated by 'lsn'. Returns true if successful, false otherwise.
    1838                 :             :  */
    1839                 :             : static bool
    1840                 :           0 : SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
    1841                 :             : {
    1842                 :           0 :         SnapBuildOnDisk ondisk;
    1843                 :             : 
    1844                 :             :         /* no point in loading a snapshot if we're already there */
    1845         [ #  # ]:           0 :         if (builder->state == SNAPBUILD_CONSISTENT)
    1846                 :           0 :                 return false;
    1847                 :             : 
    1848                 :             :         /* validate and restore the snapshot to 'ondisk' */
    1849         [ #  # ]:           0 :         if (!SnapBuildRestoreSnapshot(&ondisk, lsn, builder->context, true))
    1850                 :           0 :                 return false;
    1851                 :             : 
    1852                 :             :         /*
    1853                 :             :          * ok, we now have a sensible snapshot here, figure out if it has more
    1854                 :             :          * information than we have.
    1855                 :             :          */
    1856                 :             : 
    1857                 :             :         /*
    1858                 :             :          * We are only interested in consistent snapshots for now, comparing
    1859                 :             :          * whether one incomplete snapshot is more "advanced" seems to be
    1860                 :             :          * unnecessarily complex.
    1861                 :             :          */
    1862         [ #  # ]:           0 :         if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
    1863                 :           0 :                 goto snapshot_not_interesting;
    1864                 :             : 
    1865                 :             :         /*
    1866                 :             :          * Don't use a snapshot that requires an xmin that we cannot guarantee to
    1867                 :             :          * be available.
    1868                 :             :          */
    1869         [ #  # ]:           0 :         if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
    1870                 :           0 :                 goto snapshot_not_interesting;
    1871                 :             : 
    1872                 :             :         /*
    1873                 :             :          * Consistent snapshots have no next phase. Reset next_phase_at as it is
    1874                 :             :          * possible that an old value may remain.
    1875                 :             :          */
    1876         [ #  # ]:           0 :         Assert(ondisk.builder.next_phase_at == InvalidTransactionId);
    1877                 :           0 :         builder->next_phase_at = InvalidTransactionId;
    1878                 :             : 
    1879                 :             :         /* ok, we think the snapshot is sensible, copy over everything important */
    1880                 :           0 :         builder->xmin = ondisk.builder.xmin;
    1881                 :           0 :         builder->xmax = ondisk.builder.xmax;
    1882                 :           0 :         builder->state = ondisk.builder.state;
    1883                 :             : 
    1884                 :           0 :         builder->committed.xcnt = ondisk.builder.committed.xcnt;
    1885                 :             :         /* We only allocated/stored xcnt, not xcnt_space xids! */
    1886                 :             :         /* don't overwrite preallocated xip, if we don't have anything here */
    1887         [ #  # ]:           0 :         if (builder->committed.xcnt > 0)
    1888                 :             :         {
    1889                 :           0 :                 pfree(builder->committed.xip);
    1890                 :           0 :                 builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
    1891                 :           0 :                 builder->committed.xip = ondisk.builder.committed.xip;
    1892                 :           0 :         }
    1893                 :           0 :         ondisk.builder.committed.xip = NULL;
    1894                 :             : 
    1895                 :             :         /* set catalog modifying transactions */
    1896         [ #  # ]:           0 :         if (builder->catchange.xip)
    1897                 :           0 :                 pfree(builder->catchange.xip);
    1898                 :           0 :         builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
    1899                 :           0 :         builder->catchange.xip = ondisk.builder.catchange.xip;
    1900                 :           0 :         ondisk.builder.catchange.xip = NULL;
    1901                 :             : 
    1902                 :             :         /* our snapshot is not interesting anymore, build a new one */
    1903         [ #  # ]:           0 :         if (builder->snapshot != NULL)
    1904                 :             :         {
    1905                 :           0 :                 SnapBuildSnapDecRefcount(builder->snapshot);
    1906                 :           0 :         }
    1907                 :           0 :         builder->snapshot = SnapBuildBuildSnapshot(builder);
    1908                 :           0 :         SnapBuildSnapIncRefcount(builder->snapshot);
    1909                 :             : 
    1910                 :           0 :         ReorderBufferSetRestartPoint(builder->reorder, lsn);
    1911                 :             : 
    1912         [ #  # ]:           0 :         Assert(builder->state == SNAPBUILD_CONSISTENT);
    1913                 :             : 
    1914   [ #  #  #  # ]:           0 :         ereport(LOG,
    1915                 :             :                         errmsg("logical decoding found consistent point at %X/%08X",
    1916                 :             :                                    LSN_FORMAT_ARGS(lsn)),
    1917                 :             :                         errdetail("Logical decoding will begin using saved snapshot."));
    1918                 :           0 :         return true;
    1919                 :             : 
    1920                 :             : snapshot_not_interesting:
    1921         [ #  # ]:           0 :         if (ondisk.builder.committed.xip != NULL)
    1922                 :           0 :                 pfree(ondisk.builder.committed.xip);
    1923         [ #  # ]:           0 :         if (ondisk.builder.catchange.xip != NULL)
    1924                 :           0 :                 pfree(ondisk.builder.catchange.xip);
    1925                 :           0 :         return false;
    1926                 :           0 : }
    1927                 :             : 
    1928                 :             : /*
    1929                 :             :  * Read the contents of the serialized snapshot to 'dest'.
    1930                 :             :  */
    1931                 :             : static void
    1932                 :           0 : SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path)
    1933                 :             : {
    1934                 :           0 :         int                     readBytes;
    1935                 :             : 
    1936                 :           0 :         pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
    1937                 :           0 :         readBytes = read(fd, dest, size);
    1938                 :           0 :         pgstat_report_wait_end();
    1939         [ #  # ]:           0 :         if (readBytes != size)
    1940                 :             :         {
    1941                 :           0 :                 int                     save_errno = errno;
    1942                 :             : 
    1943                 :           0 :                 CloseTransientFile(fd);
    1944                 :             : 
    1945         [ #  # ]:           0 :                 if (readBytes < 0)
    1946                 :             :                 {
    1947                 :           0 :                         errno = save_errno;
    1948   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    1949                 :             :                                         (errcode_for_file_access(),
    1950                 :             :                                          errmsg("could not read file \"%s\": %m", path)));
    1951                 :           0 :                 }
    1952                 :             :                 else
    1953   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    1954                 :             :                                         (errcode(ERRCODE_DATA_CORRUPTED),
    1955                 :             :                                          errmsg("could not read file \"%s\": read %d of %zu",
    1956                 :             :                                                         path, readBytes, size)));
    1957                 :           0 :         }
    1958                 :           0 : }
    1959                 :             : 
    1960                 :             : /*
    1961                 :             :  * Remove all serialized snapshots that are not required anymore because no
    1962                 :             :  * slot can need them. This doesn't actually have to run during a checkpoint,
    1963                 :             :  * but it's a convenient point to schedule this.
    1964                 :             :  *
    1965                 :             :  * NB: We run this during checkpoints even if logical decoding is disabled so
    1966                 :             :  * we clean up old snapshots at some point after it got disabled.
    1967                 :             :  */
    1968                 :             : void
    1969                 :           7 : CheckPointSnapBuild(void)
    1970                 :             : {
    1971                 :           7 :         XLogRecPtr      cutoff;
    1972                 :           7 :         XLogRecPtr      redo;
    1973                 :           7 :         DIR                *snap_dir;
    1974                 :           7 :         struct dirent *snap_de;
    1975                 :           7 :         char            path[MAXPGPATH + sizeof(PG_LOGICAL_SNAPSHOTS_DIR)];
    1976                 :             : 
    1977                 :             :         /*
    1978                 :             :          * We start off with a minimum of the last redo pointer. No new
    1979                 :             :          * replication slot will start before that, so that's a safe upper bound
    1980                 :             :          * for removal.
    1981                 :             :          */
    1982                 :           7 :         redo = GetRedoRecPtr();
    1983                 :             : 
    1984                 :             :         /* now check for the restart ptrs from existing slots */
    1985                 :           7 :         cutoff = ReplicationSlotsComputeLogicalRestartLSN();
    1986                 :             : 
    1987                 :             :         /* use the redo pointer if it is older than the slots' restart lsn */
    1988         [ +  - ]:           7 :         if (redo < cutoff)
    1989                 :           0 :                 cutoff = redo;
    1990                 :             : 
    1991                 :           7 :         snap_dir = AllocateDir(PG_LOGICAL_SNAPSHOTS_DIR);
    1992         [ +  + ]:          21 :         while ((snap_de = ReadDir(snap_dir, PG_LOGICAL_SNAPSHOTS_DIR)) != NULL)
    1993                 :             :         {
    1994                 :          14 :                 uint32          hi;
    1995                 :          14 :                 uint32          lo;
    1996                 :          14 :                 XLogRecPtr      lsn;
    1997                 :          14 :                 PGFileType      de_type;
    1998                 :             : 
    1999   [ +  +  +  - ]:          14 :                 if (strcmp(snap_de->d_name, ".") == 0 ||
    2000                 :           7 :                         strcmp(snap_de->d_name, "..") == 0)
    2001                 :          14 :                         continue;
    2002                 :             : 
    2003                 :           0 :                 snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_SNAPSHOTS_DIR, snap_de->d_name);
    2004                 :           0 :                 de_type = get_dirent_type(path, snap_de, false, DEBUG1);
    2005                 :             : 
    2006   [ #  #  #  # ]:           0 :                 if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
    2007                 :             :                 {
    2008   [ #  #  #  # ]:           0 :                         elog(DEBUG1, "only regular files expected: %s", path);
    2009                 :           0 :                         continue;
    2010                 :             :                 }
    2011                 :             : 
    2012                 :             :                 /*
    2013                 :             :                  * temporary filenames from SnapBuildSerialize() include the LSN and
    2014                 :             :                  * everything but are postfixed by .$pid.tmp. We can just remove them
    2015                 :             :                  * the same as other files because there can be none that are
    2016                 :             :                  * currently being written that are older than cutoff.
    2017                 :             :                  *
    2018                 :             :                  * We just log a message if a file doesn't fit the pattern; it's
    2019                 :             :                  * probably some editor's lock/state file or similar...
    2020                 :             :                  */
    2021         [ #  # ]:           0 :                 if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
    2022                 :             :                 {
    2023   [ #  #  #  # ]:           0 :                         ereport(LOG,
    2024                 :             :                                         (errmsg("could not parse file name \"%s\"", path)));
    2025                 :           0 :                         continue;
    2026                 :             :                 }
    2027                 :             : 
    2028                 :           0 :                 lsn = ((uint64) hi) << 32 | lo;
    2029                 :             : 
    2030                 :             :                 /* check whether we still need it */
    2031   [ #  #  #  # ]:           0 :                 if (lsn < cutoff || !XLogRecPtrIsValid(cutoff))
    2032                 :             :                 {
    2033   [ #  #  #  # ]:           0 :                         elog(DEBUG1, "removing snapbuild snapshot %s", path);
    2034                 :             : 
    2035                 :             :                         /*
    2036                 :             :                          * It's not particularly harmful, though strange, if we can't
    2037                 :             :                          * remove the file here. Don't prevent the checkpoint from
    2038                 :             :                          * completing; that'd be a cure worse than the disease.
    2039                 :             :                          */
    2040         [ #  # ]:           0 :                         if (unlink(path) < 0)
    2041                 :             :                         {
    2042   [ #  #  #  # ]:           0 :                                 ereport(LOG,
    2043                 :             :                                                 (errcode_for_file_access(),
    2044                 :             :                                                  errmsg("could not remove file \"%s\": %m",
    2045                 :             :                                                                 path)));
    2046                 :           0 :                                 continue;
    2047                 :             :                         }
    2048                 :           0 :                 }
    2049      [ -  +  - ]:          14 :         }
    2050                 :           7 :         FreeDir(snap_dir);
    2051                 :           7 : }
    2052                 :             : 
    2053                 :             : /*
    2054                 :             :  * Check if a logical snapshot at the specified point has been serialized.
    2055                 :             :  */
    2056                 :             : bool
    2057                 :           0 : SnapBuildSnapshotExists(XLogRecPtr lsn)
    2058                 :             : {
    2059                 :           0 :         char            path[MAXPGPATH];
    2060                 :           0 :         int                     ret;
    2061                 :           0 :         struct stat stat_buf;
    2062                 :             : 
    2063                 :           0 :         sprintf(path, "%s/%X-%X.snap",
    2064                 :             :                         PG_LOGICAL_SNAPSHOTS_DIR,
    2065                 :           0 :                         LSN_FORMAT_ARGS(lsn));
    2066                 :             : 
    2067                 :           0 :         ret = stat(path, &stat_buf);
    2068                 :             : 
    2069   [ #  #  #  # ]:           0 :         if (ret != 0 && errno != ENOENT)
    2070   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    2071                 :             :                                 (errcode_for_file_access(),
    2072                 :             :                                  errmsg("could not stat file \"%s\": %m", path)));
    2073                 :             : 
    2074                 :           0 :         return ret == 0;
    2075                 :           0 : }
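
The file-name handling in CheckPointSnapBuild() and SnapBuildSnapshotExists() above can be tried outside the server with a minimal standalone sketch. It is not part of snapbuild.c. The type lsn_t and the helpers snap_file_name(), snap_file_lsn() and snap_file_removable() are hypothetical names introduced here for illustration, and the value 0 is assumed to stand in for an invalid cutoff.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-in for XLogRecPtr; 0 is assumed to mean "invalid". */
typedef uint64_t lsn_t;

/* Format an LSN as "<hi>-<lo>.snap", the pattern used in the listing. */
static void
snap_file_name(char *buf, size_t buflen, lsn_t lsn)
{
	snprintf(buf, buflen, "%X-%X.snap",
			 (unsigned int) (lsn >> 32), (unsigned int) lsn);
}

/*
 * Recover the LSN from a file name. Returns false if two hex fields
 * separated by '-' cannot be read. sscanf() counts only converted fields,
 * so text after the second field (such as a ".$pid.tmp" suffix) does not
 * change the result.
 */
static bool
snap_file_lsn(const char *name, lsn_t *lsn)
{
	unsigned int hi;
	unsigned int lo;

	if (sscanf(name, "%X-%X.snap", &hi, &lo) != 2)
		return false;

	*lsn = ((uint64_t) hi) << 32 | lo;
	return true;
}

/* Same test as the listing: older than the cutoff, or no valid cutoff. */
static bool
snap_file_removable(lsn_t file_lsn, lsn_t cutoff)
{
	return file_lsn < cutoff || cutoff == 0;
}
```

Formatting an LSN with snap_file_name() and feeding the result to snap_file_lsn() should return the original value, which is the property the cleanup loop relies on when it compares each file's LSN against the cutoff.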
        

Generated by: LCOV version 2.3.2-1