LCOV - code coverage report
Current view: top level - src/backend/commands - subscriptioncmds.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 47.9 % 1283 614
Test Date: 2026-01-26 10:56:24 Functions: 34.8 % 23 8
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Branches: 34.0 % 1085 369

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * subscriptioncmds.c
       4                 :             :  *              subscription catalog manipulation functions
       5                 :             :  *
       6                 :             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7                 :             :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :             :  *
       9                 :             :  * IDENTIFICATION
      10                 :             :  *              src/backend/commands/subscriptioncmds.c
      11                 :             :  *
      12                 :             :  *-------------------------------------------------------------------------
      13                 :             :  */
      14                 :             : 
      15                 :             : #include "postgres.h"
      16                 :             : 
      17                 :             : #include "access/commit_ts.h"
      18                 :             : #include "access/htup_details.h"
      19                 :             : #include "access/table.h"
      20                 :             : #include "access/twophase.h"
      21                 :             : #include "access/xact.h"
      22                 :             : #include "catalog/catalog.h"
      23                 :             : #include "catalog/dependency.h"
      24                 :             : #include "catalog/indexing.h"
      25                 :             : #include "catalog/namespace.h"
      26                 :             : #include "catalog/objectaccess.h"
      27                 :             : #include "catalog/objectaddress.h"
      28                 :             : #include "catalog/pg_authid_d.h"
      29                 :             : #include "catalog/pg_database_d.h"
      30                 :             : #include "catalog/pg_subscription.h"
      31                 :             : #include "catalog/pg_subscription_rel.h"
      32                 :             : #include "catalog/pg_type.h"
      33                 :             : #include "commands/defrem.h"
      34                 :             : #include "commands/event_trigger.h"
      35                 :             : #include "commands/subscriptioncmds.h"
      36                 :             : #include "executor/executor.h"
      37                 :             : #include "miscadmin.h"
      38                 :             : #include "nodes/makefuncs.h"
      39                 :             : #include "pgstat.h"
      40                 :             : #include "replication/logicallauncher.h"
      41                 :             : #include "replication/logicalworker.h"
      42                 :             : #include "replication/origin.h"
      43                 :             : #include "replication/slot.h"
      44                 :             : #include "replication/walreceiver.h"
      45                 :             : #include "replication/walsender.h"
      46                 :             : #include "replication/worker_internal.h"
      47                 :             : #include "storage/lmgr.h"
      48                 :             : #include "utils/acl.h"
      49                 :             : #include "utils/builtins.h"
      50                 :             : #include "utils/guc.h"
      51                 :             : #include "utils/lsyscache.h"
      52                 :             : #include "utils/memutils.h"
      53                 :             : #include "utils/pg_lsn.h"
      54                 :             : #include "utils/syscache.h"
      55                 :             : 
      56                 :             : /*
      57                 :             :  * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
      58                 :             :  * command.
      59                 :             :  */
      60                 :             : #define SUBOPT_CONNECT                          0x00000001
      61                 :             : #define SUBOPT_ENABLED                          0x00000002
      62                 :             : #define SUBOPT_CREATE_SLOT                      0x00000004
      63                 :             : #define SUBOPT_SLOT_NAME                        0x00000008
      64                 :             : #define SUBOPT_COPY_DATA                        0x00000010
      65                 :             : #define SUBOPT_SYNCHRONOUS_COMMIT       0x00000020
      66                 :             : #define SUBOPT_REFRESH                          0x00000040
      67                 :             : #define SUBOPT_BINARY                           0x00000080
      68                 :             : #define SUBOPT_STREAMING                        0x00000100
      69                 :             : #define SUBOPT_TWOPHASE_COMMIT          0x00000200
      70                 :             : #define SUBOPT_DISABLE_ON_ERR           0x00000400
      71                 :             : #define SUBOPT_PASSWORD_REQUIRED        0x00000800
      72                 :             : #define SUBOPT_RUN_AS_OWNER                     0x00001000
      73                 :             : #define SUBOPT_FAILOVER                         0x00002000
      74                 :             : #define SUBOPT_RETAIN_DEAD_TUPLES       0x00004000
      75                 :             : #define SUBOPT_MAX_RETENTION_DURATION   0x00008000
      76                 :             : #define SUBOPT_LSN                                      0x00010000
      77                 :             : #define SUBOPT_ORIGIN                           0x00020000
      78                 :             : 
      79                 :             : /* check if the 'val' has 'bits' set */
      80                 :             : #define IsSet(val, bits)  (((val) & (bits)) == (bits))
      81                 :             : 
      82                 :             : /*
      83                 :             :  * Structure to hold a bitmap representing the user-provided CREATE/ALTER
      84                 :             :  * SUBSCRIPTION command options and the parsed/default values of each of them.
      85                 :             :  */
      86                 :             : typedef struct SubOpts
      87                 :             : {
      88                 :             :         bits32          specified_opts;
      89                 :             :         char       *slot_name;
      90                 :             :         char       *synchronous_commit;
      91                 :             :         bool            connect;
      92                 :             :         bool            enabled;
      93                 :             :         bool            create_slot;
      94                 :             :         bool            copy_data;
      95                 :             :         bool            refresh;
      96                 :             :         bool            binary;
      97                 :             :         char            streaming;
      98                 :             :         bool            twophase;
      99                 :             :         bool            disableonerr;
     100                 :             :         bool            passwordrequired;
     101                 :             :         bool            runasowner;
     102                 :             :         bool            failover;
     103                 :             :         bool            retaindeadtuples;
     104                 :             :         int32           maxretention;
     105                 :             :         char       *origin;
     106                 :             :         XLogRecPtr      lsn;
     107                 :             : } SubOpts;
     108                 :             : 
     109                 :             : /*
     110                 :             :  * PublicationRelKind represents a relation included in a publication.
     111                 :             :  * It stores the schema-qualified relation name (rv) and its kind (relkind).
     112                 :             :  */
     113                 :             : typedef struct PublicationRelKind
     114                 :             : {
     115                 :             :         RangeVar   *rv;
     116                 :             :         char            relkind;
     117                 :             : } PublicationRelKind;
     118                 :             : 
     119                 :             : static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications);
     120                 :             : static void check_publications_origin_tables(WalReceiverConn *wrconn,
     121                 :             :                                                                                          List *publications, bool copydata,
     122                 :             :                                                                                          bool retain_dead_tuples,
     123                 :             :                                                                                          char *origin,
     124                 :             :                                                                                          Oid *subrel_local_oids,
     125                 :             :                                                                                          int subrel_count, char *subname);
     126                 :             : static void check_publications_origin_sequences(WalReceiverConn *wrconn,
     127                 :             :                                                                                                 List *publications,
     128                 :             :                                                                                                 bool copydata, char *origin,
     129                 :             :                                                                                                 Oid *subrel_local_oids,
     130                 :             :                                                                                                 int subrel_count,
     131                 :             :                                                                                                 char *subname);
     132                 :             : static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
     133                 :             : static void check_duplicates_in_publist(List *publist, Datum *datums);
     134                 :             : static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
     135                 :             : static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
     136                 :             : static void CheckAlterSubOption(Subscription *sub, const char *option,
     137                 :             :                                                                 bool slot_needs_update, bool isTopLevel);
     138                 :             : 
     139                 :             : 
     140                 :             : /*
     141                 :             :  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
     142                 :             :  *
     143                 :             :  * Since not all options can be specified in both commands, this function
     144                 :             :  * will report an error if mutually exclusive options are specified.
     145                 :             :  */
     146                 :             : static void
     147                 :          85 : parse_subscription_options(ParseState *pstate, List *stmt_options,
     148                 :             :                                                    bits32 supported_opts, SubOpts *opts)
     149                 :             : {
     150                 :          85 :         ListCell   *lc;
     151                 :             : 
     152                 :             :         /* Start out with cleared opts. */
     153                 :          85 :         memset(opts, 0, sizeof(SubOpts));
     154                 :             : 
     155                 :             :         /* caller must expect some option */
     156         [ +  - ]:          85 :         Assert(supported_opts != 0);
     157                 :             : 
     158                 :             :         /* If connect option is supported, these others also need to be. */
     159   [ +  +  +  - ]:          85 :         Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
     160                 :             :                    IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
     161                 :             :                                  SUBOPT_COPY_DATA));
     162                 :             : 
     163                 :             :         /* Set default values for the supported options. */
     164         [ +  + ]:          85 :         if (IsSet(supported_opts, SUBOPT_CONNECT))
     165                 :          38 :                 opts->connect = true;
     166         [ +  + ]:          85 :         if (IsSet(supported_opts, SUBOPT_ENABLED))
     167                 :          43 :                 opts->enabled = true;
     168         [ +  + ]:          85 :         if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
     169                 :          38 :                 opts->create_slot = true;
     170         [ +  + ]:          85 :         if (IsSet(supported_opts, SUBOPT_COPY_DATA))
     171                 :          50 :                 opts->copy_data = true;
     172         [ +  + ]:          85 :         if (IsSet(supported_opts, SUBOPT_REFRESH))
     173                 :          11 :                 opts->refresh = true;
     174         [ +  + ]:          85 :         if (IsSet(supported_opts, SUBOPT_BINARY))
     175                 :          65 :                 opts->binary = false;
     176         [ +  + ]:          85 :         if (IsSet(supported_opts, SUBOPT_STREAMING))
     177                 :          65 :                 opts->streaming = LOGICALREP_STREAM_PARALLEL;
     178         [ +  + ]:          85 :         if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
     179                 :          65 :                 opts->twophase = false;
     180         [ +  + ]:          85 :         if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
     181                 :          65 :                 opts->disableonerr = false;
     182         [ +  + ]:          85 :         if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
     183                 :          65 :                 opts->passwordrequired = true;
     184         [ +  + ]:          85 :         if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
     185                 :          65 :                 opts->runasowner = false;
     186         [ +  + ]:          85 :         if (IsSet(supported_opts, SUBOPT_FAILOVER))
     187                 :          65 :                 opts->failover = false;
     188         [ +  + ]:          85 :         if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
     189                 :          65 :                 opts->retaindeadtuples = false;
     190         [ +  + ]:          85 :         if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION))
     191                 :          65 :                 opts->maxretention = 0;
     192         [ +  + ]:          85 :         if (IsSet(supported_opts, SUBOPT_ORIGIN))
     193                 :          65 :                 opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
     194                 :             : 
     195                 :             :         /* Parse options */
     196   [ +  +  +  +  :         194 :         foreach(lc, stmt_options)
                   +  + ]
     197                 :             :         {
     198                 :         112 :                 DefElem    *defel = (DefElem *) lfirst(lc);
     199                 :             : 
     200   [ +  +  +  + ]:         112 :                 if (IsSet(supported_opts, SUBOPT_CONNECT) &&
     201                 :          66 :                         strcmp(defel->defname, "connect") == 0)
     202                 :             :                 {
     203         [ -  + ]:          29 :                         if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
     204                 :           0 :                                 errorConflictingDefElem(defel, pstate);
     205                 :             : 
     206                 :          29 :                         opts->specified_opts |= SUBOPT_CONNECT;
     207                 :          29 :                         opts->connect = defGetBoolean(defel);
     208                 :          29 :                 }
     209   [ +  +  +  + ]:          83 :                 else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
     210                 :          42 :                                  strcmp(defel->defname, "enabled") == 0)
     211                 :             :                 {
     212         [ -  + ]:           9 :                         if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
     213                 :           0 :                                 errorConflictingDefElem(defel, pstate);
     214                 :             : 
     215                 :           9 :                         opts->specified_opts |= SUBOPT_ENABLED;
     216                 :           9 :                         opts->enabled = defGetBoolean(defel);
     217                 :           9 :                 }
     218   [ +  +  +  + ]:          74 :                 else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
     219                 :          33 :                                  strcmp(defel->defname, "create_slot") == 0)
     220                 :             :                 {
     221         [ +  - ]:           5 :                         if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     222                 :           0 :                                 errorConflictingDefElem(defel, pstate);
     223                 :             : 
     224                 :           5 :                         opts->specified_opts |= SUBOPT_CREATE_SLOT;
     225                 :           5 :                         opts->create_slot = defGetBoolean(defel);
     226                 :           5 :                 }
     227   [ +  +  +  + ]:          69 :                 else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
     228                 :          55 :                                  strcmp(defel->defname, "slot_name") == 0)
     229                 :             :                 {
     230         [ -  + ]:          21 :                         if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
     231                 :           0 :                                 errorConflictingDefElem(defel, pstate);
     232                 :             : 
     233                 :          21 :                         opts->specified_opts |= SUBOPT_SLOT_NAME;
     234                 :          21 :                         opts->slot_name = defGetString(defel);
     235                 :             : 
     236                 :             :                         /* Setting slot_name = NONE is treated as no slot name. */
     237         [ +  + ]:          21 :                         if (strcmp(opts->slot_name, "none") == 0)
     238                 :          19 :                                 opts->slot_name = NULL;
     239                 :             :                         else
     240                 :           2 :                                 ReplicationSlotValidateName(opts->slot_name, false, ERROR);
     241                 :          21 :                 }
     242   [ +  +  +  + ]:          48 :                 else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
     243                 :          30 :                                  strcmp(defel->defname, "copy_data") == 0)
     244                 :             :                 {
     245         [ -  + ]:           2 :                         if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
     246                 :           0 :                                 errorConflictingDefElem(defel, pstate);
     247                 :             : 
     248                 :           2 :                         opts->specified_opts |= SUBOPT_COPY_DATA;
     249                 :           2 :                         opts->copy_data = defGetBoolean(defel);
     250                 :           2 :                 }
     251   [ +  +  +  + ]:          46 :                 else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
     252                 :          32 :                                  strcmp(defel->defname, "synchronous_commit") == 0)
     253                 :             :                 {
     254         [ -  + ]:           2 :                         if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
     255                 :           0 :                                 errorConflictingDefElem(defel, pstate);
     256                 :             : 
     257                 :           2 :                         opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
     258                 :           2 :                         opts->synchronous_commit = defGetString(defel);
     259                 :             : 
     260                 :             :                         /* Test if the given value is valid for synchronous_commit GUC. */
     261                 :           2 :                         (void) set_config_option("synchronous_commit", opts->synchronous_commit,
     262                 :             :                                                                          PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
     263                 :             :                                                                          false, 0, false);
     264                 :           2 :                 }
     265   [ +  +  -  + ]:          44 :                 else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
     266                 :          11 :                                  strcmp(defel->defname, "refresh") == 0)
     267                 :             :                 {
     268         [ +  - ]:          11 :                         if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
     269                 :           0 :                                 errorConflictingDefElem(defel, pstate);
     270                 :             : 
     271                 :          11 :                         opts->specified_opts |= SUBOPT_REFRESH;
     272                 :          11 :                         opts->refresh = defGetBoolean(defel);
     273                 :          11 :                 }
     274   [ +  +  +  + ]:          33 :                 else if (IsSet(supported_opts, SUBOPT_BINARY) &&
     275                 :          30 :                                  strcmp(defel->defname, "binary") == 0)
     276                 :             :                 {
     277         [ +  - ]:           3 :                         if (IsSet(opts->specified_opts, SUBOPT_BINARY))
     278                 :           0 :                                 errorConflictingDefElem(defel, pstate);
     279                 :             : 
     280                 :           3 :                         opts->specified_opts |= SUBOPT_BINARY;
     281                 :           3 :                         opts->binary = defGetBoolean(defel);
     282                 :           3 :                 }
     283   [ +  +  +  + ]:          30 :                 else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
     284                 :          27 :                                  strcmp(defel->defname, "streaming") == 0)
     285                 :             :                 {
     286         [ +  - ]:           6 :                         if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
     287                 :           0 :                                 errorConflictingDefElem(defel, pstate);
     288                 :             : 
     289                 :           6 :                         opts->specified_opts |= SUBOPT_STREAMING;
     290                 :           6 :                         opts->streaming = defGetStreamingMode(defel);
     291                 :           6 :                 }
     292   [ +  +  +  + ]:          24 :                 else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
     293                 :          21 :                                  strcmp(defel->defname, "two_phase") == 0)
     294                 :             :                 {
     295         [ +  - ]:           3 :                         if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
     296                 :           0 :                                 errorConflictingDefElem(defel, pstate);
     297                 :             : 
     298                 :           3 :                         opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
     299                 :           3 :                         opts->twophase = defGetBoolean(defel);
     300                 :           3 :                 }
     301   [ +  +  +  + ]:          21 :                 else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
     302                 :          18 :                                  strcmp(defel->defname, "disable_on_error") == 0)
     303                 :             :                 {
     304         [ +  - ]:           3 :                         if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
     305                 :           0 :                                 errorConflictingDefElem(defel, pstate);
     306                 :             : 
     307                 :           3 :                         opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
     308                 :           3 :                         opts->disableonerr = defGetBoolean(defel);
     309                 :           3 :                 }
     310   [ +  +  +  + ]:          18 :                 else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
     311                 :          15 :                                  strcmp(defel->defname, "password_required") == 0)
     312                 :             :                 {
     313         [ +  - ]:           3 :                         if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
     314                 :           0 :                                 errorConflictingDefElem(defel, pstate);
     315                 :             : 
     316                 :           3 :                         opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
     317                 :           3 :                         opts->passwordrequired = defGetBoolean(defel);
     318                 :           3 :                 }
     319   [ +  +  +  + ]:          15 :                 else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
     320                 :          12 :                                  strcmp(defel->defname, "run_as_owner") == 0)
     321                 :             :                 {
     322         [ -  + ]:           2 :                         if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
     323                 :           0 :                                 errorConflictingDefElem(defel, pstate);
     324                 :             : 
     325                 :           2 :                         opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
     326                 :           2 :                         opts->runasowner = defGetBoolean(defel);
     327                 :           2 :                 }
     328   [ +  +  +  + ]:          13 :                 else if (IsSet(supported_opts, SUBOPT_FAILOVER) &&
     329                 :          10 :                                  strcmp(defel->defname, "failover") == 0)
     330                 :             :                 {
     331         [ +  - ]:           1 :                         if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
     332                 :           0 :                                 errorConflictingDefElem(defel, pstate);
     333                 :             : 
     334                 :           1 :                         opts->specified_opts |= SUBOPT_FAILOVER;
     335                 :           1 :                         opts->failover = defGetBoolean(defel);
     336                 :           1 :                 }
     337   [ +  +  +  + ]:          12 :                 else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) &&
     338                 :           9 :                                  strcmp(defel->defname, "retain_dead_tuples") == 0)
     339                 :             :                 {
     340         [ +  - ]:           2 :                         if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
     341                 :           0 :                                 errorConflictingDefElem(defel, pstate);
     342                 :             : 
     343                 :           2 :                         opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
     344                 :           2 :                         opts->retaindeadtuples = defGetBoolean(defel);
     345                 :           2 :                 }
     346   [ +  +  +  + ]:          10 :                 else if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION) &&
     347                 :           7 :                                  strcmp(defel->defname, "max_retention_duration") == 0)
     348                 :             :                 {
     349         [ +  - ]:           3 :                         if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION))
     350                 :           0 :                                 errorConflictingDefElem(defel, pstate);
     351                 :             : 
     352                 :           3 :                         opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION;
     353                 :           3 :                         opts->maxretention = defGetInt32(defel);
     354                 :           3 :                 }
     355   [ +  +  +  + ]:           7 :                 else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
     356                 :           4 :                                  strcmp(defel->defname, "origin") == 0)
     357                 :             :                 {
     358         [ -  + ]:           3 :                         if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
     359                 :           0 :                                 errorConflictingDefElem(defel, pstate);
     360                 :             : 
     361                 :           3 :                         opts->specified_opts |= SUBOPT_ORIGIN;
     362                 :           3 :                         pfree(opts->origin);
     363                 :             : 
     364                 :             :                         /*
     365                 :             :                          * Even though the "origin" parameter allows only "none" and "any"
     366                 :             :                          * values, it is implemented as a string type so that the
     367                 :             :                          * parameter can be extended in future versions to support
     368                 :             :                          * filtering using origin names specified by the user.
     369                 :             :                          */
     370                 :           3 :                         opts->origin = defGetString(defel);
     371                 :             : 
     372   [ +  +  +  + ]:           3 :                         if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
     373                 :           2 :                                 (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
     374   [ +  -  +  - ]:           1 :                                 ereport(ERROR,
     375                 :             :                                                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     376                 :             :                                                 errmsg("unrecognized origin value: \"%s\"", opts->origin));
     377                 :           2 :                 }
     378         [ +  + ]:           4 :                 else if (IsSet(supported_opts, SUBOPT_LSN) &&
     379                 :           3 :                                  strcmp(defel->defname, "lsn") == 0)
     380                 :             :                 {
     381                 :           3 :                         char       *lsn_str = defGetString(defel);
     382                 :           3 :                         XLogRecPtr      lsn;
     383                 :             : 
     384         [ +  - ]:           3 :                         if (IsSet(opts->specified_opts, SUBOPT_LSN))
     385                 :           0 :                                 errorConflictingDefElem(defel, pstate);
     386                 :             : 
     387                 :             :                         /* Setting lsn = NONE is treated as resetting LSN */
     388         [ +  + ]:           3 :                         if (strcmp(lsn_str, "none") == 0)
     389                 :           1 :                                 lsn = InvalidXLogRecPtr;
     390                 :             :                         else
     391                 :             :                         {
     392                 :             :                                 /* Parse the argument as LSN */
     393                 :           2 :                                 lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
     394                 :             :                                                                                                           CStringGetDatum(lsn_str)));
     395                 :             : 
     396         [ +  + ]:           2 :                                 if (!XLogRecPtrIsValid(lsn))
     397   [ +  -  +  - ]:           1 :                                         ereport(ERROR,
     398                 :             :                                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     399                 :             :                                                          errmsg("invalid WAL location (LSN): %s", lsn_str)));
     400                 :             :                         }
     401                 :             : 
     402                 :           2 :                         opts->specified_opts |= SUBOPT_LSN;
     403                 :           2 :                         opts->lsn = lsn;
     404                 :           2 :                 }
     405                 :             :                 else
     406   [ +  -  +  - ]:           1 :                         ereport(ERROR,
     407                 :             :                                         (errcode(ERRCODE_SYNTAX_ERROR),
     408                 :             :                                          errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
     409                 :         109 :         }
     410                 :             : 
     411                 :             :         /*
     412                 :             :          * We've been explicitly asked to not connect, that requires some
     413                 :             :          * additional processing.
     414                 :             :          */
     415   [ +  +  +  + ]:          82 :         if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
     416                 :             :         {
     417                 :             :                 /* Check for incompatible options from the user. */
     418   [ +  -  +  + ]:          22 :                 if (opts->enabled &&
     419                 :          22 :                         IsSet(opts->specified_opts, SUBOPT_ENABLED))
     420   [ +  -  +  - ]:           1 :                         ereport(ERROR,
     421                 :             :                                         (errcode(ERRCODE_SYNTAX_ERROR),
     422                 :             :                         /*- translator: both %s are strings of the form "option = value" */
     423                 :             :                                          errmsg("%s and %s are mutually exclusive options",
     424                 :             :                                                         "connect = false", "enabled = true")));
     425                 :             : 
     426   [ +  +  +  + ]:          21 :                 if (opts->create_slot &&
     427                 :          20 :                         IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     428   [ +  -  +  - ]:           1 :                         ereport(ERROR,
     429                 :             :                                         (errcode(ERRCODE_SYNTAX_ERROR),
     430                 :             :                                          errmsg("%s and %s are mutually exclusive options",
     431                 :             :                                                         "connect = false", "create_slot = true")));
     432                 :             : 
     433   [ +  +  +  + ]:          20 :                 if (opts->copy_data &&
     434                 :          19 :                         IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
     435   [ +  -  +  - ]:           1 :                         ereport(ERROR,
     436                 :             :                                         (errcode(ERRCODE_SYNTAX_ERROR),
     437                 :             :                                          errmsg("%s and %s are mutually exclusive options",
     438                 :             :                                                         "connect = false", "copy_data = true")));
     439                 :             : 
     440                 :             :                 /* Change the defaults of other options. */
     441                 :          19 :                 opts->enabled = false;
     442                 :          19 :                 opts->create_slot = false;
     443                 :          19 :                 opts->copy_data = false;
     444                 :          19 :         }
     445                 :             : 
     446                 :             :         /*
     447                 :             :          * Do additional checking for disallowed combination when slot_name = NONE
     448                 :             :          * was used.
     449                 :             :          */
     450   [ +  +  +  + ]:          79 :         if (!opts->slot_name &&
     451                 :          70 :                 IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
     452                 :             :         {
     453         [ +  + ]:          18 :                 if (opts->enabled)
     454                 :             :                 {
     455         [ +  + ]:           3 :                         if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
     456   [ +  -  +  - ]:           1 :                                 ereport(ERROR,
     457                 :             :                                                 (errcode(ERRCODE_SYNTAX_ERROR),
     458                 :             :                                 /*- translator: both %s are strings of the form "option = value" */
     459                 :             :                                                  errmsg("%s and %s are mutually exclusive options",
     460                 :             :                                                                 "slot_name = NONE", "enabled = true")));
     461                 :             :                         else
     462   [ +  -  +  - ]:           2 :                                 ereport(ERROR,
     463                 :             :                                                 (errcode(ERRCODE_SYNTAX_ERROR),
     464                 :             :                                 /*- translator: both %s are strings of the form "option = value" */
     465                 :             :                                                  errmsg("subscription with %s must also set %s",
     466                 :             :                                                                 "slot_name = NONE", "enabled = false")));
     467                 :           0 :                 }
     468                 :             : 
     469         [ +  + ]:          15 :                 if (opts->create_slot)
     470                 :             :                 {
     471         [ +  + ]:           2 :                         if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     472   [ +  -  +  - ]:           1 :                                 ereport(ERROR,
     473                 :             :                                                 (errcode(ERRCODE_SYNTAX_ERROR),
     474                 :             :                                 /*- translator: both %s are strings of the form "option = value" */
     475                 :             :                                                  errmsg("%s and %s are mutually exclusive options",
     476                 :             :                                                                 "slot_name = NONE", "create_slot = true")));
     477                 :             :                         else
     478   [ +  -  +  - ]:           1 :                                 ereport(ERROR,
     479                 :             :                                                 (errcode(ERRCODE_SYNTAX_ERROR),
     480                 :             :                                 /*- translator: both %s are strings of the form "option = value" */
     481                 :             :                                                  errmsg("subscription with %s must also set %s",
     482                 :             :                                                                 "slot_name = NONE", "create_slot = false")));
     483                 :           0 :                 }
     484                 :          13 :         }
     485                 :          74 : }
     486                 :             : 
     487                 :             : /*
     488                 :             :  * Check that the specified publications are present on the publisher.
     489                 :             :  */
     490                 :             : static void
     491                 :           0 : check_publications(WalReceiverConn *wrconn, List *publications)
     492                 :             : {
     493                 :           0 :         WalRcvExecResult *res;
     494                 :           0 :         StringInfoData cmd;
     495                 :           0 :         TupleTableSlot *slot;
     496                 :           0 :         List       *publicationsCopy = NIL;
     497                 :           0 :         Oid                     tableRow[1] = {TEXTOID};
     498                 :             : 
     499                 :           0 :         initStringInfo(&cmd);
     500                 :           0 :         appendStringInfoString(&cmd, "SELECT t.pubname FROM\n"
     501                 :             :                                                    " pg_catalog.pg_publication t WHERE\n"
     502                 :             :                                                    " t.pubname IN (");
     503                 :           0 :         GetPublicationsStr(publications, &cmd, true);
     504                 :           0 :         appendStringInfoChar(&cmd, ')');
     505                 :             : 
     506                 :           0 :         res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
     507                 :           0 :         pfree(cmd.data);
     508                 :             : 
     509         [ #  # ]:           0 :         if (res->status != WALRCV_OK_TUPLES)
     510   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     511                 :             :                                 errmsg("could not receive list of publications from the publisher: %s",
     512                 :             :                                            res->err));
     513                 :             : 
     514                 :           0 :         publicationsCopy = list_copy(publications);
     515                 :             : 
     516                 :             :         /* Process publication(s). */
     517                 :           0 :         slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     518         [ #  # ]:           0 :         while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     519                 :             :         {
     520                 :           0 :                 char       *pubname;
     521                 :           0 :                 bool            isnull;
     522                 :             : 
     523                 :           0 :                 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
     524         [ #  # ]:           0 :                 Assert(!isnull);
     525                 :             : 
     526                 :             :                 /* Delete the publication present in publisher from the list. */
     527                 :           0 :                 publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
     528                 :           0 :                 ExecClearTuple(slot);
     529                 :           0 :         }
     530                 :             : 
     531                 :           0 :         ExecDropSingleTupleTableSlot(slot);
     532                 :             : 
     533                 :           0 :         walrcv_clear_result(res);
     534                 :             : 
     535         [ #  # ]:           0 :         if (list_length(publicationsCopy))
     536                 :             :         {
     537                 :             :                 /* Prepare the list of non-existent publication(s) for error message. */
     538                 :           0 :                 StringInfoData pubnames;
     539                 :             : 
     540                 :           0 :                 initStringInfo(&pubnames);
     541                 :             : 
     542                 :           0 :                 GetPublicationsStr(publicationsCopy, &pubnames, false);
     543   [ #  #  #  # ]:           0 :                 ereport(WARNING,
     544                 :             :                                 errcode(ERRCODE_UNDEFINED_OBJECT),
     545                 :             :                                 errmsg_plural("publication %s does not exist on the publisher",
     546                 :             :                                                           "publications %s do not exist on the publisher",
     547                 :             :                                                           list_length(publicationsCopy),
     548                 :             :                                                           pubnames.data));
     549                 :           0 :         }
     550                 :           0 : }
     551                 :             : 
     552                 :             : /*
     553                 :             :  * Auxiliary function to build a text array out of a list of String nodes.
     554                 :             :  */
     555                 :             : static Datum
     556                 :          20 : publicationListToArray(List *publist)
     557                 :             : {
     558                 :          20 :         ArrayType  *arr;
     559                 :          20 :         Datum      *datums;
     560                 :          20 :         MemoryContext memcxt;
     561                 :          20 :         MemoryContext oldcxt;
     562                 :             : 
     563                 :             :         /* Create memory context for temporary allocations. */
     564                 :          20 :         memcxt = AllocSetContextCreate(CurrentMemoryContext,
     565                 :             :                                                                    "publicationListToArray to array",
     566                 :             :                                                                    ALLOCSET_DEFAULT_SIZES);
     567                 :          20 :         oldcxt = MemoryContextSwitchTo(memcxt);
     568                 :             : 
     569                 :          20 :         datums = palloc_array(Datum, list_length(publist));
     570                 :             : 
     571                 :          20 :         check_duplicates_in_publist(publist, datums);
     572                 :             : 
     573                 :          20 :         MemoryContextSwitchTo(oldcxt);
     574                 :             : 
     575                 :          20 :         arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
     576                 :             : 
     577                 :          20 :         MemoryContextDelete(memcxt);
     578                 :             : 
     579                 :          40 :         return PointerGetDatum(arr);
     580                 :          20 : }
     581                 :             : 
     582                 :             : /*
     583                 :             :  * Create new subscription.
     584                 :             :  */
     585                 :             : ObjectAddress
     586                 :          25 : CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
     587                 :             :                                    bool isTopLevel)
     588                 :             : {
     589                 :          25 :         Relation        rel;
     590                 :             :         ObjectAddress myself;
     591                 :          25 :         Oid                     subid;
     592                 :          25 :         bool            nulls[Natts_pg_subscription];
     593                 :          25 :         Datum           values[Natts_pg_subscription];
     594                 :          25 :         Oid                     owner = GetUserId();
     595                 :          25 :         HeapTuple       tup;
     596                 :          25 :         char       *conninfo;
     597                 :          25 :         char            originname[NAMEDATALEN];
     598                 :          25 :         List       *publications;
     599                 :          25 :         bits32          supported_opts;
     600                 :          25 :         SubOpts         opts = {0};
     601                 :          25 :         AclResult       aclresult;
     602                 :             : 
     603                 :             :         /*
     604                 :             :          * Parse and check options.
     605                 :             :          *
     606                 :             :          * Connection and publication should not be specified here.
     607                 :             :          */
     608                 :          25 :         supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
     609                 :             :                                           SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
     610                 :             :                                           SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
     611                 :             :                                           SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
     612                 :             :                                           SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
     613                 :             :                                           SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
     614                 :             :                                           SUBOPT_RETAIN_DEAD_TUPLES |
     615                 :             :                                           SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN);
     616                 :          25 :         parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
     617                 :             : 
     618                 :             :         /*
     619                 :             :          * Since creating a replication slot is not transactional, rolling back
     620                 :             :          * the transaction leaves the created replication slot.  So we cannot run
     621                 :             :          * CREATE SUBSCRIPTION inside a transaction block if creating a
     622                 :             :          * replication slot.
     623                 :             :          */
     624         [ +  + ]:          25 :         if (opts.create_slot)
     625                 :           4 :                 PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
     626                 :             : 
     627                 :             :         /*
     628                 :             :          * We don't want to allow unprivileged users to be able to trigger
     629                 :             :          * attempts to access arbitrary network destinations, so require the user
     630                 :             :          * to have been specifically authorized to create subscriptions.
     631                 :             :          */
     632         [ +  + ]:          25 :         if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
     633   [ +  -  +  - ]:           1 :                 ereport(ERROR,
     634                 :             :                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     635                 :             :                                  errmsg("permission denied to create subscription"),
     636                 :             :                                  errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
     637                 :             :                                                    "pg_create_subscription")));
     638                 :             : 
     639                 :             :         /*
     640                 :             :          * Since a subscription is a database object, we also check for CREATE
     641                 :             :          * permission on the database.
     642                 :             :          */
     643                 :          48 :         aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
     644                 :          24 :                                                                 owner, ACL_CREATE);
     645         [ +  + ]:          24 :         if (aclresult != ACLCHECK_OK)
     646                 :           2 :                 aclcheck_error(aclresult, OBJECT_DATABASE,
     647                 :           1 :                                            get_database_name(MyDatabaseId));
     648                 :             : 
     649                 :             :         /*
     650                 :             :          * Non-superusers are required to set a password for authentication, and
     651                 :             :          * that password must be used by the target server, but the superuser can
     652                 :             :          * exempt a subscription from this requirement.
     653                 :             :          */
     654   [ +  +  -  + ]:          24 :         if (!opts.passwordrequired && !superuser_arg(owner))
     655   [ +  -  +  - ]:           1 :                 ereport(ERROR,
     656                 :             :                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     657                 :             :                                  errmsg("password_required=false is superuser-only"),
     658                 :             :                                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
     659                 :             : 
     660                 :             :         /*
     661                 :             :          * If built with appropriate switch, whine when regression-testing
     662                 :             :          * conventions for subscription names are violated.
     663                 :             :          */
     664                 :             : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
     665                 :             :         if (strncmp(stmt->subname, "regress_", 8) != 0)
     666                 :             :                 elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
     667                 :             : #endif
     668                 :             : 
     669                 :          23 :         rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     670                 :             : 
     671                 :             :         /* Check if name is used */
     672                 :          23 :         subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
     673                 :             :                                                         ObjectIdGetDatum(MyDatabaseId), CStringGetDatum(stmt->subname));
     674         [ +  + ]:          23 :         if (OidIsValid(subid))
     675                 :             :         {
     676   [ +  -  +  - ]:           1 :                 ereport(ERROR,
     677                 :             :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     678                 :             :                                  errmsg("subscription \"%s\" already exists",
     679                 :             :                                                 stmt->subname)));
     680                 :           0 :         }
     681                 :             : 
     682                 :             :         /*
     683                 :             :          * Ensure that system configuration parameters are set appropriately to
     684                 :             :          * support retain_dead_tuples and max_retention_duration.
     685                 :             :          */
     686                 :          44 :         CheckSubDeadTupleRetention(true, !opts.enabled, WARNING,
     687                 :          22 :                                                            opts.retaindeadtuples, opts.retaindeadtuples,
     688                 :          22 :                                                            (opts.maxretention > 0));
     689                 :             : 
     690   [ +  +  -  + ]:          22 :         if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
     691                 :          15 :                 opts.slot_name == NULL)
     692                 :          15 :                 opts.slot_name = stmt->subname;
     693                 :             : 
     694                 :             :         /* The default for synchronous_commit of subscriptions is off. */
     695         [ +  + ]:          22 :         if (opts.synchronous_commit == NULL)
     696                 :          18 :                 opts.synchronous_commit = "off";
     697                 :             : 
     698                 :          22 :         conninfo = stmt->conninfo;
     699                 :          22 :         publications = stmt->publication;
     700                 :             : 
     701                 :             :         /* Load the library providing us libpq calls. */
     702                 :          22 :         load_file("libpqwalreceiver", false);
     703                 :             : 
     704                 :             :         /* Check the connection info string. */
     705         [ +  + ]:          22 :         walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
     706                 :             : 
     707                 :             :         /* Everything ok, form a new tuple. */
     708                 :          22 :         memset(values, 0, sizeof(values));
     709                 :          22 :         memset(nulls, false, sizeof(nulls));
     710                 :             : 
     711                 :          22 :         subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
     712                 :             :                                                            Anum_pg_subscription_oid);
     713                 :          22 :         values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
     714                 :          22 :         values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
     715                 :          22 :         values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
     716                 :          22 :         values[Anum_pg_subscription_subname - 1] =
     717                 :          22 :                 DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
     718                 :          22 :         values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
     719                 :          22 :         values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
     720                 :          22 :         values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
     721                 :          22 :         values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
     722                 :          22 :         values[Anum_pg_subscription_subtwophasestate - 1] =
     723                 :          22 :                 CharGetDatum(opts.twophase ?
     724                 :             :                                          LOGICALREP_TWOPHASE_STATE_PENDING :
     725                 :             :                                          LOGICALREP_TWOPHASE_STATE_DISABLED);
     726                 :          22 :         values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
     727                 :          22 :         values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
     728                 :          22 :         values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
     729                 :          22 :         values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
     730                 :          22 :         values[Anum_pg_subscription_subretaindeadtuples - 1] =
     731                 :          22 :                 BoolGetDatum(opts.retaindeadtuples);
     732                 :          22 :         values[Anum_pg_subscription_submaxretention - 1] =
     733                 :          22 :                 Int32GetDatum(opts.maxretention);
     734                 :          22 :         values[Anum_pg_subscription_subretentionactive - 1] =
     735                 :          22 :                 Int32GetDatum(opts.retaindeadtuples);
     736                 :          22 :         values[Anum_pg_subscription_subconninfo - 1] =
     737                 :          22 :                 CStringGetTextDatum(conninfo);
     738         [ +  + ]:          22 :         if (opts.slot_name)
     739                 :          11 :                 values[Anum_pg_subscription_subslotname - 1] =
     740                 :          11 :                         DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
     741                 :             :         else
     742                 :           3 :                 nulls[Anum_pg_subscription_subslotname - 1] = true;
     743                 :          14 :         values[Anum_pg_subscription_subsynccommit - 1] =
     744                 :          14 :                 CStringGetTextDatum(opts.synchronous_commit);
     745                 :          14 :         values[Anum_pg_subscription_subpublications - 1] =
     746                 :          14 :                 publicationListToArray(publications);
     747                 :          14 :         values[Anum_pg_subscription_suborigin - 1] =
     748                 :          14 :                 CStringGetTextDatum(opts.origin);
     749                 :             : 
     750                 :          14 :         tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     751                 :             : 
     752                 :             :         /* Insert tuple into catalog. */
     753                 :          14 :         CatalogTupleInsert(rel, tup);
     754                 :          14 :         heap_freetuple(tup);
     755                 :             : 
     756                 :          14 :         recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
     757                 :             : 
     758                 :             :         /*
     759                 :             :          * A replication origin is currently created for all subscriptions,
     760                 :             :          * including those that only contain sequences or are otherwise empty.
     761                 :             :          *
     762                 :             :          * XXX: While this is technically unnecessary, optimizing it would require
     763                 :             :          * additional logic to skip origin creation during DDL operations and
     764                 :             :          * apply workers initialization, and to handle origin creation dynamically
     765                 :             :          * when tables are added to the subscription. It is not clear whether
     766                 :             :          * preventing creation of origins is worth additional complexity.
     767                 :             :          */
     768                 :          14 :         ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
     769                 :          14 :         replorigin_create(originname);
     770                 :             : 
     771                 :             :         /*
     772                 :             :          * Connect to remote side to execute requested commands and fetch table
     773                 :             :          * and sequence info.
     774                 :             :          */
     775         [ +  + ]:          14 :         if (opts.connect)
     776                 :             :         {
     777                 :           1 :                 char       *err;
     778                 :           1 :                 WalReceiverConn *wrconn;
     779                 :           1 :                 bool            must_use_password;
     780                 :             : 
     781                 :             :                 /* Try to connect to the publisher. */
     782         [ +  - ]:           1 :                 must_use_password = !superuser_arg(owner) && opts.passwordrequired;
     783                 :           1 :                 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
     784                 :             :                                                                 stmt->subname, &err);
     785         [ -  + ]:           1 :                 if (!wrconn)
     786   [ +  -  +  - ]:           1 :                         ereport(ERROR,
     787                 :             :                                         (errcode(ERRCODE_CONNECTION_FAILURE),
     788                 :             :                                          errmsg("subscription \"%s\" could not connect to the publisher: %s",
     789                 :             :                                                         stmt->subname, err)));
     790                 :             : 
     791         [ #  # ]:           0 :                 PG_TRY();
     792                 :             :                 {
     793                 :           0 :                         bool            has_tables = false;
     794                 :           0 :                         List       *pubrels;
     795                 :           0 :                         char            relation_state;
     796                 :             : 
     797                 :           0 :                         check_publications(wrconn, publications);
     798                 :           0 :                         check_publications_origin_tables(wrconn, publications,
     799                 :           0 :                                                                                          opts.copy_data,
     800                 :           0 :                                                                                          opts.retaindeadtuples, opts.origin,
     801                 :           0 :                                                                                          NULL, 0, stmt->subname);
     802                 :           0 :                         check_publications_origin_sequences(wrconn, publications,
     803                 :           0 :                                                                                                 opts.copy_data, opts.origin,
     804                 :           0 :                                                                                                 NULL, 0, stmt->subname);
     805                 :             : 
     806         [ #  # ]:           0 :                         if (opts.retaindeadtuples)
     807                 :           0 :                                 check_pub_dead_tuple_retention(wrconn);
     808                 :             : 
     809                 :             :                         /*
     810                 :             :                          * Set sync state based on if we were asked to do data copy or
     811                 :             :                          * not.
     812                 :             :                          */
     813                 :           0 :                         relation_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
     814                 :             : 
     815                 :             :                         /*
     816                 :             :                          * Build local relation status info. Relations are for both tables
     817                 :             :                          * and sequences from the publisher.
     818                 :             :                          */
     819                 :           0 :                         pubrels = fetch_relation_list(wrconn, publications);
     820                 :             : 
     821   [ #  #  #  #  :           0 :                         foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
             #  #  #  # ]
     822                 :             :                         {
     823                 :           0 :                                 Oid                     relid;
     824                 :           0 :                                 char            relkind;
     825                 :           0 :                                 RangeVar   *rv = pubrelinfo->rv;
     826                 :             : 
     827                 :           0 :                                 relid = RangeVarGetRelid(rv, AccessShareLock, false);
     828                 :           0 :                                 relkind = get_rel_relkind(relid);
     829                 :             : 
     830                 :             :                                 /* Check for supported relkind. */
     831                 :           0 :                                 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
     832                 :           0 :                                                                                  rv->schemaname, rv->relname);
     833                 :           0 :                                 has_tables |= (relkind != RELKIND_SEQUENCE);
     834                 :           0 :                                 AddSubscriptionRelState(subid, relid, relation_state,
     835                 :             :                                                                                 InvalidXLogRecPtr, true);
     836                 :           0 :                         }
     837                 :             : 
     838                 :             :                         /*
     839                 :             :                          * If requested, create permanent slot for the subscription. We
     840                 :             :                          * won't use the initial snapshot for anything, so no need to
     841                 :             :                          * export it.
     842                 :             :                          *
     843                 :             :                          * XXX: Similar to origins, it is not clear whether preventing the
     844                 :             :                          * slot creation for empty and sequence-only subscriptions is
     845                 :             :                          * worth additional complexity.
     846                 :             :                          */
     847         [ #  # ]:           0 :                         if (opts.create_slot)
     848                 :             :                         {
     849                 :           0 :                                 bool            twophase_enabled = false;
     850                 :             : 
     851         [ #  # ]:           0 :                                 Assert(opts.slot_name);
     852                 :             : 
     853                 :             :                                 /*
     854                 :             :                                  * Even if two_phase is set, don't create the slot with
     855                 :             :                                  * two-phase enabled. Will enable it once all the tables are
     856                 :             :                                  * synced and ready. This avoids race-conditions like prepared
     857                 :             :                                  * transactions being skipped due to changes not being applied
     858                 :             :                                  * due to checks in should_apply_changes_for_rel() when
     859                 :             :                                  * tablesync for the corresponding tables are in progress. See
     860                 :             :                                  * comments atop worker.c.
     861                 :             :                                  *
     862                 :             :                                  * Note that if tables were specified but copy_data is false
     863                 :             :                                  * then it is safe to enable two_phase up-front because those
     864                 :             :                                  * tables are already initially in READY state. When the
     865                 :             :                                  * subscription has no tables, we leave the twophase state as
     866                 :             :                                  * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
     867                 :             :                                  * PUBLICATION to work.
     868                 :             :                                  */
     869   [ #  #  #  #  :           0 :                                 if (opts.twophase && !opts.copy_data && has_tables)
                   #  # ]
     870                 :           0 :                                         twophase_enabled = true;
     871                 :             : 
     872                 :           0 :                                 walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
     873                 :             :                                                                    opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
     874                 :             : 
     875         [ #  # ]:           0 :                                 if (twophase_enabled)
     876                 :           0 :                                         UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
     877                 :             : 
     878   [ #  #  #  # ]:           0 :                                 ereport(NOTICE,
     879                 :             :                                                 (errmsg("created replication slot \"%s\" on publisher",
     880                 :             :                                                                 opts.slot_name)));
     881                 :           0 :                         }
     882                 :           0 :                 }
     883                 :           0 :                 PG_FINALLY();
     884                 :             :                 {
     885                 :           0 :                         walrcv_disconnect(wrconn);
     886                 :             :                 }
     887         [ #  # ]:           0 :                 PG_END_TRY();
     888                 :           0 :         }
     889                 :             :         else
     890   [ -  +  +  - ]:          13 :                 ereport(WARNING,
     891                 :             :                                 (errmsg("subscription was created, but is not connected"),
     892                 :             :                                  errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
     893                 :             : 
     894                 :          13 :         table_close(rel, RowExclusiveLock);
     895                 :             : 
     896                 :          13 :         pgstat_create_subscription(subid);
     897                 :             : 
     898                 :             :         /*
     899                 :             :          * Notify the launcher to start the apply worker if the subscription is
     900                 :             :          * enabled, or to create the conflict detection slot if retain_dead_tuples
     901                 :             :          * is enabled.
     902                 :             :          *
     903                 :             :          * Creating the conflict detection slot is essential even when the
     904                 :             :          * subscription is not enabled. This ensures that dead tuples are
     905                 :             :          * retained, which is necessary for accurately identifying the type of
     906                 :             :          * conflict during replication.
     907                 :             :          */
     908   [ +  -  -  + ]:          13 :         if (opts.enabled || opts.retaindeadtuples)
     909                 :           0 :                 ApplyLauncherWakeupAtCommit();
     910                 :             : 
     911                 :          13 :         ObjectAddressSet(myself, SubscriptionRelationId, subid);
     912                 :             : 
     913         [ +  - ]:          13 :         InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
     914                 :             : 
     915                 :             :         return myself;
     916                 :          13 : }
     917                 :             : 
     918                 :             : static void
     919                 :           0 : AlterSubscription_refresh(Subscription *sub, bool copy_data,
     920                 :             :                                                   List *validate_publications)
     921                 :             : {
     922                 :           0 :         char       *err;
     923                 :           0 :         List       *pubrels = NIL;
     924                 :           0 :         Oid                *pubrel_local_oids;
     925                 :           0 :         List       *subrel_states;
     926                 :           0 :         List       *sub_remove_rels = NIL;
     927                 :           0 :         Oid                *subrel_local_oids;
     928                 :           0 :         Oid                *subseq_local_oids;
     929                 :           0 :         int                     subrel_count;
     930                 :           0 :         ListCell   *lc;
     931                 :           0 :         int                     off;
     932                 :           0 :         int                     tbl_count = 0;
     933                 :           0 :         int                     seq_count = 0;
     934                 :           0 :         Relation        rel = NULL;
     935                 :             :         typedef struct SubRemoveRels
     936                 :             :         {
     937                 :             :                 Oid                     relid;
     938                 :             :                 char            state;
     939                 :             :         } SubRemoveRels;
     940                 :             : 
     941                 :           0 :         WalReceiverConn *wrconn;
     942                 :           0 :         bool            must_use_password;
     943                 :             : 
     944                 :             :         /* Load the library providing us libpq calls. */
     945                 :           0 :         load_file("libpqwalreceiver", false);
     946                 :             : 
     947                 :             :         /* Try to connect to the publisher. */
     948         [ #  # ]:           0 :         must_use_password = sub->passwordrequired && !sub->ownersuperuser;
     949                 :           0 :         wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
     950                 :             :                                                         sub->name, &err);
     951         [ #  # ]:           0 :         if (!wrconn)
     952   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     953                 :             :                                 (errcode(ERRCODE_CONNECTION_FAILURE),
     954                 :             :                                  errmsg("subscription \"%s\" could not connect to the publisher: %s",
     955                 :             :                                                 sub->name, err)));
     956                 :             : 
     957         [ #  # ]:           0 :         PG_TRY();
     958                 :             :         {
     959         [ #  # ]:           0 :                 if (validate_publications)
     960                 :           0 :                         check_publications(wrconn, validate_publications);
     961                 :             : 
     962                 :             :                 /* Get the relation list from publisher. */
     963                 :           0 :                 pubrels = fetch_relation_list(wrconn, sub->publications);
     964                 :             : 
     965                 :             :                 /* Get local relation list. */
     966                 :           0 :                 subrel_states = GetSubscriptionRelations(sub->oid, true, true, false);
     967                 :           0 :                 subrel_count = list_length(subrel_states);
     968                 :             : 
     969                 :             :                 /*
     970                 :             :                  * Build qsorted arrays of local table oids and sequence oids for
     971                 :             :                  * faster lookup. This can potentially contain all tables and
     972                 :             :                  * sequences in the database so speed of lookup is important.
     973                 :             :                  *
     974                 :             :                  * We do not yet know the exact count of tables and sequences, so we
     975                 :             :                  * allocate separate arrays for table OIDs and sequence OIDs based on
     976                 :             :                  * the total number of relations (subrel_count).
     977                 :             :                  */
     978                 :           0 :                 subrel_local_oids = palloc(subrel_count * sizeof(Oid));
     979                 :           0 :                 subseq_local_oids = palloc(subrel_count * sizeof(Oid));
     980   [ #  #  #  #  :           0 :                 foreach(lc, subrel_states)
                   #  # ]
     981                 :             :                 {
     982                 :           0 :                         SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
     983                 :             : 
     984         [ #  # ]:           0 :                         if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE)
     985                 :           0 :                                 subseq_local_oids[seq_count++] = relstate->relid;
     986                 :             :                         else
     987                 :           0 :                                 subrel_local_oids[tbl_count++] = relstate->relid;
     988                 :           0 :                 }
     989                 :             : 
     990                 :           0 :                 qsort(subrel_local_oids, tbl_count, sizeof(Oid), oid_cmp);
     991                 :           0 :                 check_publications_origin_tables(wrconn, sub->publications, copy_data,
     992                 :           0 :                                                                                  sub->retaindeadtuples, sub->origin,
     993                 :           0 :                                                                                  subrel_local_oids, tbl_count,
     994                 :           0 :                                                                                  sub->name);
     995                 :             : 
     996                 :           0 :                 qsort(subseq_local_oids, seq_count, sizeof(Oid), oid_cmp);
     997                 :           0 :                 check_publications_origin_sequences(wrconn, sub->publications,
     998                 :           0 :                                                                                         copy_data, sub->origin,
     999                 :           0 :                                                                                         subseq_local_oids, seq_count,
    1000                 :           0 :                                                                                         sub->name);
    1001                 :             : 
    1002                 :             :                 /*
    1003                 :             :                  * Walk over the remote relations and try to match them to locally
    1004                 :             :                  * known relations. If the relation is not known locally create a new
    1005                 :             :                  * state for it.
    1006                 :             :                  *
    1007                 :             :                  * Also builds array of local oids of remote relations for the next
    1008                 :             :                  * step.
    1009                 :             :                  */
    1010                 :           0 :                 off = 0;
    1011                 :           0 :                 pubrel_local_oids = palloc(list_length(pubrels) * sizeof(Oid));
    1012                 :             : 
    1013   [ #  #  #  #  :           0 :                 foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
             #  #  #  # ]
    1014                 :             :                 {
    1015                 :           0 :                         RangeVar   *rv = pubrelinfo->rv;
    1016                 :           0 :                         Oid                     relid;
    1017                 :           0 :                         char            relkind;
    1018                 :             : 
    1019                 :           0 :                         relid = RangeVarGetRelid(rv, AccessShareLock, false);
    1020                 :           0 :                         relkind = get_rel_relkind(relid);
    1021                 :             : 
    1022                 :             :                         /* Check for supported relkind. */
    1023                 :           0 :                         CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
    1024                 :           0 :                                                                          rv->schemaname, rv->relname);
    1025                 :             : 
    1026                 :           0 :                         pubrel_local_oids[off++] = relid;
    1027                 :             : 
    1028                 :           0 :                         if (!bsearch(&relid, subrel_local_oids,
    1029   [ #  #  #  #  :           0 :                                                  tbl_count, sizeof(Oid), oid_cmp) &&
                   #  # ]
    1030                 :           0 :                                 !bsearch(&relid, subseq_local_oids,
    1031                 :           0 :                                                  seq_count, sizeof(Oid), oid_cmp))
    1032                 :             :                         {
    1033                 :           0 :                                 AddSubscriptionRelState(sub->oid, relid,
    1034                 :           0 :                                                                                 copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
    1035                 :             :                                                                                 InvalidXLogRecPtr, true);
    1036   [ #  #  #  # ]:           0 :                                 ereport(DEBUG1,
    1037                 :             :                                                 errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
    1038                 :             :                                                                                 relkind == RELKIND_SEQUENCE ? "sequence" : "table",
    1039                 :             :                                                                                 rv->schemaname, rv->relname, sub->name));
    1040                 :           0 :                         }
    1041                 :           0 :                 }
    1042                 :             : 
    1043                 :             :                 /*
    1044                 :             :                  * Next remove state for tables we should not care about anymore using
    1045                 :             :                  * the data we collected above
    1046                 :             :                  */
    1047                 :           0 :                 qsort(pubrel_local_oids, list_length(pubrels), sizeof(Oid), oid_cmp);
    1048                 :             : 
    1049         [ #  # ]:           0 :                 for (off = 0; off < tbl_count; off++)
    1050                 :             :                 {
    1051                 :           0 :                         Oid                     relid = subrel_local_oids[off];
    1052                 :             : 
    1053   [ #  #  #  # ]:           0 :                         if (!bsearch(&relid, pubrel_local_oids,
    1054                 :           0 :                                                  list_length(pubrels), sizeof(Oid), oid_cmp))
    1055                 :             :                         {
    1056                 :           0 :                                 char            state;
    1057                 :           0 :                                 XLogRecPtr      statelsn;
    1058                 :           0 :                                 SubRemoveRels *remove_rel = palloc_object(SubRemoveRels);
    1059                 :             : 
    1060                 :             :                                 /*
    1061                 :             :                                  * Lock pg_subscription_rel with AccessExclusiveLock to
    1062                 :             :                                  * prevent any race conditions with the apply worker
    1063                 :             :                                  * re-launching workers at the same time this code is trying
    1064                 :             :                                  * to remove those tables.
    1065                 :             :                                  *
    1066                 :             :                                  * Even if new worker for this particular rel is restarted it
    1067                 :             :                                  * won't be able to make any progress as we hold exclusive
    1068                 :             :                                  * lock on pg_subscription_rel till the transaction end. It
    1069                 :             :                                  * will simply exit as there is no corresponding rel entry.
    1070                 :             :                                  *
    1071                 :             :                                  * This locking also ensures that the state of rels won't
    1072                 :             :                                  * change till we are done with this refresh operation.
    1073                 :             :                                  */
    1074         [ #  # ]:           0 :                                 if (!rel)
    1075                 :           0 :                                         rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
    1076                 :             : 
    1077                 :             :                                 /* Last known rel state. */
    1078                 :           0 :                                 state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
    1079                 :             : 
    1080                 :           0 :                                 RemoveSubscriptionRel(sub->oid, relid);
    1081                 :             : 
    1082                 :           0 :                                 remove_rel->relid = relid;
    1083                 :           0 :                                 remove_rel->state = state;
    1084                 :             : 
    1085                 :           0 :                                 sub_remove_rels = lappend(sub_remove_rels, remove_rel);
    1086                 :             : 
    1087                 :           0 :                                 logicalrep_worker_stop(WORKERTYPE_TABLESYNC, sub->oid, relid);
    1088                 :             : 
    1089                 :             :                                 /*
    1090                 :             :                                  * For READY state, we would have already dropped the
    1091                 :             :                                  * tablesync origin.
    1092                 :             :                                  */
    1093         [ #  # ]:           0 :                                 if (state != SUBREL_STATE_READY)
    1094                 :             :                                 {
    1095                 :           0 :                                         char            originname[NAMEDATALEN];
    1096                 :             : 
    1097                 :             :                                         /*
    1098                 :             :                                          * Drop the tablesync's origin tracking if exists.
    1099                 :             :                                          *
    1100                 :             :                                          * It is possible that the origin is not yet created for
    1101                 :             :                                          * tablesync worker, this can happen for the states before
    1102                 :             :                                          * SUBREL_STATE_DATASYNC. The tablesync worker or apply
    1103                 :             :                                          * worker can also concurrently try to drop the origin and
    1104                 :             :                                          * by this time the origin might be already removed. For
    1105                 :             :                                          * these reasons, passing missing_ok = true.
    1106                 :             :                                          */
    1107                 :           0 :                                         ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
    1108                 :             :                                                                                                            sizeof(originname));
    1109                 :           0 :                                         replorigin_drop_by_name(originname, true, false);
    1110                 :           0 :                                 }
    1111                 :             : 
    1112   [ #  #  #  # ]:           0 :                                 ereport(DEBUG1,
    1113                 :             :                                                 (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
    1114                 :             :                                                                                  get_namespace_name(get_rel_namespace(relid)),
    1115                 :             :                                                                                  get_rel_name(relid),
    1116                 :             :                                                                                  sub->name)));
    1117                 :           0 :                         }
    1118                 :           0 :                 }
    1119                 :             : 
    1120                 :             :                 /*
    1121                 :             :                  * Drop the tablesync slots associated with removed tables. This has
    1122                 :             :                  * to be at the end because otherwise if there is an error while doing
    1123                 :             :                  * the database operations we won't be able to rollback dropped slots.
    1124                 :             :                  */
    1125   [ #  #  #  #  :           0 :                 foreach_ptr(SubRemoveRels, sub_remove_rel, sub_remove_rels)
             #  #  #  # ]
    1126                 :             :                 {
    1127   [ #  #  #  # ]:           0 :                         if (sub_remove_rel->state != SUBREL_STATE_READY &&
    1128                 :           0 :                                 sub_remove_rel->state != SUBREL_STATE_SYNCDONE)
    1129                 :             :                         {
    1130                 :           0 :                                 char            syncslotname[NAMEDATALEN] = {0};
    1131                 :             : 
    1132                 :             :                                 /*
    1133                 :             :                                  * For READY/SYNCDONE states we know the tablesync slot has
    1134                 :             :                                  * already been dropped by the tablesync worker.
    1135                 :             :                                  *
    1136                 :             :                                  * For other states, there is no certainty, maybe the slot
    1137                 :             :                                  * does not exist yet. Also, if we fail after removing some of
    1138                 :             :                                  * the slots, next time, it will again try to drop already
    1139                 :             :                                  * dropped slots and fail. For these reasons, we allow
    1140                 :             :                                  * missing_ok = true for the drop.
    1141                 :             :                                  */
    1142                 :           0 :                                 ReplicationSlotNameForTablesync(sub->oid, sub_remove_rel->relid,
    1143                 :           0 :                                                                                                 syncslotname, sizeof(syncslotname));
    1144                 :           0 :                                 ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
    1145                 :           0 :                         }
    1146                 :           0 :                 }
    1147                 :             : 
    1148                 :             :                 /*
    1149                 :             :                  * Next remove state for sequences we should not care about anymore
    1150                 :             :                  * using the data we collected above
    1151                 :             :                  */
    1152         [ #  # ]:           0 :                 for (off = 0; off < seq_count; off++)
    1153                 :             :                 {
    1154                 :           0 :                         Oid                     relid = subseq_local_oids[off];
    1155                 :             : 
    1156   [ #  #  #  # ]:           0 :                         if (!bsearch(&relid, pubrel_local_oids,
    1157                 :           0 :                                                  list_length(pubrels), sizeof(Oid), oid_cmp))
    1158                 :             :                         {
    1159                 :             :                                 /*
    1160                 :             :                                  * This locking ensures that the state of rels won't change
    1161                 :             :                                  * till we are done with this refresh operation.
    1162                 :             :                                  */
    1163         [ #  # ]:           0 :                                 if (!rel)
    1164                 :           0 :                                         rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
    1165                 :             : 
    1166                 :           0 :                                 RemoveSubscriptionRel(sub->oid, relid);
    1167                 :             : 
    1168   [ #  #  #  # ]:           0 :                                 ereport(DEBUG1,
    1169                 :             :                                                 errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
    1170                 :             :                                                                                 get_namespace_name(get_rel_namespace(relid)),
    1171                 :             :                                                                                 get_rel_name(relid),
    1172                 :             :                                                                                 sub->name));
    1173                 :           0 :                         }
    1174                 :           0 :                 }
    1175                 :             :         }
    1176                 :           0 :         PG_FINALLY();
    1177                 :             :         {
    1178                 :           0 :                 walrcv_disconnect(wrconn);
    1179                 :             :         }
    1180         [ #  # ]:           0 :         PG_END_TRY();
    1181                 :             : 
    1182         [ #  # ]:           0 :         if (rel)
    1183                 :           0 :                 table_close(rel, NoLock);
    1184                 :           0 : }
    1185                 :             : 
    1186                 :             : /*
    1187                 :             :  * Marks all sequences with INIT state.
    1188                 :             :  */
    1189                 :             : static void
    1190                 :           0 : AlterSubscription_refresh_seq(Subscription *sub)
    1191                 :             : {
    1192                 :           0 :         char       *err = NULL;
    1193                 :           0 :         WalReceiverConn *wrconn;
    1194                 :           0 :         bool            must_use_password;
    1195                 :             : 
    1196                 :             :         /* Load the library providing us libpq calls. */
    1197                 :           0 :         load_file("libpqwalreceiver", false);
    1198                 :             : 
    1199                 :             :         /* Try to connect to the publisher. */
    1200         [ #  # ]:           0 :         must_use_password = sub->passwordrequired && !sub->ownersuperuser;
    1201                 :           0 :         wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
    1202                 :             :                                                         sub->name, &err);
    1203         [ #  # ]:           0 :         if (!wrconn)
    1204   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1205                 :             :                                 errcode(ERRCODE_CONNECTION_FAILURE),
    1206                 :             :                                 errmsg("subscription \"%s\" could not connect to the publisher: %s",
    1207                 :             :                                            sub->name, err));
    1208                 :             : 
    1209         [ #  # ]:           0 :         PG_TRY();
    1210                 :             :         {
    1211                 :           0 :                 List       *subrel_states;
    1212                 :             : 
    1213                 :           0 :                 check_publications_origin_sequences(wrconn, sub->publications, true,
    1214                 :           0 :                                                                                         sub->origin, NULL, 0, sub->name);
    1215                 :             : 
    1216                 :             :                 /* Get local sequence list. */
    1217                 :           0 :                 subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
    1218   [ #  #  #  #  :           0 :                 foreach_ptr(SubscriptionRelState, subrel, subrel_states)
             #  #  #  # ]
    1219                 :             :                 {
    1220                 :           0 :                         Oid                     relid = subrel->relid;
    1221                 :             : 
    1222                 :           0 :                         UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
    1223                 :             :                                                                            InvalidXLogRecPtr, false);
    1224   [ #  #  #  # ]:           0 :                         ereport(DEBUG1,
    1225                 :             :                                         errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
    1226                 :             :                                                                         get_namespace_name(get_rel_namespace(relid)),
    1227                 :             :                                                                         get_rel_name(relid),
    1228                 :             :                                                                         sub->name));
    1229                 :           0 :                 }
    1230                 :           0 :         }
    1231                 :           0 :         PG_FINALLY();
    1232                 :             :         {
    1233                 :           0 :                 walrcv_disconnect(wrconn);
    1234                 :             :         }
    1235         [ #  # ]:           0 :         PG_END_TRY();
    1236                 :           0 : }
    1237                 :             : 
    1238                 :             : /*
    1239                 :             :  * Common checks for altering failover, two_phase, and retain_dead_tuples
    1240                 :             :  * options.
    1241                 :             :  */
    1242                 :             : static void
    1243                 :           1 : CheckAlterSubOption(Subscription *sub, const char *option,
    1244                 :             :                                         bool slot_needs_update, bool isTopLevel)
    1245                 :             : {
    1246   [ -  +  #  #  :           1 :         Assert(strcmp(option, "failover") == 0 ||
                   #  # ]
    1247                 :             :                    strcmp(option, "two_phase") == 0 ||
    1248                 :             :                    strcmp(option, "retain_dead_tuples") == 0);
    1249                 :             : 
    1250                 :             :         /*
    1251                 :             :          * Altering the retain_dead_tuples option does not update the slot on the
    1252                 :             :          * publisher.
    1253                 :             :          */
    1254   [ +  -  +  - ]:           1 :         Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
    1255                 :             : 
    1256                 :             :         /*
    1257                 :             :          * Do not allow changing the option if the subscription is enabled. This
    1258                 :             :          * is because both failover and two_phase options of the slot on the
    1259                 :             :          * publisher cannot be modified if the slot is currently acquired by the
    1260                 :             :          * existing walsender.
    1261                 :             :          *
    1262                 :             :          * Note that two_phase is enabled (aka changed from 'false' to 'true') on
    1263                 :             :          * the publisher by the existing walsender, so we could have allowed that
    1264                 :             :          * even when the subscription is enabled. But we kept this restriction for
    1265                 :             :          * the sake of consistency and simplicity.
    1266                 :             :          *
    1267                 :             :          * Additionally, do not allow changing the retain_dead_tuples option when
    1268                 :             :          * the subscription is enabled to prevent race conditions arising from the
    1269                 :             :          * new option value being acknowledged asynchronously by the launcher and
    1270                 :             :          * apply workers.
    1271                 :             :          *
    1272                 :             :          * Without the restriction, a race condition may arise when a user
    1273                 :             :          * disables and immediately re-enables the retain_dead_tuples option. In
    1274                 :             :          * this case, the launcher might drop the slot upon noticing the disabled
    1275                 :             :          * action, while the apply worker may keep maintaining
    1276                 :             :          * oldest_nonremovable_xid without noticing the option change. During this
    1277                 :             :          * period, a transaction ID wraparound could falsely make this ID appear
    1278                 :             :          * as if it originates from the future w.r.t the transaction ID stored in
    1279                 :             :          * the slot maintained by launcher.
    1280                 :             :          *
    1281                 :             :          * Similarly, if the user enables retain_dead_tuples concurrently with the
    1282                 :             :          * launcher starting the worker, the apply worker may start calculating
    1283                 :             :          * oldest_nonremovable_xid before the launcher notices the enable action.
    1284                 :             :          * Consequently, the launcher may update slot.xmin to a newer value than
    1285                 :             :          * that maintained by the worker. In subsequent cycles, upon integrating
    1286                 :             :          * the worker's oldest_nonremovable_xid, the launcher might detect a
    1287                 :             :          * retreat in the calculated xmin, necessitating additional handling.
    1288                 :             :          *
    1289                 :             :          * XXX To address the above race conditions, we can define
    1290                 :             :          * oldest_nonremovable_xid as FullTransactionId and adds the check to
    1291                 :             :          * disallow retreating the conflict slot's xmin. For now, we kept the
    1292                 :             :          * implementation simple by disallowing change to the retain_dead_tuples,
    1293                 :             :          * but in the future we can change this after some more analysis.
    1294                 :             :          *
    1295                 :             :          * Note that we could restrict only the enabling of retain_dead_tuples to
    1296                 :             :          * avoid the race conditions described above, but we maintain the
    1297                 :             :          * restriction for both enable and disable operations for the sake of
    1298                 :             :          * consistency.
    1299                 :             :          */
    1300         [ +  - ]:           1 :         if (sub->enabled)
    1301   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1302                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1303                 :             :                                  errmsg("cannot set option \"%s\" for enabled subscription",
    1304                 :             :                                                 option)));
    1305                 :             : 
    1306         [ -  + ]:           1 :         if (slot_needs_update)
    1307                 :             :         {
    1308                 :           1 :                 StringInfoData cmd;
    1309                 :             : 
    1310                 :             :                 /*
    1311                 :             :                  * A valid slot must be associated with the subscription for us to
    1312                 :             :                  * modify any of the slot's properties.
    1313                 :             :                  */
    1314         [ +  - ]:           1 :                 if (!sub->slotname)
    1315   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    1316                 :             :                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1317                 :             :                                          errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
    1318                 :             :                                                         option)));
    1319                 :             : 
    1320                 :             :                 /* The changed option of the slot can't be rolled back. */
    1321                 :           1 :                 initStringInfo(&cmd);
    1322                 :           1 :                 appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
    1323                 :             : 
    1324                 :           1 :                 PreventInTransactionBlock(isTopLevel, cmd.data);
    1325                 :           1 :                 pfree(cmd.data);
    1326                 :           1 :         }
    1327                 :           1 : }
    1328                 :             : 
    1329                 :             : /*
    1330                 :             :  * Alter the existing subscription.
    1331                 :             :  */
    1332                 :             : ObjectAddress
    1333                 :          40 : AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
    1334                 :             :                                   bool isTopLevel)
    1335                 :             : {
    1336                 :          40 :         Relation        rel;
    1337                 :             :         ObjectAddress myself;
    1338                 :          40 :         bool            nulls[Natts_pg_subscription];
    1339                 :          40 :         bool            replaces[Natts_pg_subscription];
    1340                 :          40 :         Datum           values[Natts_pg_subscription];
    1341                 :          40 :         HeapTuple       tup;
    1342                 :          40 :         Oid                     subid;
    1343                 :          40 :         bool            update_tuple = false;
    1344                 :          40 :         bool            update_failover = false;
    1345                 :          40 :         bool            update_two_phase = false;
    1346                 :          40 :         bool            check_pub_rdt = false;
    1347                 :          40 :         bool            retain_dead_tuples;
    1348                 :          40 :         int                     max_retention;
    1349                 :          40 :         bool            retention_active;
    1350                 :          40 :         char       *origin;
    1351                 :          40 :         Subscription *sub;
    1352                 :          40 :         Form_pg_subscription form;
    1353                 :          40 :         bits32          supported_opts;
    1354                 :          40 :         SubOpts         opts = {0};
    1355                 :             : 
    1356                 :          40 :         rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    1357                 :             : 
    1358                 :             :         /* Fetch the existing tuple. */
    1359                 :          40 :         tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
    1360                 :             :                                                           CStringGetDatum(stmt->subname));
    1361                 :             : 
    1362         [ +  + ]:          40 :         if (!HeapTupleIsValid(tup))
    1363   [ +  -  +  - ]:           1 :                 ereport(ERROR,
    1364                 :             :                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1365                 :             :                                  errmsg("subscription \"%s\" does not exist",
    1366                 :             :                                                 stmt->subname)));
    1367                 :             : 
    1368                 :          39 :         form = (Form_pg_subscription) GETSTRUCT(tup);
    1369                 :          39 :         subid = form->oid;
    1370                 :             : 
    1371                 :             :         /* must be owner */
    1372         [ +  - ]:          39 :         if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
    1373                 :           0 :                 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    1374                 :           0 :                                            stmt->subname);
    1375                 :             : 
    1376                 :          39 :         sub = GetSubscription(subid, false);
    1377                 :             : 
    1378                 :          39 :         retain_dead_tuples = sub->retaindeadtuples;
    1379                 :          39 :         origin = sub->origin;
    1380                 :          39 :         max_retention = sub->maxretention;
    1381                 :          39 :         retention_active = sub->retentionactive;
    1382                 :             : 
    1383                 :             :         /*
    1384                 :             :          * Don't allow non-superuser modification of a subscription with
    1385                 :             :          * password_required=false.
    1386                 :             :          */
    1387   [ +  +  +  - ]:          39 :         if (!sub->passwordrequired && !superuser())
    1388   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1389                 :             :                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1390                 :             :                                  errmsg("password_required=false is superuser-only"),
    1391                 :             :                                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    1392                 :             : 
    1393                 :             :         /* Lock the subscription so nobody else can do anything with it. */
    1394                 :          39 :         LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
    1395                 :             : 
    1396                 :             :         /* Form a new tuple. */
    1397                 :          39 :         memset(values, 0, sizeof(values));
    1398                 :          39 :         memset(nulls, false, sizeof(nulls));
    1399                 :          39 :         memset(replaces, false, sizeof(replaces));
    1400                 :             : 
    1401   [ +  +  +  +  :          39 :         switch (stmt->kind)
             +  +  -  +  
                      - ]
    1402                 :             :         {
    1403                 :             :                 case ALTER_SUBSCRIPTION_OPTIONS:
    1404                 :             :                         {
    1405                 :          23 :                                 supported_opts = (SUBOPT_SLOT_NAME |
    1406                 :             :                                                                   SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
    1407                 :             :                                                                   SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
    1408                 :             :                                                                   SUBOPT_DISABLE_ON_ERR |
    1409                 :             :                                                                   SUBOPT_PASSWORD_REQUIRED |
    1410                 :             :                                                                   SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
    1411                 :             :                                                                   SUBOPT_RETAIN_DEAD_TUPLES |
    1412                 :             :                                                                   SUBOPT_MAX_RETENTION_DURATION |
    1413                 :             :                                                                   SUBOPT_ORIGIN);
    1414                 :             : 
    1415                 :          46 :                                 parse_subscription_options(pstate, stmt->options,
    1416                 :          23 :                                                                                    supported_opts, &opts);
    1417                 :             : 
    1418         [ +  + ]:          23 :                                 if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
    1419                 :             :                                 {
    1420                 :             :                                         /*
    1421                 :             :                                          * The subscription must be disabled to allow slot_name as
    1422                 :             :                                          * 'none', otherwise, the apply worker will repeatedly try
    1423                 :             :                                          * to stream the data using that slot_name which neither
    1424                 :             :                                          * exists on the publisher nor the user will be allowed to
    1425                 :             :                                          * create it.
    1426                 :             :                                          */
    1427   [ -  +  #  # ]:          11 :                                         if (sub->enabled && !opts.slot_name)
    1428   [ #  #  #  # ]:           0 :                                                 ereport(ERROR,
    1429                 :             :                                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1430                 :             :                                                                  errmsg("cannot set %s for enabled subscription",
    1431                 :             :                                                                                 "slot_name = NONE")));
    1432                 :             : 
    1433         [ +  + ]:          11 :                                         if (opts.slot_name)
    1434                 :           1 :                                                 values[Anum_pg_subscription_subslotname - 1] =
    1435                 :           1 :                                                         DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
    1436                 :             :                                         else
    1437                 :          10 :                                                 nulls[Anum_pg_subscription_subslotname - 1] = true;
    1438                 :          11 :                                         replaces[Anum_pg_subscription_subslotname - 1] = true;
    1439                 :          11 :                                 }
    1440                 :             : 
    1441         [ +  + ]:          23 :                                 if (opts.synchronous_commit)
    1442                 :             :                                 {
    1443                 :           1 :                                         values[Anum_pg_subscription_subsynccommit - 1] =
    1444                 :           1 :                                                 CStringGetTextDatum(opts.synchronous_commit);
    1445                 :           1 :                                         replaces[Anum_pg_subscription_subsynccommit - 1] = true;
    1446                 :           1 :                                 }
    1447                 :             : 
    1448         [ +  + ]:          23 :                                 if (IsSet(opts.specified_opts, SUBOPT_BINARY))
    1449                 :             :                                 {
    1450                 :           1 :                                         values[Anum_pg_subscription_subbinary - 1] =
    1451                 :           1 :                                                 BoolGetDatum(opts.binary);
    1452                 :           1 :                                         replaces[Anum_pg_subscription_subbinary - 1] = true;
    1453                 :           1 :                                 }
    1454                 :             : 
    1455         [ +  + ]:          23 :                                 if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
    1456                 :             :                                 {
    1457                 :           3 :                                         values[Anum_pg_subscription_substream - 1] =
    1458                 :           3 :                                                 CharGetDatum(opts.streaming);
    1459                 :           3 :                                         replaces[Anum_pg_subscription_substream - 1] = true;
    1460                 :           3 :                                 }
    1461                 :             : 
    1462         [ +  + ]:          23 :                                 if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
    1463                 :             :                                 {
    1464                 :           1 :                                         values[Anum_pg_subscription_subdisableonerr - 1]
    1465                 :           2 :                                                 = BoolGetDatum(opts.disableonerr);
    1466                 :           1 :                                         replaces[Anum_pg_subscription_subdisableonerr - 1]
    1467                 :           1 :                                                 = true;
    1468                 :           1 :                                 }
    1469                 :             : 
    1470         [ +  + ]:          23 :                                 if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
    1471                 :             :                                 {
    1472                 :             :                                         /* Non-superuser may not disable password_required. */
    1473   [ +  +  +  - ]:           2 :                                         if (!opts.passwordrequired && !superuser())
    1474   [ #  #  #  # ]:           0 :                                                 ereport(ERROR,
    1475                 :             :                                                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1476                 :             :                                                                  errmsg("password_required=false is superuser-only"),
    1477                 :             :                                                                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    1478                 :             : 
    1479                 :           2 :                                         values[Anum_pg_subscription_subpasswordrequired - 1]
    1480                 :           4 :                                                 = BoolGetDatum(opts.passwordrequired);
    1481                 :           2 :                                         replaces[Anum_pg_subscription_subpasswordrequired - 1]
    1482                 :           2 :                                                 = true;
    1483                 :           2 :                                 }
    1484                 :             : 
    1485         [ +  + ]:          23 :                                 if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
    1486                 :             :                                 {
    1487                 :           2 :                                         values[Anum_pg_subscription_subrunasowner - 1] =
    1488                 :           2 :                                                 BoolGetDatum(opts.runasowner);
    1489                 :           2 :                                         replaces[Anum_pg_subscription_subrunasowner - 1] = true;
    1490                 :           2 :                                 }
    1491                 :             : 
    1492         [ +  - ]:          23 :                                 if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
    1493                 :             :                                 {
    1494                 :             :                                         /*
    1495                 :             :                                          * We need to update both the slot and the subscription
    1496                 :             :                                          * for the two_phase option. We can enable the two_phase
    1497                 :             :                                          * option for a slot only once the initial data
    1498                 :             :                                          * synchronization is done. This is to avoid missing some
    1499                 :             :                                          * data as explained in comments atop worker.c.
    1500                 :             :                                          */
    1501                 :           0 :                                         update_two_phase = !opts.twophase;
    1502                 :             : 
    1503                 :           0 :                                         CheckAlterSubOption(sub, "two_phase", update_two_phase,
    1504                 :           0 :                                                                                 isTopLevel);
    1505                 :             : 
    1506                 :             :                                         /*
    1507                 :             :                                          * Modifying the two_phase slot option requires a slot
    1508                 :             :                                          * lookup by slot name, so changing the slot name at the
    1509                 :             :                                          * same time is not allowed.
    1510                 :             :                                          */
    1511   [ #  #  #  # ]:           0 :                                         if (update_two_phase &&
    1512                 :           0 :                                                 IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
    1513   [ #  #  #  # ]:           0 :                                                 ereport(ERROR,
    1514                 :             :                                                                 (errcode(ERRCODE_SYNTAX_ERROR),
    1515                 :             :                                                                  errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
    1516                 :             : 
    1517                 :             :                                         /*
    1518                 :             :                                          * Note that workers may still survive even if the
    1519                 :             :                                          * subscription has been disabled.
    1520                 :             :                                          *
    1521                 :             :                                          * Ensure workers have already been exited to avoid
    1522                 :             :                                          * getting prepared transactions while we are disabling
    1523                 :             :                                          * the two_phase option. Otherwise, the changes of an
    1524                 :             :                                          * already prepared transaction can be replicated again
    1525                 :             :                                          * along with its corresponding commit, leading to
    1526                 :             :                                          * duplicate data or errors.
    1527                 :             :                                          */
    1528         [ #  # ]:           0 :                                         if (logicalrep_workers_find(subid, true, true))
    1529   [ #  #  #  # ]:           0 :                                                 ereport(ERROR,
    1530                 :             :                                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1531                 :             :                                                                  errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
    1532                 :             :                                                                  errhint("Try again after some time.")));
    1533                 :             : 
    1534                 :             :                                         /*
    1535                 :             :                                          * two_phase cannot be disabled if there are any
    1536                 :             :                                          * uncommitted prepared transactions present otherwise it
    1537                 :             :                                          * can lead to duplicate data or errors as explained in
    1538                 :             :                                          * the comment above.
    1539                 :             :                                          */
    1540         [ #  # ]:           0 :                                         if (update_two_phase &&
    1541   [ #  #  #  # ]:           0 :                                                 sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
    1542                 :           0 :                                                 LookupGXactBySubid(subid))
    1543   [ #  #  #  # ]:           0 :                                                 ereport(ERROR,
    1544                 :             :                                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1545                 :             :                                                                  errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
    1546                 :             :                                                                  errhint("Resolve these transactions and try again.")));
    1547                 :             : 
    1548                 :             :                                         /* Change system catalog accordingly */
    1549                 :           0 :                                         values[Anum_pg_subscription_subtwophasestate - 1] =
    1550                 :           0 :                                                 CharGetDatum(opts.twophase ?
    1551                 :             :                                                                          LOGICALREP_TWOPHASE_STATE_PENDING :
    1552                 :             :                                                                          LOGICALREP_TWOPHASE_STATE_DISABLED);
    1553                 :           0 :                                         replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
    1554                 :           0 :                                 }
    1555                 :             : 
    1556         [ +  + ]:          23 :                                 if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
    1557                 :             :                                 {
    1558                 :             :                                         /*
    1559                 :             :                                          * Similar to the two_phase case above, we need to update
    1560                 :             :                                          * the failover option for both the slot and the
    1561                 :             :                                          * subscription.
    1562                 :             :                                          */
    1563                 :           1 :                                         update_failover = true;
    1564                 :             : 
    1565                 :           2 :                                         CheckAlterSubOption(sub, "failover", update_failover,
    1566                 :           1 :                                                                                 isTopLevel);
    1567                 :             : 
    1568                 :           1 :                                         values[Anum_pg_subscription_subfailover - 1] =
    1569                 :           1 :                                                 BoolGetDatum(opts.failover);
    1570                 :           1 :                                         replaces[Anum_pg_subscription_subfailover - 1] = true;
    1571                 :           1 :                                 }
    1572                 :             : 
    1573         [ +  - ]:          23 :                                 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
    1574                 :             :                                 {
    1575                 :           0 :                                         values[Anum_pg_subscription_subretaindeadtuples - 1] =
    1576                 :           0 :                                                 BoolGetDatum(opts.retaindeadtuples);
    1577                 :           0 :                                         replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
    1578                 :             : 
    1579                 :             :                                         /*
    1580                 :             :                                          * Update the retention status only if there's a change in
    1581                 :             :                                          * the retain_dead_tuples option value.
    1582                 :             :                                          *
    1583                 :             :                                          * Automatically marking retention as active when
    1584                 :             :                                          * retain_dead_tuples is enabled may not always be ideal,
    1585                 :             :                                          * especially if retention was previously stopped and the
    1586                 :             :                                          * user toggles retain_dead_tuples without adjusting the
    1587                 :             :                                          * publisher workload. However, this behavior provides a
    1588                 :             :                                          * convenient way for users to manually refresh the
    1589                 :             :                                          * retention status. Since retention will be stopped again
    1590                 :             :                                          * unless the publisher workload is reduced, this approach
    1591                 :             :                                          * is acceptable for now.
    1592                 :             :                                          */
    1593         [ #  # ]:           0 :                                         if (opts.retaindeadtuples != sub->retaindeadtuples)
    1594                 :             :                                         {
    1595                 :           0 :                                                 values[Anum_pg_subscription_subretentionactive - 1] =
    1596                 :           0 :                                                         BoolGetDatum(opts.retaindeadtuples);
    1597                 :           0 :                                                 replaces[Anum_pg_subscription_subretentionactive - 1] = true;
    1598                 :             : 
    1599                 :           0 :                                                 retention_active = opts.retaindeadtuples;
    1600                 :           0 :                                         }
    1601                 :             : 
    1602                 :           0 :                                         CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
    1603                 :             : 
    1604                 :             :                                         /*
    1605                 :             :                                          * Workers may continue running even after the
    1606                 :             :                                          * subscription has been disabled.
    1607                 :             :                                          *
    1608                 :             :                                          * To prevent race conditions (as described in
    1609                 :             :                                          * CheckAlterSubOption()), ensure that all worker
    1610                 :             :                                          * processes have already exited before proceeding.
    1611                 :             :                                          */
    1612         [ #  # ]:           0 :                                         if (logicalrep_workers_find(subid, true, true))
    1613   [ #  #  #  # ]:           0 :                                                 ereport(ERROR,
    1614                 :             :                                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1615                 :             :                                                                  errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
    1616                 :             :                                                                  errhint("Try again after some time.")));
    1617                 :             : 
    1618                 :             :                                         /*
    1619                 :             :                                          * Notify the launcher to manage the replication slot for
    1620                 :             :                                          * conflict detection. This ensures that replication slot
    1621                 :             :                                          * is efficiently handled (created, updated, or dropped)
    1622                 :             :                                          * in response to any configuration changes.
    1623                 :             :                                          */
    1624                 :           0 :                                         ApplyLauncherWakeupAtCommit();
    1625                 :             : 
    1626                 :           0 :                                         check_pub_rdt = opts.retaindeadtuples;
    1627                 :           0 :                                         retain_dead_tuples = opts.retaindeadtuples;
    1628                 :           0 :                                 }
    1629                 :             : 
    1630         [ +  + ]:          23 :                                 if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
    1631                 :             :                                 {
    1632                 :           1 :                                         values[Anum_pg_subscription_submaxretention - 1] =
    1633                 :           1 :                                                 Int32GetDatum(opts.maxretention);
    1634                 :           1 :                                         replaces[Anum_pg_subscription_submaxretention - 1] = true;
    1635                 :             : 
    1636                 :           1 :                                         max_retention = opts.maxretention;
    1637                 :           1 :                                 }
    1638                 :             : 
    1639                 :             :                                 /*
    1640                 :             :                                  * Ensure that system configuration parameters are set
    1641                 :             :                                  * appropriately to support retain_dead_tuples and
    1642                 :             :                                  * max_retention_duration.
    1643                 :             :                                  */
    1644   [ +  -  +  + ]:          23 :                                 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
    1645                 :          23 :                                         IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
    1646                 :           2 :                                         CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE,
    1647                 :           1 :                                                                                            retain_dead_tuples,
    1648                 :           1 :                                                                                            retention_active,
    1649                 :           1 :                                                                                            (max_retention > 0));
    1650                 :             : 
    1651         [ +  + ]:          23 :                                 if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
    1652                 :             :                                 {
    1653                 :           1 :                                         values[Anum_pg_subscription_suborigin - 1] =
    1654                 :           1 :                                                 CStringGetTextDatum(opts.origin);
    1655                 :           1 :                                         replaces[Anum_pg_subscription_suborigin - 1] = true;
    1656                 :             : 
    1657                 :             :                                         /*
    1658                 :             :                                          * Check if changes from different origins may be received
    1659                 :             :                                          * from the publisher when the origin is changed to ANY
    1660                 :             :                                          * and retain_dead_tuples is enabled.
    1661                 :             :                                          */
    1662         [ +  - ]:           1 :                                         check_pub_rdt = retain_dead_tuples &&
    1663                 :           0 :                                                 pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0;
    1664                 :             : 
    1665                 :           1 :                                         origin = opts.origin;
    1666                 :           1 :                                 }
    1667                 :             : 
    1668                 :          23 :                                 update_tuple = true;
    1669                 :          23 :                                 break;
    1670                 :             :                         }
    1671                 :             : 
    1672                 :             :                 case ALTER_SUBSCRIPTION_ENABLED:
    1673                 :             :                         {
    1674                 :           5 :                                 parse_subscription_options(pstate, stmt->options,
    1675                 :             :                                                                                    SUBOPT_ENABLED, &opts);
    1676         [ +  - ]:           5 :                                 Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
    1677                 :             : 
    1678   [ +  +  -  + ]:           5 :                                 if (!sub->slotname && opts.enabled)
    1679   [ +  -  +  - ]:           1 :                                         ereport(ERROR,
    1680                 :             :                                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1681                 :             :                                                          errmsg("cannot enable subscription that does not have a slot name")));
    1682                 :             : 
    1683                 :             :                                 /*
    1684                 :             :                                  * Check track_commit_timestamp only when enabling the
    1685                 :             :                                  * subscription in case it was disabled after creation. See
    1686                 :             :                                  * comments atop CheckSubDeadTupleRetention() for details.
    1687                 :             :                                  */
    1688                 :           8 :                                 CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
    1689                 :           4 :                                                                                    WARNING, sub->retaindeadtuples,
    1690                 :           4 :                                                                                    sub->retentionactive, false);
    1691                 :             : 
    1692                 :           4 :                                 values[Anum_pg_subscription_subenabled - 1] =
    1693                 :           4 :                                         BoolGetDatum(opts.enabled);
    1694                 :           4 :                                 replaces[Anum_pg_subscription_subenabled - 1] = true;
    1695                 :             : 
    1696         [ +  + ]:           4 :                                 if (opts.enabled)
    1697                 :           2 :                                         ApplyLauncherWakeupAtCommit();
    1698                 :             : 
    1699                 :           4 :                                 update_tuple = true;
    1700                 :             : 
    1701                 :             :                                 /*
    1702                 :             :                                  * The subscription might be initially created with
    1703                 :             :                                  * connect=false and retain_dead_tuples=true, meaning the
    1704                 :             :                                  * remote server's status may not be checked. Ensure this
    1705                 :             :                                  * check is conducted now.
    1706                 :             :                                  */
    1707         [ +  - ]:           4 :                                 check_pub_rdt = sub->retaindeadtuples && opts.enabled;
    1708                 :           4 :                                 break;
    1709                 :             :                         }
    1710                 :             : 
    1711                 :             :                 case ALTER_SUBSCRIPTION_CONNECTION:
    1712                 :             :                         /* Load the library providing us libpq calls. */
    1713                 :           2 :                         load_file("libpqwalreceiver", false);
    1714                 :             :                         /* Check the connection info string. */
    1715         [ -  + ]:           2 :                         walrcv_check_conninfo(stmt->conninfo,
    1716                 :             :                                                                   sub->passwordrequired && !sub->ownersuperuser);
    1717                 :             : 
    1718                 :           2 :                         values[Anum_pg_subscription_subconninfo - 1] =
    1719                 :           2 :                                 CStringGetTextDatum(stmt->conninfo);
    1720                 :           2 :                         replaces[Anum_pg_subscription_subconninfo - 1] = true;
    1721                 :           2 :                         update_tuple = true;
    1722                 :             : 
    1723                 :             :                         /*
    1724                 :             :                          * Since the remote server configuration might have changed,
    1725                 :             :                          * perform a check to ensure it permits enabling
    1726                 :             :                          * retain_dead_tuples.
    1727                 :             :                          */
    1728                 :           2 :                         check_pub_rdt = sub->retaindeadtuples;
    1729                 :           2 :                         break;
    1730                 :             : 
    1731                 :             :                 case ALTER_SUBSCRIPTION_SET_PUBLICATION:
    1732                 :             :                         {
    1733                 :           3 :                                 supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
    1734                 :           6 :                                 parse_subscription_options(pstate, stmt->options,
    1735                 :           3 :                                                                                    supported_opts, &opts);
    1736                 :             : 
    1737                 :           3 :                                 values[Anum_pg_subscription_subpublications - 1] =
    1738                 :           3 :                                         publicationListToArray(stmt->publication);
    1739                 :           3 :                                 replaces[Anum_pg_subscription_subpublications - 1] = true;
    1740                 :             : 
    1741                 :           3 :                                 update_tuple = true;
    1742                 :             : 
    1743                 :             :                                 /* Refresh if user asked us to. */
    1744         [ +  + ]:           3 :                                 if (opts.refresh)
    1745                 :             :                                 {
    1746         [ +  - ]:           2 :                                         if (!sub->enabled)
    1747   [ #  #  #  # ]:           0 :                                                 ereport(ERROR,
    1748                 :             :                                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1749                 :             :                                                                  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
    1750                 :             :                                                                  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
    1751                 :             : 
    1752                 :             :                                         /*
    1753                 :             :                                          * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
    1754                 :             :                                          * why this is not allowed.
    1755                 :             :                                          */
    1756   [ -  +  #  # ]:           2 :                                         if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    1757   [ #  #  #  # ]:           0 :                                                 ereport(ERROR,
    1758                 :             :                                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1759                 :             :                                                                  errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
    1760                 :             :                                                                  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
    1761                 :             : 
    1762                 :           2 :                                         PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
    1763                 :             : 
    1764                 :             :                                         /* Make sure refresh sees the new list of publications. */
    1765                 :           2 :                                         sub->publications = stmt->publication;
    1766                 :             : 
    1767                 :           4 :                                         AlterSubscription_refresh(sub, opts.copy_data,
    1768                 :           2 :                                                                                           stmt->publication);
    1769                 :           2 :                                 }
    1770                 :             : 
    1771                 :           3 :                                 break;
    1772                 :             :                         }
    1773                 :             : 
    1774                 :             :                 case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
    1775                 :             :                 case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
    1776                 :             :                         {
    1777                 :           2 :                                 List       *publist;
    1778                 :           2 :                                 bool            isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
    1779                 :             : 
    1780                 :           2 :                                 supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
    1781                 :           4 :                                 parse_subscription_options(pstate, stmt->options,
    1782                 :           2 :                                                                                    supported_opts, &opts);
    1783                 :             : 
    1784                 :           2 :                                 publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
    1785                 :           2 :                                 values[Anum_pg_subscription_subpublications - 1] =
    1786                 :           2 :                                         publicationListToArray(publist);
    1787                 :           2 :                                 replaces[Anum_pg_subscription_subpublications - 1] = true;
    1788                 :             : 
    1789                 :           2 :                                 update_tuple = true;
    1790                 :             : 
    1791                 :             :                                 /* Refresh if user asked us to. */
    1792         [ +  - ]:           2 :                                 if (opts.refresh)
    1793                 :             :                                 {
    1794                 :             :                                         /* We only need to validate user specified publications. */
    1795         [ #  # ]:           0 :                                         List       *validate_publications = (isadd) ? stmt->publication : NULL;
    1796                 :             : 
    1797         [ #  # ]:           0 :                                         if (!sub->enabled)
    1798   [ #  #  #  # ]:           0 :                                                 ereport(ERROR,
    1799                 :             :                                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1800                 :             :                                                                  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
    1801                 :             :                                                 /* translator: %s is an SQL ALTER command */
    1802                 :             :                                                                  errhint("Use %s instead.",
    1803                 :             :                                                                                  isadd ?
    1804                 :             :                                                                                  "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
    1805                 :             :                                                                                  "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
    1806                 :             : 
    1807                 :             :                                         /*
    1808                 :             :                                          * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
    1809                 :             :                                          * why this is not allowed.
    1810                 :             :                                          */
    1811   [ #  #  #  # ]:           0 :                                         if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    1812   [ #  #  #  # ]:           0 :                                                 ereport(ERROR,
    1813                 :             :                                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1814                 :             :                                                                  errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
    1815                 :             :                                                 /* translator: %s is an SQL ALTER command */
    1816                 :             :                                                                  errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
    1817                 :             :                                                                                  isadd ?
    1818                 :             :                                                                                  "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
    1819                 :             :                                                                                  "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
    1820                 :             : 
    1821                 :           0 :                                         PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
    1822                 :             : 
    1823                 :             :                                         /* Refresh the new list of publications. */
    1824                 :           0 :                                         sub->publications = publist;
    1825                 :             : 
    1826                 :           0 :                                         AlterSubscription_refresh(sub, opts.copy_data,
    1827                 :           0 :                                                                                           validate_publications);
    1828                 :           0 :                                 }
    1829                 :             : 
    1830                 :             :                                 break;
    1831                 :           2 :                         }
    1832                 :             : 
    1833                 :             :                 case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION:
    1834                 :             :                         {
    1835         [ +  + ]:           2 :                                 if (!sub->enabled)
    1836   [ +  -  +  - ]:           1 :                                         ereport(ERROR,
    1837                 :             :                                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1838                 :             :                                                          errmsg("%s is not allowed for disabled subscriptions",
    1839                 :             :                                                                         "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
    1840                 :             : 
    1841                 :           1 :                                 parse_subscription_options(pstate, stmt->options,
    1842                 :             :                                                                                    SUBOPT_COPY_DATA, &opts);
    1843                 :             : 
    1844                 :             :                                 /*
    1845                 :             :                                  * The subscription option "two_phase" requires that
    1846                 :             :                                  * replication has passed the initial table synchronization
    1847                 :             :                                  * phase before the two_phase becomes properly enabled.
    1848                 :             :                                  *
    1849                 :             :                                  * But, having reached this two-phase commit "enabled" state
    1850                 :             :                                  * we must not allow any subsequent table initialization to
    1851                 :             :                                  * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
    1852                 :             :                                  * disallowed when the user had requested two_phase = on mode.
    1853                 :             :                                  *
    1854                 :             :                                  * The exception to this restriction is when copy_data =
    1855                 :             :                                  * false, because when copy_data is false the tablesync will
    1856                 :             :                                  * start already in READY state and will exit directly without
    1857                 :             :                                  * doing anything.
    1858                 :             :                                  *
    1859                 :             :                                  * For more details see comments atop worker.c.
    1860                 :             :                                  */
    1861   [ -  +  #  # ]:           1 :                                 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    1862   [ #  #  #  # ]:           0 :                                         ereport(ERROR,
    1863                 :             :                                                         (errcode(ERRCODE_SYNTAX_ERROR),
    1864                 :             :                                                          errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
    1865                 :             :                                                          errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
    1866                 :             : 
    1867                 :           1 :                                 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
    1868                 :             : 
    1869                 :           1 :                                 AlterSubscription_refresh(sub, opts.copy_data, NULL);
    1870                 :             : 
    1871                 :           1 :                                 break;
    1872                 :             :                         }
    1873                 :             : 
    1874                 :             :                 case ALTER_SUBSCRIPTION_REFRESH_SEQUENCES:
    1875                 :             :                         {
    1876         [ #  # ]:           0 :                                 if (!sub->enabled)
    1877   [ #  #  #  # ]:           0 :                                         ereport(ERROR,
    1878                 :             :                                                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1879                 :             :                                                         errmsg("%s is not allowed for disabled subscriptions",
    1880                 :             :                                                                    "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
    1881                 :             : 
    1882                 :           0 :                                 AlterSubscription_refresh_seq(sub);
    1883                 :             : 
    1884                 :           0 :                                 break;
    1885                 :             :                         }
    1886                 :             : 
    1887                 :             :                 case ALTER_SUBSCRIPTION_SKIP:
    1888                 :             :                         {
    1889                 :           2 :                                 parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
    1890                 :             : 
    1891                 :             :                                 /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
    1892         [ +  - ]:           2 :                                 Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
    1893                 :             : 
    1894                 :             :                                 /*
    1895                 :             :                                  * If the user sets subskiplsn, we do a sanity check to make
    1896                 :             :                                  * sure that the specified LSN is a probable value.
    1897                 :             :                                  */
    1898         [ +  + ]:           2 :                                 if (XLogRecPtrIsValid(opts.lsn))
    1899                 :             :                                 {
    1900                 :           1 :                                         RepOriginId originid;
    1901                 :           1 :                                         char            originname[NAMEDATALEN];
    1902                 :           1 :                                         XLogRecPtr      remote_lsn;
    1903                 :             : 
    1904                 :           2 :                                         ReplicationOriginNameForLogicalRep(subid, InvalidOid,
    1905                 :           1 :                                                                                                            originname, sizeof(originname));
    1906                 :           1 :                                         originid = replorigin_by_name(originname, false);
    1907                 :           1 :                                         remote_lsn = replorigin_get_progress(originid, false);
    1908                 :             : 
    1909                 :             :                                         /* Check the given LSN is at least a future LSN */
    1910   [ -  +  #  # ]:           1 :                                         if (XLogRecPtrIsValid(remote_lsn) && opts.lsn < remote_lsn)
    1911   [ #  #  #  # ]:           0 :                                                 ereport(ERROR,
    1912                 :             :                                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1913                 :             :                                                                  errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
    1914                 :             :                                                                                 LSN_FORMAT_ARGS(opts.lsn),
    1915                 :             :                                                                                 LSN_FORMAT_ARGS(remote_lsn))));
    1916                 :           1 :                                 }
    1917                 :             : 
    1918                 :           2 :                                 values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
    1919                 :           2 :                                 replaces[Anum_pg_subscription_subskiplsn - 1] = true;
    1920                 :             : 
    1921                 :           2 :                                 update_tuple = true;
    1922                 :           2 :                                 break;
    1923                 :             :                         }
    1924                 :             : 
    1925                 :             :                 default:
    1926   [ #  #  #  # ]:           0 :                         elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
    1927                 :             :                                  stmt->kind);
    1928                 :           0 :         }
    1929                 :             : 
    1930                 :             :         /* Update the catalog if needed. */
    1931         [ +  + ]:          35 :         if (update_tuple)
    1932                 :             :         {
    1933                 :          66 :                 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
    1934                 :          33 :                                                                 replaces);
    1935                 :             : 
    1936                 :          33 :                 CatalogTupleUpdate(rel, &tup->t_self, tup);
    1937                 :             : 
    1938                 :          33 :                 heap_freetuple(tup);
    1939                 :          33 :         }
    1940                 :             : 
    1941                 :             :         /*
    1942                 :             :          * Try to acquire the connection necessary either for modifying the slot
    1943                 :             :          * or for checking if the remote server permits enabling
    1944                 :             :          * retain_dead_tuples.
    1945                 :             :          *
    1946                 :             :          * This has to be at the end because otherwise if there is an error while
    1947                 :             :          * doing the database operations we won't be able to rollback altered
    1948                 :             :          * slot.
    1949                 :             :          */
    1950   [ +  +  +  -  :          35 :         if (update_failover || update_two_phase || check_pub_rdt)
                   +  + ]
    1951                 :             :         {
    1952                 :           4 :                 bool            must_use_password;
    1953                 :           4 :                 char       *err;
    1954                 :           4 :                 WalReceiverConn *wrconn;
    1955                 :             : 
    1956                 :             :                 /* Load the library providing us libpq calls. */
    1957                 :           4 :                 load_file("libpqwalreceiver", false);
    1958                 :             : 
    1959                 :             :                 /*
    1960                 :             :                  * Try to connect to the publisher, using the new connection string if
    1961                 :             :                  * available.
    1962                 :             :                  */
    1963         [ #  # ]:           4 :                 must_use_password = sub->passwordrequired && !sub->ownersuperuser;
    1964         [ #  # ]:           0 :                 wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo,
    1965                 :             :                                                                 true, true, must_use_password, sub->name,
    1966                 :             :                                                                 &err);
    1967         [ #  # ]:           0 :                 if (!wrconn)
    1968   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    1969                 :             :                                         (errcode(ERRCODE_CONNECTION_FAILURE),
    1970                 :             :                                          errmsg("subscription \"%s\" could not connect to the publisher: %s",
    1971                 :             :                                                         sub->name, err)));
    1972                 :             : 
    1973         [ #  # ]:           0 :                 PG_TRY();
    1974                 :             :                 {
    1975         [ #  # ]:           0 :                         if (retain_dead_tuples)
    1976                 :           0 :                                 check_pub_dead_tuple_retention(wrconn);
    1977                 :             : 
    1978                 :           0 :                         check_publications_origin_tables(wrconn, sub->publications, false,
    1979                 :           0 :                                                                                          retain_dead_tuples, origin, NULL, 0,
    1980                 :           0 :                                                                                          sub->name);
    1981                 :             : 
    1982   [ #  #  #  # ]:           0 :                         if (update_failover || update_two_phase)
    1983   [ #  #  #  # ]:           0 :                                 walrcv_alter_slot(wrconn, sub->slotname,
    1984                 :             :                                                                   update_failover ? &opts.failover : NULL,
    1985                 :             :                                                                   update_two_phase ? &opts.twophase : NULL);
    1986                 :             :                 }
    1987                 :           0 :                 PG_FINALLY();
    1988                 :             :                 {
    1989                 :           0 :                         walrcv_disconnect(wrconn);
    1990                 :             :                 }
    1991         [ #  # ]:           0 :                 PG_END_TRY();
    1992                 :           0 :         }
    1993                 :             : 
    1994                 :          31 :         table_close(rel, RowExclusiveLock);
    1995                 :             : 
    1996                 :          31 :         ObjectAddressSet(myself, SubscriptionRelationId, subid);
    1997                 :             : 
    1998         [ +  - ]:          31 :         InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
    1999                 :             : 
    2000                 :             :         /* Wake up related replication workers to handle this change quickly. */
    2001                 :          31 :         LogicalRepWorkersWakeupAtCommit(subid);
    2002                 :             : 
    2003                 :             :         return myself;
    2004                 :          31 : }
    2005                 :             : 
    2006                 :             : /*
    2007                 :             :  * Drop a subscription
    2008                 :             :  */
    2009                 :             : void
    2010                 :          15 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
    2011                 :             : {
    2012                 :          15 :         Relation        rel;
    2013                 :          15 :         ObjectAddress myself;
    2014                 :          15 :         HeapTuple       tup;
    2015                 :          15 :         Oid                     subid;
    2016                 :          15 :         Oid                     subowner;
    2017                 :          15 :         Datum           datum;
    2018                 :          15 :         bool            isnull;
    2019                 :          15 :         char       *subname;
    2020                 :          15 :         char       *conninfo;
    2021                 :          15 :         char       *slotname;
    2022                 :          15 :         List       *subworkers;
    2023                 :          15 :         ListCell   *lc;
    2024                 :          15 :         char            originname[NAMEDATALEN];
    2025                 :          15 :         char       *err = NULL;
    2026                 :          15 :         WalReceiverConn *wrconn;
    2027                 :          15 :         Form_pg_subscription form;
    2028                 :          15 :         List       *rstates;
    2029                 :          15 :         bool            must_use_password;
    2030                 :             : 
    2031                 :             :         /*
    2032                 :             :          * The launcher may concurrently start a new worker for this subscription.
    2033                 :             :          * During initialization, the worker checks for subscription validity and
    2034                 :             :          * exits if the subscription has already been dropped. See
    2035                 :             :          * InitializeLogRepWorker.
    2036                 :             :          */
    2037                 :          15 :         rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    2038                 :             : 
    2039                 :          30 :         tup = SearchSysCache2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
    2040                 :          15 :                                                   CStringGetDatum(stmt->subname));
    2041                 :             : 
    2042         [ +  + ]:          15 :         if (!HeapTupleIsValid(tup))
    2043                 :             :         {
    2044                 :           2 :                 table_close(rel, NoLock);
    2045                 :             : 
    2046         [ +  + ]:           2 :                 if (!stmt->missing_ok)
    2047   [ +  -  +  - ]:           1 :                         ereport(ERROR,
    2048                 :             :                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
    2049                 :             :                                          errmsg("subscription \"%s\" does not exist",
    2050                 :             :                                                         stmt->subname)));
    2051                 :             :                 else
    2052   [ -  +  +  - ]:           1 :                         ereport(NOTICE,
    2053                 :             :                                         (errmsg("subscription \"%s\" does not exist, skipping",
    2054                 :             :                                                         stmt->subname)));
    2055                 :             : 
    2056                 :           1 :                 return;
    2057                 :             :         }
    2058                 :             : 
    2059                 :          13 :         form = (Form_pg_subscription) GETSTRUCT(tup);
    2060                 :          13 :         subid = form->oid;
    2061                 :          13 :         subowner = form->subowner;
    2062         [ +  + ]:          13 :         must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
    2063                 :             : 
    2064                 :             :         /* must be owner */
    2065         [ +  - ]:          13 :         if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
    2066                 :           0 :                 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    2067                 :           0 :                                            stmt->subname);
    2068                 :             : 
    2069                 :             :         /* DROP hook for the subscription being removed */
    2070         [ +  - ]:          13 :         InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
    2071                 :             : 
    2072                 :             :         /*
    2073                 :             :          * Lock the subscription so nobody else can do anything with it (including
    2074                 :             :          * the replication workers).
    2075                 :             :          */
    2076                 :          13 :         LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
    2077                 :             : 
    2078                 :             :         /* Get subname */
    2079                 :          13 :         datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
    2080                 :             :                                                                    Anum_pg_subscription_subname);
    2081                 :          13 :         subname = pstrdup(NameStr(*DatumGetName(datum)));
    2082                 :             : 
    2083                 :             :         /* Get conninfo */
    2084                 :          13 :         datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
    2085                 :             :                                                                    Anum_pg_subscription_subconninfo);
    2086                 :          13 :         conninfo = TextDatumGetCString(datum);
    2087                 :             : 
    2088                 :             :         /* Get slotname */
    2089                 :          13 :         datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
    2090                 :             :                                                         Anum_pg_subscription_subslotname, &isnull);
    2091         [ +  - ]:          13 :         if (!isnull)
    2092                 :           0 :                 slotname = pstrdup(NameStr(*DatumGetName(datum)));
    2093                 :             :         else
    2094                 :          13 :                 slotname = NULL;
    2095                 :             : 
    2096                 :             :         /*
    2097                 :             :          * Since dropping a replication slot is not transactional, the replication
    2098                 :             :          * slot stays dropped even if the transaction rolls back.  So we cannot
    2099                 :             :          * run DROP SUBSCRIPTION inside a transaction block if dropping the
    2100                 :             :          * replication slot.  Also, in this case, we report a message for dropping
    2101                 :             :          * the subscription to the cumulative stats system.
    2102                 :             :          *
    2103                 :             :          * XXX The command name should really be something like "DROP SUBSCRIPTION
    2104                 :             :          * of a subscription that is associated with a replication slot", but we
    2105                 :             :          * don't have the proper facilities for that.
    2106                 :             :          */
    2107         [ +  + ]:          13 :         if (slotname)
    2108                 :           1 :                 PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
    2109                 :             : 
    2110                 :          13 :         ObjectAddressSet(myself, SubscriptionRelationId, subid);
    2111                 :          13 :         EventTriggerSQLDropAddObject(&myself, true, true);
    2112                 :             : 
    2113                 :             :         /* Remove the tuple from catalog. */
    2114                 :          13 :         CatalogTupleDelete(rel, &tup->t_self);
    2115                 :             : 
    2116                 :          13 :         ReleaseSysCache(tup);
    2117                 :             : 
    2118                 :             :         /*
    2119                 :             :          * Stop all the subscription workers immediately.
    2120                 :             :          *
    2121                 :             :          * This is necessary if we are dropping the replication slot, so that the
    2122                 :             :          * slot becomes accessible.
    2123                 :             :          *
    2124                 :             :          * It is also necessary if the subscription is disabled and was disabled
    2125                 :             :          * in the same transaction.  Then the workers haven't seen the disabling
    2126                 :             :          * yet and will still be running, leading to hangs later when we want to
    2127                 :             :          * drop the replication origin.  If the subscription was disabled before
    2128                 :             :          * this transaction, then there shouldn't be any workers left, so this
    2129                 :             :          * won't make a difference.
    2130                 :             :          *
    2131                 :             :          * New workers won't be started because we hold an exclusive lock on the
    2132                 :             :          * subscription till the end of the transaction.
    2133                 :             :          */
    2134                 :          13 :         subworkers = logicalrep_workers_find(subid, false, true);
    2135   [ +  +  +  +  :          14 :         foreach(lc, subworkers)
                   +  + ]
    2136                 :             :         {
    2137                 :           1 :                 LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
    2138                 :             : 
    2139                 :           1 :                 logicalrep_worker_stop(w->type, w->subid, w->relid);
    2140                 :           1 :         }
    2141                 :          13 :         list_free(subworkers);
    2142                 :             : 
    2143                 :             :         /*
    2144                 :             :          * Remove the no-longer-useful entry in the launcher's table of apply
    2145                 :             :          * worker start times.
    2146                 :             :          *
    2147                 :             :          * If this transaction rolls back, the launcher might restart a failed
    2148                 :             :          * apply worker before wal_retrieve_retry_interval milliseconds have
    2149                 :             :          * elapsed, but that's pretty harmless.
    2150                 :             :          */
    2151                 :          13 :         ApplyLauncherForgetWorkerStartTime(subid);
    2152                 :             : 
    2153                 :             :         /*
    2154                 :             :          * Cleanup of tablesync replication origins.
    2155                 :             :          *
    2156                 :             :          * Any READY-state relations would already have dealt with clean-ups.
    2157                 :             :          *
    2158                 :             :          * Note that the state can't change because we have already stopped both
    2159                 :             :          * the apply and tablesync workers and they can't restart because of
    2160                 :             :          * exclusive lock on the subscription.
    2161                 :             :          */
    2162                 :          13 :         rstates = GetSubscriptionRelations(subid, true, false, true);
    2163   [ -  +  #  #  :          13 :         foreach(lc, rstates)
                   -  + ]
    2164                 :             :         {
    2165                 :           0 :                 SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    2166                 :           0 :                 Oid                     relid = rstate->relid;
    2167                 :             : 
    2168                 :             :                 /* Only cleanup resources of tablesync workers */
    2169         [ #  # ]:           0 :                 if (!OidIsValid(relid))
    2170                 :           0 :                         continue;
    2171                 :             : 
    2172                 :             :                 /*
    2173                 :             :                  * Drop the tablesync's origin tracking if exists.
    2174                 :             :                  *
    2175                 :             :                  * It is possible that the origin is not yet created for tablesync
    2176                 :             :                  * worker so passing missing_ok = true. This can happen for the states
    2177                 :             :                  * before SUBREL_STATE_DATASYNC.
    2178                 :             :                  */
    2179                 :           0 :                 ReplicationOriginNameForLogicalRep(subid, relid, originname,
    2180                 :             :                                                                                    sizeof(originname));
    2181                 :           0 :                 replorigin_drop_by_name(originname, true, false);
    2182         [ #  # ]:           0 :         }
    2183                 :             : 
    2184                 :             :         /* Clean up dependencies */
    2185                 :          13 :         deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
    2186                 :             : 
    2187                 :             :         /* Remove any associated relation synchronization states. */
    2188                 :          13 :         RemoveSubscriptionRel(subid, InvalidOid);
    2189                 :             : 
    2190                 :             :         /* Remove the origin tracking if exists. */
    2191                 :          13 :         ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
    2192                 :          13 :         replorigin_drop_by_name(originname, true, false);
    2193                 :             : 
    2194                 :             :         /*
    2195                 :             :          * Tell the cumulative stats system that the subscription is getting
    2196                 :             :          * dropped.
    2197                 :             :          */
    2198                 :          13 :         pgstat_drop_subscription(subid);
    2199                 :             : 
    2200                 :             :         /*
    2201                 :             :          * If there is no slot associated with the subscription, we can finish
    2202                 :             :          * here.
    2203                 :             :          */
    2204   [ +  -  -  + ]:          13 :         if (!slotname && rstates == NIL)
    2205                 :             :         {
    2206                 :          13 :                 table_close(rel, NoLock);
    2207                 :          13 :                 return;
    2208                 :             :         }
    2209                 :             : 
    2210                 :             :         /*
    2211                 :             :          * Try to acquire the connection necessary for dropping slots.
    2212                 :             :          *
    2213                 :             :          * Note: If the slotname is NONE/NULL then we allow the command to finish
    2214                 :             :          * and users need to manually cleanup the apply and tablesync worker slots
    2215                 :             :          * later.
    2216                 :             :          *
    2217                 :             :          * This has to be at the end because otherwise if there is an error while
    2218                 :             :          * doing the database operations we won't be able to rollback dropped
    2219                 :             :          * slot.
    2220                 :             :          */
    2221                 :           0 :         load_file("libpqwalreceiver", false);
    2222                 :             : 
    2223                 :           0 :         wrconn = walrcv_connect(conninfo, true, true, must_use_password,
    2224                 :             :                                                         subname, &err);
    2225         [ #  # ]:           0 :         if (wrconn == NULL)
    2226                 :             :         {
    2227         [ #  # ]:           0 :                 if (!slotname)
    2228                 :             :                 {
    2229                 :             :                         /* be tidy */
    2230                 :           0 :                         list_free(rstates);
    2231                 :           0 :                         table_close(rel, NoLock);
    2232                 :           0 :                         return;
    2233                 :             :                 }
    2234                 :             :                 else
    2235                 :             :                 {
    2236                 :           0 :                         ReportSlotConnectionError(rstates, subid, slotname, err);
    2237                 :             :                 }
    2238                 :           0 :         }
    2239                 :             : 
    2240         [ #  # ]:           0 :         PG_TRY();
    2241                 :             :         {
    2242   [ #  #  #  #  :           0 :                 foreach(lc, rstates)
                   #  # ]
    2243                 :             :                 {
    2244                 :           0 :                         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    2245                 :           0 :                         Oid                     relid = rstate->relid;
    2246                 :             : 
    2247                 :             :                         /* Only cleanup resources of tablesync workers */
    2248         [ #  # ]:           0 :                         if (!OidIsValid(relid))
    2249                 :           0 :                                 continue;
    2250                 :             : 
    2251                 :             :                         /*
    2252                 :             :                          * Drop the tablesync slots associated with removed tables.
    2253                 :             :                          *
    2254                 :             :                          * For SYNCDONE/READY states, the tablesync slot is known to have
    2255                 :             :                          * already been dropped by the tablesync worker.
    2256                 :             :                          *
    2257                 :             :                          * For other states, there is no certainty, maybe the slot does
    2258                 :             :                          * not exist yet. Also, if we fail after removing some of the
    2259                 :             :                          * slots, next time, it will again try to drop already dropped
    2260                 :             :                          * slots and fail. For these reasons, we allow missing_ok = true
    2261                 :             :                          * for the drop.
    2262                 :             :                          */
    2263         [ #  # ]:           0 :                         if (rstate->state != SUBREL_STATE_SYNCDONE)
    2264                 :             :                         {
    2265                 :           0 :                                 char            syncslotname[NAMEDATALEN] = {0};
    2266                 :             : 
    2267                 :           0 :                                 ReplicationSlotNameForTablesync(subid, relid, syncslotname,
    2268                 :             :                                                                                                 sizeof(syncslotname));
    2269                 :           0 :                                 ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
    2270                 :           0 :                         }
    2271         [ #  # ]:           0 :                 }
    2272                 :             : 
    2273                 :           0 :                 list_free(rstates);
    2274                 :             : 
    2275                 :             :                 /*
    2276                 :             :                  * If there is a slot associated with the subscription, then drop the
    2277                 :             :                  * replication slot at the publisher.
    2278                 :             :                  */
    2279         [ #  # ]:           0 :                 if (slotname)
    2280                 :           0 :                         ReplicationSlotDropAtPubNode(wrconn, slotname, false);
    2281                 :             :         }
    2282                 :           0 :         PG_FINALLY();
    2283                 :             :         {
    2284                 :           0 :                 walrcv_disconnect(wrconn);
    2285                 :             :         }
    2286         [ #  # ]:           0 :         PG_END_TRY();
    2287                 :             : 
    2288                 :           0 :         table_close(rel, NoLock);
    2289                 :          14 : }
    2290                 :             : 
    2291                 :             : /*
    2292                 :             :  * Drop the replication slot at the publisher node using the replication
    2293                 :             :  * connection.
    2294                 :             :  *
    2295                 :             :  * missing_ok - if true then only issue a LOG message if the slot doesn't
    2296                 :             :  * exist.
    2297                 :             :  */
    2298                 :             : void
    2299                 :           0 : ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
    2300                 :             : {
    2301                 :           0 :         StringInfoData cmd;
    2302                 :             : 
    2303         [ #  # ]:           0 :         Assert(wrconn);
    2304                 :             : 
    2305                 :           0 :         load_file("libpqwalreceiver", false);
    2306                 :             : 
    2307                 :           0 :         initStringInfo(&cmd);
    2308                 :           0 :         appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
    2309                 :             : 
    2310         [ #  # ]:           0 :         PG_TRY();
    2311                 :             :         {
    2312                 :           0 :                 WalRcvExecResult *res;
    2313                 :             : 
    2314                 :           0 :                 res = walrcv_exec(wrconn, cmd.data, 0, NULL);
    2315                 :             : 
    2316         [ #  # ]:           0 :                 if (res->status == WALRCV_OK_COMMAND)
    2317                 :             :                 {
    2318                 :             :                         /* NOTICE. Success. */
    2319   [ #  #  #  # ]:           0 :                         ereport(NOTICE,
    2320                 :             :                                         (errmsg("dropped replication slot \"%s\" on publisher",
    2321                 :             :                                                         slotname)));
    2322                 :           0 :                 }
    2323         [ #  # ]:           0 :                 else if (res->status == WALRCV_ERROR &&
    2324                 :           0 :                                  missing_ok &&
    2325                 :           0 :                                  res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
    2326                 :             :                 {
    2327                 :             :                         /* LOG. Error, but missing_ok = true. */
    2328   [ #  #  #  # ]:           0 :                         ereport(LOG,
    2329                 :             :                                         (errmsg("could not drop replication slot \"%s\" on publisher: %s",
    2330                 :             :                                                         slotname, res->err)));
    2331                 :           0 :                 }
    2332                 :             :                 else
    2333                 :             :                 {
    2334                 :             :                         /* ERROR. */
    2335   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    2336                 :             :                                         (errcode(ERRCODE_CONNECTION_FAILURE),
    2337                 :             :                                          errmsg("could not drop replication slot \"%s\" on publisher: %s",
    2338                 :             :                                                         slotname, res->err)));
    2339                 :             :                 }
    2340                 :             : 
    2341                 :           0 :                 walrcv_clear_result(res);
    2342                 :           0 :         }
    2343                 :           0 :         PG_FINALLY();
    2344                 :             :         {
    2345                 :           0 :                 pfree(cmd.data);
    2346                 :             :         }
    2347         [ #  # ]:           0 :         PG_END_TRY();
    2348                 :           0 : }
    2349                 :             : 
    2350                 :             : /*
    2351                 :             :  * Internal workhorse for changing a subscription owner
    2352                 :             :  */
    2353                 :             : static void
    2354                 :           0 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
    2355                 :             : {
    2356                 :           0 :         Form_pg_subscription form;
    2357                 :           0 :         AclResult       aclresult;
    2358                 :             : 
    2359                 :           0 :         form = (Form_pg_subscription) GETSTRUCT(tup);
    2360                 :             : 
    2361         [ #  # ]:           0 :         if (form->subowner == newOwnerId)
    2362                 :           0 :                 return;
    2363                 :             : 
    2364         [ #  # ]:           0 :         if (!object_ownercheck(SubscriptionRelationId, form->oid, GetUserId()))
    2365                 :           0 :                 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    2366                 :           0 :                                            NameStr(form->subname));
    2367                 :             : 
    2368                 :             :         /*
    2369                 :             :          * Don't allow non-superuser modification of a subscription with
    2370                 :             :          * password_required=false.
    2371                 :             :          */
    2372   [ #  #  #  # ]:           0 :         if (!form->subpasswordrequired && !superuser())
    2373   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    2374                 :             :                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2375                 :             :                                  errmsg("password_required=false is superuser-only"),
    2376                 :             :                                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    2377                 :             : 
    2378                 :             :         /* Must be able to become new owner */
    2379                 :           0 :         check_can_set_role(GetUserId(), newOwnerId);
    2380                 :             : 
    2381                 :             :         /*
    2382                 :             :          * current owner must have CREATE on database
    2383                 :             :          *
    2384                 :             :          * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
    2385                 :             :          * other object types behave differently (e.g. you can't give a table to a
    2386                 :             :          * user who lacks CREATE privileges on a schema).
    2387                 :             :          */
    2388                 :           0 :         aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
    2389                 :           0 :                                                                 GetUserId(), ACL_CREATE);
    2390         [ #  # ]:           0 :         if (aclresult != ACLCHECK_OK)
    2391                 :           0 :                 aclcheck_error(aclresult, OBJECT_DATABASE,
    2392                 :           0 :                                            get_database_name(MyDatabaseId));
    2393                 :             : 
    2394                 :           0 :         form->subowner = newOwnerId;
    2395                 :           0 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
    2396                 :             : 
    2397                 :             :         /* Update owner dependency reference */
    2398                 :           0 :         changeDependencyOnOwner(SubscriptionRelationId,
    2399                 :           0 :                                                         form->oid,
    2400                 :           0 :                                                         newOwnerId);
    2401                 :             : 
    2402         [ #  # ]:           0 :         InvokeObjectPostAlterHook(SubscriptionRelationId,
    2403                 :             :                                                           form->oid, 0);
    2404                 :             : 
    2405                 :             :         /* Wake up related background processes to handle this change quickly. */
    2406                 :           0 :         ApplyLauncherWakeupAtCommit();
    2407                 :           0 :         LogicalRepWorkersWakeupAtCommit(form->oid);
    2408         [ #  # ]:           0 : }
    2409                 :             : 
    2410                 :             : /*
    2411                 :             :  * Change subscription owner -- by name
    2412                 :             :  */
    2413                 :             : ObjectAddress
    2414                 :           0 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
    2415                 :             : {
    2416                 :           0 :         Oid                     subid;
    2417                 :           0 :         HeapTuple       tup;
    2418                 :           0 :         Relation        rel;
    2419                 :             :         ObjectAddress address;
    2420                 :           0 :         Form_pg_subscription form;
    2421                 :             : 
    2422                 :           0 :         rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    2423                 :             : 
    2424                 :           0 :         tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
    2425                 :             :                                                           CStringGetDatum(name));
    2426                 :             : 
    2427         [ #  # ]:           0 :         if (!HeapTupleIsValid(tup))
    2428   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    2429                 :             :                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2430                 :             :                                  errmsg("subscription \"%s\" does not exist", name)));
    2431                 :             : 
    2432                 :           0 :         form = (Form_pg_subscription) GETSTRUCT(tup);
    2433                 :           0 :         subid = form->oid;
    2434                 :             : 
    2435                 :           0 :         AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
    2436                 :             : 
    2437                 :           0 :         ObjectAddressSet(address, SubscriptionRelationId, subid);
    2438                 :             : 
    2439                 :           0 :         heap_freetuple(tup);
    2440                 :             : 
    2441                 :           0 :         table_close(rel, RowExclusiveLock);
    2442                 :             : 
    2443                 :             :         return address;
    2444                 :           0 : }
    2445                 :             : 
    2446                 :             : /*
    2447                 :             :  * Change subscription owner -- by OID
    2448                 :             :  */
    2449                 :             : void
    2450                 :           0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
    2451                 :             : {
    2452                 :           0 :         HeapTuple       tup;
    2453                 :           0 :         Relation        rel;
    2454                 :             : 
    2455                 :           0 :         rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    2456                 :             : 
    2457                 :           0 :         tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
    2458                 :             : 
    2459         [ #  # ]:           0 :         if (!HeapTupleIsValid(tup))
    2460   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    2461                 :             :                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2462                 :             :                                  errmsg("subscription with OID %u does not exist", subid)));
    2463                 :             : 
    2464                 :           0 :         AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
    2465                 :             : 
    2466                 :           0 :         heap_freetuple(tup);
    2467                 :             : 
    2468                 :           0 :         table_close(rel, RowExclusiveLock);
    2469                 :           0 : }
    2470                 :             : 
    2471                 :             : /*
    2472                 :             :  * Check and log a warning if the publisher has subscribed to the same table,
    2473                 :             :  * its partition ancestors (if it's a partition), or its partition children (if
    2474                 :             :  * it's a partitioned table), from some other publishers. This check is
    2475                 :             :  * required in the following scenarios:
    2476                 :             :  *
    2477                 :             :  * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
    2478                 :             :  *    statements with "copy_data = true" and "origin = none":
    2479                 :             :  *    - Warn the user that data with an origin might have been copied.
    2480                 :             :  *    - This check is skipped for tables already added, as incremental sync via
    2481                 :             :  *      WAL allows origin tracking. The list of such tables is in
    2482                 :             :  *      subrel_local_oids.
    2483                 :             :  *
    2484                 :             :  * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
    2485                 :             :  *    statements with "retain_dead_tuples = true" and "origin = any", and for
    2486                 :             :  *    ALTER SUBSCRIPTION statements that modify retain_dead_tuples or origin,
    2487                 :             :  *    or when the publisher's status changes (e.g., due to a connection string
    2488                 :             :  *    update):
    2489                 :             :  *    - Warn the user that only conflict detection info for local changes on
    2490                 :             :  *      the publisher is retained. Data from other origins may lack sufficient
    2491                 :             :  *      details for reliable conflict detection.
    2492                 :             :  *    - See comments atop worker.c for more details.
    2493                 :             :  */
    2494                 :             : static void
    2495                 :           0 : check_publications_origin_tables(WalReceiverConn *wrconn, List *publications,
    2496                 :             :                                                                  bool copydata, bool retain_dead_tuples,
    2497                 :             :                                                                  char *origin, Oid *subrel_local_oids,
    2498                 :             :                                                                  int subrel_count, char *subname)
    2499                 :             : {
    2500                 :           0 :         WalRcvExecResult *res;
    2501                 :           0 :         StringInfoData cmd;
    2502                 :           0 :         TupleTableSlot *slot;
    2503                 :           0 :         Oid                     tableRow[1] = {TEXTOID};
    2504                 :           0 :         List       *publist = NIL;
    2505                 :           0 :         int                     i;
    2506                 :           0 :         bool            check_rdt;
    2507                 :           0 :         bool            check_table_sync;
    2508         [ #  # ]:           0 :         bool            origin_none = origin &&
    2509                 :           0 :                 pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0;
    2510                 :             : 
    2511                 :             :         /*
    2512                 :             :          * Enable retain_dead_tuples checks only when origin is set to 'any',
    2513                 :             :          * since with origin='none' only local changes are replicated to the
    2514                 :             :          * subscriber.
    2515                 :             :          */
    2516         [ #  # ]:           0 :         check_rdt = retain_dead_tuples && !origin_none;
    2517                 :             : 
    2518                 :             :         /*
    2519                 :             :          * Enable table synchronization checks only when origin is 'none', to
    2520                 :             :          * ensure that data from other origins is not inadvertently copied.
    2521                 :             :          */
    2522         [ #  # ]:           0 :         check_table_sync = copydata && origin_none;
    2523                 :             : 
    2524                 :             :         /* retain_dead_tuples and table sync checks occur separately */
    2525   [ #  #  #  # ]:           0 :         Assert(!(check_rdt && check_table_sync));
    2526                 :             : 
    2527                 :             :         /* Return if no checks are required */
    2528   [ #  #  #  # ]:           0 :         if (!check_rdt && !check_table_sync)
    2529                 :           0 :                 return;
    2530                 :             : 
    2531                 :           0 :         initStringInfo(&cmd);
    2532                 :           0 :         appendStringInfoString(&cmd,
    2533                 :             :                                                    "SELECT DISTINCT P.pubname AS pubname\n"
    2534                 :             :                                                    "FROM pg_publication P,\n"
    2535                 :             :                                                    "     LATERAL pg_get_publication_tables(P.pubname) GPT\n"
    2536                 :             :                                                    "     JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
    2537                 :             :                                                    "     GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
    2538                 :             :                                                    "                   SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
    2539                 :             :                                                    "     pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
    2540                 :             :                                                    "WHERE C.oid = GPT.relid AND P.pubname IN (");
    2541                 :           0 :         GetPublicationsStr(publications, &cmd, true);
    2542                 :           0 :         appendStringInfoString(&cmd, ")\n");
    2543                 :             : 
    2544                 :             :         /*
    2545                 :             :          * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
    2546                 :             :          * subrel_local_oids contains the list of relation oids that are already
    2547                 :             :          * present on the subscriber. This check should be skipped for these
    2548                 :             :          * tables if checking for table sync scenario. However, when handling the
    2549                 :             :          * retain_dead_tuples scenario, ensure all tables are checked, as some
    2550                 :             :          * existing tables may now include changes from other origins due to newly
    2551                 :             :          * created subscriptions on the publisher.
    2552                 :             :          */
    2553         [ #  # ]:           0 :         if (check_table_sync)
    2554                 :             :         {
    2555         [ #  # ]:           0 :                 for (i = 0; i < subrel_count; i++)
    2556                 :             :                 {
    2557                 :           0 :                         Oid                     relid = subrel_local_oids[i];
    2558                 :           0 :                         char       *schemaname = get_namespace_name(get_rel_namespace(relid));
    2559                 :           0 :                         char       *tablename = get_rel_name(relid);
    2560                 :             : 
    2561                 :           0 :                         appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
    2562                 :           0 :                                                          schemaname, tablename);
    2563                 :           0 :                 }
    2564                 :           0 :         }
    2565                 :             : 
    2566                 :           0 :         res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
    2567                 :           0 :         pfree(cmd.data);
    2568                 :             : 
    2569         [ #  # ]:           0 :         if (res->status != WALRCV_OK_TUPLES)
    2570   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    2571                 :             :                                 (errcode(ERRCODE_CONNECTION_FAILURE),
    2572                 :             :                                  errmsg("could not receive list of replicated tables from the publisher: %s",
    2573                 :             :                                                 res->err)));
    2574                 :             : 
    2575                 :             :         /* Process publications. */
    2576                 :           0 :         slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    2577         [ #  # ]:           0 :         while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    2578                 :             :         {
    2579                 :           0 :                 char       *pubname;
    2580                 :           0 :                 bool            isnull;
    2581                 :             : 
    2582                 :           0 :                 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    2583         [ #  # ]:           0 :                 Assert(!isnull);
    2584                 :             : 
    2585                 :           0 :                 ExecClearTuple(slot);
    2586                 :           0 :                 publist = list_append_unique(publist, makeString(pubname));
    2587                 :           0 :         }
    2588                 :             : 
    2589                 :             :         /*
    2590                 :             :          * Log a warning if the publisher has subscribed to the same table from
    2591                 :             :          * some other publisher. We cannot know the origin of data during the
    2592                 :             :          * initial sync. Data origins can be found only from the WAL by looking at
    2593                 :             :          * the origin id.
    2594                 :             :          *
    2595                 :             :          * XXX: For simplicity, we don't check whether the table has any data or
    2596                 :             :          * not. If the table doesn't have any data then we don't need to
    2597                 :             :          * distinguish between data having origin and data not having origin so we
    2598                 :             :          * can avoid logging a warning for table sync scenario.
    2599                 :             :          */
    2600         [ #  # ]:           0 :         if (publist)
    2601                 :             :         {
    2602                 :           0 :                 StringInfoData pubnames;
    2603                 :             : 
    2604                 :             :                 /* Prepare the list of publication(s) for warning message. */
    2605                 :           0 :                 initStringInfo(&pubnames);
    2606                 :           0 :                 GetPublicationsStr(publist, &pubnames, false);
    2607                 :             : 
    2608         [ #  # ]:           0 :                 if (check_table_sync)
    2609   [ #  #  #  # ]:           0 :                         ereport(WARNING,
    2610                 :             :                                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2611                 :             :                                         errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
    2612                 :             :                                                    subname),
    2613                 :             :                                         errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
    2614                 :             :                                                                          "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
    2615                 :             :                                                                          list_length(publist), pubnames.data),
    2616                 :             :                                         errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
    2617                 :             :                 else
    2618   [ #  #  #  # ]:           0 :                         ereport(WARNING,
    2619                 :             :                                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2620                 :             :                                         errmsg("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins",
    2621                 :             :                                                    subname),
    2622                 :             :                                         errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
    2623                 :             :                                                                          "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
    2624                 :             :                                                                          list_length(publist), pubnames.data),
    2625                 :             :                                         errhint("Consider using origin = NONE or disabling retain_dead_tuples."));
    2626                 :           0 :         }
    2627                 :             : 
    2628                 :           0 :         ExecDropSingleTupleTableSlot(slot);
    2629                 :             : 
    2630                 :           0 :         walrcv_clear_result(res);
    2631         [ #  # ]:           0 : }
    2632                 :             : 
    2633                 :             : /*
    2634                 :             :  * This function is similar to check_publications_origin_tables and serves
    2635                 :             :  * same purpose for sequences.
    2636                 :             :  */
    2637                 :             : static void
    2638                 :           0 : check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications,
    2639                 :             :                                                                         bool copydata, char *origin,
    2640                 :             :                                                                         Oid *subrel_local_oids, int subrel_count,
    2641                 :             :                                                                         char *subname)
    2642                 :             : {
    2643                 :           0 :         WalRcvExecResult *res;
    2644                 :           0 :         StringInfoData cmd;
    2645                 :           0 :         TupleTableSlot *slot;
    2646                 :           0 :         Oid                     tableRow[1] = {TEXTOID};
    2647                 :           0 :         List       *publist = NIL;
    2648                 :             : 
    2649                 :             :         /*
    2650                 :             :          * Enable sequence synchronization checks only when origin is 'none' , to
    2651                 :             :          * ensure that sequence data from other origins is not inadvertently
    2652                 :             :          * copied. This check is necessary if the publisher is running PG19 or
    2653                 :             :          * later, where logical replication sequence synchronization is supported.
    2654                 :             :          */
    2655   [ #  #  #  #  :           0 :         if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0 ||
                   #  # ]
    2656                 :           0 :                 walrcv_server_version(wrconn) < 190000)
    2657                 :           0 :                 return;
    2658                 :             : 
    2659                 :           0 :         initStringInfo(&cmd);
    2660                 :           0 :         appendStringInfoString(&cmd,
    2661                 :             :                                                    "SELECT DISTINCT P.pubname AS pubname\n"
    2662                 :             :                                                    "FROM pg_publication P,\n"
    2663                 :             :                                                    "     LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
    2664                 :             :                                                    "     JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
    2665                 :             :                                                    "     pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
    2666                 :             :                                                    "WHERE C.oid = GPS.relid AND P.pubname IN (");
    2667                 :             : 
    2668                 :           0 :         GetPublicationsStr(publications, &cmd, true);
    2669                 :           0 :         appendStringInfoString(&cmd, ")\n");
    2670                 :             : 
    2671                 :             :         /*
    2672                 :             :          * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
    2673                 :             :          * subrel_local_oids contains the list of relations that are already
    2674                 :             :          * present on the subscriber. This check should be skipped as these will
    2675                 :             :          * not be re-synced.
    2676                 :             :          */
    2677         [ #  # ]:           0 :         for (int i = 0; i < subrel_count; i++)
    2678                 :             :         {
    2679                 :           0 :                 Oid                     relid = subrel_local_oids[i];
    2680                 :           0 :                 char       *schemaname = get_namespace_name(get_rel_namespace(relid));
    2681                 :           0 :                 char       *seqname = get_rel_name(relid);
    2682                 :             : 
    2683                 :           0 :                 appendStringInfo(&cmd,
    2684                 :             :                                                  "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
    2685                 :           0 :                                                  schemaname, seqname);
    2686                 :           0 :         }
    2687                 :             : 
    2688                 :           0 :         res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
    2689                 :           0 :         pfree(cmd.data);
    2690                 :             : 
    2691         [ #  # ]:           0 :         if (res->status != WALRCV_OK_TUPLES)
    2692   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    2693                 :             :                                 (errcode(ERRCODE_CONNECTION_FAILURE),
    2694                 :             :                                  errmsg("could not receive list of replicated sequences from the publisher: %s",
    2695                 :             :                                                 res->err)));
    2696                 :             : 
    2697                 :             :         /* Process publications. */
    2698                 :           0 :         slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    2699         [ #  # ]:           0 :         while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    2700                 :             :         {
    2701                 :           0 :                 char       *pubname;
    2702                 :           0 :                 bool            isnull;
    2703                 :             : 
    2704                 :           0 :                 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    2705         [ #  # ]:           0 :                 Assert(!isnull);
    2706                 :             : 
    2707                 :           0 :                 ExecClearTuple(slot);
    2708                 :           0 :                 publist = list_append_unique(publist, makeString(pubname));
    2709                 :           0 :         }
    2710                 :             : 
    2711                 :             :         /*
    2712                 :             :          * Log a warning if the publisher has subscribed to the same sequence from
    2713                 :             :          * some other publisher. We cannot know the origin of sequences data
    2714                 :             :          * during the initial sync.
    2715                 :             :          */
    2716         [ #  # ]:           0 :         if (publist)
    2717                 :             :         {
    2718                 :           0 :                 StringInfoData pubnames;
    2719                 :             : 
    2720                 :             :                 /* Prepare the list of publication(s) for warning message. */
    2721                 :           0 :                 initStringInfo(&pubnames);
    2722                 :           0 :                 GetPublicationsStr(publist, &pubnames, false);
    2723                 :             : 
    2724   [ #  #  #  # ]:           0 :                 ereport(WARNING,
    2725                 :             :                                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2726                 :             :                                 errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
    2727                 :             :                                            subname),
    2728                 :             :                                 errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
    2729                 :             :                                                                  "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
    2730                 :             :                                                                  list_length(publist), pubnames.data),
    2731                 :             :                                 errhint("Verify that initial data copied from the publisher sequences did not come from other origins."));
    2732                 :           0 :         }
    2733                 :             : 
    2734                 :           0 :         ExecDropSingleTupleTableSlot(slot);
    2735                 :             : 
    2736                 :           0 :         walrcv_clear_result(res);
    2737         [ #  # ]:           0 : }
    2738                 :             : 
    2739                 :             : /*
    2740                 :             :  * Determine whether the retain_dead_tuples can be enabled based on the
    2741                 :             :  * publisher's status.
    2742                 :             :  *
    2743                 :             :  * This option is disallowed if the publisher is running a version earlier
    2744                 :             :  * than the PG19, or if the publisher is in recovery (i.e., it is a standby
    2745                 :             :  * server).
    2746                 :             :  *
    2747                 :             :  * See comments atop worker.c for a detailed explanation.
    2748                 :             :  */
    2749                 :             : static void
    2750                 :           0 : check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
    2751                 :             : {
    2752                 :           0 :         WalRcvExecResult *res;
    2753                 :           0 :         Oid                     RecoveryRow[1] = {BOOLOID};
    2754                 :           0 :         TupleTableSlot *slot;
    2755                 :           0 :         bool            isnull;
    2756                 :           0 :         bool            remote_in_recovery;
    2757                 :             : 
    2758         [ #  # ]:           0 :         if (walrcv_server_version(wrconn) < 190000)
    2759   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    2760                 :             :                                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2761                 :             :                                 errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
    2762                 :             : 
    2763                 :           0 :         res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
    2764                 :             : 
    2765         [ #  # ]:           0 :         if (res->status != WALRCV_OK_TUPLES)
    2766   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    2767                 :             :                                 (errcode(ERRCODE_CONNECTION_FAILURE),
    2768                 :             :                                  errmsg("could not obtain recovery progress from the publisher: %s",
    2769                 :             :                                                 res->err)));
    2770                 :             : 
    2771                 :           0 :         slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    2772         [ #  # ]:           0 :         if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    2773   [ #  #  #  # ]:           0 :                 elog(ERROR, "failed to fetch tuple for the recovery progress");
    2774                 :             : 
    2775                 :           0 :         remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
    2776                 :             : 
    2777         [ #  # ]:           0 :         if (remote_in_recovery)
    2778   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    2779                 :             :                                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2780                 :             :                                 errmsg("cannot enable retain_dead_tuples if the publisher is in recovery."));
    2781                 :             : 
    2782                 :           0 :         ExecDropSingleTupleTableSlot(slot);
    2783                 :             : 
    2784                 :           0 :         walrcv_clear_result(res);
    2785                 :           0 : }
    2786                 :             : 
    2787                 :             : /*
    2788                 :             :  * Check if the subscriber's configuration is adequate to enable the
    2789                 :             :  * retain_dead_tuples option.
    2790                 :             :  *
    2791                 :             :  * Issue an ERROR if the wal_level does not support the use of replication
    2792                 :             :  * slots when check_guc is set to true.
    2793                 :             :  *
    2794                 :             :  * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
    2795                 :             :  * set to true. This is only to highlight the importance of enabling
    2796                 :             :  * track_commit_timestamp instead of catching all the misconfigurations, as
    2797                 :             :  * this setting can be adjusted after subscription creation. Without it, the
    2798                 :             :  * apply worker will simply skip conflict detection.
    2799                 :             :  *
    2800                 :             :  * Issue a WARNING or NOTICE if the subscription is disabled and the retention
    2801                 :             :  * is active. Do not raise an ERROR since users can only modify
    2802                 :             :  * retain_dead_tuples for disabled subscriptions. And as long as the
    2803                 :             :  * subscription is enabled promptly, it will not pose issues.
    2804                 :             :  *
    2805                 :             :  * Issue a NOTICE to inform users that max_retention_duration is
    2806                 :             :  * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR
    2807                 :             :  * is not issued because setting max_retention_duration causes no harm,
    2808                 :             :  * even when it is ineffective.
    2809                 :             :  */
    2810                 :             : void
    2811                 :          23 : CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
    2812                 :             :                                                    int elevel_for_sub_disabled,
    2813                 :             :                                                    bool retain_dead_tuples, bool retention_active,
    2814                 :             :                                                    bool max_retention_set)
    2815                 :             : {
    2816   [ +  +  +  - ]:          23 :         Assert(elevel_for_sub_disabled == NOTICE ||
    2817                 :             :                    elevel_for_sub_disabled == WARNING);
    2818                 :             : 
    2819         [ -  + ]:          23 :         if (retain_dead_tuples)
    2820                 :             :         {
    2821   [ #  #  #  # ]:           0 :                 if (check_guc && wal_level < WAL_LEVEL_REPLICA)
    2822   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    2823                 :             :                                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2824                 :             :                                         errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
    2825                 :             :                                         errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
    2826                 :             : 
    2827   [ #  #  #  # ]:           0 :                 if (check_guc && !track_commit_timestamp)
    2828   [ #  #  #  # ]:           0 :                         ereport(WARNING,
    2829                 :             :                                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2830                 :             :                                         errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
    2831                 :             :                                         errhint("Consider setting \"%s\" to true.",
    2832                 :             :                                                         "track_commit_timestamp"));
    2833                 :             : 
    2834   [ #  #  #  # ]:           0 :                 if (sub_disabled && retention_active)
    2835   [ #  #  #  #  :           0 :                         ereport(elevel_for_sub_disabled,
          #  #  #  #  #  
                #  #  # ]
    2836                 :             :                                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2837                 :             :                                         errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
    2838                 :             :                                         (elevel_for_sub_disabled > NOTICE)
    2839                 :             :                                         ? errhint("Consider setting %s to false.",
    2840                 :             :                                                           "retain_dead_tuples") : 0);
    2841                 :           0 :         }
    2842         [ +  + ]:          23 :         else if (max_retention_set)
    2843                 :             :         {
    2844   [ -  +  +  - ]:           1 :                 ereport(NOTICE,
    2845                 :             :                                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2846                 :             :                                 errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
    2847                 :           1 :         }
    2848                 :          23 : }
    2849                 :             : 
    2850                 :             : /*
    2851                 :             :  * Return true iff 'rv' is a member of the list.
    2852                 :             :  */
    2853                 :             : static bool
    2854                 :           0 : list_member_rangevar(const List *list, RangeVar *rv)
    2855                 :             : {
    2856   [ #  #  #  #  :           0 :         foreach_ptr(PublicationRelKind, relinfo, list)
          #  #  #  #  #  
             #  #  #  # ]
    2857                 :             :         {
    2858         [ #  # ]:           0 :                 if (equal(relinfo->rv, rv))
    2859                 :           0 :                         return true;
    2860                 :           0 :         }
    2861                 :             : 
    2862                 :           0 :         return false;
    2863                 :           0 : }
    2864                 :             : 
    2865                 :             : /*
    2866                 :             :  * Get the list of tables and sequences which belong to specified publications
    2867                 :             :  * on the publisher connection.
    2868                 :             :  *
    2869                 :             :  * Note that we don't support the case where the column list is different for
    2870                 :             :  * the same table in different publications to avoid sending unwanted column
    2871                 :             :  * information for some of the rows. This can happen when both the column
    2872                 :             :  * list and row filter are specified for different publications.
    2873                 :             :  */
    2874                 :             : static List *
    2875                 :           0 : fetch_relation_list(WalReceiverConn *wrconn, List *publications)
    2876                 :             : {
    2877                 :           0 :         WalRcvExecResult *res;
    2878                 :           0 :         StringInfoData cmd;
    2879                 :           0 :         TupleTableSlot *slot;
    2880                 :           0 :         Oid                     tableRow[4] = {TEXTOID, TEXTOID, CHAROID, InvalidOid};
    2881                 :           0 :         List       *relationlist = NIL;
    2882                 :           0 :         int                     server_version = walrcv_server_version(wrconn);
    2883                 :           0 :         bool            check_columnlist = (server_version >= 150000);
    2884                 :           0 :         int                     column_count = check_columnlist ? 4 : 3;
    2885                 :           0 :         StringInfoData pub_names;
    2886                 :             : 
    2887                 :           0 :         initStringInfo(&cmd);
    2888                 :           0 :         initStringInfo(&pub_names);
    2889                 :             : 
    2890                 :             :         /* Build the pub_names comma-separated string. */
    2891                 :           0 :         GetPublicationsStr(publications, &pub_names, true);
    2892                 :             : 
    2893                 :             :         /* Get the list of relations from the publisher */
    2894         [ #  # ]:           0 :         if (server_version >= 160000)
    2895                 :             :         {
    2896                 :           0 :                 tableRow[3] = INT2VECTOROID;
    2897                 :             : 
    2898                 :             :                 /*
    2899                 :             :                  * From version 16, we allowed passing multiple publications to the
    2900                 :             :                  * function pg_get_publication_tables. This helped to filter out the
    2901                 :             :                  * partition table whose ancestor is also published in this
    2902                 :             :                  * publication array.
    2903                 :             :                  *
    2904                 :             :                  * Join pg_get_publication_tables with pg_publication to exclude
    2905                 :             :                  * non-existing publications.
    2906                 :             :                  *
    2907                 :             :                  * Note that attrs are always stored in sorted order so we don't need
    2908                 :             :                  * to worry if different publications have specified them in a
    2909                 :             :                  * different order. See pub_collist_validate.
    2910                 :             :                  */
    2911                 :           0 :                 appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
    2912                 :             :                                                  "   FROM pg_class c\n"
    2913                 :             :                                                  "         JOIN pg_namespace n ON n.oid = c.relnamespace\n"
    2914                 :             :                                                  "         JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
    2915                 :             :                                                  "                FROM pg_publication\n"
    2916                 :             :                                                  "                WHERE pubname IN ( %s )) AS gpt\n"
    2917                 :             :                                                  "             ON gpt.relid = c.oid\n",
    2918                 :           0 :                                                  pub_names.data);
    2919                 :             : 
    2920                 :             :                 /* From version 19, inclusion of sequences in the target is supported */
    2921         [ #  # ]:           0 :                 if (server_version >= 190000)
    2922                 :           0 :                         appendStringInfo(&cmd,
    2923                 :             :                                                          "UNION ALL\n"
    2924                 :             :                                                          "  SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n"
    2925                 :             :                                                          "  FROM pg_catalog.pg_publication_sequences s\n"
    2926                 :             :                                                          "  WHERE s.pubname IN ( %s )",
    2927                 :           0 :                                                          pub_names.data);
    2928                 :           0 :         }
    2929                 :             :         else
    2930                 :             :         {
    2931                 :           0 :                 tableRow[3] = NAMEARRAYOID;
    2932                 :           0 :                 appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n");
    2933                 :             : 
    2934                 :             :                 /* Get column lists for each relation if the publisher supports it */
    2935         [ #  # ]:           0 :                 if (check_columnlist)
    2936                 :           0 :                         appendStringInfoString(&cmd, ", t.attnames\n");
    2937                 :             : 
    2938                 :           0 :                 appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
    2939                 :             :                                                  " WHERE t.pubname IN ( %s )",
    2940                 :           0 :                                                  pub_names.data);
    2941                 :             :         }
    2942                 :             : 
    2943                 :           0 :         pfree(pub_names.data);
    2944                 :             : 
    2945                 :           0 :         res = walrcv_exec(wrconn, cmd.data, column_count, tableRow);
    2946                 :           0 :         pfree(cmd.data);
    2947                 :             : 
    2948         [ #  # ]:           0 :         if (res->status != WALRCV_OK_TUPLES)
    2949   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    2950                 :             :                                 (errcode(ERRCODE_CONNECTION_FAILURE),
    2951                 :             :                                  errmsg("could not receive list of replicated tables from the publisher: %s",
    2952                 :             :                                                 res->err)));
    2953                 :             : 
    2954                 :             :         /* Process tables. */
    2955                 :           0 :         slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    2956         [ #  # ]:           0 :         while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    2957                 :             :         {
    2958                 :           0 :                 char       *nspname;
    2959                 :           0 :                 char       *relname;
    2960                 :           0 :                 bool            isnull;
    2961                 :           0 :                 char            relkind;
    2962                 :           0 :                 PublicationRelKind *relinfo = palloc_object(PublicationRelKind);
    2963                 :             : 
    2964                 :           0 :                 nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    2965         [ #  # ]:           0 :                 Assert(!isnull);
    2966                 :           0 :                 relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
    2967         [ #  # ]:           0 :                 Assert(!isnull);
    2968                 :           0 :                 relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
    2969         [ #  # ]:           0 :                 Assert(!isnull);
    2970                 :             : 
    2971                 :           0 :                 relinfo->rv = makeRangeVar(nspname, relname, -1);
    2972                 :           0 :                 relinfo->relkind = relkind;
    2973                 :             : 
    2974         [ #  # ]:           0 :                 if (relkind != RELKIND_SEQUENCE &&
    2975   [ #  #  #  # ]:           0 :                         check_columnlist &&
    2976                 :           0 :                         list_member_rangevar(relationlist, relinfo->rv))
    2977   [ #  #  #  # ]:           0 :                         ereport(ERROR,
    2978                 :             :                                         errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2979                 :             :                                         errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
    2980                 :             :                                                    nspname, relname));
    2981                 :             :                 else
    2982                 :           0 :                         relationlist = lappend(relationlist, relinfo);
    2983                 :             : 
    2984                 :           0 :                 ExecClearTuple(slot);
    2985                 :           0 :         }
    2986                 :           0 :         ExecDropSingleTupleTableSlot(slot);
    2987                 :             : 
    2988                 :           0 :         walrcv_clear_result(res);
    2989                 :             : 
    2990                 :           0 :         return relationlist;
    2991                 :           0 : }
    2992                 :             : 
    2993                 :             : /*
    2994                 :             :  * This is to report the connection failure while dropping replication slots.
    2995                 :             :  * Here, we report the WARNING for all tablesync slots so that user can drop
    2996                 :             :  * them manually, if required.
    2997                 :             :  */
    2998                 :             : static void
    2999                 :           0 : ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
    3000                 :             : {
    3001                 :           0 :         ListCell   *lc;
    3002                 :             : 
    3003   [ #  #  #  #  :           0 :         foreach(lc, rstates)
                   #  # ]
    3004                 :             :         {
    3005                 :           0 :                 SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    3006                 :           0 :                 Oid                     relid = rstate->relid;
    3007                 :             : 
    3008                 :             :                 /* Only cleanup resources of tablesync workers */
    3009         [ #  # ]:           0 :                 if (!OidIsValid(relid))
    3010                 :           0 :                         continue;
    3011                 :             : 
    3012                 :             :                 /*
    3013                 :             :                  * Caller needs to ensure that relstate doesn't change underneath us.
    3014                 :             :                  * See DropSubscription where we get the relstates.
    3015                 :             :                  */
    3016         [ #  # ]:           0 :                 if (rstate->state != SUBREL_STATE_SYNCDONE)
    3017                 :             :                 {
    3018                 :           0 :                         char            syncslotname[NAMEDATALEN] = {0};
    3019                 :             : 
    3020                 :           0 :                         ReplicationSlotNameForTablesync(subid, relid, syncslotname,
    3021                 :             :                                                                                         sizeof(syncslotname));
    3022   [ #  #  #  # ]:           0 :                         elog(WARNING, "could not drop tablesync replication slot \"%s\"",
    3023                 :             :                                  syncslotname);
    3024                 :           0 :                 }
    3025      [ #  #  # ]:           0 :         }
    3026                 :             : 
    3027   [ #  #  #  # ]:           0 :         ereport(ERROR,
    3028                 :             :                         (errcode(ERRCODE_CONNECTION_FAILURE),
    3029                 :             :                          errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
    3030                 :             :                                         slotname, err),
    3031                 :             :         /* translator: %s is an SQL ALTER command */
    3032                 :             :                          errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
    3033                 :             :                                          "ALTER SUBSCRIPTION ... DISABLE",
    3034                 :             :                                          "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
    3035                 :           0 : }
    3036                 :             : 
    3037                 :             : /*
    3038                 :             :  * Check for duplicates in the given list of publications and error out if
    3039                 :             :  * found one.  Add publications to datums as text datums, if datums is not
    3040                 :             :  * NULL.
    3041                 :             :  */
    3042                 :             : static void
    3043                 :           0 : check_duplicates_in_publist(List *publist, Datum *datums)
    3044                 :             : {
    3045                 :           0 :         ListCell   *cell;
    3046                 :           0 :         int                     j = 0;
    3047                 :             : 
    3048   [ #  #  #  #  :           0 :         foreach(cell, publist)
                   #  # ]
    3049                 :             :         {
    3050                 :           0 :                 char       *name = strVal(lfirst(cell));
    3051                 :           0 :                 ListCell   *pcell;
    3052                 :             : 
    3053   [ #  #  #  #  :           0 :                 foreach(pcell, publist)
                   #  # ]
    3054                 :             :                 {
    3055                 :           0 :                         char       *pname = strVal(lfirst(pcell));
    3056                 :             : 
    3057         [ #  # ]:           0 :                         if (pcell == cell)
    3058                 :           0 :                                 break;
    3059                 :             : 
    3060         [ #  # ]:           0 :                         if (strcmp(name, pname) == 0)
    3061   [ #  #  #  # ]:           0 :                                 ereport(ERROR,
    3062                 :             :                                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    3063                 :             :                                                  errmsg("publication name \"%s\" used more than once",
    3064                 :             :                                                                 pname)));
    3065         [ #  # ]:           0 :                 }
    3066                 :             : 
    3067         [ #  # ]:           0 :                 if (datums)
    3068                 :           0 :                         datums[j++] = CStringGetTextDatum(name);
    3069                 :           0 :         }
    3070                 :           0 : }
    3071                 :             : 
    3072                 :             : /*
    3073                 :             :  * Merge current subscription's publications and user-specified publications
    3074                 :             :  * from ADD/DROP PUBLICATIONS.
    3075                 :             :  *
    3076                 :             :  * If addpub is true, we will add the list of publications into oldpublist.
    3077                 :             :  * Otherwise, we will delete the list of publications from oldpublist.  The
    3078                 :             :  * returned list is a copy, oldpublist itself is not changed.
    3079                 :             :  *
    3080                 :             :  * subname is the subscription name, for error messages.
    3081                 :             :  */
    3082                 :             : static List *
    3083                 :           8 : merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
    3084                 :             : {
    3085                 :           8 :         ListCell   *lc;
    3086                 :             : 
    3087                 :           8 :         oldpublist = list_copy(oldpublist);
    3088                 :             : 
    3089                 :           8 :         check_duplicates_in_publist(newpublist, NULL);
    3090                 :             : 
    3091   [ +  -  +  +  :          13 :         foreach(lc, newpublist)
                   +  + ]
    3092                 :             :         {
    3093                 :          12 :                 char       *name = strVal(lfirst(lc));
    3094                 :          12 :                 ListCell   *lc2;
    3095                 :          12 :                 bool            found = false;
    3096                 :             : 
    3097   [ +  -  +  +  :          26 :                 foreach(lc2, oldpublist)
                   +  + ]
    3098                 :             :                 {
    3099                 :          16 :                         char       *pubname = strVal(lfirst(lc2));
    3100                 :             : 
    3101         [ +  + ]:          16 :                         if (strcmp(name, pubname) == 0)
    3102                 :             :                         {
    3103                 :           7 :                                 found = true;
    3104         [ +  + ]:           7 :                                 if (addpub)
    3105   [ +  -  +  - ]:           2 :                                         ereport(ERROR,
    3106                 :             :                                                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    3107                 :             :                                                          errmsg("publication \"%s\" is already in subscription \"%s\"",
    3108                 :             :                                                                         name, subname)));
    3109                 :             :                                 else
    3110                 :           5 :                                         oldpublist = foreach_delete_current(oldpublist, lc2);
    3111                 :             : 
    3112                 :           5 :                                 break;
    3113                 :             :                         }
    3114         [ +  + ]:          14 :                 }
    3115                 :             : 
    3116   [ +  +  -  + ]:          10 :                 if (addpub && !found)
    3117                 :           2 :                         oldpublist = lappend(oldpublist, makeString(name));
    3118   [ +  +  +  + ]:           8 :                 else if (!addpub && !found)
    3119   [ +  -  +  - ]:           1 :                         ereport(ERROR,
    3120                 :             :                                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    3121                 :             :                                          errmsg("publication \"%s\" is not in subscription \"%s\"",
    3122                 :             :                                                         name, subname)));
    3123                 :           5 :         }
    3124                 :             : 
    3125                 :             :         /*
    3126                 :             :          * XXX Probably no strong reason for this, but for now it's to make ALTER
    3127                 :             :          * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
    3128                 :             :          */
    3129         [ -  + ]:           1 :         if (!oldpublist)
    3130   [ +  -  +  - ]:           1 :                 ereport(ERROR,
    3131                 :             :                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    3132                 :             :                                  errmsg("cannot drop all the publications from a subscription")));
    3133                 :             : 
    3134                 :           0 :         return oldpublist;
    3135                 :           0 : }
    3136                 :             : 
    3137                 :             : /*
    3138                 :             :  * Extract the streaming mode value from a DefElem.  This is like
    3139                 :             :  * defGetBoolean() but also accepts the special value of "parallel".
    3140                 :             :  */
    3141                 :             : char
    3142                 :           0 : defGetStreamingMode(DefElem *def)
    3143                 :             : {
    3144                 :             :         /*
    3145                 :             :          * If no parameter value given, assume "true" is meant.
    3146                 :             :          */
    3147         [ #  # ]:           0 :         if (!def->arg)
    3148                 :           0 :                 return LOGICALREP_STREAM_ON;
    3149                 :             : 
    3150                 :             :         /*
    3151                 :             :          * Allow 0, 1, "false", "true", "off", "on" or "parallel".
    3152                 :             :          */
    3153         [ #  # ]:           0 :         switch (nodeTag(def->arg))
    3154                 :             :         {
    3155                 :             :                 case T_Integer:
    3156      [ #  #  # ]:           0 :                         switch (intVal(def->arg))
    3157                 :             :                         {
    3158                 :             :                                 case 0:
    3159                 :           0 :                                         return LOGICALREP_STREAM_OFF;
    3160                 :             :                                 case 1:
    3161                 :           0 :                                         return LOGICALREP_STREAM_ON;
    3162                 :             :                                 default:
    3163                 :             :                                         /* otherwise, error out below */
    3164                 :           0 :                                         break;
    3165                 :             :                         }
    3166                 :           0 :                         break;
    3167                 :             :                 default:
    3168                 :             :                         {
    3169                 :           0 :                                 char       *sval = defGetString(def);
    3170                 :             : 
    3171                 :             :                                 /*
    3172                 :             :                                  * The set of strings accepted here should match up with the
    3173                 :             :                                  * grammar's opt_boolean_or_string production.
    3174                 :             :                                  */
    3175   [ #  #  #  # ]:           0 :                                 if (pg_strcasecmp(sval, "false") == 0 ||
    3176                 :           0 :                                         pg_strcasecmp(sval, "off") == 0)
    3177                 :           0 :                                         return LOGICALREP_STREAM_OFF;
    3178   [ #  #  #  # ]:           0 :                                 if (pg_strcasecmp(sval, "true") == 0 ||
    3179                 :           0 :                                         pg_strcasecmp(sval, "on") == 0)
    3180                 :           0 :                                         return LOGICALREP_STREAM_ON;
    3181         [ #  # ]:           0 :                                 if (pg_strcasecmp(sval, "parallel") == 0)
    3182                 :           0 :                                         return LOGICALREP_STREAM_PARALLEL;
    3183      [ #  #  # ]:           0 :                         }
    3184                 :           0 :                         break;
    3185                 :             :         }
    3186                 :             : 
    3187   [ #  #  #  # ]:           0 :         ereport(ERROR,
    3188                 :             :                         (errcode(ERRCODE_SYNTAX_ERROR),
    3189                 :             :                          errmsg("%s requires a Boolean value or \"parallel\"",
    3190                 :             :                                         def->defname)));
    3191                 :           0 :         return LOGICALREP_STREAM_OFF;   /* keep compiler quiet */
    3192                 :           0 : }
        

Generated by: LCOV version 2.3.2-1