LCOV - code coverage report
Current view: top level - src/fe_utils - parallel_slot.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 204 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 14 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  *      parallel_slot.c
       4              :  *              Parallel support for front-end parallel database connections
       5              :  *
       6              :  *
       7              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       8              :  * Portions Copyright (c) 1994, Regents of the University of California
       9              :  *
      10              :  * src/fe_utils/parallel_slot.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #if defined(WIN32) && FD_SETSIZE < 1024
      16              : #error FD_SETSIZE needs to have been increased
      17              : #endif
      18              : 
      19              : #include "postgres_fe.h"
      20              : 
      21              : #include <sys/select.h>
      22              : 
      23              : #include "common/logging.h"
      24              : #include "fe_utils/cancel.h"
      25              : #include "fe_utils/parallel_slot.h"
      26              : #include "fe_utils/query_utils.h"
      27              : 
      28              : #define ERRCODE_UNDEFINED_TABLE  "42P01"
      29              : 
      30              : static int      select_loop(int maxFd, fd_set *workerset);
      31              : static bool processQueryResult(ParallelSlot *slot, PGresult *result);
      32              : 
      33              : /*
      34              :  * Process (and delete) a query result.  Returns true if there's no problem,
      35              :  * false otherwise. It's up to the handler to decide what constitutes a
      36              :  * problem.
      37              :  */
      38              : static bool
      39            0 : processQueryResult(ParallelSlot *slot, PGresult *result)
      40              : {
      41            0 :         Assert(slot->handler != NULL);
      42              : 
      43              :         /* On failure, the handler should return NULL after freeing the result */
      44            0 :         if (!slot->handler(result, slot->connection, slot->handler_context))
      45            0 :                 return false;
      46              : 
      47              :         /* Ok, we have to free it ourself */
      48            0 :         PQclear(result);
      49            0 :         return true;
      50            0 : }
      51              : 
      52              : /*
      53              :  * Consume all the results generated for the given connection until
      54              :  * nothing remains.  If at least one error is encountered, return false.
      55              :  * Note that this will block if the connection is busy.
      56              :  */
      57              : static bool
      58            0 : consumeQueryResult(ParallelSlot *slot)
      59              : {
      60            0 :         bool            ok = true;
      61            0 :         PGresult   *result;
      62              : 
      63            0 :         SetCancelConn(slot->connection);
      64            0 :         while ((result = PQgetResult(slot->connection)) != NULL)
      65              :         {
      66            0 :                 if (!processQueryResult(slot, result))
      67            0 :                         ok = false;
      68              :         }
      69            0 :         ResetCancelConn();
      70            0 :         return ok;
      71            0 : }
      72              : 
      73              : /*
      74              :  * Wait until a file descriptor from the given set becomes readable.
      75              :  *
      76              :  * Returns the number of ready descriptors, or -1 on failure (including
      77              :  * getting a cancel request).
      78              :  */
      79              : static int
      80            0 : select_loop(int maxFd, fd_set *workerset)
      81              : {
      82            0 :         int                     i;
      83            0 :         fd_set          saveSet = *workerset;
      84              : 
      85            0 :         if (CancelRequested)
      86            0 :                 return -1;
      87              : 
      88            0 :         for (;;)
      89              :         {
      90              :                 /*
      91              :                  * On Windows, we need to check once in a while for cancel requests;
      92              :                  * on other platforms we rely on select() returning when interrupted.
      93              :                  */
      94            0 :                 struct timeval *tvp;
      95              : #ifdef WIN32
      96              :                 struct timeval tv = {0, 1000000};
      97              : 
      98              :                 tvp = &tv;
      99              : #else
     100            0 :                 tvp = NULL;
     101              : #endif
     102              : 
     103            0 :                 *workerset = saveSet;
     104            0 :                 i = select(maxFd + 1, workerset, NULL, NULL, tvp);
     105              : 
     106              : #ifdef WIN32
     107              :                 if (i == SOCKET_ERROR)
     108              :                 {
     109              :                         i = -1;
     110              : 
     111              :                         if (WSAGetLastError() == WSAEINTR)
     112              :                                 errno = EINTR;
     113              :                 }
     114              : #endif
     115              : 
     116            0 :                 if (i < 0 && errno == EINTR)
     117            0 :                         continue;                       /* ignore this */
     118            0 :                 if (i < 0 || CancelRequested)
     119            0 :                         return -1;                      /* but not this */
     120            0 :                 if (i == 0)
     121            0 :                         continue;                       /* timeout (Win32 only) */
     122            0 :                 break;
     123            0 :         }
     124              : 
     125            0 :         return i;
     126            0 : }
     127              : 
     128              : /*
     129              :  * Return the offset of a suitable idle slot, or -1 if none are available.  If
     130              :  * the given dbname is not null, only idle slots connected to the given
     131              :  * database are considered suitable, otherwise all idle connected slots are
     132              :  * considered suitable.
     133              :  */
     134              : static int
     135            0 : find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
     136              : {
     137            0 :         int                     i;
     138              : 
     139            0 :         for (i = 0; i < sa->numslots; i++)
     140              :         {
     141            0 :                 if (sa->slots[i].inUse)
     142            0 :                         continue;
     143              : 
     144            0 :                 if (sa->slots[i].connection == NULL)
     145            0 :                         continue;
     146              : 
     147            0 :                 if (dbname == NULL ||
     148            0 :                         strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
     149            0 :                         return i;
     150            0 :         }
     151            0 :         return -1;
     152            0 : }
     153              : 
     154              : /*
     155              :  * Return the offset of the first slot without a database connection, or -1 if
     156              :  * all slots are connected.
     157              :  */
     158              : static int
     159            0 : find_unconnected_slot(const ParallelSlotArray *sa)
     160              : {
     161            0 :         int                     i;
     162              : 
     163            0 :         for (i = 0; i < sa->numslots; i++)
     164              :         {
     165            0 :                 if (sa->slots[i].inUse)
     166            0 :                         continue;
     167              : 
     168            0 :                 if (sa->slots[i].connection == NULL)
     169            0 :                         return i;
     170            0 :         }
     171              : 
     172            0 :         return -1;
     173            0 : }
     174              : 
     175              : /*
     176              :  * Return the offset of the first idle slot, or -1 if all slots are busy.
     177              :  */
     178              : static int
     179            0 : find_any_idle_slot(const ParallelSlotArray *sa)
     180              : {
     181            0 :         int                     i;
     182              : 
     183            0 :         for (i = 0; i < sa->numslots; i++)
     184            0 :                 if (!sa->slots[i].inUse)
     185            0 :                         return i;
     186              : 
     187            0 :         return -1;
     188            0 : }
     189              : 
     190              : /*
     191              :  * Wait for any slot's connection to have query results, consume the results,
     192              :  * and update the slot's status as appropriate.  Returns true on success,
     193              :  * false on cancellation, on error, or if no slots are connected.
     194              :  */
     195              : static bool
     196            0 : wait_on_slots(ParallelSlotArray *sa)
     197              : {
     198            0 :         int                     i;
     199            0 :         fd_set          slotset;
     200            0 :         int                     maxFd = 0;
     201            0 :         PGconn     *cancelconn = NULL;
     202              : 
     203              :         /* We must reconstruct the fd_set for each call to select_loop */
     204            0 :         FD_ZERO(&slotset);
     205              : 
     206            0 :         for (i = 0; i < sa->numslots; i++)
     207              :         {
     208            0 :                 int                     sock;
     209              : 
     210              :                 /* We shouldn't get here if we still have slots without connections */
     211            0 :                 Assert(sa->slots[i].connection != NULL);
     212              : 
     213            0 :                 sock = PQsocket(sa->slots[i].connection);
     214              : 
     215              :                 /*
     216              :                  * We don't really expect any connections to lose their sockets after
     217              :                  * startup, but just in case, cope by ignoring them.
     218              :                  */
     219            0 :                 if (sock < 0)
     220            0 :                         continue;
     221              : 
     222              :                 /* Keep track of the first valid connection we see. */
     223            0 :                 if (cancelconn == NULL)
     224            0 :                         cancelconn = sa->slots[i].connection;
     225              : 
     226            0 :                 FD_SET(sock, &slotset);
     227            0 :                 if (sock > maxFd)
     228            0 :                         maxFd = sock;
     229            0 :         }
     230              : 
     231              :         /*
     232              :          * If we get this far with no valid connections, processing cannot
     233              :          * continue.
     234              :          */
     235            0 :         if (cancelconn == NULL)
     236            0 :                 return false;
     237              : 
     238            0 :         SetCancelConn(cancelconn);
     239            0 :         i = select_loop(maxFd, &slotset);
     240            0 :         ResetCancelConn();
     241              : 
     242              :         /* failure? */
     243            0 :         if (i < 0)
     244            0 :                 return false;
     245              : 
     246            0 :         for (i = 0; i < sa->numslots; i++)
     247              :         {
     248            0 :                 int                     sock;
     249              : 
     250            0 :                 sock = PQsocket(sa->slots[i].connection);
     251              : 
     252            0 :                 if (sock >= 0 && FD_ISSET(sock, &slotset))
     253              :                 {
     254              :                         /* select() says input is available, so consume it */
     255            0 :                         PQconsumeInput(sa->slots[i].connection);
     256            0 :                 }
     257              : 
     258              :                 /* Collect result(s) as long as any are available */
     259            0 :                 while (!PQisBusy(sa->slots[i].connection))
     260              :                 {
     261            0 :                         PGresult   *result = PQgetResult(sa->slots[i].connection);
     262              : 
     263            0 :                         if (result != NULL)
     264              :                         {
     265              :                                 /* Handle and discard the command result */
     266            0 :                                 if (!processQueryResult(&sa->slots[i], result))
     267            0 :                                         return false;
     268            0 :                         }
     269              :                         else
     270              :                         {
     271              :                                 /* This connection has become idle */
     272            0 :                                 ParallelSlotSetIdle(&sa->slots[i]);
     273            0 :                                 break;
     274              :                         }
     275            0 :                 }
     276            0 :         }
     277            0 :         return true;
     278            0 : }
     279              : 
     280              : /*
     281              :  * Open a new database connection using the stored connection parameters and
     282              :  * optionally a given dbname if not null, execute the stored initial command if
     283              :  * any, and associate the new connection with the given slot.
     284              :  */
     285              : static void
     286            0 : connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
     287              : {
     288            0 :         const char *old_override;
     289            0 :         ParallelSlot *slot = &sa->slots[slotno];
     290              : 
     291            0 :         old_override = sa->cparams->override_dbname;
     292            0 :         if (dbname)
     293            0 :                 sa->cparams->override_dbname = dbname;
     294            0 :         slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
     295            0 :         sa->cparams->override_dbname = old_override;
     296              : 
     297              :         /*
     298              :          * POSIX defines FD_SETSIZE as the highest file descriptor acceptable to
     299              :          * FD_SET() and allied macros.  Windows defines it as a ceiling on the
     300              :          * count of file descriptors in the set, not a ceiling on the value of
     301              :          * each file descriptor; see
     302              :          * https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-select
     303              :          * and
     304              :          * https://learn.microsoft.com/en-us/windows/win32/api/winsock/ns-winsock-fd_set.
     305              :          * We can't ignore that, because Windows starts file descriptors at a
     306              :          * higher value, delays reuse, and skips values.  With less than ten
     307              :          * concurrent file descriptors, opened and closed rapidly, one can reach
     308              :          * file descriptor 1024.
     309              :          *
     310              :          * Doing a hard exit here is a bit grotty, but it doesn't seem worth
     311              :          * complicating the API to make it less grotty.
     312              :          */
     313              : #ifdef WIN32
     314              :         if (slotno >= FD_SETSIZE)
     315              :         {
     316              :                 pg_log_error("too many jobs for this platform: %d", slotno);
     317              :                 exit(1);
     318              :         }
     319              : #else
     320              :         {
     321            0 :                 int                     fd = PQsocket(slot->connection);
     322              : 
     323            0 :                 if (fd >= FD_SETSIZE)
     324              :                 {
     325            0 :                         pg_log_error("socket file descriptor out of range for select(): %d",
     326              :                                                  fd);
     327            0 :                         pg_log_error_hint("Try fewer jobs.");
     328            0 :                         exit(1);
     329              :                 }
     330            0 :         }
     331              : #endif
     332              : 
     333              :         /* Setup the connection using the supplied command, if any. */
     334            0 :         if (sa->initcmd)
     335            0 :                 executeCommand(slot->connection, sa->initcmd, sa->echo);
     336            0 : }
     337              : 
     338              : /*
     339              :  * ParallelSlotsGetIdle
     340              :  *              Return a connection slot that is ready to execute a command.
     341              :  *
     342              :  * The slot returned is chosen as follows:
     343              :  *
     344              :  * If any idle slot already has an open connection, and if either dbname is
     345              :  * null or the existing connection is to the given database, that slot will be
     346              :  * returned allowing the connection to be reused.
     347              :  *
     348              :  * Otherwise, if any idle slot is not yet connected to any database, the slot
     349              :  * will be returned with its connection opened using the stored cparams and
     350              :  * optionally the given dbname if not null.
     351              :  *
     352              :  * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
     353              :  * after having its connection disconnected and reconnected using the stored
     354              :  * cparams and optionally the given dbname if not null.
     355              :  *
     356              :  * Otherwise, if any slots have connections that are busy, we loop on select()
     357              :  * until one socket becomes available.  When this happens, we read the whole
     358              :  * set and mark as free all sockets that become available.  We then select a
     359              :  * slot using the same rules as above.
     360              :  *
     361              :  * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
     362              :  *
     363              :  * For any connection created, if the stored initcmd is not null, it will be
     364              :  * executed as a command on the newly formed connection before the slot is
     365              :  * returned.
     366              :  *
     367              :  * If an error occurs, NULL is returned.
     368              :  */
     369              : ParallelSlot *
     370            0 : ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
     371              : {
     372            0 :         int                     offset;
     373              : 
     374            0 :         Assert(sa);
     375            0 :         Assert(sa->numslots > 0);
     376              : 
     377            0 :         while (1)
     378              :         {
     379              :                 /* First choice: a slot already connected to the desired database. */
     380            0 :                 offset = find_matching_idle_slot(sa, dbname);
     381            0 :                 if (offset >= 0)
     382              :                 {
     383            0 :                         sa->slots[offset].inUse = true;
     384            0 :                         return &sa->slots[offset];
     385              :                 }
     386              : 
     387              :                 /* Second choice: a slot not connected to any database. */
     388            0 :                 offset = find_unconnected_slot(sa);
     389            0 :                 if (offset >= 0)
     390              :                 {
     391            0 :                         connect_slot(sa, offset, dbname);
     392            0 :                         sa->slots[offset].inUse = true;
     393            0 :                         return &sa->slots[offset];
     394              :                 }
     395              : 
     396              :                 /* Third choice: a slot connected to the wrong database. */
     397            0 :                 offset = find_any_idle_slot(sa);
     398            0 :                 if (offset >= 0)
     399              :                 {
     400            0 :                         disconnectDatabase(sa->slots[offset].connection);
     401            0 :                         sa->slots[offset].connection = NULL;
     402            0 :                         connect_slot(sa, offset, dbname);
     403            0 :                         sa->slots[offset].inUse = true;
     404            0 :                         return &sa->slots[offset];
     405              :                 }
     406              : 
     407              :                 /*
     408              :                  * Fourth choice: block until one or more slots become available. If
     409              :                  * any slots hit a fatal error, we'll find out about that here and
     410              :                  * return NULL.
     411              :                  */
     412            0 :                 if (!wait_on_slots(sa))
     413            0 :                         return NULL;
     414              :         }
     415            0 : }
     416              : 
     417              : /*
     418              :  * ParallelSlotsSetup
     419              :  *              Prepare a set of parallel slots but do not connect to any database.
     420              :  *
     421              :  * This creates and initializes a set of slots, marking all parallel slots as
     422              :  * free and ready to use.  Establishing connections is delayed until requesting
     423              :  * a free slot.  The cparams, progname, echo, and initcmd are stored for later
     424              :  * use and must remain valid for the lifetime of the returned array.
     425              :  */
     426              : ParallelSlotArray *
     427            0 : ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname,
     428              :                                    bool echo, const char *initcmd)
     429              : {
     430            0 :         ParallelSlotArray *sa;
     431              : 
     432            0 :         Assert(numslots > 0);
     433            0 :         Assert(cparams != NULL);
     434            0 :         Assert(progname != NULL);
     435              : 
     436            0 :         sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
     437            0 :                                                                            numslots * sizeof(ParallelSlot));
     438              : 
     439            0 :         sa->numslots = numslots;
     440            0 :         sa->cparams = cparams;
     441            0 :         sa->progname = progname;
     442            0 :         sa->echo = echo;
     443            0 :         sa->initcmd = initcmd;
     444              : 
     445            0 :         return sa;
     446            0 : }
     447              : 
     448              : /*
     449              :  * ParallelSlotsAdoptConn
     450              :  *              Assign an open connection to the slots array for reuse.
     451              :  *
     452              :  * This turns over ownership of an open connection to a slots array.  The
     453              :  * caller should not further use or close the connection.  All the connection's
     454              :  * parameters (user, host, port, etc.) except possibly dbname should match
     455              :  * those of the slots array's cparams, as given in ParallelSlotsSetup.  If
     456              :  * these parameters differ, subsequent behavior is undefined.
     457              :  */
     458              : void
     459            0 : ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
     460              : {
     461            0 :         int                     offset;
     462              : 
     463            0 :         offset = find_unconnected_slot(sa);
     464            0 :         if (offset >= 0)
     465            0 :                 sa->slots[offset].connection = conn;
     466              :         else
     467            0 :                 disconnectDatabase(conn);
     468            0 : }
     469              : 
     470              : /*
     471              :  * ParallelSlotsTerminate
     472              :  *              Clean up a set of parallel slots
     473              :  *
     474              :  * Iterate through all connections in a given set of ParallelSlots and
     475              :  * terminate all connections.
     476              :  */
     477              : void
     478            0 : ParallelSlotsTerminate(ParallelSlotArray *sa)
     479              : {
     480            0 :         int                     i;
     481              : 
     482            0 :         for (i = 0; i < sa->numslots; i++)
     483              :         {
     484            0 :                 PGconn     *conn = sa->slots[i].connection;
     485              : 
     486            0 :                 if (conn == NULL)
     487            0 :                         continue;
     488              : 
     489            0 :                 disconnectDatabase(conn);
     490            0 :         }
     491            0 : }
     492              : 
     493              : /*
     494              :  * ParallelSlotsWaitCompletion
     495              :  *
     496              :  * Wait for all connections to finish, returning false if at least one
     497              :  * error has been found on the way.
     498              :  */
     499              : bool
     500            0 : ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
     501              : {
     502            0 :         int                     i;
     503              : 
     504            0 :         for (i = 0; i < sa->numslots; i++)
     505              :         {
     506            0 :                 if (sa->slots[i].connection == NULL)
     507            0 :                         continue;
     508            0 :                 if (!consumeQueryResult(&sa->slots[i]))
     509            0 :                         return false;
     510              :                 /* Mark connection as idle */
     511            0 :                 ParallelSlotSetIdle(&sa->slots[i]);
     512            0 :         }
     513              : 
     514            0 :         return true;
     515            0 : }
     516              : 
     517              : /*
     518              :  * TableCommandResultHandler
     519              :  *
     520              :  * ParallelSlotResultHandler for results of commands (not queries) against
     521              :  * tables.
     522              :  *
     523              :  * Requires that the result status is either PGRES_COMMAND_OK or an error about
     524              :  * a missing table.  This is useful for utilities that compile a list of tables
     525              :  * to process and then run commands (vacuum, reindex, or whatever) against
     526              :  * those tables, as there is a race condition between the time the list is
     527              :  * compiled and the time the command attempts to open the table.
     528              :  *
     529              :  * For missing tables, logs an error but allows processing to continue.
     530              :  *
     531              :  * For all other errors, logs an error and terminates further processing.
     532              :  *
     533              :  * res: PGresult from the query executed on the slot's connection
     534              :  * conn: connection belonging to the slot
     535              :  * context: unused
     536              :  */
     537              : bool
     538            0 : TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
     539              : {
     540            0 :         Assert(res != NULL);
     541            0 :         Assert(conn != NULL);
     542              : 
     543              :         /*
     544              :          * If it's an error, report it.  Errors about a missing table are harmless
     545              :          * so we continue processing; but die for other errors.
     546              :          */
     547            0 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
     548              :         {
     549            0 :                 char       *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
     550              : 
     551            0 :                 pg_log_error("processing of database \"%s\" failed: %s",
     552              :                                          PQdb(conn), PQerrorMessage(conn));
     553              : 
     554            0 :                 if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
     555              :                 {
     556            0 :                         PQclear(res);
     557            0 :                         return false;
     558              :                 }
     559            0 :         }
     560              : 
     561            0 :         return true;
     562            0 : }
        

Generated by: LCOV version 2.3.2-1