Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * tcn.c
4 : * triggered change notification support for PostgreSQL
5 : *
6 : * Portions Copyright (c) 2011-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * contrib/tcn/tcn.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres.h"
17 :
18 : #include "access/htup_details.h"
19 : #include "commands/async.h"
20 : #include "commands/trigger.h"
21 : #include "executor/spi.h"
22 : #include "lib/stringinfo.h"
23 : #include "utils/rel.h"
24 : #include "utils/syscache.h"
25 :
26 0 : PG_MODULE_MAGIC_EXT(
27 : .name = "tcn",
28 : .version = PG_VERSION
29 : );
30 :
31 : /*
32 : * Copy from s (for source) to r (for result), wrapping with q (quote)
33 : * characters and doubling any quote characters found.
34 : */
35 : static void
36 0 : strcpy_quoted(StringInfo r, const char *s, const char q)
37 : {
38 0 : appendStringInfoCharMacro(r, q);
39 0 : while (*s)
40 : {
41 0 : if (*s == q)
42 0 : appendStringInfoCharMacro(r, q);
43 0 : appendStringInfoCharMacro(r, *s);
44 0 : s++;
45 : }
46 0 : appendStringInfoCharMacro(r, q);
47 0 : }
48 :
49 : /*
50 : * triggered_change_notification
51 : *
52 : * This trigger function will send a notification of data modification with
53 : * primary key values. The channel will be "tcn" unless the trigger is
54 : * created with a parameter, in which case that parameter will be used.
55 : */
56 0 : PG_FUNCTION_INFO_V1(triggered_change_notification);
57 :
58 : Datum
59 0 : triggered_change_notification(PG_FUNCTION_ARGS)
60 : {
61 0 : TriggerData *trigdata = (TriggerData *) fcinfo->context;
62 0 : Trigger *trigger;
63 0 : int nargs;
64 0 : HeapTuple trigtuple;
65 0 : Relation rel;
66 0 : TupleDesc tupdesc;
67 0 : char *channel;
68 0 : char operation;
69 0 : StringInfoData payload;
70 0 : bool foundPK;
71 :
72 0 : List *indexoidlist;
73 0 : ListCell *indexoidscan;
74 :
75 0 : initStringInfo(&payload);
76 : /* make sure it's called as a trigger */
77 0 : if (!CALLED_AS_TRIGGER(fcinfo))
78 0 : ereport(ERROR,
79 : (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
80 : errmsg("triggered_change_notification: must be called as trigger")));
81 :
82 : /* and that it's called after the change */
83 0 : if (!TRIGGER_FIRED_AFTER(trigdata->tg_event))
84 0 : ereport(ERROR,
85 : (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
86 : errmsg("triggered_change_notification: must be called after the change")));
87 :
88 : /* and that it's called for each row */
89 0 : if (!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
90 0 : ereport(ERROR,
91 : (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
92 : errmsg("triggered_change_notification: must be called for each row")));
93 :
94 0 : if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
95 0 : operation = 'I';
96 0 : else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
97 0 : operation = 'U';
98 0 : else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
99 0 : operation = 'D';
100 : else
101 : {
102 0 : elog(ERROR, "triggered_change_notification: trigger fired by unrecognized operation");
103 0 : operation = 'X'; /* silence compiler warning */
104 : }
105 :
106 0 : trigger = trigdata->tg_trigger;
107 0 : nargs = trigger->tgnargs;
108 0 : if (nargs > 1)
109 0 : ereport(ERROR,
110 : (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
111 : errmsg("triggered_change_notification: must not be called with more than one parameter")));
112 :
113 0 : if (nargs == 0)
114 0 : channel = "tcn";
115 : else
116 0 : channel = trigger->tgargs[0];
117 :
118 : /* get tuple data */
119 0 : trigtuple = trigdata->tg_trigtuple;
120 0 : rel = trigdata->tg_relation;
121 0 : tupdesc = rel->rd_att;
122 :
123 0 : foundPK = false;
124 :
125 : /*
126 : * Get the list of index OIDs for the table from the relcache, and look up
127 : * each one in the pg_index syscache until we find one marked primary key
128 : * (hopefully there isn't more than one such).
129 : */
130 0 : indexoidlist = RelationGetIndexList(rel);
131 :
132 0 : foreach(indexoidscan, indexoidlist)
133 : {
134 0 : Oid indexoid = lfirst_oid(indexoidscan);
135 0 : HeapTuple indexTuple;
136 0 : Form_pg_index index;
137 :
138 0 : indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
139 0 : if (!HeapTupleIsValid(indexTuple)) /* should not happen */
140 0 : elog(ERROR, "cache lookup failed for index %u", indexoid);
141 0 : index = (Form_pg_index) GETSTRUCT(indexTuple);
142 : /* we're only interested if it is the primary key and valid */
143 0 : if (index->indisprimary && index->indisvalid)
144 : {
145 0 : int indnkeyatts = index->indnkeyatts;
146 :
147 0 : if (indnkeyatts > 0)
148 : {
149 0 : int i;
150 :
151 0 : foundPK = true;
152 :
153 0 : strcpy_quoted(&payload, RelationGetRelationName(rel), '"');
154 0 : appendStringInfoCharMacro(&payload, ',');
155 0 : appendStringInfoCharMacro(&payload, operation);
156 :
157 0 : for (i = 0; i < indnkeyatts; i++)
158 : {
159 0 : int colno = index->indkey.values[i];
160 0 : Form_pg_attribute attr = TupleDescAttr(tupdesc, colno - 1);
161 :
162 0 : appendStringInfoCharMacro(&payload, ',');
163 0 : strcpy_quoted(&payload, NameStr(attr->attname), '"');
164 0 : appendStringInfoCharMacro(&payload, '=');
165 0 : strcpy_quoted(&payload, SPI_getvalue(trigtuple, tupdesc, colno), '\'');
166 0 : }
167 :
168 0 : Async_Notify(channel, payload.data);
169 0 : }
170 0 : ReleaseSysCache(indexTuple);
171 : break;
172 0 : }
173 0 : ReleaseSysCache(indexTuple);
174 0 : }
175 :
176 0 : list_free(indexoidlist);
177 :
178 0 : if (!foundPK)
179 0 : ereport(ERROR,
180 : (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
181 : errmsg("triggered_change_notification: must be called on a table with a primary key")));
182 :
183 0 : return PointerGetDatum(NULL); /* after trigger; value doesn't matter */
184 0 : }
|