LCOV - code coverage report
Current view: top level - src/backend/access/transam - xlogprefetcher.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 56.0 % 352 197
Test Date: 2026-01-26 10:56:24 Functions: 83.3 % 24 20
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Branches: 23.7 % 173 41

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * xlogprefetcher.c
       4                 :             :  *              Prefetching support for recovery.
       5                 :             :  *
       6                 :             :  * Portions Copyright (c) 2022-2026, PostgreSQL Global Development Group
       7                 :             :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :             :  *
       9                 :             :  *
      10                 :             :  * IDENTIFICATION
      11                 :             :  *              src/backend/access/transam/xlogprefetcher.c
      12                 :             :  *
      13                 :             :  * This module provides a drop-in replacement for an XLogReader that tries to
      14                 :             :  * minimize I/O stalls by looking ahead in the WAL.  If blocks that will be
      15                 :             :  * accessed in the near future are not already in the buffer pool, it initiates
      16                 :             :  * I/Os that might complete before the caller eventually needs the data.  When
      17                 :             :  * referenced blocks are found in the buffer pool already, the buffer is
      18                 :             :  * recorded in the decoded record so that XLogReadBufferForRedo() can try to
      19                 :             :  * avoid a second buffer mapping table lookup.
      20                 :             :  *
      21                 :             :  * Currently, only the main fork is considered for prefetching.  Moreover,
      22                 :             :  * prefetching is only effective on systems where PrefetchBuffer() does
      23                 :             :  * something useful (mainly Linux).
      24                 :             :  *
      25                 :             :  *-------------------------------------------------------------------------
      26                 :             :  */
      27                 :             : 
      28                 :             : #include "postgres.h"
      29                 :             : 
      30                 :             : #include "access/xlogprefetcher.h"
      31                 :             : #include "access/xlogreader.h"
      32                 :             : #include "catalog/pg_control.h"
      33                 :             : #include "catalog/storage_xlog.h"
      34                 :             : #include "commands/dbcommands_xlog.h"
      35                 :             : #include "funcapi.h"
      36                 :             : #include "miscadmin.h"
      37                 :             : #include "port/atomics.h"
      38                 :             : #include "storage/bufmgr.h"
      39                 :             : #include "storage/shmem.h"
      40                 :             : #include "storage/smgr.h"
      41                 :             : #include "utils/fmgrprotos.h"
      42                 :             : #include "utils/guc_hooks.h"
      43                 :             : #include "utils/hsearch.h"
      44                 :             : #include "utils/timestamp.h"
      45                 :             : 
      46                 :             : /*
      47                 :             :  * Every time we process this much WAL, we'll update the values in
      48                 :             :  * pg_stat_recovery_prefetch.
      49                 :             :  */
      50                 :             : #define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
      51                 :             : 
      52                 :             : /*
      53                 :             :  * To detect repeated access to the same block and skip useless extra system
      54                 :             :  * calls, we remember a small window of recently prefetched blocks.
      55                 :             :  */
      56                 :             : #define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
      57                 :             : 
      58                 :             : /*
      59                 :             :  * When maintenance_io_concurrency is not saturated, we're prepared to look
      60                 :             :  * ahead up to N times that number of block references.
      61                 :             :  */
      62                 :             : #define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
      63                 :             : 
      64                 :             : /* Define to log internal debugging messages. */
      65                 :             : /* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */
      66                 :             : 
      67                 :             : /* GUCs */
      68                 :             : int                     recovery_prefetch = RECOVERY_PREFETCH_TRY;
      69                 :             : 
      70                 :             : #ifdef USE_PREFETCH
      71                 :             : #define RecoveryPrefetchEnabled() \
      72                 :             :                 (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
      73                 :             :                  maintenance_io_concurrency > 0)
      74                 :             : #else
      75                 :             : #define RecoveryPrefetchEnabled() false
      76                 :             : #endif
      77                 :             : 
      78                 :             : static int      XLogPrefetchReconfigureCount = 0;
      79                 :             : 
      80                 :             : /*
      81                 :             :  * Enum used to report whether an IO should be started.
      82                 :             :  */
      83                 :             : typedef enum
      84                 :             : {
      85                 :             :         LRQ_NEXT_NO_IO,
      86                 :             :         LRQ_NEXT_IO,
      87                 :             :         LRQ_NEXT_AGAIN,
      88                 :             : } LsnReadQueueNextStatus;
      89                 :             : 
      90                 :             : /*
      91                 :             :  * Type of callback that can decide which block to prefetch next.  For now
      92                 :             :  * there is only one.
      93                 :             :  */
      94                 :             : typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
      95                 :             :                                                                                                            XLogRecPtr *lsn);
      96                 :             : 
      97                 :             : /*
      98                 :             :  * A simple circular queue of LSNs, used to control the number of
      99                 :             :  * (potentially) inflight IOs.  This stands in for a later more general IO
     100                 :             :  * control mechanism, which is why it has the apparently unnecessary
     101                 :             :  * indirection through a function pointer.
     102                 :             :  */
     103                 :             : typedef struct LsnReadQueue
     104                 :             : {
     105                 :             :         LsnReadQueueNextFun next;
     106                 :             :         uintptr_t       lrq_private;
     107                 :             :         uint32          max_inflight;
     108                 :             :         uint32          inflight;
     109                 :             :         uint32          completed;
     110                 :             :         uint32          head;
     111                 :             :         uint32          tail;
     112                 :             :         uint32          size;
     113                 :             :         struct
     114                 :             :         {
     115                 :             :                 bool            io;
     116                 :             :                 XLogRecPtr      lsn;
     117                 :             :         }                       queue[FLEXIBLE_ARRAY_MEMBER];
     118                 :             : } LsnReadQueue;
     119                 :             : 
     120                 :             : /*
     121                 :             :  * A prefetcher.  This is a mechanism that wraps an XLogReader, prefetching
     122                 :             :  * blocks that will be soon be referenced, to try to avoid IO stalls.
     123                 :             :  */
     124                 :             : struct XLogPrefetcher
     125                 :             : {
     126                 :             :         /* WAL reader and current reading state. */
     127                 :             :         XLogReaderState *reader;
     128                 :             :         DecodedXLogRecord *record;
     129                 :             :         int                     next_block_id;
     130                 :             : 
     131                 :             :         /* When to publish stats. */
     132                 :             :         XLogRecPtr      next_stats_shm_lsn;
     133                 :             : 
     134                 :             :         /* Book-keeping to avoid accessing blocks that don't exist yet. */
     135                 :             :         HTAB       *filter_table;
     136                 :             :         dlist_head      filter_queue;
     137                 :             : 
     138                 :             :         /* Book-keeping to avoid repeat prefetches. */
     139                 :             :         RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
     140                 :             :         BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
     141                 :             :         int                     recent_idx;
     142                 :             : 
     143                 :             :         /* Book-keeping to disable prefetching temporarily. */
     144                 :             :         XLogRecPtr      no_readahead_until;
     145                 :             : 
     146                 :             :         /* IO depth manager. */
     147                 :             :         LsnReadQueue *streaming_read;
     148                 :             : 
     149                 :             :         XLogRecPtr      begin_ptr;
     150                 :             : 
     151                 :             :         int                     reconfigure_count;
     152                 :             : };
     153                 :             : 
     154                 :             : /*
     155                 :             :  * A temporary filter used to track block ranges that haven't been created
     156                 :             :  * yet, whole relations that haven't been created yet, and whole relations
     157                 :             :  * that (we assume) have already been dropped, or will be created by bulk WAL
     158                 :             :  * operators.
     159                 :             :  */
     160                 :             : typedef struct XLogPrefetcherFilter
     161                 :             : {
     162                 :             :         RelFileLocator rlocator;
     163                 :             :         XLogRecPtr      filter_until_replayed;
     164                 :             :         BlockNumber filter_from_block;
     165                 :             :         dlist_node      link;
     166                 :             : } XLogPrefetcherFilter;
     167                 :             : 
     168                 :             : /*
     169                 :             :  * Counters exposed in shared memory for pg_stat_recovery_prefetch.
     170                 :             :  */
     171                 :             : typedef struct XLogPrefetchStats
     172                 :             : {
     173                 :             :         pg_atomic_uint64 reset_time;    /* Time of last reset. */
     174                 :             :         pg_atomic_uint64 prefetch;      /* Prefetches initiated. */
     175                 :             :         pg_atomic_uint64 hit;           /* Blocks already in cache. */
     176                 :             :         pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */
     177                 :             :         pg_atomic_uint64 skip_new;      /* New/missing blocks filtered. */
     178                 :             :         pg_atomic_uint64 skip_fpw;      /* FPWs skipped. */
     179                 :             :         pg_atomic_uint64 skip_rep;      /* Repeat accesses skipped. */
     180                 :             : 
     181                 :             :         /* Dynamic values */
     182                 :             :         int                     wal_distance;   /* Number of WAL bytes ahead. */
     183                 :             :         int                     block_distance; /* Number of block references ahead. */
     184                 :             :         int                     io_depth;               /* Number of I/Os in progress. */
     185                 :             : } XLogPrefetchStats;
     186                 :             : 
     187                 :             : static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
     188                 :             :                                                                                    RelFileLocator rlocator,
     189                 :             :                                                                                    BlockNumber blockno,
     190                 :             :                                                                                    XLogRecPtr lsn);
     191                 :             : static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
     192                 :             :                                                                                         RelFileLocator rlocator,
     193                 :             :                                                                                         BlockNumber blockno);
     194                 :             : static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
     195                 :             :                                                                                                  XLogRecPtr replaying_lsn);
     196                 :             : static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
     197                 :             :                                                                                                           XLogRecPtr *lsn);
     198                 :             : 
     199                 :             : static XLogPrefetchStats *SharedStats;
     200                 :             : 
     201                 :             : static inline LsnReadQueue *
     202                 :           8 : lrq_alloc(uint32 max_distance,
     203                 :             :                   uint32 max_inflight,
     204                 :             :                   uintptr_t lrq_private,
     205                 :             :                   LsnReadQueueNextFun next)
     206                 :             : {
     207                 :           8 :         LsnReadQueue *lrq;
     208                 :           8 :         uint32          size;
     209                 :             : 
     210         [ +  - ]:           8 :         Assert(max_distance >= max_inflight);
     211                 :             : 
     212                 :           8 :         size = max_distance + 1;        /* full ring buffer has a gap */
     213                 :           8 :         lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
     214                 :           8 :         lrq->lrq_private = lrq_private;
     215                 :           8 :         lrq->max_inflight = max_inflight;
     216                 :           8 :         lrq->size = size;
     217                 :           8 :         lrq->next = next;
     218                 :           8 :         lrq->head = 0;
     219                 :           8 :         lrq->tail = 0;
     220                 :           8 :         lrq->inflight = 0;
     221                 :           8 :         lrq->completed = 0;
     222                 :             : 
     223                 :          16 :         return lrq;
     224                 :           8 : }
     225                 :             : 
     226                 :             : static inline void
     227                 :           8 : lrq_free(LsnReadQueue *lrq)
     228                 :             : {
     229                 :           8 :         pfree(lrq);
     230                 :           8 : }
     231                 :             : 
     232                 :             : static inline uint32
     233                 :           8 : lrq_inflight(LsnReadQueue *lrq)
     234                 :             : {
     235                 :           8 :         return lrq->inflight;
     236                 :             : }
     237                 :             : 
     238                 :             : static inline uint32
     239                 :           8 : lrq_completed(LsnReadQueue *lrq)
     240                 :             : {
     241                 :           8 :         return lrq->completed;
     242                 :             : }
     243                 :             : 
     244                 :             : static inline void
     245                 :           8 : lrq_prefetch(LsnReadQueue *lrq)
     246                 :             : {
     247                 :             :         /* Try to start as many IOs as we can within our limits. */
     248   [ -  +  -  + ]:          16 :         while (lrq->inflight < lrq->max_inflight &&
     249                 :           8 :                    lrq->inflight + lrq->completed < lrq->size - 1)
     250                 :             :         {
     251         [ -  + ]:           8 :                 Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
     252   [ +  -  -  - ]:           8 :                 switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
     253                 :             :                 {
     254                 :             :                         case LRQ_NEXT_AGAIN:
     255                 :           8 :                                 return;
     256                 :             :                         case LRQ_NEXT_IO:
     257                 :           0 :                                 lrq->queue[lrq->head].io = true;
     258                 :           0 :                                 lrq->inflight++;
     259                 :           0 :                                 break;
     260                 :             :                         case LRQ_NEXT_NO_IO:
     261                 :           0 :                                 lrq->queue[lrq->head].io = false;
     262                 :           0 :                                 lrq->completed++;
     263                 :           0 :                                 break;
     264                 :             :                 }
     265                 :           0 :                 lrq->head++;
     266         [ #  # ]:           0 :                 if (lrq->head == lrq->size)
     267                 :           0 :                         lrq->head = 0;
     268                 :             :         }
     269                 :           8 : }
     270                 :             : 
     271                 :             : static inline void
     272                 :           8 : lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
     273                 :             : {
     274                 :             :         /*
     275                 :             :          * We know that LSNs before 'lsn' have been replayed, so we can now assume
     276                 :             :          * that any IOs that were started before then have finished.
     277                 :             :          */
     278   [ +  -  +  - ]:           8 :         while (lrq->tail != lrq->head &&
     279                 :           0 :                    lrq->queue[lrq->tail].lsn < lsn)
     280                 :             :         {
     281         [ #  # ]:           0 :                 if (lrq->queue[lrq->tail].io)
     282                 :           0 :                         lrq->inflight--;
     283                 :             :                 else
     284                 :           0 :                         lrq->completed--;
     285                 :           0 :                 lrq->tail++;
     286         [ #  # ]:           0 :                 if (lrq->tail == lrq->size)
     287                 :           0 :                         lrq->tail = 0;
     288                 :             :         }
     289   [ +  -  -  + ]:           8 :         if (RecoveryPrefetchEnabled())
     290                 :           8 :                 lrq_prefetch(lrq);
     291                 :           8 : }
     292                 :             : 
     293                 :             : size_t
     294                 :           9 : XLogPrefetchShmemSize(void)
     295                 :             : {
     296                 :           9 :         return sizeof(XLogPrefetchStats);
     297                 :             : }
     298                 :             : 
     299                 :             : /*
     300                 :             :  * Reset all counters to zero.
     301                 :             :  */
     302                 :             : void
     303                 :           1 : XLogPrefetchResetStats(void)
     304                 :             : {
     305                 :           1 :         pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
     306                 :           1 :         pg_atomic_write_u64(&SharedStats->prefetch, 0);
     307                 :           1 :         pg_atomic_write_u64(&SharedStats->hit, 0);
     308                 :           1 :         pg_atomic_write_u64(&SharedStats->skip_init, 0);
     309                 :           1 :         pg_atomic_write_u64(&SharedStats->skip_new, 0);
     310                 :           1 :         pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
     311                 :           1 :         pg_atomic_write_u64(&SharedStats->skip_rep, 0);
     312                 :           1 : }
     313                 :             : 
     314                 :             : void
     315                 :           6 : XLogPrefetchShmemInit(void)
     316                 :             : {
     317                 :           6 :         bool            found;
     318                 :             : 
     319                 :           6 :         SharedStats = (XLogPrefetchStats *)
     320                 :           6 :                 ShmemInitStruct("XLogPrefetchStats",
     321                 :             :                                                 sizeof(XLogPrefetchStats),
     322                 :             :                                                 &found);
     323                 :             : 
     324         [ -  + ]:           6 :         if (!found)
     325                 :             :         {
     326                 :           6 :                 pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
     327                 :           6 :                 pg_atomic_init_u64(&SharedStats->prefetch, 0);
     328                 :           6 :                 pg_atomic_init_u64(&SharedStats->hit, 0);
     329                 :           6 :                 pg_atomic_init_u64(&SharedStats->skip_init, 0);
     330                 :           6 :                 pg_atomic_init_u64(&SharedStats->skip_new, 0);
     331                 :           6 :                 pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
     332                 :           6 :                 pg_atomic_init_u64(&SharedStats->skip_rep, 0);
     333                 :           6 :         }
     334                 :           6 : }
     335                 :             : 
     336                 :             : /*
     337                 :             :  * Called when any GUC is changed that affects prefetching.
     338                 :             :  */
     339                 :             : void
     340                 :           0 : XLogPrefetchReconfigure(void)
     341                 :             : {
     342                 :           0 :         XLogPrefetchReconfigureCount++;
     343                 :           0 : }
     344                 :             : 
     345                 :             : /*
     346                 :             :  * Increment a counter in shared memory.  This is equivalent to (*counter)++ on a
     347                 :             :  * plain uint64 without any memory barrier or locking, except on platforms
     348                 :             :  * where readers can't read uint64 without possibly observing a torn value.
     349                 :             :  */
     350                 :             : static inline void
     351                 :           0 : XLogPrefetchIncrement(pg_atomic_uint64 *counter)
     352                 :             : {
     353   [ #  #  #  # ]:           0 :         Assert(AmStartupProcess() || !IsUnderPostmaster);
     354                 :           0 :         pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
     355                 :           0 : }
     356                 :             : 
     357                 :             : /*
     358                 :             :  * Create a prefetcher that is ready to begin prefetching blocks referenced by
     359                 :             :  * WAL records.
     360                 :             :  */
     361                 :             : XLogPrefetcher *
     362                 :           4 : XLogPrefetcherAllocate(XLogReaderState *reader)
     363                 :             : {
     364                 :           4 :         XLogPrefetcher *prefetcher;
     365                 :           4 :         HASHCTL         ctl;
     366                 :             : 
     367                 :           4 :         prefetcher = palloc0_object(XLogPrefetcher);
     368                 :           4 :         prefetcher->reader = reader;
     369                 :             : 
     370                 :           4 :         ctl.keysize = sizeof(RelFileLocator);
     371                 :           4 :         ctl.entrysize = sizeof(XLogPrefetcherFilter);
     372                 :           4 :         prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
     373                 :             :                                                                                    &ctl, HASH_ELEM | HASH_BLOBS);
     374                 :           4 :         dlist_init(&prefetcher->filter_queue);
     375                 :             : 
     376                 :           4 :         SharedStats->wal_distance = 0;
     377                 :           4 :         SharedStats->block_distance = 0;
     378                 :           4 :         SharedStats->io_depth = 0;
     379                 :             : 
     380                 :             :         /* First usage will cause streaming_read to be allocated. */
     381                 :           4 :         prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;
     382                 :             : 
     383                 :           8 :         return prefetcher;
     384                 :           4 : }
     385                 :             : 
     386                 :             : /*
     387                 :             :  * Destroy a prefetcher and release all resources.
     388                 :             :  */
     389                 :             : void
     390                 :           4 : XLogPrefetcherFree(XLogPrefetcher *prefetcher)
     391                 :             : {
     392                 :           4 :         lrq_free(prefetcher->streaming_read);
     393                 :           4 :         hash_destroy(prefetcher->filter_table);
     394                 :           4 :         pfree(prefetcher);
     395                 :           4 : }
     396                 :             : 
     397                 :             : /*
     398                 :             :  * Provide access to the reader.
     399                 :             :  */
     400                 :             : XLogReaderState *
     401                 :           8 : XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
     402                 :             : {
     403                 :           8 :         return prefetcher->reader;
     404                 :             : }
     405                 :             : 
     406                 :             : /*
     407                 :             :  * Update the statistics visible in the pg_stat_recovery_prefetch view.
                         :             :  *
                         :             :  * Three instantaneous values are stored in SharedStats: io_depth (the
                         :             :  * lrq_inflight() count for prefetcher->streaming_read), block_distance
                         :             :  * (that count plus the lrq_completed() count) and wal_distance (computed
                         :             :  * below from the reader's decode queue).  prefetcher->next_stats_shm_lsn
                         :             :  * is then set to reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
                         :             :  * presumably the caller uses it to decide when to call this again -- TODO
                         :             :  * confirm against the code that reads next_stats_shm_lsn.
     408                 :             :  */
     409                 :             : void
     410                 :           8 : XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
     411                 :             : {
     412                 :           8 :         uint32          io_depth;
     413                 :           8 :         uint32          completed;
     414                 :           8 :         int64           wal_distance;
     415                 :             : 
     416                 :             : 
     417                 :             :         /*
                         :             :          * How far ahead of replay are we now?  Computed as
                         :             :          * decode_queue_tail->lsn minus decode_queue_head->lsn, or 0 when
                         :             :          * decode_queue_tail is NULL.
                         :             :          *
                         :             :          * NOTE(review): only the tail pointer is tested before the head is
                         :             :          * dereferenced; presumably the head is non-NULL whenever the tail
                         :             :          * is -- confirm against the reader's queue handling.
                         :             :          */
     418         [ +  - ]:           8 :         if (prefetcher->reader->decode_queue_tail)
     419                 :             :         {
     420                 :           8 :                 wal_distance =
     421                 :          16 :                         prefetcher->reader->decode_queue_tail->lsn -
     422                 :           8 :                         prefetcher->reader->decode_queue_head->lsn;
     423                 :           8 :         }
     424                 :             :         else
     425                 :             :         {
     426                 :           0 :                 wal_distance = 0;
     427                 :             :         }
     428                 :             : 
     429                 :             :         /* How many IOs are currently in flight and completed? */
     430                 :           8 :         io_depth = lrq_inflight(prefetcher->streaming_read);
     431                 :           8 :         completed = lrq_completed(prefetcher->streaming_read);
     432                 :             : 
     433                 :             :         /*
                         :             :          * Update the instantaneous stats visible in pg_stat_recovery_prefetch.
                         :             :          * block_distance is published as the sum of the in-flight and
                         :             :          * completed counts obtained above.
                         :             :          */
     434                 :           8 :         SharedStats->io_depth = io_depth;
     435                 :           8 :         SharedStats->block_distance = io_depth + completed;
     436                 :           8 :         SharedStats->wal_distance = wal_distance;
     437                 :             : 
     438                 :           8 :         prefetcher->next_stats_shm_lsn =
     439                 :           8 :                 prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
     440                 :           8 : }
     441                 :             : 
     442                 :             : /*
     443                 :             :  * A callback that examines the next block reference in the WAL, and possibly
     444                 :             :  * starts an IO so that a later read will be fast.
     445                 :             :  *
     446                 :             :  * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
     447                 :             :  *
     448                 :             :  * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
     449                 :             :  * that isn't in the buffer pool, and the kernel has been asked to start
     450                 :             :  * reading it to make a future read system call faster. An LSN is written to
     451                 :             :  * *lsn, and the I/O will be considered to have completed once that LSN is
     452                 :             :  * replayed.
     453                 :             :  *
     454                 :             :  * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
     455                 :             :  * that it was already in the buffer pool, or we decided for various reasons
     456                 :             :  * not to prefetch.
     457                 :             :  */
     458                 :             : static LsnReadQueueNextStatus
     459                 :           8 : XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
     460                 :             : {
     461                 :           8 :         XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
     462                 :           8 :         XLogReaderState *reader = prefetcher->reader;
     463                 :           8 :         XLogRecPtr      replaying_lsn = reader->ReadRecPtr;
     464                 :             : 
     465                 :             :         /*
     466                 :             :          * We keep track of the record and block we're up to between calls with
     467                 :             :          * prefetcher->record and prefetcher->next_block_id.
     468                 :             :          */
     469                 :           8 :         for (;;)
     470                 :             :         {
     471                 :           8 :                 DecodedXLogRecord *record;
     472                 :             : 
     473                 :             :                 /* Try to read a new future record, if we don't already have one. */
     474         [ -  + ]:           8 :                 if (prefetcher->record == NULL)
     475                 :             :                 {
     476                 :           8 :                         bool            nonblocking;
     477                 :             : 
     478                 :             :                         /*
     479                 :             :                          * If there are already records or an error queued up that could
     480                 :             :                          * be replayed, we don't want to block here.  Otherwise, it's OK
     481                 :             :                          * to block waiting for more data: presumably the caller has
     482                 :             :                          * nothing else to do.
     483                 :             :                          */
     484                 :           8 :                         nonblocking = XLogReaderHasQueuedRecordOrError(reader);
     485                 :             : 
     486                 :             :                         /* Readahead is disabled until we replay past a certain point. */
     487   [ -  +  #  # ]:           8 :                         if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
     488                 :           0 :                                 return LRQ_NEXT_AGAIN;
     489                 :             : 
     490                 :           8 :                         record = XLogReadAhead(prefetcher->reader, nonblocking);
     491         [ +  - ]:           8 :                         if (record == NULL)
     492                 :             :                         {
     493                 :             :                                 /*
     494                 :             :                                  * We can't read any more, due to an error or lack of data in
     495                 :             :                                  * nonblocking mode.  Don't try to read ahead again until
     496                 :             :                                  * we've replayed everything already decoded.
     497                 :             :                                  */
     498   [ #  #  #  # ]:           0 :                                 if (nonblocking && prefetcher->reader->decode_queue_tail)
     499                 :           0 :                                         prefetcher->no_readahead_until =
     500                 :           0 :                                                 prefetcher->reader->decode_queue_tail->lsn;
     501                 :             : 
     502                 :           0 :                                 return LRQ_NEXT_AGAIN;
     503                 :             :                         }
     504                 :             : 
     505                 :             :                         /*
     506                 :             :                          * If prefetching is disabled, we don't need to analyze the record
     507                 :             :                          * or issue any prefetches.  We just need to cause one record to
     508                 :             :                          * be decoded.
     509                 :             :                          */
     510   [ +  -  -  + ]:           8 :                         if (!RecoveryPrefetchEnabled())
     511                 :             :                         {
     512                 :           0 :                                 *lsn = InvalidXLogRecPtr;
     513                 :           0 :                                 return LRQ_NEXT_NO_IO;
     514                 :             :                         }
     515                 :             : 
     516                 :             :                         /* We have a new record to process. */
     517                 :           8 :                         prefetcher->record = record;
     518                 :           8 :                         prefetcher->next_block_id = 0;
     519         [ -  + ]:           8 :                 }
     520                 :             :                 else
     521                 :             :                 {
     522                 :             :                         /* Continue to process from last call, or last loop. */
     523                 :           0 :                         record = prefetcher->record;
     524                 :             :                 }
     525                 :             : 
     526                 :             :                 /*
     527                 :             :                  * Check for operations that require us to filter out block ranges, or
     528                 :             :                  * pause readahead completely.
     529                 :             :                  */
     530         [ -  + ]:           8 :                 if (replaying_lsn < record->lsn)
     531                 :             :                 {
     532                 :           8 :                         uint8           rmid = record->header.xl_rmid;
     533                 :           8 :                         uint8           record_type = record->header.xl_info & ~XLR_INFO_MASK;
     534                 :             : 
     535         [ -  + ]:           8 :                         if (rmid == RM_XLOG_ID)
     536                 :             :                         {
     537   [ -  +  #  # ]:           8 :                                 if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
     538                 :           0 :                                         record_type == XLOG_END_OF_RECOVERY)
     539                 :             :                                 {
     540                 :             :                                         /*
     541                 :             :                                          * These records might change the TLI.  Avoid potential
     542                 :             :                                          * bugs if we were to allow "read TLI" and "replay TLI" to
     543                 :             :                                          * differ without more analysis.
     544                 :             :                                          */
     545                 :           8 :                                         prefetcher->no_readahead_until = record->lsn;
     546                 :             : 
     547                 :             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     548                 :             :                                         elog(XLOGPREFETCHER_DEBUG_LEVEL,
     549                 :             :                                                  "suppressing all readahead until %X/%08X is replayed due to possible TLI change",
     550                 :             :                                                  LSN_FORMAT_ARGS(record->lsn));
     551                 :             : #endif
     552                 :             : 
     553                 :             :                                         /* Fall through so we move past this record. */
     554                 :           8 :                                 }
     555                 :           8 :                         }
     556         [ #  # ]:           0 :                         else if (rmid == RM_DBASE_ID)
     557                 :             :                         {
     558                 :             :                                 /*
     559                 :             :                                  * When databases are created with the file-copy strategy,
     560                 :             :                                  * there are no WAL records to tell us about the creation of
     561                 :             :                                  * individual relations.
     562                 :             :                                  */
     563         [ #  # ]:           0 :                                 if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
     564                 :             :                                 {
     565                 :           0 :                                         xl_dbase_create_file_copy_rec *xlrec =
     566                 :           0 :                                                 (xl_dbase_create_file_copy_rec *) record->main_data;
     567                 :           0 :                                         RelFileLocator rlocator =
     568                 :           0 :                                         {InvalidOid, xlrec->db_id, InvalidRelFileNumber};
     569                 :             : 
     570                 :             :                                         /*
     571                 :             :                                          * Don't try to prefetch anything in this database until
     572                 :             :                                          * it has been created, or we might confuse the blocks of
     573                 :             :                                          * different generations, if a database OID or
     574                 :             :                                          * relfilenumber is reused.  It's also more efficient than
     575                 :             :                                          * discovering that relations don't exist on disk yet with
     576                 :             :                                          * ENOENT errors.
     577                 :             :                                          */
     578                 :           0 :                                         XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
     579                 :             : 
     580                 :             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     581                 :             :                                         elog(XLOGPREFETCHER_DEBUG_LEVEL,
     582                 :             :                                                  "suppressing prefetch in database %u until %X/%08X is replayed due to raw file copy",
     583                 :             :                                                  rlocator.dbOid,
     584                 :             :                                                  LSN_FORMAT_ARGS(record->lsn));
     585                 :             : #endif
     586                 :           0 :                                 }
     587                 :           0 :                         }
     588         [ #  # ]:           0 :                         else if (rmid == RM_SMGR_ID)
     589                 :             :                         {
     590         [ #  # ]:           0 :                                 if (record_type == XLOG_SMGR_CREATE)
     591                 :             :                                 {
     592                 :           0 :                                         xl_smgr_create *xlrec = (xl_smgr_create *)
     593                 :           0 :                                                 record->main_data;
     594                 :             : 
     595         [ #  # ]:           0 :                                         if (xlrec->forkNum == MAIN_FORKNUM)
     596                 :             :                                         {
     597                 :             :                                                 /*
     598                 :             :                                                  * Don't prefetch anything for this whole relation
     599                 :             :                                                  * until it has been created.  Otherwise we might
     600                 :             :                                                  * confuse the blocks of different generations, if a
     601                 :             :                                                  * relfilenumber is reused.  This also avoids the need
     602                 :             :                                                  * to discover the problem via extra syscalls that
     603                 :             :                                                  * report ENOENT.
     604                 :             :                                                  */
     605                 :           0 :                                                 XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
     606                 :           0 :                                                                                                 record->lsn);
     607                 :             : 
     608                 :             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     609                 :             :                                                 elog(XLOGPREFETCHER_DEBUG_LEVEL,
     610                 :             :                                                          "suppressing prefetch in relation %u/%u/%u until %X/%08X is replayed, which creates the relation",
     611                 :             :                                                          xlrec->rlocator.spcOid,
     612                 :             :                                                          xlrec->rlocator.dbOid,
     613                 :             :                                                          xlrec->rlocator.relNumber,
     614                 :             :                                                          LSN_FORMAT_ARGS(record->lsn));
     615                 :             : #endif
     616                 :           0 :                                         }
     617                 :           0 :                                 }
     618         [ #  # ]:           0 :                                 else if (record_type == XLOG_SMGR_TRUNCATE)
     619                 :             :                                 {
     620                 :           0 :                                         xl_smgr_truncate *xlrec = (xl_smgr_truncate *)
     621                 :           0 :                                                 record->main_data;
     622                 :             : 
     623                 :             :                                         /*
     624                 :             :                                          * Don't consider prefetching anything in the truncated
     625                 :             :                                          * range until the truncation has been performed.
     626                 :             :                                          */
     627                 :           0 :                                         XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
     628                 :           0 :                                                                                         xlrec->blkno,
     629                 :           0 :                                                                                         record->lsn);
     630                 :             : 
     631                 :             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     632                 :             :                                         elog(XLOGPREFETCHER_DEBUG_LEVEL,
     633                 :             :                                                  "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, which truncates the relation",
     634                 :             :                                                  xlrec->rlocator.spcOid,
     635                 :             :                                                  xlrec->rlocator.dbOid,
     636                 :             :                                                  xlrec->rlocator.relNumber,
     637                 :             :                                                  xlrec->blkno,
     638                 :             :                                                  LSN_FORMAT_ARGS(record->lsn));
     639                 :             : #endif
     640                 :           0 :                                 }
     641                 :           0 :                         }
     642                 :           8 :                 }
     643                 :             : 
     644                 :             :                 /* Scan the block references, starting where we left off last time. */
     645         [ -  + ]:           8 :                 while (prefetcher->next_block_id <= record->max_block_id)
     646                 :             :                 {
     647                 :           0 :                         int                     block_id = prefetcher->next_block_id++;
     648                 :           0 :                         DecodedBkpBlock *block = &record->blocks[block_id];
     649                 :           0 :                         SMgrRelation reln;
     650                 :           0 :                         PrefetchBufferResult result;
     651                 :             : 
     652         [ #  # ]:           0 :                         if (!block->in_use)
     653                 :           0 :                                 continue;
     654                 :             : 
     655         [ #  # ]:           0 :                         Assert(!BufferIsValid(block->prefetch_buffer));
     656                 :             : 
     657                 :             :                         /*
     658                 :             :                          * Record the LSN of this record.  When it's replayed,
     659                 :             :                          * LsnReadQueue will consider any IOs submitted for earlier LSNs
     660                 :             :                          * to be finished.
     661                 :             :                          */
     662                 :           0 :                         *lsn = record->lsn;
     663                 :             : 
     664                 :             :                         /* We don't try to prefetch anything but the main fork for now. */
     665         [ #  # ]:           0 :                         if (block->forknum != MAIN_FORKNUM)
     666                 :             :                         {
     667                 :           0 :                                 return LRQ_NEXT_NO_IO;
     668                 :             :                         }
     669                 :             : 
     670                 :             :                         /*
     671                 :             :                          * If there is a full page image attached, we won't be reading the
     672                 :             :                          * page, so don't bother trying to prefetch.
     673                 :             :                          */
     674         [ #  # ]:           0 :                         if (block->has_image)
     675                 :             :                         {
     676                 :           0 :                                 XLogPrefetchIncrement(&SharedStats->skip_fpw);
     677                 :           0 :                                 return LRQ_NEXT_NO_IO;
     678                 :             :                         }
     679                 :             : 
     680                 :             :                         /* There is no point in reading a page that will be zeroed. */
     681         [ #  # ]:           0 :                         if (block->flags & BKPBLOCK_WILL_INIT)
     682                 :             :                         {
     683                 :           0 :                                 XLogPrefetchIncrement(&SharedStats->skip_init);
     684                 :           0 :                                 return LRQ_NEXT_NO_IO;
     685                 :             :                         }
     686                 :             : 
     687                 :             :                         /* Should we skip prefetching this block due to a filter? */
     688         [ #  # ]:           0 :                         if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
     689                 :             :                         {
     690                 :           0 :                                 XLogPrefetchIncrement(&SharedStats->skip_new);
     691                 :           0 :                                 return LRQ_NEXT_NO_IO;
     692                 :             :                         }
     693                 :             : 
     694                 :             :                         /* There is no point in repeatedly prefetching the same block. */
     695   [ #  #  #  # ]:           0 :                         for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
     696                 :             :                         {
     697   [ #  #  #  # ]:           0 :                                 if (block->blkno == prefetcher->recent_block[i] &&
     698   [ #  #  #  # ]:           0 :                                         RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
     699                 :             :                                 {
     700                 :             :                                         /*
     701                 :             :                                          * XXX If we also remembered where it was, we could set
     702                 :             :                                          * recent_buffer so that recovery could skip smgropen()
     703                 :             :                                          * and a buffer table lookup.
     704                 :             :                                          */
     705                 :           0 :                                         XLogPrefetchIncrement(&SharedStats->skip_rep);
     706                 :           0 :                                         return LRQ_NEXT_NO_IO;
     707                 :             :                                 }
     708                 :           0 :                         }
     709                 :           0 :                         prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
     710                 :           0 :                         prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
     711                 :           0 :                         prefetcher->recent_idx =
     712                 :           0 :                                 (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
     713                 :             : 
     714                 :             :                         /*
     715                 :             :                          * We could try to have a fast path for repeated references to the
     716                 :             :                          * same relation (with some scheme to handle invalidations
     717                 :             :                          * safely), but for now we'll call smgropen() every time.
     718                 :             :                          */
     719                 :           0 :                         reln = smgropen(block->rlocator, INVALID_PROC_NUMBER);
     720                 :             : 
     721                 :             :                         /*
     722                 :             :                          * If the relation file doesn't exist on disk, for example because
     723                 :             :                          * we're replaying after a crash and the file will be created and
     724                 :             :                          * then unlinked by WAL that hasn't been replayed yet, suppress
     725                 :             :                          * further prefetching in the relation until this record is
     726                 :             :                          * replayed.
     727                 :             :                          */
     728         [ #  # ]:           0 :                         if (!smgrexists(reln, MAIN_FORKNUM))
     729                 :             :                         {
     730                 :             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     731                 :             :                                 elog(XLOGPREFETCHER_DEBUG_LEVEL,
     732                 :             :                                          "suppressing all prefetch in relation %u/%u/%u until %X/%08X is replayed, because the relation does not exist on disk",
     733                 :             :                                          reln->smgr_rlocator.locator.spcOid,
     734                 :             :                                          reln->smgr_rlocator.locator.dbOid,
     735                 :             :                                          reln->smgr_rlocator.locator.relNumber,
     736                 :             :                                          LSN_FORMAT_ARGS(record->lsn));
     737                 :             : #endif
     738                 :           0 :                                 XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
     739                 :           0 :                                                                                 record->lsn);
     740                 :           0 :                                 XLogPrefetchIncrement(&SharedStats->skip_new);
     741                 :           0 :                                 return LRQ_NEXT_NO_IO;
     742                 :             :                         }
     743                 :             : 
     744                 :             :                         /*
     745                 :             :                          * If the relation isn't big enough to contain the referenced
     746                 :             :                          * block yet, suppress prefetching of this block and higher until
     747                 :             :                          * this record is replayed.
     748                 :             :                          */
     749         [ #  # ]:           0 :                         if (block->blkno >= smgrnblocks(reln, block->forknum))
     750                 :             :                         {
     751                 :             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     752                 :             :                                 elog(XLOGPREFETCHER_DEBUG_LEVEL,
     753                 :             :                                          "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, because the relation is too small",
     754                 :             :                                          reln->smgr_rlocator.locator.spcOid,
     755                 :             :                                          reln->smgr_rlocator.locator.dbOid,
     756                 :             :                                          reln->smgr_rlocator.locator.relNumber,
     757                 :             :                                          block->blkno,
     758                 :             :                                          LSN_FORMAT_ARGS(record->lsn));
     759                 :             : #endif
     760                 :           0 :                                 XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
     761                 :           0 :                                                                                 record->lsn);
     762                 :           0 :                                 XLogPrefetchIncrement(&SharedStats->skip_new);
     763                 :           0 :                                 return LRQ_NEXT_NO_IO;
     764                 :             :                         }
     765                 :             : 
     766                 :             :                         /* Try to initiate prefetching. */
     767                 :           0 :                         result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
     768         [ #  # ]:           0 :                         if (BufferIsValid(result.recent_buffer))
     769                 :             :                         {
     770                 :             :                                 /* Cache hit, nothing to do. */
     771                 :           0 :                                 XLogPrefetchIncrement(&SharedStats->hit);
     772                 :           0 :                                 block->prefetch_buffer = result.recent_buffer;
     773                 :           0 :                                 return LRQ_NEXT_NO_IO;
     774                 :             :                         }
     775         [ #  # ]:           0 :                         else if (result.initiated_io)
     776                 :             :                         {
     777                 :             :                                 /* Cache miss, I/O (presumably) started. */
     778                 :           0 :                                 XLogPrefetchIncrement(&SharedStats->prefetch);
     779                 :           0 :                                 block->prefetch_buffer = InvalidBuffer;
     780                 :           0 :                                 return LRQ_NEXT_IO;
     781                 :             :                         }
     782         [ #  # ]:           0 :                         else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
     783                 :             :                         {
     784                 :             :                                 /*
     785                 :             :                                  * This shouldn't be possible, because we already determined
     786                 :             :                                  * that the relation exists on disk and is big enough.
     787                 :             :                                  * Something is wrong with the cache invalidation for
     788                 :             :                                  * smgrexists(), smgrnblocks(), or the file was unlinked or
     789                 :             :                                  * truncated beneath our feet?
     790                 :             :                                  */
     791   [ #  #  #  # ]:           0 :                                 elog(ERROR,
     792                 :             :                                          "could not prefetch relation %u/%u/%u block %u",
     793                 :             :                                          reln->smgr_rlocator.locator.spcOid,
     794                 :             :                                          reln->smgr_rlocator.locator.dbOid,
     795                 :             :                                          reln->smgr_rlocator.locator.relNumber,
     796                 :             :                                          block->blkno);
     797                 :           0 :                         }
     798      [ #  #  # ]:           0 :                 }
     799                 :             : 
     800                 :             :                 /*
     801                 :             :                  * Several callsites need to be able to read exactly one record
     802                 :             :                  * without any internal readahead.  Examples: xlog.c reading
     803                 :             :                  * checkpoint records with emode set to PANIC, which might otherwise
     804                 :             :                  * cause XLogPageRead() to panic on some future page, and xlog.c
     805                 :             :                  * determining where to start writing WAL next, which depends on the
     806                 :             :                  * contents of the reader's internal buffer after reading one record.
     807                 :             :                  * Therefore, don't even think about prefetching until the first
     808                 :             :                  * record after XLogPrefetcherBeginRead() has been consumed.
     809                 :             :                  */
     810   [ +  -  -  + ]:           8 :                 if (prefetcher->reader->decode_queue_tail &&
     811                 :           8 :                         prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
     812                 :           8 :                         return LRQ_NEXT_AGAIN;
     813                 :             : 
     814                 :             :                 /* Advance to the next record. */
     815                 :           0 :                 prefetcher->record = NULL;
     816         [ -  + ]:           8 :         }
     817                 :             :         pg_unreachable();
     818                 :           8 : }
     819                 :             : 
     820                 :             : /*
     821                 :             :  * Expose statistics about recovery prefetching.
     822                 :             :  */
     823                 :             : Datum
     824                 :           2 : pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
     825                 :             : {
     826                 :             : #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
     827                 :           2 :         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     828                 :           2 :         Datum           values[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
     829                 :           2 :         bool            nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
     830                 :             : 
     831                 :           2 :         InitMaterializedSRF(fcinfo, 0);
     832                 :             : 
     833         [ +  + ]:          22 :         for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
     834                 :          20 :                 nulls[i] = false;
     835                 :             : 
     836                 :           2 :         values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
     837                 :           2 :         values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
     838                 :           2 :         values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit));
     839                 :           2 :         values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init));
     840                 :           2 :         values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
     841                 :           2 :         values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
     842                 :           2 :         values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep));
     843                 :           2 :         values[7] = Int32GetDatum(SharedStats->wal_distance);
     844                 :           2 :         values[8] = Int32GetDatum(SharedStats->block_distance);
     845                 :           2 :         values[9] = Int32GetDatum(SharedStats->io_depth);
     846                 :           2 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
     847                 :             : 
     848                 :           2 :         return (Datum) 0;
     849                 :           2 : }
     850                 :             : 
     851                 :             : /*
     852                 :             :  * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
     853                 :             :  * has been replayed.
     854                 :             :  */
     855                 :             : static inline void
     856                 :           0 : XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
     857                 :             :                                                 BlockNumber blockno, XLogRecPtr lsn)
     858                 :             : {
     859                 :           0 :         XLogPrefetcherFilter *filter;
     860                 :           0 :         bool            found;
     861                 :             : 
     862                 :           0 :         filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
     863         [ #  # ]:           0 :         if (!found)
     864                 :             :         {
     865                 :             :                 /*
     866                 :             :                  * Don't allow any prefetching of this block or higher until replayed.
     867                 :             :                  */
     868                 :           0 :                 filter->filter_until_replayed = lsn;
     869                 :           0 :                 filter->filter_from_block = blockno;
     870                 :           0 :                 dlist_push_head(&prefetcher->filter_queue, &filter->link);
     871                 :           0 :         }
     872                 :             :         else
     873                 :             :         {
     874                 :             :                 /*
     875                 :             :                  * We were already filtering this rlocator.  Extend the filter's
     876                 :             :                  * lifetime to cover this WAL record, but leave the lower of the block
     877                 :             :                  * numbers there because we don't want to have to track individual
     878                 :             :                  * blocks.
     879                 :             :                  */
     880                 :           0 :                 filter->filter_until_replayed = lsn;
     881                 :           0 :                 dlist_delete(&filter->link);
     882                 :           0 :                 dlist_push_head(&prefetcher->filter_queue, &filter->link);
     883         [ #  # ]:           0 :                 filter->filter_from_block = Min(filter->filter_from_block, blockno);
     884                 :             :         }
     885                 :           0 : }
     886                 :             : 
     887                 :             : /*
     888                 :             :  * Have we replayed any records that caused us to begin filtering a block
     889                 :             :  * range?  That means that relations should have been created, extended or
     890                 :             :  * dropped as required, so we can stop filtering out accesses to a given
     891                 :             :  * relfilenumber.
     892                 :             :  */
     893                 :             : static inline void
     894                 :           8 : XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
     895                 :             : {
     896         [ +  - ]:           8 :         while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
     897                 :             :         {
     898                 :           0 :                 XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
     899                 :             :                                                                                                                   link,
     900                 :             :                                                                                                                   &prefetcher->filter_queue);
     901                 :             : 
     902         [ #  # ]:           0 :                 if (filter->filter_until_replayed >= replaying_lsn)
     903                 :           0 :                         break;
     904                 :             : 
     905                 :           0 :                 dlist_delete(&filter->link);
     906                 :           0 :                 hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
     907      [ #  #  # ]:           0 :         }
     908                 :           8 : }
     909                 :             : 
     910                 :             : /*
     911                 :             :  * Check if a given block should be skipped due to a filter.
     912                 :             :  */
     913                 :             : static inline bool
     914                 :           0 : XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
     915                 :             :                                                  BlockNumber blockno)
     916                 :             : {
     917                 :             :         /*
     918                 :             :          * Test for empty queue first, because we expect it to be empty most of
     919                 :             :          * the time and we can avoid the hash table lookup in that case.
     920                 :             :          */
     921         [ #  # ]:           0 :         if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
     922                 :             :         {
     923                 :           0 :                 XLogPrefetcherFilter *filter;
     924                 :             : 
     925                 :             :                 /* See if the block range is filtered. */
     926                 :           0 :                 filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
     927   [ #  #  #  # ]:           0 :                 if (filter && filter->filter_from_block <= blockno)
     928                 :             :                 {
     929                 :             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     930                 :             :                         elog(XLOGPREFETCHER_DEBUG_LEVEL,
     931                 :             :                                  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (blocks >= %u filtered)",
     932                 :             :                                  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
     933                 :             :                                  LSN_FORMAT_ARGS(filter->filter_until_replayed),
     934                 :             :                                  filter->filter_from_block);
     935                 :             : #endif
     936                 :           0 :                         return true;
     937                 :             :                 }
     938                 :             : 
     939                 :             :                 /* See if the whole database is filtered. */
     940                 :           0 :                 rlocator.relNumber = InvalidRelFileNumber;
     941                 :           0 :                 rlocator.spcOid = InvalidOid;
     942                 :           0 :                 filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
     943         [ #  # ]:           0 :                 if (filter)
     944                 :             :                 {
     945                 :             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     946                 :             :                         elog(XLOGPREFETCHER_DEBUG_LEVEL,
     947                 :             :                                  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (whole database)",
     948                 :             :                                  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
     949                 :             :                                  LSN_FORMAT_ARGS(filter->filter_until_replayed));
     950                 :             : #endif
     951                 :           0 :                         return true;
     952                 :             :                 }
     953      [ #  #  # ]:           0 :         }
     954                 :             : 
     955                 :           0 :         return false;
     956                 :           0 : }
     957                 :             : 
     958                 :             : /*
     959                 :             :  * A wrapper for XLogBeginRead() that also resets the prefetcher.
     960                 :             :  */
     961                 :             : void
     962                 :           8 : XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
     963                 :             : {
     964                 :             :         /* This will forget about any in-flight IO. */
     965                 :           8 :         prefetcher->reconfigure_count--;
     966                 :             : 
     967                 :             :         /* Book-keeping to avoid readahead on first read. */
     968                 :           8 :         prefetcher->begin_ptr = recPtr;
     969                 :             : 
     970                 :           8 :         prefetcher->no_readahead_until = 0;
     971                 :             : 
     972                 :             :         /* This will forget about any queued up records in the decoder. */
     973                 :           8 :         XLogBeginRead(prefetcher->reader, recPtr);
     974                 :           8 : }
     975                 :             : 
     976                 :             : /*
     977                 :             :  * A wrapper for XLogReadRecord() that provides the same interface, but also
     978                 :             :  * tries to initiate I/O for blocks referenced in future WAL records.
     979                 :             :  */
     980                 :             : XLogRecord *
     981                 :           8 : XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
     982                 :             : {
     983                 :           8 :         DecodedXLogRecord *record;
     984                 :           8 :         XLogRecPtr      replayed_up_to;
     985                 :             : 
     986                 :             :         /*
     987                 :             :          * See if it's time to reset the prefetching machinery, because a relevant
     988                 :             :          * GUC was changed.
     989                 :             :          */
     990         [ -  + ]:           8 :         if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
     991                 :             :         {
     992                 :           8 :                 uint32          max_distance;
     993                 :           8 :                 uint32          max_inflight;
     994                 :             : 
     995         [ +  + ]:           8 :                 if (prefetcher->streaming_read)
     996                 :           4 :                         lrq_free(prefetcher->streaming_read);
     997                 :             : 
     998   [ +  -  -  + ]:           8 :                 if (RecoveryPrefetchEnabled())
     999                 :             :                 {
    1000         [ +  - ]:           8 :                         Assert(maintenance_io_concurrency > 0);
    1001                 :           8 :                         max_inflight = maintenance_io_concurrency;
    1002                 :           8 :                         max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
    1003                 :           8 :                 }
    1004                 :             :                 else
    1005                 :             :                 {
    1006                 :           0 :                         max_inflight = 1;
    1007                 :           0 :                         max_distance = 1;
    1008                 :             :                 }
    1009                 :             : 
    1010                 :          16 :                 prefetcher->streaming_read = lrq_alloc(max_distance,
    1011                 :           8 :                                                                                            max_inflight,
    1012                 :           8 :                                                                                            (uintptr_t) prefetcher,
    1013                 :             :                                                                                            XLogPrefetcherNextBlock);
    1014                 :             : 
    1015                 :           8 :                 prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
    1016                 :           8 :         }
    1017                 :             : 
    1018                 :             :         /*
    1019                 :             :          * Release last returned record, if there is one, as it's now been
    1020                 :             :          * replayed.
    1021                 :             :          */
    1022                 :           8 :         replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
    1023                 :             : 
    1024                 :             :         /*
    1025                 :             :          * Can we drop any filters yet?  If we were waiting for a relation to be
    1026                 :             :          * created or extended, it is now OK to access blocks in the covered
    1027                 :             :          * range.
    1028                 :             :          */
    1029                 :           8 :         XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
    1030                 :             : 
    1031                 :             :         /*
    1032                 :             :          * All IO initiated by earlier WAL is now completed.  This might trigger
    1033                 :             :          * further prefetching.
    1034                 :             :          */
    1035                 :           8 :         lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
    1036                 :             : 
    1037                 :             :         /*
    1038                 :             :          * If there's nothing queued yet, then start prefetching to cause at least
    1039                 :             :          * one record to be queued.
    1040                 :             :          */
    1041         [ +  - ]:           8 :         if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
    1042                 :             :         {
    1043         [ #  # ]:           0 :                 Assert(lrq_inflight(prefetcher->streaming_read) == 0);
    1044         [ #  # ]:           0 :                 Assert(lrq_completed(prefetcher->streaming_read) == 0);
    1045                 :           0 :                 lrq_prefetch(prefetcher->streaming_read);
    1046                 :           0 :         }
    1047                 :             : 
    1048                 :             :         /* Read the next record. */
    1049                 :           8 :         record = XLogNextRecord(prefetcher->reader, errmsg);
    1050         [ +  - ]:           8 :         if (!record)
    1051                 :           0 :                 return NULL;
    1052                 :             : 
    1053                 :             :         /*
    1054                 :             :          * The record we just got is the "current" one, for the benefit of the
    1055                 :             :          * XLogRecXXX() macros.
    1056                 :             :          */
    1057         [ +  - ]:           8 :         Assert(record == prefetcher->reader->record);
    1058                 :             : 
    1059                 :             :         /*
    1060                 :             :          * If maintenance_io_concurrency is set very low, we might have started
    1061                 :             :          * prefetching some but not all of the blocks referenced in the record
    1062                 :             :          * we're about to return.  Forget about the rest of the blocks in this
    1063                 :             :          * record by dropping the prefetcher's reference to it.
    1064                 :             :          */
    1065         [ -  + ]:           8 :         if (record == prefetcher->record)
    1066                 :           8 :                 prefetcher->record = NULL;
    1067                 :             : 
    1068                 :             :         /*
    1069                 :             :          * See if it's time to compute some statistics, because enough WAL has
    1070                 :             :          * been processed.
    1071                 :             :          */
    1072         [ +  + ]:           8 :         if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
    1073                 :           4 :                 XLogPrefetcherComputeStats(prefetcher);
    1074                 :             : 
    1075         [ +  - ]:           8 :         Assert(record == prefetcher->reader->record);
    1076                 :             : 
    1077                 :           8 :         return &record->header;
    1078                 :           8 : }
    1079                 :             : 
    1080                 :             : bool
    1081                 :           6 : check_recovery_prefetch(int *new_value, void **extra, GucSource source)
    1082                 :             : {
    1083                 :             : #ifndef USE_PREFETCH
    1084                 :             :         if (*new_value == RECOVERY_PREFETCH_ON)
    1085                 :             :         {
    1086                 :             :                 GUC_check_errdetail("\"recovery_prefetch\" is not supported on platforms that lack support for issuing read-ahead advice.");
    1087                 :             :                 return false;
    1088                 :             :         }
    1089                 :             : #endif
    1090                 :             : 
    1091                 :           6 :         return true;
    1092                 :             : }
    1093                 :             : 
    1094                 :             : void
    1095                 :           6 : assign_recovery_prefetch(int new_value, void *extra)
    1096                 :             : {
    1097                 :             :         /* Reconfigure prefetching, because a setting it depends on changed. */
    1098                 :           6 :         recovery_prefetch = new_value;
    1099         [ +  - ]:           6 :         if (AmStartupProcess())
    1100                 :           0 :                 XLogPrefetchReconfigure();
    1101                 :           6 : }
        

Generated by: LCOV version 2.3.2-1