Branch data Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * publicationcmds.c
4 : : * publication manipulation
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/commands/publicationcmds.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/htup_details.h"
18 : : #include "access/table.h"
19 : : #include "access/xact.h"
20 : : #include "catalog/catalog.h"
21 : : #include "catalog/indexing.h"
22 : : #include "catalog/namespace.h"
23 : : #include "catalog/objectaccess.h"
24 : : #include "catalog/objectaddress.h"
25 : : #include "catalog/pg_database.h"
26 : : #include "catalog/pg_inherits.h"
27 : : #include "catalog/pg_namespace.h"
28 : : #include "catalog/pg_proc.h"
29 : : #include "catalog/pg_publication.h"
30 : : #include "catalog/pg_publication_namespace.h"
31 : : #include "catalog/pg_publication_rel.h"
32 : : #include "commands/defrem.h"
33 : : #include "commands/event_trigger.h"
34 : : #include "commands/publicationcmds.h"
35 : : #include "miscadmin.h"
36 : : #include "nodes/nodeFuncs.h"
37 : : #include "parser/parse_clause.h"
38 : : #include "parser/parse_collate.h"
39 : : #include "parser/parse_relation.h"
40 : : #include "rewrite/rewriteHandler.h"
41 : : #include "storage/lmgr.h"
42 : : #include "utils/acl.h"
43 : : #include "utils/builtins.h"
44 : : #include "utils/inval.h"
45 : : #include "utils/lsyscache.h"
46 : : #include "utils/rel.h"
47 : : #include "utils/syscache.h"
48 : : #include "utils/varlena.h"
49 : :
50 : :
51 : : /*
52 : : * Information used to validate the columns in the row filter expression. See
53 : : * contain_invalid_rfcolumn_walker for details.
54 : : */
55 : : typedef struct rf_context
56 : : {
57 : : Bitmapset *bms_replident; /* bitset of replica identity columns */
58 : : bool pubviaroot; /* true if we are validating the parent
59 : : * relation's row filter */
60 : : Oid relid; /* relid of the relation */
61 : : Oid parentid; /* relid of the parent relation */
62 : : } rf_context;
63 : :
64 : : static List *OpenTableList(List *tables);
65 : : static void CloseTableList(List *rels);
66 : : static void LockSchemaList(List *schemalist);
67 : : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
68 : : AlterPublicationStmt *stmt);
69 : : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
70 : : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
71 : : AlterPublicationStmt *stmt);
72 : : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
73 : : static char defGetGeneratedColsOption(DefElem *def);
74 : :
75 : :
76 : : static void
77 : 113 : parse_publication_options(ParseState *pstate,
78 : : List *options,
79 : : bool *publish_given,
80 : : PublicationActions *pubactions,
81 : : bool *publish_via_partition_root_given,
82 : : bool *publish_via_partition_root,
83 : : bool *publish_generated_columns_given,
84 : : char *publish_generated_columns)
85 : : {
86 : 113 : ListCell *lc;
87 : :
88 : 113 : *publish_given = false;
89 : 113 : *publish_via_partition_root_given = false;
90 : 113 : *publish_generated_columns_given = false;
91 : :
92 : : /* defaults */
93 : 113 : pubactions->pubinsert = true;
94 : 113 : pubactions->pubupdate = true;
95 : 113 : pubactions->pubdelete = true;
96 : 113 : pubactions->pubtruncate = true;
97 : 113 : *publish_via_partition_root = false;
98 : 113 : *publish_generated_columns = PUBLISH_GENCOLS_NONE;
99 : :
100 : : /* Parse options */
101 [ + + + + : 163 : foreach(lc, options)
+ + ]
102 : : {
103 : 54 : DefElem *defel = (DefElem *) lfirst(lc);
104 : :
105 [ + + ]: 54 : if (strcmp(defel->defname, "publish") == 0)
106 : : {
107 : 19 : char *publish;
108 : 19 : List *publish_list;
109 : 19 : ListCell *lc2;
110 : :
111 [ + - ]: 19 : if (*publish_given)
112 : 0 : errorConflictingDefElem(defel, pstate);
113 : :
114 : : /*
115 : : * If publish option was given only the explicitly listed actions
116 : : * should be published.
117 : : */
118 : 19 : pubactions->pubinsert = false;
119 : 19 : pubactions->pubupdate = false;
120 : 19 : pubactions->pubdelete = false;
121 : 19 : pubactions->pubtruncate = false;
122 : :
123 : 19 : *publish_given = true;
124 : :
125 : : /*
126 : : * SplitIdentifierString destructively modifies its input, so make
127 : : * a copy so we don't modify the memory of the executing statement
128 : : */
129 : 19 : publish = pstrdup(defGetString(defel));
130 : :
131 [ + - ]: 19 : if (!SplitIdentifierString(publish, ',', &publish_list))
132 [ # # # # ]: 0 : ereport(ERROR,
133 : : (errcode(ERRCODE_SYNTAX_ERROR),
134 : : errmsg("invalid list syntax in parameter \"%s\"",
135 : : "publish")));
136 : :
137 : : /* Process the option list. */
138 [ + - + + : 41 : foreach(lc2, publish_list)
+ + ]
139 : : {
140 : 23 : char *publish_opt = (char *) lfirst(lc2);
141 : :
142 [ + + ]: 23 : if (strcmp(publish_opt, "insert") == 0)
143 : 17 : pubactions->pubinsert = true;
144 [ + + ]: 6 : else if (strcmp(publish_opt, "update") == 0)
145 : 3 : pubactions->pubupdate = true;
146 [ + + ]: 3 : else if (strcmp(publish_opt, "delete") == 0)
147 : 1 : pubactions->pubdelete = true;
148 [ + + ]: 2 : else if (strcmp(publish_opt, "truncate") == 0)
149 : 1 : pubactions->pubtruncate = true;
150 : : else
151 [ + - + - ]: 1 : ereport(ERROR,
152 : : (errcode(ERRCODE_SYNTAX_ERROR),
153 : : errmsg("unrecognized value for publication option \"%s\": \"%s\"",
154 : : "publish", publish_opt)));
155 : 22 : }
156 : 18 : }
157 [ + + ]: 35 : else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
158 : : {
159 [ + + ]: 22 : if (*publish_via_partition_root_given)
160 : 1 : errorConflictingDefElem(defel, pstate);
161 : 21 : *publish_via_partition_root_given = true;
162 : 21 : *publish_via_partition_root = defGetBoolean(defel);
163 : 21 : }
164 [ + + ]: 13 : else if (strcmp(defel->defname, "publish_generated_columns") == 0)
165 : : {
166 [ + + ]: 12 : if (*publish_generated_columns_given)
167 : 1 : errorConflictingDefElem(defel, pstate);
168 : 11 : *publish_generated_columns_given = true;
169 : 11 : *publish_generated_columns = defGetGeneratedColsOption(defel);
170 : 11 : }
171 : : else
172 [ + - + - ]: 1 : ereport(ERROR,
173 : : (errcode(ERRCODE_SYNTAX_ERROR),
174 : : errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
175 : 50 : }
176 : 109 : }
177 : :
178 : : /*
179 : : * Convert the PublicationObjSpecType list into schema oid list and
180 : : * PublicationTable list.
181 : : */
182 : : static void
183 : 220 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
184 : : List **rels, List **schemas)
185 : : {
186 : 220 : ListCell *cell;
187 : 220 : PublicationObjSpec *pubobj;
188 : :
189 [ + + ]: 220 : if (!pubobjspec_list)
190 : 10 : return;
191 : :
192 [ + - + + : 453 : foreach(cell, pubobjspec_list)
+ + ]
193 : : {
194 : 244 : Oid schemaid;
195 : 244 : List *search_path;
196 : :
197 : 244 : pubobj = (PublicationObjSpec *) lfirst(cell);
198 : :
199 [ + - + + ]: 244 : switch (pubobj->pubobjtype)
200 : : {
201 : : case PUBLICATIONOBJ_TABLE:
202 : 182 : *rels = lappend(*rels, pubobj->pubtable);
203 : 182 : break;
204 : : case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
205 : 58 : schemaid = get_namespace_oid(pubobj->name, false);
206 : :
207 : : /* Filter out duplicates if user specifies "sch1, sch1" */
208 : 58 : *schemas = list_append_unique_oid(*schemas, schemaid);
209 : 58 : break;
210 : : case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
211 : 4 : search_path = fetch_search_path(false);
212 [ + + ]: 4 : if (search_path == NIL) /* nothing valid in search_path? */
213 [ + - + - ]: 1 : ereport(ERROR,
214 : : errcode(ERRCODE_UNDEFINED_SCHEMA),
215 : : errmsg("no schema has been selected for CURRENT_SCHEMA"));
216 : :
217 : 3 : schemaid = linitial_oid(search_path);
218 : 3 : list_free(search_path);
219 : :
220 : : /* Filter out duplicates if user specifies "sch1, sch1" */
221 : 3 : *schemas = list_append_unique_oid(*schemas, schemaid);
222 : 3 : break;
223 : : default:
224 : : /* shouldn't happen */
225 [ # # # # ]: 0 : elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
226 : 0 : break;
227 : : }
228 : 243 : }
229 [ - + ]: 219 : }
230 : :
231 : : /*
232 : : * Returns true if any of the columns used in the row filter WHERE expression is
233 : : * not part of REPLICA IDENTITY, false otherwise.
234 : : */
235 : : static bool
236 : 38 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
237 : : {
238 [ + - ]: 38 : if (node == NULL)
239 : 0 : return false;
240 : :
241 [ + + ]: 38 : if (IsA(node, Var))
242 : : {
243 : 16 : Var *var = (Var *) node;
244 : 16 : AttrNumber attnum = var->varattno;
245 : :
246 : : /*
247 : : * If pubviaroot is true, we are validating the row filter of the
248 : : * parent table, but the bitmap contains the replica identity
249 : : * information of the child table. So, get the column number of the
250 : : * child table as parent and child column order could be different.
251 : : */
252 [ + + ]: 16 : if (context->pubviaroot)
253 : : {
254 : 2 : char *colname = get_attname(context->parentid, attnum, false);
255 : :
256 : 2 : attnum = get_attnum(context->relid, colname);
257 : 2 : }
258 : :
259 [ + + + + ]: 32 : if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
260 : 16 : context->bms_replident))
261 : 10 : return true;
262 [ - + + ]: 16 : }
263 : :
264 : 28 : return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
265 : : context);
266 : 38 : }
267 : :
268 : : /*
269 : : * Check if all columns referenced in the filter expression are part of the
270 : : * REPLICA IDENTITY index or not.
271 : : *
272 : : * Returns true if any invalid column is found.
273 : : */
274 : : bool
275 : 69 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
276 : : bool pubviaroot)
277 : : {
278 : 69 : HeapTuple rftuple;
279 : 69 : Oid relid = RelationGetRelid(relation);
280 : 69 : Oid publish_as_relid = RelationGetRelid(relation);
281 : 69 : bool result = false;
282 : 69 : Datum rfdatum;
283 : 69 : bool rfisnull;
284 : :
285 : : /*
286 : : * FULL means all columns are in the REPLICA IDENTITY, so all columns are
287 : : * allowed in the row filter and we can skip the validation.
288 : : */
289 [ + + ]: 69 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
290 : 14 : return false;
291 : :
292 : : /*
293 : : * For a partition, if pubviaroot is true, find the topmost ancestor that
294 : : * is published via this publication as we need to use its row filter
295 : : * expression to filter the partition's changes.
296 : : *
297 : : * Note that even though the row filter used is for an ancestor, the
298 : : * REPLICA IDENTITY used will be for the actual child table.
299 : : */
300 [ + + - + ]: 55 : if (pubviaroot && relation->rd_rel->relispartition)
301 : : {
302 : : publish_as_relid
303 : 14 : = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
304 : :
305 [ + - ]: 14 : if (!OidIsValid(publish_as_relid))
306 : 0 : publish_as_relid = relid;
307 : 14 : }
308 : :
309 : 55 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
310 : 55 : ObjectIdGetDatum(publish_as_relid),
311 : 55 : ObjectIdGetDatum(pubid));
312 : :
313 [ + + ]: 55 : if (!HeapTupleIsValid(rftuple))
314 : 6 : return false;
315 : :
316 : 49 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
317 : : Anum_pg_publication_rel_prqual,
318 : : &rfisnull);
319 : :
320 [ + + ]: 49 : if (!rfisnull)
321 : : {
322 : 16 : rf_context context = {0};
323 : 16 : Node *rfnode;
324 : 16 : Bitmapset *bms = NULL;
325 : :
326 : 16 : context.pubviaroot = pubviaroot;
327 : 16 : context.parentid = publish_as_relid;
328 : 16 : context.relid = relid;
329 : :
330 : : /* Remember columns that are part of the REPLICA IDENTITY */
331 : 16 : bms = RelationGetIndexAttrBitmap(relation,
332 : : INDEX_ATTR_BITMAP_IDENTITY_KEY);
333 : :
334 : 16 : context.bms_replident = bms;
335 : 16 : rfnode = stringToNode(TextDatumGetCString(rfdatum));
336 : 16 : result = contain_invalid_rfcolumn_walker(rfnode, &context);
337 : 16 : }
338 : :
339 : 49 : ReleaseSysCache(rftuple);
340 : :
341 : 49 : return result;
342 : 69 : }
343 : :
344 : : /*
345 : : * Check for invalid columns in the publication table definition.
346 : : *
347 : : * This function evaluates two conditions:
348 : : *
349 : : * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
350 : : * by the column list. If any column is missing, *invalid_column_list is set
351 : : * to true.
352 : : * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
353 : : * are published, either by being explicitly named in the column list or, if
354 : : * no column list is specified, by setting the option
355 : : * publish_generated_columns to stored. If any unpublished
356 : : * generated column is found, *invalid_gen_col is set to true.
357 : : *
358 : : * Returns true if any of the above conditions are not met.
359 : : */
360 : : bool
361 : 74 : pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
362 : : bool pubviaroot, char pubgencols_type,
363 : : bool *invalid_column_list,
364 : : bool *invalid_gen_col)
365 : : {
366 : 74 : Oid relid = RelationGetRelid(relation);
367 : 74 : Oid publish_as_relid = RelationGetRelid(relation);
368 : 74 : Bitmapset *idattrs;
369 : 74 : Bitmapset *columns = NULL;
370 : 74 : TupleDesc desc = RelationGetDescr(relation);
371 : 74 : Publication *pub;
372 : 74 : int x;
373 : :
374 : 74 : *invalid_column_list = false;
375 : 74 : *invalid_gen_col = false;
376 : :
377 : : /*
378 : : * For a partition, if pubviaroot is true, find the topmost ancestor that
379 : : * is published via this publication as we need to use its column list for
380 : : * the changes.
381 : : *
382 : : * Note that even though the column list used is for an ancestor, the
383 : : * REPLICA IDENTITY used will be for the actual child table.
384 : : */
385 [ + + - + ]: 74 : if (pubviaroot && relation->rd_rel->relispartition)
386 : : {
387 : 17 : publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
388 : :
389 [ + - ]: 17 : if (!OidIsValid(publish_as_relid))
390 : 0 : publish_as_relid = relid;
391 : 17 : }
392 : :
393 : : /* Fetch the column list */
394 : 74 : pub = GetPublication(pubid);
395 : 74 : check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
396 : :
397 [ + + ]: 74 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
398 : : {
399 : : /* With REPLICA IDENTITY FULL, no column list is allowed. */
400 : 14 : *invalid_column_list = (columns != NULL);
401 : :
402 : : /*
403 : : * As we don't allow a column list with REPLICA IDENTITY FULL, the
404 : : * publish_generated_columns option must be set to stored if the table
405 : : * has any stored generated columns.
406 : : */
407 [ + + ]: 14 : if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
408 [ + + + + ]: 12 : relation->rd_att->constr &&
409 : 8 : relation->rd_att->constr->has_generated_stored)
410 : 1 : *invalid_gen_col = true;
411 : :
412 : : /*
413 : : * Virtual generated columns are currently not supported for logical
414 : : * replication at all.
415 : : */
416 [ + + + + ]: 14 : if (relation->rd_att->constr &&
417 : 10 : relation->rd_att->constr->has_generated_virtual)
418 : 2 : *invalid_gen_col = true;
419 : :
420 [ + + + - ]: 14 : if (*invalid_gen_col && *invalid_column_list)
421 : 0 : return true;
422 : 14 : }
423 : :
424 : : /* Remember columns that are part of the REPLICA IDENTITY */
425 : 74 : idattrs = RelationGetIndexAttrBitmap(relation,
426 : : INDEX_ATTR_BITMAP_IDENTITY_KEY);
427 : :
428 : : /*
429 : : * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
430 : : * (to handle system columns the usual way), while column list does not
431 : : * use offset, so we can't do bms_is_subset(). Instead, we have to loop
432 : : * over the idattrs and check all of them are in the list.
433 : : */
434 : 74 : x = -1;
435 [ + + ]: 123 : while ((x = bms_next_member(idattrs, x)) >= 0)
436 : : {
437 : 49 : AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber);
438 : 49 : Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
439 : :
440 [ + + ]: 49 : if (columns == NULL)
441 : : {
442 : : /*
443 : : * The publish_generated_columns option must be set to stored if
444 : : * the REPLICA IDENTITY contains any stored generated column.
445 : : */
446 [ + + - + ]: 19 : if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
447 : : {
448 : 1 : *invalid_gen_col = true;
449 : 1 : break;
450 : : }
451 : :
452 : : /*
453 : : * The equivalent setting for virtual generated columns does not
454 : : * exist yet.
455 : : */
456 [ - + ]: 18 : if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
457 : : {
458 : 0 : *invalid_gen_col = true;
459 : 0 : break;
460 : : }
461 : :
462 : : /* Skip validating the column list since it is not defined */
463 : 18 : continue;
464 : : }
465 : :
466 : : /*
467 : : * If pubviaroot is true, we are validating the column list of the
468 : : * parent table, but the bitmap contains the replica identity
469 : : * information of the child table. The parent/child attnums may not
470 : : * match, so translate them to the parent - get the attname from the
471 : : * child, and look it up in the parent.
472 : : */
473 [ + + ]: 30 : if (pubviaroot)
474 : : {
475 : : /* attribute name in the child table */
476 : 11 : char *colname = get_attname(relid, attnum, false);
477 : :
478 : : /*
479 : : * Determine the attnum for the attribute name in parent (we are
480 : : * using the column list defined on the parent).
481 : : */
482 : 11 : attnum = get_attnum(publish_as_relid, colname);
483 : 11 : }
484 : :
485 : : /* replica identity column, not covered by the column list */
486 : 30 : *invalid_column_list |= !bms_is_member(attnum, columns);
487 : :
488 [ + + + - ]: 30 : if (*invalid_column_list && *invalid_gen_col)
489 : 0 : break;
490 [ - + + ]: 49 : }
491 : :
492 : 74 : bms_free(columns);
493 : 74 : bms_free(idattrs);
494 : :
495 [ + + ]: 74 : return *invalid_column_list || *invalid_gen_col;
496 : 74 : }
497 : :
498 : : /*
499 : : * Invalidate entries in the RelationSyncCache for relations included in the
500 : : * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
501 : : *
502 : : * If 'puballtables' is true, invalidate all cache entries.
503 : : */
504 : : void
505 : 2 : InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
506 : : {
507 [ - + ]: 2 : if (puballtables)
508 : : {
509 : 0 : CacheInvalidateRelSyncAll();
510 : 0 : }
511 : : else
512 : : {
513 : 2 : List *relids = NIL;
514 : 2 : List *schemarelids = NIL;
515 : :
516 : : /*
517 : : * For partitioned tables, we must invalidate all partitions and
518 : : * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
519 : : * a target. However, WAL records for TRUNCATE specify both a root and
520 : : * its leaves.
521 : : */
522 : 2 : relids = GetPublicationRelations(pubid,
523 : : PUBLICATION_PART_ALL);
524 : 2 : schemarelids = GetAllSchemaPublicationRelations(pubid,
525 : : PUBLICATION_PART_ALL);
526 : :
527 : 2 : relids = list_concat_unique_oid(relids, schemarelids);
528 : :
529 : : /* Invalidate the relsyncache */
530 [ + + - + : 4 : foreach_oid(relid, relids)
# # - + ]
531 : 2 : CacheInvalidateRelSync(relid);
532 : 2 : }
533 : :
534 : 2 : return;
535 : : }
536 : :
537 : : /* check_functions_in_node callback */
538 : : static bool
539 : 60 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
540 : : {
541 [ + + ]: 60 : return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
542 : 59 : func_id >= FirstNormalObjectId);
543 : : }
544 : :
545 : : /*
546 : : * The row filter walker checks if the row filter expression is a "simple
547 : : * expression".
548 : : *
549 : : * It allows only simple or compound expressions such as:
550 : : * - (Var Op Const)
551 : : * - (Var Op Var)
552 : : * - (Var Op Const) AND/OR (Var Op Const)
553 : : * - etc
554 : : * (where Var is a column of the table this filter belongs to)
555 : : *
556 : : * The simple expression has the following restrictions:
557 : : * - User-defined operators are not allowed;
558 : : * - User-defined functions are not allowed;
559 : : * - User-defined types are not allowed;
560 : : * - User-defined collations are not allowed;
561 : : * - Non-immutable built-in functions are not allowed;
562 : : * - System columns are not allowed.
563 : : *
564 : : * NOTES
565 : : *
566 : : * We don't allow user-defined functions/operators/types/collations because
567 : : * (a) if a user drops a user-defined object used in a row filter expression or
568 : : * if there is any other error while using it, the logical decoding
569 : : * infrastructure won't be able to recover from such an error even if the
570 : : * object is recreated again because a historic snapshot is used to evaluate
571 : : * the row filter;
572 : : * (b) a user-defined function can be used to access tables that could have
573 : : * unpleasant results because a historic snapshot is used. That's why only
574 : : * immutable built-in functions are allowed in row filter expressions.
575 : : *
576 : : * We don't allow system columns because currently, we don't have that
577 : : * information in the tuple passed to downstream. Also, as we don't replicate
578 : : * those to subscribers, there doesn't seem to be a need for a filter on those
579 : : * columns.
580 : : *
581 : : * We can allow other node types after more analysis and testing.
582 : : */
583 : : static bool
584 : 203 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
585 : : {
586 : 203 : char *errdetail_msg = NULL;
587 : :
588 [ + + ]: 203 : if (node == NULL)
589 : 1 : return false;
590 : :
591 [ + + + + : 202 : switch (nodeTag(node))
+ + ]
592 : : {
593 : : case T_Var:
594 : : /* System columns are not allowed. */
595 [ + + ]: 54 : if (((Var *) node)->varattno < InvalidAttrNumber)
596 : 1 : errdetail_msg = _("System columns are not allowed.");
597 : 54 : break;
598 : : case T_OpExpr:
599 : : case T_DistinctExpr:
600 : : case T_NullIfExpr:
601 : : /* OK, except user-defined operators are not allowed. */
602 [ + + ]: 53 : if (((OpExpr *) node)->opno >= FirstNormalObjectId)
603 : 1 : errdetail_msg = _("User-defined operators are not allowed.");
604 : 53 : break;
605 : : case T_ScalarArrayOpExpr:
606 : : /* OK, except user-defined operators are not allowed. */
607 [ + - ]: 1 : if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
608 : 0 : errdetail_msg = _("User-defined operators are not allowed.");
609 : :
610 : : /*
611 : : * We don't need to check the hashfuncid and negfuncid of
612 : : * ScalarArrayOpExpr as those functions are only built for a
613 : : * subquery.
614 : : */
615 : 1 : break;
616 : : case T_RowCompareExpr:
617 : : {
618 : 1 : ListCell *opid;
619 : :
620 : : /* OK, except user-defined operators are not allowed. */
621 [ + - + + : 3 : foreach(opid, ((RowCompareExpr *) node)->opnos)
+ + ]
622 : : {
623 [ - + ]: 2 : if (lfirst_oid(opid) >= FirstNormalObjectId)
624 : : {
625 : 0 : errdetail_msg = _("User-defined operators are not allowed.");
626 : 0 : break;
627 : : }
628 : 2 : }
629 : 1 : }
630 : 1 : break;
631 : : case T_Const:
632 : : case T_FuncExpr:
633 : : case T_BoolExpr:
634 : : case T_RelabelType:
635 : : case T_CollateExpr:
636 : : case T_CaseExpr:
637 : : case T_CaseTestExpr:
638 : : case T_ArrayExpr:
639 : : case T_RowExpr:
640 : : case T_CoalesceExpr:
641 : : case T_MinMaxExpr:
642 : : case T_XmlExpr:
643 : : case T_NullTest:
644 : : case T_BooleanTest:
645 : : case T_List:
646 : : /* OK, supported */
647 : 92 : break;
648 : : default:
649 : 1 : errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
650 : 1 : break;
651 : : }
652 : :
653 : : /*
654 : : * For all the supported nodes, if we haven't already found a problem,
655 : : * check the types, functions, and collations used in it. We check List
656 : : * by walking through each element.
657 : : */
658 [ + + + + ]: 202 : if (!errdetail_msg && !IsA(node, List))
659 : : {
660 [ + + ]: 190 : if (exprType(node) >= FirstNormalObjectId)
661 : 1 : errdetail_msg = _("User-defined types are not allowed.");
662 [ + + + + ]: 378 : else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
663 : 189 : pstate))
664 : 2 : errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
665 [ + - + + ]: 187 : else if (exprCollation(node) >= FirstNormalObjectId ||
666 : 187 : exprInputCollation(node) >= FirstNormalObjectId)
667 : 1 : errdetail_msg = _("User-defined collations are not allowed.");
668 : 190 : }
669 : :
670 : : /*
671 : : * If we found a problem in this node, throw error now. Otherwise keep
672 : : * going.
673 : : */
674 [ + + ]: 202 : if (errdetail_msg)
675 [ + - + - ]: 7 : ereport(ERROR,
676 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
677 : : errmsg("invalid publication WHERE expression"),
678 : : errdetail_internal("%s", errdetail_msg),
679 : : parser_errposition(pstate, exprLocation(node))));
680 : :
681 : 195 : return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
682 : : pstate);
683 : 196 : }
684 : :
685 : : /*
686 : : * Check if the row filter expression is a "simple expression".
687 : : *
688 : : * See check_simple_rowfilter_expr_walker for details.
689 : : */
690 : : static bool
691 : 53 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
692 : : {
693 : 53 : return check_simple_rowfilter_expr_walker(node, pstate);
694 : : }
695 : :
696 : : /*
697 : : * Transform the publication WHERE expression for all the relations in the list,
698 : : * ensuring it is coerced to boolean and necessary collation information is
699 : : * added if required, and add a new nsitem/RTE for the associated relation to
700 : : * the ParseState's namespace list.
701 : : *
702 : : * Also check the publication row filter expression and throw an error if
703 : : * anything not permitted or unexpected is encountered.
704 : : */
705 : : static void
706 : 142 : TransformPubWhereClauses(List *tables, const char *queryString,
707 : : bool pubviaroot)
708 : : {
709 : 142 : ListCell *lc;
710 : :
711 [ + + + + : 295 : foreach(lc, tables)
+ + ]
712 : : {
713 : 154 : ParseNamespaceItem *nsitem;
714 : 154 : Node *whereclause = NULL;
715 : 154 : ParseState *pstate;
716 : 154 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
717 : :
718 [ + + ]: 154 : if (pri->whereClause == NULL)
719 : 98 : continue;
720 : :
721 : : /*
722 : : * If the publication doesn't publish changes via the root partitioned
723 : : * table, the partition's row filter will be used. So disallow using
724 : : * WHERE clause on partitioned table in this case.
725 : : */
726 [ + + + + ]: 56 : if (!pubviaroot &&
727 : 53 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
728 [ + - + - ]: 1 : ereport(ERROR,
729 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
730 : : errmsg("cannot use publication WHERE clause for relation \"%s\"",
731 : : RelationGetRelationName(pri->relation)),
732 : : errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
733 : : "publish_via_partition_root")));
734 : :
735 : : /*
736 : : * A fresh pstate is required so that we only have "this" table in its
737 : : * rangetable
738 : : */
739 : 55 : pstate = make_parsestate(NULL);
740 : 55 : pstate->p_sourcetext = queryString;
741 : 55 : nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
742 : : AccessShareLock, NULL,
743 : : false, false);
744 : 55 : addNSItemToQuery(pstate, nsitem, false, true, true);
745 : :
746 : 110 : whereclause = transformWhereClause(pstate,
747 : 55 : copyObject(pri->whereClause),
748 : : EXPR_KIND_WHERE,
749 : : "PUBLICATION WHERE");
750 : :
751 : : /* Fix up collation information */
752 : 55 : assign_expr_collations(pstate, whereclause);
753 : :
754 : 55 : whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);
755 : :
756 : : /*
757 : : * We allow only simple expressions in row filters. See
758 : : * check_simple_rowfilter_expr_walker.
759 : : */
760 : 55 : check_simple_rowfilter_expr(whereclause, pstate);
761 : :
762 : 55 : free_parsestate(pstate);
763 : :
764 : 55 : pri->whereClause = whereclause;
765 [ - + + ]: 153 : }
766 : 141 : }
767 : :
768 : :
769 : : /*
770 : : * Given a list of tables that are going to be added to a publication,
771 : : * verify that they fulfill the necessary preconditions, namely: no tables
772 : : * have a column list if any schema is published; and partitioned tables do
773 : : * not have column lists if publish_via_partition_root is not set.
774 : : *
775 : : * 'publish_schema' indicates that the publication contains any TABLES IN
776 : : * SCHEMA elements (newly added in this command, or preexisting).
777 : : * 'pubviaroot' is the value of publish_via_partition_root.
778 : : */
779 : : static void
780 : 141 : CheckPubRelationColumnList(char *pubname, List *tables,
781 : : bool publish_schema, bool pubviaroot)
782 : : {
783 : 141 : ListCell *lc;
784 : :
785 [ + + + + : 280 : foreach(lc, tables)
+ + ]
786 : : {
787 : 144 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
788 : :
789 [ + + ]: 144 : if (pri->columns == NIL)
790 : 90 : continue;
791 : :
792 : : /*
793 : : * Disallow specifying column list if any schema is in the
794 : : * publication.
795 : : *
796 : : * XXX We could instead just forbid the case when the publication
797 : : * tries to publish the table with a column list and a schema for that
798 : : * table. However, if we do that then we need a restriction during
799 : : * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
800 : : * seem to be a good idea.
801 : : */
802 [ + + ]: 54 : if (publish_schema)
803 [ + - + - ]: 4 : ereport(ERROR,
804 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
805 : : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
806 : : get_namespace_name(RelationGetNamespace(pri->relation)),
807 : : RelationGetRelationName(pri->relation), pubname),
808 : : errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
809 : :
810 : : /*
811 : : * If the publication doesn't publish changes via the root partitioned
812 : : * table, the partition's column list will be used. So disallow using
813 : : * a column list on the partitioned table in this case.
814 : : */
815 [ + + + + ]: 50 : if (!pubviaroot &&
816 : 42 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
817 [ + - + - ]: 1 : ereport(ERROR,
818 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
819 : : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
820 : : get_namespace_name(RelationGetNamespace(pri->relation)),
821 : : RelationGetRelationName(pri->relation), pubname),
822 : : errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
823 : : "publish_via_partition_root")));
824 [ - + + ]: 139 : }
825 : 136 : }
826 : :
827 : : /*
828 : : * Create new publication.
829 : : */
830 : : ObjectAddress
831 : 70 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
832 : : {
833 : 70 : Relation rel;
834 : : ObjectAddress myself;
835 : 70 : Oid puboid;
836 : 70 : bool nulls[Natts_pg_publication];
837 : 70 : Datum values[Natts_pg_publication];
838 : 70 : HeapTuple tup;
839 : 70 : bool publish_given;
840 : 70 : PublicationActions pubactions;
841 : 70 : bool publish_via_partition_root_given;
842 : 70 : bool publish_via_partition_root;
843 : 70 : bool publish_generated_columns_given;
844 : 70 : char publish_generated_columns;
845 : 70 : AclResult aclresult;
846 : 70 : List *relations = NIL;
847 : 70 : List *schemaidlist = NIL;
848 : :
849 : : /* must have CREATE privilege on database */
850 : 70 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
851 [ + + ]: 70 : if (aclresult != ACLCHECK_OK)
852 : 2 : aclcheck_error(aclresult, OBJECT_DATABASE,
853 : 1 : get_database_name(MyDatabaseId));
854 : :
855 : : /* FOR ALL TABLES and FOR ALL SEQUENCES requires superuser */
856 [ + + ]: 70 : if (!superuser())
857 : : {
858 [ + - ]: 3 : if (stmt->for_all_tables || stmt->for_all_sequences)
859 [ # # # # ]: 0 : ereport(ERROR,
860 : : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
861 : : errmsg("must be superuser to create a FOR ALL TABLES or ALL SEQUENCES publication"));
862 : 3 : }
863 : :
864 : 70 : rel = table_open(PublicationRelationId, RowExclusiveLock);
865 : :
866 : : /* Check if name is used */
867 : 70 : puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
868 : : CStringGetDatum(stmt->pubname));
869 [ + + ]: 70 : if (OidIsValid(puboid))
870 [ + - + - ]: 1 : ereport(ERROR,
871 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
872 : : errmsg("publication \"%s\" already exists",
873 : : stmt->pubname)));
874 : :
875 : : /* Form a tuple. */
876 : 69 : memset(values, 0, sizeof(values));
877 : 69 : memset(nulls, false, sizeof(nulls));
878 : :
879 : 69 : values[Anum_pg_publication_pubname - 1] =
880 : 69 : DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
881 : 69 : values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
882 : :
883 : 138 : parse_publication_options(pstate,
884 : 69 : stmt->options,
885 : : &publish_given, &pubactions,
886 : : &publish_via_partition_root_given,
887 : : &publish_via_partition_root,
888 : : &publish_generated_columns_given,
889 : : &publish_generated_columns);
890 : :
891 [ + + - + ]: 72 : if (stmt->for_all_sequences &&
892 [ + - + - ]: 3 : (publish_given || publish_via_partition_root_given ||
893 : 3 : publish_generated_columns_given))
894 [ # # # # ]: 0 : ereport(NOTICE,
895 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
896 : : errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
897 : :
898 : 69 : puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
899 : : Anum_pg_publication_oid);
900 : 69 : values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
901 : 69 : values[Anum_pg_publication_puballtables - 1] =
902 : 69 : BoolGetDatum(stmt->for_all_tables);
903 : 69 : values[Anum_pg_publication_puballsequences - 1] =
904 : 69 : BoolGetDatum(stmt->for_all_sequences);
905 : 69 : values[Anum_pg_publication_pubinsert - 1] =
906 : 69 : BoolGetDatum(pubactions.pubinsert);
907 : 69 : values[Anum_pg_publication_pubupdate - 1] =
908 : 69 : BoolGetDatum(pubactions.pubupdate);
909 : 69 : values[Anum_pg_publication_pubdelete - 1] =
910 : 69 : BoolGetDatum(pubactions.pubdelete);
911 : 69 : values[Anum_pg_publication_pubtruncate - 1] =
912 : 69 : BoolGetDatum(pubactions.pubtruncate);
913 : 69 : values[Anum_pg_publication_pubviaroot - 1] =
914 : 69 : BoolGetDatum(publish_via_partition_root);
915 : 69 : values[Anum_pg_publication_pubgencols - 1] =
916 : 69 : CharGetDatum(publish_generated_columns);
917 : :
918 : 69 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
919 : :
920 : : /* Insert tuple into catalog. */
921 : 69 : CatalogTupleInsert(rel, tup);
922 : 69 : heap_freetuple(tup);
923 : :
924 : 69 : recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
925 : :
926 : 69 : ObjectAddressSet(myself, PublicationRelationId, puboid);
927 : :
928 : : /* Make the changes visible. */
929 : 69 : CommandCounterIncrement();
930 : :
931 : : /* Associate objects with the publication. */
932 [ + + ]: 69 : if (stmt->for_all_tables)
933 : : {
934 : : /*
935 : : * Invalidate relcache so that publication info is rebuilt. Sequences
936 : : * publication doesn't require invalidation, as replica identity
937 : : * checks don't apply to them.
938 : : */
939 : 7 : CacheInvalidateRelcacheAll();
940 : 7 : }
941 [ + + ]: 62 : else if (!stmt->for_all_sequences)
942 : : {
943 : 60 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
944 : : &schemaidlist);
945 : :
946 : : /* FOR TABLES IN SCHEMA requires superuser */
947 [ + + + + ]: 60 : if (schemaidlist != NIL && !superuser())
948 [ + - + - ]: 1 : ereport(ERROR,
949 : : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
950 : : errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
951 : :
952 [ + + ]: 59 : if (relations != NIL)
953 : : {
954 : 45 : List *rels;
955 : :
956 : 45 : rels = OpenTableList(relations);
957 : 90 : TransformPubWhereClauses(rels, pstate->p_sourcetext,
958 : 45 : publish_via_partition_root);
959 : :
960 : 90 : CheckPubRelationColumnList(stmt->pubname, rels,
961 : 45 : schemaidlist != NIL,
962 : 45 : publish_via_partition_root);
963 : :
964 : 45 : PublicationAddTables(puboid, rels, true, NULL);
965 : 45 : CloseTableList(rels);
966 : 45 : }
967 : :
968 [ + + ]: 59 : if (schemaidlist != NIL)
969 : : {
970 : : /*
971 : : * Schema lock is held until the publication is created to prevent
972 : : * concurrent schema deletion.
973 : : */
974 : 24 : LockSchemaList(schemaidlist);
975 : 24 : PublicationAddSchemas(puboid, schemaidlist, true, NULL);
976 : 24 : }
977 : 59 : }
978 : :
979 : 68 : table_close(rel, RowExclusiveLock);
980 : :
981 [ + - ]: 68 : InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
982 : :
983 : : /*
984 : : * We don't need this warning message when wal_level >= 'replica' since
985 : : * logical decoding is automatically enabled up on a logical slot
986 : : * creation.
987 : : */
988 [ - + ]: 68 : if (wal_level < WAL_LEVEL_REPLICA)
989 [ - + + - ]: 68 : ereport(WARNING,
990 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
991 : : errmsg("logical decoding must be enabled to publish logical changes"),
992 : : errhint("Before creating subscriptions, ensure that \"wal_level\" is set to \"replica\" or higher.")));
993 : :
994 : : return myself;
995 : 68 : }
996 : :
997 : : /*
998 : : * Change options of a publication.
999 : : */
1000 : : static void
1001 : 20 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
1002 : : Relation rel, HeapTuple tup)
1003 : : {
1004 : 20 : bool nulls[Natts_pg_publication];
1005 : 20 : bool replaces[Natts_pg_publication];
1006 : 20 : Datum values[Natts_pg_publication];
1007 : 20 : bool publish_given;
1008 : 20 : PublicationActions pubactions;
1009 : 20 : bool publish_via_partition_root_given;
1010 : 20 : bool publish_via_partition_root;
1011 : 20 : bool publish_generated_columns_given;
1012 : 20 : char publish_generated_columns;
1013 : 20 : ObjectAddress obj;
1014 : 20 : Form_pg_publication pubform;
1015 : 20 : List *root_relids = NIL;
1016 : 20 : ListCell *lc;
1017 : :
1018 : 20 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1019 : :
1020 : 40 : parse_publication_options(pstate,
1021 : 20 : stmt->options,
1022 : : &publish_given, &pubactions,
1023 : : &publish_via_partition_root_given,
1024 : : &publish_via_partition_root,
1025 : : &publish_generated_columns_given,
1026 : : &publish_generated_columns);
1027 : :
1028 [ + + + - ]: 21 : if (pubform->puballsequences &&
1029 [ + + + - ]: 2 : (publish_given || publish_via_partition_root_given ||
1030 : 1 : publish_generated_columns_given))
1031 [ - + + - ]: 2 : ereport(NOTICE,
1032 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1033 : : errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
1034 : :
1035 : : /*
1036 : : * If the publication doesn't publish changes via the root partitioned
1037 : : * table, the partition's row filter and column list will be used. So
1038 : : * disallow using WHERE clause and column lists on partitioned table in
1039 : : * this case.
1040 : : */
1041 [ + + + + : 20 : if (!pubform->puballtables && publish_via_partition_root_given &&
+ + ]
1042 : 13 : !publish_via_partition_root)
1043 : : {
1044 : : /*
1045 : : * Lock the publication so nobody else can do anything with it. This
1046 : : * prevents concurrent alter to add partitioned table(s) with WHERE
1047 : : * clause(s) and/or column lists which we don't allow when not
1048 : : * publishing via root.
1049 : : */
1050 : 8 : LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
1051 : : AccessShareLock);
1052 : :
1053 : 8 : root_relids = GetPublicationRelations(pubform->oid,
1054 : : PUBLICATION_PART_ROOT);
1055 : :
1056 [ + - + + : 14 : foreach(lc, root_relids)
+ + ]
1057 : : {
1058 : 8 : Oid relid = lfirst_oid(lc);
1059 : 8 : HeapTuple rftuple;
1060 : 8 : char relkind;
1061 : 8 : char *relname;
1062 : 8 : bool has_rowfilter;
1063 : 8 : bool has_collist;
1064 : :
1065 : : /*
1066 : : * Beware: we don't have lock on the relations, so cope silently
1067 : : * with the cache lookups returning NULL.
1068 : : */
1069 : :
1070 : 8 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1071 : 8 : ObjectIdGetDatum(relid),
1072 : 8 : ObjectIdGetDatum(pubform->oid));
1073 [ + - ]: 8 : if (!HeapTupleIsValid(rftuple))
1074 : 0 : continue;
1075 : 8 : has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
1076 : 8 : has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
1077 [ + + + + ]: 8 : if (!has_rowfilter && !has_collist)
1078 : : {
1079 : 2 : ReleaseSysCache(rftuple);
1080 : 2 : continue;
1081 : : }
1082 : :
1083 : 6 : relkind = get_rel_relkind(relid);
1084 [ + + ]: 6 : if (relkind != RELKIND_PARTITIONED_TABLE)
1085 : : {
1086 : 4 : ReleaseSysCache(rftuple);
1087 : 4 : continue;
1088 : : }
1089 : 2 : relname = get_rel_name(relid);
1090 [ - + ]: 2 : if (relname == NULL) /* table concurrently dropped */
1091 : : {
1092 : 0 : ReleaseSysCache(rftuple);
1093 : 0 : continue;
1094 : : }
1095 : :
1096 [ + + ]: 2 : if (has_rowfilter)
1097 [ + - + - ]: 1 : ereport(ERROR,
1098 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1099 : : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1100 : : "publish_via_partition_root",
1101 : : stmt->pubname),
1102 : : errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1103 : : relname, "publish_via_partition_root")));
1104 [ + - ]: 1 : Assert(has_collist);
1105 [ + - + - ]: 1 : ereport(ERROR,
1106 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1107 : : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1108 : : "publish_via_partition_root",
1109 : : stmt->pubname),
1110 : : errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1111 : : relname, "publish_via_partition_root")));
1112 [ - + - ]: 6 : }
1113 : 6 : }
1114 : :
1115 : : /* Everything ok, form a new tuple. */
1116 : 18 : memset(values, 0, sizeof(values));
1117 : 18 : memset(nulls, false, sizeof(nulls));
1118 : 18 : memset(replaces, false, sizeof(replaces));
1119 : :
1120 [ + + ]: 18 : if (publish_given)
1121 : : {
1122 : 5 : values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
1123 : 5 : replaces[Anum_pg_publication_pubinsert - 1] = true;
1124 : :
1125 : 5 : values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
1126 : 5 : replaces[Anum_pg_publication_pubupdate - 1] = true;
1127 : :
1128 : 5 : values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
1129 : 5 : replaces[Anum_pg_publication_pubdelete - 1] = true;
1130 : :
1131 : 5 : values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
1132 : 5 : replaces[Anum_pg_publication_pubtruncate - 1] = true;
1133 : 5 : }
1134 : :
1135 [ + + ]: 18 : if (publish_via_partition_root_given)
1136 : : {
1137 : 11 : values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1138 : 11 : replaces[Anum_pg_publication_pubviaroot - 1] = true;
1139 : 11 : }
1140 : :
1141 [ + + ]: 18 : if (publish_generated_columns_given)
1142 : : {
1143 : 2 : values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
1144 : 2 : replaces[Anum_pg_publication_pubgencols - 1] = true;
1145 : 2 : }
1146 : :
1147 : 36 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1148 : 18 : replaces);
1149 : :
1150 : : /* Update the catalog. */
1151 : 18 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1152 : :
1153 : 18 : CommandCounterIncrement();
1154 : :
1155 : 18 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1156 : :
1157 : : /* Invalidate the relcache. */
1158 [ + + ]: 18 : if (pubform->puballtables)
1159 : : {
1160 : 3 : CacheInvalidateRelcacheAll();
1161 : 3 : }
1162 : : else
1163 : : {
1164 : 15 : List *relids = NIL;
1165 : 15 : List *schemarelids = NIL;
1166 : :
1167 : : /*
1168 : : * For any partitioned tables contained in the publication, we must
1169 : : * invalidate all partitions contained in the respective partition
1170 : : * trees, not just those explicitly mentioned in the publication.
1171 : : */
1172 [ + + ]: 15 : if (root_relids == NIL)
1173 : 9 : relids = GetPublicationRelations(pubform->oid,
1174 : : PUBLICATION_PART_ALL);
1175 : : else
1176 : : {
1177 : : /*
1178 : : * We already got tables explicitly mentioned in the publication.
1179 : : * Now get all partitions for the partitioned table in the list.
1180 : : */
1181 [ + - + + : 12 : foreach(lc, root_relids)
+ + ]
1182 : 12 : relids = GetPubPartitionOptionRelations(relids,
1183 : : PUBLICATION_PART_ALL,
1184 : 6 : lfirst_oid(lc));
1185 : : }
1186 : :
1187 : 15 : schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1188 : : PUBLICATION_PART_ALL);
1189 : 15 : relids = list_concat_unique_oid(relids, schemarelids);
1190 : :
1191 : 15 : InvalidatePublicationRels(relids);
1192 : 15 : }
1193 : :
1194 : 18 : ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
1195 : 18 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1196 : 18 : (Node *) stmt);
1197 : :
1198 [ + - ]: 18 : InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
1199 : 18 : }
1200 : :
1201 : : /*
1202 : : * Invalidate the relations.
1203 : : */
1204 : : void
1205 : 325 : InvalidatePublicationRels(List *relids)
1206 : : {
1207 : : /*
1208 : : * We don't want to send too many individual messages, at some point it's
1209 : : * cheaper to just reset whole relcache.
1210 : : */
1211 [ + - ]: 325 : if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1212 : : {
1213 : 325 : ListCell *lc;
1214 : :
1215 [ + + + + : 3246 : foreach(lc, relids)
+ + ]
1216 : 2921 : CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
1217 : 325 : }
1218 : : else
1219 : 0 : CacheInvalidateRelcacheAll();
1220 : 325 : }
1221 : :
1222 : : /*
1223 : : * Add or remove table to/from publication.
1224 : : */
1225 : : static void
1226 : 130 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
1227 : : List *tables, const char *queryString,
1228 : : bool publish_schema)
1229 : : {
1230 : 130 : List *rels = NIL;
1231 : 130 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1232 : 130 : Oid pubid = pubform->oid;
1233 : :
1234 : : /*
1235 : : * Nothing to do if no objects, except in SET: for that it is quite
1236 : : * possible that user has not specified any tables in which case we need
1237 : : * to remove all the existing tables.
1238 : : */
1239 [ + + + + ]: 130 : if (!tables && stmt->action != AP_SetObjects)
1240 : 10 : return;
1241 : :
1242 : 120 : rels = OpenTableList(tables);
1243 : :
1244 [ + + ]: 120 : if (stmt->action == AP_AddObjects)
1245 : : {
1246 : 38 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1247 : :
1248 : 38 : publish_schema |= is_schema_publication(pubid);
1249 : :
1250 : 76 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1251 : 38 : pubform->pubviaroot);
1252 : :
1253 : 38 : PublicationAddTables(pubid, rels, false, stmt);
1254 : 38 : }
1255 [ + + ]: 82 : else if (stmt->action == AP_DropObjects)
1256 : 16 : PublicationDropTables(pubid, rels, false);
1257 : : else /* AP_SetObjects */
1258 : : {
1259 : 66 : List *oldrelids = GetPublicationRelations(pubid,
1260 : : PUBLICATION_PART_ROOT);
1261 : 66 : List *delrels = NIL;
1262 : 66 : ListCell *oldlc;
1263 : :
1264 : 66 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1265 : :
1266 : 132 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1267 : 66 : pubform->pubviaroot);
1268 : :
1269 : : /*
1270 : : * To recreate the relation list for the publication, look for
1271 : : * existing relations that do not need to be dropped.
1272 : : */
1273 [ + + + + : 131 : foreach(oldlc, oldrelids)
+ + ]
1274 : : {
1275 : 65 : Oid oldrelid = lfirst_oid(oldlc);
1276 : 65 : ListCell *newlc;
1277 : 65 : PublicationRelInfo *oldrel;
1278 : 65 : bool found = false;
1279 : 65 : HeapTuple rftuple;
1280 : 65 : Node *oldrelwhereclause = NULL;
1281 : 65 : Bitmapset *oldcolumns = NULL;
1282 : :
1283 : : /* look up the cache for the old relmap */
1284 : 65 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1285 : 65 : ObjectIdGetDatum(oldrelid),
1286 : 65 : ObjectIdGetDatum(pubid));
1287 : :
1288 : : /*
1289 : : * See if the existing relation currently has a WHERE clause or a
1290 : : * column list. We need to compare those too.
1291 : : */
1292 [ - + ]: 65 : if (HeapTupleIsValid(rftuple))
1293 : : {
1294 : 65 : bool isnull = true;
1295 : 65 : Datum whereClauseDatum;
1296 : 65 : Datum columnListDatum;
1297 : :
1298 : : /* Load the WHERE clause for this table. */
1299 : 65 : whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1300 : : Anum_pg_publication_rel_prqual,
1301 : : &isnull);
1302 [ + + ]: 65 : if (!isnull)
1303 : 34 : oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1304 : :
1305 : : /* Transform the int2vector column list to a bitmap. */
1306 : 65 : columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1307 : : Anum_pg_publication_rel_prattrs,
1308 : : &isnull);
1309 : :
1310 [ + + ]: 65 : if (!isnull)
1311 : 22 : oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1312 : :
1313 : 65 : ReleaseSysCache(rftuple);
1314 : 65 : }
1315 : :
1316 [ + + + + : 129 : foreach(newlc, rels)
+ + ]
1317 : : {
1318 : 64 : PublicationRelInfo *newpubrel;
1319 : 64 : Oid newrelid;
1320 : 64 : Bitmapset *newcolumns = NULL;
1321 : :
1322 : 64 : newpubrel = (PublicationRelInfo *) lfirst(newlc);
1323 : 64 : newrelid = RelationGetRelid(newpubrel->relation);
1324 : :
1325 : : /*
1326 : : * Validate the column list. If the column list or WHERE
1327 : : * clause changes, then the validation done here will be
1328 : : * duplicated inside PublicationAddTables(). The validation
1329 : : * is cheap enough that that seems harmless.
1330 : : */
1331 : 128 : newcolumns = pub_collist_validate(newpubrel->relation,
1332 : 64 : newpubrel->columns);
1333 : :
1334 : : /*
1335 : : * Check if any of the new set of relations matches with the
1336 : : * existing relations in the publication. Additionally, if the
1337 : : * relation has an associated WHERE clause, check the WHERE
1338 : : * expressions also match. Same for the column list. Drop the
1339 : : * rest.
1340 : : */
1341 [ + + ]: 64 : if (newrelid == oldrelid)
1342 : : {
1343 [ + + + + ]: 32 : if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1344 : 12 : bms_equal(oldcolumns, newcolumns))
1345 : : {
1346 : 2 : found = true;
1347 : 2 : break;
1348 : : }
1349 : 30 : }
1350 [ + + ]: 64 : }
1351 : :
1352 : : /*
1353 : : * Add the non-matched relations to a list so that they can be
1354 : : * dropped.
1355 : : */
1356 [ + + ]: 65 : if (!found)
1357 : : {
1358 : 61 : oldrel = palloc_object(PublicationRelInfo);
1359 : 61 : oldrel->whereClause = NULL;
1360 : 61 : oldrel->columns = NIL;
1361 : 61 : oldrel->relation = table_open(oldrelid,
1362 : : ShareUpdateExclusiveLock);
1363 : 61 : delrels = lappend(delrels, oldrel);
1364 : 61 : }
1365 : 65 : }
1366 : :
1367 : : /* And drop them. */
1368 : 66 : PublicationDropTables(pubid, delrels, true);
1369 : :
1370 : : /*
1371 : : * Don't bother calculating the difference for adding, we'll catch and
1372 : : * skip existing ones when doing catalog update.
1373 : : */
1374 : 66 : PublicationAddTables(pubid, rels, true, stmt);
1375 : :
1376 : 66 : CloseTableList(delrels);
1377 : 66 : }
1378 : :
1379 : 120 : CloseTableList(rels);
1380 [ - + ]: 130 : }
1381 : :
1382 : : /*
1383 : : * Alter the publication schemas.
1384 : : *
1385 : : * Add or remove schemas to/from publication.
1386 : : */
1387 : : static void
1388 : 115 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
1389 : : HeapTuple tup, List *schemaidlist)
1390 : : {
1391 : 115 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1392 : :
1393 : : /*
1394 : : * Nothing to do if no objects, except in SET: for that it is quite
1395 : : * possible that user has not specified any schemas in which case we need
1396 : : * to remove all the existing schemas.
1397 : : */
1398 [ + + + + ]: 115 : if (!schemaidlist && stmt->action != AP_SetObjects)
1399 : 39 : return;
1400 : :
1401 : : /*
1402 : : * Schema lock is held until the publication is altered to prevent
1403 : : * concurrent schema deletion.
1404 : : */
1405 : 76 : LockSchemaList(schemaidlist);
1406 [ + + ]: 76 : if (stmt->action == AP_AddObjects)
1407 : : {
1408 : 4 : ListCell *lc;
1409 : 4 : List *reloids;
1410 : :
1411 : 4 : reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1412 : :
1413 [ + + + + : 5 : foreach(lc, reloids)
+ + ]
1414 : : {
1415 : 2 : HeapTuple coltuple;
1416 : :
1417 : 2 : coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1418 : 2 : ObjectIdGetDatum(lfirst_oid(lc)),
1419 : 2 : ObjectIdGetDatum(pubform->oid));
1420 : :
1421 [ + - ]: 2 : if (!HeapTupleIsValid(coltuple))
1422 : 0 : continue;
1423 : :
1424 : : /*
1425 : : * Disallow adding schema if column list is already part of the
1426 : : * publication. See CheckPubRelationColumnList.
1427 : : */
1428 [ + + ]: 2 : if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1429 [ + - + - ]: 1 : ereport(ERROR,
1430 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1431 : : errmsg("cannot add schema to publication \"%s\"",
1432 : : stmt->pubname),
1433 : : errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1434 : :
1435 : 1 : ReleaseSysCache(coltuple);
1436 [ - + ]: 1 : }
1437 : :
1438 : 3 : PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1439 : 3 : }
1440 [ + + ]: 72 : else if (stmt->action == AP_DropObjects)
1441 : 6 : PublicationDropSchemas(pubform->oid, schemaidlist, false);
1442 : : else /* AP_SetObjects */
1443 : : {
1444 : 66 : List *oldschemaids = GetPublicationSchemas(pubform->oid);
1445 : 66 : List *delschemas = NIL;
1446 : :
1447 : : /* Identify which schemas should be dropped */
1448 : 66 : delschemas = list_difference_oid(oldschemaids, schemaidlist);
1449 : :
1450 : : /*
1451 : : * Schema lock is held until the publication is altered to prevent
1452 : : * concurrent schema deletion.
1453 : : */
1454 : 66 : LockSchemaList(delschemas);
1455 : :
1456 : : /* And drop them */
1457 : 66 : PublicationDropSchemas(pubform->oid, delschemas, true);
1458 : :
1459 : : /*
1460 : : * Don't bother calculating the difference for adding, we'll catch and
1461 : : * skip existing ones when doing catalog update.
1462 : : */
1463 : 66 : PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1464 : 66 : }
1465 : 114 : }
1466 : :
1467 : : /*
1468 : : * Check if relations and schemas can be in a given publication and throw
1469 : : * appropriate error if not.
1470 : : */
1471 : : static void
1472 : 144 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
1473 : : List *tables, List *schemaidlist)
1474 : : {
1475 : 144 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1476 : :
1477 [ + + + + ]: 144 : if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1478 [ + + ]: 75 : schemaidlist && !superuser())
1479 [ + - + - ]: 1 : ereport(ERROR,
1480 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1481 : : errmsg("must be superuser to add or set schemas")));
1482 : :
1483 : : /*
1484 : : * Check that user is allowed to manipulate the publication tables in
1485 : : * schema
1486 : : */
1487 [ + + + + ]: 83 : if (schemaidlist && (pubform->puballtables || pubform->puballsequences))
1488 : : {
1489 [ + - + - ]: 3 : if (pubform->puballtables && pubform->puballsequences)
1490 [ # # # # ]: 0 : ereport(ERROR,
1491 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1492 : : errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
1493 : : NameStr(pubform->pubname)),
1494 : : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
1495 [ + - ]: 3 : else if (pubform->puballtables)
1496 [ + - + - ]: 3 : ereport(ERROR,
1497 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1498 : : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1499 : : NameStr(pubform->pubname)),
1500 : : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications."));
1501 : : else
1502 [ # # # # ]: 0 : ereport(ERROR,
1503 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1504 : : errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
1505 : : NameStr(pubform->pubname)),
1506 : : errdetail("Schemas cannot be added to or dropped from FOR ALL SEQUENCES publications."));
1507 : 0 : }
1508 : :
1509 : : /* Check that user is allowed to manipulate the publication tables. */
1510 [ + + + + ]: 80 : if (tables && (pubform->puballtables || pubform->puballsequences))
1511 : : {
1512 [ + - + - ]: 3 : if (pubform->puballtables && pubform->puballsequences)
1513 [ # # # # ]: 0 : ereport(ERROR,
1514 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1515 : : errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
1516 : : NameStr(pubform->pubname)),
1517 : : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
1518 [ + - ]: 3 : else if (pubform->puballtables)
1519 [ + - + - ]: 3 : ereport(ERROR,
1520 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1521 : : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1522 : : NameStr(pubform->pubname)),
1523 : : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES publications."));
1524 : : else
1525 [ # # # # ]: 0 : ereport(ERROR,
1526 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1527 : : errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
1528 : : NameStr(pubform->pubname)),
1529 : : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL SEQUENCES publications."));
1530 : 0 : }
1531 : 167 : }
1532 : :
1533 : : /*
1534 : : * Alter the existing publication.
1535 : : *
1536 : : * This is dispatcher function for AlterPublicationOptions,
1537 : : * AlterPublicationSchemas and AlterPublicationTables.
1538 : : */
1539 : : void
1540 : 157 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
1541 : : {
1542 : 157 : Relation rel;
1543 : 157 : HeapTuple tup;
1544 : 157 : Form_pg_publication pubform;
1545 : :
1546 : 157 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1547 : :
1548 : 157 : tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1549 : : CStringGetDatum(stmt->pubname));
1550 : :
1551 [ + - ]: 157 : if (!HeapTupleIsValid(tup))
1552 [ # # # # ]: 0 : ereport(ERROR,
1553 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
1554 : : errmsg("publication \"%s\" does not exist",
1555 : : stmt->pubname)));
1556 : :
1557 : 157 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1558 : :
1559 : : /* must be owner */
1560 [ + - ]: 157 : if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1561 : 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1562 : 0 : stmt->pubname);
1563 : :
1564 [ + + ]: 157 : if (stmt->options)
1565 : 20 : AlterPublicationOptions(pstate, stmt, rel, tup);
1566 : : else
1567 : : {
1568 : 137 : List *relations = NIL;
1569 : 137 : List *schemaidlist = NIL;
1570 : 137 : Oid pubid = pubform->oid;
1571 : :
1572 : 137 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1573 : : &schemaidlist);
1574 : :
1575 : 137 : CheckAlterPublication(stmt, tup, relations, schemaidlist);
1576 : :
1577 : 137 : heap_freetuple(tup);
1578 : :
1579 : : /* Lock the publication so nobody else can do anything with it. */
1580 : 137 : LockDatabaseObject(PublicationRelationId, pubid, 0,
1581 : : AccessExclusiveLock);
1582 : :
1583 : : /*
1584 : : * It is possible that by the time we acquire the lock on publication,
1585 : : * concurrent DDL has removed it. We can test this by checking the
1586 : : * existence of publication. We get the tuple again to avoid the risk
1587 : : * of any publication option getting changed.
1588 : : */
1589 : 137 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1590 [ + - ]: 137 : if (!HeapTupleIsValid(tup))
1591 [ # # # # ]: 0 : ereport(ERROR,
1592 : : errcode(ERRCODE_UNDEFINED_OBJECT),
1593 : : errmsg("publication \"%s\" does not exist",
1594 : : stmt->pubname));
1595 : :
1596 : 274 : AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1597 : 137 : schemaidlist != NIL);
1598 : 137 : AlterPublicationSchemas(stmt, tup, schemaidlist);
1599 : 137 : }
1600 : :
1601 : : /* Cleanup. */
1602 : 157 : heap_freetuple(tup);
1603 : 157 : table_close(rel, RowExclusiveLock);
1604 : 157 : }
1605 : :
1606 : : /*
1607 : : * Remove relation from publication by mapping OID.
1608 : : */
1609 : : void
1610 : 119 : RemovePublicationRelById(Oid proid)
1611 : : {
1612 : 119 : Relation rel;
1613 : 119 : HeapTuple tup;
1614 : 119 : Form_pg_publication_rel pubrel;
1615 : 119 : List *relids = NIL;
1616 : :
1617 : 119 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1618 : :
1619 : 119 : tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1620 : :
1621 [ + - ]: 119 : if (!HeapTupleIsValid(tup))
1622 [ # # # # ]: 0 : elog(ERROR, "cache lookup failed for publication table %u",
1623 : : proid);
1624 : :
1625 : 119 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1626 : :
1627 : : /*
1628 : : * Invalidate relcache so that publication info is rebuilt.
1629 : : *
1630 : : * For the partitioned tables, we must invalidate all partitions contained
1631 : : * in the respective partition hierarchies, not just the one explicitly
1632 : : * mentioned in the publication. This is required because we implicitly
1633 : : * publish the child tables when the parent table is published.
1634 : : */
1635 : 238 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
1636 : 119 : pubrel->prrelid);
1637 : :
1638 : 119 : InvalidatePublicationRels(relids);
1639 : :
1640 : 119 : CatalogTupleDelete(rel, &tup->t_self);
1641 : :
1642 : 119 : ReleaseSysCache(tup);
1643 : :
1644 : 119 : table_close(rel, RowExclusiveLock);
1645 : 119 : }
1646 : :
1647 : : /*
1648 : : * Remove the publication by mapping OID.
1649 : : */
1650 : : void
1651 : 63 : RemovePublicationById(Oid pubid)
1652 : : {
1653 : 63 : Relation rel;
1654 : 63 : HeapTuple tup;
1655 : 63 : Form_pg_publication pubform;
1656 : :
1657 : 63 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1658 : :
1659 : 63 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1660 [ + - ]: 63 : if (!HeapTupleIsValid(tup))
1661 [ # # # # ]: 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1662 : :
1663 : 63 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1664 : :
1665 : : /* Invalidate relcache so that publication info is rebuilt. */
1666 [ + + ]: 63 : if (pubform->puballtables)
1667 : 7 : CacheInvalidateRelcacheAll();
1668 : :
1669 : 63 : CatalogTupleDelete(rel, &tup->t_self);
1670 : :
1671 : 63 : ReleaseSysCache(tup);
1672 : :
1673 : 63 : table_close(rel, RowExclusiveLock);
1674 : 63 : }
1675 : :
1676 : : /*
1677 : : * Remove schema from publication by mapping OID.
1678 : : */
1679 : : void
1680 : 31 : RemovePublicationSchemaById(Oid psoid)
1681 : : {
1682 : 31 : Relation rel;
1683 : 31 : HeapTuple tup;
1684 : 31 : List *schemaRels = NIL;
1685 : 31 : Form_pg_publication_namespace pubsch;
1686 : :
1687 : 31 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1688 : :
1689 : 31 : tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1690 : :
1691 [ + - ]: 31 : if (!HeapTupleIsValid(tup))
1692 [ # # # # ]: 0 : elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1693 : :
1694 : 31 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1695 : :
1696 : : /*
1697 : : * Invalidate relcache so that publication info is rebuilt. See
1698 : : * RemovePublicationRelById for why we need to consider all the
1699 : : * partitions.
1700 : : */
1701 : 31 : schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1702 : : PUBLICATION_PART_ALL);
1703 : 31 : InvalidatePublicationRels(schemaRels);
1704 : :
1705 : 31 : CatalogTupleDelete(rel, &tup->t_self);
1706 : :
1707 : 31 : ReleaseSysCache(tup);
1708 : :
1709 : 31 : table_close(rel, RowExclusiveLock);
1710 : 31 : }
1711 : :
1712 : : /*
1713 : : * Open relations specified by a PublicationTable list.
1714 : : * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1715 : : * add them to a publication.
1716 : : */
1717 : : static List *
1718 : 172 : OpenTableList(List *tables)
1719 : : {
1720 : 172 : List *relids = NIL;
1721 : 172 : List *rels = NIL;
1722 : 172 : ListCell *lc;
1723 : 172 : List *relids_with_rf = NIL;
1724 : 172 : List *relids_with_collist = NIL;
1725 : :
1726 : : /*
1727 : : * Open, share-lock, and check all the explicitly-specified relations
1728 : : */
1729 [ + + + + : 346 : foreach(lc, tables)
+ + ]
1730 : : {
1731 : 178 : PublicationTable *t = lfirst_node(PublicationTable, lc);
1732 : 178 : bool recurse = t->relation->inh;
1733 : 178 : Relation rel;
1734 : 178 : Oid myrelid;
1735 : 178 : PublicationRelInfo *pub_rel;
1736 : :
1737 : : /* Allow query cancel in case this takes a long time */
1738 [ + - ]: 178 : CHECK_FOR_INTERRUPTS();
1739 : :
1740 : 178 : rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
1741 : 178 : myrelid = RelationGetRelid(rel);
1742 : :
1743 : : /*
1744 : : * Filter out duplicates if user specifies "foo, foo".
1745 : : *
1746 : : * Note that this algorithm is known to not be very efficient (O(N^2))
1747 : : * but given that it only works on list of tables given to us by user
1748 : : * it's deemed acceptable.
1749 : : */
1750 [ + + ]: 178 : if (list_member_oid(relids, myrelid))
1751 : : {
1752 : : /* Disallow duplicate tables if there are any with row filters. */
1753 [ + + ]: 4 : if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1754 [ + - + - ]: 2 : ereport(ERROR,
1755 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
1756 : : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1757 : : RelationGetRelationName(rel))));
1758 : :
1759 : : /* Disallow duplicate tables if there are any with column lists. */
1760 [ - + ]: 2 : if (t->columns || list_member_oid(relids_with_collist, myrelid))
1761 [ + - + - ]: 2 : ereport(ERROR,
1762 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
1763 : : errmsg("conflicting or redundant column lists for table \"%s\"",
1764 : : RelationGetRelationName(rel))));
1765 : :
1766 : 0 : table_close(rel, ShareUpdateExclusiveLock);
1767 : 0 : continue;
1768 : : }
1769 : :
1770 : 174 : pub_rel = palloc_object(PublicationRelInfo);
1771 : 174 : pub_rel->relation = rel;
1772 : 174 : pub_rel->whereClause = t->whereClause;
1773 : 174 : pub_rel->columns = t->columns;
1774 : 174 : rels = lappend(rels, pub_rel);
1775 : 174 : relids = lappend_oid(relids, myrelid);
1776 : :
1777 [ + + ]: 174 : if (t->whereClause)
1778 : 58 : relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1779 : :
1780 [ + + ]: 174 : if (t->columns)
1781 : 55 : relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1782 : :
1783 : : /*
1784 : : * Add children of this rel, if requested, so that they too are added
1785 : : * to the publication. A partitioned table can't have any inheritance
1786 : : * children other than its partitions, which need not be explicitly
1787 : : * added to the publication.
1788 : : */
1789 [ + + + + ]: 174 : if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1790 : : {
1791 : 145 : List *children;
1792 : 145 : ListCell *child;
1793 : :
1794 : 145 : children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1795 : : NULL);
1796 : :
1797 [ + - + + : 291 : foreach(child, children)
+ + ]
1798 : : {
1799 : 146 : Oid childrelid = lfirst_oid(child);
1800 : :
1801 : : /* Allow query cancel in case this takes a long time */
1802 [ + - ]: 146 : CHECK_FOR_INTERRUPTS();
1803 : :
1804 : : /*
1805 : : * Skip duplicates if user specified both parent and child
1806 : : * tables.
1807 : : */
1808 [ + + ]: 146 : if (list_member_oid(relids, childrelid))
1809 : : {
1810 : : /*
1811 : : * We don't allow to specify row filter for both parent
1812 : : * and child table at the same time as it is not very
1813 : : * clear which one should be given preference.
1814 : : */
1815 [ + - ]: 145 : if (childrelid != myrelid &&
1816 [ # # ]: 0 : (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1817 [ # # # # ]: 0 : ereport(ERROR,
1818 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
1819 : : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1820 : : RelationGetRelationName(rel))));
1821 : :
1822 : : /*
1823 : : * We don't allow to specify column list for both parent
1824 : : * and child table at the same time as it is not very
1825 : : * clear which one should be given preference.
1826 : : */
1827 [ + - ]: 145 : if (childrelid != myrelid &&
1828 [ # # ]: 0 : (t->columns || list_member_oid(relids_with_collist, childrelid)))
1829 [ # # # # ]: 0 : ereport(ERROR,
1830 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
1831 : : errmsg("conflicting or redundant column lists for table \"%s\"",
1832 : : RelationGetRelationName(rel))));
1833 : :
1834 : 145 : continue;
1835 : : }
1836 : :
1837 : : /* find_all_inheritors already got lock */
1838 : 1 : rel = table_open(childrelid, NoLock);
1839 : 1 : pub_rel = palloc_object(PublicationRelInfo);
1840 : 1 : pub_rel->relation = rel;
1841 : : /* child inherits WHERE clause from parent */
1842 : 1 : pub_rel->whereClause = t->whereClause;
1843 : :
1844 : : /* child inherits column list from parent */
1845 : 1 : pub_rel->columns = t->columns;
1846 : 1 : rels = lappend(rels, pub_rel);
1847 : 1 : relids = lappend_oid(relids, childrelid);
1848 : :
1849 [ + - ]: 1 : if (t->whereClause)
1850 : 0 : relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1851 : :
1852 [ + - ]: 1 : if (t->columns)
1853 : 0 : relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1854 [ + + ]: 146 : }
1855 : 145 : }
1856 [ - + ]: 174 : }
1857 : :
1858 : 168 : list_free(relids);
1859 : 168 : list_free(relids_with_rf);
1860 : :
1861 : 336 : return rels;
1862 : 168 : }
1863 : :
1864 : : /*
1865 : : * Close all relations in the list.
1866 : : */
1867 : : static void
1868 : 202 : CloseTableList(List *rels)
1869 : : {
1870 : 202 : ListCell *lc;
1871 : :
1872 [ + + + + : 403 : foreach(lc, rels)
+ + ]
1873 : : {
1874 : 201 : PublicationRelInfo *pub_rel;
1875 : :
1876 : 201 : pub_rel = (PublicationRelInfo *) lfirst(lc);
1877 : 201 : table_close(pub_rel->relation, NoLock);
1878 : 201 : }
1879 : :
1880 : 202 : list_free_deep(rels);
1881 : 202 : }
1882 : :
1883 : : /*
1884 : : * Lock the schemas specified in the schema list in AccessShareLock mode in
1885 : : * order to prevent concurrent schema deletion.
1886 : : */
1887 : : static void
1888 : 166 : LockSchemaList(List *schemalist)
1889 : : {
1890 : 166 : ListCell *lc;
1891 : :
1892 [ + + + + : 216 : foreach(lc, schemalist)
+ + ]
1893 : : {
1894 : 50 : Oid schemaid = lfirst_oid(lc);
1895 : :
1896 : : /* Allow query cancel in case this takes a long time */
1897 [ + - ]: 50 : CHECK_FOR_INTERRUPTS();
1898 : 50 : LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1899 : :
1900 : : /*
1901 : : * It is possible that by the time we acquire the lock on schema,
1902 : : * concurrent DDL has removed it. We can test this by checking the
1903 : : * existence of schema.
1904 : : */
1905 [ + - ]: 50 : if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
1906 [ # # # # ]: 0 : ereport(ERROR,
1907 : : errcode(ERRCODE_UNDEFINED_SCHEMA),
1908 : : errmsg("schema with OID %u does not exist", schemaid));
1909 : 50 : }
1910 : 166 : }
1911 : :
1912 : : /*
1913 : : * Add listed tables to the publication.
1914 : : */
1915 : : static void
1916 : 123 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1917 : : AlterPublicationStmt *stmt)
1918 : : {
1919 : 123 : ListCell *lc;
1920 : :
1921 [ + + + + : 260 : foreach(lc, rels)
+ + ]
1922 : : {
1923 : 137 : PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1924 : 137 : Relation rel = pub_rel->relation;
1925 : 137 : ObjectAddress obj;
1926 : :
1927 : : /* Must be owner of the table or superuser. */
1928 [ + + ]: 137 : if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
1929 : 2 : aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
1930 : 1 : RelationGetRelationName(rel));
1931 : :
1932 : 137 : obj = publication_add_relation(pubid, pub_rel, if_not_exists);
1933 [ + + ]: 137 : if (stmt)
1934 : : {
1935 : 88 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1936 : 88 : (Node *) stmt);
1937 : :
1938 [ + - ]: 88 : InvokeObjectPostCreateHook(PublicationRelRelationId,
1939 : : obj.objectId, 0);
1940 : 88 : }
1941 : 137 : }
1942 : 123 : }
1943 : :
1944 : : /*
1945 : : * Remove listed tables from the publication.
1946 : : */
1947 : : static void
1948 : 82 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1949 : : {
1950 : 82 : ObjectAddress obj;
1951 : 82 : ListCell *lc;
1952 : 82 : Oid prid;
1953 : :
1954 [ + + + + : 157 : foreach(lc, rels)
+ + ]
1955 : : {
1956 : 78 : PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
1957 : 78 : Relation rel = pubrel->relation;
1958 : 78 : Oid relid = RelationGetRelid(rel);
1959 : :
1960 [ + - ]: 78 : if (pubrel->columns)
1961 [ # # # # ]: 0 : ereport(ERROR,
1962 : : errcode(ERRCODE_SYNTAX_ERROR),
1963 : : errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1964 : :
1965 : 78 : prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1966 : : ObjectIdGetDatum(relid),
1967 : : ObjectIdGetDatum(pubid));
1968 [ + + ]: 78 : if (!OidIsValid(prid))
1969 : : {
1970 [ - + ]: 2 : if (missing_ok)
1971 : 0 : continue;
1972 : :
1973 [ + - + - ]: 2 : ereport(ERROR,
1974 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
1975 : : errmsg("relation \"%s\" is not part of the publication",
1976 : : RelationGetRelationName(rel))));
1977 : 0 : }
1978 : :
1979 [ + + ]: 76 : if (pubrel->whereClause)
1980 [ + - + - ]: 1 : ereport(ERROR,
1981 : : (errcode(ERRCODE_SYNTAX_ERROR),
1982 : : errmsg("cannot use a WHERE clause when removing a table from a publication")));
1983 : :
1984 : 75 : ObjectAddressSet(obj, PublicationRelRelationId, prid);
1985 : 75 : performDeletion(&obj, DROP_CASCADE, 0);
1986 [ - - + ]: 75 : }
1987 : 79 : }
1988 : :
1989 : : /*
1990 : : * Add listed schemas to the publication.
1991 : : */
1992 : : static void
1993 : 91 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1994 : : AlterPublicationStmt *stmt)
1995 : : {
1996 : 91 : ListCell *lc;
1997 : :
1998 [ + + + + : 131 : foreach(lc, schemas)
+ + ]
1999 : : {
2000 : 40 : Oid schemaid = lfirst_oid(lc);
2001 : 40 : ObjectAddress obj;
2002 : :
2003 : 40 : obj = publication_add_schema(pubid, schemaid, if_not_exists);
2004 [ + + ]: 40 : if (stmt)
2005 : : {
2006 : 9 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
2007 : 9 : (Node *) stmt);
2008 : :
2009 [ + - ]: 9 : InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
2010 : : obj.objectId, 0);
2011 : 9 : }
2012 : 40 : }
2013 : 91 : }
2014 : :
2015 : : /*
2016 : : * Remove listed schemas from the publication.
2017 : : */
2018 : : static void
2019 : 72 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
2020 : : {
2021 : 72 : ObjectAddress obj;
2022 : 72 : ListCell *lc;
2023 : 72 : Oid psid;
2024 : :
2025 [ + + + + : 80 : foreach(lc, schemas)
+ + ]
2026 : : {
2027 : 9 : Oid schemaid = lfirst_oid(lc);
2028 : :
2029 : 9 : psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
2030 : : Anum_pg_publication_namespace_oid,
2031 : : ObjectIdGetDatum(schemaid),
2032 : : ObjectIdGetDatum(pubid));
2033 [ + + ]: 9 : if (!OidIsValid(psid))
2034 : : {
2035 [ - + ]: 1 : if (missing_ok)
2036 : 0 : continue;
2037 : :
2038 [ + - + - ]: 1 : ereport(ERROR,
2039 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
2040 : : errmsg("tables from schema \"%s\" are not part of the publication",
2041 : : get_namespace_name(schemaid))));
2042 : 0 : }
2043 : :
2044 : 8 : ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
2045 : 8 : performDeletion(&obj, DROP_CASCADE, 0);
2046 [ - - + ]: 8 : }
2047 : 71 : }
2048 : :
2049 : : /*
2050 : : * Internal workhorse for changing a publication owner
2051 : : */
2052 : : static void
2053 : 4 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
2054 : : {
2055 : 4 : Form_pg_publication form;
2056 : :
2057 : 4 : form = (Form_pg_publication) GETSTRUCT(tup);
2058 : :
2059 [ - + ]: 4 : if (form->pubowner == newOwnerId)
2060 : 0 : return;
2061 : :
2062 [ + + ]: 4 : if (!superuser())
2063 : : {
2064 : 2 : AclResult aclresult;
2065 : :
2066 : : /* Must be owner */
2067 [ + - ]: 2 : if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
2068 : 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
2069 : 0 : NameStr(form->pubname));
2070 : :
2071 : : /* Must be able to become new owner */
2072 : 2 : check_can_set_role(GetUserId(), newOwnerId);
2073 : :
2074 : : /* New owner must have CREATE privilege on database */
2075 : 2 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
2076 [ + - ]: 2 : if (aclresult != ACLCHECK_OK)
2077 : 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
2078 : 0 : get_database_name(MyDatabaseId));
2079 : :
2080 [ + + ]: 2 : if (!superuser_arg(newOwnerId))
2081 : : {
2082 [ - + ]: 1 : if (form->puballtables || form->puballsequences ||
2083 : 0 : is_schema_publication(form->oid))
2084 [ + - + - ]: 1 : ereport(ERROR,
2085 : : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2086 : : errmsg("permission denied to change owner of publication \"%s\"",
2087 : : NameStr(form->pubname)),
2088 : : errhint("The owner of a FOR ALL TABLES or ALL SEQUENCES or TABLES IN SCHEMA publication must be a superuser."));
2089 : 0 : }
2090 : 1 : }
2091 : :
2092 : 3 : form->pubowner = newOwnerId;
2093 : 3 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2094 : :
2095 : : /* Update owner dependency reference */
2096 : 3 : changeDependencyOnOwner(PublicationRelationId,
2097 : 3 : form->oid,
2098 : 3 : newOwnerId);
2099 : :
2100 [ + - ]: 3 : InvokeObjectPostAlterHook(PublicationRelationId,
2101 : : form->oid, 0);
2102 [ - + ]: 3 : }
2103 : :
2104 : : /*
2105 : : * Change publication owner -- by name
2106 : : */
2107 : : ObjectAddress
2108 : 3 : AlterPublicationOwner(const char *name, Oid newOwnerId)
2109 : : {
2110 : 3 : Oid pubid;
2111 : 3 : HeapTuple tup;
2112 : 3 : Relation rel;
2113 : : ObjectAddress address;
2114 : 3 : Form_pg_publication pubform;
2115 : :
2116 : 3 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2117 : :
2118 : 3 : tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
2119 : :
2120 [ + - ]: 3 : if (!HeapTupleIsValid(tup))
2121 [ # # # # ]: 0 : ereport(ERROR,
2122 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
2123 : : errmsg("publication \"%s\" does not exist", name)));
2124 : :
2125 : 3 : pubform = (Form_pg_publication) GETSTRUCT(tup);
2126 : 3 : pubid = pubform->oid;
2127 : :
2128 : 3 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2129 : :
2130 : 3 : ObjectAddressSet(address, PublicationRelationId, pubid);
2131 : :
2132 : 3 : heap_freetuple(tup);
2133 : :
2134 : 3 : table_close(rel, RowExclusiveLock);
2135 : :
2136 : : return address;
2137 : 3 : }
2138 : :
2139 : : /*
2140 : : * Change publication owner -- by OID
2141 : : */
2142 : : void
2143 : 0 : AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
2144 : : {
2145 : 0 : HeapTuple tup;
2146 : 0 : Relation rel;
2147 : :
2148 : 0 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2149 : :
2150 : 0 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
2151 : :
2152 [ # # ]: 0 : if (!HeapTupleIsValid(tup))
2153 [ # # # # ]: 0 : ereport(ERROR,
2154 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
2155 : : errmsg("publication with OID %u does not exist", pubid)));
2156 : :
2157 : 0 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2158 : :
2159 : 0 : heap_freetuple(tup);
2160 : :
2161 : 0 : table_close(rel, RowExclusiveLock);
2162 : 0 : }
2163 : :
2164 : : /*
2165 : : * Extract the publish_generated_columns option value from a DefElem. "stored"
2166 : : * and "none" values are accepted.
2167 : : */
2168 : : static char
2169 : 11 : defGetGeneratedColsOption(DefElem *def)
2170 : : {
2171 : 11 : char *sval = "";
2172 : :
2173 : : /*
2174 : : * A parameter value is required.
2175 : : */
2176 [ + + ]: 11 : if (def->arg)
2177 : : {
2178 : 10 : sval = defGetString(def);
2179 : :
2180 [ + + ]: 10 : if (pg_strcasecmp(sval, "none") == 0)
2181 : 3 : return PUBLISH_GENCOLS_NONE;
2182 [ + + ]: 7 : if (pg_strcasecmp(sval, "stored") == 0)
2183 : 6 : return PUBLISH_GENCOLS_STORED;
2184 : 1 : }
2185 : :
2186 [ + - + - ]: 2 : ereport(ERROR,
2187 : : errcode(ERRCODE_SYNTAX_ERROR),
2188 : : errmsg("invalid value for publication parameter \"%s\": \"%s\"", def->defname, sval),
2189 : : errdetail("Valid values are \"%s\" and \"%s\".", "none", "stored"));
2190 : :
2191 : 0 : return PUBLISH_GENCOLS_NONE; /* keep compiler quiet */
2192 : 9 : }
|