LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - streamutil.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 393 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 15 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * streamutil.c - utility functions for pg_basebackup, pg_receivewal and
       4              :  *                                      pg_recvlogical
       5              :  *
       6              :  * Author: Magnus Hagander <magnus@hagander.net>
       7              :  *
       8              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *                src/bin/pg_basebackup/streamutil.c
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres_fe.h"
      16              : 
      17              : #include <sys/time.h>
      18              : #include <unistd.h>
      19              : 
      20              : #include "access/xlog_internal.h"
      21              : #include "common/connect.h"
      22              : #include "common/file_perm.h"
      23              : #include "common/logging.h"
      24              : #include "common/string.h"
      25              : #include "datatype/timestamp.h"
      26              : #include "port/pg_bswap.h"
      27              : #include "pqexpbuffer.h"
      28              : #include "streamutil.h"
      29              : 
      30              : #define ERRCODE_DUPLICATE_OBJECT  "42710"
      31              : 
      32              : int                     WalSegSz;       /* assigned by RetrieveWalSegSize() */
      33              : 
      34              : static bool RetrieveDataDirCreatePerm(PGconn *conn);
      35              : 
      36              : /* SHOW command for replication connection was introduced in version 10 */
      37              : #define MINIMUM_VERSION_FOR_SHOW_CMD 100000
      38              : 
      39              : /*
      40              :  * Group access is supported from version 11.
                      :  * RetrieveDataDirCreatePerm() does nothing for servers older than this.
      41              :  */
      42              : #define MINIMUM_VERSION_FOR_GROUP_ACCESS 110000
      43              : 
      44              : const char *progname;   /* sent as fallback_application_name by GetConnection() */
      45              : char       *connection_string = NULL;   /* parsed by GetConnection(); never set together with dbname */
      46              : char       *dbhost = NULL;      /* if set, passed as the "host" connection keyword */
      47              : char       *dbuser = NULL;      /* if set, passed as the "user" connection keyword */
      48              : char       *dbport = NULL;      /* if set, passed as the "port" connection keyword */
      49              : char       *dbname = NULL;      /* if set, used as "dbname" and selects replication=database */
      50              : int                     dbgetpassword = 0;      /* 0=auto, -1=never, 1=always */
      51              : static char *password = NULL;   /* prompted password, kept for reuse by later GetConnection() calls */
      52              : PGconn     *conn = NULL;
      53              : 
      54              : /*
      55              :  * Connect to the server. Returns a valid PGconn pointer if connected,
      56              :  * or NULL on non-permanent error. On permanent error, the function will
      57              :  * call exit(1) directly.
                      :  *
                      :  * The connection parameters are assembled from connection_string or dbname
                      :  * plus the dbhost, dbuser and dbport globals; "replication" and
                      :  * "fallback_application_name" are always supplied.  A password is prompted
                      :  * for when dbgetpassword is 1 and none has been stored yet, or when the
                      :  * server asks for one and dbgetpassword is not -1.  The password is kept in
                      :  * the file-level "password" variable and reused by later calls.
                      :  *
                      :  * NULL is returned (after logging the libpq error message) when the
                      :  * connection attempt does not end in CONNECTION_OK.
                      :  *
                      :  * After a successful connect this function also:
                      :  * - runs ALWAYS_SECURE_SEARCH_PATH_SQL, for dbname connections to servers
                      :  *   of version 10 or later;
                      :  * - checks that the server reports integer_datetimes as "on";
                      :  * - calls RetrieveDataDirCreatePerm().
                      :  * Failure of any of these steps closes the connection and exits with
                      :  * status 1.
      58              :  */
      59              : PGconn *
      60            0 : GetConnection(void)
      61              : {
      62            0 :         PGconn     *tmpconn;
      63            0 :         int                     argcount = 7;   /* dbname, replication, fallback_app_name,
      64              :                                                                  * host, user, port, password */
      65            0 :         int                     i;
      66            0 :         const char **keywords;
      67            0 :         const char **values;
      68            0 :         const char *tmpparam;
      69            0 :         bool            need_password;
      70            0 :         PQconninfoOption *conn_opts = NULL;
      71            0 :         PQconninfoOption *conn_opt;
      72            0 :         char       *err_msg = NULL;
      73              : 
      74              :         /*
      75              :          * pg_recvlogical uses dbname only; others use connection_string only.
      76              :          * (Note: both variables will be NULL if there's no command line options.)
      77              :          */
      78            0 :         Assert(dbname == NULL || connection_string == NULL);
      79              : 
      80              :         /*
      81              :          * Merge the connection info inputs given in form of connection string,
      82              :          * options and default values (dbname=replication, replication=true, etc.)
      83              :          */
      84            0 :         i = 0;  /* next unused index in keywords[] and values[] */
      85            0 :         if (connection_string)
      86              :         {
      87            0 :                 conn_opts = PQconninfoParse(connection_string, &err_msg);
      88            0 :                 if (conn_opts == NULL)
      89            0 :                         pg_fatal("%s", err_msg);
      90              : 
      91            0 :                 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
      92              :                 {
      93            0 :                         if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
      94            0 :                                 argcount++;     /* reserve a slot for each non-empty parsed option */
      95            0 :                 }
      96              : 
      97            0 :                 keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));      /* one entry beyond argcount; presumably the NULL terminator for PQconnectdbParams() -- confirm */
      98            0 :                 values = pg_malloc0((argcount + 1) * sizeof(*values));
      99              : 
     100              :                 /*
     101              :                  * Set dbname here already, so it can be overridden by a dbname in the
     102              :                  * connection string.
     103              :                  */
     104            0 :                 keywords[i] = "dbname";
     105            0 :                 values[i] = "replication";
     106            0 :                 i++;
     107              : 
     108            0 :                 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
     109              :                 {
     110            0 :                         if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
     111              :                         {
     112            0 :                                 keywords[i] = conn_opt->keyword;        /* points into conn_opts, which is freed only after connecting */
     113            0 :                                 values[i] = conn_opt->val;
     114            0 :                                 i++;
     115            0 :                         }
     116            0 :                 }
     117            0 :         }
     118              :         else
     119              :         {
     120            0 :                 keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
     121            0 :                 values = pg_malloc0((argcount + 1) * sizeof(*values));
     122            0 :                 keywords[i] = "dbname";
     123            0 :                 values[i] = (dbname == NULL) ? "replication" : dbname;
     124            0 :                 i++;
     125              :         }
     126              : 
     127            0 :         keywords[i] = "replication";
     128            0 :         values[i] = (dbname == NULL) ? "true" : "database";
     129            0 :         i++;
     130            0 :         keywords[i] = "fallback_application_name";
     131            0 :         values[i] = progname;
     132            0 :         i++;
     133              : 
     134            0 :         if (dbhost)
     135              :         {
     136            0 :                 keywords[i] = "host";
     137            0 :                 values[i] = dbhost;
     138            0 :                 i++;
     139            0 :         }
     140            0 :         if (dbuser)
     141              :         {
     142            0 :                 keywords[i] = "user";
     143            0 :                 values[i] = dbuser;
     144            0 :                 i++;
     145            0 :         }
     146            0 :         if (dbport)
     147              :         {
     148            0 :                 keywords[i] = "port";
     149            0 :                 values[i] = dbport;
     150            0 :                 i++;
     151            0 :         }
     152              : 
     153              :         /* If -W was given, force prompt for password, but only the first time */
     154            0 :         need_password = (dbgetpassword == 1 && !password);
     155              : 
     156            0 :         do
     157              :         {
     158              :                 /* Get a new password if appropriate */
     159            0 :                 if (need_password)
     160              :                 {
     161            0 :                         free(password);
     162            0 :                         password = simple_prompt("Password: ", false);
     163            0 :                         need_password = false;
     164            0 :                 }
     165              : 
     166              :                 /* Use (or reuse, on a subsequent connection) password if we have it */
     167            0 :                 if (password)
     168              :                 {
     169            0 :                         keywords[i] = "password";       /* i is not advanced, so a retry overwrites this same slot */
     170            0 :                         values[i] = password;
     171            0 :                 }
     172              :                 else
     173              :                 {
     174            0 :                         keywords[i] = NULL;     /* no password: leave this slot empty */
     175            0 :                         values[i] = NULL;
     176              :                 }
     177              : 
     178              :                 /*
     179              :                  * Only expand dbname when we did not already parse the argument as a
     180              :                  * connection string ourselves.
     181              :                  */
     182            0 :                 tmpconn = PQconnectdbParams(keywords, values, !connection_string);
     183              : 
     184              :                 /*
     185              :                  * If there is too little memory even to allocate the PGconn object
     186              :                  * and PQconnectdbParams returns NULL, we call exit(1) directly.
     187              :                  */
     188            0 :                 if (!tmpconn)
     189            0 :                         pg_fatal("could not connect to server");
     190              : 
     191              :                 /* If we need a password and -w wasn't given, loop back and get one */
     192            0 :                 if (PQstatus(tmpconn) == CONNECTION_BAD &&
     193            0 :                         PQconnectionNeedsPassword(tmpconn) &&
     194            0 :                         dbgetpassword != -1)
     195              :                 {
     196            0 :                         PQfinish(tmpconn);
     197            0 :                         need_password = true;
     198            0 :                 }
     199            0 :         }
     200            0 :         while (need_password);
     201              : 
     202            0 :         if (PQstatus(tmpconn) != CONNECTION_OK)
     203              :         {
     204            0 :                 pg_log_error("%s", PQerrorMessage(tmpconn));
     205            0 :                 PQfinish(tmpconn);
     206            0 :                 free(values);
     207            0 :                 free(keywords);
     208            0 :                 PQconninfoFree(conn_opts);
     209            0 :                 return NULL;
     210              :         }
     211              : 
     212              :         /* Connection ok! */
     213            0 :         free(values);
     214            0 :         free(keywords);
     215            0 :         PQconninfoFree(conn_opts);
     216              : 
     217              :         /*
     218              :          * Set always-secure search path, so malicious users can't get control.
     219              :          * The capacity to run normal SQL queries was added in PostgreSQL 10, so
     220              :          * the search path cannot be changed (by us or attackers) on earlier
     221              :          * versions.
     222              :          */
     223            0 :         if (dbname != NULL && PQserverVersion(tmpconn) >= 100000)
     224              :         {
     225            0 :                 PGresult   *res;
     226              : 
     227            0 :                 res = PQexec(tmpconn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     228            0 :                 if (PQresultStatus(res) != PGRES_TUPLES_OK)
     229              :                 {
     230            0 :                         pg_log_error("could not clear \"search_path\": %s",
     231              :                                                  PQerrorMessage(tmpconn));
     232            0 :                         PQclear(res);
     233            0 :                         PQfinish(tmpconn);
     234            0 :                         exit(1);
     235              :                 }
     236            0 :                 PQclear(res);
     237            0 :         }
     238              : 
     239              :         /*
     240              :          * Ensure we have the same value of integer_datetimes (now always "on") as
     241              :          * the server we are connecting to.
     242              :          */
     243            0 :         tmpparam = PQparameterStatus(tmpconn, "integer_datetimes");
     244            0 :         if (!tmpparam)
     245              :         {
     246            0 :                 pg_log_error("could not determine server setting for \"integer_datetimes\"");
     247            0 :                 PQfinish(tmpconn);
     248            0 :                 exit(1);
     249              :         }
     250              : 
     251            0 :         if (strcmp(tmpparam, "on") != 0)
     252              :         {
     253            0 :                 pg_log_error("\"integer_datetimes\" compile flag does not match server");
     254            0 :                 PQfinish(tmpconn);
     255            0 :                 exit(1);
     256              :         }
     257              : 
     258              :         /*
     259              :          * Retrieve the source data directory mode and use it to construct a umask
     260              :          * for creating directories and files.
     261              :          */
     262            0 :         if (!RetrieveDataDirCreatePerm(tmpconn))
     263              :         {
     264            0 :                 PQfinish(tmpconn);
     265            0 :                 exit(1);
     266              :         }
     267              : 
     268            0 :         return tmpconn;
     269            0 : }
     270              : 
     271              : /*
     272              :  * From version 10, explicitly set wal segment size using SHOW wal_segment_size
     273              :  * since ControlFile is not accessible here.
                      :  *
                      :  * conn must not be NULL.  On success the global WalSegSz has been set and
                      :  * true is returned.  For servers older than MINIMUM_VERSION_FOR_SHOW_CMD,
                      :  * WalSegSz is set to DEFAULT_XLOG_SEG_SIZE without querying the server.
                      :  *
                      :  * Returns false, after logging an error, if the SHOW command fails, the
                      :  * result does not have exactly one row and at least one field, the value
                      :  * cannot be parsed as a number followed by a unit, or the computed size
                      :  * is rejected by IsValidWalSegSize().
     274              :  */
     275              : bool
     276            0 : RetrieveWalSegSize(PGconn *conn)
     277              : {
     278            0 :         PGresult   *res;
     279            0 :         char            xlog_unit[3];   /* two unit characters plus terminator, matching %2s below */
     280            0 :         int                     xlog_val,
     281            0 :                                 multiplier = 1; /* stays 1 when the unit is neither "MB" nor "GB" */
     282              : 
     283              :         /* check connection existence */
     284            0 :         Assert(conn != NULL);
     285              : 
     286              :         /* for previous versions set the default xlog seg size */
     287            0 :         if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_SHOW_CMD)
     288              :         {
     289            0 :                 WalSegSz = DEFAULT_XLOG_SEG_SIZE;
     290            0 :                 return true;
     291              :         }
     292              : 
     293            0 :         res = PQexec(conn, "SHOW wal_segment_size");
     294            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     295              :         {
     296            0 :                 pg_log_error("could not send replication command \"%s\": %s",
     297              :                                          "SHOW wal_segment_size", PQerrorMessage(conn));
     298              : 
     299            0 :                 PQclear(res);
     300            0 :                 return false;
     301              :         }
     302            0 :         if (PQntuples(res) != 1 || PQnfields(res) < 1)
     303              :         {
     304            0 :                 pg_log_error("could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields",
     305              :                                          PQntuples(res), PQnfields(res), 1, 1);
     306              : 
     307            0 :                 PQclear(res);
     308            0 :                 return false;
     309              :         }
     310              : 
     311              :         /* fetch xlog value and unit from the result */
     312            0 :         if (sscanf(PQgetvalue(res, 0, 0), "%d%2s", &xlog_val, xlog_unit) != 2)
     313              :         {
     314            0 :                 pg_log_error("WAL segment size could not be parsed");
     315            0 :                 PQclear(res);
     316            0 :                 return false;
     317              :         }
     318              : 
     319            0 :         PQclear(res);   /* the parsed values were copied into locals, so the result can go */
     320              : 
     321              :         /* set the multiplier based on unit to convert xlog_val to bytes */
     322            0 :         if (strcmp(xlog_unit, "MB") == 0)
     323            0 :                 multiplier = 1024 * 1024;
     324            0 :         else if (strcmp(xlog_unit, "GB") == 0)
     325            0 :                 multiplier = 1024 * 1024 * 1024;
     326              : 
     327              :         /* convert and set WalSegSz */
     328            0 :         WalSegSz = xlog_val * multiplier;
     329              : 
     330            0 :         if (!IsValidWalSegSize(WalSegSz))
     331              :         {
     332            0 :                 pg_log_error(ngettext("remote server reported invalid WAL segment size (%d byte)",
     333              :                                                           "remote server reported invalid WAL segment size (%d bytes)",
     334              :                                                           WalSegSz),
     335              :                                          WalSegSz);
     336            0 :                 pg_log_error_detail("The WAL segment size must be a power of two between 1 MB and 1 GB.");
     337            0 :                 return false;
     338              :         }
     339              : 
     340            0 :         return true;
     341            0 : }
     342              : 
     343              : /*
     344              :  * RetrieveDataDirCreatePerm
     345              :  *
     346              :  * This function is used to determine the privileges on the server's PG data
     347              :  * directory and, based on that, set what the permissions will be for
     348              :  * directories and files we create.
     349              :  *
     350              :  * PG11 added support for (optionally) group read/execute rights to be set on
     351              :  * the data directory.  Prior to PG11, only the owner was allowed to have rights
     352              :  * on the data directory.
                      :  *
                      :  * conn must not be NULL.  Returns true on success, which includes the case
                      :  * of a server older than MINIMUM_VERSION_FOR_GROUP_ACCESS, where nothing is
                      :  * queried and SetDataDirectoryCreatePerm() is not called.  Returns false,
                      :  * after logging an error, if SHOW data_directory_mode fails, the result does
                      :  * not have exactly one row and at least one field, or the value cannot be
                      :  * read as an octal number.
     353              :  */
     354              : static bool
     355            0 : RetrieveDataDirCreatePerm(PGconn *conn)
     356              : {
     357            0 :         PGresult   *res;
     358            0 :         int                     data_directory_mode;
     359              : 
     360              :         /* check connection existence */
     361            0 :         Assert(conn != NULL);
     362              : 
     363              :         /* for previous versions leave the default group access */
     364            0 :         if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_GROUP_ACCESS)
     365            0 :                 return true;
     366              : 
     367            0 :         res = PQexec(conn, "SHOW data_directory_mode");
     368            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     369              :         {
     370            0 :                 pg_log_error("could not send replication command \"%s\": %s",
     371              :                                          "SHOW data_directory_mode", PQerrorMessage(conn));
     372              : 
     373            0 :                 PQclear(res);
     374            0 :                 return false;
     375              :         }
     376            0 :         if (PQntuples(res) != 1 || PQnfields(res) < 1)
     377              :         {
     378            0 :                 pg_log_error("could not fetch group access flag: got %d rows and %d fields, expected %d rows and %d or more fields",
     379              :                                          PQntuples(res), PQnfields(res), 1, 1);
     380              : 
     381            0 :                 PQclear(res);
     382            0 :                 return false;
     383              :         }
     384              : 
     385            0 :         if (sscanf(PQgetvalue(res, 0, 0), "%o", &data_directory_mode) != 1)     /* %o: the mode is read as octal */
     386              :         {
     387            0 :                 pg_log_error("group access flag could not be parsed: %s",
     388              :                                          PQgetvalue(res, 0, 0));
     389              : 
     390            0 :                 PQclear(res);
     391            0 :                 return false;
     392              :         }
     393              : 
     394            0 :         SetDataDirectoryCreatePerm(data_directory_mode);
     395              : 
     396            0 :         PQclear(res);
     397            0 :         return true;
     398            0 : }
     399              : 
     400              : /*
     401              :  * Run IDENTIFY_SYSTEM through a given connection and give back to caller
     402              :  * some result information if requested:
     403              :  * - System identifier
     404              :  * - Current timeline ID
     405              :  * - Start LSN position
     406              :  * - Database name (NULL in servers prior to 9.4)
                      :  *
                      :  * conn must not be NULL.  Each of sysid, starttli, startpos and db_name may
                      :  * be NULL, in which case that item is not returned.  *sysid and *db_name are
                      :  * allocated with pg_strdup(); *db_name is set to NULL when the server is
                      :  * older than 9.4 or reports a NULL database name.
                      :  *
                      :  * Returns true on success.  Returns false, after logging an error, if the
                      :  * command fails, the result has an unexpected number of rows or fields, or
                      :  * the LSN cannot be parsed.
                      :  *
                      :  * NOTE(review): outputs are filled in order, so *sysid and *starttli may
                      :  * already have been assigned when a later step fails and false is
                      :  * returned; callers presumably should ignore the outputs then -- confirm.
     407              :  */
     408              : bool
     409            0 : RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
     410              :                                   XLogRecPtr *startpos, char **db_name)
     411              : {
     412            0 :         PGresult   *res;
     413            0 :         uint32          hi,
     414              :                                 lo;
     415              : 
     416              :         /* Check connection existence */
     417            0 :         Assert(conn != NULL);
     418              : 
     419            0 :         res = PQexec(conn, "IDENTIFY_SYSTEM");
     420            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     421              :         {
     422            0 :                 pg_log_error("could not send replication command \"%s\": %s",
     423              :                                          "IDENTIFY_SYSTEM", PQerrorMessage(conn));
     424              : 
     425            0 :                 PQclear(res);
     426            0 :                 return false;
     427              :         }
     428            0 :         if (PQntuples(res) != 1 || PQnfields(res) < 3)
     429              :         {
     430            0 :                 pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
     431              :                                          PQntuples(res), PQnfields(res), 1, 3);
     432              : 
     433            0 :                 PQclear(res);
     434            0 :                 return false;
     435              :         }
     436              : 
     437              :         /* Get system identifier */
     438            0 :         if (sysid != NULL)
     439            0 :                 *sysid = pg_strdup(PQgetvalue(res, 0, 0));
     440              : 
     441              :         /* Get timeline ID to start streaming from */
     442            0 :         if (starttli != NULL)
     443            0 :                 *starttli = atoi(PQgetvalue(res, 0, 1));
     444              : 
     445              :         /* Get LSN start position if necessary */
     446            0 :         if (startpos != NULL)
     447              :         {
     448            0 :                 if (sscanf(PQgetvalue(res, 0, 2), "%X/%08X", &hi, &lo) != 2)
     449              :                 {
     450            0 :                         pg_log_error("could not parse write-ahead log location \"%s\"",
     451              :                                                  PQgetvalue(res, 0, 2));
     452              : 
     453            0 :                         PQclear(res);
     454            0 :                         return false;
     455              :                 }
     456            0 :                 *startpos = ((uint64) hi) << 32 | lo;   /* join the two 32-bit halves into one 64-bit value */
     457            0 :         }
     458              : 
     459              :         /* Get database name, only available in 9.4 and newer versions */
     460            0 :         if (db_name != NULL)
     461              :         {
     462            0 :                 *db_name = NULL;
     463            0 :                 if (PQserverVersion(conn) >= 90400)
     464              :                 {
     465            0 :                         if (PQnfields(res) < 4)
     466              :                         {
     467            0 :                                 pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
     468              :                                                          PQntuples(res), PQnfields(res), 1, 4);
     469              : 
     470            0 :                                 PQclear(res);
     471            0 :                                 return false;
     472              :                         }
     473            0 :                         if (!PQgetisnull(res, 0, 3))
     474            0 :                                 *db_name = pg_strdup(PQgetvalue(res, 0, 3));
     475            0 :                 }
     476            0 :         }
     477              : 
     478            0 :         PQclear(res);
     479            0 :         return true;
     480            0 : }
     481              : 
     482              : /*
     483              :  * Run READ_REPLICATION_SLOT through a given connection and give back to
     484              :  * caller some result information if requested for this slot:
     485              :  * - Start LSN position, InvalidXLogRecPtr if unknown.
     486              :  * - Current timeline ID, 0 if unknown.
     487              :  * Returns false on failure, and true otherwise.
                      :  *
                      :  * restart_lsn and restart_tli may each be NULL.  When non-NULL they are
                      :  * reset to InvalidXLogRecPtr and 0 before the command is sent, so they hold
                      :  * those values whenever false is returned.
                      :  *
                      :  * slot_name is inserted into the command text as given, without quoting.
                      :  *
                      :  * Failure cases, each logged: the command fails, the result is not exactly
                      :  * one row of three fields, the slot does not exist (NULL slot type), the
                      :  * slot type is not "physical", or the restart LSN cannot be parsed.
     488              :  */
     489              : bool
     490            0 : GetSlotInformation(PGconn *conn, const char *slot_name,
     491              :                                    XLogRecPtr *restart_lsn, TimeLineID *restart_tli)
     492              : {
     493            0 :         PGresult   *res;
     494            0 :         PQExpBuffer query;
     495            0 :         XLogRecPtr      lsn_loc = InvalidXLogRecPtr;
     496            0 :         TimeLineID      tli_loc = 0;
     497              : 
     498            0 :         if (restart_lsn)
     499            0 :                 *restart_lsn = lsn_loc; /* preset so the output is defined on every failure path */
     500            0 :         if (restart_tli)
     501            0 :                 *restart_tli = tli_loc;
     502              : 
     503            0 :         query = createPQExpBuffer();
     504            0 :         appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s", slot_name);
     505            0 :         res = PQexec(conn, query->data);
     506            0 :         destroyPQExpBuffer(query);
     507              : 
     508            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     509              :         {
     510            0 :                 pg_log_error("could not send replication command \"%s\": %s",
     511              :                                          "READ_REPLICATION_SLOT", PQerrorMessage(conn));
     512            0 :                 PQclear(res);
     513            0 :                 return false;
     514              :         }
     515              : 
     516              :         /* The command should always return precisely one tuple and three fields */
     517            0 :         if (PQntuples(res) != 1 || PQnfields(res) != 3)
     518              :         {
     519            0 :                 pg_log_error("could not read replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     520              :                                          slot_name, PQntuples(res), PQnfields(res), 1, 3);
     521            0 :                 PQclear(res);
     522            0 :                 return false;
     523              :         }
     524              : 
     525              :         /*
     526              :          * When the slot doesn't exist, the command returns a tuple with NULL
     527              :          * values.  This checks only the slot type field.
     528              :          */
     529            0 :         if (PQgetisnull(res, 0, 0))
     530              :         {
     531            0 :                 pg_log_error("replication slot \"%s\" does not exist", slot_name);
     532            0 :                 PQclear(res);
     533            0 :                 return false;
     534              :         }
     535              : 
     536              :         /*
     537              :          * Note that this cannot happen as READ_REPLICATION_SLOT supports only
     538              :          * physical slots, but play it safe.
     539              :          */
     540            0 :         if (strcmp(PQgetvalue(res, 0, 0), "physical") != 0)
     541              :         {
     542            0 :                 pg_log_error("expected a physical replication slot, got type \"%s\" instead",
     543              :                                          PQgetvalue(res, 0, 0));
     544            0 :                 PQclear(res);
     545            0 :                 return false;
     546              :         }
     547              : 
     548              :         /* restart LSN */
     549            0 :         if (!PQgetisnull(res, 0, 1))
     550              :         {
     551            0 :                 uint32          hi,
     552              :                                         lo;
     553              : 
     554            0 :                 if (sscanf(PQgetvalue(res, 0, 1), "%X/%08X", &hi, &lo) != 2)
     555              :                 {
     556            0 :                         pg_log_error("could not parse restart_lsn \"%s\" for replication slot \"%s\"",
     557              :                                                  PQgetvalue(res, 0, 1), slot_name);
     558            0 :                         PQclear(res);
     559            0 :                         return false;
     560              :                 }
     561            0 :                 lsn_loc = ((uint64) hi) << 32 | lo;     /* join the two 32-bit halves into one 64-bit value */
     562            0 :         }
     563              : 
     564              :         /* current TLI */
     565            0 :         if (!PQgetisnull(res, 0, 2))
     566            0 :                 tli_loc = (TimeLineID) atoll(PQgetvalue(res, 0, 2));
     567              : 
     568            0 :         PQclear(res);
     569              : 
     570              :         /* Assign results if requested */
     571            0 :         if (restart_lsn)
     572            0 :                 *restart_lsn = lsn_loc;
     573            0 :         if (restart_tli)
     574            0 :                 *restart_tli = tli_loc;
     575              : 
     576            0 :         return true;
     577            0 : }
     578              : 
     579              : /*
     580              :  * Create a replication slot for the given connection. This function
     581              :  * returns true in case of success.
     582              :  */
     583              : bool
     584            0 : CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
     585              :                                           bool is_temporary, bool is_physical, bool reserve_wal,
     586              :                                           bool slot_exists_ok, bool two_phase, bool failover)
     587              : {
     588            0 :         PQExpBuffer query;
     589            0 :         PGresult   *res;
     590            0 :         bool            use_new_option_syntax = (PQserverVersion(conn) >= 150000);
     591              : 
     592            0 :         query = createPQExpBuffer();
     593              : 
     594            0 :         Assert((is_physical && plugin == NULL) ||
     595              :                    (!is_physical && plugin != NULL));
     596            0 :         Assert(!(two_phase && is_physical));
     597            0 :         Assert(!(failover && is_physical));
     598            0 :         Assert(slot_name != NULL);
     599              : 
     600              :         /* Build base portion of query */
     601            0 :         appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
     602            0 :         if (is_temporary)
     603            0 :                 appendPQExpBufferStr(query, " TEMPORARY");
     604            0 :         if (is_physical)
     605            0 :                 appendPQExpBufferStr(query, " PHYSICAL");
     606              :         else
     607            0 :                 appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
     608              : 
     609              :         /* Add any requested options */
     610            0 :         if (use_new_option_syntax)
     611            0 :                 appendPQExpBufferStr(query, " (");
     612            0 :         if (is_physical)
     613              :         {
     614            0 :                 if (reserve_wal)
     615            0 :                         AppendPlainCommandOption(query, use_new_option_syntax,
     616              :                                                                          "RESERVE_WAL");
     617            0 :         }
     618              :         else
     619              :         {
     620            0 :                 if (failover && PQserverVersion(conn) >= 170000)
     621            0 :                         AppendPlainCommandOption(query, use_new_option_syntax,
     622              :                                                                          "FAILOVER");
     623              : 
     624            0 :                 if (two_phase && PQserverVersion(conn) >= 150000)
     625            0 :                         AppendPlainCommandOption(query, use_new_option_syntax,
     626              :                                                                          "TWO_PHASE");
     627              : 
     628            0 :                 if (PQserverVersion(conn) >= 100000)
     629              :                 {
     630              :                         /* pg_recvlogical doesn't use an exported snapshot, so suppress */
     631            0 :                         if (use_new_option_syntax)
     632            0 :                                 AppendStringCommandOption(query, use_new_option_syntax,
     633              :                                                                                   "SNAPSHOT", "nothing");
     634              :                         else
     635            0 :                                 AppendPlainCommandOption(query, use_new_option_syntax,
     636              :                                                                                  "NOEXPORT_SNAPSHOT");
     637            0 :                 }
     638              :         }
     639            0 :         if (use_new_option_syntax)
     640              :         {
     641              :                 /* Suppress option list if it would be empty, otherwise terminate */
     642            0 :                 if (query->data[query->len - 1] == '(')
     643              :                 {
     644            0 :                         query->len -= 2;
     645            0 :                         query->data[query->len] = '\0';
     646            0 :                 }
     647              :                 else
     648            0 :                         appendPQExpBufferChar(query, ')');
     649            0 :         }
     650              : 
     651              :         /* Now run the query */
     652            0 :         res = PQexec(conn, query->data);
     653            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     654              :         {
     655            0 :                 const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
     656              : 
     657            0 :                 if (slot_exists_ok &&
     658            0 :                         sqlstate &&
     659            0 :                         strcmp(sqlstate, ERRCODE_DUPLICATE_OBJECT) == 0)
     660              :                 {
     661            0 :                         destroyPQExpBuffer(query);
     662            0 :                         PQclear(res);
     663            0 :                         return true;
     664              :                 }
     665              :                 else
     666              :                 {
     667            0 :                         pg_log_error("could not send replication command \"%s\": %s",
     668              :                                                  query->data, PQerrorMessage(conn));
     669              : 
     670            0 :                         destroyPQExpBuffer(query);
     671            0 :                         PQclear(res);
     672            0 :                         return false;
     673              :                 }
     674            0 :         }
     675              : 
     676            0 :         if (PQntuples(res) != 1 || PQnfields(res) != 4)
     677              :         {
     678            0 :                 pg_log_error("could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     679              :                                          slot_name,
     680              :                                          PQntuples(res), PQnfields(res), 1, 4);
     681              : 
     682            0 :                 destroyPQExpBuffer(query);
     683            0 :                 PQclear(res);
     684            0 :                 return false;
     685              :         }
     686              : 
     687            0 :         destroyPQExpBuffer(query);
     688            0 :         PQclear(res);
     689            0 :         return true;
     690            0 : }
     691              : 
     692              : /*
     693              :  * Drop a replication slot for the given connection. This function
     694              :  * returns true in case of success.
     695              :  */
     696              : bool
     697            0 : DropReplicationSlot(PGconn *conn, const char *slot_name)
     698              : {
     699            0 :         PQExpBuffer query;
     700            0 :         PGresult   *res;
     701              : 
     702            0 :         Assert(slot_name != NULL);
     703              : 
     704            0 :         query = createPQExpBuffer();
     705              : 
     706              :         /* Build query */
     707            0 :         appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"",
     708            0 :                                           slot_name);
     709            0 :         res = PQexec(conn, query->data);
     710            0 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
     711              :         {
     712            0 :                 pg_log_error("could not send replication command \"%s\": %s",
     713              :                                          query->data, PQerrorMessage(conn));
     714              : 
     715            0 :                 destroyPQExpBuffer(query);
     716            0 :                 PQclear(res);
     717            0 :                 return false;
     718              :         }
     719              : 
     720            0 :         if (PQntuples(res) != 0 || PQnfields(res) != 0)
     721              :         {
     722            0 :                 pg_log_error("could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     723              :                                          slot_name,
     724              :                                          PQntuples(res), PQnfields(res), 0, 0);
     725              : 
     726            0 :                 destroyPQExpBuffer(query);
     727            0 :                 PQclear(res);
     728            0 :                 return false;
     729              :         }
     730              : 
     731            0 :         destroyPQExpBuffer(query);
     732            0 :         PQclear(res);
     733            0 :         return true;
     734            0 : }
     735              : 
     736              : /*
     737              :  * Append a "plain" option - one with no value - to a server command that
     738              :  * is being constructed.
     739              :  *
     740              :  * In the old syntax, all options were parser keywords, so you could just
     741              :  * write things like SOME_COMMAND OPTION1 OPTION2 'opt2value' OPTION3 42. The
     742              :  * new syntax uses a comma-separated list surrounded by parentheses, so the
     743              :  * equivalent is SOME_COMMAND (OPTION1, OPTION2 'opt2value', OPTION3 42).
     744              :  */
     745              : void
     746            0 : AppendPlainCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     747              :                                                  char *option_name)
     748              : {
     749            0 :         if (buf->len > 0 && buf->data[buf->len - 1] != '(')
     750              :         {
     751            0 :                 if (use_new_option_syntax)
     752            0 :                         appendPQExpBufferStr(buf, ", ");
     753              :                 else
     754            0 :                         appendPQExpBufferChar(buf, ' ');
     755            0 :         }
     756              : 
     757            0 :         appendPQExpBuffer(buf, " %s", option_name);
     758            0 : }
     759              : 
     760              : /*
     761              :  * Append an option with an associated string value to a server command that
     762              :  * is being constructed.
     763              :  *
     764              :  * See comments for AppendPlainCommandOption, above.
     765              :  */
     766              : void
     767            0 : AppendStringCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     768              :                                                   char *option_name, char *option_value)
     769              : {
     770            0 :         AppendPlainCommandOption(buf, use_new_option_syntax, option_name);
     771              : 
     772            0 :         if (option_value != NULL)
     773              :         {
     774            0 :                 size_t          length = strlen(option_value);
     775            0 :                 char       *escaped_value = palloc(1 + 2 * length);
     776              : 
     777            0 :                 PQescapeStringConn(conn, escaped_value, option_value, length, NULL);
     778            0 :                 appendPQExpBuffer(buf, " '%s'", escaped_value);
     779            0 :                 pfree(escaped_value);
     780            0 :         }
     781            0 : }
     782              : 
     783              : /*
     784              :  * Append an option with an associated integer value to a server command
     785              :  * that is being constructed.
     786              :  *
     787              :  * See comments for AppendPlainCommandOption, above.
     788              :  */
     789              : void
     790            0 : AppendIntegerCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     791              :                                                    char *option_name, int32 option_value)
     792              : {
     793            0 :         AppendPlainCommandOption(buf, use_new_option_syntax, option_name);
     794              : 
     795            0 :         appendPQExpBuffer(buf, " %d", option_value);
     796            0 : }
     797              : 
     798              : /*
     799              :  * Frontend version of GetCurrentTimestamp(), since we are not linked with
     800              :  * backend code.
     801              :  */
     802              : TimestampTz
     803            0 : feGetCurrentTimestamp(void)
     804              : {
     805            0 :         TimestampTz result;
     806            0 :         struct timeval tp;
     807              : 
     808            0 :         gettimeofday(&tp, NULL);
     809              : 
     810            0 :         result = (TimestampTz) tp.tv_sec -
     811              :                 ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
     812            0 :         result = (result * USECS_PER_SEC) + tp.tv_usec;
     813              : 
     814            0 :         return result;
     815            0 : }
     816              : 
     817              : /*
     818              :  * Frontend version of TimestampDifference(), since we are not linked with
     819              :  * backend code.
     820              :  */
     821              : void
     822            0 : feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
     823              :                                           long *secs, int *microsecs)
     824              : {
     825            0 :         TimestampTz diff = stop_time - start_time;
     826              : 
     827            0 :         if (diff <= 0)
     828              :         {
     829            0 :                 *secs = 0;
     830            0 :                 *microsecs = 0;
     831            0 :         }
     832              :         else
     833              :         {
     834            0 :                 *secs = (long) (diff / USECS_PER_SEC);
     835            0 :                 *microsecs = (int) (diff % USECS_PER_SEC);
     836              :         }
     837            0 : }
     838              : 
     839              : /*
     840              :  * Frontend version of TimestampDifferenceExceeds(), since we are not
     841              :  * linked with backend code.
     842              :  */
     843              : bool
     844            0 : feTimestampDifferenceExceeds(TimestampTz start_time,
     845              :                                                          TimestampTz stop_time,
     846              :                                                          int msec)
     847              : {
     848            0 :         TimestampTz diff = stop_time - start_time;
     849              : 
     850            0 :         return (diff >= msec * INT64CONST(1000));
     851            0 : }
     852              : 
     853              : /*
     854              :  * Converts an int64 to network byte order.
     855              :  */
     856              : void
     857            0 : fe_sendint64(int64 i, char *buf)
     858              : {
     859            0 :         uint64          n64 = pg_hton64(i);
     860              : 
     861            0 :         memcpy(buf, &n64, sizeof(n64));
     862            0 : }
     863              : 
     864              : /*
     865              :  * Converts an int64 from network byte order to native format.
     866              :  */
     867              : int64
     868            0 : fe_recvint64(char *buf)
     869              : {
     870            0 :         uint64          n64;
     871              : 
     872            0 :         memcpy(&n64, buf, sizeof(n64));
     873              : 
     874            0 :         return pg_ntoh64(n64);
     875            0 : }
        

Generated by: LCOV version 2.3.2-1