LCOV - code coverage report
Current view: top level - src/test/modules/test_shm_mq - worker.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 57 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 3 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*--------------------------------------------------------------------------
       2              :  *
       3              :  * worker.c
       4              :  *              Code for sample worker making use of shared memory message queues.
       5              :  *              Our test worker simply reads messages from one message queue and
       6              :  *              writes them back out to another message queue.  In a real
       7              :  *              application, you'd presumably want the worker to do some more
       8              :  *              complex calculation rather than simply returning the input,
       9              :  *              but it should be possible to use much of the control logic just
      10              :  *              as presented here.
      11              :  *
      12              :  * Copyright (c) 2013-2026, PostgreSQL Global Development Group
      13              :  *
      14              :  * IDENTIFICATION
      15              :  *              src/test/modules/test_shm_mq/worker.c
      16              :  *
      17              :  * -------------------------------------------------------------------------
      18              :  */
      19              : 
      20              : #include "postgres.h"
      21              : 
      22              : #include "miscadmin.h"
      23              : #include "storage/ipc.h"
      24              : #include "storage/procarray.h"
      25              : #include "storage/shm_mq.h"
      26              : #include "storage/shm_toc.h"
      27              : #include "tcop/tcopprot.h"
      28              : 
      29              : #include "test_shm_mq.h"
      30              : 
      31              : static void attach_to_queues(dsm_segment *seg, shm_toc *toc,
      32              :                                                          int myworkernumber, shm_mq_handle **inqhp,
      33              :                                                          shm_mq_handle **outqhp);
      34              : static void copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh);
      35              : 
      36              : /*
      37              :  * Background worker entrypoint.
      38              :  *
      39              :  * This is intended to demonstrate how a background worker can be used to
      40              :  * facilitate a parallel computation.  Most of the logic here is fairly
      41              :  * boilerplate stuff, designed to attach to the shared memory segment,
      42              :  * notify the user backend that we're alive, and so on.  The
      43              :  * application-specific bits of logic that you'd replace for your own worker
      44              :  * are attach_to_queues() and copy_messages().
      45              :  */
      46              : void
      47            0 : test_shm_mq_main(Datum main_arg)
      48              : {
      49            0 :         dsm_segment *seg;
      50            0 :         shm_toc    *toc;
      51            0 :         shm_mq_handle *inqh;
      52            0 :         shm_mq_handle *outqh;
      53            0 :         volatile test_shm_mq_header *hdr;
      54            0 :         int                     myworkernumber;
      55            0 :         PGPROC     *registrant;
      56              : 
      57              :         /*
      58              :          * Establish signal handlers.
      59              :          *
      60              :          * We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as
      61              :          * it would a normal user backend.  To make that happen, we use die().
      62              :          */
      63            0 :         pqsignal(SIGTERM, die);
      64            0 :         BackgroundWorkerUnblockSignals();
      65              : 
      66              :         /*
      67              :          * Connect to the dynamic shared memory segment.
      68              :          *
      69              :          * The backend that registered this worker passed us the ID of a shared
      70              :          * memory segment to which we must attach for further instructions.  Once
      71              :          * we've mapped the segment in our address space, attach to the table of
      72              :          * contents so we can locate the various data structures we'll need to
      73              :          * find within the segment.
      74              :          *
      75              :          * Note: at this point, we have not created any ResourceOwner in this
      76              :          * process.  This will result in our DSM mapping surviving until process
      77              :          * exit, which is fine.  If there were a ResourceOwner, it would acquire
      78              :          * ownership of the mapping, but we have no need for that.
      79              :          */
      80            0 :         seg = dsm_attach(DatumGetUInt32(main_arg));
      81            0 :         if (seg == NULL)
      82            0 :                 ereport(ERROR,
      83              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
      84              :                                  errmsg("unable to map dynamic shared memory segment")));
      85            0 :         toc = shm_toc_attach(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg));
      86            0 :         if (toc == NULL)
      87            0 :                 ereport(ERROR,
      88              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
      89              :                                  errmsg("bad magic number in dynamic shared memory segment")));
      90              : 
      91              :         /*
      92              :          * Acquire a worker number.
      93              :          *
      94              :          * By convention, the process registering this background worker should
      95              :          * have stored the control structure at key 0.  We look up that key to
      96              :          * find it.  Our worker number gives our identity: there may be just one
      97              :          * worker involved in this parallel operation, or there may be many.
      98              :          */
      99            0 :         hdr = shm_toc_lookup(toc, 0, false);
     100            0 :         SpinLockAcquire(&hdr->mutex);
     101            0 :         myworkernumber = ++hdr->workers_attached;
     102            0 :         SpinLockRelease(&hdr->mutex);
     103            0 :         if (myworkernumber > hdr->workers_total)
     104            0 :                 ereport(ERROR,
     105              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     106              :                                  errmsg("too many message queue testing workers already")));
     107              : 
     108              :         /*
     109              :          * Attach to the appropriate message queues.
     110              :          */
     111            0 :         attach_to_queues(seg, toc, myworkernumber, &inqh, &outqh);
     112              : 
     113              :         /*
     114              :          * Indicate that we're fully initialized and ready to begin the main part
     115              :          * of the parallel operation.
     116              :          *
     117              :          * Once we signal that we're ready, the user backend is entitled to assume
     118              :          * that our on_dsm_detach callbacks will fire before we disconnect from
     119              :          * the shared memory segment and exit.  Generally, that means we must have
     120              :          * attached to all relevant dynamic shared memory data structures by now.
     121              :          */
     122            0 :         SpinLockAcquire(&hdr->mutex);
     123            0 :         ++hdr->workers_ready;
     124            0 :         SpinLockRelease(&hdr->mutex);
     125            0 :         registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
     126            0 :         if (registrant == NULL)
     127              :         {
     128            0 :                 elog(DEBUG1, "registrant backend has exited prematurely");
     129            0 :                 proc_exit(1);
     130              :         }
     131            0 :         SetLatch(&registrant->procLatch);
     132              : 
     133              :         /* Do the work. */
     134            0 :         copy_messages(inqh, outqh);
     135              : 
     136              :         /*
     137              :          * We're done.  For cleanliness, explicitly detach from the shared memory
     138              :          * segment (that would happen anyway during process exit, though).
     139              :          */
     140            0 :         dsm_detach(seg);
     141            0 :         proc_exit(1);
     142              : }
     143              : 
     144              : /*
     145              :  * Attach to shared memory message queues.
     146              :  *
     147              :  * We use our worker number to determine to which queue we should attach.
     148              :  * The queues are registered at keys 1..<number-of-workers>.  The user backend
     149              :  * writes to queue #1 and reads from queue #<number-of-workers>; each worker
     150              :  * reads from the queue whose number is equal to its worker number and writes
     151              :  * to the next higher-numbered queue.
     152              :  */
     153              : static void
     154            0 : attach_to_queues(dsm_segment *seg, shm_toc *toc, int myworkernumber,
     155              :                                  shm_mq_handle **inqhp, shm_mq_handle **outqhp)
     156              : {
     157            0 :         shm_mq     *inq;
     158            0 :         shm_mq     *outq;
     159              : 
     160            0 :         inq = shm_toc_lookup(toc, myworkernumber, false);
     161            0 :         shm_mq_set_receiver(inq, MyProc);
     162            0 :         *inqhp = shm_mq_attach(inq, seg, NULL);
     163            0 :         outq = shm_toc_lookup(toc, myworkernumber + 1, false);
     164            0 :         shm_mq_set_sender(outq, MyProc);
     165            0 :         *outqhp = shm_mq_attach(outq, seg, NULL);
     166            0 : }
     167              : 
     168              : /*
     169              :  * Loop, receiving and sending messages, until the connection is broken.
     170              :  *
     171              :  * This is the "real work" performed by this worker process.  Everything that
     172              :  * happens before this is initialization of one form or another, and everything
     173              :  * after this point is cleanup.
     174              :  */
     175              : static void
     176            0 : copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh)
     177              : {
     178            0 :         Size            len;
     179            0 :         void       *data;
     180            0 :         shm_mq_result res;
     181              : 
     182            0 :         for (;;)
     183              :         {
     184              :                 /* Notice any interrupts that have occurred. */
     185            0 :                 CHECK_FOR_INTERRUPTS();
     186              : 
     187              :                 /* Receive a message. */
     188            0 :                 res = shm_mq_receive(inqh, &len, &data, false);
     189            0 :                 if (res != SHM_MQ_SUCCESS)
     190            0 :                         break;
     191              : 
     192              :                 /* Send it back out. */
     193            0 :                 res = shm_mq_send(outqh, len, data, false, true);
     194            0 :                 if (res != SHM_MQ_SUCCESS)
     195            0 :                         break;
     196              :         }
     197            0 : }
        

Generated by: LCOV version 2.3.2-1