LCOV - code coverage report
Current view: top level - src/backend/commands - async.c (source / functions)
Test: Code coverage
Test Date: 2026-01-26 10:56:24

             Coverage   Total    Hit
Lines:         61.5 %    1045    643
Functions:     79.6 %      54     43
Branches:      33.6 %     657    221
Legend: Lines: hit / not hit; Branches: + taken, - not taken, # not executed

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * async.c
       4                 :             :  *        Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
       5                 :             :  *
       6                 :             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7                 :             :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :             :  *
       9                 :             :  * IDENTIFICATION
      10                 :             :  *        src/backend/commands/async.c
      11                 :             :  *
      12                 :             :  *-------------------------------------------------------------------------
      13                 :             :  */
      14                 :             : 
      15                 :             : /*-------------------------------------------------------------------------
      16                 :             :  * Async Notification Model as of v19:
      17                 :             :  *
      18                 :             :  * 1. Multiple backends on same machine.  Multiple backends may be listening
      19                 :             :  *        on each of several channels.
      20                 :             :  *
      21                 :             :  * 2. There is one central queue in disk-based storage (directory pg_notify/),
      22                 :             :  *        with actively-used pages mapped into shared memory by the slru.c module.
      23                 :             :  *        All notification messages are placed in the queue and later read out
      24                 :             :  *        by listening backends.  The single queue allows us to guarantee that
      25                 :             :  *        notifications are received in commit order.
      26                 :             :  *
      27                 :             :  *        Although there is only one queue, notifications are treated as being
      28                 :             :  *        database-local; this is done by including the sender's database OID
      29                 :             :  *        in each notification message.  Listening backends ignore messages
      30                 :             :  *        that don't match their database OID.  This is important because it
      31                 :             :  *        ensures senders and receivers have the same database encoding and won't
      32                 :             :  *        misinterpret non-ASCII text in the channel name or payload string.
      33                 :             :  *
      34                 :             :  *        Since notifications are not expected to survive database crashes,
      35                 :             :  *        we can simply clean out the pg_notify data at any reboot, and there
      36                 :             :  *        is no need for WAL support or fsync'ing.
      37                 :             :  *
      38                 :             :  * 3. Every backend that is listening on at least one channel registers by
      39                 :             :  *        entering its PID into the array in AsyncQueueControl. It then scans all
      40                 :             :  *        incoming notifications in the central queue and first compares the
      41                 :             :  *        database OID of the notification with its own database OID and then
      42                 :             :  *        compares the notified channel with the list of channels that it listens
      43                 :             :  *        to. If there is a match, it delivers the notification event to its
      44                 :             :  *        frontend.  Non-matching events are simply skipped.
      45                 :             :  *
      46                 :             :  * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
      47                 :             :  *        a backend-local list which will not be processed until transaction end.
      48                 :             :  *
      49                 :             :  *        Duplicate notifications from the same transaction are sent out as one
      50                 :             :  *        notification only. This is done to save work when for example a trigger
      51                 :             :  *        on a 2 million row table fires a notification for each row that has been
      52                 :             :  *        changed. If the application needs to receive every single notification
      53                 :             :  *        that has been sent, it can easily add some unique string into the extra
      54                 :             :  *        payload parameter.
      55                 :             :  *
      56                 :             :  *        When the transaction is ready to commit, PreCommit_Notify() adds the
      57                 :             :  *        pending notifications to the head of the queue. The head pointer of the
      58                 :             :  *        queue always points to the next free position and a position is just a
      59                 :             :  *        page number and the offset in that page. This is done before marking the
      60                 :             :  *        transaction as committed in clog. If we run into problems writing the
      61                 :             :  *        notifications, we can still call elog(ERROR, ...) and the transaction
      62                 :             :  *        will roll back safely.
      63                 :             :  *
      64                 :             :  *        Once we have put all of the notifications into the queue, we return to
      65                 :             :  *        CommitTransaction() which will then do the actual transaction commit.
      66                 :             :  *
      67                 :             :  *        After commit we are called another time (AtCommit_Notify()). Here we
      68                 :             :  *        make any required updates to the effective listen state (see below).
      69                 :             :  *        Then we signal any backends that may be interested in our messages
      70                 :             :  *        (including our own backend, if listening).  This is done by
      71                 :             :  *        SignalBackends(), which sends a PROCSIG_NOTIFY_INTERRUPT signal to
      72                 :             :  *        each relevant backend, as described below.
      73                 :             :  *
      74                 :             :  *        Finally, after we are out of the transaction altogether and about to go
      75                 :             :  *        idle, we scan the queue for messages that need to be sent to our
      76                 :             :  *        frontend (which might be notifies from other backends, or self-notifies
      77                 :             :  *        from our own).  This step is not part of the CommitTransaction sequence
      78                 :             :  *        for two important reasons.  First, we could get errors while sending
      79                 :             :  *        data to our frontend, and it's really bad for errors to happen in
      80                 :             :  *        post-commit cleanup.  Second, in cases where a procedure issues commits
      81                 :             :  *        within a single frontend command, we don't want to send notifies to our
      82                 :             :  *        frontend until the command is done; but notifies to other backends
      83                 :             :  *        should go out immediately after each commit.
      84                 :             :  *
      85                 :             :  * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
      86                 :             :  *        sets the process's latch, which triggers the event to be processed
      87                 :             :  *        immediately if this backend is idle (i.e., it is waiting for a frontend
      88                 :             :  *        command and is not within a transaction block; cf.
      89                 :             :  *        ProcessClientReadInterrupt()).  Otherwise the handler may only set a
      90                 :             :  *        flag, which will cause the processing to occur just before we next go
      91                 :             :  *        idle.
      92                 :             :  *
      93                 :             :  *        Inbound-notify processing consists of reading all of the notifications
      94                 :             :  *        that have arrived since scanning last time. We read every notification
      95                 :             :  *        until we reach either a notification from an uncommitted transaction or
      96                 :             :  *        the head pointer's position.
      97                 :             :  *
      98                 :             :  * 6. To limit disk space consumption, the tail pointer needs to be advanced
      99                 :             :  *        so that old pages can be truncated. This is relatively expensive
     100                 :             :  *        (notably, it requires an exclusive lock), so we don't want to do it
     101                 :             :  *        often. We make sending backends do this work if they advanced the queue
     102                 :             :  *        head into a new page, but only once every QUEUE_CLEANUP_DELAY pages.
     103                 :             :  *
     104                 :             :  * 7. So far we have not discussed how backends change their listening state,
     105                 :             :  *        nor how notification senders know which backends to awaken.  To handle
     106                 :             :  *        the latter, we maintain a global channel table (implemented as a dynamic
     107                 :             :  *        shared hash table, or dshash) that maps channel names to the set of
     108                 :             :  *        backends listening on each channel.  This table is created lazily on the
     109                 :             :  *        first LISTEN command and grows dynamically as needed.  There is also a
     110                 :             :  *        local channel table (a plain dynahash table) in each listening backend,
     111                 :             :  *        tracking which channels that backend is listening to.  The local table
     112                 :             :  *        serves to reduce the number of accesses needed to the shared table.
     113                 :             :  *
     114                 :             :  *        If the current transaction has executed any LISTEN/UNLISTEN actions,
     115                 :             :  *        PreCommit_Notify() prepares to commit those.  For LISTEN, it
     116                 :             :  *        pre-allocates entries in both the per-backend localChannelTable and the
     117                 :             :  *        shared globalChannelTable (with listening=false so that these entries
     118                 :             :  *        are no-ops for the moment).  It also records the final per-channel
     119                 :             :  *        intent in pendingListenActions, so post-commit/abort processing can
     120                 :             :  *        apply that in a single step.  Since all these allocations happen before
     121                 :             :  *        committing to clog, we can safely abort the transaction on failure.
     122                 :             :  *
     123                 :             :  *        After commit, AtCommit_Notify() runs through pendingListenActions and
     124                 :             :  *        updates the backend's per-channel listening flags to activate or
     125                 :             :  *        deactivate listening.  This happens before sending signals.
     126                 :             :  *
     127                 :             :  *        SignalBackends() consults the shared global channel table to identify
     128                 :             :  *        listeners for the channels that the current transaction sent
     129                 :             :  *        notification(s) to.  Each selected backend is marked as having a wakeup
     130                 :             :  *        pending to avoid duplicate signals, and a PROCSIG_NOTIFY_INTERRUPT
     131                 :             :  *        signal is sent to it.
     132                 :             :  *
     133                 :             :  * 8. While writing notifications, PreCommit_Notify() records the queue head
     134                 :             :  *        position both before and after the write.  Because all writers serialize
     135                 :             :  *        on a cluster-wide heavyweight lock, no other backend can insert entries
     136                 :             :  *        between these two points.  SignalBackends() uses this fact to directly
     137                 :             :  *        advance the queue pointer for any backend that is still positioned at
     138                 :             :  *        the old head, or within the range written, but is not interested in any
     139                 :             :  *        of our notifications.  This avoids unnecessary wakeups for idle
     140                 :             :  *        listeners that have nothing to read.  Backends that are not interested
     141                 :             :  *        in our notifications, but cannot be directly advanced, are signaled only
     142                 :             :  *        if they are far behind the current queue head; that is to ensure that
     143                 :             :  *        we can advance the queue tail without undue delay.
     144                 :             :  *
     145                 :             :  * An application that listens on the same channel it notifies will get
     146                 :             :  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
     147                 :             :  * by comparing be_pid in the NOTIFY message to the application's own backend's
     148                 :             :  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
     149                 :             :  * frontend during startup.)  The above design guarantees that notifies from
     150                 :             :  * other backends will never be missed by ignoring self-notifies.
     151                 :             :  *
     152                 :             :  * The amount of shared memory used for notify management (notify_buffers)
     153                 :             :  * can be varied without affecting anything but performance.  The maximum
     154                 :             :  * amount of notification data that can be queued at one time is determined
     155                 :             :  * by the max_notify_queue_pages GUC.
     156                 :             :  *-------------------------------------------------------------------------
     157                 :             :  */
     158                 :             : 
     159                 :             : #include "postgres.h"
     160                 :             : 
     161                 :             : #include <limits.h>
     162                 :             : #include <unistd.h>
     163                 :             : #include <signal.h>
     164                 :             : 
     165                 :             : #include "access/parallel.h"
     166                 :             : #include "access/slru.h"
     167                 :             : #include "access/transam.h"
     168                 :             : #include "access/xact.h"
     169                 :             : #include "catalog/pg_database.h"
     170                 :             : #include "commands/async.h"
     171                 :             : #include "common/hashfn.h"
     172                 :             : #include "funcapi.h"
     173                 :             : #include "lib/dshash.h"
     174                 :             : #include "libpq/libpq.h"
     175                 :             : #include "libpq/pqformat.h"
     176                 :             : #include "miscadmin.h"
     177                 :             : #include "storage/dsm_registry.h"
     178                 :             : #include "storage/ipc.h"
     179                 :             : #include "storage/lmgr.h"
     180                 :             : #include "storage/procsignal.h"
     181                 :             : #include "tcop/tcopprot.h"
     182                 :             : #include "utils/builtins.h"
     183                 :             : #include "utils/dsa.h"
     184                 :             : #include "utils/guc_hooks.h"
     185                 :             : #include "utils/memutils.h"
     186                 :             : #include "utils/ps_status.h"
     187                 :             : #include "utils/snapmgr.h"
     188                 :             : #include "utils/timestamp.h"
     189                 :             : 
     190                 :             : 
     191                 :             : /*
     192                 :             :  * Maximum size of a NOTIFY payload, including the terminating null byte.  This
     193                 :             :  * must be kept small enough so that a notification message fits on one
     194                 :             :  * SLRU page.  The magic fudge factor here is noncritical as long as it's
     195                 :             :  * more than AsyncQueueEntryEmptySize --- we make it significantly bigger
     196                 :             :  * than that, so changes in that data structure won't affect user-visible
     197                 :             :  * restrictions.
     198                 :             :  */
     199                 :             : #define NOTIFY_PAYLOAD_MAX_LENGTH       (BLCKSZ - NAMEDATALEN - 128)
     200                 :             : 
     201                 :             : /*
     202                 :             :  * Struct representing an entry in the global notify queue
     203                 :             :  *
     204                 :             :  * This struct declaration has the maximal length, but in a real queue entry
     205                 :             :  * the data area is only big enough for the actual channel and payload strings
     206                 :             :  * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
     207                 :             :  * entry size, if both channel and payload strings are empty (but note it
     208                 :             :  * doesn't include alignment padding).
     209                 :             :  *
     210                 :             :  * The "length" field should always be rounded up to the next QUEUEALIGN
     211                 :             :  * multiple so that all fields are properly aligned.
     212                 :             :  */
     213                 :             : typedef struct AsyncQueueEntry
     214                 :             : {
     215                 :             :         int                     length;                 /* total allocated length of entry */
     216                 :             :         Oid                     dboid;                  /* sender's database OID */
     217                 :             :         TransactionId xid;                      /* sender's XID */
     218                 :             :         int32           srcPid;                 /* sender's PID */
     219                 :             :         char            data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
     220                 :             : } AsyncQueueEntry;
     221                 :             : 
     222                 :             : /* Currently, no field of AsyncQueueEntry requires more than int alignment */
     223                 :             : #define QUEUEALIGN(len)         INTALIGN(len)
     224                 :             : 
     225                 :             : #define AsyncQueueEntryEmptySize        (offsetof(AsyncQueueEntry, data) + 2)
     226                 :             : 
     227                 :             : /*
     228                 :             :  * Struct describing a queue position, and assorted macros for working with it
     229                 :             :  */
     230                 :             : typedef struct QueuePosition
     231                 :             : {
     232                 :             :         int64           page;                   /* SLRU page number */
     233                 :             :         int                     offset;                 /* byte offset within page */
     234                 :             : } QueuePosition;
     235                 :             : 
     236                 :             : #define QUEUE_POS_PAGE(x)               ((x).page)
     237                 :             : #define QUEUE_POS_OFFSET(x)             ((x).offset)
     238                 :             : 
     239                 :             : #define SET_QUEUE_POS(x,y,z) \
     240                 :             :         do { \
     241                 :             :                 (x).page = (y); \
     242                 :             :                 (x).offset = (z); \
     243                 :             :         } while (0)
     244                 :             : 
     245                 :             : #define QUEUE_POS_EQUAL(x,y) \
     246                 :             :         ((x).page == (y).page && (x).offset == (y).offset)
     247                 :             : 
     248                 :             : #define QUEUE_POS_IS_ZERO(x) \
     249                 :             :         ((x).page == 0 && (x).offset == 0)
     250                 :             : 
     251                 :             : /* choose logically smaller QueuePosition */
     252                 :             : #define QUEUE_POS_MIN(x,y) \
     253                 :             :         (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
     254                 :             :          (x).page != (y).page ? (y) : \
     255                 :             :          (x).offset < (y).offset ? (x) : (y))
     256                 :             : 
     257                 :             : /* choose logically larger QueuePosition */
     258                 :             : #define QUEUE_POS_MAX(x,y) \
     259                 :             :         (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
     260                 :             :          (x).page != (y).page ? (x) : \
     261                 :             :          (x).offset > (y).offset ? (x) : (y))
     262                 :             : 
     263                 :             : /* returns true if x comes before y in queue order */
     264                 :             : #define QUEUE_POS_PRECEDES(x,y) \
     265                 :             :         (asyncQueuePagePrecedes((x).page, (y).page) || \
     266                 :             :          ((x).page == (y).page && (x).offset < (y).offset))
     267                 :             : 
     268                 :             : /*
     269                 :             :  * Parameter determining how often we try to advance the tail pointer:
     270                 :             :  * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
     271                 :             :  * also the distance by which a backend that's not interested in our
     272                 :             :  * notifications needs to be behind before we'll decide we need to wake it
     273                 :             :  * up so it can advance its pointer.
     274                 :             :  *
     275                 :             :  * Resist the temptation to make this really large.  While that would save
     276                 :             :  * work in some places, it would add cost in others.  In particular, this
     277                 :             :  * should likely be less than notify_buffers, to ensure that backends
     278                 :             :  * catch up before the pages they'll need to read fall out of SLRU cache.
     279                 :             :  */
     280                 :             : #define QUEUE_CLEANUP_DELAY 4
     281                 :             : 
     282                 :             : /*
     283                 :             :  * Struct describing a listening backend's status
     284                 :             :  */
     285                 :             : typedef struct QueueBackendStatus
     286                 :             : {
     287                 :             :         int32           pid;                    /* either a PID or InvalidPid */
     288                 :             :         Oid                     dboid;                  /* backend's database OID, or InvalidOid */
     289                 :             :         ProcNumber      nextListener;   /* id of next listener, or INVALID_PROC_NUMBER */
     290                 :             :         QueuePosition pos;                      /* backend has read queue up to here */
     291                 :             :         bool            wakeupPending;  /* signal sent to backend, not yet processed */
     292                 :             :         bool            isAdvancing;    /* backend is advancing its position */
     293                 :             : } QueueBackendStatus;
     294                 :             : 
     295                 :             : /*
     296                 :             :  * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
     297                 :             :  *
     298                 :             :  * The AsyncQueueControl structure is protected by the NotifyQueueLock and
     299                 :             :  * NotifyQueueTailLock.
     300                 :             :  *
     301                 :             :  * When holding NotifyQueueLock in SHARED mode, backends may only inspect
     302                 :             :  * their own entries as well as the head and tail pointers. Consequently we
     303                 :             :  * can allow a backend to update its own record while holding only SHARED lock
     304                 :             :  * (since no other backend will inspect it).
     305                 :             :  *
     306                 :             :  * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
     307                 :             :  * entries of other backends and also change the head pointer. They can
     308                 :             :  * also advance other backends' queue positions, unless the other backend
     309                 :             :  * has isAdvancing set (i.e., is in process of doing that itself).
     310                 :             :  *
     311                 :             :  * When holding both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE
     312                 :             :  * mode, backends can change the tail pointers.
     313                 :             :  *
     314                 :             :  * The SLRU buffer pool is divided into banks, and a per-bank SLRU lock is
     315                 :             :  * used as the control lock for the pg_notify SLRU buffers.
     316                 :             :  * In order to avoid deadlocks, whenever we need multiple locks, we first get
     317                 :             :  * NotifyQueueTailLock, then NotifyQueueLock, then SLRU bank lock, and lastly
     318                 :             :  * globalChannelTable partition locks.
     319                 :             :  *
     320                 :             :  * Each backend uses the backend[] array entry with index equal to its
     321                 :             :  * ProcNumber.  We rely on this to make SendProcSignal fast.
     322                 :             :  *
     323                 :             :  * The backend[] array entries for actively-listening backends are threaded
     324                 :             :  * together using firstListener and the nextListener links, so that we can
     325                 :             :  * scan them without having to iterate over inactive entries.  We keep this
     326                 :             :  * list in order by ProcNumber so that the scan is cache-friendly when there
     327                 :             :  * are many active entries.
     328                 :             :  */
     329                 :             : typedef struct AsyncQueueControl
     330                 :             : {
     331                 :             :         QueuePosition head;                     /* head points to the next free location */
     332                 :             :         QueuePosition tail;                     /* tail must be <= the queue position of every
     333                 :             :                                                                  * listening backend */
     334                 :             :         int64           stopPage;               /* oldest unrecycled page; must be <=
     335                 :             :                                                                  * tail.page */
     336                 :             :         ProcNumber      firstListener;  /* id of first listener, or
     337                 :             :                                                                  * INVALID_PROC_NUMBER */
     338                 :             :         TimestampTz lastQueueFillWarn;  /* time of last queue-full msg */
     339                 :             :         dsa_handle      globalChannelTableDSA;  /* global channel table's DSA handle */
     340                 :             :         dshash_table_handle globalChannelTableDSH;      /* and its dshash handle */
     341                 :             :         /* Array with room for MaxBackends entries: */
     342                 :             :         QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
     343                 :             : } AsyncQueueControl;
     344                 :             : 
     345                 :             : static AsyncQueueControl *asyncQueueControl;
     346                 :             : 
     347                 :             : #define QUEUE_HEAD                                      (asyncQueueControl->head)
     348                 :             : #define QUEUE_TAIL                                      (asyncQueueControl->tail)
     349                 :             : #define QUEUE_STOP_PAGE                         (asyncQueueControl->stopPage)
     350                 :             : #define QUEUE_FIRST_LISTENER            (asyncQueueControl->firstListener)
     351                 :             : #define QUEUE_BACKEND_PID(i)            (asyncQueueControl->backend[i].pid)
     352                 :             : #define QUEUE_BACKEND_DBOID(i)          (asyncQueueControl->backend[i].dboid)
     353                 :             : #define QUEUE_NEXT_LISTENER(i)          (asyncQueueControl->backend[i].nextListener)
     354                 :             : #define QUEUE_BACKEND_POS(i)            (asyncQueueControl->backend[i].pos)
     355                 :             : #define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending)
     356                 :             : #define QUEUE_BACKEND_IS_ADVANCING(i)   (asyncQueueControl->backend[i].isAdvancing)
     357                 :             : 
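AsyncQueueControl ends in a flexible array member with room for MaxBackends QueueBackendStatus entries. Its shared-memory footprint is therefore the fixed header plus one entry per backend. The sketch below shows that sizing rule. It uses hypothetical, simplified stand-in types (DemoQueuePosition, DemoBackendStatus, DemoQueueControl), not PostgreSQL's real definitions.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins; field types are simplified, not PostgreSQL's. */
typedef struct
{
    int64_t page;
    int32_t offset;
} DemoQueuePosition;

typedef struct
{
    int32_t pid;
    uint32_t dboid;
    int nextListener;
    DemoQueuePosition pos;
    _Bool wakeupPending;
    _Bool isAdvancing;
} DemoBackendStatus;

typedef struct
{
    DemoQueuePosition head;
    DemoQueuePosition tail;
    int64_t stopPage;
    int firstListener;
    DemoBackendStatus backend[];    /* flexible array member */
} DemoQueueControl;

/* Bytes needed for the fixed header plus room for nbackends entries. */
static size_t
demo_queue_control_size(size_t nbackends)
{
    return offsetof(DemoQueueControl, backend) +
        nbackends * sizeof(DemoBackendStatus);
}
```

offsetof() gives the exact start of the array. sizeof() on a struct with a flexible array member may also count trailing padding. PostgreSQL code usually does this arithmetic with overflow-checked helpers such as mul_size() and add_size(). The sketch uses plain size_t arithmetic.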
     358                 :             : /*
     359                 :             :  * The SLRU buffer area through which we access the notification queue
     360                 :             :  */
     361                 :             : static SlruCtlData NotifyCtlData;
     362                 :             : 
     363                 :             : #define NotifyCtl                                       (&NotifyCtlData)
     364                 :             : #define QUEUE_PAGESIZE                          BLCKSZ
     365                 :             : 
     366                 :             : #define QUEUE_FULL_WARN_INTERVAL        5000    /* warn at most once every 5s */
     367                 :             : 
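QUEUE_FULL_WARN_INTERVAL and the lastQueueFillWarn field together rate-limit the "queue is filling up" message. The sketch below is a minimal version of that throttle. It assumes plain millisecond integers; the real code stores a TimestampTz, which is microsecond-based. DEMO_WARN_INTERVAL_MS and demo_should_warn are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define DEMO_WARN_INTERVAL_MS 5000   /* mirrors QUEUE_FULL_WARN_INTERVAL */

/*
 * Returns true, and records now_ms, if a warning may be emitted now, that is,
 * if at least DEMO_WARN_INTERVAL_MS have passed since the last warning.
 * A negative *last_warn_ms means "never warned yet".
 */
static bool
demo_should_warn(int64_t *last_warn_ms, int64_t now_ms)
{
    if (*last_warn_ms >= 0 && now_ms - *last_warn_ms < DEMO_WARN_INTERVAL_MS)
        return false;
    *last_warn_ms = now_ms;
    return true;
}
```

Only a successful warning updates the timestamp. A burst of calls inside the interval is therefore silently dropped, and the next warning becomes possible exactly one interval after the last emitted one.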
     368                 :             : /*
     369                 :             :  * Global channel table definitions
     370                 :             :  *
     371                 :             :  * This hash table maps (database OID, channel name) keys to arrays of
     372                 :             :  * ProcNumbers representing the backends listening or about to listen
     373                 :             :  * on each channel.  The "listening" flags allow us to create hash table
     374                 :             :  * entries pre-commit and not have to assume that creating them post-commit
     375                 :             :  * will succeed.
     376                 :             :  */
     377                 :             : #define INITIAL_LISTENERS_ARRAY_SIZE 4
     378                 :             : 
     379                 :             : typedef struct GlobalChannelKey
     380                 :             : {
     381                 :             :         Oid                     dboid;
     382                 :             :         char            channel[NAMEDATALEN];
     383                 :             : } GlobalChannelKey;
     384                 :             : 
     385                 :             : typedef struct ListenerEntry
     386                 :             : {
     387                 :             :         ProcNumber      procNo;                 /* listener's ProcNumber */
     388                 :             :         bool            listening;              /* true if committed listener */
     389                 :             : } ListenerEntry;
     390                 :             : 
     391                 :             : typedef struct GlobalChannelEntry
     392                 :             : {
     393                 :             :         GlobalChannelKey key;           /* hash key */
     394                 :             :         dsa_pointer listenersArray; /* DSA pointer to ListenerEntry array */
     395                 :             :         int                     numListeners;   /* Number of listeners currently stored */
     396                 :             :         int                     allocatedListeners; /* Allocated size of array */
     397                 :             : } GlobalChannelEntry;
     398                 :             : 
     399                 :             : static dshash_table *globalChannelTable = NULL;
     400                 :             : static dsa_area *globalChannelDSA = NULL;
     401                 :             : 
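GlobalChannelEntry keeps a growable array of ListenerEntry values. Each value carries a "listening" flag, so an entry can be created before commit, where an allocation failure can still abort the transaction. At commit the only remaining step is a flag flip, which cannot fail. The sketch below models that two-phase pattern. It uses realloc() in place of DSA allocation, and the doubling growth policy is an assumption. All Demo* names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

#define DEMO_INITIAL_LISTENERS 4   /* mirrors INITIAL_LISTENERS_ARRAY_SIZE */

typedef struct
{
    int procNo;
    bool listening;     /* false while staged, true once committed */
} DemoListener;

typedef struct
{
    DemoListener *listeners;
    int numListeners;
    int allocatedListeners;
} DemoChannel;

/* Pre-commit: stage procNo as a not-yet-committed listener; false on OOM. */
static bool
demo_stage_listener(DemoChannel *ch, int procNo)
{
    for (int i = 0; i < ch->numListeners; i++)
        if (ch->listeners[i].procNo == procNo)
            return true;        /* already present, nothing to allocate */

    if (ch->numListeners == ch->allocatedListeners)
    {
        int newsize = ch->allocatedListeners == 0 ?
            DEMO_INITIAL_LISTENERS : ch->allocatedListeners * 2;
        DemoListener *p = realloc(ch->listeners,
                                  (size_t) newsize * sizeof(DemoListener));

        if (p == NULL)
            return false;       /* failure happens before commit */
        ch->listeners = p;
        ch->allocatedListeners = newsize;
    }
    ch->listeners[ch->numListeners].procNo = procNo;
    ch->listeners[ch->numListeners].listening = false;
    ch->numListeners++;
    return true;
}

/* Post-commit: flip the flag; no allocation, so this cannot fail. */
static void
demo_commit_listener(DemoChannel *ch, int procNo)
{
    for (int i = 0; i < ch->numListeners; i++)
        if (ch->listeners[i].procNo == procNo)
            ch->listeners[i].listening = true;
}
```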
     402                 :             : /*
     403                 :             :  * localChannelTable caches the channel names this backend is listening on
     404                 :             :  * (including those we have staged to be listened on, but not yet committed).
     405                 :             :  * Used by IsListeningOn() for fast lookups when reading notifications.
     406                 :             :  */
     407                 :             : static HTAB *localChannelTable = NULL;
     408                 :             : 
     409                 :             : /* We test this condition to detect that we're not listening at all */
     410                 :             : #define LocalChannelTableIsEmpty() \
     411                 :             :         (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)
     412                 :             : 
     413                 :             : /*
     414                 :             :  * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
     415                 :             :  * all actions requested in the current transaction.  As explained above,
     416                 :             :  * we don't actually change listen state until we reach transaction commit.
     417                 :             :  *
     418                 :             :  * The list is kept in CurTransactionContext.  In subtransactions, each
     419                 :             :  * subtransaction has its own list in its own CurTransactionContext, but
     420                 :             :  * successful subtransactions attach their lists to their parent's list.
     421                 :             :  * Failed subtransactions simply discard their lists.
     422                 :             :  */
     423                 :             : typedef enum
     424                 :             : {
     425                 :             :         LISTEN_LISTEN,
     426                 :             :         LISTEN_UNLISTEN,
     427                 :             :         LISTEN_UNLISTEN_ALL,
     428                 :             : } ListenActionKind;
     429                 :             : 
     430                 :             : typedef struct
     431                 :             : {
     432                 :             :         ListenActionKind action;
     433                 :             :         char            channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
     434                 :             : } ListenAction;
     435                 :             : 
     436                 :             : typedef struct ActionList
     437                 :             : {
     438                 :             :         int                     nestingLevel;   /* current transaction nesting depth */
     439                 :             :         List       *actions;            /* list of ListenAction structs */
     440                 :             :         struct ActionList *upper;       /* details for upper transaction levels */
     441                 :             : } ActionList;
     442                 :             : 
     443                 :             : static ActionList *pendingActions = NULL;
     444                 :             : 
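The comment above describes one action list per (sub)transaction level. On subtransaction commit the list is merged into the parent's list, and on subtransaction abort it is thrown away. The following hypothetical sketch models that bookkeeping with a fixed-size int array in place of a PostgreSQL List allocated in CurTransactionContext. One case is an assumption about how a missing parent list is handled, not a transcription of the real subcommit code: when the outer level has no list of its own, the child's list is simply relabelled.

```c
#include <assert.h>
#include <stdlib.h>

#define DEMO_MAX_ACTIONS 16

/* Hypothetical simplified ActionList: each action is just an int here. */
typedef struct DemoActionList
{
    int nestingLevel;                   /* transaction level that owns it */
    int actions[DEMO_MAX_ACTIONS];
    int nactions;
    struct DemoActionList *upper;       /* list of an outer level, or NULL */
} DemoActionList;

/*
 * Subtransaction commit at level my_level.  If the parent level has no list,
 * hand ours up by relabelling it; otherwise append our actions to the
 * parent's list and free ours.  Returns the new current list.
 */
static DemoActionList *
demo_subcommit(DemoActionList *cur, int my_level)
{
    DemoActionList *parent;

    if (cur == NULL || cur->nestingLevel < my_level)
        return cur;                     /* nothing queued at this level */

    parent = cur->upper;
    if (parent == NULL || parent->nestingLevel < my_level - 1)
    {
        cur->nestingLevel = my_level - 1;
        return cur;
    }
    for (int i = 0; i < cur->nactions; i++)
    {
        assert(parent->nactions < DEMO_MAX_ACTIONS);
        parent->actions[parent->nactions++] = cur->actions[i];
    }
    free(cur);
    return parent;
}

/* Subtransaction abort at level my_level: discard this level's list. */
static DemoActionList *
demo_subabort(DemoActionList *cur, int my_level)
{
    DemoActionList *parent;

    if (cur == NULL || cur->nestingLevel < my_level)
        return cur;
    parent = cur->upper;
    free(cur);
    return parent;
}
```

Appending preserves order, which matters because the actions are later replayed in the order they were issued.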
     445                 :             : /*
     446                 :             :  * Hash table recording the final listen/unlisten intent per channel for
     447                 :             :  * the current transaction.  Key is channel name, value is PENDING_LISTEN or
     448                 :             :  * PENDING_UNLISTEN.  This keeps critical commit/abort processing to one step
     449                 :             :  * per channel instead of replaying every action.  This is built from the
     450                 :             :  * pendingActions list by PreCommit_Notify, then used by AtCommit_Notify or
     451                 :             :  * AtAbort_Notify.
     452                 :             :  */
     453                 :             : typedef enum
     454                 :             : {
     455                 :             :         PENDING_LISTEN,
     456                 :             :         PENDING_UNLISTEN,
     457                 :             : } PendingListenAction;
     458                 :             : 
     459                 :             : typedef struct PendingListenEntry
     460                 :             : {
     461                 :             :         char            channel[NAMEDATALEN];   /* hash key */
     462                 :             :         PendingListenAction action; /* which action should we perform? */
     463                 :             : } PendingListenEntry;
     464                 :             : 
     465                 :             : static HTAB *pendingListenActions = NULL;
     466                 :             : 
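pendingListenActions reduces the ordered action list to one final intent per channel, so commit and abort handling only need one step per channel. The sketch below performs that collapse with a linear table instead of a dynahash table. It assumes UNLISTEN * affects every channel already listened on plus every channel staged earlier in the same transaction. That is a simplified reading of the semantics, and all Demo* names are hypothetical.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

typedef enum { DEMO_LISTEN, DEMO_UNLISTEN, DEMO_UNLISTEN_ALL } DemoActionKind;
typedef enum { DEMO_PENDING_LISTEN, DEMO_PENDING_UNLISTEN } DemoPending;

typedef struct
{
    DemoActionKind kind;
    const char *channel;        /* NULL for DEMO_UNLISTEN_ALL */
} DemoAction;

typedef struct
{
    char channel[64];           /* the hash key in the real table */
    DemoPending action;
} DemoPendingEntry;

/* Find channel in tab[0..*n), appending a new entry if absent. */
static DemoPendingEntry *
demo_lookup(DemoPendingEntry *tab, int *n, int cap, const char *channel)
{
    for (int i = 0; i < *n; i++)
        if (strcmp(tab[i].channel, channel) == 0)
            return &tab[i];
    assert(*n < cap);
    snprintf(tab[*n].channel, sizeof(tab[*n].channel), "%s", channel);
    return &tab[(*n)++];
}

/* Replay actions in order; later actions overwrite earlier intent. */
static int
demo_collapse(const DemoAction *acts, int nacts,
              const char *const *listened, int nlistened,
              DemoPendingEntry *tab, int cap)
{
    int n = 0;

    for (int i = 0; i < nacts; i++)
    {
        switch (acts[i].kind)
        {
            case DEMO_LISTEN:
                demo_lookup(tab, &n, cap, acts[i].channel)->action = DEMO_PENDING_LISTEN;
                break;
            case DEMO_UNLISTEN:
                demo_lookup(tab, &n, cap, acts[i].channel)->action = DEMO_PENDING_UNLISTEN;
                break;
            case DEMO_UNLISTEN_ALL:
                for (int j = 0; j < nlistened; j++)
                    demo_lookup(tab, &n, cap, listened[j])->action = DEMO_PENDING_UNLISTEN;
                for (int j = 0; j < n; j++)
                    tab[j].action = DEMO_PENDING_UNLISTEN;
                break;
        }
    }
    return n;
}
```

For example, LISTEN b; UNLISTEN *; LISTEN c; LISTEN b; UNLISTEN b, run by a session already listening on "a", leaves "a" and "b" marked for unlisten and "c" marked for listen.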
     467                 :             : /*
     468                 :             :  * State for outbound notifies consists of a list of all channels+payloads
     469                 :             :  * NOTIFYed in the current transaction.  We do not actually perform a NOTIFY
     470                 :             :  * until and unless the transaction commits.  pendingNotifies is NULL if no
     471                 :             :  * NOTIFYs have been done in the current (sub) transaction.
     472                 :             :  *
     473                 :             :  * We discard duplicate notify events issued in the same transaction.
     474                 :             :  * Hence, in addition to the list proper (which we need to track the order
     475                 :             :  * of the events, since we guarantee to deliver them in order), we build a
     476                 :             :  * hash table which we can probe to detect duplicates.  Since building the
     477                 :             :  * hash table is somewhat expensive, we do so only once we have at least
     478                 :             :  * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
     479                 :             :  * before that we just scan the events linearly.
     480                 :             :  *
     481                 :             :  * The list is kept in CurTransactionContext.  In subtransactions, each
     482                 :             :  * subtransaction has its own list in its own CurTransactionContext, but
     483                 :             :  * successful subtransactions add their entries to their parent's list.
     484                 :             :  * Failed subtransactions simply discard their lists.  Since these lists
     485                 :             :  * are independent, there may be notify events in a subtransaction's list
     486                 :             :  * that duplicate events in some ancestor (sub) transaction; we get rid of
     487                 :             :  * the dups when merging the subtransaction's list into its parent's.
     488                 :             :  *
     489                 :             :  * Note: the action and notify lists do not interact within a transaction.
     490                 :             :  * In particular, if a transaction does NOTIFY and then LISTEN on the same
     491                 :             :  * condition name, it will get a self-notify at commit.  This is a bit odd
     492                 :             :  * but is consistent with our historical behavior.
     493                 :             :  */
     494                 :             : typedef struct Notification
     495                 :             : {
     496                 :             :         uint16          channel_len;    /* length of channel-name string */
     497                 :             :         uint16          payload_len;    /* length of payload string */
     498                 :             :         /* null-terminated channel name, then null-terminated payload follow */
     499                 :             :         char            data[FLEXIBLE_ARRAY_MEMBER];
     500                 :             : } Notification;
     501                 :             : 
     502                 :             : typedef struct NotificationList
     503                 :             : {
     504                 :             :         int                     nestingLevel;   /* current transaction nesting depth */
     505                 :             :         List       *events;                     /* list of Notification structs */
     506                 :             :         HTAB       *hashtab;            /* hash of NotificationHash structs, or NULL */
     507                 :             :         List       *uniqueChannelNames; /* unique channel names being notified */
     508                 :             :         HTAB       *uniqueChannelHash;  /* hash of unique channel names, or NULL */
     509                 :             :         struct NotificationList *upper; /* details for upper transaction levels */
     510                 :             : } NotificationList;
     511                 :             : 
     512                 :             : #define MIN_HASHABLE_NOTIFIES 16        /* threshold to build hashtab */
     513                 :             : 
     514                 :             : struct NotificationHash
     515                 :             : {
     516                 :             :         Notification *event;            /* => the actual Notification struct */
     517                 :             : };
     518                 :             : 
     519                 :             : static NotificationList *pendingNotifies = NULL;
     520                 :             : 
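A Notification stores two lengths followed by the NUL-terminated channel name and the NUL-terminated payload in one flexible buffer. Duplicate detection below MIN_HASHABLE_NOTIFIES is described as a linear scan. The sketch below shows the packing, a byte-wise equality test, and that scan. The Demo* names are hypothetical. The sketch assumes both lengths fit in uint16, which PostgreSQL's own channel and payload limits keep true.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef struct
{
    uint16_t channel_len;   /* strlen(channel), excluding the NUL */
    uint16_t payload_len;   /* strlen(payload), excluding the NUL */
    char data[];            /* channel '\0' payload '\0' */
} DemoNotification;

/* Pack channel and payload into a single allocation. */
static DemoNotification *
demo_make_notification(const char *channel, const char *payload)
{
    size_t clen = strlen(channel);
    size_t plen = strlen(payload);
    DemoNotification *n = malloc(offsetof(DemoNotification, data) + clen + plen + 2);

    if (n == NULL)
        return NULL;
    n->channel_len = (uint16_t) clen;
    n->payload_len = (uint16_t) plen;
    memcpy(n->data, channel, clen + 1);             /* copies the NUL too */
    memcpy(n->data + clen + 1, payload, plen + 1);  /* copies the NUL too */
    return n;
}

/* Duplicates: same lengths and identical channel+payload bytes. */
static int
demo_same_event(const DemoNotification *a, const DemoNotification *b)
{
    return a->channel_len == b->channel_len &&
        a->payload_len == b->payload_len &&
        memcmp(a->data, b->data, (size_t) a->channel_len + a->payload_len + 2) == 0;
}

/* Linear scan, as used while fewer than MIN_HASHABLE_NOTIFIES are queued. */
static int
demo_exists(DemoNotification *const *events, int nevents,
            const DemoNotification *n)
{
    for (int i = 0; i < nevents; i++)
        if (demo_same_event(events[i], n))
            return 1;
    return 0;
}
```

The payload starts at data + channel_len + 1, so a reader can recover both strings without scanning for the first NUL.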
     521                 :             : /*
     522                 :             :  * Hash entry in NotificationList.uniqueChannelHash or localChannelTable
     523                 :             :  * (both just carry the channel name, with no payload).
     524                 :             :  */
     525                 :             : typedef struct ChannelName
     526                 :             : {
     527                 :             :         char            channel[NAMEDATALEN];   /* hash key */
     528                 :             : } ChannelName;
     529                 :             : 
     530                 :             : /*
     531                 :             :  * Inbound notifications are initially processed by HandleNotifyInterrupt(),
     532                 :             :  * called from inside a signal handler. That just sets the
     533                 :             :  * notifyInterruptPending flag and sets the process
     534                 :             :  * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
     535                 :             :  * actually deal with the interrupt.
     536                 :             :  */
     537                 :             : volatile sig_atomic_t notifyInterruptPending = false;
     538                 :             : 
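The split described above keeps the signal handler trivial: it only records that work is pending, and the real processing runs later from normal code. The sketch below shows that pattern. It omits the latch. PostgreSQL has historically delivered this interrupt via SIGUSR1, multiplexed through its procsignal mechanism. The sketch uses the ISO C SIGINT so that it compiles without POSIX feature macros. The Demo* names are hypothetical.

```c
#include <assert.h>
#include <signal.h>

static volatile sig_atomic_t demoInterruptPending = 0;
static int demoProcessedCount = 0;

/* Signal handler: do only async-signal-safe work, i.e. set a flag. */
static void
demo_handle_interrupt(int signo)
{
    (void) signo;
    demoInterruptPending = 1;
}

/* Called later from the main loop, where arbitrary work is safe. */
static void
demo_process_interrupt(void)
{
    if (!demoInterruptPending)
        return;
    demoInterruptPending = 0;
    demoProcessedCount++;   /* stand-in for reading the notification queue */
}
```

The flag is cleared before the work is done. A signal that arrives during processing therefore sets it again rather than being lost.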
     539                 :             : /* True if we've registered an on_shmem_exit cleanup */
     540                 :             : static bool unlistenExitRegistered = false;
     541                 :             : 
     542                 :             : /* True if we're currently registered as a listener in asyncQueueControl */
     543                 :             : static bool amRegisteredListener = false;
     544                 :             : 
     545                 :             : /*
     546                 :             :  * Queue head positions for direct advancement.
     547                 :             :  * These are captured during PreCommit_Notify while holding the heavyweight
     548                 :             :  * lock on database 0, ensuring no other backend can insert notifications
     549                 :             :  * between them.  SignalBackends uses these to advance idle backends.
     550                 :             :  */
     551                 :             : static QueuePosition queueHeadBeforeWrite;
     552                 :             : static QueuePosition queueHeadAfterWrite;
     553                 :             : 
     554                 :             : /*
     555                 :             :  * Workspace arrays for SignalBackends.  These are preallocated in
     556                 :             :  * PreCommit_Notify to avoid needing memory allocation after committing to
     557                 :             :  * clog.
     558                 :             :  */
     559                 :             : static int32 *signalPids = NULL;
     560                 :             : static ProcNumber *signalProcnos = NULL;
     561                 :             : 
     562                 :             : /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
     563                 :             : static bool tryAdvanceTail = false;
     564                 :             : 
     565                 :             : /* GUC parameters */
     566                 :             : bool            Trace_notify = false;
     567                 :             : 
     568                 :             : /* For 8 KB pages this gives 8 GB of disk space */
     569                 :             : int                     max_notify_queue_pages = 1048576;
     570                 :             : 
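The "8 GB" figure in the comment is max_notify_queue_pages multiplied by the page size: 1048576 × 8192 bytes = 2^20 × 2^13 = 2^33 bytes, i.e. 8 GiB. Because the page size is BLCKSZ, which can be changed at build time, the same page budget yields a different byte limit on non-default builds. demo_queue_bytes below is a hypothetical helper that spells out this arithmetic.

```c
#include <assert.h>
#include <stdint.h>

/* Maximum on-disk notification queue size in bytes. */
static uint64_t
demo_queue_bytes(uint64_t max_pages, uint64_t page_size)
{
    return max_pages * page_size;
}
```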
     571                 :             : /* local function prototypes */
     572                 :             : static inline int64 asyncQueuePageDiff(int64 p, int64 q);
     573                 :             : static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
     574                 :             : static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid,
     575                 :             :                                                                                 const char *channel);
     576                 :             : static dshash_hash globalChannelTableHash(const void *key, size_t size,
     577                 :             :                                                                                   void *arg);
     578                 :             : static void initGlobalChannelTable(void);
     579                 :             : static void initLocalChannelTable(void);
     580                 :             : static void queue_listen(ListenActionKind action, const char *channel);
     581                 :             : static void Async_UnlistenOnExit(int code, Datum arg);
     582                 :             : static void BecomeRegisteredListener(void);
     583                 :             : static void PrepareTableEntriesForListen(const char *channel);
     584                 :             : static void PrepareTableEntriesForUnlisten(const char *channel);
     585                 :             : static void PrepareTableEntriesForUnlistenAll(void);
     586                 :             : static void RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr,
     587                 :             :                                                                           ListenerEntry *listeners,
     588                 :             :                                                                           int idx);
     589                 :             : static void ApplyPendingListenActions(bool isCommit);
     590                 :             : static void CleanupListenersOnExit(void);
     591                 :             : static bool IsListeningOn(const char *channel);
     592                 :             : static void asyncQueueUnregister(void);
     593                 :             : static bool asyncQueueIsFull(void);
     594                 :             : static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
     595                 :             : static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
     596                 :             : static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
     597                 :             : static double asyncQueueUsage(void);
     598                 :             : static void asyncQueueFillWarning(void);
     599                 :             : static void SignalBackends(void);
     600                 :             : static void asyncQueueReadAllNotifications(void);
     601                 :             : static bool asyncQueueProcessPageEntries(QueuePosition *current,
     602                 :             :                                                                                  QueuePosition stop,
     603                 :             :                                                                                  Snapshot snapshot);
     604                 :             : static void asyncQueueAdvanceTail(void);
     605                 :             : static void ProcessIncomingNotify(bool flush);
     606                 :             : static bool AsyncExistsPendingNotify(Notification *n);
     607                 :             : static void AddEventToPendingNotifies(Notification *n);
     608                 :             : static uint32 notification_hash(const void *key, Size keysize);
     609                 :             : static int      notification_match(const void *key1, const void *key2, Size keysize);
     610                 :             : static void ClearPendingActionsAndNotifies(void);
     611                 :             : 
     612                 :             : /*
     613                 :             :  * Compute the difference between two queue page numbers.
     614                 :             :  * Previously this function accounted for a wraparound.
     615                 :             :  */
     616                 :             : static inline int64
     617                 :           0 : asyncQueuePageDiff(int64 p, int64 q)
     618                 :             : {
     619                 :           0 :         return p - q;
     620                 :             : }
     621                 :             : 
     622                 :             : /*
     623                 :             :  * Determines whether p precedes q.
     624                 :             :  * Previously this function accounted for a wraparound.
     625                 :             :  */
     626                 :             : static inline bool
     627                 :           1 : asyncQueuePagePrecedes(int64 p, int64 q)
     628                 :             : {
     629                 :           1 :         return p < q;
     630                 :             : }
     631                 :             : 
     632                 :             : /*
     633                 :             :  * GlobalChannelKeyInit
     634                 :             :  *              Prepare a global channel table key for hashing.
     635                 :             :  */
     636                 :             : static inline void
     637                 :          10 : GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
     638                 :             : {
     639                 :          10 :         memset(key, 0, sizeof(GlobalChannelKey));
     640                 :          10 :         key->dboid = dboid;
     641                 :          10 :         strlcpy(key->channel, channel, NAMEDATALEN);
     642                 :          10 : }
     643                 :             : 
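GlobalChannelKeyInit zeroes the whole key before filling it in. This matters because the table compares keys with dshash_memcmp, which is a byte-wise memcmp over the full key size. Leftover bytes after the channel name's NUL would otherwise make equal names compare unequal. The sketch below demonstrates the effect. It assumes the default NAMEDATALEN of 64 and substitutes snprintf() for strlcpy(), which is not available in every C library. The Demo* names are hypothetical.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define DEMO_NAMEDATALEN 64   /* PostgreSQL's default NAMEDATALEN */

typedef struct
{
    unsigned int dboid;
    char channel[DEMO_NAMEDATALEN];
} DemoChannelKey;

/* Zero the whole key first so the bytes after the NUL are deterministic. */
static void
demo_key_init(DemoChannelKey *key, unsigned int dboid, const char *channel)
{
    memset(key, 0, sizeof(*key));
    key->dboid = dboid;
    snprintf(key->channel, sizeof(key->channel), "%s", channel);  /* truncates */
}

/* Byte-wise equality over the whole key, like dshash_memcmp. */
static int
demo_key_equal(const DemoChannelKey *a, const DemoChannelKey *b)
{
    return memcmp(a, b, sizeof(*a)) == 0;
}
```

The hash function in the listing hashes only strnlen() bytes of the channel. Combined with zero padding, equal names therefore both hash and compare equal.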
     644                 :             : /*
     645                 :             :  * globalChannelTableHash
     646                 :             :  *              Hash function for global channel table keys.
     647                 :             :  */
     648                 :             : static dshash_hash
     649                 :          10 : globalChannelTableHash(const void *key, size_t size, void *arg)
     650                 :             : {
     651                 :          10 :         const GlobalChannelKey *k = (const GlobalChannelKey *) key;
     652                 :          10 :         dshash_hash h;
     653                 :             : 
     654                 :          10 :         h = DatumGetUInt32(hash_uint32(k->dboid));
     655                 :          20 :         h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
     656                 :          10 :                                                                  strnlen(k->channel, NAMEDATALEN)));
     657                 :             : 
     658                 :          20 :         return h;
     659                 :          10 : }
     660                 :             : 
     661                 :             : /* parameters for the global channel table */
     662                 :             : static const dshash_parameters globalChannelTableDSHParams = {
     663                 :             :         sizeof(GlobalChannelKey),
     664                 :             :         sizeof(GlobalChannelEntry),
     665                 :             :         dshash_memcmp,
     666                 :             :         globalChannelTableHash,
     667                 :             :         dshash_memcpy,
     668                 :             :         LWTRANCHE_NOTIFY_CHANNEL_HASH
     669                 :             : };
     670                 :             : 
     671                 :             : /*
     672                 :             :  * initGlobalChannelTable
     673                 :             :  *              Lazy initialization of the global channel table.
     674                 :             :  */
     675                 :             : static void
     676                 :           9 : initGlobalChannelTable(void)
     677                 :             : {
     678                 :           9 :         MemoryContext oldcontext;
     679                 :             : 
     680                 :             :         /* Quick exit if we already did this */
     681   [ +  +  +  + ]:           9 :         if (asyncQueueControl->globalChannelTableDSH != DSHASH_HANDLE_INVALID &&
     682                 :           8 :                 globalChannelTable != NULL)
     683                 :           7 :                 return;
     684                 :             : 
     685                 :             :         /* Otherwise, use a lock to ensure only one process creates the table */
     686                 :           2 :         LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
     687                 :             : 
     688                 :             :         /* Be sure any local memory allocated by DSA routines is persistent */
     689                 :           2 :         oldcontext = MemoryContextSwitchTo(TopMemoryContext);
     690                 :             : 
     691         [ +  + ]:           2 :         if (asyncQueueControl->globalChannelTableDSH == DSHASH_HANDLE_INVALID)
     692                 :             :         {
     693                 :             :                 /* Initialize dynamic shared hash table for global channels */
     694                 :           1 :                 globalChannelDSA = dsa_create(LWTRANCHE_NOTIFY_CHANNEL_HASH);
     695                 :           1 :                 dsa_pin(globalChannelDSA);
     696                 :           1 :                 dsa_pin_mapping(globalChannelDSA);
     697                 :           1 :                 globalChannelTable = dshash_create(globalChannelDSA,
     698                 :             :                                                                                    &globalChannelTableDSHParams,
     699                 :             :                                                                                    NULL);
     700                 :             : 
     701                 :             :                 /* Store handles in shared memory for other backends to use */
     702                 :           1 :                 asyncQueueControl->globalChannelTableDSA = dsa_get_handle(globalChannelDSA);
     703                 :           1 :                 asyncQueueControl->globalChannelTableDSH =
     704                 :           1 :                         dshash_get_hash_table_handle(globalChannelTable);
     705                 :           1 :         }
     706         [ -  + ]:           1 :         else if (!globalChannelTable)
     707                 :             :         {
     708                 :             :                 /* Attach to existing dynamic shared hash table */
     709                 :           1 :                 globalChannelDSA = dsa_attach(asyncQueueControl->globalChannelTableDSA);
     710                 :           1 :                 dsa_pin_mapping(globalChannelDSA);
     711                 :           2 :                 globalChannelTable = dshash_attach(globalChannelDSA,
     712                 :             :                                                                                    &globalChannelTableDSHParams,
     713                 :           1 :                                                                                    asyncQueueControl->globalChannelTableDSH,
     714                 :             :                                                                                    NULL);
     715                 :           1 :         }
     716                 :             : 
     717                 :           2 :         MemoryContextSwitchTo(oldcontext);
     718                 :           2 :         LWLockRelease(NotifyQueueLock);
     719         [ -  + ]:           9 : }
     720                 :             : 
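initGlobalChannelTable follows a create-or-attach pattern:

1. A cheap check returns immediately if this process is already attached.
2. Otherwise it takes a lock and re-checks the shared handle.
3. Exactly one process creates the table; every other process attaches to the published handle.

The sketch below models this with a pthread mutex and integer "handles" standing in for NotifyQueueLock and the DSA/dshash handles. Unlike the listing, its quick exit looks only at process-local state, which avoids an unlocked read of the shared handle. All Demo* names are hypothetical.

```c
#include <assert.h>
#include <pthread.h>

#define DEMO_HANDLE_INVALID 0

/* Stand-in for the handle fields kept in asyncQueueControl. */
static struct
{
    pthread_mutex_t lock;
    int tableHandle;            /* DEMO_HANDLE_INVALID until created */
} demoShared = {PTHREAD_MUTEX_INITIALIZER, DEMO_HANDLE_INVALID};

static int demoNextHandle = 1;  /* hypothetical handle allocator */
static int demoCreateCount = 0;

/* Per-process cached attachment; DEMO_HANDLE_INVALID means not attached. */
typedef struct
{
    int attachedHandle;
} DemoProcess;

static void
demo_init_table(DemoProcess *self)
{
    /* Quick exit: this process is already attached. */
    if (self->attachedHandle != DEMO_HANDLE_INVALID)
        return;

    pthread_mutex_lock(&demoShared.lock);
    if (demoShared.tableHandle == DEMO_HANDLE_INVALID)
    {
        /* First caller anywhere: create and publish the handle. */
        demoShared.tableHandle = demoNextHandle++;
        demoCreateCount++;
    }
    /* Creator or not, attach to the published handle. */
    self->attachedHandle = demoShared.tableHandle;
    pthread_mutex_unlock(&demoShared.lock);
}
```

The real function also switches to TopMemoryContext and pins the DSA area and its mapping, so the attachment survives beyond the current transaction. The sketch has no equivalent of that lifetime management.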
     721                 :             : /*
     722                 :             :  * initLocalChannelTable
     723                 :             :  *              Lazy initialization of the local channel table.
     724                 :             :  *              Once created, this table lasts for the life of the session.
     725                 :             :  */
     726                 :             : static void
     727                 :           5 : initLocalChannelTable(void)
     728                 :             : {
     729                 :           5 :         HASHCTL         hash_ctl;
     730                 :             : 
     731                 :             :         /* Quick exit if we already did this */
     732         [ +  + ]:           5 :         if (localChannelTable != NULL)
     733                 :           3 :                 return;
     734                 :             : 
     735                 :             :         /* Initialize local hash table for this backend's listened channels */
     736                 :           2 :         hash_ctl.keysize = NAMEDATALEN;
     737                 :           2 :         hash_ctl.entrysize = sizeof(ChannelName);
     738                 :             : 
     739                 :           2 :         localChannelTable =
     740                 :           2 :                 hash_create("Local Listen Channels",
     741                 :             :                                         64,
     742                 :             :                                         &hash_ctl,
     743                 :             :                                         HASH_ELEM | HASH_STRINGS);
     744         [ -  + ]:           5 : }
     745                 :             : 
     746                 :             : /*
     747                 :             :  * initPendingListenActions
     748                 :             :  *              Lazy initialization of the pending listen actions hash table.
     749                 :             :  *              This is allocated in CurTransactionContext during PreCommit_Notify,
     750                 :             :  *              and destroyed at transaction end.
     751                 :             :  */
     752                 :             : static void
     753                 :           5 : initPendingListenActions(void)
     754                 :             : {
     755                 :           5 :         HASHCTL         hash_ctl;
     756                 :             : 
     757         [ -  + ]:           5 :         if (pendingListenActions != NULL)
     758                 :           0 :                 return;
     759                 :             : 
     760                 :           5 :         hash_ctl.keysize = NAMEDATALEN;
     761                 :           5 :         hash_ctl.entrysize = sizeof(PendingListenEntry);
     762                 :           5 :         hash_ctl.hcxt = CurTransactionContext;
     763                 :             : 
     764                 :           5 :         pendingListenActions =
     765                 :           5 :                 hash_create("Pending Listen Actions",
     766                 :           5 :                                         list_length(pendingActions->actions),
     767                 :             :                                         &hash_ctl,
     768                 :             :                                         HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
     769         [ -  + ]:           5 : }
     770                 :             : 
     771                 :             : /*
     772                 :             :  * Report space needed for our shared memory area
     773                 :             :  */
     774                 :             : Size
     775                 :           9 : AsyncShmemSize(void)
     776                 :             : {
     777                 :           9 :         Size            size;
     778                 :             : 
     779                 :             :         /* This had better match AsyncShmemInit */
     780                 :           9 :         size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
     781                 :           9 :         size = add_size(size, offsetof(AsyncQueueControl, backend));
     782                 :             : 
     783                 :           9 :         size = add_size(size, SimpleLruShmemSize(notify_buffers, 0));
     784                 :             : 
     785                 :          18 :         return size;
     786                 :           9 : }
     787                 :             : 
     788                 :             : /*
     789                 :             :  * Initialize our shared memory area
     790                 :             :  */
     791                 :             : void
     792                 :           6 : AsyncShmemInit(void)
     793                 :             : {
     794                 :           6 :         bool            found;
     795                 :           6 :         Size            size;
     796                 :             : 
     797                 :             :         /*
     798                 :             :          * Create or attach to the AsyncQueueControl structure.
     799                 :             :          */
     800                 :           6 :         size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
     801                 :           6 :         size = add_size(size, offsetof(AsyncQueueControl, backend));
     802                 :             : 
     803                 :           6 :         asyncQueueControl = (AsyncQueueControl *)
     804                 :           6 :                 ShmemInitStruct("Async Queue Control", size, &found);
     805                 :             : 
     806         [ -  + ]:           6 :         if (!found)
     807                 :             :         {
     808                 :             :                 /* First time through, so initialize it */
     809                 :           6 :                 SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
     810                 :           6 :                 SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
     811                 :           6 :                 QUEUE_STOP_PAGE = 0;
     812                 :           6 :                 QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
     813                 :           6 :                 asyncQueueControl->lastQueueFillWarn = 0;
     814                 :           6 :                 asyncQueueControl->globalChannelTableDSA = DSA_HANDLE_INVALID;
     815                 :           6 :                 asyncQueueControl->globalChannelTableDSH = DSHASH_HANDLE_INVALID;
     816         [ +  + ]:         812 :                 for (int i = 0; i < MaxBackends; i++)
     817                 :             :                 {
     818                 :         806 :                         QUEUE_BACKEND_PID(i) = InvalidPid;
     819                 :         806 :                         QUEUE_BACKEND_DBOID(i) = InvalidOid;
     820                 :         806 :                         QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
     821                 :         806 :                         SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
     822                 :         806 :                         QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
     823                 :         806 :                         QUEUE_BACKEND_IS_ADVANCING(i) = false;
     824                 :         806 :                 }
     825                 :           6 :         }
     826                 :             : 
     827                 :             :         /*
     828                 :             :          * Set up SLRU management of the pg_notify data. Note that long segment
     829                 :             :          * names are used in order to avoid wraparound.
     830                 :             :          */
     831                 :           6 :         NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
     832                 :           6 :         SimpleLruInit(NotifyCtl, "notify", notify_buffers, 0,
     833                 :             :                                   "pg_notify", LWTRANCHE_NOTIFY_BUFFER, LWTRANCHE_NOTIFY_SLRU,
     834                 :             :                                   SYNC_HANDLER_NONE, true);
     835                 :             : 
     836         [ -  + ]:           6 :         if (!found)
     837                 :             :         {
     838                 :             :                 /*
     839                 :             :                  * During start or reboot, clean out the pg_notify directory.
     840                 :             :                  */
     841                 :           6 :                 (void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
     842                 :           6 :         }
     843                 :           6 : }
     844                 :             : 
     845                 :             : 
     846                 :             : /*
     847                 :             :  * pg_notify -
     848                 :             :  *        SQL function to send a notification event
     849                 :             :  */
     850                 :             : Datum
     851                 :           6 : pg_notify(PG_FUNCTION_ARGS)
     852                 :             : {
     853                 :           6 :         const char *channel;
     854                 :           6 :         const char *payload;
     855                 :             : 
     856         [ +  + ]:           6 :         if (PG_ARGISNULL(0))
     857                 :           1 :                 channel = "";
     858                 :             :         else
     859                 :           5 :                 channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
     860                 :             : 
     861         [ +  + ]:           6 :         if (PG_ARGISNULL(1))
     862                 :           1 :                 payload = "";
     863                 :             :         else
     864                 :           5 :                 payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
     865                 :             : 
     866                 :             :         /* For NOTIFY as a statement, this is checked in ProcessUtility */
     867                 :           6 :         PreventCommandDuringRecovery("NOTIFY");
     868                 :             : 
     869                 :           6 :         Async_Notify(channel, payload);
     870                 :             : 
     871                 :           6 :         PG_RETURN_VOID();
     872                 :           6 : }
     873                 :             : 
     874                 :             : 
     875                 :             : /*
     876                 :             :  * Async_Notify
     877                 :             :  *
     878                 :             :  *              This is executed by the SQL notify command.
     879                 :             :  *
     880                 :             :  *              Adds the message to the list of pending notifies.
     881                 :             :  *              Actual notification happens during transaction commit.
     882                 :             :  *              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     883                 :             :  */
     884                 :             : void
     885                 :           7 : Async_Notify(const char *channel, const char *payload)
     886                 :             : {
     887                 :           7 :         int                     my_level = GetCurrentTransactionNestLevel();
     888                 :           7 :         size_t          channel_len;
     889                 :           7 :         size_t          payload_len;
     890                 :           7 :         Notification *n;
     891                 :           7 :         MemoryContext oldcontext;
     892                 :             : 
     893         [ +  - ]:           7 :         if (IsParallelWorker())
     894   [ #  #  #  # ]:           0 :                 elog(ERROR, "cannot send notifications from a parallel worker");
     895                 :             : 
     896         [ +  - ]:           7 :         if (Trace_notify)
     897   [ #  #  #  # ]:           0 :                 elog(DEBUG1, "Async_Notify(%s)", channel);
     898                 :             : 
     899         [ +  - ]:           7 :         channel_len = channel ? strlen(channel) : 0;
     900         [ +  + ]:           7 :         payload_len = payload ? strlen(payload) : 0;
     901                 :             : 
     902                 :             :         /* a channel name must be specified */
     903         [ +  + ]:           7 :         if (channel_len == 0)
     904   [ +  -  +  - ]:           2 :                 ereport(ERROR,
     905                 :             :                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     906                 :             :                                  errmsg("channel name cannot be empty")));
     907                 :             : 
     908                 :             :         /* enforce length limits */
     909         [ +  + ]:           5 :         if (channel_len >= NAMEDATALEN)
     910   [ +  -  +  - ]:           1 :                 ereport(ERROR,
     911                 :             :                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     912                 :             :                                  errmsg("channel name too long")));
     913                 :             : 
     914         [ +  - ]:           4 :         if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
     915   [ #  #  #  # ]:           0 :                 ereport(ERROR,
     916                 :             :                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     917                 :             :                                  errmsg("payload string too long")));
     918                 :             : 
     919                 :             :         /*
     920                 :             :          * We must construct the Notification entry, even if we end up not using
     921                 :             :          * it, in order to compare it cheaply to existing list entries.
     922                 :             :          *
     923                 :             :          * The notification list needs to live until end of transaction, so store
     924                 :             :          * it in the transaction context.
     925                 :             :          */
     926                 :           4 :         oldcontext = MemoryContextSwitchTo(CurTransactionContext);
     927                 :             : 
     928                 :           8 :         n = (Notification *) palloc(offsetof(Notification, data) +
     929                 :           8 :                                                                 channel_len + payload_len + 2);
     930                 :           4 :         n->channel_len = channel_len;
     931                 :           4 :         n->payload_len = payload_len;
     932                 :           4 :         strcpy(n->data, channel);
     933         [ +  + ]:           4 :         if (payload)
     934                 :           3 :                 strcpy(n->data + channel_len + 1, payload);
     935                 :             :         else
     936                 :           1 :                 n->data[channel_len + 1] = '\0';
     937                 :             : 
     938   [ -  +  #  # ]:           4 :         if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
     939                 :             :         {
     940                 :           4 :                 NotificationList *notifies;
     941                 :             : 
     942                 :             :                 /*
     943                 :             :                  * First notify event in current (sub)xact. Note that we allocate the
     944                 :             :                  * NotificationList in TopTransactionContext; the nestingLevel might
     945                 :             :                  * get changed later by AtSubCommit_Notify.
     946                 :             :                  */
     947                 :           4 :                 notifies = (NotificationList *)
     948                 :           4 :                         MemoryContextAlloc(TopTransactionContext,
     949                 :             :                                                            sizeof(NotificationList));
     950                 :           4 :                 notifies->nestingLevel = my_level;
     951                 :           4 :                 notifies->events = list_make1(n);
     952                 :             :                 /* We certainly don't need a hashtable yet */
     953                 :           4 :                 notifies->hashtab = NULL;
     954                 :             :                 /* We won't build uniqueChannelNames/Hash till later, either */
     955                 :           4 :                 notifies->uniqueChannelNames = NIL;
     956                 :           4 :                 notifies->uniqueChannelHash = NULL;
     957                 :           4 :                 notifies->upper = pendingNotifies;
     958                 :           4 :                 pendingNotifies = notifies;
     959                 :           4 :         }
     960                 :             :         else
     961                 :             :         {
     962                 :             :                 /* Now check for duplicates */
     963         [ #  # ]:           0 :                 if (AsyncExistsPendingNotify(n))
     964                 :             :                 {
     965                 :             :                         /* It's a dup, so forget it */
     966                 :           0 :                         pfree(n);
     967                 :           0 :                         MemoryContextSwitchTo(oldcontext);
     968                 :           0 :                         return;
     969                 :             :                 }
     970                 :             : 
     971                 :             :                 /* Append more events to existing list */
     972                 :           0 :                 AddEventToPendingNotifies(n);
     973                 :             :         }
     974                 :             : 
     975                 :           4 :         MemoryContextSwitchTo(oldcontext);
     976         [ -  + ]:           4 : }
     977                 :             : 
     978                 :             : /*
     979                 :             :  * queue_listen
     980                 :             :  *              Common code for listen, unlisten, unlisten all commands.
     981                 :             :  *
     982                 :             :  *              Adds the request to the list of pending actions.
     983                 :             :  *              Actual update of localChannelTable and globalChannelTable happens during
     984                 :             :  *              PreCommit_Notify, with staged changes committed in AtCommit_Notify.
     985                 :             :  */
     986                 :             : static void
     987                 :           5 : queue_listen(ListenActionKind action, const char *channel)
     988                 :             : {
     989                 :           5 :         MemoryContext oldcontext;
     990                 :           5 :         ListenAction *actrec;
     991                 :           5 :         int                     my_level = GetCurrentTransactionNestLevel();
     992                 :             : 
     993                 :             :         /*
     994                 :             :          * Unlike Async_Notify, we don't try to collapse out duplicates here. We
     995                 :             :          * keep the ordered list to preserve interactions like UNLISTEN ALL; the
     996                 :             :          * final per-channel intent is computed during PreCommit_Notify.
     997                 :             :          */
     998                 :           5 :         oldcontext = MemoryContextSwitchTo(CurTransactionContext);
     999                 :             : 
    1000                 :             :         /* the "+ 1" below leaves room for the channel name's terminating null */
    1001                 :          10 :         actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
    1002                 :          10 :                                                                          strlen(channel) + 1);
    1003                 :           5 :         actrec->action = action;
    1004                 :           5 :         strcpy(actrec->channel, channel);
    1005                 :             : 
    1006   [ -  +  #  # ]:           5 :         if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
    1007                 :             :         {
    1008                 :           5 :                 ActionList *actions;
    1009                 :             : 
    1010                 :             :                 /*
    1011                 :             :                  * First action in current (sub)xact. Note that we allocate the
    1012                 :             :                  * ActionList in TopTransactionContext; the nestingLevel might get
    1013                 :             :                  * changed later by AtSubCommit_Notify.
    1014                 :             :                  */
    1015                 :           5 :                 actions = (ActionList *)
    1016                 :           5 :                         MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
    1017                 :           5 :                 actions->nestingLevel = my_level;
    1018                 :           5 :                 actions->actions = list_make1(actrec);
    1019                 :           5 :                 actions->upper = pendingActions;
    1020                 :           5 :                 pendingActions = actions;
    1021                 :           5 :         }
    1022                 :             :         else
    1023                 :           0 :                 pendingActions->actions = lappend(pendingActions->actions, actrec);
    1024                 :             : 
    1025                 :           5 :         MemoryContextSwitchTo(oldcontext);
    1026                 :           5 : }
    1027                 :             : 
    1028                 :             : /*
    1029                 :             :  * Async_Listen
    1030                 :             :  *
    1031                 :             :  *              This is executed by the SQL listen command.
    1032                 :             :  */
    1033                 :             : void
    1034                 :           2 : Async_Listen(const char *channel)
    1035                 :             : {
    1036         [ +  - ]:           2 :         if (Trace_notify)
    1037   [ #  #  #  # ]:           0 :                 elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
    1038                 :             : 
    1039                 :           2 :         queue_listen(LISTEN_LISTEN, channel);
    1040                 :           2 : }
    1041                 :             : 
    1042                 :             : /*
    1043                 :             :  * Async_Unlisten
    1044                 :             :  *
    1045                 :             :  *              This is executed by the SQL unlisten command.
    1046                 :             :  */
    1047                 :             : void
    1048                 :           1 : Async_Unlisten(const char *channel)
    1049                 :             : {
    1050         [ +  - ]:           1 :         if (Trace_notify)
    1051   [ #  #  #  # ]:           0 :                 elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
    1052                 :             : 
    1053                 :             :         /* If we couldn't possibly be listening, no need to queue anything */
    1054   [ +  -  +  - ]:           1 :         if (pendingActions == NULL && !unlistenExitRegistered)
    1055                 :           0 :                 return;
    1056                 :             : 
    1057                 :           1 :         queue_listen(LISTEN_UNLISTEN, channel);
    1058                 :           1 : }
    1059                 :             : 
    1060                 :             : /*
    1061                 :             :  * Async_UnlistenAll
    1062                 :             :  *
    1063                 :             :  *              This is invoked by UNLISTEN * command, and also at backend exit.
    1064                 :             :  */
    1065                 :             : void
    1066                 :           2 : Async_UnlistenAll(void)
    1067                 :             : {
    1068         [ +  - ]:           2 :         if (Trace_notify)
    1069   [ #  #  #  # ]:           0 :                 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
    1070                 :             : 
    1071                 :             :         /* If we couldn't possibly be listening, no need to queue anything */
    1072   [ +  -  +  - ]:           2 :         if (pendingActions == NULL && !unlistenExitRegistered)
    1073                 :           0 :                 return;
    1074                 :             : 
    1075                 :           2 :         queue_listen(LISTEN_UNLISTEN_ALL, "");
    1076                 :           2 : }
    1077                 :             : 
    1078                 :             : /*
    1079                 :             :  * SQL function: return a set of the channel names this backend is actively
    1080                 :             :  * listening to.
    1081                 :             :  *
    1082                 :             :  * Note: this coding relies on the fact that the localChannelTable cannot
    1083                 :             :  * change within a transaction.
    1084                 :             :  */
    1085                 :             : Datum
    1086                 :           3 : pg_listening_channels(PG_FUNCTION_ARGS)
    1087                 :             : {
    1088                 :           3 :         FuncCallContext *funcctx;
    1089                 :           3 :         HASH_SEQ_STATUS *status;
    1090                 :             : 
    1091                 :             :         /* stuff done only on the first call of the function */
    1092         [ +  + ]:           3 :         if (SRF_IS_FIRSTCALL())
    1093                 :             :         {
    1094                 :             :                 /* create a function context for cross-call persistence */
    1095                 :           2 :                 funcctx = SRF_FIRSTCALL_INIT();
    1096                 :             : 
    1097                 :             :                 /* Initialize hash table iteration if we have any channels */
    1098         [ +  - ]:           2 :                 if (localChannelTable != NULL)
    1099                 :             :                 {
    1100                 :           2 :                         MemoryContext oldcontext;
    1101                 :             : 
    1102                 :           2 :                         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1103                 :           2 :                         status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
    1104                 :           2 :                         hash_seq_init(status, localChannelTable);
    1105                 :           2 :                         funcctx->user_fctx = status;
    1106                 :           2 :                         MemoryContextSwitchTo(oldcontext);
    1107                 :           2 :                 }
    1108                 :             :                 else
    1109                 :             :                 {
    1110                 :           0 :                         funcctx->user_fctx = NULL;
    1111                 :             :                 }
    1112                 :           2 :         }
    1113                 :             : 
    1114                 :             :         /* stuff done on every call of the function */
    1115                 :           3 :         funcctx = SRF_PERCALL_SETUP();
    1116                 :           3 :         status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
    1117                 :             : 
    1118         [ -  + ]:           3 :         if (status != NULL)
    1119                 :             :         {
    1120                 :           3 :                 ChannelName *entry;
    1121                 :             : 
    1122                 :           3 :                 entry = (ChannelName *) hash_seq_search(status);
    1123         [ +  + ]:           3 :                 if (entry != NULL)
    1124                 :           1 :                         SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(entry->channel));
    1125         [ +  + ]:           3 :         }
    1126                 :             : 
    1127         [ +  - ]:           2 :         SRF_RETURN_DONE(funcctx);
    1128         [ -  + ]:           3 : }
    1129                 :             : 
    1130                 :             : /*
    1131                 :             :  * Async_UnlistenOnExit
    1132                 :             :  *
    1133                 :             :  * This is executed at backend exit if we have done any LISTENs in this
    1134                 :             :  * backend.  It might not be necessary anymore, if the user UNLISTENed
    1135                 :             :  * everything, but we don't try to detect that case.
    1136                 :             :  */
    1137                 :             : static void
    1138                 :           2 : Async_UnlistenOnExit(int code, Datum arg)
    1139                 :             : {
    1140                 :           2 :         CleanupListenersOnExit();
    1141                 :           2 :         asyncQueueUnregister();
    1142                 :           2 : }
    1143                 :             : 
    1144                 :             : /*
    1145                 :             :  * AtPrepare_Notify
    1146                 :             :  *
    1147                 :             :  *              This is called at the prepare phase of a two-phase
    1148                 :             :  *              transaction.  Save the state for possible commit later.
    1149                 :             :  */
    1150                 :             : void
    1151                 :           0 : AtPrepare_Notify(void)
    1152                 :             : {
    1153                 :             :         /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
    1154         [ #  # ]:           0 :         if (pendingActions || pendingNotifies)
    1155   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1156                 :             :                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1157                 :             :                                  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
    1158                 :           0 : }
    1159                 :             : 
    1160                 :             : /*
    1161                 :             :  * PreCommit_Notify
    1162                 :             :  *
    1163                 :             :  *              This is called at transaction commit, before actually committing to
    1164                 :             :  *              clog.
    1165                 :             :  *
    1166                 :             :  *              If there are pending LISTEN actions, make sure we are listed in the
    1167                 :             :  *              shared-memory listener array.  This must happen before commit to
    1168                 :             :  *              ensure we don't miss any notifies from transactions that commit
    1169                 :             :  *              just after ours.
    1170                 :             :  *
    1171                 :             :  *              If there are outbound notify requests in the pendingNotifies list,
    1172                 :             :  *              add them to the global queue.  We do that before commit so that
    1173                 :             :  *              we can still throw error if we run out of queue space.
    1174                 :             :  */
    1175                 :             : void
    1176                 :       50901 : PreCommit_Notify(void)
    1177                 :             : {
    1178                 :       50901 :         ListCell   *p;
    1179                 :             : 
    1180   [ +  +  +  + ]:       50901 :         if (!pendingActions && !pendingNotifies)
    1181                 :       50892 :                 return;                                 /* no relevant statements in this xact */
    1182                 :             : 
    1183         [ +  - ]:           9 :         if (Trace_notify)
    1184   [ #  #  #  # ]:           0 :                 elog(DEBUG1, "PreCommit_Notify");
    1185                 :             : 
    1186                 :             :         /* Preflight for any pending listen/unlisten actions */
    1187                 :           9 :         initGlobalChannelTable();
    1188                 :             : 
    1189         [ +  + ]:           9 :         if (pendingActions != NULL)
    1190                 :             :         {
    1191                 :             :                 /* Ensure we have a local channel table */
    1192                 :           5 :                 initLocalChannelTable();
    1193                 :             :                 /* Create pendingListenActions hash table for this transaction */
    1194                 :           5 :                 initPendingListenActions();
    1195                 :             : 
    1196                 :             :                 /* Stage all the actions this transaction wants to perform */
    1197   [ +  -  +  +  +  + ]:          10 :                 foreach(p, pendingActions->actions)
    1198                 :             :                 {
    1199                 :           5 :                         ListenAction *actrec = (ListenAction *) lfirst(p);
    1200                 :             : 
    1201   [ +  -  +  + ]:           5 :                         switch (actrec->action)
    1202                 :             :                         {
    1203                 :             :                                 case LISTEN_LISTEN:
    1204                 :           2 :                                         BecomeRegisteredListener();
    1205                 :           2 :                                         PrepareTableEntriesForListen(actrec->channel);
    1206                 :           2 :                                         break;
    1207                 :             :                                 case LISTEN_UNLISTEN:
    1208                 :           1 :                                         PrepareTableEntriesForUnlisten(actrec->channel);
    1209                 :           1 :                                         break;
    1210                 :             :                                 case LISTEN_UNLISTEN_ALL:
    1211                 :           2 :                                         PrepareTableEntriesForUnlistenAll();
    1212                 :           2 :                                         break;
    1213                 :             :                         }
    1214                 :           5 :                 }
    1215                 :           5 :         }
    1216                 :             : 
    1217                 :             :         /* Queue any pending notifies (must happen after the above) */
    1218         [ +  + ]:           9 :         if (pendingNotifies)
    1219                 :             :         {
    1220                 :           4 :                 ListCell   *nextNotify;
    1221                 :           4 :                 bool            firstIteration = true;
    1222                 :             : 
    1223                 :             :                 /*
    1224                 :             :                  * Build list of unique channel names being notified for use by
    1225                 :             :                  * SignalBackends().
    1226                 :             :                  *
    1227                 :             :                  * If uniqueChannelHash is available, use it to efficiently get the
    1228                 :             :                  * unique channels.  Otherwise, fall back to the O(N^2) approach.
    1229                 :             :                  */
    1230                 :           4 :                 pendingNotifies->uniqueChannelNames = NIL;
    1231         [ -  + ]:           4 :                 if (pendingNotifies->uniqueChannelHash != NULL)
    1232                 :             :                 {
    1233                 :           0 :                         HASH_SEQ_STATUS status;
    1234                 :           0 :                         ChannelName *channelEntry;
    1235                 :             : 
    1236                 :           0 :                         hash_seq_init(&status, pendingNotifies->uniqueChannelHash);
    1237         [ #  # ]:           0 :                         while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
    1238                 :           0 :                                 pendingNotifies->uniqueChannelNames =
    1239                 :           0 :                                         lappend(pendingNotifies->uniqueChannelNames,
    1240                 :           0 :                                                         channelEntry->channel);
    1241                 :           0 :                 }
    1242                 :             :                 else
    1243                 :             :                 {
    1244                 :             :                         /* O(N^2) approach is better for small number of notifications */
    1245   [ +  +  +  -  +  +  +  + ]:          12 :                         foreach_ptr(Notification, n, pendingNotifies->events)
    1246                 :             :                         {
    1247                 :           4 :                                 char       *channel = n->data;
    1248                 :           4 :                                 bool            found = false;
    1249                 :             : 
    1250                 :             :                                 /* Name present in list? */
    1251   [ +  +  -  +  #  #  -  + ]:           8 :                                 foreach_ptr(char, oldchan, pendingNotifies->uniqueChannelNames)
    1252                 :             :                                 {
    1253         [ #  # ]:           0 :                                         if (strcmp(oldchan, channel) == 0)
    1254                 :             :                                         {
    1255                 :           0 :                                                 found = true;
    1256                 :           0 :                                                 break;
    1257                 :             :                                         }
    1258                 :           4 :                                 }
    1259                 :             :                                 /* Add if not already in list */
    1260         [ -  + ]:           4 :                                 if (!found)
    1261                 :           4 :                                         pendingNotifies->uniqueChannelNames =
    1262                 :           8 :                                                 lappend(pendingNotifies->uniqueChannelNames,
    1263                 :           4 :                                                                 channel);
    1264                 :           8 :                         }
    1265                 :             :                 }
    1266                 :             : 
    1267                 :             :                 /* Preallocate workspace that will be needed by SignalBackends() */
    1268         [ +  + ]:           4 :                 if (signalPids == NULL)
    1269                 :           2 :                         signalPids = MemoryContextAlloc(TopMemoryContext,
    1270                 :           1 :                                                                                         MaxBackends * sizeof(int32));
    1271                 :             : 
    1272         [ +  + ]:           4 :                 if (signalProcnos == NULL)
    1273                 :           2 :                         signalProcnos = MemoryContextAlloc(TopMemoryContext,
    1274                 :           1 :                                                                                            MaxBackends * sizeof(ProcNumber));
    1275                 :             : 
    1276                 :             :                 /*
    1277                 :             :                  * Make sure that we have an XID assigned to the current transaction.
    1278                 :             :                  * GetCurrentTransactionId is cheap if we already have an XID, but not
    1279                 :             :                  * so cheap if we don't, and we'd prefer not to do that work while
    1280                 :             :                  * holding NotifyQueueLock.
    1281                 :             :                  */
    1282                 :           4 :                 (void) GetCurrentTransactionId();
    1283                 :             : 
    1284                 :             :                 /*
    1285                 :             :                  * Serialize writers by acquiring a special lock that we hold till
    1286                 :             :                  * after commit.  This ensures that queue entries appear in commit
    1287                 :             :                  * order, and in particular that there are never uncommitted queue
    1288                 :             :                  * entries ahead of committed ones, so an uncommitted transaction
    1289                 :             :                  * can't block delivery of deliverable notifications.
    1290                 :             :                  *
    1291                 :             :                  * We use a heavyweight lock so that it'll automatically be released
    1292                 :             :                  * after either commit or abort.  This also allows deadlocks to be
    1293                 :             :                  * detected, though really a deadlock shouldn't be possible here.
    1294                 :             :                  *
    1295                 :             :                  * The lock is on "database 0", which is pretty ugly but it doesn't
    1296                 :             :                  * seem worth inventing a special locktag category just for this.
    1297                 :             :                  * (Historical note: before PG 9.0, a similar lock on "database 0" was
    1298                 :             :                  * used by the flatfiles mechanism.)
    1299                 :             :                  */
    1300                 :           4 :                 LockSharedObject(DatabaseRelationId, InvalidOid, 0,
    1301                 :             :                                                  AccessExclusiveLock);
    1302                 :             : 
    1303                 :             :                 /*
    1304                 :             :                  * For the direct advancement optimization in SignalBackends(), we
    1305                 :             :                  * need to ensure that no other backend can insert queue entries
    1306                 :             :                  * between queueHeadBeforeWrite and queueHeadAfterWrite.  The
    1307                 :             :                  * heavyweight lock above provides this guarantee, since it serializes
    1308                 :             :                  * all writers.
    1309                 :             :                  *
    1310                 :             :                  * Note: if the heavyweight lock were ever removed for scalability
    1311                 :             :                  * reasons, we could achieve the same guarantee by holding
    1312                 :             :                  * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
    1313                 :             :                  * than releasing and reacquiring it for each page as we do below.
    1314                 :             :                  */
    1315                 :             : 
    1316                 :             :                 /* Initialize values to a safe default in case list is empty */
    1317                 :           4 :                 SET_QUEUE_POS(queueHeadBeforeWrite, 0, 0);
    1318                 :           4 :                 SET_QUEUE_POS(queueHeadAfterWrite, 0, 0);
    1319                 :             : 
    1320                 :             :                 /* Now push the notifications into the queue */
    1321                 :           4 :                 nextNotify = list_head(pendingNotifies->events);
    1322         [ +  + ]:           8 :                 while (nextNotify != NULL)
    1323                 :             :                 {
    1324                 :             :                         /*
    1325                 :             :                          * Add the pending notifications to the queue.  We acquire and
    1326                 :             :                          * release NotifyQueueLock once per page, which might be overkill
    1327                 :             :                          * but it does allow readers to get in while we're doing this.
    1328                 :             :                          *
    1329                 :             :                          * A full queue is very uncommon and should really not happen,
    1330                 :             :                          * given that we have so much space available in the SLRU pages.
    1331                 :             :                          * Nevertheless we need to deal with this possibility. Note that
    1332                 :             :                          * when we get here we are in the process of committing our
    1333                 :             :                          * transaction, but we have not yet committed to clog, so at this
    1334                 :             :                          * point in time we can still roll the transaction back.
    1335                 :             :                          */
    1336                 :           4 :                         LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    1337         [ -  + ]:           4 :                         if (firstIteration)
    1338                 :             :                         {
    1339                 :           4 :                                 queueHeadBeforeWrite = QUEUE_HEAD;
    1340                 :           4 :                                 firstIteration = false;
    1341                 :           4 :                         }
    1342                 :           4 :                         asyncQueueFillWarning();
    1343         [ +  - ]:           4 :                         if (asyncQueueIsFull())
    1344   [ #  #  #  # ]:           0 :                                 ereport(ERROR,
    1345                 :             :                                                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
    1346                 :             :                                                  errmsg("too many notifications in the NOTIFY queue")));
    1347                 :           4 :                         nextNotify = asyncQueueAddEntries(nextNotify);
    1348                 :           4 :                         queueHeadAfterWrite = QUEUE_HEAD;
    1349                 :           4 :                         LWLockRelease(NotifyQueueLock);
    1350                 :             :                 }
    1351                 :             : 
    1352                 :             :                 /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
    1353                 :           4 :         }
    1354         [ -  + ]:       50901 : }
    1355                 :             : 
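When uniqueChannelHash is absent, PreCommit_Notify() builds uniqueChannelNames with a nested linear scan. For each notification it searches the names collected so far and appends the channel only if no match turns up. The sketch below isolates that O(N^2) de-duplication in standalone C. It uses plain arrays in place of PostgreSQL's List API, and the helper name `collect_unique_channels` is hypothetical. Like the lappend() loop above, it keeps channels in first-seen order.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper: copy each distinct string in channels[0..n-1] into
 * unique[], preserving first-seen order.  This mirrors the O(N^2) fallback
 * in PreCommit_Notify(): for each channel, scan the names collected so far
 * and append the channel only if it is not already there.  The caller must
 * size unique[] to hold at least n entries.  Returns the number of
 * distinct names.
 */
static size_t
collect_unique_channels(const char *const channels[], size_t n,
                        const char *unique[])
{
    size_t nunique = 0;

    for (size_t i = 0; i < n; i++)
    {
        bool found = false;

        /* Name already present in the output list? */
        for (size_t j = 0; j < nunique; j++)
        {
            if (strcmp(unique[j], channels[i]) == 0)
            {
                found = true;
                break;
            }
        }

        /* Add if not already in list */
        if (!found)
        {
            assert(nunique < n);    /* output can never outgrow the input */
            unique[nunique++] = channels[i];
        }
    }
    return nunique;
}
```

The quadratic scan is cheap for the handful of channels a typical transaction notifies. For large batches, the code above walks uniqueChannelHash instead.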
    1356                 :             : /*
    1357                 :             :  * AtCommit_Notify
    1358                 :             :  *
    1359                 :             :  *              This is called at transaction commit, after committing to clog.
    1360                 :             :  *
    1361                 :             :  *              Apply pending listen/unlisten changes and clear transaction-local state.
    1362                 :             :  *
    1363                 :             :  *              If we issued any notifications in the transaction, send signals to
    1364                 :             :  *              listening backends (possibly including ourselves) to process them.
    1365                 :             :  *              Also, if we filled enough queue pages with new notifies, try to
    1366                 :             :  *              advance the queue tail pointer.
    1367                 :             :  */
    1368                 :             : void
    1369                 :       50901 : AtCommit_Notify(void)
    1370                 :             : {
    1371                 :             :         /*
    1372                 :             :          * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
    1373                 :             :          * return as soon as possible
    1374                 :             :          */
    1375   [ +  +  +  + ]:       50901 :         if (!pendingActions && !pendingNotifies)
    1376                 :       50892 :                 return;
    1377                 :             : 
    1378         [ +  - ]:           9 :         if (Trace_notify)
    1379   [ #  #  #  # ]:           0 :                 elog(DEBUG1, "AtCommit_Notify");
    1380                 :             : 
    1381                 :             :         /* Apply staged listen/unlisten changes */
    1382                 :           9 :         ApplyPendingListenActions(true);
    1383                 :             : 
    1384                 :             :         /* If no longer listening to anything, get out of listener array */
    1385   [ +  +  +  -  +  + ]:           9 :         if (amRegisteredListener && LocalChannelTableIsEmpty())
    1386                 :           2 :                 asyncQueueUnregister();
    1387                 :             : 
    1388                 :             :         /*
    1389                 :             :          * Send signals to listening backends.  We need to do this only if there are
    1390                 :             :          * pending notifies, which were previously added to the shared queue by
    1391                 :             :          * PreCommit_Notify().
    1392                 :             :          */
    1393         [ +  + ]:           9 :         if (pendingNotifies != NULL)
    1394                 :           4 :                 SignalBackends();
    1395                 :             : 
    1396                 :             :         /*
    1397                 :             :          * If it's time to try to advance the global tail pointer, do that.
    1398                 :             :          *
    1399                 :             :          * (It might seem odd to do this in the sender, when more than likely the
    1400                 :             :          * listeners won't yet have read the messages we just sent.  However,
    1401                 :             :          * there's less contention if only the sender does it, and there is little
    1402                 :             :          * need for urgency in advancing the global tail.  So this typically will
    1403                 :             :          * be clearing out messages that were sent some time ago.)
    1404                 :             :          */
    1405         [ +  - ]:           9 :         if (tryAdvanceTail)
    1406                 :             :         {
    1407                 :           0 :                 tryAdvanceTail = false;
    1408                 :           0 :                 asyncQueueAdvanceTail();
    1409                 :           0 :         }
    1410                 :             : 
    1411                 :             :         /* And clean up */
    1412                 :           9 :         ClearPendingActionsAndNotifies();
    1413                 :       50901 : }
    1414                 :             : 
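AtCommit_Notify() depends on work that PreCommit_Notify() staged earlier. Each LISTEN/UNLISTEN is recorded as a pending action, and only ApplyPendingListenActions() makes it effective. The sketch below is a simplified, hypothetical single-process model of that stage-then-apply pattern, not PostgreSQL's implementation. A fixed array stands in for the local and global channel hash tables, and the names `stage()`, `apply_pending()`, and `is_listening()` are invented for illustration. In this model, a later action on a channel overwrites an earlier one. Commit applies whichever action survives, and abort discards staged actions and drops slots that were only pre-allocated.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define MAX_CHANNELS 8
#define NAME_LEN 64

typedef enum { PENDING_NONE, PENDING_LISTEN, PENDING_UNLISTEN } PendingAction;

typedef struct
{
    char name[NAME_LEN];
    bool in_use;
    bool listening;          /* committed state */
    PendingAction pending;   /* staged by the current transaction */
} ChannelSlot;

static ChannelSlot table[MAX_CHANNELS];   /* hypothetical channel table */

/* Find the slot for name, pre-allocating one if needed. */
static ChannelSlot *
find_or_add(const char *name)
{
    ChannelSlot *free_slot = NULL;

    for (int i = 0; i < MAX_CHANNELS; i++)
    {
        if (table[i].in_use && strcmp(table[i].name, name) == 0)
            return &table[i];
        if (!table[i].in_use && free_slot == NULL)
            free_slot = &table[i];
    }
    assert(free_slot != NULL);   /* sketch assumes the table never fills */
    memset(free_slot, 0, sizeof(*free_slot));
    strncpy(free_slot->name, name, NAME_LEN - 1);
    free_slot->in_use = true;
    return free_slot;
}

/* Pre-commit staging: the latest action for a channel wins. */
static void
stage(const char *name, PendingAction action)
{
    find_or_add(name)->pending = action;
}

/*
 * Commit applies staged actions; abort just discards them.  Either way,
 * slots that end up not listening are removed.
 */
static void
apply_pending(bool is_commit)
{
    for (int i = 0; i < MAX_CHANNELS; i++)
    {
        ChannelSlot *s = &table[i];

        if (!s->in_use)
            continue;
        if (is_commit && s->pending == PENDING_LISTEN)
            s->listening = true;
        else if (is_commit && s->pending == PENDING_UNLISTEN)
            s->listening = false;
        s->pending = PENDING_NONE;
        if (!s->listening)
            s->in_use = false;   /* unwanted entry: remove it */
    }
}

/* Committed listening state only; staged actions are not visible here. */
static bool
is_listening(const char *name)
{
    for (int i = 0; i < MAX_CHANNELS; i++)
        if (table[i].in_use && strcmp(table[i].name, name) == 0)
            return table[i].listening;
    return false;
}
```

Unlike the real local channel table, this model's is_listening() ignores staged entries. The real IsListeningOn() can return true for a channel whose LISTEN is still staged.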
    1415                 :             : /*
    1416                 :             :  * BecomeRegisteredListener --- subroutine for PreCommit_Notify
    1417                 :             :  *
    1418                 :             :  * This function must make sure we are ready to catch any incoming messages.
    1419                 :             :  */
    1420                 :             : static void
    1421                 :           2 : BecomeRegisteredListener(void)
    1422                 :             : {
    1423                 :           2 :         QueuePosition head;
    1424                 :           2 :         QueuePosition max;
    1425                 :           2 :         ProcNumber      prevListener;
    1426                 :             : 
    1427                 :             :         /*
    1428                 :             :          * Nothing to do if we are already listening to something, nor if we
    1429                 :             :          * already ran this routine in this transaction.
    1430                 :             :          */
    1431         [ -  + ]:           2 :         if (amRegisteredListener)
    1432                 :           0 :                 return;
    1433                 :             : 
    1434         [ +  - ]:           2 :         if (Trace_notify)
    1435   [ #  #  #  # ]:           0 :                 elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
    1436                 :             : 
    1437                 :             :         /*
    1438                 :             :          * Before registering, make sure we will unlisten before dying. (Note:
    1439                 :             :          * this action does not get undone if we abort later.)
    1440                 :             :          */
    1441         [ -  + ]:           2 :         if (!unlistenExitRegistered)
    1442                 :             :         {
    1443                 :           2 :                 before_shmem_exit(Async_UnlistenOnExit, 0);
    1444                 :           2 :                 unlistenExitRegistered = true;
    1445                 :           2 :         }
    1446                 :             : 
    1447                 :             :         /*
    1448                 :             :          * This is our first LISTEN, so establish our pointer.
    1449                 :             :          *
    1450                 :             :          * We set our pointer to the global tail pointer and then move it forward
    1451                 :             :          * over already-committed notifications.  This ensures we cannot miss any
    1452                 :             :          * not-yet-committed notifications.  We might get a few more but that
    1453                 :             :          * doesn't hurt.
    1454                 :             :          *
    1455                 :             :          * In some scenarios there might be a lot of committed notifications that
    1456                 :             :          * have not yet been pruned away (because some backend is being lazy about
    1457                 :             :          * reading them).  To reduce our startup time, we can look at other
    1458                 :             :          * backends and adopt the maximum "pos" pointer of any backend that's in
    1459                 :             :          * our database; any notifications it's already advanced over are surely
    1460                 :             :          * committed and need not be re-examined by us.  (We must consider only
    1461                 :             :          * backends connected to our DB, because others will not have bothered to
    1462                 :             :          * check committed-ness of notifications in our DB.)
    1463                 :             :          *
    1464                 :             :          * We need exclusive lock here so we can look at other backends' entries
    1465                 :             :          * and manipulate the list links.
    1466                 :             :          */
    1467                 :           2 :         LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    1468                 :           2 :         head = QUEUE_HEAD;
    1469                 :           2 :         max = QUEUE_TAIL;
    1470                 :           2 :         prevListener = INVALID_PROC_NUMBER;
    1471         [ -  + ]:           2 :         for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
    1472                 :             :         {
    1473         [ #  # ]:           0 :                 if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
    1474   [ #  #  #  #  #  # ]:           0 :                         max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
    1475                 :             :                 /* Also find last listening backend before this one */
    1476         [ #  # ]:           0 :                 if (i < MyProcNumber)
    1477                 :           0 :                         prevListener = i;
    1478                 :           0 :         }
    1479                 :           2 :         QUEUE_BACKEND_POS(MyProcNumber) = max;
    1480                 :           2 :         QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
    1481                 :           2 :         QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
    1482                 :           2 :         QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
    1483                 :           2 :         QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
    1484                 :             :         /* Insert backend into list of listeners at correct position */
    1485         [ -  + ]:           2 :         if (prevListener != INVALID_PROC_NUMBER)
    1486                 :             :         {
    1487                 :           0 :                 QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener);
    1488                 :           0 :                 QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber;
    1489                 :           0 :         }
    1490                 :             :         else
    1491                 :             :         {
    1492                 :           2 :                 QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
    1493                 :           2 :                 QUEUE_FIRST_LISTENER = MyProcNumber;
    1494                 :             :         }
    1495                 :           2 :         LWLockRelease(NotifyQueueLock);
    1496                 :             : 
    1497                 :             :         /* Now we are listed in the global array, so remember we're listening */
    1498                 :           2 :         amRegisteredListener = true;
    1499                 :             : 
    1500                 :             :         /*
    1501                 :             :          * Try to move our pointer forward as far as possible.  This will skip
    1502                 :             :          * over already-committed notifications, which we want to do because they
    1503                 :             :          * might be quite stale.  Note that we are not yet listening on anything,
    1504                 :             :          * so we won't deliver such notifications to our frontend.  Also, although
    1505                 :             :          * our transaction might have executed NOTIFY, those message(s) aren't
    1506                 :             :          * queued yet so we won't skip them here.
    1507                 :             :          */
    1508   [ +  -  +  + ]:           2 :         if (!QUEUE_POS_EQUAL(max, head))
    1509                 :           1 :                 asyncQueueReadAllNotifications();
    1510         [ -  + ]:           2 : }
    1511                 :             : 
    1512                 :             : /*
    1513                 :             :  * PrepareTableEntriesForListen --- subroutine for PreCommit_Notify
    1514                 :             :  *
    1515                 :             :  * Prepare a LISTEN by recording it in pendingListenActions, pre-allocating
    1516                 :             :  * an entry in localChannelTable, and pre-allocating an entry in the shared
    1517                 :             :  * globalChannelTable with listening=false.  The listening flag will be set
    1518                 :             :  * to true in AtCommit_Notify.  If we abort later, unwanted table entries
    1519                 :             :  * will be removed.
    1520                 :             :  */
    1521                 :             : static void
    1522                 :           2 : PrepareTableEntriesForListen(const char *channel)
    1523                 :             : {
    1524                 :           2 :         GlobalChannelKey key;
    1525                 :           2 :         GlobalChannelEntry *entry;
    1526                 :           2 :         bool            found;
    1527                 :           2 :         ListenerEntry *listeners;
    1528                 :           2 :         PendingListenEntry *pending;
    1529                 :             : 
    1530                 :             :         /*
    1531                 :             :          * Record in local pending hash that we want to LISTEN, overwriting any
    1532                 :             :          * earlier attempt to UNLISTEN.
    1533                 :             :          */
    1534                 :           2 :         pending = (PendingListenEntry *)
    1535                 :           2 :                 hash_search(pendingListenActions, channel, HASH_ENTER, NULL);
    1536                 :           2 :         pending->action = PENDING_LISTEN;
    1537                 :             : 
    1538                 :             :         /*
    1539                 :             :          * Ensure that there is an entry for the channel in localChannelTable.
    1540                 :             :          * (Should this fail, we can just roll back.)  If the transaction fails
    1541                 :             :          * after this point, we will remove the entry if appropriate during
    1542                 :             :          * ApplyPendingListenActions.  Note that this entry allows IsListeningOn()
    1543                 :             :          * to return TRUE; we assume nothing is going to consult that before
    1544                 :             :          * AtCommit_Notify/AtAbort_Notify.  However, if later actions attempt to
    1545                 :             :          * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
    1546                 :             :          * present to ensure they do the right things; see
    1547                 :             :          * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
    1548                 :             :          */
    1549                 :           2 :         (void) hash_search(localChannelTable, channel, HASH_ENTER, NULL);
    1550                 :             : 
    1551                 :             :         /* Pre-allocate entry in shared globalChannelTable with listening=false */
    1552                 :           2 :         GlobalChannelKeyInit(&key, MyDatabaseId, channel);
    1553                 :           2 :         entry = dshash_find_or_insert(globalChannelTable, &key, &found);
    1554                 :             : 
    1555         [ -  + ]:           2 :         if (!found)
    1556                 :             :         {
    1557                 :             :                 /* New channel entry, so initialize it to a safe state */
    1558                 :           2 :                 entry->listenersArray = InvalidDsaPointer;
    1559                 :           2 :                 entry->numListeners = 0;
    1560                 :           2 :                 entry->allocatedListeners = 0;
    1561                 :           2 :         }
    1562                 :             : 
    1563                 :             :         /*
    1564                 :             :          * Create listenersArray if entry doesn't have one.  It's tempting to fold
    1565                 :             :          * this into the !found case, but this coding allows us to cope in case
    1566                 :             :          * dsa_allocate() failed in an earlier attempt.
    1567                 :             :          */
    1568         [ -  + ]:           2 :         if (!DsaPointerIsValid(entry->listenersArray))
    1569                 :             :         {
    1570                 :           2 :                 entry->listenersArray = dsa_allocate(globalChannelDSA,
    1571                 :             :                                                                                          sizeof(ListenerEntry) * INITIAL_LISTENERS_ARRAY_SIZE);
    1572                 :           2 :                 entry->allocatedListeners = INITIAL_LISTENERS_ARRAY_SIZE;
    1573                 :           2 :         }
    1574                 :             : 
    1575                 :           2 :         listeners = (ListenerEntry *)
    1576                 :           2 :                 dsa_get_address(globalChannelDSA, entry->listenersArray);
    1577                 :             : 
    1578                 :             :         /*
    1579                 :             :          * Check if we already have a ListenerEntry (possibly from earlier in this
    1580                 :             :          * transaction)
    1581                 :             :          */
    1582   [ -  +  -  + ]:           2 :         for (int i = 0; i < entry->numListeners; i++)
    1583                 :             :         {
    1584         [ #  # ]:           0 :                 if (listeners[i].procNo == MyProcNumber)
    1585                 :             :                 {
    1586                 :             :                         /* Already have an entry; listening flag stays as-is until commit */
    1587                 :           0 :                         dshash_release_lock(globalChannelTable, entry);
    1588                 :           0 :                         return;
    1589                 :             :                 }
    1590                 :           0 :         }
    1591                 :             : 
    1592                 :             :         /* Need to add a new entry; grow array if necessary */
    1593         [ +  - ]:           2 :         if (entry->numListeners >= entry->allocatedListeners)
    1594                 :             :         {
    1595                 :           0 :                 int                     new_size = entry->allocatedListeners * 2;
    1596                 :           0 :                 dsa_pointer old_array = entry->listenersArray;
    1597                 :           0 :                 dsa_pointer new_array = dsa_allocate(globalChannelDSA,
    1598                 :             :                                                                                          sizeof(ListenerEntry) * new_size);
    1599                 :           0 :                 ListenerEntry *new_listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA, new_array);
    1600                 :             : 
    1601                 :           0 :                 memcpy(new_listeners, listeners, sizeof(ListenerEntry) * entry->numListeners);
    1602                 :           0 :                 entry->listenersArray = new_array;
    1603                 :           0 :                 entry->allocatedListeners = new_size;
    1604                 :           0 :                 dsa_free(globalChannelDSA, old_array);
    1605                 :           0 :                 listeners = new_listeners;
    1606                 :           0 :         }
    1607                 :             : 
    1608                 :           2 :         listeners[entry->numListeners].procNo = MyProcNumber;
    1609                 :           2 :         listeners[entry->numListeners].listening = false;    /* staged, not yet
    1610                 :             :                                                                                                                  * committed */
    1611                 :           2 :         entry->numListeners++;
    1612                 :             : 
    1613                 :           2 :         dshash_release_lock(globalChannelTable, entry);
    1614         [ -  + ]:           2 : }
    1615                 :             : 
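PrepareTableEntriesForListen stages a LISTEN in two steps. First it adds a ListenerEntry with `listening=false`, reusing one that already exists. Later, AtCommit_Notify flips the flag to true. When the array is full, it allocates a new one twice the size, copies the entries over and frees the old one.

The sketch below models that under simplifying assumptions. All names are hypothetical (`DemoChannel`, `demo_stage_listen`, `demo_commit_listen`). Plain `malloc`/`free` stand in for `dsa_allocate`/`dsa_free`, and the dshash locking is omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins for ListenerEntry / GlobalChannelEntry.
 * Heap memory replaces the DSA area; no locking is modeled. */
typedef struct
{
    int  procNo;
    bool listening;             /* false = staged, true = committed */
} DemoListener;

typedef struct
{
    DemoListener *listeners;
    int           numListeners;
    int           allocatedListeners;
} DemoChannel;

#define DEMO_INITIAL_SIZE 4

/* Stage a LISTEN: add an entry with listening=false unless this proc
 * already has one.  Returns false only on allocation failure. */
static bool
demo_stage_listen(DemoChannel *ch, int procNo)
{
    if (ch->listeners == NULL)
    {
        ch->listeners = malloc(sizeof(DemoListener) * DEMO_INITIAL_SIZE);
        if (ch->listeners == NULL)
            return false;
        ch->allocatedListeners = DEMO_INITIAL_SIZE;
    }

    for (int i = 0; i < ch->numListeners; i++)
        if (ch->listeners[i].procNo == procNo)
            return true;        /* existing entry: leave its flag alone */

    if (ch->numListeners >= ch->allocatedListeners)
    {
        /* Grow by doubling: allocate, copy, free old (no realloc in DSA) */
        int           new_size = ch->allocatedListeners * 2;
        DemoListener *new_array = malloc(sizeof(DemoListener) * new_size);

        if (new_array == NULL)
            return false;
        memcpy(new_array, ch->listeners,
               sizeof(DemoListener) * ch->numListeners);
        free(ch->listeners);
        ch->listeners = new_array;
        ch->allocatedListeners = new_size;
    }

    ch->listeners[ch->numListeners].procNo = procNo;
    ch->listeners[ch->numListeners].listening = false;  /* staged */
    ch->numListeners++;
    return true;
}

/* Commit a staged LISTEN by setting listening=true.  Returns false if
 * no entry exists (async.c treats that case as a PANIC). */
static bool
demo_commit_listen(DemoChannel *ch, int procNo)
{
    for (int i = 0; i < ch->numListeners; i++)
    {
        if (ch->listeners[i].procNo == procNo)
        {
            ch->listeners[i].listening = true;
            return true;
        }
    }
    return false;
}
```

Staging five listeners into a channel whose initial capacity is four triggers exactly one doubling. Re-staging an existing listener leaves the count unchanged.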
    1616                 :             : /*
    1617                 :             :  * PrepareTableEntriesForUnlisten --- subroutine for PreCommit_Notify
    1618                 :             :  *
    1619                 :             :  * Prepare an UNLISTEN by recording it in pendingListenActions, but only if
    1620                 :             :  * we're currently listening (committed or staged).  We don't touch
    1621                 :             :  * globalChannelTable yet - the listener keeps receiving signals until
    1622                 :             :  * commit, when the entry is removed.
    1623                 :             :  */
    1624                 :             : static void
    1625                 :           1 : PrepareTableEntriesForUnlisten(const char *channel)
    1626                 :             : {
    1627                 :           1 :         PendingListenEntry *pending;
    1628                 :             : 
    1629                 :             :         /*
    1630                 :             :          * If the channel name is not in localChannelTable, then we are neither
    1631                 :             :          * listening on it nor preparing to listen on it, so we don't need to
    1632                 :             :          * record an UNLISTEN action.
    1633                 :             :          */
    1634         [ +  - ]:           1 :         Assert(localChannelTable != NULL);
    1635         [ +  - ]:           1 :         if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
    1636                 :           0 :                 return;
    1637                 :             : 
    1638                 :             :         /*
    1639                 :             :          * Record in local pending hash that we want to UNLISTEN, overwriting any
    1640                 :             :          * earlier attempt to LISTEN.  Don't touch localChannelTable or
    1641                 :             :          * globalChannelTable yet - we keep receiving signals until commit.
    1642                 :             :          */
    1643                 :           1 :         pending = (PendingListenEntry *)
    1644                 :           1 :                 hash_search(pendingListenActions, channel, HASH_ENTER, NULL);
    1645                 :           1 :         pending->action = PENDING_UNLISTEN;
    1646         [ -  + ]:           1 : }
    1647                 :             : 
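Both PrepareTableEntriesForListen and PrepareTableEntriesForUnlisten record their request with `hash_search(pendingListenActions, channel, HASH_ENTER, NULL)` and then assign `->action`. As a result, the last request for a channel within a transaction wins: LISTEN followed by UNLISTEN on the same channel leaves only a pending UNLISTEN.

The sketch below illustrates that insert-or-overwrite behavior. It uses a hypothetical fixed-size array (`demo_pending`, `demo_record`) in place of a dynahash table.

```c
#include <assert.h>
#include <string.h>

typedef enum { DEMO_PENDING_LISTEN, DEMO_PENDING_UNLISTEN } DemoAction;

typedef struct
{
    char       channel[64];
    DemoAction action;
} DemoPending;

#define DEMO_MAX 16
static DemoPending demo_pending[DEMO_MAX];
static int         demo_npending = 0;

/* Insert-or-overwrite, like HASH_ENTER followed by setting ->action:
 * a later request for the same channel replaces the earlier one. */
static void
demo_record(const char *channel, DemoAction action)
{
    for (int i = 0; i < demo_npending; i++)
    {
        if (strcmp(demo_pending[i].channel, channel) == 0)
        {
            demo_pending[i].action = action;
            return;
        }
    }
    assert(demo_npending < DEMO_MAX);
    strncpy(demo_pending[demo_npending].channel, channel,
            sizeof(demo_pending[0].channel) - 1);
    demo_pending[demo_npending].channel[sizeof(demo_pending[0].channel) - 1] = '\0';
    demo_pending[demo_npending].action = action;
    demo_npending++;
}
```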
    1648                 :             : /*
    1649                 :             :  * PrepareTableEntriesForUnlistenAll --- subroutine for PreCommit_Notify
    1650                 :             :  *
    1651                 :             :  * Prepare UNLISTEN * by recording an UNLISTEN for all listened or
    1652                 :             :  * about-to-be-listened channels in pendingListenActions.
    1653                 :             :  */
    1654                 :             : static void
    1655                 :           2 : PrepareTableEntriesForUnlistenAll(void)
    1656                 :             : {
    1657                 :           2 :         HASH_SEQ_STATUS seq;
    1658                 :           2 :         ChannelName *channelEntry;
    1659                 :           2 :         PendingListenEntry *pending;
    1660                 :             : 
    1661                 :             :         /*
    1662                 :             :          * Scan localChannelTable, which will have the names of all channels that
    1663                 :             :          * we are listening on or have prepared to listen on.  Record an UNLISTEN
    1664                 :             :          * action for each one, overwriting any earlier attempt to LISTEN.
    1665                 :             :          */
    1666                 :           2 :         hash_seq_init(&seq, localChannelTable);
    1667         [ +  + ]:           3 :         while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
    1668                 :             :         {
    1669                 :           1 :                 pending = (PendingListenEntry *)
    1670                 :           1 :                         hash_search(pendingListenActions, channelEntry->channel, HASH_ENTER, NULL);
    1671                 :           1 :                 pending->action = PENDING_UNLISTEN;
    1672                 :             :         }
    1673                 :           2 : }
    1674                 :             : 
    1675                 :             : /*
    1676                 :             :  * RemoveListenerFromChannel --- remove idx'th listener from global channel entry
    1677                 :             :  *
    1678                 :             :  * Decrements numListeners, compacts the array, and frees the entry if empty.
    1679                 :             :  * Sets *entry_ptr to NULL if the entry was deleted.
    1680                 :             :  *
    1681                 :             :  * We could get the listeners pointer from the entry, but all callers
    1682                 :             :  * already have it at hand.
    1683                 :             :  */
    1684                 :             : static void
    1685                 :           2 : RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr,
    1686                 :             :                                                   ListenerEntry *listeners,
    1687                 :             :                                                   int idx)
    1688                 :             : {
    1689                 :           2 :         GlobalChannelEntry *entry = *entry_ptr;
    1690                 :             : 
    1691                 :           2 :         entry->numListeners--;
    1692         [ +  - ]:           2 :         if (idx < entry->numListeners)
    1693                 :           0 :                 memmove(&listeners[idx], &listeners[idx + 1],
    1694                 :             :                                 sizeof(ListenerEntry) * (entry->numListeners - idx));
    1695                 :             : 
    1696         [ -  + ]:           2 :         if (entry->numListeners == 0)
    1697                 :             :         {
    1698                 :           2 :                 dsa_free(globalChannelDSA, entry->listenersArray);
    1699                 :           2 :                 dshash_delete_entry(globalChannelTable, entry);
    1700                 :             :                 /* tells caller not to release the entry's lock: */
    1701                 :           2 :                 *entry_ptr = NULL;
    1702                 :           2 :         }
    1703                 :           2 : }
    1704                 :             : 
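RemoveListenerFromChannel deletes the idx'th entry while preserving the order of the others. It decrements the count and, only if idx is not the last slot, shifts the tail down one position with `memmove` (the regions overlap, so `memcpy` would be wrong).

The sketch below isolates that compaction step. The name `demo_remove_at` is hypothetical. It returns true when the array becomes empty, which is the point at which async.c frees `listenersArray` and deletes the dshash entry.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

typedef struct
{
    int  procNo;
    bool listening;
} DemoListener;

/* Remove arr[idx] from a dense array of *count elements, keeping order.
 * Returns true if the array is now empty (caller would free it). */
static bool
demo_remove_at(DemoListener *arr, int *count, int idx)
{
    (*count)--;
    if (idx < *count)           /* nothing to shift if idx was the last */
        memmove(&arr[idx], &arr[idx + 1],
                sizeof(DemoListener) * (*count - idx));
    return *count == 0;
}
```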
    1705                 :             : /*
    1706                 :             :  * ApplyPendingListenActions
    1707                 :             :  *
    1708                 :             :  * Apply, or revert, staged listen/unlisten changes to the local and global
    1709                 :             :  * hash tables.
    1710                 :             :  */
    1711                 :             : static void
    1712                 :        7025 : ApplyPendingListenActions(bool isCommit)
    1713                 :             : {
    1714                 :        7025 :         HASH_SEQ_STATUS seq;
    1715                 :        7025 :         PendingListenEntry *pending;
    1716                 :             : 
    1717                 :             :         /* Quick exit if nothing to do */
    1718         [ +  + ]:        7025 :         if (pendingListenActions == NULL)
    1719                 :        7020 :                 return;
    1720                 :             : 
    1721                 :             :         /* We made a globalChannelTable before building pendingListenActions */
    1722         [ +  - ]:           5 :         if (globalChannelTable == NULL)
    1723   [ #  #  #  # ]:           0 :                 elog(PANIC, "global channel table missing post-commit/abort");
    1724                 :             : 
    1725                 :             :         /* For each staged action ... */
    1726                 :           5 :         hash_seq_init(&seq, pendingListenActions);
    1727         [ +  + ]:           9 :         while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
    1728                 :             :         {
    1729                 :           4 :                 GlobalChannelKey key;
    1730                 :           4 :                 GlobalChannelEntry *entry;
    1731                 :           4 :                 bool            removeLocal = true;
    1732                 :           4 :                 bool            foundListener = false;
    1733                 :             : 
    1734                 :             :                 /*
    1735                 :             :                  * Find the global entry for this channel.  If isCommit, it had better
    1736                 :             :                  * exist (it was created in PreCommit).  In an abort, it might not
    1737                 :             :                  * exist, in which case we are not listening and should discard any
    1738                 :             :                  * local entry that PreCommit may have managed to create.
    1739                 :             :                  */
    1740                 :           4 :                 GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
    1741                 :           4 :                 entry = dshash_find(globalChannelTable, &key, true);
    1742         [ -  + ]:           4 :                 if (entry != NULL)
    1743                 :             :                 {
    1744                 :             :                         /* Scan entry to find the ListenerEntry for this backend */
    1745                 :           4 :                         ListenerEntry *listeners;
    1746                 :             : 
    1747                 :           4 :                         listeners = (ListenerEntry *)
    1748                 :           4 :                                 dsa_get_address(globalChannelDSA, entry->listenersArray);
    1749                 :             : 
    1750         [ +  - ]:           8 :                         for (int i = 0; i < entry->numListeners; i++)
    1751                 :             :                         {
    1752         [ -  + ]:           4 :                                 if (listeners[i].procNo != MyProcNumber)
    1753                 :           0 :                                         continue;
    1754                 :           4 :                                 foundListener = true;
    1755         [ +  - ]:           4 :                                 if (isCommit)
    1756                 :             :                                 {
    1757         [ +  + ]:           4 :                                         if (pending->action == PENDING_LISTEN)
    1758                 :             :                                         {
    1759                 :             :                                                 /*
    1760                 :             :                                                  * LISTEN being committed: set listening=true.
    1761                 :             :                                                  * localChannelTable entry was created during
    1762                 :             :                                                  * PreCommit and should be kept.
    1763                 :             :                                                  */
    1764                 :           2 :                                                 listeners[i].listening = true;
    1765                 :           2 :                                                 removeLocal = false;
    1766                 :           2 :                                         }
    1767                 :             :                                         else
    1768                 :             :                                         {
    1769                 :             :                                                 /*
    1770                 :             :                                                  * UNLISTEN being committed: remove pre-allocated
    1771                 :             :                                                  * entries from both tables.
    1772                 :             :                                                  */
    1773                 :           2 :                                                 RemoveListenerFromChannel(&entry, listeners, i);
    1774                 :             :                                         }
    1775                 :           4 :                                 }
    1776                 :             :                                 else
    1777                 :             :                                 {
    1778                 :             :                                         /*
    1779                 :             :                                          * Note: this part is reachable only if the transaction
    1780                 :             :                                          * aborts after PreCommit_Notify() has made some
    1781                 :             :                                          * pendingListenActions entries, so it's pretty hard to
    1782                 :             :                                          * test.
    1783                 :             :                                          */
    1784         [ #  # ]:           0 :                                         if (!listeners[i].listening)
    1785                 :             :                                         {
    1786                 :             :                                                 /*
    1787                 :             :                                                  * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
    1788                 :             :                                                  * and we weren't listening before, so remove
    1789                 :             :                                                  * pre-allocated entries from both tables.
    1790                 :             :                                                  */
    1791                 :           0 :                                                 RemoveListenerFromChannel(&entry, listeners, i);
    1792                 :           0 :                                         }
    1793                 :             :                                         else
    1794                 :             :                                         {
    1795                 :             :                                                 /*
    1796                 :             :                                                  * We're aborting, but the previous state was that
    1797                 :             :                                                  * we're listening, so keep localChannelTable entry.
    1798                 :             :                                                  */
    1799                 :           0 :                                                 removeLocal = false;
    1800                 :             :                                         }
    1801                 :             :                                 }
    1802                 :           4 :                                 break;                  /* there shouldn't be another match */
    1803                 :             :                         }
    1804                 :             : 
    1805                 :             :                         /* We might have already released the entry by removing it */
    1806         [ +  + ]:           4 :                         if (entry != NULL)
    1807                 :           2 :                                 dshash_release_lock(globalChannelTable, entry);
    1808                 :           4 :                 }
    1809                 :             : 
    1810                 :             :                 /*
    1811                 :             :                  * If we're committing a LISTEN action, we should have found a
    1812                 :             :                  * matching ListenerEntry, but otherwise it's okay if we didn't.
    1813                 :             :                  */
    1814   [ +  -  +  +  :           4 :                 if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
                   +  - ]
    1815   [ #  #  #  # ]:           0 :                         elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
    1816                 :             :                                  pending->channel, MyProcNumber);
    1817                 :             : 
    1818                 :             :                 /*
    1819                 :             :                  * If we did not find a globalChannelTable entry for our backend, or
    1820                 :             :                  * if we are unlistening, remove any localChannelTable entry that may
    1821                 :             :                  * exist.  (Note in particular that this cleans up if we created a
    1822                 :             :                  * localChannelTable entry and then failed while trying to create a
    1823                 :             :                  * globalChannelTable entry.)
    1824                 :             :                  */
    1825   [ +  +  +  - ]:           4 :                 if (removeLocal && localChannelTable != NULL)
    1826                 :           2 :                         (void) hash_search(localChannelTable, pending->channel,
    1827                 :             :                                                            HASH_REMOVE, NULL);
    1828                 :           4 :         }
    1829         [ -  + ]:        7025 : }
    1830                 :             : 
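Once this backend's ListenerEntry has been found, ApplyPendingListenActions reduces to a small decision table over three inputs: commit vs. abort, the pending action, and whether the entry was already marked `listening` before the transaction.

The sketch below encodes that table. The names (`demo_resolve`, `DemoOutcome`) are hypothetical, and it deliberately ignores the not-found path. In that path async.c simply removes any local entry, or PANICs when committing a LISTEN.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum { DEMO_PENDING_LISTEN, DEMO_PENDING_UNLISTEN } DemoAction;

typedef enum
{
    DEMO_SET_LISTENING,         /* listening=true, keep local entry */
    DEMO_REMOVE_BOTH,           /* drop ListenerEntry and local entry */
    DEMO_KEEP_AS_IS             /* was listening before: change nothing */
} DemoOutcome;

/* Outcome for a backend whose ListenerEntry exists in the channel. */
static DemoOutcome
demo_resolve(bool isCommit, DemoAction action, bool wasListening)
{
    if (isCommit)
        return (action == DEMO_PENDING_LISTEN) ? DEMO_SET_LISTENING
                                               : DEMO_REMOVE_BOTH;
    /* Abort: restore the pre-transaction state */
    return wasListening ? DEMO_KEEP_AS_IS : DEMO_REMOVE_BOTH;
}
```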
    1831                 :             : /*
    1832                 :             :  * CleanupListenersOnExit --- called from Async_UnlistenOnExit
    1833                 :             :  *
    1834                 :             :  *              Remove this backend from all channels in the shared global table.
    1835                 :             :  */
    1836                 :             : static void
    1837                 :           2 : CleanupListenersOnExit(void)
    1838                 :             : {
    1839                 :           2 :         dshash_seq_status status;
    1840                 :           2 :         GlobalChannelEntry *entry;
    1841                 :             : 
    1842         [ +  - ]:           2 :         if (Trace_notify)
    1843   [ #  #  #  # ]:           0 :                 elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
    1844                 :             : 
    1845                 :             :         /* Clear our local cache (not really necessary, but be consistent) */
    1846         [ +  - ]:           2 :         if (localChannelTable != NULL)
    1847                 :             :         {
    1848                 :           2 :                 hash_destroy(localChannelTable);
    1849                 :           2 :                 localChannelTable = NULL;
    1850                 :           2 :         }
    1851                 :             : 
    1852                 :             :         /* Now remove our entries from the shared globalChannelTable */
    1853         [ +  - ]:           2 :         if (globalChannelTable == NULL)
    1854                 :           0 :                 return;
    1855                 :             : 
    1856                 :           2 :         dshash_seq_init(&status, globalChannelTable, true);
    1857         [ -  + ]:           2 :         while ((entry = dshash_seq_next(&status)) != NULL)
    1858                 :             :         {
    1859                 :           0 :                 ListenerEntry *listeners;
    1860                 :             : 
    1861         [ #  # ]:           0 :                 if (entry->key.dboid != MyDatabaseId)
    1862                 :           0 :                         continue;                       /* not relevant */
    1863                 :             : 
    1864                 :           0 :                 listeners = (ListenerEntry *)
    1865                 :           0 :                         dsa_get_address(globalChannelDSA, entry->listenersArray);
    1866                 :             : 
    1867         [ #  # ]:           0 :                 for (int i = 0; i < entry->numListeners; i++)
    1868                 :             :                 {
    1869         [ #  # ]:           0 :                         if (listeners[i].procNo == MyProcNumber)
    1870                 :             :                         {
    1871                 :           0 :                                 entry->numListeners--;
    1872         [ #  # ]:           0 :                                 if (i < entry->numListeners)
    1873                 :           0 :                                         memmove(&listeners[i], &listeners[i + 1],
    1874                 :             :                                                         sizeof(ListenerEntry) * (entry->numListeners - i));
    1875                 :             : 
    1876         [ #  # ]:           0 :                                 if (entry->numListeners == 0)
    1877                 :             :                                 {
    1878                 :           0 :                                         dsa_free(globalChannelDSA, entry->listenersArray);
    1879                 :           0 :                                         dshash_delete_current(&status);
    1880                 :           0 :                                 }
    1881                 :           0 :                                 break;
    1882                 :             :                         }
    1883                 :           0 :                 }
    1884         [ #  # ]:           0 :         }
    1885                 :           2 :         dshash_seq_term(&status);
    1886                 :           2 : }
    1887                 :             : 
    1888                 :             : /*
    1889                 :             :  * Test whether we are actively listening on the given channel name.
    1890                 :             :  *
    1891                 :             :  * Note: this function is executed for every notification found in the queue.
    1892                 :             :  */
    1893                 :             : static bool
    1894                 :           0 : IsListeningOn(const char *channel)
    1895                 :             : {
    1896         [ #  # ]:           0 :         if (localChannelTable == NULL)
    1897                 :           0 :                 return false;
    1898                 :             : 
    1899                 :           0 :         return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
    1900                 :           0 : }
    1901                 :             : 
    1902                 :             : /*
    1903                 :             :  * Remove our entry from the listeners array when we are no longer listening
    1904                 :             :  * on any channel.  NB: must not fail if we're already not listening.
    1905                 :             :  */
    1906                 :             : static void
    1907                 :           4 : asyncQueueUnregister(void)
    1908                 :             : {
    1909   [ +  +  +  - ]:           4 :         Assert(LocalChannelTableIsEmpty()); /* else caller error */
    1910                 :             : 
    1911         [ +  + ]:           4 :         if (!amRegisteredListener)      /* nothing to do */
    1912                 :           2 :                 return;
    1913                 :             : 
    1914                 :             :         /*
    1915                 :             :          * Need exclusive lock here to manipulate list links.
    1916                 :             :          */
    1917                 :           2 :         LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    1918                 :             :         /* Mark our entry as invalid */
    1919                 :           2 :         QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid;
    1920                 :           2 :         QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
    1921                 :           2 :         QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
    1922                 :           2 :         QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
    1923                 :             :         /* and remove it from the list */
    1924         [ +  - ]:           2 :         if (QUEUE_FIRST_LISTENER == MyProcNumber)
    1925                 :           2 :                 QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber);
    1926                 :             :         else
    1927                 :             :         {
    1928         [ #  # ]:           0 :                 for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
    1929                 :             :                 {
    1930         [ #  # ]:           0 :                         if (QUEUE_NEXT_LISTENER(i) == MyProcNumber)
    1931                 :             :                         {
    1932                 :           0 :                                 QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyProcNumber);
    1933                 :           0 :                                 break;
    1934                 :             :                         }
    1935                 :           0 :                 }
    1936                 :             :         }
    1937                 :           2 :         QUEUE_NEXT_LISTENER(MyProcNumber) = INVALID_PROC_NUMBER;
    1938                 :           2 :         LWLockRelease(NotifyQueueLock);
    1939                 :             : 
    1940                 :             :         /* mark ourselves as no longer listed in the global array */
    1941                 :           2 :         amRegisteredListener = false;
    1942                 :           4 : }
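The unlink step in asyncQueueUnregister() walks a singly linked list threaded through an array indexed by ProcNumber. Below is a minimal sketch of that pattern. It assumes a small fixed-size array and uses -1 in place of INVALID_PROC_NUMBER. `ListenerList`, `NSLOTS`, `INVALID_SLOT` and `listener_list_remove` are hypothetical names, and the lock and the other per-backend fields are left out.

```c
#include <assert.h>

/* Hypothetical stand-ins: array slots play the role of ProcNumber and
 * INVALID_SLOT plays INVALID_PROC_NUMBER. */
#define NSLOTS       8
#define INVALID_SLOT (-1)

typedef struct ListenerList
{
    int first;          /* like QUEUE_FIRST_LISTENER */
    int next[NSLOTS];   /* like QUEUE_NEXT_LISTENER(i) */
} ListenerList;

/*
 * Unlink slot 'me'.  The caller is assumed to know 'me' is on the list,
 * as async.c ensures by returning early when amRegisteredListener is false.
 */
static void
listener_list_remove(ListenerList *list, int me)
{
    assert(me >= 0 && me < NSLOTS);

    if (list->first == me)
        list->first = list->next[me];       /* fast path: we are the head */
    else
    {
        /* Find our predecessor and make it skip over us. */
        for (int i = list->first; i != INVALID_SLOT; i = list->next[i])
        {
            if (list->next[i] == me)
            {
                list->next[i] = list->next[me];
                break;
            }
        }
    }
    list->next[me] = INVALID_SLOT;          /* we are no longer linked */
}
```

Unlinking the head costs O(1). Any other entry requires a scan from the head, which is the branch the coverage data above shows as never executed in this test run.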
    1943                 :             : 
    1944                 :             : /*
    1945                 :             :  * Test whether there is room to insert more notification messages.
    1946                 :             :  *
    1947                 :             :  * Caller must hold at least shared NotifyQueueLock.
    1948                 :             :  */
    1949                 :             : static bool
    1950                 :           4 : asyncQueueIsFull(void)
    1951                 :             : {
    1952                 :           4 :         int64           headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
    1953                 :           4 :         int64           tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
    1954                 :           4 :         int64           occupied = headPage - tailPage;
    1955                 :             : 
    1956                 :           8 :         return occupied >= max_notify_queue_pages;
    1957                 :           4 : }
    1958                 :             : 
    1959                 :             : /*
    1960                 :             :  * Advance the QueuePosition to the next entry, assuming that the current
    1961                 :             :  * entry is of length entryLength.  If we jump to a new page the function
    1962                 :             :  * returns true, else false.
    1963                 :             :  */
    1964                 :             : static bool
    1965                 :           8 : asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
    1966                 :             : {
    1967                 :           8 :         int64           pageno = QUEUE_POS_PAGE(*position);
    1968                 :           8 :         int                     offset = QUEUE_POS_OFFSET(*position);
    1969                 :           8 :         bool            pageJump = false;
    1970                 :             : 
    1971                 :             :         /*
    1972                 :             :          * Move to the next writing position: First jump over what we have just
    1973                 :             :          * written or read.
    1974                 :             :          */
    1975                 :           8 :         offset += entryLength;
    1976         [ +  - ]:           8 :         Assert(offset <= QUEUE_PAGESIZE);
    1977                 :             : 
    1978                 :             :         /*
    1979                 :             :          * In a second step check if another entry can possibly be written to the
    1980                 :             :          * page. If so, stay here, we have reached the next position. If not, then
    1981                 :             :          * we need to move on to the next page.
    1982                 :             :          */
    1983         [ +  - ]:           8 :         if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
    1984                 :             :         {
    1985                 :           0 :                 pageno++;
    1986                 :           0 :                 offset = 0;
    1987                 :           0 :                 pageJump = true;
    1988                 :           0 :         }
    1989                 :             : 
    1990                 :           8 :         SET_QUEUE_POS(*position, pageno, offset);
    1991                 :          16 :         return pageJump;
    1992                 :           8 : }
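asyncQueueAdvance() moves a position past one entry. It jumps to the next page as soon as the remainder of the current page could not hold even an empty entry. The sketch below reproduces that rule under stated assumptions: an 8192-byte page, 4-byte alignment for QUEUEALIGN(), and a 20-byte minimum entry (an assumed 18-byte empty entry rounded up). These constants and the `SketchPos`/`sketch_advance` names are illustrative and are not taken from async.c.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Illustrative assumptions, not async.c's definitions: an 8192-byte page,
 * 4-byte alignment, and an empty entry of 18 bytes rounded up to 20. */
#define SKETCH_PAGESIZE   8192
#define SKETCH_ALIGN(len) (((len) + 3) & ~3)
#define SKETCH_MIN_ENTRY  SKETCH_ALIGN(18)

typedef struct SketchPos
{
    int64_t page;
    int     offset;
} SketchPos;

/* Step past an entry of entry_len bytes; return true if we moved to a new page. */
static bool
sketch_advance(SketchPos *pos, int entry_len)
{
    int offset = pos->offset + entry_len;

    assert(offset <= SKETCH_PAGESIZE);

    /* If not even an empty entry fits in what is left, go to the next page. */
    if (offset + SKETCH_MIN_ENTRY > SKETCH_PAGESIZE)
    {
        pos->page++;
        pos->offset = 0;
        return true;
    }
    pos->offset = offset;
    return false;
}
```

With these numbers, an entry ending at offset 8180 forces a page jump because 8180 + 20 exceeds 8192. An entry ending at 8172 leaves exactly enough room for one more empty entry.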
    1993                 :             : 
    1994                 :             : /*
    1995                 :             :  * Fill the AsyncQueueEntry at *qe with an outbound notification message.
    1996                 :             :  */
    1997                 :             : static void
    1998                 :           4 : asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
    1999                 :             : {
    2000                 :           4 :         size_t          channellen = n->channel_len;
    2001                 :           4 :         size_t          payloadlen = n->payload_len;
    2002                 :           4 :         int                     entryLength;
    2003                 :             : 
    2004         [ +  - ]:           4 :         Assert(channellen < NAMEDATALEN);
    2005         [ +  - ]:           4 :         Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
    2006                 :             : 
    2007                 :             :         /* The terminators are already included in AsyncQueueEntryEmptySize */
    2008                 :           4 :         entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
    2009                 :           4 :         entryLength = QUEUEALIGN(entryLength);
    2010                 :           4 :         qe->length = entryLength;
    2011                 :           4 :         qe->dboid = MyDatabaseId;
    2012                 :           4 :         qe->xid = GetCurrentTransactionId();
    2013                 :           4 :         qe->srcPid = MyProcPid;
    2014                 :           4 :         memcpy(qe->data, n->data, channellen + payloadlen + 2);
    2015                 :           4 : }
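The length computation in asyncQueueNotificationToEntry() packs a fixed header and two NUL-terminated strings, then rounds the total up. The sketch below mirrors this with a simplified entry struct. The size limits, the 4-byte alignment and all `Sketch*` names are assumptions. The real code copies the prebuilt "channel\0payload\0" buffer in n->data with a single memcpy; this sketch instead builds that layout from two C strings.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical limits and alignment; the real NAMEDATALEN and
 * NOTIFY_PAYLOAD_MAX_LENGTH values are not assumed here. */
#define SKETCH_NAMEDATALEN 64
#define SKETCH_PAYLOAD_MAX 256
#define SKETCH_ALIGN(len)  (((len) + 3) & ~(size_t) 3)

/* Simplified stand-in for AsyncQueueEntry: a fixed header, then
 * "channel\0payload\0" in data[]. */
typedef struct SketchEntry
{
    int      length;
    uint32_t dboid;
    uint32_t xid;
    int32_t  srcPid;
    char     data[SKETCH_NAMEDATALEN + SKETCH_PAYLOAD_MAX];
} SketchEntry;

/* Header plus the two NUL terminators, as with AsyncQueueEntryEmptySize. */
#define SKETCH_EMPTY_SIZE (offsetof(SketchEntry, data) + 2)

static void
sketch_fill_entry(SketchEntry *qe, const char *channel, const char *payload,
                  uint32_t dboid, uint32_t xid, int32_t pid)
{
    size_t channellen = strlen(channel);
    size_t payloadlen = strlen(payload);

    assert(channellen < SKETCH_NAMEDATALEN);
    assert(payloadlen < SKETCH_PAYLOAD_MAX);

    /* Only the used prefix of data[] counts, rounded up to the alignment. */
    qe->length = (int) SKETCH_ALIGN(SKETCH_EMPTY_SIZE + channellen + payloadlen);
    qe->dboid = dboid;
    qe->xid = xid;
    qe->srcPid = pid;

    /* Copy both strings with terminators: channellen + payloadlen + 2 bytes. */
    memcpy(qe->data, channel, channellen + 1);
    memcpy(qe->data + channellen + 1, payload, payloadlen + 1);
}
```

Because only qe->length bytes are later copied into the page, the unused tail of data[] never reaches shared memory.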
    2016                 :             : 
    2017                 :             : /*
    2018                 :             :  * Add pending notifications to the queue.
    2019                 :             :  *
    2020                 :             :  * We go page by page here, i.e. we stop once we have to go to a new page but
    2021                 :             :  * we will be called again and then fill that next page. If an entry does not
    2022                 :             :  * fit into the current page, we write a dummy entry with an InvalidOid as the
    2023                 :             :  * database OID in order to fill the page. So every page is always used up to
    2024                 :             :  * the last byte which simplifies reading the page later.
    2025                 :             :  *
    2026                 :             :  * We are passed the list cell (in pendingNotifies->events) containing the next
    2027                 :             :  * notification to write and return the first still-unwritten cell back.
    2028                 :             :  * Eventually we will return NULL indicating all is done.
    2029                 :             :  *
    2030                 :             :  * We are holding NotifyQueueLock already from the caller and grab
    2031                 :             :  * page specific SLRU bank lock locally in this function.
    2032                 :             :  */
    2033                 :             : static ListCell *
    2034                 :           4 : asyncQueueAddEntries(ListCell *nextNotify)
    2035                 :             : {
    2036                 :           4 :         AsyncQueueEntry qe;
    2037                 :           4 :         QueuePosition queue_head;
    2038                 :           4 :         int64           pageno;
    2039                 :           4 :         int                     offset;
    2040                 :           4 :         int                     slotno;
    2041                 :           4 :         LWLock     *prevlock;
    2042                 :             : 
    2043                 :             :         /*
    2044                 :             :          * We work with a local copy of QUEUE_HEAD, which we write back to shared
    2045                 :             :          * memory upon exiting.  The reason for this is that if we have to advance
    2046                 :             :          * to a new page, SimpleLruZeroPage might fail (out of disk space, for
    2047                 :             :          * instance), and we must not advance QUEUE_HEAD if it does.  (Otherwise,
    2048                 :             :          * subsequent insertions would try to put entries into a page that slru.c
    2049                 :             :          * thinks doesn't exist yet.)  So, use a local position variable.  Note
    2050                 :             :          * that if we do fail, any already-inserted queue entries are forgotten;
    2051                 :             :          * this is okay, since they'd be useless anyway after our transaction
    2052                 :             :          * rolls back.
    2053                 :             :          */
    2054                 :           4 :         queue_head = QUEUE_HEAD;
    2055                 :             : 
    2056                 :             :         /*
    2057                 :             :          * If this is the first write since the postmaster started, we need to
    2058                 :             :          * initialize the first page of the async SLRU.  Otherwise, the current
    2059                 :             :          * page should be initialized already, so just fetch it.
    2060                 :             :          */
    2061                 :           4 :         pageno = QUEUE_POS_PAGE(queue_head);
    2062                 :           4 :         prevlock = SimpleLruGetBankLock(NotifyCtl, pageno);
    2063                 :             : 
    2064                 :             :         /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
    2065                 :           4 :         LWLockAcquire(prevlock, LW_EXCLUSIVE);
    2066                 :             : 
    2067   [ +  -  +  + ]:           4 :         if (QUEUE_POS_IS_ZERO(queue_head))
    2068                 :           1 :                 slotno = SimpleLruZeroPage(NotifyCtl, pageno);
    2069                 :             :         else
    2070                 :           3 :                 slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
    2071                 :             :                                                                    InvalidTransactionId);
    2072                 :             : 
    2073                 :             :         /* Note we mark the page dirty before writing in it */
    2074                 :           4 :         NotifyCtl->shared->page_dirty[slotno] = true;
    2075                 :             : 
    2076         [ +  + ]:           8 :         while (nextNotify != NULL)
    2077                 :             :         {
    2078                 :           4 :                 Notification *n = (Notification *) lfirst(nextNotify);
    2079                 :             : 
    2080                 :             :                 /* Construct a valid queue entry in local variable qe */
    2081                 :           4 :                 asyncQueueNotificationToEntry(n, &qe);
    2082                 :             : 
    2083                 :           4 :                 offset = QUEUE_POS_OFFSET(queue_head);
    2084                 :             : 
    2085                 :             :                 /* Check whether the entry really fits on the current page */
    2086         [ +  - ]:           4 :                 if (offset + qe.length <= QUEUE_PAGESIZE)
    2087                 :             :                 {
    2088                 :             :                         /* OK, so advance nextNotify past this item */
    2089                 :           4 :                         nextNotify = lnext(pendingNotifies->events, nextNotify);
    2090                 :           4 :                 }
    2091                 :             :                 else
    2092                 :             :                 {
    2093                 :             :                         /*
    2094                 :             :                          * Write a dummy entry to fill up the page. Actually readers will
    2095                 :             :                          * only check dboid and since it won't match any reader's database
    2096                 :             :                          * OID, they will ignore this entry and move on.
    2097                 :             :                          */
    2098                 :           0 :                         qe.length = QUEUE_PAGESIZE - offset;
    2099                 :           0 :                         qe.dboid = InvalidOid;
    2100                 :           0 :                         qe.xid = InvalidTransactionId;
    2101                 :           0 :                         qe.data[0] = '\0';      /* empty channel */
    2102                 :           0 :                         qe.data[1] = '\0';      /* empty payload */
    2103                 :             :                 }
    2104                 :             : 
    2105                 :             :                 /* Now copy qe into the shared buffer page */
    2106                 :           4 :                 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
    2107                 :             :                            &qe,
    2108                 :             :                            qe.length);
    2109                 :             : 
    2110                 :             :                 /* Advance queue_head appropriately, and detect if page is full */
    2111         [ -  + ]:           4 :                 if (asyncQueueAdvance(&(queue_head), qe.length))
    2112                 :             :                 {
    2113                 :           0 :                         LWLock     *lock;
    2114                 :             : 
    2115                 :           0 :                         pageno = QUEUE_POS_PAGE(queue_head);
    2116                 :           0 :                         lock = SimpleLruGetBankLock(NotifyCtl, pageno);
    2117         [ #  # ]:           0 :                         if (lock != prevlock)
    2118                 :             :                         {
    2119                 :           0 :                                 LWLockRelease(prevlock);
    2120                 :           0 :                                 LWLockAcquire(lock, LW_EXCLUSIVE);
    2121                 :           0 :                                 prevlock = lock;
    2122                 :           0 :                         }
    2123                 :             : 
    2124                 :             :                         /*
    2125                 :             :                          * Page is full, so we're done here, but first fill the next page
    2126                 :             :                          * with zeroes.  The reason to do this is to ensure that slru.c's
    2127                 :             :                          * idea of the head page is always the same as ours, which avoids
    2128                 :             :                          * boundary problems in SimpleLruTruncate.  The test in
    2129                 :             :                          * asyncQueueIsFull() ensured that there is room to create this
    2130                 :             :                          * page without overrunning the queue.
    2131                 :             :                          */
    2132                 :           0 :                         slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
    2133                 :             : 
    2134                 :             :                         /*
    2135                 :             :                          * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
    2136                 :             :                          * set flag to remember that we should try to advance the tail
    2137                 :             :                          * pointer (we don't want to actually do that right here).
    2138                 :             :                          */
    2139         [ #  # ]:           0 :                         if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
    2140                 :           0 :                                 tryAdvanceTail = true;
    2141                 :             : 
    2142                 :             :                         /* And exit the loop */
    2143                 :             :                         break;
    2144                 :           0 :                 }
    2145      [ -  -  + ]:           4 :         }
    2146                 :             : 
    2147                 :             :         /* Success, so update the global QUEUE_HEAD */
    2148                 :           4 :         QUEUE_HEAD = queue_head;
    2149                 :             : 
    2150                 :           4 :         LWLockRelease(prevlock);
    2151                 :             : 
    2152                 :           8 :         return nextNotify;
    2153                 :           4 : }
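The page-at-a-time loop, the padding entry and the local copy of the head in asyncQueueAddEntries() can be combined into one small model. This sketch tracks only entry lengths, not bytes. It assumes an 8192-byte page and a 20-byte minimum entry, and `SketchPos`, `sketch_add_entries` and the filler counter are hypothetical. The caller is expected to call it again with the returned index, as the header comment of asyncQueueAddEntries() describes.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Illustrative assumptions: 8192-byte pages, 4-byte aligned lengths and a
 * 20-byte minimum (empty) entry.  All names are hypothetical. */
#define SKETCH_PAGESIZE  8192
#define SKETCH_MIN_ENTRY 20

typedef struct SketchPos
{
    int64_t page;
    int     offset;
} SketchPos;

/*
 * Write entries lens[next..n-1] starting at *shared_head, stopping at the
 * first page boundary.  Returns the index of the first unwritten entry (n
 * when all are written) and counts padding entries in *fillers.
 * *shared_head is updated only at the end, like QUEUE_HEAD in async.c.
 */
static int
sketch_add_entries(SketchPos *shared_head, const int *lens, int n, int next,
                   int *fillers)
{
    SketchPos head = *shared_head;              /* work on a local copy */

    while (next < n)
    {
        int len = lens[next];

        assert(len >= SKETCH_MIN_ENTRY && len % 4 == 0);
        if (head.offset + len <= SKETCH_PAGESIZE)
            next++;                             /* entry fits: consume it */
        else
        {
            len = SKETCH_PAGESIZE - head.offset;    /* dummy entry pads the page */
            (*fillers)++;
        }

        /* Advance; once we reach a new page, stop and let the caller call again. */
        head.offset += len;
        if (head.offset + SKETCH_MIN_ENTRY > SKETCH_PAGESIZE)
        {
            head.page++;
            head.offset = 0;
            break;
        }
    }

    *shared_head = head;                        /* success: publish the head */
    return next;
}
```

Starting at offset 8000 with entries of 100, 200 and 40 bytes, the first call stores the 100-byte entry, pads the remaining 92 bytes with a filler and stops at the start of the next page. A second call then stores the other two entries at offsets 0 and 200.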
    2154                 :             : 
    2155                 :             : /*
    2156                 :             :  * SQL function to return the fraction of the notification queue currently
    2157                 :             :  * occupied.
    2158                 :             :  */
    2159                 :             : Datum
    2160                 :           1 : pg_notification_queue_usage(PG_FUNCTION_ARGS)
    2161                 :             : {
    2162                 :           1 :         double          usage;
    2163                 :             : 
    2164                 :             :         /* Advance the queue tail so we don't report a too-large result */
    2165                 :           1 :         asyncQueueAdvanceTail();
    2166                 :             : 
    2167                 :           1 :         LWLockAcquire(NotifyQueueLock, LW_SHARED);
    2168                 :           1 :         usage = asyncQueueUsage();
    2169                 :           1 :         LWLockRelease(NotifyQueueLock);
    2170                 :             : 
    2171                 :           2 :         PG_RETURN_FLOAT8(usage);
    2172                 :           1 : }
    2173                 :             : 
    2174                 :             : /*
    2175                 :             :  * Return the fraction of the queue that is currently occupied.
    2176                 :             :  *
    2177                 :             :  * The caller must hold NotifyQueueLock in (at least) shared mode.
    2178                 :             :  *
    2179                 :             :  * Note: we measure the distance to the logical tail page, not the physical
    2180                 :             :  * tail page.  In some sense that's wrong, but the relative position of the
    2181                 :             :  * physical tail is affected by details such as SLRU segment boundaries,
    2182                 :             :  * so that a result based on that is unpleasantly unstable.
    2183                 :             :  */
    2184                 :             : static double
    2185                 :           5 : asyncQueueUsage(void)
    2186                 :             : {
    2187                 :           5 :         int64           headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
    2188                 :           5 :         int64           tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
    2189                 :           5 :         int64           occupied = headPage - tailPage;
    2190                 :             : 
    2191         [ -  + ]:           5 :         if (occupied == 0)
    2192                 :           5 :                 return (double) 0;              /* fast exit for common case */
    2193                 :             : 
    2194                 :           0 :         return (double) occupied / (double) max_notify_queue_pages;
    2195                 :           5 : }
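asyncQueueIsFull() and asyncQueueUsage() share the same page arithmetic. Both take the number of whole pages between the tail and head positions, then compare it with max_notify_queue_pages or divide by it. The sketch below passes the page numbers in directly. The function names are hypothetical, and it assumes, as the plain subtraction in the listing implies, that page numbers are 64-bit and do not wrap.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Whole pages between the logical tail and the head. */
static int64_t
queue_pages_occupied(int64_t head_page, int64_t tail_page)
{
    assert(head_page >= tail_page);
    return head_page - tail_page;
}

/* Like asyncQueueUsage(): fraction of max_pages in use, 0.0 when empty. */
static double
queue_usage(int64_t head_page, int64_t tail_page, int64_t max_pages)
{
    int64_t occupied = queue_pages_occupied(head_page, tail_page);

    if (occupied == 0)
        return 0.0;                 /* fast path, as in the listing */
    return (double) occupied / (double) max_pages;
}

/* Like asyncQueueIsFull(): no room once the distance reaches max_pages. */
static bool
queue_is_full(int64_t head_page, int64_t tail_page, int64_t max_pages)
{
    return queue_pages_occupied(head_page, tail_page) >= max_pages;
}
```

Because both ends are measured in whole pages, a queue whose unread entries all sit on the head page reports a usage of 0.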
    2196                 :             : 
    2197                 :             : /*
    2198                 :             :  * Check whether the queue is at least half full, and emit a warning if so.
    2199                 :             :  *
    2200                 :             :  * This is unlikely given the size of the queue, but possible.
    2201                 :             :  * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
    2202                 :             :  *
    2203                 :             :  * Caller must hold exclusive NotifyQueueLock.
    2204                 :             :  */
    2205                 :             : static void
    2206                 :           4 : asyncQueueFillWarning(void)
    2207                 :             : {
    2208                 :           4 :         double          fillDegree;
    2209                 :           4 :         TimestampTz t;
    2210                 :             : 
    2211                 :           4 :         fillDegree = asyncQueueUsage();
    2212         [ +  - ]:           4 :         if (fillDegree < 0.5)
    2213                 :           4 :                 return;
    2214                 :             : 
    2215                 :           0 :         t = GetCurrentTimestamp();
    2216                 :             : 
    2217   [ #  #  #  # ]:           0 :         if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
    2218                 :           0 :                                                                    t, QUEUE_FULL_WARN_INTERVAL))
    2219                 :             :         {
    2220                 :           0 :                 QueuePosition min = QUEUE_HEAD;
    2221                 :           0 :                 int32           minPid = InvalidPid;
    2222                 :             : 
    2223         [ #  # ]:           0 :                 for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
    2224                 :             :                 {
    2225         [ #  # ]:           0 :                         Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
    2226   [ #  #  #  #  #  # ]:           0 :                         min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
    2227   [ #  #  #  # ]:           0 :                         if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
    2228                 :           0 :                                 minPid = QUEUE_BACKEND_PID(i);
    2229                 :           0 :                 }
    2230                 :             : 
    2231   [ #  #  #  #  #  #  #  # ]:           0 :                 ereport(WARNING,
    2232                 :             :                                 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
    2233                 :             :                                  (minPid != InvalidPid ?
    2234                 :             :                                   errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
    2235                 :             :                                   : 0),
    2236                 :             :                                  (minPid != InvalidPid ?
    2237                 :             :                                   errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
    2238                 :             :                                   : 0)));
    2239                 :             : 
    2240                 :           0 :                 asyncQueueControl->lastQueueFillWarn = t;
    2241                 :           0 :         }
    2242         [ -  + ]:           4 : }
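asyncQueueFillWarning() has two parts worth isolating: a rate limit, and a scan for the listener whose queue position is furthest behind, so that its PID can be named in the DETAIL line. The sketch below covers both. It uses a plain array instead of the shared listener list and milliseconds instead of TimestampTz. `SketchListener`, `oldest_listener_pid` and `should_warn` are hypothetical, and -1 stands in for InvalidPid.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical snapshot of one listener; in async.c these fields live in
 * shared memory and are read under NotifyQueueLock. */
typedef struct SketchListener
{
    int32_t pid;
    int64_t page;       /* queue position: page ... */
    int     offset;     /* ... and offset within it */
} SketchListener;

/* Order positions by (page, offset), in the spirit of QUEUE_POS_MIN. */
static bool
pos_precedes(int64_t p1, int o1, int64_t p2, int o2)
{
    return p1 < p2 || (p1 == p2 && o1 < o2);
}

/*
 * PID of a listener at the oldest position, or -1 if there are no
 * listeners.  The minimum starts at the head, and ties go to the last
 * listener scanned, as in the loop above.
 */
static int32_t
oldest_listener_pid(const SketchListener *ls, int n,
                    int64_t head_page, int head_offset)
{
    int64_t min_page = head_page;
    int     min_offset = head_offset;
    int32_t min_pid = -1;

    for (int i = 0; i < n; i++)
    {
        assert(ls[i].pid != -1);
        if (pos_precedes(ls[i].page, ls[i].offset, min_page, min_offset))
        {
            min_page = ls[i].page;
            min_offset = ls[i].offset;
        }
        if (ls[i].page == min_page && ls[i].offset == min_offset)
            min_pid = ls[i].pid;
    }
    return min_pid;
}

/* Warn only when at least half full and the interval has fully elapsed. */
static bool
should_warn(double usage, int64_t last_warn_ms, int64_t now_ms,
            int64_t interval_ms)
{
    return usage >= 0.5 && now_ms - last_warn_ms >= interval_ms;
}
```

The rate limit keeps a stuck listener from flooding the log. Every notifying backend calls this check, but only one warning per interval is emitted.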
    2243                 :             : 
    2244                 :             : /*
    2245                 :             :  * Send signals to listening backends.
    2246                 :             :  *
    2247                 :             :  * Normally we signal only backends that are interested in the notifies that
    2248                 :             :  * we just sent.  However, that will leave idle listeners falling further and
    2249                 :             :  * further behind.  Waken them anyway if they're far enough behind, so they'll
    2250                 :             :  * advance their queue position pointers, allowing the global tail to advance.
    2251                 :             :  *
    2252                 :             :  * Since we know the ProcNumber and the Pid the signaling is quite cheap.
    2253                 :             :  *
    2254                 :             :  * This is called during CommitTransaction(), so it's important for it
    2255                 :             :  * to have very low probability of failure.
    2256                 :             :  */
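The two-phase structure described here chooses targets while holding NotifyQueueLock and sends signals only after releasing it. A sketch of that structure follows. A boolean flag stands in for the LWLock and a callback stands in for the actual signal. The second scan, which also wakes far-behind listeners, is omitted, and every `sketch_*` and `Sketch*` name is hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

#define SKETCH_MAX_SLOTS 16

/* Hypothetical per-listener state, loosely modelled on the QUEUE_BACKEND_*
 * fields and the per-channel listener arrays. */
typedef struct SketchSlot
{
    int  pid;
    bool interested;        /* listens on a channel we just notified */
    bool wakeup_pending;    /* already signaled, not yet caught up */
    bool caught_up;         /* position already equals the queue head */
} SketchSlot;

/* Stands in for NotifyQueueLock so the sketch can check it is not held
 * while signaling. */
static bool sketch_lock_held = false;

/* Phase 1: with the "lock" held, choose targets and mark them pending. */
int
sketch_collect_targets(SketchSlot *slots, int n, int *pids)
{
    int count = 0;

    assert(n <= SKETCH_MAX_SLOTS);
    sketch_lock_held = true;
    for (int i = 0; i < n; i++)
    {
        if (!slots[i].interested || slots[i].wakeup_pending ||
            slots[i].caught_up)
            continue;
        slots[i].wakeup_pending = true;     /* so no one signals it twice */
        pids[count++] = slots[i].pid;
    }
    sketch_lock_held = false;
    return count;
}

/* Phase 2: signal the chosen backends only after the lock is released. */
int
sketch_signal_backends(SketchSlot *slots, int n, void (*send) (int pid))
{
    int pids[SKETCH_MAX_SLOTS];
    int count = sketch_collect_targets(slots, n, pids);

    for (int i = 0; i < count; i++)
    {
        assert(!sketch_lock_held);
        send(pids[i]);      /* the real code sends a process signal here */
    }
    return count;
}
```

Setting wakeup_pending during the scan lets a later notifier skip a backend that has already been signaled but has not yet caught up.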
    2257                 :             : static void
    2258                 :           4 : SignalBackends(void)
    2259                 :             : {
    2260                 :           4 :         int                     count;
    2261                 :             : 
    2262                 :             :         /* Can't get here without PreCommit_Notify having made the global table */
    2263         [ +  - ]:           4 :         Assert(globalChannelTable != NULL);
    2264                 :             : 
    2265                 :             :         /* It should have set up these arrays, too */
    2266         [ +  - ]:           4 :         Assert(signalPids != NULL && signalProcnos != NULL);
    2267                 :             : 
    2268                 :             :         /*
    2269                 :             :          * Identify backends that we need to signal.  We don't want to send
    2270                 :             :          * signals while holding the NotifyQueueLock, so this part just builds a
    2271                 :             :          * list of target PIDs in signalPids[] and signalProcnos[].
    2272                 :             :          */
    2273                 :           4 :         count = 0;
    2274                 :             : 
    2275                 :           4 :         LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    2276                 :             : 
    2277                 :             :         /* Scan each channel name that we notified in this transaction */
    2278   [ +  +  +  -  +  +  +  + ]:          12 :         foreach_ptr(char, channel, pendingNotifies->uniqueChannelNames)
    2279                 :             :         {
    2280                 :           4 :                 GlobalChannelKey key;
    2281                 :           4 :                 GlobalChannelEntry *entry;
    2282                 :           4 :                 ListenerEntry *listeners;
    2283                 :             : 
    2284                 :           4 :                 GlobalChannelKeyInit(&key, MyDatabaseId, channel);
    2285                 :           4 :                 entry = dshash_find(globalChannelTable, &key, false);
    2286         [ -  + ]:           4 :                 if (entry == NULL)
    2287                 :           4 :                         continue;                       /* nobody is listening */
    2288                 :             : 
    2289                 :           0 :                 listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA,
    2290                 :           0 :                                                                                                           entry->listenersArray);
    2291                 :             : 
    2292                 :             :                 /* Identify listeners that now need waking, add them to arrays */
    2293         [ #  # ]:           0 :                 for (int j = 0; j < entry->numListeners; j++)
    2294                 :             :                 {
    2295                 :           0 :                         ProcNumber      i;
    2296                 :           0 :                         int32           pid;
    2297                 :           0 :                         QueuePosition pos;
    2298                 :             : 
    2299         [ #  # ]:           0 :                         if (!listeners[j].listening)
    2300                 :           0 :                                 continue;               /* ignore not-yet-committed listeners */
    2301                 :             : 
    2302                 :           0 :                         i = listeners[j].procNo;
    2303                 :             : 
    2304         [ #  # ]:           0 :                         if (QUEUE_BACKEND_WAKEUP_PENDING(i))
    2305                 :           0 :                                 continue;               /* already signaled, no need to repeat */
    2306                 :             : 
    2307                 :           0 :                         pid = QUEUE_BACKEND_PID(i);
    2308                 :           0 :                         pos = QUEUE_BACKEND_POS(i);
    2309                 :             : 
    2310   [ #  #  #  # ]:           0 :                         if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
    2311                 :           0 :                                 continue;               /* it's fully caught up already */
    2312                 :             : 
    2313         [ #  # ]:           0 :                         Assert(pid != InvalidPid);
    2314                 :             : 
    2315                 :           0 :                         QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
    2316                 :           0 :                         signalPids[count] = pid;
    2317                 :           0 :                         signalProcnos[count] = i;
    2318                 :           0 :                         count++;
    2319         [ #  # ]:           0 :                 }
    2320                 :             : 
    2321                 :           0 :                 dshash_release_lock(globalChannelTable, entry);
    2322         [ +  - ]:           8 :         }
    2323                 :             : 
    2324                 :             :         /*
    2325                 :             :          * Scan all listeners.  Any that are not already pending wakeup must not
    2326                 :             :          * be interested in our notifications (else we'd have set their wakeup
    2327                 :             :          * flags above).  Check to see if we can directly advance their queue
    2328                 :             :          * pointers to save a wakeup.  Otherwise, if they are far behind, wake
    2329                 :             :          * them anyway so they will catch up.
    2330                 :             :          */
    2331         [ -  + ]:           4 :         for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
    2332                 :             :         {
    2333                 :           0 :                 int32           pid;
    2334                 :           0 :                 QueuePosition pos;
    2335                 :             : 
    2336         [ #  # ]:           0 :                 if (QUEUE_BACKEND_WAKEUP_PENDING(i))
    2337                 :           0 :                         continue;
    2338                 :             : 
    2339                 :             :                 /* If it's currently advancing, we should not touch it */
    2340         [ #  # ]:           0 :                 if (QUEUE_BACKEND_IS_ADVANCING(i))
    2341                 :           0 :                         continue;
    2342                 :             : 
    2343                 :           0 :                 pid = QUEUE_BACKEND_PID(i);
    2344                 :           0 :                 pos = QUEUE_BACKEND_POS(i);
    2345                 :             : 
    2346                 :             :                 /*
    2347                 :             :                  * We can directly advance the other backend's queue pointer if it's
    2348                 :             :                  * not currently advancing (else there are race conditions), and its
    2349                 :             :                  * current pointer is not behind queueHeadBeforeWrite (else we'd make
    2350                 :             :                  * it miss some older messages), and we'd not be moving the pointer
    2351                 :             :                  * backward.
    2352                 :             :                  */
    2353   [ #  #  #  #  :           0 :                 if (!QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite) &&
                   #  # ]
    2354   [ #  #  #  # ]:           0 :                         QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite))
    2355                 :             :                 {
    2356                 :             :                         /* We can directly advance its pointer past what we wrote */
    2357                 :           0 :                         QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
    2358                 :           0 :                 }
    2359                 :           0 :                 else if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
    2360   [ #  #  #  # ]:           0 :                                                                         QUEUE_POS_PAGE(pos)) >= QUEUE_CLEANUP_DELAY)
    2361                 :             :                 {
    2362                 :             :                         /* It's idle and far behind, so wake it up */
    2363         [ #  # ]:           0 :                         Assert(pid != InvalidPid);
    2364                 :             : 
    2365                 :           0 :                         QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
    2366                 :           0 :                         signalPids[count] = pid;
    2367                 :           0 :                         signalProcnos[count] = i;
    2368                 :           0 :                         count++;
    2369                 :           0 :                 }
    2370         [ #  # ]:           0 :         }
    2371                 :             : 
    2372                 :           4 :         LWLockRelease(NotifyQueueLock);
    2373                 :             : 
    2374                 :             :         /* Now send signals */
    2375         [ -  + ]:           4 :         for (int i = 0; i < count; i++)
    2376                 :             :         {
    2377                 :           0 :                 int32           pid = signalPids[i];
    2378                 :             : 
    2379                 :             :                 /*
    2380                 :             :                  * If we are signaling our own process, no need to involve the kernel;
    2381                 :             :                  * just set the flag directly.
    2382                 :             :                  */
    2383         [ #  # ]:           0 :                 if (pid == MyProcPid)
    2384                 :             :                 {
    2385                 :           0 :                         notifyInterruptPending = true;
    2386                 :           0 :                         continue;
    2387                 :             :                 }
    2388                 :             : 
    2389                 :             :                 /*
    2390                 :             :                  * Note: assuming things aren't broken, a signal failure here could
    2391                 :             :                  * only occur if the target backend exited since we released
    2392                 :             :                  * NotifyQueueLock; which is unlikely but certainly possible. So we
    2393                 :             :                  * just log a low-level debug message if it happens.
    2394                 :             :                  */
    2395         [ #  # ]:           0 :                 if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, signalProcnos[i]) < 0)
    2396   [ #  #  #  # ]:           0 :                         elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
    2397         [ #  # ]:           0 :         }
    2398                 :           4 : }
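The two listener loops in SignalBackends decide, for each listener, whether to signal it, advance its queue pointer directly, or leave it alone. The sketch below restates only that decision logic, under several assumptions. Queue positions are reduced to a hypothetical `(page, offset)` pair whose page numbers never wrap. `CLEANUP_DELAY` is a hypothetical constant standing in for `QUEUE_CLEANUP_DELAY`, with an illustrative value. All locking and shared-memory details are omitted. `QPos`, `should_signal_interested`, and `decide_uninterested` are illustrative names, not PostgreSQL APIs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Simplified queue position (page, offset); assumes pages never wrap around. */
typedef struct
{
    int64_t page;
    int     offset;
} QPos;

/* Hypothetical stand-in for QUEUE_CLEANUP_DELAY; the value is illustrative. */
#define CLEANUP_DELAY 4

typedef enum
{
    LISTENER_SKIP,              /* leave the listener alone */
    LISTENER_ADVANCE,           /* move its pointer past our messages directly */
    LISTENER_WAKE               /* signal it so it catches up by itself */
} ListenerAction;

static bool
pos_equal(QPos a, QPos b)
{
    return a.page == b.page && a.offset == b.offset;
}

/* true if a is strictly before b */
static bool
pos_precedes(QPos a, QPos b)
{
    return a.page < b.page || (a.page == b.page && a.offset < b.offset);
}

/* First loop: should a listener on one of our channels be signaled? */
static bool
should_signal_interested(bool listening, bool wakeup_pending, QPos pos, QPos head)
{
    /* skip uncommitted listeners, already-signaled ones, and caught-up ones */
    return listening && !wakeup_pending && !pos_equal(pos, head);
}

/* Second loop: what to do with a listener not already flagged for wakeup. */
static ListenerAction
decide_uninterested(QPos pos, bool wakeup_pending, bool advancing,
                    QPos headBeforeWrite, QPos headAfterWrite, QPos head)
{
    if (wakeup_pending || advancing)
        return LISTENER_SKIP;   /* already signaled, or mid-read: don't touch */

    /* Not behind our first message, and the move would be forward */
    if (!pos_precedes(pos, headBeforeWrite) &&
        pos_precedes(pos, headAfterWrite))
        return LISTENER_ADVANCE;

    /* Idle but far behind: wake it so the queue tail can be truncated */
    if (head.page - pos.page >= CLEANUP_DELAY)
        return LISTENER_WAKE;

    return LISTENER_SKIP;
}
```

Advancing a pointer directly saves a process wakeup. It is only safe when the listener is not behind `headBeforeWrite`, because otherwise it would silently skip older messages it has not yet read.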
    2399                 :             : 
    2400                 :             : /*
    2401                 :             :  * AtAbort_Notify
    2402                 :             :  *
    2403                 :             :  *      This is called at transaction abort.
    2404                 :             :  *
    2405                 :             :  *      Revert any staged listen/unlisten changes and clean up transaction state.
    2406                 :             :  *      This only does anything if we abort after PreCommit_Notify has staged
    2407                 :             :  *      some entries.
    2408                 :             :  */
    2409                 :             : void
    2410                 :        7016 : AtAbort_Notify(void)
    2411                 :             : {
    2412                 :             :         /* Revert staged listen/unlisten changes */
    2413                 :        7016 :         ApplyPendingListenActions(false);
    2414                 :             : 
    2415                 :             :         /* If we're no longer listening on anything, unregister */
    2416   [ -  +  #  #  :        7016 :         if (amRegisteredListener && LocalChannelTableIsEmpty())
                   #  # ]
    2417                 :           0 :                 asyncQueueUnregister();
    2418                 :             : 
    2419                 :             :         /* And clean up */
    2420                 :        7016 :         ClearPendingActionsAndNotifies();
    2421                 :        7016 : }
    2422                 :             : 
    2423                 :             : /*
    2424                 :             :  * AtSubCommit_Notify() --- Take care of subtransaction commit.
    2425                 :             :  *
    2426                 :             :  * Reassign all items in the pending lists to the parent transaction.
    2427                 :             :  */
    2428                 :             : void
    2429                 :         482 : AtSubCommit_Notify(void)
    2430                 :             : {
    2431                 :         482 :         int                     my_level = GetCurrentTransactionNestLevel();
    2432                 :             : 
    2433                 :             :         /* If there are actions at our nesting level, we must reparent them. */
    2434   [ -  +  #  # ]:         482 :         if (pendingActions != NULL &&
    2435                 :           0 :                 pendingActions->nestingLevel >= my_level)
    2436                 :             :         {
    2437   [ #  #  #  # ]:           0 :                 if (pendingActions->upper == NULL ||
    2438                 :           0 :                         pendingActions->upper->nestingLevel < my_level - 1)
    2439                 :             :                 {
    2440                 :             :                         /* nothing to merge; give the whole thing to the parent */
    2441                 :           0 :                         --pendingActions->nestingLevel;
    2442                 :           0 :                 }
    2443                 :             :                 else
    2444                 :             :                 {
    2445                 :           0 :                         ActionList *childPendingActions = pendingActions;
    2446                 :             : 
    2447                 :           0 :                         pendingActions = pendingActions->upper;
    2448                 :             : 
    2449                 :             :                         /*
    2450                 :             :                          * Mustn't try to eliminate duplicates here --- see queue_listen()
    2451                 :             :                          */
    2452                 :           0 :                         pendingActions->actions =
    2453                 :           0 :                                 list_concat(pendingActions->actions,
    2454                 :           0 :                                                         childPendingActions->actions);
    2455                 :           0 :                         pfree(childPendingActions);
    2456                 :           0 :                 }
    2457                 :           0 :         }
    2458                 :             : 
    2459                 :             :         /* If there are notifies at our nesting level, we must reparent them. */
    2460   [ -  +  #  # ]:         482 :         if (pendingNotifies != NULL &&
    2461                 :           0 :                 pendingNotifies->nestingLevel >= my_level)
    2462                 :             :         {
    2463         [ #  # ]:           0 :                 Assert(pendingNotifies->nestingLevel == my_level);
    2464                 :             : 
    2465   [ #  #  #  # ]:           0 :                 if (pendingNotifies->upper == NULL ||
    2466                 :           0 :                         pendingNotifies->upper->nestingLevel < my_level - 1)
    2467                 :             :                 {
    2468                 :             :                         /* nothing to merge; give the whole thing to the parent */
    2469                 :           0 :                         --pendingNotifies->nestingLevel;
    2470                 :           0 :                 }
    2471                 :             :                 else
    2472                 :             :                 {
    2473                 :             :                         /*
    2474                 :             :                          * Formerly, we didn't bother to eliminate duplicates here, but
    2475                 :             :                          * now we must, else we fall foul of "Assert(!found)", either here
    2476                 :             :                          * or during a later attempt to build the parent-level hashtable.
    2477                 :             :                          */
    2478                 :           0 :                         NotificationList *childPendingNotifies = pendingNotifies;
    2479                 :           0 :                         ListCell   *l;
    2480                 :             : 
    2481                 :           0 :                         pendingNotifies = pendingNotifies->upper;
    2482                 :             :                         /* Insert all the subxact's events into parent, except for dups */
    2483   [ #  #  #  #  :           0 :                         foreach(l, childPendingNotifies->events)
                   #  # ]
    2484                 :             :                         {
    2485                 :           0 :                                 Notification *childn = (Notification *) lfirst(l);
    2486                 :             : 
    2487         [ #  # ]:           0 :                                 if (!AsyncExistsPendingNotify(childn))
    2488                 :           0 :                                         AddEventToPendingNotifies(childn);
    2489                 :           0 :                         }
    2490                 :           0 :                         pfree(childPendingNotifies);
    2491                 :           0 :                 }
    2492                 :           0 :         }
    2493                 :         482 : }
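AtSubCommit_Notify keeps one list node per subtransaction nesting level and, at subcommit, either relabels the child node as the parent's or merges it into an existing parent node while dropping duplicate notifications. The following is a minimal sketch of the notify half of that logic, under stated assumptions. Plain `int`s stand in for `Notification` entries, duplicates are detected by simple equality, and each node has a fixed capacity. `Level`, `push_level`, `add_event`, and `subcommit` are hypothetical names used only for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

#define MAX_EVENTS 16           /* fixed capacity, for illustration only */

/* Simplified stand-in for NotificationList: one node per nesting level. */
typedef struct Level
{
    int           nestingLevel;
    int           events[MAX_EVENTS];
    int           nevents;
    struct Level *upper;
} Level;

static Level *
push_level(Level *top, int nestingLevel)
{
    Level *l = calloc(1, sizeof(Level));

    assert(l != NULL);
    l->nestingLevel = nestingLevel;
    l->upper = top;
    return l;
}

static bool
level_contains(const Level *l, int ev)
{
    for (int i = 0; i < l->nevents; i++)
        if (l->events[i] == ev)
            return true;
    return false;
}

static void
add_event(Level *l, int ev)
{
    assert(l->nevents < MAX_EVENTS);
    l->events[l->nevents++] = ev;
}

/* Mirrors the shape of the pendingNotifies branch of AtSubCommit_Notify. */
static Level *
subcommit(Level *top, int my_level)
{
    if (top == NULL || top->nestingLevel < my_level)
        return top;             /* nothing recorded at this level */

    assert(top->nestingLevel == my_level);

    if (top->upper == NULL || top->upper->nestingLevel < my_level - 1)
    {
        /* nothing to merge: hand the whole node to the parent level */
        top->nestingLevel--;
        return top;
    }

    /* parent already has a node: merge into it, skipping duplicates */
    Level *parent = top->upper;

    for (int i = 0; i < top->nevents; i++)
        if (!level_contains(parent, top->events[i]))
            add_event(parent, top->events[i]);
    free(top);
    return parent;
}
```

Relabeling avoids copying when the parent has no node of its own. Merging is needed only when both levels recorded events, and deduplication keeps the parent list free of repeats.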
    2494                 :             : 
    2495                 :             : /*
    2496                 :             :  * AtSubAbort_Notify() --- Take care of subtransaction abort.
    2497                 :             :  */
    2498                 :             : void
    2499                 :        1183 : AtSubAbort_Notify(void)
    2500                 :             : {
    2501                 :        1183 :         int                     my_level = GetCurrentTransactionNestLevel();
    2502                 :             : 
    2503                 :             :         /*
    2504                 :             :          * All we have to do is pop the stack --- the actions/notifies made in
    2505                 :             :          * this subxact are no longer interesting, and the space will be freed
    2506                 :             :          * when CurTransactionContext is recycled. We still have to free the
    2507                 :             :          * ActionList and NotificationList objects themselves, though, because
    2508                 :             :          * those are allocated in TopTransactionContext.
    2509                 :             :          *
    2510                 :             :          * Note that there might be no entries at all, or no entries for the
    2511                 :             :          * current subtransaction level, either because none were ever created, or
    2512                 :             :          * because we reentered this routine due to trouble during subxact abort.
    2513                 :             :          */
    2514   [ +  -  -  + ]:        1183 :         while (pendingActions != NULL &&
    2515                 :           0 :                    pendingActions->nestingLevel >= my_level)
    2516                 :             :         {
    2517                 :           0 :                 ActionList *childPendingActions = pendingActions;
    2518                 :             : 
    2519                 :           0 :                 pendingActions = pendingActions->upper;
    2520                 :           0 :                 pfree(childPendingActions);
    2521                 :           0 :         }
    2522                 :             : 
    2523   [ +  -  -  + ]:        1183 :         while (pendingNotifies != NULL &&
    2524                 :           0 :                    pendingNotifies->nestingLevel >= my_level)
    2525                 :             :         {
    2526                 :           0 :                 NotificationList *childPendingNotifies = pendingNotifies;
    2527                 :             : 
    2528                 :           0 :                 pendingNotifies = pendingNotifies->upper;
    2529                 :           0 :                 pfree(childPendingNotifies);
    2530                 :           0 :         }
    2531                 :        1183 : }
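On subtransaction abort, the stack is simply popped until the top node belongs to an outer level. The sketch below shows that pop loop and why it is safe to re-enter. A second call finds nothing at or below `my_level` and returns the stack unchanged. `Node`, `push_node`, and `subabort` are hypothetical stand-ins for the `ActionList`/`NotificationList` handling, and `malloc`/`free` replace PostgreSQL memory contexts.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for an ActionList or NotificationList node. */
typedef struct Node
{
    int          nestingLevel;
    struct Node *upper;
} Node;

static Node *
push_node(Node *top, int nestingLevel)
{
    Node *n = malloc(sizeof(Node));

    assert(n != NULL);
    n->nestingLevel = nestingLevel;
    n->upper = top;
    return n;
}

/*
 * Discard every node belonging to the aborting subtransaction (or deeper).
 * Calling it again is harmless: the loop simply finds nothing to pop.
 */
static Node *
subabort(Node *top, int my_level)
{
    while (top != NULL && top->nestingLevel >= my_level)
    {
        Node *child = top;

        top = top->upper;
        free(child);
    }
    return top;
}
```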
    2532                 :             : 
    2533                 :             : /*
    2534                 :             :  * HandleNotifyInterrupt
    2535                 :             :  *
    2536                 :             :  *              Signal handler portion of interrupt handling. Let the backend know
    2537                 :             :  *              that there's a pending notify interrupt. If we're currently reading
    2538                 :             :  *              from the client, this will interrupt the read and
    2539                 :             :  *              ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
    2540                 :             :  */
    2541                 :             : void
    2542                 :           0 : HandleNotifyInterrupt(void)
    2543                 :             : {
    2544                 :             :         /*
    2545                 :             :          * Note: this is called by a SIGNAL HANDLER. You must be very wary what
    2546                 :             :          * you do here.
    2547                 :             :          */
    2548                 :             : 
    2549                 :             :         /* signal that work needs to be done */
    2550                 :           0 :         notifyInterruptPending = true;
    2551                 :             : 
    2552                 :             :         /* make sure the event is processed in due course */
    2553                 :           0 :         SetLatch(MyLatch);
    2554                 :           0 : }
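HandleNotifyInterrupt follows the usual rule for signal handlers: record that work is pending and wake the main loop, but do nothing else. The following is a minimal POSIX sketch of that pattern. A `volatile sig_atomic_t` flag plays the role of `notifyInterruptPending`. A non-blocking self-pipe serves as an assumed stand-in for `SetLatch(MyLatch)`; a self-pipe is one common way to build such a wakeup, but it is not necessarily how the latch is implemented on every platform. `handle_notify_signal`, `install_notify_handler`, and `drain_wakeups` are hypothetical names.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdbool.h>
#include <string.h>
#include <unistd.h>

/* Set only by the handler; sig_atomic_t makes the store async-signal-safe. */
static volatile sig_atomic_t notify_pending = 0;

/* Self-pipe used as a hypothetical stand-in for the process latch. */
static int wake_pipe[2] = {-1, -1};

static void
handle_notify_signal(int signo)
{
    int     saved_errno = errno;    /* handlers must not clobber errno */
    ssize_t rc;

    (void) signo;
    notify_pending = 1;
    /* write() is async-signal-safe; a full pipe (EAGAIN) is fine to ignore */
    rc = write(wake_pipe[1], "x", 1);
    (void) rc;
    errno = saved_errno;
}

static bool
install_notify_handler(int signo)
{
    struct sigaction sa;

    if (pipe(wake_pipe) != 0)
        return false;
    /* non-blocking so the handler can never block on a full pipe */
    fcntl(wake_pipe[0], F_SETFL, O_NONBLOCK);
    fcntl(wake_pipe[1], F_SETFL, O_NONBLOCK);

    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = handle_notify_signal;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    return sigaction(signo, &sa, NULL) == 0;
}

/* Drain the pipe; returns true if at least one wakeup byte was pending. */
static bool
drain_wakeups(void)
{
    char buf[64];
    bool woke = false;

    while (read(wake_pipe[0], buf, sizeof(buf)) > 0)
        woke = true;
    return woke;
}
```

The main loop would wait on the pipe's read end, drain it, and then check the flag. The expensive work runs there, outside the handler.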
    2555                 :             : 
    2556                 :             : /*
    2557                 :             :  * ProcessNotifyInterrupt
    2558                 :             :  *
    2559                 :             :  *              This is called if we see notifyInterruptPending set, just before
    2560                 :             :  *              transmitting ReadyForQuery at the end of a frontend command, and
    2561                 :             :  *              also if a notify signal occurs while reading from the frontend.
    2562                 :             :  *              HandleNotifyInterrupt() will cause the read to be interrupted
    2563                 :             :  *              via the process's latch, and this routine will get called.
    2564                 :             :  *              If we are truly idle (ie, *not* inside a transaction block),
    2565                 :             :  *              process the incoming notifies.
    2566                 :             :  *
    2567                 :             :  *              If "flush" is true, force any frontend messages out immediately.
    2568                 :             :  *              This can be false when being called at the end of a frontend command,
    2569                 :             :  *              since we'll flush after sending ReadyForQuery.
    2570                 :             :  */
    2571                 :             : void
    2572                 :           0 : ProcessNotifyInterrupt(bool flush)
    2573                 :             : {
    2574         [ #  # ]:           0 :         if (IsTransactionOrTransactionBlock())
    2575                 :           0 :                 return;                                 /* not really idle */
    2576                 :             : 
    2577                 :             :         /* Loop in case another signal arrives while sending messages */
    2578         [ #  # ]:           0 :         while (notifyInterruptPending)
    2579                 :           0 :                 ProcessIncomingNotify(flush);
    2580                 :           0 : }
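The `while (notifyInterruptPending)` loop only terminates correctly if each processing pass clears the flag before it starts reading. Then a signal that arrives mid-pass sets the flag again and forces one more pass. This sketch assumes that behavior for the per-pass routine and simulates signals arriving during processing. `process_incoming`, `process_notify_interrupt`, `rounds`, and `resignals_left` are hypothetical names.

```c
#include <assert.h>
#include <signal.h>
#include <stdbool.h>

static volatile sig_atomic_t notify_pending = 0;

/* Hypothetical counters used only to demonstrate the loop. */
static int rounds = 0;
static int resignals_left = 0;  /* "signals" that arrive mid-processing */

static void
process_incoming(void)
{
    /* Clear the flag BEFORE reading, so a signal that arrives while we are
     * busy sets it again and forces another pass. */
    notify_pending = 0;
    rounds++;
    if (resignals_left > 0)
    {
        resignals_left--;
        notify_pending = 1;     /* simulate a signal arriving right now */
    }
}

static void
process_notify_interrupt(bool in_transaction)
{
    if (in_transaction)
        return;                 /* not really idle: leave the flag set */

    while (notify_pending)
        process_incoming();
}
```

If the flag were cleared after the pass instead, a notification signaled during the pass could be lost until the next unrelated signal.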
    2581                 :             : 
    2582                 :             : 
    2583                 :             : /*
    2584                 :             :  * Read all pending notifications from the queue, and deliver appropriate
    2585                 :             :  * ones to my frontend.  Stop when we reach queue head or an uncommitted
    2586                 :             :  * notification.
    2587                 :             :  */
    2588                 :             : static void
    2589                 :           1 : asyncQueueReadAllNotifications(void)
    2590                 :             : {
    2591                 :           1 :         QueuePosition pos;
    2592                 :           1 :         QueuePosition head;
    2593                 :           1 :         Snapshot        snapshot;
    2594                 :             : 
    2595                 :             :         /*
    2596                 :             :          * Fetch current state, indicate to others that we have woken up, and that
    2597                 :             :          * we are in process of advancing our position.
    2598                 :             :          */
    2599                 :           1 :         LWLockAcquire(NotifyQueueLock, LW_SHARED);
    2600                 :             :         /* Assert checks that we have a valid state entry */
    2601         [ +  - ]:           1 :         Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
    2602                 :           1 :         QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
    2603                 :           1 :         pos = QUEUE_BACKEND_POS(MyProcNumber);
    2604                 :           1 :         head = QUEUE_HEAD;
    2605                 :             : 
    2606   [ +  -  +  - ]:           1 :         if (QUEUE_POS_EQUAL(pos, head))
    2607                 :             :         {
    2608                 :             :                 /* Nothing to do, we have read all notifications already. */
    2609                 :           0 :                 LWLockRelease(NotifyQueueLock);
    2610                 :           0 :                 return;
    2611                 :             :         }
    2612                 :             : 
    2613                 :           1 :         QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = true;
    2614                 :           1 :         LWLockRelease(NotifyQueueLock);
    2615                 :             : 
    2616                 :             :         /*----------
    2617                 :             :          * Get snapshot we'll use to decide which xacts are still in progress.
    2618                 :             :          * This is trickier than it might seem, because of race conditions.
    2619                 :             :          * Consider the following example:
    2620                 :             :          *
    2621                 :             :          * Backend 1:                                    Backend 2:
    2622                 :             :          *
    2623                 :             :          * transaction starts
    2624                 :             :          * UPDATE foo SET ...;
    2625                 :             :          * NOTIFY foo;
    2626                 :             :          * commit starts
    2627                 :             :          * queue the notify message
    2628                 :             :          *                                                               transaction starts
    2629                 :             :          *                                                               LISTEN foo;  -- first LISTEN in session
    2630                 :             :          *                                                               SELECT * FROM foo WHERE ...;
    2631                 :             :          * commit to clog
    2632                 :             :          *                                                               commit starts
    2633                 :             :          *                                                               add backend 2 to array of listeners
    2634                 :             :          *                                                               advance to queue head (this code)
    2635                 :             :          *                                                               commit to clog
    2636                 :             :          *
    2637                 :             :          * Transaction 2's SELECT has not seen the UPDATE's effects, since that
    2638                 :             :          * wasn't committed yet.  Ideally we'd ensure that client 2 would
    2639                 :             :          * eventually get transaction 1's notify message, but there's no way
    2640                 :             :          * to do that; until we're in the listener array, there's no guarantee
    2641                 :             :          * that the notify message doesn't get removed from the queue.
    2642                 :             :          *
    2643                 :             :          * Therefore the coding technique transaction 2 is using is unsafe:
    2644                 :             :          * applications must commit a LISTEN before inspecting database state,
    2645                 :             :          * if they want to ensure they will see notifications about subsequent
    2646                 :             :          * changes to that state.
    2647                 :             :          *
    2648                 :             :          * What we do guarantee is that we'll see all notifications from
    2649                 :             :          * transactions committing after the snapshot we take here.
    2650                 :             :          * BecomeRegisteredListener has already added us to the listener array,
    2651                 :             :          * so no not-yet-committed messages can be removed from the queue
    2652                 :             :          * before we see them.
    2653                 :             :          *----------
    2654                 :             :          */
    2655                 :           1 :         snapshot = RegisterSnapshot(GetLatestSnapshot());
    2656                 :             : 
    2657                 :             :         /*
    2658                 :             :          * It is possible that we fail while trying to send a message to our
    2659                 :             :          * frontend (for example, because of encoding conversion failure).  If
    2660                 :             :          * that happens it is critical that we not try to send the same message
    2661                 :             :          * over and over again.  Therefore, we set ExitOnAnyError to upgrade any
    2662                 :             :          * ERRORs to FATAL, causing the client connection to be closed on error.
    2663                 :             :          *
    2664                 :             :          * We used to only skip over the offending message and try to soldier on,
    2665                 :             :          * but it was somewhat questionable to lose a notification and give the
    2666                 :             :          * client an ERROR instead.  A client application is not prepared for
    2667                 :             :          * that and can't tell that a notification was missed.  It was also not
    2668                 :             :          * very useful in practice because notifications are often processed while
    2669                 :             :          * a connection is idle and reading a message from the client, and in that
    2670                 :             :          * state, any error is upgraded to FATAL anyway.  Closing the connection
    2671                 :             :          * is a clear signal to the application that it might have missed
    2672                 :             :          * notifications.
    2673                 :             :          */
    2674                 :             :         {
    2675                 :           1 :                 bool            save_ExitOnAnyError = ExitOnAnyError;
    2676                 :           1 :                 bool            reachedStop;
    2677                 :             : 
    2678                 :           1 :                 ExitOnAnyError = true;
    2679                 :             : 
    2680                 :           1 :                 do
    2681                 :             :                 {
    2682                 :             :                         /*
    2683                 :             :                          * Process messages up to the stop position, end of page, or an
    2684                 :             :                          * uncommitted message.
    2685                 :             :                          *
    2686                 :             :                          * Our stop position is what we found to be the head's position
    2687                 :             :                          * when we entered this function. It might have changed already.
    2688                 :             :                          * But if it has, we will receive (or have already received and
    2689                 :             :                          * queued) another signal and come here again.
    2690                 :             :                          *
    2691                 :             :                          * We are not holding NotifyQueueLock here! The queue can only
    2692                 :             :                          * extend beyond the head pointer (see above) and we leave our
    2693                 :             :                          * backend's pointer where it is so nobody will truncate or
    2695                 :             :                          * rewrite pages under us. In particular, we don't want to hold a lock
    2695                 :             :                          * while sending the notifications to the frontend.
    2696                 :             :                          */
    2697                 :           1 :                         reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
    2698         [ -  + ]:           1 :                 } while (!reachedStop);
    2699                 :             : 
    2700                 :             :                 /* Update shared state */
    2701                 :           1 :                 LWLockAcquire(NotifyQueueLock, LW_SHARED);
    2702                 :           1 :                 QUEUE_BACKEND_POS(MyProcNumber) = pos;
    2703                 :           1 :                 QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
    2704                 :           1 :                 LWLockRelease(NotifyQueueLock);
    2705                 :             : 
    2706                 :           1 :                 ExitOnAnyError = save_ExitOnAnyError;
    2707                 :           1 :         }
    2708                 :             : 
    2709                 :             :         /* Done with snapshot */
    2710                 :           1 :         UnregisterSnapshot(snapshot);
    2711         [ -  + ]:           1 : }
    2712                 :             : 
    2713                 :             : /*
    2714                 :             :  * Fetch notifications from the shared queue, beginning at position current,
    2715                 :             :  * and deliver relevant ones to my frontend.
    2716                 :             :  *
    2717                 :             :  * The function returns true once we have reached the stop position or an
    2718                 :             :  * uncommitted notification, and false if we have finished with the page.
    2719                 :             :  * In other words: once it returns true there is no need to look further.
    2720                 :             :  * The QueuePosition *current is advanced past all processed messages.
    2721                 :             :  */
    2722                 :             : static bool
    2723                 :           1 : asyncQueueProcessPageEntries(QueuePosition *current,
    2724                 :             :                                                          QueuePosition stop,
    2725                 :             :                                                          Snapshot snapshot)
    2726                 :             : {
    2727                 :           1 :         int64           curpage = QUEUE_POS_PAGE(*current);
    2728                 :           1 :         int                     slotno;
    2729                 :           1 :         char       *page_buffer;
    2730                 :           1 :         bool            reachedStop = false;
    2731                 :           1 :         bool            reachedEndOfPage;
    2732                 :             : 
    2733                 :             :         /*
    2734                 :             :          * We copy the entries into a local buffer to avoid holding the SLRU lock
    2735                 :             :          * while we transmit them to our frontend.  The local buffer must be
    2736                 :             :          * adequately aligned.
    2737                 :             :          */
    2738                 :           1 :         alignas(AsyncQueueEntry) char local_buf[QUEUE_PAGESIZE];
    2739                 :           1 :         char       *local_buf_end = local_buf;
    2740                 :             : 
    2741                 :           1 :         slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
    2742                 :             :                                                                                 InvalidTransactionId);
    2743                 :           1 :         page_buffer = NotifyCtl->shared->page_buffer[slotno];
    2744                 :             : 
    2745                 :           1 :         do
    2746                 :             :         {
    2747                 :           5 :                 QueuePosition thisentry = *current;
    2748                 :           5 :                 AsyncQueueEntry *qe;
    2749                 :             : 
    2750   [ +  -  +  + ]:           5 :                 if (QUEUE_POS_EQUAL(thisentry, stop))
    2751                 :           1 :                         break;
    2752                 :             : 
    2753                 :           4 :                 qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
    2754                 :             : 
    2755                 :             :                 /*
    2756                 :             :                  * Advance *current over this message, possibly to the next page. As
    2757                 :             :                  * noted in the comments for asyncQueueReadAllNotifications, we must
    2758                 :             :                  * do this before possibly failing while processing the message.
    2759                 :             :                  */
    2760                 :           4 :                 reachedEndOfPage = asyncQueueAdvance(current, qe->length);
    2761                 :             : 
    2762                 :             :                 /* Ignore messages destined for other databases */
    2763         [ -  + ]:           4 :                 if (qe->dboid == MyDatabaseId)
    2764                 :             :                 {
    2765         [ -  + ]:           4 :                         if (XidInMVCCSnapshot(qe->xid, snapshot))
    2766                 :             :                         {
    2767                 :             :                                 /*
    2768                 :             :                                  * The source transaction is still in progress, so we can't
    2769                 :             :                                  * process this message yet.  Break out of the loop, but first
    2770                 :             :                                  * back up *current so we will reprocess the message next
    2771                 :             :                                  * time.  (Note: it is unlikely but not impossible for
    2772                 :             :                                  * TransactionIdDidCommit to fail, so we can't really avoid
    2773                 :             :                                  * this advance-then-back-up behavior when dealing with an
    2774                 :             :                                  * uncommitted message.)
    2775                 :             :                                  *
    2776                 :             :                                  * Note that we must test XidInMVCCSnapshot before we test
    2777                 :             :                                  * TransactionIdDidCommit, else we might return a message from
    2778                 :             :                                  * a transaction that is not yet visible to snapshots; compare
    2779                 :             :                                  * the comments at the head of heapam_visibility.c.
    2780                 :             :                                  *
    2781                 :             :                                  * Also, while our own xact won't be listed in the snapshot,
    2782                 :             :                                  * we need not check for TransactionIdIsCurrentTransactionId
    2783                 :             :                                  * because our transaction cannot (yet) have queued any
    2784                 :             :                                  * messages.
    2785                 :             :                                  */
    2786                 :           0 :                                 *current = thisentry;
    2787                 :           0 :                                 reachedStop = true;
    2788                 :           0 :                                 break;
    2789                 :             :                         }
    2790                 :             : 
    2791                 :             :                         /*
    2792                 :             :                          * Quick check for the case that we're not listening on any
    2793                 :             :                          * channels, before calling TransactionIdDidCommit().  This makes
    2794                 :             :                          * that case a little faster, but more importantly, it ensures
    2795                 :             :                          * that if there's a bad entry in the queue for which
    2796                 :             :                          * TransactionIdDidCommit() fails for some reason, we can skip
    2797                 :             :                          * over it on the first LISTEN in a session, and not get stuck on
    2798                 :             :                          * it indefinitely.  (This is a little trickier than it looks: it
    2799                 :             :                          * works because BecomeRegisteredListener runs this code before we
    2800                 :             :                          * have made the first entry in localChannelTable.)
    2801                 :             :                          */
    2802   [ +  -  +  - ]:           4 :                         if (LocalChannelTableIsEmpty())
    2803                 :           4 :                                 continue;
    2804                 :             : 
    2805         [ #  # ]:           0 :                         if (TransactionIdDidCommit(qe->xid))
    2806                 :             :                         {
    2807                 :           0 :                                 memcpy(local_buf_end, qe, qe->length);
    2808                 :           0 :                                 local_buf_end += qe->length;
    2809                 :           0 :                         }
    2810                 :             :                         else
    2811                 :             :                         {
    2812                 :             :                                 /*
    2813                 :             :                                  * The source transaction aborted or crashed, so we just
    2814                 :             :                                  * ignore its notifications.
    2815                 :             :                                  */
    2816                 :             :                         }
    2817                 :           0 :                 }
    2818                 :             : 
    2819                 :             :                 /* Loop back if we're not at end of page */
     2820   [ -  +  -  +  + ]:           5 :         } while (!reachedEndOfPage);
    2821                 :             : 
    2822                 :             :         /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
    2823                 :           1 :         LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
    2824                 :             : 
    2825                 :             :         /*
    2826                 :             :          * Now that we have let go of the SLRU bank lock, send the notifications
     2827                 :             :          * to our frontend.
    2828                 :             :          */
    2829         [ +  - ]:           1 :         Assert(local_buf_end - local_buf <= BLCKSZ);
    2830         [ -  + ]:           1 :         for (char *p = local_buf; p < local_buf_end;)
    2831                 :             :         {
    2832                 :           0 :                 AsyncQueueEntry *qe = (AsyncQueueEntry *) p;
    2833                 :             : 
    2834                 :             :                 /* qe->data is the null-terminated channel name */
    2835                 :           0 :                 char       *channel = qe->data;
    2836                 :             : 
    2837         [ #  # ]:           0 :                 if (IsListeningOn(channel))
    2838                 :             :                 {
    2839                 :             :                         /* payload follows channel name */
    2840                 :           0 :                         char       *payload = qe->data + strlen(channel) + 1;
    2841                 :             : 
    2842                 :           0 :                         NotifyMyFrontEnd(channel, payload, qe->srcPid);
    2843                 :           0 :                 }
    2844                 :             : 
    2845                 :           0 :                 p += qe->length;
    2846                 :           0 :         }
    2847                 :             : 
    2848   [ +  -  -  + ]:           1 :         if (QUEUE_POS_EQUAL(*current, stop))
    2849                 :           1 :                 reachedStop = true;
    2850                 :             : 
    2851                 :           2 :         return reachedStop;
    2852                 :           1 : }
    2853                 :             : 
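The following is a minimal, self-contained sketch of the two-phase pattern in asyncQueueProcessPageEntries. Phase 1 walks one page of variable-length entries, advances the position before examining each entry, and copies the entries for our database into a private buffer. Phase 2 then iterates over that copy, which corresponds to the point where the real code has already released the SLRU bank lock. DemoPos, DemoEntry, DEMO_PAGESIZE, and DEMO_ALIGN are hypothetical stand-ins for QueuePosition, AsyncQueueEntry, QUEUE_PAGESIZE, and QUEUEALIGN. demo_advance only assumes that asyncQueueAdvance moves to the next page once not even an empty entry would fit. The snapshot and commit checks are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-ins for QUEUE_PAGESIZE, QUEUEALIGN, QueuePosition and
 * AsyncQueueEntry; the sizes are chosen only for the demo. */
#define DEMO_PAGESIZE 64
#define DEMO_ALIGN(len) (((len) + 3) & ~(size_t) 3)

typedef struct { int64_t page; int offset; } DemoPos;
typedef struct
{
	int			length;			/* aligned size of the whole entry */
	uint32_t	dboid;			/* destination database */
	char		data[];			/* "channel\0payload\0" */
} DemoEntry;

#define DEMO_EMPTY_SIZE offsetof(DemoEntry, data)
#define DEMO_POS_EQUAL(a, b) ((a).page == (b).page && (a).offset == (b).offset)

/* Step over one entry; jump to the next page (returning true) once not even
 * an empty entry would fit in the space left on this page. */
static bool
demo_advance(DemoPos *pos, int length)
{
	pos->offset += length;
	assert(pos->offset <= DEMO_PAGESIZE);
	if (pos->offset + (int) DEMO_ALIGN(DEMO_EMPTY_SIZE) <= DEMO_PAGESIZE)
		return false;
	pos->page++;
	pos->offset = 0;
	return true;
}

/* Phase 1: copy this page's entries for mydb into local_buf; returns bytes. */
static size_t
demo_process_page(const char *page, DemoPos *cur, DemoPos stop,
				  uint32_t mydb, char *local_buf, bool *reached_stop)
{
	char	   *end = local_buf;
	bool		end_of_page = false;

	while (!end_of_page && !DEMO_POS_EQUAL(*cur, stop))
	{
		const DemoEntry *qe = (const DemoEntry *) (page + cur->offset);

		end_of_page = demo_advance(cur, qe->length);	/* advance first */
		if (qe->dboid == mydb)	/* ignore other databases */
		{
			memcpy(end, qe, (size_t) qe->length);
			end += qe->length;
		}
	}
	*reached_stop = DEMO_POS_EQUAL(*cur, stop);
	return (size_t) (end - local_buf);
}

/* Appends one entry at offset and returns its aligned length. */
static int
demo_put(char *page, int offset, uint32_t db, const char *chan, const char *payload)
{
	DemoEntry  *qe = (DemoEntry *) (page + offset);
	size_t		clen = strlen(chan) + 1, plen = strlen(payload) + 1;
	int			len = (int) DEMO_ALIGN(DEMO_EMPTY_SIZE + clen + plen);

	assert(offset + len <= DEMO_PAGESIZE);
	qe->length = len;
	qe->dboid = db;
	memcpy(qe->data, chan, clen);
	memcpy(qe->data + clen, payload, plen);
	return len;
}

/* Builds one page holding entries for databases 1, 2 and 1, then returns how
 * many entries phase 2 would deliver for mydb. */
static int
demo_run(uint32_t mydb)
{
	_Alignas(DemoEntry) char page[DEMO_PAGESIZE] = {0};
	_Alignas(DemoEntry) char local_buf[DEMO_PAGESIZE];
	DemoPos		cur = {0, 0}, stop = {0, 0};
	bool		reached_stop;
	int			count = 0;

	stop.offset += demo_put(page, stop.offset, 1, "a", "x");
	stop.offset += demo_put(page, stop.offset, 2, "b", "y");
	stop.offset += demo_put(page, stop.offset, 1, "c", "z");

	size_t		used = demo_process_page(page, &cur, stop, mydb, local_buf,
										 &reached_stop);

	assert(reached_stop);
	/* Phase 2: the real code sends each copied entry with no SLRU lock held. */
	for (const char *p = local_buf; p < local_buf + used; count++)
		p += ((const DemoEntry *) p)->length;
	return count;
}
```

Because the copy is made while the lock is held and the expensive per-entry work happens only on local_buf, the time spent holding the lock depends on a memcpy rather than on network I/O to the client.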
    2854                 :             : /*
    2855                 :             :  * Advance the shared queue tail variable to the minimum of all the
    2856                 :             :  * per-backend tail pointers.  Truncate pg_notify space if possible.
    2857                 :             :  *
    2858                 :             :  * This is (usually) called during CommitTransaction(), so it's important for
    2859                 :             :  * it to have very low probability of failure.
    2860                 :             :  */
    2861                 :             : static void
    2862                 :           1 : asyncQueueAdvanceTail(void)
    2863                 :             : {
    2864                 :           1 :         QueuePosition min;
    2865                 :           1 :         int64           oldtailpage;
    2866                 :           1 :         int64           newtailpage;
    2867                 :           1 :         int64           boundary;
    2868                 :             : 
    2869                 :             :         /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
    2870                 :           1 :         LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
    2871                 :             : 
    2872                 :             :         /*
    2873                 :             :          * Compute the new tail.  Pre-v13, it's essential that QUEUE_TAIL be exact
    2874                 :             :          * (ie, exactly match at least one backend's queue position), so it must
    2875                 :             :          * be updated atomically with the actual computation.  Since v13, we could
    2876                 :             :          * get away with not doing it like that, but it seems prudent to keep it
    2877                 :             :          * so.
    2878                 :             :          *
    2879                 :             :          * Also, because incoming backends will scan forward from QUEUE_TAIL, that
    2880                 :             :          * must be advanced before we can truncate any data.  Thus, QUEUE_TAIL is
    2881                 :             :          * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
    2882                 :             :          * un-truncated page.  When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
    2883                 :             :          * there are pages we can truncate but haven't yet finished doing so.
    2884                 :             :          *
    2885                 :             :          * For concurrency's sake, we don't want to hold NotifyQueueLock while
    2886                 :             :          * performing SimpleLruTruncate.  This is OK because no backend will try
    2887                 :             :          * to access the pages we are in the midst of truncating.
    2888                 :             :          */
    2889                 :           1 :         LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    2890                 :           1 :         min = QUEUE_HEAD;
    2891         [ +  - ]:           1 :         for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
    2892                 :             :         {
    2893         [ #  # ]:           0 :                 Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
     2894   [ #  #  #  #  #  # ]:           0 :                 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
    2895                 :           0 :         }
    2896                 :           1 :         QUEUE_TAIL = min;
    2897                 :           1 :         oldtailpage = QUEUE_STOP_PAGE;
    2898                 :           1 :         LWLockRelease(NotifyQueueLock);
    2899                 :             : 
    2900                 :             :         /*
    2901                 :             :          * We can truncate something if the global tail advanced across an SLRU
    2902                 :             :          * segment boundary.
    2903                 :             :          *
    2904                 :             :          * XXX it might be better to truncate only once every several segments, to
    2905                 :             :          * reduce the number of directory scans.
    2906                 :             :          */
    2907                 :           1 :         newtailpage = QUEUE_POS_PAGE(min);
    2908                 :           1 :         boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
    2909         [ +  - ]:           1 :         if (asyncQueuePagePrecedes(oldtailpage, boundary))
    2910                 :             :         {
    2911                 :             :                 /*
     2912                 :             :                  * SimpleLruTruncate() acquires the SLRU bank locks it needs and
     2913                 :             :                  * releases them again before returning.
    2914                 :             :                  */
    2915                 :           0 :                 SimpleLruTruncate(NotifyCtl, newtailpage);
    2916                 :             : 
    2917                 :           0 :                 LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    2918                 :           0 :                 QUEUE_STOP_PAGE = newtailpage;
    2919                 :           0 :                 LWLockRelease(NotifyQueueLock);
    2920                 :           0 :         }
    2921                 :             : 
    2922                 :           1 :         LWLockRelease(NotifyQueueTailLock);
    2923                 :           1 : }
    2924                 :             : 
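The truncation decision in asyncQueueAdvanceTail reduces to plain arithmetic. The new logical tail is the minimum of QUEUE_HEAD and every listener's position. Truncation happens only when the old stop page lies before the first page of the segment that contains the new tail. This sketch makes three assumptions: 32 pages per segment (the value of SLRU_PAGES_PER_SEGMENT), 64-bit page numbers, and an asyncQueuePagePrecedes that reduces to a plain `<`. DemoPos and all demo_* helpers are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/* Assumed to match PostgreSQL's SLRU_PAGES_PER_SEGMENT. */
#define DEMO_PAGES_PER_SEGMENT 32

typedef struct { int64_t page; int offset; } DemoPos;	/* hypothetical */

static DemoPos
demo_pos_min(DemoPos a, DemoPos b)
{
	if (a.page != b.page)
		return a.page < b.page ? a : b;
	return a.offset <= b.offset ? a : b;
}

/*
 * Computes the new logical tail (the minimum of head and all listener
 * positions) and returns the page to pass to SimpleLruTruncate(), or -1 if
 * the tail has not moved into a later segment than oldtailpage.
 */
static int64_t
demo_advance_tail(DemoPos head, const DemoPos *listeners, int n,
				  int64_t oldtailpage, DemoPos *new_tail)
{
	DemoPos		min = head;

	for (int i = 0; i < n; i++)
		min = demo_pos_min(min, listeners[i]);
	*new_tail = min;

	int64_t		newtailpage = min.page;
	int64_t		boundary = newtailpage - (newtailpage % DEMO_PAGES_PER_SEGMENT);

	return oldtailpage < boundary ? newtailpage : -1;
}

/* Demo: head at page 100, one listener at (90, 16), another at slow_page. */
static int64_t
demo_case(int64_t slow_page, int64_t oldtailpage)
{
	DemoPos		head = {100, 0};
	DemoPos		listeners[2] = {{90, 16}, {slow_page, 0}};
	DemoPos		tail;
	int64_t		result = demo_advance_tail(head, listeners, 2, oldtailpage, &tail);

	assert(tail.page <= 90);
	return result;
}
```

Take listeners at pages 90 and 95 with the head at page 100. The tail lands on page 90, whose segment starts at page 64. An old stop page of 32 therefore allows truncation, while an old stop page of 64 does not.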
    2925                 :             : /*
    2926                 :             :  * AsyncNotifyFreezeXids
    2927                 :             :  *
    2928                 :             :  * Prepare the async notification queue for CLOG truncation by freezing
    2929                 :             :  * transaction IDs that are about to become inaccessible.
    2930                 :             :  *
    2931                 :             :  * This function is called by VACUUM before advancing datfrozenxid. It scans
    2932                 :             :  * the notification queue and replaces XIDs that would become inaccessible
    2933                 :             :  * after CLOG truncation with special markers:
    2934                 :             :  * - Committed transactions are set to FrozenTransactionId
    2935                 :             :  * - Aborted/crashed transactions are set to InvalidTransactionId
    2936                 :             :  *
    2937                 :             :  * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG
    2938                 :             :  * pages will be truncated. If XID < newFrozenXid, it cannot still be running
    2939                 :             :  * (or it would have held back newFrozenXid through ProcArray).
    2940                 :             :  * Therefore, if TransactionIdDidCommit returns false, we know the transaction
    2941                 :             :  * either aborted explicitly or crashed, and we can safely mark it invalid.
    2942                 :             :  */
    2943                 :             : void
    2944                 :           2 : AsyncNotifyFreezeXids(TransactionId newFrozenXid)
    2945                 :             : {
    2946                 :           2 :         QueuePosition pos;
    2947                 :           2 :         QueuePosition head;
    2948                 :           2 :         int64           curpage = -1;
    2949                 :           2 :         int                     slotno = -1;
    2950                 :           2 :         char       *page_buffer = NULL;
    2951                 :           2 :         bool            page_dirty = false;
    2952                 :             : 
    2953                 :             :         /*
    2954                 :             :          * Acquire locks in the correct order to avoid deadlocks. As per the
    2955                 :             :          * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
    2956                 :             :          * bank locks.
    2957                 :             :          *
    2958                 :             :          * We only need SHARED mode since we're just reading the head/tail
    2959                 :             :          * positions, not modifying them.
    2960                 :             :          */
    2961                 :           2 :         LWLockAcquire(NotifyQueueTailLock, LW_SHARED);
    2962                 :           2 :         LWLockAcquire(NotifyQueueLock, LW_SHARED);
    2963                 :             : 
    2964                 :           2 :         pos = QUEUE_TAIL;
    2965                 :           2 :         head = QUEUE_HEAD;
    2966                 :             : 
     2967                 :             :         /* Release NotifyQueueLock early; we only needed it to read the positions */
    2968                 :           2 :         LWLockRelease(NotifyQueueLock);
    2969                 :             : 
    2970                 :             :         /*
    2971                 :             :          * Scan the queue from tail to head, freezing XIDs as needed. We hold
    2972                 :             :          * NotifyQueueTailLock throughout to ensure the tail doesn't move while
    2973                 :             :          * we're working.
    2974                 :             :          */
    2975   [ -  +  +  - ]:           2 :         while (!QUEUE_POS_EQUAL(pos, head))
    2976                 :             :         {
    2977                 :           0 :                 AsyncQueueEntry *qe;
    2978                 :           0 :                 TransactionId xid;
    2979                 :           0 :                 int64           pageno = QUEUE_POS_PAGE(pos);
    2980                 :           0 :                 int                     offset = QUEUE_POS_OFFSET(pos);
    2981                 :             : 
    2982                 :             :                 /* If we need a different page, release old lock and get new one */
    2983         [ #  # ]:           0 :                 if (pageno != curpage)
    2984                 :             :                 {
    2985                 :           0 :                         LWLock     *lock;
    2986                 :             : 
    2987                 :             :                         /* Release previous page if any */
    2988         [ #  # ]:           0 :                         if (slotno >= 0)
    2989                 :             :                         {
    2990         [ #  # ]:           0 :                                 if (page_dirty)
    2991                 :             :                                 {
    2992                 :           0 :                                         NotifyCtl->shared->page_dirty[slotno] = true;
    2993                 :           0 :                                         page_dirty = false;
    2994                 :           0 :                                 }
    2995                 :           0 :                                 LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
    2996                 :           0 :                         }
    2997                 :             : 
    2998                 :           0 :                         lock = SimpleLruGetBankLock(NotifyCtl, pageno);
    2999                 :           0 :                         LWLockAcquire(lock, LW_EXCLUSIVE);
    3000                 :           0 :                         slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
    3001                 :             :                                                                            InvalidTransactionId);
    3002                 :           0 :                         page_buffer = NotifyCtl->shared->page_buffer[slotno];
    3003                 :           0 :                         curpage = pageno;
    3004                 :           0 :                 }
    3005                 :             : 
    3006                 :           0 :                 qe = (AsyncQueueEntry *) (page_buffer + offset);
    3007                 :           0 :                 xid = qe->xid;
    3008                 :             : 
    3009   [ #  #  #  # ]:           0 :                 if (TransactionIdIsNormal(xid) &&
    3010                 :           0 :                         TransactionIdPrecedes(xid, newFrozenXid))
    3011                 :             :                 {
    3012         [ #  # ]:           0 :                         if (TransactionIdDidCommit(xid))
    3013                 :             :                         {
    3014                 :           0 :                                 qe->xid = FrozenTransactionId;
    3015                 :           0 :                                 page_dirty = true;
    3016                 :           0 :                         }
    3017                 :             :                         else
    3018                 :             :                         {
    3019                 :           0 :                                 qe->xid = InvalidTransactionId;
    3020                 :           0 :                                 page_dirty = true;
    3021                 :             :                         }
    3022                 :           0 :                 }
    3023                 :             : 
    3024                 :             :                 /* Advance to next entry */
    3025                 :           0 :                 asyncQueueAdvance(&pos, qe->length);
    3026                 :           0 :         }
    3027                 :             : 
    3028                 :             :         /* Release final page lock if we acquired one */
    3029         [ +  - ]:           2 :         if (slotno >= 0)
    3030                 :             :         {
    3031         [ #  # ]:           0 :                 if (page_dirty)
    3032                 :           0 :                         NotifyCtl->shared->page_dirty[slotno] = true;
    3033                 :           0 :                 LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
    3034                 :           0 :         }
    3035                 :             : 
    3036                 :           2 :         LWLockRelease(NotifyQueueTailLock);
    3037                 :           2 : }
    3038                 :             : 
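The per-entry rule in AsyncNotifyFreezeXids can be written as a small pure function. Only normal XIDs that precede newFrozenXid are rewritten: to FrozenTransactionId if the transaction committed, and to InvalidTransactionId otherwise. In this sketch, the constant values follow PostgreSQL's transam.h, and the modulo-2^32 comparison mirrors TransactionIdPrecedes for two normal XIDs. The commit-status callback is a hypothetical stand-in for TransactionIdDidCommit, and so is the "even XIDs committed" rule in demo_even_committed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t DemoXid;

/* Same numeric values as InvalidTransactionId, FrozenTransactionId and
 * FirstNormalTransactionId in PostgreSQL's transam.h. */
#define DEMO_INVALID_XID		((DemoXid) 0)
#define DEMO_FROZEN_XID			((DemoXid) 2)
#define DEMO_FIRST_NORMAL_XID	((DemoXid) 3)

/* Hypothetical stand-in for TransactionIdDidCommit(). */
typedef bool (*DemoDidCommit) (DemoXid xid);

static bool
demo_xid_precedes(DemoXid a, DemoXid b)
{
	/* modulo-2^32 comparison, valid for two normal XIDs */
	return (int32_t) (a - b) < 0;
}

/* Returns the XID a queue entry should carry after freezing at new_frozen_xid. */
static DemoXid
demo_freeze_entry_xid(DemoXid xid, DemoXid new_frozen_xid, DemoDidCommit did_commit)
{
	if (xid >= DEMO_FIRST_NORMAL_XID && demo_xid_precedes(xid, new_frozen_xid))
		return did_commit(xid) ? DEMO_FROZEN_XID : DEMO_INVALID_XID;
	return xid;					/* special XID, or its CLOG page is kept */
}

/* Hypothetical commit oracle for the demo: even XIDs are "committed". */
static bool
demo_even_committed(DemoXid xid)
{
	return xid % 2 == 0;
}
```

The wraparound comparison is why an XID such as 4294967000 still counts as older than a newFrozenXid of 500 once the counter has wrapped.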
    3039                 :             : /*
    3040                 :             :  * ProcessIncomingNotify
    3041                 :             :  *
    3042                 :             :  *              Scan the queue for arriving notifications and report them to the front
    3043                 :             :  *              end.  The notifications might be from other sessions, or our own;
    3044                 :             :  *              there's no need to distinguish here.
    3045                 :             :  *
    3046                 :             :  *              If "flush" is true, force any frontend messages out immediately.
    3047                 :             :  *
    3048                 :             :  *              NOTE: since we are outside any transaction, we must create our own.
    3049                 :             :  */
    3050                 :             : static void
    3051                 :           0 : ProcessIncomingNotify(bool flush)
    3052                 :             : {
    3053                 :             :         /* We *must* reset the flag */
    3054                 :           0 :         notifyInterruptPending = false;
    3055                 :             : 
    3056                 :             :         /* Do nothing else if we aren't actively listening */
    3057   [ #  #  #  # ]:           0 :         if (LocalChannelTableIsEmpty())
    3058                 :           0 :                 return;
    3059                 :             : 
    3060         [ #  # ]:           0 :         if (Trace_notify)
    3061   [ #  #  #  # ]:           0 :                 elog(DEBUG1, "ProcessIncomingNotify");
    3062                 :             : 
    3063                 :           0 :         set_ps_display("notify interrupt");
    3064                 :             : 
    3065                 :             :         /*
    3066                 :             :          * We must run asyncQueueReadAllNotifications inside a transaction, else
    3067                 :             :          * bad things happen if it gets an error.
    3068                 :             :          */
    3069                 :           0 :         StartTransactionCommand();
    3070                 :             : 
    3071                 :           0 :         asyncQueueReadAllNotifications();
    3072                 :             : 
    3073                 :           0 :         CommitTransactionCommand();
    3074                 :             : 
    3075                 :             :         /*
    3076                 :             :          * If this isn't an end-of-command case, we must flush the notify messages
     3077                 :             :          * to ensure the frontend gets them promptly.
    3078                 :             :          */
    3079         [ #  # ]:           0 :         if (flush)
    3080                 :           0 :                 pq_flush();
    3081                 :             : 
    3082                 :           0 :         set_ps_display("idle");
    3083                 :             : 
    3084         [ #  # ]:           0 :         if (Trace_notify)
    3085   [ #  #  #  # ]:           0 :                 elog(DEBUG1, "ProcessIncomingNotify: done");
    3086                 :           0 : }
    3087                 :             : 
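ProcessIncomingNotify clears notifyInterruptPending before doing any other work. The minimal sketch below shows why that ordering matters. If a signal arrives while a batch is being processed, it sets the flag again, so the loop runs one more batch instead of losing the wakeup. The sketch uses hypothetical demo_* names. It raises SIGINT only because ISO C guarantees that signal; PostgreSQL itself delivers this interrupt through SIGUSR1 and its procsignal machinery.

```c
#include <assert.h>
#include <signal.h>
#include <stdbool.h>

/* Hypothetical counterpart of notifyInterruptPending. */
static volatile sig_atomic_t demo_interrupt_pending = 0;
static int	demo_batches = 0;

static void
demo_handler(int signo)
{
	signal(signo, demo_handler);	/* re-arm in case signal() is one-shot */
	demo_interrupt_pending = 1;	/* only set a flag; the work happens later */
}

/* Stand-in for ProcessIncomingNotify(): clear the flag FIRST, then work. */
static void
demo_process_incoming(bool signal_arrives_midway)
{
	demo_interrupt_pending = 0;
	if (signal_arrives_midway)
		raise(SIGINT);			/* another notifier signals us mid-batch */
	demo_batches++;				/* ... read the queue here ... */
}

/* Returns how many batches ran before the flag stayed clear. */
static int
demo_run(void)
{
	bool		first = true;

	demo_batches = 0;
	if (signal(SIGINT, demo_handler) == SIG_ERR)
		return -1;
	raise(SIGINT);
	while (demo_interrupt_pending)	/* a safe point, e.g. while idle */
	{
		demo_process_incoming(first);
		first = false;
	}
	return demo_batches;
}
```

Suppose demo_process_incoming cleared the flag after its work instead. The signal that arrives midway would then be overwritten, and demo_run would return 1 instead of 2.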
    3088                 :             : /*
    3089                 :             :  * Send NOTIFY message to my front end.
    3090                 :             :  */
    3091                 :             : void
    3092                 :           0 : NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
    3093                 :             : {
    3094         [ #  # ]:           0 :         if (whereToSendOutput == DestRemote)
    3095                 :             :         {
    3096                 :           0 :                 StringInfoData buf;
    3097                 :             : 
    3098                 :           0 :                 pq_beginmessage(&buf, PqMsg_NotificationResponse);
    3099                 :           0 :                 pq_sendint32(&buf, srcPid);
    3100                 :           0 :                 pq_sendstring(&buf, channel);
    3101                 :           0 :                 pq_sendstring(&buf, payload);
    3102                 :           0 :                 pq_endmessage(&buf);
    3103                 :             : 
    3104                 :             :                 /*
    3105                 :             :                  * NOTE: we do not do pq_flush() here.  Some level of caller will
    3106                 :             :                  * handle it later, allowing this message to be combined into a packet
    3107                 :             :                  * with other ones.
    3108                 :             :                  */
    3109                 :           0 :         }
    3110                 :             :         else
    3111   [ #  #  #  # ]:           0 :                 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
    3112                 :           0 : }
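NotifyMyFrontEnd builds a NotificationResponse message. In the frontend/backend protocol, this message is the type byte 'A', then an Int32 length that counts itself but not the type byte, then the sender's Int32 PID, and finally the channel and payload as NUL-terminated strings. The sketch below lays out those bytes by hand. The helper names (put_int32, put_cstring, encode_notification_response) are hypothetical, and the real pq_sendstring also converts the text to the client encoding, which this sketch skips. Like the original function, it only fills a buffer and does not flush anything, so several messages could share one packet.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Append a 32-bit integer in network byte order (big-endian). */
static size_t put_int32(unsigned char *buf, size_t pos, uint32_t v)
{
    buf[pos]     = (unsigned char) (v >> 24);
    buf[pos + 1] = (unsigned char) (v >> 16);
    buf[pos + 2] = (unsigned char) (v >> 8);
    buf[pos + 3] = (unsigned char) v;
    return pos + 4;
}

/* Append a NUL-terminated string, including its terminator. */
static size_t put_cstring(unsigned char *buf, size_t pos, const char *s)
{
    size_t n = strlen(s) + 1;

    memcpy(buf + pos, s, n);
    return pos + n;
}

/*
 * Hypothetical helper: write a NotificationResponse into buf.
 * Returns the number of bytes written, or 0 if cap is too small.
 * No client-encoding conversion is done (assumption: ASCII text).
 */
size_t encode_notification_response(unsigned char *buf, size_t cap,
                                    int32_t src_pid, const char *channel,
                                    const char *payload)
{
    /* length word + pid + channel\0 + payload\0 */
    size_t body = 4 + 4 + strlen(channel) + 1 + strlen(payload) + 1;
    size_t pos = 0;

    if (cap < 1 + body)
        return 0;
    buf[pos++] = 'A';                               /* message type byte */
    pos = put_int32(buf, pos, (uint32_t) body);     /* counts itself, not the type */
    pos = put_int32(buf, pos, (uint32_t) src_pid);
    pos = put_cstring(buf, pos, channel);
    pos = put_cstring(buf, pos, payload);
    return pos;
}
```

For channel "ch", payload "hi" and PID 1234, the message is 15 bytes: 'A', a length of 14, the PID, then "ch\0hi\0".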
    3113                 :             : 
    3114                 :             : /* Does pendingNotifies include a match for the given event? */
    3115                 :             : static bool
    3116                 :           0 : AsyncExistsPendingNotify(Notification *n)
    3117                 :             : {
    3118         [ #  # ]:           0 :         if (pendingNotifies == NULL)
    3119                 :           0 :                 return false;
    3120                 :             : 
    3121         [ #  # ]:           0 :         if (pendingNotifies->hashtab != NULL)
    3122                 :             :         {
    3123                 :             :                 /* Use the hash table to probe for a match */
    3124         [ #  # ]:           0 :                 if (hash_search(pendingNotifies->hashtab,
    3125                 :             :                                                 &n,
    3126                 :             :                                                 HASH_FIND,
    3127                 :             :                                                 NULL))
    3128                 :           0 :                         return true;
    3129                 :           0 :         }
    3130                 :             :         else
    3131                 :             :         {
    3132                 :             :                 /* Must scan the event list */
    3133                 :           0 :                 ListCell   *l;
    3134                 :             : 
    3135   [ #  #  #  #  :           0 :                 foreach(l, pendingNotifies->events)
             #  #  #  # ]
    3136                 :             :                 {
    3137                 :           0 :                         Notification *oldn = (Notification *) lfirst(l);
    3138                 :             : 
    3139         [ #  # ]:           0 :                         if (n->channel_len == oldn->channel_len &&
    3140   [ #  #  #  # ]:           0 :                                 n->payload_len == oldn->payload_len &&
    3141                 :           0 :                                 memcmp(n->data, oldn->data,
    3142                 :           0 :                                            n->channel_len + n->payload_len + 2) == 0)
    3143                 :           0 :                                 return true;
    3144         [ #  # ]:           0 :                 }
    3145      [ #  #  # ]:           0 :         }
    3146                 :             : 
    3147                 :           0 :         return false;
    3148                 :           0 : }
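When no hash table exists, AsyncExistsPendingNotify falls back to a linear scan. Each Notification stores its channel and payload back to back in data, each followed by a terminating NUL, so a single memcmp over channel_len + payload_len + 2 bytes compares both strings once the cheap length checks pass. The sketch below reproduces that comparison with a hypothetical Note struct (a stand-in for Notification, using a C99 flexible array member). note_make, note_equal and note_exists are illustrative names and are not PostgreSQL functions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for Notification: data holds "channel\0payload\0". */
typedef struct Note
{
    size_t channel_len;   /* strlen(channel) */
    size_t payload_len;   /* strlen(payload) */
    char   data[];        /* channel, NUL, payload, NUL */
} Note;

Note *note_make(const char *channel, const char *payload)
{
    size_t cl = strlen(channel);
    size_t pl = strlen(payload);
    Note  *n = malloc(sizeof(Note) + cl + pl + 2);

    if (n == NULL)
        return NULL;
    n->channel_len = cl;
    n->payload_len = pl;
    memcpy(n->data, channel, cl + 1);            /* channel and its NUL */
    memcpy(n->data + cl + 1, payload, pl + 1);   /* payload and its NUL */
    return n;
}

/* Length checks first, then one memcmp covering both strings and both NULs. */
bool note_equal(const Note *a, const Note *b)
{
    return a->channel_len == b->channel_len &&
           a->payload_len == b->payload_len &&
           memcmp(a->data, b->data, a->channel_len + a->payload_len + 2) == 0;
}

/* Linear scan, as in the branch taken while the hash table is NULL. */
bool note_exists(Note *const *pending, size_t count, const Note *n)
{
    for (size_t i = 0; i < count; i++)
        if (note_equal(n, pending[i]))
            return true;
    return false;
}
```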
    3149                 :             : 
    3150                 :             : /*
    3151                 :             :  * Add a notification event to a pre-existing pendingNotifies list.
    3152                 :             :  *
    3153                 :             :  * Because pendingNotifies->events is already nonempty, this works
    3154                 :             :  * correctly no matter what CurrentMemoryContext is.
    3155                 :             :  */
    3156                 :             : static void
    3157                 :           0 : AddEventToPendingNotifies(Notification *n)
    3158                 :             : {
    3159         [ #  # ]:           0 :         Assert(pendingNotifies->events != NIL);
    3160                 :             : 
    3161                 :             :         /* Create the hash tables if it's time to */
    3162   [ #  #  #  # ]:           0 :         if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
    3163                 :           0 :                 pendingNotifies->hashtab == NULL)
    3164                 :             :         {
    3165                 :           0 :                 HASHCTL         hash_ctl;
    3166                 :           0 :                 ListCell   *l;
    3167                 :             : 
    3168                 :             :                 /* Create the hash table */
    3169                 :           0 :                 hash_ctl.keysize = sizeof(Notification *);
    3170                 :           0 :                 hash_ctl.entrysize = sizeof(struct NotificationHash);
    3171                 :           0 :                 hash_ctl.hash = notification_hash;
    3172                 :           0 :                 hash_ctl.match = notification_match;
    3173                 :           0 :                 hash_ctl.hcxt = CurTransactionContext;
    3174                 :           0 :                 pendingNotifies->hashtab =
    3175                 :           0 :                         hash_create("Pending Notifies",
    3176                 :             :                                                 256L,
    3177                 :             :                                                 &hash_ctl,
    3178                 :             :                                                 HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
    3179                 :             : 
    3180                 :             :                 /* Create the unique channel name table */
    3181         [ #  # ]:           0 :                 Assert(pendingNotifies->uniqueChannelHash == NULL);
    3182                 :           0 :                 hash_ctl.keysize = NAMEDATALEN;
    3183                 :           0 :                 hash_ctl.entrysize = sizeof(ChannelName);
    3184                 :           0 :                 hash_ctl.hcxt = CurTransactionContext;
    3185                 :           0 :                 pendingNotifies->uniqueChannelHash =
    3186                 :           0 :                         hash_create("Pending Notify Channel Names",
    3187                 :             :                                                 64L,
    3188                 :             :                                                 &hash_ctl,
    3189                 :             :                                                 HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
    3190                 :             : 
    3191                 :             :                 /* Insert all the already-existing events */
    3192   [ #  #  #  #  :           0 :                 foreach(l, pendingNotifies->events)
                   #  # ]
    3193                 :             :                 {
    3194                 :           0 :                         Notification *oldn = (Notification *) lfirst(l);
    3195                 :           0 :                         char       *channel = oldn->data;
    3196                 :           0 :                         bool            found;
    3197                 :             : 
    3198                 :           0 :                         (void) hash_search(pendingNotifies->hashtab,
    3199                 :             :                                                            &oldn,
    3200                 :             :                                                            HASH_ENTER,
    3201                 :             :                                                            &found);
    3202         [ #  # ]:           0 :                         Assert(!found);
    3203                 :             : 
    3204                 :             :                         /* Add channel name to uniqueChannelHash; might be there already */
    3205                 :           0 :                         (void) hash_search(pendingNotifies->uniqueChannelHash,
    3206                 :           0 :                                                            channel,
    3207                 :             :                                                            HASH_ENTER,
    3208                 :             :                                                            NULL);
    3209                 :           0 :                 }
    3210                 :           0 :         }
    3211                 :             : 
    3212                 :             :         /* Add new event to the list, in order */
    3213                 :           0 :         pendingNotifies->events = lappend(pendingNotifies->events, n);
    3214                 :             : 
    3215                 :             :         /* Add event to the hash tables if needed */
    3216         [ #  # ]:           0 :         if (pendingNotifies->hashtab != NULL)
    3217                 :             :         {
    3218                 :           0 :                 char       *channel = n->data;
    3219                 :           0 :                 bool            found;
    3220                 :             : 
    3221                 :           0 :                 (void) hash_search(pendingNotifies->hashtab,
    3222                 :             :                                                    &n,
    3223                 :             :                                                    HASH_ENTER,
    3224                 :             :                                                    &found);
    3225         [ #  # ]:           0 :                 Assert(!found);
    3226                 :             : 
    3227                 :             :                 /* Add channel name to uniqueChannelHash; might be there already */
    3228                 :           0 :                 (void) hash_search(pendingNotifies->uniqueChannelHash,
    3229                 :           0 :                                                    channel,
    3230                 :             :                                                    HASH_ENTER,
    3231                 :             :                                                    NULL);
    3232                 :           0 :         }
    3233                 :           0 : }
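AddEventToPendingNotifies keeps events in an ordered list and builds a hash index only after the list reaches MIN_HASHABLE_NOTIFIES entries. At that point it first inserts every existing event and then keeps the index up to date on each append. The Assert(!found) calls show that callers never pass a duplicate. The sketch below follows the same list-then-hash promotion using a fixed-size array and an open-addressing table. The threshold of 4, the capacity limits, the FNV-1a hash, and the names Pending, pending_add and pending_exists are all assumptions made for illustration. The sketch also leaves out the separate uniqueChannelHash.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define MAX_EVENTS     64
#define HASH_SLOTS     128  /* power of two, > MAX_EVENTS so probing ends */
#define HASH_THRESHOLD 4    /* hypothetical stand-in for MIN_HASHABLE_NOTIFIES */

typedef struct Pending
{
    const char *events[MAX_EVENTS];  /* insertion order is preserved */
    size_t      nevents;
    int         slots[HASH_SLOTS];   /* index into events, -1 = empty */
    bool        hashed;              /* has the hash index been built? */
} Pending;

/* FNV-1a string hash (a stand-in for PostgreSQL's hash_any). */
static unsigned str_hash(const char *s)
{
    unsigned h = 2166136261u;

    for (; *s; s++)
        h = (h ^ (unsigned char) *s) * 16777619u;
    return h;
}

static void index_insert(Pending *p, size_t idx)
{
    unsigned i = str_hash(p->events[idx]) & (HASH_SLOTS - 1);

    while (p->slots[i] != -1)
        i = (i + 1) & (HASH_SLOTS - 1);   /* linear probing */
    p->slots[i] = (int) idx;
}

/* Probe the hash index if it exists, otherwise scan the list. */
bool pending_exists(const Pending *p, const char *ev)
{
    if (p->hashed)
    {
        unsigned i = str_hash(ev) & (HASH_SLOTS - 1);

        for (; p->slots[i] != -1; i = (i + 1) & (HASH_SLOTS - 1))
            if (strcmp(p->events[p->slots[i]], ev) == 0)
                return true;
        return false;
    }
    for (size_t i = 0; i < p->nevents; i++)
        if (strcmp(p->events[i], ev) == 0)
            return true;
    return false;
}

/* Append ev (caller has already rejected duplicates). */
bool pending_add(Pending *p, const char *ev)
{
    if (p->nevents >= MAX_EVENTS)
        return false;
    if (p->nevents >= HASH_THRESHOLD && !p->hashed)
    {
        for (size_t i = 0; i < HASH_SLOTS; i++)
            p->slots[i] = -1;
        for (size_t i = 0; i < p->nevents; i++)
            index_insert(p, i);           /* index the already-existing events */
        p->hashed = true;
    }
    p->events[p->nevents] = ev;           /* add to the list, in order */
    if (p->hashed)
        index_insert(p, p->nevents);
    p->nevents++;
    return true;
}
```

Usage: start from `Pending p = {0};` and call pending_exists before each pending_add. The index is built when the fifth event arrives.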
    3234                 :             : 
    3235                 :             : /*
    3236                 :             :  * notification_hash: hash function for notification hash table
    3237                 :             :  *
    3238                 :             :  * The hash "keys" are pointers to Notification structs.
    3239                 :             :  */
    3240                 :             : static uint32
    3241                 :           0 : notification_hash(const void *key, Size keysize)
    3242                 :             : {
    3243                 :           0 :         const Notification *k = *(const Notification *const *) key;
    3244                 :             : 
    3245         [ #  # ]:           0 :         Assert(keysize == sizeof(Notification *));
    3246                 :             :         /* We don't bother to include the payload's trailing null in the hash */
    3247                 :           0 :         return DatumGetUInt32(hash_any((const unsigned char *) k->data,
    3248                 :           0 :                                                                    k->channel_len + k->payload_len + 1));
    3249                 :           0 : }
    3250                 :             : 
    3251                 :             : /*
    3252                 :             :  * notification_match: match function to use with notification_hash
    3253                 :             :  */
    3254                 :             : static int
    3255                 :           0 : notification_match(const void *key1, const void *key2, Size keysize)
    3256                 :             : {
    3257                 :           0 :         const Notification *k1 = *(const Notification *const *) key1;
    3258                 :           0 :         const Notification *k2 = *(const Notification *const *) key2;
    3259                 :             : 
    3260         [ #  # ]:           0 :         Assert(keysize == sizeof(Notification *));
    3261         [ #  # ]:           0 :         if (k1->channel_len == k2->channel_len &&
    3262   [ #  #  #  # ]:           0 :                 k1->payload_len == k2->payload_len &&
    3263                 :           0 :                 memcmp(k1->data, k2->data,
    3264                 :           0 :                            k1->channel_len + k1->payload_len + 2) == 0)
    3265                 :           0 :                 return 0;                               /* equal */
    3266                 :           0 :         return 1;                                       /* not equal */
    3267                 :           0 : }
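The hash table keys are pointers to Notification structs, so both callbacks first dereference the key. The hash covers channel_len + payload_len + 1 bytes, that is the channel, its NUL, and the payload, and omits the payload's trailing NUL. The match function compares all + 2 bytes and returns 0 for equal, following the dynahash convention. Keys that match always hash the same, because the hashed bytes are a prefix of the compared bytes. In the sketch below, Key, fnv1a, key_hash and key_match are hypothetical, and FNV-1a replaces hash_any.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical key mirroring Notification: data is "channel\0payload\0". */
typedef struct Key
{
    size_t      channel_len;
    size_t      payload_len;
    const char *data;
} Key;

/* FNV-1a over a byte range; a stand-in for hash_any(). */
static uint32_t fnv1a(const unsigned char *p, size_t len)
{
    uint32_t h = 2166136261u;

    for (size_t i = 0; i < len; i++)
        h = (h ^ p[i]) * 16777619u;
    return h;
}

/* Hash callback: key points at a stored Key pointer. */
uint32_t key_hash(const void *key, size_t keysize)
{
    const Key *k = *(const Key *const *) key;

    assert(keysize == sizeof(Key *));
    /* channel + NUL + payload; the payload's trailing NUL is skipped */
    return fnv1a((const unsigned char *) k->data,
                 k->channel_len + k->payload_len + 1);
}

/* Match callback: 0 means equal, nonzero means not equal. */
int key_match(const void *key1, const void *key2, size_t keysize)
{
    const Key *a = *(const Key *const *) key1;
    const Key *b = *(const Key *const *) key2;

    assert(keysize == sizeof(Key *));
    if (a->channel_len == b->channel_len &&
        a->payload_len == b->payload_len &&
        memcmp(a->data, b->data, a->channel_len + a->payload_len + 2) == 0)
        return 0;
    return 1;
}
```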
    3268                 :             : 
    3269                 :             : /* Clear the pendingActions and pendingNotifies lists. */
    3270                 :             : static void
    3271                 :        7025 : ClearPendingActionsAndNotifies(void)
    3272                 :             : {
    3273                 :             :         /*
    3274                 :             :          * Everything's allocated in either TopTransactionContext or the context
    3275                 :             :          * for the subtransaction to which it corresponds.  So, there's nothing to
    3276                 :             :          * do here except reset the pointers; the space will be reclaimed when the
    3277                 :             :          * contexts are deleted.
    3278                 :             :          */
    3279                 :        7025 :         pendingActions = NULL;
    3280                 :        7025 :         pendingNotifies = NULL;
    3281                 :             :         /* Also clear pendingListenActions, which is derived from pendingActions */
    3282                 :        7025 :         pendingListenActions = NULL;
    3283                 :        7025 : }
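ClearPendingActionsAndNotifies only resets pointers because every pending structure lives in a transaction-scoped memory context, and deleting that context frees all of it at once. The sketch below shows the same pattern with a toy bump arena standing in for the memory context. It is an analogy, not PostgreSQL's MemoryContext API. Arena, arena_alloc, push_pending, clear_pending and pending_items are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Toy bump arena standing in for a transaction-lifetime memory context. */
typedef struct Arena
{
    char   *base;
    size_t  used;
    size_t  cap;
} Arena;

int arena_init(Arena *a, size_t cap)
{
    a->base = malloc(cap);
    a->used = 0;
    a->cap = a->base ? cap : 0;
    return a->base != NULL;
}

void *arena_alloc(Arena *a, size_t n)
{
    size_t align = _Alignof(max_align_t);
    size_t start = (a->used + align - 1) & ~(align - 1);

    if (start > a->cap || n > a->cap - start)
        return NULL;                      /* out of space */
    a->used = start + n;
    return a->base + start;
}

/* Free every allocation at once, like deleting a memory context. */
void arena_destroy(Arena *a)
{
    free(a->base);
    a->base = NULL;
    a->used = a->cap = 0;
}

typedef struct Item
{
    int          value;
    struct Item *next;
} Item;

static Item *pending_items = NULL;   /* hypothetical analogue of pendingNotifies */

int push_pending(Arena *a, int value)
{
    Item *it = arena_alloc(a, sizeof(Item));

    if (it == NULL)
        return 0;
    it->value = value;
    it->next = pending_items;
    pending_items = it;
    return 1;
}

/* Forget the list without freeing nodes; the arena reclaims them later. */
void clear_pending(void)
{
    pending_items = NULL;
}
```

With this arrangement, clearing is O(1) regardless of list length, and there is no risk of a dangling pointer once the arena is destroyed, because the pointer was reset first.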
    3284                 :             : 
    3285                 :             : /*
    3286                 :             :  * GUC check_hook for notify_buffers
    3287                 :             :  */
    3288                 :             : bool
    3289                 :           6 : check_notify_buffers(int *newval, void **extra, GucSource source)
    3290                 :             : {
    3291                 :           6 :         return check_slru_buffers("notify_buffers", newval);
    3292                 :             : }
        

Generated by: LCOV version 2.3.2-1