LCOV - code coverage report
Current view: top level - src/backend/commands - publicationcmds.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 92.3 % 904 834
Test Date: 2026-01-26 10:56:24 Functions: 96.9 % 32 31
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Branches: 69.3 % 709 491

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * publicationcmds.c
       4                 :             :  *              publication manipulation
       5                 :             :  *
       6                 :             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7                 :             :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :             :  *
       9                 :             :  * IDENTIFICATION
      10                 :             :  *              src/backend/commands/publicationcmds.c
      11                 :             :  *
      12                 :             :  *-------------------------------------------------------------------------
      13                 :             :  */
      14                 :             : 
      15                 :             : #include "postgres.h"
      16                 :             : 
      17                 :             : #include "access/htup_details.h"
      18                 :             : #include "access/table.h"
      19                 :             : #include "access/xact.h"
      20                 :             : #include "catalog/catalog.h"
      21                 :             : #include "catalog/indexing.h"
      22                 :             : #include "catalog/namespace.h"
      23                 :             : #include "catalog/objectaccess.h"
      24                 :             : #include "catalog/objectaddress.h"
      25                 :             : #include "catalog/pg_database.h"
      26                 :             : #include "catalog/pg_inherits.h"
      27                 :             : #include "catalog/pg_namespace.h"
      28                 :             : #include "catalog/pg_proc.h"
      29                 :             : #include "catalog/pg_publication.h"
      30                 :             : #include "catalog/pg_publication_namespace.h"
      31                 :             : #include "catalog/pg_publication_rel.h"
      32                 :             : #include "commands/defrem.h"
      33                 :             : #include "commands/event_trigger.h"
      34                 :             : #include "commands/publicationcmds.h"
      35                 :             : #include "miscadmin.h"
      36                 :             : #include "nodes/nodeFuncs.h"
      37                 :             : #include "parser/parse_clause.h"
      38                 :             : #include "parser/parse_collate.h"
      39                 :             : #include "parser/parse_relation.h"
      40                 :             : #include "rewrite/rewriteHandler.h"
      41                 :             : #include "storage/lmgr.h"
      42                 :             : #include "utils/acl.h"
      43                 :             : #include "utils/builtins.h"
      44                 :             : #include "utils/inval.h"
      45                 :             : #include "utils/lsyscache.h"
      46                 :             : #include "utils/rel.h"
      47                 :             : #include "utils/syscache.h"
      48                 :             : #include "utils/varlena.h"
      49                 :             : 
      50                 :             : 
      51                 :             : /*
      52                 :             :  * Information used to validate the columns in the row filter expression. See
      53                 :             :  * contain_invalid_rfcolumn_walker for details.
      54                 :             :  */
      55                 :             : typedef struct rf_context
      56                 :             : {
      57                 :             :         Bitmapset  *bms_replident;      /* bitset of replica identity columns */
      58                 :             :         bool            pubviaroot;             /* true if we are validating the parent
      59                 :             :                                                                  * relation's row filter */
      60                 :             :         Oid                     relid;                  /* relid of the relation */
      61                 :             :         Oid                     parentid;               /* relid of the parent relation */
      62                 :             : } rf_context;
      63                 :             : 
      64                 :             : static List *OpenTableList(List *tables);
      65                 :             : static void CloseTableList(List *rels);
      66                 :             : static void LockSchemaList(List *schemalist);
      67                 :             : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
      68                 :             :                                                                  AlterPublicationStmt *stmt);
      69                 :             : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
      70                 :             : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
      71                 :             :                                                                   AlterPublicationStmt *stmt);
      72                 :             : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
      73                 :             : static char defGetGeneratedColsOption(DefElem *def);
      74                 :             : 
      75                 :             : 
      76                 :             : static void
      77                 :         113 : parse_publication_options(ParseState *pstate,
      78                 :             :                                                   List *options,
      79                 :             :                                                   bool *publish_given,
      80                 :             :                                                   PublicationActions *pubactions,
      81                 :             :                                                   bool *publish_via_partition_root_given,
      82                 :             :                                                   bool *publish_via_partition_root,
      83                 :             :                                                   bool *publish_generated_columns_given,
      84                 :             :                                                   char *publish_generated_columns)
      85                 :             : {
      86                 :         113 :         ListCell   *lc;
      87                 :             : 
      88                 :         113 :         *publish_given = false;
      89                 :         113 :         *publish_via_partition_root_given = false;
      90                 :         113 :         *publish_generated_columns_given = false;
      91                 :             : 
      92                 :             :         /* defaults */
      93                 :         113 :         pubactions->pubinsert = true;
      94                 :         113 :         pubactions->pubupdate = true;
      95                 :         113 :         pubactions->pubdelete = true;
      96                 :         113 :         pubactions->pubtruncate = true;
      97                 :         113 :         *publish_via_partition_root = false;
      98                 :         113 :         *publish_generated_columns = PUBLISH_GENCOLS_NONE;
      99                 :             : 
     100                 :             :         /* Parse options */
     101   [ +  +  +  +  :         163 :         foreach(lc, options)
                   +  + ]
     102                 :             :         {
     103                 :          54 :                 DefElem    *defel = (DefElem *) lfirst(lc);
     104                 :             : 
     105         [ +  + ]:          54 :                 if (strcmp(defel->defname, "publish") == 0)
     106                 :             :                 {
     107                 :          19 :                         char       *publish;
     108                 :          19 :                         List       *publish_list;
     109                 :          19 :                         ListCell   *lc2;
     110                 :             : 
     111         [ +  - ]:          19 :                         if (*publish_given)
     112                 :           0 :                                 errorConflictingDefElem(defel, pstate);
     113                 :             : 
     114                 :             :                         /*
     115                 :             :                          * If publish option was given only the explicitly listed actions
     116                 :             :                          * should be published.
     117                 :             :                          */
     118                 :          19 :                         pubactions->pubinsert = false;
     119                 :          19 :                         pubactions->pubupdate = false;
     120                 :          19 :                         pubactions->pubdelete = false;
     121                 :          19 :                         pubactions->pubtruncate = false;
     122                 :             : 
     123                 :          19 :                         *publish_given = true;
     124                 :             : 
     125                 :             :                         /*
     126                 :             :                          * SplitIdentifierString destructively modifies its input, so make
     127                 :             :                          * a copy so we don't modify the memory of the executing statement
     128                 :             :                          */
     129                 :          19 :                         publish = pstrdup(defGetString(defel));
     130                 :             : 
     131         [ +  - ]:          19 :                         if (!SplitIdentifierString(publish, ',', &publish_list))
     132   [ #  #  #  # ]:           0 :                                 ereport(ERROR,
     133                 :             :                                                 (errcode(ERRCODE_SYNTAX_ERROR),
     134                 :             :                                                  errmsg("invalid list syntax in parameter \"%s\"",
     135                 :             :                                                                 "publish")));
     136                 :             : 
     137                 :             :                         /* Process the option list. */
     138   [ +  -  +  +  :          41 :                         foreach(lc2, publish_list)
                   +  + ]
     139                 :             :                         {
     140                 :          23 :                                 char       *publish_opt = (char *) lfirst(lc2);
     141                 :             : 
     142         [ +  + ]:          23 :                                 if (strcmp(publish_opt, "insert") == 0)
     143                 :          17 :                                         pubactions->pubinsert = true;
     144         [ +  + ]:           6 :                                 else if (strcmp(publish_opt, "update") == 0)
     145                 :           3 :                                         pubactions->pubupdate = true;
     146         [ +  + ]:           3 :                                 else if (strcmp(publish_opt, "delete") == 0)
     147                 :           1 :                                         pubactions->pubdelete = true;
     148         [ +  + ]:           2 :                                 else if (strcmp(publish_opt, "truncate") == 0)
     149                 :           1 :                                         pubactions->pubtruncate = true;
     150                 :             :                                 else
     151   [ +  -  +  - ]:           1 :                                         ereport(ERROR,
     152                 :             :                                                         (errcode(ERRCODE_SYNTAX_ERROR),
     153                 :             :                                                          errmsg("unrecognized value for publication option \"%s\": \"%s\"",
     154                 :             :                                                                         "publish", publish_opt)));
     155                 :          22 :                         }
     156                 :          18 :                 }
     157         [ +  + ]:          35 :                 else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
     158                 :             :                 {
     159         [ +  + ]:          22 :                         if (*publish_via_partition_root_given)
     160                 :           1 :                                 errorConflictingDefElem(defel, pstate);
     161                 :          21 :                         *publish_via_partition_root_given = true;
     162                 :          21 :                         *publish_via_partition_root = defGetBoolean(defel);
     163                 :          21 :                 }
     164         [ +  + ]:          13 :                 else if (strcmp(defel->defname, "publish_generated_columns") == 0)
     165                 :             :                 {
     166         [ +  + ]:          12 :                         if (*publish_generated_columns_given)
     167                 :           1 :                                 errorConflictingDefElem(defel, pstate);
     168                 :          11 :                         *publish_generated_columns_given = true;
     169                 :          11 :                         *publish_generated_columns = defGetGeneratedColsOption(defel);
     170                 :          11 :                 }
     171                 :             :                 else
     172   [ +  -  +  - ]:           1 :                         ereport(ERROR,
     173                 :             :                                         (errcode(ERRCODE_SYNTAX_ERROR),
     174                 :             :                                          errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
     175                 :          50 :         }
     176                 :         109 : }
     177                 :             : 
     178                 :             : /*
     179                 :             :  * Convert the PublicationObjSpecType list into schema oid list and
     180                 :             :  * PublicationTable list.
     181                 :             :  */
     182                 :             : static void
     183                 :         220 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
     184                 :             :                                                    List **rels, List **schemas)
     185                 :             : {
     186                 :         220 :         ListCell   *cell;
     187                 :         220 :         PublicationObjSpec *pubobj;
     188                 :             : 
     189         [ +  + ]:         220 :         if (!pubobjspec_list)
     190                 :          10 :                 return;
     191                 :             : 
     192   [ +  -  +  +  :         453 :         foreach(cell, pubobjspec_list)
                   +  + ]
     193                 :             :         {
     194                 :         244 :                 Oid                     schemaid;
     195                 :         244 :                 List       *search_path;
     196                 :             : 
     197                 :         244 :                 pubobj = (PublicationObjSpec *) lfirst(cell);
     198                 :             : 
     199   [ +  -  +  + ]:         244 :                 switch (pubobj->pubobjtype)
     200                 :             :                 {
     201                 :             :                         case PUBLICATIONOBJ_TABLE:
     202                 :         182 :                                 *rels = lappend(*rels, pubobj->pubtable);
     203                 :         182 :                                 break;
     204                 :             :                         case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
     205                 :          58 :                                 schemaid = get_namespace_oid(pubobj->name, false);
     206                 :             : 
     207                 :             :                                 /* Filter out duplicates if user specifies "sch1, sch1" */
     208                 :          58 :                                 *schemas = list_append_unique_oid(*schemas, schemaid);
     209                 :          58 :                                 break;
     210                 :             :                         case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
     211                 :           4 :                                 search_path = fetch_search_path(false);
     212         [ +  + ]:           4 :                                 if (search_path == NIL) /* nothing valid in search_path? */
     213   [ +  -  +  - ]:           1 :                                         ereport(ERROR,
     214                 :             :                                                         errcode(ERRCODE_UNDEFINED_SCHEMA),
     215                 :             :                                                         errmsg("no schema has been selected for CURRENT_SCHEMA"));
     216                 :             : 
     217                 :           3 :                                 schemaid = linitial_oid(search_path);
     218                 :           3 :                                 list_free(search_path);
     219                 :             : 
     220                 :             :                                 /* Filter out duplicates if user specifies "sch1, sch1" */
     221                 :           3 :                                 *schemas = list_append_unique_oid(*schemas, schemaid);
     222                 :           3 :                                 break;
     223                 :             :                         default:
     224                 :             :                                 /* shouldn't happen */
     225   [ #  #  #  # ]:           0 :                                 elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
     226                 :           0 :                                 break;
     227                 :             :                 }
     228                 :         243 :         }
     229         [ -  + ]:         219 : }
     230                 :             : 
     231                 :             : /*
     232                 :             :  * Returns true if any of the columns used in the row filter WHERE expression is
     233                 :             :  * not part of REPLICA IDENTITY, false otherwise.
     234                 :             :  */
     235                 :             : static bool
     236                 :          38 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
     237                 :             : {
     238         [ +  - ]:          38 :         if (node == NULL)
     239                 :           0 :                 return false;
     240                 :             : 
     241         [ +  + ]:          38 :         if (IsA(node, Var))
     242                 :             :         {
     243                 :          16 :                 Var                *var = (Var *) node;
     244                 :          16 :                 AttrNumber      attnum = var->varattno;
     245                 :             : 
     246                 :             :                 /*
     247                 :             :                  * If pubviaroot is true, we are validating the row filter of the
     248                 :             :                  * parent table, but the bitmap contains the replica identity
     249                 :             :                  * information of the child table. So, get the column number of the
     250                 :             :                  * child table as parent and child column order could be different.
     251                 :             :                  */
     252         [ +  + ]:          16 :                 if (context->pubviaroot)
     253                 :             :                 {
     254                 :           2 :                         char       *colname = get_attname(context->parentid, attnum, false);
     255                 :             : 
     256                 :           2 :                         attnum = get_attnum(context->relid, colname);
     257                 :           2 :                 }
     258                 :             : 
     259   [ +  +  +  + ]:          32 :                 if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
     260                 :          16 :                                                    context->bms_replident))
     261                 :          10 :                         return true;
     262      [ -  +  + ]:          16 :         }
     263                 :             : 
     264                 :          28 :         return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
     265                 :             :                                                                   context);
     266                 :          38 : }
     267                 :             : 
     268                 :             : /*
     269                 :             :  * Check if all columns referenced in the filter expression are part of the
     270                 :             :  * REPLICA IDENTITY index or not.
     271                 :             :  *
     272                 :             :  * Returns true if any invalid column is found.
     273                 :             :  */
     274                 :             : bool
     275                 :          69 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
     276                 :             :                                                            bool pubviaroot)
     277                 :             : {
     278                 :          69 :         HeapTuple       rftuple;
     279                 :          69 :         Oid                     relid = RelationGetRelid(relation);
     280                 :          69 :         Oid                     publish_as_relid = RelationGetRelid(relation);
     281                 :          69 :         bool            result = false;
     282                 :          69 :         Datum           rfdatum;
     283                 :          69 :         bool            rfisnull;
     284                 :             : 
     285                 :             :         /*
     286                 :             :          * FULL means all columns are in the REPLICA IDENTITY, so all columns are
     287                 :             :          * allowed in the row filter and we can skip the validation.
     288                 :             :          */
     289         [ +  + ]:          69 :         if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     290                 :          14 :                 return false;
     291                 :             : 
     292                 :             :         /*
     293                 :             :          * For a partition, if pubviaroot is true, find the topmost ancestor that
     294                 :             :          * is published via this publication as we need to use its row filter
     295                 :             :          * expression to filter the partition's changes.
     296                 :             :          *
     297                 :             :          * Note that even though the row filter used is for an ancestor, the
     298                 :             :          * REPLICA IDENTITY used will be for the actual child table.
     299                 :             :          */
     300   [ +  +  -  + ]:          55 :         if (pubviaroot && relation->rd_rel->relispartition)
     301                 :             :         {
     302                 :             :                 publish_as_relid
     303                 :          14 :                         = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
     304                 :             : 
     305         [ +  - ]:          14 :                 if (!OidIsValid(publish_as_relid))
     306                 :           0 :                         publish_as_relid = relid;
     307                 :          14 :         }
     308                 :             : 
     309                 :          55 :         rftuple = SearchSysCache2(PUBLICATIONRELMAP,
     310                 :          55 :                                                           ObjectIdGetDatum(publish_as_relid),
     311                 :          55 :                                                           ObjectIdGetDatum(pubid));
     312                 :             : 
     313         [ +  + ]:          55 :         if (!HeapTupleIsValid(rftuple))
     314                 :           6 :                 return false;
     315                 :             : 
     316                 :          49 :         rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
     317                 :             :                                                           Anum_pg_publication_rel_prqual,
     318                 :             :                                                           &rfisnull);
     319                 :             : 
     320         [ +  + ]:          49 :         if (!rfisnull)
     321                 :             :         {
     322                 :          16 :                 rf_context      context = {0};
     323                 :          16 :                 Node       *rfnode;
     324                 :          16 :                 Bitmapset  *bms = NULL;
     325                 :             : 
     326                 :          16 :                 context.pubviaroot = pubviaroot;
     327                 :          16 :                 context.parentid = publish_as_relid;
     328                 :          16 :                 context.relid = relid;
     329                 :             : 
     330                 :             :                 /* Remember columns that are part of the REPLICA IDENTITY */
     331                 :          16 :                 bms = RelationGetIndexAttrBitmap(relation,
     332                 :             :                                                                                  INDEX_ATTR_BITMAP_IDENTITY_KEY);
     333                 :             : 
     334                 :          16 :                 context.bms_replident = bms;
     335                 :          16 :                 rfnode = stringToNode(TextDatumGetCString(rfdatum));
     336                 :          16 :                 result = contain_invalid_rfcolumn_walker(rfnode, &context);
     337                 :          16 :         }
     338                 :             : 
     339                 :          49 :         ReleaseSysCache(rftuple);
     340                 :             : 
     341                 :          49 :         return result;
     342                 :          69 : }
     343                 :             : 
     344                 :             : /*
     345                 :             :  * Check for invalid columns in the publication table definition.
     346                 :             :  *
     347                 :             :  * This function evaluates two conditions:
     348                 :             :  *
     349                 :             :  * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
     350                 :             :  *    by the column list. If any column is missing, *invalid_column_list is set
     351                 :             :  *    to true.
     352                 :             :  * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
     353                 :             :  *    are published, either by being explicitly named in the column list or, if
     354                 :             :  *    no column list is specified, by setting the option
     355                 :             :  *    publish_generated_columns to stored. If any unpublished
     356                 :             :  *    generated column is found, *invalid_gen_col is set to true.
     357                 :             :  *
     358                 :             :  * Returns true if any of the above conditions are not met.
     359                 :             :  */
     360                 :             : bool
     361                 :          74 : pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
     362                 :             :                                                         bool pubviaroot, char pubgencols_type,
     363                 :             :                                                         bool *invalid_column_list,
     364                 :             :                                                         bool *invalid_gen_col)
     365                 :             : {
     366                 :          74 :         Oid                     relid = RelationGetRelid(relation);
     367                 :          74 :         Oid                     publish_as_relid = RelationGetRelid(relation);
     368                 :          74 :         Bitmapset  *idattrs;
     369                 :          74 :         Bitmapset  *columns = NULL;
     370                 :          74 :         TupleDesc       desc = RelationGetDescr(relation);
     371                 :          74 :         Publication *pub;
     372                 :          74 :         int                     x;
     373                 :             : 
     374                 :          74 :         *invalid_column_list = false;
     375                 :          74 :         *invalid_gen_col = false;
     376                 :             : 
     377                 :             :         /*
     378                 :             :          * For a partition, if pubviaroot is true, find the topmost ancestor that
     379                 :             :          * is published via this publication as we need to use its column list for
     380                 :             :          * the changes.
     381                 :             :          *
     382                 :             :          * Note that even though the column list used is for an ancestor, the
     383                 :             :          * REPLICA IDENTITY used will be for the actual child table.
     384                 :             :          */
     385   [ +  +  -  + ]:          74 :         if (pubviaroot && relation->rd_rel->relispartition)
     386                 :             :         {
     387                 :          17 :                 publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
     388                 :             : 
     389         [ +  - ]:          17 :                 if (!OidIsValid(publish_as_relid))
     390                 :           0 :                         publish_as_relid = relid;
     391                 :          17 :         }
     392                 :             : 
     393                 :             :         /* Fetch the column list */
     394                 :          74 :         pub = GetPublication(pubid);
     395                 :          74 :         check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
     396                 :             : 
     397         [ +  + ]:          74 :         if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     398                 :             :         {
     399                 :             :                 /* With REPLICA IDENTITY FULL, no column list is allowed. */
     400                 :          14 :                 *invalid_column_list = (columns != NULL);
     401                 :             : 
     402                 :             :                 /*
     403                 :             :                  * As we don't allow a column list with REPLICA IDENTITY FULL, the
     404                 :             :                  * publish_generated_columns option must be set to stored if the table
     405                 :             :                  * has any stored generated columns.
     406                 :             :                  */
     407         [ +  + ]:          14 :                 if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
     408   [ +  +  +  + ]:          12 :                         relation->rd_att->constr &&
     409                 :           8 :                         relation->rd_att->constr->has_generated_stored)
     410                 :           1 :                         *invalid_gen_col = true;
     411                 :             : 
     412                 :             :                 /*
     413                 :             :                  * Virtual generated columns are currently not supported for logical
     414                 :             :                  * replication at all.
     415                 :             :                  */
     416   [ +  +  +  + ]:          14 :                 if (relation->rd_att->constr &&
     417                 :          10 :                         relation->rd_att->constr->has_generated_virtual)
     418                 :           2 :                         *invalid_gen_col = true;
     419                 :             : 
     420   [ +  +  +  - ]:          14 :                 if (*invalid_gen_col && *invalid_column_list)
     421                 :           0 :                         return true;
     422                 :          14 :         }
     423                 :             : 
     424                 :             :         /* Remember columns that are part of the REPLICA IDENTITY */
     425                 :          74 :         idattrs = RelationGetIndexAttrBitmap(relation,
     426                 :             :                                                                                  INDEX_ATTR_BITMAP_IDENTITY_KEY);
     427                 :             : 
     428                 :             :         /*
     429                 :             :          * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
     430                 :             :          * (to handle system columns the usual way), while column list does not
     431                 :             :          * use offset, so we can't do bms_is_subset(). Instead, we have to loop
     432                 :             :          * over the idattrs and check all of them are in the list.
     433                 :             :          */
     434                 :          74 :         x = -1;
     435         [ +  + ]:         123 :         while ((x = bms_next_member(idattrs, x)) >= 0)
     436                 :             :         {
     437                 :          49 :                 AttrNumber      attnum = (x + FirstLowInvalidHeapAttributeNumber);
     438                 :          49 :                 Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
     439                 :             : 
     440         [ +  + ]:          49 :                 if (columns == NULL)
     441                 :             :                 {
     442                 :             :                         /*
     443                 :             :                          * The publish_generated_columns option must be set to stored if
     444                 :             :                          * the REPLICA IDENTITY contains any stored generated column.
     445                 :             :                          */
     446   [ +  +  -  + ]:          19 :                         if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
     447                 :             :                         {
     448                 :           1 :                                 *invalid_gen_col = true;
     449                 :           1 :                                 break;
     450                 :             :                         }
     451                 :             : 
     452                 :             :                         /*
     453                 :             :                          * The equivalent setting for virtual generated columns does not
     454                 :             :                          * exist yet.
     455                 :             :                          */
     456         [ -  + ]:          18 :                         if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
     457                 :             :                         {
     458                 :           0 :                                 *invalid_gen_col = true;
     459                 :           0 :                                 break;
     460                 :             :                         }
     461                 :             : 
     462                 :             :                         /* Skip validating the column list since it is not defined */
     463                 :          18 :                         continue;
     464                 :             :                 }
     465                 :             : 
     466                 :             :                 /*
     467                 :             :                  * If pubviaroot is true, we are validating the column list of the
     468                 :             :                  * parent table, but the bitmap contains the replica identity
     469                 :             :                  * information of the child table. The parent/child attnums may not
     470                 :             :                  * match, so translate them to the parent - get the attname from the
     471                 :             :                  * child, and look it up in the parent.
     472                 :             :                  */
     473         [ +  + ]:          30 :                 if (pubviaroot)
     474                 :             :                 {
     475                 :             :                         /* attribute name in the child table */
     476                 :          11 :                         char       *colname = get_attname(relid, attnum, false);
     477                 :             : 
     478                 :             :                         /*
     479                 :             :                          * Determine the attnum for the attribute name in parent (we are
     480                 :             :                          * using the column list defined on the parent).
     481                 :             :                          */
     482                 :          11 :                         attnum = get_attnum(publish_as_relid, colname);
     483                 :          11 :                 }
     484                 :             : 
     485                 :             :                 /* replica identity column, not covered by the column list */
     486                 :          30 :                 *invalid_column_list |= !bms_is_member(attnum, columns);
     487                 :             : 
     488   [ +  +  +  - ]:          30 :                 if (*invalid_column_list && *invalid_gen_col)
     489                 :           0 :                         break;
     490      [ -  +  + ]:          49 :         }
     491                 :             : 
     492                 :          74 :         bms_free(columns);
     493                 :          74 :         bms_free(idattrs);
     494                 :             : 
     495         [ +  + ]:          74 :         return *invalid_column_list || *invalid_gen_col;
     496                 :          74 : }
     497                 :             : 
     498                 :             : /*
     499                 :             :  * Invalidate entries in the RelationSyncCache for relations included in the
     500                 :             :  * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
     501                 :             :  *
     502                 :             :  * If 'puballtables' is true, invalidate all cache entries.
     503                 :             :  */
     504                 :             : void
     505                 :           2 : InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
     506                 :             : {
     507         [ -  + ]:           2 :         if (puballtables)
     508                 :             :         {
     509                 :           0 :                 CacheInvalidateRelSyncAll();
     510                 :           0 :         }
     511                 :             :         else
     512                 :             :         {
     513                 :           2 :                 List       *relids = NIL;
     514                 :           2 :                 List       *schemarelids = NIL;
     515                 :             : 
     516                 :             :                 /*
     517                 :             :                  * For partitioned tables, we must invalidate all partitions and
     518                 :             :                  * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
     519                 :             :                  * a target. However, WAL records for TRUNCATE specify both a root and
     520                 :             :                  * its leaves.
     521                 :             :                  */
     522                 :           2 :                 relids = GetPublicationRelations(pubid,
     523                 :             :                                                                                  PUBLICATION_PART_ALL);
     524                 :           2 :                 schemarelids = GetAllSchemaPublicationRelations(pubid,
     525                 :             :                                                                                                                 PUBLICATION_PART_ALL);
     526                 :             : 
     527                 :           2 :                 relids = list_concat_unique_oid(relids, schemarelids);
     528                 :             : 
     529                 :             :                 /* Invalidate the relsyncache */
     530   [ +  +  -  +  :           4 :                 foreach_oid(relid, relids)
             #  #  -  + ]
     531                 :           2 :                         CacheInvalidateRelSync(relid);
     532                 :           2 :         }
     533                 :             : 
     534                 :           2 :         return;
     535                 :             : }
     536                 :             : 
     537                 :             : /* check_functions_in_node callback */
     538                 :             : static bool
     539                 :          60 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
     540                 :             : {
     541         [ +  + ]:          60 :         return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
     542                 :          59 :                         func_id >= FirstNormalObjectId);
     543                 :             : }
     544                 :             : 
     545                 :             : /*
     546                 :             :  * The row filter walker checks if the row filter expression is a "simple
     547                 :             :  * expression".
     548                 :             :  *
     549                 :             :  * It allows only simple or compound expressions such as:
     550                 :             :  * - (Var Op Const)
     551                 :             :  * - (Var Op Var)
     552                 :             :  * - (Var Op Const) AND/OR (Var Op Const)
     553                 :             :  * - etc
     554                 :             :  * (where Var is a column of the table this filter belongs to)
     555                 :             :  *
     556                 :             :  * The simple expression has the following restrictions:
     557                 :             :  * - User-defined operators are not allowed;
     558                 :             :  * - User-defined functions are not allowed;
     559                 :             :  * - User-defined types are not allowed;
     560                 :             :  * - User-defined collations are not allowed;
     561                 :             :  * - Non-immutable built-in functions are not allowed;
     562                 :             :  * - System columns are not allowed.
     563                 :             :  *
     564                 :             :  * NOTES
     565                 :             :  *
     566                 :             :  * We don't allow user-defined functions/operators/types/collations because
     567                 :             :  * (a) if a user drops a user-defined object used in a row filter expression or
     568                 :             :  * if there is any other error while using it, the logical decoding
     569                 :             :  * infrastructure won't be able to recover from such an error even if the
     570                 :             :  * object is recreated again because a historic snapshot is used to evaluate
     571                 :             :  * the row filter;
     572                 :             :  * (b) a user-defined function can be used to access tables that could have
     573                 :             :  * unpleasant results because a historic snapshot is used. That's why only
     574                 :             :  * immutable built-in functions are allowed in row filter expressions.
     575                 :             :  *
     576                 :             :  * We don't allow system columns because currently, we don't have that
     577                 :             :  * information in the tuple passed to downstream. Also, as we don't replicate
     578                 :             :  * those to subscribers, there doesn't seem to be a need for a filter on those
     579                 :             :  * columns.
     580                 :             :  *
     581                 :             :  * We can allow other node types after more analysis and testing.
     582                 :             :  */
     583                 :             : static bool
     584                 :         203 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
     585                 :             : {
     586                 :         203 :         char       *errdetail_msg = NULL;
     587                 :             : 
     588         [ +  + ]:         203 :         if (node == NULL)
     589                 :           1 :                 return false;
     590                 :             : 
     591   [ +  +  +  +  :         202 :         switch (nodeTag(node))
                   +  + ]
     592                 :             :         {
     593                 :             :                 case T_Var:
     594                 :             :                         /* System columns are not allowed. */
     595         [ +  + ]:          54 :                         if (((Var *) node)->varattno < InvalidAttrNumber)
     596                 :           1 :                                 errdetail_msg = _("System columns are not allowed.");
     597                 :          54 :                         break;
     598                 :             :                 case T_OpExpr:
     599                 :             :                 case T_DistinctExpr:
     600                 :             :                 case T_NullIfExpr:
     601                 :             :                         /* OK, except user-defined operators are not allowed. */
     602         [ +  + ]:          53 :                         if (((OpExpr *) node)->opno >= FirstNormalObjectId)
     603                 :           1 :                                 errdetail_msg = _("User-defined operators are not allowed.");
     604                 :          53 :                         break;
     605                 :             :                 case T_ScalarArrayOpExpr:
     606                 :             :                         /* OK, except user-defined operators are not allowed. */
     607         [ +  - ]:           1 :                         if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
     608                 :           0 :                                 errdetail_msg = _("User-defined operators are not allowed.");
     609                 :             : 
     610                 :             :                         /*
     611                 :             :                          * We don't need to check the hashfuncid and negfuncid of
     612                 :             :                          * ScalarArrayOpExpr as those functions are only built for a
     613                 :             :                          * subquery.
     614                 :             :                          */
     615                 :           1 :                         break;
     616                 :             :                 case T_RowCompareExpr:
     617                 :             :                         {
     618                 :           1 :                                 ListCell   *opid;
     619                 :             : 
     620                 :             :                                 /* OK, except user-defined operators are not allowed. */
     621   [ +  -  +  +  :           3 :                                 foreach(opid, ((RowCompareExpr *) node)->opnos)
                   +  + ]
     622                 :             :                                 {
     623         [ -  + ]:           2 :                                         if (lfirst_oid(opid) >= FirstNormalObjectId)
     624                 :             :                                         {
     625                 :           0 :                                                 errdetail_msg = _("User-defined operators are not allowed.");
     626                 :           0 :                                                 break;
     627                 :             :                                         }
     628                 :           2 :                                 }
     629                 :           1 :                         }
     630                 :           1 :                         break;
     631                 :             :                 case T_Const:
     632                 :             :                 case T_FuncExpr:
     633                 :             :                 case T_BoolExpr:
     634                 :             :                 case T_RelabelType:
     635                 :             :                 case T_CollateExpr:
     636                 :             :                 case T_CaseExpr:
     637                 :             :                 case T_CaseTestExpr:
     638                 :             :                 case T_ArrayExpr:
     639                 :             :                 case T_RowExpr:
     640                 :             :                 case T_CoalesceExpr:
     641                 :             :                 case T_MinMaxExpr:
     642                 :             :                 case T_XmlExpr:
     643                 :             :                 case T_NullTest:
     644                 :             :                 case T_BooleanTest:
     645                 :             :                 case T_List:
     646                 :             :                         /* OK, supported */
     647                 :          92 :                         break;
     648                 :             :                 default:
     649                 :           1 :                         errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
     650                 :           1 :                         break;
     651                 :             :         }
     652                 :             : 
     653                 :             :         /*
     654                 :             :          * For all the supported nodes, if we haven't already found a problem,
     655                 :             :          * check the types, functions, and collations used in it.  We check List
     656                 :             :          * by walking through each element.
     657                 :             :          */
     658   [ +  +  +  + ]:         202 :         if (!errdetail_msg && !IsA(node, List))
     659                 :             :         {
     660         [ +  + ]:         190 :                 if (exprType(node) >= FirstNormalObjectId)
     661                 :           1 :                         errdetail_msg = _("User-defined types are not allowed.");
     662   [ +  +  +  + ]:         378 :                 else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
     663                 :         189 :                                                                                  pstate))
     664                 :           2 :                         errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
     665   [ +  -  +  + ]:         187 :                 else if (exprCollation(node) >= FirstNormalObjectId ||
     666                 :         187 :                                  exprInputCollation(node) >= FirstNormalObjectId)
     667                 :           1 :                         errdetail_msg = _("User-defined collations are not allowed.");
     668                 :         190 :         }
     669                 :             : 
     670                 :             :         /*
     671                 :             :          * If we found a problem in this node, throw error now. Otherwise keep
     672                 :             :          * going.
     673                 :             :          */
     674         [ +  + ]:         202 :         if (errdetail_msg)
     675   [ +  -  +  - ]:           7 :                 ereport(ERROR,
     676                 :             :                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     677                 :             :                                  errmsg("invalid publication WHERE expression"),
     678                 :             :                                  errdetail_internal("%s", errdetail_msg),
     679                 :             :                                  parser_errposition(pstate, exprLocation(node))));
     680                 :             : 
     681                 :         195 :         return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
     682                 :             :                                                                   pstate);
     683                 :         196 : }
     684                 :             : 
     685                 :             : /*
     686                 :             :  * Check if the row filter expression is a "simple expression".
     687                 :             :  *
     688                 :             :  * See check_simple_rowfilter_expr_walker for details.
     689                 :             :  */
     690                 :             : static bool
     691                 :          53 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
     692                 :             : {
     693                 :          53 :         return check_simple_rowfilter_expr_walker(node, pstate);
     694                 :             : }
     695                 :             : 
     696                 :             : /*
     697                 :             :  * Transform the publication WHERE expression for all the relations in the list,
     698                 :             :  * ensuring it is coerced to boolean and necessary collation information is
     699                 :             :  * added if required, and add a new nsitem/RTE for the associated relation to
     700                 :             :  * the ParseState's namespace list.
     701                 :             :  *
     702                 :             :  * Also check the publication row filter expression and throw an error if
     703                 :             :  * anything not permitted or unexpected is encountered.
     704                 :             :  */
     705                 :             : static void
     706                 :         142 : TransformPubWhereClauses(List *tables, const char *queryString,
     707                 :             :                                                  bool pubviaroot)
     708                 :             : {
     709                 :         142 :         ListCell   *lc;
     710                 :             : 
     711   [ +  +  +  +  :         295 :         foreach(lc, tables)
                   +  + ]
     712                 :             :         {
     713                 :         154 :                 ParseNamespaceItem *nsitem;
     714                 :         154 :                 Node       *whereclause = NULL;
     715                 :         154 :                 ParseState *pstate;
     716                 :         154 :                 PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
     717                 :             : 
     718         [ +  + ]:         154 :                 if (pri->whereClause == NULL)
     719                 :          98 :                         continue;
     720                 :             : 
     721                 :             :                 /*
     722                 :             :                  * If the publication doesn't publish changes via the root partitioned
     723                 :             :                  * table, the partition's row filter will be used. So disallow using
     724                 :             :                  * WHERE clause on partitioned table in this case.
     725                 :             :                  */
     726   [ +  +  +  + ]:          56 :                 if (!pubviaroot &&
     727                 :          53 :                         pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     728   [ +  -  +  - ]:           1 :                         ereport(ERROR,
     729                 :             :                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     730                 :             :                                          errmsg("cannot use publication WHERE clause for relation \"%s\"",
     731                 :             :                                                         RelationGetRelationName(pri->relation)),
     732                 :             :                                          errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
     733                 :             :                                                            "publish_via_partition_root")));
     734                 :             : 
     735                 :             :                 /*
     736                 :             :                  * A fresh pstate is required so that we only have "this" table in its
     737                 :             :                  * rangetable
     738                 :             :                  */
     739                 :          55 :                 pstate = make_parsestate(NULL);
     740                 :          55 :                 pstate->p_sourcetext = queryString;
     741                 :          55 :                 nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
     742                 :             :                                                                                            AccessShareLock, NULL,
     743                 :             :                                                                                            false, false);
     744                 :          55 :                 addNSItemToQuery(pstate, nsitem, false, true, true);
     745                 :             : 
     746                 :         110 :                 whereclause = transformWhereClause(pstate,
     747                 :          55 :                                                                                    copyObject(pri->whereClause),
     748                 :             :                                                                                    EXPR_KIND_WHERE,
     749                 :             :                                                                                    "PUBLICATION WHERE");
     750                 :             : 
     751                 :             :                 /* Fix up collation information */
     752                 :          55 :                 assign_expr_collations(pstate, whereclause);
     753                 :             : 
     754                 :          55 :                 whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);
     755                 :             : 
     756                 :             :                 /*
     757                 :             :                  * We allow only simple expressions in row filters. See
     758                 :             :                  * check_simple_rowfilter_expr_walker.
     759                 :             :                  */
     760                 :          55 :                 check_simple_rowfilter_expr(whereclause, pstate);
     761                 :             : 
     762                 :          55 :                 free_parsestate(pstate);
     763                 :             : 
     764                 :          55 :                 pri->whereClause = whereclause;
     765      [ -  +  + ]:         153 :         }
     766                 :         141 : }
     767                 :             : 
     768                 :             : 
     769                 :             : /*
     770                 :             :  * Given a list of tables that are going to be added to a publication,
     771                 :             :  * verify that they fulfill the necessary preconditions, namely: no tables
     772                 :             :  * have a column list if any schema is published; and partitioned tables do
     773                 :             :  * not have column lists if publish_via_partition_root is not set.
     774                 :             :  *
     775                 :             :  * 'publish_schema' indicates that the publication contains any TABLES IN
     776                 :             :  * SCHEMA elements (newly added in this command, or preexisting).
     777                 :             :  * 'pubviaroot' is the value of publish_via_partition_root.
     778                 :             :  */
     779                 :             : static void
     780                 :         141 : CheckPubRelationColumnList(char *pubname, List *tables,
     781                 :             :                                                    bool publish_schema, bool pubviaroot)
     782                 :             : {
     783                 :         141 :         ListCell   *lc;
     784                 :             : 
     785   [ +  +  +  +  :         280 :         foreach(lc, tables)
                   +  + ]
     786                 :             :         {
     787                 :         144 :                 PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
     788                 :             : 
     789         [ +  + ]:         144 :                 if (pri->columns == NIL)
     790                 :          90 :                         continue;
     791                 :             : 
     792                 :             :                 /*
     793                 :             :                  * Disallow specifying column list if any schema is in the
     794                 :             :                  * publication.
     795                 :             :                  *
     796                 :             :                  * XXX We could instead just forbid the case when the publication
     797                 :             :                  * tries to publish the table with a column list and a schema for that
     798                 :             :                  * table. However, if we do that then we need a restriction during
     799                 :             :                  * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
     800                 :             :                  * seem to be a good idea.
     801                 :             :                  */
     802         [ +  + ]:          54 :                 if (publish_schema)
     803   [ +  -  +  - ]:           4 :                         ereport(ERROR,
     804                 :             :                                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     805                 :             :                                         errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
     806                 :             :                                                    get_namespace_name(RelationGetNamespace(pri->relation)),
     807                 :             :                                                    RelationGetRelationName(pri->relation), pubname),
     808                 :             :                                         errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
     809                 :             : 
     810                 :             :                 /*
     811                 :             :                  * If the publication doesn't publish changes via the root partitioned
     812                 :             :                  * table, the partition's column list will be used. So disallow using
     813                 :             :                  * a column list on the partitioned table in this case.
     814                 :             :                  */
     815   [ +  +  +  + ]:          50 :                 if (!pubviaroot &&
     816                 :          42 :                         pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     817   [ +  -  +  - ]:           1 :                         ereport(ERROR,
     818                 :             :                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     819                 :             :                                          errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
     820                 :             :                                                         get_namespace_name(RelationGetNamespace(pri->relation)),
     821                 :             :                                                         RelationGetRelationName(pri->relation), pubname),
     822                 :             :                                          errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
     823                 :             :                                                            "publish_via_partition_root")));
     824      [ -  +  + ]:         139 :         }
     825                 :         136 : }
     826                 :             : 
     827                 :             : /*
     828                 :             :  * Create new publication.
     829                 :             :  */
     830                 :             : ObjectAddress
     831                 :          70 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
     832                 :             : {
     833                 :          70 :         Relation        rel;
     834                 :             :         ObjectAddress myself;
     835                 :          70 :         Oid                     puboid;
     836                 :          70 :         bool            nulls[Natts_pg_publication];
     837                 :          70 :         Datum           values[Natts_pg_publication];
     838                 :          70 :         HeapTuple       tup;
     839                 :          70 :         bool            publish_given;
     840                 :          70 :         PublicationActions pubactions;
     841                 :          70 :         bool            publish_via_partition_root_given;
     842                 :          70 :         bool            publish_via_partition_root;
     843                 :          70 :         bool            publish_generated_columns_given;
     844                 :          70 :         char            publish_generated_columns;
     845                 :          70 :         AclResult       aclresult;
     846                 :          70 :         List       *relations = NIL;
     847                 :          70 :         List       *schemaidlist = NIL;
     848                 :             : 
     849                 :             :         /* must have CREATE privilege on database */
     850                 :          70 :         aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
     851         [ +  + ]:          70 :         if (aclresult != ACLCHECK_OK)
     852                 :           2 :                 aclcheck_error(aclresult, OBJECT_DATABASE,
     853                 :           1 :                                            get_database_name(MyDatabaseId));
     854                 :             : 
     855                 :             :         /* FOR ALL TABLES and FOR ALL SEQUENCES requires superuser */
     856         [ +  + ]:          70 :         if (!superuser())
     857                 :             :         {
     858         [ +  - ]:           3 :                 if (stmt->for_all_tables || stmt->for_all_sequences)
     859   [ #  #  #  # ]:           0 :                         ereport(ERROR,
     860                 :             :                                         errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     861                 :             :                                         errmsg("must be superuser to create a FOR ALL TABLES or ALL SEQUENCES publication"));
     862                 :           3 :         }
     863                 :             : 
     864                 :          70 :         rel = table_open(PublicationRelationId, RowExclusiveLock);
     865                 :             : 
     866                 :             :         /* Check if name is used */
     867                 :          70 :         puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
     868                 :             :                                                          CStringGetDatum(stmt->pubname));
     869         [ +  + ]:          70 :         if (OidIsValid(puboid))
     870   [ +  -  +  - ]:           1 :                 ereport(ERROR,
     871                 :             :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     872                 :             :                                  errmsg("publication \"%s\" already exists",
     873                 :             :                                                 stmt->pubname)));
     874                 :             : 
     875                 :             :         /* Form a tuple. */
     876                 :          69 :         memset(values, 0, sizeof(values));
     877                 :          69 :         memset(nulls, false, sizeof(nulls));
     878                 :             : 
     879                 :          69 :         values[Anum_pg_publication_pubname - 1] =
     880                 :          69 :                 DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
     881                 :          69 :         values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
     882                 :             : 
     883                 :         138 :         parse_publication_options(pstate,
     884                 :          69 :                                                           stmt->options,
     885                 :             :                                                           &publish_given, &pubactions,
     886                 :             :                                                           &publish_via_partition_root_given,
     887                 :             :                                                           &publish_via_partition_root,
     888                 :             :                                                           &publish_generated_columns_given,
     889                 :             :                                                           &publish_generated_columns);
     890                 :             : 
     891   [ +  +  -  + ]:          72 :         if (stmt->for_all_sequences &&
     892   [ +  -  +  - ]:           3 :                 (publish_given || publish_via_partition_root_given ||
     893                 :           3 :                  publish_generated_columns_given))
     894   [ #  #  #  # ]:           0 :                 ereport(NOTICE,
     895                 :             :                                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     896                 :             :                                 errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
     897                 :             : 
     898                 :          69 :         puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
     899                 :             :                                                                 Anum_pg_publication_oid);
     900                 :          69 :         values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
     901                 :          69 :         values[Anum_pg_publication_puballtables - 1] =
     902                 :          69 :                 BoolGetDatum(stmt->for_all_tables);
     903                 :          69 :         values[Anum_pg_publication_puballsequences - 1] =
     904                 :          69 :                 BoolGetDatum(stmt->for_all_sequences);
     905                 :          69 :         values[Anum_pg_publication_pubinsert - 1] =
     906                 :          69 :                 BoolGetDatum(pubactions.pubinsert);
     907                 :          69 :         values[Anum_pg_publication_pubupdate - 1] =
     908                 :          69 :                 BoolGetDatum(pubactions.pubupdate);
     909                 :          69 :         values[Anum_pg_publication_pubdelete - 1] =
     910                 :          69 :                 BoolGetDatum(pubactions.pubdelete);
     911                 :          69 :         values[Anum_pg_publication_pubtruncate - 1] =
     912                 :          69 :                 BoolGetDatum(pubactions.pubtruncate);
     913                 :          69 :         values[Anum_pg_publication_pubviaroot - 1] =
     914                 :          69 :                 BoolGetDatum(publish_via_partition_root);
     915                 :          69 :         values[Anum_pg_publication_pubgencols - 1] =
     916                 :          69 :                 CharGetDatum(publish_generated_columns);
     917                 :             : 
     918                 :          69 :         tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     919                 :             : 
     920                 :             :         /* Insert tuple into catalog. */
     921                 :          69 :         CatalogTupleInsert(rel, tup);
     922                 :          69 :         heap_freetuple(tup);
     923                 :             : 
     924                 :          69 :         recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
     925                 :             : 
     926                 :          69 :         ObjectAddressSet(myself, PublicationRelationId, puboid);
     927                 :             : 
     928                 :             :         /* Make the changes visible. */
     929                 :          69 :         CommandCounterIncrement();
     930                 :             : 
     931                 :             :         /* Associate objects with the publication. */
     932         [ +  + ]:          69 :         if (stmt->for_all_tables)
     933                 :             :         {
     934                 :             :                 /*
     935                 :             :                  * Invalidate relcache so that publication info is rebuilt. Sequences
     936                 :             :                  * publication doesn't require invalidation, as replica identity
     937                 :             :                  * checks don't apply to them.
     938                 :             :                  */
     939                 :           7 :                 CacheInvalidateRelcacheAll();
     940                 :           7 :         }
     941         [ +  + ]:          62 :         else if (!stmt->for_all_sequences)
     942                 :             :         {
     943                 :          60 :                 ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
     944                 :             :                                                                    &schemaidlist);
     945                 :             : 
     946                 :             :                 /* FOR TABLES IN SCHEMA requires superuser */
     947   [ +  +  +  + ]:          60 :                 if (schemaidlist != NIL && !superuser())
     948   [ +  -  +  - ]:           1 :                         ereport(ERROR,
     949                 :             :                                         errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     950                 :             :                                         errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
     951                 :             : 
     952         [ +  + ]:          59 :                 if (relations != NIL)
     953                 :             :                 {
     954                 :          45 :                         List       *rels;
     955                 :             : 
     956                 :          45 :                         rels = OpenTableList(relations);
     957                 :          90 :                         TransformPubWhereClauses(rels, pstate->p_sourcetext,
     958                 :          45 :                                                                          publish_via_partition_root);
     959                 :             : 
     960                 :          90 :                         CheckPubRelationColumnList(stmt->pubname, rels,
     961                 :          45 :                                                                            schemaidlist != NIL,
     962                 :          45 :                                                                            publish_via_partition_root);
     963                 :             : 
     964                 :          45 :                         PublicationAddTables(puboid, rels, true, NULL);
     965                 :          45 :                         CloseTableList(rels);
     966                 :          45 :                 }
     967                 :             : 
     968         [ +  + ]:          59 :                 if (schemaidlist != NIL)
     969                 :             :                 {
     970                 :             :                         /*
     971                 :             :                          * Schema lock is held until the publication is created to prevent
     972                 :             :                          * concurrent schema deletion.
     973                 :             :                          */
     974                 :          24 :                         LockSchemaList(schemaidlist);
     975                 :          24 :                         PublicationAddSchemas(puboid, schemaidlist, true, NULL);
     976                 :          24 :                 }
     977                 :          59 :         }
     978                 :             : 
     979                 :          68 :         table_close(rel, RowExclusiveLock);
     980                 :             : 
     981         [ +  - ]:          68 :         InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
     982                 :             : 
     983                 :             :         /*
     984                 :             :          * We don't need this warning message when wal_level >= 'replica' since
     985                 :             :          * logical decoding is automatically enabled up on a logical slot
     986                 :             :          * creation.
     987                 :             :          */
     988         [ -  + ]:          68 :         if (wal_level < WAL_LEVEL_REPLICA)
     989   [ -  +  +  - ]:          68 :                 ereport(WARNING,
     990                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     991                 :             :                                  errmsg("logical decoding must be enabled to publish logical changes"),
     992                 :             :                                  errhint("Before creating subscriptions, ensure that \"wal_level\" is set to \"replica\" or higher.")));
     993                 :             : 
     994                 :             :         return myself;
     995                 :          68 : }
     996                 :             : 
     997                 :             : /*
     998                 :             :  * Change options of a publication.
     999                 :             :  */
    1000                 :             : static void
    1001                 :          20 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
    1002                 :             :                                                 Relation rel, HeapTuple tup)
    1003                 :             : {
    1004                 :          20 :         bool            nulls[Natts_pg_publication];
    1005                 :          20 :         bool            replaces[Natts_pg_publication];
    1006                 :          20 :         Datum           values[Natts_pg_publication];
    1007                 :          20 :         bool            publish_given;
    1008                 :          20 :         PublicationActions pubactions;
    1009                 :          20 :         bool            publish_via_partition_root_given;
    1010                 :          20 :         bool            publish_via_partition_root;
    1011                 :          20 :         bool            publish_generated_columns_given;
    1012                 :          20 :         char            publish_generated_columns;
    1013                 :          20 :         ObjectAddress obj;
    1014                 :          20 :         Form_pg_publication pubform;
    1015                 :          20 :         List       *root_relids = NIL;
    1016                 :          20 :         ListCell   *lc;
    1017                 :             : 
    1018                 :          20 :         pubform = (Form_pg_publication) GETSTRUCT(tup);
    1019                 :             : 
    1020                 :          40 :         parse_publication_options(pstate,
    1021                 :          20 :                                                           stmt->options,
    1022                 :             :                                                           &publish_given, &pubactions,
    1023                 :             :                                                           &publish_via_partition_root_given,
    1024                 :             :                                                           &publish_via_partition_root,
    1025                 :             :                                                           &publish_generated_columns_given,
    1026                 :             :                                                           &publish_generated_columns);
    1027                 :             : 
    1028   [ +  +  +  - ]:          21 :         if (pubform->puballsequences &&
    1029   [ +  +  +  - ]:           2 :                 (publish_given || publish_via_partition_root_given ||
    1030                 :           1 :                  publish_generated_columns_given))
    1031   [ -  +  +  - ]:           2 :                 ereport(NOTICE,
    1032                 :             :                                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1033                 :             :                                 errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
    1034                 :             : 
    1035                 :             :         /*
    1036                 :             :          * If the publication doesn't publish changes via the root partitioned
    1037                 :             :          * table, the partition's row filter and column list will be used. So
    1038                 :             :          * disallow using WHERE clause and column lists on partitioned table in
    1039                 :             :          * this case.
    1040                 :             :          */
    1041   [ +  +  +  +  :          20 :         if (!pubform->puballtables && publish_via_partition_root_given &&
                   +  + ]
    1042                 :          13 :                 !publish_via_partition_root)
    1043                 :             :         {
    1044                 :             :                 /*
    1045                 :             :                  * Lock the publication so nobody else can do anything with it. This
    1046                 :             :                  * prevents concurrent alter to add partitioned table(s) with WHERE
    1047                 :             :                  * clause(s) and/or column lists which we don't allow when not
    1048                 :             :                  * publishing via root.
    1049                 :             :                  */
    1050                 :           8 :                 LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
    1051                 :             :                                                    AccessShareLock);
    1052                 :             : 
    1053                 :           8 :                 root_relids = GetPublicationRelations(pubform->oid,
    1054                 :             :                                                                                           PUBLICATION_PART_ROOT);
    1055                 :             : 
    1056   [ +  -  +  +  :          14 :                 foreach(lc, root_relids)
                   +  + ]
    1057                 :             :                 {
    1058                 :           8 :                         Oid                     relid = lfirst_oid(lc);
    1059                 :           8 :                         HeapTuple       rftuple;
    1060                 :           8 :                         char            relkind;
    1061                 :           8 :                         char       *relname;
    1062                 :           8 :                         bool            has_rowfilter;
    1063                 :           8 :                         bool            has_collist;
    1064                 :             : 
    1065                 :             :                         /*
    1066                 :             :                          * Beware: we don't have lock on the relations, so cope silently
    1067                 :             :                          * with the cache lookups returning NULL.
    1068                 :             :                          */
    1069                 :             : 
    1070                 :           8 :                         rftuple = SearchSysCache2(PUBLICATIONRELMAP,
    1071                 :           8 :                                                                           ObjectIdGetDatum(relid),
    1072                 :           8 :                                                                           ObjectIdGetDatum(pubform->oid));
    1073         [ +  - ]:           8 :                         if (!HeapTupleIsValid(rftuple))
    1074                 :           0 :                                 continue;
    1075                 :           8 :                         has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
    1076                 :           8 :                         has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
    1077   [ +  +  +  + ]:           8 :                         if (!has_rowfilter && !has_collist)
    1078                 :             :                         {
    1079                 :           2 :                                 ReleaseSysCache(rftuple);
    1080                 :           2 :                                 continue;
    1081                 :             :                         }
    1082                 :             : 
    1083                 :           6 :                         relkind = get_rel_relkind(relid);
    1084         [ +  + ]:           6 :                         if (relkind != RELKIND_PARTITIONED_TABLE)
    1085                 :             :                         {
    1086                 :           4 :                                 ReleaseSysCache(rftuple);
    1087                 :           4 :                                 continue;
    1088                 :             :                         }
    1089                 :           2 :                         relname = get_rel_name(relid);
    1090         [ -  + ]:           2 :                         if (relname == NULL)    /* table concurrently dropped */
    1091                 :             :                         {
    1092                 :           0 :                                 ReleaseSysCache(rftuple);
    1093                 :           0 :                                 continue;
    1094                 :             :                         }
    1095                 :             : 
    1096         [ +  + ]:           2 :                         if (has_rowfilter)
    1097   [ +  -  +  - ]:           1 :                                 ereport(ERROR,
    1098                 :             :                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1099                 :             :                                                  errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
    1100                 :             :                                                                 "publish_via_partition_root",
    1101                 :             :                                                                 stmt->pubname),
    1102                 :             :                                                  errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
    1103                 :             :                                                                    relname, "publish_via_partition_root")));
    1104         [ +  - ]:           1 :                         Assert(has_collist);
    1105   [ +  -  +  - ]:           1 :                         ereport(ERROR,
    1106                 :             :                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1107                 :             :                                          errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
    1108                 :             :                                                         "publish_via_partition_root",
    1109                 :             :                                                         stmt->pubname),
    1110                 :             :                                          errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
    1111                 :             :                                                            relname, "publish_via_partition_root")));
    1112      [ -  +  - ]:           6 :                 }
    1113                 :           6 :         }
    1114                 :             : 
    1115                 :             :         /* Everything ok, form a new tuple. */
    1116                 :          18 :         memset(values, 0, sizeof(values));
    1117                 :          18 :         memset(nulls, false, sizeof(nulls));
    1118                 :          18 :         memset(replaces, false, sizeof(replaces));
    1119                 :             : 
    1120         [ +  + ]:          18 :         if (publish_given)
    1121                 :             :         {
    1122                 :           5 :                 values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
    1123                 :           5 :                 replaces[Anum_pg_publication_pubinsert - 1] = true;
    1124                 :             : 
    1125                 :           5 :                 values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
    1126                 :           5 :                 replaces[Anum_pg_publication_pubupdate - 1] = true;
    1127                 :             : 
    1128                 :           5 :                 values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
    1129                 :           5 :                 replaces[Anum_pg_publication_pubdelete - 1] = true;
    1130                 :             : 
    1131                 :           5 :                 values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
    1132                 :           5 :                 replaces[Anum_pg_publication_pubtruncate - 1] = true;
    1133                 :           5 :         }
    1134                 :             : 
    1135         [ +  + ]:          18 :         if (publish_via_partition_root_given)
    1136                 :             :         {
    1137                 :          11 :                 values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
    1138                 :          11 :                 replaces[Anum_pg_publication_pubviaroot - 1] = true;
    1139                 :          11 :         }
    1140                 :             : 
    1141         [ +  + ]:          18 :         if (publish_generated_columns_given)
    1142                 :             :         {
    1143                 :           2 :                 values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
    1144                 :           2 :                 replaces[Anum_pg_publication_pubgencols - 1] = true;
    1145                 :           2 :         }
    1146                 :             : 
    1147                 :          36 :         tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
    1148                 :          18 :                                                         replaces);
    1149                 :             : 
    1150                 :             :         /* Update the catalog. */
    1151                 :          18 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
    1152                 :             : 
    1153                 :          18 :         CommandCounterIncrement();
    1154                 :             : 
    1155                 :          18 :         pubform = (Form_pg_publication) GETSTRUCT(tup);
    1156                 :             : 
    1157                 :             :         /* Invalidate the relcache. */
    1158         [ +  + ]:          18 :         if (pubform->puballtables)
    1159                 :             :         {
    1160                 :           3 :                 CacheInvalidateRelcacheAll();
    1161                 :           3 :         }
    1162                 :             :         else
    1163                 :             :         {
    1164                 :          15 :                 List       *relids = NIL;
    1165                 :          15 :                 List       *schemarelids = NIL;
    1166                 :             : 
    1167                 :             :                 /*
    1168                 :             :                  * For any partitioned tables contained in the publication, we must
    1169                 :             :                  * invalidate all partitions contained in the respective partition
    1170                 :             :                  * trees, not just those explicitly mentioned in the publication.
    1171                 :             :                  */
    1172         [ +  + ]:          15 :                 if (root_relids == NIL)
    1173                 :           9 :                         relids = GetPublicationRelations(pubform->oid,
    1174                 :             :                                                                                          PUBLICATION_PART_ALL);
    1175                 :             :                 else
    1176                 :             :                 {
    1177                 :             :                         /*
    1178                 :             :                          * We already got tables explicitly mentioned in the publication.
    1179                 :             :                          * Now get all partitions for the partitioned table in the list.
    1180                 :             :                          */
    1181   [ +  -  +  +  :          12 :                         foreach(lc, root_relids)
                   +  + ]
    1182                 :          12 :                                 relids = GetPubPartitionOptionRelations(relids,
    1183                 :             :                                                                                                                 PUBLICATION_PART_ALL,
    1184                 :           6 :                                                                                                                 lfirst_oid(lc));
    1185                 :             :                 }
    1186                 :             : 
    1187                 :          15 :                 schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
    1188                 :             :                                                                                                                 PUBLICATION_PART_ALL);
    1189                 :          15 :                 relids = list_concat_unique_oid(relids, schemarelids);
    1190                 :             : 
    1191                 :          15 :                 InvalidatePublicationRels(relids);
    1192                 :          15 :         }
    1193                 :             : 
    1194                 :          18 :         ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
    1195                 :          18 :         EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1196                 :          18 :                                                                          (Node *) stmt);
    1197                 :             : 
    1198         [ +  - ]:          18 :         InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
    1199                 :          18 : }
    1200                 :             : 
    1201                 :             : /*
    1202                 :             :  * Invalidate the relations.
    1203                 :             :  */
    1204                 :             : void
    1205                 :         325 : InvalidatePublicationRels(List *relids)
    1206                 :             : {
    1207                 :             :         /*
    1208                 :             :          * We don't want to send too many individual messages, at some point it's
    1209                 :             :          * cheaper to just reset whole relcache.
    1210                 :             :          */
    1211         [ +  - ]:         325 :         if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
    1212                 :             :         {
    1213                 :         325 :                 ListCell   *lc;
    1214                 :             : 
    1215   [ +  +  +  +  :        3246 :                 foreach(lc, relids)
                   +  + ]
    1216                 :        2921 :                         CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
    1217                 :         325 :         }
    1218                 :             :         else
    1219                 :           0 :                 CacheInvalidateRelcacheAll();
    1220                 :         325 : }
    1221                 :             : 
    1222                 :             : /*
    1223                 :             :  * Add or remove table to/from publication.
    1224                 :             :  */
    1225                 :             : static void
    1226                 :         130 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
    1227                 :             :                                            List *tables, const char *queryString,
    1228                 :             :                                            bool publish_schema)
    1229                 :             : {
    1230                 :         130 :         List       *rels = NIL;
    1231                 :         130 :         Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1232                 :         130 :         Oid                     pubid = pubform->oid;
    1233                 :             : 
    1234                 :             :         /*
    1235                 :             :          * Nothing to do if no objects, except in SET: for that it is quite
    1236                 :             :          * possible that user has not specified any tables in which case we need
    1237                 :             :          * to remove all the existing tables.
    1238                 :             :          */
    1239   [ +  +  +  + ]:         130 :         if (!tables && stmt->action != AP_SetObjects)
    1240                 :          10 :                 return;
    1241                 :             : 
    1242                 :         120 :         rels = OpenTableList(tables);
    1243                 :             : 
    1244         [ +  + ]:         120 :         if (stmt->action == AP_AddObjects)
    1245                 :             :         {
    1246                 :          38 :                 TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
    1247                 :             : 
    1248                 :          38 :                 publish_schema |= is_schema_publication(pubid);
    1249                 :             : 
    1250                 :          76 :                 CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
    1251                 :          38 :                                                                    pubform->pubviaroot);
    1252                 :             : 
    1253                 :          38 :                 PublicationAddTables(pubid, rels, false, stmt);
    1254                 :          38 :         }
    1255         [ +  + ]:          82 :         else if (stmt->action == AP_DropObjects)
    1256                 :          16 :                 PublicationDropTables(pubid, rels, false);
    1257                 :             :         else                                            /* AP_SetObjects */
    1258                 :             :         {
    1259                 :          66 :                 List       *oldrelids = GetPublicationRelations(pubid,
    1260                 :             :                                                                                                                 PUBLICATION_PART_ROOT);
    1261                 :          66 :                 List       *delrels = NIL;
    1262                 :          66 :                 ListCell   *oldlc;
    1263                 :             : 
    1264                 :          66 :                 TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
    1265                 :             : 
    1266                 :         132 :                 CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
    1267                 :          66 :                                                                    pubform->pubviaroot);
    1268                 :             : 
    1269                 :             :                 /*
    1270                 :             :                  * To recreate the relation list for the publication, look for
    1271                 :             :                  * existing relations that do not need to be dropped.
    1272                 :             :                  */
    1273   [ +  +  +  +  :         131 :                 foreach(oldlc, oldrelids)
                   +  + ]
    1274                 :             :                 {
    1275                 :          65 :                         Oid                     oldrelid = lfirst_oid(oldlc);
    1276                 :          65 :                         ListCell   *newlc;
    1277                 :          65 :                         PublicationRelInfo *oldrel;
    1278                 :          65 :                         bool            found = false;
    1279                 :          65 :                         HeapTuple       rftuple;
    1280                 :          65 :                         Node       *oldrelwhereclause = NULL;
    1281                 :          65 :                         Bitmapset  *oldcolumns = NULL;
    1282                 :             : 
    1283                 :             :                         /* look up the cache for the old relmap */
    1284                 :          65 :                         rftuple = SearchSysCache2(PUBLICATIONRELMAP,
    1285                 :          65 :                                                                           ObjectIdGetDatum(oldrelid),
    1286                 :          65 :                                                                           ObjectIdGetDatum(pubid));
    1287                 :             : 
    1288                 :             :                         /*
    1289                 :             :                          * See if the existing relation currently has a WHERE clause or a
    1290                 :             :                          * column list. We need to compare those too.
    1291                 :             :                          */
    1292         [ -  + ]:          65 :                         if (HeapTupleIsValid(rftuple))
    1293                 :             :                         {
    1294                 :          65 :                                 bool            isnull = true;
    1295                 :          65 :                                 Datum           whereClauseDatum;
    1296                 :          65 :                                 Datum           columnListDatum;
    1297                 :             : 
    1298                 :             :                                 /* Load the WHERE clause for this table. */
    1299                 :          65 :                                 whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
    1300                 :             :                                                                                                    Anum_pg_publication_rel_prqual,
    1301                 :             :                                                                                                    &isnull);
    1302         [ +  + ]:          65 :                                 if (!isnull)
    1303                 :          34 :                                         oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
    1304                 :             : 
    1305                 :             :                                 /* Transform the int2vector column list to a bitmap. */
    1306                 :          65 :                                 columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
    1307                 :             :                                                                                                   Anum_pg_publication_rel_prattrs,
    1308                 :             :                                                                                                   &isnull);
    1309                 :             : 
    1310         [ +  + ]:          65 :                                 if (!isnull)
    1311                 :          22 :                                         oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
    1312                 :             : 
    1313                 :          65 :                                 ReleaseSysCache(rftuple);
    1314                 :          65 :                         }
    1315                 :             : 
    1316   [ +  +  +  +  :         129 :                         foreach(newlc, rels)
                   +  + ]
    1317                 :             :                         {
    1318                 :          64 :                                 PublicationRelInfo *newpubrel;
    1319                 :          64 :                                 Oid                     newrelid;
    1320                 :          64 :                                 Bitmapset  *newcolumns = NULL;
    1321                 :             : 
    1322                 :          64 :                                 newpubrel = (PublicationRelInfo *) lfirst(newlc);
    1323                 :          64 :                                 newrelid = RelationGetRelid(newpubrel->relation);
    1324                 :             : 
    1325                 :             :                                 /*
    1326                 :             :                                  * Validate the column list.  If the column list or WHERE
    1327                 :             :                                  * clause changes, then the validation done here will be
    1328                 :             :                                  * duplicated inside PublicationAddTables().  The validation
    1329                 :             :                                  * is cheap enough that that seems harmless.
    1330                 :             :                                  */
    1331                 :         128 :                                 newcolumns = pub_collist_validate(newpubrel->relation,
    1332                 :          64 :                                                                                                   newpubrel->columns);
    1333                 :             : 
    1334                 :             :                                 /*
    1335                 :             :                                  * Check if any of the new set of relations matches with the
    1336                 :             :                                  * existing relations in the publication. Additionally, if the
    1337                 :             :                                  * relation has an associated WHERE clause, check the WHERE
    1338                 :             :                                  * expressions also match. Same for the column list. Drop the
    1339                 :             :                                  * rest.
    1340                 :             :                                  */
    1341         [ +  + ]:          64 :                                 if (newrelid == oldrelid)
    1342                 :             :                                 {
    1343   [ +  +  +  + ]:          32 :                                         if (equal(oldrelwhereclause, newpubrel->whereClause) &&
    1344                 :          12 :                                                 bms_equal(oldcolumns, newcolumns))
    1345                 :             :                                         {
    1346                 :           2 :                                                 found = true;
    1347                 :           2 :                                                 break;
    1348                 :             :                                         }
    1349                 :          30 :                                 }
    1350         [ +  + ]:          64 :                         }
    1351                 :             : 
    1352                 :             :                         /*
    1353                 :             :                          * Add the non-matched relations to a list so that they can be
    1354                 :             :                          * dropped.
    1355                 :             :                          */
    1356         [ +  + ]:          65 :                         if (!found)
    1357                 :             :                         {
    1358                 :          61 :                                 oldrel = palloc_object(PublicationRelInfo);
    1359                 :          61 :                                 oldrel->whereClause = NULL;
    1360                 :          61 :                                 oldrel->columns = NIL;
    1361                 :          61 :                                 oldrel->relation = table_open(oldrelid,
    1362                 :             :                                                                                           ShareUpdateExclusiveLock);
    1363                 :          61 :                                 delrels = lappend(delrels, oldrel);
    1364                 :          61 :                         }
    1365                 :          65 :                 }
    1366                 :             : 
    1367                 :             :                 /* And drop them. */
    1368                 :          66 :                 PublicationDropTables(pubid, delrels, true);
    1369                 :             : 
    1370                 :             :                 /*
    1371                 :             :                  * Don't bother calculating the difference for adding, we'll catch and
    1372                 :             :                  * skip existing ones when doing catalog update.
    1373                 :             :                  */
    1374                 :          66 :                 PublicationAddTables(pubid, rels, true, stmt);
    1375                 :             : 
    1376                 :          66 :                 CloseTableList(delrels);
    1377                 :          66 :         }
    1378                 :             : 
    1379                 :         120 :         CloseTableList(rels);
    1380         [ -  + ]:         130 : }
    1381                 :             : 
    1382                 :             : /*
    1383                 :             :  * Alter the publication schemas.
    1384                 :             :  *
    1385                 :             :  * Add or remove schemas to/from publication.
    1386                 :             :  */
    1387                 :             : static void
    1388                 :         115 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
    1389                 :             :                                                 HeapTuple tup, List *schemaidlist)
    1390                 :             : {
    1391                 :         115 :         Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1392                 :             : 
    1393                 :             :         /*
    1394                 :             :          * Nothing to do if no objects, except in SET: for that it is quite
    1395                 :             :          * possible that user has not specified any schemas in which case we need
    1396                 :             :          * to remove all the existing schemas.
    1397                 :             :          */
    1398   [ +  +  +  + ]:         115 :         if (!schemaidlist && stmt->action != AP_SetObjects)
    1399                 :          39 :                 return;
    1400                 :             : 
    1401                 :             :         /*
    1402                 :             :          * Schema lock is held until the publication is altered to prevent
    1403                 :             :          * concurrent schema deletion.
    1404                 :             :          */
    1405                 :          76 :         LockSchemaList(schemaidlist);
    1406         [ +  + ]:          76 :         if (stmt->action == AP_AddObjects)
    1407                 :             :         {
    1408                 :           4 :                 ListCell   *lc;
    1409                 :           4 :                 List       *reloids;
    1410                 :             : 
    1411                 :           4 :                 reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
    1412                 :             : 
    1413   [ +  +  +  +  :           5 :                 foreach(lc, reloids)
                   +  + ]
    1414                 :             :                 {
    1415                 :           2 :                         HeapTuple       coltuple;
    1416                 :             : 
    1417                 :           2 :                         coltuple = SearchSysCache2(PUBLICATIONRELMAP,
    1418                 :           2 :                                                                            ObjectIdGetDatum(lfirst_oid(lc)),
    1419                 :           2 :                                                                            ObjectIdGetDatum(pubform->oid));
    1420                 :             : 
    1421         [ +  - ]:           2 :                         if (!HeapTupleIsValid(coltuple))
    1422                 :           0 :                                 continue;
    1423                 :             : 
    1424                 :             :                         /*
    1425                 :             :                          * Disallow adding schema if column list is already part of the
    1426                 :             :                          * publication. See CheckPubRelationColumnList.
    1427                 :             :                          */
    1428         [ +  + ]:           2 :                         if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
    1429   [ +  -  +  - ]:           1 :                                 ereport(ERROR,
    1430                 :             :                                                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1431                 :             :                                                 errmsg("cannot add schema to publication \"%s\"",
    1432                 :             :                                                            stmt->pubname),
    1433                 :             :                                                 errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
    1434                 :             : 
    1435                 :           1 :                         ReleaseSysCache(coltuple);
    1436         [ -  + ]:           1 :                 }
    1437                 :             : 
    1438                 :           3 :                 PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
    1439                 :           3 :         }
    1440         [ +  + ]:          72 :         else if (stmt->action == AP_DropObjects)
    1441                 :           6 :                 PublicationDropSchemas(pubform->oid, schemaidlist, false);
    1442                 :             :         else                                            /* AP_SetObjects */
    1443                 :             :         {
    1444                 :          66 :                 List       *oldschemaids = GetPublicationSchemas(pubform->oid);
    1445                 :          66 :                 List       *delschemas = NIL;
    1446                 :             : 
    1447                 :             :                 /* Identify which schemas should be dropped */
    1448                 :          66 :                 delschemas = list_difference_oid(oldschemaids, schemaidlist);
    1449                 :             : 
    1450                 :             :                 /*
    1451                 :             :                  * Schema lock is held until the publication is altered to prevent
    1452                 :             :                  * concurrent schema deletion.
    1453                 :             :                  */
    1454                 :          66 :                 LockSchemaList(delschemas);
    1455                 :             : 
    1456                 :             :                 /* And drop them */
    1457                 :          66 :                 PublicationDropSchemas(pubform->oid, delschemas, true);
    1458                 :             : 
    1459                 :             :                 /*
    1460                 :             :                  * Don't bother calculating the difference for adding, we'll catch and
    1461                 :             :                  * skip existing ones when doing catalog update.
    1462                 :             :                  */
    1463                 :          66 :                 PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
    1464                 :          66 :         }
    1465                 :         114 : }
    1466                 :             : 
    1467                 :             : /*
    1468                 :             :  * Check if relations and schemas can be in a given publication and throw
    1469                 :             :  * appropriate error if not.
    1470                 :             :  */
    1471                 :             : static void
    1472                 :         144 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
    1473                 :             :                                           List *tables, List *schemaidlist)
    1474                 :             : {
    1475                 :         144 :         Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1476                 :             : 
    1477   [ +  +  +  + ]:         144 :         if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
    1478         [ +  + ]:          75 :                 schemaidlist && !superuser())
    1479   [ +  -  +  - ]:           1 :                 ereport(ERROR,
    1480                 :             :                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1481                 :             :                                  errmsg("must be superuser to add or set schemas")));
    1482                 :             : 
    1483                 :             :         /*
    1484                 :             :          * Check that user is allowed to manipulate the publication tables in
    1485                 :             :          * schema
    1486                 :             :          */
    1487   [ +  +  +  + ]:          83 :         if (schemaidlist && (pubform->puballtables || pubform->puballsequences))
    1488                 :             :         {
    1489   [ +  -  +  - ]:           3 :                 if (pubform->puballtables && pubform->puballsequences)
    1490   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    1491                 :             :                                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1492                 :             :                                         errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
    1493                 :             :                                                    NameStr(pubform->pubname)),
    1494                 :             :                                         errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
    1495         [ +  - ]:           3 :                 else if (pubform->puballtables)
    1496   [ +  -  +  - ]:           3 :                         ereport(ERROR,
    1497                 :             :                                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1498                 :             :                                         errmsg("publication \"%s\" is defined as FOR ALL TABLES",
    1499                 :             :                                                    NameStr(pubform->pubname)),
    1500                 :             :                                         errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications."));
    1501                 :             :                 else
    1502   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    1503                 :             :                                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1504                 :             :                                         errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
    1505                 :             :                                                    NameStr(pubform->pubname)),
    1506                 :             :                                         errdetail("Schemas cannot be added to or dropped from FOR ALL SEQUENCES publications."));
    1507                 :           0 :         }
    1508                 :             : 
    1509                 :             :         /* Check that user is allowed to manipulate the publication tables. */
    1510   [ +  +  +  + ]:          80 :         if (tables && (pubform->puballtables || pubform->puballsequences))
    1511                 :             :         {
    1512   [ +  -  +  - ]:           3 :                 if (pubform->puballtables && pubform->puballsequences)
    1513   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    1514                 :             :                                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1515                 :             :                                         errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
    1516                 :             :                                                    NameStr(pubform->pubname)),
    1517                 :             :                                         errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
    1518         [ +  - ]:           3 :                 else if (pubform->puballtables)
    1519   [ +  -  +  - ]:           3 :                         ereport(ERROR,
    1520                 :             :                                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1521                 :             :                                         errmsg("publication \"%s\" is defined as FOR ALL TABLES",
    1522                 :             :                                                    NameStr(pubform->pubname)),
    1523                 :             :                                         errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES publications."));
    1524                 :             :                 else
    1525   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    1526                 :             :                                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1527                 :             :                                         errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
    1528                 :             :                                                    NameStr(pubform->pubname)),
    1529                 :             :                                         errdetail("Tables or sequences cannot be added to or dropped from FOR ALL SEQUENCES publications."));
    1530                 :           0 :         }
    1531                 :         167 : }
    1532                 :             : 
    1533                 :             : /*
    1534                 :             :  * Alter the existing publication.
    1535                 :             :  *
    1536                 :             :  * This is dispatcher function for AlterPublicationOptions,
    1537                 :             :  * AlterPublicationSchemas and AlterPublicationTables.
    1538                 :             :  */
    1539                 :             : void
    1540                 :         157 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
    1541                 :             : {
    1542                 :         157 :         Relation        rel;
    1543                 :         157 :         HeapTuple       tup;
    1544                 :         157 :         Form_pg_publication pubform;
    1545                 :             : 
    1546                 :         157 :         rel = table_open(PublicationRelationId, RowExclusiveLock);
    1547                 :             : 
    1548                 :         157 :         tup = SearchSysCacheCopy1(PUBLICATIONNAME,
    1549                 :             :                                                           CStringGetDatum(stmt->pubname));
    1550                 :             : 
    1551         [ +  - ]:         157 :         if (!HeapTupleIsValid(tup))
    1552   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1553                 :             :                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1554                 :             :                                  errmsg("publication \"%s\" does not exist",
    1555                 :             :                                                 stmt->pubname)));
    1556                 :             : 
    1557                 :         157 :         pubform = (Form_pg_publication) GETSTRUCT(tup);
    1558                 :             : 
    1559                 :             :         /* must be owner */
    1560         [ +  - ]:         157 :         if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
    1561                 :           0 :                 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
    1562                 :           0 :                                            stmt->pubname);
    1563                 :             : 
    1564         [ +  + ]:         157 :         if (stmt->options)
    1565                 :          20 :                 AlterPublicationOptions(pstate, stmt, rel, tup);
    1566                 :             :         else
    1567                 :             :         {
    1568                 :         137 :                 List       *relations = NIL;
    1569                 :         137 :                 List       *schemaidlist = NIL;
    1570                 :         137 :                 Oid                     pubid = pubform->oid;
    1571                 :             : 
    1572                 :         137 :                 ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
    1573                 :             :                                                                    &schemaidlist);
    1574                 :             : 
    1575                 :         137 :                 CheckAlterPublication(stmt, tup, relations, schemaidlist);
    1576                 :             : 
    1577                 :         137 :                 heap_freetuple(tup);
    1578                 :             : 
    1579                 :             :                 /* Lock the publication so nobody else can do anything with it. */
    1580                 :         137 :                 LockDatabaseObject(PublicationRelationId, pubid, 0,
    1581                 :             :                                                    AccessExclusiveLock);
    1582                 :             : 
    1583                 :             :                 /*
    1584                 :             :                  * It is possible that by the time we acquire the lock on publication,
    1585                 :             :                  * concurrent DDL has removed it. We can test this by checking the
    1586                 :             :                  * existence of publication. We get the tuple again to avoid the risk
    1587                 :             :                  * of any publication option getting changed.
    1588                 :             :                  */
    1589                 :         137 :                 tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1590         [ +  - ]:         137 :                 if (!HeapTupleIsValid(tup))
    1591   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    1592                 :             :                                         errcode(ERRCODE_UNDEFINED_OBJECT),
    1593                 :             :                                         errmsg("publication \"%s\" does not exist",
    1594                 :             :                                                    stmt->pubname));
    1595                 :             : 
    1596                 :         274 :                 AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
    1597                 :         137 :                                                            schemaidlist != NIL);
    1598                 :         137 :                 AlterPublicationSchemas(stmt, tup, schemaidlist);
    1599                 :         137 :         }
    1600                 :             : 
    1601                 :             :         /* Cleanup. */
    1602                 :         157 :         heap_freetuple(tup);
    1603                 :         157 :         table_close(rel, RowExclusiveLock);
    1604                 :         157 : }
    1605                 :             : 
    1606                 :             : /*
    1607                 :             :  * Remove relation from publication by mapping OID.
    1608                 :             :  */
    1609                 :             : void
    1610                 :         119 : RemovePublicationRelById(Oid proid)
    1611                 :             : {
    1612                 :         119 :         Relation        rel;
    1613                 :         119 :         HeapTuple       tup;
    1614                 :         119 :         Form_pg_publication_rel pubrel;
    1615                 :         119 :         List       *relids = NIL;
    1616                 :             : 
    1617                 :         119 :         rel = table_open(PublicationRelRelationId, RowExclusiveLock);
    1618                 :             : 
    1619                 :         119 :         tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
    1620                 :             : 
    1621         [ +  - ]:         119 :         if (!HeapTupleIsValid(tup))
    1622   [ #  #  #  # ]:           0 :                 elog(ERROR, "cache lookup failed for publication table %u",
    1623                 :             :                          proid);
    1624                 :             : 
    1625                 :         119 :         pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
    1626                 :             : 
    1627                 :             :         /*
    1628                 :             :          * Invalidate relcache so that publication info is rebuilt.
    1629                 :             :          *
    1630                 :             :          * For the partitioned tables, we must invalidate all partitions contained
    1631                 :             :          * in the respective partition hierarchies, not just the one explicitly
    1632                 :             :          * mentioned in the publication. This is required because we implicitly
    1633                 :             :          * publish the child tables when the parent table is published.
    1634                 :             :          */
    1635                 :         238 :         relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
    1636                 :         119 :                                                                                         pubrel->prrelid);
    1637                 :             : 
    1638                 :         119 :         InvalidatePublicationRels(relids);
    1639                 :             : 
    1640                 :         119 :         CatalogTupleDelete(rel, &tup->t_self);
    1641                 :             : 
    1642                 :         119 :         ReleaseSysCache(tup);
    1643                 :             : 
    1644                 :         119 :         table_close(rel, RowExclusiveLock);
    1645                 :         119 : }
    1646                 :             : 
    1647                 :             : /*
    1648                 :             :  * Remove the publication by mapping OID.
    1649                 :             :  */
    1650                 :             : void
    1651                 :          63 : RemovePublicationById(Oid pubid)
    1652                 :             : {
    1653                 :          63 :         Relation        rel;
    1654                 :          63 :         HeapTuple       tup;
    1655                 :          63 :         Form_pg_publication pubform;
    1656                 :             : 
    1657                 :          63 :         rel = table_open(PublicationRelationId, RowExclusiveLock);
    1658                 :             : 
    1659                 :          63 :         tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1660         [ +  - ]:          63 :         if (!HeapTupleIsValid(tup))
    1661   [ #  #  #  # ]:           0 :                 elog(ERROR, "cache lookup failed for publication %u", pubid);
    1662                 :             : 
    1663                 :          63 :         pubform = (Form_pg_publication) GETSTRUCT(tup);
    1664                 :             : 
    1665                 :             :         /* Invalidate relcache so that publication info is rebuilt. */
    1666         [ +  + ]:          63 :         if (pubform->puballtables)
    1667                 :           7 :                 CacheInvalidateRelcacheAll();
    1668                 :             : 
    1669                 :          63 :         CatalogTupleDelete(rel, &tup->t_self);
    1670                 :             : 
    1671                 :          63 :         ReleaseSysCache(tup);
    1672                 :             : 
    1673                 :          63 :         table_close(rel, RowExclusiveLock);
    1674                 :          63 : }
    1675                 :             : 
    1676                 :             : /*
    1677                 :             :  * Remove schema from publication by mapping OID.
    1678                 :             :  */
    1679                 :             : void
    1680                 :          31 : RemovePublicationSchemaById(Oid psoid)
    1681                 :             : {
    1682                 :          31 :         Relation        rel;
    1683                 :          31 :         HeapTuple       tup;
    1684                 :          31 :         List       *schemaRels = NIL;
    1685                 :          31 :         Form_pg_publication_namespace pubsch;
    1686                 :             : 
    1687                 :          31 :         rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
    1688                 :             : 
    1689                 :          31 :         tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
    1690                 :             : 
    1691         [ +  - ]:          31 :         if (!HeapTupleIsValid(tup))
    1692   [ #  #  #  # ]:           0 :                 elog(ERROR, "cache lookup failed for publication schema %u", psoid);
    1693                 :             : 
    1694                 :          31 :         pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
    1695                 :             : 
    1696                 :             :         /*
    1697                 :             :          * Invalidate relcache so that publication info is rebuilt. See
    1698                 :             :          * RemovePublicationRelById for why we need to consider all the
    1699                 :             :          * partitions.
    1700                 :             :          */
    1701                 :          31 :         schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
    1702                 :             :                                                                                            PUBLICATION_PART_ALL);
    1703                 :          31 :         InvalidatePublicationRels(schemaRels);
    1704                 :             : 
    1705                 :          31 :         CatalogTupleDelete(rel, &tup->t_self);
    1706                 :             : 
    1707                 :          31 :         ReleaseSysCache(tup);
    1708                 :             : 
    1709                 :          31 :         table_close(rel, RowExclusiveLock);
    1710                 :          31 : }
    1711                 :             : 
    1712                 :             : /*
    1713                 :             :  * Open relations specified by a PublicationTable list.
    1714                 :             :  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
    1715                 :             :  * add them to a publication.
    1716                 :             :  */
    1717                 :             : static List *
    1718                 :         172 : OpenTableList(List *tables)
    1719                 :             : {
    1720                 :         172 :         List       *relids = NIL;
    1721                 :         172 :         List       *rels = NIL;
    1722                 :         172 :         ListCell   *lc;
    1723                 :         172 :         List       *relids_with_rf = NIL;
    1724                 :         172 :         List       *relids_with_collist = NIL;
    1725                 :             : 
    1726                 :             :         /*
    1727                 :             :          * Open, share-lock, and check all the explicitly-specified relations
    1728                 :             :          */
    1729   [ +  +  +  +  :         346 :         foreach(lc, tables)
                   +  + ]
    1730                 :             :         {
    1731                 :         178 :                 PublicationTable *t = lfirst_node(PublicationTable, lc);
    1732                 :         178 :                 bool            recurse = t->relation->inh;
    1733                 :         178 :                 Relation        rel;
    1734                 :         178 :                 Oid                     myrelid;
    1735                 :         178 :                 PublicationRelInfo *pub_rel;
    1736                 :             : 
    1737                 :             :                 /* Allow query cancel in case this takes a long time */
    1738         [ +  - ]:         178 :                 CHECK_FOR_INTERRUPTS();
    1739                 :             : 
    1740                 :         178 :                 rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
    1741                 :         178 :                 myrelid = RelationGetRelid(rel);
    1742                 :             : 
    1743                 :             :                 /*
    1744                 :             :                  * Filter out duplicates if user specifies "foo, foo".
    1745                 :             :                  *
    1746                 :             :                  * Note that this algorithm is known to not be very efficient (O(N^2))
    1747                 :             :                  * but given that it only works on list of tables given to us by user
    1748                 :             :                  * it's deemed acceptable.
    1749                 :             :                  */
    1750         [ +  + ]:         178 :                 if (list_member_oid(relids, myrelid))
    1751                 :             :                 {
    1752                 :             :                         /* Disallow duplicate tables if there are any with row filters. */
    1753         [ +  + ]:           4 :                         if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
    1754   [ +  -  +  - ]:           2 :                                 ereport(ERROR,
    1755                 :             :                                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1756                 :             :                                                  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
    1757                 :             :                                                                 RelationGetRelationName(rel))));
    1758                 :             : 
    1759                 :             :                         /* Disallow duplicate tables if there are any with column lists. */
    1760         [ -  + ]:           2 :                         if (t->columns || list_member_oid(relids_with_collist, myrelid))
    1761   [ +  -  +  - ]:           2 :                                 ereport(ERROR,
    1762                 :             :                                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1763                 :             :                                                  errmsg("conflicting or redundant column lists for table \"%s\"",
    1764                 :             :                                                                 RelationGetRelationName(rel))));
    1765                 :             : 
    1766                 :           0 :                         table_close(rel, ShareUpdateExclusiveLock);
    1767                 :           0 :                         continue;
    1768                 :             :                 }
    1769                 :             : 
    1770                 :         174 :                 pub_rel = palloc_object(PublicationRelInfo);
    1771                 :         174 :                 pub_rel->relation = rel;
    1772                 :         174 :                 pub_rel->whereClause = t->whereClause;
    1773                 :         174 :                 pub_rel->columns = t->columns;
    1774                 :         174 :                 rels = lappend(rels, pub_rel);
    1775                 :         174 :                 relids = lappend_oid(relids, myrelid);
    1776                 :             : 
    1777         [ +  + ]:         174 :                 if (t->whereClause)
    1778                 :          58 :                         relids_with_rf = lappend_oid(relids_with_rf, myrelid);
    1779                 :             : 
    1780         [ +  + ]:         174 :                 if (t->columns)
    1781                 :          55 :                         relids_with_collist = lappend_oid(relids_with_collist, myrelid);
    1782                 :             : 
    1783                 :             :                 /*
    1784                 :             :                  * Add children of this rel, if requested, so that they too are added
    1785                 :             :                  * to the publication.  A partitioned table can't have any inheritance
    1786                 :             :                  * children other than its partitions, which need not be explicitly
    1787                 :             :                  * added to the publication.
    1788                 :             :                  */
    1789   [ +  +  +  + ]:         174 :                 if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
    1790                 :             :                 {
    1791                 :         145 :                         List       *children;
    1792                 :         145 :                         ListCell   *child;
    1793                 :             : 
    1794                 :         145 :                         children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
    1795                 :             :                                                                                    NULL);
    1796                 :             : 
    1797   [ +  -  +  +  :         291 :                         foreach(child, children)
                   +  + ]
    1798                 :             :                         {
    1799                 :         146 :                                 Oid                     childrelid = lfirst_oid(child);
    1800                 :             : 
    1801                 :             :                                 /* Allow query cancel in case this takes a long time */
    1802         [ +  - ]:         146 :                                 CHECK_FOR_INTERRUPTS();
    1803                 :             : 
    1804                 :             :                                 /*
    1805                 :             :                                  * Skip duplicates if user specified both parent and child
    1806                 :             :                                  * tables.
    1807                 :             :                                  */
    1808         [ +  + ]:         146 :                                 if (list_member_oid(relids, childrelid))
    1809                 :             :                                 {
    1810                 :             :                                         /*
    1811                 :             :                                          * We don't allow to specify row filter for both parent
    1812                 :             :                                          * and child table at the same time as it is not very
    1813                 :             :                                          * clear which one should be given preference.
    1814                 :             :                                          */
    1815         [ +  - ]:         145 :                                         if (childrelid != myrelid &&
    1816         [ #  # ]:           0 :                                                 (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
    1817   [ #  #  #  # ]:           0 :                                                 ereport(ERROR,
    1818                 :             :                                                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1819                 :             :                                                                  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
    1820                 :             :                                                                                 RelationGetRelationName(rel))));
    1821                 :             : 
    1822                 :             :                                         /*
    1823                 :             :                                          * We don't allow to specify column list for both parent
    1824                 :             :                                          * and child table at the same time as it is not very
    1825                 :             :                                          * clear which one should be given preference.
    1826                 :             :                                          */
    1827         [ +  - ]:         145 :                                         if (childrelid != myrelid &&
    1828         [ #  # ]:           0 :                                                 (t->columns || list_member_oid(relids_with_collist, childrelid)))
    1829   [ #  #  #  # ]:           0 :                                                 ereport(ERROR,
    1830                 :             :                                                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1831                 :             :                                                                  errmsg("conflicting or redundant column lists for table \"%s\"",
    1832                 :             :                                                                                 RelationGetRelationName(rel))));
    1833                 :             : 
    1834                 :         145 :                                         continue;
    1835                 :             :                                 }
    1836                 :             : 
    1837                 :             :                                 /* find_all_inheritors already got lock */
    1838                 :           1 :                                 rel = table_open(childrelid, NoLock);
    1839                 :           1 :                                 pub_rel = palloc_object(PublicationRelInfo);
    1840                 :           1 :                                 pub_rel->relation = rel;
    1841                 :             :                                 /* child inherits WHERE clause from parent */
    1842                 :           1 :                                 pub_rel->whereClause = t->whereClause;
    1843                 :             : 
    1844                 :             :                                 /* child inherits column list from parent */
    1845                 :           1 :                                 pub_rel->columns = t->columns;
    1846                 :           1 :                                 rels = lappend(rels, pub_rel);
    1847                 :           1 :                                 relids = lappend_oid(relids, childrelid);
    1848                 :             : 
    1849         [ +  - ]:           1 :                                 if (t->whereClause)
    1850                 :           0 :                                         relids_with_rf = lappend_oid(relids_with_rf, childrelid);
    1851                 :             : 
    1852         [ +  - ]:           1 :                                 if (t->columns)
    1853                 :           0 :                                         relids_with_collist = lappend_oid(relids_with_collist, childrelid);
    1854         [ +  + ]:         146 :                         }
    1855                 :         145 :                 }
    1856         [ -  + ]:         174 :         }
    1857                 :             : 
    1858                 :         168 :         list_free(relids);
    1859                 :         168 :         list_free(relids_with_rf);
    1860                 :             : 
    1861                 :         336 :         return rels;
    1862                 :         168 : }
    1863                 :             : 
    1864                 :             : /*
    1865                 :             :  * Close all relations in the list.
    1866                 :             :  */
    1867                 :             : static void
    1868                 :         202 : CloseTableList(List *rels)
    1869                 :             : {
    1870                 :         202 :         ListCell   *lc;
    1871                 :             : 
    1872   [ +  +  +  +  :         403 :         foreach(lc, rels)
                   +  + ]
    1873                 :             :         {
    1874                 :         201 :                 PublicationRelInfo *pub_rel;
    1875                 :             : 
    1876                 :         201 :                 pub_rel = (PublicationRelInfo *) lfirst(lc);
    1877                 :         201 :                 table_close(pub_rel->relation, NoLock);
    1878                 :         201 :         }
    1879                 :             : 
    1880                 :         202 :         list_free_deep(rels);
    1881                 :         202 : }
    1882                 :             : 
    1883                 :             : /*
    1884                 :             :  * Lock the schemas specified in the schema list in AccessShareLock mode in
    1885                 :             :  * order to prevent concurrent schema deletion.
    1886                 :             :  */
    1887                 :             : static void
    1888                 :         166 : LockSchemaList(List *schemalist)
    1889                 :             : {
    1890                 :         166 :         ListCell   *lc;
    1891                 :             : 
    1892   [ +  +  +  +  :         216 :         foreach(lc, schemalist)
                   +  + ]
    1893                 :             :         {
    1894                 :          50 :                 Oid                     schemaid = lfirst_oid(lc);
    1895                 :             : 
    1896                 :             :                 /* Allow query cancel in case this takes a long time */
    1897         [ +  - ]:          50 :                 CHECK_FOR_INTERRUPTS();
    1898                 :          50 :                 LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
    1899                 :             : 
    1900                 :             :                 /*
    1901                 :             :                  * It is possible that by the time we acquire the lock on schema,
    1902                 :             :                  * concurrent DDL has removed it. We can test this by checking the
    1903                 :             :                  * existence of schema.
    1904                 :             :                  */
    1905         [ +  - ]:          50 :                 if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
    1906   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    1907                 :             :                                         errcode(ERRCODE_UNDEFINED_SCHEMA),
    1908                 :             :                                         errmsg("schema with OID %u does not exist", schemaid));
    1909                 :          50 :         }
    1910                 :         166 : }
    1911                 :             : 
    1912                 :             : /*
    1913                 :             :  * Add listed tables to the publication.
    1914                 :             :  */
    1915                 :             : static void
    1916                 :         123 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
    1917                 :             :                                          AlterPublicationStmt *stmt)
    1918                 :             : {
    1919                 :         123 :         ListCell   *lc;
    1920                 :             : 
    1921   [ +  +  +  +  :         260 :         foreach(lc, rels)
                   +  + ]
    1922                 :             :         {
    1923                 :         137 :                 PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
    1924                 :         137 :                 Relation        rel = pub_rel->relation;
    1925                 :         137 :                 ObjectAddress obj;
    1926                 :             : 
    1927                 :             :                 /* Must be owner of the table or superuser. */
    1928         [ +  + ]:         137 :                 if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
    1929                 :           2 :                         aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
    1930                 :           1 :                                                    RelationGetRelationName(rel));
    1931                 :             : 
    1932                 :         137 :                 obj = publication_add_relation(pubid, pub_rel, if_not_exists);
    1933         [ +  + ]:         137 :                 if (stmt)
    1934                 :             :                 {
    1935                 :          88 :                         EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1936                 :          88 :                                                                                          (Node *) stmt);
    1937                 :             : 
    1938         [ +  - ]:          88 :                         InvokeObjectPostCreateHook(PublicationRelRelationId,
    1939                 :             :                                                                            obj.objectId, 0);
    1940                 :          88 :                 }
    1941                 :         137 :         }
    1942                 :         123 : }
    1943                 :             : 
    1944                 :             : /*
    1945                 :             :  * Remove listed tables from the publication.
    1946                 :             :  */
    1947                 :             : static void
    1948                 :          82 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
    1949                 :             : {
    1950                 :          82 :         ObjectAddress obj;
    1951                 :          82 :         ListCell   *lc;
    1952                 :          82 :         Oid                     prid;
    1953                 :             : 
    1954   [ +  +  +  +  :         157 :         foreach(lc, rels)
                   +  + ]
    1955                 :             :         {
    1956                 :          78 :                 PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
    1957                 :          78 :                 Relation        rel = pubrel->relation;
    1958                 :          78 :                 Oid                     relid = RelationGetRelid(rel);
    1959                 :             : 
    1960         [ +  - ]:          78 :                 if (pubrel->columns)
    1961   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    1962                 :             :                                         errcode(ERRCODE_SYNTAX_ERROR),
    1963                 :             :                                         errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
    1964                 :             : 
    1965                 :          78 :                 prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
    1966                 :             :                                                            ObjectIdGetDatum(relid),
    1967                 :             :                                                            ObjectIdGetDatum(pubid));
    1968         [ +  + ]:          78 :                 if (!OidIsValid(prid))
    1969                 :             :                 {
    1970         [ -  + ]:           2 :                         if (missing_ok)
    1971                 :           0 :                                 continue;
    1972                 :             : 
    1973   [ +  -  +  - ]:           2 :                         ereport(ERROR,
    1974                 :             :                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
    1975                 :             :                                          errmsg("relation \"%s\" is not part of the publication",
    1976                 :             :                                                         RelationGetRelationName(rel))));
    1977                 :           0 :                 }
    1978                 :             : 
    1979         [ +  + ]:          76 :                 if (pubrel->whereClause)
    1980   [ +  -  +  - ]:           1 :                         ereport(ERROR,
    1981                 :             :                                         (errcode(ERRCODE_SYNTAX_ERROR),
    1982                 :             :                                          errmsg("cannot use a WHERE clause when removing a table from a publication")));
    1983                 :             : 
    1984                 :          75 :                 ObjectAddressSet(obj, PublicationRelRelationId, prid);
    1985                 :          75 :                 performDeletion(&obj, DROP_CASCADE, 0);
    1986      [ -  -  + ]:          75 :         }
    1987                 :          79 : }
    1988                 :             : 
    1989                 :             : /*
    1990                 :             :  * Add listed schemas to the publication.
    1991                 :             :  */
    1992                 :             : static void
    1993                 :          91 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
    1994                 :             :                                           AlterPublicationStmt *stmt)
    1995                 :             : {
    1996                 :          91 :         ListCell   *lc;
    1997                 :             : 
    1998   [ +  +  +  +  :         131 :         foreach(lc, schemas)
                   +  + ]
    1999                 :             :         {
    2000                 :          40 :                 Oid                     schemaid = lfirst_oid(lc);
    2001                 :          40 :                 ObjectAddress obj;
    2002                 :             : 
    2003                 :          40 :                 obj = publication_add_schema(pubid, schemaid, if_not_exists);
    2004         [ +  + ]:          40 :                 if (stmt)
    2005                 :             :                 {
    2006                 :           9 :                         EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    2007                 :           9 :                                                                                          (Node *) stmt);
    2008                 :             : 
    2009         [ +  - ]:           9 :                         InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
    2010                 :             :                                                                            obj.objectId, 0);
    2011                 :           9 :                 }
    2012                 :          40 :         }
    2013                 :          91 : }
    2014                 :             : 
    2015                 :             : /*
    2016                 :             :  * Remove listed schemas from the publication.
    2017                 :             :  */
    2018                 :             : static void
    2019                 :          72 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
    2020                 :             : {
    2021                 :          72 :         ObjectAddress obj;
    2022                 :          72 :         ListCell   *lc;
    2023                 :          72 :         Oid                     psid;
    2024                 :             : 
    2025   [ +  +  +  +  :          80 :         foreach(lc, schemas)
                   +  + ]
    2026                 :             :         {
    2027                 :           9 :                 Oid                     schemaid = lfirst_oid(lc);
    2028                 :             : 
    2029                 :           9 :                 psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
    2030                 :             :                                                            Anum_pg_publication_namespace_oid,
    2031                 :             :                                                            ObjectIdGetDatum(schemaid),
    2032                 :             :                                                            ObjectIdGetDatum(pubid));
    2033         [ +  + ]:           9 :                 if (!OidIsValid(psid))
    2034                 :             :                 {
    2035         [ -  + ]:           1 :                         if (missing_ok)
    2036                 :           0 :                                 continue;
    2037                 :             : 
    2038   [ +  -  +  - ]:           1 :                         ereport(ERROR,
    2039                 :             :                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
    2040                 :             :                                          errmsg("tables from schema \"%s\" are not part of the publication",
    2041                 :             :                                                         get_namespace_name(schemaid))));
    2042                 :           0 :                 }
    2043                 :             : 
    2044                 :           8 :                 ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
    2045                 :           8 :                 performDeletion(&obj, DROP_CASCADE, 0);
    2046      [ -  -  + ]:           8 :         }
    2047                 :          71 : }
    2048                 :             : 
    2049                 :             : /*
    2050                 :             :  * Internal workhorse for changing a publication owner
    2051                 :             :  */
    2052                 :             : static void
    2053                 :           4 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
    2054                 :             : {
    2055                 :           4 :         Form_pg_publication form;
    2056                 :             : 
    2057                 :           4 :         form = (Form_pg_publication) GETSTRUCT(tup);
    2058                 :             : 
    2059         [ -  + ]:           4 :         if (form->pubowner == newOwnerId)
    2060                 :           0 :                 return;
    2061                 :             : 
    2062         [ +  + ]:           4 :         if (!superuser())
    2063                 :             :         {
    2064                 :           2 :                 AclResult       aclresult;
    2065                 :             : 
    2066                 :             :                 /* Must be owner */
    2067         [ +  - ]:           2 :                 if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
    2068                 :           0 :                         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
    2069                 :           0 :                                                    NameStr(form->pubname));
    2070                 :             : 
    2071                 :             :                 /* Must be able to become new owner */
    2072                 :           2 :                 check_can_set_role(GetUserId(), newOwnerId);
    2073                 :             : 
    2074                 :             :                 /* New owner must have CREATE privilege on database */
    2075                 :           2 :                 aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
    2076         [ +  - ]:           2 :                 if (aclresult != ACLCHECK_OK)
    2077                 :           0 :                         aclcheck_error(aclresult, OBJECT_DATABASE,
    2078                 :           0 :                                                    get_database_name(MyDatabaseId));
    2079                 :             : 
    2080         [ +  + ]:           2 :                 if (!superuser_arg(newOwnerId))
    2081                 :             :                 {
    2082         [ -  + ]:           1 :                         if (form->puballtables || form->puballsequences ||
    2083                 :           0 :                                 is_schema_publication(form->oid))
    2084   [ +  -  +  - ]:           1 :                                 ereport(ERROR,
    2085                 :             :                                                 errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2086                 :             :                                                 errmsg("permission denied to change owner of publication \"%s\"",
    2087                 :             :                                                            NameStr(form->pubname)),
    2088                 :             :                                                 errhint("The owner of a FOR ALL TABLES or ALL SEQUENCES or TABLES IN SCHEMA publication must be a superuser."));
    2089                 :           0 :                 }
    2090                 :           1 :         }
    2091                 :             : 
    2092                 :           3 :         form->pubowner = newOwnerId;
    2093                 :           3 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
    2094                 :             : 
    2095                 :             :         /* Update owner dependency reference */
    2096                 :           3 :         changeDependencyOnOwner(PublicationRelationId,
    2097                 :           3 :                                                         form->oid,
    2098                 :           3 :                                                         newOwnerId);
    2099                 :             : 
    2100         [ +  - ]:           3 :         InvokeObjectPostAlterHook(PublicationRelationId,
    2101                 :             :                                                           form->oid, 0);
    2102         [ -  + ]:           3 : }
    2103                 :             : 
    2104                 :             : /*
    2105                 :             :  * Change publication owner -- by name
    2106                 :             :  */
    2107                 :             : ObjectAddress
    2108                 :           3 : AlterPublicationOwner(const char *name, Oid newOwnerId)
    2109                 :             : {
    2110                 :           3 :         Oid                     pubid;
    2111                 :           3 :         HeapTuple       tup;
    2112                 :           3 :         Relation        rel;
    2113                 :             :         ObjectAddress address;
    2114                 :           3 :         Form_pg_publication pubform;
    2115                 :             : 
    2116                 :           3 :         rel = table_open(PublicationRelationId, RowExclusiveLock);
    2117                 :             : 
    2118                 :           3 :         tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
    2119                 :             : 
    2120         [ +  - ]:           3 :         if (!HeapTupleIsValid(tup))
    2121   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    2122                 :             :                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2123                 :             :                                  errmsg("publication \"%s\" does not exist", name)));
    2124                 :             : 
    2125                 :           3 :         pubform = (Form_pg_publication) GETSTRUCT(tup);
    2126                 :           3 :         pubid = pubform->oid;
    2127                 :             : 
    2128                 :           3 :         AlterPublicationOwner_internal(rel, tup, newOwnerId);
    2129                 :             : 
    2130                 :           3 :         ObjectAddressSet(address, PublicationRelationId, pubid);
    2131                 :             : 
    2132                 :           3 :         heap_freetuple(tup);
    2133                 :             : 
    2134                 :           3 :         table_close(rel, RowExclusiveLock);
    2135                 :             : 
    2136                 :             :         return address;
    2137                 :           3 : }
    2138                 :             : 
    2139                 :             : /*
    2140                 :             :  * Change publication owner -- by OID
    2141                 :             :  */
    2142                 :             : void
    2143                 :           0 : AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
    2144                 :             : {
    2145                 :           0 :         HeapTuple       tup;
    2146                 :           0 :         Relation        rel;
    2147                 :             : 
    2148                 :           0 :         rel = table_open(PublicationRelationId, RowExclusiveLock);
    2149                 :             : 
    2150                 :           0 :         tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    2151                 :             : 
    2152         [ #  # ]:           0 :         if (!HeapTupleIsValid(tup))
    2153   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    2154                 :             :                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2155                 :             :                                  errmsg("publication with OID %u does not exist", pubid)));
    2156                 :             : 
    2157                 :           0 :         AlterPublicationOwner_internal(rel, tup, newOwnerId);
    2158                 :             : 
    2159                 :           0 :         heap_freetuple(tup);
    2160                 :             : 
    2161                 :           0 :         table_close(rel, RowExclusiveLock);
    2162                 :           0 : }
    2163                 :             : 
    2164                 :             : /*
    2165                 :             :  * Extract the publish_generated_columns option value from a DefElem. "stored"
    2166                 :             :  * and "none" values are accepted.
    2167                 :             :  */
    2168                 :             : static char
    2169                 :          11 : defGetGeneratedColsOption(DefElem *def)
    2170                 :             : {
    2171                 :          11 :         char       *sval = "";
    2172                 :             : 
    2173                 :             :         /*
    2174                 :             :          * A parameter value is required.
    2175                 :             :          */
    2176         [ +  + ]:          11 :         if (def->arg)
    2177                 :             :         {
    2178                 :          10 :                 sval = defGetString(def);
    2179                 :             : 
    2180         [ +  + ]:          10 :                 if (pg_strcasecmp(sval, "none") == 0)
    2181                 :           3 :                         return PUBLISH_GENCOLS_NONE;
    2182         [ +  + ]:           7 :                 if (pg_strcasecmp(sval, "stored") == 0)
    2183                 :           6 :                         return PUBLISH_GENCOLS_STORED;
    2184                 :           1 :         }
    2185                 :             : 
    2186   [ +  -  +  - ]:           2 :         ereport(ERROR,
    2187                 :             :                         errcode(ERRCODE_SYNTAX_ERROR),
    2188                 :             :                         errmsg("invalid value for publication parameter \"%s\": \"%s\"", def->defname, sval),
    2189                 :             :                         errdetail("Valid values are \"%s\" and \"%s\".", "none", "stored"));
    2190                 :             : 
    2191                 :           0 :         return PUBLISH_GENCOLS_NONE;    /* keep compiler quiet */
    2192                 :           9 : }
        

Generated by: LCOV version 2.3.2-1