LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - pg_recvlogical.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 515 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 10 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
       4              :  *                                        fashion and write it to a local file.
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *                src/bin/pg_basebackup/pg_recvlogical.c
      10              :  *-------------------------------------------------------------------------
      11              :  */
      12              : 
      13              : #include "postgres_fe.h"
      14              : 
      15              : #include <dirent.h>
      16              : #include <limits.h>
      17              : #include <sys/select.h>
      18              : #include <sys/stat.h>
      19              : #include <unistd.h>
      20              : 
      21              : #include "common/file_perm.h"
      22              : #include "common/logging.h"
      23              : #include "fe_utils/option_utils.h"
      24              : #include "getopt_long.h"
      25              : #include "libpq-fe.h"
      26              : #include "libpq/pqsignal.h"
      27              : #include "libpq/protocol.h"
      28              : #include "pqexpbuffer.h"
      29              : #include "streamutil.h"
      30              : 
      31              : /* Time to sleep between reconnection attempts */
      32              : #define RECONNECT_SLEEP_TIME 5
      33              : 
      34              : typedef enum
      35              : {
      36              :         STREAM_STOP_NONE,
      37              :         STREAM_STOP_END_OF_WAL,
      38              :         STREAM_STOP_KEEPALIVE,
      39              :         STREAM_STOP_SIGNAL
      40              : } StreamStopReason;
      41              : 
      42              : /* Global Options */
      43              : static char *outfile = NULL;
      44              : static int      verbose = 0;
      45              : static bool two_phase = false;  /* enable-two-phase option */
      46              : static bool failover = false;   /* enable-failover option */
      47              : static int      noloop = 0;
      48              : static int      standby_message_timeout = 10 * 1000;    /* 10 sec = default */
      49              : static int      fsync_interval = 10 * 1000; /* 10 sec = default */
      50              : static XLogRecPtr startpos = InvalidXLogRecPtr;
      51              : static XLogRecPtr endpos = InvalidXLogRecPtr;
      52              : static bool do_create_slot = false;
      53              : static bool slot_exists_ok = false;
      54              : static bool do_start_slot = false;
      55              : static bool do_drop_slot = false;
      56              : static char *replication_slot = NULL;
      57              : 
      58              : /* filled pairwise with option, value. value may be NULL */
      59              : static char **options;
      60              : static size_t noptions = 0;
      61              : static const char *plugin = "test_decoding";
      62              : 
      63              : /* Global State */
      64              : static int      outfd = -1;
      65              : static volatile sig_atomic_t time_to_abort = false;
      66              : static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
      67              : static volatile sig_atomic_t output_reopen = false;
      68              : static bool output_isfile;
      69              : static TimestampTz output_last_fsync = -1;
      70              : static bool output_needs_fsync = false;
      71              : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
      72              : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
      73              : 
      74              : static void usage(void);
      75              : static void StreamLogicalLog(void);
      76              : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
      77              : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
      78              :                                                            StreamStopReason reason,
      79              :                                                            XLogRecPtr lsn);
      80              : 
      81              : static void
      82            0 : usage(void)
      83              : {
      84            0 :         printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
      85              :                    progname);
      86            0 :         printf(_("Usage:\n"));
      87            0 :         printf(_("  %s [OPTION]...\n"), progname);
      88            0 :         printf(_("\nAction to be performed:\n"));
      89            0 :         printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
      90            0 :         printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
      91            0 :         printf(_("      --start            start streaming in a replication slot (for the slot's name see --slot)\n"));
      92            0 :         printf(_("\nOptions:\n"));
      93            0 :         printf(_("      --enable-failover  enable replication slot synchronization to standby servers when\n"
      94              :                          "                         creating a replication slot\n"));
      95            0 :         printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
      96            0 :         printf(_("  -f, --file=FILE        receive log into this file, - for stdout\n"));
      97            0 :         printf(_("  -F  --fsync-interval=SECS\n"
      98              :                          "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
      99            0 :         printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
     100            0 :         printf(_("  -I, --startpos=LSN     where in an existing slot should the streaming start\n"));
     101            0 :         printf(_("  -n, --no-loop          do not loop on connection lost\n"));
     102            0 :         printf(_("  -o, --option=NAME[=VALUE]\n"
     103              :                          "                         pass option NAME with optional value VALUE to the\n"
     104              :                          "                         output plugin\n"));
     105            0 :         printf(_("  -P, --plugin=PLUGIN    use output plugin PLUGIN (default: %s)\n"), plugin);
     106            0 :         printf(_("  -s, --status-interval=SECS\n"
     107              :                          "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
     108            0 :         printf(_("  -S, --slot=SLOTNAME    name of the logical replication slot\n"));
     109            0 :         printf(_("  -t, --enable-two-phase enable decoding of prepared transactions when creating a slot\n"));
     110            0 :         printf(_("      --two-phase        (same as --enable-two-phase, deprecated)\n"));
     111            0 :         printf(_("  -v, --verbose          output verbose messages\n"));
     112            0 :         printf(_("  -V, --version          output version information, then exit\n"));
     113            0 :         printf(_("  -?, --help             show this help, then exit\n"));
     114            0 :         printf(_("\nConnection options:\n"));
     115            0 :         printf(_("  -d, --dbname=DBNAME    database to connect to\n"));
     116            0 :         printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
     117            0 :         printf(_("  -p, --port=PORT        database server port number\n"));
     118            0 :         printf(_("  -U, --username=NAME    connect as specified database user\n"));
     119            0 :         printf(_("  -w, --no-password      never prompt for password\n"));
     120            0 :         printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
     121            0 :         printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
     122            0 :         printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
     123            0 : }
     124              : 
     125              : /*
     126              :  * Send a Standby Status Update message to server.
     127              :  */
     128              : static bool
     129            0 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
     130              : {
     131              :         static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
     132              :         static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
     133              : 
     134            0 :         char            replybuf[1 + 8 + 8 + 8 + 8 + 1];
     135            0 :         int                     len = 0;
     136              : 
     137              :         /*
     138              :          * we normally don't want to send superfluous feedback, but if it's
     139              :          * because of a timeout we need to, otherwise wal_sender_timeout will kill
     140              :          * us.
     141              :          */
     142            0 :         if (!force &&
     143            0 :                 last_written_lsn == output_written_lsn &&
     144            0 :                 last_fsync_lsn == output_fsync_lsn)
     145            0 :                 return true;
     146              : 
     147            0 :         if (verbose)
     148            0 :                 pg_log_info("confirming write up to %X/%08X, flush to %X/%08X (slot %s)",
     149              :                                         LSN_FORMAT_ARGS(output_written_lsn),
     150              :                                         LSN_FORMAT_ARGS(output_fsync_lsn),
     151              :                                         replication_slot);
     152              : 
     153            0 :         replybuf[len] = PqReplMsg_StandbyStatusUpdate;
     154            0 :         len += 1;
     155            0 :         fe_sendint64(output_written_lsn, &replybuf[len]);   /* write */
     156            0 :         len += 8;
     157            0 :         fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
     158            0 :         len += 8;
     159            0 :         fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
     160            0 :         len += 8;
     161            0 :         fe_sendint64(now, &replybuf[len]);  /* sendTime */
     162            0 :         len += 8;
     163            0 :         replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
     164            0 :         len += 1;
     165              : 
     166            0 :         startpos = output_written_lsn;
     167            0 :         last_written_lsn = output_written_lsn;
     168            0 :         last_fsync_lsn = output_fsync_lsn;
     169              : 
     170            0 :         if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
     171              :         {
     172            0 :                 pg_log_error("could not send feedback packet: %s",
     173              :                                          PQerrorMessage(conn));
     174            0 :                 return false;
     175              :         }
     176              : 
     177            0 :         return true;
     178            0 : }
     179              : 
     180              : static void
     181            0 : disconnect_atexit(void)
     182              : {
     183            0 :         if (conn != NULL)
     184            0 :                 PQfinish(conn);
     185            0 : }
     186              : 
     187              : static void
     188            0 : OutputFsync(TimestampTz now)
     189              : {
     190            0 :         output_last_fsync = now;
     191              : 
     192            0 :         output_fsync_lsn = output_written_lsn;
     193              : 
     194              :         /*
     195              :          * Save the last flushed position as the replication start point. On
     196              :          * reconnect, replication resumes from there to avoid re-sending flushed
     197              :          * data.
     198              :          */
     199            0 :         startpos = output_fsync_lsn;
     200              : 
     201            0 :         if (fsync_interval <= 0)
     202            0 :                 return;
     203              : 
     204            0 :         if (!output_needs_fsync)
     205            0 :                 return;
     206              : 
     207            0 :         output_needs_fsync = false;
     208              : 
     209              :         /* can only fsync if it's a regular file */
     210            0 :         if (!output_isfile)
     211            0 :                 return;
     212              : 
     213            0 :         if (fsync(outfd) != 0)
     214            0 :                 pg_fatal("could not fsync file \"%s\": %m", outfile);
     215            0 : }
     216              : 
     217              : /*
     218              :  * Start the log streaming
     219              :  */
     220              : static void
     221            0 : StreamLogicalLog(void)
     222              : {
     223            0 :         PGresult   *res;
     224            0 :         char       *copybuf = NULL;
     225            0 :         TimestampTz last_status = -1;
     226            0 :         int                     i;
     227            0 :         PQExpBuffer query;
     228            0 :         XLogRecPtr      cur_record_lsn;
     229              : 
     230            0 :         cur_record_lsn = InvalidXLogRecPtr;
     231              : 
     232              :         /*
     233              :          * Connect in replication mode to the server
     234              :          */
     235            0 :         if (!conn)
     236            0 :                 conn = GetConnection();
     237            0 :         if (!conn)
     238              :                 /* Error message already written in GetConnection() */
     239            0 :                 return;
     240              : 
     241              :         /*
     242              :          * Start the replication
     243              :          */
     244            0 :         if (verbose)
     245            0 :                 pg_log_info("starting log streaming at %X/%08X (slot %s)",
     246              :                                         LSN_FORMAT_ARGS(startpos),
     247              :                                         replication_slot);
     248              : 
     249              :         /* Initiate the replication stream at specified location */
     250            0 :         query = createPQExpBuffer();
     251            0 :         appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%08X",
     252            0 :                                           replication_slot, LSN_FORMAT_ARGS(startpos));
     253              : 
     254              :         /* print options if there are any */
     255            0 :         if (noptions)
     256            0 :                 appendPQExpBufferStr(query, " (");
     257              : 
     258            0 :         for (i = 0; i < noptions; i++)
     259              :         {
     260              :                 /* separator */
     261            0 :                 if (i > 0)
     262            0 :                         appendPQExpBufferStr(query, ", ");
     263              : 
     264              :                 /* write option name */
     265            0 :                 appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
     266              : 
     267              :                 /* write option value if specified */
     268            0 :                 if (options[(i * 2) + 1] != NULL)
     269            0 :                         appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
     270            0 :         }
     271              : 
     272            0 :         if (noptions)
     273            0 :                 appendPQExpBufferChar(query, ')');
     274              : 
     275            0 :         res = PQexec(conn, query->data);
     276            0 :         if (PQresultStatus(res) != PGRES_COPY_BOTH)
     277              :         {
     278            0 :                 pg_log_error("could not send replication command \"%s\": %s",
     279              :                                          query->data, PQresultErrorMessage(res));
     280            0 :                 PQclear(res);
     281            0 :                 goto error;
     282              :         }
     283            0 :         PQclear(res);
     284            0 :         resetPQExpBuffer(query);
     285              : 
     286            0 :         if (verbose)
     287            0 :                 pg_log_info("streaming initiated");
     288              : 
     289            0 :         while (!time_to_abort)
     290              :         {
     291            0 :                 int                     r;
     292            0 :                 int                     bytes_left;
     293            0 :                 int                     bytes_written;
     294            0 :                 TimestampTz now;
     295            0 :                 int                     hdr_len;
     296              : 
     297            0 :                 cur_record_lsn = InvalidXLogRecPtr;
     298              : 
     299            0 :                 if (copybuf != NULL)
     300              :                 {
     301            0 :                         PQfreemem(copybuf);
     302            0 :                         copybuf = NULL;
     303            0 :                 }
     304              : 
     305              :                 /*
     306              :                  * Potentially send a status message to the primary.
     307              :                  */
     308            0 :                 now = feGetCurrentTimestamp();
     309              : 
     310            0 :                 if (outfd != -1 &&
     311            0 :                         feTimestampDifferenceExceeds(output_last_fsync, now,
     312            0 :                                                                                  fsync_interval))
     313            0 :                         OutputFsync(now);
     314              : 
     315            0 :                 if (standby_message_timeout > 0 &&
     316            0 :                         feTimestampDifferenceExceeds(last_status, now,
     317            0 :                                                                                  standby_message_timeout))
     318              :                 {
     319              :                         /* Time to send feedback! */
     320            0 :                         if (!sendFeedback(conn, now, true, false))
     321            0 :                                 goto error;
     322              : 
     323            0 :                         last_status = now;
     324            0 :                 }
     325              : 
     326              :                 /* got SIGHUP, close output file */
     327            0 :                 if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
     328              :                 {
     329            0 :                         now = feGetCurrentTimestamp();
     330            0 :                         OutputFsync(now);
     331            0 :                         close(outfd);
     332            0 :                         outfd = -1;
     333            0 :                 }
     334            0 :                 output_reopen = false;
     335              : 
     336              :                 /* open the output file, if not open yet */
     337            0 :                 if (outfd == -1)
     338              :                 {
     339            0 :                         struct stat statbuf;
     340              : 
     341            0 :                         if (strcmp(outfile, "-") == 0)
     342            0 :                                 outfd = fileno(stdout);
     343              :                         else
     344            0 :                                 outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
     345              :                                                          S_IRUSR | S_IWUSR);
     346            0 :                         if (outfd == -1)
     347              :                         {
     348            0 :                                 pg_log_error("could not open log file \"%s\": %m", outfile);
     349            0 :                                 goto error;
     350              :                         }
     351              : 
     352            0 :                         if (fstat(outfd, &statbuf) != 0)
     353              :                         {
     354            0 :                                 pg_log_error("could not stat file \"%s\": %m", outfile);
     355            0 :                                 goto error;
     356              :                         }
     357              : 
     358            0 :                         output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
     359            0 :                 }
     360              : 
     361            0 :                 r = PQgetCopyData(conn, &copybuf, 1);
     362            0 :                 if (r == 0)
     363              :                 {
     364              :                         /*
     365              :                          * In async mode, and no data available. We block on reading but
     366              :                          * not more than the specified timeout, so that we can send a
     367              :                          * response back to the client.
     368              :                          */
     369            0 :                         fd_set          input_mask;
     370            0 :                         TimestampTz message_target = 0;
     371            0 :                         TimestampTz fsync_target = 0;
     372            0 :                         struct timeval timeout;
     373            0 :                         struct timeval *timeoutptr = NULL;
     374              : 
     375            0 :                         if (PQsocket(conn) < 0)
     376              :                         {
     377            0 :                                 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
     378            0 :                                 goto error;
     379              :                         }
     380              : 
     381            0 :                         FD_ZERO(&input_mask);
     382            0 :                         FD_SET(PQsocket(conn), &input_mask);
     383              : 
     384              :                         /* Compute when we need to wakeup to send a keepalive message. */
     385            0 :                         if (standby_message_timeout)
     386            0 :                                 message_target = last_status + (standby_message_timeout - 1) *
     387              :                                         ((int64) 1000);
     388              : 
     389              :                         /* Compute when we need to wakeup to fsync the output file. */
     390            0 :                         if (fsync_interval > 0 && output_needs_fsync)
     391            0 :                                 fsync_target = output_last_fsync + (fsync_interval - 1) *
     392              :                                         ((int64) 1000);
     393              : 
     394              :                         /* Now compute when to wakeup. */
     395            0 :                         if (message_target > 0 || fsync_target > 0)
     396              :                         {
     397            0 :                                 TimestampTz targettime;
     398            0 :                                 long            secs;
     399            0 :                                 int                     usecs;
     400              : 
     401            0 :                                 targettime = message_target;
     402              : 
     403            0 :                                 if (fsync_target > 0 && fsync_target < targettime)
     404            0 :                                         targettime = fsync_target;
     405              : 
     406            0 :                                 feTimestampDifference(now,
     407            0 :                                                                           targettime,
     408              :                                                                           &secs,
     409              :                                                                           &usecs);
     410            0 :                                 if (secs <= 0)
     411            0 :                                         timeout.tv_sec = 1; /* Always sleep at least 1 sec */
     412              :                                 else
     413            0 :                                         timeout.tv_sec = secs;
     414            0 :                                 timeout.tv_usec = usecs;
     415            0 :                                 timeoutptr = &timeout;
     416            0 :                         }
     417              : 
     418            0 :                         r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
     419            0 :                         if (r == 0 || (r < 0 && errno == EINTR))
     420              :                         {
     421              :                                 /*
     422              :                                  * Got a timeout or signal. Continue the loop and either
     423              :                                  * deliver a status packet to the server or just go back into
     424              :                                  * blocking.
     425              :                                  */
     426            0 :                                 continue;
     427              :                         }
     428            0 :                         else if (r < 0)
     429              :                         {
     430            0 :                                 pg_log_error("%s() failed: %m", "select");
     431            0 :                                 goto error;
     432              :                         }
     433              : 
     434              :                         /* Else there is actually data on the socket */
     435            0 :                         if (PQconsumeInput(conn) == 0)
     436              :                         {
     437            0 :                                 pg_log_error("could not receive data from WAL stream: %s",
     438              :                                                          PQerrorMessage(conn));
     439            0 :                                 goto error;
     440              :                         }
     441            0 :                         continue;
     442            0 :                 }
     443              : 
     444              :                 /* End of copy stream */
     445            0 :                 if (r == -1)
     446            0 :                         break;
     447              : 
     448              :                 /* Failure while reading the copy stream */
     449            0 :                 if (r == -2)
     450              :                 {
     451            0 :                         pg_log_error("could not read COPY data: %s",
     452              :                                                  PQerrorMessage(conn));
     453            0 :                         goto error;
     454              :                 }
     455              : 
     456              :                 /* Check the message type. */
     457            0 :                 if (copybuf[0] == PqReplMsg_Keepalive)
     458              :                 {
     459            0 :                         int                     pos;
     460            0 :                         bool            replyRequested;
     461            0 :                         XLogRecPtr      walEnd;
     462            0 :                         bool            endposReached = false;
     463              : 
     464              :                         /*
     465              :                          * Parse the keepalive message, enclosed in the CopyData message.
     466              :                          * We just check if the server requested a reply, and ignore the
     467              :                          * rest.
     468              :                          */
     469            0 :                         pos = 1;                        /* skip msgtype PqReplMsg_Keepalive */
     470            0 :                         walEnd = fe_recvint64(&copybuf[pos]);
     471            0 :                         output_written_lsn = Max(walEnd, output_written_lsn);
     472              : 
     473            0 :                         pos += 8;                       /* read walEnd */
     474              : 
     475            0 :                         pos += 8;                       /* skip sendTime */
     476              : 
     477            0 :                         if (r < pos + 1)
     478              :                         {
     479            0 :                                 pg_log_error("streaming header too small: %d", r);
     480            0 :                                 goto error;
     481              :                         }
     482            0 :                         replyRequested = copybuf[pos];
     483              : 
     484            0 :                         if (XLogRecPtrIsValid(endpos) && walEnd >= endpos)
     485              :                         {
     486              :                                 /*
     487              :                                  * If there's nothing to read on the socket until a keepalive
     488              :                                  * we know that the server has nothing to send us; and if
     489              :                                  * walEnd has passed endpos, we know nothing else can have
     490              :                                  * committed before endpos.  So we can bail out now.
     491              :                                  */
     492            0 :                                 endposReached = true;
     493            0 :                         }
     494              : 
     495              :                         /* Send a reply, if necessary */
     496            0 :                         if (replyRequested || endposReached)
     497              :                         {
     498            0 :                                 if (!flushAndSendFeedback(conn, &now))
     499            0 :                                         goto error;
     500            0 :                                 last_status = now;
     501            0 :                         }
     502              : 
     503            0 :                         if (endposReached)
     504              :                         {
     505            0 :                                 stop_reason = STREAM_STOP_KEEPALIVE;
     506            0 :                                 time_to_abort = true;
     507            0 :                                 break;
     508              :                         }
     509              : 
     510            0 :                         continue;
     511            0 :                 }
     512            0 :                 else if (copybuf[0] != PqReplMsg_WALData)
     513              :                 {
     514            0 :                         pg_log_error("unrecognized streaming header: \"%c\"",
     515              :                                                  copybuf[0]);
     516            0 :                         goto error;
     517              :                 }
     518              : 
     519              :                 /*
     520              :                  * Read the header of the WALData message, enclosed in the CopyData
     521              :                  * message. We only need the WAL location field (dataStart), the rest
     522              :                  * of the header is ignored.
     523              :                  */
     524            0 :                 hdr_len = 1;                    /* msgtype PqReplMsg_WALData */
     525            0 :                 hdr_len += 8;                   /* dataStart */
     526            0 :                 hdr_len += 8;                   /* walEnd */
     527            0 :                 hdr_len += 8;                   /* sendTime */
     528            0 :                 if (r < hdr_len + 1)
     529              :                 {
     530            0 :                         pg_log_error("streaming header too small: %d", r);
     531            0 :                         goto error;
     532              :                 }
     533              : 
     534              :                 /* Extract WAL location for this block */
     535            0 :                 cur_record_lsn = fe_recvint64(&copybuf[1]);
     536              : 
     537            0 :                 if (XLogRecPtrIsValid(endpos) && cur_record_lsn > endpos)
     538              :                 {
     539              :                         /*
     540              :                          * We've read past our endpoint, so prepare to go away being
     541              :                          * cautious about what happens to our output data.
     542              :                          */
     543            0 :                         if (!flushAndSendFeedback(conn, &now))
     544            0 :                                 goto error;
     545            0 :                         stop_reason = STREAM_STOP_END_OF_WAL;
     546            0 :                         time_to_abort = true;
     547            0 :                         break;
     548              :                 }
     549              : 
     550            0 :                 output_written_lsn = Max(cur_record_lsn, output_written_lsn);
     551              : 
     552            0 :                 bytes_left = r - hdr_len;
     553            0 :                 bytes_written = 0;
     554              : 
     555              :                 /* signal that a fsync is needed */
     556            0 :                 output_needs_fsync = true;
     557              : 
     558            0 :                 while (bytes_left)
     559              :                 {
     560            0 :                         int                     ret;
     561              : 
     562            0 :                         ret = write(outfd,
     563            0 :                                                 copybuf + hdr_len + bytes_written,
     564            0 :                                                 bytes_left);
     565              : 
     566            0 :                         if (ret < 0)
     567              :                         {
     568            0 :                                 pg_log_error("could not write %d bytes to log file \"%s\": %m",
     569              :                                                          bytes_left, outfile);
     570            0 :                                 goto error;
     571              :                         }
     572              : 
     573              :                         /* Write was successful, advance our position */
     574            0 :                         bytes_written += ret;
     575            0 :                         bytes_left -= ret;
     576            0 :                 }
     577              : 
     578            0 :                 if (write(outfd, "\n", 1) != 1)
     579              :                 {
     580            0 :                         pg_log_error("could not write %d bytes to log file \"%s\": %m",
     581              :                                                  1, outfile);
     582            0 :                         goto error;
     583              :                 }
     584              : 
     585            0 :                 if (XLogRecPtrIsValid(endpos) && cur_record_lsn == endpos)
     586              :                 {
     587              :                         /* endpos was exactly the record we just processed, we're done */
     588            0 :                         if (!flushAndSendFeedback(conn, &now))
     589            0 :                                 goto error;
     590            0 :                         stop_reason = STREAM_STOP_END_OF_WAL;
     591            0 :                         time_to_abort = true;
     592            0 :                         break;
     593              :                 }
     594            0 :         }
     595              : 
     596              :         /* Clean up connection state if stream has been aborted */
     597            0 :         if (time_to_abort)
     598            0 :                 prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
     599              : 
     600            0 :         res = PQgetResult(conn);
     601            0 :         if (PQresultStatus(res) == PGRES_COPY_OUT)
     602              :         {
     603            0 :                 PQclear(res);
     604              : 
     605              :                 /*
     606              :                  * We're doing a client-initiated clean exit and have sent CopyDone to
     607              :                  * the server. Drain any messages, so we don't miss a last-minute
     608              :                  * ErrorResponse. The walsender stops generating WALData records once
     609              :                  * it sees CopyDone, so expect this to finish quickly. After CopyDone,
     610              :                  * it's too late for sendFeedback(), even if this were to take a long
     611              :                  * time. Hence, use synchronous-mode PQgetCopyData().
     612              :                  */
     613            0 :                 while (1)
     614              :                 {
     615            0 :                         int                     r;
     616              : 
     617            0 :                         if (copybuf != NULL)
     618              :                         {
     619            0 :                                 PQfreemem(copybuf);
     620            0 :                                 copybuf = NULL;
     621            0 :                         }
     622            0 :                         r = PQgetCopyData(conn, &copybuf, 0);
     623            0 :                         if (r == -1)
     624            0 :                                 break;
     625            0 :                         if (r == -2)
     626              :                         {
     627            0 :                                 pg_log_error("could not read COPY data: %s",
     628              :                                                          PQerrorMessage(conn));
     629            0 :                                 time_to_abort = false;  /* unclean exit */
     630            0 :                                 goto error;
     631              :                         }
     632            0 :                 }
     633              : 
     634            0 :                 res = PQgetResult(conn);
     635            0 :         }
     636            0 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
     637              :         {
     638            0 :                 pg_log_error("unexpected termination of replication stream: %s",
     639              :                                          PQresultErrorMessage(res));
     640            0 :                 PQclear(res);
     641            0 :                 goto error;
     642              :         }
     643            0 :         PQclear(res);
     644              : 
     645            0 :         if (outfd != -1 && strcmp(outfile, "-") != 0)
     646              :         {
     647            0 :                 TimestampTz t = feGetCurrentTimestamp();
     648              : 
     649            0 :                 OutputFsync(t);
     650            0 :                 if (close(outfd) != 0)
     651            0 :                         pg_log_error("could not close file \"%s\": %m", outfile);
     652            0 :         }
     653            0 :         outfd = -1;
     654              : error:
     655            0 :         if (copybuf != NULL)
     656              :         {
     657            0 :                 PQfreemem(copybuf);
     658            0 :                 copybuf = NULL;
     659            0 :         }
     660            0 :         destroyPQExpBuffer(query);
     661            0 :         PQfinish(conn);
     662            0 :         conn = NULL;
     663            0 : }
     664              : 
     665              : /*
     666              :  * Unfortunately we can't do sensible signal handling on windows...
     667              :  */
     668              : #ifndef WIN32
     669              : 
     670              : /*
     671              :  * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
     672              :  * possible moment.
     673              :  */
     674              : static void
     675            0 : sigexit_handler(SIGNAL_ARGS)
     676              : {
     677            0 :         stop_reason = STREAM_STOP_SIGNAL;
     678            0 :         time_to_abort = true;
     679            0 : }
     680              : 
     681              : /*
     682              :  * Trigger the output file to be reopened.
     683              :  */
     684              : static void
     685            0 : sighup_handler(SIGNAL_ARGS)
     686              : {
     687            0 :         output_reopen = true;
     688            0 : }
     689              : #endif
     690              : 
     691              : 
     692              : int
     693            0 : main(int argc, char **argv)
     694              : {
     695              :         static struct option long_options[] = {
     696              : /* general options */
     697              :                 {"file", required_argument, NULL, 'f'},
     698              :                 {"fsync-interval", required_argument, NULL, 'F'},
     699              :                 {"no-loop", no_argument, NULL, 'n'},
     700              :                 {"enable-failover", no_argument, NULL, 5},
     701              :                 {"enable-two-phase", no_argument, NULL, 't'},
     702              :                 {"two-phase", no_argument, NULL, 't'},        /* deprecated */
     703              :                 {"verbose", no_argument, NULL, 'v'},
     704              :                 {"version", no_argument, NULL, 'V'},
     705              :                 {"help", no_argument, NULL, '?'},
     706              : /* connection options */
     707              :                 {"dbname", required_argument, NULL, 'd'},
     708              :                 {"host", required_argument, NULL, 'h'},
     709              :                 {"port", required_argument, NULL, 'p'},
     710              :                 {"username", required_argument, NULL, 'U'},
     711              :                 {"no-password", no_argument, NULL, 'w'},
     712              :                 {"password", no_argument, NULL, 'W'},
     713              : /* replication options */
     714              :                 {"startpos", required_argument, NULL, 'I'},
     715              :                 {"endpos", required_argument, NULL, 'E'},
     716              :                 {"option", required_argument, NULL, 'o'},
     717              :                 {"plugin", required_argument, NULL, 'P'},
     718              :                 {"status-interval", required_argument, NULL, 's'},
     719              :                 {"slot", required_argument, NULL, 'S'},
     720              : /* action */
     721              :                 {"create-slot", no_argument, NULL, 1},
     722              :                 {"start", no_argument, NULL, 2},
     723              :                 {"drop-slot", no_argument, NULL, 3},
     724              :                 {"if-not-exists", no_argument, NULL, 4},
     725              :                 {NULL, 0, NULL, 0}
     726              :         };
     727            0 :         int                     c;
     728            0 :         int                     option_index;
     729            0 :         uint32          hi,
     730              :                                 lo;
     731            0 :         char       *db_name;
     732              : 
     733            0 :         pg_logging_init(argv[0]);
     734            0 :         progname = get_progname(argv[0]);
     735            0 :         set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
     736              : 
     737            0 :         if (argc > 1)
     738              :         {
     739            0 :                 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
     740              :                 {
     741            0 :                         usage();
     742            0 :                         exit(0);
     743              :                 }
     744            0 :                 else if (strcmp(argv[1], "-V") == 0 ||
     745            0 :                                  strcmp(argv[1], "--version") == 0)
     746              :                 {
     747            0 :                         puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
     748            0 :                         exit(0);
     749              :                 }
     750            0 :         }
     751              : 
     752            0 :         while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
     753            0 :                                                         long_options, &option_index)) != -1)
     754              :         {
     755            0 :                 switch (c)
     756              :                 {
     757              : /* general options */
     758              :                         case 'f':
     759            0 :                                 outfile = pg_strdup(optarg);
     760            0 :                                 break;
     761              :                         case 'F':
     762            0 :                                 if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
     763              :                                                                           INT_MAX / 1000,
     764              :                                                                           &fsync_interval))
     765            0 :                                         exit(1);
     766            0 :                                 fsync_interval *= 1000;
     767            0 :                                 break;
     768              :                         case 'n':
     769            0 :                                 noloop = 1;
     770            0 :                                 break;
     771              :                         case 't':
     772            0 :                                 two_phase = true;
     773            0 :                                 break;
     774              :                         case 'v':
     775            0 :                                 verbose++;
     776            0 :                                 break;
     777              :                         case 5:
     778            0 :                                 failover = true;
     779            0 :                                 break;
     780              : /* connection options */
     781              :                         case 'd':
     782            0 :                                 dbname = pg_strdup(optarg);
     783            0 :                                 break;
     784              :                         case 'h':
     785            0 :                                 dbhost = pg_strdup(optarg);
     786            0 :                                 break;
     787              :                         case 'p':
     788            0 :                                 dbport = pg_strdup(optarg);
     789            0 :                                 break;
     790              :                         case 'U':
     791            0 :                                 dbuser = pg_strdup(optarg);
     792            0 :                                 break;
     793              :                         case 'w':
     794            0 :                                 dbgetpassword = -1;
     795            0 :                                 break;
     796              :                         case 'W':
     797            0 :                                 dbgetpassword = 1;
     798            0 :                                 break;
     799              : /* replication options */
     800              :                         case 'I':
     801            0 :                                 if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
     802            0 :                                         pg_fatal("could not parse start position \"%s\"", optarg);
     803            0 :                                 startpos = ((uint64) hi) << 32 | lo;
     804            0 :                                 break;
     805              :                         case 'E':
     806            0 :                                 if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
     807            0 :                                         pg_fatal("could not parse end position \"%s\"", optarg);
     808            0 :                                 endpos = ((uint64) hi) << 32 | lo;
     809            0 :                                 break;
     810              :                         case 'o':
     811              :                                 {
     812            0 :                                         char       *data = pg_strdup(optarg);
     813            0 :                                         char       *val = strchr(data, '=');
     814              : 
     815            0 :                                         if (val != NULL)
     816              :                                         {
     817              :                                                 /* remove =; separate data from val */
     818            0 :                                                 *val = '\0';
     819            0 :                                                 val++;
     820            0 :                                         }
     821              : 
     822            0 :                                         noptions += 1;
     823            0 :                                         options = pg_realloc(options, sizeof(char *) * noptions * 2);
     824              : 
     825            0 :                                         options[(noptions - 1) * 2] = data;
     826            0 :                                         options[(noptions - 1) * 2 + 1] = val;
     827            0 :                                 }
     828              : 
     829            0 :                                 break;
     830              :                         case 'P':
     831            0 :                                 plugin = pg_strdup(optarg);
     832            0 :                                 break;
     833              :                         case 's':
     834            0 :                                 if (!option_parse_int(optarg, "-s/--status-interval", 0,
     835              :                                                                           INT_MAX / 1000,
     836              :                                                                           &standby_message_timeout))
     837            0 :                                         exit(1);
     838            0 :                                 standby_message_timeout *= 1000;
     839            0 :                                 break;
     840              :                         case 'S':
     841            0 :                                 replication_slot = pg_strdup(optarg);
     842            0 :                                 break;
     843              : /* action */
     844              :                         case 1:
     845            0 :                                 do_create_slot = true;
     846            0 :                                 break;
     847              :                         case 2:
     848            0 :                                 do_start_slot = true;
     849            0 :                                 break;
     850              :                         case 3:
     851            0 :                                 do_drop_slot = true;
     852            0 :                                 break;
     853              :                         case 4:
     854            0 :                                 slot_exists_ok = true;
     855            0 :                                 break;
     856              : 
     857              :                         default:
     858              :                                 /* getopt_long already emitted a complaint */
     859            0 :                                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     860            0 :                                 exit(1);
     861              :                 }
     862              :         }
     863              : 
     864              :         /*
     865              :          * Any non-option arguments?
     866              :          */
     867            0 :         if (optind < argc)
     868              :         {
     869            0 :                 pg_log_error("too many command-line arguments (first is \"%s\")",
     870              :                                          argv[optind]);
     871            0 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     872            0 :                 exit(1);
     873              :         }
     874              : 
     875              :         /*
     876              :          * Required arguments
     877              :          */
     878            0 :         if (replication_slot == NULL)
     879              :         {
     880            0 :                 pg_log_error("no slot specified");
     881            0 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     882            0 :                 exit(1);
     883              :         }
     884              : 
     885            0 :         if (do_start_slot && outfile == NULL)
     886              :         {
     887            0 :                 pg_log_error("no target file specified");
     888            0 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     889            0 :                 exit(1);
     890              :         }
     891              : 
     892            0 :         if (!do_drop_slot && dbname == NULL)
     893              :         {
     894            0 :                 pg_log_error("no database specified");
     895            0 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     896            0 :                 exit(1);
     897              :         }
     898              : 
     899            0 :         if (!do_drop_slot && !do_create_slot && !do_start_slot)
     900              :         {
     901            0 :                 pg_log_error("at least one action needs to be specified");
     902            0 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     903            0 :                 exit(1);
     904              :         }
     905              : 
     906            0 :         if (do_drop_slot && (do_create_slot || do_start_slot))
     907              :         {
     908            0 :                 pg_log_error("cannot use --create-slot or --start together with --drop-slot");
     909            0 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     910            0 :                 exit(1);
     911              :         }
     912              : 
     913            0 :         if (XLogRecPtrIsValid(startpos) && (do_create_slot || do_drop_slot))
     914              :         {
     915            0 :                 pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
     916            0 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     917            0 :                 exit(1);
     918              :         }
     919              : 
     920            0 :         if (XLogRecPtrIsValid(endpos) && !do_start_slot)
     921              :         {
     922            0 :                 pg_log_error("--endpos may only be specified with --start");
     923            0 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     924            0 :                 exit(1);
     925              :         }
     926              : 
     927            0 :         if (!do_create_slot)
     928              :         {
     929            0 :                 if (two_phase)
     930              :                 {
     931            0 :                         pg_log_error("%s may only be specified with --create-slot", "--enable-two-phase");
     932            0 :                         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     933            0 :                         exit(1);
     934              :                 }
     935              : 
     936            0 :                 if (failover)
     937              :                 {
     938            0 :                         pg_log_error("%s may only be specified with --create-slot", "--enable-failover");
     939            0 :                         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     940            0 :                         exit(1);
     941              :                 }
     942            0 :         }
     943              : 
     944              :         /*
     945              :          * Obtain a connection to server.  Notably, if we need a password, we want
     946              :          * to collect it from the user immediately.
     947              :          */
     948            0 :         conn = GetConnection();
     949            0 :         if (!conn)
     950              :                 /* Error message already written in GetConnection() */
     951            0 :                 exit(1);
     952            0 :         atexit(disconnect_atexit);
     953              : 
     954              :         /*
     955              :          * Trap signals.  (Don't do this until after the initial password prompt,
     956              :          * if one is needed, in GetConnection.)
     957              :          */
     958              : #ifndef WIN32
     959            0 :         pqsignal(SIGINT, sigexit_handler);
     960            0 :         pqsignal(SIGTERM, sigexit_handler);
     961            0 :         pqsignal(SIGHUP, sighup_handler);
     962              : #endif
     963              : 
     964              :         /*
     965              :          * Run IDENTIFY_SYSTEM to check the connection type for each action.
     966              :          * --create-slot and --start actions require a database-specific
     967              :          * replication connection because they handle logical replication slots.
     968              :          * --drop-slot can remove replication slots from any replication
     969              :          * connection without this restriction.
     970              :          */
     971            0 :         if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
     972            0 :                 exit(1);
     973              : 
     974            0 :         if (!do_drop_slot && db_name == NULL)
     975            0 :                 pg_fatal("could not establish database-specific replication connection");
     976              : 
     977              :         /*
     978              :          * Set umask so that directories/files are created with the same
     979              :          * permissions as directories/files in the source data directory.
     980              :          *
     981              :          * pg_mode_mask is set to owner-only by default and then updated in
     982              :          * GetConnection() where we get the mode from the server-side with
     983              :          * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
     984              :          */
     985            0 :         umask(pg_mode_mask);
     986              : 
     987              :         /* Drop a replication slot. */
     988            0 :         if (do_drop_slot)
     989              :         {
     990            0 :                 if (verbose)
     991            0 :                         pg_log_info("dropping replication slot \"%s\"", replication_slot);
     992              : 
     993            0 :                 if (!DropReplicationSlot(conn, replication_slot))
     994            0 :                         exit(1);
     995            0 :         }
     996              : 
     997              :         /* Create a replication slot. */
     998            0 :         if (do_create_slot)
     999              :         {
    1000            0 :                 if (verbose)
    1001            0 :                         pg_log_info("creating replication slot \"%s\"", replication_slot);
    1002              : 
    1003            0 :                 if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
    1004            0 :                                                                    false, false, slot_exists_ok, two_phase,
    1005            0 :                                                                    failover))
    1006            0 :                         exit(1);
    1007            0 :                 startpos = InvalidXLogRecPtr;
    1008            0 :         }
    1009              : 
    1010            0 :         if (!do_start_slot)
    1011            0 :                 exit(0);
    1012              : 
    1013              :         /* Stream loop */
    1014            0 :         while (true)
    1015              :         {
    1016            0 :                 StreamLogicalLog();
    1017            0 :                 if (time_to_abort)
    1018              :                 {
    1019              :                         /*
    1020              :                          * We've been Ctrl-C'ed or reached an exit limit condition. That's
    1021              :                          * not an error, so exit without an errorcode.
    1022              :                          */
    1023            0 :                         exit(0);
    1024              :                 }
    1025              : 
    1026              :                 /*
    1027              :                  * Ensure all written data is flushed to disk before exiting or
    1028              :                  * starting a new replication.
    1029              :                  */
    1030            0 :                 if (outfd != -1)
    1031            0 :                         OutputFsync(feGetCurrentTimestamp());
    1032              : 
    1033            0 :                 if (noloop)
    1034              :                 {
    1035            0 :                         pg_fatal("disconnected");
    1036            0 :                 }
    1037              :                 else
    1038              :                 {
    1039              :                         /* translator: check source for value for %d */
    1040            0 :                         pg_log_info("disconnected; waiting %d seconds to try again",
    1041              :                                                 RECONNECT_SLEEP_TIME);
    1042            0 :                         pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
    1043              :                 }
    1044              :         }
    1045              : }
    1046              : 
    1047              : /*
    1048              :  * Fsync our output data, and send a feedback message to the server.  Returns
    1049              :  * true if successful, false otherwise.
    1050              :  *
    1051              :  * If successful, *now is updated to the current timestamp just before sending
    1052              :  * feedback.
    1053              :  */
    1054              : static bool
    1055            0 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
    1056              : {
    1057              :         /* flush data to disk, so that we send a recent flush pointer */
    1058            0 :         OutputFsync(*now);
    1059            0 :         *now = feGetCurrentTimestamp();
    1060            0 :         if (!sendFeedback(conn, *now, true, false))
    1061            0 :                 return false;
    1062              : 
    1063            0 :         return true;
    1064            0 : }
    1065              : 
    1066              : /*
    1067              :  * Try to inform the server about our upcoming demise, but don't wait around or
    1068              :  * retry on failure.
    1069              :  */
    1070              : static void
    1071            0 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
    1072              :                                    XLogRecPtr lsn)
    1073              : {
    1074            0 :         (void) PQputCopyEnd(conn, NULL);
    1075            0 :         (void) PQflush(conn);
    1076              : 
    1077            0 :         if (verbose)
    1078              :         {
    1079            0 :                 switch (reason)
    1080              :                 {
    1081              :                         case STREAM_STOP_SIGNAL:
    1082            0 :                                 pg_log_info("received interrupt signal, exiting");
    1083            0 :                                 break;
    1084              :                         case STREAM_STOP_KEEPALIVE:
    1085            0 :                                 pg_log_info("end position %X/%08X reached by keepalive",
    1086              :                                                         LSN_FORMAT_ARGS(endpos));
    1087            0 :                                 break;
    1088              :                         case STREAM_STOP_END_OF_WAL:
    1089            0 :                                 Assert(XLogRecPtrIsValid(lsn));
    1090            0 :                                 pg_log_info("end position %X/%08X reached by WAL record at %X/%08X",
    1091              :                                                         LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
    1092            0 :                                 break;
    1093              :                         case STREAM_STOP_NONE:
    1094            0 :                                 Assert(false);
    1095              :                                 break;
    1096              :                 }
    1097            0 :         }
    1098            0 : }
        

Generated by: LCOV version 2.3.2-1