LCOV - code coverage report
Current view: top level - src/backend/storage/ipc - sinvaladt.c
Test: Code coverage
Test Date: 2026-01-26 10:56:24

               Coverage    Total    Hit
Lines:          93.9 %       196    184
Functions:     100.0 %         8      8
Branches:       68.0 %       103     70

Legend: Lines: hit / not hit
Branches: + taken, - not taken, # not executed

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * sinvaladt.c
       4                 :             :  *        POSTGRES shared cache invalidation data manager.
       5                 :             :  *
       6                 :             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7                 :             :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :             :  *
       9                 :             :  *
      10                 :             :  * IDENTIFICATION
      11                 :             :  *        src/backend/storage/ipc/sinvaladt.c
      12                 :             :  *
      13                 :             :  *-------------------------------------------------------------------------
      14                 :             :  */
      15                 :             : #include "postgres.h"
      16                 :             : 
      17                 :             : #include <signal.h>
      18                 :             : #include <unistd.h>
      19                 :             : 
      20                 :             : #include "miscadmin.h"
      21                 :             : #include "storage/ipc.h"
      22                 :             : #include "storage/proc.h"
      23                 :             : #include "storage/procnumber.h"
      24                 :             : #include "storage/procsignal.h"
      25                 :             : #include "storage/shmem.h"
      26                 :             : #include "storage/sinvaladt.h"
      27                 :             : #include "storage/spin.h"
      28                 :             : 
      29                 :             : /*
      30                 :             :  * Conceptually, the shared cache invalidation messages are stored in an
      31                 :             :  * infinite array, where maxMsgNum is the next array subscript to store a
      32                 :             :  * submitted message in, minMsgNum is the smallest array subscript containing
      33                 :             :  * a message not yet read by all backends, and we always have maxMsgNum >=
      34                 :             :  * minMsgNum.  (They are equal when there are no messages pending.)  For each
      35                 :             :  * active backend, there is a nextMsgNum pointer indicating the next message it
      36                 :             :  * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
      37                 :             :  * backend.
      38                 :             :  *
      39                 :             :  * (In the current implementation, minMsgNum is a lower bound for the
      40                 :             :  * per-process nextMsgNum values, but it isn't rigorously kept equal to the
      41                 :             :  * smallest nextMsgNum --- it may lag behind.  We only update it when
      42                 :             :  * SICleanupQueue is called, and we try not to do that often.)
      43                 :             :  *
      44                 :             :  * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
      45                 :             :  * entries.  We translate MsgNum values into circular-buffer indexes by
      46                 :             :  * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
      47                 :             :  * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
      48                 :             :  * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
      49                 :             :  * in the buffer.  If the buffer does overflow, we recover by setting the
      50                 :             :  * "reset" flag for each backend that has fallen too far behind.  A backend
      51                 :             :  * that is in "reset" state is ignored while determining minMsgNum.  When
      52                 :             :  * it does finally attempt to receive inval messages, it must discard all
      53                 :             :  * its invalidatable state, since it won't know what it missed.
      54                 :             :  *
      55                 :             :  * To reduce the probability of needing resets, we send a "catchup" interrupt
      56                 :             :  * to any backend that seems to be falling unreasonably far behind.  The
      57                 :             :  * normal behavior is that at most one such interrupt is in flight at a time;
      58                 :             :  * when a backend completes processing a catchup interrupt, it executes
      59                 :             :  * SICleanupQueue, which will signal the next-furthest-behind backend if
      60                 :             :  * needed.  This avoids undue contention from multiple backends all trying
      61                 :             :  * to catch up at once.  However, the furthest-back backend might be stuck
      62                 :             :  * in a state where it can't catch up.  Eventually it will get reset, so it
      63                 :             :  * won't cause any more problems for anyone but itself.  But we don't want
      64                 :             :  * to find that a bunch of other backends are now too close to the reset
      65                 :             :  * threshold to be saved.  So SICleanupQueue is designed to occasionally
      66                 :             :  * send extra catchup interrupts as the queue gets fuller, to backends that
      67                 :             :  * are far behind and haven't gotten one yet.  As long as there aren't a lot
      68                 :             :  * of "stuck" backends, we won't need a lot of extra interrupts, since ones
      69                 :             :  * that aren't stuck will propagate their interrupts to the next guy.
      70                 :             :  *
      71                 :             :  * We would have problems if the MsgNum values overflow an integer, so
      72                 :             :  * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
      73                 :             :  * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
      74                 :             :  * large so that we don't need to do this often.  It must be a multiple of
      75                 :             :  * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
      76                 :             :  * to be moved when we do it.
      77                 :             :  *
      78                 :             :  * Access to the shared sinval array is protected by two locks, SInvalReadLock
      79                 :             :  * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
      80                 :             :  * authorizes them to modify their own ProcState but not to modify or even
      81                 :             :  * look at anyone else's.  When we need to perform array-wide updates,
      82                 :             :  * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
      83                 :             :  * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
      84                 :             :  * mode) to serialize adding messages to the queue.  Note that a writer
      85                 :             :  * can operate in parallel with one or more readers, because the writer
      86                 :             :  * has no need to touch anyone's ProcState, except in the infrequent cases
      87                 :             :  * when SICleanupQueue is needed.  The only point of overlap is that
      88                 :             :  * the writer wants to change maxMsgNum while readers need to read it.
      89                 :             :  * We deal with that by having a spinlock that readers must take for just
      90                 :             :  * long enough to read maxMsgNum, while writers take it for just long enough
      91                 :             :  * to write maxMsgNum.  (The exact rule is that you need the spinlock to
      92                 :             :  * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
      93                 :             :  * spinlock to write maxMsgNum unless you are holding both locks.)
      94                 :             :  *
      95                 :             :  * Note: since maxMsgNum is an int and hence presumably atomically readable/
      96                 :             :  * writable, the spinlock might seem unnecessary.  The reason it is needed
      97                 :             :  * is to provide a memory barrier: we need to be sure that messages written
      98                 :             :  * to the array are actually there before maxMsgNum is increased, and that
      99                 :             :  * readers will see that data after fetching maxMsgNum.  Multiprocessors
     100                 :             :  * that have weak memory-ordering guarantees can fail without the memory
     101                 :             :  * barrier instructions that are included in the spinlock sequences.
     102                 :             :  */
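The MsgNum bookkeeping described in the comment above can be illustrated with a small standalone model. This is a sketch, not the PostgreSQL code. The names `Queue`, `slot_of`, `queue_insert`, `queue_read` and `queue_wrap` are hypothetical. The model has no locks and only one reader, and it uses demo-sized constants. It shows two points from the comment. First, a MsgNum is mapped to a buffer slot as `MsgNum % MAXNUMMESSAGES`. Second, subtracting a multiple of the buffer size from every MsgNum at once leaves each pending message in the same slot.

```c
#include <assert.h>
#include <stdbool.h>

/* Demo-sized values (assumption); the real file uses 4096 and 4096 * 262144. */
#define DEMO_MAXNUMMESSAGES 8                       /* power of 2 */
#define DEMO_WRAPAROUND (DEMO_MAXNUMMESSAGES * 4)   /* multiple of the buffer size */

/* Hypothetical single-reader model of the conceptual "infinite array". */
typedef struct
{
    int minMsgNum;                  /* oldest message still needed */
    int maxMsgNum;                  /* next message number to assign */
    int nextMsgNum;                 /* the single reader's read position */
    int buffer[DEMO_MAXNUMMESSAGES];
} Queue;

/* Translate a MsgNum into a circular-buffer index. */
static int slot_of(int msgnum)
{
    return msgnum % DEMO_MAXNUMMESSAGES;
}

/* Store one message; refuse (rather than reset anyone) if the buffer is full. */
static bool queue_insert(Queue *q, int value)
{
    if (q->maxMsgNum - q->minMsgNum >= DEMO_MAXNUMMESSAGES)
        return false;
    q->buffer[slot_of(q->maxMsgNum)] = value;
    q->maxMsgNum++;
    return true;
}

/* Read the next message; with one reader, minMsgNum can track it exactly. */
static bool queue_read(Queue *q, int *value)
{
    if (q->nextMsgNum >= q->maxMsgNum)
        return false;
    *value = q->buffer[slot_of(q->nextMsgNum)];
    q->nextMsgNum++;
    q->minMsgNum = q->nextMsgNum;
    return true;
}

/* Once minMsgNum passes the wraparound point, shift all MsgNums down together. */
static void queue_wrap(Queue *q)
{
    if (q->minMsgNum >= DEMO_WRAPAROUND)
    {
        q->minMsgNum -= DEMO_WRAPAROUND;
        q->maxMsgNum -= DEMO_WRAPAROUND;
        q->nextMsgNum -= DEMO_WRAPAROUND;
    }
}
```

Because `DEMO_WRAPAROUND` is a multiple of `DEMO_MAXNUMMESSAGES`, `slot_of(x - DEMO_WRAPAROUND) == slot_of(x)` for any non-negative MsgNum that is at least `DEMO_WRAPAROUND`. That is why no buffer entries need to move.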
     103                 :             : 
     104                 :             : 
     105                 :             : /*
     106                 :             :  * Configurable parameters.
     107                 :             :  *
     108                 :             :  * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
     109                 :             :  * Must be a power of 2 for speed.
     110                 :             :  *
     111                 :             :  * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
     112                 :             :  * Must be a multiple of MAXNUMMESSAGES.  Should be large.
     113                 :             :  *
     114                 :             :  * CLEANUP_MIN: the minimum number of messages that must be in the buffer
     115                 :             :  * before we bother to call SICleanupQueue.
     116                 :             :  *
     117                 :             :  * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
     118                 :             :  * we exceed CLEANUP_MIN.  Should be a power of 2 for speed.
     119                 :             :  *
     120                 :             :  * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
     121                 :             :  * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
     122                 :             :  *
     123                 :             :  * WRITE_QUANTUM: the max number of messages to push into the buffer per
     124                 :             :  * iteration of SIInsertDataEntries.  Noncritical but should be less than
     125                 :             :  * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
     126                 :             :  * per iteration.
     127                 :             :  */
     128                 :             : 
     129                 :             : #define MAXNUMMESSAGES 4096
     130                 :             : #define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
     131                 :             : #define CLEANUP_MIN (MAXNUMMESSAGES / 2)
     132                 :             : #define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
     133                 :             : #define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
     134                 :             : #define WRITE_QUANTUM 64
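The rules attached to these parameters can be checked at compile time. The following C11 sketch copies the six `#define` values and encodes the stated constraints as `_Static_assert`: the power-of-2 rules, the multiple-of rule, and the rule that WRITE_QUANTUM must be smaller than CLEANUP_QUANTUM. It adds one sketch-only headroom check that is not in the original file. It also models the WRITE_QUANTUM chunking loop of SIInsertDataEntries with a hypothetical helper, `count_write_chunks`. With these values, MSGNUMWRAPAROUND is 4096 * 262144 = 2^30 = 1073741824, well below INT_MAX.

```c
#include <assert.h>
#include <limits.h>

/* Values copied from the #defines in sinvaladt.c. */
#define MAXNUMMESSAGES 4096
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
#define WRITE_QUANTUM 64

/* Rules stated in the comment, checked by the compiler. */
_Static_assert((MAXNUMMESSAGES & (MAXNUMMESSAGES - 1)) == 0,
               "MAXNUMMESSAGES must be a power of 2");
_Static_assert(MSGNUMWRAPAROUND % MAXNUMMESSAGES == 0,
               "MSGNUMWRAPAROUND must be a multiple of MAXNUMMESSAGES");
_Static_assert((CLEANUP_QUANTUM & (CLEANUP_QUANTUM - 1)) == 0,
               "CLEANUP_QUANTUM should be a power of 2");
_Static_assert(WRITE_QUANTUM < CLEANUP_QUANTUM,
               "WRITE_QUANTUM should be less than CLEANUP_QUANTUM");

/* Sketch-only check: a full buffer's worth of MsgNums past the wrap point still fits in an int. */
_Static_assert(MSGNUMWRAPAROUND <= INT_MAX - MAXNUMMESSAGES,
               "MsgNum headroom above MSGNUMWRAPAROUND");

/*
 * Hypothetical helper mirroring the outer loop of SIInsertDataEntries:
 * returns how many lock-holding iterations n messages need and stores
 * the size of the final group in *last_size.
 */
static int count_write_chunks(int n, int *last_size)
{
    int chunks = 0;

    *last_size = 0;
    while (n > 0)
    {
        int nthistime = n < WRITE_QUANTUM ? n : WRITE_QUANTUM;

        n -= nthistime;
        *last_size = nthistime;
        chunks++;
    }
    return chunks;
}
```

For example, 150 messages are written in three groups of 64, 64 and 22. Each group stays below CLEANUP_QUANTUM (256), so at most one cleanup threshold is crossed per iteration.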
     135                 :             : 
     136                 :             : /* Per-backend state in shared invalidation structure */
     137                 :             : typedef struct ProcState
     138                 :             : {
     139                 :             :         /* procPid is zero in an inactive ProcState array entry. */
     140                 :             :         pid_t           procPid;                /* PID of backend, for signaling */
     141                 :             :         /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
     142                 :             :         int                     nextMsgNum;             /* next message number to read */
     143                 :             :         bool            resetState;             /* backend needs to reset its state */
     144                 :             :         bool            signaled;               /* backend has been sent catchup signal */
     145                 :             :         bool            hasMessages;    /* backend has unread messages */
     146                 :             : 
     147                 :             :         /*
     148                 :             :          * Backend only sends invalidations, never receives them. This only makes
     149                 :             :          * sense for Startup process during recovery because it doesn't maintain a
     150                 :             :          * relcache, yet it fires inval messages to allow query backends to see
     151                 :             :          * schema changes.
     152                 :             :          */
     153                 :             :         bool            sendOnly;               /* backend only sends, never receives */
     154                 :             : 
     155                 :             :         /*
     156                 :             :          * Next LocalTransactionId to use for each idle backend slot.  We keep
     157                 :             :          * this here because it is indexed by ProcNumber and it is convenient to
     158                 :             :          * copy the value to and from local memory when MyProcNumber is set. It's
     159                 :             :          * meaningless in an active ProcState entry.
     160                 :             :          */
     161                 :             :         LocalTransactionId nextLXID;
     162                 :             : } ProcState;
     163                 :             : 
     164                 :             : /* Shared cache invalidation memory segment */
     165                 :             : typedef struct SISeg
     166                 :             : {
     167                 :             :         /*
     168                 :             :          * General state information
     169                 :             :          */
     170                 :             :         int                     minMsgNum;              /* oldest message still needed */
     171                 :             :         int                     maxMsgNum;              /* next message number to be assigned */
     172                 :             :         int                     nextThreshold;  /* # of messages to call SICleanupQueue */
     173                 :             : 
     174                 :             :         slock_t         msgnumLock;             /* spinlock protecting maxMsgNum */
     175                 :             : 
     176                 :             :         /*
     177                 :             :          * Circular buffer holding shared-inval messages
     178                 :             :          */
     179                 :             :         SharedInvalidationMessage buffer[MAXNUMMESSAGES];
     180                 :             : 
     181                 :             :         /*
     182                 :             :          * Per-backend invalidation state info.
     183                 :             :          *
     184                 :             :          * 'procState' has NumProcStateSlots entries, and is indexed by pgprocno.
     185                 :             :          * 'numProcs' is the number of slots currently in use, and 'pgprocnos' is
     186                 :             :          * a dense array of their indexes, to speed up scanning all in-use slots.
     187                 :             :          *
     188                 :             :          * 'pgprocnos' is largely redundant with ProcArrayStruct->pgprocnos, but
     189                 :             :          * having our separate copy avoids contention on ProcArrayLock, and allows
     190                 :             :          * us to track only the processes that participate in shared cache
     191                 :             :          * invalidations.
     192                 :             :          */
     193                 :             :         int                     numProcs;
     194                 :             :         int                *pgprocnos;
     195                 :             :         ProcState       procState[FLEXIBLE_ARRAY_MEMBER];
     196                 :             : } SISeg;
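The header comment explains that msgnumLock exists mainly to act as a memory barrier. Messages must be visible in buffer[] before any reader can observe the increased maxMsgNum. The sketch below expresses that same publish/consume ordering with C11 release/acquire atomics instead of a spinlock. It illustrates the ordering requirement only and is not how sinvaladt.c is written. The names `DemoSeg`, `demo_init`, `demo_publish` and `demo_consume` are hypothetical. The sketch assumes a single writer (the real code serializes writers with SInvalWriteLock) and assumes the caller has already ensured there is room in the buffer.

```c
#include <assert.h>
#include <stdatomic.h>

#define DEMO_BUFSIZE 16

typedef struct
{
    int         buffer[DEMO_BUFSIZE];
    atomic_int  maxMsgNum;      /* next message number to be assigned */
} DemoSeg;

static void demo_init(DemoSeg *seg)
{
    atomic_init(&seg->maxMsgNum, 0);
}

/*
 * Writer: fill the slots first, then publish the new maxMsgNum with release
 * ordering, so the slot stores cannot become visible after it.  Assumes a
 * single writer and enough free space (no overflow check here).
 */
static void demo_publish(DemoSeg *seg, const int *msgs, int n)
{
    int max = atomic_load_explicit(&seg->maxMsgNum, memory_order_relaxed);

    for (int i = 0; i < n; i++)
        seg->buffer[(max + i) % DEMO_BUFSIZE] = msgs[i];
    atomic_store_explicit(&seg->maxMsgNum, max + n, memory_order_release);
}

/*
 * Reader: acquire-load maxMsgNum, after which every slot below it is
 * guaranteed to hold the writer's data.  Returns the number of messages copied.
 */
static int demo_consume(DemoSeg *seg, int *next, int *out, int outlen)
{
    int max = atomic_load_explicit(&seg->maxMsgNum, memory_order_acquire);
    int got = 0;

    while (*next < max && got < outlen)
    {
        out[got++] = seg->buffer[*next % DEMO_BUFSIZE];
        (*next)++;
    }
    return got;
}
```

On strongly ordered hardware the release/acquire pair may compile to plain loads and stores. On weakly ordered multiprocessors it emits the barriers the comment says are needed.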
     197                 :             : 
     198                 :             : /*
     199                 :             :  * We reserve a slot for each possible ProcNumber, plus one for each
     200                 :             :  * possible auxiliary process type.  (This scheme assumes there is not
     201                 :             :  * more than one of any auxiliary process type at a time, except for
     202                 :             :  * IO workers.)
     203                 :             :  */
     204                 :             : #define NumProcStateSlots       (MaxBackends + NUM_AUXILIARY_PROCS)
     205                 :             : 
     206                 :             : static SISeg *shmInvalBuffer;   /* pointer to the shared inval buffer */
     207                 :             : 
     208                 :             : 
     209                 :             : static LocalTransactionId nextLocalTransactionId;
     210                 :             : 
     211                 :             : static void CleanupInvalidationState(int status, Datum arg);
     212                 :             : 
     213                 :             : 
     214                 :             : /*
     215                 :             :  * SharedInvalShmemSize --- return shared-memory space needed
     216                 :             :  */
     217                 :             : Size
     218                 :          15 : SharedInvalShmemSize(void)
     219                 :             : {
     220                 :          15 :         Size            size;
     221                 :             : 
     222                 :          15 :         size = offsetof(SISeg, procState);
     223                 :          15 :         size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots));  /* procState */
     224                 :          15 :         size = add_size(size, mul_size(sizeof(int), NumProcStateSlots));        /* pgprocnos */
     225                 :             : 
     226                 :          30 :         return size;
     227                 :          15 : }
     228                 :             : 
     229                 :             : /*
     230                 :             :  * SharedInvalShmemInit
     231                 :             :  *              Create and initialize the SI message buffer
     232                 :             :  */
     233                 :             : void
     234                 :           6 : SharedInvalShmemInit(void)
     235                 :             : {
     236                 :           6 :         int                     i;
     237                 :           6 :         bool            found;
     238                 :             : 
     239                 :             :         /* Allocate space in shared memory */
     240                 :           6 :         shmInvalBuffer = (SISeg *)
     241                 :           6 :                 ShmemInitStruct("shmInvalBuffer", SharedInvalShmemSize(), &found);
     242         [ -  + ]:           6 :         if (found)
     243                 :           0 :                 return;
     244                 :             : 
     245                 :             :         /* Clear message counters, save size of procState array, init spinlock */
     246                 :           6 :         shmInvalBuffer->minMsgNum = 0;
     247                 :           6 :         shmInvalBuffer->maxMsgNum = 0;
     248                 :           6 :         shmInvalBuffer->nextThreshold = CLEANUP_MIN;
     249                 :           6 :         SpinLockInit(&shmInvalBuffer->msgnumLock);
     250                 :             : 
     251                 :             :         /* The buffer[] array is initially all unused, so we need not fill it */
     252                 :             : 
     253                 :             :         /* Mark all backends inactive, and initialize nextLXID */
     254         [ +  + ]:        1040 :         for (i = 0; i < NumProcStateSlots; i++)
     255                 :             :         {
     256                 :        1034 :                 shmInvalBuffer->procState[i].procPid = 0;    /* inactive */
     257                 :        1034 :                 shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
     258                 :        1034 :                 shmInvalBuffer->procState[i].resetState = false;
     259                 :        1034 :                 shmInvalBuffer->procState[i].signaled = false;
     260                 :        1034 :                 shmInvalBuffer->procState[i].hasMessages = false;
     261                 :        1034 :                 shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
     262                 :        1034 :         }
     263                 :           6 :         shmInvalBuffer->numProcs = 0;
     264                 :           6 :         shmInvalBuffer->pgprocnos = (int *) &shmInvalBuffer->procState[i];
     265         [ -  + ]:           6 : }
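SharedInvalShmemSize and SharedInvalShmemInit together lay out one contiguous segment in three parts:

- the fixed SISeg header, up to procState;
- NumProcStateSlots ProcState entries;
- NumProcStateSlots ints for pgprocnos.

The pgprocnos pointer is then set to the address just past the last procState element. The sketch below reproduces that layout. It substitutes malloc for ShmemInitStruct and plain size_t arithmetic with an explicit overflow check for add_size/mul_size. The type and function names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef struct
{
    int         procPid;
    int         nextMsgNum;
} DemoProcState;

typedef struct
{
    int             numProcs;
    int            *pgprocnos;
    DemoProcState   procState[];    /* flexible array member */
} DemoSeg;

/* Overflow-checked a + b * c; returns 0 on overflow. */
static size_t add_mul(size_t a, size_t b, size_t c)
{
    if (c != 0 && b > SIZE_MAX / c)
        return 0;
    if (a > SIZE_MAX - b * c)
        return 0;
    return a + b * c;
}

/* Header up to procState, then nslots ProcStates, then nslots ints. */
static size_t demo_seg_size(size_t nslots)
{
    size_t size = offsetof(DemoSeg, procState);

    size = add_mul(size, sizeof(DemoProcState), nslots);
    if (size == 0)
        return 0;
    return add_mul(size, sizeof(int), nslots);
}

static DemoSeg *demo_seg_init(size_t nslots)
{
    size_t      size = demo_seg_size(nslots);
    DemoSeg    *seg;
    size_t      i;

    if (size == 0 || (seg = malloc(size)) == NULL)
        return NULL;
    for (i = 0; i < nslots; i++)
    {
        seg->procState[i].procPid = 0;      /* inactive */
        seg->procState[i].nextMsgNum = 0;   /* meaningless */
    }
    seg->numProcs = 0;
    /* pgprocnos occupies the bytes right after procState[nslots - 1]. */
    seg->pgprocnos = (int *) &seg->procState[i];
    return seg;
}
```

Placing the int array after the structs needs no extra padding, because the ProcState elements are at least int-aligned.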
     266                 :             : 
     267                 :             : /*
     268                 :             :  * SharedInvalBackendInit
     269                 :             :  *              Initialize a new backend to operate on the sinval buffer
     270                 :             :  */
     271                 :             : void
     272                 :         798 : SharedInvalBackendInit(bool sendOnly)
     273                 :             : {
     274                 :         798 :         ProcState  *stateP;
     275                 :         798 :         pid_t           oldPid;
     276                 :         798 :         SISeg      *segP = shmInvalBuffer;
     277                 :             : 
     278         [ +  - ]:         798 :         if (MyProcNumber < 0)
     279   [ #  #  #  # ]:           0 :                 elog(ERROR, "MyProcNumber not set");
     280         [ +  - ]:         798 :         if (MyProcNumber >= NumProcStateSlots)
     281   [ #  #  #  # ]:           0 :                 elog(PANIC, "unexpected MyProcNumber %d in SharedInvalBackendInit (max %d)",
     282                 :             :                          MyProcNumber, NumProcStateSlots);
     283                 :         798 :         stateP = &segP->procState[MyProcNumber];
     284                 :             : 
     285                 :             :         /*
     286                 :             :          * This can run in parallel with read operations, but not with write
     287                 :             :          * operations, since SIInsertDataEntries relies on the pgprocnos array to
     288                 :             :          * set hasMessages appropriately.
     289                 :             :          */
     290                 :         798 :         LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     291                 :             : 
     292                 :         798 :         oldPid = stateP->procPid;
     293         [ +  - ]:         798 :         if (oldPid != 0)
     294                 :             :         {
     295                 :           0 :                 LWLockRelease(SInvalWriteLock);
     296   [ #  #  #  # ]:           0 :                 elog(ERROR, "sinval slot for backend %d is already in use by process %d",
     297                 :             :                          MyProcNumber, (int) oldPid);
     298                 :           0 :         }
     299                 :             : 
     300                 :         798 :         shmInvalBuffer->pgprocnos[shmInvalBuffer->numProcs++] = MyProcNumber;
     301                 :             : 
     302                 :             :         /* Fetch next local transaction ID into local memory */
     303                 :         798 :         nextLocalTransactionId = stateP->nextLXID;
     304                 :             : 
     305                 :             :         /* mark myself active, with all extant messages already read */
     306                 :         798 :         stateP->procPid = MyProcPid;
     307                 :         798 :         stateP->nextMsgNum = segP->maxMsgNum;
     308                 :         798 :         stateP->resetState = false;
     309                 :         798 :         stateP->signaled = false;
     310                 :         798 :         stateP->hasMessages = false;
     311                 :         798 :         stateP->sendOnly = sendOnly;
     312                 :             : 
     313                 :         798 :         LWLockRelease(SInvalWriteLock);
     314                 :             : 
     315                 :             :         /* register exit routine to mark my entry inactive at exit */
     316                 :         798 :         on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
     317                 :         798 : }
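SharedInvalBackendInit claims the slot indexed by MyProcNumber and appends that index to the dense pgprocnos list. It starts the backend at the current maxMsgNum, so every message already in the queue counts as read. The sketch below models that bookkeeping in a single process without locks. `DemoSlots`, `demo_register` and the `DEMO_*` result codes are hypothetical. The real code holds SInvalWriteLock and reports failures with elog rather than return codes.

```c
#include <assert.h>

#define DEMO_NSLOTS 4

typedef struct
{
    int         procPid;        /* 0 means the slot is inactive */
    int         nextMsgNum;
} DemoState;

typedef struct
{
    int         maxMsgNum;
    int         numProcs;
    int         pgprocnos[DEMO_NSLOTS];
    DemoState   procState[DEMO_NSLOTS];
} DemoSlots;

enum { DEMO_OK = 0, DEMO_BAD_SLOT = -1, DEMO_IN_USE = -2 };

static int demo_register(DemoSlots *s, int procno, int pid)
{
    if (procno < 0 || procno >= DEMO_NSLOTS)
        return DEMO_BAD_SLOT;
    if (s->procState[procno].procPid != 0)
        return DEMO_IN_USE;

    /* Each slot registers at most once, so numProcs never exceeds DEMO_NSLOTS. */
    s->pgprocnos[s->numProcs++] = procno;
    s->procState[procno].procPid = pid;
    s->procState[procno].nextMsgNum = s->maxMsgNum;    /* nothing pending yet */
    return DEMO_OK;
}
```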
     318                 :             : 
     319                 :             : /*
     320                 :             :  * CleanupInvalidationState
     321                 :             :  *              Mark the current backend as no longer active.
     322                 :             :  *
     323                 :             :  * This function is called via on_shmem_exit() during backend shutdown.
     324                 :             :  *
     325                 :             :  * arg is really of type "SISeg*".
     326                 :             :  */
     327                 :             : static void
     328                 :         798 : CleanupInvalidationState(int status, Datum arg)
     329                 :             : {
     330                 :         798 :         SISeg      *segP = (SISeg *) DatumGetPointer(arg);
     331                 :         798 :         ProcState  *stateP;
     332                 :         798 :         int                     i;
     333                 :             : 
     334         [ +  - ]:         798 :         Assert(segP);
     335                 :             : 
     336                 :         798 :         LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     337                 :             : 
     338                 :         798 :         stateP = &segP->procState[MyProcNumber];
     339                 :             : 
     340                 :             :         /* Update next local transaction ID for next holder of this proc number */
     341                 :         798 :         stateP->nextLXID = nextLocalTransactionId;
     342                 :             : 
     343                 :             :         /* Mark myself inactive */
     344                 :         798 :         stateP->procPid = 0;
     345                 :         798 :         stateP->nextMsgNum = 0;
     346                 :         798 :         stateP->resetState = false;
     347                 :         798 :         stateP->signaled = false;
     348                 :             : 
     349         [ -  + ]:        2123 :         for (i = segP->numProcs - 1; i >= 0; i--)
     350                 :             :         {
     351         [ +  + ]:        2123 :                 if (segP->pgprocnos[i] == MyProcNumber)
     352                 :             :                 {
     353         [ +  + ]:         798 :                         if (i != segP->numProcs - 1)
     354                 :         389 :                                 segP->pgprocnos[i] = segP->pgprocnos[segP->numProcs - 1];
     355                 :         798 :                         break;
     356                 :             :                 }
     357                 :        1325 :         }
     358         [ +  - ]:         798 :         if (i < 0)
     359   [ #  #  #  # ]:           0 :                 elog(PANIC, "could not find entry in sinval array");
     360                 :         798 :         segP->numProcs--;
     361                 :             : 
     362                 :         798 :         LWLockRelease(SInvalWriteLock);
     363                 :         798 : }
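CleanupInvalidationState removes its entry from the dense pgprocnos array without shifting the other entries. It scans backward for its own ProcNumber, overwrites that position with the last element, and decrements numProcs. The order of entries is not preserved, but the removal itself costs O(1) once the entry is found. The sketch below shows that step on its own. `demo_remove` and its boolean return are hypothetical; the real code PANICs if the entry is missing.

```c
#include <assert.h>
#include <stdbool.h>

/* Remove procno from a dense array by moving the last element into its place. */
static bool demo_remove(int *pgprocnos, int *numProcs, int procno)
{
    int i;

    for (i = *numProcs - 1; i >= 0; i--)
    {
        if (pgprocnos[i] == procno)
        {
            if (i != *numProcs - 1)
                pgprocnos[i] = pgprocnos[*numProcs - 1];
            break;
        }
    }
    if (i < 0)
        return false;           /* not found */
    (*numProcs)--;
    return true;
}
```

For example, removing 3 from {7, 3, 9, 5} yields {7, 5, 9}.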
     364                 :             : 
     365                 :             : /*
     366                 :             :  * SIInsertDataEntries
     367                 :             :  *              Add new invalidation message(s) to the buffer.
     368                 :             :  */
     369                 :             : void
     370                 :       55290 : SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
     371                 :             : {
     372                 :       55290 :         SISeg      *segP = shmInvalBuffer;
     373                 :             : 
     374                 :             :         /*
     375                 :             :          * N can be arbitrarily large.  We divide the work into groups of no more
     376                 :             :          * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
     377                 :             :          * an unreasonably long time.  (This is not so much because we care about
     378                 :             :          * letting in other writers, as that some just-caught-up backend might be
     379                 :             :          * trying to do SICleanupQueue to pass on its signal, and we don't want it
     380                 :             :          * to have to wait a long time.)  Also, we need to consider calling
     381                 :             :          * SICleanupQueue every so often.
     382                 :             :          */
     383         [ +  + ]:      112514 :         while (n > 0)
     384                 :             :         {
     385         [ +  + ]:       57224 :                 int                     nthistime = Min(n, WRITE_QUANTUM);
     386                 :       57224 :                 int                     numMsgs;
     387                 :       57224 :                 int                     max;
     388                 :       57224 :                 int                     i;
     389                 :             : 
     390                 :       57224 :                 n -= nthistime;
     391                 :             : 
     392                 :       57224 :                 LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     393                 :             : 
     394                 :             :                 /*
     395                 :             :                  * If the buffer is full, we *must* acquire some space.  Clean the
     396                 :             :                  * queue and reset anyone who is preventing space from being freed.
     397                 :             :                  * Otherwise, clean the queue only when it's exceeded the next
     398                 :             :                  * fullness threshold.  We have to loop and recheck the buffer state
     399                 :             :                  * after any call of SICleanupQueue.
     400                 :             :                  */
     401                 :       58092 :                 for (;;)
     402                 :             :                 {
     403                 :       58092 :                         numMsgs = segP->maxMsgNum - segP->minMsgNum;
     404   [ +  +  +  + ]:       58092 :                         if (numMsgs + nthistime > MAXNUMMESSAGES ||
     405                 :       58040 :                                 numMsgs >= segP->nextThreshold)
     406                 :         868 :                                 SICleanupQueue(true, nthistime);
     407                 :             :                         else
     408                 :       57224 :                                 break;
     409                 :             :                 }
     410                 :             : 
     411                 :             :                 /*
     412                 :             :                  * Insert new message(s) into proper slot of circular buffer
     413                 :             :                  */
     414                 :       57224 :                 max = segP->maxMsgNum;
     415         [ +  + ]:      562990 :                 while (nthistime-- > 0)
     416                 :             :                 {
     417                 :      505766 :                         segP->buffer[max % MAXNUMMESSAGES] = *data++;
     418                 :      505766 :                         max++;
     419                 :             :                 }
     420                 :             : 
     421                 :             :                 /* Update current value of maxMsgNum using spinlock */
     422         [ -  + ]:       57224 :                 SpinLockAcquire(&segP->msgnumLock);
     423                 :       57224 :                 segP->maxMsgNum = max;
     424                 :       57224 :                 SpinLockRelease(&segP->msgnumLock);
     425                 :             : 
     426                 :             :                 /*
     427                 :             :                  * Now that the maxMsgNum change is globally visible, we give everyone
     428                 :             :                  * a swift kick to make sure they read the newly added messages.
     429                 :             :                  * Releasing SInvalWriteLock will enforce a full memory barrier, so
     430                 :             :                  * these (unlocked) changes will be committed to memory before we exit
     431                 :             :                  * the function.
     432                 :             :                  */
     433         [ +  + ]:      515455 :                 for (i = 0; i < segP->numProcs; i++)
     434                 :             :                 {
     435                 :      458231 :                         ProcState  *stateP = &segP->procState[segP->pgprocnos[i]];
     436                 :             : 
     437                 :      458231 :                         stateP->hasMessages = true;
     438                 :      458231 :                 }
     439                 :             : 
     440                 :       57224 :                 LWLockRelease(SInvalWriteLock);
     441                 :       57224 :         }
     442                 :       55290 : }
     443                 :             : 
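The batching loop in SIInsertDataEntries can be modeled single-threaded to show how message numbers map onto ring-buffer slots. This is a minimal sketch under stated assumptions: `SketchQueue`, `sketch_insert` and the `SKETCH_*` constants are hypothetical names with deliberately tiny values, plain ints stand in for SharedInvalidationMessage, and the LWLock, spinlock and SICleanupQueue calls are replaced by comments plus an assertion that enough space is already free.

```c
#include <assert.h>

/* Hypothetical tiny sizes for illustration only; the real constants are
 * defined near the top of sinvaladt.c and are much larger. */
#define SKETCH_MAXNUMMESSAGES 8
#define SKETCH_WRITE_QUANTUM  3

typedef struct
{
    int buffer[SKETCH_MAXNUMMESSAGES]; /* stand-in for SharedInvalidationMessage */
    int minMsgNum;  /* oldest message number still needed by some reader */
    int maxMsgNum;  /* next message number to be assigned */
    int batches;    /* number of lock-hold periods the insert used */
} SketchQueue;

/* Insert n messages in batches of at most SKETCH_WRITE_QUANTUM, mirroring
 * the outer loop of SIInsertDataEntries.  No locking and no cleanup: the
 * sketch instead asserts that the caller left enough free slots. */
static void
sketch_insert(SketchQueue *q, const int *data, int n)
{
    while (n > 0)
    {
        int nthistime = n < SKETCH_WRITE_QUANTUM ? n : SKETCH_WRITE_QUANTUM;
        int max = q->maxMsgNum;

        n -= nthistime;

        /* The real code loops on SICleanupQueue here when space is short. */
        assert(max - q->minMsgNum + nthistime <= SKETCH_MAXNUMMESSAGES);

        /* Message number k always lives in slot k % SKETCH_MAXNUMMESSAGES. */
        while (nthistime-- > 0)
        {
            q->buffer[max % SKETCH_MAXNUMMESSAGES] = *data++;
            max++;
        }

        q->maxMsgNum = max;  /* published under msgnumLock in the real code */
        q->batches++;        /* one SInvalWriteLock acquire/release per batch */
    }
}
```

Inserting seven messages into an empty queue whose counters both start at 5 takes three batches (3 + 3 + 1), and message number 8 lands in slot 0 because the slot is chosen by `msgnum % SKETCH_MAXNUMMESSAGES`.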
     444                 :             : /*
     445                 :             :  * SIGetDataEntries
     446                 :             :  *              get next SI message(s) for current backend, if there are any
     447                 :             :  *
     448                 :             :  * Possible return values:
     449                 :             :  *      0:   no SI message available
     450                 :             :  *      n>0: next n SI messages have been extracted into data[]
     451                 :             :  *      -1:  SI reset message extracted
     452                 :             :  *
     453                 :             :  * If the return value is less than the array size "datasize", the caller
     454                 :             :  * can assume that there are no more SI messages after the one(s) returned.
     455                 :             :  * Otherwise, another call is needed to collect more messages.
     456                 :             :  *
     457                 :             :  * NB: this can run in parallel with other instances of SIGetDataEntries
     458                 :             :  * executing on behalf of other backends, since each instance will modify only
     459                 :             :  * fields of its own backend's ProcState, and no instance will look at fields
     460                 :             :  * of other backends' ProcStates.  We express this by grabbing SInvalReadLock
     461                 :             :  * in shared mode.  Note that this is not exactly the normal (read-only)
     462                 :             :  * interpretation of a shared lock! Look closely at the interactions before
     463                 :             :  * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
     464                 :             :  *
     465                 :             :  * NB: this can also run in parallel with SIInsertDataEntries.  It is not
     466                 :             :  * guaranteed that we will return any messages added after the routine is
     467                 :             :  * entered.
     468                 :             :  *
     469                 :             :  * Note: we assume that "datasize" is not so large that it might be important
     470                 :             :  * to break our hold on SInvalReadLock into segments.
     471                 :             :  */
     472                 :             : int
     473                 :     2448705 : SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
     474                 :             : {
     475                 :     2448705 :         SISeg      *segP;
     476                 :     2448705 :         ProcState  *stateP;
     477                 :     2448705 :         int                     max;
     478                 :     2448705 :         int                     n;
     479                 :             : 
     480                 :     2448705 :         segP = shmInvalBuffer;
     481                 :     2448705 :         stateP = &segP->procState[MyProcNumber];
     482                 :             : 
     483                 :             :         /*
     484                 :             :          * Before starting to take locks, do a quick, unlocked test to see whether
     485                 :             :          * there can possibly be anything to read.  On a multiprocessor system,
     486                 :             :          * it's possible that this load could migrate backwards and occur before
     487                 :             :          * we actually enter this function, so we might miss a sinval message that
     488                 :             :          * was just added by some other processor.  But they can't migrate
     489                 :             :          * backwards over a preceding lock acquisition, so it should be OK.  If we
     490                 :             :          * haven't acquired a lock guarding against further relevant
     491                 :             :          * invalidations, any such occurrence is not much different than if the
     492                 :             :          * invalidation had arrived slightly later in the first place.
     493                 :             :          */
     494         [ +  + ]:     2448705 :         if (!stateP->hasMessages)
     495                 :     2325697 :                 return 0;
     496                 :             : 
     497                 :      123008 :         LWLockAcquire(SInvalReadLock, LW_SHARED);
     498                 :             : 
     499                 :             :         /*
     500                 :             :          * We must reset hasMessages before determining how many messages we're
     501                 :             :          * going to read.  That way, if new messages arrive after we have
     502                 :             :          * determined how many we're reading, the flag will be set again and
     503                 :             :          * we'll notice those messages part-way through.
     504                 :             :          *
     505                 :             :          * Note that, if we don't end up reading all of the messages, we had
     506                 :             :          * better be certain to reset this flag before exiting!
     507                 :             :          */
     508                 :      123008 :         stateP->hasMessages = false;
     509                 :             : 
     510                 :             :         /* Fetch current value of maxMsgNum using spinlock */
     511         [ +  + ]:      123008 :         SpinLockAcquire(&segP->msgnumLock);
     512                 :      123008 :         max = segP->maxMsgNum;
     513                 :      123008 :         SpinLockRelease(&segP->msgnumLock);
     514                 :             : 
     515         [ +  + ]:      123008 :         if (stateP->resetState)
     516                 :             :         {
     517                 :             :                 /*
     518                 :             :                  * Force reset.  We can say we have dealt with any messages added
     519                 :             :                  * since the reset, as well; and that means we should clear the
     520                 :             :                  * signaled flag, too.
     521                 :             :                  */
     522                 :          52 :                 stateP->nextMsgNum = max;
     523                 :          52 :                 stateP->resetState = false;
     524                 :          52 :                 stateP->signaled = false;
     525                 :          52 :                 LWLockRelease(SInvalReadLock);
     526                 :          52 :                 return -1;
     527                 :             :         }
     528                 :             : 
     529                 :             :         /*
     530                 :             :          * Retrieve messages and advance backend's counter, until data array is
     531                 :             :          * full or there are no more messages.
     532                 :             :          *
     533                 :             :          * There may be other backends that haven't read the message(s), so we
     534                 :             :          * cannot delete them here.  SICleanupQueue() will eventually remove them
     535                 :             :          * from the queue.
     536                 :             :          */
     537                 :      122956 :         n = 0;
     538   [ +  +  +  + ]:     2740567 :         while (n < datasize && stateP->nextMsgNum < max)
     539                 :             :         {
     540                 :     2617611 :                 data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
     541                 :     2617611 :                 stateP->nextMsgNum++;
     542                 :             :         }
     543                 :             : 
     544                 :             :         /*
     545                 :             :          * If we have caught up completely, reset our "signaled" flag so that
     546                 :             :          * we'll get another signal if we fall behind again.
     547                 :             :          *
     548                 :             :          * If we haven't caught up completely, set the hasMessages flag again so
     549                 :             :          * that we see the remaining messages next time.
     550                 :             :          */
     551         [ +  + ]:      122956 :         if (stateP->nextMsgNum >= max)
     552                 :       60603 :                 stateP->signaled = false;
     553                 :             :         else
     554                 :       62353 :                 stateP->hasMessages = true;
     555                 :             : 
     556                 :      122956 :         LWLockRelease(SInvalReadLock);
     557                 :      122956 :         return n;
     558                 :     2448705 : }
     559                 :             : 
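The return-value contract of SIGetDataEntries (0, n > 0, or -1) and its handling of the hasMessages and signaled flags can be illustrated with a single-threaded model. This is a sketch, not the real routine: `SketchReader`, `sketch_get` and `SKETCH_MAXNUMMESSAGES` are hypothetical, the ring holds ints, maxMsgNum is passed in rather than read under the spinlock, and SInvalReadLock is omitted.

```c
#include <assert.h>
#include <stdbool.h>

#define SKETCH_MAXNUMMESSAGES 8  /* hypothetical tiny ring size */

typedef struct
{
    int  nextMsgNum;   /* next message number this reader will consume */
    bool resetState;   /* reader fell too far behind and must reset */
    bool signaled;     /* a catchup signal has already been sent */
    bool hasMessages;  /* writer added messages since the last read */
} SketchReader;

/* Returns 0 if nothing is pending, -1 on reset, else the count copied. */
static int
sketch_get(SketchReader *r, const int *ring, int maxMsgNum,
           int *data, int datasize)
{
    int n = 0;

    assert(datasize > 0);  /* assumed precondition for this sketch */

    if (!r->hasMessages)
        return 0;               /* fast path: nothing new */
    r->hasMessages = false;     /* clear before deciding how much to read */

    if (r->resetState)
    {
        r->nextMsgNum = maxMsgNum;  /* skip everything; caller flushes caches */
        r->resetState = false;
        r->signaled = false;
        return -1;
    }

    while (n < datasize && r->nextMsgNum < maxMsgNum)
    {
        data[n++] = ring[r->nextMsgNum % SKETCH_MAXNUMMESSAGES];
        r->nextMsgNum++;
    }

    if (r->nextMsgNum >= maxMsgNum)
        r->signaled = false;    /* fully caught up: allow a future signal */
    else
        r->hasMessages = true;  /* more remain: look again on the next call */
    return n;
}
```

With datasize 2 and three pending messages, the first call returns 2 (equal to datasize, so the caller must call again) and leaves hasMessages set; the second returns 1 and clears signaled; a third returns 0.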
     560                 :             : /*
     561                 :             :  * SICleanupQueue
     562                 :             :  *              Remove messages that have been consumed by all active backends
     563                 :             :  *
     564                 :             :  * callerHasWriteLock is true if caller is holding SInvalWriteLock.
     565                 :             :  * minFree is the minimum number of message slots to make free.
     566                 :             :  *
     567                 :             :  * Possible side effects of this routine include marking one or more
     568                 :             :  * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
     569                 :             :  * to some backend that seems to be getting too far behind.  We signal at
     570                 :             :  * most one backend at a time, for reasons explained at the top of the file.
     571                 :             :  *
     572                 :             :  * Caution: because we transiently release write lock when we have to signal
     573                 :             :  * some other backend, it is NOT guaranteed that there are still minFree
     574                 :             :  * free message slots at exit.  Caller must recheck and perhaps retry.
     575                 :             :  */
     576                 :             : void
     577                 :         998 : SICleanupQueue(bool callerHasWriteLock, int minFree)
     578                 :             : {
     579                 :         998 :         SISeg      *segP = shmInvalBuffer;
     580                 :         998 :         int                     min,
     581                 :             :                                 minsig,
     582                 :             :                                 lowbound,
     583                 :             :                                 numMsgs,
     584                 :             :                                 i;
     585                 :         998 :         ProcState  *needSig = NULL;
     586                 :             : 
     587                 :             :         /* Lock out all writers and readers */
     588         [ +  + ]:         998 :         if (!callerHasWriteLock)
     589                 :         130 :                 LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     590                 :         998 :         LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
     591                 :             : 
     592                 :             :         /*
     593                 :             :          * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
     594                 :             :          * furthest-back backend that needs signaling (if any), and reset any
     595                 :             :          * backends that are too far back.  Note that because we ignore sendOnly
     596                 :             :          * backends here it is possible for them to keep sending messages without
     597                 :             :          * a problem even when they are the only active backend.
     598                 :             :          */
     599                 :         998 :         min = segP->maxMsgNum;
     600                 :         998 :         minsig = min - SIG_THRESHOLD;
     601                 :         998 :         lowbound = min - MAXNUMMESSAGES + minFree;
     602                 :             : 
     603         [ +  + ]:       10714 :         for (i = 0; i < segP->numProcs; i++)
     604                 :             :         {
     605                 :        9716 :                 ProcState  *stateP = &segP->procState[segP->pgprocnos[i]];
     606                 :        9716 :                 int                     n = stateP->nextMsgNum;
     607                 :             : 
     608                 :             :                 /* Ignore if already in reset state */
     609         [ +  - ]:        9716 :                 Assert(stateP->procPid != 0);
     610   [ +  +  -  + ]:        9716 :                 if (stateP->resetState || stateP->sendOnly)
     611                 :         612 :                         continue;
     612                 :             : 
     613                 :             :                 /*
     614                 :             :                  * If we must free some space and this backend is preventing it, force
     615                 :             :                  * him into reset state and then ignore until he catches up.
     616                 :             :                  */
     617         [ +  + ]:        9104 :                 if (n < lowbound)
     618                 :             :                 {
     619                 :          53 :                         stateP->resetState = true;
     620                 :             :                         /* no point in signaling him ... */
     621                 :          53 :                         continue;
     622                 :             :                 }
     623                 :             : 
     624                 :             :                 /* Track the global minimum nextMsgNum */
     625         [ +  + ]:        9051 :                 if (n < min)
     626                 :        1874 :                         min = n;
     627                 :             : 
     628                 :             :                 /* Also see who's furthest back of the unsignaled backends */
     629   [ +  +  +  + ]:        9051 :                 if (n < minsig && !stateP->signaled)
     630                 :             :                 {
     631                 :         460 :                         minsig = n;
     632                 :         460 :                         needSig = stateP;
     633                 :         460 :                 }
     634      [ -  +  + ]:        9716 :         }
     635                 :         998 :         segP->minMsgNum = min;
     636                 :             : 
     637                 :             :         /*
     638                 :             :          * When minMsgNum gets really large, decrement all message counters so as
     639                 :             :          * to forestall overflow of the counters.  This happens seldom enough that
     640                 :             :          * folding it into the previous loop would be a loser.
     641                 :             :          */
     642         [ +  - ]:         998 :         if (min >= MSGNUMWRAPAROUND)
     643                 :             :         {
     644                 :           0 :                 segP->minMsgNum -= MSGNUMWRAPAROUND;
     645                 :           0 :                 segP->maxMsgNum -= MSGNUMWRAPAROUND;
     646         [ #  # ]:           0 :                 for (i = 0; i < segP->numProcs; i++)
     647                 :           0 :                         segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
     648                 :           0 :         }
     649                 :             : 
     650                 :             :         /*
     651                 :             :          * Determine how many messages are still in the queue, and set the
     652                 :             :          * threshold at which we should repeat SICleanupQueue().
     653                 :             :          */
     654                 :         998 :         numMsgs = segP->maxMsgNum - segP->minMsgNum;
     655         [ +  + ]:         998 :         if (numMsgs < CLEANUP_MIN)
     656                 :         183 :                 segP->nextThreshold = CLEANUP_MIN;
     657                 :             :         else
     658                 :         815 :                 segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
     659                 :             : 
     660                 :             :         /*
     661                 :             :          * Lastly, signal anyone who needs a catchup interrupt.  Since
     662                 :             :          * SendProcSignal() might not be fast, we don't want to hold locks while
     663                 :             :          * executing it.
     664                 :             :          */
     665         [ +  + ]:         998 :         if (needSig)
     666                 :             :         {
     667                 :         447 :                 pid_t           his_pid = needSig->procPid;
     668                 :         447 :                 ProcNumber      his_procNumber = (needSig - &segP->procState[0]);
     669                 :             : 
     670                 :         447 :                 needSig->signaled = true;
     671                 :         447 :                 LWLockRelease(SInvalReadLock);
     672                 :         447 :                 LWLockRelease(SInvalWriteLock);
     673   [ -  +  -  + ]:         447 :                 elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
     674                 :         447 :                 SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_procNumber);
     675         [ +  + ]:         447 :                 if (callerHasWriteLock)
     676                 :         432 :                         LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     677                 :         447 :         }
     678                 :             :         else
     679                 :             :         {
     680                 :         551 :                 LWLockRelease(SInvalReadLock);
     681         [ +  + ]:         551 :                 if (!callerHasWriteLock)
     682                 :         115 :                         LWLockRelease(SInvalWriteLock);
     683                 :             :         }
     684                 :         998 : }
     685                 :             : 
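The bookkeeping in SICleanupQueue (recompute the minimum, force readers below lowbound into reset, pick the furthest-back unsignaled reader, and set the next cleanup threshold) reduces to a pure function over an array of reader states. A minimal sketch, assuming hypothetical names (`SketchProc`, `SketchSeg`, `sketch_cleanup`) and hypothetical constant values; locking, the sendOnly check, the counter-wraparound step and the actual SendProcSignal call are left out.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical values chosen for illustration, not the real ones. */
#define SKETCH_MAXNUMMESSAGES  64
#define SKETCH_SIG_THRESHOLD   32
#define SKETCH_CLEANUP_MIN     32
#define SKETCH_CLEANUP_QUANTUM 8

typedef struct
{
    int  nextMsgNum;
    bool resetState;
    bool signaled;
} SketchProc;

typedef struct
{
    int minMsgNum;
    int maxMsgNum;
    int nextThreshold;
} SketchSeg;

/* Returns the index of the reader to signal, or -1 if none needs it. */
static int
sketch_cleanup(SketchSeg *seg, SketchProc *procs, int nprocs, int minFree)
{
    int min = seg->maxMsgNum;
    int minsig = min - SKETCH_SIG_THRESHOLD;
    int lowbound = min - SKETCH_MAXNUMMESSAGES + minFree;
    int needSig = -1;
    int numMsgs;

    for (int i = 0; i < nprocs; i++)
    {
        int n = procs[i].nextMsgNum;

        if (procs[i].resetState)
            continue;                   /* already reset: ignore */
        if (n < lowbound)
        {
            procs[i].resetState = true; /* blocking needed space: force reset */
            continue;
        }
        if (n < min)
            min = n;                    /* track global minimum */
        if (n < minsig && !procs[i].signaled)
        {
            minsig = n;                 /* furthest-back unsignaled reader */
            needSig = i;
        }
    }
    seg->minMsgNum = min;

    /* Next cleanup at the next multiple of the quantum above numMsgs. */
    numMsgs = seg->maxMsgNum - seg->minMsgNum;
    if (numMsgs < SKETCH_CLEANUP_MIN)
        seg->nextThreshold = SKETCH_CLEANUP_MIN;
    else
        seg->nextThreshold =
            (numMsgs / SKETCH_CLEANUP_QUANTUM + 1) * SKETCH_CLEANUP_QUANTUM;

    if (needSig >= 0)
        procs[needSig].signaled = true; /* real code then signals that PID */
    assert(seg->minMsgNum <= seg->maxMsgNum);
    return needSig;
}
```

With maxMsgNum = 100, minFree = 4 and these values, lowbound is 40 and minsig starts at 68: the reader at 30 is forced into reset, the reader at 50 becomes the new minimum and is chosen for signaling, and nextThreshold becomes (50 / 8 + 1) * 8 = 56. A second call picks the reader at 60, because the one at 50 is already marked signaled, which matches the one-backend-at-a-time behavior described in the header comment.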
     686                 :             : 
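The MSGNUMWRAPAROUND branch of SICleanupQueue (its body shows zero hits in this report) works because subtracting one constant from every counter preserves all differences between them. The sketch below isolates that step; `sketch_rebase` and both `SKETCH_*` constants are hypothetical. It assumes the wraparound constant is a multiple of the ring size, so each counter's `% MAXNUMMESSAGES` slot index is also unchanged; the `_Static_assert` checks that for the sketch's own values.

```c
#include <assert.h>

#define SKETCH_MAXNUMMESSAGES   8
#define SKETCH_MSGNUMWRAPAROUND (SKETCH_MAXNUMMESSAGES * 4)  /* hypothetical */

/* Rebasing must not move any message to a different ring slot. */
_Static_assert(SKETCH_MSGNUMWRAPAROUND % SKETCH_MAXNUMMESSAGES == 0,
               "wraparound constant must be a multiple of the ring size");

/* Shift min, max and every reader's nextMsgNum down by the same constant
 * once the minimum has passed it, so the counters never overflow. */
static void
sketch_rebase(int *minMsgNum, int *maxMsgNum, int *next, int nprocs)
{
    if (*minMsgNum < SKETCH_MSGNUMWRAPAROUND)
        return;
    *minMsgNum -= SKETCH_MSGNUMWRAPAROUND;
    *maxMsgNum -= SKETCH_MSGNUMWRAPAROUND;
    for (int i = 0; i < nprocs; i++)
        next[i] -= SKETCH_MSGNUMWRAPAROUND;
    assert(*minMsgNum <= *maxMsgNum);
}
```

Because the rebase is rare, doing it in a separate pass costs little, which is the trade-off the source comment describes.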
     687                 :             : /*
     688                 :             :  * GetNextLocalTransactionId --- allocate a new LocalTransactionId
     689                 :             :  *
     690                 :             :  * We split VirtualTransactionIds into two parts so that it is possible
     691                 :             :  * to allocate a new one without any contention for shared memory, except
     692                 :             :  * for a bit of additional overhead during backend startup/shutdown.
     693                 :             :  * The high-order part of a VirtualTransactionId is a ProcNumber, and the
     694                 :             :  * low-order part is a LocalTransactionId, which we assign from a local
     695                 :             :  * counter.  To avoid the risk of a VirtualTransactionId being reused
     696                 :             :  * within a short interval, successive procs occupying the same PGPROC slot
     697                 :             :  * should use a consecutive sequence of local IDs, which is implemented
     698                 :             :  * by copying nextLocalTransactionId as seen above.
     699                 :             :  */
     700                 :             : LocalTransactionId
     701                 :       57914 : GetNextLocalTransactionId(void)
     702                 :             : {
     703                 :       57914 :         LocalTransactionId result;
     704                 :             : 
     705                 :             :         /* loop to avoid returning InvalidLocalTransactionId at wraparound */
     706                 :       57914 :         do
     707                 :             :         {
     708                 :       58039 :                 result = nextLocalTransactionId++;
     709         [ +  + ]:       58039 :         } while (!LocalTransactionIdIsValid(result));
     710                 :             : 
     711                 :      115828 :         return result;
     712                 :       57914 : }
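The do/while loop in GetNextLocalTransactionId only repeats when the unsigned counter wraps around to the invalid value. A minimal sketch of that behavior, assuming a 32-bit unsigned ID whose invalid value is 0; `SketchLocalXid`, `SKETCH_INVALID_LOCAL_XID` and `sketch_get_next_local_xid` are hypothetical stand-ins for the PostgreSQL types and macros.

```c
#include <assert.h>
#include <stdint.h>

/* Assumed stand-ins: a 32-bit unsigned counter with 0 reserved as invalid. */
typedef uint32_t SketchLocalXid;
#define SKETCH_INVALID_LOCAL_XID ((SketchLocalXid) 0)

static SketchLocalXid sketch_next_local_xid = 1;

static SketchLocalXid
sketch_get_next_local_xid(void)
{
    SketchLocalXid result;

    /* Unsigned overflow wraps to 0, so skip that value and take the next. */
    do
    {
        result = sketch_next_local_xid++;
    } while (result == SKETCH_INVALID_LOCAL_XID);

    assert(result != SKETCH_INVALID_LOCAL_XID);
    return result;
}
```

Starting from UINT32_MAX, successive calls return UINT32_MAX, then 1, then 2: the invalid value 0 is consumed by one extra loop iteration and never handed out.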
        

Generated by: LCOV version 2.3.2-1