LCOV - code coverage report
Current view: top level - src/test/modules/test_shm_mq - test.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 103 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 6 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*--------------------------------------------------------------------------
       2              :  *
       3              :  * test.c
       4              :  *              Test harness code for shared memory message queues.
       5              :  *
       6              :  * Copyright (c) 2013-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *              src/test/modules/test_shm_mq/test.c
      10              :  *
      11              :  * -------------------------------------------------------------------------
      12              :  */
      13              : 
      14              : #include "postgres.h"
      15              : 
      16              : #include "fmgr.h"
      17              : #include "miscadmin.h"
      18              : #include "pgstat.h"
      19              : #include "varatt.h"
      20              : 
      21              : #include "test_shm_mq.h"
      22              : 
      23            0 : PG_MODULE_MAGIC;
      24              : 
      25            0 : PG_FUNCTION_INFO_V1(test_shm_mq);
      26            0 : PG_FUNCTION_INFO_V1(test_shm_mq_pipelined);
      27              : /* Forward declaration; verify_message() is defined at the end of this file. */
      28              : static void verify_message(Size origlen, char *origdata, Size newlen,
      29              :                                                    char *newdata);
      30              : 
      31              : /* Custom wait event ID used by test_shm_mq_pipelined(); value cached, fetched from shared memory.  Zero means it has not been obtained yet. */
      32              : static uint32 we_message_queue = 0;
      33              : 
      34              : /*
      35              :  * Simple test of the shared memory message queue infrastructure.
      36              :  *
      37              :  * We set up a ring of message queues passing through 1 or more background
      38              :  * processes and eventually looping back to ourselves.  We then send a message
      39              :  * through the ring a number of times indicated by the loop count, and finally check that the message we got back matches the one we started with.
      40              :  * Bad arguments and failed sends/receives raise ERROR; otherwise returns void.
      41              :  */
      42              : Datum
      43            0 : test_shm_mq(PG_FUNCTION_ARGS)
      44              : {
      45            0 :         int64           queue_size = PG_GETARG_INT64(0);  /* arg 0: queue size handed to test_shm_mq_setup() */
      46            0 :         text       *message = PG_GETARG_TEXT_PP(1);  /* arg 1: message to send around the ring */
      47            0 :         char       *message_contents = VARDATA_ANY(message);  /* buffer passed to shm_mq_send() and later compared by verify_message() */
      48            0 :         int                     message_size = VARSIZE_ANY_EXHDR(message);  /* length that goes with message_contents */
      49            0 :         int32           loop_count = PG_GETARG_INT32(2);  /* arg 2: loop count, must be >= 0 */
      50            0 :         int32           nworkers = PG_GETARG_INT32(3);  /* arg 3: number of workers, must be >= 1 */
      51            0 :         dsm_segment *seg;  /* filled in by test_shm_mq_setup(); detached at the end */
      52            0 :         shm_mq_handle *outqh;  /* handle we send on */
      53            0 :         shm_mq_handle *inqh;  /* handle we receive on */
      54            0 :         shm_mq_result res;
      55            0 :         Size            len;  /* length of the most recently received message */
      56            0 :         void       *data;  /* contents of the most recently received message */
      57              : 
      58              :         /* A negative loop count is nonsensical; zero is accepted. */
      59            0 :         if (loop_count < 0)
      60            0 :                 ereport(ERROR,
      61              :                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      62              :                                  errmsg("repeat count size must be an integer value greater than or equal to zero")));
      63              : 
      64              :         /*
      65              :          * Since this test sends data using the blocking interfaces, it cannot
      66              :          * send data to itself.  Therefore, a minimum of 1 worker is required. Of
      67              :          * course, a negative worker count is nonsensical.
      68              :          */
      69            0 :         if (nworkers <= 0)
      70            0 :                 ereport(ERROR,
      71              :                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      72              :                                  errmsg("number of workers must be an integer value greater than zero")));
      73              : 
      74              :         /* Set up dynamic shared memory segment and background workers. */
      75            0 :         test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
      76              : 
      77              :         /* Send the initial message; this happens once regardless of the loop count. */
      78            0 :         res = shm_mq_send(outqh, message_size, message_contents, false, true);
      79            0 :         if (res != SHM_MQ_SUCCESS)
      80            0 :                 ereport(ERROR,
      81              :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
      82              :                                  errmsg("could not send message")));
      83              : 
      84              :         /*
      85              :          * Receive a message and send it back out again.  Do this a number of
      86              :          * times equal to the loop count.
      87              :          */
      88            0 :         for (;;)
      89              :         {
      90              :                 /* Receive a message. */
      91            0 :                 res = shm_mq_receive(inqh, &len, &data, false);
      92            0 :                 if (res != SHM_MQ_SUCCESS)
      93            0 :                         ereport(ERROR,
      94              :                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
      95              :                                          errmsg("could not receive message")));
      96              : 
      97              :                 /* If this is supposed to be the last iteration, stop here.  A loop count of 0 behaves like 1, because one send and one receive have already happened. */
      98            0 :                 if (--loop_count <= 0)
      99            0 :                         break;
     100              : 
     101              :                 /* Send it back out, forwarding exactly the length and data we just received. */
     102            0 :                 res = shm_mq_send(outqh, len, data, false, true);
     103            0 :                 if (res != SHM_MQ_SUCCESS)
     104            0 :                         ereport(ERROR,
     105              :                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     106              :                                          errmsg("could not send message")));
     107              :         }
     108              : 
     109              :         /*
     110              :          * Finally, check that we got back the same message from the last
     111              :          * iteration that we originally sent.
     112              :          */
     113            0 :         verify_message(message_size, message_contents, len, data);
     114              : 
     115              :         /* Clean up: detach from the dynamic shared memory segment. */
     116            0 :         dsm_detach(seg);
     117              : 
     118            0 :         PG_RETURN_VOID();
     119            0 : }
     120              : 
     121              : /*
     122              :  * Pipelined test of the shared memory message queue infrastructure.
     123              :  *
     124              :  * As in the basic test, we set up a ring of message queues passing through
     125              :  * 1 or more background processes and eventually looping back to ourselves.
     126              :  * Then, we send N copies of the user-specified message through the ring and
     127              :  * receive them all back.  Since this might fill up all message queues in the
     128              :  * ring and then stall, we must be prepared to begin receiving the messages
     129              :  * back before we've finished sending them.  Bad arguments and detached queues raise ERROR; otherwise returns void.
     130              :  */
     131              : Datum
     132            0 : test_shm_mq_pipelined(PG_FUNCTION_ARGS)
     133              : {
     134            0 :         int64           queue_size = PG_GETARG_INT64(0);  /* arg 0: queue size handed to test_shm_mq_setup() */
     135            0 :         text       *message = PG_GETARG_TEXT_PP(1);  /* arg 1: message to send around the ring */
     136            0 :         char       *message_contents = VARDATA_ANY(message);  /* buffer passed to every shm_mq_send() call */
     137            0 :         int                     message_size = VARSIZE_ANY_EXHDR(message);  /* length that goes with message_contents */
     138            0 :         int32           loop_count = PG_GETARG_INT32(2);  /* arg 2: number of copies to send and receive, must be >= 0 */
     139            0 :         int32           nworkers = PG_GETARG_INT32(3);  /* arg 3: number of workers, must be >= 0 */
     140            0 :         bool            verify = PG_GETARG_BOOL(4);  /* arg 4: check every received message against the original? */
     141            0 :         int32           send_count = 0;  /* messages successfully sent so far */
     142            0 :         int32           receive_count = 0;  /* messages successfully received so far */
     143            0 :         dsm_segment *seg;  /* filled in by test_shm_mq_setup(); detached at the end */
     144            0 :         shm_mq_handle *outqh;  /* handle we send on */
     145            0 :         shm_mq_handle *inqh;  /* handle we receive on */
     146            0 :         shm_mq_result res;
     147            0 :         Size            len;  /* length of the most recently received message */
     148            0 :         void       *data;  /* contents of the most recently received message */
     149              : 
     150              :         /* A negative loop count is nonsensical; zero is accepted. */
     151            0 :         if (loop_count < 0)
     152            0 :                 ereport(ERROR,
     153              :                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     154              :                                  errmsg("repeat count size must be an integer value greater than or equal to zero")));
     155              : 
     156              :         /*
     157              :          * Using the nonblocking interfaces, we can even send data to ourselves,
     158              :          * so the minimum number of workers for this test is zero.
     159              :          */
     160            0 :         if (nworkers < 0)
     161            0 :                 ereport(ERROR,
     162              :                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     163              :                                  errmsg("number of workers must be an integer value greater than or equal to zero")));
     164              : 
     165              :         /* Set up dynamic shared memory segment and background workers. */
     166            0 :         test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
     167              : 
     168              :         /* Main loop: each pass attempts at most one send and one receive. */
     169            0 :         for (;;)
     170              :         {
     171            0 :                 bool            wait = true;  /* cleared below if we send or receive a message in this iteration */
     172              : 
     173              :                 /*
     174              :                  * If we haven't yet sent the message the requisite number of times,
     175              :                  * try again to send it now.  Note that when shm_mq_send() returns
     176              :                  * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
     177              :                  * same message size and contents; that's not an issue here because
     178              :                  * we're sending the same message every time.
     179              :                  */
     180            0 :                 if (send_count < loop_count)
     181              :                 {
     182            0 :                         res = shm_mq_send(outqh, message_size, message_contents, true,
     183              :                                                           true);
     184            0 :                         if (res == SHM_MQ_SUCCESS)
     185              :                         {
     186            0 :                                 ++send_count;
     187            0 :                                 wait = false;
     188            0 :                         }
     189            0 :                         else if (res == SHM_MQ_DETACHED)
     190            0 :                                 ereport(ERROR,
     191              :                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     192              :                                                  errmsg("could not send message")));
     193            0 :                 }
     194              : 
     195              :                 /*
     196              :                  * If we haven't yet received the message the requisite number of
     197              :                  * times, try to receive it again now.
     198              :                  */
     199            0 :                 if (receive_count < loop_count)
     200              :                 {
     201            0 :                         res = shm_mq_receive(inqh, &len, &data, true);
     202            0 :                         if (res == SHM_MQ_SUCCESS)
     203              :                         {
     204            0 :                                 ++receive_count;
     205              :                                 /* Verifying every time is slow, so it's optional. */
     206            0 :                                 if (verify)
     207            0 :                                         verify_message(message_size, message_contents, len, data);
     208            0 :                                 wait = false;
     209            0 :                         }
     210            0 :                         else if (res == SHM_MQ_DETACHED)
     211            0 :                                 ereport(ERROR,
     212              :                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     213              :                                                  errmsg("could not receive message")));
     214            0 :                 }
     215              :                 else
     216              :                 {
     217              :                         /*
     218              :                          * Otherwise, we've received the message enough times.  This
     219              :                          * shouldn't happen unless we've also sent it enough times.
     220              :                          */
     221            0 :                         if (send_count != receive_count)
     222            0 :                                 ereport(ERROR,
     223              :                                                 (errcode(ERRCODE_INTERNAL_ERROR),
     224              :                                                  errmsg("message sent %d times, but received %d times",
     225              :                                                                 send_count, receive_count)));
     226            0 :                         break;
     227              :                 }
     228              : 
     229            0 :                 if (wait)
     230              :                 {
     231              :                         /* first time, allocate or get the custom wait event; the ID is kept in we_message_queue for later iterations and calls */
     232            0 :                         if (we_message_queue == 0)
     233            0 :                                 we_message_queue = WaitEventExtensionNew("TestShmMqMessageQueue");
     234              : 
     235              :                         /*
     236              :                          * If we made no progress, wait for one of the other processes to
     237              :                          * which we are connected to set our latch, indicating that they
     238              :                          * have read or written data and therefore there may now be work
     239              :                          * for us to do.
     240              :                          */
     241            0 :                         (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
     242            0 :                                                          we_message_queue);
     243            0 :                         ResetLatch(MyLatch);
     244            0 :                         CHECK_FOR_INTERRUPTS();
     245            0 :                 }
     246            0 :         }
     247              : 
     248              :         /* Clean up: detach from the dynamic shared memory segment. */
     249            0 :         dsm_detach(seg);
     250              : 
     251            0 :         PG_RETURN_VOID();
     252            0 : }
     253              : 
     254              : /*
     255              :  * Verify that two messages are the same.  Raises ERROR if the lengths differ or, when they match, at the first byte where the contents differ; returns normally if the messages are identical.
     256              :  */
     257              : static void
     258            0 : verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
     259              : {
     260            0 :         Size            i;  /* byte index into both messages */
     261              : 
     262            0 :         if (origlen != newlen)
     263            0 :                 ereport(ERROR,
     264              :                                 (errmsg("message corrupted"),
     265              :                                  errdetail("The original message was %zu bytes but the final message is %zu bytes.",
     266              :                                                    origlen, newlen)));
     267              : 
     268            0 :         for (i = 0; i < origlen; ++i)
     269            0 :                 if (origdata[i] != newdata[i])
     270            0 :                         ereport(ERROR,
     271              :                                         (errmsg("message corrupted"),
     272              :                                          errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
     273            0 : }
        

Generated by: LCOV version 2.3.2-1