LCOV - code coverage report
Current view: top level - src/bin/pg_upgrade - parallel.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 52 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 3 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*
       2              :  *      parallel.c
       3              :  *
       4              :  *      multi-process support
       5              :  *
       6              :  *      Copyright (c) 2010-2026, PostgreSQL Global Development Group
       7              :  *      src/bin/pg_upgrade/parallel.c
       8              :  */
       9              : 
      10              : #include "postgres_fe.h"
      11              : 
      12              : #include <sys/wait.h>
      13              : #ifdef WIN32
      14              : #include <io.h>
      15              : #endif
      16              : 
      17              : #include "pg_upgrade.h"
      18              : 
       19              : static int      parallel_jobs;  /* workers started and not yet reaped by reap_child() */
       20              : 
       21              : #ifdef WIN32
       22              : /*
       23              :  *      Array holding the handles of all active threads.  There can't be any
       24              :  *      gaps/zeros so it can be passed to WaitForMultipleObjects().  The
       25              :  *      per-thread argument structs are kept in separate arrays (see below).
       26              :  */
       27              : static HANDLE *thread_handles;
       28              : 
       29              : typedef struct                  /* arguments handed to win32_exec_prog() */
       30              : {
       31              :         char       *log_file;           /* pg_strdup'd copy of the caller's log_file */
       32              :         char       *opt_log_file;       /* pg_strdup'd copy, or NULL if the caller passed NULL */
       33              :         char       *cmd;                /* pg_strdup'd copy of the formatted command */
       34              : } exec_thread_arg;
       35              : 
       36              : typedef struct                  /* arguments handed to win32_transfer_all_new_dbs() */
       37              : {
       38              :         DbInfoArr  *old_db_arr;         /* caller's pointer, stored as-is (not copied) */
       39              :         DbInfoArr  *new_db_arr;         /* caller's pointer, stored as-is (not copied) */
       40              :         char       *old_pgdata;         /* pg_strdup'd copy */
       41              :         char       *new_pgdata;         /* pg_strdup'd copy */
       42              :         char       *old_tablespace;     /* pg_strdup'd copy, or NULL */
       43              :         char       *new_tablespace;     /* pg_strdup'd copy, or NULL */
       44              : } transfer_thread_arg;
       45              : 
       46              : static exec_thread_arg **exec_thread_args;      /* user_opts.jobs slots, allocated on first use */
       47              : static transfer_thread_arg **transfer_thread_args;      /* user_opts.jobs slots, allocated on first use */
       48              : 
       49              : /* track current thread_args struct so reap_child() can be used for all cases */
       50              : static void **cur_thread_args;
       51              : 
       52              : DWORD           win32_exec_prog(exec_thread_arg *args); /* thread entry point given to _beginthreadex() */
       53              : DWORD           win32_transfer_all_new_dbs(transfer_thread_arg *args);  /* thread entry point given to _beginthreadex() */
       54              : #endif
      55              : 
       56              : /*
       57              :  *      parallel_exec_prog
       58              :  *      Format a command from fmt and run it via exec_prog().  This has the same API as
       59              :  *      exec_prog, except it does parallel execution, and therefore must throw errors and
       60              :  *      doesn't return an error status.  With user_opts.jobs <= 1 it runs synchronously.
       61              :  */
       62              : void
       63            0 : parallel_exec_prog(const char *log_file, const char *opt_log_file,
       64              :                                    const char *fmt,...)
       65              : {
       66            0 :         va_list         args;
       67            0 :         char            cmd[MAX_STRING];
       68              : 
       69              : #ifndef WIN32
       70            0 :         pid_t           child;
       71              : #else
       72              :         HANDLE          child;
       73              :         exec_thread_arg *new_arg;
       74              : #endif
       75              : 
       76            0 :         va_start(args, fmt);
       77            0 :         vsnprintf(cmd, sizeof(cmd), fmt, args);
       78            0 :         va_end(args);
       79              : 
       80            0 :         if (user_opts.jobs <= 1)
       81              :                 /* exit_on_error must be true to allow jobs */
       82            0 :                 exec_prog(log_file, opt_log_file, true, true, "%s", cmd);
       83              :         else
       84              :         {
       85              :                 /* parallel: run the command in a worker process (a thread on WIN32) */
       86              : #ifdef WIN32
       87              :                 if (thread_handles == NULL)
       88              :                         thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
       89              : 
       90              :                 if (exec_thread_args == NULL)
       91              :                 {
       92              :                         int                     i;
       93              : 
       94              :                         exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
       95              : 
       96              :                         /*
       97              :                          * For safety and performance, we keep the args allocated during
       98              :                          * the entire life of the process, and we don't free the args in a
       99              :                          * thread different from the one that allocated it.
      100              :                          */
      101              :                         for (i = 0; i < user_opts.jobs; i++)
      102              :                                 exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
      103              :                 }
      104              : 
      105              :                 cur_thread_args = (void **) exec_thread_args;   /* reap_child() swaps slots through this pointer */
      106              : #endif
      107              :                 /* harvest any dead children */
      108            0 :                 while (reap_child(false) == true)
      109              :                         ;
      110              : 
      111              :                 /* all job slots in use?  then block until one worker has finished */
      112            0 :                 if (parallel_jobs >= user_opts.jobs)
      113            0 :                         reap_child(true);
      114              : 
      115              :                 /* set this before we start the job */
      116            0 :                 parallel_jobs++;
      117              : 
      118              :                 /* Ensure stdio state is quiesced before forking */
      119            0 :                 fflush(NULL);
      120              : 
      121              : #ifndef WIN32
      122            0 :                 child = fork();
      123            0 :                 if (child == 0)
      124              :                         /* use _exit to skip atexit() functions; status is the negation of exec_prog()'s result */
      125            0 :                         _exit(!exec_prog(log_file, opt_log_file, true, true, "%s", cmd));
      126            0 :                 else if (child < 0)
      127              :                         /* fork failed */
      128            0 :                         pg_fatal("could not create worker process: %m");
      129              : #else
      130              :                 /* empty array elements are always at the end */
      131              :                 new_arg = exec_thread_args[parallel_jobs - 1];
      132              : 
      133              :                 /* Can only pass one pointer into the function, so use a struct; free the slot's old strings first */
      134              :                 pg_free(new_arg->log_file);
      135              :                 new_arg->log_file = pg_strdup(log_file);
      136              :                 pg_free(new_arg->opt_log_file);
      137              :                 new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
      138              :                 pg_free(new_arg->cmd);
      139              :                 new_arg->cmd = pg_strdup(cmd);
      140              : 
      141              :                 child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
      142              :                                                                                 new_arg, 0, NULL);
      143              :                 if (child == 0)
      144              :                         pg_fatal("could not create worker thread: %m");
      145              : 
      146              :                 thread_handles[parallel_jobs - 1] = child;      /* handle index matches the arg slot index */
      147              : #endif
      148              :         }
      149            0 : }
     150              : 
     151              : 
      152              : #ifdef WIN32
      153              : DWORD
      154              : win32_exec_prog(exec_thread_arg *args)  /* thread body started by parallel_exec_prog() */
      155              : {
      156              :         int                     ret;
      157              : 
      158              :         ret = !exec_prog(args->log_file, args->opt_log_file, true, true, "%s", args->cmd);
      159              : 
      160              :         /* terminates thread; exit code is 0 when exec_prog() returned nonzero, 1 otherwise */
      161              :         return ret;
      162              : }
      163              : #endif
     164              : 
     165              : 
     166              : /*
     167              :  *      parallel_transfer_all_new_dbs
     168              :  *
     169              :  *      This has the same API as transfer_all_new_dbs, except it does parallel execution
     170              :  *      by transferring multiple tablespaces in parallel
     171              :  */
     172              : void
     173            0 : parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
     174              :                                                           char *old_pgdata, char *new_pgdata,
     175              :                                                           char *old_tablespace, char *new_tablespace)
     176              : {
     177              : #ifndef WIN32
     178            0 :         pid_t           child;
     179              : #else
     180              :         HANDLE          child;
     181              :         transfer_thread_arg *new_arg;
     182              : #endif
     183              : 
     184            0 :         if (user_opts.jobs <= 1)
     185            0 :                 transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL, NULL);
     186              :         else
     187              :         {
     188              :                 /* parallel */
     189              : #ifdef WIN32
     190              :                 if (thread_handles == NULL)
     191              :                         thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
     192              : 
     193              :                 if (transfer_thread_args == NULL)
     194              :                 {
     195              :                         int                     i;
     196              : 
     197              :                         transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
     198              : 
     199              :                         /*
     200              :                          * For safety and performance, we keep the args allocated during
     201              :                          * the entire life of the process, and we don't free the args in a
     202              :                          * thread different from the one that allocated it.
     203              :                          */
     204              :                         for (i = 0; i < user_opts.jobs; i++)
     205              :                                 transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
     206              :                 }
     207              : 
     208              :                 cur_thread_args = (void **) transfer_thread_args;
     209              : #endif
     210              :                 /* harvest any dead children */
     211            0 :                 while (reap_child(false) == true)
     212              :                         ;
     213              : 
     214              :                 /* must we wait for a dead child? */
     215            0 :                 if (parallel_jobs >= user_opts.jobs)
     216            0 :                         reap_child(true);
     217              : 
     218              :                 /* set this before we start the job */
     219            0 :                 parallel_jobs++;
     220              : 
     221              :                 /* Ensure stdio state is quiesced before forking */
     222            0 :                 fflush(NULL);
     223              : 
     224              : #ifndef WIN32
     225            0 :                 child = fork();
     226            0 :                 if (child == 0)
     227              :                 {
     228            0 :                         transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
     229            0 :                                                                  old_tablespace, new_tablespace);
     230              :                         /* if we take another exit path, it will be non-zero */
     231              :                         /* use _exit to skip atexit() functions */
     232            0 :                         _exit(0);
     233              :                 }
     234            0 :                 else if (child < 0)
     235              :                         /* fork failed */
     236            0 :                         pg_fatal("could not create worker process: %m");
     237              : #else
     238              :                 /* empty array element are always at the end */
     239              :                 new_arg = transfer_thread_args[parallel_jobs - 1];
     240              : 
     241              :                 /* Can only pass one pointer into the function, so use a struct */
     242              :                 new_arg->old_db_arr = old_db_arr;
     243              :                 new_arg->new_db_arr = new_db_arr;
     244              :                 pg_free(new_arg->old_pgdata);
     245              :                 new_arg->old_pgdata = pg_strdup(old_pgdata);
     246              :                 pg_free(new_arg->new_pgdata);
     247              :                 new_arg->new_pgdata = pg_strdup(new_pgdata);
     248              :                 pg_free(new_arg->old_tablespace);
     249              :                 new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
     250              :                 new_arg->new_tablespace = new_tablespace ? pg_strdup(new_tablespace) : NULL;
     251              : 
     252              :                 child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
     253              :                                                                                 new_arg, 0, NULL);
     254              :                 if (child == 0)
     255              :                         pg_fatal("could not create worker thread: %m");
     256              : 
     257              :                 thread_handles[parallel_jobs - 1] = child;
     258              : #endif
     259              :         }
     260            0 : }
     261              : 
     262              : 
      263              : #ifdef WIN32
      264              : DWORD
      265              : win32_transfer_all_new_dbs(transfer_thread_arg *args)   /* thread body started by parallel_transfer_all_new_dbs() */
      266              : {
      267              :         transfer_all_new_dbs(args->old_db_arr, args->new_db_arr, args->old_pgdata,
      268              :                                                  args->new_pgdata, args->old_tablespace,
      269              :                                                  args->new_tablespace);
      270              : 
      271              :         /* terminates thread; always exit code 0, which reap_child() treats as success */
      272              :         return 0;
      273              : }
      274              : #endif
     275              : 
     276              : 
      277              : /*
      278              :  *      collect status from a completed worker child; blocks only if wait_for_child is true.
      279              :  *      Returns true if a worker was reaped (parallel_jobs is decremented), false otherwise. */
      280              : bool
      281            0 : reap_child(bool wait_for_child)
      282              : {
      283              : #ifndef WIN32
      284            0 :         int                     work_status;
      285            0 :         pid_t           child;
      286              : #else
      287              :         int                     thread_num;
      288              :         DWORD           res;
      289              : #endif
      290              : 
      291            0 :         if (user_opts.jobs <= 1 || parallel_jobs == 0)
      292            0 :                 return false;   /* not running in parallel, or no worker outstanding */
      293              : 
      294              : #ifndef WIN32
      295            0 :         child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
      296            0 :         if (child == (pid_t) -1)
      297            0 :                 pg_fatal("%s() failed: %m", "waitpid");
      298            0 :         if (child == 0)
      299            0 :                 return false;                   /* no dead children yet (only possible with WNOHANG) */
      300            0 :         if (work_status != 0)   /* raw wait status: any nonzero exit code or signal is fatal */
      301            0 :                 pg_fatal("child process exited abnormally: status %d", work_status);
      302              : #else
      303              :         /* wait for (or, when wait_for_child is false, just poll for) one thread to finish */
      304              :         thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
      305              :                                                                                 false, wait_for_child ? INFINITE : 0);
      306              : 
      307              :         if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)    /* NOTE(review): a failed wait is reported as "nothing reaped"; callers that passed wait_for_child = true ignore the result -- confirm this cannot overrun the slot arrays */
      308              :                 return false;
      309              : 
      310              :         /* compute thread index in thread_handles */
      311              :         thread_num -= WAIT_OBJECT_0;
      312              : 
      313              :         /* get the thread's exit code; nonzero means the worker failed */
      314              :         GetExitCodeThread(thread_handles[thread_num], &res);
      315              :         if (res != 0)
      316              :                 pg_fatal("child worker exited abnormally: %m"); /* NOTE(review): %m prints errno, which may be unrelated to the thread's failure -- confirm */
      317              : 
      318              :         /* dispose of handle to stop leaks */
      319              :         CloseHandle(thread_handles[thread_num]);
      320              : 
      321              :         /* Move last slot into dead child's position */
      322              :         if (thread_num != parallel_jobs - 1)
      323              :         {
      324              :                 void       *tmp_args;
      325              : 
      326              :                 thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
      327              : 
      328              :                 /*
      329              :                  * Move last active thread arg struct into the now-dead slot, and the
      330              :                  * now-dead slot to the end for reuse by the next thread. Though the
      331              :                  * thread struct is in use by another thread, we can safely swap the
      332              :                  * struct pointers within the array.
      333              :                  */
      334              :                 tmp_args = cur_thread_args[thread_num];
      335              :                 cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
      336              :                 cur_thread_args[parallel_jobs - 1] = tmp_args;
      337              :         }
      338              : #endif
      339              : 
      340              :         /* do this after job has been removed */
      341            0 :         parallel_jobs--;
      342              : 
      343            0 :         return true;
      344            0 : }
        

Generated by: LCOV version 2.3.2-1