LCOV - code coverage report
Current view:  top level - src/test/modules/test_shm_mq - setup.c (source / functions)
Test:          Code coverage
Test Date:     2026-01-26 10:56:24
Coverage:      Lines: 0.0 % (0 of 128 hit)    Functions: 0.0 % (0 of 6 hit)
Legend:        Lines: hit / not hit

            Line data    Source code
       1              : /*--------------------------------------------------------------------------
       2              :  *
       3              :  * setup.c
       4              :  *              Code to set up a dynamic shared memory segment and a specified
       5              :  *              number of background workers for shared memory message queue
       6              :  *              testing.
       7              :  *
       8              :  * Copyright (c) 2013-2026, PostgreSQL Global Development Group
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *              src/test/modules/test_shm_mq/setup.c
      12              :  *
      13              :  * -------------------------------------------------------------------------
      14              :  */
      15              : 
      16              : #include "postgres.h"
      17              : 
      18              : #include "miscadmin.h"
      19              : #include "pgstat.h"
      20              : #include "postmaster/bgworker.h"
      21              : #include "storage/shm_toc.h"
      22              : #include "test_shm_mq.h"
      23              : #include "utils/memutils.h"
      24              : 
      25              : typedef struct
      26              : {
      27              :         int                     nworkers;
      28              :         BackgroundWorkerHandle *handle[FLEXIBLE_ARRAY_MEMBER];
      29              : } worker_state;
      30              : 
      31              : static void setup_dynamic_shared_memory(int64 queue_size, int nworkers,
      32              :                                                                                 dsm_segment **segp,
      33              :                                                                                 test_shm_mq_header **hdrp,
      34              :                                                                                 shm_mq **outp, shm_mq **inp);
      35              : static worker_state *setup_background_workers(int nworkers,
      36              :                                                                                           dsm_segment *seg);
      37              : static void cleanup_background_workers(dsm_segment *seg, Datum arg);
      38              : static void wait_for_workers_to_become_ready(worker_state *wstate,
      39              :                                                                                          volatile test_shm_mq_header *hdr);
      40              : static bool check_worker_status(worker_state *wstate);
      41              : 
      42              : /* value cached, fetched from shared memory */
      43              : static uint32 we_bgworker_startup = 0;
      44              : 
      45              : /*
      46              :  * Set up a dynamic shared memory segment and zero or more background workers
      47              :  * for a test run.
      48              :  */
      49              : void
      50            0 : test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
      51              :                                   shm_mq_handle **output, shm_mq_handle **input)
      52              : {
      53            0 :         dsm_segment *seg;
      54            0 :         test_shm_mq_header *hdr;
      55            0 :         shm_mq     *outq = NULL;        /* placate compiler */
      56            0 :         shm_mq     *inq = NULL;         /* placate compiler */
      57            0 :         worker_state *wstate;
      58              : 
      59              :         /* Set up a dynamic shared memory segment. */
      60            0 :         setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);
      61            0 :         *segp = seg;
      62              : 
      63              :         /* Register background workers. */
      64            0 :         wstate = setup_background_workers(nworkers, seg);
      65              : 
      66              :         /* Attach the queues. */
      67            0 :         *output = shm_mq_attach(outq, seg, wstate->handle[0]);
      68            0 :         *input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);
      69              : 
      70              :         /* Wait for workers to become ready. */
      71            0 :         wait_for_workers_to_become_ready(wstate, hdr);
      72              : 
      73              :         /*
      74              :          * Once we reach this point, all workers are ready.  We no longer need to
      75              :          * kill them if we die; they'll die on their own as the message queues
      76              :          * shut down.
      77              :          */
      78            0 :         cancel_on_dsm_detach(seg, cleanup_background_workers,
      79            0 :                                                  PointerGetDatum(wstate));
      80            0 :         pfree(wstate);
      81            0 : }
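
                For context, here is a minimal sketch of how a caller might drive the queues
                once test_shm_mq_setup() returns, loosely modeled on this module's test.c.
                The queue size and worker count are illustrative, and the force_flush
                argument of shm_mq_send() is assumed to match recent PostgreSQL versions:

                    dsm_segment    *seg;
                    shm_mq_handle  *outqh;
                    shm_mq_handle  *inqh;
                    shm_mq_result   res;
                    Size            len;
                    void           *data;

                    /* One worker and 64kB ring buffers (illustrative values). */
                    test_shm_mq_setup(65536, 1, &seg, &outqh, &inqh);

                    /* Push one message into the pipeline... */
                    res = shm_mq_send(outqh, 5, "hello", false, true);
                    if (res != SHM_MQ_SUCCESS)
                        ereport(ERROR, (errmsg("could not send message")));

                    /* ...and read it back once it has passed through every worker. */
                    res = shm_mq_receive(inqh, &len, &data, false);
                    if (res != SHM_MQ_SUCCESS)
                        ereport(ERROR, (errmsg("could not receive message")));

                    /* Detaching the segment shuts down the queues and the workers. */
                    dsm_detach(seg);
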
      82              : 
      83              : /*
      84              :  * Set up a dynamic shared memory segment.
      85              :  *
      86              :  * We set up a small control region that contains only a test_shm_mq_header,
      87              :  * plus one region per message queue.  There are as many message queues as
      88              :  * the number of workers, plus one.
      89              :  */
      90              : static void
      91            0 : setup_dynamic_shared_memory(int64 queue_size, int nworkers,
      92              :                                                         dsm_segment **segp, test_shm_mq_header **hdrp,
      93              :                                                         shm_mq **outp, shm_mq **inp)
      94              : {
      95            0 :         shm_toc_estimator e;
      96            0 :         int                     i;
      97            0 :         Size            segsize;
      98            0 :         dsm_segment *seg;
      99            0 :         shm_toc    *toc;
     100            0 :         test_shm_mq_header *hdr;
     101              : 
     102              :         /* Ensure a valid queue size. */
     103            0 :         if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size)
     104            0 :                 ereport(ERROR,
     105              :                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     106              :                                  errmsg("queue size must be at least %zu bytes",
     107              :                                                 shm_mq_minimum_size)));
     108            0 :         if (queue_size != ((Size) queue_size))
     109            0 :                 ereport(ERROR,
     110              :                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     111              :                                  errmsg("queue size overflows size_t")));
     112              : 
     113              :         /*
     114              :          * Estimate how much shared memory we need.
     115              :          *
      116              :          * Because the TOC machinery may choose to insert padding for oddly-sized
     117              :          * requests, we must estimate each chunk separately.
     118              :          *
     119              :          * We need one key to register the location of the header, and we need
     120              :          * nworkers + 1 keys to track the locations of the message queues.
     121              :          */
     122            0 :         shm_toc_initialize_estimator(&e);
     123            0 :         shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header));
     124            0 :         for (i = 0; i <= nworkers; ++i)
     125            0 :                 shm_toc_estimate_chunk(&e, (Size) queue_size);
     126            0 :         shm_toc_estimate_keys(&e, 2 + nworkers);
     127            0 :         segsize = shm_toc_estimate(&e);
     128              : 
     129              :         /* Create the shared memory segment and establish a table of contents. */
     130            0 :         seg = dsm_create(shm_toc_estimate(&e), 0);
     131            0 :         toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg),
     132            0 :                                                  segsize);
     133              : 
     134              :         /* Set up the header region. */
     135            0 :         hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header));
     136            0 :         SpinLockInit(&hdr->mutex);
     137            0 :         hdr->workers_total = nworkers;
     138            0 :         hdr->workers_attached = 0;
     139            0 :         hdr->workers_ready = 0;
     140            0 :         shm_toc_insert(toc, 0, hdr);
     141              : 
     142              :         /* Set up one message queue per worker, plus one. */
     143            0 :         for (i = 0; i <= nworkers; ++i)
     144              :         {
     145            0 :                 shm_mq     *mq;
     146              : 
     147            0 :                 mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
     148            0 :                                                    (Size) queue_size);
     149            0 :                 shm_toc_insert(toc, i + 1, mq);
     150              : 
     151            0 :                 if (i == 0)
     152              :                 {
     153              :                         /* We send messages to the first queue. */
     154            0 :                         shm_mq_set_sender(mq, MyProc);
     155            0 :                         *outp = mq;
     156            0 :                 }
     157            0 :                 if (i == nworkers)
     158              :                 {
     159              :                         /* We receive messages from the last queue. */
     160            0 :                         shm_mq_set_receiver(mq, MyProc);
     161            0 :                         *inp = mq;
     162            0 :                 }
     163            0 :         }
     164              : 
     165              :         /* Return results to caller. */
     166            0 :         *segp = seg;
     167            0 :         *hdrp = hdr;
     168            0 : }
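
                The TOC key scheme established here (key 0 for the header, key i + 1 for
                message queue i) is what each worker relies on when it attaches from the
                other side.  A rough sketch of that lookup follows, with workers numbered
                1..nworkers as in this module's worker.c; the helper name and the omission
                of error handling are this sketch's own simplifications:

                    /* Worker n reads from queue n - 1 (TOC key n) and writes to queue n. */
                    static void
                    attach_worker_queues(dsm_segment *seg, shm_toc *toc, int myworkernumber,
                                         shm_mq_handle **inqhp, shm_mq_handle **outqhp)
                    {
                        shm_mq     *inq;
                        shm_mq     *outq;

                        inq = shm_toc_lookup(toc, myworkernumber, false);
                        shm_mq_set_receiver(inq, MyProc);
                        *inqhp = shm_mq_attach(inq, seg, NULL);

                        outq = shm_toc_lookup(toc, myworkernumber + 1, false);
                        shm_mq_set_sender(outq, MyProc);
                        *outqhp = shm_mq_attach(outq, seg, NULL);
                    }
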
     169              : 
     170              : /*
     171              :  * Register background workers.
     172              :  */
     173              : static worker_state *
     174            0 : setup_background_workers(int nworkers, dsm_segment *seg)
     175              : {
     176            0 :         MemoryContext oldcontext;
     177            0 :         BackgroundWorker worker;
     178            0 :         worker_state *wstate;
     179            0 :         int                     i;
     180              : 
     181              :         /*
     182              :          * We need the worker_state object and the background worker handles to
     183              :          * which it points to be allocated in CurTransactionContext rather than
     184              :          * ExprContext; otherwise, they'll be destroyed before the on_dsm_detach
     185              :          * hooks run.
     186              :          */
     187            0 :         oldcontext = MemoryContextSwitchTo(CurTransactionContext);
     188              : 
     189              :         /* Create worker state object. */
     190            0 :         wstate = MemoryContextAlloc(TopTransactionContext,
     191            0 :                                                                 offsetof(worker_state, handle) +
     192            0 :                                                                 sizeof(BackgroundWorkerHandle *) * nworkers);
     193            0 :         wstate->nworkers = 0;
     194              : 
     195              :         /*
     196              :          * Arrange to kill all the workers if we abort before all workers are
     197              :          * finished hooking themselves up to the dynamic shared memory segment.
     198              :          *
     199              :          * If we die after all the workers have finished hooking themselves up to
     200              :          * the dynamic shared memory segment, we'll mark the two queues to which
     201              :          * we're directly connected as detached, and the worker(s) connected to
     202              :          * those queues will exit, marking any other queues to which they are
     203              :          * connected as detached.  This will cause any as-yet-unaware workers
     204              :          * connected to those queues to exit in their turn, and so on, until
     205              :          * everybody exits.
     206              :          *
     207              :          * But suppose the workers which are supposed to connect to the queues to
     208              :          * which we're directly attached exit due to some error before they
     209              :          * actually attach the queues.  The remaining workers will have no way of
     210              :          * knowing this.  From their perspective, they're still waiting for those
     211              :          * workers to start, when in fact they've already died.
     212              :          */
     213            0 :         on_dsm_detach(seg, cleanup_background_workers,
     214            0 :                                   PointerGetDatum(wstate));
     215              : 
     216              :         /* Configure a worker. */
     217            0 :         memset(&worker, 0, sizeof(worker));
     218            0 :         worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
     219            0 :         worker.bgw_start_time = BgWorkerStart_ConsistentState;
     220            0 :         worker.bgw_restart_time = BGW_NEVER_RESTART;
     221            0 :         sprintf(worker.bgw_library_name, "test_shm_mq");
     222            0 :         sprintf(worker.bgw_function_name, "test_shm_mq_main");
     223            0 :         snprintf(worker.bgw_type, BGW_MAXLEN, "test_shm_mq");
     224            0 :         worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
     225              :         /* set bgw_notify_pid, so we can detect if the worker stops */
     226            0 :         worker.bgw_notify_pid = MyProcPid;
     227              : 
     228              :         /* Register the workers. */
     229            0 :         for (i = 0; i < nworkers; ++i)
     230              :         {
     231            0 :                 if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i]))
     232            0 :                         ereport(ERROR,
     233              :                                         (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     234              :                                          errmsg("could not register background process"),
     235              :                                          errhint("You may need to increase \"max_worker_processes\".")));
     236            0 :                 ++wstate->nworkers;
     237            0 :         }
     238              : 
     239              :         /* All done. */
     240            0 :         MemoryContextSwitchTo(oldcontext);
     241            0 :         return wstate;
     242            0 : }
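
                The entry point named above, test_shm_mq_main, receives the segment handle
                packed into bgw_main_arg.  Below is a rough outline of how the worker side
                unpacks it; the actual logic lives in this module's worker.c, and only the
                attach steps are shown here, with simplified error reporting:

                    PGDLLEXPORT void
                    test_shm_mq_main(Datum main_arg)
                    {
                        dsm_segment *seg;
                        shm_toc    *toc;
                        volatile test_shm_mq_header *hdr;

                        /* Map the segment whose handle was passed via bgw_main_arg. */
                        seg = dsm_attach(DatumGetUInt32(main_arg));
                        if (seg == NULL)
                            ereport(ERROR,
                                    (errmsg("could not map dynamic shared memory segment")));

                        /* Re-find the table of contents and the shared header at key 0. */
                        toc = shm_toc_attach(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg));
                        if (toc == NULL)
                            ereport(ERROR,
                                    (errmsg("invalid magic number in dynamic shared memory segment")));
                        hdr = shm_toc_lookup(toc, 0, false);

                        /* ... attach queues, report readiness, copy messages, exit ... */
                    }
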
     243              : 
     244              : static void
     245            0 : cleanup_background_workers(dsm_segment *seg, Datum arg)
     246              : {
     247            0 :         worker_state *wstate = (worker_state *) DatumGetPointer(arg);
     248              : 
     249            0 :         while (wstate->nworkers > 0)
     250              :         {
     251            0 :                 --wstate->nworkers;
     252            0 :                 TerminateBackgroundWorker(wstate->handle[wstate->nworkers]);
     253              :         }
     254            0 : }
     255              : 
     256              : static void
     257            0 : wait_for_workers_to_become_ready(worker_state *wstate,
     258              :                                                                  volatile test_shm_mq_header *hdr)
     259              : {
     260            0 :         bool            result = false;
     261              : 
     262            0 :         for (;;)
     263              :         {
     264            0 :                 int                     workers_ready;
     265              : 
     266              :                 /* If all the workers are ready, we have succeeded. */
     267            0 :                 SpinLockAcquire(&hdr->mutex);
     268            0 :                 workers_ready = hdr->workers_ready;
     269            0 :                 SpinLockRelease(&hdr->mutex);
     270            0 :                 if (workers_ready >= wstate->nworkers)
     271              :                 {
     272            0 :                         result = true;
     273            0 :                         break;
     274              :                 }
     275              : 
     276              :                 /* If any workers (or the postmaster) have died, we have failed. */
     277            0 :                 if (!check_worker_status(wstate))
     278              :                 {
     279            0 :                         result = false;
     280            0 :                         break;
     281              :                 }
     282              : 
     283              :                 /* first time, allocate or get the custom wait event */
     284            0 :                 if (we_bgworker_startup == 0)
     285            0 :                         we_bgworker_startup = WaitEventExtensionNew("TestShmMqBgWorkerStartup");
     286              : 
     287              :                 /* Wait to be signaled. */
     288            0 :                 (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
     289            0 :                                                  we_bgworker_startup);
     290              : 
     291              :                 /* Reset the latch so we don't spin. */
     292            0 :                 ResetLatch(MyLatch);
     293              : 
     294              :                 /* An interrupt may have occurred while we were waiting. */
     295            0 :                 CHECK_FOR_INTERRUPTS();
     296            0 :         }
     297              : 
     298            0 :         if (!result)
     299            0 :                 ereport(ERROR,
     300              :                                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     301              :                                  errmsg("one or more background workers failed to start")));
     302            0 : }
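
                The worker-side counterpart of this wait loop is to bump workers_ready under
                the same spinlock and then set the latch of the backend identified by
                bgw_notify_pid, which is what wakes the WaitLatch() call above.  A condensed
                sketch, assuming hdr points at the shared header the worker looked up earlier:

                    PGPROC     *registrant;

                    /* Locate the backend that registered us (it set bgw_notify_pid). */
                    registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
                    if (registrant == NULL)
                        proc_exit(1);       /* registrant already gone; give up */

                    /* Announce that this worker is ready... */
                    SpinLockAcquire(&hdr->mutex);
                    ++hdr->workers_ready;
                    SpinLockRelease(&hdr->mutex);

                    /* ...and wake the waiter in wait_for_workers_to_become_ready(). */
                    SetLatch(&registrant->procLatch);
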
     303              : 
     304              : static bool
     305            0 : check_worker_status(worker_state *wstate)
     306              : {
     307            0 :         int                     n;
     308              : 
     309              :         /* If any workers (or the postmaster) have died, we have failed. */
     310            0 :         for (n = 0; n < wstate->nworkers; ++n)
     311              :         {
     312            0 :                 BgwHandleStatus status;
     313            0 :                 pid_t           pid;
     314              : 
     315            0 :                 status = GetBackgroundWorkerPid(wstate->handle[n], &pid);
     316            0 :                 if (status == BGWH_STOPPED || status == BGWH_POSTMASTER_DIED)
     317            0 :                         return false;
     318            0 :         }
     319              : 
     320              :         /* Otherwise, things still look OK. */
     321            0 :         return true;
     322            0 : }
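
                For reference, GetBackgroundWorkerPid() can also report two statuses besides
                the ones tested above; only BGWH_STOPPED and BGWH_POSTMASTER_DIED are
                unrecoverable here.  A small sketch of all four cases from
                postmaster/bgworker.h, given one worker's handle as in the loop above:

                    pid_t       pid;

                    switch (GetBackgroundWorkerPid(handle, &pid))
                    {
                        case BGWH_STARTED:          /* worker is running; pid is now valid */
                        case BGWH_NOT_YET_STARTED:  /* postmaster has not launched it yet */
                            break;                  /* keep waiting */
                        case BGWH_STOPPED:          /* it started and has already exited */
                        case BGWH_POSTMASTER_DIED:  /* it can never be launched now */
                            return false;           /* report failure to the caller */
                    }
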
        

Generated by: LCOV version 2.3.2-1