Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * test_decoding.c
4 : * example logical decoding output plugin
5 : *
6 : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * contrib/test_decoding/test_decoding.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "catalog/pg_type.h"
16 :
17 : #include "replication/logical.h"
18 : #include "replication/origin.h"
19 :
20 : #include "utils/builtins.h"
21 : #include "utils/lsyscache.h"
22 : #include "utils/memutils.h"
23 : #include "utils/rel.h"
24 :
25 0 : PG_MODULE_MAGIC_EXT(
26 : .name = "test_decoding",
27 : .version = PG_VERSION
28 : );
29 :
30 : typedef struct
31 : {
32 : MemoryContext context;
33 : bool include_xids;
34 : bool include_timestamp;
35 : bool skip_empty_xacts;
36 : bool only_local;
37 : } TestDecodingData;
38 :
39 : /*
40 : * Maintain the per-transaction level variables to track whether the
41 : * transaction and or streams have written any changes. In streaming mode the
42 : * transaction can be decoded in streams so along with maintaining whether the
43 : * transaction has written any changes, we also need to track whether the
44 : * current stream has written any changes. This is required so that if user
45 : * has requested to skip the empty transactions we can skip the empty streams
46 : * even though the transaction has written some changes.
47 : */
48 : typedef struct
49 : {
50 : bool xact_wrote_changes;
51 : bool stream_wrote_changes;
52 : } TestDecodingTxnData;
53 :
54 : static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
55 : bool is_init);
56 : static void pg_decode_shutdown(LogicalDecodingContext *ctx);
57 : static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
58 : ReorderBufferTXN *txn);
59 : static void pg_output_begin(LogicalDecodingContext *ctx,
60 : TestDecodingData *data,
61 : ReorderBufferTXN *txn,
62 : bool last_write);
63 : static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
64 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
65 : static void pg_decode_change(LogicalDecodingContext *ctx,
66 : ReorderBufferTXN *txn, Relation relation,
67 : ReorderBufferChange *change);
68 : static void pg_decode_truncate(LogicalDecodingContext *ctx,
69 : ReorderBufferTXN *txn,
70 : int nrelations, Relation relations[],
71 : ReorderBufferChange *change);
72 : static bool pg_decode_filter(LogicalDecodingContext *ctx,
73 : RepOriginId origin_id);
74 : static void pg_decode_message(LogicalDecodingContext *ctx,
75 : ReorderBufferTXN *txn, XLogRecPtr lsn,
76 : bool transactional, const char *prefix,
77 : Size sz, const char *message);
78 : static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
79 : TransactionId xid,
80 : const char *gid);
81 : static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
82 : ReorderBufferTXN *txn);
83 : static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
84 : ReorderBufferTXN *txn,
85 : XLogRecPtr prepare_lsn);
86 : static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
87 : ReorderBufferTXN *txn,
88 : XLogRecPtr commit_lsn);
89 : static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
90 : ReorderBufferTXN *txn,
91 : XLogRecPtr prepare_end_lsn,
92 : TimestampTz prepare_time);
93 : static void pg_decode_stream_start(LogicalDecodingContext *ctx,
94 : ReorderBufferTXN *txn);
95 : static void pg_output_stream_start(LogicalDecodingContext *ctx,
96 : TestDecodingData *data,
97 : ReorderBufferTXN *txn,
98 : bool last_write);
99 : static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
100 : ReorderBufferTXN *txn);
101 : static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
102 : ReorderBufferTXN *txn,
103 : XLogRecPtr abort_lsn);
104 : static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
105 : ReorderBufferTXN *txn,
106 : XLogRecPtr prepare_lsn);
107 : static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
108 : ReorderBufferTXN *txn,
109 : XLogRecPtr commit_lsn);
110 : static void pg_decode_stream_change(LogicalDecodingContext *ctx,
111 : ReorderBufferTXN *txn,
112 : Relation relation,
113 : ReorderBufferChange *change);
114 : static void pg_decode_stream_message(LogicalDecodingContext *ctx,
115 : ReorderBufferTXN *txn, XLogRecPtr lsn,
116 : bool transactional, const char *prefix,
117 : Size sz, const char *message);
118 : static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
119 : ReorderBufferTXN *txn,
120 : int nrelations, Relation relations[],
121 : ReorderBufferChange *change);
122 :
/*
 * Module initialization hook.  This plugin has nothing to set up here; the
 * function is kept as a placeholder showing where other plugins could do
 * such work.
 */
void
_PG_init(void)
{
	/* other plugins can perform things here */
}
128 :
129 : /* specify output plugin callbacks */
130 : void
131 0 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
132 : {
133 0 : cb->startup_cb = pg_decode_startup;
134 0 : cb->begin_cb = pg_decode_begin_txn;
135 0 : cb->change_cb = pg_decode_change;
136 0 : cb->truncate_cb = pg_decode_truncate;
137 0 : cb->commit_cb = pg_decode_commit_txn;
138 0 : cb->filter_by_origin_cb = pg_decode_filter;
139 0 : cb->shutdown_cb = pg_decode_shutdown;
140 0 : cb->message_cb = pg_decode_message;
141 0 : cb->filter_prepare_cb = pg_decode_filter_prepare;
142 0 : cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
143 0 : cb->prepare_cb = pg_decode_prepare_txn;
144 0 : cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
145 0 : cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
146 0 : cb->stream_start_cb = pg_decode_stream_start;
147 0 : cb->stream_stop_cb = pg_decode_stream_stop;
148 0 : cb->stream_abort_cb = pg_decode_stream_abort;
149 0 : cb->stream_prepare_cb = pg_decode_stream_prepare;
150 0 : cb->stream_commit_cb = pg_decode_stream_commit;
151 0 : cb->stream_change_cb = pg_decode_stream_change;
152 0 : cb->stream_message_cb = pg_decode_stream_message;
153 0 : cb->stream_truncate_cb = pg_decode_stream_truncate;
154 0 : }
155 :
156 :
157 : /* initialize this plugin */
158 : static void
159 0 : pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
160 : bool is_init)
161 : {
162 0 : ListCell *option;
163 0 : TestDecodingData *data;
164 0 : bool enable_streaming = false;
165 :
166 0 : data = palloc0_object(TestDecodingData);
167 0 : data->context = AllocSetContextCreate(ctx->context,
168 : "text conversion context",
169 : ALLOCSET_DEFAULT_SIZES);
170 0 : data->include_xids = true;
171 0 : data->include_timestamp = false;
172 0 : data->skip_empty_xacts = false;
173 0 : data->only_local = false;
174 :
175 0 : ctx->output_plugin_private = data;
176 :
177 0 : opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
178 0 : opt->receive_rewrites = false;
179 :
180 0 : foreach(option, ctx->output_plugin_options)
181 : {
182 0 : DefElem *elem = lfirst(option);
183 :
184 0 : Assert(elem->arg == NULL || IsA(elem->arg, String));
185 :
186 0 : if (strcmp(elem->defname, "include-xids") == 0)
187 : {
188 : /* if option does not provide a value, it means its value is true */
189 0 : if (elem->arg == NULL)
190 0 : data->include_xids = true;
191 0 : else if (!parse_bool(strVal(elem->arg), &data->include_xids))
192 0 : ereport(ERROR,
193 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
194 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
195 : strVal(elem->arg), elem->defname)));
196 0 : }
197 0 : else if (strcmp(elem->defname, "include-timestamp") == 0)
198 : {
199 0 : if (elem->arg == NULL)
200 0 : data->include_timestamp = true;
201 0 : else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
202 0 : ereport(ERROR,
203 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
204 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
205 : strVal(elem->arg), elem->defname)));
206 0 : }
207 0 : else if (strcmp(elem->defname, "force-binary") == 0)
208 : {
209 0 : bool force_binary;
210 :
211 0 : if (elem->arg == NULL)
212 0 : continue;
213 0 : else if (!parse_bool(strVal(elem->arg), &force_binary))
214 0 : ereport(ERROR,
215 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
216 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
217 : strVal(elem->arg), elem->defname)));
218 :
219 0 : if (force_binary)
220 0 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
221 0 : }
222 0 : else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
223 : {
224 :
225 0 : if (elem->arg == NULL)
226 0 : data->skip_empty_xacts = true;
227 0 : else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
228 0 : ereport(ERROR,
229 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
230 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
231 : strVal(elem->arg), elem->defname)));
232 0 : }
233 0 : else if (strcmp(elem->defname, "only-local") == 0)
234 : {
235 :
236 0 : if (elem->arg == NULL)
237 0 : data->only_local = true;
238 0 : else if (!parse_bool(strVal(elem->arg), &data->only_local))
239 0 : ereport(ERROR,
240 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
241 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
242 : strVal(elem->arg), elem->defname)));
243 0 : }
244 0 : else if (strcmp(elem->defname, "include-rewrites") == 0)
245 : {
246 :
247 0 : if (elem->arg == NULL)
248 0 : continue;
249 0 : else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
250 0 : ereport(ERROR,
251 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
252 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
253 : strVal(elem->arg), elem->defname)));
254 0 : }
255 0 : else if (strcmp(elem->defname, "stream-changes") == 0)
256 : {
257 0 : if (elem->arg == NULL)
258 0 : continue;
259 0 : else if (!parse_bool(strVal(elem->arg), &enable_streaming))
260 0 : ereport(ERROR,
261 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
262 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
263 : strVal(elem->arg), elem->defname)));
264 0 : }
265 : else
266 : {
267 0 : ereport(ERROR,
268 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
269 : errmsg("option \"%s\" = \"%s\" is unknown",
270 : elem->defname,
271 : elem->arg ? strVal(elem->arg) : "(null)")));
272 : }
273 0 : }
274 :
275 0 : ctx->streaming &= enable_streaming;
276 0 : }
277 :
278 : /* cleanup this plugin's resources */
279 : static void
280 0 : pg_decode_shutdown(LogicalDecodingContext *ctx)
281 : {
282 0 : TestDecodingData *data = ctx->output_plugin_private;
283 :
284 : /* cleanup our own resources via memory context reset */
285 0 : MemoryContextDelete(data->context);
286 0 : }
287 :
288 : /* BEGIN callback */
289 : static void
290 0 : pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
291 : {
292 0 : TestDecodingData *data = ctx->output_plugin_private;
293 0 : TestDecodingTxnData *txndata =
294 0 : MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
295 :
296 0 : txndata->xact_wrote_changes = false;
297 0 : txn->output_plugin_private = txndata;
298 :
299 : /*
300 : * If asked to skip empty transactions, we'll emit BEGIN at the point
301 : * where the first operation is received for this transaction.
302 : */
303 0 : if (data->skip_empty_xacts)
304 0 : return;
305 :
306 0 : pg_output_begin(ctx, data, txn, true);
307 0 : }
308 :
309 : static void
310 0 : pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
311 : {
312 0 : OutputPluginPrepareWrite(ctx, last_write);
313 0 : if (data->include_xids)
314 0 : appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
315 : else
316 0 : appendStringInfoString(ctx->out, "BEGIN");
317 0 : OutputPluginWrite(ctx, last_write);
318 0 : }
319 :
320 : /* COMMIT callback */
321 : static void
322 0 : pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
323 : XLogRecPtr commit_lsn)
324 : {
325 0 : TestDecodingData *data = ctx->output_plugin_private;
326 0 : TestDecodingTxnData *txndata = txn->output_plugin_private;
327 0 : bool xact_wrote_changes = txndata->xact_wrote_changes;
328 :
329 0 : pfree(txndata);
330 0 : txn->output_plugin_private = NULL;
331 :
332 0 : if (data->skip_empty_xacts && !xact_wrote_changes)
333 0 : return;
334 :
335 0 : OutputPluginPrepareWrite(ctx, true);
336 0 : if (data->include_xids)
337 0 : appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
338 : else
339 0 : appendStringInfoString(ctx->out, "COMMIT");
340 :
341 0 : if (data->include_timestamp)
342 0 : appendStringInfo(ctx->out, " (at %s)",
343 0 : timestamptz_to_str(txn->commit_time));
344 :
345 0 : OutputPluginWrite(ctx, true);
346 0 : }
347 :
348 : /* BEGIN PREPARE callback */
349 : static void
350 0 : pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
351 : {
352 0 : TestDecodingData *data = ctx->output_plugin_private;
353 0 : TestDecodingTxnData *txndata =
354 0 : MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
355 :
356 0 : txndata->xact_wrote_changes = false;
357 0 : txn->output_plugin_private = txndata;
358 :
359 : /*
360 : * If asked to skip empty transactions, we'll emit BEGIN at the point
361 : * where the first operation is received for this transaction.
362 : */
363 0 : if (data->skip_empty_xacts)
364 0 : return;
365 :
366 0 : pg_output_begin(ctx, data, txn, true);
367 0 : }
368 :
369 : /* PREPARE callback */
370 : static void
371 0 : pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
372 : XLogRecPtr prepare_lsn)
373 : {
374 0 : TestDecodingData *data = ctx->output_plugin_private;
375 0 : TestDecodingTxnData *txndata = txn->output_plugin_private;
376 :
377 : /*
378 : * If asked to skip empty transactions, we'll emit PREPARE at the point
379 : * where the first operation is received for this transaction.
380 : */
381 0 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
382 0 : return;
383 :
384 0 : OutputPluginPrepareWrite(ctx, true);
385 :
386 0 : appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
387 0 : quote_literal_cstr(txn->gid));
388 :
389 0 : if (data->include_xids)
390 0 : appendStringInfo(ctx->out, ", txid %u", txn->xid);
391 :
392 0 : if (data->include_timestamp)
393 0 : appendStringInfo(ctx->out, " (at %s)",
394 0 : timestamptz_to_str(txn->prepare_time));
395 :
396 0 : OutputPluginWrite(ctx, true);
397 0 : }
398 :
399 : /* COMMIT PREPARED callback */
400 : static void
401 0 : pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
402 : XLogRecPtr commit_lsn)
403 : {
404 0 : TestDecodingData *data = ctx->output_plugin_private;
405 :
406 0 : OutputPluginPrepareWrite(ctx, true);
407 :
408 0 : appendStringInfo(ctx->out, "COMMIT PREPARED %s",
409 0 : quote_literal_cstr(txn->gid));
410 :
411 0 : if (data->include_xids)
412 0 : appendStringInfo(ctx->out, ", txid %u", txn->xid);
413 :
414 0 : if (data->include_timestamp)
415 0 : appendStringInfo(ctx->out, " (at %s)",
416 0 : timestamptz_to_str(txn->commit_time));
417 :
418 0 : OutputPluginWrite(ctx, true);
419 0 : }
420 :
421 : /* ROLLBACK PREPARED callback */
422 : static void
423 0 : pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
424 : ReorderBufferTXN *txn,
425 : XLogRecPtr prepare_end_lsn,
426 : TimestampTz prepare_time)
427 : {
428 0 : TestDecodingData *data = ctx->output_plugin_private;
429 :
430 0 : OutputPluginPrepareWrite(ctx, true);
431 :
432 0 : appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
433 0 : quote_literal_cstr(txn->gid));
434 :
435 0 : if (data->include_xids)
436 0 : appendStringInfo(ctx->out, ", txid %u", txn->xid);
437 :
438 0 : if (data->include_timestamp)
439 0 : appendStringInfo(ctx->out, " (at %s)",
440 0 : timestamptz_to_str(txn->commit_time));
441 :
442 0 : OutputPluginWrite(ctx, true);
443 0 : }
444 :
445 : /*
446 : * Filter out two-phase transactions.
447 : *
448 : * Each plugin can implement its own filtering logic. Here we demonstrate a
449 : * simple logic by checking the GID. If the GID contains the "_nodecode"
450 : * substring, then we filter it out.
451 : */
452 : static bool
453 0 : pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
454 : const char *gid)
455 : {
456 0 : if (strstr(gid, "_nodecode") != NULL)
457 0 : return true;
458 :
459 0 : return false;
460 0 : }
461 :
462 : static bool
463 0 : pg_decode_filter(LogicalDecodingContext *ctx,
464 : RepOriginId origin_id)
465 : {
466 0 : TestDecodingData *data = ctx->output_plugin_private;
467 :
468 0 : if (data->only_local && origin_id != InvalidRepOriginId)
469 0 : return true;
470 0 : return false;
471 0 : }
472 :
473 : /*
474 : * Print literal `outputstr' already represented as string of type `typid'
475 : * into stringbuf `s'.
476 : *
477 : * Some builtin types aren't quoted, the rest is quoted. Escaping is done
478 : * per standard SQL rules.
479 : */
480 : static void
481 0 : print_literal(StringInfo s, Oid typid, char *outputstr)
482 : {
483 0 : const char *valptr;
484 :
485 0 : switch (typid)
486 : {
487 : case INT2OID:
488 : case INT4OID:
489 : case INT8OID:
490 : case OIDOID:
491 : case FLOAT4OID:
492 : case FLOAT8OID:
493 : case NUMERICOID:
494 : /* NB: We don't care about Inf, NaN et al. */
495 0 : appendStringInfoString(s, outputstr);
496 0 : break;
497 :
498 : case BITOID:
499 : case VARBITOID:
500 0 : appendStringInfo(s, "B'%s'", outputstr);
501 0 : break;
502 :
503 : case BOOLOID:
504 0 : if (strcmp(outputstr, "t") == 0)
505 0 : appendStringInfoString(s, "true");
506 : else
507 0 : appendStringInfoString(s, "false");
508 0 : break;
509 :
510 : default:
511 0 : appendStringInfoChar(s, '\'');
512 0 : for (valptr = outputstr; *valptr; valptr++)
513 : {
514 0 : char ch = *valptr;
515 :
516 0 : if (SQL_STR_DOUBLE(ch, false))
517 0 : appendStringInfoChar(s, ch);
518 0 : appendStringInfoChar(s, ch);
519 0 : }
520 0 : appendStringInfoChar(s, '\'');
521 0 : break;
522 : }
523 0 : }
524 :
525 : /* print the tuple 'tuple' into the StringInfo s */
526 : static void
527 0 : tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
528 : {
529 0 : int natt;
530 :
531 : /* print all columns individually */
532 0 : for (natt = 0; natt < tupdesc->natts; natt++)
533 : {
534 0 : Form_pg_attribute attr; /* the attribute itself */
535 0 : Oid typid; /* type of current attribute */
536 0 : Oid typoutput; /* output function */
537 0 : bool typisvarlena;
538 0 : Datum origval; /* possibly toasted Datum */
539 0 : bool isnull; /* column is null? */
540 :
541 0 : attr = TupleDescAttr(tupdesc, natt);
542 :
543 : /*
544 : * don't print dropped columns, we can't be sure everything is
545 : * available for them
546 : */
547 0 : if (attr->attisdropped)
548 0 : continue;
549 :
550 : /*
551 : * Don't print system columns, oid will already have been printed if
552 : * present.
553 : */
554 0 : if (attr->attnum < 0)
555 0 : continue;
556 :
557 0 : typid = attr->atttypid;
558 :
559 : /* get Datum from tuple */
560 0 : origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
561 :
562 0 : if (isnull && skip_nulls)
563 0 : continue;
564 :
565 : /* print attribute name */
566 0 : appendStringInfoChar(s, ' ');
567 0 : appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
568 :
569 : /* print attribute type */
570 0 : appendStringInfoChar(s, '[');
571 0 : appendStringInfoString(s, format_type_be(typid));
572 0 : appendStringInfoChar(s, ']');
573 :
574 : /* query output function */
575 0 : getTypeOutputInfo(typid,
576 : &typoutput, &typisvarlena);
577 :
578 : /* print separator */
579 0 : appendStringInfoChar(s, ':');
580 :
581 : /* print data */
582 0 : if (isnull)
583 0 : appendStringInfoString(s, "null");
584 0 : else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
585 0 : appendStringInfoString(s, "unchanged-toast-datum");
586 0 : else if (!typisvarlena)
587 0 : print_literal(s, typid,
588 0 : OidOutputFunctionCall(typoutput, origval));
589 : else
590 : {
591 0 : Datum val; /* definitely detoasted Datum */
592 :
593 0 : val = PointerGetDatum(PG_DETOAST_DATUM(origval));
594 0 : print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
595 0 : }
596 0 : }
597 0 : }
598 :
599 : /*
600 : * callback for individual changed tuples
601 : */
602 : static void
603 0 : pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
604 : Relation relation, ReorderBufferChange *change)
605 : {
606 0 : TestDecodingData *data;
607 0 : TestDecodingTxnData *txndata;
608 0 : Form_pg_class class_form;
609 0 : TupleDesc tupdesc;
610 0 : MemoryContext old;
611 :
612 0 : data = ctx->output_plugin_private;
613 0 : txndata = txn->output_plugin_private;
614 :
615 : /* output BEGIN if we haven't yet */
616 0 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
617 : {
618 0 : pg_output_begin(ctx, data, txn, false);
619 0 : }
620 0 : txndata->xact_wrote_changes = true;
621 :
622 0 : class_form = RelationGetForm(relation);
623 0 : tupdesc = RelationGetDescr(relation);
624 :
625 : /* Avoid leaking memory by using and resetting our own context */
626 0 : old = MemoryContextSwitchTo(data->context);
627 :
628 0 : OutputPluginPrepareWrite(ctx, true);
629 :
630 0 : appendStringInfoString(ctx->out, "table ");
631 0 : appendStringInfoString(ctx->out,
632 0 : quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
633 0 : class_form->relrewrite ?
634 0 : get_rel_name(class_form->relrewrite) :
635 0 : NameStr(class_form->relname)));
636 0 : appendStringInfoChar(ctx->out, ':');
637 :
638 0 : switch (change->action)
639 : {
640 : case REORDER_BUFFER_CHANGE_INSERT:
641 0 : appendStringInfoString(ctx->out, " INSERT:");
642 0 : if (change->data.tp.newtuple == NULL)
643 0 : appendStringInfoString(ctx->out, " (no-tuple-data)");
644 : else
645 0 : tuple_to_stringinfo(ctx->out, tupdesc,
646 0 : change->data.tp.newtuple,
647 : false);
648 0 : break;
649 : case REORDER_BUFFER_CHANGE_UPDATE:
650 0 : appendStringInfoString(ctx->out, " UPDATE:");
651 0 : if (change->data.tp.oldtuple != NULL)
652 : {
653 0 : appendStringInfoString(ctx->out, " old-key:");
654 0 : tuple_to_stringinfo(ctx->out, tupdesc,
655 0 : change->data.tp.oldtuple,
656 : true);
657 0 : appendStringInfoString(ctx->out, " new-tuple:");
658 0 : }
659 :
660 0 : if (change->data.tp.newtuple == NULL)
661 0 : appendStringInfoString(ctx->out, " (no-tuple-data)");
662 : else
663 0 : tuple_to_stringinfo(ctx->out, tupdesc,
664 0 : change->data.tp.newtuple,
665 : false);
666 0 : break;
667 : case REORDER_BUFFER_CHANGE_DELETE:
668 0 : appendStringInfoString(ctx->out, " DELETE:");
669 :
670 : /* if there was no PK, we only know that a delete happened */
671 0 : if (change->data.tp.oldtuple == NULL)
672 0 : appendStringInfoString(ctx->out, " (no-tuple-data)");
673 : /* In DELETE, only the replica identity is present; display that */
674 : else
675 0 : tuple_to_stringinfo(ctx->out, tupdesc,
676 0 : change->data.tp.oldtuple,
677 : true);
678 0 : break;
679 : default:
680 0 : Assert(false);
681 0 : }
682 :
683 0 : MemoryContextSwitchTo(old);
684 0 : MemoryContextReset(data->context);
685 :
686 0 : OutputPluginWrite(ctx, true);
687 0 : }
688 :
689 : static void
690 0 : pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
691 : int nrelations, Relation relations[], ReorderBufferChange *change)
692 : {
693 0 : TestDecodingData *data;
694 0 : TestDecodingTxnData *txndata;
695 0 : MemoryContext old;
696 0 : int i;
697 :
698 0 : data = ctx->output_plugin_private;
699 0 : txndata = txn->output_plugin_private;
700 :
701 : /* output BEGIN if we haven't yet */
702 0 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
703 : {
704 0 : pg_output_begin(ctx, data, txn, false);
705 0 : }
706 0 : txndata->xact_wrote_changes = true;
707 :
708 : /* Avoid leaking memory by using and resetting our own context */
709 0 : old = MemoryContextSwitchTo(data->context);
710 :
711 0 : OutputPluginPrepareWrite(ctx, true);
712 :
713 0 : appendStringInfoString(ctx->out, "table ");
714 :
715 0 : for (i = 0; i < nrelations; i++)
716 : {
717 0 : if (i > 0)
718 0 : appendStringInfoString(ctx->out, ", ");
719 :
720 0 : appendStringInfoString(ctx->out,
721 0 : quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
722 0 : NameStr(relations[i]->rd_rel->relname)));
723 0 : }
724 :
725 0 : appendStringInfoString(ctx->out, ": TRUNCATE:");
726 :
727 0 : if (change->data.truncate.restart_seqs
728 0 : || change->data.truncate.cascade)
729 : {
730 0 : if (change->data.truncate.restart_seqs)
731 0 : appendStringInfoString(ctx->out, " restart_seqs");
732 0 : if (change->data.truncate.cascade)
733 0 : appendStringInfoString(ctx->out, " cascade");
734 0 : }
735 : else
736 0 : appendStringInfoString(ctx->out, " (no-flags)");
737 :
738 0 : MemoryContextSwitchTo(old);
739 0 : MemoryContextReset(data->context);
740 :
741 0 : OutputPluginWrite(ctx, true);
742 0 : }
743 :
744 : static void
745 0 : pg_decode_message(LogicalDecodingContext *ctx,
746 : ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
747 : const char *prefix, Size sz, const char *message)
748 : {
749 0 : TestDecodingData *data = ctx->output_plugin_private;
750 0 : TestDecodingTxnData *txndata;
751 :
752 0 : txndata = transactional ? txn->output_plugin_private : NULL;
753 :
754 : /* output BEGIN if we haven't yet for transactional messages */
755 0 : if (transactional && data->skip_empty_xacts && !txndata->xact_wrote_changes)
756 0 : pg_output_begin(ctx, data, txn, false);
757 :
758 0 : if (transactional)
759 0 : txndata->xact_wrote_changes = true;
760 :
761 0 : OutputPluginPrepareWrite(ctx, true);
762 0 : appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
763 0 : transactional, prefix, sz);
764 0 : appendBinaryStringInfo(ctx->out, message, sz);
765 0 : OutputPluginWrite(ctx, true);
766 0 : }
767 :
768 : static void
769 0 : pg_decode_stream_start(LogicalDecodingContext *ctx,
770 : ReorderBufferTXN *txn)
771 : {
772 0 : TestDecodingData *data = ctx->output_plugin_private;
773 0 : TestDecodingTxnData *txndata = txn->output_plugin_private;
774 :
775 : /*
776 : * Allocate the txn plugin data for the first stream in the transaction.
777 : */
778 0 : if (txndata == NULL)
779 : {
780 0 : txndata =
781 0 : MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
782 0 : txndata->xact_wrote_changes = false;
783 0 : txn->output_plugin_private = txndata;
784 0 : }
785 :
786 0 : txndata->stream_wrote_changes = false;
787 0 : if (data->skip_empty_xacts)
788 0 : return;
789 0 : pg_output_stream_start(ctx, data, txn, true);
790 0 : }
791 :
792 : static void
793 0 : pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
794 : {
795 0 : OutputPluginPrepareWrite(ctx, last_write);
796 0 : if (data->include_xids)
797 0 : appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
798 : else
799 0 : appendStringInfoString(ctx->out, "opening a streamed block for transaction");
800 0 : OutputPluginWrite(ctx, last_write);
801 0 : }
802 :
803 : static void
804 0 : pg_decode_stream_stop(LogicalDecodingContext *ctx,
805 : ReorderBufferTXN *txn)
806 : {
807 0 : TestDecodingData *data = ctx->output_plugin_private;
808 0 : TestDecodingTxnData *txndata = txn->output_plugin_private;
809 :
810 0 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
811 0 : return;
812 :
813 0 : OutputPluginPrepareWrite(ctx, true);
814 0 : if (data->include_xids)
815 0 : appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
816 : else
817 0 : appendStringInfoString(ctx->out, "closing a streamed block for transaction");
818 0 : OutputPluginWrite(ctx, true);
819 0 : }
820 :
821 : static void
822 0 : pg_decode_stream_abort(LogicalDecodingContext *ctx,
823 : ReorderBufferTXN *txn,
824 : XLogRecPtr abort_lsn)
825 : {
826 0 : TestDecodingData *data = ctx->output_plugin_private;
827 :
828 : /*
829 : * stream abort can be sent for an individual subtransaction but we
830 : * maintain the output_plugin_private only under the toptxn so if this is
831 : * not the toptxn then fetch the toptxn.
832 : */
833 0 : ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
834 0 : TestDecodingTxnData *txndata = toptxn->output_plugin_private;
835 0 : bool xact_wrote_changes = txndata->xact_wrote_changes;
836 :
837 0 : if (rbtxn_is_toptxn(txn))
838 : {
839 0 : Assert(txn->output_plugin_private != NULL);
840 0 : pfree(txndata);
841 0 : txn->output_plugin_private = NULL;
842 0 : }
843 :
844 0 : if (data->skip_empty_xacts && !xact_wrote_changes)
845 0 : return;
846 :
847 0 : OutputPluginPrepareWrite(ctx, true);
848 0 : if (data->include_xids)
849 0 : appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
850 : else
851 0 : appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
852 0 : OutputPluginWrite(ctx, true);
853 0 : }
854 :
855 : static void
856 0 : pg_decode_stream_prepare(LogicalDecodingContext *ctx,
857 : ReorderBufferTXN *txn,
858 : XLogRecPtr prepare_lsn)
859 : {
860 0 : TestDecodingData *data = ctx->output_plugin_private;
861 0 : TestDecodingTxnData *txndata = txn->output_plugin_private;
862 :
863 0 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
864 0 : return;
865 :
866 0 : OutputPluginPrepareWrite(ctx, true);
867 :
868 0 : if (data->include_xids)
869 0 : appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
870 0 : quote_literal_cstr(txn->gid), txn->xid);
871 : else
872 0 : appendStringInfo(ctx->out, "preparing streamed transaction %s",
873 0 : quote_literal_cstr(txn->gid));
874 :
875 0 : if (data->include_timestamp)
876 0 : appendStringInfo(ctx->out, " (at %s)",
877 0 : timestamptz_to_str(txn->prepare_time));
878 :
879 0 : OutputPluginWrite(ctx, true);
880 0 : }
881 :
882 : static void
883 0 : pg_decode_stream_commit(LogicalDecodingContext *ctx,
884 : ReorderBufferTXN *txn,
885 : XLogRecPtr commit_lsn)
886 : {
887 0 : TestDecodingData *data = ctx->output_plugin_private;
888 0 : TestDecodingTxnData *txndata = txn->output_plugin_private;
889 0 : bool xact_wrote_changes = txndata->xact_wrote_changes;
890 :
891 0 : pfree(txndata);
892 0 : txn->output_plugin_private = NULL;
893 :
894 0 : if (data->skip_empty_xacts && !xact_wrote_changes)
895 0 : return;
896 :
897 0 : OutputPluginPrepareWrite(ctx, true);
898 :
899 0 : if (data->include_xids)
900 0 : appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
901 : else
902 0 : appendStringInfoString(ctx->out, "committing streamed transaction");
903 :
904 0 : if (data->include_timestamp)
905 0 : appendStringInfo(ctx->out, " (at %s)",
906 0 : timestamptz_to_str(txn->commit_time));
907 :
908 0 : OutputPluginWrite(ctx, true);
909 0 : }
910 :
911 : /*
912 : * In streaming mode, we don't display the changes as the transaction can abort
913 : * at a later point in time. We don't want users to see the changes until the
914 : * transaction is committed.
915 : */
916 : static void
917 0 : pg_decode_stream_change(LogicalDecodingContext *ctx,
918 : ReorderBufferTXN *txn,
919 : Relation relation,
920 : ReorderBufferChange *change)
921 : {
922 0 : TestDecodingData *data = ctx->output_plugin_private;
923 0 : TestDecodingTxnData *txndata = txn->output_plugin_private;
924 :
925 : /* output stream start if we haven't yet */
926 0 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
927 : {
928 0 : pg_output_stream_start(ctx, data, txn, false);
929 0 : }
930 0 : txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
931 :
932 0 : OutputPluginPrepareWrite(ctx, true);
933 0 : if (data->include_xids)
934 0 : appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
935 : else
936 0 : appendStringInfoString(ctx->out, "streaming change for transaction");
937 0 : OutputPluginWrite(ctx, true);
938 0 : }
939 :
940 : /*
941 : * In streaming mode, we don't display the contents for transactional messages
942 : * as the transaction can abort at a later point in time. We don't want users to
943 : * see the message contents until the transaction is committed.
944 : */
945 : static void
946 0 : pg_decode_stream_message(LogicalDecodingContext *ctx,
947 : ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
948 : const char *prefix, Size sz, const char *message)
949 : {
950 : /* Output stream start if we haven't yet for transactional messages. */
951 0 : if (transactional)
952 : {
953 0 : TestDecodingData *data = ctx->output_plugin_private;
954 0 : TestDecodingTxnData *txndata = txn->output_plugin_private;
955 :
956 0 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
957 : {
958 0 : pg_output_stream_start(ctx, data, txn, false);
959 0 : }
960 0 : txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
961 0 : }
962 :
963 0 : OutputPluginPrepareWrite(ctx, true);
964 :
965 0 : if (transactional)
966 : {
967 0 : appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
968 0 : transactional, prefix, sz);
969 0 : }
970 : else
971 : {
972 0 : appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
973 0 : transactional, prefix, sz);
974 0 : appendBinaryStringInfo(ctx->out, message, sz);
975 : }
976 :
977 0 : OutputPluginWrite(ctx, true);
978 0 : }
979 :
980 : /*
981 : * In streaming mode, we don't display the detailed information of Truncate.
982 : * See pg_decode_stream_change.
983 : */
984 : static void
985 0 : pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
986 : int nrelations, Relation relations[],
987 : ReorderBufferChange *change)
988 : {
989 0 : TestDecodingData *data = ctx->output_plugin_private;
990 0 : TestDecodingTxnData *txndata = txn->output_plugin_private;
991 :
992 0 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
993 : {
994 0 : pg_output_stream_start(ctx, data, txn, false);
995 0 : }
996 0 : txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
997 :
998 0 : OutputPluginPrepareWrite(ctx, true);
999 0 : if (data->include_xids)
1000 0 : appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
1001 : else
1002 0 : appendStringInfoString(ctx->out, "streaming truncate for transaction");
1003 0 : OutputPluginWrite(ctx, true);
1004 0 : }
|