LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - receivelog.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 532 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 17 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * receivelog.c - receive WAL files using the streaming
       4              :  *                                replication protocol.
       5              :  *
       6              :  * Author: Magnus Hagander <magnus@hagander.net>
       7              :  *
       8              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *                src/bin/pg_basebackup/receivelog.c
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres_fe.h"
      16              : 
      17              : #include <sys/select.h>
      18              : #include <sys/stat.h>
      19              : #include <unistd.h>
      20              : 
      21              : #include "access/xlog_internal.h"
      22              : #include "common/logging.h"
      23              : #include "libpq-fe.h"
      24              : #include "libpq/protocol.h"
      25              : #include "receivelog.h"
      26              : #include "streamutil.h"
      27              : 
      28              : /* currently open WAL file; set by open_walfile(), reset to NULL by close_walfile() */
      29              : static Walfile *walfile = NULL;
      30              : static bool reportFlushPosition = false;        /* send lastFlushPosition in feedback? decided in ReceiveXlogStream() */
      31              : static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;        /* flush location used by sendFeedback(); updated by close_walfile() */
      32              : 
      33              : static bool still_sending = true;       /* feedback still needs to be sent? */
      34              : /* Prototypes of file-local (static) helper functions */
      35              : static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
      36              :                                                                   XLogRecPtr *stoppos);
      37              : static int      CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
      38              : static int      CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
      39              :                                                           char **buffer);
      40              : static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
      41              :                                                                 int len, XLogRecPtr blockpos, TimestampTz *last_status);
      42              : static bool ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
      43              :                                                           XLogRecPtr *blockpos);
      44              : static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
      45              :                                                                            XLogRecPtr blockpos, XLogRecPtr *stoppos);
      46              : static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos);
      47              : static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
      48              :                                                                                  TimestampTz last_status);
      49              : 
      50              : static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
      51              :                                                                          uint32 *timeline);
      52              : /* Create archive_status/FNAME.done through the WAL method (opened and closed without writing); returns true on success, false after logging an error. */
      53              : static bool
      54            0 : mark_file_as_archived(StreamCtl *stream, const char *fname)
      55              : {
      56            0 :         Walfile    *f;
      57              :         static char tmppath[MAXPGPATH]; /* static storage, reused by every call */
      58              : 
      59            0 :         snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
      60            0 :                          fname);
      61              : 
      62            0 :         f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath,
      63              :                                                                                            NULL, 0);    /* no suffix; 0 presumably requests no padding -- TODO confirm */
      64            0 :         if (f == NULL)
      65              :         {
      66            0 :                 pg_log_error("could not create archive status file \"%s\": %s",
      67              :                                          tmppath, GetLastWalMethodError(stream->walmethod));
      68            0 :                 return false;
      69              :         }
      70              : 
      71            0 :         if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)        /* nothing is written; the file only has to exist */
      72              :         {
      73            0 :                 pg_log_error("could not close archive status file \"%s\": %s",
      74              :                                          tmppath, GetLastWalMethodError(stream->walmethod));
      75            0 :                 return false;
      76              :         }
      77              : 
      78            0 :         return true;
      79            0 : }
      80              : 
      81              : /*
      82              :  * Open the WAL file for the segment containing 'startpoint' (on stream->timeline).
      83              :  *
      84              :  * Returns true if OK; on failure, returns false after printing an error msg.
      85              :  * On success, 'walfile' is set to the opened WAL file.
      86              :  *
      87              :  * The file will be padded to WalSegSz bytes with zeroes.
      88              :  */
      89              : static bool
      90            0 : open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
      91              : {
      92            0 :         Walfile    *f;
      93            0 :         char       *fn;         /* name from get_file_name(); released with pg_free() on every return path */
      94            0 :         ssize_t         size;
      95            0 :         XLogSegNo       segno;
      96            0 :         char            walfile_name[MAXPGPATH];
      97              : 
      98            0 :         XLByteToSeg(startpoint, segno, WalSegSz);
      99            0 :         XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
     100              : 
     101              :         /* Note that this considers the compression used if necessary */
     102            0 :         fn = stream->walmethod->ops->get_file_name(stream->walmethod,
     103            0 :                                                                                            walfile_name,
     104            0 :                                                                                            stream->partial_suffix);
     105              : 
     106              :         /*
     107              :          * When streaming to files, if an existing file exists we verify that it's
     108              :          * either empty (just created), or a complete WalSegSz segment (in which
     109              :          * case it has been created and padded). Anything else indicates a corrupt
     110              :          * file. Compressed files have no need for padding, so just ignore this
     111              :          * case.
     112              :          *
     113              :          * When streaming to tar, no file with this name will exist before, so we
     114              :          * never have to verify a size.
     115              :          */
     116            0 :         if (stream->walmethod->compression_algorithm == PG_COMPRESSION_NONE &&
     117            0 :                 stream->walmethod->ops->existsfile(stream->walmethod, fn))
     118              :         {
     119            0 :                 size = stream->walmethod->ops->get_file_size(stream->walmethod, fn);
     120            0 :                 if (size < 0)
     121              :                 {
     122            0 :                         pg_log_error("could not get size of write-ahead log file \"%s\": %s",
     123              :                                                  fn, GetLastWalMethodError(stream->walmethod));
     124            0 :                         pg_free(fn);
     125            0 :                         return false;
     126              :                 }
     127            0 :                 if (size == WalSegSz)
     128              :                 {
     129              :                         /* Already padded file. Open it for use */
     130            0 :                         f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0);        /* 0 here vs. WalSegSz in the create path below: presumably no padding is requested -- TODO confirm */
     131            0 :                         if (f == NULL)
     132              :                         {
     133            0 :                                 pg_log_error("could not open existing write-ahead log file \"%s\": %s",
     134              :                                                          fn, GetLastWalMethodError(stream->walmethod));
     135            0 :                                 pg_free(fn);
     136            0 :                                 return false;
     137              :                         }
     138              : 
     139              :                         /* fsync file in case of a previous crash */
     140            0 :                         if (stream->walmethod->ops->sync(f) != 0)
     141              :                         {
     142            0 :                                 pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
     143              :                                                          fn, GetLastWalMethodError(stream->walmethod));
     144            0 :                                 stream->walmethod->ops->close(f, CLOSE_UNLINK);
     145            0 :                                 exit(1);        /* NOTE(review): terminates the process, unlike the other failure paths which return false */
     146              :                         }
     147              : 
     148            0 :                         walfile = f;
     149            0 :                         pg_free(fn);
     150            0 :                         return true;
     151              :                 }
     152            0 :                 if (size != 0)
     153              :                 {
     154              :                         /* if write didn't set errno, assume problem is no disk space */
     155            0 :                         if (errno == 0)
     156            0 :                                 errno = ENOSPC; /* NOTE(review): the message below does not print errno -- confirm this is still needed */
     157            0 :                         pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",
     158              :                                                                   "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
     159              :                                                                   size),
     160              :                                                  fn, size, WalSegSz);
     161            0 :                         pg_free(fn);
     162            0 :                         return false;
     163              :                 }
     164              :                 /* File existed and was empty, so fall through and open */
     165            0 :         }
     166              : 
     167              :         /* No file existed (or it was empty, or the size check above was skipped), so create one */
     168              : 
     169            0 :         f = stream->walmethod->ops->open_for_write(stream->walmethod,
     170            0 :                                                                                            walfile_name,
     171            0 :                                                                                            stream->partial_suffix,
     172            0 :                                                                                            WalSegSz);
     173            0 :         if (f == NULL)
     174              :         {
     175            0 :                 pg_log_error("could not open write-ahead log file \"%s\": %s",
     176              :                                          fn, GetLastWalMethodError(stream->walmethod));
     177            0 :                 pg_free(fn);
     178            0 :                 return false;
     179              :         }
     180              : 
     181            0 :         pg_free(fn);
     182            0 :         walfile = f;
     183            0 :         return true;
     184            0 : }
     185              : 
     186              : /*
     187              :  * Close the current WAL file (if open), and rename it to the correct
     188              :  * filename if it's complete. On failure, prints an error message to stderr
     189              :  * and returns false, otherwise returns true (and, if a file was closed, sets lastFlushPosition to 'pos').
     190              :  */
     191              : static bool
     192            0 : close_walfile(StreamCtl *stream, XLogRecPtr pos)
     193              : {
     194            0 :         char       *fn;
     195            0 :         pgoff_t         currpos;
     196            0 :         int                     r;
     197            0 :         char            walfile_name[MAXPGPATH];
     198              : 
     199            0 :         if (walfile == NULL)
     200            0 :                 return true;    /* nothing open; lastFlushPosition is left unchanged */
     201              : 
     202            0 :         strlcpy(walfile_name, walfile->pathname, MAXPGPATH);    /* name and position are saved before close(), presumably because close() releases 'walfile' -- TODO confirm */
     203            0 :         currpos = walfile->currpos;
     204              : 
     205              :         /* Note that this considers the compression used if necessary */
     206            0 :         fn = stream->walmethod->ops->get_file_name(stream->walmethod,
     207            0 :                                                                                            walfile_name,
     208            0 :                                                                                            stream->partial_suffix);
     209              : 
     210            0 :         if (stream->partial_suffix)
     211              :         {
     212            0 :                 if (currpos == WalSegSz)        /* segment fully written */
     213            0 :                         r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
     214              :                 else
     215              :                 {
     216            0 :                         pg_log_info("not renaming \"%s\", segment is not complete", fn);
     217            0 :                         r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME);
     218              :                 }
     219            0 :         }
     220              :         else
     221            0 :                 r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);       /* no partial suffix in use: always a normal close */
     222              : 
     223            0 :         walfile = NULL;         /* cleared whether or not close() succeeded */
     224              : 
     225            0 :         if (r != 0)
     226              :         {
     227            0 :                 pg_log_error("could not close file \"%s\": %s",
     228              :                                          fn, GetLastWalMethodError(stream->walmethod));
     229              : 
     230            0 :                 pg_free(fn);
     231            0 :                 return false;
     232              :         }
     233              : 
     234            0 :         pg_free(fn);
     235              : 
     236              :         /*
     237              :          * Mark file as archived if requested by the caller - pg_basebackup needs
     238              :          * to do so as files can otherwise get archived again after promotion of a
     239              :          * new node. This is in line with walreceiver.c always doing a
     240              :          * XLogArchiveForceDone() after a complete segment.
     241              :          */
     242            0 :         if (currpos == WalSegSz && stream->mark_done)   /* only complete segments are marked */
     243              :         {
     244              :                 /* writes error message if failed */
     245            0 :                 if (!mark_file_as_archived(stream, walfile_name))
     246            0 :                         return false;
     247            0 :         }
     248              : 
     249            0 :         lastFlushPosition = pos;        /* reached only after a successful close (and marking, if requested) */
     250            0 :         return true;
     251            0 : }
     252              : 
     253              : 
     254              : /*
     255              :  * Check if the history file for stream->timeline exists, as reported by the WAL method's existsfile().
     256              :  */
     257              : static bool
     258            0 : existsTimeLineHistoryFile(StreamCtl *stream)
     259              : {
     260            0 :         char            histfname[MAXFNAMELEN];
     261              : 
     262              :         /*
     263              :          * Timeline 1 never has a history file. We treat that as if it existed,
     264              :          * since we never need to stream it.
     265              :          */
     266            0 :         if (stream->timeline == 1)
     267            0 :                 return true;
     268              : 
     269            0 :         TLHistoryFileName(histfname, stream->timeline); /* fills histfname with the expected file name */
     270              : 
     271            0 :         return stream->walmethod->ops->existsfile(stream->walmethod, histfname);
     272            0 : }
     273              : /* Write 'content' as the history file of stream->timeline after checking that 'filename' is the expected name; returns true on success, false after logging an error. */
     274              : static bool
     275            0 : writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
     276              : {
     277            0 :         int                     size = strlen(content); /* NOTE(review): size_t result narrowed to int */
     278            0 :         char            histfname[MAXFNAMELEN];
     279            0 :         Walfile    *f;
     280              : 
     281              :         /*
     282              :          * Check that the server's idea of how timeline history files should be
     283              :          * named matches ours.
     284              :          */
     285            0 :         TLHistoryFileName(histfname, stream->timeline);
     286            0 :         if (strcmp(histfname, filename) != 0)
     287              :         {
     288            0 :                 pg_log_error("server reported unexpected history file name for timeline %u: %s",
     289              :                                          stream->timeline, filename);
     290            0 :                 return false;
     291              :         }
     292              : 
     293            0 :         f = stream->walmethod->ops->open_for_write(stream->walmethod,
     294            0 :                                                                                            histfname, ".tmp", 0);       /* opened with a .tmp suffix; presumably dropped by the CLOSE_NORMAL below -- TODO confirm */
     295            0 :         if (f == NULL)
     296              :         {
     297            0 :                 pg_log_error("could not create timeline history file \"%s\": %s",
     298              :                                          histfname, GetLastWalMethodError(stream->walmethod));
     299            0 :                 return false;
     300              :         }
     301              : 
     302            0 :         if ((int) stream->walmethod->ops->write(f, content, size) != size)      /* anything but a full write counts as failure */
     303              :         {
     304            0 :                 pg_log_error("could not write timeline history file \"%s\": %s",
     305              :                                          histfname, GetLastWalMethodError(stream->walmethod));
     306              : 
     307              :                 /*
     308              :                  * If we fail to make the file, delete it to release disk space
     309              :                  */
     310            0 :                 stream->walmethod->ops->close(f, CLOSE_UNLINK); /* result of this cleanup close is ignored */
     311              : 
     312            0 :                 return false;
     313              :         }
     314              : 
     315            0 :         if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
     316              :         {
     317            0 :                 pg_log_error("could not close file \"%s\": %s",
     318              :                                          histfname, GetLastWalMethodError(stream->walmethod));
     319            0 :                 return false;
     320              :         }
     321              : 
     322              :         /* Maintain archive_status, check close_walfile() for details. */
     323            0 :         if (stream->mark_done)
     324              :         {
     325              :                 /* writes error message if failed */
     326            0 :                 if (!mark_file_as_archived(stream, histfname))
     327            0 :                         return false;
     328            0 :         }
     329              : 
     330            0 :         return true;
     331            0 : }
     332              : 
     333              : /*
     334              :  * Send a Standby Status Update message to server; returns false (after logging an error) if it could not be sent.
     335              :  */
     336              : static bool
     337            0 : sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
     338              : {
     339            0 :         char            replybuf[1 + 8 + 8 + 8 + 8 + 1];        /* type byte, write/flush/apply/sendTime (8 bytes each), replyRequested byte */
     340            0 :         int                     len = 0;        /* number of bytes filled in so far */
     341              : 
     342            0 :         replybuf[len] = PqReplMsg_StandbyStatusUpdate;
     343            0 :         len += 1;
     344            0 :         fe_sendint64(blockpos, &replybuf[len]); /* write */
     345            0 :         len += 8;
     346            0 :         if (reportFlushPosition)
     347            0 :                 fe_sendint64(lastFlushPosition, &replybuf[len]);    /* flush */
     348              :         else
     349            0 :                 fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* flush (not reported) */
     350            0 :         len += 8;
     351            0 :         fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply (never reported) */
     352            0 :         len += 8;
     353            0 :         fe_sendint64(now, &replybuf[len]);  /* sendTime */
     354            0 :         len += 8;
     355            0 :         replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
     356            0 :         len += 1;
     357              : 
     358            0 :         if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))   /* failure to queue or a nonzero PQflush() result is treated as an error */
     359              :         {
     360            0 :                 pg_log_error("could not send feedback packet: %s",
     361              :                                          PQerrorMessage(conn));
     362            0 :                 return false;
     363              :         }
     364              : 
     365            0 :         return true;
     366            0 : }
     367              : 
     368              : /*
     369              :  * Check that the server version we're connected to is supported by
     370              :  * ReceiveXlogStream().
     371              :  *
     372              :  * If it's not, an error message is printed to stderr, and false is returned.
     373              :  */
     374              : bool
     375            0 : CheckServerVersionForStreaming(PGconn *conn)
     376              : {
     377            0 :         int                     minServerMajor,
     378              :                                 maxServerMajor;
     379            0 :         int                     serverMajor;
     380              : 
     381              :         /*
     382              :          * The message format used in streaming replication changed in 9.3, so we
     383              :          * cannot stream from older servers. And we don't support servers newer
     384              :          * than the client; it might work, but we don't know, so err on the safe
     385              :          * side.
     386              :          */
     387            0 :         minServerMajor = 903;   /* 9.3, on the same scale as serverMajor below */
     388            0 :         maxServerMajor = PG_VERSION_NUM / 100;  /* this client's own version */
     389            0 :         serverMajor = PQserverVersion(conn) / 100;      /* dividing by 100 drops the last two digits of the version number */
     390            0 :         if (serverMajor < minServerMajor)
     391              :         {
     392            0 :                 const char *serverver = PQparameterStatus(conn, "server_version");
     393              : 
     394            0 :                 pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
     395              :                                          serverver ? serverver : "'unknown'",
     396              :                                          "9.3");
     397            0 :                 return false;
     398            0 :         }
     399            0 :         else if (serverMajor > maxServerMajor)
     400              :         {
     401            0 :                 const char *serverver = PQparameterStatus(conn, "server_version");
     402              : 
     403            0 :                 pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
     404              :                                          serverver ? serverver : "'unknown'",
     405              :                                          PG_VERSION);
     406            0 :                 return false;
     407            0 :         }
     408            0 :         return true;
     409            0 : }
     410              : 
     411              : /*
     412              :  * Receive a log stream starting at the specified position.
     413              :  *
     414              :  * Individual parameters are passed through the StreamCtl structure.
     415              :  *
     416              :  * If sysidentifier is specified, validate that the system identifier
     417              :  * matches and that the starting timeline is not ahead of the server's
     418              :  * (by sending an extra IDENTIFY_SYSTEM command).
     419              :  *
     420              :  * All received segments will be written to the directory
     421              :  * specified by basedir. This will also fetch any missing timeline history
     422              :  * files.
     423              :  *
     424              :  * The stream_stop callback will be called every time data
     425              :  * is received, and whenever a segment is completed. If it returns
     426              :  * true, the streaming will stop and the function
     427              :  * returns. As long as it returns false, streaming will continue
     428              :  * indefinitely.
     429              :  *
     430              :  * If stream_stop() checks for external input, stop_socket should be set to
     431              :  * the FD it checks.  This will allow such input to be detected promptly
     432              :  * rather than after standby_message_timeout (which might be indefinite).
     433              :  * Note that signals will interrupt waits for input as well, but that is
     434              :  * race-y since a signal received while busy won't interrupt the wait.
     435              :  *
     436              :  * standby_message_timeout controls how often, in milliseconds, we send
     437              :  * a message back to the primary letting it know our progress. Zero
     438              :  * means no messages are sent. The message always carries the write
     439              :  * location, carries the flush location only when a replication slot or
     440              :  * 'synchronous' is in use, and never carries the replay location.
     441              :  *
     442              :  * If 'partial_suffix' is not NULL, files are initially created with the
     443              :  * given suffix, and the suffix is removed once the file is finished. That
     444              :  * allows you to tell the difference between partial and completed files,
     445              :  * so that you can continue later where you left off.
     446              :  *
     447              :  * If 'synchronous' is true, the received WAL is flushed as soon as written,
     448              :  * otherwise only when the WAL file is closed.
     449              :  *
     450              :  * Note: The WAL location *must* be at a log segment start!
     451              :  */
     452              : bool
     453            0 : ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
     454              : {
     455            0 :         char            query[128];
     456            0 :         char            slotcmd[128];
     457            0 :         PGresult   *res;
     458            0 :         XLogRecPtr      stoppos;
     459              : 
     460              :         /*
     461              :          * The caller should've checked the server version already, but doesn't do
     462              :          * any harm to check it here too.
     463              :          */
     464            0 :         if (!CheckServerVersionForStreaming(conn))
     465            0 :                 return false;
     466              : 
     467              :         /*
     468              :          * Decide whether we want to report the flush position. If we report the
     469              :          * flush position, the primary will know what WAL we'll possibly
     470              :          * re-request, and it can then remove older WAL safely. We must always do
     471              :          * that when we are using slots.
     472              :          *
     473              :          * Reporting the flush position makes one eligible as a synchronous
     474              :          * replica. People shouldn't include generic names in
     475              :          * synchronous_standby_names, but we've protected them against it so far,
     476              :          * so let's continue to do so unless specifically requested.
     477              :          */
     478            0 :         if (stream->replication_slot != NULL)
     479              :         {
     480            0 :                 reportFlushPosition = true;
     481            0 :                 sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
     482            0 :         }
     483              :         else
     484              :         {
     485            0 :                 if (stream->synchronous)
     486            0 :                         reportFlushPosition = true;
     487              :                 else
     488            0 :                         reportFlushPosition = false;
     489            0 :                 slotcmd[0] = 0;
     490              :         }
     491              : 
     492            0 :         if (stream->sysidentifier != NULL)
     493              :         {
     494            0 :                 char       *sysidentifier = NULL;
     495            0 :                 TimeLineID      servertli;
     496              : 
     497              :                 /*
     498              :                  * Get the server system identifier and timeline, and validate them.
     499              :                  */
     500            0 :                 if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
     501              :                 {
     502            0 :                         pg_free(sysidentifier);
     503            0 :                         return false;
     504              :                 }
     505              : 
     506            0 :                 if (strcmp(stream->sysidentifier, sysidentifier) != 0)
     507              :                 {
     508            0 :                         pg_log_error("system identifier does not match between base backup and streaming connection");
     509            0 :                         pg_free(sysidentifier);
     510            0 :                         return false;
     511              :                 }
     512            0 :                 pg_free(sysidentifier);
     513              : 
     514            0 :                 if (stream->timeline > servertli)
     515              :                 {
     516            0 :                         pg_log_error("starting timeline %u is not present in the server",
     517              :                                                  stream->timeline);
     518            0 :                         return false;
     519              :                 }
     520            0 :         }
     521              : 
     522              :         /*
     523              :          * initialize flush position to starting point, it's the caller's
     524              :          * responsibility that that's sane.
     525              :          */
     526            0 :         lastFlushPosition = stream->startpos;
     527              : 
     528            0 :         while (1)
     529              :         {
     530              :                 /*
     531              :                  * Fetch the timeline history file for this timeline, if we don't have
     532              :                  * it already. When streaming log to tar, this will always return
     533              :                  * false, as we are never streaming into an existing file and
     534              :                  * therefore there can be no pre-existing timeline history file.
     535              :                  */
     536            0 :                 if (!existsTimeLineHistoryFile(stream))
     537              :                 {
     538            0 :                         snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
     539            0 :                         res = PQexec(conn, query);
     540            0 :                         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     541              :                         {
     542              :                                 /* FIXME: we might send it ok, but get an error */
     543            0 :                                 pg_log_error("could not send replication command \"%s\": %s",
     544              :                                                          "TIMELINE_HISTORY", PQresultErrorMessage(res));
     545            0 :                                 PQclear(res);
     546            0 :                                 return false;
     547              :                         }
     548              : 
     549              :                         /*
     550              :                          * The response to TIMELINE_HISTORY is a single row result set
     551              :                          * with two fields: filename and content
     552              :                          */
     553            0 :                         if (PQnfields(res) != 2 || PQntuples(res) != 1)
     554              :                         {
     555            0 :                                 pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
     556              :                                                            PQntuples(res), PQnfields(res), 1, 2);
     557            0 :                         }
     558              : 
     559              :                         /* Write the history file to disk */
     560            0 :                         writeTimeLineHistoryFile(stream,
     561            0 :                                                                          PQgetvalue(res, 0, 0),
     562            0 :                                                                          PQgetvalue(res, 0, 1));
     563              : 
     564            0 :                         PQclear(res);
     565            0 :                 }
     566              : 
     567              :                 /*
     568              :                  * Before we start streaming from the requested location, check if the
     569              :                  * callback tells us to stop here.
     570              :                  */
     571            0 :                 if (stream->stream_stop(stream->startpos, stream->timeline, false))
     572            0 :                         return true;
     573              : 
     574              :                 /* Initiate the replication stream at specified location */
     575            0 :                 snprintf(query, sizeof(query), "START_REPLICATION %s%X/%08X TIMELINE %u",
     576            0 :                                  slotcmd,
     577            0 :                                  LSN_FORMAT_ARGS(stream->startpos),
     578            0 :                                  stream->timeline);
     579            0 :                 res = PQexec(conn, query);
     580            0 :                 if (PQresultStatus(res) != PGRES_COPY_BOTH)
     581              :                 {
     582            0 :                         pg_log_error("could not send replication command \"%s\": %s",
     583              :                                                  "START_REPLICATION", PQresultErrorMessage(res));
     584            0 :                         PQclear(res);
     585            0 :                         return false;
     586              :                 }
     587            0 :                 PQclear(res);
     588              : 
     589              :                 /* Stream the WAL */
     590            0 :                 res = HandleCopyStream(conn, stream, &stoppos);
     591            0 :                 if (res == NULL)
     592            0 :                         goto error;
     593              : 
     594              :                 /*
     595              :                  * Streaming finished.
     596              :                  *
     597              :                  * There are two possible reasons for that: a controlled shutdown, or
     598              :                  * we reached the end of the current timeline. In case of
     599              :                  * end-of-timeline, the server sends a result set after Copy has
     600              :                  * finished, containing information about the next timeline. Read
     601              :                  * that, and restart streaming from the next timeline. In case of
     602              :                  * controlled shutdown, stop here.
     603              :                  */
     604            0 :                 if (PQresultStatus(res) == PGRES_TUPLES_OK)
     605              :                 {
     606              :                         /*
     607              :                          * End-of-timeline. Read the next timeline's ID and starting
     608              :                          * position. Usually, the starting position will match the end of
     609              :                          * the previous timeline, but there are corner cases like if the
     610              :                          * server had sent us half of a WAL record, when it was promoted.
     611              :                          * The new timeline will begin at the end of the last complete
     612              :                          * record in that case, overlapping the partial WAL record on the
     613              :                          * old timeline.
     614              :                          */
     615            0 :                         uint32          newtimeline;
     616            0 :                         bool            parsed;
     617              : 
     618            0 :                         parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
     619            0 :                         PQclear(res);
     620            0 :                         if (!parsed)
     621            0 :                                 goto error;
     622              : 
     623              :                         /* Sanity check the values the server gave us */
     624            0 :                         if (newtimeline <= stream->timeline)
     625              :                         {
     626            0 :                                 pg_log_error("server reported unexpected next timeline %u, following timeline %u",
     627              :                                                          newtimeline, stream->timeline);
     628            0 :                                 goto error;
     629              :                         }
     630            0 :                         if (stream->startpos > stoppos)
     631              :                         {
     632            0 :                                 pg_log_error("server stopped streaming timeline %u at %X/%08X, but reported next timeline %u to begin at %X/%08X",
     633              :                                                          stream->timeline, LSN_FORMAT_ARGS(stoppos),
     634              :                                                          newtimeline, LSN_FORMAT_ARGS(stream->startpos));
     635            0 :                                 goto error;
     636              :                         }
     637              : 
     638              :                         /* Read the final result, which should be CommandComplete. */
     639            0 :                         res = PQgetResult(conn);
     640            0 :                         if (PQresultStatus(res) != PGRES_COMMAND_OK)
     641              :                         {
     642            0 :                                 pg_log_error("unexpected termination of replication stream: %s",
     643              :                                                          PQresultErrorMessage(res));
     644            0 :                                 PQclear(res);
     645            0 :                                 goto error;
     646              :                         }
     647            0 :                         PQclear(res);
     648              : 
     649              :                         /*
     650              :                          * Loop back to start streaming from the new timeline. Always
     651              :                          * start streaming at the beginning of a segment.
     652              :                          */
     653            0 :                         stream->timeline = newtimeline;
     654            0 :                         stream->startpos = stream->startpos -
     655            0 :                                 XLogSegmentOffset(stream->startpos, WalSegSz);
     656            0 :                         continue;
     657            0 :                 }
     658            0 :                 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
     659              :                 {
     660            0 :                         PQclear(res);
     661              : 
     662              :                         /*
     663              :                          * End of replication (ie. controlled shut down of the server).
     664              :                          *
     665              :                          * Check if the callback thinks it's OK to stop here. If not,
     666              :                          * complain.
     667              :                          */
     668            0 :                         if (stream->stream_stop(stoppos, stream->timeline, false))
     669            0 :                                 return true;
     670              :                         else
     671              :                         {
     672            0 :                                 pg_log_error("replication stream was terminated before stop point");
     673            0 :                                 goto error;
     674              :                         }
     675              :                 }
     676              :                 else
     677              :                 {
     678              :                         /* Server returned an error. */
     679            0 :                         pg_log_error("unexpected termination of replication stream: %s",
     680              :                                                  PQresultErrorMessage(res));
     681            0 :                         PQclear(res);
     682            0 :                         goto error;
     683              :                 }
     684              :         }
     685              : 
     686              : error:
     687            0 :         if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
     688            0 :                 pg_log_error("could not close file \"%s\": %s",
     689              :                                          walfile->pathname, GetLastWalMethodError(stream->walmethod));
     690            0 :         walfile = NULL;
     691            0 :         return false;
     692            0 : }
     693              : 
     694              : /*
     695              :  * Helper function to parse the result set returned by server after streaming
     696              :  * has finished. On failure, prints an error to stderr and returns false.
     697              :  */
     698              : static bool
     699            0 : ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
     700              : {
     701            0 :         uint32          startpos_xlogid,
     702              :                                 startpos_xrecoff;
     703              : 
     704              :         /*----------
     705              :          * The result set consists of one row and two columns, e.g:
     706              :          *
     707              :          *      next_tli | next_tli_startpos
     708              :          * ----------+-------------------
     709              :          *                 4 | 0/9949AE0
     710              :          *
     711              :          * next_tli is the timeline ID of the next timeline after the one that
     712              :          * just finished streaming. next_tli_startpos is the WAL location where
     713              :          * the server switched to it.
     714              :          *----------
     715              :          */
     716            0 :         if (PQnfields(res) < 2 || PQntuples(res) != 1)
     717              :         {
     718            0 :                 pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
     719              :                                          PQntuples(res), PQnfields(res), 1, 2);
     720            0 :                 return false;
     721              :         }
     722              : 
     723            0 :         *timeline = atoi(PQgetvalue(res, 0, 0));
     724            0 :         if (sscanf(PQgetvalue(res, 0, 1), "%X/%08X", &startpos_xlogid,
     725            0 :                            &startpos_xrecoff) != 2)
     726              :         {
     727            0 :                 pg_log_error("could not parse next timeline's starting point \"%s\"",
     728              :                                          PQgetvalue(res, 0, 1));
     729            0 :                 return false;
     730              :         }
     731            0 :         *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
     732              : 
     733            0 :         return true;
     734            0 : }
     735              : 
     736              : /*
     737              :  * The main loop of ReceiveXlogStream. Handles the COPY stream after
     738              :  * initiating streaming with the START_REPLICATION command.
     739              :  *
     740              :  * If the COPY ends (not necessarily successfully) due a message from the
     741              :  * server, returns a PGresult and sets *stoppos to the last byte written.
     742              :  * On any other sort of error, returns NULL.
     743              :  */
     744              : static PGresult *
     745            0 : HandleCopyStream(PGconn *conn, StreamCtl *stream,
     746              :                                  XLogRecPtr *stoppos)
     747              : {
     748            0 :         char       *copybuf = NULL;
     749            0 :         TimestampTz last_status = -1;
     750            0 :         XLogRecPtr      blockpos = stream->startpos;
     751              : 
     752            0 :         still_sending = true;
     753              : 
     754            0 :         while (1)
     755              :         {
     756            0 :                 int                     r;
     757            0 :                 TimestampTz now;
     758            0 :                 long            sleeptime;
     759              : 
     760              :                 /*
     761              :                  * Check if we should continue streaming, or abort at this point.
     762              :                  */
     763            0 :                 if (!CheckCopyStreamStop(conn, stream, blockpos))
     764            0 :                         goto error;
     765              : 
     766            0 :                 now = feGetCurrentTimestamp();
     767              : 
     768              :                 /*
     769              :                  * If synchronous option is true, issue sync command as soon as there
     770              :                  * are WAL data which has not been flushed yet.
     771              :                  */
     772            0 :                 if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
     773              :                 {
     774            0 :                         if (stream->walmethod->ops->sync(walfile) != 0)
     775            0 :                                 pg_fatal("could not fsync file \"%s\": %s",
     776              :                                                  walfile->pathname, GetLastWalMethodError(stream->walmethod));
     777            0 :                         lastFlushPosition = blockpos;
     778              : 
     779              :                         /*
     780              :                          * Send feedback so that the server sees the latest WAL locations
     781              :                          * immediately.
     782              :                          */
     783            0 :                         if (!sendFeedback(conn, blockpos, now, false))
     784            0 :                                 goto error;
     785            0 :                         last_status = now;
     786            0 :                 }
     787              : 
     788              :                 /*
     789              :                  * Potentially send a status message to the primary
     790              :                  */
     791            0 :                 if (still_sending && stream->standby_message_timeout > 0 &&
     792            0 :                         feTimestampDifferenceExceeds(last_status, now,
     793            0 :                                                                                  stream->standby_message_timeout))
     794              :                 {
     795              :                         /* Time to send feedback! */
     796            0 :                         if (!sendFeedback(conn, blockpos, now, false))
     797            0 :                                 goto error;
     798            0 :                         last_status = now;
     799            0 :                 }
     800              : 
     801              :                 /*
     802              :                  * Calculate how long send/receive loops should sleep
     803              :                  */
     804            0 :                 sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
     805            0 :                                                                                                  last_status);
     806              : 
     807              :                 /* Done with any prior message */
     808            0 :                 PQfreemem(copybuf);
     809            0 :                 copybuf = NULL;
     810              : 
     811            0 :                 r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
     812            0 :                 while (r != 0)
     813              :                 {
     814            0 :                         if (r == -1)
     815            0 :                                 goto error;
     816            0 :                         if (r == -2)
     817              :                         {
     818            0 :                                 PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
     819              : 
     820            0 :                                 if (res == NULL)
     821            0 :                                         goto error;
     822            0 :                                 PQfreemem(copybuf);
     823            0 :                                 return res;
     824            0 :                         }
     825              : 
     826              :                         /* Check the message type. */
     827            0 :                         if (copybuf[0] == PqReplMsg_Keepalive)
     828              :                         {
     829            0 :                                 if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
     830              :                                                                                  &last_status))
     831            0 :                                         goto error;
     832            0 :                         }
     833            0 :                         else if (copybuf[0] == PqReplMsg_WALData)
     834              :                         {
     835            0 :                                 if (!ProcessWALDataMsg(conn, stream, copybuf, r, &blockpos))
     836            0 :                                         goto error;
     837              : 
     838              :                                 /*
     839              :                                  * Check if we should continue streaming, or abort at this
     840              :                                  * point.
     841              :                                  */
     842            0 :                                 if (!CheckCopyStreamStop(conn, stream, blockpos))
     843            0 :                                         goto error;
     844            0 :                         }
     845              :                         else
     846              :                         {
     847            0 :                                 pg_log_error("unrecognized streaming header: \"%c\"",
     848              :                                                          copybuf[0]);
     849            0 :                                 goto error;
     850              :                         }
     851              : 
     852              :                         /* Done with that message */
     853            0 :                         PQfreemem(copybuf);
     854            0 :                         copybuf = NULL;
     855              : 
     856              :                         /*
     857              :                          * Process the received data, and any subsequent data we can read
     858              :                          * without blocking.
     859              :                          */
     860            0 :                         r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
     861              :                 }
     862            0 :         }
     863              : 
     864              : error:
     865            0 :         PQfreemem(copybuf);
     866            0 :         return NULL;
     867            0 : }
     868              : 
     869              : /*
     870              :  * Wait until we can read a CopyData message,
     871              :  * or timeout, or occurrence of a signal or input on the stop_socket.
     872              :  * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
     873              :  *
     874              :  * Returns 1 if data has become available for reading, 0 if timed out
     875              :  * or interrupted by signal or stop_socket input, and -1 on an error.
     876              :  */
     877              : static int
     878            0 : CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
     879              : {
     880            0 :         int                     ret;
     881            0 :         fd_set          input_mask;
     882            0 :         int                     connsocket;
     883            0 :         int                     maxfd;
     884            0 :         struct timeval timeout;
     885            0 :         struct timeval *timeoutptr;
     886              : 
     887            0 :         connsocket = PQsocket(conn);
     888            0 :         if (connsocket < 0)
     889              :         {
     890            0 :                 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
     891            0 :                 return -1;
     892              :         }
     893              : 
     894            0 :         FD_ZERO(&input_mask);
     895            0 :         FD_SET(connsocket, &input_mask);
     896            0 :         maxfd = connsocket;
     897            0 :         if (stop_socket != PGINVALID_SOCKET)
     898              :         {
     899            0 :                 FD_SET(stop_socket, &input_mask);
     900            0 :                 maxfd = Max(maxfd, stop_socket);
     901            0 :         }
     902              : 
     903            0 :         if (timeout_ms < 0)
     904            0 :                 timeoutptr = NULL;
     905              :         else
     906              :         {
     907            0 :                 timeout.tv_sec = timeout_ms / 1000L;
     908            0 :                 timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
     909            0 :                 timeoutptr = &timeout;
     910              :         }
     911              : 
     912            0 :         ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
     913              : 
     914            0 :         if (ret < 0)
     915              :         {
     916            0 :                 if (errno == EINTR)
     917            0 :                         return 0;                       /* Got a signal, so not an error */
     918            0 :                 pg_log_error("%s() failed: %m", "select");
     919            0 :                 return -1;
     920              :         }
     921            0 :         if (ret > 0 && FD_ISSET(connsocket, &input_mask))
     922            0 :                 return 1;                               /* Got input on connection socket */
     923              : 
     924            0 :         return 0;                                       /* Got timeout or input on stop_socket */
     925            0 : }
     926              : 
     927              : /*
     928              :  * Receive CopyData message available from XLOG stream, blocking for
     929              :  * maximum of 'timeout' ms.
     930              :  *
     931              :  * If data was received, returns the length of the data. *buffer is set to
     932              :  * point to a buffer holding the received message. The caller must eventually
     933              :  * free the buffer with PQfreemem().
     934              :  *
     935              :  * Returns 0 if no data was available within timeout, or if wait was
     936              :  * interrupted by signal or stop_socket input.
     937              :  * -1 on error. -2 if the server ended the COPY.
     938              :  */
     939              : static int
     940            0 : CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
     941              :                                   char **buffer)
     942              : {
     943            0 :         char       *copybuf = NULL;
     944            0 :         int                     rawlen;
     945              : 
     946              :         /* Caller should have cleared any prior buffer */
     947            0 :         Assert(*buffer == NULL);
     948              : 
     949              :         /* Try to receive a CopyData message */
     950            0 :         rawlen = PQgetCopyData(conn, &copybuf, 1);
     951            0 :         if (rawlen == 0)
     952              :         {
     953            0 :                 int                     ret;
     954              : 
     955              :                 /*
     956              :                  * No data available.  Wait for some to appear, but not longer than
     957              :                  * the specified timeout, so that we can ping the server.  Also stop
     958              :                  * waiting if input appears on stop_socket.
     959              :                  */
     960            0 :                 ret = CopyStreamPoll(conn, timeout, stop_socket);
     961            0 :                 if (ret <= 0)
     962            0 :                         return ret;
     963              : 
     964              :                 /* Now there is actually data on the socket */
     965            0 :                 if (PQconsumeInput(conn) == 0)
     966              :                 {
     967            0 :                         pg_log_error("could not receive data from WAL stream: %s",
     968              :                                                  PQerrorMessage(conn));
     969            0 :                         return -1;
     970              :                 }
     971              : 
     972              :                 /* Now that we've consumed some input, try again */
     973            0 :                 rawlen = PQgetCopyData(conn, &copybuf, 1);
     974            0 :                 if (rawlen == 0)
     975            0 :                         return 0;
     976            0 :         }
     977            0 :         if (rawlen == -1)                       /* end-of-streaming or error */
     978            0 :                 return -2;
     979            0 :         if (rawlen == -2)
     980              :         {
     981            0 :                 pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
     982            0 :                 return -1;
     983              :         }
     984              : 
     985              :         /* Return received messages to caller */
     986            0 :         *buffer = copybuf;
     987            0 :         return rawlen;
     988            0 : }
     989              : 
     990              : /*
     991              :  * Process the keepalive message.
     992              :  */
     993              : static bool
     994            0 : ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
     995              :                                         XLogRecPtr blockpos, TimestampTz *last_status)
     996              : {
     997            0 :         int                     pos;
     998            0 :         bool            replyRequested;
     999            0 :         TimestampTz now;
    1000              : 
    1001              :         /*
    1002              :          * Parse the keepalive message, enclosed in the CopyData message. We just
    1003              :          * check if the server requested a reply, and ignore the rest.
    1004              :          */
    1005            0 :         pos = 1;                                        /* skip msgtype PqReplMsg_Keepalive */
    1006            0 :         pos += 8;                                       /* skip walEnd */
    1007            0 :         pos += 8;                                       /* skip sendTime */
    1008              : 
    1009            0 :         if (len < pos + 1)
    1010              :         {
    1011            0 :                 pg_log_error("streaming header too small: %d", len);
    1012            0 :                 return false;
    1013              :         }
    1014            0 :         replyRequested = copybuf[pos];
    1015              : 
    1016              :         /* If the server requested an immediate reply, send one. */
    1017            0 :         if (replyRequested && still_sending)
    1018              :         {
    1019            0 :                 if (reportFlushPosition && lastFlushPosition < blockpos &&
    1020            0 :                         walfile != NULL)
    1021              :                 {
    1022              :                         /*
    1023              :                          * If a valid flush location needs to be reported, flush the
    1024              :                          * current WAL file so that the latest flush location is sent back
    1025              :                          * to the server. This is necessary to see whether the last WAL
    1026              :                          * data has been successfully replicated or not, at the normal
    1027              :                          * shutdown of the server.
    1028              :                          */
    1029            0 :                         if (stream->walmethod->ops->sync(walfile) != 0)
    1030            0 :                                 pg_fatal("could not fsync file \"%s\": %s",
    1031              :                                                  walfile->pathname, GetLastWalMethodError(stream->walmethod));
    1032            0 :                         lastFlushPosition = blockpos;
    1033            0 :                 }
    1034              : 
    1035            0 :                 now = feGetCurrentTimestamp();
    1036            0 :                 if (!sendFeedback(conn, blockpos, now, false))
    1037            0 :                         return false;
    1038            0 :                 *last_status = now;
    1039            0 :         }
    1040              : 
    1041            0 :         return true;
    1042            0 : }
    1043              : 
    1044              : /*
    1045              :  * Process WALData message.
    1046              :  */
    1047              : static bool
    1048            0 : ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
    1049              :                                   XLogRecPtr *blockpos)
    1050              : {
    1051            0 :         int                     xlogoff;
    1052            0 :         int                     bytes_left;
    1053            0 :         int                     bytes_written;
    1054            0 :         int                     hdr_len;
    1055              : 
    1056              :         /*
    1057              :          * Once we've decided we don't want to receive any more, just ignore any
    1058              :          * subsequent WALData messages.
    1059              :          */
    1060            0 :         if (!(still_sending))
    1061            0 :                 return true;
    1062              : 
    1063              :         /*
    1064              :          * Read the header of the WALData message, enclosed in the CopyData
    1065              :          * message. We only need the WAL location field (dataStart), the rest of
    1066              :          * the header is ignored.
    1067              :          */
    1068            0 :         hdr_len = 1;                            /* msgtype PqReplMsg_WALData */
    1069            0 :         hdr_len += 8;                           /* dataStart */
    1070            0 :         hdr_len += 8;                           /* walEnd */
    1071            0 :         hdr_len += 8;                           /* sendTime */
    1072            0 :         if (len < hdr_len)
    1073              :         {
    1074            0 :                 pg_log_error("streaming header too small: %d", len);
    1075            0 :                 return false;
    1076              :         }
    1077            0 :         *blockpos = fe_recvint64(&copybuf[1]);
    1078              : 
    1079              :         /* Extract WAL location for this block */
    1080            0 :         xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
    1081              : 
    1082              :         /*
    1083              :          * Verify that the initial location in the stream matches where we think
    1084              :          * we are.
    1085              :          */
    1086            0 :         if (walfile == NULL)
    1087              :         {
    1088              :                 /* No file open yet */
    1089            0 :                 if (xlogoff != 0)
    1090              :                 {
    1091            0 :                         pg_log_error("received write-ahead log record for offset %u with no file open",
    1092              :                                                  xlogoff);
    1093            0 :                         return false;
    1094              :                 }
    1095            0 :         }
    1096              :         else
    1097              :         {
    1098              :                 /* More data in existing segment */
    1099            0 :                 if (walfile->currpos != xlogoff)
    1100              :                 {
    1101            0 :                         pg_log_error("got WAL data offset %08x, expected %08x",
    1102              :                                                  xlogoff, (int) walfile->currpos);
    1103            0 :                         return false;
    1104              :                 }
    1105              :         }
    1106              : 
    1107            0 :         bytes_left = len - hdr_len;
    1108            0 :         bytes_written = 0;
    1109              : 
    1110            0 :         while (bytes_left)
    1111              :         {
    1112            0 :                 int                     bytes_to_write;
    1113              : 
    1114              :                 /*
    1115              :                  * If crossing a WAL boundary, only write up until we reach wal
    1116              :                  * segment size.
    1117              :                  */
    1118            0 :                 if (xlogoff + bytes_left > WalSegSz)
    1119            0 :                         bytes_to_write = WalSegSz - xlogoff;
    1120              :                 else
    1121            0 :                         bytes_to_write = bytes_left;
    1122              : 
    1123            0 :                 if (walfile == NULL)
    1124              :                 {
    1125            0 :                         if (!open_walfile(stream, *blockpos))
    1126              :                         {
    1127              :                                 /* Error logged by open_walfile */
    1128            0 :                                 return false;
    1129              :                         }
    1130            0 :                 }
    1131              : 
    1132            0 :                 if (stream->walmethod->ops->write(walfile,
    1133            0 :                                                                                   copybuf + hdr_len + bytes_written,
    1134            0 :                                                                                   bytes_to_write) != bytes_to_write)
    1135              :                 {
    1136            0 :                         pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
    1137              :                                                  bytes_to_write, walfile->pathname,
    1138              :                                                  GetLastWalMethodError(stream->walmethod));
    1139            0 :                         return false;
    1140              :                 }
    1141              : 
    1142              :                 /* Write was successful, advance our position */
    1143            0 :                 bytes_written += bytes_to_write;
    1144            0 :                 bytes_left -= bytes_to_write;
    1145            0 :                 *blockpos += bytes_to_write;
    1146            0 :                 xlogoff += bytes_to_write;
    1147              : 
    1148              :                 /* Did we reach the end of a WAL segment? */
    1149            0 :                 if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
    1150              :                 {
    1151            0 :                         if (!close_walfile(stream, *blockpos))
    1152              :                                 /* Error message written in close_walfile() */
    1153            0 :                                 return false;
    1154              : 
    1155            0 :                         xlogoff = 0;
    1156              : 
    1157            0 :                         if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
    1158              :                         {
    1159            0 :                                 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1160              :                                 {
    1161            0 :                                         pg_log_error("could not send copy-end packet: %s",
    1162              :                                                                  PQerrorMessage(conn));
    1163            0 :                                         return false;
    1164              :                                 }
    1165            0 :                                 still_sending = false;
    1166            0 :                                 return true;    /* ignore the rest of this WALData packet */
    1167              :                         }
    1168            0 :                 }
    1169            0 :         }
    1170              :         /* No more data left to write, receive next copy packet */
    1171              : 
    1172            0 :         return true;
    1173            0 : }
    1174              : 
    1175              : /*
    1176              :  * Handle end of the copy stream.
    1177              :  */
    1178              : static PGresult *
    1179            0 : HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
    1180              :                                           XLogRecPtr blockpos, XLogRecPtr *stoppos)
    1181              : {
    1182            0 :         PGresult   *res = PQgetResult(conn);
    1183              : 
    1184              :         /*
    1185              :          * The server closed its end of the copy stream.  If we haven't closed
    1186              :          * ours already, we need to do so now, unless the server threw an error,
    1187              :          * in which case we don't.
    1188              :          */
    1189            0 :         if (still_sending)
    1190              :         {
    1191            0 :                 if (!close_walfile(stream, blockpos))
    1192              :                 {
    1193              :                         /* Error message written in close_walfile() */
    1194            0 :                         PQclear(res);
    1195            0 :                         return NULL;
    1196              :                 }
    1197            0 :                 if (PQresultStatus(res) == PGRES_COPY_IN)
    1198              :                 {
    1199            0 :                         if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1200              :                         {
    1201            0 :                                 pg_log_error("could not send copy-end packet: %s",
    1202              :                                                          PQerrorMessage(conn));
    1203            0 :                                 PQclear(res);
    1204            0 :                                 return NULL;
    1205              :                         }
    1206            0 :                         res = PQgetResult(conn);
    1207            0 :                 }
    1208            0 :                 still_sending = false;
    1209            0 :         }
    1210            0 :         *stoppos = blockpos;
    1211            0 :         return res;
    1212            0 : }
    1213              : 
    1214              : /*
    1215              :  * Check if we should continue streaming, or abort at this point.
    1216              :  */
    1217              : static bool
    1218            0 : CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
    1219              : {
    1220            0 :         if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
    1221              :         {
    1222            0 :                 if (!close_walfile(stream, blockpos))
    1223              :                 {
    1224              :                         /* Potential error message is written by close_walfile */
    1225            0 :                         return false;
    1226              :                 }
    1227            0 :                 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1228              :                 {
    1229            0 :                         pg_log_error("could not send copy-end packet: %s",
    1230              :                                                  PQerrorMessage(conn));
    1231            0 :                         return false;
    1232              :                 }
    1233            0 :                 still_sending = false;
    1234            0 :         }
    1235              : 
    1236            0 :         return true;
    1237            0 : }
    1238              : 
    1239              : /*
    1240              :  * Calculate how long send/receive loops should sleep
    1241              :  */
    1242              : static long
    1243            0 : CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
    1244              :                                                          TimestampTz last_status)
    1245              : {
    1246            0 :         TimestampTz status_targettime = 0;
    1247            0 :         long            sleeptime;
    1248              : 
    1249            0 :         if (standby_message_timeout && still_sending)
    1250            0 :                 status_targettime = last_status +
    1251            0 :                         (standby_message_timeout - 1) * ((int64) 1000);
    1252              : 
    1253            0 :         if (status_targettime > 0)
    1254              :         {
    1255            0 :                 long            secs;
    1256            0 :                 int                     usecs;
    1257              : 
    1258            0 :                 feTimestampDifference(now,
    1259            0 :                                                           status_targettime,
    1260              :                                                           &secs,
    1261              :                                                           &usecs);
    1262              :                 /* Always sleep at least 1 sec */
    1263            0 :                 if (secs <= 0)
    1264              :                 {
    1265            0 :                         secs = 1;
    1266            0 :                         usecs = 0;
    1267            0 :                 }
    1268              : 
    1269            0 :                 sleeptime = secs * 1000 + usecs / 1000;
    1270            0 :         }
    1271              :         else
    1272            0 :                 sleeptime = -1;
    1273              : 
    1274            0 :         return sleeptime;
    1275            0 : }
        

Generated by: LCOV version 2.3.2-1