LCOV - code coverage report
Current view: top level - src/bin/pg_dump - parallel.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 427 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 32 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * parallel.c
       4              :  *
       5              :  *      Parallel support for pg_dump and pg_restore
       6              :  *
       7              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       8              :  * Portions Copyright (c) 1994, Regents of the University of California
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *              src/bin/pg_dump/parallel.c
      12              :  *
      13              :  *-------------------------------------------------------------------------
      14              :  */
      15              : 
      16              : /*
      17              :  * Parallel operation works like this:
      18              :  *
      19              :  * The original, leader process calls ParallelBackupStart(), which forks off
      20              :  * the desired number of worker processes, which each enter WaitForCommands().
      21              :  *
      22              :  * The leader process dispatches an individual work item to one of the worker
      23              :  * processes in DispatchJobForTocEntry().  We send a command string such as
      24              :  * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
      25              :  * The worker process receives and decodes the command and passes it to the
      26              :  * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
      27              :  * which are routines of the current archive format.  That routine performs
      28              :  * the required action (dump or restore) and returns an integer status code.
      29              :  * That status is sent back to the leader, which hands it to the
      30              :  * ParallelCompletionPtr callback function that was passed to
      31              :  * DispatchJobForTocEntry().  The callback function does state updating
      32              :  * for the leader control logic in pg_backup_archiver.c.
      33              :  *
      34              :  * In principle additional archive-format-specific information might be needed
      35              :  * in commands or worker status responses, but so far that hasn't proved
      36              :  * necessary, since workers have full copies of the ArchiveHandle/TocEntry
      37              :  * data structures.  Remember that we have forked off the workers only after
      38              :  * we have read in the catalog.  That's why our worker processes can also
      39              :  * access the catalog information.  (In the Windows case, the workers are
      40              :  * threads in the same process.  To avoid problems, they work with cloned
      41              :  * copies of the Archive data structure; see RunWorker().)
      42              :  *
      43              :  * In the leader process, the workerStatus field for each worker has one of
      44              :  * the following values:
      45              :  *              WRKR_NOT_STARTED: we've not yet forked this worker
      46              :  *              WRKR_IDLE: it's waiting for a command
      47              :  *              WRKR_WORKING: it's working on a command
      48              :  *              WRKR_TERMINATED: process ended
      49              :  * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
      50              :  * state, and must be NULL in other states.
      51              :  */
      52              : 
      53              : #include "postgres_fe.h"
      54              : 
      55              : #ifndef WIN32
      56              : #include <sys/select.h>
      57              : #include <sys/wait.h>
      58              : #include <signal.h>
      59              : #include <unistd.h>
      60              : #include <fcntl.h>
      61              : #endif
      62              : 
      63              : #include "fe_utils/string_utils.h"
      64              : #include "parallel.h"
      65              : #include "pg_backup_utils.h"
      66              : #ifdef WIN32
      67              : #include "port/pg_bswap.h"
      68              : #endif
      69              : 
      70              : /* Mnemonic macros for indexing the fd array returned by pipe(2) */
      71              : #define PIPE_READ                                                       0
      72              : #define PIPE_WRITE                                                      1
      73              : 
      74              : #define NO_SLOT (-1)                    /* Failure result for GetIdleWorker() */
      75              : 
      76              : /* Worker process statuses */
      77              : typedef enum
      78              : {
      79              :         WRKR_NOT_STARTED = 0,
      80              :         WRKR_IDLE,
      81              :         WRKR_WORKING,
      82              :         WRKR_TERMINATED,
      83              : } T_WorkerStatus;
      84              : 
      85              : #define WORKER_IS_RUNNING(workerStatus) \
      86              :         ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
      87              : 
      88              : /*
      89              :  * Private per-parallel-worker state (typedef for this is in parallel.h).
      90              :  *
      91              :  * Much of this is valid only in the leader process (or, on Windows, should
      92              :  * be touched only by the leader thread).  But the AH field should be touched
      93              :  * only by workers.  The pipe descriptors are valid everywhere.
      94              :  */
      95              : struct ParallelSlot
      96              : {
      97              :         T_WorkerStatus workerStatus;    /* see enum above */
      98              : 
      99              :         /* These fields are valid if workerStatus == WRKR_WORKING: */
     100              :         ParallelCompletionPtr callback; /* function to call on completion */
     101              :         void       *callback_data;      /* passthrough data for it */
     102              : 
     103              :         ArchiveHandle *AH;                      /* Archive data worker is using */
     104              : 
     105              :         int                     pipeRead;               /* leader's end of the pipes */
     106              :         int                     pipeWrite;
     107              :         int                     pipeRevRead;    /* child's end of the pipes */
     108              :         int                     pipeRevWrite;
     109              : 
     110              :         /* Child process/thread identity info: */
     111              : #ifdef WIN32
     112              :         uintptr_t       hThread;
     113              :         unsigned int threadId;
     114              : #else
     115              :         pid_t           pid;
     116              : #endif
     117              : };
     118              : 
     119              : #ifdef WIN32
     120              : 
     121              : /*
     122              :  * Structure to hold info passed by _beginthreadex() to the function it calls
     123              :  * via its single allowed argument.
     124              :  */
     125              : typedef struct
     126              : {
     127              :         ArchiveHandle *AH;                      /* leader database connection */
     128              :         ParallelSlot *slot;                     /* this worker's parallel slot */
     129              : } WorkerInfo;
     130              : 
     131              : /* Windows implementation of pipe access */
     132              : static int      pgpipe(int handles[2]);
     133              : #define piperead(a,b,c)         recv(a,b,c,0)
     134              : #define pipewrite(a,b,c)        send(a,b,c,0)
     135              : 
     136              : #else                                                   /* !WIN32 */
     137              : 
     138              : /* Non-Windows implementation of pipe access */
     139              : #define pgpipe(a)                       pipe(a)
     140              : #define piperead(a,b,c)         read(a,b,c)
     141              : #define pipewrite(a,b,c)        write(a,b,c)
     142              : 
     143              : #endif                                                  /* WIN32 */
     144              : 
     145              : /*
     146              :  * State info for archive_close_connection() shutdown callback.
     147              :  */
     148              : typedef struct ShutdownInformation
     149              : {
     150              :         ParallelState *pstate;
     151              :         Archive    *AHX;
     152              : } ShutdownInformation;
     153              : 
     154              : static ShutdownInformation shutdown_info;
     155              : 
     156              : /*
     157              :  * State info for signal handling.
     158              :  * We assume signal_info initializes to zeroes.
     159              :  *
     160              :  * On Unix, myAH is the leader DB connection in the leader process, and the
     161              :  * worker's own connection in worker processes.  On Windows, we have only one
     162              :  * instance of signal_info, so myAH is the leader connection and the worker
     163              :  * connections must be dug out of pstate->parallelSlot[].
     164              :  */
     165              : typedef struct DumpSignalInformation
     166              : {
     167              :         ArchiveHandle *myAH;            /* database connection to issue cancel for */
     168              :         ParallelState *pstate;          /* parallel state, if any */
     169              :         bool            handler_set;    /* signal handler set up in this process? */
     170              : #ifndef WIN32
     171              :         bool            am_worker;              /* am I a worker process? */
     172              : #endif
     173              : } DumpSignalInformation;
     174              : 
     175              : static volatile DumpSignalInformation signal_info;
     176              : 
     177              : #ifdef WIN32
     178              : static CRITICAL_SECTION signal_info_lock;
     179              : #endif
     180              : 
     181              : /*
     182              :  * Write a simple string to stderr --- must be safe in a signal handler.
     183              :  * We ignore the write() result since there's not much we could do about it.
     184              :  * Certain compilers make that harder than it ought to be.
     185              :  */
     186              : #define write_stderr(str) \
     187              :         do { \
     188              :                 const char *str_ = (str); \
     189              :                 int             rc_; \
     190              :                 rc_ = write(fileno(stderr), str_, strlen(str_)); \
     191              :                 (void) rc_; \
     192              :         } while (0)
     193              : 
     194              : 
     195              : #ifdef WIN32
     196              : /* file-scope variables */
     197              : static DWORD tls_index;
     198              : 
     199              : /* globally visible variables (needed by exit_nicely) */
     200              : bool            parallel_init_done = false;
     201              : DWORD           mainThreadId;
     202              : #endif                                                  /* WIN32 */
     203              : 
     204              : /* Local function prototypes */
     205              : static ParallelSlot *GetMyPSlot(ParallelState *pstate);
     206              : static void archive_close_connection(int code, void *arg);
     207              : static void ShutdownWorkersHard(ParallelState *pstate);
     208              : static void WaitForTerminatingWorkers(ParallelState *pstate);
     209              : static void set_cancel_handler(void);
     210              : static void set_cancel_pstate(ParallelState *pstate);
     211              : static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
     212              : static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
     213              : static int      GetIdleWorker(ParallelState *pstate);
     214              : static bool HasEveryWorkerTerminated(ParallelState *pstate);
     215              : static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
     216              : static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
     217              : static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
     218              :                                                         bool do_wait);
     219              : static char *getMessageFromLeader(int pipefd[2]);
     220              : static void sendMessageToLeader(int pipefd[2], const char *str);
     221              : static int      select_loop(int maxFd, fd_set *workerset);
     222              : static char *getMessageFromWorker(ParallelState *pstate,
     223              :                                                                   bool do_wait, int *worker);
     224              : static void sendMessageToWorker(ParallelState *pstate,
     225              :                                                                 int worker, const char *str);
     226              : static char *readMessageFromPipe(int fd);
     227              : 
     228              : #define messageStartsWith(msg, prefix) \
     229              :         (strncmp(msg, prefix, strlen(prefix)) == 0)
     230              : 
     231              : 
     232              : /*
     233              :  * Initialize parallel dump support --- should be called early in process
     234              :  * startup.  (Currently, this is called whether or not we intend parallel
     235              :  * activity.)
     236              :  */
     237              : void
     238            0 : init_parallel_dump_utils(void)
     239              : {
     240              : #ifdef WIN32
     241              :         if (!parallel_init_done)
     242              :         {
     243              :                 WSADATA         wsaData;
     244              :                 int                     err;
     245              : 
     246              :                 /* Prepare for threaded operation */
     247              :                 tls_index = TlsAlloc();
     248              :                 mainThreadId = GetCurrentThreadId();
     249              : 
     250              :                 /* Initialize socket access */
     251              :                 err = WSAStartup(MAKEWORD(2, 2), &wsaData);
     252              :                 if (err != 0)
     253              :                         pg_fatal("%s() failed: error code %d", "WSAStartup", err);
     254              : 
     255              :                 parallel_init_done = true;
     256              :         }
     257              : #endif
     258            0 : }
     259              : 
     260              : /*
     261              :  * Find the ParallelSlot for the current worker process or thread.
     262              :  *
     263              :  * Returns NULL if no matching slot is found (this implies we're the leader).
     264              :  */
     265              : static ParallelSlot *
     266            0 : GetMyPSlot(ParallelState *pstate)
     267              : {
     268            0 :         int                     i;
     269              : 
     270            0 :         for (i = 0; i < pstate->numWorkers; i++)
     271              :         {
     272              : #ifdef WIN32
     273              :                 if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
     274              : #else
     275            0 :                 if (pstate->parallelSlot[i].pid == getpid())
     276              : #endif
     277            0 :                         return &(pstate->parallelSlot[i]);
     278            0 :         }
     279              : 
     280            0 :         return NULL;
     281            0 : }
     282              : 
     283              : /*
     284              :  * A thread-local version of getLocalPQExpBuffer().
     285              :  *
     286              :  * Non-reentrant but reduces memory leakage: we'll consume one buffer per
     287              :  * thread, which is much better than one per fmtId/fmtQualifiedId call.
     288              :  */
     289              : #ifdef WIN32
     290              : static PQExpBuffer
     291              : getThreadLocalPQExpBuffer(void)
     292              : {
     293              :         /*
     294              :          * The Tls code goes awry if we use a static var, so we provide for both
     295              :          * static and auto, and omit any use of the static var when using Tls. We
     296              :          * rely on TlsGetValue() to return 0 if the value is not yet set.
     297              :          */
     298              :         static PQExpBuffer s_id_return = NULL;
     299              :         PQExpBuffer id_return;
     300              : 
     301              :         if (parallel_init_done)
     302              :                 id_return = (PQExpBuffer) TlsGetValue(tls_index);
     303              :         else
     304              :                 id_return = s_id_return;
     305              : 
     306              :         if (id_return)                          /* already have a buffer? */
     307              :         {
     308              :                 /* same buffer, just wipe contents */
     309              :                 resetPQExpBuffer(id_return);
     310              :         }
     311              :         else
     312              :         {
     313              :                 /* new buffer */
     314              :                 id_return = createPQExpBuffer();
     315              :                 if (parallel_init_done)
     316              :                         TlsSetValue(tls_index, id_return);
     317              :                 else
     318              :                         s_id_return = id_return;
     319              :         }
     320              : 
     321              :         return id_return;
     322              : }
     323              : #endif                                                  /* WIN32 */
     324              : 
     325              : /*
     326              :  * pg_dump and pg_restore call this to register the cleanup handler
     327              :  * as soon as they've created the ArchiveHandle.
     328              :  */
     329              : void
     330            0 : on_exit_close_archive(Archive *AHX)
     331              : {
     332            0 :         shutdown_info.AHX = AHX;
     333            0 :         on_exit_nicely(archive_close_connection, &shutdown_info);
     334            0 : }
     335              : 
     336              : /*
     337              :  * on_exit_nicely handler for shutting down database connections and
     338              :  * worker processes cleanly.
     339              :  */
     340              : static void
     341            0 : archive_close_connection(int code, void *arg)
     342              : {
     343            0 :         ShutdownInformation *si = (ShutdownInformation *) arg;
     344              : 
     345            0 :         if (si->pstate)
     346              :         {
     347              :                 /* In parallel mode, must figure out who we are */
     348            0 :                 ParallelSlot *slot = GetMyPSlot(si->pstate);
     349              : 
     350            0 :                 if (!slot)
     351              :                 {
     352              :                         /*
     353              :                          * We're the leader.  Forcibly shut down workers, then close our
     354              :                          * own database connection, if any.
     355              :                          */
     356            0 :                         ShutdownWorkersHard(si->pstate);
     357              : 
     358            0 :                         if (si->AHX)
     359            0 :                                 DisconnectDatabase(si->AHX);
     360            0 :                 }
     361              :                 else
     362              :                 {
     363              :                         /*
     364              :                          * We're a worker.  Shut down our own DB connection if any.  On
     365              :                          * Windows, we also have to close our communication sockets, to
     366              :                          * emulate what will happen on Unix when the worker process exits.
     367              :                          * (Without this, if this is a premature exit, the leader would
     368              :                          * fail to detect it because there would be no EOF condition on
     369              :                          * the other end of the pipe.)
     370              :                          */
     371            0 :                         if (slot->AH)
     372            0 :                                 DisconnectDatabase(&(slot->AH->public));
     373              : 
     374              : #ifdef WIN32
     375              :                         closesocket(slot->pipeRevRead);
     376              :                         closesocket(slot->pipeRevWrite);
     377              : #endif
     378              :                 }
     379            0 :         }
     380              :         else
     381              :         {
     382              :                 /* Non-parallel operation: just kill the leader DB connection */
     383            0 :                 if (si->AHX)
     384            0 :                         DisconnectDatabase(si->AHX);
     385              :         }
     386            0 : }
     387              : 
     388              : /*
     389              :  * Forcibly shut down any remaining workers, waiting for them to finish.
     390              :  *
     391              :  * Note that we don't expect to come here during normal exit (the workers
     392              :  * should be long gone, and the ParallelState too).  We're only here in a
     393              :  * pg_fatal() situation, so intervening to cancel active commands is
     394              :  * appropriate.
     395              :  */
     396              : static void
     397            0 : ShutdownWorkersHard(ParallelState *pstate)
     398              : {
     399            0 :         int                     i;
     400              : 
     401              :         /*
     402              :          * Close our write end of the sockets so that any workers waiting for
     403              :          * commands know they can exit.  (Note: some of the pipeWrite fields might
     404              :          * still be zero, if we failed to initialize all the workers.  Hence, just
     405              :          * ignore errors here.)
     406              :          */
     407            0 :         for (i = 0; i < pstate->numWorkers; i++)
     408            0 :                 closesocket(pstate->parallelSlot[i].pipeWrite);
     409              : 
     410              :         /*
     411              :          * Force early termination of any commands currently in progress.
     412              :          */
     413              : #ifndef WIN32
     414              :         /* On non-Windows, send SIGTERM to each worker process. */
     415            0 :         for (i = 0; i < pstate->numWorkers; i++)
     416              :         {
     417            0 :                 pid_t           pid = pstate->parallelSlot[i].pid;
     418              : 
     419            0 :                 if (pid != 0)
     420            0 :                         kill(pid, SIGTERM);
     421            0 :         }
     422              : #else
     423              : 
     424              :         /*
     425              :          * On Windows, send query cancels directly to the workers' backends.  Use
     426              :          * a critical section to ensure worker threads don't change state.
     427              :          */
     428              :         EnterCriticalSection(&signal_info_lock);
     429              :         for (i = 0; i < pstate->numWorkers; i++)
     430              :         {
     431              :                 ArchiveHandle *AH = pstate->parallelSlot[i].AH;
     432              :                 char            errbuf[1];
     433              : 
     434              :                 if (AH != NULL && AH->connCancel != NULL)
     435              :                         (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
     436              :         }
     437              :         LeaveCriticalSection(&signal_info_lock);
     438              : #endif
     439              : 
     440              :         /* Now wait for them to terminate. */
     441            0 :         WaitForTerminatingWorkers(pstate);
     442            0 : }
     443              : 
     444              : /*
     445              :  * Wait for all workers to terminate.
     446              :  */
     447              : static void
     448            0 : WaitForTerminatingWorkers(ParallelState *pstate)
     449              : {
     450            0 :         while (!HasEveryWorkerTerminated(pstate))
     451              :         {
     452            0 :                 ParallelSlot *slot = NULL;
     453            0 :                 int                     j;
     454              : 
     455              : #ifndef WIN32
     456              :                 /* On non-Windows, use wait() to wait for next worker to end */
     457            0 :                 int                     status;
     458            0 :                 pid_t           pid = wait(&status);
     459              : 
     460              :                 /* Find dead worker's slot, and clear the PID field */
     461            0 :                 for (j = 0; j < pstate->numWorkers; j++)
     462              :                 {
     463            0 :                         slot = &(pstate->parallelSlot[j]);
     464            0 :                         if (slot->pid == pid)
     465              :                         {
     466            0 :                                 slot->pid = 0;
     467            0 :                                 break;
     468              :                         }
     469            0 :                 }
     470              : #else                                                   /* WIN32 */
     471              :                 /* On Windows, we must use WaitForMultipleObjects() */
     472              :                 HANDLE     *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
     473              :                 int                     nrun = 0;
     474              :                 DWORD           ret;
     475              :                 uintptr_t       hThread;
     476              : 
     477              :                 for (j = 0; j < pstate->numWorkers; j++)
     478              :                 {
     479              :                         if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
     480              :                         {
     481              :                                 lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
     482              :                                 nrun++;
     483              :                         }
     484              :                 }
     485              :                 ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
     486              :                 Assert(ret != WAIT_FAILED);
     487              :                 hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
     488              :                 free(lpHandles);
     489              : 
     490              :                 /* Find dead worker's slot, and clear the hThread field */
     491              :                 for (j = 0; j < pstate->numWorkers; j++)
     492              :                 {
     493              :                         slot = &(pstate->parallelSlot[j]);
     494              :                         if (slot->hThread == hThread)
     495              :                         {
     496              :                                 /* For cleanliness, close handles for dead threads */
     497              :                                 CloseHandle((HANDLE) slot->hThread);
     498              :                                 slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
     499              :                                 break;
     500              :                         }
     501              :                 }
     502              : #endif                                                  /* WIN32 */
     503              : 
     504              :                 /* On all platforms, update workerStatus and te[] as well */
     505            0 :                 Assert(j < pstate->numWorkers);
     506            0 :                 slot->workerStatus = WRKR_TERMINATED;
     507            0 :                 pstate->te[j] = NULL;
     508            0 :         }
     509            0 : }
     510              : 
     511              : 
     512              : /*
     513              :  * Code for responding to cancel interrupts (SIGINT, control-C, etc)
     514              :  *
     515              :  * This doesn't quite belong in this module, but it needs access to the
     516              :  * ParallelState data, so there's not really a better place either.
     517              :  *
     518              :  * When we get a cancel interrupt, we could just die, but in pg_restore that
     519              :  * could leave a SQL command (e.g., CREATE INDEX on a large table) running
     520              :  * for a long time.  Instead, we try to send a cancel request and then die.
     521              :  * pg_dump probably doesn't really need this, but we might as well use it
     522              :  * there too.  Note that sending the cancel directly from the signal handler
     523              :  * is safe because PQcancel() is written to make it so.
     524              :  *
     525              :  * In parallel operation on Unix, each process is responsible for canceling
     526              :  * its own connection (this must be so because nobody else has access to it).
     527              :  * Furthermore, the leader process should attempt to forward its signal to
     528              :  * each child.  In simple manual use of pg_dump/pg_restore, forwarding isn't
     529              :  * needed because typing control-C at the console would deliver SIGINT to
     530              :  * every member of the terminal process group --- but in other scenarios it
     531              :  * might be that only the leader gets signaled.
     532              :  *
     533              :  * On Windows, the cancel handler runs in a separate thread, because that's
     534              :  * how SetConsoleCtrlHandler works.  We make it stop worker threads, send
     535              :  * cancels on all active connections, and then return FALSE, which will allow
     536              :  * the process to die.  For safety's sake, we use a critical section to
     537              :  * protect the PGcancel structures against being changed while the signal
     538              :  * thread runs.
     539              :  */
     540              : 
     541              : #ifndef WIN32
     542              : 
     543              : /*
     544              :  * Signal handler (Unix only)
                      :  *
                      :  * Installed for SIGINT, SIGTERM and SIGQUIT by set_cancel_handler().
                      :  * In order, it:
                      :  *   1. sets those three signals to SIG_IGN;
                      :  *   2. if signal_info.pstate is set, sends SIGTERM to every worker slot
                      :  *      whose pid is nonzero;
                      :  *   3. calls PQcancel() on signal_info.myAH->connCancel, if present;
                      :  *   4. unless signal_info.am_worker is set, writes
                      :  *      "<progname>: terminated by user" to stderr via write_stderr();
                      :  *   5. terminates the process with _exit(1).
                      :  *
                      :  * errbuf is a single byte and the PQcancel() result is discarded, so no
                      :  * error text from the cancel attempt is retained.
     545              :  */
     546              : static void
     547            0 : sigTermHandler(SIGNAL_ARGS)
     548              : {
     549            0 :         int                     i;
     550            0 :         char            errbuf[1];
     551              : 
     552              :         /*
     553              :          * Some platforms allow delivery of new signals to interrupt an active
     554              :          * signal handler.  That could muck up our attempt to send PQcancel, so
     555              :          * disable the signals that set_cancel_handler enabled.
     556              :          */
     557            0 :         pqsignal(SIGINT, SIG_IGN);
     558            0 :         pqsignal(SIGTERM, SIG_IGN);
     559            0 :         pqsignal(SIGQUIT, SIG_IGN);
     560              : 
     561              :         /*
     562              :          * If we're in the leader, forward signal to all workers.  (It seems best
     563              :          * to do this before PQcancel; killing the leader transaction will result
     564              :          * in invalid-snapshot errors from active workers, which maybe we can
     565              :          * quiet by killing workers first.)  Ignore any errors.
                      :          *
                      :          * Slots whose pid is still zero are skipped.
     566              :          */
     567            0 :         if (signal_info.pstate != NULL)
     568              :         {
     569            0 :                 for (i = 0; i < signal_info.pstate->numWorkers; i++)
     570              :                 {
     571            0 :                         pid_t           pid = signal_info.pstate->parallelSlot[i].pid;
     572              : 
     573            0 :                         if (pid != 0)
     574            0 :                                 kill(pid, SIGTERM);
     575            0 :                 }
     576            0 :         }
     577              : 
     578              :         /*
     579              :          * Send QueryCancel if we have a connection to send to.  Ignore errors,
     580              :          * there's not much we can do about them anyway.
     581              :          */
     582            0 :         if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
     583            0 :                 (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
     584              : 
     585              :         /*
     586              :          * Report we're quitting, using nothing more complicated than write(2).
     587              :          * When in parallel operation, only the leader process should do this.
     588              :          */
     589            0 :         if (!signal_info.am_worker)
     590              :         {
     591            0 :                 if (progname)
     592              :                 {
     593            0 :                         write_stderr(progname);
     594            0 :                         write_stderr(": ");
     595            0 :                 }
     596            0 :                 write_stderr("terminated by user\n");
     597            0 :         }
     598              : 
     599              :         /*
     600              :          * And die, using _exit() not exit() because the latter will invoke atexit
     601              :          * handlers that can fail if we interrupted related code.
     602              :          */
     603            0 :         _exit(1);
     604              : }
     605              : 
     606              : /*
     607              :  * Enable cancel interrupt handler, if not already done.
                      :  *
                      :  * Unix variant: on the first call, sets signal_info.handler_set and
                      :  * installs sigTermHandler for SIGINT, SIGTERM and SIGQUIT.  Later calls
                      :  * find handler_set already true and do nothing.
     608              :  */
     609              : static void
     610            0 : set_cancel_handler(void)
     611              : {
     612              :         /*
     613              :          * When forking, signal_info.handler_set will propagate into the new
     614              :          * process, but that's fine because the signal handler state does too.
     615              :          */
     616            0 :         if (!signal_info.handler_set)
     617              :         {
     618            0 :                 signal_info.handler_set = true;
     619              : 
     620            0 :                 pqsignal(SIGINT, sigTermHandler);
     621            0 :                 pqsignal(SIGTERM, sigTermHandler);
     622            0 :                 pqsignal(SIGQUIT, sigTermHandler);
     623            0 :         }
     624            0 : }
     625              : 
     626              : #else                                                   /* WIN32 */
     627              : 
     628              : /*
     629              :  * Console interrupt handler --- runs in a newly-started thread.
     630              :  *
     631              :  * After stopping other threads and sending cancel requests on all open
     632              :  * connections, we return FALSE which will allow the default ExitProcess()
     633              :  * action to be taken.
                      :  *
                      :  * Only CTRL_C_EVENT and CTRL_BREAK_EVENT trigger the cancel logic; for any
                      :  * other dwCtrlType the function goes straight to the return.  All reads of
                      :  * signal_info and of the per-slot AH and hThread fields happen while
                      :  * signal_info_lock is held; the "terminated by user" message is written
                      :  * after the lock has been released.
                      :  *
                      :  * errbuf is a single byte and the PQcancel() results are discarded, so no
                      :  * error text from the cancel attempts is retained.
     634              :  */
     635              : static BOOL WINAPI
     636              : consoleHandler(DWORD dwCtrlType)
     637              : {
     638              :         int                     i;
     639              :         char            errbuf[1];
     640              : 
     641              :         if (dwCtrlType == CTRL_C_EVENT ||
     642              :                 dwCtrlType == CTRL_BREAK_EVENT)
     643              :         {
     644              :                 /* Critical section prevents changing data we look at here */
     645              :                 EnterCriticalSection(&signal_info_lock);
     646              : 
     647              :                 /*
     648              :                  * If in parallel mode, stop worker threads and send QueryCancel to
     649              :                  * their connected backends.  The main point of stopping the worker
     650              :                  * threads is to keep them from reporting the query cancels as errors,
     651              :                  * which would clutter the user's screen.  We needn't stop the leader
     652              :                  * thread since it won't be doing much anyway.  Do this before
     653              :                  * canceling the main transaction, else we might get invalid-snapshot
     654              :                  * errors reported before we can stop the workers.  Ignore errors,
     655              :                  * there's not much we can do about them anyway.
     656              :                  */
     657              :                 if (signal_info.pstate != NULL)
     658              :                 {
     659              :                         for (i = 0; i < signal_info.pstate->numWorkers; i++)
     660              :                         {
     661              :                                 ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
     662              :                                 ArchiveHandle *AH = slot->AH;
     663              :                                 HANDLE          hThread = (HANDLE) slot->hThread;
     664              : 
     665              :                                 /*
     666              :                                  * Using TerminateThread here may leave some resources leaked,
     667              :                                  * but it doesn't matter since we're about to end the whole
     668              :                                  * process.
                      :                                  *
                      :                                  * Slots whose handle equals INVALID_HANDLE_VALUE are
                      :                                  * skipped.
     669              :                                  */
     670              :                                 if (hThread != INVALID_HANDLE_VALUE)
     671              :                                         TerminateThread(hThread, 0);
     672              : 
     673              :                                 if (AH != NULL && AH->connCancel != NULL)
     674              :                                         (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
     675              :                         }
     676              :                 }
     677              : 
     678              :                 /*
     679              :                  * Send QueryCancel to leader connection, if enabled.  Ignore errors,
     680              :                  * there's not much we can do about them anyway.
     681              :                  */
     682              :                 if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
     683              :                         (void) PQcancel(signal_info.myAH->connCancel,
     684              :                                                         errbuf, sizeof(errbuf));
     685              : 
     686              :                 LeaveCriticalSection(&signal_info_lock);
     687              : 
     688              :                 /*
     689              :                  * Report we're quitting, using nothing more complicated than
     690              :                  * write(2).  (We might be able to get away with using pg_log_*()
     691              :                  * here, but since we terminated other threads uncleanly above, it
     692              :                  * seems better to assume as little as possible.)
     693              :                  */
     694              :                 if (progname)
     695              :                 {
     696              :                         write_stderr(progname);
     697              :                         write_stderr(": ");
     698              :                 }
     699              :                 write_stderr("terminated by user\n");
     700              :         }
     701              : 
     702              :         /* Always return FALSE to allow signal handling to continue */
     703              :         return FALSE;
     704              : }
     705              : 
     706              : /*
     707              :  * Enable cancel interrupt handler, if not already done.
                      :  *
                      :  * Windows variant: on the first call, sets signal_info.handler_set,
                      :  * initializes signal_info_lock, and then registers consoleHandler with
                      :  * SetConsoleCtrlHandler().  Later calls find handler_set already true and
                      :  * do nothing, so the critical section is initialized only once.
     708              :  */
     709              : static void
     710              : set_cancel_handler(void)
     711              : {
     712              :         if (!signal_info.handler_set)
     713              :         {
     714              :                 signal_info.handler_set = true;
     715              : 
     716              :                 InitializeCriticalSection(&signal_info_lock);
     717              : 
     718              :                 SetConsoleCtrlHandler(consoleHandler, TRUE);
     719              :         }
     720              : }
     721              : 
     722              : #endif                                                  /* WIN32 */
     723              : 
     724              : 
     725              : /*
     726              :  * set_archive_cancel_info
     727              :  *
     728              :  * Fill AH->connCancel with cancellation info for the specified database
     729              :  * connection; or clear it if conn is NULL.
                      :  *
                      :  * AH:   archive handle whose connCancel field is replaced.
                      :  * conn: connection to obtain cancel info from via PQgetCancel(), or NULL
                      :  *       to leave AH->connCancel cleared.
                      :  *
                      :  * Side effects: calls set_cancel_handler(); releases any previous
                      :  * AH->connCancel with PQfreeCancel(); and stores AH in signal_info.myAH
                      :  * (on Windows, only when called from the main thread).
     730              :  */
     731              : void
     732            0 : set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
     733              : {
     734            0 :         PGcancel   *oldConnCancel;
     735              : 
     736              :         /*
     737              :          * Activate the interrupt handler if we haven't done so yet in this
     738              :          * process.  On Windows, this also initializes signal_info_lock;
     739              :          * therefore it's important that this happen at least once before we
     740              :          * fork off any threads.
     741              :          */
     742            0 :         set_cancel_handler();
     743              : 
     744              :         /*
     745              :          * On Unix, we assume that storing a pointer value is atomic with respect
     746              :          * to any possible signal interrupt.  On Windows, use a critical section.
     747              :          */
     748              : 
     749              : #ifdef WIN32
     750              :         EnterCriticalSection(&signal_info_lock);
     751              : #endif
     752              : 
     753              :         /* Free the old one if we have one */
     754            0 :         oldConnCancel = AH->connCancel;
     755              :         /* be sure interrupt handler doesn't use pointer while freeing */
     756            0 :         AH->connCancel = NULL;
     757              : 
     758            0 :         if (oldConnCancel != NULL)
     759            0 :                 PQfreeCancel(oldConnCancel);
     760              : 
     761              :         /* Set the new one if specified */
     762            0 :         if (conn)
     763            0 :                 AH->connCancel = PQgetCancel(conn);
     764              : 
     765              :         /*
     766              :          * On Unix, there's only ever one active ArchiveHandle per process, so we
     767              :          * can just set signal_info.myAH unconditionally.  On Windows, do that
     768              :          * only in the main thread; worker threads have to make sure their
     769              :          * ArchiveHandle appears in the pstate data, which is dealt with in
     770              :          * RunWorker().
     771              :          */
     772              : #ifndef WIN32
     773            0 :         signal_info.myAH = AH;
     774              : #else
     775              :         if (mainThreadId == GetCurrentThreadId())
     776              :                 signal_info.myAH = AH;
     777              : #endif
     778              : 
     779              : #ifdef WIN32
     780              :         LeaveCriticalSection(&signal_info_lock);
     781              : #endif
     782            0 : }
     783              : 
     784              : /*
     785              :  * set_cancel_pstate
     786              :  *
     787              :  * Set signal_info.pstate to point to the specified ParallelState, if any.
     788              :  * We need this mainly to have an interlock against Windows signal thread.
                      :  *
                      :  * Passing NULL unlinks the ParallelState again.  On Windows the store is
                      :  * made while holding signal_info_lock; on other platforms it is a plain
                      :  * assignment.
     789              :  */
     790              : static void
     791            0 : set_cancel_pstate(ParallelState *pstate)
     792              : {
     793              : #ifdef WIN32
     794              :         EnterCriticalSection(&signal_info_lock);
     795              : #endif
     796              : 
     797            0 :         signal_info.pstate = pstate;
     798              : 
     799              : #ifdef WIN32
     800              :         LeaveCriticalSection(&signal_info_lock);
     801              : #endif
     802            0 : }
     803              : 
     804              : /*
     805              :  * set_cancel_slot_archive
     806              :  *
     807              :  * Set ParallelSlot's AH field to point to the specified archive, if any.
     808              :  * We need this mainly to have an interlock against Windows signal thread.
                      :  *
                      :  * Passing NULL for AH clears slot->AH.  On Windows the store is made while
                      :  * holding signal_info_lock; on other platforms it is a plain assignment.
     809              :  */
     810              : static void
     811            0 : set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
     812              : {
     813              : #ifdef WIN32
     814              :         EnterCriticalSection(&signal_info_lock);
     815              : #endif
     816              : 
     817            0 :         slot->AH = AH;
     818              : 
     819              : #ifdef WIN32
     820              :         LeaveCriticalSection(&signal_info_lock);
     821              : #endif
     822            0 : }
     823              : 
     824              : 
     825              : /*
     826              :  * This function is called by both Unix and Windows variants to set up
     827              :  * and run a worker process.  Caller should exit the process (or thread)
     828              :  * upon return.
                      :  *
                      :  * AH:   the archive handle passed in by the caller; it is replaced locally
                      :  *       by CloneArchive(AH), and the clone is what gets published in
                      :  *       slot->AH, handed to SetupWorkerPtr and WaitForCommands(), and
                      :  *       finally passed to DisconnectDatabase() and DeCloneArchive().
                      :  * slot: this worker's ParallelSlot; the pipe ends used for command
                      :  *       traffic are taken from slot->pipeRevRead and slot->pipeRevWrite.
     829              :  */
     830              : static void
     831            0 : RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
     832              : {
     833            0 :         int                     pipefd[2];
     834              : 
     835              :         /* fetch child ends of pipes */
     836            0 :         pipefd[PIPE_READ] = slot->pipeRevRead;
     837            0 :         pipefd[PIPE_WRITE] = slot->pipeRevWrite;
     838              : 
     839              :         /*
     840              :          * Clone the archive so that we have our own state to work with, and in
     841              :          * particular our own database connection.
     842              :          *
     843              :          * We clone on Unix as well as Windows, even though technically we don't
     844              :          * need to because fork() gives us a copy in our own address space
     845              :          * already.  But CloneArchive resets the state information and also clones
     846              :          * the database connection which both seem kinda helpful.
     847              :          */
     848            0 :         AH = CloneArchive(AH);
     849              : 
     850              :         /* Remember cloned archive where signal handler can find it */
     851            0 :         set_cancel_slot_archive(slot, AH);
     852              : 
     853              :         /*
     854              :          * Call the setup worker function that's defined in the ArchiveHandle.
     855              :          */
     856            0 :         (AH->SetupWorkerPtr) ((Archive *) AH);
     857              : 
     858              :         /*
     859              :          * Execute commands until done.
     860              :          */
     861            0 :         WaitForCommands(AH, pipefd);
     862              : 
     863              :         /*
     864              :          * Disconnect from database and clean up.
                      :          *
                      :          * slot->AH is cleared first, so the cancel handler no longer sees the
                      :          * clone once teardown of the clone begins.
     865              :          */
     866            0 :         set_cancel_slot_archive(slot, NULL);
     867            0 :         DisconnectDatabase(&(AH->public));
     868            0 :         DeCloneArchive(AH);
     869            0 : }
     870              : 
     871              : /*
     872              :  * Thread base function for Windows
                      :  *
                      :  * wi carries the ArchiveHandle and ParallelSlot for the new worker.  Both
                      :  * pointers are copied out and wi is released with free() before the worker
                      :  * runs.  After RunWorker() returns, the thread is ended with
                      :  * _endthreadex(0).
     873              :  */
     874              : #ifdef WIN32
     875              : static unsigned __stdcall
     876              : init_spawned_worker_win32(WorkerInfo *wi)
     877              : {
     878              :         ArchiveHandle *AH = wi->AH;
     879              :         ParallelSlot *slot = wi->slot;
     880              : 
     881              :         /* Don't need WorkerInfo anymore */
     882              :         free(wi);
     883              : 
     884              :         /* Run the worker ... */
     885              :         RunWorker(AH, slot);
     886              : 
     887              :         /* Exit the thread */
     888              :         _endthreadex(0);
     889              :         return 0;
     890              : }
     891              : #endif                                                  /* WIN32 */
     892              : 
     893              : /*
     894              :  * This function starts a parallel dump or restore by spawning off the worker
     895              :  * processes.  For Windows, it creates a number of threads; on Unix the
     896              :  * workers are created with fork().
                      :  *
                      :  * Returns a newly allocated ParallelState.  If AH->public.numWorkers is 1,
                      :  * it is returned right away with te and parallelSlot still NULL and no
                      :  * workers are started.  Otherwise one worker is started per slot, each with
                      :  * its own pair of pipes (leader-to-worker and worker-to-leader), and every
                      :  * slot is marked WRKR_IDLE.  Failure to create the pipes or to fork is
                      :  * reported through pg_fatal().
                      :  *
                      :  * While the workers are being started, AH->connCancel is cleared; it is
                      :  * re-established from AH->connection afterwards, and only then is
                      :  * signal_info.pstate set.
                      :  *
                      :  * NOTE(review): on Windows the _beginthreadex() result is stored in
                      :  * slot->hThread without being checked here; confirm whether a failed
                      :  * thread creation needs explicit handling.
     897              :  */
     898              : ParallelState *
     899            0 : ParallelBackupStart(ArchiveHandle *AH)
     900              : {
     901            0 :         ParallelState *pstate;
     902            0 :         int                     i;
     903              : 
     904            0 :         Assert(AH->public.numWorkers > 0);
     905              : 
     906            0 :         pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
     907              : 
     908            0 :         pstate->numWorkers = AH->public.numWorkers;
     909            0 :         pstate->te = NULL;
     910            0 :         pstate->parallelSlot = NULL;
     911              : 
     912            0 :         if (AH->public.numWorkers == 1)
     913            0 :                 return pstate;
     914              : 
     915              :         /* Create status arrays, being sure to initialize all fields to 0 */
     916            0 :         pstate->te = (TocEntry **)
     917            0 :                 pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
     918            0 :         pstate->parallelSlot = (ParallelSlot *)
     919            0 :                 pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
     920              : 
     921              : #ifdef WIN32
     922              :         /* Make fmtId() and fmtQualifiedId() use thread-local storage */
     923              :         getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
     924              : #endif
     925              : 
     926              :         /*
     927              :          * Set the pstate in shutdown_info, to tell the exit handler that it must
     928              :          * clean up workers as well as the main database connection.  But we don't
     929              :          * set this in signal_info yet, because we don't want child processes to
     930              :          * inherit non-NULL signal_info.pstate.
     931              :          */
     932            0 :         shutdown_info.pstate = pstate;
     933              : 
     934              :         /*
     935              :          * Temporarily disable query cancellation on the leader connection.  This
     936              :          * ensures that child processes won't inherit valid AH->connCancel
     937              :          * settings and thus won't try to issue cancels against the leader's
     938              :          * connection.  No harm is done if we fail while it's disabled, because
     939              :          * the leader connection is idle at this point anyway.
     940              :          */
     941            0 :         set_archive_cancel_info(AH, NULL);
     942              : 
     943              :         /* Ensure stdio state is quiesced before forking */
     944            0 :         fflush(NULL);
     945              : 
     946              :         /* Create desired number of workers */
     947            0 :         for (i = 0; i < pstate->numWorkers; i++)
     948              :         {
     949              : #ifdef WIN32
     950              :                 WorkerInfo *wi;
     951              :                 uintptr_t       handle;
     952              : #else
     953            0 :                 pid_t           pid;
     954              : #endif
     955            0 :                 ParallelSlot *slot = &(pstate->parallelSlot[i]);
     956            0 :                 int                     pipeMW[2],
     957              :                                         pipeWM[2];
     958              : 
     959              :                 /* Create communication pipes for this worker */
     960            0 :                 if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
     961            0 :                         pg_fatal("could not create communication channels: %m");
     962              : 
     963              :                 /* leader's ends of the pipes */
     964            0 :                 slot->pipeRead = pipeWM[PIPE_READ];
     965            0 :                 slot->pipeWrite = pipeMW[PIPE_WRITE];
     966              :                 /* child's ends of the pipes */
     967            0 :                 slot->pipeRevRead = pipeMW[PIPE_READ];
     968            0 :                 slot->pipeRevWrite = pipeWM[PIPE_WRITE];
     969              : 
     970              : #ifdef WIN32
     971              :                 /* Create transient structure to pass args to worker function */
     972              :                 wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
     973              : 
     974              :                 wi->AH = AH;
     975              :                 wi->slot = slot;
     976              : 
     977              :                 handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
     978              :                                                                 wi, 0, &(slot->threadId));
     979              :                 slot->hThread = handle;
     980              :                 slot->workerStatus = WRKR_IDLE;
     981              : #else                                                   /* !WIN32 */
     982            0 :                 pid = fork();
     983            0 :                 if (pid == 0)
     984              :                 {
     985              :                         /* we are the worker */
     986            0 :                         int                     j;
     987              : 
     988              :                         /* this is needed for GetMyPSlot() */
     989            0 :                         slot->pid = getpid();
     990              : 
     991              :                         /* instruct signal handler that we're in a worker now */
     992            0 :                         signal_info.am_worker = true;
     993              : 
     994              :                         /* close read end of Worker -> Leader */
     995            0 :                         closesocket(pipeWM[PIPE_READ]);
     996              :                         /* close write end of Leader -> Worker */
     997            0 :                         closesocket(pipeMW[PIPE_WRITE]);
     998              : 
     999              :                         /*
    1000              :                          * Close all inherited fds for communication of the leader with
    1001              :                          * previously-forked workers.
                      :                          *
                      :                          * Only slots 0 .. i-1 are visited, i.e. the workers
                      :                          * forked before this one.
    1002              :                          */
    1003            0 :                         for (j = 0; j < i; j++)
    1004              :                         {
    1005            0 :                                 closesocket(pstate->parallelSlot[j].pipeRead);
    1006            0 :                                 closesocket(pstate->parallelSlot[j].pipeWrite);
    1007            0 :                         }
    1008              : 
    1009              :                         /* Run the worker ... */
    1010            0 :                         RunWorker(AH, slot);
    1011              : 
    1012              :                         /* We can just exit(0) when done */
    1013            0 :                         exit(0);
    1014              :                 }
    1015            0 :                 else if (pid < 0)
    1016              :                 {
    1017              :                         /* fork failed */
    1018            0 :                         pg_fatal("could not create worker process: %m");
    1019            0 :                 }
    1020              : 
    1021              :                 /* In Leader after successful fork */
    1022            0 :                 slot->pid = pid;
    1023            0 :                 slot->workerStatus = WRKR_IDLE;
    1024              : 
    1025              :                 /* close read end of Leader -> Worker */
    1026            0 :                 closesocket(pipeMW[PIPE_READ]);
    1027              :                 /* close write end of Worker -> Leader */
    1028            0 :                 closesocket(pipeWM[PIPE_WRITE]);
    1029              : #endif                                                  /* WIN32 */
    1030            0 :         }
    1031              : 
    1032              :         /*
    1033              :          * Having forked off the workers, disable SIGPIPE so that leader isn't
    1034              :          * killed if it tries to send a command to a dead worker.  We don't want
    1035              :          * the workers to inherit this setting, though.
    1036              :          */
    1037              : #ifndef WIN32
    1038            0 :         pqsignal(SIGPIPE, SIG_IGN);
    1039              : #endif
    1040              : 
    1041              :         /*
    1042              :          * Re-establish query cancellation on the leader connection.
    1043              :          */
    1044            0 :         set_archive_cancel_info(AH, AH->connection);
    1045              : 
    1046              :         /*
    1047              :          * Tell the cancel signal handler to forward signals to worker processes,
    1048              :          * too.  (As with query cancel, we did not need this earlier because the
    1049              :          * workers have not yet been given anything to do; if we die before this
    1050              :          * point, any already-started workers will see EOF and quit promptly.)
    1051              :          */
    1052            0 :         set_cancel_pstate(pstate);
    1053              : 
    1054            0 :         return pstate;
    1055            0 : }
    1056              : 
    1057              : /*
    1058              :  * Close down a parallel dump or restore.
                      :  *
                      :  * When pstate->numWorkers is 1 this returns immediately; note that pstate
                      :  * itself is not freed on that path.  Otherwise it closes the leader's pipe
                      :  * ends for every slot, waits in WaitForTerminatingWorkers(), unlinks pstate
                      :  * from shutdown_info and signal_info, and frees pstate->te,
                      :  * pstate->parallelSlot and pstate.
                      :  *
                      :  * The AH argument is not referenced in this function.
    1059              :  */
    1060              : void
    1061            0 : ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
    1062              : {
    1063            0 :         int                     i;
    1064              : 
    1065              :         /* No work if non-parallel */
    1066            0 :         if (pstate->numWorkers == 1)
    1067            0 :                 return;
    1068              : 
    1069              :         /* There should not be any unfinished jobs */
    1070            0 :         Assert(IsEveryWorkerIdle(pstate));
    1071              : 
    1072              :         /* Close the sockets so that the workers know they can exit */
    1073            0 :         for (i = 0; i < pstate->numWorkers; i++)
    1074              :         {
    1075            0 :                 closesocket(pstate->parallelSlot[i].pipeRead);
    1076            0 :                 closesocket(pstate->parallelSlot[i].pipeWrite);
    1077            0 :         }
    1078              : 
    1079              :         /* Wait for them to exit */
    1080            0 :         WaitForTerminatingWorkers(pstate);
    1081              : 
    1082              :         /*
    1083              :          * Unlink pstate from shutdown_info, so the exit handler will not try to
    1084              :          * use it; and likewise unlink from signal_info.
    1085              :          */
    1086            0 :         shutdown_info.pstate = NULL;
    1087            0 :         set_cancel_pstate(NULL);
    1088              : 
    1089              :         /* Release state (mere neatnik-ism, since we're about to terminate) */
    1090            0 :         free(pstate->te);
    1091            0 :         free(pstate->parallelSlot);
    1092            0 :         free(pstate);
    1093            0 : }
    1094              : 
    1095              : /*
    1096              :  * These next four functions handle construction and parsing of the command
    1097              :  * strings and response strings for parallel workers.
    1098              :  *
    1099              :  * Currently, these can be the same regardless of which archive format we are
    1100              :  * processing.  In future, we might want to let format modules override these
    1101              :  * functions to add format-specific data to a command or response.
    1102              :  */
    1103              : 
    1104              : /*
    1105              :  * buildWorkerCommand: format a command string to send to a worker.
    1106              :  * The command is "DUMP <dumpId>" or "RESTORE <dumpId>", selected by act.
    1107              :  * The string is built in the caller-supplied buffer of size buflen.  AH is not used.
    1108              :  */
    1109              : static void
    1110            0 : buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
    1111              :                                    char *buf, int buflen)
    1112              : {
    1113            0 :         if (act == ACT_DUMP)
    1114            0 :                 snprintf(buf, buflen, "DUMP %d", te->dumpId);
    1115            0 :         else if (act == ACT_RESTORE)
    1116            0 :                 snprintf(buf, buflen, "RESTORE %d", te->dumpId);
    1117              :         else
    1118            0 :                 Assert(false);  /* no other action is expected; buf is not written on this path */
    1119            0 : }
    1120              : 
    1121              : /*
    1122              :  * parseWorkerCommand: interpret a command string in a worker, setting *act and *te.
    1123              :  */
    1124              : static void
    1125            0 : parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
    1126              :                                    const char *msg)
    1127              : {
    1128            0 :         DumpId          dumpId; /* NOTE(review): set only if sscanf matches; its result is not checked */
    1129            0 :         int                     nBytes; /* characters consumed by sscanf, stored via %n */
    1130              : 
    1131            0 :         if (messageStartsWith(msg, "DUMP "))
    1132              :         {
    1133            0 :                 *act = ACT_DUMP;
    1134            0 :                 sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
    1135            0 :                 Assert(nBytes == strlen(msg));  /* nothing may follow the ID */
    1136            0 :                 *te = getTocEntryByDumpId(AH, dumpId);
    1137            0 :                 Assert(*te != NULL);    /* the ID must name a known TOC entry */
    1138            0 :         }
    1139            0 :         else if (messageStartsWith(msg, "RESTORE "))
    1140              :         {
    1141            0 :                 *act = ACT_RESTORE;
    1142            0 :                 sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
    1143            0 :                 Assert(nBytes == strlen(msg));  /* nothing may follow the ID */
    1144            0 :                 *te = getTocEntryByDumpId(AH, dumpId);
    1145            0 :                 Assert(*te != NULL);    /* the ID must name a known TOC entry */
    1146            0 :         }
    1147              :         else
    1148            0 :                 pg_fatal("unrecognized command received from leader: \"%s\"",
    1149              :                                  msg);  /* neither *act nor *te is set on this path */
    1150            0 : }
    1151              : 
    1152              : /*
    1153              :  * buildWorkerResponse: format a response string to send to the leader.
    1154              :  * The response is "OK <dumpId> <status> <n_errors>"; act is not used.
    1155              :  * The string is built in the caller-supplied buffer of size buflen.
    1156              :  */
    1157              : static void
    1158            0 : buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
    1159              :                                         char *buf, int buflen)
    1160              : {
    1161            0 :         snprintf(buf, buflen, "OK %d %d %d",
    1162            0 :                          te->dumpId,
    1163            0 :                          status,
    1164            0 :                          status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);    /* error count is sent only for WORKER_IGNORED_ERRORS, else 0 */
    1165            0 : }
    1166              : 
    1167              : /*
    1168              :  * parseWorkerResponse: parse the status message returned by a worker.
    1169              :  * Expects "OK <dumpId> <status> <n_errors>"; any other prefix is reported via pg_fatal.
    1170              :  * Returns the integer status code, and adds the reported n_errors to AH->public.n_errors.
    1171              :  */
    1172              : static int
    1173            0 : parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
    1174              :                                         const char *msg)
    1175              : {
    1176            0 :         DumpId          dumpId;
    1177            0 :         int                     nBytes,
    1178              :                                 n_errors;       /* NOTE(review): set only if sscanf matches; its result is not checked */
    1179            0 :         int                     status = 0;
    1180              : 
    1181            0 :         if (messageStartsWith(msg, "OK "))
    1182              :         {
    1183            0 :                 sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
    1184              : 
    1185            0 :                 Assert(dumpId == te->dumpId);   /* reply must be for the entry this worker was given */
    1186            0 :                 Assert(nBytes == strlen(msg));  /* %n count: nothing may follow the third number */
    1187              : 
    1188            0 :                 AH->public.n_errors += n_errors;        /* fold the worker's error count into AH */
    1189            0 :         }
    1190              :         else
    1191            0 :                 pg_fatal("invalid message received from worker: \"%s\"",
    1192              :                                  msg);
    1193              : 
    1194            0 :         return status;
    1195            0 : }
    1196              : 
    1197              : /*
    1198              :  * Dispatch a job to some free worker.
    1199              :  *
    1200              :  * te is the TocEntry to be processed, act is the action to be taken on it.
    1201              :  * callback is the function to call on completion of the job; callback_data is stored with it.
    1202              :  *
    1203              :  * If no worker is currently available, this will block, and previously
    1204              :  * registered callback functions may be called.
    1205              :  */
    1206              : void
    1207            0 : DispatchJobForTocEntry(ArchiveHandle *AH,
    1208              :                                            ParallelState *pstate,
    1209              :                                            TocEntry *te,
    1210              :                                            T_Action act,
    1211              :                                            ParallelCompletionPtr callback,
    1212              :                                            void *callback_data)
    1213              : {
    1214            0 :         int                     worker;
    1215            0 :         char            buf[256];       /* holds the command string built below */
    1216              : 
    1217              :         /* Get a worker, waiting if none are idle */
    1218            0 :         while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
    1219            0 :                 WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
    1220              : 
    1221              :         /* Construct and send command string */
    1222            0 :         buildWorkerCommand(AH, te, act, buf, sizeof(buf));
    1223              : 
    1224            0 :         sendMessageToWorker(pstate, worker, buf);
    1225              : 
    1226              :         /* Remember worker is busy, and which TocEntry it's working on */
    1227            0 :         pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
    1228            0 :         pstate->parallelSlot[worker].callback = callback;
    1229            0 :         pstate->parallelSlot[worker].callback_data = callback_data;
    1230            0 :         pstate->te[worker] = te;        /* pstate->te is indexed by the same slot number */
    1231            0 : }
    1232              : 
    1233              : /*
    1234              :  * Find an idle worker and return its slot number (the lowest-numbered idle slot).
    1235              :  * Return NO_SLOT if none are idle.
    1236              :  */
    1237              : static int
    1238            0 : GetIdleWorker(ParallelState *pstate)
    1239              : {
    1240            0 :         int                     i;
    1241              : 
    1242            0 :         for (i = 0; i < pstate->numWorkers; i++)
    1243              :         {
    1244            0 :                 if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
    1245            0 :                         return i;       /* first match wins */
    1246            0 :         }
    1247            0 :         return NO_SLOT;
    1248            0 : }
    1249              : 
    1250              : /*
    1251              :  * Return true iff no worker is running, i.e. WORKER_IS_RUNNING is false for every slot.
    1252              :  */
    1253              : static bool
    1254            0 : HasEveryWorkerTerminated(ParallelState *pstate)
    1255              : {
    1256            0 :         int                     i;
    1257              : 
    1258            0 :         for (i = 0; i < pstate->numWorkers; i++)
    1259              :         {
    1260            0 :                 if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
    1261            0 :                         return false;   /* one running worker is enough to answer */
    1262            0 :         }
    1263            0 :         return true;
    1264            0 : }
    1265              : 
    1266              : /*
    1267              :  * Return true iff every worker is in the WRKR_IDLE state (any other status counts as not idle).
    1268              :  */
    1269              : bool
    1270            0 : IsEveryWorkerIdle(ParallelState *pstate)
    1271              : {
    1272            0 :         int                     i;
    1273              : 
    1274            0 :         for (i = 0; i < pstate->numWorkers; i++)
    1275              :         {
    1276            0 :                 if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
    1277            0 :                         return false;   /* one non-idle slot is enough to answer */
    1278            0 :         }
    1279            0 :         return true;
    1280            0 : }
    1281              : 
    1282              : /*
    1283              :  * Acquire lock on a table to be dumped by a worker process.
    1284              :  *
    1285              :  * The leader process is already holding an ACCESS SHARE lock.  Ordinarily
    1286              :  * it's no problem for a worker to get one too, but if anything else besides
    1287              :  * pg_dump is running, there's a possible deadlock:
    1288              :  *
    1289              :  * 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode.
    1290              :  * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
    1291              :  *        because the leader holds a conflicting ACCESS SHARE lock).
    1292              :  * 3) A worker process also requests an ACCESS SHARE lock to read the table.
    1293              :  *        The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
    1294              :  * 4) Now we have a deadlock, since the leader is effectively waiting for
    1295              :  *        the worker.  The server cannot detect that, however.
    1296              :  *
    1297              :  * To prevent an infinite wait, prior to touching a table in a worker, request
    1298              :  * a lock in ACCESS SHARE mode but with NOWAIT.  If we don't get the lock,
    1299              :  * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
    1300              :  * so we have a deadlock.  We must fail the backup in that case.
    1301              :  */
    1302              : static void
    1303            0 : lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
    1304              : {
    1305            0 :         const char *qualId;
    1306            0 :         PQExpBuffer query;
    1307            0 :         PGresult   *res;
    1308              : 
    1309              :         /* Nothing to do for BLOBS (entries whose desc is exactly "BLOBS") */
    1310            0 :         if (strcmp(te->desc, "BLOBS") == 0)
    1311            0 :                 return;
    1312              : 
    1313            0 :         query = createPQExpBuffer();
    1314              : 
    1315            0 :         qualId = fmtQualifiedId(te->namespace, te->tag);        /* name built from the entry's namespace and tag */
    1316              : 
    1317            0 :         appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
    1318            0 :                                           qualId);
    1319              : 
    1320            0 :         res = PQexec(AH->connection, query->data);      /* runs on this process's own connection */
    1321              : 
    1322            0 :         if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
    1323            0 :                 pg_fatal("could not obtain lock on relation \"%s\"\n"
    1324              :                                  "This usually means that someone requested an ACCESS EXCLUSIVE lock "
    1325              :                                  "on the table after the pg_dump parent process had gotten the "
    1326              :                                  "initial ACCESS SHARE lock on the table.", qualId);
    1327              : 
    1328            0 :         PQclear(res);   /* success path only: release the result and the query buffer */
    1329            0 :         destroyPQExpBuffer(query);
    1330            0 : }
    1331              : 
    1332              : /*
    1333              :  * WaitForCommands: main routine for a worker process.
    1334              :  * Each command is parsed, executed, and answered with one response message.
    1335              :  * Read and execute commands from the leader until we see EOF on the pipe.
    1336              :  */
    1337              : static void
    1338            0 : WaitForCommands(ArchiveHandle *AH, int pipefd[2])
    1339              : {
    1340            0 :         char       *command;
    1341            0 :         TocEntry   *te;
    1342            0 :         T_Action        act;
    1343            0 :         int                     status = 0;
    1344            0 :         char            buf[256];       /* holds the response string built below */
    1345              : 
    1346            0 :         for (;;)
    1347              :         {
    1348            0 :                 if (!(command = getMessageFromLeader(pipefd)))
    1349              :                 {
    1350              :                         /* EOF, so done */
    1351              :                         return;
    1352              :                 }
    1353              : 
    1354              :                 /* Decode the command */
    1355            0 :                 parseWorkerCommand(AH, &te, &act, command);
    1356              : 
    1357            0 :                 if (act == ACT_DUMP)
    1358              :                 {
    1359              :                         /* Acquire lock on this table within the worker's session */
    1360            0 :                         lockTableForWorker(AH, te);
    1361              : 
    1362              :                         /* Perform the dump command */
    1363            0 :                         status = (AH->WorkerJobDumpPtr) (AH, te);
    1364            0 :                 }
    1365            0 :                 else if (act == ACT_RESTORE)
    1366              :                 {
    1367              :                         /* Perform the restore command */
    1368            0 :                         status = (AH->WorkerJobRestorePtr) (AH, te);
    1369            0 :                 }
    1370              :                 else
    1371            0 :                         Assert(false);  /* parseWorkerCommand sets no other action */
    1372              : 
    1373              :                 /* Return status to leader */
    1374            0 :                 buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
    1375              : 
    1376            0 :                 sendMessageToLeader(pipefd, buf);
    1377              : 
    1378              :                 /* command was pg_malloc'd and we are responsible for free()ing it. */
    1379            0 :                 free(command);
    1380              :         }
    1381            0 : }
    1382              : 
    1383              : /*
    1384              :  * Check for status messages from workers.
    1385              :  *
    1386              :  * If do_wait is true, wait to get a status message; otherwise, just return
    1387              :  * immediately if there is none available.
    1388              :  *
    1389              :  * When we get a status message, we pass the status code to the callback
    1390              :  * function that was specified to DispatchJobForTocEntry, then reset the
    1391              :  * worker status to IDLE.
    1392              :  *
    1393              :  * Returns true if we collected a status message, else false.
    1394              :  *
    1395              :  * XXX is it worth checking for more than one status message per call?
    1396              :  * It seems somewhat unlikely that multiple workers would finish at exactly
    1397              :  * the same time.
    1398              :  */
    1399              : static bool
    1400            0 : ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
    1401              : {
    1402            0 :         int                     worker; /* slot index of the sender, filled in by getMessageFromWorker */
    1403            0 :         char       *msg;
    1404              : 
    1405              :         /* Try to collect a status message */
    1406            0 :         msg = getMessageFromWorker(pstate, do_wait, &worker);
    1407              : 
    1408            0 :         if (!msg)
    1409              :         {
    1410              :                 /* If do_wait is true, we must have detected EOF on some socket */
    1411            0 :                 if (do_wait)
    1412            0 :                         pg_fatal("a worker process died unexpectedly");
    1413            0 :                 return false;
    1414              :         }
    1415              : 
    1416              :         /* Process it and update our idea of the worker's status */
    1417            0 :         if (messageStartsWith(msg, "OK "))      /* parseWorkerResponse repeats this prefix check */
    1418              :         {
    1419            0 :                 ParallelSlot *slot = &pstate->parallelSlot[worker];
    1420            0 :                 TocEntry   *te = pstate->te[worker];    /* entry recorded for this slot at dispatch */
    1421            0 :                 int                     status;
    1422              : 
    1423            0 :                 status = parseWorkerResponse(AH, te, msg);
    1424            0 :                 slot->callback(AH, te, status, slot->callback_data);    /* runs before the slot is marked idle */
    1425            0 :                 slot->workerStatus = WRKR_IDLE;
    1426            0 :                 pstate->te[worker] = NULL;
    1427            0 :         }
    1428              :         else
    1429            0 :                 pg_fatal("invalid message received from worker: \"%s\"",
    1430              :                                  msg);
    1431              : 
    1432              :         /* Free the string returned from getMessageFromWorker */
    1433            0 :         free(msg);
    1434              : 
    1435            0 :         return true;
    1436            0 : }
    1437              : 
    1438              : /*
    1439              :  * Check for status results from workers, waiting if necessary.
    1440              :  *
    1441              :  * Available wait modes are:
    1442              :  * WFW_NO_WAIT: reap any available status, but don't block
    1443              :  * WFW_GOT_STATUS: wait for at least one more worker to finish
    1444              :  * WFW_ONE_IDLE: wait for at least one worker to be idle
    1445              :  * WFW_ALL_IDLE: wait for all workers to be idle
    1446              :  *
    1447              :  * Any received results are passed to the callback specified to
    1448              :  * DispatchJobForTocEntry.
    1449              :  *
    1450              :  * This function is executed in the leader process.
    1451              :  */
    1452              : void
    1453            0 : WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
    1454              : {
    1455            0 :         bool            do_wait = false;        /* whether the next ListenToWorkers call may block */
    1456              : 
    1457              :         /*
    1458              :          * In GOT_STATUS mode, always block waiting for a message, since we can't
    1459              :          * return till we get something.  In other modes, we don't block the first
    1460              :          * time through the loop.
    1461              :          */
    1462            0 :         if (mode == WFW_GOT_STATUS)
    1463              :         {
    1464              :                 /* Assert that caller knows what it's doing */
    1465            0 :                 Assert(!IsEveryWorkerIdle(pstate));
    1466            0 :                 do_wait = true;
    1467            0 :         }
    1468              : 
    1469            0 :         for (;;)
    1470              :         {
    1471              :                 /*
    1472              :                  * Check for status messages, even if we don't need to block.  We do
    1473              :                  * not try very hard to reap all available messages, though, since
    1474              :                  * there's unlikely to be more than one.
    1475              :                  */
    1476            0 :                 if (ListenToWorkers(AH, pstate, do_wait))
    1477              :                 {
    1478              :                         /*
    1479              :                          * If we got a message, we are done by definition for GOT_STATUS
    1480              :                          * mode, and we can also be certain that there's at least one idle
    1481              :                          * worker.  So we're done in all but ALL_IDLE mode.
    1482              :                          */
    1483            0 :                         if (mode != WFW_ALL_IDLE)
    1484            0 :                                 return;
    1485            0 :                 }
    1486              : 
    1487              :                 /* Check whether we must wait for new status messages */
    1488            0 :                 switch (mode)
    1489              :                 {
    1490              :                         case WFW_NO_WAIT:
    1491            0 :                                 return;                 /* never wait */
    1492              :                         case WFW_GOT_STATUS:
    1493            0 :                                 Assert(false);  /* can't get here, because we waited */
    1494              :                                 break;
    1495              :                         case WFW_ONE_IDLE:
    1496            0 :                                 if (GetIdleWorker(pstate) != NO_SLOT)
    1497            0 :                                         return;
    1498            0 :                                 break;  /* nobody idle yet: fall out of the switch and wait */
    1499              :                         case WFW_ALL_IDLE:
    1500            0 :                                 if (IsEveryWorkerIdle(pstate))
    1501            0 :                                         return;
    1502            0 :                                 break;  /* some worker still busy: fall out of the switch and wait */
    1503              :                 }
    1504              : 
    1505              :                 /* Loop back, and this time wait for something to happen */
    1506            0 :                 do_wait = true;
    1507              :         }
    1508            0 : }
    1509              : 
    1510              : /*
    1511              :  * Read one command message from the leader, blocking if necessary
    1512              :  * until one is available, and return it as a malloc'd string.
    1513              :  * On EOF, return NULL.  Thin wrapper: reads from pipefd[PIPE_READ].
    1514              :  *
    1515              :  * This function is executed in worker processes.
    1516              :  */
    1517              : static char *
    1518            0 : getMessageFromLeader(int pipefd[2])
    1519              : {
    1520            0 :         return readMessageFromPipe(pipefd[PIPE_READ]);
    1521              : }
    1522              : 
    1523              : /*
    1524              :  * Send a status message to the leader via pipefd[PIPE_WRITE]; a short write is fatal.
    1525              :  *
    1526              :  * This function is executed in worker processes.
    1527              :  */
    1528              : static void
    1529            0 : sendMessageToLeader(int pipefd[2], const char *str)
    1530              : {
    1531            0 :         int                     len = strlen(str) + 1;  /* +1: the terminating '\0' is sent too */
    1532              : 
    1533            0 :         if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
    1534            0 :                 pg_fatal("could not write to the communication channel: %m");
    1535            0 : }
    1536              : 
    1537              : /*
    1538              :  * Wait until some descriptor in "workerset" becomes readable, retrying if interrupted.
    1539              :  * Returns -1 on error, else the number of readable descriptors.
    1540              :  */
    1541              : static int
    1542            0 : select_loop(int maxFd, fd_set *workerset)
    1543              : {
    1544            0 :         int                     i;
    1545            0 :         fd_set          saveSet = *workerset;   /* copy of the caller's set, used to reset it on retry */
    1546              : 
    1547            0 :         for (;;)
    1548              :         {
    1549            0 :                 *workerset = saveSet;   /* start each attempt from the original set */
    1550            0 :                 i = select(maxFd + 1, workerset, NULL, NULL, NULL);     /* NULL timeout: block indefinitely */
    1551              : 
    1552              : #ifndef WIN32
    1553            0 :                 if (i < 0 && errno == EINTR)
    1554            0 :                         continue;       /* interrupted: try again */
    1555              : #else
    1556              :                 if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
    1557              :                         continue;
    1558              : #endif
    1559            0 :                 break;
    1560              :         }
    1561              : 
    1562            0 :         return i;
    1563            0 : }
    1564              : 
    1565              : 
    1566              : /*
    1567              :  * Check for messages from worker processes.
    1568              :  *
    1569              :  * If a message is available, return it as a malloc'd string, and put the
    1570              :  * index of the sending worker in *worker.
    1571              :  *
    1572              :  * If nothing is available, wait if "do_wait" is true, else return NULL.
    1573              :  *
    1574              :  * If we detect EOF on any socket, we'll return NULL.  It's not great that
    1575              :  * that's hard to distinguish from the no-data-available case, but for now
    1576              :  * our one caller is okay with that.
    1577              :  *
    1578              :  * This function is executed in the leader process.
    1579              :  */
    1580              : static char *
    1581            0 : getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
    1582              : {
    1583            0 :         int                     i;
    1584            0 :         fd_set          workerset;
    1585            0 :         int                     maxFd = -1;     /* stays -1 if no slot passes WORKER_IS_RUNNING */
    1586            0 :         struct timeval nowait = {0, 0}; /* zero timeout for the non-blocking poll */
    1587              : 
    1588              :         /* construct bitmap of socket descriptors for select() */
    1589            0 :         FD_ZERO(&workerset);
    1590            0 :         for (i = 0; i < pstate->numWorkers; i++)
    1591              :         {
    1592            0 :                 if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
    1593            0 :                         continue;
    1594            0 :                 FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
    1595            0 :                 if (pstate->parallelSlot[i].pipeRead > maxFd)
    1596            0 :                         maxFd = pstate->parallelSlot[i].pipeRead;
    1597            0 :         }
    1598              : 
    1599            0 :         if (do_wait)
    1600              :         {
    1601            0 :                 i = select_loop(maxFd, &workerset);
    1602            0 :                 Assert(i != 0); /* a blocking wait should not report zero ready descriptors */
    1603            0 :         }
    1604              :         else
    1605              :         {
    1606            0 :                 if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
    1607            0 :                         return NULL;    /* nothing ready; *worker is left untouched */
    1608              :         }
    1609              : 
    1610            0 :         if (i < 0)
    1611            0 :                 pg_fatal("%s() failed: %m", "select");
    1612              : 
    1613            0 :         for (i = 0; i < pstate->numWorkers; i++)
    1614              :         {
    1615            0 :                 char       *msg;
    1616              : 
    1617            0 :                 if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
    1618            0 :                         continue;
    1619            0 :                 if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
    1620            0 :                         continue;
    1621              : 
    1622              :                 /*
    1623              :                  * Read the message if any.  If the socket is ready because of EOF,
    1624              :                  * we'll return NULL instead (and the socket will stay ready, so the
    1625              :                  * condition will persist).
    1626              :                  *
    1627              :                  * Note: because this is a blocking read, we'll wait if only part of
    1628              :                  * the message is available.  Waiting a long time would be bad, but
    1629              :                  * since worker status messages are short and are always sent in one
    1630              :                  * operation, it shouldn't be a problem in practice.
    1631              :                  */
    1632            0 :                 msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
    1633            0 :                 *worker = i;    /* set even when msg is NULL */
    1634            0 :                 return msg;     /* only the first ready slot is serviced per call */
    1635            0 :         }
    1636            0 :         Assert(false);  /* select reported readiness, so some slot should have matched */
    1637              :         return NULL;
    1638            0 : }
    1639              : 
    1640              : /*
    1641              :  * Send a command message to the specified worker process; a short write is fatal.
    1642              :  *
    1643              :  * This function is executed in the leader process.
    1644              :  */
    1645              : static void
    1646            0 : sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
    1647              : {
    1648            0 :         int                     len = strlen(str) + 1;  /* +1: the terminating '\0' is sent too */
    1649              : 
    1650            0 :         if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
    1651              :         {
    1652            0 :                 pg_fatal("could not write to the communication channel: %m");
    1653            0 :         }
    1654            0 : }
    1655              : 
    1656              : /*
    1657              :  * Read one message from the specified pipe (fd), blocking if necessary
    1658              :  * until one is available, and return it as a malloc'd string.
    1659              :  * On EOF, return NULL.
    1660              :  *
    1661              :  * A "message" on the channel is just a null-terminated string.
                      :  *
                      :  * The returned buffer is obtained from pg_malloc()/pg_realloc() and is
                      :  * handed to the caller, who is responsible for releasing it (presumably
                      :  * with pg_free()).
                      :  *
                      :  * Any piperead() result <= 0 ends the read, so a read error is treated
                      :  * the same as connection closure: bytes collected so far are discarded
                      :  * and NULL is returned.
    1662              :  */
    1663              : static char *
    1664            0 : readMessageFromPipe(int fd)
    1665              : {
    1666            0 :         char       *msg;                /* buffer accumulating the message */
    1667            0 :         int                     msgsize,        /* number of bytes stored in msg so far */
    1668              :                                 bufsize;        /* currently allocated size of msg */
    1669            0 :         int                     ret;            /* result of the latest piperead() */
    1670              : 
    1671              :         /*
    1672              :          * In theory, if we let piperead() read multiple bytes, it might give us
    1673              :          * back fragments of multiple messages.  (That can't actually occur, since
    1674              :          * neither leader nor workers send more than one message without waiting
    1675              :          * for a reply, but we don't wish to assume that here.)  For simplicity,
    1676              :          * read a byte at a time until we get the terminating '\0'.  This method
    1677              :          * is a bit inefficient, but since this is only used for relatively short
    1678              :          * command and status strings, it shouldn't matter.
    1679              :          */
    1680            0 :         bufsize = 64;                           /* could be any number */
    1681            0 :         msg = (char *) pg_malloc(bufsize);
    1682            0 :         msgsize = 0;
    1683            0 :         for (;;)
    1684              :         {
    1685            0 :                 Assert(msgsize < bufsize);      /* always room for the next byte */
    1686            0 :                 ret = piperead(fd, msg + msgsize, 1);
    1687            0 :                 if (ret <= 0)
    1688            0 :                         break;                          /* error or connection closure */
    1689              : 
    1690            0 :                 Assert(ret == 1);               /* we asked for exactly one byte */
    1691              : 
    1692            0 :                 if (msg[msgsize] == '\0')
    1693            0 :                         return msg;                     /* collected whole message */
    1694              : 
    1695            0 :                 msgsize++;
    1696            0 :                 if (msgsize == bufsize) /* enlarge buffer if needed */
    1697              :                 {
    1698            0 :                         bufsize += 16;          /* could be any number */
    1699            0 :                         msg = (char *) pg_realloc(msg, bufsize);
    1700            0 :                 }
    1701              :         }
    1702              : 
    1703              :         /* Read failed or other end has closed the connection; drop partial data */
    1704            0 :         pg_free(msg);
    1705            0 :         return NULL;
    1706            0 : }
    1707              : 
    1708              : #ifdef WIN32
    1709              : 
    1710              : /*
    1711              :  * This is a replacement version of pipe(2) for Windows which allows the pipe
    1712              :  * handles to be used in select().
    1713              :  *
    1714              :  * Reads and writes on the pipe must go through piperead()/pipewrite().
    1715              :  *
    1716              :  * For consistency with Unix we declare the returned handles as "int".
    1717              :  * This is okay even on WIN64 because system handles are not more than
    1718              :  * 32 bits wide, but we do have to do some casting.
                      :  *
                      :  * The "pipe" is built from a pair of connected TCP sockets on the IPv4
                      :  * loopback address: a temporary listening socket is created, handles[1]
                      :  * connects to it, and handles[0] is the connection the listener accepts.
                      :  * The listening socket is closed before returning, on every path after
                      :  * it has been created.
                      :  *
                      :  * Returns 0 on success.  On failure, logs the WSAGetLastError() code via
                      :  * pg_log_error() and returns -1 with both handles[] entries set to -1.
    1719              :  */
    1720              : static int
    1721              : pgpipe(int handles[2])
    1722              : {
    1723              :         pgsocket        s,
    1724              :                                 tmp_sock;
    1725              :         struct sockaddr_in serv_addr;
    1726              :         int                     len = sizeof(serv_addr);
    1727              : 
    1728              :         /* We have to use the Unix socket invalid file descriptor value here. */
    1729              :         handles[0] = handles[1] = -1;
    1730              : 
    1731              :         /*
    1732              :          * setup listen socket
                      :          *
                      :          * It is bound to the loopback address with port 0 (which, per
                      :          * standard sockets behavior, lets the system pick a port);
                      :          * getsockname() then refreshes serv_addr with the address actually
                      :          * bound, which is what connect() below uses.
    1733              :          */
    1734              :         if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
    1735              :         {
    1736              :                 pg_log_error("pgpipe: could not create socket: error code %d",
    1737              :                                          WSAGetLastError());
    1738              :                 return -1;
    1739              :         }
    1740              : 
    1741              :         memset(&serv_addr, 0, sizeof(serv_addr));
    1742              :         serv_addr.sin_family = AF_INET;
    1743              :         serv_addr.sin_port = pg_hton16(0);
    1744              :         serv_addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);
    1745              :         if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
    1746              :         {
    1747              :                 pg_log_error("pgpipe: could not bind: error code %d",
    1748              :                                          WSAGetLastError());
    1749              :                 closesocket(s);
    1750              :                 return -1;
    1751              :         }
    1752              :         if (listen(s, 1) == SOCKET_ERROR)
    1753              :         {
    1754              :                 pg_log_error("pgpipe: could not listen: error code %d",
    1755              :                                          WSAGetLastError());
    1756              :                 closesocket(s);
    1757              :                 return -1;
    1758              :         }
    1759              :         if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
    1760              :         {
    1761              :                 pg_log_error("pgpipe: %s() failed: error code %d", "getsockname",
    1762              :                                          WSAGetLastError());
    1763              :                 closesocket(s);
    1764              :                 return -1;
    1765              :         }
    1766              : 
    1767              :         /*
    1768              :          * setup pipe handles
                      :          *
                      :          * handles[1] is the socket that connects to the listener; handles[0]
                      :          * is the socket returned by accept().  If connect() or accept()
                      :          * fails, handles[1] is closed and reset to -1 so that the caller
                      :          * never sees a half-built pair.
    1769              :          */
    1770              :         if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
    1771              :         {
    1772              :                 pg_log_error("pgpipe: could not create second socket: error code %d",
    1773              :                                          WSAGetLastError());
    1774              :                 closesocket(s);
    1775              :                 return -1;
    1776              :         }
    1777              :         handles[1] = (int) tmp_sock;
    1778              : 
    1779              :         if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
    1780              :         {
    1781              :                 pg_log_error("pgpipe: could not connect socket: error code %d",
    1782              :                                          WSAGetLastError());
    1783              :                 closesocket(handles[1]);
    1784              :                 handles[1] = -1;
    1785              :                 closesocket(s);
    1786              :                 return -1;
    1787              :         }
    1788              :         if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
    1789              :         {
    1790              :                 pg_log_error("pgpipe: could not accept connection: error code %d",
    1791              :                                          WSAGetLastError());
    1792              :                 closesocket(handles[1]);
    1793              :                 handles[1] = -1;
    1794              :                 closesocket(s);
    1795              :                 return -1;
    1796              :         }
    1797              :         handles[0] = (int) tmp_sock;
    1798              : 
    1799              :         closesocket(s);
    1800              :         return 0;
    1801              : }
    1802              : 
    1803              : #endif                                                  /* WIN32 */
        

Generated by: LCOV version 2.3.2-1