LCOV - code coverage report
Current view: top level - src/backend/replication/pgoutput - pgoutput.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 942 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 42 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * pgoutput.c
       4              :  *              Logical Replication output plugin
       5              :  *
       6              :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *                src/backend/replication/pgoutput/pgoutput.c
      10              :  *
      11              :  *-------------------------------------------------------------------------
      12              :  */
      13              : #include "postgres.h"
      14              : 
      15              : #include "access/tupconvert.h"
      16              : #include "catalog/partition.h"
      17              : #include "catalog/pg_publication.h"
      18              : #include "catalog/pg_publication_rel.h"
      19              : #include "catalog/pg_subscription.h"
      20              : #include "commands/defrem.h"
      21              : #include "commands/subscriptioncmds.h"
      22              : #include "executor/executor.h"
      23              : #include "fmgr.h"
      24              : #include "nodes/makefuncs.h"
      25              : #include "parser/parse_relation.h"
      26              : #include "replication/logical.h"
      27              : #include "replication/logicalproto.h"
      28              : #include "replication/origin.h"
      29              : #include "replication/pgoutput.h"
      30              : #include "rewrite/rewriteHandler.h"
      31              : #include "utils/builtins.h"
      32              : #include "utils/inval.h"
      33              : #include "utils/lsyscache.h"
      34              : #include "utils/memutils.h"
      35              : #include "utils/rel.h"
      36              : #include "utils/syscache.h"
      37              : #include "utils/varlena.h"
      38              : 
      39            0 : PG_MODULE_MAGIC_EXT(
      40              :                                         .name = "pgoutput",
      41              :                                         .version = PG_VERSION
      42              : );
      43              : 
      44              : static void pgoutput_startup(LogicalDecodingContext *ctx,
      45              :                                                          OutputPluginOptions *opt, bool is_init);
      46              : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
      47              : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
      48              :                                                            ReorderBufferTXN *txn);
      49              : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
      50              :                                                                 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      51              : static void pgoutput_change(LogicalDecodingContext *ctx,
      52              :                                                         ReorderBufferTXN *txn, Relation relation,
      53              :                                                         ReorderBufferChange *change);
      54              : static void pgoutput_truncate(LogicalDecodingContext *ctx,
      55              :                                                           ReorderBufferTXN *txn, int nrelations, Relation relations[],
      56              :                                                           ReorderBufferChange *change);
      57              : static void pgoutput_message(LogicalDecodingContext *ctx,
      58              :                                                          ReorderBufferTXN *txn, XLogRecPtr message_lsn,
      59              :                                                          bool transactional, const char *prefix,
      60              :                                                          Size sz, const char *message);
      61              : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
      62              :                                                                    RepOriginId origin_id);
      63              : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
      64              :                                                                            ReorderBufferTXN *txn);
      65              : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
      66              :                                                                  ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
      67              : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
      68              :                                                                                  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      69              : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
      70              :                                                                                    ReorderBufferTXN *txn,
      71              :                                                                                    XLogRecPtr prepare_end_lsn,
      72              :                                                                                    TimestampTz prepare_time);
      73              : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
      74              :                                                                   ReorderBufferTXN *txn);
      75              : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
      76              :                                                                  ReorderBufferTXN *txn);
      77              : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
      78              :                                                                   ReorderBufferTXN *txn,
      79              :                                                                   XLogRecPtr abort_lsn);
      80              : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
      81              :                                                                    ReorderBufferTXN *txn,
      82              :                                                                    XLogRecPtr commit_lsn);
      83              : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
      84              :                                                                                 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
      85              : 
      86              : static bool publications_valid;
      87              : 
      88              : static List *LoadPublications(List *pubnames);
      89              : static void publication_invalidation_cb(Datum arg, int cacheid,
      90              :                                                                                 uint32 hashvalue);
      91              : static void send_repl_origin(LogicalDecodingContext *ctx,
      92              :                                                          RepOriginId origin_id, XLogRecPtr origin_lsn,
      93              :                                                          bool send_origin);
      94              : 
      95              : /*
      96              :  * Only 3 publication actions are used for row filtering ("insert", "update",
      97              :  * "delete"). See RelationSyncEntry.exprstate[].
      98              :  */
      99              : enum RowFilterPubAction
     100              : {
     101              :         PUBACTION_INSERT,
     102              :         PUBACTION_UPDATE,
     103              :         PUBACTION_DELETE,
     104              : };
     105              : 
     106              : #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
     107              : 
     108              : /*
     109              :  * Entry in the map used to remember which relation schemas we sent.
     110              :  *
     111              :  * The schema_sent flag determines if the current schema record for the
     112              :  * relation (and for its ancestor if publish_as_relid is set) was already
     113              :  * sent to the subscriber (in which case we don't need to send it again).
     114              :  *
     115              :  * The schema cache on downstream is however updated only at commit time,
     116              :  * and with streamed transactions the commit order may be different from
     117              :  * the order the transactions are sent in. Also, the (sub) transactions
     118              :  * might get aborted so we need to send the schema for each (sub) transaction
     119              :  * so that we don't lose the schema information on abort. For handling this,
     120              :  * we maintain the list of xids (streamed_txns) for those we have already sent
     121              :  * the schema.
     122              :  *
     123              :  * For partitions, 'pubactions' considers not only the table's own
     124              :  * publications, but also those of all of its ancestors.
     125              :  */
     126              : typedef struct RelationSyncEntry
     127              : {
     128              :         Oid                     relid;                  /* relation oid */
     129              : 
     130              :         bool            replicate_valid;        /* overall validity flag for entry */
     131              : 
     132              :         bool            schema_sent;
     133              : 
     134              :         /*
     135              :          * This will be PUBLISH_GENCOLS_STORED if the relation contains generated
     136              :          * columns and the 'publish_generated_columns' parameter is set to
     137              :          * PUBLISH_GENCOLS_STORED. Otherwise, it will be PUBLISH_GENCOLS_NONE,
     138              :          * indicating that no generated columns should be published, unless
     139              :          * explicitly specified in the column list.
     140              :          */
     141              :         PublishGencolsType include_gencols_type;
     142              :         List       *streamed_txns;      /* streamed toplevel transactions with this
     143              :                                                                  * schema */
     144              : 
     145              :         /* are we publishing this rel? */
     146              :         PublicationActions pubactions;
     147              : 
     148              :         /*
     149              :          * ExprState array for row filter. Different publication actions don't
     150              :          * allow multiple expressions to always be combined into one, because
     151              :          * updates or deletes restrict the column in expression to be part of the
     152              :          * replica identity index whereas inserts do not have this restriction, so
     153              :          * there is one ExprState per publication action.
     154              :          */
     155              :         ExprState  *exprstate[NUM_ROWFILTER_PUBACTIONS];
     156              :         EState     *estate;                     /* executor state used for row filter */
     157              :         TupleTableSlot *new_slot;       /* slot for storing new tuple */
     158              :         TupleTableSlot *old_slot;       /* slot for storing old tuple */
     159              : 
     160              :         /*
     161              :          * OID of the relation to publish changes as.  For a partition, this may
     162              :          * be set to one of its ancestors whose schema will be used when
     163              :          * replicating changes, if publish_via_partition_root is set for the
     164              :          * publication.
     165              :          */
     166              :         Oid                     publish_as_relid;
     167              : 
     168              :         /*
     169              :          * Map used when replicating using an ancestor's schema to convert tuples
     170              :          * from partition's type to the ancestor's; NULL if publish_as_relid is
     171              :          * same as 'relid' or if unnecessary due to partition and the ancestor
     172              :          * having identical TupleDesc.
     173              :          */
     174              :         AttrMap    *attrmap;
     175              : 
     176              :         /*
     177              :          * Columns included in the publication, or NULL if all columns are
     178              :          * included implicitly.  Note that the attnums in this bitmap are not
     179              :          * shifted by FirstLowInvalidHeapAttributeNumber.
     180              :          */
     181              :         Bitmapset  *columns;
     182              : 
     183              :         /*
     184              :          * Private context to store additional data for this entry - state for the
     185              :          * row filter expressions, column list, etc.
     186              :          */
     187              :         MemoryContext entry_cxt;
     188              : } RelationSyncEntry;
     189              : 
     190              : /*
     191              :  * Maintain a per-transaction level variable to track whether the transaction
     192              :  * has sent BEGIN. BEGIN is only sent when the first change in a transaction
     193              :  * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
     194              :  * messages for empty transactions which saves network bandwidth.
     195              :  *
     196              :  * This optimization is not used for prepared transactions because if the
     197              :  * WALSender restarts after prepare of a transaction and before commit prepared
     198              :  * of the same transaction then we won't be able to figure out if we have
     199              :  * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
     200              :  * because we would have lost the in-memory txndata information that was
     201              :  * present prior to the restart. This will result in sending a spurious
     202              :  * COMMIT PREPARED without a corresponding prepared transaction at the
     203              :  * downstream which would lead to an error when it tries to process it.
     204              :  *
     205              :  * XXX We could achieve this optimization by changing protocol to send
     206              :  * additional information so that downstream can detect that the corresponding
     207              :  * prepare has not been sent. However, adding such a check for every
     208              :  * transaction in the downstream could be costly so we might want to do it
     209              :  * optionally.
     210              :  *
     211              :  * We also don't have this optimization for streamed transactions because
     212              :  * they can contain prepared transactions.
     213              :  */
     214              : typedef struct PGOutputTxnData
     215              : {
     216              :         bool            sent_begin_txn; /* flag indicating whether BEGIN has been sent */
     217              : } PGOutputTxnData;
     218              : 
     219              : /* Map used to remember which relation schemas we sent. */
     220              : static HTAB *RelationSyncCache = NULL;
     221              : 
     222              : static void init_rel_sync_cache(MemoryContext cachectx);
     223              : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
     224              : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
     225              :                                                                                          Relation relation);
     226              : static void send_relation_and_attrs(Relation relation, TransactionId xid,
     227              :                                                                         LogicalDecodingContext *ctx,
     228              :                                                                         RelationSyncEntry *relentry);
     229              : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
     230              : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
     231              :                                                                                   uint32 hashvalue);
     232              : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
     233              :                                                                                         TransactionId xid);
     234              : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
     235              :                                                                                         TransactionId xid);
     236              : static void init_tuple_slot(PGOutputData *data, Relation relation,
     237              :                                                         RelationSyncEntry *entry);
     238              : static void pgoutput_memory_context_reset(void *arg);
     239              : 
     240              : /* row filter routines */
     241              : static EState *create_estate_for_relation(Relation rel);
     242              : static void pgoutput_row_filter_init(PGOutputData *data,
     243              :                                                                          List *publications,
     244              :                                                                          RelationSyncEntry *entry);
     245              : static bool pgoutput_row_filter_exec_expr(ExprState *state,
     246              :                                                                                   ExprContext *econtext);
     247              : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
     248              :                                                                 TupleTableSlot **new_slot_ptr,
     249              :                                                                 RelationSyncEntry *entry,
     250              :                                                                 ReorderBufferChangeType *action);
     251              : 
     252              : /* column list routines */
     253              : static void pgoutput_column_list_init(PGOutputData *data,
     254              :                                                                           List *publications,
     255              :                                                                           RelationSyncEntry *entry);
     256              : 
     257              : /*
     258              :  * Specify output plugin callbacks
     259              :  */
     260              : void
     261            0 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
     262              : {
     263            0 :         cb->startup_cb = pgoutput_startup;
     264            0 :         cb->begin_cb = pgoutput_begin_txn;
     265            0 :         cb->change_cb = pgoutput_change;
     266            0 :         cb->truncate_cb = pgoutput_truncate;
     267            0 :         cb->message_cb = pgoutput_message;
     268            0 :         cb->commit_cb = pgoutput_commit_txn;
     269              : 
     270            0 :         cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
     271            0 :         cb->prepare_cb = pgoutput_prepare_txn;
     272            0 :         cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
     273            0 :         cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
     274            0 :         cb->filter_by_origin_cb = pgoutput_origin_filter;
     275            0 :         cb->shutdown_cb = pgoutput_shutdown;
     276              : 
     277              :         /* transaction streaming */
     278            0 :         cb->stream_start_cb = pgoutput_stream_start;
     279            0 :         cb->stream_stop_cb = pgoutput_stream_stop;
     280            0 :         cb->stream_abort_cb = pgoutput_stream_abort;
     281            0 :         cb->stream_commit_cb = pgoutput_stream_commit;
     282            0 :         cb->stream_change_cb = pgoutput_change;
     283            0 :         cb->stream_message_cb = pgoutput_message;
     284            0 :         cb->stream_truncate_cb = pgoutput_truncate;
     285              :         /* transaction streaming - two-phase commit */
     286            0 :         cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
     287            0 : }
     288              : 
     289              : static void
     290            0 : parse_output_parameters(List *options, PGOutputData *data)
     291              : {
     292            0 :         ListCell   *lc;
     293            0 :         bool            protocol_version_given = false;
     294            0 :         bool            publication_names_given = false;
     295            0 :         bool            binary_option_given = false;
     296            0 :         bool            messages_option_given = false;
     297            0 :         bool            streaming_given = false;
     298            0 :         bool            two_phase_option_given = false;
     299            0 :         bool            origin_option_given = false;
     300              : 
     301              :         /* Initialize optional parameters to defaults */
     302            0 :         data->binary = false;
     303            0 :         data->streaming = LOGICALREP_STREAM_OFF;
     304            0 :         data->messages = false;
     305            0 :         data->two_phase = false;
     306            0 :         data->publish_no_origin = false;
     307              : 
     308            0 :         foreach(lc, options)
     309              :         {
     310            0 :                 DefElem    *defel = (DefElem *) lfirst(lc);
     311              : 
     312            0 :                 Assert(defel->arg == NULL || IsA(defel->arg, String));
     313              : 
     314              :                 /* Check each param, whether or not we recognize it */
     315            0 :                 if (strcmp(defel->defname, "proto_version") == 0)
     316              :                 {
     317            0 :                         unsigned long parsed;
     318            0 :                         char       *endptr;
     319              : 
     320            0 :                         if (protocol_version_given)
     321            0 :                                 ereport(ERROR,
     322              :                                                 (errcode(ERRCODE_SYNTAX_ERROR),
     323              :                                                  errmsg("conflicting or redundant options")));
     324            0 :                         protocol_version_given = true;
     325              : 
     326            0 :                         errno = 0;
     327            0 :                         parsed = strtoul(strVal(defel->arg), &endptr, 10);
     328            0 :                         if (errno != 0 || *endptr != '\0')
     329            0 :                                 ereport(ERROR,
     330              :                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     331              :                                                  errmsg("invalid proto_version")));
     332              : 
     333            0 :                         if (parsed > PG_UINT32_MAX)
     334            0 :                                 ereport(ERROR,
     335              :                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     336              :                                                  errmsg("proto_version \"%s\" out of range",
     337              :                                                                 strVal(defel->arg))));
     338              : 
     339            0 :                         data->protocol_version = (uint32) parsed;
     340            0 :                 }
     341            0 :                 else if (strcmp(defel->defname, "publication_names") == 0)
     342              :                 {
     343            0 :                         if (publication_names_given)
     344            0 :                                 ereport(ERROR,
     345              :                                                 (errcode(ERRCODE_SYNTAX_ERROR),
     346              :                                                  errmsg("conflicting or redundant options")));
     347            0 :                         publication_names_given = true;
     348              : 
     349              :                         /*
     350              :                          * Pass a copy of the DefElem->arg since SplitIdentifierString
     351              :                          * modifies its input.
     352              :                          */
     353            0 :                         if (!SplitIdentifierString(pstrdup(strVal(defel->arg)), ',',
     354            0 :                                                                            &data->publication_names))
     355            0 :                                 ereport(ERROR,
     356              :                                                 (errcode(ERRCODE_INVALID_NAME),
     357              :                                                  errmsg("invalid publication_names syntax")));
     358            0 :                 }
     359            0 :                 else if (strcmp(defel->defname, "binary") == 0)
     360              :                 {
     361            0 :                         if (binary_option_given)
     362            0 :                                 ereport(ERROR,
     363              :                                                 (errcode(ERRCODE_SYNTAX_ERROR),
     364              :                                                  errmsg("conflicting or redundant options")));
     365            0 :                         binary_option_given = true;
     366              : 
     367            0 :                         data->binary = defGetBoolean(defel);
     368            0 :                 }
     369            0 :                 else if (strcmp(defel->defname, "messages") == 0)
     370              :                 {
     371            0 :                         if (messages_option_given)
     372            0 :                                 ereport(ERROR,
     373              :                                                 (errcode(ERRCODE_SYNTAX_ERROR),
     374              :                                                  errmsg("conflicting or redundant options")));
     375            0 :                         messages_option_given = true;
     376              : 
     377            0 :                         data->messages = defGetBoolean(defel);
     378            0 :                 }
     379            0 :                 else if (strcmp(defel->defname, "streaming") == 0)
     380              :                 {
     381            0 :                         if (streaming_given)
     382            0 :                                 ereport(ERROR,
     383              :                                                 (errcode(ERRCODE_SYNTAX_ERROR),
     384              :                                                  errmsg("conflicting or redundant options")));
     385            0 :                         streaming_given = true;
     386              : 
     387            0 :                         data->streaming = defGetStreamingMode(defel);
     388            0 :                 }
     389            0 :                 else if (strcmp(defel->defname, "two_phase") == 0)
     390              :                 {
     391            0 :                         if (two_phase_option_given)
     392            0 :                                 ereport(ERROR,
     393              :                                                 (errcode(ERRCODE_SYNTAX_ERROR),
     394              :                                                  errmsg("conflicting or redundant options")));
     395            0 :                         two_phase_option_given = true;
     396              : 
     397            0 :                         data->two_phase = defGetBoolean(defel);
     398            0 :                 }
     399            0 :                 else if (strcmp(defel->defname, "origin") == 0)
     400              :                 {
     401            0 :                         char       *origin;
     402              : 
     403            0 :                         if (origin_option_given)
     404            0 :                                 ereport(ERROR,
     405              :                                                 errcode(ERRCODE_SYNTAX_ERROR),
     406              :                                                 errmsg("conflicting or redundant options"));
     407            0 :                         origin_option_given = true;
     408              : 
     409            0 :                         origin = defGetString(defel);
     410            0 :                         if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
     411            0 :                                 data->publish_no_origin = true;
     412            0 :                         else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
     413            0 :                                 data->publish_no_origin = false;
     414              :                         else
     415            0 :                                 ereport(ERROR,
     416              :                                                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     417              :                                                 errmsg("unrecognized origin value: \"%s\"", origin));
     418            0 :                 }
     419              :                 else
     420            0 :                         elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
     421            0 :         }
     422              : 
     423              :         /* Check required options */
     424            0 :         if (!protocol_version_given)
     425            0 :                 ereport(ERROR,
     426              :                                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     427              :                                 errmsg("option \"%s\" missing", "proto_version"));
     428            0 :         if (!publication_names_given)
     429            0 :                 ereport(ERROR,
     430              :                                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     431              :                                 errmsg("option \"%s\" missing", "publication_names"));
     432            0 : }
     433              : 
     434              : /*
     435              :  * Memory context reset callback of PGOutputData->context.
     436              :  */
     437              : static void
     438            0 : pgoutput_memory_context_reset(void *arg)
     439              : {
     440            0 :         if (RelationSyncCache)
     441              :         {
     442            0 :                 hash_destroy(RelationSyncCache);
     443            0 :                 RelationSyncCache = NULL;
     444            0 :         }
     445            0 : }
     446              : 
     447              : /*
     448              :  * Initialize this plugin
     449              :  */
     450              : static void
     451            0 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
     452              :                                  bool is_init)
     453              : {
     454            0 :         PGOutputData *data = palloc0_object(PGOutputData);
     455              :         static bool publication_callback_registered = false;
     456            0 :         MemoryContextCallback *mcallback;
     457              : 
     458              :         /* Create our memory context for private allocations. */
     459            0 :         data->context = AllocSetContextCreate(ctx->context,
     460              :                                                                                   "logical replication output context",
     461              :                                                                                   ALLOCSET_DEFAULT_SIZES);
     462              : 
     463            0 :         data->cachectx = AllocSetContextCreate(ctx->context,
     464              :                                                                                    "logical replication cache context",
     465              :                                                                                    ALLOCSET_DEFAULT_SIZES);
     466              : 
     467            0 :         data->pubctx = AllocSetContextCreate(ctx->context,
     468              :                                                                                  "logical replication publication list context",
     469              :                                                                                  ALLOCSET_SMALL_SIZES);
     470              : 
     471              :         /*
     472              :          * Ensure to cleanup RelationSyncCache even when logical decoding invoked
     473              :          * via SQL interface ends up with an error.
     474              :          */
     475            0 :         mcallback = palloc0_object(MemoryContextCallback);
     476            0 :         mcallback->func = pgoutput_memory_context_reset;
     477            0 :         MemoryContextRegisterResetCallback(ctx->context, mcallback);
     478              : 
     479            0 :         ctx->output_plugin_private = data;
     480              : 
     481              :         /* This plugin uses binary protocol. */
     482            0 :         opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
     483              : 
     484              :         /*
     485              :          * This is replication start and not slot initialization.
     486              :          *
     487              :          * Parse and validate options passed by the client.
     488              :          */
     489            0 :         if (!is_init)
     490              :         {
     491              :                 /* Parse the params and ERROR if we see any we don't recognize */
     492            0 :                 parse_output_parameters(ctx->output_plugin_options, data);
     493              : 
     494              :                 /* Check if we support requested protocol */
     495            0 :                 if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
     496            0 :                         ereport(ERROR,
     497              :                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     498              :                                          errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
     499              :                                                         data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
     500              : 
     501            0 :                 if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
     502            0 :                         ereport(ERROR,
     503              :                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     504              :                                          errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
     505              :                                                         data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
     506              : 
     507              :                 /*
     508              :                  * Decide whether to enable streaming. It is disabled by default, in
     509              :                  * which case we just update the flag in decoding context. Otherwise
     510              :                  * we only allow it with sufficient version of the protocol, and when
     511              :                  * the output plugin supports it.
     512              :                  */
     513            0 :                 if (data->streaming == LOGICALREP_STREAM_OFF)
     514            0 :                         ctx->streaming = false;
     515            0 :                 else if (data->streaming == LOGICALREP_STREAM_ON &&
     516            0 :                                  data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
     517            0 :                         ereport(ERROR,
     518              :                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     519              :                                          errmsg("requested proto_version=%d does not support streaming, need %d or higher",
     520              :                                                         data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
     521            0 :                 else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
     522            0 :                                  data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
     523            0 :                         ereport(ERROR,
     524              :                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     525              :                                          errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
     526              :                                                         data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
     527            0 :                 else if (!ctx->streaming)
     528            0 :                         ereport(ERROR,
     529              :                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     530              :                                          errmsg("streaming requested, but not supported by output plugin")));
     531              : 
     532              :                 /*
     533              :                  * Here, we just check whether the two-phase option is passed by
     534              :                  * plugin and decide whether to enable it at later point of time. It
     535              :                  * remains enabled if the previous start-up has done so. But we only
     536              :                  * allow the option to be passed in with sufficient version of the
     537              :                  * protocol, and when the output plugin supports it.
     538              :                  */
     539            0 :                 if (!data->two_phase)
     540            0 :                         ctx->twophase_opt_given = false;
     541            0 :                 else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
     542            0 :                         ereport(ERROR,
     543              :                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     544              :                                          errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
     545              :                                                         data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
     546            0 :                 else if (!ctx->twophase)
     547            0 :                         ereport(ERROR,
     548              :                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     549              :                                          errmsg("two-phase commit requested, but not supported by output plugin")));
     550              :                 else
     551            0 :                         ctx->twophase_opt_given = true;
     552              : 
     553              :                 /* Init publication state. */
     554            0 :                 data->publications = NIL;
     555            0 :                 publications_valid = false;
     556              : 
     557              :                 /*
     558              :                  * Register callback for pg_publication if we didn't already do that
     559              :                  * during some previous call in this process.
     560              :                  */
     561            0 :                 if (!publication_callback_registered)
     562              :                 {
     563            0 :                         CacheRegisterSyscacheCallback(PUBLICATIONOID,
     564              :                                                                                   publication_invalidation_cb,
     565              :                                                                                   (Datum) 0);
     566            0 :                         CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
     567              :                                                                                  (Datum) 0);
     568            0 :                         publication_callback_registered = true;
     569            0 :                 }
     570              : 
     571              :                 /* Initialize relation schema cache. */
     572            0 :                 init_rel_sync_cache(CacheMemoryContext);
     573            0 :         }
     574              :         else
     575              :         {
     576              :                 /*
     577              :                  * Disable the streaming and prepared transactions during the slot
     578              :                  * initialization mode.
     579              :                  */
     580            0 :                 ctx->streaming = false;
     581            0 :                 ctx->twophase = false;
     582              :         }
     583            0 : }
     584              : 
     585              : /*
     586              :  * BEGIN callback.
     587              :  *
     588              :  * Don't send the BEGIN message here instead postpone it until the first
     589              :  * change. In logical replication, a common scenario is to replicate a set of
     590              :  * tables (instead of all tables) and transactions whose changes were on
     591              :  * the table(s) that are not published will produce empty transactions. These
     592              :  * empty transactions will send BEGIN and COMMIT messages to subscribers,
     593              :  * using bandwidth on something with little/no use for logical replication.
     594              :  */
     595              : static void
     596            0 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     597              : {
     598            0 :         PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
     599              :                                                                                                           sizeof(PGOutputTxnData));
     600              : 
     601            0 :         txn->output_plugin_private = txndata;
     602            0 : }
     603              : 
     604              : /*
     605              :  * Send BEGIN.
     606              :  *
     607              :  * This is called while processing the first change of the transaction.
     608              :  */
     609              : static void
     610            0 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     611              : {
     612            0 :         bool            send_replication_origin = txn->origin_id != InvalidRepOriginId;
     613            0 :         PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
     614              : 
     615            0 :         Assert(txndata);
     616            0 :         Assert(!txndata->sent_begin_txn);
     617              : 
     618            0 :         OutputPluginPrepareWrite(ctx, !send_replication_origin);
     619            0 :         logicalrep_write_begin(ctx->out, txn);
     620            0 :         txndata->sent_begin_txn = true;
     621              : 
     622            0 :         send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
     623            0 :                                          send_replication_origin);
     624              : 
     625            0 :         OutputPluginWrite(ctx, true);
     626            0 : }
     627              : 
     628              : /*
     629              :  * COMMIT callback
     630              :  */
     631              : static void
     632            0 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     633              :                                         XLogRecPtr commit_lsn)
     634              : {
     635            0 :         PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
     636            0 :         bool            sent_begin_txn;
     637              : 
     638            0 :         Assert(txndata);
     639              : 
     640              :         /*
     641              :          * We don't need to send the commit message unless some relevant change
     642              :          * from this transaction has been sent to the downstream.
     643              :          */
     644            0 :         sent_begin_txn = txndata->sent_begin_txn;
     645            0 :         OutputPluginUpdateProgress(ctx, !sent_begin_txn);
     646            0 :         pfree(txndata);
     647            0 :         txn->output_plugin_private = NULL;
     648              : 
     649            0 :         if (!sent_begin_txn)
     650              :         {
     651            0 :                 elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
     652            0 :                 return;
     653              :         }
     654              : 
     655            0 :         OutputPluginPrepareWrite(ctx, true);
     656            0 :         logicalrep_write_commit(ctx->out, txn, commit_lsn);
     657            0 :         OutputPluginWrite(ctx, true);
     658            0 : }
     659              : 
     660              : /*
     661              :  * BEGIN PREPARE callback
     662              :  */
     663              : static void
     664            0 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     665              : {
     666            0 :         bool            send_replication_origin = txn->origin_id != InvalidRepOriginId;
     667              : 
     668            0 :         OutputPluginPrepareWrite(ctx, !send_replication_origin);
     669            0 :         logicalrep_write_begin_prepare(ctx->out, txn);
     670              : 
     671            0 :         send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
     672            0 :                                          send_replication_origin);
     673              : 
     674            0 :         OutputPluginWrite(ctx, true);
     675            0 : }
     676              : 
     677              : /*
     678              :  * PREPARE callback
     679              :  */
     680              : static void
     681            0 : pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     682              :                                          XLogRecPtr prepare_lsn)
     683              : {
     684            0 :         OutputPluginUpdateProgress(ctx, false);
     685              : 
     686            0 :         OutputPluginPrepareWrite(ctx, true);
     687            0 :         logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
     688            0 :         OutputPluginWrite(ctx, true);
     689            0 : }
     690              : 
     691              : /*
     692              :  * COMMIT PREPARED callback
     693              :  */
     694              : static void
     695            0 : pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     696              :                                                          XLogRecPtr commit_lsn)
     697              : {
     698            0 :         OutputPluginUpdateProgress(ctx, false);
     699              : 
     700            0 :         OutputPluginPrepareWrite(ctx, true);
     701            0 :         logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
     702            0 :         OutputPluginWrite(ctx, true);
     703            0 : }
     704              : 
     705              : /*
     706              :  * ROLLBACK PREPARED callback
     707              :  */
     708              : static void
     709            0 : pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
     710              :                                                            ReorderBufferTXN *txn,
     711              :                                                            XLogRecPtr prepare_end_lsn,
     712              :                                                            TimestampTz prepare_time)
     713              : {
     714            0 :         OutputPluginUpdateProgress(ctx, false);
     715              : 
     716            0 :         OutputPluginPrepareWrite(ctx, true);
     717            0 :         logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
     718            0 :                                                                            prepare_time);
     719            0 :         OutputPluginWrite(ctx, true);
     720            0 : }
     721              : 
     722              : /*
     723              :  * Write the current schema of the relation and its ancestor (if any) if not
     724              :  * done yet.
     725              :  */
     726              : static void
     727            0 : maybe_send_schema(LogicalDecodingContext *ctx,
     728              :                                   ReorderBufferChange *change,
     729              :                                   Relation relation, RelationSyncEntry *relentry)
     730              : {
     731            0 :         PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
     732            0 :         bool            schema_sent;
     733            0 :         TransactionId xid = InvalidTransactionId;
     734            0 :         TransactionId topxid = InvalidTransactionId;
     735              : 
     736              :         /*
     737              :          * Remember XID of the (sub)transaction for the change. We don't care if
     738              :          * it's top-level transaction or not (we have already sent that XID in
     739              :          * start of the current streaming block).
     740              :          *
     741              :          * If we're not in a streaming block, just use InvalidTransactionId and
     742              :          * the write methods will not include it.
     743              :          */
     744            0 :         if (data->in_streaming)
     745            0 :                 xid = change->txn->xid;
     746              : 
     747            0 :         if (rbtxn_is_subtxn(change->txn))
     748            0 :                 topxid = rbtxn_get_toptxn(change->txn)->xid;
     749              :         else
     750            0 :                 topxid = xid;
     751              : 
     752              :         /*
     753              :          * Do we need to send the schema? We do track streamed transactions
     754              :          * separately, because those may be applied later (and the regular
     755              :          * transactions won't see their effects until then) and in an order that
     756              :          * we don't know at this point.
     757              :          *
     758              :          * XXX There is a scope of optimization here. Currently, we always send
     759              :          * the schema first time in a streaming transaction but we can probably
     760              :          * avoid that by checking 'relentry->schema_sent' flag. However, before
     761              :          * doing that we need to study its impact on the case where we have a mix
     762              :          * of streaming and non-streaming transactions.
     763              :          */
     764            0 :         if (data->in_streaming)
     765            0 :                 schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
     766              :         else
     767            0 :                 schema_sent = relentry->schema_sent;
     768              : 
     769              :         /* Nothing to do if we already sent the schema. */
     770            0 :         if (schema_sent)
     771            0 :                 return;
     772              : 
     773              :         /*
     774              :          * Send the schema.  If the changes will be published using an ancestor's
     775              :          * schema, not the relation's own, send that ancestor's schema before
     776              :          * sending relation's own (XXX - maybe sending only the former suffices?).
     777              :          */
     778            0 :         if (relentry->publish_as_relid != RelationGetRelid(relation))
     779              :         {
     780            0 :                 Relation        ancestor = RelationIdGetRelation(relentry->publish_as_relid);
     781              : 
     782            0 :                 send_relation_and_attrs(ancestor, xid, ctx, relentry);
     783            0 :                 RelationClose(ancestor);
     784            0 :         }
     785              : 
     786            0 :         send_relation_and_attrs(relation, xid, ctx, relentry);
     787              : 
     788            0 :         if (data->in_streaming)
     789            0 :                 set_schema_sent_in_streamed_txn(relentry, topxid);
     790              :         else
     791            0 :                 relentry->schema_sent = true;
     792            0 : }
     793              : 
     794              : /*
     795              :  * Sends a relation
     796              :  */
     797              : static void
     798            0 : send_relation_and_attrs(Relation relation, TransactionId xid,
     799              :                                                 LogicalDecodingContext *ctx,
     800              :                                                 RelationSyncEntry *relentry)
     801              : {
     802            0 :         TupleDesc       desc = RelationGetDescr(relation);
     803            0 :         Bitmapset  *columns = relentry->columns;
     804            0 :         PublishGencolsType include_gencols_type = relentry->include_gencols_type;
     805            0 :         int                     i;
     806              : 
     807              :         /*
     808              :          * Write out type info if needed.  We do that only for user-created types.
     809              :          * We use FirstGenbkiObjectId as the cutoff, so that we only consider
     810              :          * objects with hand-assigned OIDs to be "built in", not for instance any
     811              :          * function or type defined in the information_schema. This is important
     812              :          * because only hand-assigned OIDs can be expected to remain stable across
     813              :          * major versions.
     814              :          */
     815            0 :         for (i = 0; i < desc->natts; i++)
     816              :         {
     817            0 :                 Form_pg_attribute att = TupleDescAttr(desc, i);
     818              : 
     819            0 :                 if (!logicalrep_should_publish_column(att, columns,
     820            0 :                                                                                           include_gencols_type))
     821            0 :                         continue;
     822              : 
     823            0 :                 if (att->atttypid < FirstGenbkiObjectId)
     824            0 :                         continue;
     825              : 
     826            0 :                 OutputPluginPrepareWrite(ctx, false);
     827            0 :                 logicalrep_write_typ(ctx->out, xid, att->atttypid);
     828            0 :                 OutputPluginWrite(ctx, false);
     829            0 :         }
     830              : 
     831            0 :         OutputPluginPrepareWrite(ctx, false);
     832            0 :         logicalrep_write_rel(ctx->out, xid, relation, columns,
     833            0 :                                                  include_gencols_type);
     834            0 :         OutputPluginWrite(ctx, false);
     835            0 : }
     836              : 
     837              : /*
     838              :  * Executor state preparation for evaluation of row filter expressions for the
     839              :  * specified relation.
     840              :  */
     841              : static EState *
     842            0 : create_estate_for_relation(Relation rel)
     843              : {
     844            0 :         EState     *estate;
     845            0 :         RangeTblEntry *rte;
     846            0 :         List       *perminfos = NIL;
     847              : 
     848            0 :         estate = CreateExecutorState();
     849              : 
     850            0 :         rte = makeNode(RangeTblEntry);
     851            0 :         rte->rtekind = RTE_RELATION;
     852            0 :         rte->relid = RelationGetRelid(rel);
     853            0 :         rte->relkind = rel->rd_rel->relkind;
     854            0 :         rte->rellockmode = AccessShareLock;
     855              : 
     856            0 :         addRTEPermissionInfo(&perminfos, rte);
     857              : 
     858            0 :         ExecInitRangeTable(estate, list_make1(rte), perminfos,
     859            0 :                                            bms_make_singleton(1));
     860              : 
     861            0 :         estate->es_output_cid = GetCurrentCommandId(false);
     862              : 
     863            0 :         return estate;
     864            0 : }
     865              : 
     866              : /*
     867              :  * Evaluates row filter.
     868              :  *
     869              :  * If the row filter evaluates to NULL, it is taken as false i.e. the change
     870              :  * isn't replicated.
     871              :  */
     872              : static bool
     873            0 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
     874              : {
     875            0 :         Datum           ret;
     876            0 :         bool            isnull;
     877              : 
     878            0 :         Assert(state != NULL);
     879              : 
     880            0 :         ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
     881              : 
     882            0 :         elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
     883              :                  isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
     884              :                  isnull ? "true" : "false");
     885              : 
     886            0 :         if (isnull)
     887            0 :                 return false;
     888              : 
     889            0 :         return DatumGetBool(ret);
     890            0 : }
     891              : 
     892              : /*
     893              :  * Make sure the per-entry memory context exists.
     894              :  */
     895              : static void
     896            0 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
     897              : {
     898            0 :         Relation        relation;
     899              : 
     900              :         /* The context may already exist, in which case bail out. */
     901            0 :         if (entry->entry_cxt)
     902            0 :                 return;
     903              : 
     904            0 :         relation = RelationIdGetRelation(entry->publish_as_relid);
     905              : 
     906            0 :         entry->entry_cxt = AllocSetContextCreate(data->cachectx,
     907              :                                                                                          "entry private context",
     908              :                                                                                          ALLOCSET_SMALL_SIZES);
     909              : 
     910            0 :         MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
     911              :                                                                           RelationGetRelationName(relation));
     912            0 : }
     913              : 
     914              : /*
     915              :  * Initialize the row filter.
     916              :  */
     917              : static void
     918            0 : pgoutput_row_filter_init(PGOutputData *data, List *publications,
     919              :                                                  RelationSyncEntry *entry)
     920              : {
     921            0 :         ListCell   *lc;
     922            0 :         List       *rfnodes[] = {NIL, NIL, NIL};        /* One per pubaction */
     923            0 :         bool            no_filter[] = {false, false, false};    /* One per pubaction */
     924            0 :         MemoryContext oldctx;
     925            0 :         int                     idx;
     926            0 :         bool            has_filter = true;
     927            0 :         Oid                     schemaid = get_rel_namespace(entry->publish_as_relid);
     928              : 
     929              :         /*
     930              :          * Find if there are any row filters for this relation. If there are, then
     931              :          * prepare the necessary ExprState and cache it in entry->exprstate. To
     932              :          * build an expression state, we need to ensure the following:
     933              :          *
     934              :          * All the given publication-table mappings must be checked.
     935              :          *
     936              :          * Multiple publications might have multiple row filters for this
     937              :          * relation. Since row filter usage depends on the DML operation, there
     938              :          * are multiple lists (one for each operation) to which row filters will
     939              :          * be appended.
     940              :          *
     941              :          * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
     942              :          * expression" so it takes precedence.
     943              :          */
     944            0 :         foreach(lc, publications)
     945              :         {
     946            0 :                 Publication *pub = lfirst(lc);
     947            0 :                 HeapTuple       rftuple = NULL;
     948            0 :                 Datum           rfdatum = 0;
     949            0 :                 bool            pub_no_filter = true;
     950              : 
     951              :                 /*
     952              :                  * If the publication is FOR ALL TABLES, or the publication includes a
     953              :                  * FOR TABLES IN SCHEMA where the table belongs to the referred
     954              :                  * schema, then it is treated the same as if there are no row filters
     955              :                  * (even if other publications have a row filter).
     956              :                  */
     957            0 :                 if (!pub->alltables &&
     958            0 :                         !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
     959              :                                                                    ObjectIdGetDatum(schemaid),
     960              :                                                                    ObjectIdGetDatum(pub->oid)))
     961              :                 {
     962              :                         /*
     963              :                          * Check for the presence of a row filter in this publication.
     964              :                          */
     965            0 :                         rftuple = SearchSysCache2(PUBLICATIONRELMAP,
     966            0 :                                                                           ObjectIdGetDatum(entry->publish_as_relid),
     967            0 :                                                                           ObjectIdGetDatum(pub->oid));
     968              : 
     969            0 :                         if (HeapTupleIsValid(rftuple))
     970              :                         {
     971              :                                 /* Null indicates no filter. */
     972            0 :                                 rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
     973              :                                                                                   Anum_pg_publication_rel_prqual,
     974              :                                                                                   &pub_no_filter);
     975            0 :                         }
     976            0 :                 }
     977              : 
     978            0 :                 if (pub_no_filter)
     979              :                 {
     980            0 :                         if (rftuple)
     981            0 :                                 ReleaseSysCache(rftuple);
     982              : 
     983            0 :                         no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
     984            0 :                         no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
     985            0 :                         no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
     986              : 
     987              :                         /*
     988              :                          * Quick exit if all the DML actions are publicized via this
     989              :                          * publication.
     990              :                          */
     991            0 :                         if (no_filter[PUBACTION_INSERT] &&
     992            0 :                                 no_filter[PUBACTION_UPDATE] &&
     993            0 :                                 no_filter[PUBACTION_DELETE])
     994              :                         {
     995            0 :                                 has_filter = false;
     996            0 :                                 break;
     997              :                         }
     998              : 
     999              :                         /* No additional work for this publication. Next one. */
    1000            0 :                         continue;
    1001              :                 }
    1002              : 
    1003              :                 /* Form the per pubaction row filter lists. */
    1004            0 :                 if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
    1005            0 :                         rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
    1006            0 :                                                                                                 TextDatumGetCString(rfdatum));
    1007            0 :                 if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
    1008            0 :                         rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
    1009            0 :                                                                                                 TextDatumGetCString(rfdatum));
    1010            0 :                 if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
    1011            0 :                         rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
    1012            0 :                                                                                                 TextDatumGetCString(rfdatum));
    1013              : 
    1014            0 :                 ReleaseSysCache(rftuple);
    1015            0 :         }                                                       /* loop all subscribed publications */
    1016              : 
    1017              :         /* Clean the row filter */
    1018            0 :         for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
    1019              :         {
    1020            0 :                 if (no_filter[idx])
    1021              :                 {
    1022            0 :                         list_free_deep(rfnodes[idx]);
    1023            0 :                         rfnodes[idx] = NIL;
    1024            0 :                 }
    1025            0 :         }
    1026              : 
    1027            0 :         if (has_filter)
    1028              :         {
    1029            0 :                 Relation        relation = RelationIdGetRelation(entry->publish_as_relid);
    1030              : 
    1031            0 :                 pgoutput_ensure_entry_cxt(data, entry);
    1032              : 
    1033              :                 /*
    1034              :                  * Now all the filters for all pubactions are known. Combine them when
    1035              :                  * their pubactions are the same.
    1036              :                  */
    1037            0 :                 oldctx = MemoryContextSwitchTo(entry->entry_cxt);
    1038            0 :                 entry->estate = create_estate_for_relation(relation);
    1039            0 :                 for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
    1040              :                 {
    1041            0 :                         List       *filters = NIL;
    1042            0 :                         Expr       *rfnode;
    1043              : 
    1044            0 :                         if (rfnodes[idx] == NIL)
    1045            0 :                                 continue;
    1046              : 
    1047            0 :                         foreach(lc, rfnodes[idx])
    1048            0 :                                 filters = lappend(filters, expand_generated_columns_in_expr(stringToNode((char *) lfirst(lc)), relation, 1));
    1049              : 
    1050              :                         /* combine the row filter and cache the ExprState */
    1051            0 :                         rfnode = make_orclause(filters);
    1052            0 :                         entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
    1053            0 :                 }                                               /* for each pubaction */
    1054            0 :                 MemoryContextSwitchTo(oldctx);
    1055              : 
    1056            0 :                 RelationClose(relation);
    1057            0 :         }
    1058            0 : }
    1059              : 
    1060              : /*
    1061              :  * If the table contains a generated column, check for any conflicting
    1062              :  * values of 'publish_generated_columns' parameter in the publications.
    1063              :  */
    1064              : static void
    1065            0 : check_and_init_gencol(PGOutputData *data, List *publications,
    1066              :                                           RelationSyncEntry *entry)
    1067              : {
    1068            0 :         Relation        relation = RelationIdGetRelation(entry->publish_as_relid);
    1069            0 :         TupleDesc       desc = RelationGetDescr(relation);
    1070            0 :         bool            gencolpresent = false;
    1071            0 :         bool            first = true;
    1072              : 
    1073              :         /* Check if there is any generated column present. */
    1074            0 :         for (int i = 0; i < desc->natts; i++)
    1075              :         {
    1076            0 :                 CompactAttribute *att = TupleDescCompactAttr(desc, i);
    1077              : 
    1078            0 :                 if (att->attgenerated)
    1079              :                 {
    1080            0 :                         gencolpresent = true;
    1081            0 :                         break;
    1082              :                 }
    1083            0 :         }
    1084              : 
    1085              :         /* There are no generated columns to be published. */
    1086            0 :         if (!gencolpresent)
    1087              :         {
    1088            0 :                 entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
    1089            0 :                 return;
    1090              :         }
    1091              : 
    1092              :         /*
    1093              :          * There may be a conflicting value for 'publish_generated_columns'
    1094              :          * parameter in the publications.
    1095              :          */
    1096            0 :         foreach_ptr(Publication, pub, publications)
    1097              :         {
    1098              :                 /*
    1099              :                  * The column list takes precedence over the
    1100              :                  * 'publish_generated_columns' parameter. Those will be checked later,
    1101              :                  * see pgoutput_column_list_init.
    1102              :                  */
    1103            0 :                 if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
    1104            0 :                         continue;
    1105              : 
    1106            0 :                 if (first)
    1107              :                 {
    1108            0 :                         entry->include_gencols_type = pub->pubgencols_type;
    1109            0 :                         first = false;
    1110            0 :                 }
    1111            0 :                 else if (entry->include_gencols_type != pub->pubgencols_type)
    1112            0 :                         ereport(ERROR,
    1113              :                                         errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1114              :                                         errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
    1115              :                                                    get_namespace_name(RelationGetNamespace(relation)),
    1116              :                                                    RelationGetRelationName(relation)));
    1117            0 :         }
    1118            0 : }
    1119              : 
    1120              : /*
    1121              :  * Initialize the column list.
    1122              :  */
    1123              : static void
    1124            0 : pgoutput_column_list_init(PGOutputData *data, List *publications,
    1125              :                                                   RelationSyncEntry *entry)
    1126              : {
    1127            0 :         ListCell   *lc;
    1128            0 :         bool            first = true;
    1129            0 :         Relation        relation = RelationIdGetRelation(entry->publish_as_relid);
    1130            0 :         bool            found_pub_collist = false;
    1131            0 :         Bitmapset  *relcols = NULL;
    1132              : 
    1133            0 :         pgoutput_ensure_entry_cxt(data, entry);
    1134              : 
    1135              :         /*
    1136              :          * Find if there are any column lists for this relation. If there are,
    1137              :          * build a bitmap using the column lists.
    1138              :          *
    1139              :          * Multiple publications might have multiple column lists for this
    1140              :          * relation.
    1141              :          *
    1142              :          * Note that we don't support the case where the column list is different
    1143              :          * for the same table when combining publications. See comments atop
    1144              :          * fetch_relation_list. But one can later change the publication so we
    1145              :          * still need to check all the given publication-table mappings and report
    1146              :          * an error if any publications have a different column list.
    1147              :          */
    1148            0 :         foreach(lc, publications)
    1149              :         {
    1150            0 :                 Publication *pub = lfirst(lc);
    1151            0 :                 Bitmapset  *cols = NULL;
    1152              : 
    1153              :                 /* Retrieve the bitmap of columns for a column list publication. */
    1154            0 :                 found_pub_collist |= check_and_fetch_column_list(pub,
    1155            0 :                                                                                                                  entry->publish_as_relid,
    1156            0 :                                                                                                                  entry->entry_cxt, &cols);
    1157              : 
    1158              :                 /*
    1159              :                  * For non-column list publications — e.g. TABLE (without a column
    1160              :                  * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
    1161              :                  * of the table (including generated columns when
    1162              :                  * 'publish_generated_columns' parameter is true).
    1163              :                  */
    1164            0 :                 if (!cols)
    1165              :                 {
    1166              :                         /*
    1167              :                          * Cache the table columns for the first publication with no
    1168              :                          * specified column list to detect publication with a different
    1169              :                          * column list.
    1170              :                          */
    1171            0 :                         if (!relcols && (list_length(publications) > 1))
    1172              :                         {
    1173            0 :                                 MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
    1174              : 
    1175            0 :                                 relcols = pub_form_cols_map(relation,
    1176            0 :                                                                                         entry->include_gencols_type);
    1177            0 :                                 MemoryContextSwitchTo(oldcxt);
    1178            0 :                         }
    1179              : 
    1180            0 :                         cols = relcols;
    1181            0 :                 }
    1182              : 
    1183            0 :                 if (first)
    1184              :                 {
    1185            0 :                         entry->columns = cols;
    1186            0 :                         first = false;
    1187            0 :                 }
    1188            0 :                 else if (!bms_equal(entry->columns, cols))
    1189            0 :                         ereport(ERROR,
    1190              :                                         errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1191              :                                         errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
    1192              :                                                    get_namespace_name(RelationGetNamespace(relation)),
    1193              :                                                    RelationGetRelationName(relation)));
    1194            0 :         }                                                       /* loop all subscribed publications */
    1195              : 
    1196              :         /*
    1197              :          * If no column list publications exist, columns to be published will be
    1198              :          * computed later according to the 'publish_generated_columns' parameter.
    1199              :          */
    1200            0 :         if (!found_pub_collist)
    1201            0 :                 entry->columns = NULL;
    1202              : 
    1203            0 :         RelationClose(relation);
    1204            0 : }
    1205              : 
    1206              : /*
    1207              :  * Initialize the slot for storing new and old tuples, and build the map that
    1208              :  * will be used to convert the relation's tuples into the ancestor's format.
    1209              :  */
    1210              : static void
    1211            0 : init_tuple_slot(PGOutputData *data, Relation relation,
    1212              :                                 RelationSyncEntry *entry)
    1213              : {
    1214            0 :         MemoryContext oldctx;
    1215            0 :         TupleDesc       oldtupdesc;
    1216            0 :         TupleDesc       newtupdesc;
    1217              : 
    1218            0 :         oldctx = MemoryContextSwitchTo(data->cachectx);
    1219              : 
    1220              :         /*
    1221              :          * Create tuple table slots. Create a copy of the TupleDesc as it needs to
    1222              :          * live as long as the cache remains.
    1223              :          */
    1224            0 :         oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
    1225            0 :         newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
    1226              : 
    1227            0 :         entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
    1228            0 :         entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
    1229              : 
    1230            0 :         MemoryContextSwitchTo(oldctx);
    1231              : 
    1232              :         /*
    1233              :          * Cache the map that will be used to convert the relation's tuples into
    1234              :          * the ancestor's format, if needed.
    1235              :          */
    1236            0 :         if (entry->publish_as_relid != RelationGetRelid(relation))
    1237              :         {
    1238            0 :                 Relation        ancestor = RelationIdGetRelation(entry->publish_as_relid);
    1239            0 :                 TupleDesc       indesc = RelationGetDescr(relation);
    1240            0 :                 TupleDesc       outdesc = RelationGetDescr(ancestor);
    1241              : 
    1242              :                 /* Map must live as long as the logical decoding context. */
    1243            0 :                 oldctx = MemoryContextSwitchTo(data->cachectx);
    1244              : 
    1245            0 :                 entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
    1246              : 
    1247            0 :                 MemoryContextSwitchTo(oldctx);
    1248            0 :                 RelationClose(ancestor);
    1249            0 :         }
    1250            0 : }
    1251              : 
    1252              : /*
    1253              :  * Change is checked against the row filter if any.
    1254              :  *
    1255              :  * Returns true if the change is to be replicated, else false.
    1256              :  *
    1257              :  * For inserts, evaluate the row filter for new tuple.
    1258              :  * For deletes, evaluate the row filter for old tuple.
    1259              :  * For updates, evaluate the row filter for old and new tuple.
    1260              :  *
    1261              :  * For updates, if both evaluations are true, we allow sending the UPDATE and
    1262              :  * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
    1263              :  * only one of the tuples matches the row filter expression, we transform
    1264              :  * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
    1265              :  * following rules:
    1266              :  *
    1267              :  * Case 1: old-row (no match)    new-row (no match)  -> (drop change)
    1268              :  * Case 2: old-row (no match)    new row (match)     -> INSERT
    1269              :  * Case 3: old-row (match)       new-row (no match)  -> DELETE
    1270              :  * Case 4: old-row (match)       new row (match)     -> UPDATE
    1271              :  *
    1272              :  * The new action is updated in the action parameter.
    1273              :  *
    1274              :  * The new slot could be updated when transforming the UPDATE into INSERT,
    1275              :  * because the original new tuple might not have column values from the replica
    1276              :  * identity.
    1277              :  *
    1278              :  * Examples:
    1279              :  * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
    1280              :  * Since the old tuple satisfies, the initial table synchronization copied this
    1281              :  * row (or another method was used to guarantee that there is data
    1282              :  * consistency).  However, after the UPDATE the new tuple doesn't satisfy the
    1283              :  * row filter, so from a data consistency perspective, that row should be
    1284              :  * removed on the subscriber. The UPDATE should be transformed into a DELETE
    1285              :  * statement and be sent to the subscriber. Keeping this row on the subscriber
    1286              :  * is undesirable because it doesn't reflect what was defined in the row filter
    1287              :  * expression on the publisher. This row on the subscriber would likely not be
    1288              :  * modified by replication again. If someone inserted a new row with the same
    1289              :  * old identifier, replication could stop due to a constraint violation.
    1290              :  *
    1291              :  * Let's say the old tuple doesn't match the row filter but the new tuple does.
    1292              :  * Since the old tuple doesn't satisfy, the initial table synchronization
    1293              :  * probably didn't copy this row. However, after the UPDATE the new tuple does
    1294              :  * satisfy the row filter, so from a data consistency perspective, that row
    1295              :  * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
    1296              :  * statements have no effect (it matches no row -- see
    1297              :  * apply_handle_update_internal()). So, the UPDATE should be transformed into a
    1298              :  * INSERT statement and be sent to the subscriber. However, this might surprise
    1299              :  * someone who expects the data set to satisfy the row filter expression on the
    1300              :  * provider.
    1301              :  */
    1302              : static bool
    1303            0 : pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
    1304              :                                         TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
    1305              :                                         ReorderBufferChangeType *action)
    1306              : {
    1307            0 :         TupleDesc       desc;
    1308            0 :         int                     i;
    1309            0 :         bool            old_matched,
    1310              :                                 new_matched,
    1311              :                                 result;
    1312            0 :         TupleTableSlot *tmp_new_slot;
    1313            0 :         TupleTableSlot *new_slot = *new_slot_ptr;
    1314            0 :         ExprContext *ecxt;
    1315            0 :         ExprState  *filter_exprstate;
    1316              : 
    1317              :         /*
    1318              :          * We need this map to avoid relying on ReorderBufferChangeType enums
    1319              :          * having specific values.
    1320              :          */
    1321              :         static const int map_changetype_pubaction[] = {
    1322              :                 [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
    1323              :                 [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
    1324              :                 [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
    1325              :         };
    1326              : 
    1327            0 :         Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
    1328              :                    *action == REORDER_BUFFER_CHANGE_UPDATE ||
    1329              :                    *action == REORDER_BUFFER_CHANGE_DELETE);
    1330              : 
    1331            0 :         Assert(new_slot || old_slot);
    1332              : 
    1333              :         /* Get the corresponding row filter */
    1334            0 :         filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
    1335              : 
    1336              :         /* Bail out if there is no row filter */
    1337            0 :         if (!filter_exprstate)
    1338            0 :                 return true;
    1339              : 
    1340            0 :         elog(DEBUG3, "table \"%s.%s\" has row filter",
    1341              :                  get_namespace_name(RelationGetNamespace(relation)),
    1342              :                  RelationGetRelationName(relation));
    1343              : 
    1344            0 :         ResetPerTupleExprContext(entry->estate);
    1345              : 
    1346            0 :         ecxt = GetPerTupleExprContext(entry->estate);
    1347              : 
    1348              :         /*
    1349              :          * For the following occasions where there is only one tuple, we can
    1350              :          * evaluate the row filter for that tuple and return.
    1351              :          *
    1352              :          * For inserts, we only have the new tuple.
    1353              :          *
    1354              :          * For updates, we can have only a new tuple when none of the replica
    1355              :          * identity columns changed and none of those columns have external data
    1356              :          * but we still need to evaluate the row filter for the new tuple as the
    1357              :          * existing values of those columns might not match the filter. Also,
    1358              :          * users can use constant expressions in the row filter, so we anyway need
    1359              :          * to evaluate it for the new tuple.
    1360              :          *
    1361              :          * For deletes, we only have the old tuple.
    1362              :          */
    1363            0 :         if (!new_slot || !old_slot)
    1364              :         {
    1365            0 :                 ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
    1366            0 :                 result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
    1367              : 
    1368            0 :                 return result;
    1369              :         }
    1370              : 
    1371              :         /*
    1372              :          * Both the old and new tuples must be valid only for updates and need to
    1373              :          * be checked against the row filter.
    1374              :          */
    1375            0 :         Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
    1376              : 
    1377            0 :         slot_getallattrs(new_slot);
    1378            0 :         slot_getallattrs(old_slot);
    1379              : 
    1380            0 :         tmp_new_slot = NULL;
    1381            0 :         desc = RelationGetDescr(relation);
    1382              : 
    1383              :         /*
    1384              :          * The new tuple might not have all the replica identity columns, in which
    1385              :          * case it needs to be copied over from the old tuple.
    1386              :          */
    1387            0 :         for (i = 0; i < desc->natts; i++)
    1388              :         {
    1389            0 :                 CompactAttribute *att = TupleDescCompactAttr(desc, i);
    1390              : 
    1391              :                 /*
    1392              :                  * if the column in the new tuple or old tuple is null, nothing to do
    1393              :                  */
    1394            0 :                 if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
    1395            0 :                         continue;
    1396              : 
    1397              :                 /*
    1398              :                  * Unchanged toasted replica identity columns are only logged in the
    1399              :                  * old tuple. Copy this over to the new tuple. The changed (or WAL
    1400              :                  * Logged) toast values are always assembled in memory and set as
    1401              :                  * VARTAG_INDIRECT. See ReorderBufferToastReplace.
    1402              :                  */
    1403            0 :                 if (att->attlen == -1 &&
    1404            0 :                         VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(new_slot->tts_values[i])) &&
    1405            0 :                         !VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(old_slot->tts_values[i])))
    1406              :                 {
    1407            0 :                         if (!tmp_new_slot)
    1408              :                         {
    1409            0 :                                 tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
    1410            0 :                                 ExecClearTuple(tmp_new_slot);
    1411              : 
    1412            0 :                                 memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
    1413              :                                            desc->natts * sizeof(Datum));
    1414            0 :                                 memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
    1415              :                                            desc->natts * sizeof(bool));
    1416            0 :                         }
    1417              : 
    1418            0 :                         tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
    1419            0 :                         tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
    1420            0 :                 }
    1421            0 :         }
    1422              : 
    1423            0 :         ecxt->ecxt_scantuple = old_slot;
    1424            0 :         old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
    1425              : 
    1426            0 :         if (tmp_new_slot)
    1427              :         {
    1428            0 :                 ExecStoreVirtualTuple(tmp_new_slot);
    1429            0 :                 ecxt->ecxt_scantuple = tmp_new_slot;
    1430            0 :         }
    1431              :         else
    1432            0 :                 ecxt->ecxt_scantuple = new_slot;
    1433              : 
    1434            0 :         new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
    1435              : 
    1436              :         /*
    1437              :          * Case 1: if both tuples don't match the row filter, bailout. Send
    1438              :          * nothing.
    1439              :          */
    1440            0 :         if (!old_matched && !new_matched)
    1441            0 :                 return false;
    1442              : 
    1443              :         /*
    1444              :          * Case 2: if the old tuple doesn't satisfy the row filter but the new
    1445              :          * tuple does, transform the UPDATE into INSERT.
    1446              :          *
    1447              :          * Use the newly transformed tuple that must contain the column values for
    1448              :          * all the replica identity columns. This is required to ensure that the
    1449              :          * while inserting the tuple in the downstream node, we have all the
    1450              :          * required column values.
    1451              :          */
    1452            0 :         if (!old_matched && new_matched)
    1453              :         {
    1454            0 :                 *action = REORDER_BUFFER_CHANGE_INSERT;
    1455              : 
    1456            0 :                 if (tmp_new_slot)
    1457            0 :                         *new_slot_ptr = tmp_new_slot;
    1458            0 :         }
    1459              : 
    1460              :         /*
    1461              :          * Case 3: if the old tuple satisfies the row filter but the new tuple
    1462              :          * doesn't, transform the UPDATE into DELETE.
    1463              :          *
    1464              :          * This transformation does not require another tuple. The Old tuple will
    1465              :          * be used for DELETE.
    1466              :          */
    1467            0 :         else if (old_matched && !new_matched)
    1468            0 :                 *action = REORDER_BUFFER_CHANGE_DELETE;
    1469              : 
    1470              :         /*
    1471              :          * Case 4: if both tuples match the row filter, transformation isn't
    1472              :          * required. (*action is default UPDATE).
    1473              :          */
    1474              : 
    1475            0 :         return true;
    1476            0 : }
    1477              : 
    1478              : /*
    1479              :  * Sends the decoded DML over wire.
    1480              :  *
    1481              :  * This is called both in streaming and non-streaming modes.
    1482              :  */
    1483              : static void
    1484            0 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    1485              :                                 Relation relation, ReorderBufferChange *change)
    1486              : {
    1487            0 :         PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1488            0 :         PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    1489            0 :         MemoryContext old;
    1490            0 :         RelationSyncEntry *relentry;
    1491            0 :         TransactionId xid = InvalidTransactionId;
    1492            0 :         Relation        ancestor = NULL;
    1493            0 :         Relation        targetrel = relation;
    1494            0 :         ReorderBufferChangeType action = change->action;
    1495            0 :         TupleTableSlot *old_slot = NULL;
    1496            0 :         TupleTableSlot *new_slot = NULL;
    1497              : 
    1498            0 :         if (!is_publishable_relation(relation))
    1499            0 :                 return;
    1500              : 
    1501              :         /*
    1502              :          * Remember the xid for the change in streaming mode. We need to send xid
    1503              :          * with each change in the streaming mode so that subscriber can make
    1504              :          * their association and on aborts, it can discard the corresponding
    1505              :          * changes.
    1506              :          */
    1507            0 :         if (data->in_streaming)
    1508            0 :                 xid = change->txn->xid;
    1509              : 
    1510            0 :         relentry = get_rel_sync_entry(data, relation);
    1511              : 
    1512              :         /* First check the table filter */
    1513            0 :         switch (action)
    1514              :         {
    1515              :                 case REORDER_BUFFER_CHANGE_INSERT:
    1516            0 :                         if (!relentry->pubactions.pubinsert)
    1517            0 :                                 return;
    1518            0 :                         break;
    1519              :                 case REORDER_BUFFER_CHANGE_UPDATE:
    1520            0 :                         if (!relentry->pubactions.pubupdate)
    1521            0 :                                 return;
    1522            0 :                         break;
    1523              :                 case REORDER_BUFFER_CHANGE_DELETE:
    1524            0 :                         if (!relentry->pubactions.pubdelete)
    1525            0 :                                 return;
    1526              : 
    1527              :                         /*
    1528              :                          * This is only possible if deletes are allowed even when replica
    1529              :                          * identity is not defined for a table. Since the DELETE action
    1530              :                          * can't be published, we simply return.
    1531              :                          */
    1532            0 :                         if (!change->data.tp.oldtuple)
    1533              :                         {
    1534            0 :                                 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
    1535            0 :                                 return;
    1536              :                         }
    1537            0 :                         break;
    1538              :                 default:
    1539            0 :                         Assert(false);
    1540            0 :         }
    1541              : 
    1542              :         /* Avoid leaking memory by using and resetting our own context */
    1543            0 :         old = MemoryContextSwitchTo(data->context);
    1544              : 
    1545              :         /* Switch relation if publishing via root. */
    1546            0 :         if (relentry->publish_as_relid != RelationGetRelid(relation))
    1547              :         {
    1548            0 :                 Assert(relation->rd_rel->relispartition);
    1549            0 :                 ancestor = RelationIdGetRelation(relentry->publish_as_relid);
    1550            0 :                 targetrel = ancestor;
    1551            0 :         }
    1552              : 
    1553            0 :         if (change->data.tp.oldtuple)
    1554              :         {
    1555            0 :                 old_slot = relentry->old_slot;
    1556            0 :                 ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
    1557              : 
    1558              :                 /* Convert tuple if needed. */
    1559            0 :                 if (relentry->attrmap)
    1560              :                 {
    1561            0 :                         TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
    1562              :                                                                                                           &TTSOpsVirtual);
    1563              : 
    1564            0 :                         old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
    1565            0 :                 }
    1566            0 :         }
    1567              : 
    1568            0 :         if (change->data.tp.newtuple)
    1569              :         {
    1570            0 :                 new_slot = relentry->new_slot;
    1571            0 :                 ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
    1572              : 
    1573              :                 /* Convert tuple if needed. */
    1574            0 :                 if (relentry->attrmap)
    1575              :                 {
    1576            0 :                         TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
    1577              :                                                                                                           &TTSOpsVirtual);
    1578              : 
    1579            0 :                         new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
    1580            0 :                 }
    1581            0 :         }
    1582              : 
    1583              :         /*
    1584              :          * Check row filter.
    1585              :          *
    1586              :          * Updates could be transformed to inserts or deletes based on the results
    1587              :          * of the row filter for old and new tuple.
    1588              :          */
    1589            0 :         if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
    1590            0 :                 goto cleanup;
    1591              : 
    1592              :         /*
    1593              :          * Send BEGIN if we haven't yet.
    1594              :          *
    1595              :          * We send the BEGIN message after ensuring that we will actually send the
    1596              :          * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
    1597              :          * transactions.
    1598              :          */
    1599            0 :         if (txndata && !txndata->sent_begin_txn)
    1600            0 :                 pgoutput_send_begin(ctx, txn);
    1601              : 
    1602              :         /*
    1603              :          * Schema should be sent using the original relation because it also sends
    1604              :          * the ancestor's relation.
    1605              :          */
    1606            0 :         maybe_send_schema(ctx, change, relation, relentry);
    1607              : 
    1608            0 :         OutputPluginPrepareWrite(ctx, true);
    1609              : 
    1610              :         /* Send the data */
    1611            0 :         switch (action)
    1612              :         {
    1613              :                 case REORDER_BUFFER_CHANGE_INSERT:
    1614            0 :                         logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
    1615            0 :                                                                         data->binary, relentry->columns,
    1616            0 :                                                                         relentry->include_gencols_type);
    1617            0 :                         break;
    1618              :                 case REORDER_BUFFER_CHANGE_UPDATE:
    1619            0 :                         logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
    1620            0 :                                                                         new_slot, data->binary, relentry->columns,
    1621            0 :                                                                         relentry->include_gencols_type);
    1622            0 :                         break;
    1623              :                 case REORDER_BUFFER_CHANGE_DELETE:
    1624            0 :                         logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
    1625            0 :                                                                         data->binary, relentry->columns,
    1626            0 :                                                                         relentry->include_gencols_type);
    1627            0 :                         break;
    1628              :                 default:
    1629            0 :                         Assert(false);
    1630            0 :         }
    1631              : 
    1632            0 :         OutputPluginWrite(ctx, true);
    1633              : 
    1634              : cleanup:
    1635            0 :         if (RelationIsValid(ancestor))
    1636              :         {
    1637            0 :                 RelationClose(ancestor);
    1638            0 :                 ancestor = NULL;
    1639            0 :         }
    1640              : 
    1641              :         /* Drop the new slots that were used to store the converted tuples. */
    1642            0 :         if (relentry->attrmap)
    1643              :         {
    1644            0 :                 if (old_slot)
    1645            0 :                         ExecDropSingleTupleTableSlot(old_slot);
    1646              : 
    1647            0 :                 if (new_slot)
    1648            0 :                         ExecDropSingleTupleTableSlot(new_slot);
    1649            0 :         }
    1650              : 
    1651            0 :         MemoryContextSwitchTo(old);
    1652            0 :         MemoryContextReset(data->context);
    1653            0 : }
    1654              : 
    1655              : static void
    1656            0 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    1657              :                                   int nrelations, Relation relations[], ReorderBufferChange *change)
    1658              : {
    1659            0 :         PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1660            0 :         PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    1661            0 :         MemoryContext old;
    1662            0 :         RelationSyncEntry *relentry;
    1663            0 :         int                     i;
    1664            0 :         int                     nrelids;
    1665            0 :         Oid                *relids;
    1666            0 :         TransactionId xid = InvalidTransactionId;
    1667              : 
    1668              :         /* Remember the xid for the change in streaming mode. See pgoutput_change. */
    1669            0 :         if (data->in_streaming)
    1670            0 :                 xid = change->txn->xid;
    1671              : 
    1672            0 :         old = MemoryContextSwitchTo(data->context);
    1673              : 
    1674            0 :         relids = palloc0(nrelations * sizeof(Oid));
    1675            0 :         nrelids = 0;
    1676              : 
    1677            0 :         for (i = 0; i < nrelations; i++)
    1678              :         {
    1679            0 :                 Relation        relation = relations[i];
    1680            0 :                 Oid                     relid = RelationGetRelid(relation);
    1681              : 
    1682            0 :                 if (!is_publishable_relation(relation))
    1683            0 :                         continue;
    1684              : 
    1685            0 :                 relentry = get_rel_sync_entry(data, relation);
    1686              : 
    1687            0 :                 if (!relentry->pubactions.pubtruncate)
    1688            0 :                         continue;
    1689              : 
    1690              :                 /*
    1691              :                  * Don't send partitions if the publication wants to send only the
    1692              :                  * root tables through it.
    1693              :                  */
    1694            0 :                 if (relation->rd_rel->relispartition &&
    1695            0 :                         relentry->publish_as_relid != relid)
    1696            0 :                         continue;
    1697              : 
    1698            0 :                 relids[nrelids++] = relid;
    1699              : 
    1700              :                 /* Send BEGIN if we haven't yet */
    1701            0 :                 if (txndata && !txndata->sent_begin_txn)
    1702            0 :                         pgoutput_send_begin(ctx, txn);
    1703              : 
    1704            0 :                 maybe_send_schema(ctx, change, relation, relentry);
    1705            0 :         }
    1706              : 
    1707            0 :         if (nrelids > 0)
    1708              :         {
    1709            0 :                 OutputPluginPrepareWrite(ctx, true);
    1710            0 :                 logicalrep_write_truncate(ctx->out,
    1711            0 :                                                                   xid,
    1712            0 :                                                                   nrelids,
    1713            0 :                                                                   relids,
    1714            0 :                                                                   change->data.truncate.cascade,
    1715            0 :                                                                   change->data.truncate.restart_seqs);
    1716            0 :                 OutputPluginWrite(ctx, true);
    1717            0 :         }
    1718              : 
    1719            0 :         MemoryContextSwitchTo(old);
    1720            0 :         MemoryContextReset(data->context);
    1721            0 : }
    1722              : 
    1723              : static void
    1724            0 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    1725              :                                  XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
    1726              :                                  const char *message)
    1727              : {
    1728            0 :         PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1729            0 :         TransactionId xid = InvalidTransactionId;
    1730              : 
    1731            0 :         if (!data->messages)
    1732            0 :                 return;
    1733              : 
    1734              :         /*
    1735              :          * Remember the xid for the message in streaming mode. See
    1736              :          * pgoutput_change.
    1737              :          */
    1738            0 :         if (data->in_streaming)
    1739            0 :                 xid = txn->xid;
    1740              : 
    1741              :         /*
    1742              :          * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
    1743              :          */
    1744            0 :         if (transactional)
    1745              :         {
    1746            0 :                 PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    1747              : 
    1748              :                 /* Send BEGIN if we haven't yet */
    1749            0 :                 if (txndata && !txndata->sent_begin_txn)
    1750            0 :                         pgoutput_send_begin(ctx, txn);
    1751            0 :         }
    1752              : 
    1753            0 :         OutputPluginPrepareWrite(ctx, true);
    1754            0 :         logicalrep_write_message(ctx->out,
    1755            0 :                                                          xid,
    1756            0 :                                                          message_lsn,
    1757            0 :                                                          transactional,
    1758            0 :                                                          prefix,
    1759            0 :                                                          sz,
    1760            0 :                                                          message);
    1761            0 :         OutputPluginWrite(ctx, true);
    1762            0 : }
    1763              : 
    1764              : /*
    1765              :  * Return true if the data is associated with an origin and the user has
    1766              :  * requested the changes that don't have an origin, false otherwise.
    1767              :  */
    1768              : static bool
    1769            0 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
    1770              :                                            RepOriginId origin_id)
    1771              : {
    1772            0 :         PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1773              : 
    1774            0 :         if (data->publish_no_origin && origin_id != InvalidRepOriginId)
    1775            0 :                 return true;
    1776              : 
    1777            0 :         return false;
    1778            0 : }
    1779              : 
    1780              : /*
    1781              :  * Shutdown the output plugin.
    1782              :  *
    1783              :  * Note, we don't need to clean the data->context, data->cachectx, and
    1784              :  * data->pubctx as they are child contexts of the ctx->context so they
    1785              :  * will be cleaned up by logical decoding machinery.
    1786              :  */
    1787              : static void
    1788            0 : pgoutput_shutdown(LogicalDecodingContext *ctx)
    1789              : {
    1790            0 :         pgoutput_memory_context_reset(NULL);
    1791            0 : }
    1792              : 
    1793              : /*
    1794              :  * Load publications from the list of publication names.
    1795              :  *
    1796              :  * Here, we skip the publications that don't exist yet. This will allow us
    1797              :  * to silently continue the replication in the absence of a missing publication.
    1798              :  * This is required because we allow the users to create publications after they
    1799              :  * have specified the required publications at the time of replication start.
    1800              :  */
    1801              : static List *
    1802            0 : LoadPublications(List *pubnames)
    1803              : {
    1804            0 :         List       *result = NIL;
    1805            0 :         ListCell   *lc;
    1806              : 
    1807            0 :         foreach(lc, pubnames)
    1808              :         {
    1809            0 :                 char       *pubname = (char *) lfirst(lc);
    1810            0 :                 Publication *pub = GetPublicationByName(pubname, true);
    1811              : 
    1812            0 :                 if (pub)
    1813            0 :                         result = lappend(result, pub);
    1814              :                 else
    1815            0 :                         ereport(WARNING,
    1816              :                                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1817              :                                         errmsg("skipped loading publication \"%s\"", pubname),
    1818              :                                         errdetail("The publication does not exist at this point in the WAL."),
    1819              :                                         errhint("Create the publication if it does not exist."));
    1820            0 :         }
    1821              : 
    1822            0 :         return result;
    1823            0 : }
    1824              : 
    1825              : /*
    1826              :  * Publication syscache invalidation callback.
    1827              :  *
    1828              :  * Called for invalidations on pg_publication.
    1829              :  */
    1830              : static void
    1831            0 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
    1832              : {
    1833            0 :         publications_valid = false;
    1834            0 : }
    1835              : 
    1836              : /*
    1837              :  * START STREAM callback
    1838              :  */
    1839              : static void
    1840            0 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
    1841              :                                           ReorderBufferTXN *txn)
    1842              : {
    1843            0 :         PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1844            0 :         bool            send_replication_origin = txn->origin_id != InvalidRepOriginId;
    1845              : 
    1846              :         /* we can't nest streaming of transactions */
    1847            0 :         Assert(!data->in_streaming);
    1848              : 
    1849              :         /*
    1850              :          * If we already sent the first stream for this transaction then don't
    1851              :          * send the origin id in the subsequent streams.
    1852              :          */
    1853            0 :         if (rbtxn_is_streamed(txn))
    1854            0 :                 send_replication_origin = false;
    1855              : 
    1856            0 :         OutputPluginPrepareWrite(ctx, !send_replication_origin);
    1857            0 :         logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
    1858              : 
    1859            0 :         send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
    1860            0 :                                          send_replication_origin);
    1861              : 
    1862            0 :         OutputPluginWrite(ctx, true);
    1863              : 
    1864              :         /* we're streaming a chunk of transaction now */
    1865            0 :         data->in_streaming = true;
    1866            0 : }
    1867              : 
    1868              : /*
    1869              :  * STOP STREAM callback
    1870              :  */
    1871              : static void
    1872            0 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
    1873              :                                          ReorderBufferTXN *txn)
    1874              : {
    1875            0 :         PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1876              : 
    1877              :         /* we should be streaming a transaction */
    1878            0 :         Assert(data->in_streaming);
    1879              : 
    1880            0 :         OutputPluginPrepareWrite(ctx, true);
    1881            0 :         logicalrep_write_stream_stop(ctx->out);
    1882            0 :         OutputPluginWrite(ctx, true);
    1883              : 
    1884              :         /* we've stopped streaming a transaction */
    1885            0 :         data->in_streaming = false;
    1886            0 : }
    1887              : 
    1888              : /*
    1889              :  * Notify downstream to discard the streamed transaction (along with all
    1890              :  * its subtransactions, if it's a toplevel transaction).
    1891              :  */
    1892              : static void
    1893            0 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
    1894              :                                           ReorderBufferTXN *txn,
    1895              :                                           XLogRecPtr abort_lsn)
    1896              : {
    1897            0 :         ReorderBufferTXN *toptxn;
    1898            0 :         PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1899            0 :         bool            write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
    1900              : 
    1901              :         /*
    1902              :          * The abort should happen outside streaming block, even for streamed
    1903              :          * transactions. The transaction has to be marked as streamed, though.
    1904              :          */
    1905            0 :         Assert(!data->in_streaming);
    1906              : 
    1907              :         /* determine the toplevel transaction */
    1908            0 :         toptxn = rbtxn_get_toptxn(txn);
    1909              : 
    1910            0 :         Assert(rbtxn_is_streamed(toptxn));
    1911              : 
    1912            0 :         OutputPluginPrepareWrite(ctx, true);
    1913            0 :         logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
    1914            0 :                                                                   txn->abort_time, write_abort_info);
    1915              : 
    1916            0 :         OutputPluginWrite(ctx, true);
    1917              : 
    1918            0 :         cleanup_rel_sync_cache(toptxn->xid, false);
    1919            0 : }
    1920              : 
    1921              : /*
    1922              :  * Notify downstream to apply the streamed transaction (along with all
    1923              :  * its subtransactions).
    1924              :  */
    1925              : static void
    1926            0 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
    1927              :                                            ReorderBufferTXN *txn,
    1928              :                                            XLogRecPtr commit_lsn)
    1929              : {
    1930            0 :         PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
    1931              : 
    1932              :         /*
    1933              :          * The commit should happen outside streaming block, even for streamed
    1934              :          * transactions. The transaction has to be marked as streamed, though.
    1935              :          */
    1936            0 :         Assert(!data->in_streaming);
    1937            0 :         Assert(rbtxn_is_streamed(txn));
    1938              : 
    1939            0 :         OutputPluginUpdateProgress(ctx, false);
    1940              : 
    1941            0 :         OutputPluginPrepareWrite(ctx, true);
    1942            0 :         logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
    1943            0 :         OutputPluginWrite(ctx, true);
    1944              : 
    1945            0 :         cleanup_rel_sync_cache(txn->xid, true);
    1946            0 : }
    1947              : 
    1948              : /*
    1949              :  * PREPARE callback (for streaming two-phase commit).
    1950              :  *
    1951              :  * Notify the downstream to prepare the transaction.
    1952              :  */
    1953              : static void
    1954            0 : pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
    1955              :                                                         ReorderBufferTXN *txn,
    1956              :                                                         XLogRecPtr prepare_lsn)
    1957              : {
    1958            0 :         Assert(rbtxn_is_streamed(txn));
    1959              : 
    1960            0 :         OutputPluginUpdateProgress(ctx, false);
    1961            0 :         OutputPluginPrepareWrite(ctx, true);
    1962            0 :         logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
    1963            0 :         OutputPluginWrite(ctx, true);
    1964            0 : }
    1965              : 
    1966              : /*
    1967              :  * Initialize the relation schema sync cache for a decoding session.
    1968              :  *
    1969              :  * The hash table is destroyed at the end of a decoding session. While
    1970              :  * relcache invalidations still exist and will still be invoked, they
    1971              :  * will just see the null hash table global and take no action.
    1972              :  */
    1973              : static void
    1974            0 : init_rel_sync_cache(MemoryContext cachectx)
    1975              : {
    1976            0 :         HASHCTL         ctl;
    1977              :         static bool relation_callbacks_registered = false;
    1978              : 
    1979              :         /* Nothing to do if hash table already exists */
    1980            0 :         if (RelationSyncCache != NULL)
    1981            0 :                 return;
    1982              : 
    1983              :         /* Make a new hash table for the cache */
    1984            0 :         ctl.keysize = sizeof(Oid);
    1985            0 :         ctl.entrysize = sizeof(RelationSyncEntry);
    1986            0 :         ctl.hcxt = cachectx;
    1987              : 
    1988            0 :         RelationSyncCache = hash_create("logical replication output relation cache",
    1989              :                                                                         128, &ctl,
    1990              :                                                                         HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
    1991              : 
    1992            0 :         Assert(RelationSyncCache != NULL);
    1993              : 
    1994              :         /* No more to do if we already registered callbacks */
    1995            0 :         if (relation_callbacks_registered)
    1996            0 :                 return;
    1997              : 
    1998              :         /* We must update the cache entry for a relation after a relcache flush */
    1999            0 :         CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
    2000              : 
    2001              :         /*
    2002              :          * Flush all cache entries after a pg_namespace change, in case it was a
    2003              :          * schema rename affecting a relation being replicated.
    2004              :          *
    2005              :          * XXX: It is not a good idea to invalidate all the relation entries in
    2006              :          * RelationSyncCache on schema rename. We can optimize it to invalidate
    2007              :          * only the required relations by either having a specific invalidation
    2008              :          * message containing impacted relations or by having schema information
    2009              :          * in each RelationSyncCache entry and using hashvalue of pg_namespace.oid
    2010              :          * passed to the callback.
    2011              :          */
    2012            0 :         CacheRegisterSyscacheCallback(NAMESPACEOID,
    2013              :                                                                   rel_sync_cache_publication_cb,
    2014              :                                                                   (Datum) 0);
    2015              : 
    2016            0 :         relation_callbacks_registered = true;
    2017            0 : }
    2018              : 
    2019              : /*
    2020              :  * We expect relatively small number of streamed transactions.
    2021              :  */
    2022              : static bool
    2023            0 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
    2024              : {
    2025            0 :         return list_member_xid(entry->streamed_txns, xid);
    2026              : }
    2027              : 
    2028              : /*
    2029              :  * Add the xid in the rel sync entry for which we have already sent the schema
    2030              :  * of the relation.
    2031              :  */
    2032              : static void
    2033            0 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
    2034              : {
    2035            0 :         MemoryContext oldctx;
    2036              : 
    2037            0 :         oldctx = MemoryContextSwitchTo(CacheMemoryContext);
    2038              : 
    2039            0 :         entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
    2040              : 
    2041            0 :         MemoryContextSwitchTo(oldctx);
    2042            0 : }
    2043              : 
    2044              : /*
    2045              :  * Find or create entry in the relation schema cache.
    2046              :  *
    2047              :  * This looks up publications that the given relation is directly or
    2048              :  * indirectly part of (the latter if it's really the relation's ancestor that
    2049              :  * is part of a publication) and fills up the found entry with the information
    2050              :  * about which operations to publish and whether to use an ancestor's schema
    2051              :  * when publishing.
    2052              :  */
    2053              : static RelationSyncEntry *
    2054            0 : get_rel_sync_entry(PGOutputData *data, Relation relation)
    2055              : {
    2056            0 :         RelationSyncEntry *entry;
    2057            0 :         bool            found;
    2058            0 :         MemoryContext oldctx;
    2059            0 :         Oid                     relid = RelationGetRelid(relation);
    2060              : 
    2061            0 :         Assert(RelationSyncCache != NULL);
    2062              : 
    2063              :         /* Find cached relation info, creating if not found */
    2064            0 :         entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
    2065              :                                                                                           &relid,
    2066              :                                                                                           HASH_ENTER, &found);
    2067            0 :         Assert(entry != NULL);
    2068              : 
    2069              :         /* initialize entry, if it's new */
    2070            0 :         if (!found)
    2071              :         {
    2072            0 :                 entry->replicate_valid = false;
    2073            0 :                 entry->schema_sent = false;
    2074            0 :                 entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
    2075            0 :                 entry->streamed_txns = NIL;
    2076            0 :                 entry->pubactions.pubinsert = entry->pubactions.pubupdate =
    2077            0 :                         entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
    2078            0 :                 entry->new_slot = NULL;
    2079            0 :                 entry->old_slot = NULL;
    2080            0 :                 memset(entry->exprstate, 0, sizeof(entry->exprstate));
    2081            0 :                 entry->entry_cxt = NULL;
    2082            0 :                 entry->publish_as_relid = InvalidOid;
    2083            0 :                 entry->columns = NULL;
    2084            0 :                 entry->attrmap = NULL;
    2085            0 :         }
    2086              : 
    2087              :         /* Validate the entry */
    2088            0 :         if (!entry->replicate_valid)
    2089              :         {
    2090            0 :                 Oid                     schemaId = get_rel_namespace(relid);
    2091            0 :                 List       *pubids = GetRelationPublications(relid);
    2092              : 
    2093              :                 /*
    2094              :                  * We don't acquire a lock on the namespace system table as we build
    2095              :                  * the cache entry using a historic snapshot and all the later changes
    2096              :                  * are absorbed while decoding WAL.
    2097              :                  */
    2098            0 :                 List       *schemaPubids = GetSchemaPublications(schemaId);
    2099            0 :                 ListCell   *lc;
    2100            0 :                 Oid                     publish_as_relid = relid;
    2101            0 :                 int                     publish_ancestor_level = 0;
    2102            0 :                 bool            am_partition = get_rel_relispartition(relid);
    2103            0 :                 char            relkind = get_rel_relkind(relid);
    2104            0 :                 List       *rel_publications = NIL;
    2105              : 
    2106              :                 /* Reload publications if needed before use. */
    2107            0 :                 if (!publications_valid)
    2108              :                 {
    2109            0 :                         MemoryContextReset(data->pubctx);
    2110              : 
    2111            0 :                         oldctx = MemoryContextSwitchTo(data->pubctx);
    2112            0 :                         data->publications = LoadPublications(data->publication_names);
    2113            0 :                         MemoryContextSwitchTo(oldctx);
    2114            0 :                         publications_valid = true;
    2115            0 :                 }
    2116              : 
    2117              :                 /*
    2118              :                  * Reset schema_sent status as the relation definition may have
    2119              :                  * changed.  Also reset pubactions to empty in case rel was dropped
    2120              :                  * from a publication.  Also free any objects that depended on the
    2121              :                  * earlier definition.
    2122              :                  */
    2123            0 :                 entry->schema_sent = false;
    2124            0 :                 entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
    2125            0 :                 list_free(entry->streamed_txns);
    2126            0 :                 entry->streamed_txns = NIL;
    2127            0 :                 bms_free(entry->columns);
    2128            0 :                 entry->columns = NULL;
    2129            0 :                 entry->pubactions.pubinsert = false;
    2130            0 :                 entry->pubactions.pubupdate = false;
    2131            0 :                 entry->pubactions.pubdelete = false;
    2132            0 :                 entry->pubactions.pubtruncate = false;
    2133              : 
    2134              :                 /*
    2135              :                  * Tuple slots cleanups. (Will be rebuilt later if needed).
    2136              :                  */
    2137            0 :                 if (entry->old_slot)
    2138              :                 {
    2139            0 :                         TupleDesc       desc = entry->old_slot->tts_tupleDescriptor;
    2140              : 
    2141            0 :                         Assert(desc->tdrefcount == -1);
    2142              : 
    2143            0 :                         ExecDropSingleTupleTableSlot(entry->old_slot);
    2144              : 
    2145              :                         /*
    2146              :                          * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
    2147              :                          * do it now to avoid any leaks.
    2148              :                          */
    2149            0 :                         FreeTupleDesc(desc);
    2150            0 :                 }
    2151            0 :                 if (entry->new_slot)
    2152              :                 {
    2153            0 :                         TupleDesc       desc = entry->new_slot->tts_tupleDescriptor;
    2154              : 
    2155            0 :                         Assert(desc->tdrefcount == -1);
    2156              : 
    2157            0 :                         ExecDropSingleTupleTableSlot(entry->new_slot);
    2158              : 
    2159              :                         /*
    2160              :                          * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
    2161              :                          * do it now to avoid any leaks.
    2162              :                          */
    2163            0 :                         FreeTupleDesc(desc);
    2164            0 :                 }
    2165              : 
    2166            0 :                 entry->old_slot = NULL;
    2167            0 :                 entry->new_slot = NULL;
    2168              : 
    2169            0 :                 if (entry->attrmap)
    2170            0 :                         free_attrmap(entry->attrmap);
    2171            0 :                 entry->attrmap = NULL;
    2172              : 
    2173              :                 /*
    2174              :                  * Row filter cache cleanups.
    2175              :                  */
    2176            0 :                 if (entry->entry_cxt)
    2177            0 :                         MemoryContextDelete(entry->entry_cxt);
    2178              : 
    2179            0 :                 entry->entry_cxt = NULL;
    2180            0 :                 entry->estate = NULL;
    2181            0 :                 memset(entry->exprstate, 0, sizeof(entry->exprstate));
    2182              : 
    2183              :                 /*
    2184              :                  * Build publication cache. We can't use one provided by relcache as
    2185              :                  * relcache considers all publications that the given relation is in,
    2186              :                  * but here we only need to consider ones that the subscriber
    2187              :                  * requested.
    2188              :                  */
    2189            0 :                 foreach(lc, data->publications)
    2190              :                 {
    2191            0 :                         Publication *pub = lfirst(lc);
    2192            0 :                         bool            publish = false;
    2193              : 
    2194              :                         /*
    2195              :                          * Under what relid should we publish changes in this publication?
    2196              :                          * We'll use the top-most relid across all publications. Also
    2197              :                          * track the ancestor level for this publication.
    2198              :                          */
    2199            0 :                         Oid                     pub_relid = relid;
    2200            0 :                         int                     ancestor_level = 0;
    2201              : 
    2202              :                         /*
    2203              :                          * If this is a FOR ALL TABLES publication, pick the partition
    2204              :                          * root and set the ancestor level accordingly.
    2205              :                          */
    2206            0 :                         if (pub->alltables)
    2207              :                         {
    2208            0 :                                 publish = true;
    2209            0 :                                 if (pub->pubviaroot && am_partition)
    2210              :                                 {
    2211            0 :                                         List       *ancestors = get_partition_ancestors(relid);
    2212              : 
    2213            0 :                                         pub_relid = llast_oid(ancestors);
    2214            0 :                                         ancestor_level = list_length(ancestors);
    2215            0 :                                 }
    2216            0 :                         }
    2217              : 
    2218            0 :                         if (!publish)
    2219              :                         {
    2220            0 :                                 bool            ancestor_published = false;
    2221              : 
    2222              :                                 /*
    2223              :                                  * For a partition, check if any of the ancestors are
    2224              :                                  * published.  If so, note down the topmost ancestor that is
    2225              :                                  * published via this publication, which will be used as the
    2226              :                                  * relation via which to publish the partition's changes.
    2227              :                                  */
    2228            0 :                                 if (am_partition)
    2229              :                                 {
    2230            0 :                                         Oid                     ancestor;
    2231            0 :                                         int                     level;
    2232            0 :                                         List       *ancestors = get_partition_ancestors(relid);
    2233              : 
    2234            0 :                                         ancestor = GetTopMostAncestorInPublication(pub->oid,
    2235            0 :                                                                                                                            ancestors,
    2236              :                                                                                                                            &level);
    2237              : 
    2238            0 :                                         if (ancestor != InvalidOid)
    2239              :                                         {
    2240            0 :                                                 ancestor_published = true;
    2241            0 :                                                 if (pub->pubviaroot)
    2242              :                                                 {
    2243            0 :                                                         pub_relid = ancestor;
    2244            0 :                                                         ancestor_level = level;
    2245            0 :                                                 }
    2246            0 :                                         }
    2247            0 :                                 }
    2248              : 
    2249            0 :                                 if (list_member_oid(pubids, pub->oid) ||
    2250            0 :                                         list_member_oid(schemaPubids, pub->oid) ||
    2251            0 :                                         ancestor_published)
    2252            0 :                                         publish = true;
    2253            0 :                         }
    2254              : 
    2255              :                         /*
    2256              :                          * If the relation is to be published, determine actions to
    2257              :                          * publish, and list of columns, if appropriate.
    2258              :                          *
    2259              :                          * Don't publish changes for partitioned tables, because
    2260              :                          * publishing those of its partitions suffices, unless partition
    2261              :                          * changes won't be published due to pubviaroot being set.
    2262              :                          */
    2263            0 :                         if (publish &&
    2264            0 :                                 (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
    2265              :                         {
    2266            0 :                                 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
    2267            0 :                                 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
    2268            0 :                                 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
    2269            0 :                                 entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
    2270              : 
    2271              :                                 /*
    2272              :                                  * We want to publish the changes as the top-most ancestor
    2273              :                                  * across all publications. So we need to check if the already
    2274              :                                  * calculated level is higher than the new one. If yes, we can
    2275              :                                  * ignore the new value (as it's a child). Otherwise the new
    2276              :                                  * value is an ancestor, so we keep it.
    2277              :                                  */
    2278            0 :                                 if (publish_ancestor_level > ancestor_level)
    2279            0 :                                         continue;
    2280              : 
    2281              :                                 /*
    2282              :                                  * If we found an ancestor higher up in the tree, discard the
    2283              :                                  * list of publications through which we replicate it, and use
    2284              :                                  * the new ancestor.
    2285              :                                  */
    2286            0 :                                 if (publish_ancestor_level < ancestor_level)
    2287              :                                 {
    2288            0 :                                         publish_as_relid = pub_relid;
    2289            0 :                                         publish_ancestor_level = ancestor_level;
    2290              : 
    2291              :                                         /* reset the publication list for this relation */
    2292            0 :                                         rel_publications = NIL;
    2293            0 :                                 }
    2294              :                                 else
    2295              :                                 {
    2296              :                                         /* Same ancestor level, has to be the same OID. */
    2297            0 :                                         Assert(publish_as_relid == pub_relid);
    2298              :                                 }
    2299              : 
    2300              :                                 /* Track publications for this ancestor. */
    2301            0 :                                 rel_publications = lappend(rel_publications, pub);
    2302            0 :                         }
    2303            0 :                 }
    2304              : 
    2305            0 :                 entry->publish_as_relid = publish_as_relid;
    2306              : 
    2307              :                 /*
    2308              :                  * Initialize the tuple slot, map, and row filter. These are only used
    2309              :                  * when publishing inserts, updates, or deletes.
    2310              :                  */
    2311            0 :                 if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
    2312            0 :                         entry->pubactions.pubdelete)
    2313              :                 {
    2314              :                         /* Initialize the tuple slot and map */
    2315            0 :                         init_tuple_slot(data, relation, entry);
    2316              : 
    2317              :                         /* Initialize the row filter */
    2318            0 :                         pgoutput_row_filter_init(data, rel_publications, entry);
    2319              : 
    2320              :                         /* Check whether to publish generated columns. */
    2321            0 :                         check_and_init_gencol(data, rel_publications, entry);
    2322              : 
    2323              :                         /* Initialize the column list */
    2324            0 :                         pgoutput_column_list_init(data, rel_publications, entry);
    2325            0 :                 }
    2326              : 
    2327            0 :                 list_free(pubids);
    2328            0 :                 list_free(schemaPubids);
    2329            0 :                 list_free(rel_publications);
    2330              : 
    2331            0 :                 entry->replicate_valid = true;
    2332            0 :         }
    2333              : 
    2334            0 :         return entry;
    2335            0 : }
    2336              : 
    2337              : /*
    2338              :  * Cleanup list of streamed transactions and update the schema_sent flag.
    2339              :  *
    2340              :  * When a streamed transaction commits or aborts, we need to remove the
    2341              :  * toplevel XID from the schema cache. If the transaction aborted, the
    2342              :  * subscriber will simply throw away the schema records we streamed, so
    2343              :  * we don't need to do anything else.
    2344              :  *
    2345              :  * If the transaction is committed, the subscriber will update the relation
    2346              :  * cache - so tweak the schema_sent flag accordingly.
    2347              :  */
    2348              : static void
    2349            0 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
    2350              : {
    2351            0 :         HASH_SEQ_STATUS hash_seq;
    2352            0 :         RelationSyncEntry *entry;
    2353              : 
    2354            0 :         Assert(RelationSyncCache != NULL);
    2355              : 
    2356            0 :         hash_seq_init(&hash_seq, RelationSyncCache);
    2357            0 :         while ((entry = hash_seq_search(&hash_seq)) != NULL)
    2358              :         {
    2359              :                 /*
    2360              :                  * We can set the schema_sent flag for an entry that has committed xid
    2361              :                  * in the list as that ensures that the subscriber would have the
    2362              :                  * corresponding schema and we don't need to send it unless there is
    2363              :                  * any invalidation for that relation.
    2364              :                  */
    2365            0 :                 foreach_xid(streamed_txn, entry->streamed_txns)
    2366              :                 {
    2367            0 :                         if (xid == streamed_txn)
    2368              :                         {
    2369            0 :                                 if (is_commit)
    2370            0 :                                         entry->schema_sent = true;
    2371              : 
    2372            0 :                                 entry->streamed_txns =
    2373            0 :                                         foreach_delete_current(entry->streamed_txns, streamed_txn);
    2374            0 :                                 break;
    2375              :                         }
    2376            0 :                 }
    2377              :         }
    2378            0 : }
    2379              : 
    2380              : /*
    2381              :  * Relcache invalidation callback
    2382              :  */
    2383              : static void
    2384            0 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
    2385              : {
    2386            0 :         RelationSyncEntry *entry;
    2387              : 
    2388              :         /*
    2389              :          * We can get here if the plugin was used in SQL interface as the
    2390              :          * RelationSyncCache is destroyed when the decoding finishes, but there is
    2391              :          * no way to unregister the relcache invalidation callback.
    2392              :          */
    2393            0 :         if (RelationSyncCache == NULL)
    2394            0 :                 return;
    2395              : 
    2396              :         /*
    2397              :          * Nobody keeps pointers to entries in this hash table around outside
    2398              :          * logical decoding callback calls - but invalidation events can come in
    2399              :          * *during* a callback if we do any syscache access in the callback.
    2400              :          * Because of that we must mark the cache entry as invalid but not damage
    2401              :          * any of its substructure here.  The next get_rel_sync_entry() call will
    2402              :          * rebuild it all.
    2403              :          */
    2404            0 :         if (OidIsValid(relid))
    2405              :         {
    2406              :                 /*
    2407              :                  * Getting invalidations for relations that aren't in the table is
    2408              :                  * entirely normal.  So we don't care if it's found or not.
    2409              :                  */
    2410            0 :                 entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
    2411              :                                                                                                   HASH_FIND, NULL);
    2412            0 :                 if (entry != NULL)
    2413            0 :                         entry->replicate_valid = false;
    2414            0 :         }
    2415              :         else
    2416              :         {
    2417              :                 /* Whole cache must be flushed. */
    2418            0 :                 HASH_SEQ_STATUS status;
    2419              : 
    2420            0 :                 hash_seq_init(&status, RelationSyncCache);
    2421            0 :                 while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
    2422              :                 {
    2423            0 :                         entry->replicate_valid = false;
    2424              :                 }
    2425            0 :         }
    2426            0 : }
    2427              : 
    2428              : /*
    2429              :  * Publication relation/schema map syscache invalidation callback
    2430              :  *
    2431              :  * Called for invalidations on pg_namespace.
    2432              :  */
    2433              : static void
    2434            0 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
    2435              : {
    2436            0 :         HASH_SEQ_STATUS status;
    2437            0 :         RelationSyncEntry *entry;
    2438              : 
    2439              :         /*
    2440              :          * We can get here if the plugin was used in SQL interface as the
    2441              :          * RelationSyncCache is destroyed when the decoding finishes, but there is
    2442              :          * no way to unregister the invalidation callbacks.
    2443              :          */
    2444            0 :         if (RelationSyncCache == NULL)
    2445            0 :                 return;
    2446              : 
    2447              :         /*
    2448              :          * We have no easy way to identify which cache entries this invalidation
    2449              :          * event might have affected, so just mark them all invalid.
    2450              :          */
    2451            0 :         hash_seq_init(&status, RelationSyncCache);
    2452            0 :         while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
    2453              :         {
    2454            0 :                 entry->replicate_valid = false;
    2455              :         }
    2456            0 : }
    2457              : 
    2458              : /* Send Replication origin */
    2459              : static void
    2460            0 : send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
    2461              :                                  XLogRecPtr origin_lsn, bool send_origin)
    2462              : {
    2463            0 :         if (send_origin)
    2464              :         {
    2465            0 :                 char       *origin;
    2466              : 
    2467              :                 /*----------
    2468              :                  * XXX: which behaviour do we want here?
    2469              :                  *
    2470              :                  * Alternatives:
    2471              :                  *  - don't send origin message if origin name not found
    2472              :                  *    (that's what we do now)
    2473              :                  *  - throw error - that will break replication, not good
    2474              :                  *  - send some special "unknown" origin
    2475              :                  *----------
    2476              :                  */
    2477            0 :                 if (replorigin_by_oid(origin_id, true, &origin))
    2478              :                 {
    2479              :                         /* Message boundary */
    2480            0 :                         OutputPluginWrite(ctx, false);
    2481            0 :                         OutputPluginPrepareWrite(ctx, true);
    2482              : 
    2483            0 :                         logicalrep_write_origin(ctx->out, origin, origin_lsn);
    2484            0 :                 }
    2485            0 :         }
    2486            0 : }
        

Generated by: LCOV version 2.3.2-1