LCOV - code coverage report
Current view: top level - src/bin/pg_upgrade - task.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 139 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 9 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*
       2              :  * task.c
       3              :  *              framework for parallelizing pg_upgrade's once-in-each-database tasks
       4              :  *
       5              :  * This framework provides an efficient way of running the various
       6              :  * once-in-each-database tasks required by pg_upgrade.  Specifically, it
       7              :  * parallelizes these tasks by managing a set of slots that follow a simple
       8              :  * state machine and by using libpq's asynchronous APIs to establish the
       9              :  * connections and run the queries.  Callers simply need to create a callback
      10              :  * function and build/execute an UpgradeTask.  A simple example follows:
      11              :  *
      12              :  *              static void
      13              :  *              my_process_cb(DbInfo *dbinfo, PGresult *res, void *arg)
      14              :  *              {
      15              :  *                      for (int i = 0; i < PQntuples(res); i++)
      16              :  *                      {
      17              :  *                              ... process results ...
      18              :  *                      }
      19              :  *              }
      20              :  *
      21              :  *              void
      22              :  *              my_task(ClusterInfo *cluster)
      23              :  *              {
      24              :  *                      UpgradeTask *task = upgrade_task_create();
      25              :  *
      26              :  *                      upgrade_task_add_step(task,
      27              :  *                                                                "... query text ...",
      28              :  *                                                                my_process_cb,
      29              :  *                                                                true,         // let the task free the PGresult
      30              :  *                                                                NULL);        // "arg" pointer for callback
      31              :  *                      upgrade_task_run(task, cluster);
      32              :  *                      upgrade_task_free(task);
      33              :  *              }
      34              :  *
      35              :  * Note that multiple steps can be added to a given task.  When there are
      36              :  * multiple steps, the task will run all of the steps consecutively in the same
      37              :  * database connection before freeing the connection and moving on.  In other
      38              :  * words, it only ever initiates one connection to each database in the
      39              :  * cluster for a given run.
      40              :  *
      41              :  * Copyright (c) 2024-2026, PostgreSQL Global Development Group
      42              :  * src/bin/pg_upgrade/task.c
      43              :  */
      44              : 
      45              : #include "postgres_fe.h"
      46              : 
      47              : #include "common/connect.h"
      48              : #include "fe_utils/string_utils.h"
      49              : #include "pg_upgrade.h"
      50              : 
/*
 * dbs_complete stores the number of databases that we have completed
 * processing.  When this value equals the number of databases in the cluster,
 * the task is finished.
 *
 * upgrade_task_run() resets it to zero at the start of every run, and
 * process_slot() increments it each time a slot finishes the last step for
 * its database.
 */
static int	dbs_complete;
      57              : 
/*
 * dbs_processing stores the index of the next database in the cluster's array
 * of databases that will be picked up for processing.  It will always be
 * greater than or equal to dbs_complete.
 *
 * upgrade_task_run() resets it to zero at the start of every run, and
 * process_slot() advances it by one whenever a free slot claims a database.
 */
static int	dbs_processing;
      64              : 
/*
 * This struct stores the information for a single step of a task.  Note that
 * the query string is stored in the "queries" PQExpBuffer for the UpgradeTask.
 * All steps in a task are run in a single connection before moving on to the
 * next database (which requires a new connection).
 */
typedef struct UpgradeTaskStep
{
	/* processes the results of the query; NULL means nothing to process */
	UpgradeTaskProcessCB process_cb;
	/* if true, the result is PQclear()'d right after process_cb returns */
	bool		free_result;
	/* opaque pointer handed to process_cb unchanged */
	void	   *arg;
} UpgradeTaskStep;
      77              : 
/*
 * This struct is a thin wrapper around an array of steps, i.e.,
 * UpgradeTaskStep, plus a PQExpBuffer for all the query strings.
 *
 * upgrade_task_add_step() grows "steps" by one element per call and appends
 * the step's query text, followed by a semicolon, to "queries".  The whole
 * buffer is later sent to the server in a single PQsendQuery() call, so the
 * i'th result corresponds to steps[i].
 */
struct UpgradeTask
{
	UpgradeTaskStep *steps;		/* array of num_steps entries */
	int			num_steps;		/* number of valid entries in steps */
	PQExpBuffer queries;		/* concatenated query text of all steps */
};
      88              : 
/*
 * The different states for a parallel slot.
 *
 * Slots are allocated zeroed and are reset with memset() once their database
 * is done, so FREE has to remain the first (zero-valued) member.
 */
typedef enum UpgradeTaskSlotState
{
	FREE,						/* slot available for use in a new database */
	CONNECTING,					/* waiting for connection to be established */
	RUNNING_QUERIES,			/* running/processing queries in the task */
} UpgradeTaskSlotState;
      98              : 
/*
 * We maintain an array of user_opts.jobs slots to execute the task.
 *
 * Each slot walks one database at a time through the state machine in
 * process_slot(); wait_on_slots() refreshes "sock" and "ready" between passes.
 */
typedef struct UpgradeTaskSlot
{
	UpgradeTaskSlotState state; /* state of the slot */
	int			db_idx;			/* index of the database assigned to slot */
	int			step_idx;		/* index of the current step of task */
	PGconn	   *conn;			/* current connection managed by slot */
	bool		ready;			/* slot is ready for processing */
	bool		select_mode;	/* select() mode: true->read, false->write */
	int			sock;			/* file descriptor for connection's socket */
} UpgradeTaskSlot;
     112              : 
     113              : /*
     114              :  * Initializes an UpgradeTask.
     115              :  */
     116              : UpgradeTask *
     117            0 : upgrade_task_create(void)
     118              : {
     119            0 :         UpgradeTask *task = pg_malloc0(sizeof(UpgradeTask));
     120              : 
     121            0 :         task->queries = createPQExpBuffer();
     122              : 
     123              :         /* All tasks must first set a secure search_path. */
     124            0 :         upgrade_task_add_step(task, ALWAYS_SECURE_SEARCH_PATH_SQL, NULL, true, NULL);
     125              : 
     126            0 :         return task;
     127            0 : }
     128              : 
     129              : /*
     130              :  * Frees all storage associated with an UpgradeTask.
     131              :  */
     132              : void
     133            0 : upgrade_task_free(UpgradeTask *task)
     134              : {
     135            0 :         destroyPQExpBuffer(task->queries);
     136            0 :         pg_free(task->steps);
     137            0 :         pg_free(task);
     138            0 : }
     139              : 
     140              : /*
     141              :  * Adds a step to an UpgradeTask.  The steps will be executed in each database
     142              :  * in the order in which they are added.
     143              :  *
     144              :  *      task: task object that must have been initialized via upgrade_task_create()
     145              :  *      query: the query text
     146              :  *      process_cb: function that processes the results of the query
     147              :  *      free_result: should we free the PGresult, or leave it to the caller?
     148              :  *      arg: pointer to task-specific data that is passed to each callback
     149              :  */
     150              : void
     151            0 : upgrade_task_add_step(UpgradeTask *task, const char *query,
     152              :                                           UpgradeTaskProcessCB process_cb, bool free_result,
     153              :                                           void *arg)
     154              : {
     155            0 :         UpgradeTaskStep *new_step;
     156              : 
     157            0 :         task->steps = pg_realloc(task->steps,
     158            0 :                                                          ++task->num_steps * sizeof(UpgradeTaskStep));
     159              : 
     160            0 :         new_step = &task->steps[task->num_steps - 1];
     161            0 :         new_step->process_cb = process_cb;
     162            0 :         new_step->free_result = free_result;
     163            0 :         new_step->arg = arg;
     164              : 
     165            0 :         appendPQExpBuffer(task->queries, "%s;", query);
     166            0 : }
     167              : 
     168              : /*
     169              :  * Build a connection string for the slot's current database and asynchronously
     170              :  * start a new connection, but do not wait for the connection to be
     171              :  * established.
     172              :  */
     173              : static void
     174            0 : start_conn(const ClusterInfo *cluster, UpgradeTaskSlot *slot)
     175              : {
     176            0 :         PQExpBufferData conn_opts;
     177            0 :         DbInfo     *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
     178              : 
     179              :         /* Build connection string with proper quoting */
     180            0 :         initPQExpBuffer(&conn_opts);
     181            0 :         appendPQExpBufferStr(&conn_opts, "dbname=");
     182            0 :         appendConnStrVal(&conn_opts, dbinfo->db_name);
     183            0 :         appendPQExpBufferStr(&conn_opts, " user=");
     184            0 :         appendConnStrVal(&conn_opts, os_info.user);
     185            0 :         appendPQExpBuffer(&conn_opts, " port=%d", cluster->port);
     186            0 :         if (cluster->sockdir)
     187              :         {
     188            0 :                 appendPQExpBufferStr(&conn_opts, " host=");
     189            0 :                 appendConnStrVal(&conn_opts, cluster->sockdir);
     190            0 :         }
     191              : 
     192            0 :         slot->conn = PQconnectStart(conn_opts.data);
     193              : 
     194            0 :         if (!slot->conn)
     195            0 :                 pg_fatal("out of memory");
     196              : 
     197            0 :         termPQExpBuffer(&conn_opts);
     198            0 : }
     199              : 
     200              : /*
     201              :  * Run the process_cb callback function to process the result of a query, and
     202              :  * free the result if the caller indicated we should do so.
     203              :  */
     204              : static void
     205            0 : process_query_result(const ClusterInfo *cluster, UpgradeTaskSlot *slot,
     206              :                                          const UpgradeTask *task)
     207              : {
     208            0 :         UpgradeTaskStep *steps = &task->steps[slot->step_idx];
     209            0 :         UpgradeTaskProcessCB process_cb = steps->process_cb;
     210            0 :         DbInfo     *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
     211            0 :         PGresult   *res = PQgetResult(slot->conn);
     212              : 
     213            0 :         if (PQstatus(slot->conn) == CONNECTION_BAD ||
     214            0 :                 (PQresultStatus(res) != PGRES_TUPLES_OK &&
     215            0 :                  PQresultStatus(res) != PGRES_COMMAND_OK))
     216            0 :                 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
     217              : 
     218              :         /*
     219              :          * We assume that a NULL process_cb callback function means there's
     220              :          * nothing to process.  This is primarily intended for the initial step in
     221              :          * every task that sets a safe search_path.
     222              :          */
     223            0 :         if (process_cb)
     224            0 :                 (*process_cb) (dbinfo, res, steps->arg);
     225              : 
     226            0 :         if (steps->free_result)
     227            0 :                 PQclear(res);
     228            0 : }
     229              : 
     230              : /*
     231              :  * Advances the state machine for a given slot as necessary.
     232              :  */
     233              : static void
     234            0 : process_slot(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
     235              : {
     236            0 :         PostgresPollingStatusType status;
     237              : 
     238            0 :         if (!slot->ready)
     239            0 :                 return;
     240              : 
     241            0 :         switch (slot->state)
     242              :         {
     243              :                 case FREE:
     244              : 
     245              :                         /*
     246              :                          * If all of the databases in the cluster have been processed or
     247              :                          * are currently being processed by other slots, we are done.
     248              :                          */
     249            0 :                         if (dbs_processing >= cluster->dbarr.ndbs)
     250            0 :                                 return;
     251              : 
     252              :                         /*
     253              :                          * Claim the next database in the cluster's array and initiate a
     254              :                          * new connection.
     255              :                          */
     256            0 :                         slot->db_idx = dbs_processing++;
     257            0 :                         slot->state = CONNECTING;
     258            0 :                         start_conn(cluster, slot);
     259              : 
     260            0 :                         return;
     261              : 
     262              :                 case CONNECTING:
     263              : 
     264              :                         /* Check for connection failure. */
     265            0 :                         status = PQconnectPoll(slot->conn);
     266            0 :                         if (status == PGRES_POLLING_FAILED)
     267            0 :                                 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
     268              : 
     269              :                         /* Check whether the connection is still establishing. */
     270            0 :                         if (status != PGRES_POLLING_OK)
     271              :                         {
     272            0 :                                 slot->select_mode = (status == PGRES_POLLING_READING);
     273            0 :                                 return;
     274              :                         }
     275              : 
     276              :                         /*
     277              :                          * Move on to running/processing the queries in the task.
     278              :                          */
     279            0 :                         slot->state = RUNNING_QUERIES;
     280            0 :                         slot->select_mode = true;    /* wait until ready for reading */
     281            0 :                         if (!PQsendQuery(slot->conn, task->queries->data))
     282            0 :                                 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
     283              : 
     284            0 :                         return;
     285              : 
     286              :                 case RUNNING_QUERIES:
     287              : 
     288              :                         /*
     289              :                          * Consume any available data and clear the read-ready indicator
     290              :                          * for the connection.
     291              :                          */
     292            0 :                         if (!PQconsumeInput(slot->conn))
     293            0 :                                 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
     294              : 
     295              :                         /*
     296              :                          * Process any results that are ready so that we can free up this
     297              :                          * slot for another database as soon as possible.
     298              :                          */
     299            0 :                         for (; slot->step_idx < task->num_steps; slot->step_idx++)
     300              :                         {
     301              :                                 /* If no more results are available yet, move on. */
     302            0 :                                 if (PQisBusy(slot->conn))
     303            0 :                                         return;
     304              : 
     305            0 :                                 process_query_result(cluster, slot, task);
     306            0 :                         }
     307              : 
     308              :                         /*
     309              :                          * If we just finished processing the result of the last step in
     310              :                          * the task, free the slot.  We recursively call this function on
     311              :                          * the newly-freed slot so that we can start initiating the next
     312              :                          * connection immediately instead of waiting for the next loop
     313              :                          * through the slots.
     314              :                          */
     315            0 :                         dbs_complete++;
     316            0 :                         PQfinish(slot->conn);
     317            0 :                         memset(slot, 0, sizeof(UpgradeTaskSlot));
     318            0 :                         slot->ready = true;
     319              : 
     320            0 :                         process_slot(cluster, slot, task);
     321              : 
     322            0 :                         return;
     323              :         }
     324            0 : }
     325              : 
     326              : /*
     327              :  * Returns -1 on error, else the number of ready descriptors.
     328              :  */
     329              : static int
     330            0 : select_loop(int maxFd, fd_set *input, fd_set *output)
     331              : {
     332            0 :         fd_set          save_input = *input;
     333            0 :         fd_set          save_output = *output;
     334              : 
     335            0 :         if (maxFd == 0)
     336            0 :                 return 0;
     337              : 
     338            0 :         for (;;)
     339              :         {
     340            0 :                 int                     i;
     341              : 
     342            0 :                 *input = save_input;
     343            0 :                 *output = save_output;
     344              : 
     345            0 :                 i = select(maxFd + 1, input, output, NULL, NULL);
     346              : 
     347              : #ifndef WIN32
     348            0 :                 if (i < 0 && errno == EINTR)
     349            0 :                         continue;
     350              : #else
     351              :                 if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
     352              :                         continue;
     353              : #endif
     354            0 :                 return i;
     355            0 :         }
     356            0 : }
     357              : 
     358              : /*
     359              :  * Wait on the slots to either finish connecting or to receive query results if
     360              :  * possible.  This avoids a tight loop in upgrade_task_run().
     361              :  */
     362              : static void
     363            0 : wait_on_slots(UpgradeTaskSlot *slots, int numslots)
     364              : {
     365            0 :         fd_set          input;
     366            0 :         fd_set          output;
     367            0 :         int                     maxFd = 0;
     368              : 
     369            0 :         FD_ZERO(&input);
     370            0 :         FD_ZERO(&output);
     371              : 
     372            0 :         for (int i = 0; i < numslots; i++)
     373              :         {
     374              :                 /*
     375              :                  * We assume the previous call to process_slot() handled everything
     376              :                  * that was marked ready in the previous call to wait_on_slots(), if
     377              :                  * any.
     378              :                  */
     379            0 :                 slots[i].ready = false;
     380              : 
     381              :                 /*
     382              :                  * This function should only ever see free slots as we are finishing
     383              :                  * processing the last few databases, at which point we don't have any
     384              :                  * databases left for them to process.  We'll never use these slots
     385              :                  * again, so we can safely ignore them.
     386              :                  */
     387            0 :                 if (slots[i].state == FREE)
     388            0 :                         continue;
     389              : 
     390              :                 /*
     391              :                  * Add the socket to the set.
     392              :                  */
     393            0 :                 slots[i].sock = PQsocket(slots[i].conn);
     394            0 :                 if (slots[i].sock < 0)
     395            0 :                         pg_fatal("invalid socket");
     396            0 :                 FD_SET(slots[i].sock, slots[i].select_mode ? &input : &output);
     397            0 :                 maxFd = Max(maxFd, slots[i].sock);
     398            0 :         }
     399              : 
     400              :         /*
     401              :          * If we found socket(s) to wait on, wait.
     402              :          */
     403            0 :         if (select_loop(maxFd, &input, &output) == -1)
     404            0 :                 pg_fatal("%s() failed: %m", "select");
     405              : 
     406              :         /*
     407              :          * Mark which sockets appear to be ready.
     408              :          */
     409            0 :         for (int i = 0; i < numslots; i++)
     410            0 :                 slots[i].ready |= (FD_ISSET(slots[i].sock, &input) ||
     411            0 :                                                    FD_ISSET(slots[i].sock, &output));
     412            0 : }
     413              : 
     414              : /*
     415              :  * Runs all the steps of the task in every database in the cluster using
     416              :  * user_opts.jobs parallel slots.
     417              :  */
     418              : void
     419            0 : upgrade_task_run(const UpgradeTask *task, const ClusterInfo *cluster)
     420              : {
     421            0 :         int                     jobs = Max(1, user_opts.jobs);
     422            0 :         UpgradeTaskSlot *slots = pg_malloc0(sizeof(UpgradeTaskSlot) * jobs);
     423              : 
     424            0 :         dbs_complete = 0;
     425            0 :         dbs_processing = 0;
     426              : 
     427              :         /*
     428              :          * Process every slot the first time round.
     429              :          */
     430            0 :         for (int i = 0; i < jobs; i++)
     431            0 :                 slots[i].ready = true;
     432              : 
     433            0 :         while (dbs_complete < cluster->dbarr.ndbs)
     434              :         {
     435            0 :                 for (int i = 0; i < jobs; i++)
     436            0 :                         process_slot(cluster, &slots[i], task);
     437              : 
     438            0 :                 wait_on_slots(slots, jobs);
     439              :         }
     440              : 
     441            0 :         pg_free(slots);
     442            0 : }
        

Generated by: LCOV version 2.3.2-1