LCOV - code coverage report
Current view: top level - src/backend/replication/logical - decode.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 524 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 20 0
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Branches: 0.0 % 256 0

             Branch data     Line data    Source code
       1                 :             : /* -------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * decode.c
       4                 :             :  *              This module decodes WAL records read using xlogreader.h's APIs for the
       5                 :             :  *              purpose of logical decoding by passing information to the
       6                 :             :  *              reorderbuffer module (containing the actual changes) and to the
       7                 :             :  *              snapbuild module to build a fitting catalog snapshot (to be able to
       8                 :             :  *              properly decode the changes in the reorderbuffer).
       9                 :             :  *
      10                 :             :  * NOTE:
      11                 :             :  *              This basically tries to handle all low level xlog stuff for
      12                 :             :  *              reorderbuffer.c and snapbuild.c. There's some minor leakage where a
      13                 :             :  *              specific record's struct is used to pass data along, but those just
      14                 :             :  *              happen to contain the right amount of data in a convenient
      15                 :             :  *              format. There isn't and shouldn't be much intelligence about the
      16                 :             :  *              contents of records in here except turning them into a more usable
      17                 :             :  *              format.
      18                 :             :  *
      19                 :             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      20                 :             :  * Portions Copyright (c) 1994, Regents of the University of California
      21                 :             :  *
      22                 :             :  * IDENTIFICATION
      23                 :             :  *        src/backend/replication/logical/decode.c
      24                 :             :  *
      25                 :             :  * -------------------------------------------------------------------------
      26                 :             :  */
      27                 :             : #include "postgres.h"
      28                 :             : 
      29                 :             : #include "access/heapam_xlog.h"
      30                 :             : #include "access/transam.h"
      31                 :             : #include "access/xact.h"
      32                 :             : #include "access/xlog_internal.h"
      33                 :             : #include "access/xlogreader.h"
      34                 :             : #include "access/xlogrecord.h"
      35                 :             : #include "catalog/pg_control.h"
      36                 :             : #include "replication/decode.h"
      37                 :             : #include "replication/logical.h"
      38                 :             : #include "replication/message.h"
      39                 :             : #include "replication/reorderbuffer.h"
      40                 :             : #include "replication/snapbuild.h"
      41                 :             : #include "storage/standbydefs.h"
      42                 :             : 
      43                 :             : /* individual record(group)'s handlers */
      44                 :             : static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      45                 :             : static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      46                 :             : static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      47                 :             : static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      48                 :             : static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      49                 :             : static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      50                 :             : 
      51                 :             : static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      52                 :             :                                                  xl_xact_parsed_commit *parsed, TransactionId xid,
      53                 :             :                                                  bool two_phase);
      54                 :             : static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      55                 :             :                                                 xl_xact_parsed_abort *parsed, TransactionId xid,
      56                 :             :                                                 bool two_phase);
      57                 :             : static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      58                 :             :                                                   xl_xact_parsed_prepare *parsed);
      59                 :             : 
      60                 :             : 
      61                 :             : /* common function to decode tuples */
      62                 :             : static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple);
      63                 :             : 
      64                 :             : /* helper functions for decoding transactions */
      65                 :             : static inline bool FilterPrepare(LogicalDecodingContext *ctx,
      66                 :             :                                                                  TransactionId xid, const char *gid);
      67                 :             : static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
      68                 :             :                                                           XLogRecordBuffer *buf, Oid txn_dbid,
      69                 :             :                                                           RepOriginId origin_id);
      70                 :             : 
      71                 :             : /*
      72                 :             :  * Take every XLogReadRecord()ed record and perform the actions required to
      73                 :             :  * decode it using the output plugin already setup in the logical decoding
      74                 :             :  * context.
      75                 :             :  *
      76                 :             :  * NB: Note that every record's xid needs to be processed by reorderbuffer
      77                 :             :  * (xids contained in the content of records are not relevant for this rule).
      78                 :             :  * That means that for records which'd otherwise not go through the
      79                 :             :  * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
      80                 :             :  * call ReorderBufferProcessXid for each record type by default, because
      81                 :             :  * e.g. empty xacts can be handled more efficiently if there's no previous
      82                 :             :  * state for them.
      83                 :             :  *
      84                 :             :  * We also support the ability to fast forward thru records, skipping some
      85                 :             :  * record types completely - see individual record types for details.
      86                 :             :  */
      87                 :             : void
      88                 :           0 : LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
      89                 :             : {
      90                 :           0 :         XLogRecordBuffer buf;
      91                 :           0 :         TransactionId txid;
      92                 :           0 :         RmgrData        rmgr;
      93                 :             : 
      94                 :           0 :         buf.origptr = ctx->reader->ReadRecPtr;
      95                 :           0 :         buf.endptr = ctx->reader->EndRecPtr;
      96                 :           0 :         buf.record = record;
      97                 :             : 
      98                 :           0 :         txid = XLogRecGetTopXid(record);
      99                 :             : 
     100                 :             :         /*
     101                 :             :          * If the top-level xid is valid, we need to assign the subxact to the
     102                 :             :          * top-level xact. We need to do this for all records, hence we do it
     103                 :             :          * before the switch.
     104                 :             :          */
     105         [ #  # ]:           0 :         if (TransactionIdIsValid(txid))
     106                 :             :         {
     107                 :           0 :                 ReorderBufferAssignChild(ctx->reorder,
     108                 :           0 :                                                                  txid,
     109                 :           0 :                                                                  XLogRecGetXid(record),
     110                 :           0 :                                                                  buf.origptr);
     111                 :           0 :         }
     112                 :             : 
     113                 :           0 :         rmgr = GetRmgr(XLogRecGetRmid(record));
     114                 :             : 
     115         [ #  # ]:           0 :         if (rmgr.rm_decode != NULL)
     116                 :           0 :                 rmgr.rm_decode(ctx, &buf);
     117                 :             :         else
     118                 :             :         {
     119                 :             :                 /* just deal with xid, and done */
     120                 :           0 :                 ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
     121                 :           0 :                                                                 buf.origptr);
     122                 :             :         }
     123                 :           0 : }
     124                 :             : 
     125                 :             : /*
     126                 :             :  * Handle rmgr XLOG_ID records for LogicalDecodingProcessRecord().
     127                 :             :  */
     128                 :             : void
     129                 :           0 : xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     130                 :             : {
     131                 :           0 :         SnapBuild  *builder = ctx->snapshot_builder;
     132                 :           0 :         uint8           info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
     133                 :             : 
     134                 :           0 :         ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
     135                 :           0 :                                                         buf->origptr);
     136                 :             : 
     137   [ #  #  #  #  :           0 :         switch (info)
                      # ]
     138                 :             :         {
     139                 :             :                         /* this is also used in END_OF_RECOVERY checkpoints */
     140                 :             :                 case XLOG_CHECKPOINT_SHUTDOWN:
     141                 :             :                 case XLOG_END_OF_RECOVERY:
     142                 :           0 :                         SnapBuildSerializationPoint(builder, buf->origptr);
     143                 :             : 
     144                 :           0 :                         break;
     145                 :             :                 case XLOG_CHECKPOINT_ONLINE:
     146                 :             : 
     147                 :             :                         /*
     148                 :             :                          * a RUNNING_XACTS record will have been logged near to this, we
     149                 :             :                          * can restart from there.
     150                 :             :                          */
     151                 :             :                         break;
     152                 :             :                 case XLOG_LOGICAL_DECODING_STATUS_CHANGE:
     153                 :             :                         {
     154                 :           0 :                                 bool            logical_decoding;
     155                 :             : 
     156                 :           0 :                                 memcpy(&logical_decoding, XLogRecGetData(buf->record), sizeof(bool));
     157                 :             : 
     158                 :             :                                 /*
     159                 :             :                                  * Error out as we should not decode this WAL record.
     160                 :             :                                  *
     161                 :             :                                  * Logical decoding is disabled, and existing logical slots on
     162                 :             :                                  * the standby are invalidated when this WAL record is
     163                 :             :                                  * replayed. No logical decoder can process this WAL record
     164                 :             :                                  * until replay completes, and by then the slots are already
     165                 :             :                                  * invalidated. Furthermore, no new logical slots can be
     166                 :             :                                  * created while logical decoding is disabled. This cannot
     167                 :             :                                  * occur even on primary either, since it will not restart
     168                 :             :                                  * with wal_level < replica if any logical slots exist.
     169                 :             :                                  */
     170   [ #  #  #  # ]:           0 :                                 elog(ERROR, "unexpected logical decoding status change %d",
     171                 :             :                                          logical_decoding);
     172                 :             : 
     173                 :             :                                 break;
     174                 :           0 :                         }
     175                 :             :                 case XLOG_NOOP:
     176                 :             :                 case XLOG_NEXTOID:
     177                 :             :                 case XLOG_SWITCH:
     178                 :             :                 case XLOG_BACKUP_END:
     179                 :             :                 case XLOG_PARAMETER_CHANGE:
     180                 :             :                 case XLOG_RESTORE_POINT:
     181                 :             :                 case XLOG_FPW_CHANGE:
     182                 :             :                 case XLOG_FPI_FOR_HINT:
     183                 :             :                 case XLOG_FPI:
     184                 :             :                 case XLOG_OVERWRITE_CONTRECORD:
     185                 :             :                 case XLOG_CHECKPOINT_REDO:
     186                 :           0 :                         break;
     187                 :             :                 default:
     188   [ #  #  #  # ]:           0 :                         elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
     189                 :           0 :         }
     190                 :           0 : }
     191                 :             : 
     192                 :             : /*
     193                 :             :  * Handle rmgr XACT_ID records for LogicalDecodingProcessRecord().
     194                 :             :  */
     195                 :             : void
     196                 :           0 : xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     197                 :             : {
     198                 :           0 :         SnapBuild  *builder = ctx->snapshot_builder;
     199                 :           0 :         ReorderBuffer *reorder = ctx->reorder;
     200                 :           0 :         XLogReaderState *r = buf->record;
     201                 :           0 :         uint8           info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
     202                 :             : 
     203                 :             :         /*
     204                 :             :          * If the snapshot isn't yet fully built, we cannot decode anything, so
     205                 :             :          * bail out.
     206                 :             :          */
     207         [ #  # ]:           0 :         if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     208                 :           0 :                 return;
     209                 :             : 
     210   [ #  #  #  #  :           0 :         switch (info)
                   #  # ]
     211                 :             :         {
     212                 :             :                 case XLOG_XACT_COMMIT:
     213                 :             :                 case XLOG_XACT_COMMIT_PREPARED:
     214                 :             :                         {
     215                 :           0 :                                 xl_xact_commit *xlrec;
     216                 :           0 :                                 xl_xact_parsed_commit parsed;
     217                 :           0 :                                 TransactionId xid;
     218                 :           0 :                                 bool            two_phase = false;
     219                 :             : 
     220                 :           0 :                                 xlrec = (xl_xact_commit *) XLogRecGetData(r);
     221                 :           0 :                                 ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     222                 :             : 
     223         [ #  # ]:           0 :                                 if (!TransactionIdIsValid(parsed.twophase_xid))
     224                 :           0 :                                         xid = XLogRecGetXid(r);
     225                 :             :                                 else
     226                 :           0 :                                         xid = parsed.twophase_xid;
     227                 :             : 
     228                 :             :                                 /*
     229                 :             :                                  * We would like to process the transaction in a two-phase
     230                 :             :                                  * manner iff output plugin supports two-phase commits and
     231                 :             :                                  * doesn't filter the transaction at prepare time.
     232                 :             :                                  */
     233         [ #  # ]:           0 :                                 if (info == XLOG_XACT_COMMIT_PREPARED)
     234                 :           0 :                                         two_phase = !(FilterPrepare(ctx, xid,
     235                 :           0 :                                                                                                 parsed.twophase_gid));
     236                 :             : 
     237                 :           0 :                                 DecodeCommit(ctx, buf, &parsed, xid, two_phase);
     238                 :             :                                 break;
     239                 :           0 :                         }
     240                 :             :                 case XLOG_XACT_ABORT:
     241                 :             :                 case XLOG_XACT_ABORT_PREPARED:
     242                 :             :                         {
     243                 :           0 :                                 xl_xact_abort *xlrec;
     244                 :           0 :                                 xl_xact_parsed_abort parsed;
     245                 :           0 :                                 TransactionId xid;
     246                 :           0 :                                 bool            two_phase = false;
     247                 :             : 
     248                 :           0 :                                 xlrec = (xl_xact_abort *) XLogRecGetData(r);
     249                 :           0 :                                 ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     250                 :             : 
     251         [ #  # ]:           0 :                                 if (!TransactionIdIsValid(parsed.twophase_xid))
     252                 :           0 :                                         xid = XLogRecGetXid(r);
     253                 :             :                                 else
     254                 :           0 :                                         xid = parsed.twophase_xid;
     255                 :             : 
     256                 :             :                                 /*
     257                 :             :                                  * We would like to process the transaction in a two-phase
     258                 :             :                                  * manner iff output plugin supports two-phase commits and
     259                 :             :                                  * doesn't filter the transaction at prepare time.
     260                 :             :                                  */
     261         [ #  # ]:           0 :                                 if (info == XLOG_XACT_ABORT_PREPARED)
     262                 :           0 :                                         two_phase = !(FilterPrepare(ctx, xid,
     263                 :           0 :                                                                                                 parsed.twophase_gid));
     264                 :             : 
     265                 :           0 :                                 DecodeAbort(ctx, buf, &parsed, xid, two_phase);
     266                 :             :                                 break;
     267                 :           0 :                         }
     268                 :             :                 case XLOG_XACT_ASSIGNMENT:
     269                 :             : 
     270                 :             :                         /*
     271                 :             :                          * We assign subxact to the toplevel xact while processing each
     272                 :             :                          * record if required.  So, we don't need to do anything here. See
     273                 :             :                          * LogicalDecodingProcessRecord.
     274                 :             :                          */
     275                 :             :                         break;
     276                 :             :                 case XLOG_XACT_INVALIDATIONS:
     277                 :             :                         {
     278                 :           0 :                                 TransactionId xid;
     279                 :           0 :                                 xl_xact_invals *invals;
     280                 :             : 
     281                 :           0 :                                 xid = XLogRecGetXid(r);
     282                 :           0 :                                 invals = (xl_xact_invals *) XLogRecGetData(r);
     283                 :             : 
     284                 :             :                                 /*
     285                 :             :                                  * Execute the invalidations for xid-less transactions,
     286                 :             :                                  * otherwise, accumulate them so that they can be processed at
     287                 :             :                                  * the commit time.
     288                 :             :                                  */
     289         [ #  # ]:           0 :                                 if (TransactionIdIsValid(xid))
     290                 :             :                                 {
     291         [ #  # ]:           0 :                                         if (!ctx->fast_forward)
     292                 :           0 :                                                 ReorderBufferAddInvalidations(reorder, xid,
     293                 :           0 :                                                                                                           buf->origptr,
     294                 :           0 :                                                                                                           invals->nmsgs,
     295                 :           0 :                                                                                                           invals->msgs);
     296                 :           0 :                                         ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
     297                 :           0 :                                                                                                           buf->origptr);
     298                 :           0 :                                 }
     299         [ #  # ]:           0 :                                 else if (!ctx->fast_forward)
     300                 :           0 :                                         ReorderBufferImmediateInvalidation(ctx->reorder,
     301                 :           0 :                                                                                                            invals->nmsgs,
     302                 :           0 :                                                                                                            invals->msgs);
     303                 :             : 
     304                 :             :                                 break;
     305                 :           0 :                         }
     306                 :             :                 case XLOG_XACT_PREPARE:
     307                 :             :                         {
     308                 :           0 :                                 xl_xact_parsed_prepare parsed;
     309                 :           0 :                                 xl_xact_prepare *xlrec;
     310                 :             : 
     311                 :             :                                 /* ok, parse it */
     312                 :           0 :                                 xlrec = (xl_xact_prepare *) XLogRecGetData(r);
     313                 :           0 :                                 ParsePrepareRecord(XLogRecGetInfo(buf->record),
     314                 :           0 :                                                                    xlrec, &parsed);
     315                 :             : 
     316                 :             :                                 /*
     317                 :             :                                  * We would like to process the transaction in a two-phase
     318                 :             :                                  * manner iff output plugin supports two-phase commits and
     319                 :             :                                  * doesn't filter the transaction at prepare time.
     320                 :             :                                  */
     321   [ #  #  #  # ]:           0 :                                 if (FilterPrepare(ctx, parsed.twophase_xid,
     322                 :           0 :                                                                   parsed.twophase_gid))
     323                 :             :                                 {
     324                 :           0 :                                         ReorderBufferProcessXid(reorder, parsed.twophase_xid,
     325                 :           0 :                                                                                         buf->origptr);
     326                 :           0 :                                         break;
     327                 :             :                                 }
     328                 :             : 
     329                 :             :                                 /*
     330                 :             :                                  * Note that if the prepared transaction has locked [user]
     331                 :             :                                  * catalog tables exclusively then decoding prepare can block
     332                 :             :                                  * till the main transaction is committed because it needs to
     333                 :             :                                  * lock the catalog tables.
     334                 :             :                                  *
     335                 :             :                                  * XXX Now, this can even lead to a deadlock if the prepare
     336                 :             :                                  * transaction is waiting to get it logically replicated for
     337                 :             :                                  * distributed 2PC. This can be avoided by disallowing
     338                 :             :                                  * preparing transactions that have locked [user] catalog
     339                 :             :                                  * tables exclusively but as of now, we ask users not to do
     340                 :             :                                  * such an operation.
     341                 :             :                                  */
     342                 :           0 :                                 DecodePrepare(ctx, buf, &parsed);
     343                 :           0 :                                 break;
     344                 :           0 :                         }
     345                 :             :                 default:
     346   [ #  #  #  # ]:           0 :                         elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
     347                 :           0 :         }
     348         [ #  # ]:           0 : }
     349                 :             : 
     350                 :             : /*
     351                 :             :  * Handle rmgr STANDBY_ID records for LogicalDecodingProcessRecord().
     352                 :             :  */
     353                 :             : void
     354                 :           0 : standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     355                 :             : {
     356                 :           0 :         SnapBuild  *builder = ctx->snapshot_builder;
     357                 :           0 :         XLogReaderState *r = buf->record;
     358                 :           0 :         uint8           info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     359                 :             : 
     360                 :           0 :         ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     361                 :             : 
     362      [ #  #  # ]:           0 :         switch (info)
     363                 :             :         {
     364                 :             :                 case XLOG_RUNNING_XACTS:
     365                 :             :                         {
     366                 :           0 :                                 xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
     367                 :             : 
     368                 :           0 :                                 SnapBuildProcessRunningXacts(builder, buf->origptr, running);
     369                 :             : 
     370                 :             :                                 /*
     371                 :             :                                  * Abort all transactions that we keep track of, that are
     372                 :             :                                  * older than the record's oldestRunningXid. This is the most
     373                 :             :                                  * convenient spot for doing so since, in contrast to shutdown
     374                 :             :                                  * or end-of-recovery checkpoints, we have information about
     375                 :             :                                  * all running transactions which includes prepared ones,
     376                 :             :                                  * while shutdown checkpoints just know that no non-prepared
     377                 :             :                                  * transactions are in progress.
     378                 :             :                                  */
     379                 :           0 :                                 ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
     380                 :           0 :                         }
     381                 :           0 :                         break;
     382                 :             :                 case XLOG_STANDBY_LOCK:
     383                 :             :                         break;
     384                 :             :                 case XLOG_INVALIDATIONS:
     385                 :             : 
     386                 :             :                         /*
     387                 :             :                          * We are processing the invalidations at the command level via
     388                 :             :                          * XLOG_XACT_INVALIDATIONS.  So we don't need to do anything here.
     389                 :             :                          */
     390                 :             :                         break;
     391                 :             :                 default:
     392   [ #  #  #  # ]:           0 :                         elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
     393                 :           0 :         }
     394                 :           0 : }
     395                 :             : 
     396                 :             : /*
     397                 :             :  * Handle rmgr HEAP2_ID records for LogicalDecodingProcessRecord().
     398                 :             :  */
     399                 :             : void
     400                 :           0 : heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     401                 :             : {
     402                 :           0 :         uint8           info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     403                 :           0 :         TransactionId xid = XLogRecGetXid(buf->record);
     404                 :           0 :         SnapBuild  *builder = ctx->snapshot_builder;
     405                 :             : 
     406                 :           0 :         ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     407                 :             : 
     408                 :             :         /*
     409                 :             :          * If we don't have snapshot or we are just fast-forwarding, there is no
     410                 :             :          * point in decoding data changes. However, it's crucial to build the base
     411                 :             :          * snapshot during fast-forward mode (as is done in
     412                 :             :          * SnapBuildProcessChange()) because we require the snapshot's xmin when
     413                 :             :          * determining the candidate catalog_xmin for the replication slot. See
     414                 :             :          * SnapBuildProcessRunningXacts().
     415                 :             :          */
     416         [ #  # ]:           0 :         if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     417                 :           0 :                 return;
     418                 :             : 
     419   [ #  #  #  #  :           0 :         switch (info)
                      # ]
     420                 :             :         {
     421                 :             :                 case XLOG_HEAP2_MULTI_INSERT:
     422   [ #  #  #  # ]:           0 :                         if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     423                 :           0 :                                 !ctx->fast_forward)
     424                 :           0 :                                 DecodeMultiInsert(ctx, buf);
     425                 :           0 :                         break;
     426                 :             :                 case XLOG_HEAP2_NEW_CID:
     427         [ #  # ]:           0 :                         if (!ctx->fast_forward)
     428                 :             :                         {
     429                 :           0 :                                 xl_heap_new_cid *xlrec;
     430                 :             : 
     431                 :           0 :                                 xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
     432                 :           0 :                                 SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
     433                 :             : 
     434                 :             :                                 break;
     435                 :           0 :                         }
     436                 :             :                 case XLOG_HEAP2_REWRITE:
     437                 :             : 
     438                 :             :                         /*
     439                 :             :                          * Although these records only exist to serve the needs of logical
     440                 :             :                          * decoding, all the work happens as part of crash or archive
     441                 :             :                          * recovery, so we don't need to do anything here.
     442                 :             :                          */
     443                 :             :                         break;
     444                 :             : 
     445                 :             :                         /*
     446                 :             :                          * Everything else here is just low level physical stuff we're not
     447                 :             :                          * interested in.
     448                 :             :                          */
     449                 :             :                 case XLOG_HEAP2_PRUNE_ON_ACCESS:
     450                 :             :                 case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
     451                 :             :                 case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
     452                 :             :                 case XLOG_HEAP2_VISIBLE:
     453                 :             :                 case XLOG_HEAP2_LOCK_UPDATED:
     454                 :           0 :                         break;
     455                 :             :                 default:
     456   [ #  #  #  # ]:           0 :                         elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
     457                 :           0 :         }
     458         [ #  # ]:           0 : }
     459                 :             : 
     460                 :             : /*
     461                 :             :  * Handle rmgr HEAP_ID records for LogicalDecodingProcessRecord().
     462                 :             :  */
     463                 :             : void
     464                 :           0 : heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     465                 :             : {
     466                 :           0 :         uint8           info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     467                 :           0 :         TransactionId xid = XLogRecGetXid(buf->record);
     468                 :           0 :         SnapBuild  *builder = ctx->snapshot_builder;
     469                 :             : 
     470                 :           0 :         ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     471                 :             : 
     472                 :             :         /*
     473                 :             :          * If we don't have snapshot or we are just fast-forwarding, there is no
     474                 :             :          * point in decoding data changes. However, it's crucial to build the base
     475                 :             :          * snapshot during fast-forward mode (as is done in
     476                 :             :          * SnapBuildProcessChange()) because we require the snapshot's xmin when
     477                 :             :          * determining the candidate catalog_xmin for the replication slot. See
     478                 :             :          * SnapBuildProcessRunningXacts().
     479                 :             :          */
     480         [ #  # ]:           0 :         if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     481                 :           0 :                 return;
     482                 :             : 
     483   [ #  #  #  #  :           0 :         switch (info)
                #  #  # ]
     484                 :             :         {
     485                 :             :                 case XLOG_HEAP_INSERT:
     486   [ #  #  #  # ]:           0 :                         if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     487                 :           0 :                                 !ctx->fast_forward)
     488                 :           0 :                                 DecodeInsert(ctx, buf);
     489                 :           0 :                         break;
     490                 :             : 
     491                 :             :                         /*
     492                 :             :                          * Treat HOT update as normal updates. There is no useful
     493                 :             :                          * information in the fact that we could make it a HOT update
     494                 :             :                          * locally and the WAL layout is compatible.
     495                 :             :                          */
     496                 :             :                 case XLOG_HEAP_HOT_UPDATE:
     497                 :             :                 case XLOG_HEAP_UPDATE:
     498   [ #  #  #  # ]:           0 :                         if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     499                 :           0 :                                 !ctx->fast_forward)
     500                 :           0 :                                 DecodeUpdate(ctx, buf);
     501                 :           0 :                         break;
     502                 :             : 
     503                 :             :                 case XLOG_HEAP_DELETE:
     504   [ #  #  #  # ]:           0 :                         if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     505                 :           0 :                                 !ctx->fast_forward)
     506                 :           0 :                                 DecodeDelete(ctx, buf);
     507                 :           0 :                         break;
     508                 :             : 
     509                 :             :                 case XLOG_HEAP_TRUNCATE:
     510   [ #  #  #  # ]:           0 :                         if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     511                 :           0 :                                 !ctx->fast_forward)
     512                 :           0 :                                 DecodeTruncate(ctx, buf);
     513                 :           0 :                         break;
     514                 :             : 
     515                 :             :                 case XLOG_HEAP_INPLACE:
     516                 :             : 
     517                 :             :                         /*
     518                 :             :                          * Inplace updates are only ever performed on catalog tuples and
     519                 :             :                          * can, per definition, not change tuple visibility.  Since we
     520                 :             :                          * also don't decode catalog tuples, we're not interested in the
     521                 :             :                          * record's contents.
     522                 :             :                          */
     523                 :             :                         break;
     524                 :             : 
     525                 :             :                 case XLOG_HEAP_CONFIRM:
     526   [ #  #  #  # ]:           0 :                         if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     527                 :           0 :                                 !ctx->fast_forward)
     528                 :           0 :                                 DecodeSpecConfirm(ctx, buf);
     529                 :           0 :                         break;
     530                 :             : 
     531                 :             :                 case XLOG_HEAP_LOCK:
     532                 :             :                         /* we don't care about row level locks for now */
     533                 :             :                         break;
     534                 :             : 
     535                 :             :                 default:
     536   [ #  #  #  # ]:           0 :                         elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
     537                 :           0 :                         break;
     538                 :             :         }
     539         [ #  # ]:           0 : }
     540                 :             : 
     541                 :             : /*
     542                 :             :  * Ask output plugin whether we want to skip this PREPARE and send
     543                 :             :  * this transaction as a regular commit later.
     544                 :             :  */
     545                 :             : static inline bool
     546                 :           0 : FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
     547                 :             :                           const char *gid)
     548                 :             : {
     549                 :             :         /*
     550                 :             :          * Skip if decoding of two-phase transactions at PREPARE time is not
     551                 :             :          * enabled. In that case, all two-phase transactions are considered
     552                 :             :          * filtered out and will be applied as regular transactions at COMMIT
     553                 :             :          * PREPARED.
     554                 :             :          */
     555         [ #  # ]:           0 :         if (!ctx->twophase)
     556                 :           0 :                 return true;
     557                 :             : 
     558                 :             :         /*
     559                 :             :          * The filter_prepare callback is optional. When not supplied, all
     560                 :             :          * prepared transactions should go through.
     561                 :             :          */
     562         [ #  # ]:           0 :         if (ctx->callbacks.filter_prepare_cb == NULL)
     563                 :           0 :                 return false;
     564                 :             : 
     565                 :           0 :         return filter_prepare_cb_wrapper(ctx, xid, gid);
     566                 :           0 : }
     567                 :             : 
     568                 :             : static inline bool
     569                 :           0 : FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
     570                 :             : {
     571         [ #  # ]:           0 :         if (ctx->callbacks.filter_by_origin_cb == NULL)
     572                 :           0 :                 return false;
     573                 :             : 
     574                 :           0 :         return filter_by_origin_cb_wrapper(ctx, origin_id);
     575                 :           0 : }
     576                 :             : 
     577                 :             : /*
     578                 :             :  * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
     579                 :             :  */
     580                 :             : void
     581                 :           0 : logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     582                 :             : {
     583                 :           0 :         SnapBuild  *builder = ctx->snapshot_builder;
     584                 :           0 :         XLogReaderState *r = buf->record;
     585                 :           0 :         TransactionId xid = XLogRecGetXid(r);
     586                 :           0 :         uint8           info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     587                 :           0 :         RepOriginId origin_id = XLogRecGetOrigin(r);
     588                 :           0 :         Snapshot        snapshot = NULL;
     589                 :           0 :         xl_logical_message *message;
     590                 :             : 
     591         [ #  # ]:           0 :         if (info != XLOG_LOGICAL_MESSAGE)
     592   [ #  #  #  # ]:           0 :                 elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
     593                 :             : 
     594                 :           0 :         ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     595                 :             : 
     596                 :             :         /* If we don't have snapshot, there is no point in decoding messages */
     597         [ #  # ]:           0 :         if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     598                 :           0 :                 return;
     599                 :             : 
     600                 :           0 :         message = (xl_logical_message *) XLogRecGetData(r);
     601                 :             : 
     602   [ #  #  #  # ]:           0 :         if (message->dbId != ctx->slot->data.database ||
     603                 :           0 :                 FilterByOrigin(ctx, origin_id))
     604                 :           0 :                 return;
     605                 :             : 
     606   [ #  #  #  # ]:           0 :         if (message->transactional &&
     607                 :           0 :                 !SnapBuildProcessChange(builder, xid, buf->origptr))
     608                 :           0 :                 return;
     609   [ #  #  #  # ]:           0 :         else if (!message->transactional &&
     610         [ #  # ]:           0 :                          (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
     611                 :           0 :                           SnapBuildXactNeedsSkip(builder, buf->origptr)))
     612                 :           0 :                 return;
     613                 :             : 
     614                 :             :         /*
     615                 :             :          * We also skip decoding in fast_forward mode. This check must be last
     616                 :             :          * because we don't want to set the processing_required flag unless we
     617                 :             :          * have a decodable message.
     618                 :             :          */
     619         [ #  # ]:           0 :         if (ctx->fast_forward)
     620                 :             :         {
     621                 :             :                 /*
     622                 :             :                  * We need to set processing_required flag to notify the message's
     623                 :             :                  * existence to the caller. Usually, the flag is set when either the
     624                 :             :                  * COMMIT or ABORT records are decoded, but this must be turned on
     625                 :             :                  * here because the non-transactional logical message is decoded
     626                 :             :                  * without waiting for these records.
     627                 :             :                  */
     628         [ #  # ]:           0 :                 if (!message->transactional)
     629                 :           0 :                         ctx->processing_required = true;
     630                 :             : 
     631                 :           0 :                 return;
     632                 :             :         }
     633                 :             : 
     634                 :             :         /*
     635                 :             :          * If this is a non-transactional change, get the snapshot we're expected
     636                 :             :          * to use. We only get here when the snapshot is consistent, and the
     637                 :             :          * change is not meant to be skipped.
     638                 :             :          *
     639                 :             :          * For transactional changes we don't need a snapshot, we'll use the
     640                 :             :          * regular snapshot maintained by ReorderBuffer. We just leave it NULL.
     641                 :             :          */
     642         [ #  # ]:           0 :         if (!message->transactional)
     643                 :           0 :                 snapshot = SnapBuildGetOrBuildSnapshot(builder);
     644                 :             : 
     645                 :           0 :         ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
     646                 :           0 :                                                           message->transactional,
     647                 :           0 :                                                           message->message, /* first part of message is
     648                 :             :                                                                                                  * prefix */
     649                 :           0 :                                                           message->message_size,
     650                 :           0 :                                                           message->message + message->prefix_size);
     651         [ #  # ]:           0 : }
     652                 :             : 
     653                 :             : /*
     654                 :             :  * Consolidated commit record handling between the different form of commit
     655                 :             :  * records.
     656                 :             :  *
     657                 :             :  * 'two_phase' indicates that caller wants to process the transaction in two
     658                 :             :  * phases, first process prepare if not already done and then process
     659                 :             :  * commit_prepared.
     660                 :             :  */
     661                 :             : static void
     662                 :           0 : DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     663                 :             :                          xl_xact_parsed_commit *parsed, TransactionId xid,
     664                 :             :                          bool two_phase)
     665                 :             : {
     666                 :           0 :         XLogRecPtr      origin_lsn = InvalidXLogRecPtr;
     667                 :           0 :         TimestampTz commit_time = parsed->xact_time;
     668                 :           0 :         RepOriginId origin_id = XLogRecGetOrigin(buf->record);
     669                 :           0 :         int                     i;
     670                 :             : 
     671         [ #  # ]:           0 :         if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     672                 :             :         {
     673                 :           0 :                 origin_lsn = parsed->origin_lsn;
     674                 :           0 :                 commit_time = parsed->origin_timestamp;
     675                 :           0 :         }
     676                 :             : 
     677                 :           0 :         SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
     678                 :           0 :                                            parsed->nsubxacts, parsed->subxacts,
     679                 :           0 :                                            parsed->xinfo);
     680                 :             : 
     681                 :             :         /* ----
     682                 :             :          * Check whether we are interested in this specific transaction, and tell
     683                 :             :          * the reorderbuffer to forget the content of the (sub-)transactions
     684                 :             :          * if not.
     685                 :             :          *
     686                 :             :          * We can't just use ReorderBufferAbort() here, because we need to execute
     687                 :             :          * the transaction's invalidations.  This currently won't be needed if
     688                 :             :          * we're just skipping over the transaction because currently we only do
     689                 :             :          * so during startup, to get to the first transaction the client needs. As
     690                 :             :          * we have reset the catalog caches before starting to read WAL, and we
     691                 :             :          * haven't yet touched any catalogs, there can't be anything to invalidate.
     692                 :             :          * But if we're "forgetting" this commit because it happened in another
     693                 :             :          * database, the invalidations might be important, because they could be
     694                 :             :          * for shared catalogs and we might have loaded data into the relevant
     695                 :             :          * syscaches.
     696                 :             :          * ---
     697                 :             :          */
     698         [ #  # ]:           0 :         if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     699                 :             :         {
     700         [ #  # ]:           0 :                 for (i = 0; i < parsed->nsubxacts; i++)
     701                 :             :                 {
     702                 :           0 :                         ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
     703                 :           0 :                 }
     704                 :           0 :                 ReorderBufferForget(ctx->reorder, xid, buf->origptr);
     705                 :             : 
     706                 :           0 :                 return;
     707                 :             :         }
     708                 :             : 
     709                 :             :         /* tell the reorderbuffer about the surviving subtransactions */
     710         [ #  # ]:           0 :         for (i = 0; i < parsed->nsubxacts; i++)
     711                 :             :         {
     712                 :           0 :                 ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     713                 :           0 :                                                                  buf->origptr, buf->endptr);
     714                 :           0 :         }
     715                 :             : 
     716                 :             :         /*
     717                 :             :          * Send the final commit record if the transaction data is already
     718                 :             :          * decoded, otherwise, process the entire transaction.
     719                 :             :          */
     720         [ #  # ]:           0 :         if (two_phase)
     721                 :             :         {
     722                 :           0 :                 ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     723                 :           0 :                                                                         SnapBuildGetTwoPhaseAt(ctx->snapshot_builder),
     724                 :           0 :                                                                         commit_time, origin_id, origin_lsn,
     725                 :           0 :                                                                         parsed->twophase_gid, true);
     726                 :           0 :         }
     727                 :             :         else
     728                 :             :         {
     729                 :           0 :                 ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
     730                 :           0 :                                                         commit_time, origin_id, origin_lsn);
     731                 :             :         }
     732                 :             : 
     733                 :             :         /*
     734                 :             :          * Update the decoding stats at transaction prepare/commit/abort.
     735                 :             :          * Additionally we send the stats when we spill or stream the changes to
     736                 :             :          * avoid losing them in case the decoding is interrupted. It is not clear
     737                 :             :          * that sending more or less frequently than this would be better.
     738                 :             :          */
     739                 :           0 :         UpdateDecodingStats(ctx);
     740         [ #  # ]:           0 : }
     741                 :             : 
     742                 :             : /*
     743                 :             :  * Decode PREPARE record. Similar logic as in DecodeCommit.
     744                 :             :  *
     745                 :             :  * Note that we don't skip prepare even if have detected concurrent abort
     746                 :             :  * because it is quite possible that we had already sent some changes before we
     747                 :             :  * detect abort in which case we need to abort those changes in the subscriber.
     748                 :             :  * To abort such changes, we do send the prepare and then the rollback prepared
     749                 :             :  * which is what happened on the publisher-side as well. Now, we can invent a
     750                 :             :  * new abort API wherein in such cases we send abort and skip sending prepared
     751                 :             :  * and rollback prepared but then it is not that straightforward because we
     752                 :             :  * might have streamed this transaction by that time in which case it is
     753                 :             :  * handled when the rollback is encountered. It is not impossible to optimize
     754                 :             :  * the concurrent abort case but it can introduce design complexity w.r.t
     755                 :             :  * handling different cases so leaving it for now as it doesn't seem worth it.
     756                 :             :  */
     757                 :             : static void
     758                 :           0 : DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     759                 :             :                           xl_xact_parsed_prepare *parsed)
     760                 :             : {
     761                 :           0 :         SnapBuild  *builder = ctx->snapshot_builder;
     762                 :           0 :         XLogRecPtr      origin_lsn = parsed->origin_lsn;
     763                 :           0 :         TimestampTz prepare_time = parsed->xact_time;
     764                 :           0 :         RepOriginId origin_id = XLogRecGetOrigin(buf->record);
     765                 :           0 :         int                     i;
     766                 :           0 :         TransactionId xid = parsed->twophase_xid;
     767                 :             : 
     768         [ #  # ]:           0 :         if (parsed->origin_timestamp != 0)
     769                 :           0 :                 prepare_time = parsed->origin_timestamp;
     770                 :             : 
     771                 :             :         /*
     772                 :             :          * Remember the prepare info for a txn so that it can be used later in
     773                 :             :          * commit prepared if required. See ReorderBufferFinishPrepared.
     774                 :             :          */
     775   [ #  #  #  # ]:           0 :         if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr,
     776                 :           0 :                                                                                   buf->endptr, prepare_time, origin_id,
     777                 :           0 :                                                                                   origin_lsn))
     778                 :           0 :                 return;
     779                 :             : 
     780                 :             :         /* We can't start streaming unless a consistent state is reached. */
     781         [ #  # ]:           0 :         if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
     782                 :             :         {
     783                 :           0 :                 ReorderBufferSkipPrepare(ctx->reorder, xid);
     784                 :           0 :                 return;
     785                 :             :         }
     786                 :             : 
     787                 :             :         /*
     788                 :             :          * Check whether we need to process this transaction. See
     789                 :             :          * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     790                 :             :          * transaction.
     791                 :             :          *
     792                 :             :          * We can't call ReorderBufferForget as we did in DecodeCommit as the txn
     793                 :             :          * hasn't yet been committed, removing this txn before a commit might
     794                 :             :          * result in the computation of an incorrect restart_lsn. See
     795                 :             :          * SnapBuildProcessRunningXacts. But we need to process cache
     796                 :             :          * invalidations if there are any for the reasons mentioned in
     797                 :             :          * DecodeCommit.
     798                 :             :          */
     799         [ #  # ]:           0 :         if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     800                 :             :         {
     801                 :           0 :                 ReorderBufferSkipPrepare(ctx->reorder, xid);
     802                 :           0 :                 ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
     803                 :           0 :                 return;
     804                 :             :         }
     805                 :             : 
     806                 :             :         /* Tell the reorderbuffer about the surviving subtransactions. */
     807         [ #  # ]:           0 :         for (i = 0; i < parsed->nsubxacts; i++)
     808                 :             :         {
     809                 :           0 :                 ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     810                 :           0 :                                                                  buf->origptr, buf->endptr);
     811                 :           0 :         }
     812                 :             : 
     813                 :             :         /* replay actions of all transaction + subtransactions in order */
     814                 :           0 :         ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
     815                 :             : 
     816                 :             :         /*
     817                 :             :          * Update the decoding stats at transaction prepare/commit/abort.
     818                 :             :          * Additionally we send the stats when we spill or stream the changes to
     819                 :             :          * avoid losing them in case the decoding is interrupted. It is not clear
     820                 :             :          * that sending more or less frequently than this would be better.
     821                 :             :          */
     822                 :           0 :         UpdateDecodingStats(ctx);
     823         [ #  # ]:           0 : }
     824                 :             : 
     825                 :             : 
     826                 :             : /*
     827                 :             :  * Get the data from the various forms of abort records and pass it on to
     828                 :             :  * snapbuild.c and reorderbuffer.c.
     829                 :             :  *
     830                 :             :  * 'two_phase' indicates to finish prepared transaction.
     831                 :             :  */
     832                 :             : static void
     833                 :           0 : DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     834                 :             :                         xl_xact_parsed_abort *parsed, TransactionId xid,
     835                 :             :                         bool two_phase)
     836                 :             : {
     837                 :           0 :         int                     i;
     838                 :           0 :         XLogRecPtr      origin_lsn = InvalidXLogRecPtr;
     839                 :           0 :         TimestampTz abort_time = parsed->xact_time;
     840                 :           0 :         RepOriginId origin_id = XLogRecGetOrigin(buf->record);
     841                 :           0 :         bool            skip_xact;
     842                 :             : 
     843         [ #  # ]:           0 :         if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     844                 :             :         {
     845                 :           0 :                 origin_lsn = parsed->origin_lsn;
     846                 :           0 :                 abort_time = parsed->origin_timestamp;
     847                 :           0 :         }
     848                 :             : 
     849                 :             :         /*
     850                 :             :          * Check whether we need to process this transaction. See
     851                 :             :          * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     852                 :             :          * transaction.
     853                 :             :          */
     854                 :           0 :         skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id);
     855                 :             : 
     856                 :             :         /*
     857                 :             :          * Send the final rollback record for a prepared transaction unless we
     858                 :             :          * need to skip it. For non-two-phase xacts, simply forget the xact.
     859                 :             :          */
     860   [ #  #  #  # ]:           0 :         if (two_phase && !skip_xact)
     861                 :             :         {
     862                 :           0 :                 ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     863                 :             :                                                                         InvalidXLogRecPtr,
     864                 :           0 :                                                                         abort_time, origin_id, origin_lsn,
     865                 :           0 :                                                                         parsed->twophase_gid, false);
     866                 :           0 :         }
     867                 :             :         else
     868                 :             :         {
     869         [ #  # ]:           0 :                 for (i = 0; i < parsed->nsubxacts; i++)
     870                 :             :                 {
     871                 :           0 :                         ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
     872                 :           0 :                                                            buf->record->EndRecPtr, abort_time);
     873                 :           0 :                 }
     874                 :             : 
     875                 :           0 :                 ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr,
     876                 :           0 :                                                    abort_time);
     877                 :             :         }
     878                 :             : 
     879                 :             :         /* update the decoding stats */
     880                 :           0 :         UpdateDecodingStats(ctx);
     881                 :           0 : }
     882                 :             : 
     883                 :             : /*
     884                 :             :  * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
     885                 :             :  *
     886                 :             :  * Inserts can contain the new tuple.
     887                 :             :  */
     888                 :             : static void
     889                 :           0 : DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     890                 :             : {
     891                 :           0 :         Size            datalen;
     892                 :           0 :         char       *tupledata;
     893                 :           0 :         Size            tuplelen;
     894                 :           0 :         XLogReaderState *r = buf->record;
     895                 :           0 :         xl_heap_insert *xlrec;
     896                 :           0 :         ReorderBufferChange *change;
     897                 :           0 :         RelFileLocator target_locator;
     898                 :             : 
     899                 :           0 :         xlrec = (xl_heap_insert *) XLogRecGetData(r);
     900                 :             : 
     901                 :             :         /*
     902                 :             :          * Ignore insert records without new tuples (this does happen when
     903                 :             :          * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
     904                 :             :          */
     905         [ #  # ]:           0 :         if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
     906                 :           0 :                 return;
     907                 :             : 
     908                 :             :         /* only interested in our database */
     909                 :           0 :         XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     910         [ #  # ]:           0 :         if (target_locator.dbOid != ctx->slot->data.database)
     911                 :           0 :                 return;
     912                 :             : 
     913                 :             :         /* output plugin doesn't look for this origin, no need to queue */
     914         [ #  # ]:           0 :         if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     915                 :           0 :                 return;
     916                 :             : 
     917                 :           0 :         change = ReorderBufferAllocChange(ctx->reorder);
     918         [ #  # ]:           0 :         if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
     919                 :           0 :                 change->action = REORDER_BUFFER_CHANGE_INSERT;
     920                 :             :         else
     921                 :           0 :                 change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
     922                 :           0 :         change->origin_id = XLogRecGetOrigin(r);
     923                 :             : 
     924                 :           0 :         memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
     925                 :             : 
     926                 :           0 :         tupledata = XLogRecGetBlockData(r, 0, &datalen);
     927                 :           0 :         tuplelen = datalen - SizeOfHeapHeader;
     928                 :             : 
     929                 :           0 :         change->data.tp.newtuple =
     930                 :           0 :                 ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
     931                 :             : 
     932                 :           0 :         DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
     933                 :             : 
     934                 :           0 :         change->data.tp.clear_toast_afterwards = true;
     935                 :             : 
     936                 :           0 :         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
     937                 :           0 :                                                          change,
     938                 :           0 :                                                          xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
     939         [ #  # ]:           0 : }
     940                 :             : 
     941                 :             : /*
     942                 :             :  * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
     943                 :             :  * in the record, from wal into proper tuplebufs.
     944                 :             :  *
     945                 :             :  * Updates can possibly contain a new tuple and the old primary key.
     946                 :             :  */
     947                 :             : static void
     948                 :           0 : DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     949                 :             : {
     950                 :           0 :         XLogReaderState *r = buf->record;
     951                 :           0 :         xl_heap_update *xlrec;
     952                 :           0 :         ReorderBufferChange *change;
     953                 :           0 :         char       *data;
     954                 :           0 :         RelFileLocator target_locator;
     955                 :             : 
     956                 :           0 :         xlrec = (xl_heap_update *) XLogRecGetData(r);
     957                 :             : 
     958                 :             :         /* only interested in our database */
     959                 :           0 :         XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     960         [ #  # ]:           0 :         if (target_locator.dbOid != ctx->slot->data.database)
     961                 :           0 :                 return;
     962                 :             : 
     963                 :             :         /* output plugin doesn't look for this origin, no need to queue */
     964         [ #  # ]:           0 :         if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     965                 :           0 :                 return;
     966                 :             : 
     967                 :           0 :         change = ReorderBufferAllocChange(ctx->reorder);
     968                 :           0 :         change->action = REORDER_BUFFER_CHANGE_UPDATE;
     969                 :           0 :         change->origin_id = XLogRecGetOrigin(r);
     970                 :           0 :         memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
     971                 :             : 
     972         [ #  # ]:           0 :         if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
     973                 :             :         {
     974                 :           0 :                 Size            datalen;
     975                 :           0 :                 Size            tuplelen;
     976                 :             : 
     977                 :           0 :                 data = XLogRecGetBlockData(r, 0, &datalen);
     978                 :             : 
     979                 :           0 :                 tuplelen = datalen - SizeOfHeapHeader;
     980                 :             : 
     981                 :           0 :                 change->data.tp.newtuple =
     982                 :           0 :                         ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
     983                 :             : 
     984                 :           0 :                 DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
     985                 :           0 :         }
     986                 :             : 
     987         [ #  # ]:           0 :         if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
     988                 :             :         {
     989                 :           0 :                 Size            datalen;
     990                 :           0 :                 Size            tuplelen;
     991                 :             : 
     992                 :             :                 /* caution, remaining data in record is not aligned */
     993                 :           0 :                 data = XLogRecGetData(r) + SizeOfHeapUpdate;
     994                 :           0 :                 datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
     995                 :           0 :                 tuplelen = datalen - SizeOfHeapHeader;
     996                 :             : 
     997                 :           0 :                 change->data.tp.oldtuple =
     998                 :           0 :                         ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
     999                 :             : 
    1000                 :           0 :                 DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
    1001                 :           0 :         }
    1002                 :             : 
    1003                 :           0 :         change->data.tp.clear_toast_afterwards = true;
    1004                 :             : 
    1005                 :           0 :         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1006                 :           0 :                                                          change, false);
    1007         [ #  # ]:           0 : }
    1008                 :             : 
    1009                 :             : /*
    1010                 :             :  * Parse XLOG_HEAP_DELETE from wal into proper tuplebufs.
    1011                 :             :  *
    1012                 :             :  * Deletes can possibly contain the old primary key.
    1013                 :             :  */
    1014                 :             : static void
    1015                 :           0 : DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1016                 :             : {
    1017                 :           0 :         XLogReaderState *r = buf->record;
    1018                 :           0 :         xl_heap_delete *xlrec;
    1019                 :           0 :         ReorderBufferChange *change;
    1020                 :           0 :         RelFileLocator target_locator;
    1021                 :             : 
    1022                 :           0 :         xlrec = (xl_heap_delete *) XLogRecGetData(r);
    1023                 :             : 
    1024                 :             :         /* only interested in our database */
    1025                 :           0 :         XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    1026         [ #  # ]:           0 :         if (target_locator.dbOid != ctx->slot->data.database)
    1027                 :           0 :                 return;
    1028                 :             : 
    1029                 :             :         /* output plugin doesn't look for this origin, no need to queue */
    1030         [ #  # ]:           0 :         if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1031                 :           0 :                 return;
    1032                 :             : 
    1033                 :           0 :         change = ReorderBufferAllocChange(ctx->reorder);
    1034                 :             : 
    1035         [ #  # ]:           0 :         if (xlrec->flags & XLH_DELETE_IS_SUPER)
    1036                 :           0 :                 change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT;
    1037                 :             :         else
    1038                 :           0 :                 change->action = REORDER_BUFFER_CHANGE_DELETE;
    1039                 :             : 
    1040                 :           0 :         change->origin_id = XLogRecGetOrigin(r);
    1041                 :             : 
    1042                 :           0 :         memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
    1043                 :             : 
    1044                 :             :         /* old primary key stored */
    1045         [ #  # ]:           0 :         if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
    1046                 :             :         {
    1047                 :           0 :                 Size            datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete;
    1048                 :           0 :                 Size            tuplelen = datalen - SizeOfHeapHeader;
    1049                 :             : 
    1050         [ #  # ]:           0 :                 Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
    1051                 :             : 
    1052                 :           0 :                 change->data.tp.oldtuple =
    1053                 :           0 :                         ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
    1054                 :             : 
    1055                 :           0 :                 DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
    1056                 :           0 :                                                 datalen, change->data.tp.oldtuple);
    1057                 :           0 :         }
    1058                 :             : 
    1059                 :           0 :         change->data.tp.clear_toast_afterwards = true;
    1060                 :             : 
    1061                 :           0 :         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1062                 :           0 :                                                          change, false);
    1063         [ #  # ]:           0 : }
    1064                 :             : 
    1065                 :             : /*
    1066                 :             :  * Parse XLOG_HEAP_TRUNCATE from wal
    1067                 :             :  */
    1068                 :             : static void
    1069                 :           0 : DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1070                 :             : {
    1071                 :           0 :         XLogReaderState *r = buf->record;
    1072                 :           0 :         xl_heap_truncate *xlrec;
    1073                 :           0 :         ReorderBufferChange *change;
    1074                 :             : 
    1075                 :           0 :         xlrec = (xl_heap_truncate *) XLogRecGetData(r);
    1076                 :             : 
    1077                 :             :         /* only interested in our database */
    1078         [ #  # ]:           0 :         if (xlrec->dbId != ctx->slot->data.database)
    1079                 :           0 :                 return;
    1080                 :             : 
    1081                 :             :         /* output plugin doesn't look for this origin, no need to queue */
    1082         [ #  # ]:           0 :         if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1083                 :           0 :                 return;
    1084                 :             : 
    1085                 :           0 :         change = ReorderBufferAllocChange(ctx->reorder);
    1086                 :           0 :         change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
    1087                 :           0 :         change->origin_id = XLogRecGetOrigin(r);
    1088         [ #  # ]:           0 :         if (xlrec->flags & XLH_TRUNCATE_CASCADE)
    1089                 :           0 :                 change->data.truncate.cascade = true;
    1090         [ #  # ]:           0 :         if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
    1091                 :           0 :                 change->data.truncate.restart_seqs = true;
    1092                 :           0 :         change->data.truncate.nrelids = xlrec->nrelids;
    1093                 :           0 :         change->data.truncate.relids = ReorderBufferAllocRelids(ctx->reorder,
    1094                 :           0 :                                                                                                                         xlrec->nrelids);
    1095                 :           0 :         memcpy(change->data.truncate.relids, xlrec->relids,
    1096                 :             :                    xlrec->nrelids * sizeof(Oid));
    1097                 :           0 :         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1098                 :           0 :                                                          buf->origptr, change, false);
    1099         [ #  # ]:           0 : }
    1100                 :             : 
    1101                 :             : /*
    1102                 :             :  * Decode XLOG_HEAP2_MULTI_INSERT record into multiple tuplebufs.
    1103                 :             :  *
    1104                 :             :  * Currently MULTI_INSERT will always contain the full tuples.
    1105                 :             :  */
    1106                 :             : static void
    1107                 :           0 : DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1108                 :             : {
    1109                 :           0 :         XLogReaderState *r = buf->record;
    1110                 :           0 :         xl_heap_multi_insert *xlrec;
    1111                 :           0 :         int                     i;
    1112                 :           0 :         char       *data;
    1113                 :           0 :         char       *tupledata;
    1114                 :           0 :         Size            tuplelen;
    1115                 :           0 :         RelFileLocator rlocator;
    1116                 :             : 
    1117                 :           0 :         xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
    1118                 :             : 
    1119                 :             :         /*
    1120                 :             :          * Ignore insert records without new tuples.  This happens when a
    1121                 :             :          * multi_insert is done on a catalog or on a non-persistent relation.
    1122                 :             :          */
    1123         [ #  # ]:           0 :         if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
    1124                 :           0 :                 return;
    1125                 :             : 
    1126                 :             :         /* only interested in our database */
    1127                 :           0 :         XLogRecGetBlockTag(r, 0, &rlocator, NULL, NULL);
    1128         [ #  # ]:           0 :         if (rlocator.dbOid != ctx->slot->data.database)
    1129                 :           0 :                 return;
    1130                 :             : 
    1131                 :             :         /* output plugin doesn't look for this origin, no need to queue */
    1132         [ #  # ]:           0 :         if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1133                 :           0 :                 return;
    1134                 :             : 
    1135                 :             :         /*
    1136                 :             :          * We know that this multi_insert isn't for a catalog, so the block should
    1137                 :             :          * always have data even if a full-page write of it is taken.
    1138                 :             :          */
    1139                 :           0 :         tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
    1140         [ #  # ]:           0 :         Assert(tupledata != NULL);
    1141                 :             : 
    1142                 :           0 :         data = tupledata;
    1143         [ #  # ]:           0 :         for (i = 0; i < xlrec->ntuples; i++)
    1144                 :             :         {
    1145                 :           0 :                 ReorderBufferChange *change;
    1146                 :           0 :                 xl_multi_insert_tuple *xlhdr;
    1147                 :           0 :                 int                     datalen;
    1148                 :           0 :                 HeapTuple       tuple;
    1149                 :           0 :                 HeapTupleHeader header;
    1150                 :             : 
    1151                 :           0 :                 change = ReorderBufferAllocChange(ctx->reorder);
    1152                 :           0 :                 change->action = REORDER_BUFFER_CHANGE_INSERT;
    1153                 :           0 :                 change->origin_id = XLogRecGetOrigin(r);
    1154                 :             : 
    1155                 :           0 :                 memcpy(&change->data.tp.rlocator, &rlocator, sizeof(RelFileLocator));
    1156                 :             : 
    1157                 :           0 :                 xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
    1158                 :           0 :                 data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
    1159                 :           0 :                 datalen = xlhdr->datalen;
    1160                 :             : 
    1161                 :           0 :                 change->data.tp.newtuple =
    1162                 :           0 :                         ReorderBufferAllocTupleBuf(ctx->reorder, datalen);
    1163                 :             : 
    1164                 :           0 :                 tuple = change->data.tp.newtuple;
    1165                 :           0 :                 header = tuple->t_data;
    1166                 :             : 
    1167                 :             :                 /* not a disk based tuple */
    1168                 :           0 :                 ItemPointerSetInvalid(&tuple->t_self);
    1169                 :             : 
    1170                 :             :                 /*
    1171                 :             :                  * We can only figure this out after reassembling the transactions.
    1172                 :             :                  */
    1173                 :           0 :                 tuple->t_tableOid = InvalidOid;
    1174                 :             : 
    1175                 :           0 :                 tuple->t_len = datalen + SizeofHeapTupleHeader;
    1176                 :             : 
    1177                 :           0 :                 memset(header, 0, SizeofHeapTupleHeader);
    1178                 :             : 
    1179                 :           0 :                 memcpy((char *) tuple->t_data + SizeofHeapTupleHeader, data, datalen);
    1180                 :           0 :                 header->t_infomask = xlhdr->t_infomask;
    1181                 :           0 :                 header->t_infomask2 = xlhdr->t_infomask2;
    1182                 :           0 :                 header->t_hoff = xlhdr->t_hoff;
    1183                 :             : 
    1184                 :             :                 /*
    1185                 :             :                  * Reset toast reassembly state only after the last row in the last
    1186                 :             :                  * xl_multi_insert_tuple record emitted by one heap_multi_insert()
    1187                 :             :                  * call.
    1188                 :             :                  */
    1189   [ #  #  #  # ]:           0 :                 if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
    1190                 :           0 :                         (i + 1) == xlrec->ntuples)
    1191                 :           0 :                         change->data.tp.clear_toast_afterwards = true;
    1192                 :             :                 else
    1193                 :           0 :                         change->data.tp.clear_toast_afterwards = false;
    1194                 :             : 
    1195                 :           0 :                 ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1196                 :           0 :                                                                  buf->origptr, change, false);
    1197                 :             : 
    1198                 :             :                 /* move to the next xl_multi_insert_tuple entry */
    1199                 :           0 :                 data += datalen;
    1200                 :           0 :         }
    1201         [ #  # ]:           0 :         Assert(data == tupledata + tuplelen);
    1202         [ #  # ]:           0 : }
    1203                 :             : 
    1204                 :             : /*
    1205                 :             :  * Parse XLOG_HEAP_CONFIRM from wal into a confirmation change.
    1206                 :             :  *
    1207                 :             :  * This is pretty trivial, all the state essentially already setup by the
    1208                 :             :  * speculative insertion.
    1209                 :             :  */
    1210                 :             : static void
    1211                 :           0 : DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1212                 :             : {
    1213                 :           0 :         XLogReaderState *r = buf->record;
    1214                 :           0 :         ReorderBufferChange *change;
    1215                 :           0 :         RelFileLocator target_locator;
    1216                 :             : 
    1217                 :             :         /* only interested in our database */
    1218                 :           0 :         XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    1219         [ #  # ]:           0 :         if (target_locator.dbOid != ctx->slot->data.database)
    1220                 :           0 :                 return;
    1221                 :             : 
    1222                 :             :         /* output plugin doesn't look for this origin, no need to queue */
    1223         [ #  # ]:           0 :         if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1224                 :           0 :                 return;
    1225                 :             : 
    1226                 :           0 :         change = ReorderBufferAllocChange(ctx->reorder);
    1227                 :           0 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
    1228                 :           0 :         change->origin_id = XLogRecGetOrigin(r);
    1229                 :             : 
    1230                 :           0 :         memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
    1231                 :             : 
    1232                 :           0 :         change->data.tp.clear_toast_afterwards = true;
    1233                 :             : 
    1234                 :           0 :         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1235                 :           0 :                                                          change, false);
    1236         [ #  # ]:           0 : }
    1237                 :             : 
    1238                 :             : 
    1239                 :             : /*
    1240                 :             :  * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
    1241                 :             :  * (but not by heap_multi_insert) into a tuplebuf.
    1242                 :             :  *
    1243                 :             :  * The size 'len' and the pointer 'data' in the record need to be
    1244                 :             :  * computed outside as they are record specific.
    1245                 :             :  */
    1246                 :             : static void
    1247                 :           0 : DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
    1248                 :             : {
    1249                 :           0 :         xl_heap_header xlhdr;
    1250                 :           0 :         int                     datalen = len - SizeOfHeapHeader;
    1251                 :           0 :         HeapTupleHeader header;
    1252                 :             : 
    1253         [ #  # ]:           0 :         Assert(datalen >= 0);
    1254                 :             : 
    1255                 :           0 :         tuple->t_len = datalen + SizeofHeapTupleHeader;
    1256                 :           0 :         header = tuple->t_data;
    1257                 :             : 
    1258                 :             :         /* not a disk based tuple */
    1259                 :           0 :         ItemPointerSetInvalid(&tuple->t_self);
    1260                 :             : 
    1261                 :             :         /* we can only figure this out after reassembling the transactions */
    1262                 :           0 :         tuple->t_tableOid = InvalidOid;
    1263                 :             : 
    1264                 :             :         /* data is not stored aligned, copy to aligned storage */
    1265                 :           0 :         memcpy(&xlhdr, data, SizeOfHeapHeader);
    1266                 :             : 
    1267                 :           0 :         memset(header, 0, SizeofHeapTupleHeader);
    1268                 :             : 
    1269                 :           0 :         memcpy(((char *) tuple->t_data) + SizeofHeapTupleHeader,
    1270                 :             :                    data + SizeOfHeapHeader,
    1271                 :             :                    datalen);
    1272                 :             : 
    1273                 :           0 :         header->t_infomask = xlhdr.t_infomask;
    1274                 :           0 :         header->t_infomask2 = xlhdr.t_infomask2;
    1275                 :           0 :         header->t_hoff = xlhdr.t_hoff;
    1276                 :           0 : }
    1277                 :             : 
    1278                 :             : /*
    1279                 :             :  * Check whether we are interested in this specific transaction.
    1280                 :             :  *
    1281                 :             :  * There can be several reasons we might not be interested in this
    1282                 :             :  * transaction:
    1283                 :             :  * 1) We might not be interested in decoding transactions up to this
    1284                 :             :  *        LSN. This can happen because we previously decoded it and now just
    1285                 :             :  *        are restarting or if we haven't assembled a consistent snapshot yet.
    1286                 :             :  * 2) The transaction happened in another database.
    1287                 :             :  * 3) The output plugin is not interested in the origin.
    1288                 :             :  * 4) We are doing fast-forwarding
    1289                 :             :  */
    1290                 :             : static bool
    1291                 :           0 : DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
    1292                 :             :                                   Oid txn_dbid, RepOriginId origin_id)
    1293                 :             : {
    1294         [ #  # ]:           0 :         if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
    1295   [ #  #  #  # ]:           0 :                 (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
    1296                 :           0 :                 FilterByOrigin(ctx, origin_id))
    1297                 :           0 :                 return true;
    1298                 :             : 
    1299                 :             :         /*
    1300                 :             :          * We also skip decoding in fast_forward mode. In passing set the
    1301                 :             :          * processing_required flag to indicate that if it were not for
    1302                 :             :          * fast_forward mode, processing would have been required.
    1303                 :             :          */
    1304         [ #  # ]:           0 :         if (ctx->fast_forward)
    1305                 :             :         {
    1306                 :           0 :                 ctx->processing_required = true;
    1307                 :           0 :                 return true;
    1308                 :             :         }
    1309                 :             : 
    1310                 :           0 :         return false;
    1311                 :           0 : }
        

Generated by: LCOV version 2.3.2-1