LCOV - code coverage report
Current view: top level - contrib/test_decoding - test_decoding.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 478 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 28 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * test_decoding.c
       4              :  *                example logical decoding output plugin
       5              :  *
       6              :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *                contrib/test_decoding/test_decoding.c
      10              :  *
      11              :  *-------------------------------------------------------------------------
      12              :  */
      13              : #include "postgres.h"
      14              : 
      15              : #include "catalog/pg_type.h"
      16              : 
      17              : #include "replication/logical.h"
      18              : #include "replication/origin.h"
      19              : 
      20              : #include "utils/builtins.h"
      21              : #include "utils/lsyscache.h"
      22              : #include "utils/memutils.h"
      23              : #include "utils/rel.h"
      24              : 
/*
 * Module magic block for this loadable module; the extended form used here
 * also sets the module's name and its version (PG_VERSION).
 */
PG_MODULE_MAGIC_EXT(
                                        .name = "test_decoding",
                                        .version = PG_VERSION
);
      29              : 
/*
 * Plugin-wide private state.  pg_decode_startup() allocates one of these and
 * stores it in ctx->output_plugin_private.
 */
typedef struct
{
        /* "text conversion context", created in pg_decode_startup() */
        MemoryContext context;
        /* print the xid in BEGIN/COMMIT/PREPARE output ("include-xids" option) */
        bool            include_xids;
        /* append " (at <timestamp>)" to COMMIT/PREPARE output ("include-timestamp") */
        bool            include_timestamp;
        /* emit nothing for transactions that wrote no changes ("skip-empty-xacts") */
        bool            skip_empty_xacts;
        /*
         * make pg_decode_filter() return true for every origin other than
         * InvalidRepOriginId ("only-local")
         */
        bool            only_local;
} TestDecodingData;
      38              : 
/*
 * Maintain the per-transaction level variables to track whether the
 * transaction and/or its streams have written any changes. In streaming mode
 * the transaction can be decoded in streams, so along with maintaining
 * whether the transaction has written any changes, we also need to track
 * whether the current stream has written any changes. This is required so
 * that, if the user has requested to skip empty transactions, we can skip
 * the empty streams even though the transaction has written some changes.
 */
typedef struct
{
        /* true once the transaction as a whole has written a change */
        bool            xact_wrote_changes;
        /* true once the current stream has written a change */
        bool            stream_wrote_changes;
} TestDecodingTxnData;
      53              : 
/*
 * Forward declarations of this file's static output plugin callbacks and of
 * the pg_output_* helpers; most of them are installed into the callback table
 * by _PG_output_plugin_init().
 */
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                                                          bool is_init);
static void pg_decode_shutdown(LogicalDecodingContext *ctx);
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
                                                                ReorderBufferTXN *txn);
static void pg_output_begin(LogicalDecodingContext *ctx,
                                                        TestDecodingData *data,
                                                        ReorderBufferTXN *txn,
                                                        bool last_write);
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
                                                                 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
static void pg_decode_change(LogicalDecodingContext *ctx,
                                                         ReorderBufferTXN *txn, Relation relation,
                                                         ReorderBufferChange *change);
static void pg_decode_truncate(LogicalDecodingContext *ctx,
                                                           ReorderBufferTXN *txn,
                                                           int nrelations, Relation relations[],
                                                           ReorderBufferChange *change);
static bool pg_decode_filter(LogicalDecodingContext *ctx,
                                                         RepOriginId origin_id);
static void pg_decode_message(LogicalDecodingContext *ctx,
                                                          ReorderBufferTXN *txn, XLogRecPtr lsn,
                                                          bool transactional, const char *prefix,
                                                          Size sz, const char *message);
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
                                                                         TransactionId xid,
                                                                         const char *gid);
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
                                                                                ReorderBufferTXN *txn);
static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
                                                                  ReorderBufferTXN *txn,
                                                                  XLogRecPtr prepare_lsn);
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
                                                                                  ReorderBufferTXN *txn,
                                                                                  XLogRecPtr commit_lsn);
static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
                                                                                        ReorderBufferTXN *txn,
                                                                                        XLogRecPtr prepare_end_lsn,
                                                                                        TimestampTz prepare_time);
static void pg_decode_stream_start(LogicalDecodingContext *ctx,
                                                                   ReorderBufferTXN *txn);
static void pg_output_stream_start(LogicalDecodingContext *ctx,
                                                                   TestDecodingData *data,
                                                                   ReorderBufferTXN *txn,
                                                                   bool last_write);
static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
                                                                  ReorderBufferTXN *txn);
static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
                                                                   ReorderBufferTXN *txn,
                                                                   XLogRecPtr abort_lsn);
static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
                                                                         ReorderBufferTXN *txn,
                                                                         XLogRecPtr prepare_lsn);
static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
                                                                        ReorderBufferTXN *txn,
                                                                        XLogRecPtr commit_lsn);
static void pg_decode_stream_change(LogicalDecodingContext *ctx,
                                                                        ReorderBufferTXN *txn,
                                                                        Relation relation,
                                                                        ReorderBufferChange *change);
static void pg_decode_stream_message(LogicalDecodingContext *ctx,
                                                                         ReorderBufferTXN *txn, XLogRecPtr lsn,
                                                                         bool transactional, const char *prefix,
                                                                         Size sz, const char *message);
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
                                                                          ReorderBufferTXN *txn,
                                                                          int nrelations, Relation relations[],
                                                                          ReorderBufferChange *change);
     122              : 
     123              : void
     124            0 : _PG_init(void)
     125              : {
     126              :         /* other plugins can perform things here */
     127            0 : }
     128              : 
     129              : /* specify output plugin callbacks */
     130              : void
     131            0 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
     132              : {
     133            0 :         cb->startup_cb = pg_decode_startup;
     134            0 :         cb->begin_cb = pg_decode_begin_txn;
     135            0 :         cb->change_cb = pg_decode_change;
     136            0 :         cb->truncate_cb = pg_decode_truncate;
     137            0 :         cb->commit_cb = pg_decode_commit_txn;
     138            0 :         cb->filter_by_origin_cb = pg_decode_filter;
     139            0 :         cb->shutdown_cb = pg_decode_shutdown;
     140            0 :         cb->message_cb = pg_decode_message;
     141            0 :         cb->filter_prepare_cb = pg_decode_filter_prepare;
     142            0 :         cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
     143            0 :         cb->prepare_cb = pg_decode_prepare_txn;
     144            0 :         cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
     145            0 :         cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
     146            0 :         cb->stream_start_cb = pg_decode_stream_start;
     147            0 :         cb->stream_stop_cb = pg_decode_stream_stop;
     148            0 :         cb->stream_abort_cb = pg_decode_stream_abort;
     149            0 :         cb->stream_prepare_cb = pg_decode_stream_prepare;
     150            0 :         cb->stream_commit_cb = pg_decode_stream_commit;
     151            0 :         cb->stream_change_cb = pg_decode_stream_change;
     152            0 :         cb->stream_message_cb = pg_decode_stream_message;
     153            0 :         cb->stream_truncate_cb = pg_decode_stream_truncate;
     154            0 : }
     155              : 
     156              : 
     157              : /* initialize this plugin */
     158              : static void
     159            0 : pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
     160              :                                   bool is_init)
     161              : {
     162            0 :         ListCell   *option;
     163            0 :         TestDecodingData *data;
     164            0 :         bool            enable_streaming = false;
     165              : 
     166            0 :         data = palloc0_object(TestDecodingData);
     167            0 :         data->context = AllocSetContextCreate(ctx->context,
     168              :                                                                                   "text conversion context",
     169              :                                                                                   ALLOCSET_DEFAULT_SIZES);
     170            0 :         data->include_xids = true;
     171            0 :         data->include_timestamp = false;
     172            0 :         data->skip_empty_xacts = false;
     173            0 :         data->only_local = false;
     174              : 
     175            0 :         ctx->output_plugin_private = data;
     176              : 
     177            0 :         opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
     178            0 :         opt->receive_rewrites = false;
     179              : 
     180            0 :         foreach(option, ctx->output_plugin_options)
     181              :         {
     182            0 :                 DefElem    *elem = lfirst(option);
     183              : 
     184            0 :                 Assert(elem->arg == NULL || IsA(elem->arg, String));
     185              : 
     186            0 :                 if (strcmp(elem->defname, "include-xids") == 0)
     187              :                 {
     188              :                         /* if option does not provide a value, it means its value is true */
     189            0 :                         if (elem->arg == NULL)
     190            0 :                                 data->include_xids = true;
     191            0 :                         else if (!parse_bool(strVal(elem->arg), &data->include_xids))
     192            0 :                                 ereport(ERROR,
     193              :                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     194              :                                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
     195              :                                                                 strVal(elem->arg), elem->defname)));
     196            0 :                 }
     197            0 :                 else if (strcmp(elem->defname, "include-timestamp") == 0)
     198              :                 {
     199            0 :                         if (elem->arg == NULL)
     200            0 :                                 data->include_timestamp = true;
     201            0 :                         else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
     202            0 :                                 ereport(ERROR,
     203              :                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     204              :                                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
     205              :                                                                 strVal(elem->arg), elem->defname)));
     206            0 :                 }
     207            0 :                 else if (strcmp(elem->defname, "force-binary") == 0)
     208              :                 {
     209            0 :                         bool            force_binary;
     210              : 
     211            0 :                         if (elem->arg == NULL)
     212            0 :                                 continue;
     213            0 :                         else if (!parse_bool(strVal(elem->arg), &force_binary))
     214            0 :                                 ereport(ERROR,
     215              :                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     216              :                                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
     217              :                                                                 strVal(elem->arg), elem->defname)));
     218              : 
     219            0 :                         if (force_binary)
     220            0 :                                 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
     221            0 :                 }
     222            0 :                 else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
     223              :                 {
     224              : 
     225            0 :                         if (elem->arg == NULL)
     226            0 :                                 data->skip_empty_xacts = true;
     227            0 :                         else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
     228            0 :                                 ereport(ERROR,
     229              :                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     230              :                                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
     231              :                                                                 strVal(elem->arg), elem->defname)));
     232            0 :                 }
     233            0 :                 else if (strcmp(elem->defname, "only-local") == 0)
     234              :                 {
     235              : 
     236            0 :                         if (elem->arg == NULL)
     237            0 :                                 data->only_local = true;
     238            0 :                         else if (!parse_bool(strVal(elem->arg), &data->only_local))
     239            0 :                                 ereport(ERROR,
     240              :                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     241              :                                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
     242              :                                                                 strVal(elem->arg), elem->defname)));
     243            0 :                 }
     244            0 :                 else if (strcmp(elem->defname, "include-rewrites") == 0)
     245              :                 {
     246              : 
     247            0 :                         if (elem->arg == NULL)
     248            0 :                                 continue;
     249            0 :                         else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
     250            0 :                                 ereport(ERROR,
     251              :                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     252              :                                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
     253              :                                                                 strVal(elem->arg), elem->defname)));
     254            0 :                 }
     255            0 :                 else if (strcmp(elem->defname, "stream-changes") == 0)
     256              :                 {
     257            0 :                         if (elem->arg == NULL)
     258            0 :                                 continue;
     259            0 :                         else if (!parse_bool(strVal(elem->arg), &enable_streaming))
     260            0 :                                 ereport(ERROR,
     261              :                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     262              :                                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
     263              :                                                                 strVal(elem->arg), elem->defname)));
     264            0 :                 }
     265              :                 else
     266              :                 {
     267            0 :                         ereport(ERROR,
     268              :                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     269              :                                          errmsg("option \"%s\" = \"%s\" is unknown",
     270              :                                                         elem->defname,
     271              :                                                         elem->arg ? strVal(elem->arg) : "(null)")));
     272              :                 }
     273            0 :         }
     274              : 
     275            0 :         ctx->streaming &= enable_streaming;
     276            0 : }
     277              : 
     278              : /* cleanup this plugin's resources */
     279              : static void
     280            0 : pg_decode_shutdown(LogicalDecodingContext *ctx)
     281              : {
     282            0 :         TestDecodingData *data = ctx->output_plugin_private;
     283              : 
     284              :         /* cleanup our own resources via memory context reset */
     285            0 :         MemoryContextDelete(data->context);
     286            0 : }
     287              : 
     288              : /* BEGIN callback */
     289              : static void
     290            0 : pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     291              : {
     292            0 :         TestDecodingData *data = ctx->output_plugin_private;
     293            0 :         TestDecodingTxnData *txndata =
     294            0 :                 MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
     295              : 
     296            0 :         txndata->xact_wrote_changes = false;
     297            0 :         txn->output_plugin_private = txndata;
     298              : 
     299              :         /*
     300              :          * If asked to skip empty transactions, we'll emit BEGIN at the point
     301              :          * where the first operation is received for this transaction.
     302              :          */
     303            0 :         if (data->skip_empty_xacts)
     304            0 :                 return;
     305              : 
     306            0 :         pg_output_begin(ctx, data, txn, true);
     307            0 : }
     308              : 
     309              : static void
     310            0 : pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
     311              : {
     312            0 :         OutputPluginPrepareWrite(ctx, last_write);
     313            0 :         if (data->include_xids)
     314            0 :                 appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
     315              :         else
     316            0 :                 appendStringInfoString(ctx->out, "BEGIN");
     317            0 :         OutputPluginWrite(ctx, last_write);
     318            0 : }
     319              : 
     320              : /* COMMIT callback */
     321              : static void
     322            0 : pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     323              :                                          XLogRecPtr commit_lsn)
     324              : {
     325            0 :         TestDecodingData *data = ctx->output_plugin_private;
     326            0 :         TestDecodingTxnData *txndata = txn->output_plugin_private;
     327            0 :         bool            xact_wrote_changes = txndata->xact_wrote_changes;
     328              : 
     329            0 :         pfree(txndata);
     330            0 :         txn->output_plugin_private = NULL;
     331              : 
     332            0 :         if (data->skip_empty_xacts && !xact_wrote_changes)
     333            0 :                 return;
     334              : 
     335            0 :         OutputPluginPrepareWrite(ctx, true);
     336            0 :         if (data->include_xids)
     337            0 :                 appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
     338              :         else
     339            0 :                 appendStringInfoString(ctx->out, "COMMIT");
     340              : 
     341            0 :         if (data->include_timestamp)
     342            0 :                 appendStringInfo(ctx->out, " (at %s)",
     343            0 :                                                  timestamptz_to_str(txn->commit_time));
     344              : 
     345            0 :         OutputPluginWrite(ctx, true);
     346            0 : }
     347              : 
     348              : /* BEGIN PREPARE callback */
     349              : static void
     350            0 : pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     351              : {
     352            0 :         TestDecodingData *data = ctx->output_plugin_private;
     353            0 :         TestDecodingTxnData *txndata =
     354            0 :                 MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
     355              : 
     356            0 :         txndata->xact_wrote_changes = false;
     357            0 :         txn->output_plugin_private = txndata;
     358              : 
     359              :         /*
     360              :          * If asked to skip empty transactions, we'll emit BEGIN at the point
     361              :          * where the first operation is received for this transaction.
     362              :          */
     363            0 :         if (data->skip_empty_xacts)
     364            0 :                 return;
     365              : 
     366            0 :         pg_output_begin(ctx, data, txn, true);
     367            0 : }
     368              : 
     369              : /* PREPARE callback */
     370              : static void
     371            0 : pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     372              :                                           XLogRecPtr prepare_lsn)
     373              : {
     374            0 :         TestDecodingData *data = ctx->output_plugin_private;
     375            0 :         TestDecodingTxnData *txndata = txn->output_plugin_private;
     376              : 
     377              :         /*
     378              :          * If asked to skip empty transactions, we'll emit PREPARE at the point
     379              :          * where the first operation is received for this transaction.
     380              :          */
     381            0 :         if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     382            0 :                 return;
     383              : 
     384            0 :         OutputPluginPrepareWrite(ctx, true);
     385              : 
     386            0 :         appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
     387            0 :                                          quote_literal_cstr(txn->gid));
     388              : 
     389            0 :         if (data->include_xids)
     390            0 :                 appendStringInfo(ctx->out, ", txid %u", txn->xid);
     391              : 
     392            0 :         if (data->include_timestamp)
     393            0 :                 appendStringInfo(ctx->out, " (at %s)",
     394            0 :                                                  timestamptz_to_str(txn->prepare_time));
     395              : 
     396            0 :         OutputPluginWrite(ctx, true);
     397            0 : }
     398              : 
     399              : /* COMMIT PREPARED callback */
     400              : static void
     401            0 : pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     402              :                                                           XLogRecPtr commit_lsn)
     403              : {
     404            0 :         TestDecodingData *data = ctx->output_plugin_private;
     405              : 
     406            0 :         OutputPluginPrepareWrite(ctx, true);
     407              : 
     408            0 :         appendStringInfo(ctx->out, "COMMIT PREPARED %s",
     409            0 :                                          quote_literal_cstr(txn->gid));
     410              : 
     411            0 :         if (data->include_xids)
     412            0 :                 appendStringInfo(ctx->out, ", txid %u", txn->xid);
     413              : 
     414            0 :         if (data->include_timestamp)
     415            0 :                 appendStringInfo(ctx->out, " (at %s)",
     416            0 :                                                  timestamptz_to_str(txn->commit_time));
     417              : 
     418            0 :         OutputPluginWrite(ctx, true);
     419            0 : }
     420              : 
     421              : /* ROLLBACK PREPARED callback */
     422              : static void
     423            0 : pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
     424              :                                                                 ReorderBufferTXN *txn,
     425              :                                                                 XLogRecPtr prepare_end_lsn,
     426              :                                                                 TimestampTz prepare_time)
     427              : {
     428            0 :         TestDecodingData *data = ctx->output_plugin_private;
     429              : 
     430            0 :         OutputPluginPrepareWrite(ctx, true);
     431              : 
     432            0 :         appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
     433            0 :                                          quote_literal_cstr(txn->gid));
     434              : 
     435            0 :         if (data->include_xids)
     436            0 :                 appendStringInfo(ctx->out, ", txid %u", txn->xid);
     437              : 
     438            0 :         if (data->include_timestamp)
     439            0 :                 appendStringInfo(ctx->out, " (at %s)",
     440            0 :                                                  timestamptz_to_str(txn->commit_time));
     441              : 
     442            0 :         OutputPluginWrite(ctx, true);
     443            0 : }
     444              : 
     445              : /*
     446              :  * Filter out two-phase transactions.
     447              :  *
     448              :  * Each plugin can implement its own filtering logic. Here we demonstrate a
     449              :  * simple logic by checking the GID. If the GID contains the "_nodecode"
     450              :  * substring, then we filter it out.
     451              :  */
     452              : static bool
     453            0 : pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
     454              :                                                  const char *gid)
     455              : {
     456            0 :         if (strstr(gid, "_nodecode") != NULL)
     457            0 :                 return true;
     458              : 
     459            0 :         return false;
     460            0 : }
     461              : 
     462              : static bool
     463            0 : pg_decode_filter(LogicalDecodingContext *ctx,
     464              :                                  RepOriginId origin_id)
     465              : {
     466            0 :         TestDecodingData *data = ctx->output_plugin_private;
     467              : 
     468            0 :         if (data->only_local && origin_id != InvalidRepOriginId)
     469            0 :                 return true;
     470            0 :         return false;
     471            0 : }
     472              : 
     473              : /*
     474              :  * Print literal `outputstr' already represented as string of type `typid'
     475              :  * into stringbuf `s'.
     476              :  *
     477              :  * Some builtin types aren't quoted, the rest is quoted. Escaping is done
     478              :  * per standard SQL rules.
     479              :  */
     480              : static void
     481            0 : print_literal(StringInfo s, Oid typid, char *outputstr)
     482              : {
     483            0 :         const char *valptr;
     484              : 
     485            0 :         switch (typid)
     486              :         {
     487              :                 case INT2OID:
     488              :                 case INT4OID:
     489              :                 case INT8OID:
     490              :                 case OIDOID:
     491              :                 case FLOAT4OID:
     492              :                 case FLOAT8OID:
     493              :                 case NUMERICOID:
     494              :                         /* NB: We don't care about Inf, NaN et al. */
     495            0 :                         appendStringInfoString(s, outputstr);
     496            0 :                         break;
     497              : 
     498              :                 case BITOID:
     499              :                 case VARBITOID:
     500            0 :                         appendStringInfo(s, "B'%s'", outputstr);
     501            0 :                         break;
     502              : 
     503              :                 case BOOLOID:
     504            0 :                         if (strcmp(outputstr, "t") == 0)
     505            0 :                                 appendStringInfoString(s, "true");
     506              :                         else
     507            0 :                                 appendStringInfoString(s, "false");
     508            0 :                         break;
     509              : 
     510              :                 default:
     511            0 :                         appendStringInfoChar(s, '\'');
     512            0 :                         for (valptr = outputstr; *valptr; valptr++)
     513              :                         {
     514            0 :                                 char            ch = *valptr;
     515              : 
     516            0 :                                 if (SQL_STR_DOUBLE(ch, false))
     517            0 :                                         appendStringInfoChar(s, ch);
     518            0 :                                 appendStringInfoChar(s, ch);
     519            0 :                         }
     520            0 :                         appendStringInfoChar(s, '\'');
     521            0 :                         break;
     522              :         }
     523            0 : }
     524              : 
     525              : /* print the tuple 'tuple' into the StringInfo s */
     526              : static void
     527            0 : tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
     528              : {
     529            0 :         int                     natt;
     530              : 
     531              :         /* print all columns individually */
     532            0 :         for (natt = 0; natt < tupdesc->natts; natt++)
     533              :         {
     534            0 :                 Form_pg_attribute attr; /* the attribute itself */
     535            0 :                 Oid                     typid;          /* type of current attribute */
     536            0 :                 Oid                     typoutput;      /* output function */
     537            0 :                 bool            typisvarlena;
     538            0 :                 Datum           origval;        /* possibly toasted Datum */
     539            0 :                 bool            isnull;         /* column is null? */
     540              : 
     541            0 :                 attr = TupleDescAttr(tupdesc, natt);
     542              : 
     543              :                 /*
     544              :                  * don't print dropped columns, we can't be sure everything is
     545              :                  * available for them
     546              :                  */
     547            0 :                 if (attr->attisdropped)
     548            0 :                         continue;
     549              : 
     550              :                 /*
     551              :                  * Don't print system columns, oid will already have been printed if
     552              :                  * present.
     553              :                  */
     554            0 :                 if (attr->attnum < 0)
     555            0 :                         continue;
     556              : 
     557            0 :                 typid = attr->atttypid;
     558              : 
     559              :                 /* get Datum from tuple */
     560            0 :                 origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
     561              : 
     562            0 :                 if (isnull && skip_nulls)
     563            0 :                         continue;
     564              : 
     565              :                 /* print attribute name */
     566            0 :                 appendStringInfoChar(s, ' ');
     567            0 :                 appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
     568              : 
     569              :                 /* print attribute type */
     570            0 :                 appendStringInfoChar(s, '[');
     571            0 :                 appendStringInfoString(s, format_type_be(typid));
     572            0 :                 appendStringInfoChar(s, ']');
     573              : 
     574              :                 /* query output function */
     575            0 :                 getTypeOutputInfo(typid,
     576              :                                                   &typoutput, &typisvarlena);
     577              : 
     578              :                 /* print separator */
     579            0 :                 appendStringInfoChar(s, ':');
     580              : 
     581              :                 /* print data */
     582            0 :                 if (isnull)
     583            0 :                         appendStringInfoString(s, "null");
     584            0 :                 else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
     585            0 :                         appendStringInfoString(s, "unchanged-toast-datum");
     586            0 :                 else if (!typisvarlena)
     587            0 :                         print_literal(s, typid,
     588            0 :                                                   OidOutputFunctionCall(typoutput, origval));
     589              :                 else
     590              :                 {
     591            0 :                         Datum           val;    /* definitely detoasted Datum */
     592              : 
     593            0 :                         val = PointerGetDatum(PG_DETOAST_DATUM(origval));
     594            0 :                         print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
     595            0 :                 }
     596            0 :         }
     597            0 : }
     598              : 
     599              : /*
     600              :  * callback for individual changed tuples
     601              :  */
     602              : static void
     603            0 : pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     604              :                                  Relation relation, ReorderBufferChange *change)
     605              : {
     606            0 :         TestDecodingData *data;
     607            0 :         TestDecodingTxnData *txndata;
     608            0 :         Form_pg_class class_form;
     609            0 :         TupleDesc       tupdesc;
     610            0 :         MemoryContext old;
     611              : 
     612            0 :         data = ctx->output_plugin_private;
     613            0 :         txndata = txn->output_plugin_private;
     614              : 
     615              :         /* output BEGIN if we haven't yet */
     616            0 :         if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     617              :         {
     618            0 :                 pg_output_begin(ctx, data, txn, false);
     619            0 :         }
     620            0 :         txndata->xact_wrote_changes = true;
     621              : 
     622            0 :         class_form = RelationGetForm(relation);
     623            0 :         tupdesc = RelationGetDescr(relation);
     624              : 
     625              :         /* Avoid leaking memory by using and resetting our own context */
     626            0 :         old = MemoryContextSwitchTo(data->context);
     627              : 
     628            0 :         OutputPluginPrepareWrite(ctx, true);
     629              : 
     630            0 :         appendStringInfoString(ctx->out, "table ");
     631            0 :         appendStringInfoString(ctx->out,
     632            0 :                                                    quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
     633            0 :                                                                                                           class_form->relrewrite ?
     634            0 :                                                                                                           get_rel_name(class_form->relrewrite) :
     635            0 :                                                                                                           NameStr(class_form->relname)));
     636            0 :         appendStringInfoChar(ctx->out, ':');
     637              : 
     638            0 :         switch (change->action)
     639              :         {
     640              :                 case REORDER_BUFFER_CHANGE_INSERT:
     641            0 :                         appendStringInfoString(ctx->out, " INSERT:");
     642            0 :                         if (change->data.tp.newtuple == NULL)
     643            0 :                                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     644              :                         else
     645            0 :                                 tuple_to_stringinfo(ctx->out, tupdesc,
     646            0 :                                                                         change->data.tp.newtuple,
     647              :                                                                         false);
     648            0 :                         break;
     649              :                 case REORDER_BUFFER_CHANGE_UPDATE:
     650            0 :                         appendStringInfoString(ctx->out, " UPDATE:");
     651            0 :                         if (change->data.tp.oldtuple != NULL)
     652              :                         {
     653            0 :                                 appendStringInfoString(ctx->out, " old-key:");
     654            0 :                                 tuple_to_stringinfo(ctx->out, tupdesc,
     655            0 :                                                                         change->data.tp.oldtuple,
     656              :                                                                         true);
     657            0 :                                 appendStringInfoString(ctx->out, " new-tuple:");
     658            0 :                         }
     659              : 
     660            0 :                         if (change->data.tp.newtuple == NULL)
     661            0 :                                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     662              :                         else
     663            0 :                                 tuple_to_stringinfo(ctx->out, tupdesc,
     664            0 :                                                                         change->data.tp.newtuple,
     665              :                                                                         false);
     666            0 :                         break;
     667              :                 case REORDER_BUFFER_CHANGE_DELETE:
     668            0 :                         appendStringInfoString(ctx->out, " DELETE:");
     669              : 
     670              :                         /* if there was no PK, we only know that a delete happened */
     671            0 :                         if (change->data.tp.oldtuple == NULL)
     672            0 :                                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     673              :                         /* In DELETE, only the replica identity is present; display that */
     674              :                         else
     675            0 :                                 tuple_to_stringinfo(ctx->out, tupdesc,
     676            0 :                                                                         change->data.tp.oldtuple,
     677              :                                                                         true);
     678            0 :                         break;
     679              :                 default:
     680            0 :                         Assert(false);
     681            0 :         }
     682              : 
     683            0 :         MemoryContextSwitchTo(old);
     684            0 :         MemoryContextReset(data->context);
     685              : 
     686            0 :         OutputPluginWrite(ctx, true);
     687            0 : }
     688              : 
     689              : static void
     690            0 : pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     691              :                                    int nrelations, Relation relations[], ReorderBufferChange *change)
     692              : {
     693            0 :         TestDecodingData *data;
     694            0 :         TestDecodingTxnData *txndata;
     695            0 :         MemoryContext old;
     696            0 :         int                     i;
     697              : 
     698            0 :         data = ctx->output_plugin_private;
     699            0 :         txndata = txn->output_plugin_private;
     700              : 
     701              :         /* output BEGIN if we haven't yet */
     702            0 :         if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     703              :         {
     704            0 :                 pg_output_begin(ctx, data, txn, false);
     705            0 :         }
     706            0 :         txndata->xact_wrote_changes = true;
     707              : 
     708              :         /* Avoid leaking memory by using and resetting our own context */
     709            0 :         old = MemoryContextSwitchTo(data->context);
     710              : 
     711            0 :         OutputPluginPrepareWrite(ctx, true);
     712              : 
     713            0 :         appendStringInfoString(ctx->out, "table ");
     714              : 
     715            0 :         for (i = 0; i < nrelations; i++)
     716              :         {
     717            0 :                 if (i > 0)
     718            0 :                         appendStringInfoString(ctx->out, ", ");
     719              : 
     720            0 :                 appendStringInfoString(ctx->out,
     721            0 :                                                            quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
     722            0 :                                                                                                                   NameStr(relations[i]->rd_rel->relname)));
     723            0 :         }
     724              : 
     725            0 :         appendStringInfoString(ctx->out, ": TRUNCATE:");
     726              : 
     727            0 :         if (change->data.truncate.restart_seqs
     728            0 :                 || change->data.truncate.cascade)
     729              :         {
     730            0 :                 if (change->data.truncate.restart_seqs)
     731            0 :                         appendStringInfoString(ctx->out, " restart_seqs");
     732            0 :                 if (change->data.truncate.cascade)
     733            0 :                         appendStringInfoString(ctx->out, " cascade");
     734            0 :         }
     735              :         else
     736            0 :                 appendStringInfoString(ctx->out, " (no-flags)");
     737              : 
     738            0 :         MemoryContextSwitchTo(old);
     739            0 :         MemoryContextReset(data->context);
     740              : 
     741            0 :         OutputPluginWrite(ctx, true);
     742            0 : }
     743              : 
     744              : static void
     745            0 : pg_decode_message(LogicalDecodingContext *ctx,
     746              :                                   ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
     747              :                                   const char *prefix, Size sz, const char *message)
     748              : {
     749            0 :         TestDecodingData *data = ctx->output_plugin_private;
     750            0 :         TestDecodingTxnData *txndata;
     751              : 
     752            0 :         txndata = transactional ? txn->output_plugin_private : NULL;
     753              : 
     754              :         /* output BEGIN if we haven't yet for transactional messages */
     755            0 :         if (transactional && data->skip_empty_xacts && !txndata->xact_wrote_changes)
     756            0 :                 pg_output_begin(ctx, data, txn, false);
     757              : 
     758            0 :         if (transactional)
     759            0 :                 txndata->xact_wrote_changes = true;
     760              : 
     761            0 :         OutputPluginPrepareWrite(ctx, true);
     762            0 :         appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
     763            0 :                                          transactional, prefix, sz);
     764            0 :         appendBinaryStringInfo(ctx->out, message, sz);
     765            0 :         OutputPluginWrite(ctx, true);
     766            0 : }
     767              : 
     768              : static void
     769            0 : pg_decode_stream_start(LogicalDecodingContext *ctx,
     770              :                                            ReorderBufferTXN *txn)
     771              : {
     772            0 :         TestDecodingData *data = ctx->output_plugin_private;
     773            0 :         TestDecodingTxnData *txndata = txn->output_plugin_private;
     774              : 
     775              :         /*
     776              :          * Allocate the txn plugin data for the first stream in the transaction.
     777              :          */
     778            0 :         if (txndata == NULL)
     779              :         {
     780            0 :                 txndata =
     781            0 :                         MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
     782            0 :                 txndata->xact_wrote_changes = false;
     783            0 :                 txn->output_plugin_private = txndata;
     784            0 :         }
     785              : 
     786            0 :         txndata->stream_wrote_changes = false;
     787            0 :         if (data->skip_empty_xacts)
     788            0 :                 return;
     789            0 :         pg_output_stream_start(ctx, data, txn, true);
     790            0 : }
     791              : 
     792              : static void
     793            0 : pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
     794              : {
     795            0 :         OutputPluginPrepareWrite(ctx, last_write);
     796            0 :         if (data->include_xids)
     797            0 :                 appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
     798              :         else
     799            0 :                 appendStringInfoString(ctx->out, "opening a streamed block for transaction");
     800            0 :         OutputPluginWrite(ctx, last_write);
     801            0 : }
     802              : 
     803              : static void
     804            0 : pg_decode_stream_stop(LogicalDecodingContext *ctx,
     805              :                                           ReorderBufferTXN *txn)
     806              : {
     807            0 :         TestDecodingData *data = ctx->output_plugin_private;
     808            0 :         TestDecodingTxnData *txndata = txn->output_plugin_private;
     809              : 
     810            0 :         if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     811            0 :                 return;
     812              : 
     813            0 :         OutputPluginPrepareWrite(ctx, true);
     814            0 :         if (data->include_xids)
     815            0 :                 appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
     816              :         else
     817            0 :                 appendStringInfoString(ctx->out, "closing a streamed block for transaction");
     818            0 :         OutputPluginWrite(ctx, true);
     819            0 : }
     820              : 
     821              : static void
     822            0 : pg_decode_stream_abort(LogicalDecodingContext *ctx,
     823              :                                            ReorderBufferTXN *txn,
     824              :                                            XLogRecPtr abort_lsn)
     825              : {
     826            0 :         TestDecodingData *data = ctx->output_plugin_private;
     827              : 
     828              :         /*
     829              :          * stream abort can be sent for an individual subtransaction but we
     830              :          * maintain the output_plugin_private only under the toptxn so if this is
     831              :          * not the toptxn then fetch the toptxn.
     832              :          */
     833            0 :         ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
     834            0 :         TestDecodingTxnData *txndata = toptxn->output_plugin_private;
     835            0 :         bool            xact_wrote_changes = txndata->xact_wrote_changes;
     836              : 
     837            0 :         if (rbtxn_is_toptxn(txn))
     838              :         {
     839            0 :                 Assert(txn->output_plugin_private != NULL);
     840            0 :                 pfree(txndata);
     841            0 :                 txn->output_plugin_private = NULL;
     842            0 :         }
     843              : 
     844            0 :         if (data->skip_empty_xacts && !xact_wrote_changes)
     845            0 :                 return;
     846              : 
     847            0 :         OutputPluginPrepareWrite(ctx, true);
     848            0 :         if (data->include_xids)
     849            0 :                 appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
     850              :         else
     851            0 :                 appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
     852            0 :         OutputPluginWrite(ctx, true);
     853            0 : }
     854              : 
     855              : static void
     856            0 : pg_decode_stream_prepare(LogicalDecodingContext *ctx,
     857              :                                                  ReorderBufferTXN *txn,
     858              :                                                  XLogRecPtr prepare_lsn)
     859              : {
     860            0 :         TestDecodingData *data = ctx->output_plugin_private;
     861            0 :         TestDecodingTxnData *txndata = txn->output_plugin_private;
     862              : 
     863            0 :         if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     864            0 :                 return;
     865              : 
     866            0 :         OutputPluginPrepareWrite(ctx, true);
     867              : 
     868            0 :         if (data->include_xids)
     869            0 :                 appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
     870            0 :                                                  quote_literal_cstr(txn->gid), txn->xid);
     871              :         else
     872            0 :                 appendStringInfo(ctx->out, "preparing streamed transaction %s",
     873            0 :                                                  quote_literal_cstr(txn->gid));
     874              : 
     875            0 :         if (data->include_timestamp)
     876            0 :                 appendStringInfo(ctx->out, " (at %s)",
     877            0 :                                                  timestamptz_to_str(txn->prepare_time));
     878              : 
     879            0 :         OutputPluginWrite(ctx, true);
     880            0 : }
     881              : 
     882              : static void
     883            0 : pg_decode_stream_commit(LogicalDecodingContext *ctx,
     884              :                                                 ReorderBufferTXN *txn,
     885              :                                                 XLogRecPtr commit_lsn)
     886              : {
     887            0 :         TestDecodingData *data = ctx->output_plugin_private;
     888            0 :         TestDecodingTxnData *txndata = txn->output_plugin_private;
     889            0 :         bool            xact_wrote_changes = txndata->xact_wrote_changes;
     890              : 
     891            0 :         pfree(txndata);
     892            0 :         txn->output_plugin_private = NULL;
     893              : 
     894            0 :         if (data->skip_empty_xacts && !xact_wrote_changes)
     895            0 :                 return;
     896              : 
     897            0 :         OutputPluginPrepareWrite(ctx, true);
     898              : 
     899            0 :         if (data->include_xids)
     900            0 :                 appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
     901              :         else
     902            0 :                 appendStringInfoString(ctx->out, "committing streamed transaction");
     903              : 
     904            0 :         if (data->include_timestamp)
     905            0 :                 appendStringInfo(ctx->out, " (at %s)",
     906            0 :                                                  timestamptz_to_str(txn->commit_time));
     907              : 
     908            0 :         OutputPluginWrite(ctx, true);
     909            0 : }
     910              : 
     911              : /*
     912              :  * In streaming mode, we don't display the changes as the transaction can abort
     913              :  * at a later point in time.  We don't want users to see the changes until the
     914              :  * transaction is committed.
     915              :  */
     916              : static void
     917            0 : pg_decode_stream_change(LogicalDecodingContext *ctx,
     918              :                                                 ReorderBufferTXN *txn,
     919              :                                                 Relation relation,
     920              :                                                 ReorderBufferChange *change)
     921              : {
     922            0 :         TestDecodingData *data = ctx->output_plugin_private;
     923            0 :         TestDecodingTxnData *txndata = txn->output_plugin_private;
     924              : 
     925              :         /* output stream start if we haven't yet */
     926            0 :         if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     927              :         {
     928            0 :                 pg_output_stream_start(ctx, data, txn, false);
     929            0 :         }
     930            0 :         txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
     931              : 
     932            0 :         OutputPluginPrepareWrite(ctx, true);
     933            0 :         if (data->include_xids)
     934            0 :                 appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
     935              :         else
     936            0 :                 appendStringInfoString(ctx->out, "streaming change for transaction");
     937            0 :         OutputPluginWrite(ctx, true);
     938            0 : }
     939              : 
     940              : /*
     941              :  * In streaming mode, we don't display the contents for transactional messages
     942              :  * as the transaction can abort at a later point in time.  We don't want users to
     943              :  * see the message contents until the transaction is committed.
                      :  *
                      :  * Transactional messages: pg_output_stream_start() is called first when
                      :  * skip_empty_xacts is set and stream_wrote_changes is still false; then
                      :  * xact_wrote_changes and stream_wrote_changes are both set, and only the
                      :  * transactional flag, prefix and size are written.
                      :  *
                      :  * Non-transactional messages: the same header is written, followed by
                      :  * "content:" and the sz bytes of message, appended by length with
                      :  * appendBinaryStringInfo().  txn is not dereferenced on this path.
                      :  *
                      :  * lsn is not used by this callback.
     944              :  */
     945              : static void
     946            0 : pg_decode_stream_message(LogicalDecodingContext *ctx,
     947              :                                                  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
     948              :                                                  const char *prefix, Size sz, const char *message)
     949              : {
     950              :         /*
                      :          * Output stream start if we haven't yet for transactional messages.
                      :          * The plugin-private and per-transaction state are only looked up
                      :          * inside this branch, so the non-transactional path never touches
                      :          * txn.
                      :          */
     951            0 :         if (transactional)
     952              :         {
     953            0 :                 TestDecodingData *data = ctx->output_plugin_private;
     954            0 :                 TestDecodingTxnData *txndata = txn->output_plugin_private;
     955              : 
     956            0 :                 if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     957              :                 {
     958            0 :                         pg_output_stream_start(ctx, data, txn, false);
     959            0 :                 }
     960            0 :                 txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
     961            0 :         }
     962              : 
     963            0 :         OutputPluginPrepareWrite(ctx, true);
     964              : 
     965            0 :         if (transactional)
     966              :         {
     967            0 :                 appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
     968            0 :                                                  transactional, prefix, sz);
     969            0 :         }
     970              :         else
     971              :         {
     972            0 :                 appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
     973            0 :                                                  transactional, prefix, sz);
     974            0 :                 appendBinaryStringInfo(ctx->out, message, sz);
     975              :         }
     976              : 
     977            0 :         OutputPluginWrite(ctx, true);
     978            0 : }
     979              : 
     980              : /*
     981              :  * In streaming mode, we don't display the detailed information of Truncate.
     982              :  * See pg_decode_stream_change.
                      :  *
                      :  * pg_output_stream_start() is called first when skip_empty_xacts is set and
                      :  * stream_wrote_changes is still false; then xact_wrote_changes and
                      :  * stream_wrote_changes are both set.  The text written is
                      :  * "streaming truncate for TXN <xid>" when include_xids is set, and
                      :  * "streaming truncate for transaction" otherwise.
                      :  *
                      :  * nrelations, relations and change are not used by this callback.
     983              :  */
     984              : static void
     985            0 : pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     986              :                                                   int nrelations, Relation relations[],
     987              :                                                   ReorderBufferChange *change)
     988              : {
     989            0 :         TestDecodingData *data = ctx->output_plugin_private;
     990            0 :         TestDecodingTxnData *txndata = txn->output_plugin_private;
     991              : 
     992            0 :         if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     993              :         {
     994            0 :                 pg_output_stream_start(ctx, data, txn, false);
     995            0 :         }
     996            0 :         txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
     997              : 
     998            0 :         OutputPluginPrepareWrite(ctx, true);
     999            0 :         if (data->include_xids)
    1000            0 :                 appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
    1001              :         else
    1002            0 :                 appendStringInfoString(ctx->out, "streaming truncate for transaction");
    1003            0 :         OutputPluginWrite(ctx, true);
    1004            0 : }
        

Generated by: LCOV version 2.3.2-1