LCOV - code coverage report
Current view: top level - src/backend/storage/aio - method_worker.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 82.9 % 240 199
Test Date: 2026-01-26 10:56:24 Functions: 93.8 % 16 15
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Branches: 51.9 % 108 56

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * method_worker.c
       4                 :             :  *    AIO - perform AIO using worker processes
       5                 :             :  *
       6                 :             :  * IO workers consume IOs from a shared memory submission queue, run
       7                 :             :  * traditional synchronous system calls, and perform the shared completion
       8                 :             :  * handling immediately.  Client code submits most requests by pushing IOs
       9                 :             :  * into the submission queue, and waits (if necessary) using condition
      10                 :             :  * variables.  Some IOs cannot be performed in another process due to lack of
      11                 :             :  * infrastructure for reopening the file, and must processed synchronously by
      12                 :             :  * the client code when submitted.
      13                 :             :  *
      14                 :             :  * So that the submitter can make just one system call when submitting a batch
      15                 :             :  * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
      16                 :             :  * could be improved by using futexes instead of latches to wake N waiters.
      17                 :             :  *
      18                 :             :  * This method of AIO is available in all builds on all operating systems, and
      19                 :             :  * is the default.
      20                 :             :  *
      21                 :             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      22                 :             :  * Portions Copyright (c) 1994, Regents of the University of California
      23                 :             :  *
      24                 :             :  * IDENTIFICATION
      25                 :             :  *        src/backend/storage/aio/method_worker.c
      26                 :             :  *
      27                 :             :  *-------------------------------------------------------------------------
      28                 :             :  */
      29                 :             : 
      30                 :             : #include "postgres.h"
      31                 :             : 
      32                 :             : #include "libpq/pqsignal.h"
      33                 :             : #include "miscadmin.h"
      34                 :             : #include "port/pg_bitutils.h"
      35                 :             : #include "postmaster/auxprocess.h"
      36                 :             : #include "postmaster/interrupt.h"
      37                 :             : #include "storage/aio.h"
      38                 :             : #include "storage/aio_internal.h"
      39                 :             : #include "storage/aio_subsys.h"
      40                 :             : #include "storage/io_worker.h"
      41                 :             : #include "storage/ipc.h"
      42                 :             : #include "storage/latch.h"
      43                 :             : #include "storage/proc.h"
      44                 :             : #include "tcop/tcopprot.h"
      45                 :             : #include "utils/injection_point.h"
      46                 :             : #include "utils/memdebug.h"
      47                 :             : #include "utils/ps_status.h"
      48                 :             : #include "utils/wait_event.h"
      49                 :             : 
      50                 :             : 
      51                 :             : /* How many workers should each worker wake up if needed? */
      52                 :             : #define IO_WORKER_WAKEUP_FANOUT 2
      53                 :             : 
      54                 :             : 
      55                 :             : typedef struct PgAioWorkerSubmissionQueue
      56                 :             : {
      57                 :             :         uint32          size;
      58                 :             :         uint32          head;
      59                 :             :         uint32          tail;
      60                 :             :         int                     sqes[FLEXIBLE_ARRAY_MEMBER];
      61                 :             : } PgAioWorkerSubmissionQueue;
      62                 :             : 
      63                 :             : typedef struct PgAioWorkerSlot
      64                 :             : {
      65                 :             :         Latch      *latch;
      66                 :             :         bool            in_use;
      67                 :             : } PgAioWorkerSlot;
      68                 :             : 
      69                 :             : typedef struct PgAioWorkerControl
      70                 :             : {
      71                 :             :         uint64          idle_worker_mask;
      72                 :             :         PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
      73                 :             : } PgAioWorkerControl;
      74                 :             : 
      75                 :             : 
      76                 :             : static size_t pgaio_worker_shmem_size(void);
      77                 :             : static void pgaio_worker_shmem_init(bool first_time);
      78                 :             : 
      79                 :             : static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
      80                 :             : static int      pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
      81                 :             : 
      82                 :             : 
      83                 :             : const IoMethodOps pgaio_worker_ops = {
      84                 :             :         .shmem_size = pgaio_worker_shmem_size,
      85                 :             :         .shmem_init = pgaio_worker_shmem_init,
      86                 :             : 
      87                 :             :         .needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
      88                 :             :         .submit = pgaio_worker_submit,
      89                 :             : };
      90                 :             : 
      91                 :             : 
      92                 :             : /* GUCs */
      93                 :             : int                     io_workers = 3;
      94                 :             : 
      95                 :             : 
      96                 :             : static int      io_worker_queue_size = 64;
      97                 :             : static int      MyIoWorkerId;
      98                 :             : static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
      99                 :             : static PgAioWorkerControl *io_worker_control;
     100                 :             : 
     101                 :             : 
     102                 :             : static size_t
     103                 :          15 : pgaio_worker_queue_shmem_size(int *queue_size)
     104                 :             : {
     105                 :             :         /* Round size up to next power of two so we can make a mask. */
     106                 :          15 :         *queue_size = pg_nextpower2_32(io_worker_queue_size);
     107                 :             : 
     108                 :          15 :         return offsetof(PgAioWorkerSubmissionQueue, sqes) +
     109                 :          15 :                 sizeof(int) * *queue_size;
     110                 :             : }
     111                 :             : 
     112                 :             : static size_t
     113                 :          15 : pgaio_worker_control_shmem_size(void)
     114                 :             : {
     115                 :          15 :         return offsetof(PgAioWorkerControl, workers) +
     116                 :             :                 sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
     117                 :             : }
     118                 :             : 
     119                 :             : static size_t
     120                 :           9 : pgaio_worker_shmem_size(void)
     121                 :             : {
     122                 :           9 :         size_t          sz;
     123                 :           9 :         int                     queue_size;
     124                 :             : 
     125                 :           9 :         sz = pgaio_worker_queue_shmem_size(&queue_size);
     126                 :           9 :         sz = add_size(sz, pgaio_worker_control_shmem_size());
     127                 :             : 
     128                 :          18 :         return sz;
     129                 :           9 : }
     130                 :             : 
     131                 :             : static void
     132                 :           6 : pgaio_worker_shmem_init(bool first_time)
     133                 :             : {
     134                 :           6 :         bool            found;
     135                 :           6 :         int                     queue_size;
     136                 :             : 
     137                 :           6 :         io_worker_submission_queue =
     138                 :           6 :                 ShmemInitStruct("AioWorkerSubmissionQueue",
     139                 :           6 :                                                 pgaio_worker_queue_shmem_size(&queue_size),
     140                 :             :                                                 &found);
     141         [ -  + ]:           6 :         if (!found)
     142                 :             :         {
     143                 :           6 :                 io_worker_submission_queue->size = queue_size;
     144                 :           6 :                 io_worker_submission_queue->head = 0;
     145                 :           6 :                 io_worker_submission_queue->tail = 0;
     146                 :           6 :         }
     147                 :             : 
     148                 :           6 :         io_worker_control =
     149                 :           6 :                 ShmemInitStruct("AioWorkerControl",
     150                 :           6 :                                                 pgaio_worker_control_shmem_size(),
     151                 :             :                                                 &found);
     152         [ -  + ]:           6 :         if (!found)
     153                 :             :         {
     154                 :           6 :                 io_worker_control->idle_worker_mask = 0;
     155         [ +  + ]:         198 :                 for (int i = 0; i < MAX_IO_WORKERS; ++i)
     156                 :             :                 {
     157                 :         192 :                         io_worker_control->workers[i].latch = NULL;
     158                 :         192 :                         io_worker_control->workers[i].in_use = false;
     159                 :         192 :                 }
     160                 :           6 :         }
     161                 :           6 : }
     162                 :             : 
     163                 :             : static int
     164                 :         655 : pgaio_worker_choose_idle(void)
     165                 :             : {
     166                 :         655 :         int                     worker;
     167                 :             : 
     168         [ +  - ]:         655 :         if (io_worker_control->idle_worker_mask == 0)
     169                 :           0 :                 return -1;
     170                 :             : 
     171                 :             :         /* Find the lowest bit position, and clear it. */
     172                 :         655 :         worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
     173                 :         655 :         io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
     174         [ +  - ]:         655 :         Assert(io_worker_control->workers[worker].in_use);
     175                 :             : 
     176                 :         655 :         return worker;
     177                 :         655 : }
     178                 :             : 
     179                 :             : static bool
     180                 :         658 : pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
     181                 :             : {
     182                 :         658 :         PgAioWorkerSubmissionQueue *queue;
     183                 :         658 :         uint32          new_head;
     184                 :             : 
     185                 :         658 :         queue = io_worker_submission_queue;
     186                 :         658 :         new_head = (queue->head + 1) & (queue->size - 1);
     187         [ -  + ]:         658 :         if (new_head == queue->tail)
     188                 :             :         {
     189   [ #  #  #  # ]:           0 :                 pgaio_debug(DEBUG3, "io queue is full, at %u elements",
     190                 :             :                                         io_worker_submission_queue->size);
     191                 :           0 :                 return false;                   /* full */
     192                 :             :         }
     193                 :             : 
     194                 :         658 :         queue->sqes[queue->head] = pgaio_io_get_id(ioh);
     195                 :         658 :         queue->head = new_head;
     196                 :             : 
     197                 :         658 :         return true;
     198                 :         658 : }
     199                 :             : 
     200                 :             : static int
     201                 :         583 : pgaio_worker_submission_queue_consume(void)
     202                 :             : {
     203                 :         583 :         PgAioWorkerSubmissionQueue *queue;
     204                 :         583 :         int                     result;
     205                 :             : 
     206                 :         583 :         queue = io_worker_submission_queue;
     207         [ +  + ]:         583 :         if (queue->tail == queue->head)
     208                 :         296 :                 return -1;                              /* empty */
     209                 :             : 
     210                 :         287 :         result = queue->sqes[queue->tail];
     211                 :         287 :         queue->tail = (queue->tail + 1) & (queue->size - 1);
     212                 :             : 
     213                 :         287 :         return result;
     214                 :         583 : }
     215                 :             : 
     216                 :             : static uint32
     217                 :         574 : pgaio_worker_submission_queue_depth(void)
     218                 :             : {
     219                 :         574 :         uint32          head;
     220                 :         574 :         uint32          tail;
     221                 :             : 
     222                 :         574 :         head = io_worker_submission_queue->head;
     223                 :         574 :         tail = io_worker_submission_queue->tail;
     224                 :             : 
     225         [ +  - ]:         574 :         if (tail > head)
     226                 :           0 :                 head += io_worker_submission_queue->size;
     227                 :             : 
     228         [ +  - ]:         574 :         Assert(head >= tail);
     229                 :             : 
     230                 :        1148 :         return head - tail;
     231                 :         574 : }
     232                 :             : 
     233                 :             : static bool
     234                 :        1744 : pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
     235                 :             : {
     236                 :        1744 :         return
     237                 :        1744 :                 !IsUnderPostmaster
     238         [ +  + ]:        1744 :                 || ioh->flags & PGAIO_HF_REFERENCES_LOCAL
     239         [ +  + ]:        1694 :                 || !pgaio_io_can_reopen(ioh);
     240                 :             : }
     241                 :             : 
     242                 :             : static void
     243                 :         654 : pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
     244                 :             : {
     245                 :         654 :         PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
     246                 :         654 :         int                     nsync = 0;
     247                 :         654 :         Latch      *wakeup = NULL;
     248                 :         654 :         int                     worker;
     249                 :             : 
     250         [ +  - ]:         654 :         Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
     251                 :             : 
     252                 :         654 :         LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
     253         [ +  + ]:        1312 :         for (int i = 0; i < num_staged_ios; ++i)
     254                 :             :         {
     255         [ +  - ]:         658 :                 Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
     256         [ +  - ]:         658 :                 if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
     257                 :             :                 {
     258                 :             :                         /*
     259                 :             :                          * We'll do it synchronously, but only after we've sent as many as
     260                 :             :                          * we can to workers, to maximize concurrency.
     261                 :             :                          */
     262                 :           0 :                         synchronous_ios[nsync++] = staged_ios[i];
     263                 :           0 :                         continue;
     264                 :             :                 }
     265                 :             : 
     266         [ +  + ]:         658 :                 if (wakeup == NULL)
     267                 :             :                 {
     268                 :             :                         /* Choose an idle worker to wake up if we haven't already. */
     269                 :         654 :                         worker = pgaio_worker_choose_idle();
     270         [ -  + ]:         654 :                         if (worker >= 0)
     271                 :         654 :                                 wakeup = io_worker_control->workers[worker].latch;
     272                 :             : 
     273   [ -  +  -  + ]:         654 :                         pgaio_debug_io(DEBUG4, staged_ios[i],
     274                 :             :                                                    "choosing worker %d",
     275                 :             :                                                    worker);
     276                 :         654 :                 }
     277                 :         658 :         }
     278                 :         654 :         LWLockRelease(AioWorkerSubmissionQueueLock);
     279                 :             : 
     280         [ -  + ]:         654 :         if (wakeup)
     281                 :         654 :                 SetLatch(wakeup);
     282                 :             : 
     283                 :             :         /* Run whatever is left synchronously. */
     284         [ +  - ]:         654 :         if (nsync > 0)
     285                 :             :         {
     286         [ #  # ]:           0 :                 for (int i = 0; i < nsync; ++i)
     287                 :             :                 {
     288                 :           0 :                         pgaio_io_perform_synchronously(synchronous_ios[i]);
     289                 :           0 :                 }
     290                 :           0 :         }
     291                 :         654 : }
     292                 :             : 
     293                 :             : static int
     294                 :         654 : pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
     295                 :             : {
     296         [ +  + ]:        1312 :         for (int i = 0; i < num_staged_ios; i++)
     297                 :             :         {
     298                 :         658 :                 PgAioHandle *ioh = staged_ios[i];
     299                 :             : 
     300                 :         658 :                 pgaio_io_prepare_submit(ioh);
     301                 :         658 :         }
     302                 :             : 
     303                 :         654 :         pgaio_worker_submit_internal(num_staged_ios, staged_ios);
     304                 :             : 
     305                 :         654 :         return num_staged_ios;
     306                 :             : }
     307                 :             : 
     308                 :             : /*
     309                 :             :  * on_shmem_exit() callback that releases the worker's slot in
     310                 :             :  * io_worker_control.
     311                 :             :  */
     312                 :             : static void
     313                 :           3 : pgaio_worker_die(int code, Datum arg)
     314                 :             : {
     315                 :           3 :         LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
     316         [ +  - ]:           3 :         Assert(io_worker_control->workers[MyIoWorkerId].in_use);
     317         [ +  - ]:           3 :         Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
     318                 :             : 
     319                 :           3 :         io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
     320                 :           3 :         io_worker_control->workers[MyIoWorkerId].in_use = false;
     321                 :           3 :         io_worker_control->workers[MyIoWorkerId].latch = NULL;
     322                 :           3 :         LWLockRelease(AioWorkerSubmissionQueueLock);
     323                 :           3 : }
     324                 :             : 
     325                 :             : /*
     326                 :             :  * Register the worker in shared memory, assign MyIoWorkerId and register a
     327                 :             :  * shutdown callback to release registration.
     328                 :             :  */
     329                 :             : static void
     330                 :           3 : pgaio_worker_register(void)
     331                 :             : {
     332                 :           3 :         MyIoWorkerId = -1;
     333                 :             : 
     334                 :             :         /*
     335                 :             :          * XXX: This could do with more fine-grained locking. But it's also not
     336                 :             :          * very common for the number of workers to change at the moment...
     337                 :             :          */
     338                 :           3 :         LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
     339                 :             : 
     340         [ +  - ]:           9 :         for (int i = 0; i < MAX_IO_WORKERS; ++i)
     341                 :             :         {
     342         [ +  + ]:           6 :                 if (!io_worker_control->workers[i].in_use)
     343                 :             :                 {
     344         [ +  - ]:           3 :                         Assert(io_worker_control->workers[i].latch == NULL);
     345                 :           3 :                         io_worker_control->workers[i].in_use = true;
     346                 :           3 :                         MyIoWorkerId = i;
     347                 :           3 :                         break;
     348                 :             :                 }
     349                 :             :                 else
     350         [ +  - ]:           3 :                         Assert(io_worker_control->workers[i].latch != NULL);
     351                 :           3 :         }
     352                 :             : 
     353         [ +  - ]:           3 :         if (MyIoWorkerId == -1)
     354   [ #  #  #  # ]:           0 :                 elog(ERROR, "couldn't find a free worker slot");
     355                 :             : 
     356                 :           3 :         io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
     357                 :           3 :         io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
     358                 :           3 :         LWLockRelease(AioWorkerSubmissionQueueLock);
     359                 :             : 
     360                 :           3 :         on_shmem_exit(pgaio_worker_die, 0);
     361                 :           3 : }
     362                 :             : 
     363                 :             : static void
     364                 :           0 : pgaio_worker_error_callback(void *arg)
     365                 :             : {
     366                 :           0 :         ProcNumber      owner;
     367                 :           0 :         PGPROC     *owner_proc;
     368                 :           0 :         int32           owner_pid;
     369                 :           0 :         PgAioHandle *ioh = arg;
     370                 :             : 
     371         [ #  # ]:           0 :         if (!ioh)
     372                 :           0 :                 return;
     373                 :             : 
     374         [ #  # ]:           0 :         Assert(ioh->owner_procno != MyProcNumber);
     375         [ #  # ]:           0 :         Assert(MyBackendType == B_IO_WORKER);
     376                 :             : 
     377                 :           0 :         owner = ioh->owner_procno;
     378                 :           0 :         owner_proc = GetPGProcByNumber(owner);
     379                 :           0 :         owner_pid = owner_proc->pid;
     380                 :             : 
     381                 :           0 :         errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
     382         [ #  # ]:           0 : }
     383                 :             : 
     384                 :             : void
     385                 :           3 : IoWorkerMain(const void *startup_data, size_t startup_data_len)
     386                 :             : {
     387                 :           3 :         sigjmp_buf      local_sigjmp_buf;
     388                 :           3 :         PgAioHandle *volatile error_ioh = NULL;
     389                 :           3 :         ErrorContextCallback errcallback = {0};
     390                 :           3 :         volatile int error_errno = 0;
     391                 :           3 :         char            cmd[128];
     392                 :             : 
     393                 :           3 :         MyBackendType = B_IO_WORKER;
     394                 :           3 :         AuxiliaryProcessMainCommon();
     395                 :             : 
     396                 :           3 :         pqsignal(SIGHUP, SignalHandlerForConfigReload);
     397                 :           3 :         pqsignal(SIGINT, die);          /* to allow manually triggering worker restart */
     398                 :             : 
     399                 :             :         /*
     400                 :             :          * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
     401                 :             :          * shutdown sequence, similar to checkpointer.
     402                 :             :          */
     403                 :           3 :         pqsignal(SIGTERM, SIG_IGN);
     404                 :             :         /* SIGQUIT handler was already set up by InitPostmasterChild */
     405                 :           3 :         pqsignal(SIGALRM, SIG_IGN);
     406                 :           3 :         pqsignal(SIGPIPE, SIG_IGN);
     407                 :           3 :         pqsignal(SIGUSR1, procsignal_sigusr1_handler);
     408                 :           3 :         pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
     409                 :             : 
     410                 :             :         /* also registers a shutdown callback to unregister */
     411                 :           3 :         pgaio_worker_register();
     412                 :             : 
     413                 :           3 :         sprintf(cmd, "%d", MyIoWorkerId);
     414                 :           3 :         set_ps_display(cmd);
     415                 :             : 
     416                 :           3 :         errcallback.callback = pgaio_worker_error_callback;
     417                 :           3 :         errcallback.previous = error_context_stack;
     418                 :           3 :         error_context_stack = &errcallback;
     419                 :             : 
     420                 :             :         /* see PostgresMain() */
     421         [ -  + ]:           3 :         if (sigsetjmp(local_sigjmp_buf, 1) != 0)
     422                 :             :         {
     423                 :           0 :                 error_context_stack = NULL;
     424                 :           0 :                 HOLD_INTERRUPTS();
     425                 :             : 
     426                 :           0 :                 EmitErrorReport();
     427                 :             : 
     428                 :             :                 /*
     429                 :             :                  * In the - very unlikely - case that the IO failed in a way that
     430                 :             :                  * raises an error we need to mark the IO as failed.
     431                 :             :                  *
     432                 :             :                  * Need to do just enough error recovery so that we can mark the IO as
     433                 :             :                  * failed and then exit (postmaster will start a new worker).
     434                 :             :                  */
     435                 :           0 :                 LWLockReleaseAll();
     436                 :             : 
     437         [ #  # ]:           0 :                 if (error_ioh != NULL)
     438                 :             :                 {
     439                 :             :                         /* should never fail without setting error_errno */
     440         [ #  # ]:           0 :                         Assert(error_errno != 0);
     441                 :             : 
     442                 :           0 :                         errno = error_errno;
     443                 :             : 
     444                 :           0 :                         START_CRIT_SECTION();
     445                 :           0 :                         pgaio_io_process_completion(error_ioh, -error_errno);
     446         [ #  # ]:           0 :                         END_CRIT_SECTION();
     447                 :           0 :                 }
     448                 :             : 
     449                 :           0 :                 proc_exit(1);
     450                 :             :         }
     451                 :             : 
     452                 :             :         /* We can now handle ereport(ERROR) */
     453                 :           3 :         PG_exception_stack = &local_sigjmp_buf;
     454                 :             : 
     455                 :           3 :         sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
     456                 :             : 
     457         [ +  + ]:         586 :         while (!ShutdownRequestPending)
     458                 :             :         {
     459                 :         583 :                 uint32          io_index;
     460                 :         583 :                 Latch      *latches[IO_WORKER_WAKEUP_FANOUT];
     461                 :         583 :                 int                     nlatches = 0;
     462                 :         583 :                 int                     nwakeups = 0;
     463                 :         583 :                 int                     worker;
     464                 :             : 
     465                 :             :                 /*
     466                 :             :                  * Try to get a job to do.
     467                 :             :                  *
     468                 :             :                  * The lwlock acquisition also provides the necessary memory barrier
     469                 :             :                  * to ensure that we don't see an outdated data in the handle.
     470                 :             :                  */
     471                 :         583 :                 LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
     472         [ +  + ]:         583 :                 if ((io_index = pgaio_worker_submission_queue_consume()) == -1)
     473                 :             :                 {
     474                 :             :                         /*
     475                 :             :                          * Nothing to do.  Mark self idle.
     476                 :             :                          *
     477                 :             :                          * XXX: Invent some kind of back pressure to reduce useless
     478                 :             :                          * wakeups?
     479                 :             :                          */
     480                 :         296 :                         io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
     481                 :         296 :                 }
     482                 :             :                 else
     483                 :             :                 {
     484                 :             :                         /* Got one.  Clear idle flag. */
     485                 :         287 :                         io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
     486                 :             : 
     487                 :             :                         /* See if we can wake up some peers. */
     488         [ +  - ]:         287 :                         nwakeups = Min(pgaio_worker_submission_queue_depth(),
     489                 :             :                                                    IO_WORKER_WAKEUP_FANOUT);
     490         [ +  + ]:         288 :                         for (int i = 0; i < nwakeups; ++i)
     491                 :             :                         {
     492         [ -  + ]:           1 :                                 if ((worker = pgaio_worker_choose_idle()) < 0)
     493                 :           0 :                                         break;
     494                 :           1 :                                 latches[nlatches++] = io_worker_control->workers[worker].latch;
     495                 :           1 :                         }
     496                 :             :                 }
     497                 :         583 :                 LWLockRelease(AioWorkerSubmissionQueueLock);
     498                 :             : 
     499         [ +  + ]:         584 :                 for (int i = 0; i < nlatches; ++i)
     500                 :           1 :                         SetLatch(latches[i]);
     501                 :             : 
     502         [ +  + ]:         583 :                 if (io_index != -1)
     503                 :             :                 {
     504                 :         287 :                         PgAioHandle *ioh = NULL;
     505                 :             : 
     506                 :         287 :                         ioh = &pgaio_ctl->io_handles[io_index];
     507                 :         287 :                         error_ioh = ioh;
     508                 :         287 :                         errcallback.arg = ioh;
     509                 :             : 
     510   [ -  +  -  + ]:         287 :                         pgaio_debug_io(DEBUG4, ioh,
     511                 :             :                                                    "worker %d processing IO",
     512                 :             :                                                    MyIoWorkerId);
     513                 :             : 
     514                 :             :                         /*
     515                 :             :                          * Prevent interrupts between pgaio_io_reopen() and
     516                 :             :                          * pgaio_io_perform_synchronously() that otherwise could lead to
     517                 :             :                          * the FD getting closed in that window.
     518                 :             :                          */
     519                 :         287 :                         HOLD_INTERRUPTS();
     520                 :             : 
     521                 :             :                         /*
     522                 :             :                          * It's very unlikely, but possible, that reopen fails. E.g. due
     523                 :             :                          * to memory allocations failing or file permissions changing or
     524                 :             :                          * such.  In that case we need to fail the IO.
     525                 :             :                          *
     526                 :             :                          * There's not really a good errno we can report here.
     527                 :             :                          */
     528                 :         287 :                         error_errno = ENOENT;
     529                 :         287 :                         pgaio_io_reopen(ioh);
     530                 :             : 
     531                 :             :                         /*
     532                 :             :                          * To be able to exercise the reopen-fails path, allow injection
     533                 :             :                          * points to trigger a failure at this point.
     534                 :             :                          */
     535                 :             :                         INJECTION_POINT("aio-worker-after-reopen", ioh);
     536                 :             : 
     537                 :         287 :                         error_errno = 0;
     538                 :         287 :                         error_ioh = NULL;
     539                 :             : 
     540                 :             :                         /*
     541                 :             :                          * As part of IO completion the buffer will be marked as NOACCESS,
     542                 :             :                          * until the buffer is pinned again - which never happens in io
     543                 :             :                          * workers. Therefore the next time there is IO for the same
     544                 :             :                          * buffer, the memory will be considered inaccessible. To avoid
     545                 :             :                          * that, explicitly allow access to the memory before reading data
     546                 :             :                          * into it.
     547                 :             :                          */
     548                 :             : #ifdef USE_VALGRIND
     549                 :             :                         {
     550                 :             :                                 struct iovec *iov;
     551                 :             :                                 uint16          iov_length = pgaio_io_get_iovec_length(ioh, &iov);
     552                 :             : 
     553                 :             :                                 for (int i = 0; i < iov_length; i++)
     554                 :             :                                         VALGRIND_MAKE_MEM_UNDEFINED(iov[i].iov_base, iov[i].iov_len);
     555                 :             :                         }
     556                 :             : #endif
     557                 :             : 
     558                 :             :                         /*
     559                 :             :                          * We don't expect this to ever fail with ERROR or FATAL, no need
     560                 :             :                          * to keep error_ioh set to the IO.
     561                 :             :                          * pgaio_io_perform_synchronously() contains a critical section to
     562                 :             :                          * ensure we don't accidentally fail.
     563                 :             :                          */
     564                 :         287 :                         pgaio_io_perform_synchronously(ioh);
     565                 :             : 
     566         [ +  - ]:         287 :                         RESUME_INTERRUPTS();
     567                 :         287 :                         errcallback.arg = NULL;
     568                 :         287 :                 }
     569                 :             :                 else
     570                 :             :                 {
     571                 :         296 :                         WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
     572                 :             :                                           WAIT_EVENT_IO_WORKER_MAIN);
     573                 :         296 :                         ResetLatch(MyLatch);
     574                 :             :                 }
     575                 :             : 
     576         [ +  + ]:         583 :                 CHECK_FOR_INTERRUPTS();
     577                 :             : 
     578         [ +  - ]:         583 :                 if (ConfigReloadPending)
     579                 :             :                 {
     580                 :           0 :                         ConfigReloadPending = false;
     581                 :           0 :                         ProcessConfigFile(PGC_SIGHUP);
     582                 :           0 :                 }
     583                 :         583 :         }
     584                 :             : 
     585                 :           3 :         error_context_stack = errcallback.previous;
     586                 :           3 :         proc_exit(0);
     587                 :             : }
     588                 :             : 
     589                 :             : bool
     590                 :        1153 : pgaio_workers_enabled(void)
     591                 :             : {
     592                 :        1153 :         return io_method == IOMETHOD_WORKER;
     593                 :             : }
        

Generated by: LCOV version 2.3.2-1