LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - pg_createsubscriber.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 1172 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 40 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * pg_createsubscriber.c
       4              :  *        Create a new logical replica from a standby server
       5              :  *
       6              :  * Copyright (c) 2024-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *        src/bin/pg_basebackup/pg_createsubscriber.c
      10              :  *
      11              :  *-------------------------------------------------------------------------
      12              :  */
      13              : 
      14              : #include "postgres_fe.h"
      15              : 
      16              : #include <sys/stat.h>
      17              : #include <sys/time.h>
      18              : #include <sys/wait.h>
      19              : #include <time.h>
      20              : 
      21              : #include "common/connect.h"
      22              : #include "common/controldata_utils.h"
      23              : #include "common/file_utils.h"
      24              : #include "common/logging.h"
      25              : #include "common/pg_prng.h"
      26              : #include "common/restricted_token.h"
      27              : #include "datatype/timestamp.h"
      28              : #include "fe_utils/recovery_gen.h"
      29              : #include "fe_utils/simple_list.h"
      30              : #include "fe_utils/string_utils.h"
      31              : #include "fe_utils/version.h"
      32              : #include "getopt_long.h"
      33              : 
      34              : #define DEFAULT_SUB_PORT        "50432"
      35              : #define OBJECTTYPE_PUBLICATIONS  0x0001
      36              : 
      37              : /*
      38              :  * Configuration files for recovery parameters.
      39              :  *
      40              :  * The recovery parameters are set in INCLUDED_CONF_FILE, itself loaded by
      41              :  * the server through an include_if_exists in postgresql.auto.conf.
      42              :  *
      43              :  * INCLUDED_CONF_FILE is renamed to INCLUDED_CONF_FILE_DISABLED when exiting,
      44              :  * so as the recovery parameters set by this tool never take effect on node
      45              :  * restart.  The contents of INCLUDED_CONF_FILE_DISABLED can be useful for
      46              :  * debugging.
      47              :  */
      48              : #define PG_AUTOCONF_FILENAME            "postgresql.auto.conf"
      49              : #define INCLUDED_CONF_FILE                      "pg_createsubscriber.conf"
      50              : #define INCLUDED_CONF_FILE_DISABLED     INCLUDED_CONF_FILE ".disabled"
      51              : 
      52              : /* Command-line options */
      53              : struct CreateSubscriberOptions
      54              : {
      55              :         char       *config_file;        /* configuration file */
      56              :         char       *pub_conninfo_str;   /* publisher connection string */
      57              :         char       *socket_dir;         /* directory for Unix-domain socket, if any */
      58              :         char       *sub_port;           /* subscriber port number */
      59              :         const char *sub_username;       /* subscriber username */
      60              :         bool            two_phase;              /* enable-two-phase option */
      61              :         SimpleStringList database_names;        /* list of database names */
      62              :         SimpleStringList pub_names; /* list of publication names */
      63              :         SimpleStringList sub_names; /* list of subscription names */
      64              :         SimpleStringList replslot_names;        /* list of replication slot names */
      65              :         int                     recovery_timeout;       /* stop recovery after this time */
      66              :         bool            all_dbs;                /* all option */
      67              :         SimpleStringList objecttypes_to_clean;  /* list of object types to cleanup */
      68              : };
      69              : 
      70              : /* per-database publication/subscription info */
      71              : struct LogicalRepInfo
      72              : {
      73              :         char       *dbname;                     /* database name */
      74              :         char       *pubconninfo;        /* publisher connection string */
      75              :         char       *subconninfo;        /* subscriber connection string */
      76              :         char       *pubname;            /* publication name */
      77              :         char       *subname;            /* subscription name */
      78              :         char       *replslotname;       /* replication slot name */
      79              : 
      80              :         bool            made_replslot;  /* replication slot was created */
      81              :         bool            made_publication;       /* publication was created */
      82              : };
      83              : 
      84              : /*
      85              :  * Information shared across all the databases (or publications and
      86              :  * subscriptions).
      87              :  */
      88              : struct LogicalRepInfos
      89              : {
      90              :         struct LogicalRepInfo *dbinfo;
      91              :         bool            two_phase;              /* enable-two-phase option */
      92              :         bits32          objecttypes_to_clean;   /* flags indicating which object types
      93              :                                                                                  * to clean up on subscriber */
      94              : };
      95              : 
      96              : static void cleanup_objects_atexit(void);
      97              : static void usage(void);
      98              : static char *get_base_conninfo(const char *conninfo, char **dbname);
      99              : static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
     100              : static char *get_exec_path(const char *argv0, const char *progname);
     101              : static void check_data_directory(const char *datadir);
     102              : static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
     103              : static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
     104              :                                                                                                  const char *pub_base_conninfo,
     105              :                                                                                                  const char *sub_base_conninfo);
     106              : static PGconn *connect_database(const char *conninfo, bool exit_on_error);
     107              : static void disconnect_database(PGconn *conn, bool exit_on_error);
     108              : static uint64 get_primary_sysid(const char *conninfo);
     109              : static uint64 get_standby_sysid(const char *datadir);
     110              : static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
     111              : static bool server_is_in_recovery(PGconn *conn);
     112              : static char *generate_object_name(PGconn *conn);
     113              : static void check_publisher(const struct LogicalRepInfo *dbinfo);
     114              : static char *setup_publisher(struct LogicalRepInfo *dbinfo);
     115              : static void check_subscriber(const struct LogicalRepInfo *dbinfo);
     116              : static void setup_subscriber(struct LogicalRepInfo *dbinfo,
     117              :                                                          const char *consistent_lsn);
     118              : static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
     119              :                                                    const char *lsn);
     120              : static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
     121              :                                                                                   const char *slotname);
     122              : static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
     123              : static char *create_logical_replication_slot(PGconn *conn,
     124              :                                                                                          struct LogicalRepInfo *dbinfo);
     125              : static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
     126              :                                                                   const char *slot_name);
     127              : static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
     128              : static void start_standby_server(const struct CreateSubscriberOptions *opt,
     129              :                                                                  bool restricted_access,
     130              :                                                                  bool restrict_logical_worker);
     131              : static void stop_standby_server(const char *datadir);
     132              : static void wait_for_end_recovery(const char *conninfo,
     133              :                                                                   const struct CreateSubscriberOptions *opt);
     134              : static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
     135              : static bool find_publication(PGconn *conn, const char *pubname, const char *dbname);
     136              : static void drop_publication(PGconn *conn, const char *pubname,
     137              :                                                          const char *dbname, bool *made_publication);
     138              : static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
     139              : static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
     140              : static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
     141              :                                                                          const char *lsn);
     142              : static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
     143              : static void check_and_drop_existing_subscriptions(PGconn *conn,
     144              :                                                                                                   const struct LogicalRepInfo *dbinfo);
     145              : static void drop_existing_subscription(PGconn *conn, const char *subname,
     146              :                                                                            const char *dbname);
     147              : static void get_publisher_databases(struct CreateSubscriberOptions *opt,
     148              :                                                                         bool dbnamespecified);
     149              : 
     150              : #define WAIT_INTERVAL   1               /* 1 second */
     151              : 
     152              : static const char *progname;
     153              : 
     154              : static char *primary_slot_name = NULL;
     155              : static bool dry_run = false;
     156              : 
     157              : static bool success = false;
     158              : 
     159              : static struct LogicalRepInfos dbinfos;
     160              : static int      num_dbs = 0;            /* number of specified databases */
     161              : static int      num_pubs = 0;           /* number of specified publications */
     162              : static int      num_subs = 0;           /* number of specified subscriptions */
     163              : static int      num_replslots = 0;      /* number of specified replication slots */
     164              : 
     165              : static pg_prng_state prng_state;
     166              : 
     167              : static char *pg_ctl_path = NULL;
     168              : static char *pg_resetwal_path = NULL;
     169              : 
     170              : /* standby / subscriber data directory */
     171              : static char *subscriber_dir = NULL;
     172              : 
     173              : static bool recovery_ended = false;
     174              : static bool standby_running = false;
     175              : static bool recovery_params_set = false;
     176              : 
     177              : 
     178              : /*
     179              :  * Clean up objects created by pg_createsubscriber.
     180              :  *
     181              :  * Publications and replication slots are created on the primary.  Depending
     182              :  * on the step where it failed, already-created objects should be removed if
     183              :  * possible (sometimes this won't work due to a connection issue).
     184              :  * There is no cleanup on the target server *after* its promotion, because any
     185              :  * failure at this point means recreating the physical replica and starting
     186              :  * again.
     187              :  *
     188              :  * The recovery configuration is always removed, by renaming the included
     189              :  * configuration file out of the way.
     190              :  */
     191              : static void
     192            0 : cleanup_objects_atexit(void)
     193              : {
     194              :         /* Rename the included configuration file, if necessary. */
     195            0 :         if (recovery_params_set)
     196              :         {
     197            0 :                 char            conf_filename[MAXPGPATH];
     198            0 :                 char            conf_filename_disabled[MAXPGPATH];
     199              : 
     200            0 :                 snprintf(conf_filename, MAXPGPATH, "%s/%s", subscriber_dir,
     201              :                                  INCLUDED_CONF_FILE);
     202            0 :                 snprintf(conf_filename_disabled, MAXPGPATH, "%s/%s", subscriber_dir,
     203              :                                  INCLUDED_CONF_FILE_DISABLED);
     204              : 
     205            0 :                 if (durable_rename(conf_filename, conf_filename_disabled) != 0)
     206              :                 {
     207              :                         /* durable_rename() has already logged something. */
     208            0 :                         pg_log_warning_hint("A manual removal of the recovery parameters may be required.");
     209            0 :                 }
     210            0 :         }
     211              : 
     212            0 :         if (success)
     213            0 :                 return;
     214              : 
     215              :         /*
     216              :          * If the server is promoted, there is no way to use the current setup
     217              :          * again. Warn the user that a new replication setup should be done before
     218              :          * trying again.
     219              :          */
     220            0 :         if (recovery_ended)
     221              :         {
     222            0 :                 pg_log_warning("failed after the end of recovery");
     223            0 :                 pg_log_warning_hint("The target server cannot be used as a physical replica anymore.  "
     224              :                                                         "You must recreate the physical replica before continuing.");
     225            0 :         }
     226              : 
     227            0 :         for (int i = 0; i < num_dbs; i++)
     228              :         {
     229            0 :                 struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
     230              : 
     231            0 :                 if (dbinfo->made_publication || dbinfo->made_replslot)
     232              :                 {
     233            0 :                         PGconn     *conn;
     234              : 
     235            0 :                         conn = connect_database(dbinfo->pubconninfo, false);
     236            0 :                         if (conn != NULL)
     237              :                         {
     238            0 :                                 if (dbinfo->made_publication)
     239            0 :                                         drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
     240            0 :                                                                          &dbinfo->made_publication);
     241            0 :                                 if (dbinfo->made_replslot)
     242            0 :                                         drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
     243            0 :                                 disconnect_database(conn, false);
     244            0 :                         }
     245              :                         else
     246              :                         {
     247              :                                 /*
     248              :                                  * If a connection could not be established, inform the user
     249              :                                  * that some objects were left on primary and should be
     250              :                                  * removed before trying again.
     251              :                                  */
     252            0 :                                 if (dbinfo->made_publication)
     253              :                                 {
     254            0 :                                         pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
     255              :                                                                    dbinfo->pubname,
     256              :                                                                    dbinfo->dbname);
     257            0 :                                         pg_log_warning_hint("Drop this publication before trying again.");
     258            0 :                                 }
     259            0 :                                 if (dbinfo->made_replslot)
     260              :                                 {
     261            0 :                                         pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
     262              :                                                                    dbinfo->replslotname,
     263              :                                                                    dbinfo->dbname);
     264            0 :                                         pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
     265            0 :                                 }
     266              :                         }
     267            0 :                 }
     268            0 :         }
     269              : 
     270            0 :         if (standby_running)
     271            0 :                 stop_standby_server(subscriber_dir);
     272            0 : }
     273              : 
     274              : static void
     275            0 : usage(void)
     276              : {
     277            0 :         printf(_("%s creates a new logical replica from a standby server.\n\n"),
     278              :                    progname);
     279            0 :         printf(_("Usage:\n"));
     280            0 :         printf(_("  %s [OPTION]...\n"), progname);
     281            0 :         printf(_("\nOptions:\n"));
     282            0 :         printf(_("  -a, --all                       create subscriptions for all databases except template\n"
     283              :                          "                                  databases and databases that don't allow connections\n"));
     284            0 :         printf(_("  -d, --database=DBNAME           database in which to create a subscription\n"));
     285            0 :         printf(_("  -D, --pgdata=DATADIR            location for the subscriber data directory\n"));
     286            0 :         printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
     287            0 :         printf(_("  -p, --subscriber-port=PORT      subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
     288            0 :         printf(_("  -P, --publisher-server=CONNSTR  publisher connection string\n"));
     289            0 :         printf(_("  -s, --socketdir=DIR             socket directory to use (default current dir.)\n"));
     290            0 :         printf(_("  -t, --recovery-timeout=SECS     seconds to wait for recovery to end\n"));
     291            0 :         printf(_("  -T, --enable-two-phase          enable two-phase commit for all subscriptions\n"));
     292            0 :         printf(_("  -U, --subscriber-username=NAME  user name for subscriber connection\n"));
     293            0 :         printf(_("  -v, --verbose                   output verbose messages\n"));
     294            0 :         printf(_("      --clean=OBJECTTYPE          drop all objects of the specified type from specified\n"
     295              :                          "                                  databases on the subscriber; accepts: \"%s\"\n"), "publications");
     296            0 :         printf(_("      --config-file=FILENAME      use specified main server configuration\n"
     297              :                          "                                  file when running target cluster\n"));
     298            0 :         printf(_("      --publication=NAME          publication name\n"));
     299            0 :         printf(_("      --replication-slot=NAME     replication slot name\n"));
     300            0 :         printf(_("      --subscription=NAME         subscription name\n"));
     301            0 :         printf(_("  -V, --version                   output version information, then exit\n"));
     302            0 :         printf(_("  -?, --help                      show this help, then exit\n"));
     303            0 :         printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
     304            0 :         printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
     305            0 : }
     306              : 
     307              : /*
     308              :  * Subroutine to append "keyword=value" to a connection string,
     309              :  * with proper quoting of the value.  (We assume keywords don't need that.)
     310              :  */
     311              : static void
     312            0 : appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
     313              : {
     314            0 :         if (buf->len > 0)
     315            0 :                 appendPQExpBufferChar(buf, ' ');
     316            0 :         appendPQExpBufferStr(buf, keyword);
     317            0 :         appendPQExpBufferChar(buf, '=');
     318            0 :         appendConnStrVal(buf, val);
     319            0 : }
     320              : 
     321              : /*
     322              :  * Validate a connection string. Returns a base connection string that is a
     323              :  * connection string without a database name.
     324              :  *
     325              :  * Since we might process multiple databases, each database name will be
     326              :  * appended to this base connection string to provide a final connection
     327              :  * string. If the second argument (dbname) is not null, returns dbname if the
     328              :  * provided connection string contains it.
     329              :  *
     330              :  * It is the caller's responsibility to free the returned connection string and
     331              :  * dbname.
     332              :  */
     333              : static char *
     334            0 : get_base_conninfo(const char *conninfo, char **dbname)
     335              : {
     336            0 :         PQExpBuffer buf;
     337            0 :         PQconninfoOption *conn_opts;
     338            0 :         PQconninfoOption *conn_opt;
     339            0 :         char       *errmsg = NULL;
     340            0 :         char       *ret;
     341              : 
     342            0 :         conn_opts = PQconninfoParse(conninfo, &errmsg);
     343            0 :         if (conn_opts == NULL)
     344              :         {
     345            0 :                 pg_log_error("could not parse connection string: %s", errmsg);
     346            0 :                 PQfreemem(errmsg);
     347            0 :                 return NULL;
     348              :         }
     349              : 
     350            0 :         buf = createPQExpBuffer();
     351            0 :         for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
     352              :         {
     353            0 :                 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
     354              :                 {
     355            0 :                         if (strcmp(conn_opt->keyword, "dbname") == 0)
     356              :                         {
     357            0 :                                 if (dbname)
     358            0 :                                         *dbname = pg_strdup(conn_opt->val);
     359            0 :                                 continue;
     360              :                         }
     361            0 :                         appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
     362            0 :                 }
     363            0 :         }
     364              : 
     365            0 :         ret = pg_strdup(buf->data);
     366              : 
     367            0 :         destroyPQExpBuffer(buf);
     368            0 :         PQconninfoFree(conn_opts);
     369              : 
     370            0 :         return ret;
     371            0 : }
     372              : 
     373              : /*
     374              :  * Build a subscriber connection string. Only a few parameters are supported
     375              :  * since it starts a server with restricted access.
     376              :  */
     377              : static char *
     378            0 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
     379              : {
     380            0 :         PQExpBuffer buf = createPQExpBuffer();
     381            0 :         char       *ret;
     382              : 
     383            0 :         appendConnStrItem(buf, "port", opt->sub_port);
     384              : #if !defined(WIN32)
     385            0 :         appendConnStrItem(buf, "host", opt->socket_dir);
     386              : #endif
     387            0 :         if (opt->sub_username != NULL)
     388            0 :                 appendConnStrItem(buf, "user", opt->sub_username);
     389            0 :         appendConnStrItem(buf, "fallback_application_name", progname);
     390              : 
     391            0 :         ret = pg_strdup(buf->data);
     392              : 
     393            0 :         destroyPQExpBuffer(buf);
     394              : 
     395            0 :         return ret;
     396            0 : }
     397              : 
     398              : /*
     399              :  * Verify if a PostgreSQL binary (progname) is available in the same directory as
     400              :  * pg_createsubscriber and it has the same version.  It returns the absolute
     401              :  * path of the progname.
     402              :  */
     403              : static char *
     404            0 : get_exec_path(const char *argv0, const char *progname)
     405              : {
     406            0 :         char       *versionstr;
     407            0 :         char       *exec_path;
     408            0 :         int                     ret;
     409              : 
     410            0 :         versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
     411            0 :         exec_path = pg_malloc(MAXPGPATH);
     412            0 :         ret = find_other_exec(argv0, progname, versionstr, exec_path);
     413              : 
     414            0 :         if (ret < 0)
     415              :         {
     416            0 :                 char            full_path[MAXPGPATH];
     417              : 
     418            0 :                 if (find_my_exec(argv0, full_path) < 0)
     419            0 :                         strlcpy(full_path, progname, sizeof(full_path));
     420              : 
     421            0 :                 if (ret == -1)
     422            0 :                         pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
     423              :                                          progname, "pg_createsubscriber", full_path);
     424              :                 else
     425            0 :                         pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
     426              :                                          progname, full_path, "pg_createsubscriber");
     427            0 :         }
     428              : 
     429            0 :         pg_log_debug("%s path is:  %s", progname, exec_path);
     430              : 
     431            0 :         return exec_path;
     432            0 : }
     433              : 
     434              : /*
     435              :  * Is it a cluster directory? These are preliminary checks. It is far from
     436              :  * making an accurate check. If it is not a clone from the publisher, it will
     437              :  * eventually fail in a future step.
     438              :  */
     439              : static void
     440            0 : check_data_directory(const char *datadir)
     441              : {
     442            0 :         struct stat statbuf;
     443            0 :         uint32          major_version;
     444            0 :         char       *version_str;
     445              : 
     446            0 :         pg_log_info("checking if directory \"%s\" is a cluster data directory",
     447              :                                 datadir);
     448              : 
     449            0 :         if (stat(datadir, &statbuf) != 0)
     450              :         {
     451            0 :                 if (errno == ENOENT)
     452            0 :                         pg_fatal("data directory \"%s\" does not exist", datadir);
     453              :                 else
     454            0 :                         pg_fatal("could not access directory \"%s\": %m", datadir);
     455            0 :         }
     456              : 
     457              :         /*
     458              :          * Retrieve the contents of this cluster's PG_VERSION.  We require
     459              :          * compatibility with the same major version as the one this tool is
     460              :          * compiled with.
     461              :          */
     462            0 :         major_version = GET_PG_MAJORVERSION_NUM(get_pg_version(datadir, &version_str));
     463            0 :         if (major_version != PG_MAJORVERSION_NUM)
     464              :         {
     465            0 :                 pg_log_error("data directory is of wrong version");
     466            0 :                 pg_log_error_detail("File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
     467              :                                                         "PG_VERSION", version_str, PG_MAJORVERSION);
     468            0 :                 exit(1);
     469              :         }
     470            0 : }
     471              : 
     472              : /*
     473              :  * Append database name into a base connection string.
     474              :  *
     475              :  * dbname is the only parameter that changes so it is not included in the base
     476              :  * connection string. This function concatenates dbname to build a "real"
     477              :  * connection string.
     478              :  */
     479              : static char *
     480            0 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
     481              : {
     482            0 :         PQExpBuffer buf = createPQExpBuffer();
     483            0 :         char       *ret;
     484              : 
     485            0 :         Assert(conninfo != NULL);
     486              : 
     487            0 :         appendPQExpBufferStr(buf, conninfo);
     488            0 :         appendConnStrItem(buf, "dbname", dbname);
     489              : 
     490            0 :         ret = pg_strdup(buf->data);
     491            0 :         destroyPQExpBuffer(buf);
     492              : 
     493            0 :         return ret;
     494            0 : }
     495              : 
     496              : /*
     497              :  * Store publication and subscription information.
     498              :  *
     499              :  * If publication, replication slot and subscription names were specified,
     500              :  * store it here. Otherwise, a generated name will be assigned to the object in
     501              :  * setup_publisher().
     502              :  */
     503              : static struct LogicalRepInfo *
     504            0 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
     505              :                                    const char *pub_base_conninfo,
     506              :                                    const char *sub_base_conninfo)
     507              : {
     508            0 :         struct LogicalRepInfo *dbinfo;
     509            0 :         SimpleStringListCell *pubcell = NULL;
     510            0 :         SimpleStringListCell *subcell = NULL;
     511            0 :         SimpleStringListCell *replslotcell = NULL;
     512            0 :         int                     i = 0;
     513              : 
     514            0 :         dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
     515              : 
     516            0 :         if (num_pubs > 0)
     517            0 :                 pubcell = opt->pub_names.head;
     518            0 :         if (num_subs > 0)
     519            0 :                 subcell = opt->sub_names.head;
     520            0 :         if (num_replslots > 0)
     521            0 :                 replslotcell = opt->replslot_names.head;
     522              : 
     523            0 :         for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
     524              :         {
     525            0 :                 char       *conninfo;
     526              : 
     527              :                 /* Fill publisher attributes */
     528            0 :                 conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
     529            0 :                 dbinfo[i].pubconninfo = conninfo;
     530            0 :                 dbinfo[i].dbname = cell->val;
     531            0 :                 if (num_pubs > 0)
     532            0 :                         dbinfo[i].pubname = pubcell->val;
     533              :                 else
     534            0 :                         dbinfo[i].pubname = NULL;
     535            0 :                 if (num_replslots > 0)
     536            0 :                         dbinfo[i].replslotname = replslotcell->val;
     537              :                 else
     538            0 :                         dbinfo[i].replslotname = NULL;
     539            0 :                 dbinfo[i].made_replslot = false;
     540            0 :                 dbinfo[i].made_publication = false;
     541              :                 /* Fill subscriber attributes */
     542            0 :                 conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
     543            0 :                 dbinfo[i].subconninfo = conninfo;
     544            0 :                 if (num_subs > 0)
     545            0 :                         dbinfo[i].subname = subcell->val;
     546              :                 else
     547            0 :                         dbinfo[i].subname = NULL;
     548              :                 /* Other fields will be filled later */
     549              : 
     550            0 :                 pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
     551              :                                          dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
     552              :                                          dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
     553              :                                          dbinfo[i].pubconninfo);
     554            0 :                 pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
     555              :                                          dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
     556              :                                          dbinfo[i].subconninfo,
     557              :                                          dbinfos.two_phase ? "true" : "false");
     558              : 
     559            0 :                 if (num_pubs > 0)
     560            0 :                         pubcell = pubcell->next;
     561            0 :                 if (num_subs > 0)
     562            0 :                         subcell = subcell->next;
     563            0 :                 if (num_replslots > 0)
     564            0 :                         replslotcell = replslotcell->next;
     565              : 
     566            0 :                 i++;
     567            0 :         }
     568              : 
     569            0 :         return dbinfo;
     570            0 : }
     571              : 
     572              : /*
     573              :  * Open a new connection. If exit_on_error is true, it has an undesired
     574              :  * condition and it should exit immediately.
     575              :  */
     576              : static PGconn *
     577            0 : connect_database(const char *conninfo, bool exit_on_error)
     578              : {
     579            0 :         PGconn     *conn;
     580            0 :         PGresult   *res;
     581              : 
     582            0 :         conn = PQconnectdb(conninfo);
     583            0 :         if (PQstatus(conn) != CONNECTION_OK)
     584              :         {
     585            0 :                 pg_log_error("connection to database failed: %s",
     586              :                                          PQerrorMessage(conn));
     587            0 :                 PQfinish(conn);
     588              : 
     589            0 :                 if (exit_on_error)
     590            0 :                         exit(1);
     591            0 :                 return NULL;
     592              :         }
     593              : 
     594              :         /* Secure search_path */
     595            0 :         res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     596            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     597              :         {
     598            0 :                 pg_log_error("could not clear \"search_path\": %s",
     599              :                                          PQresultErrorMessage(res));
     600            0 :                 PQclear(res);
     601            0 :                 PQfinish(conn);
     602              : 
     603            0 :                 if (exit_on_error)
     604            0 :                         exit(1);
     605            0 :                 return NULL;
     606              :         }
     607            0 :         PQclear(res);
     608              : 
     609            0 :         return conn;
     610            0 : }
     611              : 
     612              : /*
     613              :  * Close the connection. If exit_on_error is true, it has an undesired
     614              :  * condition and it should exit immediately.
     615              :  */
     616              : static void
     617            0 : disconnect_database(PGconn *conn, bool exit_on_error)
     618              : {
     619            0 :         Assert(conn != NULL);
     620              : 
     621            0 :         PQfinish(conn);
     622              : 
     623            0 :         if (exit_on_error)
     624            0 :                 exit(1);
     625            0 : }
     626              : 
     627              : /*
     628              :  * Obtain the system identifier using the provided connection. It will be used
     629              :  * to compare if a data directory is a clone of another one.
     630              :  */
     631              : static uint64
     632            0 : get_primary_sysid(const char *conninfo)
     633              : {
     634            0 :         PGconn     *conn;
     635            0 :         PGresult   *res;
     636            0 :         uint64          sysid;
     637              : 
     638            0 :         pg_log_info("getting system identifier from publisher");
     639              : 
     640            0 :         conn = connect_database(conninfo, true);
     641              : 
     642            0 :         res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
     643            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     644              :         {
     645            0 :                 pg_log_error("could not get system identifier: %s",
     646              :                                          PQresultErrorMessage(res));
     647            0 :                 disconnect_database(conn, true);
     648            0 :         }
     649            0 :         if (PQntuples(res) != 1)
     650              :         {
     651            0 :                 pg_log_error("could not get system identifier: got %d rows, expected %d row",
     652              :                                          PQntuples(res), 1);
     653            0 :                 disconnect_database(conn, true);
     654            0 :         }
     655              : 
     656            0 :         sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
     657              : 
     658            0 :         pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
     659              : 
     660            0 :         PQclear(res);
     661            0 :         disconnect_database(conn, false);
     662              : 
     663            0 :         return sysid;
     664            0 : }
     665              : 
     666              : /*
     667              :  * Obtain the system identifier from control file. It will be used to compare
     668              :  * if a data directory is a clone of another one. This routine is used locally
     669              :  * and avoids a connection.
     670              :  */
     671              : static uint64
     672            0 : get_standby_sysid(const char *datadir)
     673              : {
     674            0 :         ControlFileData *cf;
     675            0 :         bool            crc_ok;
     676            0 :         uint64          sysid;
     677              : 
     678            0 :         pg_log_info("getting system identifier from subscriber");
     679              : 
     680            0 :         cf = get_controlfile(datadir, &crc_ok);
     681            0 :         if (!crc_ok)
     682            0 :                 pg_fatal("control file appears to be corrupt");
     683              : 
     684            0 :         sysid = cf->system_identifier;
     685              : 
     686            0 :         pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
     687              : 
     688            0 :         pg_free(cf);
     689              : 
     690            0 :         return sysid;
     691            0 : }
     692              : 
     693              : /*
     694              :  * Modify the system identifier. Since a standby server preserves the system
     695              :  * identifier, it makes sense to change it to avoid situations in which WAL
     696              :  * files from one of the systems might be used in the other one.
     697              :  */
     698              : static void
     699            0 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
     700              : {
     701            0 :         ControlFileData *cf;
     702            0 :         bool            crc_ok;
     703            0 :         struct timeval tv;
     704              : 
     705            0 :         char       *cmd_str;
     706              : 
     707            0 :         pg_log_info("modifying system identifier of subscriber");
     708              : 
     709            0 :         cf = get_controlfile(subscriber_dir, &crc_ok);
     710            0 :         if (!crc_ok)
     711            0 :                 pg_fatal("control file appears to be corrupt");
     712              : 
     713              :         /*
     714              :          * Select a new system identifier.
     715              :          *
     716              :          * XXX this code was extracted from BootStrapXLOG().
     717              :          */
     718            0 :         gettimeofday(&tv, NULL);
     719            0 :         cf->system_identifier = ((uint64) tv.tv_sec) << 32;
     720            0 :         cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
     721            0 :         cf->system_identifier |= getpid() & 0xFFF;
     722              : 
     723            0 :         if (dry_run)
     724            0 :                 pg_log_info("dry-run: would set system identifier to %" PRIu64 " on subscriber",
     725              :                                         cf->system_identifier);
     726              :         else
     727              :         {
     728            0 :                 update_controlfile(subscriber_dir, cf, true);
     729            0 :                 pg_log_info("system identifier is %" PRIu64 " on subscriber",
     730              :                                         cf->system_identifier);
     731              :         }
     732              : 
     733            0 :         if (dry_run)
     734            0 :                 pg_log_info("dry-run: would run pg_resetwal on the subscriber");
     735              :         else
     736            0 :                 pg_log_info("running pg_resetwal on the subscriber");
     737              : 
     738            0 :         cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
     739            0 :                                            subscriber_dir, DEVNULL);
     740              : 
     741            0 :         pg_log_debug("pg_resetwal command is: %s", cmd_str);
     742              : 
     743            0 :         if (!dry_run)
     744              :         {
     745            0 :                 int                     rc = system(cmd_str);
     746              : 
     747            0 :                 if (rc == 0)
     748            0 :                         pg_log_info("successfully reset WAL on the subscriber");
     749              :                 else
     750            0 :                         pg_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
     751            0 :         }
     752              : 
     753            0 :         pg_free(cf);
     754            0 : }
     755              : 
     756              : /*
     757              :  * Generate an object name using a prefix, database oid and a random integer.
     758              :  * It is used in case the user does not specify an object name (publication,
     759              :  * subscription, replication slot).
     760              :  */
     761              : static char *
     762            0 : generate_object_name(PGconn *conn)
     763              : {
     764            0 :         PGresult   *res;
     765            0 :         Oid                     oid;
     766            0 :         uint32          rand;
     767            0 :         char       *objname;
     768              : 
     769            0 :         res = PQexec(conn,
     770              :                                  "SELECT oid FROM pg_catalog.pg_database "
     771              :                                  "WHERE datname = pg_catalog.current_database()");
     772            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     773              :         {
     774            0 :                 pg_log_error("could not obtain database OID: %s",
     775              :                                          PQresultErrorMessage(res));
     776            0 :                 disconnect_database(conn, true);
     777            0 :         }
     778              : 
     779            0 :         if (PQntuples(res) != 1)
     780              :         {
     781            0 :                 pg_log_error("could not obtain database OID: got %d rows, expected %d row",
     782              :                                          PQntuples(res), 1);
     783            0 :                 disconnect_database(conn, true);
     784            0 :         }
     785              : 
     786              :         /* Database OID */
     787            0 :         oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
     788              : 
     789            0 :         PQclear(res);
     790              : 
     791              :         /* Random unsigned integer */
     792            0 :         rand = pg_prng_uint32(&prng_state);
     793              : 
     794              :         /*
     795              :          * Build the object name. The name must not exceed NAMEDATALEN - 1. This
     796              :          * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
     797              :          * '\0').
     798              :          */
     799            0 :         objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
     800              : 
     801            0 :         return objname;
     802            0 : }
     803              : 
     804              : /*
     805              :  * Does the publication exist in the specified database?
     806              :  */
     807              : static bool
     808            0 : find_publication(PGconn *conn, const char *pubname, const char *dbname)
     809              : {
     810            0 :         PQExpBuffer str = createPQExpBuffer();
     811            0 :         PGresult   *res;
     812            0 :         bool            found = false;
     813            0 :         char       *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
     814              : 
     815            0 :         appendPQExpBuffer(str,
     816              :                                           "SELECT 1 FROM pg_catalog.pg_publication "
     817              :                                           "WHERE pubname = %s",
     818            0 :                                           pubname_esc);
     819            0 :         res = PQexec(conn, str->data);
     820            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     821              :         {
     822            0 :                 pg_log_error("could not find publication \"%s\" in database \"%s\": %s",
     823              :                                          pubname, dbname, PQerrorMessage(conn));
     824            0 :                 disconnect_database(conn, true);
     825            0 :         }
     826              : 
     827            0 :         if (PQntuples(res) == 1)
     828            0 :                 found = true;
     829              : 
     830            0 :         PQclear(res);
     831            0 :         PQfreemem(pubname_esc);
     832            0 :         destroyPQExpBuffer(str);
     833              : 
     834            0 :         return found;
     835            0 : }
     836              : 
     837              : /*
     838              :  * Create the publications and replication slots in preparation for logical
     839              :  * replication. Returns the LSN from latest replication slot. It will be the
     840              :  * replication start point that is used to adjust the subscriptions (see
     841              :  * set_replication_progress).
     842              :  */
     843              : static char *
     844            0 : setup_publisher(struct LogicalRepInfo *dbinfo)
     845              : {
     846            0 :         char       *lsn = NULL;
     847              : 
     848            0 :         pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
     849              : 
     850            0 :         for (int i = 0; i < num_dbs; i++)
     851              :         {
     852            0 :                 PGconn     *conn;
     853            0 :                 char       *genname = NULL;
     854              : 
     855            0 :                 conn = connect_database(dbinfo[i].pubconninfo, true);
     856              : 
     857              :                 /*
     858              :                  * If an object name was not specified as command-line options, assign
     859              :                  * a generated object name. The replication slot has a different rule.
     860              :                  * The subscription name is assigned to the replication slot name if
     861              :                  * no replication slot is specified. It follows the same rule as
     862              :                  * CREATE SUBSCRIPTION.
     863              :                  */
     864            0 :                 if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
     865            0 :                         genname = generate_object_name(conn);
     866            0 :                 if (num_pubs == 0)
     867            0 :                         dbinfo[i].pubname = pg_strdup(genname);
     868            0 :                 if (num_subs == 0)
     869            0 :                         dbinfo[i].subname = pg_strdup(genname);
     870            0 :                 if (num_replslots == 0)
     871            0 :                         dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
     872              : 
     873            0 :                 if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
     874              :                 {
     875              :                         /* Reuse existing publication on publisher. */
     876            0 :                         pg_log_info("use existing publication \"%s\" in database \"%s\"",
     877              :                                                 dbinfo[i].pubname, dbinfo[i].dbname);
     878              :                         /* Don't remove pre-existing publication if an error occurs. */
     879            0 :                         dbinfo[i].made_publication = false;
     880            0 :                 }
     881              :                 else
     882              :                 {
     883              :                         /*
     884              :                          * Create publication on publisher. This step should be executed
     885              :                          * *before* promoting the subscriber to avoid any transactions
     886              :                          * between consistent LSN and the new publication rows (such
     887              :                          * transactions wouldn't see the new publication rows resulting in
     888              :                          * an error).
     889              :                          */
     890            0 :                         create_publication(conn, &dbinfo[i]);
     891              :                 }
     892              : 
     893              :                 /* Create replication slot on publisher */
     894            0 :                 if (lsn)
     895            0 :                         pg_free(lsn);
     896            0 :                 lsn = create_logical_replication_slot(conn, &dbinfo[i]);
     897            0 :                 if (lsn == NULL && !dry_run)
     898            0 :                         exit(1);
     899              : 
     900              :                 /*
     901              :                  * Since we are using the LSN returned by the last replication slot as
     902              :                  * recovery_target_lsn, this LSN is ahead of the current WAL position
     903              :                  * and the recovery waits until the publisher writes a WAL record to
     904              :                  * reach the target and ends the recovery. On idle systems, this wait
     905              :                  * time is unpredictable and could lead to failure in promoting the
     906              :                  * subscriber. To avoid that, insert a harmless WAL record.
     907              :                  */
     908            0 :                 if (i == num_dbs - 1 && !dry_run)
     909              :                 {
     910            0 :                         PGresult   *res;
     911              : 
     912            0 :                         res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
     913            0 :                         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     914              :                         {
     915            0 :                                 pg_log_error("could not write an additional WAL record: %s",
     916              :                                                          PQresultErrorMessage(res));
     917            0 :                                 disconnect_database(conn, true);
     918            0 :                         }
     919            0 :                         PQclear(res);
     920            0 :                 }
     921              : 
     922            0 :                 disconnect_database(conn, false);
     923            0 :         }
     924              : 
     925            0 :         return lsn;
     926            0 : }
     927              : 
     928              : /*
     929              :  * Is recovery still in progress?
     930              :  */
     931              : static bool
     932            0 : server_is_in_recovery(PGconn *conn)
     933              : {
     934            0 :         PGresult   *res;
     935            0 :         int                     ret;
     936              : 
     937            0 :         res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
     938              : 
     939            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     940              :         {
     941            0 :                 pg_log_error("could not obtain recovery progress: %s",
     942              :                                          PQresultErrorMessage(res));
     943            0 :                 disconnect_database(conn, true);
     944            0 :         }
     945              : 
     946              : 
     947            0 :         ret = strcmp("t", PQgetvalue(res, 0, 0));
     948              : 
     949            0 :         PQclear(res);
     950              : 
     951            0 :         return ret == 0;
     952            0 : }
     953              : 
     954              : /*
     955              :  * Is the primary server ready for logical replication?
     956              :  *
     957              :  * XXX Does it not allow a synchronous replica?
     958              :  */
     959              : static void
     960            0 : check_publisher(const struct LogicalRepInfo *dbinfo)
     961              : {
     962            0 :         PGconn     *conn;
     963            0 :         PGresult   *res;
     964            0 :         bool            failed = false;
     965              : 
     966            0 :         char       *wal_level;
     967            0 :         int                     max_repslots;
     968            0 :         int                     cur_repslots;
     969            0 :         int                     max_walsenders;
     970            0 :         int                     cur_walsenders;
     971            0 :         int                     max_prepared_transactions;
     972            0 :         char       *max_slot_wal_keep_size;
     973              : 
     974            0 :         pg_log_info("checking settings on publisher");
     975              : 
     976            0 :         conn = connect_database(dbinfo[0].pubconninfo, true);
     977              : 
     978              :         /*
     979              :          * If the primary server is in recovery (i.e. cascading replication),
     980              :          * objects (publication) cannot be created because it is read only.
     981              :          */
     982            0 :         if (server_is_in_recovery(conn))
     983              :         {
     984            0 :                 pg_log_error("primary server cannot be in recovery");
     985            0 :                 disconnect_database(conn, true);
     986            0 :         }
     987              : 
     988              :         /*------------------------------------------------------------------------
     989              :          * Logical replication requires a few parameters to be set on publisher.
     990              :          * Since these parameters are not a requirement for physical replication,
     991              :          * we should check it to make sure it won't fail.
     992              :          *
     993              :          * - wal_level >= replica
     994              :          * - max_replication_slots >= current + number of dbs to be converted
     995              :          * - max_wal_senders >= current + number of dbs to be converted
     996              :          * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
     997              :          * -----------------------------------------------------------------------
     998              :          */
     999            0 :         res = PQexec(conn,
    1000              :                                  "SELECT pg_catalog.current_setting('wal_level'),"
    1001              :                                  " pg_catalog.current_setting('max_replication_slots'),"
    1002              :                                  " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
    1003              :                                  " pg_catalog.current_setting('max_wal_senders'),"
    1004              :                                  " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
    1005              :                                  " pg_catalog.current_setting('max_prepared_transactions'),"
    1006              :                                  " pg_catalog.current_setting('max_slot_wal_keep_size')");
    1007              : 
    1008            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1009              :         {
    1010            0 :                 pg_log_error("could not obtain publisher settings: %s",
    1011              :                                          PQresultErrorMessage(res));
    1012            0 :                 disconnect_database(conn, true);
    1013            0 :         }
    1014              : 
    1015            0 :         wal_level = pg_strdup(PQgetvalue(res, 0, 0));
    1016            0 :         max_repslots = atoi(PQgetvalue(res, 0, 1));
    1017            0 :         cur_repslots = atoi(PQgetvalue(res, 0, 2));
    1018            0 :         max_walsenders = atoi(PQgetvalue(res, 0, 3));
    1019            0 :         cur_walsenders = atoi(PQgetvalue(res, 0, 4));
    1020            0 :         max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
    1021            0 :         max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
    1022              : 
    1023            0 :         PQclear(res);
    1024              : 
    1025            0 :         pg_log_debug("publisher: wal_level: %s", wal_level);
    1026            0 :         pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
    1027            0 :         pg_log_debug("publisher: current replication slots: %d", cur_repslots);
    1028            0 :         pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
    1029            0 :         pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
    1030            0 :         pg_log_debug("publisher: max_prepared_transactions: %d",
    1031              :                                  max_prepared_transactions);
    1032            0 :         pg_log_debug("publisher: max_slot_wal_keep_size: %s",
    1033              :                                  max_slot_wal_keep_size);
    1034              : 
    1035            0 :         disconnect_database(conn, false);
    1036              : 
    1037            0 :         if (strcmp(wal_level, "minimal") == 0)
    1038              :         {
    1039            0 :                 pg_log_error("publisher requires \"wal_level\" >= \"replica\"");
    1040            0 :                 failed = true;
    1041            0 :         }
    1042              : 
    1043            0 :         if (max_repslots - cur_repslots < num_dbs)
    1044              :         {
    1045            0 :                 pg_log_error("publisher requires %d replication slots, but only %d remain",
    1046              :                                          num_dbs, max_repslots - cur_repslots);
    1047            0 :                 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1048              :                                                   "max_replication_slots", cur_repslots + num_dbs);
    1049            0 :                 failed = true;
    1050            0 :         }
    1051              : 
    1052            0 :         if (max_walsenders - cur_walsenders < num_dbs)
    1053              :         {
    1054            0 :                 pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
    1055              :                                          num_dbs, max_walsenders - cur_walsenders);
    1056            0 :                 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1057              :                                                   "max_wal_senders", cur_walsenders + num_dbs);
    1058            0 :                 failed = true;
    1059            0 :         }
    1060              : 
    1061            0 :         if (max_prepared_transactions != 0 && !dbinfos.two_phase)
    1062              :         {
    1063            0 :                 pg_log_warning("two_phase option will not be enabled for replication slots");
    1064            0 :                 pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled.  "
    1065              :                                                           "Prepared transactions will be replicated at COMMIT PREPARED.");
    1066            0 :                 pg_log_warning_hint("You can use the command-line option --enable-two-phase to enable two_phase.");
    1067            0 :         }
    1068              : 
    1069              :         /*
    1070              :          * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
    1071              :          * is set to a non-default value, it may cause replication failures due to
    1072              :          * required WAL files being prematurely removed.
    1073              :          */
    1074            0 :         if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
    1075              :         {
    1076            0 :                 pg_log_warning("required WAL could be removed from the publisher");
    1077            0 :                 pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
    1078              :                                                         "max_slot_wal_keep_size");
    1079            0 :         }
    1080              : 
    1081            0 :         pg_free(wal_level);
    1082              : 
    1083            0 :         if (failed)
    1084            0 :                 exit(1);
    1085            0 : }
    1086              : 
    1087              : /*
    1088              :  * Is the standby server ready for logical replication?
    1089              :  *
    1090              :  * XXX Does it not allow a time-delayed replica?
    1091              :  *
    1092              :  * XXX In a cascaded replication scenario (P -> S -> C), if the target server
    1093              :  * is S, it cannot detect there is a replica (server C) because server S starts
    1094              :  * accepting only local connections and server C cannot connect to it. Hence,
    1095              :  * there is not a reliable way to provide a suitable error saying the server C
    1096              :  * will be broken at the end of this process (due to pg_resetwal).
    1097              :  */
    1098              : static void
    1099            0 : check_subscriber(const struct LogicalRepInfo *dbinfo)
    1100              : {
    1101            0 :         PGconn     *conn;
    1102            0 :         PGresult   *res;
    1103            0 :         bool            failed = false;
    1104              : 
    1105            0 :         int                     max_lrworkers;
    1106            0 :         int                     max_reporigins;
    1107            0 :         int                     max_wprocs;
    1108              : 
    1109            0 :         pg_log_info("checking settings on subscriber");
    1110              : 
    1111            0 :         conn = connect_database(dbinfo[0].subconninfo, true);
    1112              : 
    1113              :         /* The target server must be a standby */
    1114            0 :         if (!server_is_in_recovery(conn))
    1115              :         {
    1116            0 :                 pg_log_error("target server must be a standby");
    1117            0 :                 disconnect_database(conn, true);
    1118            0 :         }
    1119              : 
    1120              :         /*------------------------------------------------------------------------
    1121              :          * Logical replication requires a few parameters to be set on subscriber.
    1122              :          * Since these parameters are not a requirement for physical replication,
    1123              :          * we should check it to make sure it won't fail.
    1124              :          *
    1125              :          * - max_active_replication_origins >= number of dbs to be converted
    1126              :          * - max_logical_replication_workers >= number of dbs to be converted
    1127              :          * - max_worker_processes >= 1 + number of dbs to be converted
    1128              :          *------------------------------------------------------------------------
    1129              :          */
    1130            0 :         res = PQexec(conn,
    1131              :                                  "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
    1132              :                                  "'max_logical_replication_workers', "
    1133              :                                  "'max_active_replication_origins', "
    1134              :                                  "'max_worker_processes', "
    1135              :                                  "'primary_slot_name') "
    1136              :                                  "ORDER BY name");
    1137              : 
    1138            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1139              :         {
    1140            0 :                 pg_log_error("could not obtain subscriber settings: %s",
    1141              :                                          PQresultErrorMessage(res));
    1142            0 :                 disconnect_database(conn, true);
    1143            0 :         }
    1144              : 
    1145            0 :         max_reporigins = atoi(PQgetvalue(res, 0, 0));
    1146            0 :         max_lrworkers = atoi(PQgetvalue(res, 1, 0));
    1147            0 :         max_wprocs = atoi(PQgetvalue(res, 2, 0));
    1148            0 :         if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
    1149            0 :                 primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
    1150              : 
    1151            0 :         pg_log_debug("subscriber: max_logical_replication_workers: %d",
    1152              :                                  max_lrworkers);
    1153            0 :         pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins);
    1154            0 :         pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
    1155            0 :         if (primary_slot_name)
    1156            0 :                 pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
    1157              : 
    1158            0 :         PQclear(res);
    1159              : 
    1160            0 :         disconnect_database(conn, false);
    1161              : 
    1162            0 :         if (max_reporigins < num_dbs)
    1163              :         {
    1164            0 :                 pg_log_error("subscriber requires %d active replication origins, but only %d remain",
    1165              :                                          num_dbs, max_reporigins);
    1166            0 :                 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1167              :                                                   "max_active_replication_origins", num_dbs);
    1168            0 :                 failed = true;
    1169            0 :         }
    1170              : 
    1171            0 :         if (max_lrworkers < num_dbs)
    1172              :         {
    1173            0 :                 pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
    1174              :                                          num_dbs, max_lrworkers);
    1175            0 :                 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1176              :                                                   "max_logical_replication_workers", num_dbs);
    1177            0 :                 failed = true;
    1178            0 :         }
    1179              : 
    1180            0 :         if (max_wprocs < num_dbs + 1)
    1181              :         {
    1182            0 :                 pg_log_error("subscriber requires %d worker processes, but only %d remain",
    1183              :                                          num_dbs + 1, max_wprocs);
    1184            0 :                 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1185              :                                                   "max_worker_processes", num_dbs + 1);
    1186            0 :                 failed = true;
    1187            0 :         }
    1188              : 
    1189            0 :         if (failed)
    1190            0 :                 exit(1);
    1191            0 : }
    1192              : 
    1193              : /*
    1194              :  * Drop a specified subscription. This is to avoid duplicate subscriptions on
    1195              :  * the primary (publisher node) and the newly created subscriber. We
    1196              :  * shouldn't drop the associated slot as that would be used by the publisher
    1197              :  * node.
    1198              :  */
    1199              : static void
    1200            0 : drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
    1201              : {
    1202            0 :         PQExpBuffer query = createPQExpBuffer();
    1203            0 :         PGresult   *res;
    1204              : 
    1205            0 :         Assert(conn != NULL);
    1206              : 
    1207              :         /*
    1208              :          * Construct a query string. These commands are allowed to be executed
    1209              :          * within a transaction.
    1210              :          */
    1211            0 :         appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
    1212            0 :                                           subname);
    1213            0 :         appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
    1214            0 :                                           subname);
    1215            0 :         appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
    1216              : 
    1217            0 :         if (dry_run)
    1218            0 :                 pg_log_info("dry-run: would drop subscription \"%s\" in database \"%s\"",
    1219              :                                         subname, dbname);
    1220              :         else
    1221              :         {
    1222            0 :                 pg_log_info("dropping subscription \"%s\" in database \"%s\"",
    1223              :                                         subname, dbname);
    1224              : 
    1225            0 :                 res = PQexec(conn, query->data);
    1226              : 
    1227            0 :                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1228              :                 {
    1229            0 :                         pg_log_error("could not drop subscription \"%s\": %s",
    1230              :                                                  subname, PQresultErrorMessage(res));
    1231            0 :                         disconnect_database(conn, true);
    1232            0 :                 }
    1233              : 
    1234            0 :                 PQclear(res);
    1235              :         }
    1236              : 
    1237            0 :         destroyPQExpBuffer(query);
    1238            0 : }
    1239              : 
    1240              : /*
    1241              :  * Retrieve and drop the pre-existing subscriptions.
    1242              :  */
    1243              : static void
    1244            0 : check_and_drop_existing_subscriptions(PGconn *conn,
    1245              :                                                                           const struct LogicalRepInfo *dbinfo)
    1246              : {
    1247            0 :         PQExpBuffer query = createPQExpBuffer();
    1248            0 :         char       *dbname;
    1249            0 :         PGresult   *res;
    1250              : 
    1251            0 :         Assert(conn != NULL);
    1252              : 
    1253            0 :         dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
    1254              : 
    1255            0 :         appendPQExpBuffer(query,
    1256              :                                           "SELECT s.subname FROM pg_catalog.pg_subscription s "
    1257              :                                           "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
    1258              :                                           "WHERE d.datname = %s",
    1259            0 :                                           dbname);
    1260            0 :         res = PQexec(conn, query->data);
    1261              : 
    1262            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1263              :         {
    1264            0 :                 pg_log_error("could not obtain pre-existing subscriptions: %s",
    1265              :                                          PQresultErrorMessage(res));
    1266            0 :                 disconnect_database(conn, true);
    1267            0 :         }
    1268              : 
    1269            0 :         for (int i = 0; i < PQntuples(res); i++)
    1270            0 :                 drop_existing_subscription(conn, PQgetvalue(res, i, 0),
    1271            0 :                                                                    dbinfo->dbname);
    1272              : 
    1273            0 :         PQclear(res);
    1274            0 :         destroyPQExpBuffer(query);
    1275            0 :         PQfreemem(dbname);
    1276            0 : }
    1277              : 
    1278              : /*
    1279              :  * Create the subscriptions, adjust the initial location for logical
    1280              :  * replication and enable the subscriptions. That's the last step for logical
    1281              :  * replication setup.
    1282              :  */
    1283              : static void
    1284            0 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
    1285              : {
    1286            0 :         for (int i = 0; i < num_dbs; i++)
    1287              :         {
    1288            0 :                 PGconn     *conn;
    1289              : 
    1290              :                 /* Connect to subscriber. */
    1291            0 :                 conn = connect_database(dbinfo[i].subconninfo, true);
    1292              : 
    1293              :                 /*
    1294              :                  * We don't need the pre-existing subscriptions on the newly formed
    1295              :                  * subscriber. They can connect to other publisher nodes and either
    1296              :                  * get some unwarranted data or can lead to ERRORs in connecting to
    1297              :                  * such nodes.
    1298              :                  */
    1299            0 :                 check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
    1300              : 
    1301              :                 /* Check and drop the required publications in the given database. */
    1302            0 :                 check_and_drop_publications(conn, &dbinfo[i]);
    1303              : 
    1304            0 :                 create_subscription(conn, &dbinfo[i]);
    1305              : 
    1306              :                 /* Set the replication progress to the correct LSN */
    1307            0 :                 set_replication_progress(conn, &dbinfo[i], consistent_lsn);
    1308              : 
    1309              :                 /* Enable subscription */
    1310            0 :                 enable_subscription(conn, &dbinfo[i]);
    1311              : 
    1312            0 :                 disconnect_database(conn, false);
    1313            0 :         }
    1314            0 : }
    1315              : 
    1316              : /*
    1317              :  * Write the required recovery parameters.
    1318              :  */
    1319              : static void
    1320            0 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
    1321              : {
    1322            0 :         PGconn     *conn;
    1323            0 :         PQExpBuffer recoveryconfcontents;
    1324              : 
    1325              :         /*
    1326              :          * Despite of the recovery parameters will be written to the subscriber,
    1327              :          * use a publisher connection. The primary_conninfo is generated using the
    1328              :          * connection settings.
    1329              :          */
    1330            0 :         conn = connect_database(dbinfo[0].pubconninfo, true);
    1331              : 
    1332              :         /*
    1333              :          * Write recovery parameters.
    1334              :          *
    1335              :          * The subscriber is not running yet. In dry run mode, the recovery
    1336              :          * parameters *won't* be written. An invalid LSN is used for printing
    1337              :          * purposes. Additional recovery parameters are added here. It avoids
    1338              :          * unexpected behavior such as end of recovery as soon as a consistent
    1339              :          * state is reached (recovery_target) and failure due to multiple recovery
    1340              :          * targets (name, time, xid, LSN).
    1341              :          */
    1342            0 :         recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
    1343            0 :         appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
    1344            0 :         appendPQExpBufferStr(recoveryconfcontents,
    1345              :                                                  "recovery_target_timeline = 'latest'\n");
    1346              : 
    1347              :         /*
    1348              :          * Set recovery_target_inclusive = false to avoid reapplying the
    1349              :          * transaction committed at 'lsn' after subscription is enabled. This is
    1350              :          * because the provided 'lsn' is also used as the replication start point
    1351              :          * for the subscription. So, the server can send the transaction committed
    1352              :          * at that 'lsn' after replication is started which can lead to applying
    1353              :          * the same transaction twice if we keep recovery_target_inclusive = true.
    1354              :          */
    1355            0 :         appendPQExpBufferStr(recoveryconfcontents,
    1356              :                                                  "recovery_target_inclusive = false\n");
    1357            0 :         appendPQExpBufferStr(recoveryconfcontents,
    1358              :                                                  "recovery_target_action = promote\n");
    1359            0 :         appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
    1360            0 :         appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
    1361            0 :         appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
    1362              : 
    1363            0 :         if (dry_run)
    1364              :         {
    1365            0 :                 appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
    1366            0 :                 appendPQExpBuffer(recoveryconfcontents,
    1367              :                                                   "recovery_target_lsn = '%X/%08X'\n",
    1368            0 :                                                   LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
    1369            0 :         }
    1370              :         else
    1371              :         {
    1372            0 :                 appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
    1373            0 :                                                   lsn);
    1374              :         }
    1375              : 
    1376            0 :         pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
    1377              : 
    1378            0 :         if (!dry_run)
    1379              :         {
    1380            0 :                 char            conf_filename[MAXPGPATH];
    1381            0 :                 FILE       *fd;
    1382              : 
    1383              :                 /* Write the recovery parameters to INCLUDED_CONF_FILE */
    1384            0 :                 snprintf(conf_filename, MAXPGPATH, "%s/%s", datadir,
    1385              :                                  INCLUDED_CONF_FILE);
    1386            0 :                 fd = fopen(conf_filename, "w");
    1387            0 :                 if (fd == NULL)
    1388            0 :                         pg_fatal("could not open file \"%s\": %m", conf_filename);
    1389              : 
    1390            0 :                 if (fwrite(recoveryconfcontents->data, recoveryconfcontents->len, 1, fd) != 1)
    1391            0 :                         pg_fatal("could not write to file \"%s\": %m", conf_filename);
    1392              : 
    1393            0 :                 fclose(fd);
    1394            0 :                 recovery_params_set = true;
    1395              : 
    1396              :                 /* Include conditionally the recovery parameters. */
    1397            0 :                 resetPQExpBuffer(recoveryconfcontents);
    1398            0 :                 appendPQExpBufferStr(recoveryconfcontents,
    1399              :                                                          "include_if_exists '" INCLUDED_CONF_FILE "'\n");
    1400            0 :                 WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
    1401            0 :         }
    1402              : 
    1403            0 :         disconnect_database(conn, false);
    1404            0 : }
    1405              : 
    1406              : /*
    1407              :  * Drop physical replication slot on primary if the standby was using it. After
    1408              :  * the transformation, it has no use.
    1409              :  *
    1410              :  * XXX we might not fail here. Instead, we provide a warning so the user
    1411              :  * eventually drops this replication slot later.
    1412              :  */
    1413              : static void
    1414            0 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
    1415              : {
    1416            0 :         PGconn     *conn;
    1417              : 
    1418              :         /* Replication slot does not exist, do nothing */
    1419            0 :         if (!primary_slot_name)
    1420            0 :                 return;
    1421              : 
    1422            0 :         conn = connect_database(dbinfo[0].pubconninfo, false);
    1423            0 :         if (conn != NULL)
    1424              :         {
    1425            0 :                 drop_replication_slot(conn, &dbinfo[0], slotname);
    1426            0 :                 disconnect_database(conn, false);
    1427            0 :         }
    1428              :         else
    1429              :         {
    1430            0 :                 pg_log_warning("could not drop replication slot \"%s\" on primary",
    1431              :                                            slotname);
    1432            0 :                 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
    1433              :         }
    1434            0 : }
    1435              : 
    1436              : /*
    1437              :  * Drop failover replication slots on subscriber. After the transformation,
    1438              :  * they have no use.
    1439              :  *
    1440              :  * XXX We do not fail here. Instead, we provide a warning so the user can drop
    1441              :  * them later.
    1442              :  */
    1443              : static void
    1444            0 : drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
    1445              : {
    1446            0 :         PGconn     *conn;
    1447            0 :         PGresult   *res;
    1448              : 
    1449            0 :         conn = connect_database(dbinfo[0].subconninfo, false);
    1450            0 :         if (conn != NULL)
    1451              :         {
    1452              :                 /* Get failover replication slot names */
    1453            0 :                 res = PQexec(conn,
    1454              :                                          "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
    1455              : 
    1456            0 :                 if (PQresultStatus(res) == PGRES_TUPLES_OK)
    1457              :                 {
    1458              :                         /* Remove failover replication slots from subscriber */
    1459            0 :                         for (int i = 0; i < PQntuples(res); i++)
    1460            0 :                                 drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
    1461            0 :                 }
    1462              :                 else
    1463              :                 {
    1464            0 :                         pg_log_warning("could not obtain failover replication slot information: %s",
    1465              :                                                    PQresultErrorMessage(res));
    1466            0 :                         pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
    1467              :                 }
    1468              : 
    1469            0 :                 PQclear(res);
    1470            0 :                 disconnect_database(conn, false);
    1471            0 :         }
    1472              :         else
    1473              :         {
    1474            0 :                 pg_log_warning("could not drop failover replication slot");
    1475            0 :                 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
    1476              :         }
    1477            0 : }
    1478              : 
    1479              : /*
    1480              :  * Create a logical replication slot and returns a LSN.
    1481              :  *
    1482              :  * CreateReplicationSlot() is not used because it does not provide the one-row
    1483              :  * result set that contains the LSN.
    1484              :  */
    1485              : static char *
    1486            0 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1487              : {
    1488            0 :         PQExpBuffer str = createPQExpBuffer();
    1489            0 :         PGresult   *res = NULL;
    1490            0 :         const char *slot_name = dbinfo->replslotname;
    1491            0 :         char       *slot_name_esc;
    1492            0 :         char       *lsn = NULL;
    1493              : 
    1494            0 :         Assert(conn != NULL);
    1495              : 
    1496            0 :         if (dry_run)
    1497            0 :                 pg_log_info("dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
    1498              :                                         slot_name, dbinfo->dbname);
    1499              :         else
    1500            0 :                 pg_log_info("creating the replication slot \"%s\" in database \"%s\" on publisher",
    1501              :                                         slot_name, dbinfo->dbname);
    1502              : 
    1503            0 :         slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
    1504              : 
    1505            0 :         appendPQExpBuffer(str,
    1506              :                                           "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
    1507            0 :                                           slot_name_esc,
    1508            0 :                                           dbinfos.two_phase ? "true" : "false");
    1509              : 
    1510            0 :         PQfreemem(slot_name_esc);
    1511              : 
    1512            0 :         pg_log_debug("command is: %s", str->data);
    1513              : 
    1514            0 :         if (!dry_run)
    1515              :         {
    1516            0 :                 res = PQexec(conn, str->data);
    1517            0 :                 if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1518              :                 {
    1519            0 :                         pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
    1520              :                                                  slot_name, dbinfo->dbname,
    1521              :                                                  PQresultErrorMessage(res));
    1522            0 :                         PQclear(res);
    1523            0 :                         destroyPQExpBuffer(str);
    1524            0 :                         return NULL;
    1525              :                 }
    1526              : 
    1527            0 :                 lsn = pg_strdup(PQgetvalue(res, 0, 0));
    1528            0 :                 PQclear(res);
    1529            0 :         }
    1530              : 
    1531              :         /* For cleanup purposes */
    1532            0 :         dbinfo->made_replslot = true;
    1533              : 
    1534            0 :         destroyPQExpBuffer(str);
    1535              : 
    1536            0 :         return lsn;
    1537            0 : }
    1538              : 
    1539              : static void
    1540            0 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
    1541              :                                           const char *slot_name)
    1542              : {
    1543            0 :         PQExpBuffer str = createPQExpBuffer();
    1544            0 :         char       *slot_name_esc;
    1545            0 :         PGresult   *res;
    1546              : 
    1547            0 :         Assert(conn != NULL);
    1548              : 
    1549            0 :         if (dry_run)
    1550            0 :                 pg_log_info("dry-run: would drop the replication slot \"%s\" in database \"%s\"",
    1551              :                                         slot_name, dbinfo->dbname);
    1552              :         else
    1553            0 :                 pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
    1554              :                                         slot_name, dbinfo->dbname);
    1555              : 
    1556            0 :         slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
    1557              : 
    1558            0 :         appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
    1559              : 
    1560            0 :         PQfreemem(slot_name_esc);
    1561              : 
    1562            0 :         pg_log_debug("command is: %s", str->data);
    1563              : 
    1564            0 :         if (!dry_run)
    1565              :         {
    1566            0 :                 res = PQexec(conn, str->data);
    1567            0 :                 if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1568              :                 {
    1569            0 :                         pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
    1570              :                                                  slot_name, dbinfo->dbname, PQresultErrorMessage(res));
    1571            0 :                         dbinfo->made_replslot = false;       /* don't try again. */
    1572            0 :                 }
    1573              : 
    1574            0 :                 PQclear(res);
    1575            0 :         }
    1576              : 
    1577            0 :         destroyPQExpBuffer(str);
    1578            0 : }
    1579              : 
    1580              : /*
    1581              :  * Reports a suitable message if pg_ctl fails.
    1582              :  */
    1583              : static void
    1584            0 : pg_ctl_status(const char *pg_ctl_cmd, int rc)
    1585              : {
    1586            0 :         if (rc != 0)
    1587              :         {
    1588            0 :                 if (WIFEXITED(rc))
    1589              :                 {
    1590            0 :                         pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
    1591            0 :                 }
    1592            0 :                 else if (WIFSIGNALED(rc))
    1593              :                 {
    1594              : #if defined(WIN32)
    1595              :                         pg_log_error("pg_ctl was terminated by exception 0x%X",
    1596              :                                                  WTERMSIG(rc));
    1597              :                         pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
    1598              : #else
    1599            0 :                         pg_log_error("pg_ctl was terminated by signal %d: %s",
    1600              :                                                  WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
    1601              : #endif
    1602            0 :                 }
    1603              :                 else
    1604              :                 {
    1605            0 :                         pg_log_error("pg_ctl exited with unrecognized status %d", rc);
    1606              :                 }
    1607              : 
    1608            0 :                 pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
    1609            0 :                 exit(1);
    1610              :         }
    1611            0 : }
    1612              : 
    1613              : static void
    1614            0 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
    1615              :                                          bool restrict_logical_worker)
    1616              : {
    1617            0 :         PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
    1618            0 :         int                     rc;
    1619              : 
    1620            0 :         appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
    1621            0 :         appendShellString(pg_ctl_cmd, subscriber_dir);
    1622            0 :         appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
    1623              : 
    1624              :         /* Prevent unintended slot invalidation */
    1625            0 :         appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
    1626              : 
    1627            0 :         if (restricted_access)
    1628              :         {
    1629            0 :                 appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
    1630              : #if !defined(WIN32)
    1631              : 
    1632              :                 /*
    1633              :                  * An empty listen_addresses list means the server does not listen on
    1634              :                  * any IP interfaces; only Unix-domain sockets can be used to connect
    1635              :                  * to the server. Prevent external connections to minimize the chance
    1636              :                  * of failure.
    1637              :                  */
    1638            0 :                 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
    1639            0 :                 if (opt->socket_dir)
    1640            0 :                         appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
    1641            0 :                                                           opt->socket_dir);
    1642            0 :                 appendPQExpBufferChar(pg_ctl_cmd, '"');
    1643              : #endif
    1644            0 :         }
    1645            0 :         if (opt->config_file != NULL)
    1646            0 :                 appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
    1647            0 :                                                   opt->config_file);
    1648              : 
    1649              :         /* Suppress to start logical replication if requested */
    1650            0 :         if (restrict_logical_worker)
    1651            0 :                 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
    1652              : 
    1653            0 :         pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
    1654            0 :         rc = system(pg_ctl_cmd->data);
    1655            0 :         pg_ctl_status(pg_ctl_cmd->data, rc);
    1656            0 :         standby_running = true;
    1657            0 :         destroyPQExpBuffer(pg_ctl_cmd);
    1658            0 :         pg_log_info("server was started");
    1659            0 : }
    1660              : 
    1661              : static void
    1662            0 : stop_standby_server(const char *datadir)
    1663              : {
    1664            0 :         char       *pg_ctl_cmd;
    1665            0 :         int                     rc;
    1666              : 
    1667            0 :         pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
    1668            0 :                                                   datadir);
    1669            0 :         pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
    1670            0 :         rc = system(pg_ctl_cmd);
    1671            0 :         pg_ctl_status(pg_ctl_cmd, rc);
    1672            0 :         standby_running = false;
    1673            0 :         pg_log_info("server was stopped");
    1674            0 : }
    1675              : 
    1676              : /*
    1677              :  * Returns after the server finishes the recovery process.
    1678              :  *
    1679              :  * If recovery_timeout option is set, terminate abnormally without finishing
    1680              :  * the recovery process. By default, it waits forever.
    1681              :  *
    1682              :  * XXX Is the recovery process still in progress? When recovery process has a
    1683              :  * better progress reporting mechanism, it should be added here.
    1684              :  */
    1685              : static void
    1686            0 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
    1687              : {
    1688            0 :         PGconn     *conn;
    1689            0 :         bool            ready = false;
    1690            0 :         int                     timer = 0;
    1691              : 
    1692            0 :         pg_log_info("waiting for the target server to reach the consistent state");
    1693              : 
    1694            0 :         conn = connect_database(conninfo, true);
    1695              : 
    1696            0 :         for (;;)
    1697              :         {
    1698              :                 /* Did the recovery process finish? We're done if so. */
    1699            0 :                 if (dry_run || !server_is_in_recovery(conn))
    1700              :                 {
    1701            0 :                         ready = true;
    1702            0 :                         recovery_ended = true;
    1703            0 :                         break;
    1704              :                 }
    1705              : 
    1706              :                 /* Bail out after recovery_timeout seconds if this option is set */
    1707            0 :                 if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
    1708              :                 {
    1709            0 :                         stop_standby_server(subscriber_dir);
    1710            0 :                         pg_log_error("recovery timed out");
    1711            0 :                         disconnect_database(conn, true);
    1712            0 :                 }
    1713              : 
    1714              :                 /* Keep waiting */
    1715            0 :                 pg_usleep(WAIT_INTERVAL * USECS_PER_SEC);
    1716            0 :                 timer += WAIT_INTERVAL;
    1717              :         }
    1718              : 
    1719            0 :         disconnect_database(conn, false);
    1720              : 
    1721            0 :         if (!ready)
    1722            0 :                 pg_fatal("server did not end recovery");
    1723              : 
    1724            0 :         pg_log_info("target server reached the consistent state");
    1725            0 :         pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
    1726            0 : }
    1727              : 
    1728              : /*
    1729              :  * Create a publication that includes all tables in the database.
    1730              :  */
    1731              : static void
    1732            0 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1733              : {
    1734            0 :         PQExpBuffer str = createPQExpBuffer();
    1735            0 :         PGresult   *res;
    1736            0 :         char       *ipubname_esc;
    1737            0 :         char       *spubname_esc;
    1738              : 
    1739            0 :         Assert(conn != NULL);
    1740              : 
    1741            0 :         ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1742            0 :         spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1743              : 
    1744              :         /* Check if the publication already exists */
    1745            0 :         appendPQExpBuffer(str,
    1746              :                                           "SELECT 1 FROM pg_catalog.pg_publication "
    1747              :                                           "WHERE pubname = %s",
    1748            0 :                                           spubname_esc);
    1749            0 :         res = PQexec(conn, str->data);
    1750            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1751              :         {
    1752            0 :                 pg_log_error("could not obtain publication information: %s",
    1753              :                                          PQresultErrorMessage(res));
    1754            0 :                 disconnect_database(conn, true);
    1755            0 :         }
    1756              : 
    1757            0 :         if (PQntuples(res) == 1)
    1758              :         {
    1759              :                 /*
    1760              :                  * Unfortunately, if it reaches this code path, it will always fail
    1761              :                  * (unless you decide to change the existing publication name). That's
    1762              :                  * bad but it is very unlikely that the user will choose a name with
    1763              :                  * pg_createsubscriber_ prefix followed by the exact database oid and
    1764              :                  * a random number.
    1765              :                  */
    1766            0 :                 pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
    1767            0 :                 pg_log_error_hint("Consider renaming this publication before continuing.");
    1768            0 :                 disconnect_database(conn, true);
    1769            0 :         }
    1770              : 
    1771            0 :         PQclear(res);
    1772            0 :         resetPQExpBuffer(str);
    1773              : 
    1774            0 :         if (dry_run)
    1775            0 :                 pg_log_info("dry-run: would create publication \"%s\" in database \"%s\"",
    1776              :                                         dbinfo->pubname, dbinfo->dbname);
    1777              :         else
    1778            0 :                 pg_log_info("creating publication \"%s\" in database \"%s\"",
    1779              :                                         dbinfo->pubname, dbinfo->dbname);
    1780              : 
    1781            0 :         appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
    1782            0 :                                           ipubname_esc);
    1783              : 
    1784            0 :         pg_log_debug("command is: %s", str->data);
    1785              : 
    1786            0 :         if (!dry_run)
    1787              :         {
    1788            0 :                 res = PQexec(conn, str->data);
    1789            0 :                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1790              :                 {
    1791            0 :                         pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
    1792              :                                                  dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
    1793            0 :                         disconnect_database(conn, true);
    1794            0 :                 }
    1795            0 :                 PQclear(res);
    1796            0 :         }
    1797              : 
    1798              :         /* For cleanup purposes */
    1799            0 :         dbinfo->made_publication = true;
    1800              : 
    1801            0 :         PQfreemem(ipubname_esc);
    1802            0 :         PQfreemem(spubname_esc);
    1803            0 :         destroyPQExpBuffer(str);
    1804            0 : }
    1805              : 
    1806              : /*
    1807              :  * Drop the specified publication in the given database.
    1808              :  */
    1809              : static void
    1810            0 : drop_publication(PGconn *conn, const char *pubname, const char *dbname,
    1811              :                                  bool *made_publication)
    1812              : {
    1813            0 :         PQExpBuffer str = createPQExpBuffer();
    1814            0 :         PGresult   *res;
    1815            0 :         char       *pubname_esc;
    1816              : 
    1817            0 :         Assert(conn != NULL);
    1818              : 
    1819            0 :         pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
    1820              : 
    1821            0 :         if (dry_run)
    1822            0 :                 pg_log_info("dry-run: would drop publication \"%s\" in database \"%s\"",
    1823              :                                         pubname, dbname);
    1824              :         else
    1825            0 :                 pg_log_info("dropping publication \"%s\" in database \"%s\"",
    1826              :                                         pubname, dbname);
    1827              : 
    1828            0 :         appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
    1829              : 
    1830            0 :         PQfreemem(pubname_esc);
    1831              : 
    1832            0 :         pg_log_debug("command is: %s", str->data);
    1833              : 
    1834            0 :         if (!dry_run)
    1835              :         {
    1836            0 :                 res = PQexec(conn, str->data);
    1837            0 :                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1838              :                 {
    1839            0 :                         pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
    1840              :                                                  pubname, dbname, PQresultErrorMessage(res));
    1841            0 :                         *made_publication = false;      /* don't try again. */
    1842              : 
    1843              :                         /*
    1844              :                          * Don't disconnect and exit here. This routine is used by primary
    1845              :                          * (cleanup publication / replication slot due to an error) and
    1846              :                          * subscriber (remove the replicated publications). In both cases,
    1847              :                          * it can continue and provide instructions for the user to remove
    1848              :                          * it later if cleanup fails.
    1849              :                          */
    1850            0 :                 }
    1851            0 :                 PQclear(res);
    1852            0 :         }
    1853              : 
    1854            0 :         destroyPQExpBuffer(str);
    1855            0 : }
    1856              : 
    1857              : /*
    1858              :  * Retrieve and drop the publications.
    1859              :  *
    1860              :  * Publications copied during physical replication remain on the subscriber
    1861              :  * after promotion. If --clean=publications is specified, drop all existing
    1862              :  * publications in the subscriber database. Otherwise, only drop publications
    1863              :  * that were created by pg_createsubscriber during this operation.
    1864              :  */
    1865              : static void
    1866            0 : check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1867              : {
    1868            0 :         PGresult   *res;
    1869            0 :         bool            drop_all_pubs = dbinfos.objecttypes_to_clean & OBJECTTYPE_PUBLICATIONS;
    1870              : 
    1871            0 :         Assert(conn != NULL);
    1872              : 
    1873            0 :         if (drop_all_pubs)
    1874              :         {
    1875            0 :                 pg_log_info("dropping all existing publications in database \"%s\"",
    1876              :                                         dbinfo->dbname);
    1877              : 
    1878              :                 /* Fetch all publication names */
    1879            0 :                 res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
    1880            0 :                 if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1881              :                 {
    1882            0 :                         pg_log_error("could not obtain publication information: %s",
    1883              :                                                  PQresultErrorMessage(res));
    1884            0 :                         PQclear(res);
    1885            0 :                         disconnect_database(conn, true);
    1886            0 :                 }
    1887              : 
    1888              :                 /* Drop each publication */
    1889            0 :                 for (int i = 0; i < PQntuples(res); i++)
    1890            0 :                         drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
    1891            0 :                                                          &dbinfo->made_publication);
    1892              : 
    1893            0 :                 PQclear(res);
    1894            0 :         }
    1895              :         else
    1896              :         {
    1897              :                 /* Drop publication only if it was created by this tool */
    1898            0 :                 if (dbinfo->made_publication)
    1899              :                 {
    1900            0 :                         drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
    1901            0 :                                                          &dbinfo->made_publication);
    1902            0 :                 }
    1903              :                 else
    1904              :                 {
    1905            0 :                         if (dry_run)
    1906            0 :                                 pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"",
    1907              :                                                         dbinfo->pubname, dbinfo->dbname);
    1908              :                         else
    1909            0 :                                 pg_log_info("preserve existing publication \"%s\" in database \"%s\"",
    1910              :                                                         dbinfo->pubname, dbinfo->dbname);
    1911              :                 }
    1912              :         }
    1913            0 : }
    1914              : 
    1915              : /*
    1916              :  * Create a subscription with some predefined options.
    1917              :  *
    1918              :  * A replication slot was already created in a previous step. Let's use it.  It
    1919              :  * is not required to copy data. The subscription will be created but it will
    1920              :  * not be enabled now. That's because the replication progress must be set and
    1921              :  * the replication origin name (one of the function arguments) contains the
    1922              :  * subscription OID in its name. Once the subscription is created,
    1923              :  * set_replication_progress() can obtain the chosen origin name and set up its
    1924              :  * initial location.
    1925              :  */
    1926              : static void
    1927            0 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
    1928              : {
    1929            0 :         PQExpBuffer str = createPQExpBuffer();
    1930            0 :         PGresult   *res;
    1931            0 :         char       *pubname_esc;
    1932            0 :         char       *subname_esc;
    1933            0 :         char       *pubconninfo_esc;
    1934            0 :         char       *replslotname_esc;
    1935              : 
    1936            0 :         Assert(conn != NULL);
    1937              : 
    1938            0 :         pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1939            0 :         subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
    1940            0 :         pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
    1941            0 :         replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
    1942              : 
    1943            0 :         if (dry_run)
    1944            0 :                 pg_log_info("dry-run: would create subscription \"%s\" in database \"%s\"",
    1945              :                                         dbinfo->subname, dbinfo->dbname);
    1946              :         else
    1947            0 :                 pg_log_info("creating subscription \"%s\" in database \"%s\"",
    1948              :                                         dbinfo->subname, dbinfo->dbname);
    1949              : 
    1950            0 :         appendPQExpBuffer(str,
    1951              :                                           "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
    1952              :                                           "WITH (create_slot = false, enabled = false, "
    1953              :                                           "slot_name = %s, copy_data = false, two_phase = %s)",
    1954            0 :                                           subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
    1955            0 :                                           dbinfos.two_phase ? "true" : "false");
    1956              : 
    1957            0 :         PQfreemem(pubname_esc);
    1958            0 :         PQfreemem(subname_esc);
    1959            0 :         PQfreemem(pubconninfo_esc);
    1960            0 :         PQfreemem(replslotname_esc);
    1961              : 
    1962            0 :         pg_log_debug("command is: %s", str->data);
    1963              : 
    1964            0 :         if (!dry_run)
    1965              :         {
    1966            0 :                 res = PQexec(conn, str->data);
    1967            0 :                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1968              :                 {
    1969            0 :                         pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
    1970              :                                                  dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
    1971            0 :                         disconnect_database(conn, true);
    1972            0 :                 }
    1973            0 :                 PQclear(res);
    1974            0 :         }
    1975              : 
    1976            0 :         destroyPQExpBuffer(str);
    1977            0 : }
    1978              : 
    1979              : /*
    1980              :  * Sets the replication progress to the consistent LSN.
    1981              :  *
    1982              :  * The subscriber caught up to the consistent LSN provided by the last
    1983              :  * replication slot that was created. The goal is to set up the initial
    1984              :  * location for the logical replication that is the exact LSN that the
    1985              :  * subscriber was promoted. Once the subscription is enabled it will start
    1986              :  * streaming from that location onwards.  In dry run mode, the subscription OID
    1987              :  * and LSN are set to invalid values for printing purposes.
    1988              :  */
    1989              : static void
    1990            0 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
    1991              : {
    1992            0 :         PQExpBuffer str = createPQExpBuffer();
    1993            0 :         PGresult   *res;
    1994            0 :         Oid                     suboid;
    1995            0 :         char       *subname;
    1996            0 :         char       *dbname;
    1997            0 :         char       *originname;
    1998            0 :         char       *lsnstr;
    1999              : 
    2000            0 :         Assert(conn != NULL);
    2001              : 
    2002            0 :         subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
    2003            0 :         dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
    2004              : 
    2005            0 :         appendPQExpBuffer(str,
    2006              :                                           "SELECT s.oid FROM pg_catalog.pg_subscription s "
    2007              :                                           "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
    2008              :                                           "WHERE s.subname = %s AND d.datname = %s",
    2009            0 :                                           subname, dbname);
    2010              : 
    2011            0 :         res = PQexec(conn, str->data);
    2012            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    2013              :         {
    2014            0 :                 pg_log_error("could not obtain subscription OID: %s",
    2015              :                                          PQresultErrorMessage(res));
    2016            0 :                 disconnect_database(conn, true);
    2017            0 :         }
    2018              : 
    2019            0 :         if (PQntuples(res) != 1 && !dry_run)
    2020              :         {
    2021            0 :                 pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
    2022              :                                          PQntuples(res), 1);
    2023            0 :                 disconnect_database(conn, true);
    2024            0 :         }
    2025              : 
    2026            0 :         if (dry_run)
    2027              :         {
    2028            0 :                 suboid = InvalidOid;
    2029            0 :                 lsnstr = psprintf("%X/%08X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
    2030            0 :         }
    2031              :         else
    2032              :         {
    2033            0 :                 suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
    2034            0 :                 lsnstr = psprintf("%s", lsn);
    2035              :         }
    2036              : 
    2037            0 :         PQclear(res);
    2038              : 
    2039              :         /*
    2040              :          * The origin name is defined as pg_%u. %u is the subscription OID. See
    2041              :          * ApplyWorkerMain().
    2042              :          */
    2043            0 :         originname = psprintf("pg_%u", suboid);
    2044              : 
    2045            0 :         if (dry_run)
    2046            0 :                 pg_log_info("dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
    2047              :                                         originname, lsnstr, dbinfo->dbname);
    2048              :         else
    2049            0 :                 pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
    2050              :                                         originname, lsnstr, dbinfo->dbname);
    2051              : 
    2052            0 :         resetPQExpBuffer(str);
    2053            0 :         appendPQExpBuffer(str,
    2054              :                                           "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
    2055            0 :                                           originname, lsnstr);
    2056              : 
    2057            0 :         pg_log_debug("command is: %s", str->data);
    2058              : 
    2059            0 :         if (!dry_run)
    2060              :         {
    2061            0 :                 res = PQexec(conn, str->data);
    2062            0 :                 if (PQresultStatus(res) != PGRES_TUPLES_OK)
    2063              :                 {
    2064            0 :                         pg_log_error("could not set replication progress for subscription \"%s\": %s",
    2065              :                                                  dbinfo->subname, PQresultErrorMessage(res));
    2066            0 :                         disconnect_database(conn, true);
    2067            0 :                 }
    2068            0 :                 PQclear(res);
    2069            0 :         }
    2070              : 
    2071            0 :         PQfreemem(subname);
    2072            0 :         PQfreemem(dbname);
    2073            0 :         pg_free(originname);
    2074            0 :         pg_free(lsnstr);
    2075            0 :         destroyPQExpBuffer(str);
    2076            0 : }
    2077              : 
    2078              : /*
    2079              :  * Enables the subscription.
    2080              :  *
    2081              :  * The subscription was created in a previous step but it was disabled. After
    2082              :  * adjusting the initial logical replication location, enable the subscription.
    2083              :  */
    2084              : static void
    2085            0 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
    2086              : {
    2087            0 :         PQExpBuffer str = createPQExpBuffer();
    2088            0 :         PGresult   *res;
    2089            0 :         char       *subname;
    2090              : 
    2091            0 :         Assert(conn != NULL);
    2092              : 
    2093            0 :         subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
    2094              : 
    2095            0 :         if (dry_run)
    2096            0 :                 pg_log_info("dry-run: would enable subscription \"%s\" in database \"%s\"",
    2097              :                                         dbinfo->subname, dbinfo->dbname);
    2098              :         else
    2099            0 :                 pg_log_info("enabling subscription \"%s\" in database \"%s\"",
    2100              :                                         dbinfo->subname, dbinfo->dbname);
    2101              : 
    2102            0 :         appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
    2103              : 
    2104            0 :         pg_log_debug("command is: %s", str->data);
    2105              : 
    2106            0 :         if (!dry_run)
    2107              :         {
    2108            0 :                 res = PQexec(conn, str->data);
    2109            0 :                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2110              :                 {
    2111            0 :                         pg_log_error("could not enable subscription \"%s\": %s",
    2112              :                                                  dbinfo->subname, PQresultErrorMessage(res));
    2113            0 :                         disconnect_database(conn, true);
    2114            0 :                 }
    2115              : 
    2116            0 :                 PQclear(res);
    2117            0 :         }
    2118              : 
    2119            0 :         PQfreemem(subname);
    2120            0 :         destroyPQExpBuffer(str);
    2121            0 : }
    2122              : 
    2123              : /*
    2124              :  * Fetch a list of all connectable non-template databases from the source server
    2125              :  * and form a list such that they appear as if the user has specified multiple
    2126              :  * --database options, one for each source database.
    2127              :  */
    2128              : static void
    2129            0 : get_publisher_databases(struct CreateSubscriberOptions *opt,
    2130              :                                                 bool dbnamespecified)
    2131              : {
    2132            0 :         PGconn     *conn;
    2133            0 :         PGresult   *res;
    2134              : 
    2135              :         /* If a database name was specified, just connect to it. */
    2136            0 :         if (dbnamespecified)
    2137            0 :                 conn = connect_database(opt->pub_conninfo_str, true);
    2138              :         else
    2139              :         {
    2140              :                 /* Otherwise, try postgres first and then template1. */
    2141            0 :                 char       *conninfo;
    2142              : 
    2143            0 :                 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
    2144            0 :                 conn = connect_database(conninfo, false);
    2145            0 :                 pg_free(conninfo);
    2146            0 :                 if (!conn)
    2147              :                 {
    2148            0 :                         conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
    2149            0 :                         conn = connect_database(conninfo, true);
    2150            0 :                         pg_free(conninfo);
    2151            0 :                 }
    2152            0 :         }
    2153              : 
    2154            0 :         res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
    2155            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    2156              :         {
    2157            0 :                 pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
    2158            0 :                 PQclear(res);
    2159            0 :                 disconnect_database(conn, true);
    2160            0 :         }
    2161              : 
    2162            0 :         for (int i = 0; i < PQntuples(res); i++)
    2163              :         {
    2164            0 :                 const char *dbname = PQgetvalue(res, i, 0);
    2165              : 
    2166            0 :                 simple_string_list_append(&opt->database_names, dbname);
    2167              : 
    2168              :                 /* Increment num_dbs to reflect multiple --database options */
    2169            0 :                 num_dbs++;
    2170            0 :         }
    2171              : 
    2172            0 :         PQclear(res);
    2173            0 :         disconnect_database(conn, false);
    2174            0 : }
    2175              : 
    2176              : int
    2177            0 : main(int argc, char **argv)
    2178              : {
    2179              :         static struct option long_options[] =
    2180              :         {
    2181              :                 {"all", no_argument, NULL, 'a'},
    2182              :                 {"database", required_argument, NULL, 'd'},
    2183              :                 {"pgdata", required_argument, NULL, 'D'},
    2184              :                 {"dry-run", no_argument, NULL, 'n'},
    2185              :                 {"subscriber-port", required_argument, NULL, 'p'},
    2186              :                 {"publisher-server", required_argument, NULL, 'P'},
    2187              :                 {"socketdir", required_argument, NULL, 's'},
    2188              :                 {"recovery-timeout", required_argument, NULL, 't'},
    2189              :                 {"enable-two-phase", no_argument, NULL, 'T'},
    2190              :                 {"subscriber-username", required_argument, NULL, 'U'},
    2191              :                 {"verbose", no_argument, NULL, 'v'},
    2192              :                 {"version", no_argument, NULL, 'V'},
    2193              :                 {"help", no_argument, NULL, '?'},
    2194              :                 {"config-file", required_argument, NULL, 1},
    2195              :                 {"publication", required_argument, NULL, 2},
    2196              :                 {"replication-slot", required_argument, NULL, 3},
    2197              :                 {"subscription", required_argument, NULL, 4},
    2198              :                 {"clean", required_argument, NULL, 5},
    2199              :                 {NULL, 0, NULL, 0}
    2200              :         };
    2201              : 
    2202            0 :         struct CreateSubscriberOptions opt = {0};
    2203              : 
    2204            0 :         int                     c;
    2205            0 :         int                     option_index;
    2206              : 
    2207            0 :         char       *pub_base_conninfo;
    2208            0 :         char       *sub_base_conninfo;
    2209            0 :         char       *dbname_conninfo = NULL;
    2210              : 
    2211            0 :         uint64          pub_sysid;
    2212            0 :         uint64          sub_sysid;
    2213            0 :         struct stat statbuf;
    2214              : 
    2215            0 :         char       *consistent_lsn;
    2216              : 
    2217            0 :         char            pidfile[MAXPGPATH];
    2218              : 
    2219            0 :         pg_logging_init(argv[0]);
    2220            0 :         pg_logging_set_level(PG_LOG_WARNING);
    2221            0 :         progname = get_progname(argv[0]);
    2222            0 :         set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
    2223              : 
    2224            0 :         if (argc > 1)
    2225              :         {
    2226            0 :                 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
    2227              :                 {
    2228            0 :                         usage();
    2229            0 :                         exit(0);
    2230              :                 }
    2231            0 :                 else if (strcmp(argv[1], "-V") == 0
    2232            0 :                                  || strcmp(argv[1], "--version") == 0)
    2233              :                 {
    2234            0 :                         puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
    2235            0 :                         exit(0);
    2236              :                 }
    2237            0 :         }
    2238              : 
    2239              :         /* Default settings */
    2240            0 :         subscriber_dir = NULL;
    2241            0 :         opt.config_file = NULL;
    2242            0 :         opt.pub_conninfo_str = NULL;
    2243            0 :         opt.socket_dir = NULL;
    2244            0 :         opt.sub_port = DEFAULT_SUB_PORT;
    2245            0 :         opt.sub_username = NULL;
    2246            0 :         opt.two_phase = false;
    2247            0 :         opt.database_names = (SimpleStringList)
    2248            0 :         {
    2249              :                 0
    2250              :         };
    2251            0 :         opt.recovery_timeout = 0;
    2252            0 :         opt.all_dbs = false;
    2253              : 
    2254              :         /*
    2255              :          * Don't allow it to be run as root. It uses pg_ctl which does not allow
    2256              :          * it either.
    2257              :          */
    2258              : #ifndef WIN32
    2259            0 :         if (geteuid() == 0)
    2260              :         {
    2261            0 :                 pg_log_error("cannot be executed by \"root\"");
    2262            0 :                 pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
    2263              :                                                   progname);
    2264            0 :                 exit(1);
    2265              :         }
    2266              : #endif
    2267              : 
    2268            0 :         get_restricted_token();
    2269              : 
    2270            0 :         while ((c = getopt_long(argc, argv, "ad:D:np:P:s:t:TU:v",
    2271            0 :                                                         long_options, &option_index)) != -1)
    2272              :         {
    2273            0 :                 switch (c)
    2274              :                 {
    2275              :                         case 'a':
    2276            0 :                                 opt.all_dbs = true;
    2277            0 :                                 break;
    2278              :                         case 'd':
    2279            0 :                                 if (!simple_string_list_member(&opt.database_names, optarg))
    2280              :                                 {
    2281            0 :                                         simple_string_list_append(&opt.database_names, optarg);
    2282            0 :                                         num_dbs++;
    2283            0 :                                 }
    2284              :                                 else
    2285            0 :                                         pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
    2286            0 :                                 break;
    2287              :                         case 'D':
    2288            0 :                                 subscriber_dir = pg_strdup(optarg);
    2289            0 :                                 canonicalize_path(subscriber_dir);
    2290            0 :                                 break;
    2291              :                         case 'n':
    2292            0 :                                 dry_run = true;
    2293            0 :                                 break;
    2294              :                         case 'p':
    2295            0 :                                 opt.sub_port = pg_strdup(optarg);
    2296            0 :                                 break;
    2297              :                         case 'P':
    2298            0 :                                 opt.pub_conninfo_str = pg_strdup(optarg);
    2299            0 :                                 break;
    2300              :                         case 's':
    2301            0 :                                 opt.socket_dir = pg_strdup(optarg);
    2302            0 :                                 canonicalize_path(opt.socket_dir);
    2303            0 :                                 break;
    2304              :                         case 't':
    2305            0 :                                 opt.recovery_timeout = atoi(optarg);
    2306            0 :                                 break;
    2307              :                         case 'T':
    2308            0 :                                 opt.two_phase = true;
    2309            0 :                                 break;
    2310              :                         case 'U':
    2311            0 :                                 opt.sub_username = pg_strdup(optarg);
    2312            0 :                                 break;
    2313              :                         case 'v':
    2314            0 :                                 pg_logging_increase_verbosity();
    2315            0 :                                 break;
    2316              :                         case 1:
    2317            0 :                                 opt.config_file = pg_strdup(optarg);
    2318            0 :                                 break;
    2319              :                         case 2:
    2320            0 :                                 if (!simple_string_list_member(&opt.pub_names, optarg))
    2321              :                                 {
    2322            0 :                                         simple_string_list_append(&opt.pub_names, optarg);
    2323            0 :                                         num_pubs++;
    2324            0 :                                 }
    2325              :                                 else
    2326            0 :                                         pg_fatal("publication \"%s\" specified more than once for --publication", optarg);
    2327            0 :                                 break;
    2328              :                         case 3:
    2329            0 :                                 if (!simple_string_list_member(&opt.replslot_names, optarg))
    2330              :                                 {
    2331            0 :                                         simple_string_list_append(&opt.replslot_names, optarg);
    2332            0 :                                         num_replslots++;
    2333            0 :                                 }
    2334              :                                 else
    2335            0 :                                         pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
    2336            0 :                                 break;
    2337              :                         case 4:
    2338            0 :                                 if (!simple_string_list_member(&opt.sub_names, optarg))
    2339              :                                 {
    2340            0 :                                         simple_string_list_append(&opt.sub_names, optarg);
    2341            0 :                                         num_subs++;
    2342            0 :                                 }
    2343              :                                 else
    2344            0 :                                         pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
    2345            0 :                                 break;
    2346              :                         case 5:
    2347            0 :                                 if (!simple_string_list_member(&opt.objecttypes_to_clean, optarg))
    2348            0 :                                         simple_string_list_append(&opt.objecttypes_to_clean, optarg);
    2349              :                                 else
    2350            0 :                                         pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
    2351            0 :                                 break;
    2352              :                         default:
    2353              :                                 /* getopt_long already emitted a complaint */
    2354            0 :                                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2355            0 :                                 exit(1);
    2356              :                 }
    2357              :         }
    2358              : 
    2359              :         /* Validate that --all is not used with incompatible options */
    2360            0 :         if (opt.all_dbs)
    2361              :         {
    2362            0 :                 char       *bad_switch = NULL;
    2363              : 
    2364            0 :                 if (num_dbs > 0)
    2365            0 :                         bad_switch = "--database";
    2366            0 :                 else if (num_pubs > 0)
    2367            0 :                         bad_switch = "--publication";
    2368            0 :                 else if (num_replslots > 0)
    2369            0 :                         bad_switch = "--replication-slot";
    2370            0 :                 else if (num_subs > 0)
    2371            0 :                         bad_switch = "--subscription";
    2372              : 
    2373            0 :                 if (bad_switch)
    2374              :                 {
    2375            0 :                         pg_log_error("options %s and %s cannot be used together",
    2376              :                                                  bad_switch, "-a/--all");
    2377            0 :                         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2378            0 :                         exit(1);
    2379              :                 }
    2380            0 :         }
    2381              : 
    2382              :         /* Any non-option arguments? */
    2383            0 :         if (optind < argc)
    2384              :         {
    2385            0 :                 pg_log_error("too many command-line arguments (first is \"%s\")",
    2386              :                                          argv[optind]);
    2387            0 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2388            0 :                 exit(1);
    2389              :         }
    2390              : 
    2391              :         /* Required arguments */
    2392            0 :         if (subscriber_dir == NULL)
    2393              :         {
    2394            0 :                 pg_log_error("no subscriber data directory specified");
    2395            0 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2396            0 :                 exit(1);
    2397              :         }
    2398              : 
    2399              :         /* If socket directory is not provided, use the current directory */
    2400            0 :         if (opt.socket_dir == NULL)
    2401              :         {
    2402            0 :                 char            cwd[MAXPGPATH];
    2403              : 
    2404            0 :                 if (!getcwd(cwd, MAXPGPATH))
    2405            0 :                         pg_fatal("could not determine current directory");
    2406            0 :                 opt.socket_dir = pg_strdup(cwd);
    2407            0 :                 canonicalize_path(opt.socket_dir);
    2408            0 :         }
    2409              : 
    2410              :         /*
    2411              :          * Parse connection string. Build a base connection string that might be
    2412              :          * reused by multiple databases.
    2413              :          */
    2414            0 :         if (opt.pub_conninfo_str == NULL)
    2415              :         {
    2416              :                 /*
    2417              :                  * TODO use primary_conninfo (if available) from subscriber and
    2418              :                  * extract publisher connection string. Assume that there are
    2419              :                  * identical entries for physical and logical replication. If there is
    2420              :                  * not, we would fail anyway.
    2421              :                  */
    2422            0 :                 pg_log_error("no publisher connection string specified");
    2423            0 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2424            0 :                 exit(1);
    2425              :         }
    2426              : 
    2427            0 :         if (dry_run)
    2428            0 :                 pg_log_info("Executing in dry-run mode.\n"
    2429              :                                         "The target directory will not be modified.");
    2430              : 
    2431            0 :         pg_log_info("validating publisher connection string");
    2432            0 :         pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
    2433              :                                                                                   &dbname_conninfo);
    2434            0 :         if (pub_base_conninfo == NULL)
    2435            0 :                 exit(1);
    2436              : 
    2437            0 :         pg_log_info("validating subscriber connection string");
    2438            0 :         sub_base_conninfo = get_sub_conninfo(&opt);
    2439              : 
    2440              :         /*
    2441              :          * Fetch all databases from the source (publisher) and treat them as if
    2442              :          * the user specified has multiple --database options, one for each source
    2443              :          * database.
    2444              :          */
    2445            0 :         if (opt.all_dbs)
    2446              :         {
    2447            0 :                 bool            dbnamespecified = (dbname_conninfo != NULL);
    2448              : 
    2449            0 :                 get_publisher_databases(&opt, dbnamespecified);
    2450            0 :         }
    2451              : 
    2452            0 :         if (opt.database_names.head == NULL)
    2453              :         {
    2454            0 :                 pg_log_info("no database was specified");
    2455              : 
    2456              :                 /*
    2457              :                  * Try to obtain the dbname from the publisher conninfo. If dbname
    2458              :                  * parameter is not available, error out.
    2459              :                  */
    2460            0 :                 if (dbname_conninfo)
    2461              :                 {
    2462            0 :                         simple_string_list_append(&opt.database_names, dbname_conninfo);
    2463            0 :                         num_dbs++;
    2464              : 
    2465            0 :                         pg_log_info("database name \"%s\" was extracted from the publisher connection string",
    2466              :                                                 dbname_conninfo);
    2467            0 :                 }
    2468              :                 else
    2469              :                 {
    2470            0 :                         pg_log_error("no database name specified");
    2471            0 :                         pg_log_error_hint("Try \"%s --help\" for more information.",
    2472              :                                                           progname);
    2473            0 :                         exit(1);
    2474              :                 }
    2475            0 :         }
    2476              : 
    2477              :         /* Number of object names must match number of databases */
    2478            0 :         if (num_pubs > 0 && num_pubs != num_dbs)
    2479              :         {
    2480            0 :                 pg_log_error("wrong number of publication names specified");
    2481            0 :                 pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
    2482              :                                                         num_pubs, num_dbs);
    2483            0 :                 exit(1);
    2484              :         }
    2485            0 :         if (num_subs > 0 && num_subs != num_dbs)
    2486              :         {
    2487            0 :                 pg_log_error("wrong number of subscription names specified");
    2488            0 :                 pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
    2489              :                                                         num_subs, num_dbs);
    2490            0 :                 exit(1);
    2491              :         }
    2492            0 :         if (num_replslots > 0 && num_replslots != num_dbs)
    2493              :         {
    2494            0 :                 pg_log_error("wrong number of replication slot names specified");
    2495            0 :                 pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
    2496              :                                                         num_replslots, num_dbs);
    2497            0 :                 exit(1);
    2498              :         }
    2499              : 
    2500              :         /* Verify the object types specified for removal from the subscriber */
    2501            0 :         for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
    2502              :         {
    2503            0 :                 if (pg_strcasecmp(cell->val, "publications") == 0)
    2504            0 :                         dbinfos.objecttypes_to_clean |= OBJECTTYPE_PUBLICATIONS;
    2505              :                 else
    2506              :                 {
    2507            0 :                         pg_log_error("invalid object type \"%s\" specified for %s",
    2508              :                                                  cell->val, "--clean");
    2509            0 :                         pg_log_error_hint("The valid value is: \"%s\"", "publications");
    2510            0 :                         exit(1);
    2511              :                 }
    2512            0 :         }
    2513              : 
    2514              :         /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
    2515            0 :         pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
    2516            0 :         pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
    2517              : 
    2518              :         /* Rudimentary check for a data directory */
    2519            0 :         check_data_directory(subscriber_dir);
    2520              : 
    2521            0 :         dbinfos.two_phase = opt.two_phase;
    2522              : 
    2523              :         /*
    2524              :          * Store database information for publisher and subscriber. It should be
    2525              :          * called before atexit() because its return is used in the
    2526              :          * cleanup_objects_atexit().
    2527              :          */
    2528            0 :         dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
    2529              : 
    2530              :         /* Register a function to clean up objects in case of failure */
    2531            0 :         atexit(cleanup_objects_atexit);
    2532              : 
    2533              :         /*
    2534              :          * Check if the subscriber data directory has the same system identifier
    2535              :          * than the publisher data directory.
    2536              :          */
    2537            0 :         pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
    2538            0 :         sub_sysid = get_standby_sysid(subscriber_dir);
    2539            0 :         if (pub_sysid != sub_sysid)
    2540            0 :                 pg_fatal("subscriber data directory is not a copy of the source database cluster");
    2541              : 
    2542              :         /* Subscriber PID file */
    2543            0 :         snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
    2544              : 
    2545              :         /*
    2546              :          * The standby server must not be running. If the server is started under
    2547              :          * service manager and pg_createsubscriber stops it, the service manager
    2548              :          * might react to this action and start the server again. Therefore,
    2549              :          * refuse to proceed if the server is running to avoid possible failures.
    2550              :          */
    2551            0 :         if (stat(pidfile, &statbuf) == 0)
    2552              :         {
    2553            0 :                 pg_log_error("standby server is running");
    2554            0 :                 pg_log_error_hint("Stop the standby server and try again.");
    2555            0 :                 exit(1);
    2556              :         }
    2557              : 
    2558              :         /*
    2559              :          * Start a short-lived standby server with temporary parameters (provided
    2560              :          * by command-line options). The goal is to avoid connections during the
    2561              :          * transformation steps.
    2562              :          */
    2563            0 :         pg_log_info("starting the standby server with command-line options");
    2564            0 :         start_standby_server(&opt, true, false);
    2565              : 
    2566              :         /* Check if the standby server is ready for logical replication */
    2567            0 :         check_subscriber(dbinfos.dbinfo);
    2568              : 
    2569              :         /* Check if the primary server is ready for logical replication */
    2570            0 :         check_publisher(dbinfos.dbinfo);
    2571              : 
    2572              :         /*
    2573              :          * Stop the target server. The recovery process requires that the server
    2574              :          * reaches a consistent state before targeting the recovery stop point.
    2575              :          * Make sure a consistent state is reached (stop the target server
    2576              :          * guarantees it) *before* creating the replication slots in
    2577              :          * setup_publisher().
    2578              :          */
    2579            0 :         pg_log_info("stopping the subscriber");
    2580            0 :         stop_standby_server(subscriber_dir);
    2581              : 
    2582              :         /* Create the required objects for each database on publisher */
    2583            0 :         consistent_lsn = setup_publisher(dbinfos.dbinfo);
    2584              : 
    2585              :         /* Write the required recovery parameters */
    2586            0 :         setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
    2587              : 
    2588              :         /*
    2589              :          * Start subscriber so the recovery parameters will take effect. Wait
    2590              :          * until accepting connections. We don't want to start logical replication
    2591              :          * during setup.
    2592              :          */
    2593            0 :         pg_log_info("starting the subscriber");
    2594            0 :         start_standby_server(&opt, true, true);
    2595              : 
    2596              :         /* Waiting the subscriber to be promoted */
    2597            0 :         wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
    2598              : 
    2599              :         /*
    2600              :          * Create the subscription for each database on subscriber. It does not
    2601              :          * enable it immediately because it needs to adjust the replication start
    2602              :          * point to the LSN reported by setup_publisher().  It also cleans up
    2603              :          * publications created by this tool and replication to the standby.
    2604              :          */
    2605            0 :         setup_subscriber(dbinfos.dbinfo, consistent_lsn);
    2606              : 
    2607              :         /* Remove primary_slot_name if it exists on primary */
    2608            0 :         drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
    2609              : 
    2610              :         /* Remove failover replication slots if they exist on subscriber */
    2611            0 :         drop_failover_replication_slots(dbinfos.dbinfo);
    2612              : 
    2613              :         /* Stop the subscriber */
    2614            0 :         pg_log_info("stopping the subscriber");
    2615            0 :         stop_standby_server(subscriber_dir);
    2616              : 
    2617              :         /* Change system identifier from subscriber */
    2618            0 :         modify_subscriber_sysid(&opt);
    2619              : 
    2620            0 :         success = true;
    2621              : 
    2622            0 :         pg_log_info("Done!");
    2623              : 
    2624            0 :         return 0;
    2625            0 : }
        

Generated by: LCOV version 2.3.2-1