Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4 : * fashion and write it to a local file.
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/bin/pg_basebackup/pg_recvlogical.c
10 : *-------------------------------------------------------------------------
11 : */
12 :
13 : #include "postgres_fe.h"
14 :
15 : #include <dirent.h>
16 : #include <limits.h>
17 : #include <sys/select.h>
18 : #include <sys/stat.h>
19 : #include <unistd.h>
20 :
21 : #include "common/file_perm.h"
22 : #include "common/logging.h"
23 : #include "fe_utils/option_utils.h"
24 : #include "getopt_long.h"
25 : #include "libpq-fe.h"
26 : #include "libpq/pqsignal.h"
27 : #include "libpq/protocol.h"
28 : #include "pqexpbuffer.h"
29 : #include "streamutil.h"
30 :
/* Number of seconds to wait before retrying after a disconnect */
#define RECONNECT_SLEEP_TIME 5

/*
 * Reason why the streaming loop is being left.  The value is kept in
 * stop_reason and handed to prepareToTerminate(), which uses it to choose
 * the message logged in verbose mode.
 */
typedef enum
{
	STREAM_STOP_NONE,			/* no stop has been requested */
	STREAM_STOP_END_OF_WAL,		/* a WAL record at or past endpos was read */
	STREAM_STOP_KEEPALIVE,		/* a keepalive reported walEnd >= endpos */
	STREAM_STOP_SIGNAL			/* set by the SIGINT/SIGTERM handler */
} StreamStopReason;
41 :
42 : /* Global Options */
43 : static char *outfile = NULL;
44 : static int verbose = 0;
45 : static bool two_phase = false; /* enable-two-phase option */
46 : static bool failover = false; /* enable-failover option */
47 : static int noloop = 0;
48 : static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
49 : static int fsync_interval = 10 * 1000; /* 10 sec = default */
50 : static XLogRecPtr startpos = InvalidXLogRecPtr;
51 : static XLogRecPtr endpos = InvalidXLogRecPtr;
52 : static bool do_create_slot = false;
53 : static bool slot_exists_ok = false;
54 : static bool do_start_slot = false;
55 : static bool do_drop_slot = false;
56 : static char *replication_slot = NULL;
57 :
58 : /* filled pairwise with option, value. value may be NULL */
59 : static char **options;
60 : static size_t noptions = 0;
61 : static const char *plugin = "test_decoding";
62 :
63 : /* Global State */
64 : static int outfd = -1;
65 : static volatile sig_atomic_t time_to_abort = false;
66 : static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
67 : static volatile sig_atomic_t output_reopen = false;
68 : static bool output_isfile;
69 : static TimestampTz output_last_fsync = -1;
70 : static bool output_needs_fsync = false;
71 : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
72 : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
73 :
74 : static void usage(void);
75 : static void StreamLogicalLog(void);
76 : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
77 : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
78 : StreamStopReason reason,
79 : XLogRecPtr lsn);
80 :
81 : static void
82 0 : usage(void)
83 : {
84 0 : printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
85 : progname);
86 0 : printf(_("Usage:\n"));
87 0 : printf(_(" %s [OPTION]...\n"), progname);
88 0 : printf(_("\nAction to be performed:\n"));
89 0 : printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
90 0 : printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
91 0 : printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
92 0 : printf(_("\nOptions:\n"));
93 0 : printf(_(" --enable-failover enable replication slot synchronization to standby servers when\n"
94 : " creating a replication slot\n"));
95 0 : printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
96 0 : printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
97 0 : printf(_(" -F --fsync-interval=SECS\n"
98 : " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
99 0 : printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
100 0 : printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
101 0 : printf(_(" -n, --no-loop do not loop on connection lost\n"));
102 0 : printf(_(" -o, --option=NAME[=VALUE]\n"
103 : " pass option NAME with optional value VALUE to the\n"
104 : " output plugin\n"));
105 0 : printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
106 0 : printf(_(" -s, --status-interval=SECS\n"
107 : " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
108 0 : printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
109 0 : printf(_(" -t, --enable-two-phase enable decoding of prepared transactions when creating a slot\n"));
110 0 : printf(_(" --two-phase (same as --enable-two-phase, deprecated)\n"));
111 0 : printf(_(" -v, --verbose output verbose messages\n"));
112 0 : printf(_(" -V, --version output version information, then exit\n"));
113 0 : printf(_(" -?, --help show this help, then exit\n"));
114 0 : printf(_("\nConnection options:\n"));
115 0 : printf(_(" -d, --dbname=DBNAME database to connect to\n"));
116 0 : printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
117 0 : printf(_(" -p, --port=PORT database server port number\n"));
118 0 : printf(_(" -U, --username=NAME connect as specified database user\n"));
119 0 : printf(_(" -w, --no-password never prompt for password\n"));
120 0 : printf(_(" -W, --password force password prompt (should happen automatically)\n"));
121 0 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
122 0 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
123 0 : }
124 :
125 : /*
126 : * Send a Standby Status Update message to server.
127 : */
128 : static bool
129 0 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
130 : {
131 : static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
132 : static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
133 :
134 0 : char replybuf[1 + 8 + 8 + 8 + 8 + 1];
135 0 : int len = 0;
136 :
137 : /*
138 : * we normally don't want to send superfluous feedback, but if it's
139 : * because of a timeout we need to, otherwise wal_sender_timeout will kill
140 : * us.
141 : */
142 0 : if (!force &&
143 0 : last_written_lsn == output_written_lsn &&
144 0 : last_fsync_lsn == output_fsync_lsn)
145 0 : return true;
146 :
147 0 : if (verbose)
148 0 : pg_log_info("confirming write up to %X/%08X, flush to %X/%08X (slot %s)",
149 : LSN_FORMAT_ARGS(output_written_lsn),
150 : LSN_FORMAT_ARGS(output_fsync_lsn),
151 : replication_slot);
152 :
153 0 : replybuf[len] = PqReplMsg_StandbyStatusUpdate;
154 0 : len += 1;
155 0 : fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
156 0 : len += 8;
157 0 : fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
158 0 : len += 8;
159 0 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
160 0 : len += 8;
161 0 : fe_sendint64(now, &replybuf[len]); /* sendTime */
162 0 : len += 8;
163 0 : replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
164 0 : len += 1;
165 :
166 0 : startpos = output_written_lsn;
167 0 : last_written_lsn = output_written_lsn;
168 0 : last_fsync_lsn = output_fsync_lsn;
169 :
170 0 : if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
171 : {
172 0 : pg_log_error("could not send feedback packet: %s",
173 : PQerrorMessage(conn));
174 0 : return false;
175 : }
176 :
177 0 : return true;
178 0 : }
179 :
180 : static void
181 0 : disconnect_atexit(void)
182 : {
183 0 : if (conn != NULL)
184 0 : PQfinish(conn);
185 0 : }
186 :
187 : static void
188 0 : OutputFsync(TimestampTz now)
189 : {
190 0 : output_last_fsync = now;
191 :
192 0 : output_fsync_lsn = output_written_lsn;
193 :
194 : /*
195 : * Save the last flushed position as the replication start point. On
196 : * reconnect, replication resumes from there to avoid re-sending flushed
197 : * data.
198 : */
199 0 : startpos = output_fsync_lsn;
200 :
201 0 : if (fsync_interval <= 0)
202 0 : return;
203 :
204 0 : if (!output_needs_fsync)
205 0 : return;
206 :
207 0 : output_needs_fsync = false;
208 :
209 : /* can only fsync if it's a regular file */
210 0 : if (!output_isfile)
211 0 : return;
212 :
213 0 : if (fsync(outfd) != 0)
214 0 : pg_fatal("could not fsync file \"%s\": %m", outfile);
215 0 : }
216 :
217 : /*
218 : * Start the log streaming
219 : */
220 : static void
221 0 : StreamLogicalLog(void)
222 : {
223 0 : PGresult *res;
224 0 : char *copybuf = NULL;
225 0 : TimestampTz last_status = -1;
226 0 : int i;
227 0 : PQExpBuffer query;
228 0 : XLogRecPtr cur_record_lsn;
229 :
230 0 : cur_record_lsn = InvalidXLogRecPtr;
231 :
232 : /*
233 : * Connect in replication mode to the server
234 : */
235 0 : if (!conn)
236 0 : conn = GetConnection();
237 0 : if (!conn)
238 : /* Error message already written in GetConnection() */
239 0 : return;
240 :
241 : /*
242 : * Start the replication
243 : */
244 0 : if (verbose)
245 0 : pg_log_info("starting log streaming at %X/%08X (slot %s)",
246 : LSN_FORMAT_ARGS(startpos),
247 : replication_slot);
248 :
249 : /* Initiate the replication stream at specified location */
250 0 : query = createPQExpBuffer();
251 0 : appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%08X",
252 0 : replication_slot, LSN_FORMAT_ARGS(startpos));
253 :
254 : /* print options if there are any */
255 0 : if (noptions)
256 0 : appendPQExpBufferStr(query, " (");
257 :
258 0 : for (i = 0; i < noptions; i++)
259 : {
260 : /* separator */
261 0 : if (i > 0)
262 0 : appendPQExpBufferStr(query, ", ");
263 :
264 : /* write option name */
265 0 : appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
266 :
267 : /* write option value if specified */
268 0 : if (options[(i * 2) + 1] != NULL)
269 0 : appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
270 0 : }
271 :
272 0 : if (noptions)
273 0 : appendPQExpBufferChar(query, ')');
274 :
275 0 : res = PQexec(conn, query->data);
276 0 : if (PQresultStatus(res) != PGRES_COPY_BOTH)
277 : {
278 0 : pg_log_error("could not send replication command \"%s\": %s",
279 : query->data, PQresultErrorMessage(res));
280 0 : PQclear(res);
281 0 : goto error;
282 : }
283 0 : PQclear(res);
284 0 : resetPQExpBuffer(query);
285 :
286 0 : if (verbose)
287 0 : pg_log_info("streaming initiated");
288 :
289 0 : while (!time_to_abort)
290 : {
291 0 : int r;
292 0 : int bytes_left;
293 0 : int bytes_written;
294 0 : TimestampTz now;
295 0 : int hdr_len;
296 :
297 0 : cur_record_lsn = InvalidXLogRecPtr;
298 :
299 0 : if (copybuf != NULL)
300 : {
301 0 : PQfreemem(copybuf);
302 0 : copybuf = NULL;
303 0 : }
304 :
305 : /*
306 : * Potentially send a status message to the primary.
307 : */
308 0 : now = feGetCurrentTimestamp();
309 :
310 0 : if (outfd != -1 &&
311 0 : feTimestampDifferenceExceeds(output_last_fsync, now,
312 0 : fsync_interval))
313 0 : OutputFsync(now);
314 :
315 0 : if (standby_message_timeout > 0 &&
316 0 : feTimestampDifferenceExceeds(last_status, now,
317 0 : standby_message_timeout))
318 : {
319 : /* Time to send feedback! */
320 0 : if (!sendFeedback(conn, now, true, false))
321 0 : goto error;
322 :
323 0 : last_status = now;
324 0 : }
325 :
326 : /* got SIGHUP, close output file */
327 0 : if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
328 : {
329 0 : now = feGetCurrentTimestamp();
330 0 : OutputFsync(now);
331 0 : close(outfd);
332 0 : outfd = -1;
333 0 : }
334 0 : output_reopen = false;
335 :
336 : /* open the output file, if not open yet */
337 0 : if (outfd == -1)
338 : {
339 0 : struct stat statbuf;
340 :
341 0 : if (strcmp(outfile, "-") == 0)
342 0 : outfd = fileno(stdout);
343 : else
344 0 : outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
345 : S_IRUSR | S_IWUSR);
346 0 : if (outfd == -1)
347 : {
348 0 : pg_log_error("could not open log file \"%s\": %m", outfile);
349 0 : goto error;
350 : }
351 :
352 0 : if (fstat(outfd, &statbuf) != 0)
353 : {
354 0 : pg_log_error("could not stat file \"%s\": %m", outfile);
355 0 : goto error;
356 : }
357 :
358 0 : output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
359 0 : }
360 :
361 0 : r = PQgetCopyData(conn, ©buf, 1);
362 0 : if (r == 0)
363 : {
364 : /*
365 : * In async mode, and no data available. We block on reading but
366 : * not more than the specified timeout, so that we can send a
367 : * response back to the client.
368 : */
369 0 : fd_set input_mask;
370 0 : TimestampTz message_target = 0;
371 0 : TimestampTz fsync_target = 0;
372 0 : struct timeval timeout;
373 0 : struct timeval *timeoutptr = NULL;
374 :
375 0 : if (PQsocket(conn) < 0)
376 : {
377 0 : pg_log_error("invalid socket: %s", PQerrorMessage(conn));
378 0 : goto error;
379 : }
380 :
381 0 : FD_ZERO(&input_mask);
382 0 : FD_SET(PQsocket(conn), &input_mask);
383 :
384 : /* Compute when we need to wakeup to send a keepalive message. */
385 0 : if (standby_message_timeout)
386 0 : message_target = last_status + (standby_message_timeout - 1) *
387 : ((int64) 1000);
388 :
389 : /* Compute when we need to wakeup to fsync the output file. */
390 0 : if (fsync_interval > 0 && output_needs_fsync)
391 0 : fsync_target = output_last_fsync + (fsync_interval - 1) *
392 : ((int64) 1000);
393 :
394 : /* Now compute when to wakeup. */
395 0 : if (message_target > 0 || fsync_target > 0)
396 : {
397 0 : TimestampTz targettime;
398 0 : long secs;
399 0 : int usecs;
400 :
401 0 : targettime = message_target;
402 :
403 0 : if (fsync_target > 0 && fsync_target < targettime)
404 0 : targettime = fsync_target;
405 :
406 0 : feTimestampDifference(now,
407 0 : targettime,
408 : &secs,
409 : &usecs);
410 0 : if (secs <= 0)
411 0 : timeout.tv_sec = 1; /* Always sleep at least 1 sec */
412 : else
413 0 : timeout.tv_sec = secs;
414 0 : timeout.tv_usec = usecs;
415 0 : timeoutptr = &timeout;
416 0 : }
417 :
418 0 : r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
419 0 : if (r == 0 || (r < 0 && errno == EINTR))
420 : {
421 : /*
422 : * Got a timeout or signal. Continue the loop and either
423 : * deliver a status packet to the server or just go back into
424 : * blocking.
425 : */
426 0 : continue;
427 : }
428 0 : else if (r < 0)
429 : {
430 0 : pg_log_error("%s() failed: %m", "select");
431 0 : goto error;
432 : }
433 :
434 : /* Else there is actually data on the socket */
435 0 : if (PQconsumeInput(conn) == 0)
436 : {
437 0 : pg_log_error("could not receive data from WAL stream: %s",
438 : PQerrorMessage(conn));
439 0 : goto error;
440 : }
441 0 : continue;
442 0 : }
443 :
444 : /* End of copy stream */
445 0 : if (r == -1)
446 0 : break;
447 :
448 : /* Failure while reading the copy stream */
449 0 : if (r == -2)
450 : {
451 0 : pg_log_error("could not read COPY data: %s",
452 : PQerrorMessage(conn));
453 0 : goto error;
454 : }
455 :
456 : /* Check the message type. */
457 0 : if (copybuf[0] == PqReplMsg_Keepalive)
458 : {
459 0 : int pos;
460 0 : bool replyRequested;
461 0 : XLogRecPtr walEnd;
462 0 : bool endposReached = false;
463 :
464 : /*
465 : * Parse the keepalive message, enclosed in the CopyData message.
466 : * We just check if the server requested a reply, and ignore the
467 : * rest.
468 : */
469 0 : pos = 1; /* skip msgtype PqReplMsg_Keepalive */
470 0 : walEnd = fe_recvint64(©buf[pos]);
471 0 : output_written_lsn = Max(walEnd, output_written_lsn);
472 :
473 0 : pos += 8; /* read walEnd */
474 :
475 0 : pos += 8; /* skip sendTime */
476 :
477 0 : if (r < pos + 1)
478 : {
479 0 : pg_log_error("streaming header too small: %d", r);
480 0 : goto error;
481 : }
482 0 : replyRequested = copybuf[pos];
483 :
484 0 : if (XLogRecPtrIsValid(endpos) && walEnd >= endpos)
485 : {
486 : /*
487 : * If there's nothing to read on the socket until a keepalive
488 : * we know that the server has nothing to send us; and if
489 : * walEnd has passed endpos, we know nothing else can have
490 : * committed before endpos. So we can bail out now.
491 : */
492 0 : endposReached = true;
493 0 : }
494 :
495 : /* Send a reply, if necessary */
496 0 : if (replyRequested || endposReached)
497 : {
498 0 : if (!flushAndSendFeedback(conn, &now))
499 0 : goto error;
500 0 : last_status = now;
501 0 : }
502 :
503 0 : if (endposReached)
504 : {
505 0 : stop_reason = STREAM_STOP_KEEPALIVE;
506 0 : time_to_abort = true;
507 0 : break;
508 : }
509 :
510 0 : continue;
511 0 : }
512 0 : else if (copybuf[0] != PqReplMsg_WALData)
513 : {
514 0 : pg_log_error("unrecognized streaming header: \"%c\"",
515 : copybuf[0]);
516 0 : goto error;
517 : }
518 :
519 : /*
520 : * Read the header of the WALData message, enclosed in the CopyData
521 : * message. We only need the WAL location field (dataStart), the rest
522 : * of the header is ignored.
523 : */
524 0 : hdr_len = 1; /* msgtype PqReplMsg_WALData */
525 0 : hdr_len += 8; /* dataStart */
526 0 : hdr_len += 8; /* walEnd */
527 0 : hdr_len += 8; /* sendTime */
528 0 : if (r < hdr_len + 1)
529 : {
530 0 : pg_log_error("streaming header too small: %d", r);
531 0 : goto error;
532 : }
533 :
534 : /* Extract WAL location for this block */
535 0 : cur_record_lsn = fe_recvint64(©buf[1]);
536 :
537 0 : if (XLogRecPtrIsValid(endpos) && cur_record_lsn > endpos)
538 : {
539 : /*
540 : * We've read past our endpoint, so prepare to go away being
541 : * cautious about what happens to our output data.
542 : */
543 0 : if (!flushAndSendFeedback(conn, &now))
544 0 : goto error;
545 0 : stop_reason = STREAM_STOP_END_OF_WAL;
546 0 : time_to_abort = true;
547 0 : break;
548 : }
549 :
550 0 : output_written_lsn = Max(cur_record_lsn, output_written_lsn);
551 :
552 0 : bytes_left = r - hdr_len;
553 0 : bytes_written = 0;
554 :
555 : /* signal that a fsync is needed */
556 0 : output_needs_fsync = true;
557 :
558 0 : while (bytes_left)
559 : {
560 0 : int ret;
561 :
562 0 : ret = write(outfd,
563 0 : copybuf + hdr_len + bytes_written,
564 0 : bytes_left);
565 :
566 0 : if (ret < 0)
567 : {
568 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
569 : bytes_left, outfile);
570 0 : goto error;
571 : }
572 :
573 : /* Write was successful, advance our position */
574 0 : bytes_written += ret;
575 0 : bytes_left -= ret;
576 0 : }
577 :
578 0 : if (write(outfd, "\n", 1) != 1)
579 : {
580 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
581 : 1, outfile);
582 0 : goto error;
583 : }
584 :
585 0 : if (XLogRecPtrIsValid(endpos) && cur_record_lsn == endpos)
586 : {
587 : /* endpos was exactly the record we just processed, we're done */
588 0 : if (!flushAndSendFeedback(conn, &now))
589 0 : goto error;
590 0 : stop_reason = STREAM_STOP_END_OF_WAL;
591 0 : time_to_abort = true;
592 0 : break;
593 : }
594 0 : }
595 :
596 : /* Clean up connection state if stream has been aborted */
597 0 : if (time_to_abort)
598 0 : prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
599 :
600 0 : res = PQgetResult(conn);
601 0 : if (PQresultStatus(res) == PGRES_COPY_OUT)
602 : {
603 0 : PQclear(res);
604 :
605 : /*
606 : * We're doing a client-initiated clean exit and have sent CopyDone to
607 : * the server. Drain any messages, so we don't miss a last-minute
608 : * ErrorResponse. The walsender stops generating WALData records once
609 : * it sees CopyDone, so expect this to finish quickly. After CopyDone,
610 : * it's too late for sendFeedback(), even if this were to take a long
611 : * time. Hence, use synchronous-mode PQgetCopyData().
612 : */
613 0 : while (1)
614 : {
615 0 : int r;
616 :
617 0 : if (copybuf != NULL)
618 : {
619 0 : PQfreemem(copybuf);
620 0 : copybuf = NULL;
621 0 : }
622 0 : r = PQgetCopyData(conn, ©buf, 0);
623 0 : if (r == -1)
624 0 : break;
625 0 : if (r == -2)
626 : {
627 0 : pg_log_error("could not read COPY data: %s",
628 : PQerrorMessage(conn));
629 0 : time_to_abort = false; /* unclean exit */
630 0 : goto error;
631 : }
632 0 : }
633 :
634 0 : res = PQgetResult(conn);
635 0 : }
636 0 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
637 : {
638 0 : pg_log_error("unexpected termination of replication stream: %s",
639 : PQresultErrorMessage(res));
640 0 : PQclear(res);
641 0 : goto error;
642 : }
643 0 : PQclear(res);
644 :
645 0 : if (outfd != -1 && strcmp(outfile, "-") != 0)
646 : {
647 0 : TimestampTz t = feGetCurrentTimestamp();
648 :
649 0 : OutputFsync(t);
650 0 : if (close(outfd) != 0)
651 0 : pg_log_error("could not close file \"%s\": %m", outfile);
652 0 : }
653 0 : outfd = -1;
654 : error:
655 0 : if (copybuf != NULL)
656 : {
657 0 : PQfreemem(copybuf);
658 0 : copybuf = NULL;
659 0 : }
660 0 : destroyPQExpBuffer(query);
661 0 : PQfinish(conn);
662 0 : conn = NULL;
663 0 : }
664 :
665 : /*
666 : * Unfortunately we can't do sensible signal handling on Windows...
667 : */
668 : #ifndef WIN32
669 :
670 : /*
671 : * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
672 : * possible moment.
673 : */
674 : static void
675 0 : sigexit_handler(SIGNAL_ARGS)
676 : {
677 0 : stop_reason = STREAM_STOP_SIGNAL;
678 0 : time_to_abort = true;
679 0 : }
680 :
681 : /*
682 : * Trigger the output file to be reopened.
683 : */
684 : static void
685 0 : sighup_handler(SIGNAL_ARGS)
686 : {
687 0 : output_reopen = true;
688 0 : }
689 : #endif
690 :
691 :
692 : int
693 0 : main(int argc, char **argv)
694 : {
695 : static struct option long_options[] = {
696 : /* general options */
697 : {"file", required_argument, NULL, 'f'},
698 : {"fsync-interval", required_argument, NULL, 'F'},
699 : {"no-loop", no_argument, NULL, 'n'},
700 : {"enable-failover", no_argument, NULL, 5},
701 : {"enable-two-phase", no_argument, NULL, 't'},
702 : {"two-phase", no_argument, NULL, 't'}, /* deprecated */
703 : {"verbose", no_argument, NULL, 'v'},
704 : {"version", no_argument, NULL, 'V'},
705 : {"help", no_argument, NULL, '?'},
706 : /* connection options */
707 : {"dbname", required_argument, NULL, 'd'},
708 : {"host", required_argument, NULL, 'h'},
709 : {"port", required_argument, NULL, 'p'},
710 : {"username", required_argument, NULL, 'U'},
711 : {"no-password", no_argument, NULL, 'w'},
712 : {"password", no_argument, NULL, 'W'},
713 : /* replication options */
714 : {"startpos", required_argument, NULL, 'I'},
715 : {"endpos", required_argument, NULL, 'E'},
716 : {"option", required_argument, NULL, 'o'},
717 : {"plugin", required_argument, NULL, 'P'},
718 : {"status-interval", required_argument, NULL, 's'},
719 : {"slot", required_argument, NULL, 'S'},
720 : /* action */
721 : {"create-slot", no_argument, NULL, 1},
722 : {"start", no_argument, NULL, 2},
723 : {"drop-slot", no_argument, NULL, 3},
724 : {"if-not-exists", no_argument, NULL, 4},
725 : {NULL, 0, NULL, 0}
726 : };
727 0 : int c;
728 0 : int option_index;
729 0 : uint32 hi,
730 : lo;
731 0 : char *db_name;
732 :
733 0 : pg_logging_init(argv[0]);
734 0 : progname = get_progname(argv[0]);
735 0 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
736 :
737 0 : if (argc > 1)
738 : {
739 0 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
740 : {
741 0 : usage();
742 0 : exit(0);
743 : }
744 0 : else if (strcmp(argv[1], "-V") == 0 ||
745 0 : strcmp(argv[1], "--version") == 0)
746 : {
747 0 : puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
748 0 : exit(0);
749 : }
750 0 : }
751 :
752 0 : while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
753 0 : long_options, &option_index)) != -1)
754 : {
755 0 : switch (c)
756 : {
757 : /* general options */
758 : case 'f':
759 0 : outfile = pg_strdup(optarg);
760 0 : break;
761 : case 'F':
762 0 : if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
763 : INT_MAX / 1000,
764 : &fsync_interval))
765 0 : exit(1);
766 0 : fsync_interval *= 1000;
767 0 : break;
768 : case 'n':
769 0 : noloop = 1;
770 0 : break;
771 : case 't':
772 0 : two_phase = true;
773 0 : break;
774 : case 'v':
775 0 : verbose++;
776 0 : break;
777 : case 5:
778 0 : failover = true;
779 0 : break;
780 : /* connection options */
781 : case 'd':
782 0 : dbname = pg_strdup(optarg);
783 0 : break;
784 : case 'h':
785 0 : dbhost = pg_strdup(optarg);
786 0 : break;
787 : case 'p':
788 0 : dbport = pg_strdup(optarg);
789 0 : break;
790 : case 'U':
791 0 : dbuser = pg_strdup(optarg);
792 0 : break;
793 : case 'w':
794 0 : dbgetpassword = -1;
795 0 : break;
796 : case 'W':
797 0 : dbgetpassword = 1;
798 0 : break;
799 : /* replication options */
800 : case 'I':
801 0 : if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
802 0 : pg_fatal("could not parse start position \"%s\"", optarg);
803 0 : startpos = ((uint64) hi) << 32 | lo;
804 0 : break;
805 : case 'E':
806 0 : if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
807 0 : pg_fatal("could not parse end position \"%s\"", optarg);
808 0 : endpos = ((uint64) hi) << 32 | lo;
809 0 : break;
810 : case 'o':
811 : {
812 0 : char *data = pg_strdup(optarg);
813 0 : char *val = strchr(data, '=');
814 :
815 0 : if (val != NULL)
816 : {
817 : /* remove =; separate data from val */
818 0 : *val = '\0';
819 0 : val++;
820 0 : }
821 :
822 0 : noptions += 1;
823 0 : options = pg_realloc(options, sizeof(char *) * noptions * 2);
824 :
825 0 : options[(noptions - 1) * 2] = data;
826 0 : options[(noptions - 1) * 2 + 1] = val;
827 0 : }
828 :
829 0 : break;
830 : case 'P':
831 0 : plugin = pg_strdup(optarg);
832 0 : break;
833 : case 's':
834 0 : if (!option_parse_int(optarg, "-s/--status-interval", 0,
835 : INT_MAX / 1000,
836 : &standby_message_timeout))
837 0 : exit(1);
838 0 : standby_message_timeout *= 1000;
839 0 : break;
840 : case 'S':
841 0 : replication_slot = pg_strdup(optarg);
842 0 : break;
843 : /* action */
844 : case 1:
845 0 : do_create_slot = true;
846 0 : break;
847 : case 2:
848 0 : do_start_slot = true;
849 0 : break;
850 : case 3:
851 0 : do_drop_slot = true;
852 0 : break;
853 : case 4:
854 0 : slot_exists_ok = true;
855 0 : break;
856 :
857 : default:
858 : /* getopt_long already emitted a complaint */
859 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
860 0 : exit(1);
861 : }
862 : }
863 :
864 : /*
865 : * Any non-option arguments?
866 : */
867 0 : if (optind < argc)
868 : {
869 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
870 : argv[optind]);
871 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
872 0 : exit(1);
873 : }
874 :
875 : /*
876 : * Required arguments
877 : */
878 0 : if (replication_slot == NULL)
879 : {
880 0 : pg_log_error("no slot specified");
881 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
882 0 : exit(1);
883 : }
884 :
885 0 : if (do_start_slot && outfile == NULL)
886 : {
887 0 : pg_log_error("no target file specified");
888 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
889 0 : exit(1);
890 : }
891 :
892 0 : if (!do_drop_slot && dbname == NULL)
893 : {
894 0 : pg_log_error("no database specified");
895 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
896 0 : exit(1);
897 : }
898 :
899 0 : if (!do_drop_slot && !do_create_slot && !do_start_slot)
900 : {
901 0 : pg_log_error("at least one action needs to be specified");
902 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
903 0 : exit(1);
904 : }
905 :
906 0 : if (do_drop_slot && (do_create_slot || do_start_slot))
907 : {
908 0 : pg_log_error("cannot use --create-slot or --start together with --drop-slot");
909 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
910 0 : exit(1);
911 : }
912 :
913 0 : if (XLogRecPtrIsValid(startpos) && (do_create_slot || do_drop_slot))
914 : {
915 0 : pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
916 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
917 0 : exit(1);
918 : }
919 :
920 0 : if (XLogRecPtrIsValid(endpos) && !do_start_slot)
921 : {
922 0 : pg_log_error("--endpos may only be specified with --start");
923 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
924 0 : exit(1);
925 : }
926 :
927 0 : if (!do_create_slot)
928 : {
929 0 : if (two_phase)
930 : {
931 0 : pg_log_error("%s may only be specified with --create-slot", "--enable-two-phase");
932 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
933 0 : exit(1);
934 : }
935 :
936 0 : if (failover)
937 : {
938 0 : pg_log_error("%s may only be specified with --create-slot", "--enable-failover");
939 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
940 0 : exit(1);
941 : }
942 0 : }
943 :
944 : /*
945 : * Obtain a connection to server. Notably, if we need a password, we want
946 : * to collect it from the user immediately.
947 : */
948 0 : conn = GetConnection();
949 0 : if (!conn)
950 : /* Error message already written in GetConnection() */
951 0 : exit(1);
952 0 : atexit(disconnect_atexit);
953 :
954 : /*
955 : * Trap signals. (Don't do this until after the initial password prompt,
956 : * if one is needed, in GetConnection.)
957 : */
958 : #ifndef WIN32
959 0 : pqsignal(SIGINT, sigexit_handler);
960 0 : pqsignal(SIGTERM, sigexit_handler);
961 0 : pqsignal(SIGHUP, sighup_handler);
962 : #endif
963 :
964 : /*
965 : * Run IDENTIFY_SYSTEM to check the connection type for each action.
966 : * --create-slot and --start actions require a database-specific
967 : * replication connection because they handle logical replication slots.
968 : * --drop-slot can remove replication slots from any replication
969 : * connection without this restriction.
970 : */
971 0 : if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
972 0 : exit(1);
973 :
974 0 : if (!do_drop_slot && db_name == NULL)
975 0 : pg_fatal("could not establish database-specific replication connection");
976 :
977 : /*
978 : * Set umask so that directories/files are created with the same
979 : * permissions as directories/files in the source data directory.
980 : *
981 : * pg_mode_mask is set to owner-only by default and then updated in
982 : * GetConnection() where we get the mode from the server-side with
983 : * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
984 : */
985 0 : umask(pg_mode_mask);
986 :
987 : /* Drop a replication slot. */
988 0 : if (do_drop_slot)
989 : {
990 0 : if (verbose)
991 0 : pg_log_info("dropping replication slot \"%s\"", replication_slot);
992 :
993 0 : if (!DropReplicationSlot(conn, replication_slot))
994 0 : exit(1);
995 0 : }
996 :
997 : /* Create a replication slot. */
998 0 : if (do_create_slot)
999 : {
1000 0 : if (verbose)
1001 0 : pg_log_info("creating replication slot \"%s\"", replication_slot);
1002 :
1003 0 : if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
1004 0 : false, false, slot_exists_ok, two_phase,
1005 0 : failover))
1006 0 : exit(1);
1007 0 : startpos = InvalidXLogRecPtr;
1008 0 : }
1009 :
1010 0 : if (!do_start_slot)
1011 0 : exit(0);
1012 :
1013 : /* Stream loop */
1014 0 : while (true)
1015 : {
1016 0 : StreamLogicalLog();
1017 0 : if (time_to_abort)
1018 : {
1019 : /*
1020 : * We've been Ctrl-C'ed or reached an exit limit condition. That's
1021 : * not an error, so exit without an errorcode.
1022 : */
1023 0 : exit(0);
1024 : }
1025 :
1026 : /*
1027 : * Ensure all written data is flushed to disk before exiting or
1028 : * starting a new replication.
1029 : */
1030 0 : if (outfd != -1)
1031 0 : OutputFsync(feGetCurrentTimestamp());
1032 :
1033 0 : if (noloop)
1034 : {
1035 0 : pg_fatal("disconnected");
1036 0 : }
1037 : else
1038 : {
1039 : /* translator: check source for value for %d */
1040 0 : pg_log_info("disconnected; waiting %d seconds to try again",
1041 : RECONNECT_SLEEP_TIME);
1042 0 : pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
1043 : }
1044 : }
1045 : }
1046 :
1047 : /*
1048 : * Fsync our output data, and send a feedback message to the server. Returns
1049 : * true if successful, false otherwise.
1050 : *
1051 : * If successful, *now is updated to the current timestamp just before sending
1052 : * feedback.
1053 : */
1054 : static bool
1055 0 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
1056 : {
1057 : /* flush data to disk, so that we send a recent flush pointer */
1058 0 : OutputFsync(*now);
1059 0 : *now = feGetCurrentTimestamp();
1060 0 : if (!sendFeedback(conn, *now, true, false))
1061 0 : return false;
1062 :
1063 0 : return true;
1064 0 : }
1065 :
1066 : /*
1067 : * Try to inform the server about our upcoming demise, but don't wait around or
1068 : * retry on failure.
1069 : */
1070 : static void
1071 0 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
1072 : XLogRecPtr lsn)
1073 : {
1074 0 : (void) PQputCopyEnd(conn, NULL);
1075 0 : (void) PQflush(conn);
1076 :
1077 0 : if (verbose)
1078 : {
1079 0 : switch (reason)
1080 : {
1081 : case STREAM_STOP_SIGNAL:
1082 0 : pg_log_info("received interrupt signal, exiting");
1083 0 : break;
1084 : case STREAM_STOP_KEEPALIVE:
1085 0 : pg_log_info("end position %X/%08X reached by keepalive",
1086 : LSN_FORMAT_ARGS(endpos));
1087 0 : break;
1088 : case STREAM_STOP_END_OF_WAL:
1089 0 : Assert(XLogRecPtrIsValid(lsn));
1090 0 : pg_log_info("end position %X/%08X reached by WAL record at %X/%08X",
1091 : LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
1092 0 : break;
1093 : case STREAM_STOP_NONE:
1094 0 : Assert(false);
1095 : break;
1096 : }
1097 0 : }
1098 0 : }
|