LCOV - code coverage report
Current view: top level - src/backend/storage/aio - read_stream.c (source / functions)
Test: Code coverage
Test Date: 2026-01-26 10:56:24

            Coverage   Total   Hit
Lines:        89.3 %     383   342
Functions:    84.6 %      13    11
Branches:     69.3 %     274   190

Legend: Lines: hit, not hit
Legend: Branches: + taken, - not taken, # not executed

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * read_stream.c
       4                 :             :  *        Mechanism for accessing buffered relation data with look-ahead
       5                 :             :  *
       6                 :             :  * Code that needs to access relation data typically pins blocks one at a
       7                 :             :  * time, often in a predictable order that might be sequential or data-driven.
       8                 :             :  * Calling the simple ReadBuffer() function for each block is inefficient,
       9                 :             :  * because blocks that are not yet in the buffer pool require I/O operations
      10                 :             :  * that are small and might stall waiting for storage.  This mechanism looks
      11                 :             :  * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
      12                 :             :  * neighboring blocks together and ahead of time, with an adaptive look-ahead
      13                 :             :  * distance.
      14                 :             :  *
      15                 :             :  * A user-provided callback generates a stream of block numbers that is used
      16                 :             :  * to form reads of up to io_combine_limit, by attempting to merge them with a
      17                 :             :  * pending read.  When that isn't possible, the existing pending read is sent
      18                 :             :  * to StartReadBuffers() so that a new one can begin to form.
      19                 :             :  *
      20                 :             :  * The algorithm for controlling the look-ahead distance is based on recent
      21                 :             :  * cache hit and miss history.  When no I/O is necessary, there is no benefit
      22                 :             :  * in looking ahead more than one block.  This is the default initial
      23                 :             :  * assumption, but when blocks needing I/O are streamed, the distance is
      24                 :             :  * increased rapidly to try to benefit from I/O combining and concurrency.  It
      25                 :             :  * is reduced gradually when cached blocks are streamed.
      26                 :             :  *
      27                 :             :  * The main data structure is a circular queue of buffers of size
      28                 :             :  * max_pinned_buffers plus some extra space for technical reasons, ready to be
      29                 :             :  * returned by read_stream_next_buffer().  Each buffer also has an optional
      30                 :             :  * variable sized object that is passed from the callback to the consumer of
      31                 :             :  * buffers.
      32                 :             :  *
      33                 :             :  * Parallel to the queue of buffers, there is a circular queue of in-progress
      34                 :             :  * I/Os that have been started with StartReadBuffers(), and for which
      35                 :             :  * WaitReadBuffers() must be called before returning the buffer.
      36                 :             :  *
      37                 :             :  * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
      38                 :             :  * successive calls, then these data structures might appear as follows:
      39                 :             :  *
      40                 :             :  *                          buffers buf/data       ios
      41                 :             :  *
      42                 :             :  *                          +----+  +-----+       +--------+
      43                 :             :  *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
      44                 :             :  *                          +----+  +-----+  |    +--------+
      45                 :             :  *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
      46                 :             :  *                          +----+  +-----+  | |  +--------+
      47                 :             :  *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
      48                 :             :  *                          +----+  +-----+    |  +--------+
      49                 :             :  *                          | 43 |  |  ?  |    |  |        |
      50                 :             :  *                          +----+  +-----+    |  +--------+
      51                 :             :  *                          | 44 |  |  ?  |    |  |        |
      52                 :             :  *                          +----+  +-----+    |  +--------+
      53                 :             :  *                          | 60 |  |  ?  |<---+
      54                 :             :  *                          +----+  +-----+
      55                 :             :  *     next_buffer_index -> |    |  |     |
      56                 :             :  *                          +----+  +-----+
      57                 :             :  *
      58                 :             :  * In the example, 5 buffers are pinned, and the next buffer to be streamed to
      59                 :             :  * the client is block 10.  Block 10 was a hit and has no associated I/O, but
      60                 :             :  * the range 42..44 requires an I/O wait before its buffers are returned, as
      61                 :             :  * does block 60.
      62                 :             :  *
      63                 :             :  *
      64                 :             :  * Portions Copyright (c) 2024-2026, PostgreSQL Global Development Group
      65                 :             :  * Portions Copyright (c) 1994, Regents of the University of California
      66                 :             :  *
      67                 :             :  * IDENTIFICATION
      68                 :             :  *        src/backend/storage/aio/read_stream.c
      69                 :             :  *
      70                 :             :  *-------------------------------------------------------------------------
      71                 :             :  */
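
The merging rule described in the header comment can be illustrated with a small standalone model. This is only a sketch, not the code from read_stream.c: the names BlockRange, combine_blocks and the combine_limit parameter are hypothetical, and the model ignores buffers, pins and I/O entirely. It extends a pending range while each new block number immediately follows the range and the limit has not been reached; otherwise it emits the pending range and starts a new one.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical model type: a contiguous run of block numbers. */
typedef struct BlockRange
{
	uint32_t	first;		/* first block number in the range */
	int			nblocks;	/* number of consecutive blocks */
} BlockRange;

/*
 * Merge a sequence of block numbers into ranges of at most combine_limit
 * consecutive blocks.  Writes at most max_out ranges to out and returns how
 * many were written.
 */
static size_t
combine_blocks(const uint32_t *blocks, size_t n, int combine_limit,
			   BlockRange *out, size_t max_out)
{
	size_t		nranges = 0;
	BlockRange	pending = {0, 0};

	assert(combine_limit > 0);

	for (size_t i = 0; i < n; i++)
	{
		/* Can this block be merged into the pending range? */
		if (pending.nblocks > 0 &&
			pending.nblocks < combine_limit &&
			blocks[i] == pending.first + (uint32_t) pending.nblocks)
		{
			pending.nblocks++;
			continue;
		}

		/* No: emit the pending range (if any) and start a new one. */
		if (pending.nblocks > 0 && nranges < max_out)
			out[nranges++] = pending;
		pending.first = blocks[i];
		pending.nblocks = 1;
	}

	/* Emit the final pending range. */
	if (pending.nblocks > 0 && nranges < max_out)
		out[nranges++] = pending;

	return nranges;
}
```

With the block numbers 10, 42, 43, 44, 60 from the header comment and a limit of 16, this model yields the ranges 10..10, 42..44 and 60..60.
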
      72                 :             : #include "postgres.h"
      73                 :             : 
      74                 :             : #include "miscadmin.h"
      75                 :             : #include "storage/aio.h"
      76                 :             : #include "storage/fd.h"
      77                 :             : #include "storage/smgr.h"
      78                 :             : #include "storage/read_stream.h"
      79                 :             : #include "utils/memdebug.h"
      80                 :             : #include "utils/rel.h"
      81                 :             : #include "utils/spccache.h"
      82                 :             : 
      83                 :             : typedef struct InProgressIO
      84                 :             : {
      85                 :             :         int16           buffer_index;
      86                 :             :         ReadBuffersOperation op;
      87                 :             : } InProgressIO;
      88                 :             : 
      89                 :             : /*
      90                 :             :  * State for managing a stream of reads.
      91                 :             :  */
      92                 :             : struct ReadStream
      93                 :             : {
      94                 :             :         int16           max_ios;
      95                 :             :         int16           io_combine_limit;
      96                 :             :         int16           ios_in_progress;
      97                 :             :         int16           queue_size;
      98                 :             :         int16           max_pinned_buffers;
      99                 :             :         int16           forwarded_buffers;
     100                 :             :         int16           pinned_buffers;
     101                 :             :         int16           distance;
     102                 :             :         int16           initialized_buffers;
     103                 :             :         int                     read_buffers_flags;
     104                 :             :         bool            sync_mode;              /* using io_method=sync */
     105                 :             :         bool            batch_mode;             /* READ_STREAM_USE_BATCHING */
     106                 :             :         bool            advice_enabled;
     107                 :             :         bool            temporary;
     108                 :             : 
     109                 :             :         /*
     110                 :             :          * One-block buffer to support 'ungetting' a block number, to resolve flow
     111                 :             :          * control problems when I/Os are split.
     112                 :             :          */
     113                 :             :         BlockNumber buffered_blocknum;
     114                 :             : 
     115                 :             :         /*
     116                 :             :          * The callback that will tell us which block numbers to read, and an
     117                 :             :          * opaque pointer that will be passed to it for its own purposes.
     118                 :             :          */
     119                 :             :         ReadStreamBlockNumberCB callback;
     120                 :             :         void       *callback_private_data;
     121                 :             : 
     122                 :             :         /* Next expected block, for detecting sequential access. */
     123                 :             :         BlockNumber seq_blocknum;
     124                 :             :         BlockNumber seq_until_processed;
     125                 :             : 
     126                 :             :         /* The read operation we are currently preparing. */
     127                 :             :         BlockNumber pending_read_blocknum;
     128                 :             :         int16           pending_read_nblocks;
     129                 :             : 
     130                 :             :         /* Space for buffers and optional per-buffer private data. */
     131                 :             :         size_t          per_buffer_data_size;
     132                 :             :         void       *per_buffer_data;
     133                 :             : 
     134                 :             :         /* Read operations that have been started but not waited for yet. */
     135                 :             :         InProgressIO *ios;
     136                 :             :         int16           oldest_io_index;
     137                 :             :         int16           next_io_index;
     138                 :             : 
     139                 :             :         bool            fast_path;
     140                 :             : 
     141                 :             :         /* Circular queue of buffers. */
     142                 :             :         int16           oldest_buffer_index;    /* Next pinned buffer to return */
     143                 :             :         int16           next_buffer_index;      /* Index of next buffer to pin */
     144                 :             :         Buffer          buffers[FLEXIBLE_ARRAY_MEMBER];
     145                 :             : };
     146                 :             : 
     147                 :             : /*
     148                 :             :  * Return a pointer to the per-buffer data by index.
     149                 :             :  */
     150                 :             : static inline void *
     151                 :      849039 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
     152                 :             : {
     153                 :     1698078 :         return (char *) stream->per_buffer_data +
     154                 :      849039 :                 stream->per_buffer_data_size * buffer_index;
     155                 :             : }
     156                 :             : 
     157                 :             : /*
     158                 :             :  * General-use ReadStreamBlockNumberCB for block range scans.  Loops over the
     159                 :             :  * blocks [current_blocknum, last_exclusive).
     160                 :             :  */
     161                 :             : BlockNumber
     162                 :        5698 : block_range_read_stream_cb(ReadStream *stream,
     163                 :             :                                                    void *callback_private_data,
     164                 :             :                                                    void *per_buffer_data)
     165                 :             : {
     166                 :        5698 :         BlockRangeReadStreamPrivate *p = callback_private_data;
     167                 :             : 
     168         [ +  + ]:        5698 :         if (p->current_blocknum < p->last_exclusive)
     169                 :        4784 :                 return p->current_blocknum++;
     170                 :             : 
     171                 :         914 :         return InvalidBlockNumber;
     172                 :        5698 : }
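
The half-open range behaviour of this callback can be modelled outside PostgreSQL. The sketch below is a stand-in built on assumptions: RangeState, range_next, range_count and INVALID_BLOCK are hypothetical names that mirror BlockRangeReadStreamPrivate, block_range_read_stream_cb() and InvalidBlockNumber, and the stream and per-buffer data arguments are omitted.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for InvalidBlockNumber: signals end of stream. */
#define INVALID_BLOCK UINT32_MAX

/* Hypothetical stand-in for BlockRangeReadStreamPrivate. */
typedef struct RangeState
{
	uint32_t	current_blocknum;	/* next block to hand out */
	uint32_t	last_exclusive;		/* one past the final block */
} RangeState;

/* Return the next block in [current_blocknum, last_exclusive), or the end marker. */
static uint32_t
range_next(RangeState *p)
{
	if (p->current_blocknum < p->last_exclusive)
		return p->current_blocknum++;
	return INVALID_BLOCK;
}

/* Count how many blocks a state yields before reporting end of stream. */
static int
range_count(RangeState *p)
{
	int			n = 0;

	while (range_next(p) != INVALID_BLOCK)
		n++;
	return n;
}
```

A state of {3, 6} yields 3, 4, 5 and then the end marker on every later call; an empty range such as {7, 7} reports the end immediately.
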
     173                 :             : 
     174                 :             : /*
     175                 :             :  * Ask the callback which block it would like us to read next, with a one block
     176                 :             :  * buffer in front to allow read_stream_unget_block() to work.
     177                 :             :  */
     178                 :             : static inline BlockNumber
     179                 :     1237173 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
     180                 :             : {
     181                 :     1237173 :         BlockNumber blocknum;
     182                 :             : 
     183                 :     1237173 :         blocknum = stream->buffered_blocknum;
     184         [ -  + ]:     1237173 :         if (blocknum != InvalidBlockNumber)
     185                 :           0 :                 stream->buffered_blocknum = InvalidBlockNumber;
     186                 :             :         else
     187                 :             :         {
     188                 :             :                 /*
     189                 :             :                  * Tell Valgrind that the per-buffer data is undefined.  That replaces
     190                 :             :                  * the "noaccess" state that was set when the consumer moved past this
     191                 :             :                  * entry last time around the queue, and should also catch callbacks
     192                 :             :                  * that fail to initialize data that the buffer consumer later
     193                 :             :                  * accesses.  On the first go around, it is undefined already.
     194                 :             :                  */
     195                 :     1237173 :                 VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
     196                 :             :                                                                         stream->per_buffer_data_size);
     197                 :     2474346 :                 blocknum = stream->callback(stream,
     198                 :     1237173 :                                                                         stream->callback_private_data,
     199                 :     1237173 :                                                                         per_buffer_data);
     200                 :             :         }
     201                 :             : 
     202                 :     2474346 :         return blocknum;
     203                 :     1237173 : }
     204                 :             : 
     205                 :             : /*
     206                 :             :  * In order to deal with buffer shortages and I/O limits after short reads, we
     207                 :             :  * sometimes need to defer handling of a block we've already consumed from the
     208                 :             :  * registered callback until later.
     209                 :             :  */
     210                 :             : static inline void
     211                 :           0 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
     212                 :             : {
     213                 :             :         /* We shouldn't ever unget more than one block. */
     214         [ #  # ]:           0 :         Assert(stream->buffered_blocknum == InvalidBlockNumber);
     215         [ #  # ]:           0 :         Assert(blocknum != InvalidBlockNumber);
     216                 :           0 :         stream->buffered_blocknum = blocknum;
     217                 :           0 : }
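
Taken together, read_stream_get_block() and read_stream_unget_block() behave like a source with a one-element push-back slot. A minimal sketch of that pattern follows, assuming a hypothetical Source type with a simple counter in place of the real callback; none of these names come from read_stream.c.

```c
#include <assert.h>
#include <stdint.h>

#define INVALID_BLOCK UINT32_MAX	/* hypothetical "no block" marker */

/* Hypothetical source: a counter plus a one-block push-back slot. */
typedef struct Source
{
	uint32_t	next;		/* next value the underlying producer returns */
	uint32_t	buffered;	/* pushed-back block, or INVALID_BLOCK */
} Source;

static void
source_init(Source *s, uint32_t start)
{
	s->next = start;
	s->buffered = INVALID_BLOCK;
}

/* Return the pushed-back block if there is one, else ask the producer. */
static uint32_t
source_get(Source *s)
{
	uint32_t	blocknum = s->buffered;

	if (blocknum != INVALID_BLOCK)
		s->buffered = INVALID_BLOCK;
	else
		blocknum = s->next++;
	return blocknum;
}

/* Push one block back; at most one may be outstanding at a time. */
static void
source_unget(Source *s, uint32_t blocknum)
{
	assert(s->buffered == INVALID_BLOCK);
	assert(blocknum != INVALID_BLOCK);
	s->buffered = blocknum;
}
```

After source_init(&s, 5), two gets return 5 and 6; ungetting 6 makes the next get return 6 again, and the one after that continues with 7.
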
     218                 :             : 
     219                 :             : /*
     220                 :             :  * Start as much of the current pending read as we can.  If we have to split it
     221                 :             :  * because of the per-backend buffer limit, or the buffer manager decides to
     222                 :             :  * split it, then the pending read is adjusted to hold the remaining portion.
     223                 :             :  *
     224                 :             :  * We can always start a read of at least size one if we have no progress yet.
     225                 :             :  * Otherwise it's possible that we can't start a read at all because of a lack
     226                 :             :  * of buffers, and then false is returned.  Buffer shortages also reduce the
     227                 :             :  * distance to a level that prevents look-ahead until buffers are released.
     228                 :             :  */
     229                 :             : static bool
     230                 :      402010 : read_stream_start_pending_read(ReadStream *stream)
     231                 :             : {
     232                 :      402010 :         bool            need_wait;
     233                 :      402010 :         int                     requested_nblocks;
     234                 :      402010 :         int                     nblocks;
     235                 :      402010 :         int                     flags;
     236                 :      402010 :         int                     forwarded;
     237                 :      402010 :         int16           io_index;
     238                 :      402010 :         int16           overflow;
     239                 :      402010 :         int16           buffer_index;
     240                 :      402010 :         int                     buffer_limit;
     241                 :             : 
     242                 :             :         /* This should only be called with a pending read. */
     243         [ +  - ]:      402010 :         Assert(stream->pending_read_nblocks > 0);
     244         [ +  - ]:      402010 :         Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
     245                 :             : 
     246                 :             :         /* We had better not exceed the per-stream buffer limit with this read. */
     247         [ +  - ]:      402010 :         Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
     248                 :             :                    stream->max_pinned_buffers);
     249                 :             : 
     250                 :             : #ifdef USE_ASSERT_CHECKING
     251                 :             :         /* We had better not be overwriting an existing pinned buffer. */
     252         [ +  + ]:      402010 :         if (stream->pinned_buffers > 0)
     253         [ +  - ]:         375 :                 Assert(stream->next_buffer_index != stream->oldest_buffer_index);
     254                 :             :         else
     255         [ +  - ]:      401635 :                 Assert(stream->next_buffer_index == stream->oldest_buffer_index);
     256                 :             : 
     257                 :             :         /*
     258                 :             :          * Pinned buffers forwarded by a preceding StartReadBuffers() call that
     259                 :             :          * had to split the operation should match the leading blocks of this
     260                 :             :          * following StartReadBuffers() call.
     261                 :             :          */
     262         [ +  - ]:      402010 :         Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
     263         [ +  + ]:      402021 :         for (int i = 0; i < stream->forwarded_buffers; ++i)
     264         [ +  - ]:          11 :                 Assert(BufferGetBlockNumber(stream->buffers[stream->next_buffer_index + i]) ==
     265                 :             :                            stream->pending_read_blocknum + i);
     266                 :             : 
     267                 :             :         /*
     268                 :             :          * Check that we've cleared the queue/overflow entries corresponding to
     269                 :             :          * the rest of the blocks covered by this read, unless it's the first go
     270                 :             :          * around and we haven't even initialized them yet.
     271                 :             :          */
     272         [ +  + ]:      817300 :         for (int i = stream->forwarded_buffers; i < stream->pending_read_nblocks; ++i)
     273   [ +  +  -  + ]:      415290 :                 Assert(stream->next_buffer_index + i >= stream->initialized_buffers ||
     274                 :             :                            stream->buffers[stream->next_buffer_index + i] == InvalidBuffer);
     275                 :             : #endif
     276                 :             : 
     277                 :             :         /* Do we need to issue read-ahead advice? */
     278                 :      402010 :         flags = stream->read_buffers_flags;
     279         [ +  - ]:      402010 :         if (stream->advice_enabled)
     280                 :             :         {
     281         [ #  # ]:           0 :                 if (stream->pending_read_blocknum == stream->seq_blocknum)
     282                 :             :                 {
     283                 :             :                         /*
     284                 :             :                          * Sequential:  Issue advice until the preadv() calls have caught
     285                 :             :                          * up with the first advice issued for this sequential region, and
     286                 :             :                          * then stay out of the way of the kernel's own read-ahead.
     287                 :             :                          */
     288         [ #  # ]:           0 :                         if (stream->seq_until_processed != InvalidBlockNumber)
     289                 :           0 :                                 flags |= READ_BUFFERS_ISSUE_ADVICE;
     290                 :           0 :                 }
     291                 :             :                 else
     292                 :             :                 {
     293                 :             :                         /*
     294                 :             :                          * Random jump:  Note the starting location of a new potential
     295                 :             :                          * sequential region and start issuing advice.  Skip it this time
     296                 :             :                          * if the preadv() follows immediately, e.g. first block in stream.
     297                 :             :                          */
     298                 :           0 :                         stream->seq_until_processed = stream->pending_read_blocknum;
     299         [ #  # ]:           0 :                         if (stream->pinned_buffers > 0)
     300                 :           0 :                                 flags |= READ_BUFFERS_ISSUE_ADVICE;
     301                 :             :                 }
     302                 :           0 :         }
     303                 :             : 
     304                 :             :         /*
     305                 :             :          * How many more buffers is this backend allowed?
     306                 :             :          *
     307                 :             :          * Forwarded buffers are already pinned and map to the leading blocks of
     308                 :             :          * the pending read (the remaining portion of an earlier short read that
     309                 :             :          * we're about to continue).  They are not counted in pinned_buffers, but
     310                 :             :          * they are counted as pins already held by this backend according to the
     311                 :             :          * buffer manager, so they must be added to the limit it grants us.
     312                 :             :          */
     313         [ +  + ]:      402010 :         if (stream->temporary)
     314         [ +  - ]:        3863 :                 buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
     315                 :             :         else
     316         [ +  - ]:      398147 :                 buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
     317         [ +  - ]:      402010 :         Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
     318                 :             : 
     319                 :      402010 :         buffer_limit += stream->forwarded_buffers;
     320         [ +  - ]:      402010 :         buffer_limit = Min(buffer_limit, PG_INT16_MAX);
     321                 :             : 
     322   [ +  +  +  - ]:      402010 :         if (buffer_limit == 0 && stream->pinned_buffers == 0)
     323                 :           4 :                 buffer_limit = 1;               /* guarantee progress */
     324                 :             : 
     325                 :             :         /* Does the per-backend limit affect this read? */
     326                 :      402010 :         nblocks = stream->pending_read_nblocks;
     327         [ +  + ]:      402010 :         if (buffer_limit < nblocks)
     328                 :             :         {
     329                 :         207 :                 int16           new_distance;
     330                 :             : 
     331                 :             :                 /* Shrink distance: no more look-ahead until buffers are released. */
     332                 :         207 :                 new_distance = stream->pinned_buffers + buffer_limit;
     333         [ +  + ]:         207 :                 if (stream->distance > new_distance)
     334                 :         145 :                         stream->distance = new_distance;
     335                 :             : 
     336                 :             :                 /* Unless we have nothing to give the consumer, stop here. */
     337         [ +  + ]:         207 :                 if (stream->pinned_buffers > 0)
     338                 :         107 :                         return false;
     339                 :             : 
     340                 :             :                 /* A short read is required to make progress. */
     341                 :         100 :                 nblocks = buffer_limit;
     342         [ +  + ]:         207 :         }
     343                 :             : 
     344                 :             :         /*
     345                 :             :          * We say how many blocks we want to read, but it may be smaller on return
     346                 :             :          * if the buffer manager decides to shorten the read.  Initialize buffers
     347                 :             :          * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
     348                 :             :          * and keep the original nblocks number so we can check for forwarded
     349                 :             :          * buffers as output, below.
     350                 :             :          */
     351                 :      401903 :         buffer_index = stream->next_buffer_index;
     352                 :      401903 :         io_index = stream->next_io_index;
     353         [ +  + ]:      793219 :         while (stream->initialized_buffers < buffer_index + nblocks)
     354                 :      391316 :                 stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
     355                 :      401903 :         requested_nblocks = nblocks;
     356                 :      803806 :         need_wait = StartReadBuffers(&stream->ios[io_index].op,
     357                 :      401903 :                                                                  &stream->buffers[buffer_index],
     358                 :      401903 :                                                                  stream->pending_read_blocknum,
     359                 :             :                                                                  &nblocks,
     360                 :      401903 :                                                                  flags);
     361                 :      401903 :         stream->pinned_buffers += nblocks;
     362                 :             : 
     363                 :             :         /* Remember whether we need to wait before returning this buffer. */
     364         [ +  + ]:      401903 :         if (!need_wait)
     365                 :             :         {
     366                 :             :                 /* Look-ahead distance decays, no I/O necessary. */
     367         [ +  + ]:      400827 :                 if (stream->distance > 1)
     368                 :         567 :                         stream->distance--;
     369                 :      400827 :         }
     370                 :             :         else
     371                 :             :         {
     372                 :             :                 /*
     373                 :             :                  * Remember to call WaitReadBuffers() before returning head buffer.
     374                 :             :                  * Look-ahead distance will be adjusted after waiting.
     375                 :             :                  */
     376                 :        1076 :                 stream->ios[io_index].buffer_index = buffer_index;
     377         [ +  + ]:        1076 :                 if (++stream->next_io_index == stream->max_ios)
     378                 :          15 :                         stream->next_io_index = 0;
     379         [ +  - ]:        1076 :                 Assert(stream->ios_in_progress < stream->max_ios);
     380                 :        1076 :                 stream->ios_in_progress++;
     381                 :        1076 :                 stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
     382                 :             :         }
     383                 :             : 
     384                 :             :         /*
     385                 :             :          * How many pins were acquired but forwarded to the next call?  These need
     386                 :             :          * to be passed to the next StartReadBuffers() call by leaving them
     387                 :             :          * exactly where they are in the queue, or released if the stream ends
     388                 :             :          * early.  We need the number for accounting purposes, since they are not
     389                 :             :          * counted in stream->pinned_buffers but we already hold them.
     390                 :             :          */
     391                 :      401903 :         forwarded = 0;
     392   [ +  +  +  + ]:      403150 :         while (nblocks + forwarded < requested_nblocks &&
     393                 :        1236 :                    stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
     394                 :          11 :                 forwarded++;
     395                 :      401903 :         stream->forwarded_buffers = forwarded;
     396                 :             : 
     397                 :             :         /*
     398                 :             :          * We gave a contiguous range of buffer space to StartReadBuffers(), but
     399                 :             :          * we want it to wrap around at queue_size.  Copy overflowing buffers to
     400                 :             :          * the front of the array where they'll be consumed, but also leave a copy
     401                 :             :          * in the overflow zone which the I/O operation has a pointer to (it needs
     402                 :             :          * a contiguous array).  Both copies will be cleared when the buffers are
     403                 :             :          * handed to the consumer.
     404                 :             :          */
     405                 :      401903 :         overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
     406         [ +  + ]:      401903 :         if (overflow > 0)
     407                 :             :         {
     408         [ +  - ]:          17 :                 Assert(overflow < stream->queue_size);    /* can't overlap */
     409                 :          17 :                 memcpy(&stream->buffers[0],
     410                 :             :                            &stream->buffers[stream->queue_size],
     411                 :             :                            sizeof(stream->buffers[0]) * overflow);
     412                 :          17 :         }
     413                 :             : 
     414                 :             :         /* Compute location of start of next read, without using % operator. */
     415                 :      401903 :         buffer_index += nblocks;
     416         [ +  + ]:      401903 :         if (buffer_index >= stream->queue_size)
     417                 :         356 :                 buffer_index -= stream->queue_size;
     418         [ +  - ]:      401903 :         Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
     419                 :      401903 :         stream->next_buffer_index = buffer_index;
     420                 :             : 
     421                 :             :         /* Adjust the pending read to cover the remaining portion, if any. */
     422                 :      401903 :         stream->pending_read_blocknum += nblocks;
     423                 :      401903 :         stream->pending_read_nblocks -= nblocks;
     424                 :             : 
     425                 :      401903 :         return true;
     426                 :      402010 : }
     427                 :             : 
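The wrap-around handling at the end of read_stream_start_pending_read() can be illustrated in isolation. The following is a minimal sketch, not the PostgreSQL implementation: QUEUE_SIZE, MAX_RUN, queue and write_run() are hypothetical names, plain ints stand in for Buffer values, and forwarded buffers are ignored. It shows a contiguous write that spills into an overflow zone, the copy of the spilled entries to the front of the array, and the index advance done without the % operator.

```c
#include <assert.h>
#include <string.h>

#define QUEUE_SIZE 8            /* hypothetical circular queue length */
#define MAX_RUN    4            /* hypothetical largest contiguous write */

/* The queue proper, followed by an overflow zone of MAX_RUN - 1 slots. */
static int queue[QUEUE_SIZE + MAX_RUN - 1];

/*
 * Write n values contiguously starting at index, mirror any entries that
 * spilled past QUEUE_SIZE to the front of the array, and return the wrapped
 * index at which the next write should start.
 */
static int
write_run(int index, const int *values, int n)
{
    int         overflow;

    assert(index >= 0 && index < QUEUE_SIZE);
    assert(n >= 1 && n <= MAX_RUN);

    /* The writer sees one contiguous region, even near the end. */
    memcpy(&queue[index], values, sizeof(queue[0]) * n);

    /* Copy spilled entries to the front, where the consumer looks. */
    overflow = (index + n) - QUEUE_SIZE;
    if (overflow > 0)
        memcpy(&queue[0], &queue[QUEUE_SIZE], sizeof(queue[0]) * overflow);

    /* Advance with a compare and subtract instead of %. */
    index += n;
    if (index >= QUEUE_SIZE)
        index -= QUEUE_SIZE;
    return index;
}
```

In this sketch, writing four values at index 6 fills slots 6 to 9, mirrors slots 8 and 9 into slots 0 and 1, and returns 2 as the next index. The compare-and-subtract step is only valid because a single write never exceeds the queue length.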
     428                 :             : static void
     429                 :      745506 : read_stream_look_ahead(ReadStream *stream)
     430                 :             : {
     431                 :             :         /*
     432                 :             :          * Allow amortizing the cost of submitting IO over multiple IOs. This
     433                 :             :          * requires that we don't do any operations that could lead to a deadlock
     434                 :             :          * with staged-but-unsubmitted IO. The callback needs to opt in to being
     435                 :             :          * careful.
     436                 :             :          */
     437         [ +  + ]:      745506 :         if (stream->batch_mode)
     438                 :      735174 :                 pgaio_enter_batchmode();
     439                 :             : 
     440   [ +  +  +  + ]:     2758917 :         while (stream->ios_in_progress < stream->max_ios &&
     441                 :     1151615 :                    stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
     442                 :             :         {
     443                 :      742904 :                 BlockNumber blocknum;
     444                 :      742904 :                 int16           buffer_index;
     445                 :      742904 :                 void       *per_buffer_data;
     446                 :             : 
     447         [ +  + ]:      742904 :                 if (stream->pending_read_nblocks == stream->io_combine_limit)
     448                 :             :                 {
     449                 :         108 :                         read_stream_start_pending_read(stream);
     450                 :         108 :                         continue;
     451                 :             :                 }
     452                 :             : 
     453                 :             :                 /*
     454                 :             :                  * See which block the callback wants next in the stream.  We need to
     455                 :             :                  * compute the index of the Nth block of the pending read including
     456                 :             :                  * wrap-around, but we don't want to use the expensive % operator.
     457                 :             :                  */
     458                 :      742796 :                 buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
     459         [ +  + ]:      742796 :                 if (buffer_index >= stream->queue_size)
     460                 :         112 :                         buffer_index -= stream->queue_size;
     461         [ +  - ]:      742796 :                 Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
     462                 :      742796 :                 per_buffer_data = get_per_buffer_data(stream, buffer_index);
     463                 :      742796 :                 blocknum = read_stream_get_block(stream, per_buffer_data);
     464         [ +  + ]:      742796 :                 if (blocknum == InvalidBlockNumber)
     465                 :             :                 {
     466                 :             :                         /* End of stream. */
     467                 :      336795 :                         stream->distance = 0;
     468                 :      336795 :                         break;
     469                 :             :                 }
     470                 :             : 
     471                 :             :                 /* Can we merge it with the pending read? */
     472   [ +  +  -  + ]:      406001 :                 if (stream->pending_read_nblocks > 0 &&
     473                 :        5426 :                         stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
     474                 :             :                 {
     475                 :        5426 :                         stream->pending_read_nblocks++;
     476                 :        5426 :                         continue;
     477                 :             :                 }
     478                 :             : 
     479                 :             :                 /* We have to start the pending read before we can build another. */
     480         [ -  + ]:      400575 :                 while (stream->pending_read_nblocks > 0)
     481                 :             :                 {
     482   [ #  #  #  # ]:           0 :                         if (!read_stream_start_pending_read(stream) ||
     483                 :           0 :                                 stream->ios_in_progress == stream->max_ios)
     484                 :             :                         {
     485                 :             :                                 /* We've hit the buffer or I/O limit.  Rewind and stop here. */
     486                 :           0 :                                 read_stream_unget_block(stream, blocknum);
     487         [ #  # ]:           0 :                                 if (stream->batch_mode)
     488                 :           0 :                                         pgaio_exit_batchmode();
     489                 :           0 :                                 return;
     490                 :             :                         }
     491                 :             :                 }
     492                 :             : 
     493                 :             :                 /* This is the start of a new pending read. */
     494                 :      400575 :                 stream->pending_read_blocknum = blocknum;
     495                 :      400575 :                 stream->pending_read_nblocks = 1;
     496   [ -  +  +  + ]:      742904 :         }
     497                 :             : 
     498                 :             :         /*
     499                 :             :          * We don't start the pending read just because we've hit the distance
     500                 :             :          * limit, preferring to give it another chance to grow to full
     501                 :             :          * io_combine_limit size once more buffers have been consumed.  However,
     502                 :             :          * if we've already reached io_combine_limit, or we've reached the
     503                 :             :          * distance limit and there isn't anything pinned yet, or the callback has
     504                 :             :          * signaled end-of-stream, we start the read immediately.  Note that the
     505                 :             :          * pending read can exceed the distance goal, if the latter was reduced
     506                 :             :          * after hitting the per-backend buffer limit.
     507                 :             :          */
     508         [ +  + ]:      864398 :         if (stream->pending_read_nblocks > 0 &&
     509         [ +  + ]:      403314 :                 (stream->pending_read_nblocks == stream->io_combine_limit ||
     510         [ +  + ]:      403049 :                  (stream->pending_read_nblocks >= stream->distance &&
     511                 :      401637 :                   stream->pinned_buffers == 0) ||
     512         [ +  + ]:      403314 :                  stream->distance == 0) &&
     513                 :         265 :                 stream->ios_in_progress < stream->max_ios)
     514                 :      401902 :                 read_stream_start_pending_read(stream);
     515                 :             : 
     516                 :             :         /*
     517                 :             :          * There should always be something pinned when we leave this function,
     518                 :             :          * whether started by this call or not, unless we've hit the end of the
     519                 :             :          * stream.  In the worst case we can always make progress one buffer at a
     520                 :             :          * time.
     521                 :             :          */
     522   [ +  +  +  - ]:     1667672 :         Assert(stream->pinned_buffers > 0 || stream->distance == 0);
     523                 :             : 
     524         [ +  + ]:      745504 :         if (stream->batch_mode)
     525                 :      735172 :                 pgaio_exit_batchmode();
     526                 :      864396 : }
     527                 :             : 
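The merging rule used by read_stream_look_ahead() (extend the pending read while the next block number is adjacent and the read is below io_combine_limit, otherwise start it and begin a new one) can be sketched on its own. This is a simplified illustration under stated assumptions: BlockRange and combine_blocks() are hypothetical names, block numbers come from an array rather than a callback, and the look-ahead distance, pin limits and I/O concurrency limit that the real function also honors are left out.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical description of one combined read. */
typedef struct BlockRange
{
    uint32_t    start;          /* first block number of the read */
    int         nblocks;        /* number of consecutive blocks covered */
} BlockRange;

/*
 * Combine consecutive block numbers into ranges of at most combine_limit
 * blocks.  Returns the number of ranges written to out, which must have
 * room for nblocknums entries.
 */
static size_t
combine_blocks(const uint32_t *blocknums, size_t nblocknums,
               int combine_limit, BlockRange *out)
{
    size_t      nranges = 0;
    BlockRange  pending = {0, 0};

    assert(combine_limit >= 1);

    for (size_t i = 0; i < nblocknums; i++)
    {
        uint32_t    blocknum = blocknums[i];

        /* Can we merge it with the pending range? */
        if (pending.nblocks > 0 &&
            pending.nblocks < combine_limit &&
            pending.start + (uint32_t) pending.nblocks == blocknum)
        {
            pending.nblocks++;
            continue;
        }

        /* Emit the pending range before building another. */
        if (pending.nblocks > 0)
            out[nranges++] = pending;
        pending.start = blocknum;
        pending.nblocks = 1;
    }

    /* End of input: emit whatever is still pending. */
    if (pending.nblocks > 0)
        out[nranges++] = pending;
    return nranges;
}
```

With a combine limit of 4, the block sequence 1, 2, 3, 4, 5, 9, 10, 20 yields four ranges in this sketch: blocks 1 to 4, block 5 alone, blocks 9 to 10, and block 20 alone.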
     528                 :             : /*
     529                 :             :  * Create a new read stream object that can be used to perform the equivalent
     530                 :             :  * of a series of ReadBuffer() calls for one fork of one relation.
     531                 :             :  * Internally, it generates larger vectored reads where possible by looking
     532                 :             :  * ahead.  The callback should return block numbers or InvalidBlockNumber to
     533                 :             :  * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
     534                 :             :  * write extra data for each block into the space provided to it.  It will
     535                 :             :  * also receive callback_private_data for its own purposes.
     536                 :             :  */
     537                 :             : static ReadStream *
     538                 :      991283 : read_stream_begin_impl(int flags,
     539                 :             :                                            BufferAccessStrategy strategy,
     540                 :             :                                            Relation rel,
     541                 :             :                                            SMgrRelation smgr,
     542                 :             :                                            char persistence,
     543                 :             :                                            ForkNumber forknum,
     544                 :             :                                            ReadStreamBlockNumberCB callback,
     545                 :             :                                            void *callback_private_data,
     546                 :             :                                            size_t per_buffer_data_size)
     547                 :             : {
     548                 :      991283 :         ReadStream *stream;
     549                 :      991283 :         size_t          size;
     550                 :      991283 :         int16           queue_size;
     551                 :      991283 :         int16           queue_overflow;
     552                 :      991283 :         int                     max_ios;
     553                 :      991283 :         int                     strategy_pin_limit;
     554                 :      991283 :         uint32          max_pinned_buffers;
     555                 :      991283 :         uint32          max_possible_buffer_limit;
     556                 :      991283 :         Oid                     tablespace_id;
     557                 :             : 
     558                 :             :         /*
     559                 :             :          * Decide how many I/Os we will allow to run at the same time.  Currently
     560                 :             :          * that means issuing advice to the kernel that we will soon read.
     561                 :             :          * This number also affects how far we look ahead for opportunities to
     562                 :             :          * start more I/Os.
     563                 :             :          */
     564                 :      991283 :         tablespace_id = smgr->smgr_rlocator.locator.spcOid;
     565         [ +  + ]:      991283 :         if (!OidIsValid(MyDatabaseId) ||
     566   [ +  +  +  + ]:      347523 :                 (rel && IsCatalogRelation(rel)) ||
     567                 :         738 :                 IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
     568                 :             :         {
     569                 :             :                 /*
     570                 :             :                  * Avoid circularity while trying to look up tablespace settings or
     571                 :             :                  * before spccache.c is ready.
     572                 :             :                  */
     573                 :     1312464 :                 max_ios = effective_io_concurrency;
     574                 :     1312464 :         }
     575         [ +  + ]:      322657 :         else if (flags & READ_STREAM_MAINTENANCE)
     576                 :        1517 :                 max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
     577                 :             :         else
     578                 :      321140 :                 max_ios = get_tablespace_io_concurrency(tablespace_id);
     579                 :             : 
     580                 :             :         /* Cap to PG_INT16_MAX to avoid overflowing int16 below. */
     581         [ +  - ]:      347601 :         max_ios = Min(max_ios, PG_INT16_MAX);
     582                 :             : 
     583                 :             :         /*
     584                 :             :          * If starting a multi-block I/O near the end of the queue, we might
     585                 :             :          * temporarily need extra space for overflowing buffers before they are
     586                 :             :          * moved to regular circular position.  This is the maximum extra space we
     587                 :             :          * could need.
     588                 :             :          */
     589                 :      347601 :         queue_overflow = io_combine_limit - 1;
     590                 :             : 
     591                 :             :         /*
     592                 :             :          * Choose the maximum number of buffers we're prepared to pin.  We try to
     593                 :             :          * pin fewer if we can, though.  We add one so that we can make progress
     594                 :             :          * even if max_ios is set to 0 (see also further down).  For max_ios > 0,
     595                 :             :          * this also allows an extra full I/O's worth of buffers: after an I/O
     596                 :             :          * finishes we don't want to have to wait for its buffers to be consumed
     597                 :             :          * before starting a new one.
     598                 :             :          *
     599                 :             :          * Be careful not to allow int16 to overflow.  That is possible with the
     600                 :             :          * current GUC range limits, so this is an artificial limit of ~32k
     601                 :             :          * buffers and we'd need to adjust the types to exceed that.  We also have
     602                 :             :          * to allow for the spare entry and the overflow space.
     603                 :             :          */
     604                 :      347601 :         max_pinned_buffers = (max_ios + 1) * io_combine_limit;
     605         [ +  - ]:      347601 :         max_pinned_buffers = Min(max_pinned_buffers,
     606                 :             :                                                          PG_INT16_MAX - queue_overflow - 1);
     607                 :             : 
     608                 :             :         /* Give the strategy a chance to limit the number of buffers we pin. */
     609                 :      347601 :         strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
     610         [ +  + ]:      347601 :         max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
     611                 :             : 
     612                 :             :         /*
     613                 :             :          * Also limit our queue to the maximum number of pins we could ever be
     614                 :             :          * allowed to acquire according to the buffer manager.  We may not really
     615                 :             :          * be able to use them all due to other pins held by this backend, but
     616                 :             :          * we'll check that later in read_stream_start_pending_read().
     617                 :             :          */
     618         [ +  + ]:      347601 :         if (SmgrIsTemp(smgr))
     619                 :        2197 :                 max_possible_buffer_limit = GetLocalPinLimit();
     620                 :             :         else
     621                 :      345404 :                 max_possible_buffer_limit = GetPinLimit();
     622         [ +  + ]:      347601 :         max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
     623                 :             : 
     624                 :             :         /*
     625                 :             :          * The limit might be zero on a system configured with too few buffers for
     626                 :             :          * the number of connections.  We need at least one to make progress.
     627                 :             :          */
     628         [ -  + ]:      347601 :         max_pinned_buffers = Max(1, max_pinned_buffers);
     629                 :             : 
     630                 :             :         /*
     631                 :             :          * We need one extra entry for buffers and per-buffer data, because users
     632                 :             :          * of per-buffer data have access to the object until the next call to
     633                 :             :          * read_stream_next_buffer(), so we need a gap between the head and tail
     634                 :             :          * of the queue so that we don't clobber it.
     635                 :             :          */
     636                 :      347601 :         queue_size = max_pinned_buffers + 1;
     637                 :             : 
     638                 :             :         /*
     639                 :             :          * Allocate the object, the buffers, the ios and per_buffer_data space in
     640                 :             :          * one big chunk.  Though we have queue_size buffers, we want to be able
     641                 :             :          * to assume that all the buffers for a single read are contiguous (i.e.
     642                 :             :          * don't wrap around halfway through), so we allow temporary overflows of
     643                 :             :          * up to the maximum possible overflow size.
     644                 :             :          */
     645                 :      347601 :         size = offsetof(ReadStream, buffers);
     646                 :      347601 :         size += sizeof(Buffer) * (queue_size + queue_overflow);
     647         [ -  + ]:      347601 :         size += sizeof(InProgressIO) * Max(1, max_ios);
     648                 :      347601 :         size += per_buffer_data_size * queue_size;
     649                 :      347601 :         size += MAXIMUM_ALIGNOF * 2;
     650                 :      347601 :         stream = (ReadStream *) palloc(size);
     651                 :      347601 :         memset(stream, 0, offsetof(ReadStream, buffers));
     652                 :      347601 :         stream->ios = (InProgressIO *)
     653                 :      347601 :                 MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
     654         [ +  + ]:      347601 :         if (per_buffer_data_size > 0)
     655                 :        2651 :                 stream->per_buffer_data = (void *)
     656         [ -  + ]:        2651 :                         MAXALIGN(&stream->ios[Max(1, max_ios)]);
     657                 :             : 
     658                 :      347601 :         stream->sync_mode = io_method == IOMETHOD_SYNC;
     659                 :      347601 :         stream->batch_mode = flags & READ_STREAM_USE_BATCHING;
     660                 :             : 
     661                 :             : #ifdef USE_PREFETCH
     662                 :             : 
     663                 :             :         /*
     664                 :             :          * Read-ahead advice simulating asynchronous I/O with synchronous calls.
     665                 :             :          * Issue advice only if AIO is not used, direct I/O isn't enabled, the
     666                 :             :          * caller hasn't promised sequential access (overriding our detection
     667                 :             :          * heuristics), and max_ios hasn't been set to zero.
     668                 :             :          */
     669         [ -  + ]:      347601 :         if (stream->sync_mode &&
     670         [ #  # ]:           0 :                 (io_direct_flags & IO_DIRECT_DATA) == 0 &&
     671   [ #  #  #  # ]:           0 :                 (flags & READ_STREAM_SEQUENTIAL) == 0 &&
     672                 :           0 :                 max_ios > 0)
     673                 :           0 :                 stream->advice_enabled = true;
     674                 :             : #endif
     675                 :             : 
     676                 :             :         /*
     677                 :             :          * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
     678                 :             :          * we still need to allocate space to combine and run one I/O.  Bump it up
     679                 :             :          * to one, and remember to ask for synchronous I/O only.
     680                 :             :          */
     681         [ +  - ]:      347601 :         if (max_ios == 0)
     682                 :             :         {
     683                 :           0 :                 max_ios = 1;
     684                 :           0 :                 stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
     685                 :           0 :         }
     686                 :             : 
     687                 :             :         /*
     688                 :             :          * Capture stable values for these two GUC-derived numbers for the
     689                 :             :          * lifetime of this stream, so we don't have to worry about the GUCs
     690                 :             :          * changing underneath us beyond this point.
     691                 :             :          */
     692                 :      347601 :         stream->max_ios = max_ios;
     693                 :      347601 :         stream->io_combine_limit = io_combine_limit;
     694                 :             : 
     695                 :      347601 :         stream->per_buffer_data_size = per_buffer_data_size;
     696                 :      347601 :         stream->max_pinned_buffers = max_pinned_buffers;
     697                 :      347601 :         stream->queue_size = queue_size;
     698                 :      347601 :         stream->callback = callback;
     699                 :      347601 :         stream->callback_private_data = callback_private_data;
     700                 :      347601 :         stream->buffered_blocknum = InvalidBlockNumber;
     701                 :      347601 :         stream->seq_blocknum = InvalidBlockNumber;
     702                 :      347601 :         stream->seq_until_processed = InvalidBlockNumber;
     703                 :      347601 :         stream->temporary = SmgrIsTemp(smgr);
     704                 :             : 
     705                 :             :         /*
     706                 :             :          * Skip the initial ramp-up phase if the caller says we're going to be
     707                 :             :          * reading the whole relation.  This way we start out assuming we'll be
     708                 :             :          * doing full io_combine_limit sized reads.
     709                 :             :          */
     710         [ +  + ]:      347601 :         if (flags & READ_STREAM_FULL)
     711         [ -  + ]:         946 :                 stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
     712                 :             :         else
     713                 :      346655 :                 stream->distance = 1;
     714                 :             : 
     715                 :             :         /*
     716                 :             :          * Since we always access the same relation, we can initialize parts of
     717                 :             :          * the ReadBuffersOperation objects and leave them that way, to avoid
     718                 :             :          * wasting CPU cycles writing to them for each read.
     719                 :             :          */
     720         [ +  + ]:     5916391 :         for (int i = 0; i < max_ios; ++i)
     721                 :             :         {
     722                 :     5568790 :                 stream->ios[i].op.rel = rel;
     723                 :     5568790 :                 stream->ios[i].op.smgr = smgr;
     724                 :     5568790 :                 stream->ios[i].op.persistence = persistence;
     725                 :     5568790 :                 stream->ios[i].op.forknum = forknum;
     726                 :     5568790 :                 stream->ios[i].op.strategy = strategy;
     727                 :     5568790 :         }
     728                 :             : 
     729                 :      695202 :         return stream;
     730                 :      347601 : }
     731                 :             : 
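The sizing arithmetic in read_stream_begin_impl() can be followed more easily as a standalone calculation. The sketch below mirrors the sequence of caps applied above, under some assumptions: QueueSizing and compute_queue_sizing() are hypothetical names, both pin limits are passed in as plain uint32_t parameters instead of being obtained from GetAccessStrategyPinLimit(), GetPinLimit() or GetLocalPinLimit(), and INT16_MAX from <stdint.h> stands in for PG_INT16_MAX.

```c
#include <assert.h>
#include <stdint.h>

#define MIN(a, b) ((a) < (b) ? (a) : (b))
#define MAX(a, b) ((a) > (b) ? (a) : (b))

/* Hypothetical result of the sizing calculation. */
typedef struct QueueSizing
{
    uint32_t    max_pinned_buffers;
    int16_t     queue_size;     /* max_pinned_buffers plus one spare entry */
    int16_t     queue_overflow; /* extra slots for a read that wraps */
} QueueSizing;

static QueueSizing
compute_queue_sizing(int max_ios, int io_combine_limit,
                     uint32_t strategy_pin_limit, uint32_t buffer_pin_limit)
{
    QueueSizing s;
    uint32_t    max_pinned;

    assert(max_ios >= 0 && io_combine_limit >= 1);

    /* Keep the later arithmetic within int16 range. */
    max_ios = MIN(max_ios, INT16_MAX);
    s.queue_overflow = (int16_t) (io_combine_limit - 1);

    /* One full I/O per slot, plus one more so progress is always possible. */
    max_pinned = (uint32_t) (max_ios + 1) * (uint32_t) io_combine_limit;
    max_pinned = MIN(max_pinned,
                     (uint32_t) (INT16_MAX - s.queue_overflow - 1));

    /* Apply the caller-supplied limits, but never go below one buffer. */
    max_pinned = MIN(max_pinned, strategy_pin_limit);
    max_pinned = MIN(max_pinned, buffer_pin_limit);
    max_pinned = MAX(1u, max_pinned);

    s.max_pinned_buffers = max_pinned;
    s.queue_size = (int16_t) (max_pinned + 1);
    return s;
}
```

For example, with max_ios = 16 and io_combine_limit = 16 and no tighter pin limits, the sketch gives 272 pinned buffers, a queue of 273 entries and 15 overflow slots. The cap of INT16_MAX - queue_overflow - 1 ensures that queue_size + queue_overflow still fits in an int16.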
     732                 :             : /*
     733                 :             :  * Create a new read stream for reading a relation.
     734                 :             :  * See read_stream_begin_impl() for the detailed explanation.
     735                 :             :  */
     736                 :             : ReadStream *
     737                 :      346863 : read_stream_begin_relation(int flags,
     738                 :             :                                                    BufferAccessStrategy strategy,
     739                 :             :                                                    Relation rel,
     740                 :             :                                                    ForkNumber forknum,
     741                 :             :                                                    ReadStreamBlockNumberCB callback,
     742                 :             :                                                    void *callback_private_data,
     743                 :             :                                                    size_t per_buffer_data_size)
     744                 :             : {
     745                 :      693726 :         return read_stream_begin_impl(flags,
     746                 :      346863 :                                                                   strategy,
     747                 :      346863 :                                                                   rel,
     748                 :      346863 :                                                                   RelationGetSmgr(rel),
     749                 :      346863 :                                                                   rel->rd_rel->relpersistence,
     750                 :      346863 :                                                                   forknum,
     751                 :      346863 :                                                                   callback,
     752                 :      346863 :                                                                   callback_private_data,
     753                 :      346863 :                                                                   per_buffer_data_size);
     754                 :             : }
     755                 :             : 
     756                 :             : /*
     757                 :             :  * Create a new read stream for reading a SMgr relation.
     758                 :             :  * See read_stream_begin_impl() for the detailed explanation.
     759                 :             :  */
     760                 :             : ReadStream *
     761                 :         738 : read_stream_begin_smgr_relation(int flags,
     762                 :             :                                                                 BufferAccessStrategy strategy,
     763                 :             :                                                                 SMgrRelation smgr,
     764                 :             :                                                                 char smgr_persistence,
     765                 :             :                                                                 ForkNumber forknum,
     766                 :             :                                                                 ReadStreamBlockNumberCB callback,
     767                 :             :                                                                 void *callback_private_data,
     768                 :             :                                                                 size_t per_buffer_data_size)
     769                 :             : {
     770                 :        1476 :         return read_stream_begin_impl(flags,
     771                 :         738 :                                                                   strategy,
     772                 :             :                                                                   NULL,
     773                 :         738 :                                                                   smgr,
     774                 :         738 :                                                                   smgr_persistence,
     775                 :         738 :                                                                   forknum,
     776                 :         738 :                                                                   callback,
     777                 :         738 :                                                                   callback_private_data,
     778                 :         738 :                                                                   per_buffer_data_size);
     779                 :             : }
     780                 :             : 
     781                 :             : /*
     782                 :             :  * Pull one pinned buffer out of a stream.  Each call returns successive
     783                 :             :  * blocks in the order specified by the callback.  If per_buffer_data_size was
     784                 :             :  * set to a non-zero size, *per_buffer_data receives a pointer to the extra
     785                 :             :  * per-buffer data that the callback had a chance to populate, which remains
     786                 :             :  * valid until the next call to read_stream_next_buffer().  When the stream
     787                 :             :  * runs out of data, InvalidBuffer is returned.  The caller may decide to end
     788                 :             :  * the stream early at any time by calling read_stream_end().
     789                 :             :  */
     790                 :             : Buffer
     791                 :     1281348 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
     792                 :             : {
     793                 :     1281348 :         Buffer          buffer;
     794                 :     1281348 :         int16           oldest_buffer_index;
     795                 :             : 
     796                 :             : #ifndef READ_STREAM_DISABLE_FAST_PATH
     797                 :             : 
     798                 :             :         /*
     799                 :             :          * A fast path for all-cached scans.  This is the same as the usual
     800                 :             :          * algorithm, but it is specialized for no I/O and no per-buffer data, so
     801                 :             :          * we can skip the queue management code, stay in the same buffer slot and
     802                 :             :          * use singular StartReadBuffer().
     803                 :             :          */
     804         [ +  + ]:     1281348 :         if (likely(stream->fast_path))
     805                 :             :         {
     806                 :      494377 :                 BlockNumber next_blocknum;
     807                 :             : 
     808                 :             :                 /* Fast path assumptions. */
     809         [ +  - ]:      494377 :                 Assert(stream->ios_in_progress == 0);
     810         [ +  - ]:      494377 :                 Assert(stream->forwarded_buffers == 0);
     811         [ +  - ]:      494377 :                 Assert(stream->pinned_buffers == 1);
     812         [ +  - ]:      494377 :                 Assert(stream->distance == 1);
     813         [ +  - ]:      494377 :                 Assert(stream->pending_read_nblocks == 0);
     814         [ +  - ]:      494377 :                 Assert(stream->per_buffer_data_size == 0);
     815         [ +  - ]:      494377 :                 Assert(stream->initialized_buffers > stream->oldest_buffer_index);
     816                 :             : 
     817                 :             :                 /* We're going to return the buffer we pinned last time. */
     818                 :      494377 :                 oldest_buffer_index = stream->oldest_buffer_index;
     819         [ +  - ]:      494377 :                 Assert((oldest_buffer_index + 1) % stream->queue_size ==
     820                 :             :                            stream->next_buffer_index);
     821                 :      494377 :                 buffer = stream->buffers[oldest_buffer_index];
     822         [ +  - ]:      494377 :                 Assert(buffer != InvalidBuffer);
     823                 :             : 
     824                 :             :                 /* Choose the next block to pin. */
     825                 :      494377 :                 next_blocknum = read_stream_get_block(stream, NULL);
     826                 :             : 
     827         [ +  + ]:      494377 :                 if (likely(next_blocknum != InvalidBlockNumber))
     828                 :             :                 {
     829                 :      484095 :                         int                     flags = stream->read_buffers_flags;
     830                 :             : 
     831         [ +  - ]:      484095 :                         if (stream->advice_enabled)
     832                 :           0 :                                 flags |= READ_BUFFERS_ISSUE_ADVICE;
     833                 :             : 
     834                 :             :                         /*
     835                 :             :                          * Pin a buffer for the next call.  Same buffer entry, and
     836                 :             :                          * arbitrary I/O entry (they're all free).  We don't have to
     837                 :             :                          * adjust pinned_buffers because we're transferring one to caller
     838                 :             :                          * but pinning one more.
     839                 :             :                          *
     840                 :             :                          * In the fast path we don't need to check the pin limit.  We're
     841                 :             :                          * always allowed at least one pin so that progress can be made,
     842                 :             :                          * and that's all we need here.  Although two pins are momentarily
     843                 :             :                          * held at the same time, the model used here is that the stream
     844                 :             :                          * holds only one, and the other now belongs to the caller.
     845                 :             :                          */
     846         [ +  + ]:      484095 :                         if (likely(!StartReadBuffer(&stream->ios[0].op,
     847                 :             :                                                                                 &stream->buffers[oldest_buffer_index],
     848                 :             :                                                                                 next_blocknum,
     849                 :             :                                                                                 flags)))
     850                 :             :                         {
     851                 :             :                                 /* Fast return. */
     852                 :      484085 :                                 return buffer;
     853                 :             :                         }
     854                 :             : 
     855                 :             :                         /* Next call must wait for I/O for the newly pinned buffer. */
     856                 :          10 :                         stream->oldest_io_index = 0;
     857                 :          10 :                         stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
     858                 :          10 :                         stream->ios_in_progress = 1;
     859                 :          10 :                         stream->ios[0].buffer_index = oldest_buffer_index;
     860                 :          10 :                         stream->seq_blocknum = next_blocknum + 1;
     861         [ +  + ]:      484095 :                 }
     862                 :             :                 else
     863                 :             :                 {
     864                 :             :                         /* No more blocks, end of stream. */
     865                 :       10282 :                         stream->distance = 0;
     866                 :       10282 :                         stream->oldest_buffer_index = stream->next_buffer_index;
     867                 :       10282 :                         stream->pinned_buffers = 0;
     868                 :       10282 :                         stream->buffers[oldest_buffer_index] = InvalidBuffer;
     869                 :             :                 }
     870                 :             : 
     871                 :       10292 :                 stream->fast_path = false;
     872                 :       10292 :                 return buffer;
     873                 :      494377 :         }
     874                 :             : #endif
     875                 :             : 
     876         [ +  + ]:      786971 :         if (unlikely(stream->pinned_buffers == 0))
     877                 :             :         {
     878         [ +  - ]:      727322 :                 Assert(stream->oldest_buffer_index == stream->next_buffer_index);
     879                 :             : 
     880                 :             :                 /* End of stream reached?  */
     881         [ +  + ]:      727322 :                 if (stream->distance == 0)
     882                 :      377238 :                         return InvalidBuffer;
     883                 :             : 
     884                 :             :                 /*
     885                 :             :                  * The usual order of operations is that we look ahead at the bottom
     886                 :             :                  * of this function after potentially finishing an I/O and making
     887                 :             :                  * space for more, but if we're just starting up we'll need to crank
     888                 :             :                  * the handle to get started.
     889                 :             :                  */
     890                 :      350084 :                 read_stream_look_ahead(stream);
     891                 :             : 
     892                 :             :                 /* End of stream reached? */
     893         [ +  + ]:      350084 :                 if (stream->pinned_buffers == 0)
     894                 :             :                 {
     895         [ +  - ]:       14313 :                         Assert(stream->distance == 0);
     896                 :       14313 :                         return InvalidBuffer;
     897                 :             :                 }
     898                 :      335771 :         }
     899                 :             : 
     900                 :             :         /* Grab the oldest pinned buffer and associated per-buffer data. */
     901         [ +  - ]:      395420 :         Assert(stream->pinned_buffers > 0);
     902                 :      395420 :         oldest_buffer_index = stream->oldest_buffer_index;
     903         [ +  - ]:      395420 :         Assert(oldest_buffer_index >= 0 &&
     904                 :             :                    oldest_buffer_index < stream->queue_size);
     905                 :      395420 :         buffer = stream->buffers[oldest_buffer_index];
     906         [ +  + ]:      395420 :         if (per_buffer_data)
     907                 :       53121 :                 *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
     908                 :             : 
     909         [ +  - ]:      395420 :         Assert(BufferIsValid(buffer));
     910                 :             : 
     911                 :             :         /* Do we have to wait for an associated I/O first? */
     912   [ +  +  +  + ]:      395420 :         if (stream->ios_in_progress > 0 &&
     913                 :        3911 :                 stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
     914                 :             :         {
     915                 :         988 :                 int16           io_index = stream->oldest_io_index;
     916                 :         988 :                 int32           distance;       /* wider temporary value, clamped below */
     917                 :             : 
     918                 :             :                 /* Sanity check that we still agree on the buffers. */
     919         [ +  - ]:         988 :                 Assert(stream->ios[io_index].op.buffers ==
     920                 :             :                            &stream->buffers[oldest_buffer_index]);
     921                 :             : 
     922                 :         988 :                 WaitReadBuffers(&stream->ios[io_index].op);
     923                 :             : 
     924         [ +  - ]:         988 :                 Assert(stream->ios_in_progress > 0);
     925                 :         988 :                 stream->ios_in_progress--;
     926         [ +  + ]:         988 :                 if (++stream->oldest_io_index == stream->max_ios)
     927                 :          15 :                         stream->oldest_io_index = 0;
     928                 :             : 
     929                 :             :                 /* Look-ahead distance ramps up rapidly after we do I/O. */
     930                 :         988 :                 distance = stream->distance * 2;
     931         [ +  + ]:         988 :                 distance = Min(distance, stream->max_pinned_buffers);
     932                 :         988 :                 stream->distance = distance;
     933                 :             : 
     934                 :             :                 /*
     935                 :             :                  * If we've reached the first block of a sequential region we're
     936                 :             :                  * issuing advice for, cancel that until the next jump.  The kernel
     937                 :             :                  * will see the sequential preadv() pattern starting here.
     938                 :             :                  */
     939   [ -  +  #  # ]:         988 :                 if (stream->advice_enabled &&
     940                 :           0 :                         stream->ios[io_index].op.blocknum == stream->seq_until_processed)
     941                 :           0 :                         stream->seq_until_processed = InvalidBlockNumber;
     942                 :         988 :         }
     943                 :             : 
     944                 :             :         /*
     945                 :             :          * We must zap this queue entry, or else it would appear as a forwarded
     946                 :             :          * buffer.  If it's potentially in the overflow zone (ie from a
     947                 :             :          * multi-block I/O that wrapped around the queue), also zap the copy.
     948                 :             :          */
     949                 :      395420 :         stream->buffers[oldest_buffer_index] = InvalidBuffer;
     950         [ +  + ]:      395420 :         if (oldest_buffer_index < stream->io_combine_limit - 1)
     951                 :      352358 :                 stream->buffers[stream->queue_size + oldest_buffer_index] =
     952                 :             :                         InvalidBuffer;
     953                 :             : 
     954                 :             : #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
     955                 :             : 
     956                 :             :         /*
     957                 :             :          * The caller will get access to the per-buffer data, until the next call.
     958                 :             :          * We wipe the one before, which is never occupied because queue_size
     959                 :             :          * allowed one extra element.  This will hopefully trip up client code
     960                 :             :          * that is holding a dangling pointer to it.
     961                 :             :          */
     962         [ +  + ]:      395420 :         if (stream->per_buffer_data)
     963                 :             :         {
     964                 :       53122 :                 void       *per_buffer_data;
     965                 :             : 
     966                 :      106244 :                 per_buffer_data = get_per_buffer_data(stream,
     967         [ +  + ]:       53122 :                                                                                           oldest_buffer_index == 0 ?
     968                 :        1956 :                                                                                           stream->queue_size - 1 :
     969                 :       51166 :                                                                                           oldest_buffer_index - 1);
     970                 :             : 
     971                 :             : #if defined(CLOBBER_FREED_MEMORY)
     972                 :             :                 /* This also tells Valgrind the memory is "noaccess". */
     973                 :       53122 :                 wipe_mem(per_buffer_data, stream->per_buffer_data_size);
     974                 :             : #elif defined(USE_VALGRIND)
     975                 :             :                 /* Tell it ourselves. */
     976                 :             :                 VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
     977                 :             :                                                                    stream->per_buffer_data_size);
     978                 :             : #endif
     979                 :       53122 :         }
     980                 :             : #endif
     981                 :             : 
     982                 :             :         /* Pin transferred to caller. */
     983         [ +  - ]:      395420 :         Assert(stream->pinned_buffers > 0);
     984                 :      395420 :         stream->pinned_buffers--;
     985                 :             : 
     986                 :             :         /* Advance oldest buffer, with wrap-around. */
     987                 :      395420 :         stream->oldest_buffer_index++;
     988         [ +  + ]:      395420 :         if (stream->oldest_buffer_index == stream->queue_size)
     989                 :         356 :                 stream->oldest_buffer_index = 0;
     990                 :             : 
     991                 :             :         /* Prepare for the next call. */
     992                 :      395420 :         read_stream_look_ahead(stream);
     993                 :             : 
     994                 :             : #ifndef READ_STREAM_DISABLE_FAST_PATH
     995                 :             :         /* See if we can take the fast path for all-cached scans next time. */
     996         [ +  + ]:      395420 :         if (stream->ios_in_progress == 0 &&
     997         [ +  + ]:      392000 :                 stream->forwarded_buffers == 0 &&
     998         [ +  + ]:      391996 :                 stream->pinned_buffers == 1 &&
     999         [ +  + ]:       65997 :                 stream->distance == 1 &&
    1000   [ +  +  +  + ]:       64489 :                 stream->pending_read_nblocks == 0 &&
    1001                 :       64448 :                 stream->per_buffer_data_size == 0)
    1002                 :             :         {
    1003                 :             :                 /*
    1004                 :             :                  * The fast path spins on one buffer entry repeatedly instead of
    1005                 :             :                  * rotating through the whole queue and clearing the entries behind
    1006                 :             :                  * it.  If the buffer it starts with happened to be forwarded between
    1007                 :             :                  * StartReadBuffers() calls and also wrapped around the circular queue
    1008                 :             :                  * partway through, then a copy also exists in the overflow zone, and
    1009                 :             :                  * it won't clear it out as the regular path would.  Do that now, so
    1010                 :             :                  * it doesn't need code for that.
    1011                 :             :                  */
    1012         [ +  + ]:       13188 :                 if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
    1013                 :       12989 :                         stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
    1014                 :             :                                 InvalidBuffer;
    1015                 :             : 
    1016                 :       13188 :                 stream->fast_path = true;
    1017                 :       13188 :         }
    1018                 :             : #endif
    1019                 :             : 
    1020                 :      395420 :         return buffer;
    1021                 :     1281348 : }
    1022                 :             : 
    1023                 :             : /*
    1024                 :             :  * Transitional support for code that would like to perform or skip reads
    1025                 :             :  * itself, without using the stream.  Returns, and consumes, the next block
    1026                 :             :  * number that would be read by the stream's look-ahead algorithm, or
    1027                 :             :  * InvalidBlockNumber if the end of the stream is reached.  Also reports the
    1028                 :             :  * strategy that would be used to read it.
    1029                 :             :  */
    1030                 :             : BlockNumber
    1031                 :           0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
    1032                 :             : {
    1033                 :           0 :         *strategy = stream->ios[0].op.strategy;
    1034                 :           0 :         return read_stream_get_block(stream, NULL);
    1035                 :             : }
    1036                 :             : 
    1037                 :             : /*
    1038                 :             :  * Reset a read stream by releasing any queued up buffers, allowing the stream
    1039                 :             :  * to be used again for different blocks.  This can be used to clear an
    1040                 :             :  * end-of-stream condition and start again, or to throw away blocks that were
    1041                 :             :  * speculatively read and read some different blocks instead.
    1042                 :             :  */
    1043                 :             : void
    1044                 :      349771 : read_stream_reset(ReadStream *stream)
    1045                 :             : {
    1046                 :      349771 :         int16           index;
    1047                 :      349771 :         Buffer          buffer;
    1048                 :             : 
    1049                 :             :         /* Stop looking ahead. */
    1050                 :      349771 :         stream->distance = 0;
    1051                 :             : 
    1052                 :             :         /* Forget buffered block number and fast path state. */
    1053                 :      349771 :         stream->buffered_blocknum = InvalidBlockNumber;
    1054                 :      349771 :         stream->fast_path = false;
    1055                 :             : 
    1056                 :             :         /* Unpin anything that wasn't consumed. */
    1057         [ +  + ]:      352575 :         while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
    1058                 :        2804 :                 ReleaseBuffer(buffer);
    1059                 :             : 
    1060                 :             :         /* Unpin any unused forwarded buffers. */
    1061                 :      349771 :         index = stream->next_buffer_index;
    1062   [ +  +  -  + ]:      350939 :         while (index < stream->initialized_buffers &&
    1063                 :        1168 :                    (buffer = stream->buffers[index]) != InvalidBuffer)
    1064                 :             :         {
    1065         [ #  # ]:           0 :                 Assert(stream->forwarded_buffers > 0);
    1066                 :           0 :                 stream->forwarded_buffers--;
    1067                 :           0 :                 ReleaseBuffer(buffer);
    1068                 :             : 
    1069                 :           0 :                 stream->buffers[index] = InvalidBuffer;
    1070         [ #  # ]:           0 :                 if (index < stream->io_combine_limit - 1)
    1071                 :           0 :                         stream->buffers[stream->queue_size + index] = InvalidBuffer;
    1072                 :             : 
    1073         [ #  # ]:           0 :                 if (++index == stream->queue_size)
    1074                 :           0 :                         index = 0;
    1075                 :             :         }
    1076                 :             : 
    1077         [ +  - ]:      349771 :         Assert(stream->forwarded_buffers == 0);
    1078         [ +  - ]:      349771 :         Assert(stream->pinned_buffers == 0);
    1079         [ +  - ]:      349771 :         Assert(stream->ios_in_progress == 0);
    1080                 :             : 
    1081                 :             :         /* Start off assuming data is cached. */
    1082                 :      349771 :         stream->distance = 1;
    1083                 :      349771 : }
    1084                 :             : 
    1085                 :             : /*
    1086                 :             :  * Release and free a read stream.
    1087                 :             :  */
    1088                 :             : void
    1089                 :      346891 : read_stream_end(ReadStream *stream)
    1090                 :             : {
    1091                 :      346891 :         read_stream_reset(stream);
    1092                 :      346891 :         pfree(stream);
    1093                 :      346891 : }
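
Note on the listing above: three small pieces of arithmetic recur in read_stream_next_buffer() and read_stream_reset(). They are doubling the look-ahead distance after an I/O wait (clamped to max_pinned_buffers), advancing a queue index with wrap-around, and clearing a queue entry together with its copy in the overflow zone. The following is a minimal standalone sketch of those three steps, not the PostgreSQL implementation. The helper names ramp_distance, advance_index and zap_queue_entry, the INVALID_BUFFER macro, and the use of a plain int array for buffers are hypothetical and introduced only for illustration. The sketch also assumes the buffers array has at least queue_size + io_combine_limit - 1 entries, which is what the indexing in the listing implies.

```c
#include <assert.h>
#include <stdint.h>

/* Stand-in for PostgreSQL's InvalidBuffer (assumption for this sketch). */
#define INVALID_BUFFER 0

/*
 * Hypothetical helper: double the look-ahead distance after waiting for
 * an I/O, clamped to the pin limit.  A wider temporary avoids overflowing
 * the 16-bit value before the clamp, as in the listing.
 */
static int16_t
ramp_distance(int16_t distance, int16_t max_pinned_buffers)
{
	int32_t		wider = (int32_t) distance * 2;

	assert(max_pinned_buffers > 0);
	if (wider > max_pinned_buffers)
		wider = max_pinned_buffers;
	return (int16_t) wider;
}

/* Hypothetical helper: advance a circular-queue index with wrap-around. */
static int16_t
advance_index(int16_t index, int16_t queue_size)
{
	assert(index >= 0 && index < queue_size);
	if (++index == queue_size)
		index = 0;
	return index;
}

/*
 * Hypothetical helper: clear a queue entry.  Entries with an index below
 * io_combine_limit - 1 may have a copy in the overflow zone that starts at
 * queue_size, so that copy is cleared too.
 */
static void
zap_queue_entry(int *buffers, int16_t index,
				int16_t queue_size, int16_t io_combine_limit)
{
	assert(index >= 0 && index < queue_size);
	buffers[index] = INVALID_BUFFER;
	if (index < io_combine_limit - 1)
		buffers[queue_size + index] = INVALID_BUFFER;
}
```

With a limit of 16, calling ramp_distance() repeatedly from a starting distance of 1 yields 2, 4, 8, 16, and then stays at 16. With a queue size of 4, advance_index(3, 4) wraps to 0.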
        

Generated by: LCOV version 2.3.2-1