LCOV - code coverage report
Current view: top level - src/bin/pg_rewind - libpq_source.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 277 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 13 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * libpq_source.c
       4              :  *        Functions for fetching files from a remote server via libpq.
       5              :  *
       6              :  * Copyright (c) 2013-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  *-------------------------------------------------------------------------
       9              :  */
      10              : #include "postgres_fe.h"
      11              : 
      12              : #include "catalog/pg_type_d.h"
      13              : #include "common/connect.h"
      14              : #include "file_ops.h"
      15              : #include "filemap.h"
      16              : #include "lib/stringinfo.h"
      17              : #include "pg_rewind.h"
      18              : #include "port/pg_bswap.h"
      19              : #include "rewind_source.h"
      20              : 
      21              : /*
      22              :  * Files are fetched MAX_CHUNK_SIZE bytes at a time, and with a
      23              :  * maximum of MAX_CHUNKS_PER_QUERY chunks in a single query.
      24              :  */
      25              : #define MAX_CHUNK_SIZE (1024 * 1024)
      26              : #define MAX_CHUNKS_PER_QUERY 1000
      27              : 
      28              : /* represents a request to fetch a piece of a file from the source */
      29              : typedef struct
      30              : {
      31              :         const char *path;                       /* path relative to data directory root */
      32              :         off_t           offset;
      33              :         size_t          length;
      34              : } fetch_range_request;
      35              : 
      36              : typedef struct
      37              : {
      38              :         rewind_source common;           /* common interface functions */
      39              : 
      40              :         PGconn     *conn;
      41              : 
      42              :         /*
      43              :          * Queue of chunks that have been requested with the queue_fetch_range()
      44              :          * function, but have not been fetched from the remote server yet.
      45              :          */
      46              :         int                     num_requests;
      47              :         fetch_range_request request_queue[MAX_CHUNKS_PER_QUERY];
      48              : 
      49              :         /* temporary space for process_queued_fetch_requests() */
      50              :         StringInfoData paths;
      51              :         StringInfoData offsets;
      52              :         StringInfoData lengths;
      53              : } libpq_source;
      54              : 
      55              : static void init_libpq_conn(PGconn *conn);
      56              : static char *run_simple_query(PGconn *conn, const char *sql);
      57              : static void run_simple_command(PGconn *conn, const char *sql);
      58              : static void appendArrayEscapedString(StringInfo buf, const char *str);
      59              : 
      60              : static void process_queued_fetch_requests(libpq_source *src);
      61              : 
      62              : /* public interface functions */
      63              : static void libpq_traverse_files(rewind_source *source,
      64              :                                                                  process_file_callback_t callback);
      65              : static void libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len);
      66              : static void libpq_queue_fetch_range(rewind_source *source, const char *path,
      67              :                                                                         off_t off, size_t len);
      68              : static void libpq_finish_fetch(rewind_source *source);
      69              : static char *libpq_fetch_file(rewind_source *source, const char *path,
      70              :                                                           size_t *filesize);
      71              : static XLogRecPtr libpq_get_current_wal_insert_lsn(rewind_source *source);
      72              : static void libpq_destroy(rewind_source *source);
      73              : 
      74              : /*
      75              :  * Create a new libpq source.
      76              :  *
      77              :  * The caller has already established the connection, but should not try
      78              :  * to use it while the source is active.
      79              :  */
      80              : rewind_source *
      81            0 : init_libpq_source(PGconn *conn)
      82              : {
      83            0 :         libpq_source *src;
      84              : 
      85            0 :         init_libpq_conn(conn);
      86              : 
      87            0 :         src = pg_malloc0(sizeof(libpq_source));
      88              : 
      89            0 :         src->common.traverse_files = libpq_traverse_files;
      90            0 :         src->common.fetch_file = libpq_fetch_file;
      91            0 :         src->common.queue_fetch_file = libpq_queue_fetch_file;
      92            0 :         src->common.queue_fetch_range = libpq_queue_fetch_range;
      93            0 :         src->common.finish_fetch = libpq_finish_fetch;
      94            0 :         src->common.get_current_wal_insert_lsn = libpq_get_current_wal_insert_lsn;
      95            0 :         src->common.destroy = libpq_destroy;
      96              : 
      97            0 :         src->conn = conn;
      98              : 
      99            0 :         initStringInfo(&src->paths);
     100            0 :         initStringInfo(&src->offsets);
     101            0 :         initStringInfo(&src->lengths);
     102              : 
     103            0 :         return &src->common;
     104            0 : }
     105              : 
     106              : /*
     107              :  * Initialize a libpq connection for use.
     108              :  */
     109              : static void
     110            0 : init_libpq_conn(PGconn *conn)
     111              : {
     112            0 :         PGresult   *res;
     113            0 :         char       *str;
     114              : 
     115              :         /* disable all types of timeouts */
     116            0 :         run_simple_command(conn, "SET statement_timeout = 0");
     117            0 :         run_simple_command(conn, "SET lock_timeout = 0");
     118            0 :         run_simple_command(conn, "SET idle_in_transaction_session_timeout = 0");
     119            0 :         run_simple_command(conn, "SET transaction_timeout = 0");
     120              : 
     121              :         /*
     122              :          * we don't intend to do any updates, put the connection in read-only mode
     123              :          * to keep us honest
     124              :          */
     125            0 :         run_simple_command(conn, "SET default_transaction_read_only = on");
     126              : 
     127              :         /* secure search_path */
     128            0 :         res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     129            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     130            0 :                 pg_fatal("could not clear \"search_path\": %s",
     131              :                                  PQresultErrorMessage(res));
     132            0 :         PQclear(res);
     133              : 
     134              :         /*
     135              :          * Also check that full_page_writes is enabled.  We can get torn pages if
     136              :          * a page is modified while we read it with pg_read_binary_file(), and we
     137              :          * rely on full page images to fix them.
     138              :          */
     139            0 :         str = run_simple_query(conn, "SHOW full_page_writes");
     140            0 :         if (strcmp(str, "on") != 0)
     141            0 :                 pg_fatal("\"full_page_writes\" must be enabled in the source server");
     142            0 :         pg_free(str);
     143              : 
     144              :         /* Prepare a statement we'll use to fetch files */
     145            0 :         res = PQprepare(conn, "fetch_chunks_stmt",
     146              :                                         "SELECT path, begin,\n"
     147              :                                         "  pg_read_binary_file(path, begin, len, true) AS chunk\n"
     148              :                                         "FROM unnest ($1::text[], $2::int8[], $3::int4[]) as x(path, begin, len)",
     149              :                                         3, NULL);
     150              : 
     151            0 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
     152            0 :                 pg_fatal("could not prepare statement to fetch file contents: %s",
     153              :                                  PQresultErrorMessage(res));
     154            0 :         PQclear(res);
     155            0 : }
     156              : 
     157              : /*
     158              :  * Run a query that returns a single value.
     159              :  *
     160              :  * The result should be pg_free'd after use.
     161              :  */
     162              : static char *
     163            0 : run_simple_query(PGconn *conn, const char *sql)
     164              : {
     165            0 :         PGresult   *res;
     166            0 :         char       *result;
     167              : 
     168            0 :         res = PQexec(conn, sql);
     169              : 
     170            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     171            0 :                 pg_fatal("error running query (%s) on source server: %s",
     172              :                                  sql, PQresultErrorMessage(res));
     173              : 
     174              :         /* sanity check the result set */
     175            0 :         if (PQnfields(res) != 1 || PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
     176            0 :                 pg_fatal("unexpected result set from query");
     177              : 
     178            0 :         result = pg_strdup(PQgetvalue(res, 0, 0));
     179              : 
     180            0 :         PQclear(res);
     181              : 
     182            0 :         return result;
     183            0 : }
     184              : 
     185              : /*
     186              :  * Run a command.
     187              :  *
     188              :  * In the event of a failure, exit immediately.
     189              :  */
     190              : static void
     191            0 : run_simple_command(PGconn *conn, const char *sql)
     192              : {
     193            0 :         PGresult   *res;
     194              : 
     195            0 :         res = PQexec(conn, sql);
     196              : 
     197            0 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
     198            0 :                 pg_fatal("error running query (%s) in source server: %s",
     199              :                                  sql, PQresultErrorMessage(res));
     200              : 
     201            0 :         PQclear(res);
     202            0 : }
     203              : 
     204              : /*
     205              :  * Call the pg_current_wal_insert_lsn() function in the remote system.
     206              :  */
     207              : static XLogRecPtr
     208            0 : libpq_get_current_wal_insert_lsn(rewind_source *source)
     209              : {
     210            0 :         PGconn     *conn = ((libpq_source *) source)->conn;
     211            0 :         XLogRecPtr      result;
     212            0 :         uint32          hi;
     213            0 :         uint32          lo;
     214            0 :         char       *val;
     215              : 
     216            0 :         val = run_simple_query(conn, "SELECT pg_current_wal_insert_lsn()");
     217              : 
     218            0 :         if (sscanf(val, "%X/%08X", &hi, &lo) != 2)
     219            0 :                 pg_fatal("unrecognized result \"%s\" for current WAL insert location", val);
     220              : 
     221            0 :         result = ((uint64) hi) << 32 | lo;
     222              : 
     223            0 :         pg_free(val);
     224              : 
     225            0 :         return result;
     226            0 : }
     227              : 
     228              : /*
     229              :  * Get a list of all files in the data directory.
     230              :  */
     231              : static void
     232            0 : libpq_traverse_files(rewind_source *source, process_file_callback_t callback)
     233              : {
     234            0 :         PGconn     *conn = ((libpq_source *) source)->conn;
     235            0 :         PGresult   *res;
     236            0 :         const char *sql;
     237            0 :         int                     i;
     238              : 
     239              :         /*
     240              :          * Create a recursive directory listing of the whole data directory.
     241              :          *
     242              :          * The WITH RECURSIVE part does most of the work. The second part gets the
     243              :          * targets of the symlinks in pg_tblspc directory.
     244              :          *
     245              :          * XXX: There is no backend function to get a symbolic link's target in
     246              :          * general, so if the admin has put any custom symbolic links in the data
     247              :          * directory, they won't be copied correctly.
     248              :          */
     249            0 :         sql =
     250              :                 "WITH RECURSIVE files (path, filename, size, isdir) AS (\n"
     251              :                 "  SELECT '' AS path, filename, size, isdir FROM\n"
     252              :                 "  (SELECT pg_ls_dir('.', true, false) AS filename) AS fn,\n"
     253              :                 "        pg_stat_file(fn.filename, true) AS this\n"
     254              :                 "  UNION ALL\n"
     255              :                 "  SELECT parent.path || parent.filename || '/' AS path,\n"
     256              :                 "         fn, this.size, this.isdir\n"
     257              :                 "  FROM files AS parent,\n"
     258              :                 "       pg_ls_dir(parent.path || parent.filename, true, false) AS fn,\n"
     259              :                 "       pg_stat_file(parent.path || parent.filename || '/' || fn, true) AS this\n"
     260              :                 "       WHERE parent.isdir = 't'\n"
     261              :                 ")\n"
     262              :                 "SELECT path || filename, size, isdir,\n"
     263              :                 "       pg_tablespace_location(pg_tablespace.oid) AS link_target\n"
     264              :                 "FROM files\n"
     265              :                 "LEFT OUTER JOIN pg_tablespace ON files.path = 'pg_tblspc/'\n"
     266              :                 "                             AND oid::text = files.filename\n";
     267            0 :         res = PQexec(conn, sql);
     268              : 
     269            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     270            0 :                 pg_fatal("could not fetch file list: %s",
     271              :                                  PQresultErrorMessage(res));
     272              : 
     273              :         /* sanity check the result set */
     274            0 :         if (PQnfields(res) != 4)
     275            0 :                 pg_fatal("unexpected result set while fetching file list");
     276              : 
     277              :         /* Read result to local variables */
     278            0 :         for (i = 0; i < PQntuples(res); i++)
     279              :         {
     280            0 :                 char       *path;
     281            0 :                 int64           filesize;
     282            0 :                 bool            isdir;
     283            0 :                 char       *link_target;
     284            0 :                 file_type_t type;
     285              : 
     286            0 :                 if (PQgetisnull(res, i, 1))
     287              :                 {
     288              :                         /*
     289              :                          * The file was removed from the server while the query was
     290              :                          * running. Ignore it.
     291              :                          */
     292            0 :                         continue;
     293              :                 }
     294              : 
     295            0 :                 path = PQgetvalue(res, i, 0);
     296            0 :                 filesize = atoll(PQgetvalue(res, i, 1));
     297            0 :                 isdir = (strcmp(PQgetvalue(res, i, 2), "t") == 0);
     298            0 :                 link_target = PQgetvalue(res, i, 3);
     299              : 
     300            0 :                 if (link_target[0])
     301              :                 {
     302              :                         /*
     303              :                          * In-place tablespaces are directories located in pg_tblspc/ with
     304              :                          * relative paths.
     305              :                          */
     306            0 :                         if (is_absolute_path(link_target))
     307            0 :                                 type = FILE_TYPE_SYMLINK;
     308              :                         else
     309            0 :                                 type = FILE_TYPE_DIRECTORY;
     310            0 :                 }
     311            0 :                 else if (isdir)
     312            0 :                         type = FILE_TYPE_DIRECTORY;
     313              :                 else
     314            0 :                         type = FILE_TYPE_REGULAR;
     315              : 
     316            0 :                 callback(path, type, filesize, link_target);
     317            0 :         }
     318            0 :         PQclear(res);
     319            0 : }
     320              : 
     321              : /*
     322              :  * Queue up a request to fetch a file from remote system.
     323              :  */
     324              : static void
     325            0 : libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len)
     326              : {
     327              :         /*
     328              :          * Truncate the target file immediately, and queue a request to fetch it
     329              :          * from the source. If the file is small, smaller than MAX_CHUNK_SIZE,
     330              :          * request fetching a full-sized chunk anyway, so that if the file has
     331              :          * become larger in the source system, after we scanned the source
     332              :          * directory, we still fetch the whole file. This only works for files up
     333              :          * to MAX_CHUNK_SIZE, but that's good enough for small configuration files
     334              :          * and such that are changed every now and then, but not WAL-logged. For
     335              :          * larger files, we fetch up to the original size.
     336              :          *
     337              :          * Even with that mechanism, there is an inherent race condition if the
     338              :          * file is modified at the same instant that we're copying it, so that we
     339              :          * might copy a torn version of the file with one half from the old
     340              :          * version and another half from the new. But pg_basebackup has the same
     341              :          * problem, and it hasn't been a problem in practice.
     342              :          *
     343              :          * It might seem more natural to truncate the file later, when we receive
     344              :          * it from the source server, but then we'd need to track which
     345              :          * fetch-requests are for a whole file.
     346              :          */
     347            0 :         open_target_file(path, true);
     348            0 :         libpq_queue_fetch_range(source, path, 0, Max(len, MAX_CHUNK_SIZE));
     349            0 : }
     350              : 
     351              : /*
     352              :  * Queue up a request to fetch a piece of a file from remote system.
     353              :  */
     354              : static void
     355            0 : libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off,
     356              :                                                 size_t len)
     357              : {
     358            0 :         libpq_source *src = (libpq_source *) source;
     359              : 
     360              :         /*
     361              :          * Does this request happen to be a continuation of the previous chunk? If
     362              :          * so, merge it with the previous one.
     363              :          *
     364              :          * XXX: We use pointer equality to compare the path. That's good enough
     365              :          * for our purposes; the caller always passes the same pointer for the
     366              :          * same filename. If it didn't, we would fail to merge requests, but it
     367              :          * wouldn't affect correctness.
     368              :          */
     369            0 :         if (src->num_requests > 0)
     370              :         {
     371            0 :                 fetch_range_request *prev = &src->request_queue[src->num_requests - 1];
     372              : 
     373            0 :                 if (prev->offset + prev->length == off &&
     374            0 :                         prev->length < MAX_CHUNK_SIZE &&
     375            0 :                         prev->path == path)
     376              :                 {
     377              :                         /*
     378              :                          * Extend the previous request to cover as much of this new
     379              :                          * request as possible, without exceeding MAX_CHUNK_SIZE.
     380              :                          */
     381            0 :                         size_t          thislen;
     382              : 
     383            0 :                         thislen = Min(len, MAX_CHUNK_SIZE - prev->length);
     384            0 :                         prev->length += thislen;
     385              : 
     386            0 :                         off += thislen;
     387            0 :                         len -= thislen;
     388              : 
     389              :                         /*
     390              :                          * Fall through to create new requests for any remaining 'len'
     391              :                          * that didn't fit in the previous chunk.
     392              :                          */
     393            0 :                 }
     394            0 :         }
     395              : 
     396              :         /* Divide the request into pieces of MAX_CHUNK_SIZE bytes each */
     397            0 :         while (len > 0)
     398              :         {
     399            0 :                 int32           thislen;
     400              : 
     401              :                 /* if the queue is full, perform all the work queued up so far */
     402            0 :                 if (src->num_requests == MAX_CHUNKS_PER_QUERY)
     403            0 :                         process_queued_fetch_requests(src);
     404              : 
     405            0 :                 thislen = Min(len, MAX_CHUNK_SIZE);
     406            0 :                 src->request_queue[src->num_requests].path = path;
     407            0 :                 src->request_queue[src->num_requests].offset = off;
     408            0 :                 src->request_queue[src->num_requests].length = thislen;
     409            0 :                 src->num_requests++;
     410              : 
     411            0 :                 off += thislen;
     412            0 :                 len -= thislen;
     413            0 :         }
     414            0 : }
     415              : 
     416              : /*
     417              :  * Fetch all the queued chunks and write them to the target data directory.
     418              :  */
     419              : static void
     420            0 : libpq_finish_fetch(rewind_source *source)
     421              : {
     422            0 :         process_queued_fetch_requests((libpq_source *) source);
     423            0 : }
     424              : 
     425              : static void
     426            0 : process_queued_fetch_requests(libpq_source *src)
     427              : {
     428            0 :         const char *params[3];
     429            0 :         PGresult   *res;
     430            0 :         int                     chunkno;
     431              : 
     432            0 :         if (src->num_requests == 0)
     433            0 :                 return;
     434              : 
     435            0 :         pg_log_debug("getting %d file chunks", src->num_requests);
     436              : 
     437              :         /*
     438              :          * The prepared statement, 'fetch_chunks_stmt', takes three arrays with
     439              :          * the same length as parameters: paths, offsets and lengths. Construct
     440              :          * the string representations of them.
     441              :          */
     442            0 :         resetStringInfo(&src->paths);
     443            0 :         resetStringInfo(&src->offsets);
     444            0 :         resetStringInfo(&src->lengths);
     445              : 
     446            0 :         appendStringInfoChar(&src->paths, '{');
     447            0 :         appendStringInfoChar(&src->offsets, '{');
     448            0 :         appendStringInfoChar(&src->lengths, '{');
     449            0 :         for (int i = 0; i < src->num_requests; i++)
     450              :         {
     451            0 :                 fetch_range_request *rq = &src->request_queue[i];
     452              : 
     453            0 :                 if (i > 0)
     454              :                 {
     455            0 :                         appendStringInfoChar(&src->paths, ',');
     456            0 :                         appendStringInfoChar(&src->offsets, ',');
     457            0 :                         appendStringInfoChar(&src->lengths, ',');
     458            0 :                 }
     459              : 
     460            0 :                 appendArrayEscapedString(&src->paths, rq->path);
     461            0 :                 appendStringInfo(&src->offsets, INT64_FORMAT, (int64) rq->offset);
     462            0 :                 appendStringInfo(&src->lengths, "%zu", rq->length);
     463            0 :         }
     464            0 :         appendStringInfoChar(&src->paths, '}');
     465            0 :         appendStringInfoChar(&src->offsets, '}');
     466            0 :         appendStringInfoChar(&src->lengths, '}');
     467              : 
     468              :         /*
     469              :          * Execute the prepared statement.
     470              :          */
     471            0 :         params[0] = src->paths.data;
     472            0 :         params[1] = src->offsets.data;
     473            0 :         params[2] = src->lengths.data;
     474              : 
     475            0 :         if (PQsendQueryPrepared(src->conn, "fetch_chunks_stmt", 3, params, NULL, NULL, 1) != 1)
     476            0 :                 pg_fatal("could not send query: %s", PQerrorMessage(src->conn));
     477              : 
     478            0 :         if (PQsetSingleRowMode(src->conn) != 1)
     479            0 :                 pg_fatal("could not set libpq connection to single row mode");
     480              : 
     481              :         /*----
     482              :          * The result set is of format:
     483              :          *
     484              :          * path         text    -- path in the data directory, e.g "base/1/123"
     485              :          * begin        int8    -- offset within the file
     486              :          * chunk        bytea   -- file content
     487              :          *----
     488              :          */
     489            0 :         chunkno = 0;
     490            0 :         while ((res = PQgetResult(src->conn)) != NULL)
     491              :         {
     492            0 :                 fetch_range_request *rq = &src->request_queue[chunkno];
     493            0 :                 char       *filename;
     494            0 :                 int                     filenamelen;
     495            0 :                 int64           chunkoff;
     496            0 :                 int                     chunksize;
     497            0 :                 char       *chunk;
     498              : 
     499            0 :                 switch (PQresultStatus(res))
     500              :                 {
     501              :                         case PGRES_SINGLE_TUPLE:
     502              :                                 break;
     503              : 
     504              :                         case PGRES_TUPLES_OK:
     505            0 :                                 PQclear(res);
     506            0 :                                 continue;               /* final zero-row result */
     507              : 
     508              :                         default:
     509            0 :                                 pg_fatal("unexpected result while fetching remote files: %s",
     510              :                                                  PQresultErrorMessage(res));
     511            0 :                 }
     512              : 
     513            0 :                 if (chunkno > src->num_requests)
     514            0 :                         pg_fatal("received more data chunks than requested");
     515              : 
     516              :                 /* sanity check the result set */
     517            0 :                 if (PQnfields(res) != 3 || PQntuples(res) != 1)
     518            0 :                         pg_fatal("unexpected result set size while fetching remote files");
     519              : 
     520            0 :                 if (PQftype(res, 0) != TEXTOID ||
     521            0 :                         PQftype(res, 1) != INT8OID ||
     522            0 :                         PQftype(res, 2) != BYTEAOID)
     523              :                 {
     524            0 :                         pg_fatal("unexpected data types in result set while fetching remote files: %u %u %u",
     525              :                                          PQftype(res, 0), PQftype(res, 1), PQftype(res, 2));
     526            0 :                 }
     527              : 
     528            0 :                 if (PQfformat(res, 0) != 1 &&
     529            0 :                         PQfformat(res, 1) != 1 &&
     530            0 :                         PQfformat(res, 2) != 1)
     531              :                 {
     532            0 :                         pg_fatal("unexpected result format while fetching remote files");
     533            0 :                 }
     534              : 
     535            0 :                 if (PQgetisnull(res, 0, 0) ||
     536            0 :                         PQgetisnull(res, 0, 1))
     537              :                 {
     538            0 :                         pg_fatal("unexpected null values in result while fetching remote files");
     539            0 :                 }
     540              : 
     541            0 :                 if (PQgetlength(res, 0, 1) != sizeof(int64))
     542            0 :                         pg_fatal("unexpected result length while fetching remote files");
     543              : 
     544              :                 /* Read result set to local variables */
     545            0 :                 memcpy(&chunkoff, PQgetvalue(res, 0, 1), sizeof(int64));
     546            0 :                 chunkoff = pg_ntoh64(chunkoff);
     547            0 :                 chunksize = PQgetlength(res, 0, 2);
     548              : 
     549            0 :                 filenamelen = PQgetlength(res, 0, 0);
     550            0 :                 filename = pg_malloc(filenamelen + 1);
     551            0 :                 memcpy(filename, PQgetvalue(res, 0, 0), filenamelen);
     552            0 :                 filename[filenamelen] = '\0';
     553              : 
     554            0 :                 chunk = PQgetvalue(res, 0, 2);
     555              : 
     556              :                 /*
     557              :                  * If a file has been deleted on the source, remove it on the target
     558              :                  * as well.  Note that multiple unlink() calls may happen on the same
     559              :                  * file if multiple data chunks are associated with it, hence ignore
     560              :                  * unconditionally anything missing.
     561              :                  */
     562            0 :                 if (PQgetisnull(res, 0, 2))
     563              :                 {
     564            0 :                         pg_log_debug("received null value for chunk for file \"%s\", file has been deleted",
     565              :                                                  filename);
     566            0 :                         remove_target_file(filename, true);
     567            0 :                 }
     568              :                 else
     569              :                 {
     570            0 :                         pg_log_debug("received chunk for file \"%s\", offset %" PRId64 ", size %d",
     571              :                                                  filename, chunkoff, chunksize);
     572              : 
     573            0 :                         if (strcmp(filename, rq->path) != 0)
     574              :                         {
     575            0 :                                 pg_fatal("received data for file \"%s\", when requested for \"%s\"",
     576              :                                                  filename, rq->path);
     577            0 :                         }
     578            0 :                         if (chunkoff != rq->offset)
     579            0 :                                 pg_fatal("received data at offset %" PRId64 " of file \"%s\", when requested for offset %lld",
     580              :                                                  chunkoff, rq->path, (long long int) rq->offset);
     581              : 
     582              :                         /*
     583              :                          * We should not receive more data than we requested, or
     584              :                          * pg_read_binary_file() messed up.  We could receive less,
     585              :                          * though, if the file was truncated in the source after we
     586              :                          * checked its size. That's OK, there should be a WAL record of
     587              :                          * the truncation, which will get replayed when you start the
     588              :                          * target system for the first time after pg_rewind has completed.
     589              :                          */
     590            0 :                         if (chunksize > rq->length)
     591            0 :                                 pg_fatal("received more than requested for file \"%s\"", rq->path);
     592              : 
     593            0 :                         open_target_file(filename, false);
     594              : 
     595            0 :                         write_target_range(chunk, chunkoff, chunksize);
     596              :                 }
     597              : 
     598            0 :                 pg_free(filename);
     599              : 
     600            0 :                 PQclear(res);
     601            0 :                 chunkno++;
     602            0 :         }
     603            0 :         if (chunkno != src->num_requests)
     604            0 :                 pg_fatal("unexpected number of data chunks received");
     605              : 
     606            0 :         src->num_requests = 0;
     607            0 : }
     608              : 
     609              : /*
     610              :  * Escape a string to be used as element in a text array constant
     611              :  */
     612              : static void
     613            0 : appendArrayEscapedString(StringInfo buf, const char *str)
     614              : {
     615            0 :         appendStringInfoCharMacro(buf, '\"');
     616            0 :         while (*str)
     617              :         {
     618            0 :                 char            ch = *str;
     619              : 
     620            0 :                 if (ch == '"' || ch == '\\')
     621            0 :                         appendStringInfoCharMacro(buf, '\\');
     622              : 
     623            0 :                 appendStringInfoCharMacro(buf, ch);
     624              : 
     625            0 :                 str++;
     626            0 :         }
     627            0 :         appendStringInfoCharMacro(buf, '\"');
     628            0 : }
     629              : 
     630              : /*
     631              :  * Fetch a single file as a malloc'd buffer.
     632              :  */
     633              : static char *
     634            0 : libpq_fetch_file(rewind_source *source, const char *path, size_t *filesize)
     635              : {
     636            0 :         PGconn     *conn = ((libpq_source *) source)->conn;
     637            0 :         PGresult   *res;
     638            0 :         char       *result;
     639            0 :         int                     len;
     640            0 :         const char *paramValues[1];
     641              : 
     642            0 :         paramValues[0] = path;
     643            0 :         res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
     644            0 :                                            1, NULL, paramValues, NULL, NULL, 1);
     645              : 
     646            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     647            0 :                 pg_fatal("could not fetch remote file \"%s\": %s",
     648              :                                  path, PQresultErrorMessage(res));
     649              : 
     650              :         /* sanity check the result set */
     651            0 :         if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
     652            0 :                 pg_fatal("unexpected result set while fetching remote file \"%s\"",
     653              :                                  path);
     654              : 
     655              :         /* Read result to local variables */
     656            0 :         len = PQgetlength(res, 0, 0);
     657            0 :         result = pg_malloc(len + 1);
     658            0 :         memcpy(result, PQgetvalue(res, 0, 0), len);
     659            0 :         result[len] = '\0';
     660              : 
     661            0 :         PQclear(res);
     662              : 
     663            0 :         pg_log_debug("fetched file \"%s\", length %d", path, len);
     664              : 
     665            0 :         if (filesize)
     666            0 :                 *filesize = len;
     667            0 :         return result;
     668            0 : }
     669              : 
     670              : /*
     671              :  * Close a libpq source.
     672              :  */
     673              : static void
     674            0 : libpq_destroy(rewind_source *source)
     675              : {
     676            0 :         libpq_source *src = (libpq_source *) source;
     677              : 
     678            0 :         pfree(src->paths.data);
     679            0 :         pfree(src->offsets.data);
     680            0 :         pfree(src->lengths.data);
     681            0 :         pfree(src);
     682              : 
     683              :         /* NOTE: we don't close the connection here, as it was not opened by us. */
     684            0 : }
        

Generated by: LCOV version 2.3.2-1