LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - pg_receivewal.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 411 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 10 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * pg_receivewal.c - receive streaming WAL data and write it
       4              :  *                                        to a local file.
       5              :  *
       6              :  * Author: Magnus Hagander <magnus@hagander.net>
       7              :  *
       8              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *                src/bin/pg_basebackup/pg_receivewal.c
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres_fe.h"
      16              : 
      17              : #include <dirent.h>
      18              : #include <limits.h>
      19              : #include <signal.h>
      20              : #include <sys/stat.h>
      21              : #include <unistd.h>
      22              : 
      23              : #ifdef USE_LZ4
      24              : #include <lz4frame.h>
      25              : #endif
      26              : #ifdef HAVE_LIBZ
      27              : #include <zlib.h>
      28              : #endif
      29              : 
      30              : #include "access/xlog_internal.h"
      31              : #include "common/file_perm.h"
      32              : #include "common/logging.h"
      33              : #include "fe_utils/option_utils.h"
      34              : #include "getopt_long.h"
      35              : #include "libpq-fe.h"
      36              : #include "receivelog.h"
      37              : #include "streamutil.h"
      38              : 
      39              : /* Time to sleep between reconnection attempts */
      40              : #define RECONNECT_SLEEP_TIME 5
      41              : 
/* Global options */

/* Directory scanned for existing WAL segment files by FindStreamingStart() */
static char *basedir = NULL;

/* Non-zero enables the informational log messages in stop_streaming() */
static int	verbose = 0;

/* NOTE(review): not referenced in this part of the file; presumably the -Z compression level */
static int	compresslevel = 0;

/* NOTE(review): presumably set by -n/--no-loop; the consumer is outside this view */
static bool noloop = false;

/* Kept in milliseconds; usage() divides by 1000 to display seconds */
static int	standby_message_timeout = 10 * 1000;	/* 10 sec = default */

/*
 * Stop request flag.  stop_streaming() sets it once endpos has been passed
 * and returns true whenever it is set.  NOTE(review): the type and the
 * "received interrupt signal" message suggest a signal handler sets it too;
 * that handler is not visible here.
 */
static volatile sig_atomic_t time_to_stop = false;

/* NOTE(review): the slot/sync options below are consumed outside this view */
static bool do_create_slot = false;
static bool slot_exists_ok = false;
static bool do_drop_slot = false;
static bool do_sync = true;
static bool synchronous = false;
static char *replication_slot = NULL;
static pg_compress_algorithm compression_algorithm = PG_COMPRESSION_NONE;

/* When valid, stop_streaming() stops once the stream position goes past it */
static XLogRecPtr endpos = InvalidXLogRecPtr;
      57              : 
      58              : 
/* Prototypes for static functions defined further down in this file */
static void usage(void);
static DIR *get_destination_dir(char *dest_folder);
static void close_destination_dir(DIR *dest_dir, char *dest_folder);
static XLogRecPtr FindStreamingStart(uint32 *tli);
static void StreamLog(void);
static bool stop_streaming(XLogRecPtr xlogpos, uint32 timeline,
						   bool segment_finished);
      66              : 
      67              : static void
      68            0 : disconnect_atexit(void)
      69              : {
      70            0 :         if (conn != NULL)
      71            0 :                 PQfinish(conn);
      72            0 : }
      73              : 
      74              : static void
      75            0 : usage(void)
      76              : {
      77            0 :         printf(_("%s receives PostgreSQL streaming write-ahead logs.\n\n"),
      78              :                    progname);
      79            0 :         printf(_("Usage:\n"));
      80            0 :         printf(_("  %s [OPTION]...\n"), progname);
      81            0 :         printf(_("\nOptions:\n"));
      82            0 :         printf(_("  -D, --directory=DIR    receive write-ahead log files into this directory\n"));
      83            0 :         printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
      84            0 :         printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
      85            0 :         printf(_("  -n, --no-loop          do not loop on connection lost\n"));
      86            0 :         printf(_("      --no-sync          do not wait for changes to be written safely to disk\n"));
      87            0 :         printf(_("  -s, --status-interval=SECS\n"
      88              :                          "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
      89            0 :         printf(_("  -S, --slot=SLOTNAME    replication slot to use\n"));
      90            0 :         printf(_("      --synchronous      flush write-ahead log immediately after writing\n"));
      91            0 :         printf(_("  -v, --verbose          output verbose messages\n"));
      92            0 :         printf(_("  -V, --version          output version information, then exit\n"));
      93            0 :         printf(_("  -Z, --compress=METHOD[:DETAIL]\n"
      94              :                          "                         compress as specified\n"));
      95            0 :         printf(_("  -?, --help             show this help, then exit\n"));
      96            0 :         printf(_("\nConnection options:\n"));
      97            0 :         printf(_("  -d, --dbname=CONNSTR   connection string\n"));
      98            0 :         printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
      99            0 :         printf(_("  -p, --port=PORT        database server port number\n"));
     100            0 :         printf(_("  -U, --username=NAME    connect as specified database user\n"));
     101            0 :         printf(_("  -w, --no-password      never prompt for password\n"));
     102            0 :         printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
     103            0 :         printf(_("\nOptional actions:\n"));
     104            0 :         printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
     105            0 :         printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
     106            0 :         printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
     107            0 :         printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
     108            0 : }
     109              : 
     110              : 
     111              : /*
     112              :  * Check if the filename looks like a WAL file, letting caller know if this
     113              :  * WAL segment is partial and/or compressed.
     114              :  */
     115              : static bool
     116            0 : is_xlogfilename(const char *filename, bool *ispartial,
     117              :                                 pg_compress_algorithm *wal_compression_algorithm)
     118              : {
     119            0 :         size_t          fname_len = strlen(filename);
     120            0 :         size_t          xlog_pattern_len = strspn(filename, "0123456789ABCDEF");
     121              : 
     122              :         /* File does not look like a WAL file */
     123            0 :         if (xlog_pattern_len != XLOG_FNAME_LEN)
     124            0 :                 return false;
     125              : 
     126              :         /* File looks like a completed uncompressed WAL file */
     127            0 :         if (fname_len == XLOG_FNAME_LEN)
     128              :         {
     129            0 :                 *ispartial = false;
     130            0 :                 *wal_compression_algorithm = PG_COMPRESSION_NONE;
     131            0 :                 return true;
     132              :         }
     133              : 
     134              :         /* File looks like a completed gzip-compressed WAL file */
     135            0 :         if (fname_len == XLOG_FNAME_LEN + strlen(".gz") &&
     136            0 :                 strcmp(filename + XLOG_FNAME_LEN, ".gz") == 0)
     137              :         {
     138            0 :                 *ispartial = false;
     139            0 :                 *wal_compression_algorithm = PG_COMPRESSION_GZIP;
     140            0 :                 return true;
     141              :         }
     142              : 
     143              :         /* File looks like a completed LZ4-compressed WAL file */
     144            0 :         if (fname_len == XLOG_FNAME_LEN + strlen(".lz4") &&
     145            0 :                 strcmp(filename + XLOG_FNAME_LEN, ".lz4") == 0)
     146              :         {
     147            0 :                 *ispartial = false;
     148            0 :                 *wal_compression_algorithm = PG_COMPRESSION_LZ4;
     149            0 :                 return true;
     150              :         }
     151              : 
     152              :         /* File looks like a partial uncompressed WAL file */
     153            0 :         if (fname_len == XLOG_FNAME_LEN + strlen(".partial") &&
     154            0 :                 strcmp(filename + XLOG_FNAME_LEN, ".partial") == 0)
     155              :         {
     156            0 :                 *ispartial = true;
     157            0 :                 *wal_compression_algorithm = PG_COMPRESSION_NONE;
     158            0 :                 return true;
     159              :         }
     160              : 
     161              :         /* File looks like a partial gzip-compressed WAL file */
     162            0 :         if (fname_len == XLOG_FNAME_LEN + strlen(".gz.partial") &&
     163            0 :                 strcmp(filename + XLOG_FNAME_LEN, ".gz.partial") == 0)
     164              :         {
     165            0 :                 *ispartial = true;
     166            0 :                 *wal_compression_algorithm = PG_COMPRESSION_GZIP;
     167            0 :                 return true;
     168              :         }
     169              : 
     170              :         /* File looks like a partial LZ4-compressed WAL file */
     171            0 :         if (fname_len == XLOG_FNAME_LEN + strlen(".lz4.partial") &&
     172            0 :                 strcmp(filename + XLOG_FNAME_LEN, ".lz4.partial") == 0)
     173              :         {
     174            0 :                 *ispartial = true;
     175            0 :                 *wal_compression_algorithm = PG_COMPRESSION_LZ4;
     176            0 :                 return true;
     177              :         }
     178              : 
     179              :         /* File does not look like something we know */
     180            0 :         return false;
     181            0 : }
     182              : 
     183              : static bool
     184            0 : stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
     185              : {
     186              :         static uint32 prevtimeline = 0;
     187              :         static XLogRecPtr prevpos = InvalidXLogRecPtr;
     188              : 
     189              :         /* we assume that we get called once at the end of each segment */
     190            0 :         if (verbose && segment_finished)
     191            0 :                 pg_log_info("finished segment at %X/%08X (timeline %u)",
     192              :                                         LSN_FORMAT_ARGS(xlogpos),
     193              :                                         timeline);
     194              : 
     195            0 :         if (XLogRecPtrIsValid(endpos) && endpos < xlogpos)
     196              :         {
     197            0 :                 if (verbose)
     198            0 :                         pg_log_info("stopped log streaming at %X/%08X (timeline %u)",
     199              :                                                 LSN_FORMAT_ARGS(xlogpos),
     200              :                                                 timeline);
     201            0 :                 time_to_stop = true;
     202            0 :                 return true;
     203              :         }
     204              : 
     205              :         /*
     206              :          * Note that we report the previous, not current, position here. After a
     207              :          * timeline switch, xlogpos points to the beginning of the segment because
     208              :          * that's where we always begin streaming. Reporting the end of previous
     209              :          * timeline isn't totally accurate, because the next timeline can begin
     210              :          * slightly before the end of the WAL that we received on the previous
     211              :          * timeline, but it's close enough for reporting purposes.
     212              :          */
     213            0 :         if (verbose && prevtimeline != 0 && prevtimeline != timeline)
     214            0 :                 pg_log_info("switched to timeline %u at %X/%08X",
     215              :                                         timeline,
     216              :                                         LSN_FORMAT_ARGS(prevpos));
     217              : 
     218            0 :         prevtimeline = timeline;
     219            0 :         prevpos = xlogpos;
     220              : 
     221            0 :         if (time_to_stop)
     222              :         {
     223            0 :                 if (verbose)
     224            0 :                         pg_log_info("received interrupt signal, exiting");
     225            0 :                 return true;
     226              :         }
     227            0 :         return false;
     228            0 : }
     229              : 
     230              : 
     231              : /*
     232              :  * Get destination directory.
     233              :  */
     234              : static DIR *
     235            0 : get_destination_dir(char *dest_folder)
     236              : {
     237            0 :         DIR                *dir;
     238              : 
     239            0 :         Assert(dest_folder != NULL);
     240            0 :         dir = opendir(dest_folder);
     241            0 :         if (dir == NULL)
     242            0 :                 pg_fatal("could not open directory \"%s\": %m", dest_folder);
     243              : 
     244            0 :         return dir;
     245            0 : }
     246              : 
     247              : 
     248              : /*
     249              :  * Close existing directory.
     250              :  */
     251              : static void
     252            0 : close_destination_dir(DIR *dest_dir, char *dest_folder)
     253              : {
     254            0 :         Assert(dest_dir != NULL && dest_folder != NULL);
     255            0 :         if (closedir(dest_dir))
     256            0 :                 pg_fatal("could not close directory \"%s\": %m", dest_folder);
     257            0 : }
     258              : 
     259              : 
     260              : /*
     261              :  * Determine starting location for streaming, based on any existing xlog
     262              :  * segments in the directory. We start at the end of the last one that is
     263              :  * complete (size matches wal segment size), on the timeline with highest ID.
     264              :  *
     265              :  * If there are no WAL files in the directory, returns InvalidXLogRecPtr.
     266              :  */
     267              : static XLogRecPtr
     268            0 : FindStreamingStart(uint32 *tli)
     269              : {
     270            0 :         DIR                *dir;
     271            0 :         struct dirent *dirent;
     272            0 :         XLogSegNo       high_segno = 0;
     273            0 :         uint32          high_tli = 0;
     274            0 :         bool            high_ispartial = false;
     275              : 
     276            0 :         dir = get_destination_dir(basedir);
     277              : 
     278            0 :         while (errno = 0, (dirent = readdir(dir)) != NULL)
     279              :         {
     280            0 :                 uint32          tli;
     281            0 :                 XLogSegNo       segno;
     282            0 :                 pg_compress_algorithm wal_compression_algorithm;
     283            0 :                 bool            ispartial;
     284              : 
     285            0 :                 if (!is_xlogfilename(dirent->d_name,
     286              :                                                          &ispartial, &wal_compression_algorithm))
     287            0 :                         continue;
     288              : 
     289              :                 /*
     290              :                  * Looks like an xlog file. Parse its position.
     291              :                  */
     292            0 :                 XLogFromFileName(dirent->d_name, &tli, &segno, WalSegSz);
     293              : 
     294              :                 /*
     295              :                  * Check that the segment has the right size, if it's supposed to be
     296              :                  * completed.  For non-compressed segments just check the on-disk size
     297              :                  * and see if it matches a completed segment.  For gzip-compressed
     298              :                  * segments, look at the last 4 bytes of the compressed file, which is
     299              :                  * where the uncompressed size is located for files with a size lower
     300              :                  * than 4GB, and then compare it to the size of a completed segment.
     301              :                  * The 4 last bytes correspond to the ISIZE member according to
     302              :                  * http://www.zlib.org/rfc-gzip.html.
     303              :                  *
     304              :                  * For LZ4-compressed segments, uncompress the file in a throw-away
     305              :                  * buffer keeping track of the uncompressed size, then compare it to
     306              :                  * the size of a completed segment.  Per its protocol, LZ4 does not
     307              :                  * store the uncompressed size of an object by default.  contentSize
     308              :                  * is one possible way to do that, but we need to rely on a method
     309              :                  * where WAL segments could have been compressed by a different source
     310              :                  * than pg_receivewal, like an archive_command with lz4.
     311              :                  */
     312            0 :                 if (!ispartial && wal_compression_algorithm == PG_COMPRESSION_NONE)
     313              :                 {
     314            0 :                         struct stat statbuf;
     315            0 :                         char            fullpath[MAXPGPATH * 2];
     316              : 
     317            0 :                         snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
     318            0 :                         if (stat(fullpath, &statbuf) != 0)
     319            0 :                                 pg_fatal("could not stat file \"%s\": %m", fullpath);
     320              : 
     321            0 :                         if (statbuf.st_size != WalSegSz)
     322              :                         {
     323            0 :                                 pg_log_warning("segment file \"%s\" has incorrect size %lld, skipping",
     324              :                                                            dirent->d_name, (long long int) statbuf.st_size);
     325            0 :                                 continue;
     326              :                         }
     327            0 :                 }
     328            0 :                 else if (!ispartial && wal_compression_algorithm == PG_COMPRESSION_GZIP)
     329              :                 {
     330            0 :                         int                     fd;
     331            0 :                         char            buf[4];
     332            0 :                         int                     bytes_out;
     333            0 :                         char            fullpath[MAXPGPATH * 2];
     334            0 :                         int                     r;
     335              : 
     336            0 :                         snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
     337              : 
     338            0 :                         fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
     339            0 :                         if (fd < 0)
     340            0 :                                 pg_fatal("could not open compressed file \"%s\": %m",
     341              :                                                  fullpath);
     342            0 :                         if (lseek(fd, (off_t) (-4), SEEK_END) < 0)
     343            0 :                                 pg_fatal("could not seek in compressed file \"%s\": %m",
     344              :                                                  fullpath);
     345            0 :                         r = read(fd, buf, sizeof(buf));
     346            0 :                         if (r != sizeof(buf))
     347              :                         {
     348            0 :                                 if (r < 0)
     349            0 :                                         pg_fatal("could not read compressed file \"%s\": %m",
     350              :                                                          fullpath);
     351              :                                 else
     352            0 :                                         pg_fatal("could not read compressed file \"%s\": read %d of %zu",
     353              :                                                          fullpath, r, sizeof(buf));
     354            0 :                         }
     355              : 
     356            0 :                         close(fd);
     357            0 :                         bytes_out = (buf[3] << 24) | (buf[2] << 16) |
     358            0 :                                 (buf[1] << 8) | buf[0];
     359              : 
     360            0 :                         if (bytes_out != WalSegSz)
     361              :                         {
     362            0 :                                 pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %d, skipping",
     363              :                                                            dirent->d_name, bytes_out);
     364            0 :                                 continue;
     365              :                         }
     366            0 :                 }
     367            0 :                 else if (!ispartial && wal_compression_algorithm == PG_COMPRESSION_LZ4)
     368              :                 {
     369              : #ifdef USE_LZ4
     370              : #define LZ4_CHUNK_SZ    64 * 1024       /* 64kB as maximum chunk size read */
     371            0 :                         int                     fd;
     372            0 :                         ssize_t         r;
     373            0 :                         size_t          uncompressed_size = 0;
     374            0 :                         char            fullpath[MAXPGPATH * 2];
     375            0 :                         char       *outbuf;
     376            0 :                         char       *readbuf;
     377            0 :                         LZ4F_decompressionContext_t ctx = NULL;
     378            0 :                         LZ4F_decompressOptions_t dec_opt;
     379            0 :                         LZ4F_errorCode_t status;
     380              : 
     381            0 :                         memset(&dec_opt, 0, sizeof(dec_opt));
     382            0 :                         snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
     383              : 
     384            0 :                         fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
     385            0 :                         if (fd < 0)
     386            0 :                                 pg_fatal("could not open file \"%s\": %m", fullpath);
     387              : 
     388            0 :                         status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
     389            0 :                         if (LZ4F_isError(status))
     390            0 :                                 pg_fatal("could not create LZ4 decompression context: %s",
     391              :                                                  LZ4F_getErrorName(status));
     392              : 
     393            0 :                         outbuf = pg_malloc0(LZ4_CHUNK_SZ);
     394            0 :                         readbuf = pg_malloc0(LZ4_CHUNK_SZ);
     395            0 :                         do
     396              :                         {
     397            0 :                                 char       *readp;
     398            0 :                                 char       *readend;
     399              : 
     400            0 :                                 r = read(fd, readbuf, LZ4_CHUNK_SZ);
     401            0 :                                 if (r < 0)
     402            0 :                                         pg_fatal("could not read file \"%s\": %m", fullpath);
     403              : 
     404              :                                 /* Done reading the file */
     405            0 :                                 if (r == 0)
     406            0 :                                         break;
     407              : 
     408              :                                 /* Process one chunk */
     409            0 :                                 readp = readbuf;
     410            0 :                                 readend = readbuf + r;
     411            0 :                                 while (readp < readend)
     412              :                                 {
     413            0 :                                         size_t          out_size = LZ4_CHUNK_SZ;
     414            0 :                                         size_t          read_size = readend - readp;
     415              : 
     416            0 :                                         memset(outbuf, 0, LZ4_CHUNK_SZ);
     417            0 :                                         status = LZ4F_decompress(ctx, outbuf, &out_size,
     418            0 :                                                                                          readp, &read_size, &dec_opt);
     419            0 :                                         if (LZ4F_isError(status))
     420            0 :                                                 pg_fatal("could not decompress file \"%s\": %s",
     421              :                                                                  fullpath,
     422              :                                                                  LZ4F_getErrorName(status));
     423              : 
     424            0 :                                         readp += read_size;
     425            0 :                                         uncompressed_size += out_size;
     426            0 :                                 }
     427              : 
     428              :                                 /*
     429              :                                  * No need to continue reading the file when the
     430              :                                  * uncompressed_size exceeds WalSegSz, even if there are still
     431              :                                  * data left to read. However, if uncompressed_size is equal
     432              :                                  * to WalSegSz, it should verify that there is no more data to
     433              :                                  * read.
     434              :                                  */
     435            0 :                         } while (uncompressed_size <= WalSegSz && r > 0);
     436              : 
     437            0 :                         close(fd);
     438            0 :                         pg_free(outbuf);
     439            0 :                         pg_free(readbuf);
     440              : 
     441            0 :                         status = LZ4F_freeDecompressionContext(ctx);
     442            0 :                         if (LZ4F_isError(status))
     443            0 :                                 pg_fatal("could not free LZ4 decompression context: %s",
     444              :                                                  LZ4F_getErrorName(status));
     445              : 
     446            0 :                         if (uncompressed_size != WalSegSz)
     447              :                         {
     448            0 :                                 pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %zu, skipping",
     449              :                                                            dirent->d_name, uncompressed_size);
     450            0 :                                 continue;
     451              :                         }
     452              : #else
     453              :                         pg_log_error("cannot check file \"%s\": compression with %s not supported by this build",
     454              :                                                  dirent->d_name, "LZ4");
     455              :                         exit(1);
     456              : #endif
     457            0 :                 }
     458              : 
     459              :                 /* Looks like a valid segment. Remember that we saw it. */
     460            0 :                 if ((segno > high_segno) ||
     461            0 :                         (segno == high_segno && tli > high_tli) ||
     462            0 :                         (segno == high_segno && tli == high_tli && high_ispartial && !ispartial))
     463              :                 {
     464            0 :                         high_segno = segno;
     465            0 :                         high_tli = tli;
     466            0 :                         high_ispartial = ispartial;
     467            0 :                 }
     468            0 :         }
     469              : 
     470            0 :         if (errno)
     471            0 :                 pg_fatal("could not read directory \"%s\": %m", basedir);
     472              : 
     473            0 :         close_destination_dir(dir, basedir);
     474              : 
     475            0 :         if (high_segno > 0)
     476              :         {
     477            0 :                 XLogRecPtr      high_ptr;
     478              : 
     479              :                 /*
     480              :                  * Move the starting pointer to the start of the next segment, if the
     481              :                  * highest one we saw was completed. Otherwise start streaming from
     482              :                  * the beginning of the .partial segment.
     483              :                  */
     484            0 :                 if (!high_ispartial)
     485            0 :                         high_segno++;
     486              : 
     487            0 :                 XLogSegNoOffsetToRecPtr(high_segno, 0, WalSegSz, high_ptr);
     488              : 
     489            0 :                 *tli = high_tli;
     490            0 :                 return high_ptr;
     491            0 :         }
     492              :         else
     493            0 :                 return InvalidXLogRecPtr;
     494            0 : }
     495              : 
     496              : /*
     497              :  * Start the log streaming
     498              :  */
     499              : static void
     500            0 : StreamLog(void)
     501              : {
     502            0 :         XLogRecPtr      serverpos;
     503            0 :         TimeLineID      servertli;
     504            0 :         StreamCtl       stream = {0};
     505            0 :         char       *sysidentifier;
     506              : 
     507              :         /*
     508              :          * Connect in replication mode to the server
     509              :          */
     510            0 :         if (conn == NULL)
     511            0 :                 conn = GetConnection();
     512            0 :         if (!conn)
     513              :                 /* Error message already written in GetConnection() */
     514            0 :                 return;
     515              : 
     516            0 :         if (!CheckServerVersionForStreaming(conn))
     517              :         {
     518              :                 /*
     519              :                  * Error message already written in CheckServerVersionForStreaming().
     520              :                  * There's no hope of recovering from a version mismatch, so don't
     521              :                  * retry.
     522              :                  */
     523            0 :                 exit(1);
     524              :         }
     525              : 
     526              :         /*
     527              :          * Identify server, obtaining start LSN position and current timeline ID
     528              :          * at the same time, necessary if not valid data can be found in the
     529              :          * existing output directory.
     530              :          */
     531            0 :         if (!RunIdentifySystem(conn, &sysidentifier, &servertli, &serverpos, NULL))
     532            0 :                 exit(1);
     533              : 
     534              :         /*
     535              :          * Figure out where to start streaming.  First scan the local directory.
     536              :          */
     537            0 :         stream.startpos = FindStreamingStart(&stream.timeline);
     538            0 :         if (!XLogRecPtrIsValid(stream.startpos))
     539              :         {
     540              :                 /*
     541              :                  * Try to get the starting point from the slot if any.  This is
     542              :                  * supported in PostgreSQL 15 and newer.
     543              :                  */
     544            0 :                 if (replication_slot != NULL &&
     545            0 :                         PQserverVersion(conn) >= 150000)
     546              :                 {
     547            0 :                         if (!GetSlotInformation(conn, replication_slot, &stream.startpos,
     548            0 :                                                                         &stream.timeline))
     549              :                         {
     550              :                                 /* Error is logged by GetSlotInformation() */
     551            0 :                                 return;
     552              :                         }
     553            0 :                 }
     554              : 
     555              :                 /*
     556              :                  * If it the starting point is still not known, use the current WAL
     557              :                  * flush value as last resort.
     558              :                  */
     559            0 :                 if (!XLogRecPtrIsValid(stream.startpos))
     560              :                 {
     561            0 :                         stream.startpos = serverpos;
     562            0 :                         stream.timeline = servertli;
     563            0 :                 }
     564            0 :         }
     565              : 
     566            0 :         Assert(XLogRecPtrIsValid(stream.startpos) &&
     567              :                    stream.timeline != 0);
     568              : 
     569              :         /*
     570              :          * Always start streaming at the beginning of a segment
     571              :          */
     572            0 :         stream.startpos -= XLogSegmentOffset(stream.startpos, WalSegSz);
     573              : 
     574              :         /*
     575              :          * Start the replication
     576              :          */
     577            0 :         if (verbose)
     578            0 :                 pg_log_info("starting log streaming at %X/%08X (timeline %u)",
     579              :                                         LSN_FORMAT_ARGS(stream.startpos),
     580              :                                         stream.timeline);
     581              : 
     582            0 :         stream.stream_stop = stop_streaming;
     583            0 :         stream.stop_socket = PGINVALID_SOCKET;
     584            0 :         stream.standby_message_timeout = standby_message_timeout;
     585            0 :         stream.synchronous = synchronous;
     586            0 :         stream.do_sync = do_sync;
     587            0 :         stream.mark_done = false;
     588            0 :         stream.walmethod = CreateWalDirectoryMethod(basedir,
     589            0 :                                                                                                 compression_algorithm,
     590            0 :                                                                                                 compresslevel,
     591            0 :                                                                                                 stream.do_sync);
     592            0 :         stream.partial_suffix = ".partial";
     593            0 :         stream.replication_slot = replication_slot;
     594            0 :         stream.sysidentifier = sysidentifier;
     595              : 
     596            0 :         ReceiveXlogStream(conn, &stream);
     597              : 
     598            0 :         if (!stream.walmethod->ops->finish(stream.walmethod))
     599              :         {
     600            0 :                 pg_log_info("could not finish writing WAL files: %m");
     601            0 :                 return;
     602              :         }
     603              : 
     604            0 :         PQfinish(conn);
     605            0 :         conn = NULL;
     606              : 
     607            0 :         stream.walmethod->ops->free(stream.walmethod);
     608            0 : }
     609              : 
     610              : /*
     611              :  * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
     612              :  * possible moment.
     613              :  */
     614              : #ifndef WIN32
     615              : 
     616              : static void
     617            0 : sigexit_handler(SIGNAL_ARGS)
     618              : {
     619            0 :         time_to_stop = true;
     620            0 : }
     621              : #endif
     622              : 
     623              : int
     624            0 : main(int argc, char **argv)
     625              : {
     626              :         static struct option long_options[] = {
     627              :                 {"help", no_argument, NULL, '?'},
     628              :                 {"version", no_argument, NULL, 'V'},
     629              :                 {"directory", required_argument, NULL, 'D'},
     630              :                 {"dbname", required_argument, NULL, 'd'},
     631              :                 {"endpos", required_argument, NULL, 'E'},
     632              :                 {"host", required_argument, NULL, 'h'},
     633              :                 {"port", required_argument, NULL, 'p'},
     634              :                 {"username", required_argument, NULL, 'U'},
     635              :                 {"no-loop", no_argument, NULL, 'n'},
     636              :                 {"no-password", no_argument, NULL, 'w'},
     637              :                 {"password", no_argument, NULL, 'W'},
     638              :                 {"status-interval", required_argument, NULL, 's'},
     639              :                 {"slot", required_argument, NULL, 'S'},
     640              :                 {"verbose", no_argument, NULL, 'v'},
     641              :                 {"compress", required_argument, NULL, 'Z'},
     642              : /* action */
     643              :                 {"create-slot", no_argument, NULL, 1},
     644              :                 {"drop-slot", no_argument, NULL, 2},
     645              :                 {"if-not-exists", no_argument, NULL, 3},
     646              :                 {"synchronous", no_argument, NULL, 4},
     647              :                 {"no-sync", no_argument, NULL, 5},
     648              :                 {NULL, 0, NULL, 0}
     649              :         };
     650              : 
     651            0 :         int                     c;
     652            0 :         int                     option_index;
     653            0 :         char       *db_name;
     654            0 :         uint32          hi,
     655              :                                 lo;
     656            0 :         pg_compress_specification compression_spec;
     657            0 :         char       *compression_detail = NULL;
     658            0 :         char       *compression_algorithm_str = "none";
     659            0 :         char       *error_detail = NULL;
     660              : 
     661            0 :         pg_logging_init(argv[0]);
     662            0 :         progname = get_progname(argv[0]);
     663            0 :         set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
     664              : 
     665            0 :         if (argc > 1)
     666              :         {
     667            0 :                 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
     668              :                 {
     669            0 :                         usage();
     670            0 :                         exit(0);
     671              :                 }
     672            0 :                 else if (strcmp(argv[1], "-V") == 0 ||
     673            0 :                                  strcmp(argv[1], "--version") == 0)
     674              :                 {
     675            0 :                         puts("pg_receivewal (PostgreSQL) " PG_VERSION);
     676            0 :                         exit(0);
     677              :                 }
     678            0 :         }
     679              : 
     680            0 :         while ((c = getopt_long(argc, argv, "d:D:E:h:np:s:S:U:vwWZ:",
     681            0 :                                                         long_options, &option_index)) != -1)
     682              :         {
     683            0 :                 switch (c)
     684              :                 {
     685              :                         case 'd':
     686            0 :                                 connection_string = pg_strdup(optarg);
     687            0 :                                 break;
     688              :                         case 'D':
     689            0 :                                 basedir = pg_strdup(optarg);
     690            0 :                                 break;
     691              :                         case 'E':
     692            0 :                                 if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
     693            0 :                                         pg_fatal("could not parse end position \"%s\"", optarg);
     694            0 :                                 endpos = ((uint64) hi) << 32 | lo;
     695            0 :                                 break;
     696              :                         case 'h':
     697            0 :                                 dbhost = pg_strdup(optarg);
     698            0 :                                 break;
     699              :                         case 'n':
     700            0 :                                 noloop = true;
     701            0 :                                 break;
     702              :                         case 'p':
     703            0 :                                 dbport = pg_strdup(optarg);
     704            0 :                                 break;
     705              :                         case 's':
     706            0 :                                 if (!option_parse_int(optarg, "-s/--status-interval", 0,
     707              :                                                                           INT_MAX / 1000,
     708              :                                                                           &standby_message_timeout))
     709            0 :                                         exit(1);
     710            0 :                                 standby_message_timeout *= 1000;
     711            0 :                                 break;
     712              :                         case 'S':
     713            0 :                                 replication_slot = pg_strdup(optarg);
     714            0 :                                 break;
     715              :                         case 'U':
     716            0 :                                 dbuser = pg_strdup(optarg);
     717            0 :                                 break;
     718              :                         case 'v':
     719            0 :                                 verbose++;
     720            0 :                                 break;
     721              :                         case 'w':
     722            0 :                                 dbgetpassword = -1;
     723            0 :                                 break;
     724              :                         case 'W':
     725            0 :                                 dbgetpassword = 1;
     726            0 :                                 break;
     727              :                         case 'Z':
     728            0 :                                 parse_compress_options(optarg, &compression_algorithm_str,
     729              :                                                                            &compression_detail);
     730            0 :                                 break;
     731              :                         case 1:
     732            0 :                                 do_create_slot = true;
     733            0 :                                 break;
     734              :                         case 2:
     735            0 :                                 do_drop_slot = true;
     736            0 :                                 break;
     737              :                         case 3:
     738            0 :                                 slot_exists_ok = true;
     739            0 :                                 break;
     740              :                         case 4:
     741            0 :                                 synchronous = true;
     742            0 :                                 break;
     743              :                         case 5:
     744            0 :                                 do_sync = false;
     745            0 :                                 break;
     746              :                         default:
     747              :                                 /* getopt_long already emitted a complaint */
     748            0 :                                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     749            0 :                                 exit(1);
     750              :                 }
     751              :         }
     752              : 
     753              :         /*
     754              :          * Any non-option arguments?
     755              :          */
     756            0 :         if (optind < argc)
     757              :         {
     758            0 :                 pg_log_error("too many command-line arguments (first is \"%s\")",
     759              :                                          argv[optind]);
     760            0 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     761            0 :                 exit(1);
     762              :         }
     763              : 
     764            0 :         if (do_drop_slot && do_create_slot)
     765              :         {
     766            0 :                 pg_log_error("cannot use --create-slot together with --drop-slot");
     767            0 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     768            0 :                 exit(1);
     769              :         }
     770              : 
     771            0 :         if (replication_slot == NULL && (do_drop_slot || do_create_slot))
     772              :         {
     773              :                 /* translator: second %s is an option name */
     774            0 :                 pg_log_error("%s needs a slot to be specified using --slot",
     775              :                                          do_drop_slot ? "--drop-slot" : "--create-slot");
     776            0 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     777            0 :                 exit(1);
     778              :         }
     779              : 
     780            0 :         if (synchronous && !do_sync)
     781              :         {
     782            0 :                 pg_log_error("cannot use --synchronous together with --no-sync");
     783            0 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     784            0 :                 exit(1);
     785              :         }
     786              : 
     787              :         /*
     788              :          * Required arguments
     789              :          */
     790            0 :         if (basedir == NULL && !do_drop_slot && !do_create_slot)
     791              :         {
     792            0 :                 pg_log_error("no target directory specified");
     793            0 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     794            0 :                 exit(1);
     795              :         }
     796              : 
     797              :         /*
     798              :          * Compression options
     799              :          */
     800            0 :         if (!parse_compress_algorithm(compression_algorithm_str,
     801              :                                                                   &compression_algorithm))
     802            0 :                 pg_fatal("unrecognized compression algorithm: \"%s\"",
     803              :                                  compression_algorithm_str);
     804              : 
     805            0 :         parse_compress_specification(compression_algorithm, compression_detail,
     806              :                                                                  &compression_spec);
     807            0 :         error_detail = validate_compress_specification(&compression_spec);
     808            0 :         if (error_detail != NULL)
     809            0 :                 pg_fatal("invalid compression specification: %s",
     810              :                                  error_detail);
     811              : 
     812              :         /* Extract the compression level */
     813            0 :         compresslevel = compression_spec.level;
     814              : 
     815            0 :         if (compression_algorithm == PG_COMPRESSION_ZSTD)
     816            0 :                 pg_fatal("compression with %s is not yet supported", "ZSTD");
     817              : 
     818              :         /*
     819              :          * Check existence of destination folder.
     820              :          */
     821            0 :         if (!do_drop_slot && !do_create_slot)
     822              :         {
     823            0 :                 DIR                *dir = get_destination_dir(basedir);
     824              : 
     825            0 :                 close_destination_dir(dir, basedir);
     826            0 :         }
     827              : 
     828              :         /*
     829              :          * Obtain a connection before doing anything.
     830              :          */
     831            0 :         conn = GetConnection();
     832            0 :         if (!conn)
     833              :                 /* error message already written in GetConnection() */
     834            0 :                 exit(1);
     835            0 :         atexit(disconnect_atexit);
     836              : 
     837              :         /*
     838              :          * Trap signals.  (Don't do this until after the initial password prompt,
     839              :          * if one is needed, in GetConnection.)
     840              :          */
     841              : #ifndef WIN32
     842            0 :         pqsignal(SIGINT, sigexit_handler);
     843            0 :         pqsignal(SIGTERM, sigexit_handler);
     844              : #endif
     845              : 
     846              :         /*
     847              :          * Run IDENTIFY_SYSTEM to make sure we've successfully have established a
     848              :          * replication connection and haven't connected using a database specific
     849              :          * connection.
     850              :          */
     851            0 :         if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
     852            0 :                 exit(1);
     853              : 
     854              :         /*
     855              :          * Check that there is a database associated with connection, none should
     856              :          * be defined in this context.
     857              :          */
     858            0 :         if (db_name)
     859            0 :                 pg_fatal("replication connection using slot \"%s\" is unexpectedly database specific",
     860              :                                  replication_slot);
     861              : 
     862              :         /*
     863              :          * Set umask so that directories/files are created with the same
     864              :          * permissions as directories/files in the source data directory.
     865              :          *
     866              :          * pg_mode_mask is set to owner-only by default and then updated in
     867              :          * GetConnection() where we get the mode from the server-side with
     868              :          * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
     869              :          */
     870            0 :         umask(pg_mode_mask);
     871              : 
     872              :         /*
     873              :          * Drop a replication slot.
     874              :          */
     875            0 :         if (do_drop_slot)
     876              :         {
     877            0 :                 if (verbose)
     878            0 :                         pg_log_info("dropping replication slot \"%s\"", replication_slot);
     879              : 
     880            0 :                 if (!DropReplicationSlot(conn, replication_slot))
     881            0 :                         exit(1);
     882            0 :                 exit(0);
     883              :         }
     884              : 
     885              :         /* Create a replication slot */
     886            0 :         if (do_create_slot)
     887              :         {
     888            0 :                 if (verbose)
     889            0 :                         pg_log_info("creating replication slot \"%s\"", replication_slot);
     890              : 
     891            0 :                 if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
     892            0 :                                                                    slot_exists_ok, false, false))
     893            0 :                         exit(1);
     894            0 :                 exit(0);
     895              :         }
     896              : 
     897              :         /* determine remote server's xlog segment size */
     898            0 :         if (!RetrieveWalSegSize(conn))
     899            0 :                 exit(1);
     900              : 
     901              :         /*
     902              :          * Don't close the connection here so that subsequent StreamLog() can
     903              :          * reuse it.
     904              :          */
     905              : 
     906            0 :         while (true)
     907              :         {
     908            0 :                 StreamLog();
     909            0 :                 if (time_to_stop)
     910              :                 {
     911              :                         /*
     912              :                          * We've been Ctrl-C'ed or end of streaming position has been
     913              :                          * willingly reached, so exit without an error code.
     914              :                          */
     915            0 :                         exit(0);
     916              :                 }
     917            0 :                 else if (noloop)
     918            0 :                         pg_fatal("disconnected");
     919              :                 else
     920              :                 {
     921              :                         /* translator: check source for value for %d */
     922            0 :                         pg_log_info("disconnected; waiting %d seconds to try again",
     923              :                                                 RECONNECT_SLEEP_TIME);
     924            0 :                         pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
     925              :                 }
     926              :         }
     927              : }
        

Generated by: LCOV version 2.3.2-1