LCOV - code coverage report
Current view: top level - src/backend/access/spgist - spgdoinsert.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 89.8 % 1103 991
Test Date: 2026-01-26 10:56:24 Functions: 100.0 % 15 15
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Branches: 57.9 % 655 379

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * spgdoinsert.c
       4                 :             :  *        implementation of insert algorithm
       5                 :             :  *
       6                 :             :  *
       7                 :             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       8                 :             :  * Portions Copyright (c) 1994, Regents of the University of California
       9                 :             :  *
      10                 :             :  * IDENTIFICATION
      11                 :             :  *                      src/backend/access/spgist/spgdoinsert.c
      12                 :             :  *
      13                 :             :  *-------------------------------------------------------------------------
      14                 :             :  */
      15                 :             : 
      16                 :             : #include "postgres.h"
      17                 :             : 
      18                 :             : #include "access/genam.h"
      19                 :             : #include "access/spgist_private.h"
      20                 :             : #include "access/spgxlog.h"
      21                 :             : #include "access/xloginsert.h"
      22                 :             : #include "common/int.h"
      23                 :             : #include "common/pg_prng.h"
      24                 :             : #include "miscadmin.h"
      25                 :             : #include "storage/bufmgr.h"
      26                 :             : #include "utils/rel.h"
      27                 :             : 
      28                 :             : 
      29                 :             : /*
      30                 :             :  * SPPageDesc tracks all info about a page we are inserting into.  In some
      31                 :             :  * situations it actually identifies a tuple, or even a specific node within
      32                 :             :  * an inner tuple.  But any of the fields can be invalid.  If the buffer
      33                 :             :  * field is valid, it implies we hold pin and exclusive lock on that buffer.
      34                 :             :  * page pointer should be valid exactly when buffer is.
      35                 :             :  */
      36                 :             : typedef struct SPPageDesc
      37                 :             : {
      38                 :             :         BlockNumber blkno;                      /* block number, or InvalidBlockNumber */
      39                 :             :         Buffer          buffer;                 /* page's buffer number, or InvalidBuffer */
      40                 :             :         Page            page;                   /* pointer to page buffer, or NULL */
      41                 :             :         OffsetNumber offnum;            /* offset of tuple, or InvalidOffsetNumber */
      42                 :             :         int                     node;                   /* node number within inner tuple, or -1 */
      43                 :             : } SPPageDesc;
      44                 :             : 
      45                 :             : 
      46                 :             : /*
      47                 :             :  * Set the item pointer in the nodeN'th entry in inner tuple tup.  This
      48                 :             :  * is used to update the parent inner tuple's downlink after a move or
      49                 :             :  * split operation.
      50                 :             :  */
      51                 :             : void
      52                 :        1783 : spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
      53                 :             :                                   BlockNumber blkno, OffsetNumber offset)
      54                 :             : {
      55                 :        1783 :         int                     i;
      56                 :        1783 :         SpGistNodeTuple node;
      57                 :             : 
      58         [ +  - ]:        9281 :         SGITITERATE(tup, i, node)
      59                 :             :         {
      60         [ +  + ]:        9281 :                 if (i == nodeN)
      61                 :             :                 {
      62                 :        1783 :                         ItemPointerSet(&node->t_tid, blkno, offset);
      63                 :        1783 :                         return;
      64                 :             :                 }
      65                 :        7498 :         }
      66                 :             : 
      67   [ #  #  #  # ]:           0 :         elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
      68                 :             :                  nodeN);
      69         [ -  + ]:        1783 : }
      70                 :             : 
      71                 :             : /*
      72                 :             :  * Form a new inner tuple containing one more node than the given one, with
      73                 :             :  * the specified label datum, inserted at offset "offset" in the node array.
      74                 :             :  * The new tuple's prefix is the same as the old one's.
      75                 :             :  *
      76                 :             :  * Note that the new node initially has an invalid downlink.  We'll find a
      77                 :             :  * page to point it to later.
      78                 :             :  */
      79                 :             : static SpGistInnerTuple
      80                 :         228 : addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
      81                 :             : {
      82                 :         228 :         SpGistNodeTuple node,
      83                 :             :                            *nodes;
      84                 :         228 :         int                     i;
      85                 :             : 
      86                 :             :         /* if offset is negative, insert at end */
      87         [ +  - ]:         228 :         if (offset < 0)
      88                 :           0 :                 offset = tuple->nNodes;
      89         [ +  - ]:         228 :         else if (offset > tuple->nNodes)
      90   [ #  #  #  # ]:           0 :                 elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
      91                 :             : 
      92                 :         228 :         nodes = palloc_array(SpGistNodeTuple, tuple->nNodes + 1);
      93         [ +  + ]:        1313 :         SGITITERATE(tuple, i, node)
      94                 :             :         {
      95         [ +  + ]:        1085 :                 if (i < offset)
      96                 :        1067 :                         nodes[i] = node;
      97                 :             :                 else
      98                 :          18 :                         nodes[i + 1] = node;
      99                 :        1085 :         }
     100                 :             : 
     101                 :         228 :         nodes[offset] = spgFormNodeTuple(state, label, false);
     102                 :             : 
     103                 :         684 :         return spgFormInnerTuple(state,
     104                 :         228 :                                                          (tuple->prefixSize > 0),
     105   [ +  +  -  + ]:         228 :                                                          SGITDATUM(tuple, state),
     106                 :         228 :                                                          tuple->nNodes + 1,
     107                 :         228 :                                                          nodes);
     108                 :         228 : }
     109                 :             : 
     110                 :             : /* qsort comparator for sorting OffsetNumbers */
     111                 :             : static int
     112                 :      835839 : cmpOffsetNumbers(const void *a, const void *b)
     113                 :             : {
     114                 :      835839 :         return pg_cmp_u16(*(const OffsetNumber *) a, *(const OffsetNumber *) b);
     115                 :             : }
     116                 :             : 
     117                 :             : /*
     118                 :             :  * Delete multiple tuples from an index page, preserving tuple offset numbers.
     119                 :             :  *
     120                 :             :  * The first tuple in the given list is replaced with a dead tuple of type
     121                 :             :  * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
     122                 :             :  * with dead tuples of type "reststate".  If either firststate or reststate
     123                 :             :  * is REDIRECT, blkno/offnum specify where to link to.
     124                 :             :  *
     125                 :             :  * NB: this is used during WAL replay, so beware of trying to make it too
     126                 :             :  * smart.  In particular, it shouldn't use "state" except for calling
     127                 :             :  * spgFormDeadTuple().  This is also used in a critical section, so no
     128                 :             :  * pallocs either!
     129                 :             :  */
     130                 :             : void
     131                 :        1256 : spgPageIndexMultiDelete(SpGistState *state, Page page,
     132                 :             :                                                 OffsetNumber *itemnos, int nitems,
     133                 :             :                                                 int firststate, int reststate,
     134                 :             :                                                 BlockNumber blkno, OffsetNumber offnum)
     135                 :             : {
     136                 :        1256 :         OffsetNumber firstItem;
     137                 :        1256 :         OffsetNumber sortednos[MaxIndexTuplesPerPage];
     138                 :        1256 :         SpGistDeadTuple tuple = NULL;
     139                 :        1256 :         int                     i;
     140                 :             : 
     141         [ +  + ]:        1256 :         if (nitems == 0)
     142                 :          14 :                 return;                                 /* nothing to do */
     143                 :             : 
     144                 :             :         /*
     145                 :             :          * For efficiency we want to use PageIndexMultiDelete, which requires the
     146                 :             :          * targets to be listed in sorted order, so we have to sort the itemnos
     147                 :             :          * array.  (This also greatly simplifies the math for reinserting the
     148                 :             :          * replacement tuples.)  However, we must not scribble on the caller's
     149                 :             :          * array, so we have to make a copy.
     150                 :             :          */
     151                 :        1242 :         memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
     152         [ +  + ]:        1242 :         if (nitems > 1)
     153                 :        1230 :                 qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
     154                 :             : 
     155                 :        1242 :         PageIndexMultiDelete(page, sortednos, nitems);
     156                 :             : 
     157                 :        1242 :         firstItem = itemnos[0];
     158                 :             : 
     159         [ +  + ]:      122176 :         for (i = 0; i < nitems; i++)
     160                 :             :         {
     161                 :      120934 :                 OffsetNumber itemno = sortednos[i];
     162                 :      120934 :                 int                     tupstate;
     163                 :             : 
     164         [ +  + ]:      120934 :                 tupstate = (itemno == firstItem) ? firststate : reststate;
     165   [ +  +  +  + ]:      120934 :                 if (tuple == NULL || tuple->tupstate != tupstate)
     166                 :        1782 :                         tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
     167                 :             : 
     168         [ +  - ]:      120934 :                 if (PageAddItem(page, tuple, tuple->size, itemno, false, false) != itemno)
     169   [ #  #  #  # ]:           0 :                         elog(ERROR, "failed to add item of size %u to SPGiST index page",
     170                 :             :                                  tuple->size);
     171                 :             : 
     172         [ +  + ]:      120934 :                 if (tupstate == SPGIST_REDIRECT)
     173                 :         277 :                         SpGistPageGetOpaque(page)->nRedirection++;
     174         [ -  + ]:      120657 :                 else if (tupstate == SPGIST_PLACEHOLDER)
     175                 :      120657 :                         SpGistPageGetOpaque(page)->nPlaceholder++;
     176                 :      120934 :         }
     177         [ -  + ]:        1256 : }
     178                 :             : 
     179                 :             : /*
     180                 :             :  * Update the parent inner tuple's downlink, and mark the parent buffer
     181                 :             :  * dirty (this must be the last change to the parent page in the current
     182                 :             :  * WAL action).
     183                 :             :  */
     184                 :             : static void
     185                 :        1571 : saveNodeLink(Relation index, SPPageDesc *parent,
     186                 :             :                          BlockNumber blkno, OffsetNumber offnum)
     187                 :             : {
     188                 :        1571 :         SpGistInnerTuple innerTuple;
     189                 :             : 
     190                 :        3142 :         innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
     191                 :        1571 :                                                                                                 PageGetItemId(parent->page, parent->offnum));
     192                 :             : 
     193                 :        1571 :         spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
     194                 :             : 
     195                 :        1571 :         MarkBufferDirty(parent->buffer);
     196                 :        1571 : }
     197                 :             : 
     198                 :             : /*
     199                 :             :  * Add a leaf tuple to a leaf page where there is known to be room for it
     200                 :             :  */
     201                 :             : static void
     202                 :      130689 : addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
     203                 :             :                          SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
     204                 :             : {
     205                 :      130689 :         spgxlogAddLeaf xlrec;
     206                 :             : 
     207                 :      130689 :         xlrec.newPage = isNew;
     208                 :      130689 :         xlrec.storesNulls = isNulls;
     209                 :             : 
     210                 :             :         /* these will be filled below as needed */
     211                 :      130689 :         xlrec.offnumLeaf = InvalidOffsetNumber;
     212                 :      130689 :         xlrec.offnumHeadLeaf = InvalidOffsetNumber;
     213                 :      130689 :         xlrec.offnumParent = InvalidOffsetNumber;
     214                 :      130689 :         xlrec.nodeI = 0;
     215                 :             : 
     216                 :      130689 :         START_CRIT_SECTION();
     217                 :             : 
     218   [ +  +  +  + ]:      258422 :         if (current->offnum == InvalidOffsetNumber ||
     219         [ +  + ]:      130385 :                 SpGistBlockIsRoot(current->blkno))
     220                 :             :         {
     221                 :             :                 /* Tuple is not part of a chain */
     222                 :        2971 :                 SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
     223                 :        5942 :                 current->offnum = SpGistPageAddNewItem(state, current->page,
     224                 :        2971 :                                                                                            leafTuple, leafTuple->size,
     225                 :             :                                                                                            NULL, false);
     226                 :             : 
     227                 :        2971 :                 xlrec.offnumLeaf = current->offnum;
     228                 :             : 
     229                 :             :                 /* Must update parent's downlink if any */
     230         [ +  + ]:        2971 :                 if (parent->buffer != InvalidBuffer)
     231                 :             :                 {
     232                 :         304 :                         xlrec.offnumParent = parent->offnum;
     233                 :         304 :                         xlrec.nodeI = parent->node;
     234                 :             : 
     235                 :         304 :                         saveNodeLink(index, parent, current->blkno, current->offnum);
     236                 :         304 :                 }
     237                 :        2971 :         }
     238                 :             :         else
     239                 :             :         {
     240                 :             :                 /*
     241                 :             :                  * Tuple must be inserted into existing chain.  We mustn't change the
     242                 :             :                  * chain's head address, but we don't need to chase the entire chain
     243                 :             :                  * to put the tuple at the end; we can insert it second.
     244                 :             :                  *
     245                 :             :                  * Also, it's possible that the "chain" consists only of a DEAD tuple,
     246                 :             :                  * in which case we should replace the DEAD tuple in-place.
     247                 :             :                  */
     248                 :      127718 :                 SpGistLeafTuple head;
     249                 :      127718 :                 OffsetNumber offnum;
     250                 :             : 
     251                 :      255436 :                 head = (SpGistLeafTuple) PageGetItem(current->page,
     252                 :      127718 :                                                                                          PageGetItemId(current->page, current->offnum));
     253         [ -  + ]:      127718 :                 if (head->tupstate == SPGIST_LIVE)
     254                 :             :                 {
     255                 :      127718 :                         SGLT_SET_NEXTOFFSET(leafTuple, SGLT_GET_NEXTOFFSET(head));
     256                 :      255436 :                         offnum = SpGistPageAddNewItem(state, current->page,
     257                 :      127718 :                                                                                   leafTuple, leafTuple->size,
     258                 :             :                                                                                   NULL, false);
     259                 :             : 
     260                 :             :                         /*
     261                 :             :                          * re-get head of list because it could have been moved on page,
     262                 :             :                          * and set new second element
     263                 :             :                          */
     264                 :      255436 :                         head = (SpGistLeafTuple) PageGetItem(current->page,
     265                 :      127718 :                                                                                                  PageGetItemId(current->page, current->offnum));
     266                 :      127718 :                         SGLT_SET_NEXTOFFSET(head, offnum);
     267                 :             : 
     268                 :      127718 :                         xlrec.offnumLeaf = offnum;
     269                 :      127718 :                         xlrec.offnumHeadLeaf = current->offnum;
     270                 :      127718 :                 }
     271         [ #  # ]:           0 :                 else if (head->tupstate == SPGIST_DEAD)
     272                 :             :                 {
     273                 :           0 :                         SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
     274                 :           0 :                         PageIndexTupleDelete(current->page, current->offnum);
     275                 :           0 :                         if (PageAddItem(current->page,
     276                 :             :                                                         leafTuple, leafTuple->size,
     277         [ #  # ]:           0 :                                                         current->offnum, false, false) != current->offnum)
     278   [ #  #  #  # ]:           0 :                                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
     279                 :             :                                          leafTuple->size);
     280                 :             : 
     281                 :             :                         /* WAL replay distinguishes this case by equal offnums */
     282                 :           0 :                         xlrec.offnumLeaf = current->offnum;
     283                 :           0 :                         xlrec.offnumHeadLeaf = current->offnum;
     284                 :           0 :                 }
     285                 :             :                 else
     286   [ #  #  #  # ]:           0 :                         elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
     287                 :      127718 :         }
     288                 :             : 
     289                 :      130689 :         MarkBufferDirty(current->buffer);
     290                 :             : 
     291   [ +  +  +  -  :      130689 :         if (RelationNeedsWAL(index) && !state->isBuild)
             +  +  +  + ]
     292                 :             :         {
     293                 :       38968 :                 XLogRecPtr      recptr;
     294                 :       38968 :                 int                     flags;
     295                 :             : 
     296                 :       38968 :                 XLogBeginInsert();
     297                 :       38968 :                 XLogRegisterData(&xlrec, sizeof(xlrec));
     298                 :       38968 :                 XLogRegisterData(leafTuple, leafTuple->size);
     299                 :             : 
     300                 :       38968 :                 flags = REGBUF_STANDARD;
     301         [ +  - ]:       38968 :                 if (xlrec.newPage)
     302                 :           0 :                         flags |= REGBUF_WILL_INIT;
     303                 :       38968 :                 XLogRegisterBuffer(0, current->buffer, flags);
     304         [ +  + ]:       38968 :                 if (xlrec.offnumParent != InvalidOffsetNumber)
     305                 :         120 :                         XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
     306                 :             : 
     307                 :       38968 :                 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
     308                 :             : 
     309                 :       38968 :                 PageSetLSN(current->page, recptr);
     310                 :             : 
     311                 :             :                 /* update parent only if we actually changed it */
     312         [ +  + ]:       38968 :                 if (xlrec.offnumParent != InvalidOffsetNumber)
     313                 :             :                 {
     314                 :         120 :                         PageSetLSN(parent->page, recptr);
     315                 :         120 :                 }
     316                 :       38968 :         }
     317                 :             : 
     318         [ +  - ]:      208625 :         END_CRIT_SECTION();
     319                 :      208625 : }
     320                 :             : 
     321                 :             : /*
     322                 :             :  * Count the number and total size of leaf tuples in the chain starting at
     323                 :             :  * current->offnum.  Return number into *nToSplit and total size as function
     324                 :             :  * result.
     325                 :             :  *
     326                 :             :  * Klugy special case when considering the root page (i.e., root is a leaf
     327                 :             :  * page, but we're about to split for the first time): return fake large
     328                 :             :  * values to force spgdoinsert() to take the doPickSplit rather than
     329                 :             :  * moveLeafs code path.  moveLeafs is not prepared to deal with root page.
     330                 :             :  */
     331                 :             : static int
     332                 :        1279 : checkSplitConditions(Relation index, SpGistState *state,
     333                 :             :                                          SPPageDesc *current, int *nToSplit)
     334                 :             : {
     335                 :        2558 :         int                     i,
     336                 :        1279 :                                 n = 0,
     337                 :        1279 :                                 totalSize = 0;
     338                 :             : 
     339   [ +  +  -  + ]:        1279 :         if (SpGistBlockIsRoot(current->blkno))
     340                 :             :         {
     341                 :             :                 /* return impossible values to force split */
     342                 :          13 :                 *nToSplit = BLCKSZ;
     343                 :          13 :                 return BLCKSZ;
     344                 :             :         }
     345                 :             : 
     346                 :        1266 :         i = current->offnum;
     347         [ +  + ]:      130735 :         while (i != InvalidOffsetNumber)
     348                 :             :         {
     349                 :      129469 :                 SpGistLeafTuple it;
     350                 :             : 
     351         [ +  - ]:      129469 :                 Assert(i >= FirstOffsetNumber &&
     352                 :             :                            i <= PageGetMaxOffsetNumber(current->page));
     353                 :      258938 :                 it = (SpGistLeafTuple) PageGetItem(current->page,
     354                 :      129469 :                                                                                    PageGetItemId(current->page, i));
     355         [ -  + ]:      129469 :                 if (it->tupstate == SPGIST_LIVE)
     356                 :             :                 {
     357                 :      129469 :                         n++;
     358                 :      129469 :                         totalSize += it->size + sizeof(ItemIdData);
     359                 :      129469 :                 }
     360         [ #  # ]:           0 :                 else if (it->tupstate == SPGIST_DEAD)
     361                 :             :                 {
     362                 :             :                         /* We could see a DEAD tuple as first/only chain item */
     363         [ #  # ]:           0 :                         Assert(i == current->offnum);
     364         [ #  # ]:           0 :                         Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
     365                 :             :                         /* Don't count it in result, because it won't go to other page */
     366                 :           0 :                 }
     367                 :             :                 else
     368   [ #  #  #  # ]:           0 :                         elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
     369                 :             : 
     370                 :      129469 :                 i = SGLT_GET_NEXTOFFSET(it);
     371                 :      129469 :         }
     372                 :             : 
     373                 :        1266 :         *nToSplit = n;
     374                 :             : 
     375                 :        1266 :         return totalSize;
     376                 :        1279 : }
     377                 :             : 
     378                 :             : /*
     379                 :             :  * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
     380                 :             :  * but the chain has to be moved because there's not enough room to add
     381                 :             :  * newLeafTuple to its page.  We use this method when the chain contains
     382                 :             :  * very little data so a split would be inefficient.  We are sure we can
     383                 :             :  * fit the chain plus newLeafTuple on one other page.
     384                 :             :  */
     385                 :             : static void
     386                 :         401 : moveLeafs(Relation index, SpGistState *state,
     387                 :             :                   SPPageDesc *current, SPPageDesc *parent,
     388                 :             :                   SpGistLeafTuple newLeafTuple, bool isNulls)
     389                 :             : {
     390                 :         401 :         int                     i,
     391                 :             :                                 nDelete,
     392                 :             :                                 nInsert,
     393                 :             :                                 size;
     394                 :         401 :         Buffer          nbuf;
     395                 :         401 :         Page            npage;
     396                 :         401 :         OffsetNumber r = InvalidOffsetNumber,
     397                 :         401 :                                 startOffset = InvalidOffsetNumber;
     398                 :         401 :         bool            replaceDead = false;
     399                 :         401 :         OffsetNumber *toDelete;
     400                 :         401 :         OffsetNumber *toInsert;
     401                 :         401 :         BlockNumber nblkno;
     402                 :         401 :         spgxlogMoveLeafs xlrec;
     403                 :         401 :         char       *leafdata,
     404                 :             :                            *leafptr;
     405                 :             : 
     406                 :             :         /* This doesn't work on root page */
     407         [ +  - ]:         401 :         Assert(parent->buffer != InvalidBuffer);
     408         [ +  - ]:         401 :         Assert(parent->buffer != current->buffer);
     409                 :             : 
     410                 :             :         /* Locate the tuples to be moved, and count up the space needed */
     411                 :         401 :         i = PageGetMaxOffsetNumber(current->page);
     412                 :         401 :         toDelete = palloc_array(OffsetNumber, i);
     413                 :         401 :         toInsert = palloc_array(OffsetNumber, i + 1);
     414                 :             : 
     415                 :         401 :         size = newLeafTuple->size + sizeof(ItemIdData);
     416                 :             : 
     417                 :         401 :         nDelete = 0;
     418                 :         401 :         i = current->offnum;
     419         [ +  + ]:       14718 :         while (i != InvalidOffsetNumber)
     420                 :             :         {
     421                 :       14317 :                 SpGistLeafTuple it;
     422                 :             : 
     423         [ +  - ]:       14317 :                 Assert(i >= FirstOffsetNumber &&
     424                 :             :                            i <= PageGetMaxOffsetNumber(current->page));
     425                 :       28634 :                 it = (SpGistLeafTuple) PageGetItem(current->page,
     426                 :       14317 :                                                                                    PageGetItemId(current->page, i));
     427                 :             : 
     428         [ -  + ]:       14317 :                 if (it->tupstate == SPGIST_LIVE)
     429                 :             :                 {
     430                 :       14317 :                         toDelete[nDelete] = i;
     431                 :       14317 :                         size += it->size + sizeof(ItemIdData);
     432                 :       14317 :                         nDelete++;
     433                 :       14317 :                 }
     434         [ #  # ]:           0 :                 else if (it->tupstate == SPGIST_DEAD)
     435                 :             :                 {
     436                 :             :                         /* We could see a DEAD tuple as first/only chain item */
     437         [ #  # ]:           0 :                         Assert(i == current->offnum);
     438         [ #  # ]:           0 :                         Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
     439                 :             :                         /* We don't want to move it, so don't count it in size */
     440                 :           0 :                         toDelete[nDelete] = i;
     441                 :           0 :                         nDelete++;
     442                 :           0 :                         replaceDead = true;
     443                 :           0 :                 }
     444                 :             :                 else
     445   [ #  #  #  # ]:           0 :                         elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
     446                 :             : 
     447                 :       14317 :                 i = SGLT_GET_NEXTOFFSET(it);
     448                 :       14317 :         }
     449                 :             : 
     450                 :             :         /* Find a leaf page that will hold them */
     451                 :         802 :         nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
     452                 :         401 :                                                    size, &xlrec.newPage);
     453                 :         401 :         npage = BufferGetPage(nbuf);
     454                 :         401 :         nblkno = BufferGetBlockNumber(nbuf);
     455         [ +  - ]:         401 :         Assert(nblkno != current->blkno);
     456                 :             : 
     457                 :         401 :         leafdata = leafptr = palloc(size);
     458                 :             : 
     459                 :         401 :         START_CRIT_SECTION();
     460                 :             : 
     461                 :             :         /* copy all the old tuples to new page, unless they're dead */
     462                 :         401 :         nInsert = 0;
     463         [ +  + ]:         401 :         if (!replaceDead)
     464                 :             :         {
     465         [ +  + ]:       14674 :                 for (i = 0; i < nDelete; i++)
     466                 :             :                 {
     467                 :       14317 :                         SpGistLeafTuple it;
     468                 :             : 
     469                 :       28634 :                         it = (SpGistLeafTuple) PageGetItem(current->page,
     470                 :       14317 :                                                                                            PageGetItemId(current->page, toDelete[i]));
     471         [ +  - ]:       14317 :                         Assert(it->tupstate == SPGIST_LIVE);
     472                 :             : 
     473                 :             :                         /*
     474                 :             :                          * Update chain link (notice the chain order gets reversed, but we
     475                 :             :                          * don't care).  We're modifying the tuple on the source page
     476                 :             :                          * here, but it's okay since we're about to delete it.
     477                 :             :                          */
     478                 :       14317 :                         SGLT_SET_NEXTOFFSET(it, r);
     479                 :             : 
     480                 :       14317 :                         r = SpGistPageAddNewItem(state, npage, it, it->size, &startOffset, false);
     481                 :             : 
     482                 :       14317 :                         toInsert[nInsert] = r;
     483                 :       14317 :                         nInsert++;
     484                 :             : 
     485                 :             :                         /* save modified tuple into leafdata as well */
     486                 :       14317 :                         memcpy(leafptr, it, it->size);
     487                 :       14317 :                         leafptr += it->size;
     488                 :       14317 :                 }
     489                 :         357 :         }
     490                 :             : 
     491                 :             :         /* add the new tuple as well */
     492                 :         401 :         SGLT_SET_NEXTOFFSET(newLeafTuple, r);
     493                 :         401 :         r = SpGistPageAddNewItem(state, npage, newLeafTuple, newLeafTuple->size, &startOffset, false);
     494                 :         401 :         toInsert[nInsert] = r;
     495                 :         401 :         nInsert++;
     496                 :         401 :         memcpy(leafptr, newLeafTuple, newLeafTuple->size);
     497                 :         401 :         leafptr += newLeafTuple->size;
     498                 :             : 
     499                 :             :         /*
     500                 :             :          * Now delete the old tuples, leaving a redirection pointer behind for the
     501                 :             :          * first one, unless we're doing an index build; in which case there can't
     502                 :             :          * be any concurrent scan so we need not provide a redirect.
     503                 :             :          */
     504                 :         802 :         spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
     505                 :         401 :                                                         state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
     506                 :             :                                                         SPGIST_PLACEHOLDER,
     507                 :         401 :                                                         nblkno, r);
     508                 :             : 
     509                 :             :         /* Update parent's downlink and mark parent page dirty */
     510                 :         401 :         saveNodeLink(index, parent, nblkno, r);
     511                 :             : 
     512                 :             :         /* Mark the leaf pages too */
     513                 :         401 :         MarkBufferDirty(current->buffer);
     514                 :         401 :         MarkBufferDirty(nbuf);
     515                 :             : 
     516   [ +  +  +  -  :         401 :         if (RelationNeedsWAL(index) && !state->isBuild)
             +  +  +  + ]
     517                 :             :         {
     518                 :          77 :                 XLogRecPtr      recptr;
     519                 :             : 
     520                 :             :                 /* prepare WAL info */
     521                 :          77 :                 STORE_STATE(state, xlrec.stateSrc);
     522                 :             : 
     523                 :          77 :                 xlrec.nMoves = nDelete;
     524                 :          77 :                 xlrec.replaceDead = replaceDead;
     525                 :          77 :                 xlrec.storesNulls = isNulls;
     526                 :             : 
     527                 :          77 :                 xlrec.offnumParent = parent->offnum;
     528                 :          77 :                 xlrec.nodeI = parent->node;
     529                 :             : 
     530                 :          77 :                 XLogBeginInsert();
     531                 :          77 :                 XLogRegisterData(&xlrec, SizeOfSpgxlogMoveLeafs);
     532                 :         154 :                 XLogRegisterData(toDelete,
     533                 :          77 :                                                  sizeof(OffsetNumber) * nDelete);
     534                 :         154 :                 XLogRegisterData(toInsert,
     535                 :          77 :                                                  sizeof(OffsetNumber) * nInsert);
     536                 :          77 :                 XLogRegisterData(leafdata, leafptr - leafdata);
     537                 :             : 
     538                 :          77 :                 XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
     539                 :          77 :                 XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
     540                 :          77 :                 XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
     541                 :             : 
     542                 :          77 :                 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
     543                 :             : 
     544                 :          77 :                 PageSetLSN(current->page, recptr);
     545                 :          77 :                 PageSetLSN(npage, recptr);
     546                 :          77 :                 PageSetLSN(parent->page, recptr);
     547                 :          77 :         }
     548                 :             : 
     549         [ +  - ]:         467 :         END_CRIT_SECTION();
     550                 :             : 
     551                 :             :         /* Update local free-space cache and release new buffer */
     552                 :         357 :         SpGistSetLastUsedPage(index, nbuf);
     553                 :         357 :         UnlockReleaseBuffer(nbuf);
     554                 :         357 : }
     555                 :             : 
     556                 :             : /*
     557                 :             :  * Update previously-created redirection tuple with appropriate destination
     558                 :             :  *
     559                 :             :  * We use this when it's not convenient to know the destination first.
     560                 :             :  * The tuple should have been made with the "impossible" destination of
     561                 :             :  * the metapage.
     562                 :             :  */
     563                 :             : static void
     564                 :         200 : setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
     565                 :             :                                         BlockNumber blkno, OffsetNumber offnum)
     566                 :             : {
     567                 :         200 :         SpGistDeadTuple dt;
     568                 :             : 
     569                 :         400 :         dt = (SpGistDeadTuple) PageGetItem(current->page,
     570                 :         200 :                                                                            PageGetItemId(current->page, position));
     571         [ +  - ]:         200 :         Assert(dt->tupstate == SPGIST_REDIRECT);
     572         [ +  - ]:         200 :         Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
     573                 :         200 :         ItemPointerSet(&dt->pointer, blkno, offnum);
     574                 :         200 : }
     575                 :             : 
     576                 :             : /*
     577                 :             :  * Test to see if the user-defined picksplit function failed to do its job,
     578                 :             :  * ie, it put all the leaf tuples into the same node.
     579                 :             :  * If so, randomly divide the tuples into several nodes (all with the same
     580                 :             :  * label) and return true to select allTheSame mode for this inner tuple.
     581                 :             :  *
     582                 :             :  * (This code is also used to forcibly select allTheSame mode for nulls.)
     583                 :             :  *
     584                 :             :  * If we know that the leaf tuples wouldn't all fit on one page, then we
     585                 :             :  * exclude the last tuple (which is the incoming new tuple that forced a split)
     586                 :             :  * from the check to see if more than one node is used.  The reason for this
     587                 :             :  * is that if the existing tuples are put into only one chain, then even if
     588                 :             :  * we move them all to an empty page, there would still not be room for the
     589                 :             :  * new tuple, so we'd get into an infinite loop of picksplit attempts.
     590                 :             :  * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
     591                 :             :  * be split across pages.  (Exercise for the reader: figure out why this
     592                 :             :  * fixes the problem even when there is only one old tuple.)
     593                 :             :  */
     594                 :             : static bool
     595                 :         922 : checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
     596                 :             :                                 bool *includeNew)
     597                 :             : {
     598                 :         922 :         int                     theNode;
     599                 :         922 :         int                     limit;
     600                 :         922 :         int                     i;
     601                 :             : 
     602                 :             :         /* For the moment, assume we can include the new leaf tuple */
     603                 :         922 :         *includeNew = true;
     604                 :             : 
     605                 :             :         /* If there's only the new leaf tuple, don't select allTheSame mode */
     606         [ -  + ]:         922 :         if (in->nTuples <= 1)
     607                 :           0 :                 return false;
     608                 :             : 
     609                 :             :         /* If tuple set doesn't fit on one page, ignore the new tuple in test */
     610         [ +  + ]:         922 :         limit = tooBig ? in->nTuples - 1 : in->nTuples;
     611                 :             : 
     612                 :             :         /* Check to see if more than one node is populated */
     613                 :         922 :         theNode = out->mapTuplesToNodes[0];
     614         [ +  + ]:       20788 :         for (i = 1; i < limit; i++)
     615                 :             :         {
     616         [ +  + ]:       20736 :                 if (out->mapTuplesToNodes[i] != theNode)
     617                 :         870 :                         return false;
     618                 :       19866 :         }
     619                 :             : 
     620                 :             :         /* Nope, so override the picksplit function's decisions */
     621                 :             : 
     622                 :             :         /* If the new tuple is in its own node, it can't be included in split */
     623   [ +  +  +  - ]:          52 :         if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
     624                 :           0 :                 *includeNew = false;
     625                 :             : 
     626                 :          52 :         out->nNodes = 8;                     /* arbitrary number of child nodes */
     627                 :             : 
     628                 :             :         /* Random assignment of tuples to nodes (note we include new tuple) */
     629         [ +  + ]:        5544 :         for (i = 0; i < in->nTuples; i++)
     630                 :        5492 :                 out->mapTuplesToNodes[i] = i % out->nNodes;
     631                 :             : 
     632                 :             :         /* The opclass may not use node labels, but if it does, duplicate 'em */
     633         [ +  + ]:          52 :         if (out->nodeLabels)
     634                 :             :         {
     635                 :           9 :                 Datum           theLabel = out->nodeLabels[theNode];
     636                 :             : 
     637                 :           9 :                 out->nodeLabels = palloc_array(Datum, out->nNodes);
     638         [ +  + ]:          81 :                 for (i = 0; i < out->nNodes; i++)
     639                 :          72 :                         out->nodeLabels[i] = theLabel;
     640                 :           9 :         }
     641                 :             : 
     642                 :             :         /* We don't touch the prefix or the leaf tuple datum assignments */
     643                 :             : 
     644                 :          52 :         return true;
     645                 :         922 : }
     646                 :             : 
     647                 :             : /*
     648                 :             :  * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
     649                 :             :  * but the chain has to be split because there's not enough room to add
     650                 :             :  * newLeafTuple to its page.
     651                 :             :  *
     652                 :             :  * This function splits the leaf tuple set according to picksplit's rules,
     653                 :             :  * creating one or more new chains that are spread across the current page
     654                 :             :  * and an additional leaf page (we assume that two leaf pages will be
     655                 :             :  * sufficient).  A new inner tuple is created, and the parent downlink
     656                 :             :  * pointer is updated to point to that inner tuple instead of the leaf chain.
     657                 :             :  *
     658                 :             :  * On exit, current contains the address of the new inner tuple.
     659                 :             :  *
     660                 :             :  * Returns true if we successfully inserted newLeafTuple during this function,
     661                 :             :  * false if caller still has to do it (meaning another picksplit operation is
     662                 :             :  * probably needed).  Failure could occur if the picksplit result is fairly
     663                 :             :  * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
     664                 :             :  * Because we force the picksplit result to be at least two chains, each
     665                 :             :  * cycle will get rid of at least one leaf tuple from the chain, so the loop
     666                 :             :  * will eventually terminate if lack of balance is the issue.  If the tuple
     667                 :             :  * is too big, we assume that repeated picksplit operations will eventually
     668                 :             :  * make it small enough by repeated prefix-stripping.  A broken opclass could
     669                 :             :  * make this an infinite loop, though, so spgdoinsert() checks that the
     670                 :             :  * leaf datums get smaller each time.
     671                 :             :  */
     672                 :             : static bool
     673                 :         922 : doPickSplit(Relation index, SpGistState *state,
     674                 :             :                         SPPageDesc *current, SPPageDesc *parent,
     675                 :             :                         SpGistLeafTuple newLeafTuple,
     676                 :             :                         int level, bool isNulls, bool isNew)
     677                 :             : {
     678                 :         922 :         bool            insertedNew = false;
     679                 :         922 :         spgPickSplitIn in;
     680                 :         922 :         spgPickSplitOut out;
     681                 :         922 :         FmgrInfo   *procinfo;
     682                 :         922 :         bool            includeNew;
     683                 :         922 :         int                     i,
     684                 :             :                                 max,
     685                 :             :                                 n;
     686                 :         922 :         SpGistInnerTuple innerTuple;
     687                 :         922 :         SpGistNodeTuple node,
     688                 :             :                            *nodes;
     689                 :         922 :         Buffer          newInnerBuffer,
     690                 :             :                                 newLeafBuffer;
     691                 :         922 :         uint8      *leafPageSelect;
     692                 :         922 :         int                *leafSizes;
     693                 :         922 :         OffsetNumber *toDelete;
     694                 :         922 :         OffsetNumber *toInsert;
     695                 :         922 :         OffsetNumber redirectTuplePos = InvalidOffsetNumber;
     696                 :         922 :         OffsetNumber startOffsets[2];
     697                 :         922 :         SpGistLeafTuple *oldLeafs;
     698                 :         922 :         SpGistLeafTuple *newLeafs;
     699                 :         922 :         Datum           leafDatums[INDEX_MAX_KEYS];
     700                 :         922 :         bool            leafIsnulls[INDEX_MAX_KEYS];
     701                 :         922 :         int                     spaceToDelete;
     702                 :         922 :         int                     currentFreeSpace;
     703                 :         922 :         int                     totalLeafSizes;
     704                 :         922 :         bool            allTheSame;
     705                 :         922 :         spgxlogPickSplit xlrec;
     706                 :         922 :         char       *leafdata,
     707                 :             :                            *leafptr;
     708                 :         922 :         SPPageDesc      saveCurrent;
     709                 :         922 :         int                     nToDelete,
     710                 :             :                                 nToInsert,
     711                 :             :                                 maxToInclude;
     712                 :             : 
     713                 :         922 :         in.level = level;
     714                 :             : 
     715                 :             :         /*
     716                 :             :          * Allocate per-leaf-tuple work arrays with max possible size
     717                 :             :          */
     718                 :         922 :         max = PageGetMaxOffsetNumber(current->page);
     719                 :         922 :         n = max + 1;
     720                 :         922 :         in.datums = palloc_array(Datum, n);
     721                 :         922 :         toDelete = palloc_array(OffsetNumber, n);
     722                 :         922 :         toInsert = palloc_array(OffsetNumber, n);
     723                 :         922 :         oldLeafs = palloc_array(SpGistLeafTuple, n);
     724                 :         922 :         newLeafs = palloc_array(SpGistLeafTuple, n);
     725                 :         922 :         leafPageSelect = palloc_array(uint8, n);
     726                 :             : 
     727                 :         922 :         STORE_STATE(state, xlrec.stateSrc);
     728                 :             : 
     729                 :             :         /*
     730                 :             :          * Form list of leaf tuples which will be distributed as split result;
     731                 :             :          * also, count up the amount of space that will be freed from current.
     732                 :             :          * (Note that in the non-root case, we won't actually delete the old
     733                 :             :          * tuples, only replace them with redirects or placeholders.)
     734                 :             :          */
     735                 :         922 :         nToInsert = 0;
     736                 :         922 :         nToDelete = 0;
     737                 :         922 :         spaceToDelete = 0;
     738   [ +  +  -  + ]:         922 :         if (SpGistBlockIsRoot(current->blkno))
     739                 :             :         {
     740                 :             :                 /*
     741                 :             :                  * We are splitting the root (which up to now is also a leaf page).
     742                 :             :                  * Its tuples are not linked, so scan sequentially to get them all. We
     743                 :             :                  * ignore the original value of current->offnum.
     744                 :             :                  */
     745         [ +  + ]:        2449 :                 for (i = FirstOffsetNumber; i <= max; i++)
     746                 :             :                 {
     747                 :        2436 :                         SpGistLeafTuple it;
     748                 :             : 
     749                 :        4872 :                         it = (SpGistLeafTuple) PageGetItem(current->page,
     750                 :        2436 :                                                                                            PageGetItemId(current->page, i));
     751         [ +  - ]:        2436 :                         if (it->tupstate == SPGIST_LIVE)
     752                 :             :                         {
     753                 :        2436 :                                 in.datums[nToInsert] =
     754         [ -  + ]:        2436 :                                         isNulls ? (Datum) 0 : SGLTDATUM(it, state);
     755                 :        2436 :                                 oldLeafs[nToInsert] = it;
     756                 :        2436 :                                 nToInsert++;
     757                 :        2436 :                                 toDelete[nToDelete] = i;
     758                 :        2436 :                                 nToDelete++;
     759                 :             :                                 /* we will delete the tuple altogether, so count full space */
     760                 :        2436 :                                 spaceToDelete += it->size + sizeof(ItemIdData);
     761                 :        2436 :                         }
     762                 :             :                         else                            /* tuples on root should be live */
     763   [ #  #  #  # ]:           0 :                                 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
     764                 :        2436 :                 }
     765                 :          13 :         }
     766                 :             :         else
     767                 :             :         {
     768                 :             :                 /* Normal case, just collect the leaf tuples in the chain */
     769                 :         909 :                 i = current->offnum;
     770         [ +  + ]:      116061 :                 while (i != InvalidOffsetNumber)
     771                 :             :                 {
     772                 :      115152 :                         SpGistLeafTuple it;
     773                 :             : 
     774         [ +  - ]:      115152 :                         Assert(i >= FirstOffsetNumber && i <= max);
     775                 :      230304 :                         it = (SpGistLeafTuple) PageGetItem(current->page,
     776                 :      115152 :                                                                                            PageGetItemId(current->page, i));
     777         [ -  + ]:      115152 :                         if (it->tupstate == SPGIST_LIVE)
     778                 :             :                         {
     779                 :      115152 :                                 in.datums[nToInsert] =
     780         [ -  + ]:      115152 :                                         isNulls ? (Datum) 0 : SGLTDATUM(it, state);
     781                 :      115152 :                                 oldLeafs[nToInsert] = it;
     782                 :      115152 :                                 nToInsert++;
     783                 :      115152 :                                 toDelete[nToDelete] = i;
     784                 :      115152 :                                 nToDelete++;
     785                 :             :                                 /* we will not delete the tuple, only replace with dead */
     786         [ -  + ]:      115152 :                                 Assert(it->size >= SGDTSIZE);
     787                 :      115152 :                                 spaceToDelete += it->size - SGDTSIZE;
     788                 :      115152 :                         }
     789         [ #  # ]:           0 :                         else if (it->tupstate == SPGIST_DEAD)
     790                 :             :                         {
     791                 :             :                                 /* We could see a DEAD tuple as first/only chain item */
     792         [ #  # ]:           0 :                                 Assert(i == current->offnum);
     793         [ #  # ]:           0 :                                 Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
     794                 :           0 :                                 toDelete[nToDelete] = i;
     795                 :           0 :                                 nToDelete++;
     796                 :             :                                 /* replacing it with redirect will save no space */
     797                 :           0 :                         }
     798                 :             :                         else
     799   [ #  #  #  # ]:           0 :                                 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
     800                 :             : 
     801                 :      115152 :                         i = SGLT_GET_NEXTOFFSET(it);
     802                 :      115152 :                 }
     803                 :             :         }
     804                 :         922 :         in.nTuples = nToInsert;
     805                 :             : 
     806                 :             :         /*
     807                 :             :          * We may not actually insert new tuple because another picksplit may be
     808                 :             :          * necessary due to too large value, but we will try to allocate enough
     809                 :             :          * space to include it; and in any case it has to be included in the input
     810                 :             :          * for the picksplit function.  So don't increment nToInsert yet.
     811                 :             :          */
     812                 :         922 :         in.datums[in.nTuples] =
     813         [ -  + ]:         922 :                 isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state);
     814                 :         922 :         oldLeafs[in.nTuples] = newLeafTuple;
     815                 :         922 :         in.nTuples++;
     816                 :             : 
     817                 :         922 :         memset(&out, 0, sizeof(out));
     818                 :             : 
     819         [ -  + ]:         922 :         if (!isNulls)
     820                 :             :         {
     821                 :             :                 /*
     822                 :             :                  * Perform split using user-defined method.
     823                 :             :                  */
     824                 :         922 :                 procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
     825                 :        1844 :                 FunctionCall2Coll(procinfo,
     826                 :         922 :                                                   index->rd_indcollation[0],
     827                 :         922 :                                                   PointerGetDatum(&in),
     828                 :         922 :                                                   PointerGetDatum(&out));
     829                 :             : 
     830                 :             :                 /*
     831                 :             :                  * Form new leaf tuples and count up the total space needed.
     832                 :             :                  */
     833                 :         922 :                 totalLeafSizes = 0;
     834         [ +  + ]:      119432 :                 for (i = 0; i < in.nTuples; i++)
     835                 :             :                 {
     836         [ +  + ]:      118510 :                         if (state->leafTupDesc->natts > 1)
     837                 :       17666 :                                 spgDeformLeafTuple(oldLeafs[i],
     838                 :        8833 :                                                                    state->leafTupDesc,
     839                 :        8833 :                                                                    leafDatums,
     840                 :        8833 :                                                                    leafIsnulls,
     841                 :        8833 :                                                                    isNulls);
     842                 :             : 
     843                 :      118510 :                         leafDatums[spgKeyColumn] = out.leafTupleDatums[i];
     844                 :      118510 :                         leafIsnulls[spgKeyColumn] = false;
     845                 :             : 
     846                 :      237020 :                         newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
     847                 :      118510 :                                                                                    leafDatums,
     848                 :      118510 :                                                                                    leafIsnulls);
     849                 :      118510 :                         totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
     850                 :      118510 :                 }
     851                 :         922 :         }
     852                 :             :         else
     853                 :             :         {
     854                 :             :                 /*
     855                 :             :                  * Perform dummy split that puts all tuples into one node.
     856                 :             :                  * checkAllTheSame will override this and force allTheSame mode.
     857                 :             :                  */
     858                 :           0 :                 out.hasPrefix = false;
     859                 :           0 :                 out.nNodes = 1;
     860                 :           0 :                 out.nodeLabels = NULL;
     861                 :           0 :                 out.mapTuplesToNodes = palloc0_array(int, in.nTuples);
     862                 :             : 
     863                 :             :                 /*
     864                 :             :                  * Form new leaf tuples and count up the total space needed.
     865                 :             :                  */
     866                 :           0 :                 totalLeafSizes = 0;
     867         [ #  # ]:           0 :                 for (i = 0; i < in.nTuples; i++)
     868                 :             :                 {
     869         [ #  # ]:           0 :                         if (state->leafTupDesc->natts > 1)
     870                 :           0 :                                 spgDeformLeafTuple(oldLeafs[i],
     871                 :           0 :                                                                    state->leafTupDesc,
     872                 :           0 :                                                                    leafDatums,
     873                 :           0 :                                                                    leafIsnulls,
     874                 :           0 :                                                                    isNulls);
     875                 :             : 
     876                 :             :                         /*
     877                 :             :                          * Nulls tree can contain only null key values.
     878                 :             :                          */
     879                 :           0 :                         leafDatums[spgKeyColumn] = (Datum) 0;
     880                 :           0 :                         leafIsnulls[spgKeyColumn] = true;
     881                 :             : 
     882                 :           0 :                         newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
     883                 :           0 :                                                                                    leafDatums,
     884                 :           0 :                                                                                    leafIsnulls);
     885                 :           0 :                         totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
     886                 :           0 :                 }
     887                 :             :         }
     888                 :             : 
     889                 :             :         /*
     890                 :             :          * Check to see if the picksplit function failed to separate the values,
     891                 :             :          * ie, it put them all into the same child node.  If so, select allTheSame
     892                 :             :          * mode and create a random split instead.  See comments for
     893                 :             :          * checkAllTheSame as to why we need to know if the new leaf tuples could
     894                 :             :          * fit on one page.
     895                 :             :          */
     896                 :         922 :         allTheSame = checkAllTheSame(&in, &out,
     897                 :         922 :                                                                  totalLeafSizes > SPGIST_PAGE_CAPACITY,
     898                 :             :                                                                  &includeNew);
     899                 :             : 
     900                 :             :         /*
     901                 :             :          * If checkAllTheSame decided we must exclude the new tuple, don't
     902                 :             :          * consider it any further.
     903                 :             :          */
     904         [ +  - ]:         922 :         if (includeNew)
     905                 :         922 :                 maxToInclude = in.nTuples;
     906                 :             :         else
     907                 :             :         {
     908                 :           0 :                 maxToInclude = in.nTuples - 1;
     909                 :           0 :                 totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
     910                 :             :         }
     911                 :             : 
     912                 :             :         /*
     913                 :             :          * Allocate per-node work arrays.  Since checkAllTheSame could replace
     914                 :             :          * out.nNodes with a value larger than the number of tuples on the input
     915                 :             :          * page, we can't allocate these arrays before here.
     916                 :             :          */
     917                 :         922 :         nodes = palloc_array(SpGistNodeTuple, out.nNodes);
     918                 :         922 :         leafSizes = palloc0_array(int, out.nNodes);
     919                 :             : 
     920                 :             :         /*
     921                 :             :          * Form nodes of inner tuple and inner tuple itself
     922                 :             :          */
     923         [ +  + ]:        7217 :         for (i = 0; i < out.nNodes; i++)
     924                 :             :         {
     925                 :        6295 :                 Datum           label = (Datum) 0;
     926                 :        6295 :                 bool            labelisnull = (out.nodeLabels == NULL);
     927                 :             : 
     928         [ +  + ]:        6295 :                 if (!labelisnull)
     929                 :         491 :                         label = out.nodeLabels[i];
     930                 :        6295 :                 nodes[i] = spgFormNodeTuple(state, label, labelisnull);
     931                 :        6295 :         }
     932                 :        1844 :         innerTuple = spgFormInnerTuple(state,
     933                 :         922 :                                                                    out.hasPrefix, out.prefixDatum,
     934                 :         922 :                                                                    out.nNodes, nodes);
     935                 :         922 :         innerTuple->allTheSame = allTheSame;
     936                 :             : 
     937                 :             :         /*
     938                 :             :          * Update nodes[] array to point into the newly formed innerTuple, so that
     939                 :             :          * we can adjust their downlinks below.
     940                 :             :          */
     941         [ +  + ]:        7217 :         SGITITERATE(innerTuple, i, node)
     942                 :             :         {
     943                 :        6295 :                 nodes[i] = node;
     944                 :        6295 :         }
     945                 :             : 
     946                 :             :         /*
     947                 :             :          * Re-scan new leaf tuples and count up the space needed under each node.
     948                 :             :          */
     949         [ +  + ]:      119432 :         for (i = 0; i < maxToInclude; i++)
     950                 :             :         {
     951                 :      118510 :                 n = out.mapTuplesToNodes[i];
     952         [ +  - ]:      118510 :                 if (n < 0 || n >= out.nNodes)
     953   [ #  #  #  # ]:           0 :                         elog(ERROR, "inconsistent result of SPGiST picksplit function");
     954                 :      118510 :                 leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
     955                 :      118510 :         }
     956                 :             : 
     957                 :             :         /*
     958                 :             :          * To perform the split, we must insert a new inner tuple, which can't go
     959                 :             :          * on a leaf page; and unless we are splitting the root page, we must then
     960                 :             :          * update the parent tuple's downlink to point to the inner tuple.  If
     961                 :             :          * there is room, we'll put the new inner tuple on the same page as the
     962                 :             :          * parent tuple, otherwise we need another non-leaf buffer. But if the
     963                 :             :          * parent page is the root, we can't add the new inner tuple there,
     964                 :             :          * because the root page must have only one inner tuple.
     965                 :             :          */
     966                 :         922 :         xlrec.initInner = false;
     967         [ +  + ]:         922 :         if (parent->buffer != InvalidBuffer &&
     968   [ +  +  -  +  :        1769 :                 !SpGistBlockIsRoot(parent->blkno) &&
                   +  + ]
     969         [ -  + ]:         860 :                 (SpGistPageGetFreeSpace(parent->page, 1) >=
     970                 :         860 :                  innerTuple->size + sizeof(ItemIdData)))
     971                 :             :         {
     972                 :             :                 /* New inner tuple will fit on parent page */
     973                 :         792 :                 newInnerBuffer = parent->buffer;
     974                 :         792 :         }
     975         [ +  + ]:         130 :         else if (parent->buffer != InvalidBuffer)
     976                 :             :         {
     977                 :             :                 /* Send tuple to page with next triple parity (see README) */
     978                 :         234 :                 newInnerBuffer = SpGistGetBuffer(index,
     979                 :         234 :                                                                                  GBUF_INNER_PARITY(parent->blkno + 1) |
     980                 :         117 :                                                                                  (isNulls ? GBUF_NULLS : 0),
     981                 :         117 :                                                                                  innerTuple->size + sizeof(ItemIdData),
     982                 :         117 :                                                                                  &xlrec.initInner);
     983                 :         117 :         }
     984                 :             :         else
     985                 :             :         {
     986                 :             :                 /* Root page split ... inner tuple will go to root page */
     987                 :          13 :                 newInnerBuffer = InvalidBuffer;
     988                 :             :         }
     989                 :             : 
     990                 :             :         /*
     991                 :             :          * The new leaf tuples converted from the existing ones should require the
     992                 :             :          * same or less space, and therefore should all fit onto one page
     993                 :             :          * (although that's not necessarily the current page, since we can't
     994                 :             :          * delete the old tuples but only replace them with placeholders).
     995                 :             :          * However, the incoming new tuple might not also fit, in which case we
     996                 :             :          * might need another picksplit cycle to reduce it some more.
     997                 :             :          *
     998                 :             :          * If there's not room to put everything back onto the current page, then
     999                 :             :          * we decide on a per-node basis which tuples go to the new page. (We do
    1000                 :             :          * it like that because leaf tuple chains can't cross pages, so we must
    1001                 :             :          * place all leaf tuples belonging to the same parent node on the same
    1002                 :             :          * page.)
    1003                 :             :          *
    1004                 :             :          * If we are splitting the root page (turning it from a leaf page into an
    1005                 :             :          * inner page), then no leaf tuples can go back to the current page; they
    1006                 :             :          * must all go somewhere else.
    1007                 :             :          */
    1008   [ +  +  -  + ]:         922 :         if (!SpGistBlockIsRoot(current->blkno))
    1009                 :         909 :                 currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
    1010                 :             :         else
    1011                 :          13 :                 currentFreeSpace = 0;   /* prevent assigning any tuples to current */
    1012                 :             : 
    1013                 :         922 :         xlrec.initDest = false;
    1014                 :             : 
    1015         [ +  + ]:         922 :         if (totalLeafSizes <= currentFreeSpace)
    1016                 :             :         {
    1017                 :             :                 /* All the leaf tuples will fit on current page */
    1018                 :           4 :                 newLeafBuffer = InvalidBuffer;
    1019                 :             :                 /* mark new leaf tuple as included in insertions, if allowed */
    1020         [ -  + ]:           4 :                 if (includeNew)
    1021                 :             :                 {
    1022                 :           4 :                         nToInsert++;
    1023                 :           4 :                         insertedNew = true;
    1024                 :           4 :                 }
    1025         [ +  + ]:         495 :                 for (i = 0; i < nToInsert; i++)
    1026                 :         491 :                         leafPageSelect[i] = 0;  /* signifies current page */
    1027                 :           4 :         }
    1028   [ -  +  #  # ]:         918 :         else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
    1029                 :             :         {
    1030                 :             :                 /*
    1031                 :             :                  * We're trying to split up a long value by repeated suffixing, but
    1032                 :             :                  * it's not going to fit yet.  Don't bother allocating a second leaf
    1033                 :             :                  * buffer that we won't be able to use.
    1034                 :             :                  */
    1035                 :           0 :                 newLeafBuffer = InvalidBuffer;
    1036         [ #  # ]:           0 :                 Assert(includeNew);
    1037         [ #  # ]:           0 :                 Assert(nToInsert == 0);
    1038                 :           0 :         }
    1039                 :             :         else
    1040                 :             :         {
    1041                 :             :                 /* We will need another leaf page */
    1042                 :         918 :                 uint8      *nodePageSelect;
    1043                 :         918 :                 int                     curspace;
    1044                 :         918 :                 int                     newspace;
    1045                 :             : 
    1046                 :        1836 :                 newLeafBuffer = SpGistGetBuffer(index,
    1047                 :         918 :                                                                                 GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
    1048         [ +  + ]:         918 :                                                                                 Min(totalLeafSizes,
    1049                 :             :                                                                                         SPGIST_PAGE_CAPACITY),
    1050                 :         918 :                                                                                 &xlrec.initDest);
    1051                 :             : 
    1052                 :             :                 /*
    1053                 :             :                  * Attempt to assign node groups to the two pages.  We might fail to
    1054                 :             :                  * do so, even if totalLeafSizes is less than the available space,
    1055                 :             :                  * because we can't split a group across pages.
    1056                 :             :                  */
    1057                 :         918 :                 nodePageSelect = palloc_array(uint8, out.nNodes);
    1058                 :             : 
    1059                 :         918 :                 curspace = currentFreeSpace;
    1060                 :         918 :                 newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
    1061         [ +  + ]:        7205 :                 for (i = 0; i < out.nNodes; i++)
    1062                 :             :                 {
    1063         [ +  + ]:        6287 :                         if (leafSizes[i] <= curspace)
    1064                 :             :                         {
    1065                 :        4082 :                                 nodePageSelect[i] = 0;  /* signifies current page */
    1066                 :        4082 :                                 curspace -= leafSizes[i];
    1067                 :        4082 :                         }
    1068                 :             :                         else
    1069                 :             :                         {
    1070                 :        2205 :                                 nodePageSelect[i] = 1;  /* signifies new leaf page */
    1071                 :        2205 :                                 newspace -= leafSizes[i];
    1072                 :             :                         }
    1073                 :        6287 :                 }
    1074   [ +  -  +  + ]:         918 :                 if (curspace >= 0 && newspace >= 0)
    1075                 :             :                 {
    1076                 :             :                         /* Successful assignment, so we can include the new leaf tuple */
    1077         [ -  + ]:         882 :                         if (includeNew)
    1078                 :             :                         {
    1079                 :         882 :                                 nToInsert++;
    1080                 :         882 :                                 insertedNew = true;
    1081                 :         882 :                         }
    1082                 :         882 :                 }
    1083         [ +  - ]:          36 :                 else if (includeNew)
    1084                 :             :                 {
    1085                 :             :                         /* We must exclude the new leaf tuple from the split */
    1086                 :          36 :                         int                     nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
    1087                 :             : 
    1088                 :          36 :                         leafSizes[nodeOfNewTuple] -=
    1089                 :          36 :                                 newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
    1090                 :             : 
    1091                 :             :                         /* Repeat the node assignment process --- should succeed now */
    1092                 :          36 :                         curspace = currentFreeSpace;
    1093                 :          36 :                         newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
    1094         [ +  + ]:         174 :                         for (i = 0; i < out.nNodes; i++)
    1095                 :             :                         {
    1096         [ +  + ]:         138 :                                 if (leafSizes[i] <= curspace)
    1097                 :             :                                 {
    1098                 :          42 :                                         nodePageSelect[i] = 0;  /* signifies current page */
    1099                 :          42 :                                         curspace -= leafSizes[i];
    1100                 :          42 :                                 }
    1101                 :             :                                 else
    1102                 :             :                                 {
    1103                 :          96 :                                         nodePageSelect[i] = 1;  /* signifies new leaf page */
    1104                 :          96 :                                         newspace -= leafSizes[i];
    1105                 :             :                                 }
    1106                 :         138 :                         }
    1107         [ +  - ]:          36 :                         if (curspace < 0 || newspace < 0)
    1108   [ #  #  #  # ]:           0 :                                 elog(ERROR, "failed to divide leaf tuple groups across pages");
    1109                 :          36 :                 }
    1110                 :             :                 else
    1111                 :             :                 {
    1112                 :             :                         /* oops, we already excluded new tuple ... should not get here */
    1113   [ #  #  #  # ]:           0 :                         elog(ERROR, "failed to divide leaf tuple groups across pages");
    1114                 :             :                 }
    1115                 :             :                 /* Expand the per-node assignments to be shown per leaf tuple */
    1116         [ +  + ]:      118901 :                 for (i = 0; i < nToInsert; i++)
    1117                 :             :                 {
    1118                 :      117983 :                         n = out.mapTuplesToNodes[i];
    1119                 :      117983 :                         leafPageSelect[i] = nodePageSelect[n];
    1120                 :      117983 :                 }
    1121                 :         918 :         }
    1122                 :             : 
    1123                 :             :         /* Start preparing WAL record */
    1124                 :         922 :         xlrec.nDelete = 0;
    1125                 :         922 :         xlrec.initSrc = isNew;
    1126                 :         922 :         xlrec.storesNulls = isNulls;
    1127         [ +  + ]:         922 :         xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
    1128                 :             : 
    1129                 :         922 :         leafdata = leafptr = (char *) palloc(totalLeafSizes);
    1130                 :             : 
    1131                 :             :         /* Here we begin making the changes to the target pages */
    1132                 :         922 :         START_CRIT_SECTION();
    1133                 :             : 
    1134                 :             :         /*
    1135                 :             :          * Delete old leaf tuples from current buffer, except when we're splitting
    1136                 :             :          * the root; in that case there's no need because we'll re-init the page
    1137                 :             :          * below.  We do this first to make room for reinserting new leaf tuples.
    1138                 :             :          */
    1139   [ +  +  -  + ]:         922 :         if (!SpGistBlockIsRoot(current->blkno))
    1140                 :             :         {
    1141                 :             :                 /*
    1142                 :             :                  * Init buffer instead of deleting individual tuples, but only if
    1143                 :             :                  * there aren't any other live tuples and only during build; otherwise
    1144                 :             :                  * we need to set a redirection tuple for concurrent scans.
    1145                 :             :                  */
    1146   [ +  +  +  + ]:         909 :                 if (state->isBuild &&
    1147                 :        1418 :                         nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
    1148                 :         709 :                         PageGetMaxOffsetNumber(current->page))
    1149                 :             :                 {
    1150                 :          98 :                         SpGistInitBuffer(current->buffer,
    1151                 :          49 :                                                          SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
    1152                 :          49 :                         xlrec.initSrc = true;
    1153                 :          49 :                 }
    1154         [ -  + ]:         860 :                 else if (isNew)
    1155                 :             :                 {
    1156                 :             :                         /* don't expose the freshly init'd buffer as a backup block */
    1157         [ #  # ]:           0 :                         Assert(nToDelete == 0);
    1158                 :           0 :                 }
    1159                 :             :                 else
    1160                 :             :                 {
    1161                 :         860 :                         xlrec.nDelete = nToDelete;
    1162                 :             : 
    1163         [ +  + ]:         860 :                         if (!state->isBuild)
    1164                 :             :                         {
    1165                 :             :                                 /*
    1166                 :             :                                  * Need to create redirect tuple (it will point to new inner
    1167                 :             :                                  * tuple) but right now the new tuple's location is not known
    1168                 :             :                                  * yet.  So, set the redirection pointer to "impossible" value
    1169                 :             :                                  * and remember its position to update tuple later.
    1170                 :             :                                  */
    1171         [ -  + ]:         200 :                                 if (nToDelete > 0)
    1172                 :         200 :                                         redirectTuplePos = toDelete[0];
    1173                 :         400 :                                 spgPageIndexMultiDelete(state, current->page,
    1174                 :         200 :                                                                                 toDelete, nToDelete,
    1175                 :             :                                                                                 SPGIST_REDIRECT,
    1176                 :             :                                                                                 SPGIST_PLACEHOLDER,
    1177                 :             :                                                                                 SPGIST_METAPAGE_BLKNO,
    1178                 :             :                                                                                 FirstOffsetNumber);
    1179                 :         200 :                         }
    1180                 :             :                         else
    1181                 :             :                         {
    1182                 :             :                                 /*
    1183                 :             :                                  * During index build there is not concurrent searches, so we
    1184                 :             :                                  * don't need to create redirection tuple.
    1185                 :             :                                  */
    1186                 :        1320 :                                 spgPageIndexMultiDelete(state, current->page,
    1187                 :         660 :                                                                                 toDelete, nToDelete,
    1188                 :             :                                                                                 SPGIST_PLACEHOLDER,
    1189                 :             :                                                                                 SPGIST_PLACEHOLDER,
    1190                 :             :                                                                                 InvalidBlockNumber,
    1191                 :             :                                                                                 InvalidOffsetNumber);
    1192                 :             :                         }
    1193                 :             :                 }
    1194                 :         909 :         }
    1195                 :             : 
    1196                 :             :         /*
    1197                 :             :          * Put leaf tuples on proper pages, and update downlinks in innerTuple's
    1198                 :             :          * nodes.
    1199                 :             :          */
    1200                 :         922 :         startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
    1201         [ +  + ]:      119396 :         for (i = 0; i < nToInsert; i++)
    1202                 :             :         {
    1203                 :      118474 :                 SpGistLeafTuple it = newLeafs[i];
    1204                 :      118474 :                 Buffer          leafBuffer;
    1205                 :      118474 :                 BlockNumber leafBlock;
    1206                 :      118474 :                 OffsetNumber newoffset;
    1207                 :             : 
    1208                 :             :                 /* Which page is it going to? */
    1209         [ +  + ]:      118474 :                 leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
    1210                 :      118474 :                 leafBlock = BufferGetBlockNumber(leafBuffer);
    1211                 :             : 
    1212                 :             :                 /* Link tuple into correct chain for its node */
    1213                 :      118474 :                 n = out.mapTuplesToNodes[i];
    1214                 :             : 
    1215         [ +  + ]:      118474 :                 if (ItemPointerIsValid(&nodes[n]->t_tid))
    1216                 :             :                 {
    1217         [ -  + ]:      114926 :                         Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
    1218                 :      114926 :                         SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid));
    1219                 :      114926 :                 }
    1220                 :             :                 else
    1221                 :        3548 :                         SGLT_SET_NEXTOFFSET(it, InvalidOffsetNumber);
    1222                 :             : 
    1223                 :             :                 /* Insert it on page */
    1224                 :      236948 :                 newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
    1225                 :      118474 :                                                                                  it, it->size,
    1226                 :      118474 :                                                                                  &startOffsets[leafPageSelect[i]],
    1227                 :             :                                                                                  false);
    1228                 :      118474 :                 toInsert[i] = newoffset;
    1229                 :             : 
    1230                 :             :                 /* ... and complete the chain linking */
    1231                 :      118474 :                 ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
    1232                 :             : 
    1233                 :             :                 /* Also copy leaf tuple into WAL data */
    1234                 :      118474 :                 memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
    1235                 :      118474 :                 leafptr += newLeafs[i]->size;
    1236                 :      118474 :         }
    1237                 :             : 
    1238                 :             :         /*
    1239                 :             :          * We're done modifying the other leaf buffer (if any), so mark it dirty.
    1240                 :             :          * current->buffer will be marked below, after we're entirely done
    1241                 :             :          * modifying it.
    1242                 :             :          */
    1243         [ +  + ]:         922 :         if (newLeafBuffer != InvalidBuffer)
    1244                 :             :         {
    1245                 :         918 :                 MarkBufferDirty(newLeafBuffer);
    1246                 :         918 :         }
    1247                 :             : 
    1248                 :             :         /* Remember current buffer, since we're about to change "current" */
    1249                 :         922 :         saveCurrent = *current;
    1250                 :             : 
    1251                 :             :         /*
    1252                 :             :          * Store the new innerTuple
    1253                 :             :          */
    1254   [ +  +  +  + ]:         922 :         if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
    1255                 :             :         {
    1256                 :             :                 /*
    1257                 :             :                  * new inner tuple goes to parent page
    1258                 :             :                  */
    1259         [ +  - ]:         792 :                 Assert(current->buffer != parent->buffer);
    1260                 :             : 
    1261                 :             :                 /* Repoint "current" at the new inner tuple */
    1262                 :         792 :                 current->blkno = parent->blkno;
    1263                 :         792 :                 current->buffer = parent->buffer;
    1264                 :         792 :                 current->page = parent->page;
    1265                 :         792 :                 xlrec.offnumInner = current->offnum =
    1266                 :        1584 :                         SpGistPageAddNewItem(state, current->page,
    1267                 :         792 :                                                                  innerTuple, innerTuple->size,
    1268                 :             :                                                                  NULL, false);
    1269                 :             : 
    1270                 :             :                 /*
    1271                 :             :                  * Update parent node link and mark parent page dirty
    1272                 :             :                  */
    1273                 :         792 :                 xlrec.innerIsParent = true;
    1274                 :         792 :                 xlrec.offnumParent = parent->offnum;
    1275                 :         792 :                 xlrec.nodeI = parent->node;
    1276                 :         792 :                 saveNodeLink(index, parent, current->blkno, current->offnum);
    1277                 :             : 
    1278                 :             :                 /*
    1279                 :             :                  * Update redirection link (in old current buffer)
    1280                 :             :                  */
    1281         [ +  + ]:         792 :                 if (redirectTuplePos != InvalidOffsetNumber)
    1282                 :         370 :                         setRedirectionTuple(&saveCurrent, redirectTuplePos,
    1283                 :         185 :                                                                 current->blkno, current->offnum);
    1284                 :             : 
    1285                 :             :                 /* Done modifying old current buffer, mark it dirty */
    1286                 :         792 :                 MarkBufferDirty(saveCurrent.buffer);
    1287                 :         792 :         }
    1288         [ +  + ]:         130 :         else if (parent->buffer != InvalidBuffer)
    1289                 :             :         {
    1290                 :             :                 /*
    1291                 :             :                  * new inner tuple will be stored on a new page
    1292                 :             :                  */
    1293         [ +  - ]:         117 :                 Assert(newInnerBuffer != InvalidBuffer);
    1294                 :             : 
    1295                 :             :                 /* Repoint "current" at the new inner tuple */
    1296                 :         117 :                 current->buffer = newInnerBuffer;
    1297                 :         117 :                 current->blkno = BufferGetBlockNumber(current->buffer);
    1298                 :         117 :                 current->page = BufferGetPage(current->buffer);
    1299                 :         117 :                 xlrec.offnumInner = current->offnum =
    1300                 :         234 :                         SpGistPageAddNewItem(state, current->page,
    1301                 :         117 :                                                                  innerTuple, innerTuple->size,
    1302                 :             :                                                                  NULL, false);
    1303                 :             : 
    1304                 :             :                 /* Done modifying new current buffer, mark it dirty */
    1305                 :         117 :                 MarkBufferDirty(current->buffer);
    1306                 :             : 
    1307                 :             :                 /*
    1308                 :             :                  * Update parent node link and mark parent page dirty
    1309                 :             :                  */
    1310                 :         117 :                 xlrec.innerIsParent = (parent->buffer == current->buffer);
    1311                 :         117 :                 xlrec.offnumParent = parent->offnum;
    1312                 :         117 :                 xlrec.nodeI = parent->node;
    1313                 :         117 :                 saveNodeLink(index, parent, current->blkno, current->offnum);
    1314                 :             : 
    1315                 :             :                 /*
    1316                 :             :                  * Update redirection link (in old current buffer)
    1317                 :             :                  */
    1318         [ +  + ]:         117 :                 if (redirectTuplePos != InvalidOffsetNumber)
    1319                 :          30 :                         setRedirectionTuple(&saveCurrent, redirectTuplePos,
    1320                 :          15 :                                                                 current->blkno, current->offnum);
    1321                 :             : 
    1322                 :             :                 /* Done modifying old current buffer, mark it dirty */
    1323                 :         117 :                 MarkBufferDirty(saveCurrent.buffer);
    1324                 :         117 :         }
    1325                 :             :         else
    1326                 :             :         {
    1327                 :             :                 /*
    1328                 :             :                  * Splitting root page, which was a leaf but now becomes inner page
    1329                 :             :                  * (and so "current" continues to point at it)
    1330                 :             :                  */
    1331   [ -  +  #  # ]:          13 :                 Assert(SpGistBlockIsRoot(current->blkno));
    1332         [ +  - ]:          13 :                 Assert(redirectTuplePos == InvalidOffsetNumber);
    1333                 :             : 
    1334                 :          13 :                 SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
    1335                 :          13 :                 xlrec.initInner = true;
    1336                 :          13 :                 xlrec.innerIsParent = false;
    1337                 :             : 
    1338                 :          13 :                 xlrec.offnumInner = current->offnum =
    1339                 :          13 :                         PageAddItem(current->page, innerTuple, innerTuple->size,
    1340                 :             :                                                 InvalidOffsetNumber, false, false);
    1341         [ +  - ]:          13 :                 if (current->offnum != FirstOffsetNumber)
    1342   [ #  #  #  # ]:           0 :                         elog(ERROR, "failed to add item of size %u to SPGiST index page",
    1343                 :             :                                  innerTuple->size);
    1344                 :             : 
    1345                 :             :                 /* No parent link to update, nor redirection to do */
    1346                 :          13 :                 xlrec.offnumParent = InvalidOffsetNumber;
    1347                 :          13 :                 xlrec.nodeI = 0;
    1348                 :             : 
    1349                 :             :                 /* Done modifying new current buffer, mark it dirty */
    1350                 :          13 :                 MarkBufferDirty(current->buffer);
    1351                 :             : 
    1352                 :             :                 /* saveCurrent doesn't represent a different buffer */
    1353                 :          13 :                 saveCurrent.buffer = InvalidBuffer;
    1354                 :             :         }
    1355                 :             : 
    1356   [ +  +  +  -  :         922 :         if (RelationNeedsWAL(index) && !state->isBuild)
             +  +  +  + ]
    1357                 :             :         {
    1358                 :         203 :                 XLogRecPtr      recptr;
    1359                 :         203 :                 int                     flags;
    1360                 :             : 
    1361                 :         203 :                 XLogBeginInsert();
    1362                 :             : 
    1363                 :         203 :                 xlrec.nInsert = nToInsert;
    1364                 :         203 :                 XLogRegisterData(&xlrec, SizeOfSpgxlogPickSplit);
    1365                 :             : 
    1366                 :         406 :                 XLogRegisterData(toDelete,
    1367                 :         203 :                                                  sizeof(OffsetNumber) * xlrec.nDelete);
    1368                 :         406 :                 XLogRegisterData(toInsert,
    1369                 :         203 :                                                  sizeof(OffsetNumber) * xlrec.nInsert);
    1370                 :         406 :                 XLogRegisterData(leafPageSelect,
    1371                 :         203 :                                                  sizeof(uint8) * xlrec.nInsert);
    1372                 :         203 :                 XLogRegisterData(innerTuple, innerTuple->size);
    1373                 :         203 :                 XLogRegisterData(leafdata, leafptr - leafdata);
    1374                 :             : 
    1375                 :             :                 /* Old leaf page */
    1376         [ +  + ]:         203 :                 if (BufferIsValid(saveCurrent.buffer))
    1377                 :             :                 {
    1378                 :         200 :                         flags = REGBUF_STANDARD;
    1379         [ +  - ]:         200 :                         if (xlrec.initSrc)
    1380                 :           0 :                                 flags |= REGBUF_WILL_INIT;
    1381                 :         200 :                         XLogRegisterBuffer(0, saveCurrent.buffer, flags);
    1382                 :         200 :                 }
    1383                 :             : 
    1384                 :             :                 /* New leaf page */
    1385         [ -  + ]:         203 :                 if (BufferIsValid(newLeafBuffer))
    1386                 :             :                 {
    1387                 :         203 :                         flags = REGBUF_STANDARD;
    1388         [ +  + ]:         203 :                         if (xlrec.initDest)
    1389                 :         189 :                                 flags |= REGBUF_WILL_INIT;
    1390                 :         203 :                         XLogRegisterBuffer(1, newLeafBuffer, flags);
    1391                 :         203 :                 }
    1392                 :             : 
    1393                 :             :                 /* Inner page */
    1394                 :         203 :                 flags = REGBUF_STANDARD;
    1395         [ +  + ]:         203 :                 if (xlrec.initInner)
    1396                 :           6 :                         flags |= REGBUF_WILL_INIT;
    1397                 :         203 :                 XLogRegisterBuffer(2, current->buffer, flags);
    1398                 :             : 
    1399                 :             :                 /* Parent page, if different from inner page */
    1400         [ +  + ]:         203 :                 if (parent->buffer != InvalidBuffer)
    1401                 :             :                 {
    1402         [ +  + ]:         200 :                         if (parent->buffer != current->buffer)
    1403                 :          15 :                                 XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD);
    1404                 :             :                         else
    1405         [ +  - ]:         185 :                                 Assert(xlrec.innerIsParent);
    1406                 :         200 :                 }
    1407                 :             : 
    1408                 :             :                 /* Issue the WAL record */
    1409                 :         203 :                 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
    1410                 :             : 
    1411                 :             :                 /* Update page LSNs on all affected pages */
    1412         [ -  + ]:         203 :                 if (newLeafBuffer != InvalidBuffer)
    1413                 :             :                 {
    1414                 :         203 :                         Page            page = BufferGetPage(newLeafBuffer);
    1415                 :             : 
    1416                 :         203 :                         PageSetLSN(page, recptr);
    1417                 :         203 :                 }
    1418                 :             : 
    1419         [ +  + ]:         203 :                 if (saveCurrent.buffer != InvalidBuffer)
    1420                 :             :                 {
    1421                 :         200 :                         Page            page = BufferGetPage(saveCurrent.buffer);
    1422                 :             : 
    1423                 :         200 :                         PageSetLSN(page, recptr);
    1424                 :         200 :                 }
    1425                 :             : 
    1426                 :         203 :                 PageSetLSN(current->page, recptr);
    1427                 :             : 
    1428         [ +  + ]:         203 :                 if (parent->buffer != InvalidBuffer)
    1429                 :             :                 {
    1430                 :         200 :                         PageSetLSN(parent->page, recptr);
    1431                 :         200 :                 }
    1432                 :         203 :         }
    1433                 :             : 
    1434         [ +  - ]:        1328 :         END_CRIT_SECTION();
    1435                 :             : 
    1436                 :             :         /* Update local free-space cache and unlock buffers */
    1437         [ +  + ]:        1328 :         if (newLeafBuffer != InvalidBuffer)
    1438                 :             :         {
    1439                 :         918 :                 SpGistSetLastUsedPage(index, newLeafBuffer);
    1440                 :         918 :                 UnlockReleaseBuffer(newLeafBuffer);
    1441                 :         918 :         }
    1442         [ +  + ]:        1328 :         if (saveCurrent.buffer != InvalidBuffer)
    1443                 :             :         {
    1444                 :         909 :                 SpGistSetLastUsedPage(index, saveCurrent.buffer);
    1445                 :         909 :                 UnlockReleaseBuffer(saveCurrent.buffer);
    1446                 :         909 :         }
    1447                 :             : 
    1448                 :        2656 :         return insertedNew;
    1449                 :        1328 : }
    1450                 :             : 
    1451                 :             : /*
    1452                 :             :  * spgMatchNode action: descend to N'th child node of current inner tuple
    1453                 :             :  */
    1454                 :             : static void
    1455                 :     3106050 : spgMatchNodeAction(Relation index, SpGistState *state,
    1456                 :             :                                    SpGistInnerTuple innerTuple,
    1457                 :             :                                    SPPageDesc *current, SPPageDesc *parent, int nodeN)
    1458                 :             : {
    1459                 :     3106050 :         int                     i;
    1460                 :     3106050 :         SpGistNodeTuple node;
    1461                 :             : 
    1462                 :             :         /* Release previous parent buffer if any */
    1463   [ +  +  +  + ]:     3106050 :         if (parent->buffer != InvalidBuffer &&
    1464                 :     2976786 :                 parent->buffer != current->buffer)
    1465                 :             :         {
    1466                 :      129584 :                 SpGistSetLastUsedPage(index, parent->buffer);
    1467                 :      129584 :                 UnlockReleaseBuffer(parent->buffer);
    1468                 :      129584 :         }
    1469                 :             : 
    1470                 :             :         /* Repoint parent to specified node of current inner tuple */
    1471                 :     3106050 :         parent->blkno = current->blkno;
    1472                 :     3106050 :         parent->buffer = current->buffer;
    1473                 :     3106050 :         parent->page = current->page;
    1474                 :     3106050 :         parent->offnum = current->offnum;
    1475                 :     3106050 :         parent->node = nodeN;
    1476                 :             : 
    1477                 :             :         /* Locate that node */
    1478         [ -  + ]:     5177975 :         SGITITERATE(innerTuple, i, node)
    1479                 :             :         {
    1480         [ +  + ]:     5177975 :                 if (i == nodeN)
    1481                 :     3106050 :                         break;
    1482                 :     2071925 :         }
    1483                 :             : 
    1484         [ +  - ]:     3106050 :         if (i != nodeN)
    1485   [ #  #  #  # ]:           0 :                 elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
    1486                 :             :                          nodeN);
    1487                 :             : 
    1488                 :             :         /* Point current to the downlink location, if any */
    1489         [ +  + ]:     3106050 :         if (ItemPointerIsValid(&node->t_tid))
    1490                 :             :         {
    1491                 :     3105746 :                 current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
    1492                 :     3105746 :                 current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
    1493                 :     3105746 :         }
    1494                 :             :         else
    1495                 :             :         {
    1496                 :             :                 /* Downlink is empty, so we'll need to find a new page */
    1497                 :         304 :                 current->blkno = InvalidBlockNumber;
    1498                 :         304 :                 current->offnum = InvalidOffsetNumber;
    1499                 :             :         }
    1500                 :             : 
    1501                 :     3106050 :         current->buffer = InvalidBuffer;
    1502                 :     3106050 :         current->page = NULL;
    1503                 :     3106050 : }
    1504                 :             : 
    1505                 :             : /*
    1506                 :             :  * spgAddNode action: add a node to the inner tuple at current
    1507                 :             :  */
    1508                 :             : static void
    1509                 :         428 : spgAddNodeAction(Relation index, SpGistState *state,
    1510                 :             :                                  SpGistInnerTuple innerTuple,
    1511                 :             :                                  SPPageDesc *current, SPPageDesc *parent,
    1512                 :             :                                  int nodeN, Datum nodeLabel)
    1513                 :             : {
    1514                 :         428 :         SpGistInnerTuple newInnerTuple;
    1515                 :         428 :         spgxlogAddNode xlrec;
    1516                 :             : 
    1517                 :             :         /* Should not be applied to nulls */
    1518         [ +  - ]:         428 :         Assert(!SpGistPageStoresNulls(current->page));
    1519                 :             : 
    1520                 :             :         /* Construct new inner tuple with additional node */
    1521                 :         428 :         newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
    1522                 :             : 
    1523                 :             :         /* Prepare WAL record */
    1524                 :         428 :         STORE_STATE(state, xlrec.stateSrc);
    1525                 :         428 :         xlrec.offnum = current->offnum;
    1526                 :             : 
    1527                 :             :         /* we don't fill these unless we need to change the parent downlink */
    1528                 :         428 :         xlrec.parentBlk = -1;
    1529                 :         428 :         xlrec.offnumParent = InvalidOffsetNumber;
    1530                 :         428 :         xlrec.nodeI = 0;
    1531                 :             : 
    1532                 :             :         /* we don't fill these unless tuple has to be moved */
    1533                 :         428 :         xlrec.offnumNew = InvalidOffsetNumber;
    1534                 :         428 :         xlrec.newPage = false;
    1535                 :             : 
    1536   [ +  +  +  + ]:         856 :         if (PageGetExactFreeSpace(current->page) >=
    1537                 :         428 :                 newInnerTuple->size - innerTuple->size)
    1538                 :             :         {
    1539                 :             :                 /*
    1540                 :             :                  * We can replace the inner tuple by new version in-place
    1541                 :             :                  */
    1542                 :         427 :                 START_CRIT_SECTION();
    1543                 :             : 
    1544                 :         427 :                 PageIndexTupleDelete(current->page, current->offnum);
    1545                 :         427 :                 if (PageAddItem(current->page,
    1546                 :             :                                                 newInnerTuple, newInnerTuple->size,
    1547         [ +  - ]:         427 :                                                 current->offnum, false, false) != current->offnum)
    1548   [ #  #  #  # ]:           0 :                         elog(ERROR, "failed to add item of size %u to SPGiST index page",
    1549                 :             :                                  newInnerTuple->size);
    1550                 :             : 
    1551                 :         427 :                 MarkBufferDirty(current->buffer);
    1552                 :             : 
    1553   [ +  +  +  -  :         427 :                 if (RelationNeedsWAL(index) && !state->isBuild)
             +  +  +  + ]
    1554                 :             :                 {
    1555                 :         100 :                         XLogRecPtr      recptr;
    1556                 :             : 
    1557                 :         100 :                         XLogBeginInsert();
    1558                 :         100 :                         XLogRegisterData(&xlrec, sizeof(xlrec));
    1559                 :         100 :                         XLogRegisterData(newInnerTuple, newInnerTuple->size);
    1560                 :             : 
    1561                 :         100 :                         XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
    1562                 :             : 
    1563                 :         100 :                         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
    1564                 :             : 
    1565                 :         100 :                         PageSetLSN(current->page, recptr);
    1566                 :         100 :                 }
    1567                 :             : 
    1568         [ +  - ]:         627 :                 END_CRIT_SECTION();
    1569                 :         227 :         }
    1570                 :             :         else
    1571                 :             :         {
    1572                 :             :                 /*
    1573                 :             :                  * move inner tuple to another page, and update parent
    1574                 :             :                  */
    1575                 :           1 :                 SpGistDeadTuple dt;
    1576                 :           1 :                 SPPageDesc      saveCurrent;
    1577                 :             : 
    1578                 :             :                 /*
    1579                 :             :                  * It should not be possible to get here for the root page, since we
    1580                 :             :                  * allow only one inner tuple on the root page, and spgFormInnerTuple
    1581                 :             :                  * always checks that inner tuples don't exceed the size of a page.
    1582                 :             :                  */
    1583         [ +  - ]:           1 :                 if (SpGistBlockIsRoot(current->blkno))
    1584   [ #  #  #  # ]:           0 :                         elog(ERROR, "cannot enlarge root tuple any more");
    1585         [ +  - ]:           1 :                 Assert(parent->buffer != InvalidBuffer);
    1586                 :             : 
    1587                 :           1 :                 saveCurrent = *current;
    1588                 :             : 
    1589                 :           1 :                 xlrec.offnumParent = parent->offnum;
    1590                 :           1 :                 xlrec.nodeI = parent->node;
    1591                 :             : 
    1592                 :             :                 /*
    1593                 :             :                  * obtain new buffer with the same parity as current, since it will be
    1594                 :             :                  * a child of same parent tuple
    1595                 :             :                  */
    1596                 :           2 :                 current->buffer = SpGistGetBuffer(index,
    1597                 :           1 :                                                                                   GBUF_INNER_PARITY(current->blkno),
    1598                 :           1 :                                                                                   newInnerTuple->size + sizeof(ItemIdData),
    1599                 :           1 :                                                                                   &xlrec.newPage);
    1600                 :           1 :                 current->blkno = BufferGetBlockNumber(current->buffer);
    1601                 :           1 :                 current->page = BufferGetPage(current->buffer);
    1602                 :             : 
    1603                 :             :                 /*
    1604                 :             :                  * Let's just make real sure new current isn't same as old.  Right now
    1605                 :             :                  * that's impossible, but if SpGistGetBuffer ever got smart enough to
    1606                 :             :                  * delete placeholder tuples before checking space, maybe it wouldn't
    1607                 :             :                  * be impossible.  The case would appear to work except that WAL
    1608                 :             :                  * replay would be subtly wrong, so I think a mere assert isn't enough
    1609                 :             :                  * here.
    1610                 :             :                  */
    1611         [ +  - ]:           1 :                 if (current->blkno == saveCurrent.blkno)
    1612   [ #  #  #  # ]:           0 :                         elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
    1613                 :             : 
    1614                 :             :                 /*
    1615                 :             :                  * New current and parent buffer will both be modified; but note that
    1616                 :             :                  * parent buffer could be same as either new or old current.
    1617                 :             :                  */
    1618         [ -  + ]:           1 :                 if (parent->buffer == saveCurrent.buffer)
    1619                 :           0 :                         xlrec.parentBlk = 0;
    1620         [ -  + ]:           1 :                 else if (parent->buffer == current->buffer)
    1621                 :           0 :                         xlrec.parentBlk = 1;
    1622                 :             :                 else
    1623                 :           1 :                         xlrec.parentBlk = 2;
    1624                 :             : 
    1625                 :           1 :                 START_CRIT_SECTION();
    1626                 :             : 
    1627                 :             :                 /* insert new ... */
    1628                 :           1 :                 xlrec.offnumNew = current->offnum =
    1629                 :           2 :                         SpGistPageAddNewItem(state, current->page,
    1630                 :           1 :                                                                  newInnerTuple, newInnerTuple->size,
    1631                 :             :                                                                  NULL, false);
    1632                 :             : 
    1633                 :           1 :                 MarkBufferDirty(current->buffer);
    1634                 :             : 
    1635                 :             :                 /* update parent's downlink and mark parent page dirty */
    1636                 :           1 :                 saveNodeLink(index, parent, current->blkno, current->offnum);
    1637                 :             : 
    1638                 :             :                 /*
    1639                 :             :                  * Replace old tuple with a placeholder or redirection tuple.  Unless
    1640                 :             :                  * doing an index build, we have to insert a redirection tuple for
    1641                 :             :                  * possible concurrent scans.  We can't just delete it in any case,
    1642                 :             :                  * because that could change the offsets of other tuples on the page,
    1643                 :             :                  * breaking downlinks from their parents.
    1644                 :             :                  */
    1645         [ -  + ]:           1 :                 if (state->isBuild)
    1646                 :           0 :                         dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
    1647                 :             :                                                                   InvalidBlockNumber, InvalidOffsetNumber);
    1648                 :             :                 else
    1649                 :           2 :                         dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
    1650                 :           1 :                                                                   current->blkno, current->offnum);
    1651                 :             : 
    1652                 :           1 :                 PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
    1653                 :           1 :                 if (PageAddItem(saveCurrent.page, dt, dt->size,
    1654                 :             :                                                 saveCurrent.offnum,
    1655         [ +  - ]:           1 :                                                 false, false) != saveCurrent.offnum)
    1656   [ #  #  #  # ]:           0 :                         elog(ERROR, "failed to add item of size %u to SPGiST index page",
    1657                 :             :                                  dt->size);
    1658                 :             : 
    1659         [ -  + ]:           1 :                 if (state->isBuild)
    1660                 :           0 :                         SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
    1661                 :             :                 else
    1662                 :           1 :                         SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
    1663                 :             : 
    1664                 :           1 :                 MarkBufferDirty(saveCurrent.buffer);
    1665                 :             : 
    1666   [ +  -  +  -  :           1 :                 if (RelationNeedsWAL(index) && !state->isBuild)
             +  -  +  + ]
    1667                 :             :                 {
    1668                 :           1 :                         XLogRecPtr      recptr;
    1669                 :           1 :                         int                     flags;
    1670                 :             : 
    1671                 :           1 :                         XLogBeginInsert();
    1672                 :             : 
    1673                 :             :                         /* orig page */
    1674                 :           1 :                         XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
    1675                 :             :                         /* new page */
    1676                 :           1 :                         flags = REGBUF_STANDARD;
    1677         [ -  + ]:           1 :                         if (xlrec.newPage)
    1678                 :           1 :                                 flags |= REGBUF_WILL_INIT;
    1679                 :           1 :                         XLogRegisterBuffer(1, current->buffer, flags);
    1680                 :             :                         /* parent page (if different from orig and new) */
    1681         [ -  + ]:           1 :                         if (xlrec.parentBlk == 2)
    1682                 :           1 :                                 XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
    1683                 :             : 
    1684                 :           1 :                         XLogRegisterData(&xlrec, sizeof(xlrec));
    1685                 :           1 :                         XLogRegisterData(newInnerTuple, newInnerTuple->size);
    1686                 :             : 
    1687                 :           1 :                         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
    1688                 :             : 
    1689                 :             :                         /* we don't bother to check if any of these are redundant */
    1690                 :           1 :                         PageSetLSN(current->page, recptr);
    1691                 :           1 :                         PageSetLSN(parent->page, recptr);
    1692                 :           1 :                         PageSetLSN(saveCurrent.page, recptr);
    1693                 :           1 :                 }
    1694                 :             : 
    1695         [ +  - ]:           3 :                 END_CRIT_SECTION();
    1696                 :             : 
    1697                 :             :                 /* Release saveCurrent if it's not same as current or parent */
    1698   [ +  +  -  + ]:           3 :                 if (saveCurrent.buffer != current->buffer &&
    1699                 :           1 :                         saveCurrent.buffer != parent->buffer)
    1700                 :             :                 {
    1701                 :           1 :                         SpGistSetLastUsedPage(index, saveCurrent.buffer);
    1702                 :           1 :                         UnlockReleaseBuffer(saveCurrent.buffer);
    1703                 :           1 :                 }
    1704                 :           3 :         }
    1705                 :         230 : }
    1706                 :             : 
    1707                 :             : /*
    1708                 :             :  * spgSplitNode action: split inner tuple at current into prefix and postfix
    1709                 :             :  */
    1710                 :             : static void
    1711                 :         106 : spgSplitNodeAction(Relation index, SpGistState *state,
    1712                 :             :                                    SpGistInnerTuple innerTuple,
    1713                 :             :                                    SPPageDesc *current, spgChooseOut *out)
    1714                 :             : {
    1715                 :         106 :         SpGistInnerTuple prefixTuple,
    1716                 :             :                                 postfixTuple;
    1717                 :         106 :         SpGistNodeTuple node,
    1718                 :             :                            *nodes;
    1719                 :         106 :         BlockNumber postfixBlkno;
    1720                 :         106 :         OffsetNumber postfixOffset;
    1721                 :         106 :         int                     i;
    1722                 :         106 :         spgxlogSplitTuple xlrec;
    1723                 :         106 :         Buffer          newBuffer = InvalidBuffer;
    1724                 :             : 
    1725                 :             :         /* Should not be applied to nulls */
    1726         [ +  - ]:         106 :         Assert(!SpGistPageStoresNulls(current->page));
    1727                 :             : 
    1728                 :             :         /* Check opclass gave us sane values */
    1729         [ +  - ]:         106 :         if (out->result.splitTuple.prefixNNodes <= 0 ||
    1730                 :         106 :                 out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
    1731   [ #  #  #  # ]:           0 :                 elog(ERROR, "invalid number of prefix nodes: %d",
    1732                 :             :                          out->result.splitTuple.prefixNNodes);
    1733         [ +  - ]:         106 :         if (out->result.splitTuple.childNodeN < 0 ||
    1734                 :         212 :                 out->result.splitTuple.childNodeN >=
    1735                 :         106 :                 out->result.splitTuple.prefixNNodes)
    1736   [ #  #  #  # ]:           0 :                 elog(ERROR, "invalid child node number: %d",
    1737                 :             :                          out->result.splitTuple.childNodeN);
    1738                 :             : 
    1739                 :             :         /*
    1740                 :             :          * Construct new prefix tuple with requested number of nodes.  We'll fill
    1741                 :             :          * in the childNodeN'th node's downlink below.
    1742                 :             :          */
    1743                 :         106 :         nodes = palloc_array(SpGistNodeTuple, out->result.splitTuple.prefixNNodes);
    1744                 :             : 
    1745         [ +  + ]:         212 :         for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
    1746                 :             :         {
    1747                 :         106 :                 Datum           label = (Datum) 0;
    1748                 :         106 :                 bool            labelisnull;
    1749                 :             : 
    1750                 :         106 :                 labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
    1751         [ -  + ]:         106 :                 if (!labelisnull)
    1752                 :         106 :                         label = out->result.splitTuple.prefixNodeLabels[i];
    1753                 :         106 :                 nodes[i] = spgFormNodeTuple(state, label, labelisnull);
    1754                 :         106 :         }
    1755                 :             : 
    1756                 :         212 :         prefixTuple = spgFormInnerTuple(state,
    1757                 :         106 :                                                                         out->result.splitTuple.prefixHasPrefix,
    1758                 :         106 :                                                                         out->result.splitTuple.prefixPrefixDatum,
    1759                 :         106 :                                                                         out->result.splitTuple.prefixNNodes,
    1760                 :         106 :                                                                         nodes);
    1761                 :             : 
    1762                 :             :         /* it must fit in the space that innerTuple now occupies */
    1763         [ +  - ]:         106 :         if (prefixTuple->size > innerTuple->size)
    1764   [ #  #  #  # ]:           0 :                 elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
    1765                 :             : 
    1766                 :             :         /*
    1767                 :             :          * Construct new postfix tuple, containing all nodes of innerTuple with
    1768                 :             :          * same node datums, but with the prefix specified by the picksplit
    1769                 :             :          * function.
    1770                 :             :          */
    1771                 :         106 :         nodes = palloc_array(SpGistNodeTuple, innerTuple->nNodes);
    1772         [ +  + ]:         368 :         SGITITERATE(innerTuple, i, node)
    1773                 :             :         {
    1774                 :         262 :                 nodes[i] = node;
    1775                 :         262 :         }
    1776                 :             : 
    1777                 :         212 :         postfixTuple = spgFormInnerTuple(state,
    1778                 :         106 :                                                                          out->result.splitTuple.postfixHasPrefix,
    1779                 :         106 :                                                                          out->result.splitTuple.postfixPrefixDatum,
    1780                 :         106 :                                                                          innerTuple->nNodes, nodes);
    1781                 :             : 
    1782                 :             :         /* Postfix tuple is allTheSame if original tuple was */
    1783                 :         106 :         postfixTuple->allTheSame = innerTuple->allTheSame;
    1784                 :             : 
    1785                 :             :         /* prep data for WAL record */
    1786                 :         106 :         xlrec.newPage = false;
    1787                 :             : 
    1788                 :             :         /*
    1789                 :             :          * If we can't fit both tuples on the current page, get a new page for the
    1790                 :             :          * postfix tuple.  In particular, can't split to the root page.
    1791                 :             :          *
    1792                 :             :          * For the space calculation, note that prefixTuple replaces innerTuple
    1793                 :             :          * but postfixTuple will be a new entry.
    1794                 :             :          */
    1795   [ +  +  -  +  :         210 :         if (SpGistBlockIsRoot(current->blkno) ||
                   +  + ]
    1796         [ -  + ]:         104 :                 SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
    1797                 :         104 :                 prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
    1798                 :             :         {
    1799                 :             :                 /*
    1800                 :             :                  * Choose page with next triple parity, because postfix tuple is a
    1801                 :             :                  * child of prefix one
    1802                 :             :                  */
    1803                 :          56 :                 newBuffer = SpGistGetBuffer(index,
    1804                 :          28 :                                                                         GBUF_INNER_PARITY(current->blkno + 1),
    1805                 :          28 :                                                                         postfixTuple->size + sizeof(ItemIdData),
    1806                 :          28 :                                                                         &xlrec.newPage);
    1807                 :          28 :         }
    1808                 :             : 
    1809                 :         106 :         START_CRIT_SECTION();
    1810                 :             : 
    1811                 :             :         /*
    1812                 :             :          * Replace old tuple by prefix tuple
    1813                 :             :          */
    1814                 :         106 :         PageIndexTupleDelete(current->page, current->offnum);
    1815                 :         106 :         xlrec.offnumPrefix = PageAddItem(current->page,
    1816                 :             :                                                                          prefixTuple, prefixTuple->size,
    1817                 :             :                                                                          current->offnum, false, false);
    1818         [ +  - ]:         106 :         if (xlrec.offnumPrefix != current->offnum)
    1819   [ #  #  #  # ]:           0 :                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
    1820                 :             :                          prefixTuple->size);
    1821                 :             : 
    1822                 :             :         /*
    1823                 :             :          * put postfix tuple into appropriate page
    1824                 :             :          */
    1825         [ +  + ]:         106 :         if (newBuffer == InvalidBuffer)
    1826                 :             :         {
    1827                 :          78 :                 postfixBlkno = current->blkno;
    1828                 :          78 :                 xlrec.offnumPostfix = postfixOffset =
    1829                 :         156 :                         SpGistPageAddNewItem(state, current->page,
    1830                 :          78 :                                                                  postfixTuple, postfixTuple->size,
    1831                 :             :                                                                  NULL, false);
    1832                 :          78 :                 xlrec.postfixBlkSame = true;
    1833                 :          78 :         }
    1834                 :             :         else
    1835                 :             :         {
    1836                 :          28 :                 postfixBlkno = BufferGetBlockNumber(newBuffer);
    1837                 :          28 :                 xlrec.offnumPostfix = postfixOffset =
    1838                 :          56 :                         SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
    1839                 :          28 :                                                                  postfixTuple, postfixTuple->size,
    1840                 :             :                                                                  NULL, false);
    1841                 :          28 :                 MarkBufferDirty(newBuffer);
    1842                 :          28 :                 xlrec.postfixBlkSame = false;
    1843                 :             :         }
    1844                 :             : 
    1845                 :             :         /*
    1846                 :             :          * And set downlink pointer in the prefix tuple to point to postfix tuple.
    1847                 :             :          * (We can't avoid this step by doing the above two steps in opposite
    1848                 :             :          * order, because there might not be enough space on the page to insert
    1849                 :             :          * the postfix tuple first.)  We have to update the local copy of the
    1850                 :             :          * prefixTuple too, because that's what will be written to WAL.
    1851                 :             :          */
    1852                 :         212 :         spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
    1853                 :         106 :                                           postfixBlkno, postfixOffset);
    1854                 :         212 :         prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
    1855                 :         106 :                                                                                                  PageGetItemId(current->page, current->offnum));
    1856                 :         212 :         spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
    1857                 :         106 :                                           postfixBlkno, postfixOffset);
    1858                 :             : 
    1859                 :         106 :         MarkBufferDirty(current->buffer);
    1860                 :             : 
    1861   [ +  -  +  -  :         106 :         if (RelationNeedsWAL(index) && !state->isBuild)
             +  +  +  + ]
    1862                 :             :         {
    1863                 :         101 :                 XLogRecPtr      recptr;
    1864                 :             : 
    1865                 :         101 :                 XLogBeginInsert();
    1866                 :         101 :                 XLogRegisterData(&xlrec, sizeof(xlrec));
    1867                 :         101 :                 XLogRegisterData(prefixTuple, prefixTuple->size);
    1868                 :         101 :                 XLogRegisterData(postfixTuple, postfixTuple->size);
    1869                 :             : 
    1870                 :         101 :                 XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
    1871         [ +  + ]:         101 :                 if (newBuffer != InvalidBuffer)
    1872                 :             :                 {
    1873                 :          27 :                         int                     flags;
    1874                 :             : 
    1875                 :          27 :                         flags = REGBUF_STANDARD;
    1876         [ +  + ]:          27 :                         if (xlrec.newPage)
    1877                 :           1 :                                 flags |= REGBUF_WILL_INIT;
    1878                 :          27 :                         XLogRegisterBuffer(1, newBuffer, flags);
    1879                 :          27 :                 }
    1880                 :             : 
    1881                 :         101 :                 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
    1882                 :             : 
    1883                 :         101 :                 PageSetLSN(current->page, recptr);
    1884                 :             : 
    1885         [ +  + ]:         101 :                 if (newBuffer != InvalidBuffer)
    1886                 :             :                 {
    1887                 :          27 :                         PageSetLSN(BufferGetPage(newBuffer), recptr);
    1888                 :          27 :                 }
    1889                 :         101 :         }
    1890                 :             : 
    1891         [ +  - ]:         308 :         END_CRIT_SECTION();
    1892                 :             : 
    1893                 :             :         /* Update local free-space cache and release buffer */
    1894         [ +  + ]:         308 :         if (newBuffer != InvalidBuffer)
    1895                 :             :         {
    1896                 :          28 :                 SpGistSetLastUsedPage(index, newBuffer);
    1897                 :          28 :                 UnlockReleaseBuffer(newBuffer);
    1898                 :          28 :         }
    1899                 :         308 : }
    1900                 :             : 
    1901                 :             : /*
    1902                 :             :  * Insert one item into the index.
    1903                 :             :  *
    1904                 :             :  * Returns true on success, false if we failed to complete the insertion
    1905                 :             :  * (typically because of conflict with a concurrent insert).  In the latter
    1906                 :             :  * case, caller should re-call spgdoinsert() with the same args.
    1907                 :             :  */
    1908                 :             : bool
    1909                 :      131932 : spgdoinsert(Relation index, SpGistState *state,
    1910                 :             :                         const ItemPointerData *heapPtr, const Datum *datums, const bool *isnulls)
    1911                 :             : {
    1912                 :      131932 :         bool            result = true;
    1913                 :      131932 :         TupleDesc       leafDescriptor = state->leafTupDesc;
    1914                 :      131932 :         bool            isnull = isnulls[spgKeyColumn];
    1915                 :      131932 :         int                     level = 0;
    1916                 :      131932 :         Datum           leafDatums[INDEX_MAX_KEYS];
    1917                 :      131932 :         int                     leafSize;
    1918                 :      131932 :         int                     bestLeafSize;
    1919                 :      131932 :         int                     numNoProgressCycles = 0;
    1920                 :      131932 :         SPPageDesc      current,
    1921                 :             :                                 parent;
    1922                 :      131932 :         FmgrInfo   *procinfo = NULL;
    1923                 :             : 
    1924                 :             :         /*
    1925                 :             :          * Look up FmgrInfo of the user-defined choose function once, to save
    1926                 :             :          * cycles in the loop below.
    1927                 :             :          */
    1928         [ +  + ]:      131932 :         if (!isnull)
    1929                 :      131917 :                 procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
    1930                 :             : 
    1931                 :             :         /*
    1932                 :             :          * Prepare the leaf datum to insert.
    1933                 :             :          *
    1934                 :             :          * If an optional "compress" method is provided, then call it to form the
    1935                 :             :          * leaf key datum from the input datum.  Otherwise, store the input datum
    1936                 :             :          * as is.  Since we don't use index_form_tuple in this AM, we have to make
    1937                 :             :          * sure value to be inserted is not toasted; FormIndexDatum doesn't
    1938                 :             :          * guarantee that.  But we assume the "compress" method to return an
    1939                 :             :          * untoasted value.
    1940                 :             :          */
    1941         [ +  + ]:      131932 :         if (!isnull)
    1942                 :             :         {
    1943         [ +  + ]:      131917 :                 if (OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
    1944                 :             :                 {
    1945                 :       11000 :                         FmgrInfo   *compressProcinfo = NULL;
    1946                 :             : 
    1947                 :       11000 :                         compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
    1948                 :       11000 :                         leafDatums[spgKeyColumn] =
    1949                 :       22000 :                                 FunctionCall1Coll(compressProcinfo,
    1950                 :       11000 :                                                                   index->rd_indcollation[spgKeyColumn],
    1951                 :       11000 :                                                                   datums[spgKeyColumn]);
    1952                 :       11000 :                 }
    1953                 :             :                 else
    1954                 :             :                 {
    1955         [ +  - ]:      120917 :                         Assert(state->attLeafType.type == state->attType.type);
    1956                 :             : 
    1957         [ +  + ]:      120917 :                         if (state->attType.attlen == -1)
    1958                 :       29872 :                                 leafDatums[spgKeyColumn] =
    1959                 :       29872 :                                         PointerGetDatum(PG_DETOAST_DATUM(datums[spgKeyColumn]));
    1960                 :             :                         else
    1961                 :       91045 :                                 leafDatums[spgKeyColumn] = datums[spgKeyColumn];
    1962                 :             :                 }
    1963                 :      131917 :         }
    1964                 :             :         else
    1965                 :          15 :                 leafDatums[spgKeyColumn] = (Datum) 0;
    1966                 :             : 
    1967                 :             :         /* Likewise, ensure that any INCLUDE values are not toasted */
    1968         [ +  + ]:      142935 :         for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++)
    1969                 :             :         {
    1970         [ +  + ]:       11003 :                 if (!isnulls[i])
    1971                 :             :                 {
    1972         [ +  - ]:       11000 :                         if (TupleDescCompactAttr(leafDescriptor, i)->attlen == -1)
    1973                 :           0 :                                 leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i]));
    1974                 :             :                         else
    1975                 :       11000 :                                 leafDatums[i] = datums[i];
    1976                 :       11000 :                 }
    1977                 :             :                 else
    1978                 :           3 :                         leafDatums[i] = (Datum) 0;
    1979                 :       11003 :         }
    1980                 :             : 
    1981                 :             :         /*
    1982                 :             :          * Compute space needed for a leaf tuple containing the given data.
    1983                 :             :          */
    1984                 :      131932 :         leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls);
    1985                 :             :         /* Account for an item pointer, too */
    1986                 :      131932 :         leafSize += sizeof(ItemIdData);
    1987                 :             : 
    1988                 :             :         /*
    1989                 :             :          * If it isn't gonna fit, and the opclass can't reduce the datum size by
    1990                 :             :          * suffixing, bail out now rather than doing a lot of useless work.
    1991                 :             :          */
    1992         [ +  - ]:      131932 :         if (leafSize > SPGIST_PAGE_CAPACITY &&
    1993         [ #  # ]:           0 :                 (isnull || !state->config.longValuesOK))
    1994   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1995                 :             :                                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
    1996                 :             :                                  errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
    1997                 :             :                                                 leafSize - sizeof(ItemIdData),
    1998                 :             :                                                 SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
    1999                 :             :                                                 RelationGetRelationName(index)),
    2000                 :             :                                  errhint("Values larger than a buffer page cannot be indexed.")));
    2001                 :      131932 :         bestLeafSize = leafSize;
    2002                 :             : 
    2003                 :             :         /* Initialize "current" to the appropriate root page */
    2004                 :      131932 :         current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
    2005                 :      131932 :         current.buffer = InvalidBuffer;
    2006                 :      131932 :         current.page = NULL;
    2007                 :      131932 :         current.offnum = FirstOffsetNumber;
    2008                 :      131932 :         current.node = -1;
    2009                 :             : 
    2010                 :             :         /* "parent" is invalid for the moment */
    2011                 :      131932 :         parent.blkno = InvalidBlockNumber;
    2012                 :      131932 :         parent.buffer = InvalidBuffer;
    2013                 :      131932 :         parent.page = NULL;
    2014                 :      131932 :         parent.offnum = InvalidOffsetNumber;
    2015                 :      131932 :         parent.node = -1;
    2016                 :             : 
    2017                 :             :         /*
    2018                 :             :          * Before entering the loop, try to clear any pending interrupt condition.
    2019                 :             :          * If a query cancel is pending, we might as well accept it now not later;
    2020                 :             :          * while if a non-canceling condition is pending, servicing it here avoids
    2021                 :             :          * having to restart the insertion and redo all the work so far.
    2022                 :             :          */
    2023         [ +  - ]:      131932 :         CHECK_FOR_INTERRUPTS();
    2024                 :             : 
    2025                 :     3237982 :         for (;;)
    2026                 :             :         {
    2027                 :     3237982 :                 bool            isNew = false;
    2028                 :             : 
    2029                 :             :                 /*
    2030                 :             :                  * Bail out if query cancel is pending.  We must have this somewhere
    2031                 :             :                  * in the loop since a broken opclass could produce an infinite
    2032                 :             :                  * picksplit loop.  However, because we'll be holding buffer lock(s)
    2033                 :             :                  * after the first iteration, ProcessInterrupts() wouldn't be able to
    2034                 :             :                  * throw a cancel error here.  Hence, if we see that an interrupt is
    2035                 :             :                  * pending, break out of the loop and deal with the situation below.
    2036                 :             :                  * Set result = false because we must restart the insertion if the
    2037                 :             :                  * interrupt isn't a query-cancel-or-die case.
    2038                 :             :                  */
    2039         [ -  + ]:     3237982 :                 if (INTERRUPTS_PENDING_CONDITION())
    2040                 :             :                 {
    2041                 :           0 :                         result = false;
    2042                 :           0 :                         break;
    2043                 :             :                 }
    2044                 :             : 
    2045         [ +  + ]:     3237982 :                 if (current.blkno == InvalidBlockNumber)
    2046                 :             :                 {
    2047                 :             :                         /*
    2048                 :             :                          * Create a leaf page.  If leafSize is too large to fit on a page,
    2049                 :             :                          * we won't actually use the page yet, but it simplifies the API
    2050                 :             :                          * for doPickSplit to always have a leaf page at hand; so just
    2051                 :             :                          * quietly limit our request to a page size.
    2052                 :             :                          */
    2053                 :         304 :                         current.buffer =
    2054                 :         608 :                                 SpGistGetBuffer(index,
    2055                 :         304 :                                                                 GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
    2056         [ +  - ]:         304 :                                                                 Min(leafSize, SPGIST_PAGE_CAPACITY),
    2057                 :             :                                                                 &isNew);
    2058                 :         304 :                         current.blkno = BufferGetBlockNumber(current.buffer);
    2059                 :         304 :                 }
    2060         [ +  + ]:     3237678 :                 else if (parent.buffer == InvalidBuffer)
    2061                 :             :                 {
    2062                 :             :                         /* we hold no parent-page lock, so no deadlock is possible */
    2063                 :      131932 :                         current.buffer = ReadBuffer(index, current.blkno);
    2064                 :      131932 :                         LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
    2065                 :      131932 :                 }
    2066         [ +  + ]:     3105746 :                 else if (current.blkno != parent.blkno)
    2067                 :             :                 {
    2068                 :             :                         /* descend to a new child page */
    2069                 :      258568 :                         current.buffer = ReadBuffer(index, current.blkno);
    2070                 :             : 
    2071                 :             :                         /*
    2072                 :             :                          * Attempt to acquire lock on child page.  We must beware of
    2073                 :             :                          * deadlock against another insertion process descending from that
    2074                 :             :                          * page to our parent page (see README).  If we fail to get lock,
    2075                 :             :                          * abandon the insertion and tell our caller to start over.
    2076                 :             :                          *
    2077                 :             :                          * XXX this could be improved, because failing to get lock on a
    2078                 :             :                          * buffer is not proof of a deadlock situation; the lock might be
    2079                 :             :                          * held by a reader, or even just background writer/checkpointer
    2080                 :             :                          * process.  Perhaps it'd be worth retrying after sleeping a bit?
    2081                 :             :                          */
    2082         [ +  - ]:      258568 :                         if (!ConditionalLockBuffer(current.buffer))
    2083                 :             :                         {
    2084                 :           0 :                                 ReleaseBuffer(current.buffer);
    2085                 :           0 :                                 UnlockReleaseBuffer(parent.buffer);
    2086                 :           0 :                                 return false;
    2087                 :             :                         }
    2088                 :      258568 :                 }
    2089                 :             :                 else
    2090                 :             :                 {
    2091                 :             :                         /* inner tuple can be stored on the same page as parent one */
    2092                 :     2847178 :                         current.buffer = parent.buffer;
    2093                 :             :                 }
    2094                 :     3237982 :                 current.page = BufferGetPage(current.buffer);
    2095                 :             : 
    2096                 :             :                 /* should not arrive at a page of the wrong type */
    2097   [ +  +  +  - ]:     3237982 :                 if (isnull ? !SpGistPageStoresNulls(current.page) :
    2098                 :     3237967 :                         SpGistPageStoresNulls(current.page))
    2099   [ #  #  #  # ]:           0 :                         elog(ERROR, "SPGiST index page %u has wrong nulls flag",
    2100                 :             :                                  current.blkno);
    2101                 :             : 
    2102         [ +  + ]:     3237982 :                 if (SpGistPageIsLeaf(current.page))
    2103                 :             :                 {
    2104                 :      131968 :                         SpGistLeafTuple leafTuple;
    2105                 :      131968 :                         int                     nToSplit,
    2106                 :             :                                                 sizeToSplit;
    2107                 :             : 
    2108                 :      131968 :                         leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls);
    2109         [ +  + ]:      263936 :                         if (leafTuple->size + sizeof(ItemIdData) <=
    2110         [ +  + ]:      131968 :                                 SpGistPageGetFreeSpace(current.page, 1))
    2111                 :             :                         {
    2112                 :             :                                 /* it fits on page, so insert it and we're done */
    2113                 :      261378 :                                 addLeafTuple(index, state, leafTuple,
    2114                 :      130689 :                                                          &current, &parent, isnull, isNew);
    2115                 :      130689 :                                 break;
    2116                 :             :                         }
    2117                 :        1279 :                         else if ((sizeToSplit =
    2118                 :        1279 :                                           checkSplitConditions(index, state, &current,
    2119         [ +  + ]:        1279 :                                                                                    &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
    2120   [ +  +  -  + ]:         582 :                                          nToSplit < 64 &&
    2121                 :         357 :                                          leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
    2122                 :             :                         {
    2123                 :             :                                 /*
    2124                 :             :                                  * the amount of data is pretty small, so just move the whole
    2125                 :             :                                  * chain to another leaf page rather than splitting it.
    2126                 :             :                                  */
    2127         [ +  - ]:         357 :                                 Assert(!isNew);
    2128                 :         357 :                                 moveLeafs(index, state, &current, &parent, leafTuple, isnull);
    2129                 :         357 :                                 break;                  /* we're done */
    2130                 :             :                         }
    2131                 :             :                         else
    2132                 :             :                         {
    2133                 :             :                                 /* picksplit */
    2134   [ +  +  +  + ]:        1844 :                                 if (doPickSplit(index, state, &current, &parent,
    2135                 :         922 :                                                                 leafTuple, level, isnull, isNew))
    2136                 :         886 :                                         break;          /* doPickSplit installed new tuples */
    2137                 :             : 
    2138                 :             :                                 /* leaf tuple will not be inserted yet */
    2139                 :          36 :                                 pfree(leafTuple);
    2140                 :             : 
    2141                 :             :                                 /*
    2142                 :             :                                  * current now describes new inner tuple, go insert into it
    2143                 :             :                                  */
    2144         [ -  + ]:          36 :                                 Assert(!SpGistPageIsLeaf(current.page));
    2145                 :          36 :                                 goto process_inner_tuple;
    2146                 :             :                         }
    2147         [ +  + ]:      131968 :                 }
    2148                 :             :                 else                                    /* non-leaf page */
    2149                 :             :                 {
    2150                 :             :                         /*
    2151                 :             :                          * Apply the opclass choose function to figure out how to insert
    2152                 :             :                          * the given datum into the current inner tuple.
    2153                 :             :                          */
    2154                 :             :                         SpGistInnerTuple innerTuple;
    2155                 :             :                         spgChooseIn in;
    2156                 :     3106014 :                         spgChooseOut out;
    2157                 :             : 
    2158                 :             :                         /*
    2159                 :             :                          * spgAddNode and spgSplitTuple cases will loop back to here to
    2160                 :             :                          * complete the insertion operation.  Just in case the choose
    2161                 :             :                          * function is broken and produces add or split requests
    2162                 :             :                          * repeatedly, check for query cancel (see comments above).
    2163                 :             :                          */
    2164                 :             :         process_inner_tuple:
    2165         [ +  - ]:     3106384 :                         if (INTERRUPTS_PENDING_CONDITION())
    2166                 :             :                         {
    2167                 :           0 :                                 result = false;
    2168                 :           0 :                                 break;
    2169                 :             :                         }
    2170                 :             : 
    2171                 :     6212768 :                         innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
    2172                 :     3106384 :                                                                                                                 PageGetItemId(current.page, current.offnum));
    2173                 :             : 
    2174                 :     3106384 :                         in.datum = datums[spgKeyColumn];
    2175                 :     3106384 :                         in.leafDatum = leafDatums[spgKeyColumn];
    2176                 :     3106384 :                         in.level = level;
    2177                 :     3106384 :                         in.allTheSame = innerTuple->allTheSame;
    2178                 :     3106384 :                         in.hasPrefix = (innerTuple->prefixSize > 0);
    2179   [ +  +  +  + ]:     3106384 :                         in.prefixDatum = SGITDATUM(innerTuple, state);
    2180                 :     3106384 :                         in.nNodes = innerTuple->nNodes;
    2181                 :     3106384 :                         in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
    2182                 :             : 
    2183                 :     3106384 :                         memset(&out, 0, sizeof(out));
    2184                 :             : 
    2185         [ -  + ]:     3106384 :                         if (!isnull)
    2186                 :             :                         {
    2187                 :             :                                 /* use user-defined choose method */
    2188                 :     6212768 :                                 FunctionCall2Coll(procinfo,
    2189                 :     3106384 :                                                                   index->rd_indcollation[0],
    2190                 :     3106384 :                                                                   PointerGetDatum(&in),
    2191                 :     3106384 :                                                                   PointerGetDatum(&out));
    2192                 :     3106384 :                         }
    2193                 :             :                         else
    2194                 :             :                         {
    2195                 :             :                                 /* force "match" action (to insert to random subnode) */
    2196                 :           0 :                                 out.resultType = spgMatchNode;
    2197                 :             :                         }
    2198                 :             : 
    2199         [ +  + ]:     3106384 :                         if (innerTuple->allTheSame)
    2200                 :             :                         {
    2201                 :             :                                 /*
    2202                 :             :                                  * It's not allowed to do an AddNode at an allTheSame tuple.
    2203                 :             :                                  * Opclass must say "match", in which case we choose a random
    2204                 :             :                                  * one of the nodes to descend into, or "split".
    2205                 :             :                                  */
    2206         [ -  + ]:       23799 :                                 if (out.resultType == spgAddNode)
    2207   [ #  #  #  # ]:           0 :                                         elog(ERROR, "cannot add a node to an allTheSame inner tuple");
    2208         [ +  + ]:       23799 :                                 else if (out.resultType == spgMatchNode)
    2209                 :       23797 :                                         out.result.matchNode.nodeN =
    2210                 :       23797 :                                                 pg_prng_uint64_range(&pg_global_prng_state,
    2211                 :       23797 :                                                                                          0, innerTuple->nNodes - 1);
    2212                 :       23799 :                         }
    2213                 :             : 
    2214   [ +  +  +  - ]:     3106384 :                         switch (out.resultType)
    2215                 :             :                         {
    2216                 :             :                                 case spgMatchNode:
    2217                 :             :                                         /* Descend to N'th child node */
    2218                 :     6212100 :                                         spgMatchNodeAction(index, state, innerTuple,
    2219                 :             :                                                                            &current, &parent,
    2220                 :     3106050 :                                                                            out.result.matchNode.nodeN);
    2221                 :             :                                         /* Adjust level as per opclass request */
    2222                 :     3106050 :                                         level += out.result.matchNode.levelAdd;
    2223                 :             :                                         /* Replace leafDatum and recompute leafSize */
    2224         [ -  + ]:     3106050 :                                         if (!isnull)
    2225                 :             :                                         {
    2226                 :     3106050 :                                                 leafDatums[spgKeyColumn] = out.result.matchNode.restDatum;
    2227                 :     6212100 :                                                 leafSize = SpGistGetLeafTupleSize(leafDescriptor,
    2228                 :     3106050 :                                                                                                                   leafDatums, isnulls);
    2229                 :     3106050 :                                                 leafSize += sizeof(ItemIdData);
    2230                 :     3106050 :                                         }
    2231                 :             : 
    2232                 :             :                                         /*
    2233                 :             :                                          * Check new tuple size; fail if it can't fit, unless the
    2234                 :             :                                          * opclass says it can handle the situation by suffixing.
    2235                 :             :                                          *
    2236                 :             :                                          * However, the opclass can only shorten the leaf datum,
    2237                 :             :                                          * which may not be enough to ever make the tuple fit,
    2238                 :             :                                          * since INCLUDE columns might alone use more than a page.
    2239                 :             :                                          * Depending on the opclass' behavior, that could lead to
    2240                 :             :                                          * an infinite loop --- spgtextproc.c, for example, will
    2241                 :             :                                          * just repeatedly generate an empty-string leaf datum
    2242                 :             :                                          * once it runs out of data.  Actual bugs in opclasses
    2243                 :             :                                          * might cause infinite looping, too.  To detect such a
    2244                 :             :                                          * loop, check to see if we are making progress by
    2245                 :             :                                          * reducing the leafSize in each pass.  This is a bit
    2246                 :             :                                          * tricky though.  Because of alignment considerations,
    2247                 :             :                                          * the total tuple size might not decrease on every pass.
    2248                 :             :                                          * Also, there are edge cases where the choose method
    2249                 :             :                                          * might seem to not make progress for a cycle or two.
    2250                 :             :                                          * Somewhat arbitrarily, we allow up to 10 no-progress
    2251                 :             :                                          * iterations before failing.  (This limit should be more
    2252                 :             :                                          * than MAXALIGN, to accommodate opclasses that trim one
    2253                 :             :                                          * byte from the leaf datum per pass.)
    2254                 :             :                                          */
    2255         [ +  - ]:     3106050 :                                         if (leafSize > SPGIST_PAGE_CAPACITY)
    2256                 :             :                                         {
    2257                 :           0 :                                                 bool            ok = false;
    2258                 :             : 
    2259   [ #  #  #  # ]:           0 :                                                 if (state->config.longValuesOK && !isnull)
    2260                 :             :                                                 {
    2261         [ #  # ]:           0 :                                                         if (leafSize < bestLeafSize)
    2262                 :             :                                                         {
    2263                 :           0 :                                                                 ok = true;
    2264                 :           0 :                                                                 bestLeafSize = leafSize;
    2265                 :           0 :                                                                 numNoProgressCycles = 0;
    2266                 :           0 :                                                         }
    2267         [ #  # ]:           0 :                                                         else if (++numNoProgressCycles < 10)
    2268                 :           0 :                                                                 ok = true;
    2269                 :           0 :                                                 }
    2270         [ #  # ]:           0 :                                                 if (!ok)
    2271   [ #  #  #  # ]:           0 :                                                         ereport(ERROR,
    2272                 :             :                                                                         (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
    2273                 :             :                                                                          errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
    2274                 :             :                                                                                         leafSize - sizeof(ItemIdData),
    2275                 :             :                                                                                         SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
    2276                 :             :                                                                                         RelationGetRelationName(index)),
    2277                 :             :                                                                          errhint("Values larger than a buffer page cannot be indexed.")));
    2278                 :           0 :                                         }
    2279                 :             : 
    2280                 :             :                                         /*
    2281                 :             :                                          * Loop around and attempt to insert the new leafDatum at
    2282                 :             :                                          * "current" (which might reference an existing child
    2283                 :             :                                          * tuple, or might be invalid to force us to find a new
    2284                 :             :                                          * page for the tuple).
    2285                 :             :                                          */
    2286                 :     3106050 :                                         break;
    2287                 :             :                                 case spgAddNode:
    2288                 :             :                                         /* AddNode is not sensible if nodes don't have labels */
    2289         [ +  - ]:         228 :                                         if (in.nodeLabels == NULL)
    2290   [ #  #  #  # ]:           0 :                                                 elog(ERROR, "cannot add a node to an inner tuple without node labels");
    2291                 :             :                                         /* Add node to inner tuple, per request */
    2292                 :         456 :                                         spgAddNodeAction(index, state, innerTuple,
    2293                 :             :                                                                          &current, &parent,
    2294                 :         228 :                                                                          out.result.addNode.nodeN,
    2295                 :         228 :                                                                          out.result.addNode.nodeLabel);
    2296                 :             : 
    2297                 :             :                                         /*
    2298                 :             :                                          * Retry insertion into the enlarged node.  We assume that
    2299                 :             :                                          * we'll get a MatchNode result this time.
    2300                 :             :                                          */
    2301                 :         228 :                                         goto process_inner_tuple;
    2302                 :             :                                         break;
    2303                 :             :                                 case spgSplitTuple:
    2304                 :             :                                         /* Split inner tuple, per request */
    2305                 :         106 :                                         spgSplitNodeAction(index, state, innerTuple,
    2306                 :             :                                                                            &current, &out);
    2307                 :             : 
    2308                 :             :                                         /* Retry insertion into the split node */
    2309                 :         106 :                                         goto process_inner_tuple;
    2310                 :             :                                         break;
    2311                 :             :                                 default:
    2312   [ #  #  #  # ]:           0 :                                         elog(ERROR, "unrecognized SPGiST choose result: %d",
    2313                 :             :                                                  (int) out.resultType);
    2314                 :           0 :                                         break;
    2315                 :             :                         }
    2316                 :             :                 }
    2317      [ -  +  + ]:     3237982 :         }                                                       /* end loop */
    2318                 :             : 
    2319                 :             :         /*
    2320                 :             :          * Release any buffers we're still holding.  Beware of possibility that
    2321                 :             :          * current and parent reference same buffer.
    2322                 :             :          */
    2323         [ +  - ]:      131932 :         if (current.buffer != InvalidBuffer)
    2324                 :             :         {
    2325                 :      131932 :                 SpGistSetLastUsedPage(index, current.buffer);
    2326                 :      131932 :                 UnlockReleaseBuffer(current.buffer);
    2327                 :      131932 :         }
    2328   [ +  +  +  + ]:      131932 :         if (parent.buffer != InvalidBuffer &&
    2329                 :      129264 :                 parent.buffer != current.buffer)
    2330                 :             :         {
    2331                 :      128496 :                 SpGistSetLastUsedPage(index, parent.buffer);
    2332                 :      128496 :                 UnlockReleaseBuffer(parent.buffer);
    2333                 :      128496 :         }
    2334                 :             : 
    2335                 :             :         /*
    2336                 :             :          * We do not support being called while some outer function is holding a
    2337                 :             :          * buffer lock (or any other reason to postpone query cancels).  If that
    2338                 :             :          * were the case, telling the caller to retry would create an infinite
    2339                 :             :          * loop.
    2340                 :             :          */
    2341         [ +  - ]:      131932 :         Assert(INTERRUPTS_CAN_BE_PROCESSED());
    2342                 :             : 
    2343                 :             :         /*
    2344                 :             :          * Finally, check for interrupts again.  If there was a query cancel,
    2345                 :             :          * ProcessInterrupts() will be able to throw the error here.  If it was
    2346                 :             :          * some other kind of interrupt that can just be cleared, return false to
    2347                 :             :          * tell our caller to retry.
    2348                 :             :          */
    2349         [ +  - ]:      131932 :         CHECK_FOR_INTERRUPTS();
    2350                 :             : 
    2351                 :      131932 :         return result;
    2352                 :      131932 : }
        

Generated by: LCOV version 2.3.2-1