Branch data Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * spgxlog.c
4 : : * WAL replay logic for SP-GiST
5 : : *
6 : : *
7 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
8 : : * Portions Copyright (c) 1994, Regents of the University of California
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/access/spgist/spgxlog.c
12 : : *
13 : : *-------------------------------------------------------------------------
14 : : */
15 : : #include "postgres.h"
16 : :
17 : : #include "access/bufmask.h"
18 : : #include "access/spgist_private.h"
19 : : #include "access/spgxlog.h"
20 : : #include "access/xlogutils.h"
21 : : #include "storage/standby.h"
22 : : #include "utils/memutils.h"
23 : :
24 : :
25 : : static MemoryContext opCtx; /* working memory for operations */
26 : :
27 : :
28 : : /*
29 : : * Prepare a dummy SpGistState, with just the minimum info needed for replay.
30 : : *
31 : : * At present, all we need is enough info to support spgFormDeadTuple(),
32 : : * plus the isBuild flag.
33 : : */
34 : : static void
35 : 0 : fillFakeState(SpGistState *state, spgxlogState stateSrc)
36 : : {
37 : 0 : memset(state, 0, sizeof(*state));
38 : :
39 : 0 : state->redirectXid = stateSrc.redirectXid;
40 : 0 : state->isBuild = stateSrc.isBuild;
41 : 0 : state->deadTupleStorage = palloc0(SGDTSIZE);
42 : 0 : }
43 : :
44 : : /*
45 : : * Add a leaf tuple, or replace an existing placeholder tuple. This is used
46 : : * to replay SpGistPageAddNewItem() operations. If the offset points at an
47 : : * existing tuple, it had better be a placeholder tuple.
48 : : */
49 : : static void
50 : 0 : addOrReplaceTuple(Page page, const void *tuple, int size, OffsetNumber offset)
51 : : {
52 [ # # ]: 0 : if (offset <= PageGetMaxOffsetNumber(page))
53 : : {
54 : 0 : SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
55 : 0 : PageGetItemId(page, offset));
56 : :
57 [ # # ]: 0 : if (dt->tupstate != SPGIST_PLACEHOLDER)
58 [ # # # # ]: 0 : elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
59 : :
60 [ # # ]: 0 : Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
61 : 0 : SpGistPageGetOpaque(page)->nPlaceholder--;
62 : :
63 : 0 : PageIndexTupleDelete(page, offset);
64 : 0 : }
65 : :
66 [ # # ]: 0 : Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
67 : :
68 [ # # ]: 0 : if (PageAddItem(page, tuple, size, offset, false, false) != offset)
69 [ # # # # ]: 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
70 : : size);
71 : 0 : }
72 : :
73 : : static void
74 : 0 : spgRedoAddLeaf(XLogReaderState *record)
75 : : {
76 : 0 : XLogRecPtr lsn = record->EndRecPtr;
77 : 0 : char *ptr = XLogRecGetData(record);
78 : 0 : spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
79 : 0 : char *leafTuple;
80 : 0 : SpGistLeafTupleData leafTupleHdr;
81 : 0 : Buffer buffer;
82 : 0 : Page page;
83 : 0 : XLogRedoAction action;
84 : :
85 : 0 : ptr += sizeof(spgxlogAddLeaf);
86 : 0 : leafTuple = ptr;
87 : : /* the leaf tuple is unaligned, so make a copy to access its header */
88 : 0 : memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
89 : :
90 : : /*
91 : : * In normal operation we would have both current and parent pages locked
92 : : * simultaneously; but in WAL replay it should be safe to update the leaf
93 : : * page before updating the parent.
94 : : */
95 [ # # ]: 0 : if (xldata->newPage)
96 : : {
97 : 0 : buffer = XLogInitBufferForRedo(record, 0);
98 : 0 : SpGistInitBuffer(buffer,
99 : 0 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
100 : 0 : action = BLK_NEEDS_REDO;
101 : 0 : }
102 : : else
103 : 0 : action = XLogReadBufferForRedo(record, 0, &buffer);
104 : :
105 [ # # ]: 0 : if (action == BLK_NEEDS_REDO)
106 : : {
107 : 0 : page = BufferGetPage(buffer);
108 : :
109 : : /* insert new tuple */
110 [ # # ]: 0 : if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
111 : : {
112 : : /* normal cases, tuple was added by SpGistPageAddNewItem */
113 : 0 : addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, xldata->offnumLeaf);
114 : :
115 : : /* update head tuple's chain link if needed */
116 [ # # ]: 0 : if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
117 : : {
118 : 0 : SpGistLeafTuple head;
119 : :
120 : 0 : head = (SpGistLeafTuple) PageGetItem(page,
121 : 0 : PageGetItemId(page, xldata->offnumHeadLeaf));
122 [ # # ]: 0 : Assert(SGLT_GET_NEXTOFFSET(head) == SGLT_GET_NEXTOFFSET(&leafTupleHdr));
123 : 0 : SGLT_SET_NEXTOFFSET(head, xldata->offnumLeaf);
124 : 0 : }
125 : 0 : }
126 : : else
127 : : {
128 : : /* replacing a DEAD tuple */
129 : 0 : PageIndexTupleDelete(page, xldata->offnumLeaf);
130 : 0 : if (PageAddItem(page,
131 : : leafTuple, leafTupleHdr.size,
132 [ # # ]: 0 : xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
133 [ # # # # ]: 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
134 : : leafTupleHdr.size);
135 : : }
136 : :
137 : 0 : PageSetLSN(page, lsn);
138 : 0 : MarkBufferDirty(buffer);
139 : 0 : }
140 [ # # ]: 0 : if (BufferIsValid(buffer))
141 : 0 : UnlockReleaseBuffer(buffer);
142 : :
143 : : /* update parent downlink if necessary */
144 [ # # ]: 0 : if (xldata->offnumParent != InvalidOffsetNumber)
145 : : {
146 [ # # ]: 0 : if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
147 : : {
148 : 0 : SpGistInnerTuple tuple;
149 : 0 : BlockNumber blknoLeaf;
150 : :
151 : 0 : XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
152 : :
153 : 0 : page = BufferGetPage(buffer);
154 : :
155 : 0 : tuple = (SpGistInnerTuple) PageGetItem(page,
156 : 0 : PageGetItemId(page, xldata->offnumParent));
157 : :
158 : 0 : spgUpdateNodeLink(tuple, xldata->nodeI,
159 : 0 : blknoLeaf, xldata->offnumLeaf);
160 : :
161 : 0 : PageSetLSN(page, lsn);
162 : 0 : MarkBufferDirty(buffer);
163 : 0 : }
164 [ # # ]: 0 : if (BufferIsValid(buffer))
165 : 0 : UnlockReleaseBuffer(buffer);
166 : 0 : }
167 : 0 : }
168 : :
169 : : static void
170 : 0 : spgRedoMoveLeafs(XLogReaderState *record)
171 : : {
172 : 0 : XLogRecPtr lsn = record->EndRecPtr;
173 : 0 : char *ptr = XLogRecGetData(record);
174 : 0 : spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
175 : 0 : SpGistState state;
176 : 0 : OffsetNumber *toDelete;
177 : 0 : OffsetNumber *toInsert;
178 : 0 : int nInsert;
179 : 0 : Buffer buffer;
180 : 0 : Page page;
181 : 0 : XLogRedoAction action;
182 : 0 : BlockNumber blknoDst;
183 : :
184 : 0 : XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
185 : :
186 : 0 : fillFakeState(&state, xldata->stateSrc);
187 : :
188 [ # # ]: 0 : nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
189 : :
190 : 0 : ptr += SizeOfSpgxlogMoveLeafs;
191 : 0 : toDelete = (OffsetNumber *) ptr;
192 : 0 : ptr += sizeof(OffsetNumber) * xldata->nMoves;
193 : 0 : toInsert = (OffsetNumber *) ptr;
194 : 0 : ptr += sizeof(OffsetNumber) * nInsert;
195 : :
196 : : /* now ptr points to the list of leaf tuples */
197 : :
198 : : /*
199 : : * In normal operation we would have all three pages (source, dest, and
200 : : * parent) locked simultaneously; but in WAL replay it should be safe to
201 : : * update them one at a time, as long as we do it in the right order.
202 : : */
203 : :
204 : : /* Insert tuples on the dest page (do first, so redirect is valid) */
205 [ # # ]: 0 : if (xldata->newPage)
206 : : {
207 : 0 : buffer = XLogInitBufferForRedo(record, 1);
208 : 0 : SpGistInitBuffer(buffer,
209 : 0 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
210 : 0 : action = BLK_NEEDS_REDO;
211 : 0 : }
212 : : else
213 : 0 : action = XLogReadBufferForRedo(record, 1, &buffer);
214 : :
215 [ # # ]: 0 : if (action == BLK_NEEDS_REDO)
216 : : {
217 : 0 : int i;
218 : :
219 : 0 : page = BufferGetPage(buffer);
220 : :
221 [ # # ]: 0 : for (i = 0; i < nInsert; i++)
222 : : {
223 : 0 : char *leafTuple;
224 : 0 : SpGistLeafTupleData leafTupleHdr;
225 : :
226 : : /*
227 : : * the tuples are not aligned, so must copy to access the size
228 : : * field.
229 : : */
230 : 0 : leafTuple = ptr;
231 : 0 : memcpy(&leafTupleHdr, leafTuple,
232 : : sizeof(SpGistLeafTupleData));
233 : :
234 : 0 : addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, toInsert[i]);
235 : 0 : ptr += leafTupleHdr.size;
236 : 0 : }
237 : :
238 : 0 : PageSetLSN(page, lsn);
239 : 0 : MarkBufferDirty(buffer);
240 : 0 : }
241 [ # # ]: 0 : if (BufferIsValid(buffer))
242 : 0 : UnlockReleaseBuffer(buffer);
243 : :
244 : : /* Delete tuples from the source page, inserting a redirection pointer */
245 [ # # ]: 0 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
246 : : {
247 : 0 : page = BufferGetPage(buffer);
248 : :
249 : 0 : spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
250 : 0 : state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
251 : : SPGIST_PLACEHOLDER,
252 : 0 : blknoDst,
253 : 0 : toInsert[nInsert - 1]);
254 : :
255 : 0 : PageSetLSN(page, lsn);
256 : 0 : MarkBufferDirty(buffer);
257 : 0 : }
258 [ # # ]: 0 : if (BufferIsValid(buffer))
259 : 0 : UnlockReleaseBuffer(buffer);
260 : :
261 : : /* And update the parent downlink */
262 [ # # ]: 0 : if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
263 : : {
264 : 0 : SpGistInnerTuple tuple;
265 : :
266 : 0 : page = BufferGetPage(buffer);
267 : :
268 : 0 : tuple = (SpGistInnerTuple) PageGetItem(page,
269 : 0 : PageGetItemId(page, xldata->offnumParent));
270 : :
271 : 0 : spgUpdateNodeLink(tuple, xldata->nodeI,
272 : 0 : blknoDst, toInsert[nInsert - 1]);
273 : :
274 : 0 : PageSetLSN(page, lsn);
275 : 0 : MarkBufferDirty(buffer);
276 : 0 : }
277 [ # # ]: 0 : if (BufferIsValid(buffer))
278 : 0 : UnlockReleaseBuffer(buffer);
279 : 0 : }
280 : :
281 : : static void
282 : 0 : spgRedoAddNode(XLogReaderState *record)
283 : : {
284 : 0 : XLogRecPtr lsn = record->EndRecPtr;
285 : 0 : char *ptr = XLogRecGetData(record);
286 : 0 : spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
287 : 0 : char *innerTuple;
288 : 0 : SpGistInnerTupleData innerTupleHdr;
289 : 0 : SpGistState state;
290 : 0 : Buffer buffer;
291 : 0 : Page page;
292 : 0 : XLogRedoAction action;
293 : :
294 : 0 : ptr += sizeof(spgxlogAddNode);
295 : 0 : innerTuple = ptr;
296 : : /* the tuple is unaligned, so make a copy to access its header */
297 : 0 : memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
298 : :
299 : 0 : fillFakeState(&state, xldata->stateSrc);
300 : :
301 [ # # # # ]: 0 : if (!XLogRecHasBlockRef(record, 1))
302 : : {
303 : : /* update in place */
304 [ # # ]: 0 : Assert(xldata->parentBlk == -1);
305 [ # # ]: 0 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
306 : : {
307 : 0 : page = BufferGetPage(buffer);
308 : :
309 : 0 : PageIndexTupleDelete(page, xldata->offnum);
310 : 0 : if (PageAddItem(page, innerTuple, innerTupleHdr.size,
311 : : xldata->offnum,
312 [ # # ]: 0 : false, false) != xldata->offnum)
313 [ # # # # ]: 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
314 : : innerTupleHdr.size);
315 : :
316 : 0 : PageSetLSN(page, lsn);
317 : 0 : MarkBufferDirty(buffer);
318 : 0 : }
319 [ # # ]: 0 : if (BufferIsValid(buffer))
320 : 0 : UnlockReleaseBuffer(buffer);
321 : 0 : }
322 : : else
323 : : {
324 : 0 : BlockNumber blkno;
325 : 0 : BlockNumber blknoNew;
326 : :
327 : 0 : XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
328 : 0 : XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
329 : :
330 : : /*
331 : : * In normal operation we would have all three pages (source, dest,
332 : : * and parent) locked simultaneously; but in WAL replay it should be
333 : : * safe to update them one at a time, as long as we do it in the right
334 : : * order. We must insert the new tuple before replacing the old tuple
335 : : * with the redirect tuple.
336 : : */
337 : :
338 : : /* Install new tuple first so redirect is valid */
339 [ # # ]: 0 : if (xldata->newPage)
340 : : {
341 : : /* AddNode is not used for nulls pages */
342 : 0 : buffer = XLogInitBufferForRedo(record, 1);
343 : 0 : SpGistInitBuffer(buffer, 0);
344 : 0 : action = BLK_NEEDS_REDO;
345 : 0 : }
346 : : else
347 : 0 : action = XLogReadBufferForRedo(record, 1, &buffer);
348 [ # # ]: 0 : if (action == BLK_NEEDS_REDO)
349 : : {
350 : 0 : page = BufferGetPage(buffer);
351 : :
352 : 0 : addOrReplaceTuple(page, innerTuple, innerTupleHdr.size, xldata->offnumNew);
353 : :
354 : : /*
355 : : * If parent is in this same page, update it now.
356 : : */
357 [ # # ]: 0 : if (xldata->parentBlk == 1)
358 : : {
359 : 0 : SpGistInnerTuple parentTuple;
360 : :
361 : 0 : parentTuple = (SpGistInnerTuple) PageGetItem(page,
362 : 0 : PageGetItemId(page, xldata->offnumParent));
363 : :
364 : 0 : spgUpdateNodeLink(parentTuple, xldata->nodeI,
365 : 0 : blknoNew, xldata->offnumNew);
366 : 0 : }
367 : 0 : PageSetLSN(page, lsn);
368 : 0 : MarkBufferDirty(buffer);
369 : 0 : }
370 [ # # ]: 0 : if (BufferIsValid(buffer))
371 : 0 : UnlockReleaseBuffer(buffer);
372 : :
373 : : /* Delete old tuple, replacing it with redirect or placeholder tuple */
374 [ # # ]: 0 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
375 : : {
376 : 0 : SpGistDeadTuple dt;
377 : :
378 : 0 : page = BufferGetPage(buffer);
379 : :
380 [ # # ]: 0 : if (state.isBuild)
381 : 0 : dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
382 : : InvalidBlockNumber,
383 : : InvalidOffsetNumber);
384 : : else
385 : 0 : dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
386 : 0 : blknoNew,
387 : 0 : xldata->offnumNew);
388 : :
389 : 0 : PageIndexTupleDelete(page, xldata->offnum);
390 : 0 : if (PageAddItem(page, dt, dt->size,
391 : : xldata->offnum,
392 [ # # ]: 0 : false, false) != xldata->offnum)
393 [ # # # # ]: 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
394 : : dt->size);
395 : :
396 [ # # ]: 0 : if (state.isBuild)
397 : 0 : SpGistPageGetOpaque(page)->nPlaceholder++;
398 : : else
399 : 0 : SpGistPageGetOpaque(page)->nRedirection++;
400 : :
401 : : /*
402 : : * If parent is in this same page, update it now.
403 : : */
404 [ # # ]: 0 : if (xldata->parentBlk == 0)
405 : : {
406 : 0 : SpGistInnerTuple parentTuple;
407 : :
408 : 0 : parentTuple = (SpGistInnerTuple) PageGetItem(page,
409 : 0 : PageGetItemId(page, xldata->offnumParent));
410 : :
411 : 0 : spgUpdateNodeLink(parentTuple, xldata->nodeI,
412 : 0 : blknoNew, xldata->offnumNew);
413 : 0 : }
414 : 0 : PageSetLSN(page, lsn);
415 : 0 : MarkBufferDirty(buffer);
416 : 0 : }
417 [ # # ]: 0 : if (BufferIsValid(buffer))
418 : 0 : UnlockReleaseBuffer(buffer);
419 : :
420 : : /*
421 : : * Update parent downlink (if we didn't do it as part of the source or
422 : : * destination page update already).
423 : : */
424 [ # # ]: 0 : if (xldata->parentBlk == 2)
425 : : {
426 [ # # ]: 0 : if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
427 : : {
428 : 0 : SpGistInnerTuple parentTuple;
429 : :
430 : 0 : page = BufferGetPage(buffer);
431 : :
432 : 0 : parentTuple = (SpGistInnerTuple) PageGetItem(page,
433 : 0 : PageGetItemId(page, xldata->offnumParent));
434 : :
435 : 0 : spgUpdateNodeLink(parentTuple, xldata->nodeI,
436 : 0 : blknoNew, xldata->offnumNew);
437 : :
438 : 0 : PageSetLSN(page, lsn);
439 : 0 : MarkBufferDirty(buffer);
440 : 0 : }
441 [ # # ]: 0 : if (BufferIsValid(buffer))
442 : 0 : UnlockReleaseBuffer(buffer);
443 : 0 : }
444 : 0 : }
445 : 0 : }
446 : :
447 : : static void
448 : 0 : spgRedoSplitTuple(XLogReaderState *record)
449 : : {
450 : 0 : XLogRecPtr lsn = record->EndRecPtr;
451 : 0 : char *ptr = XLogRecGetData(record);
452 : 0 : spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
453 : 0 : char *prefixTuple;
454 : 0 : SpGistInnerTupleData prefixTupleHdr;
455 : 0 : char *postfixTuple;
456 : 0 : SpGistInnerTupleData postfixTupleHdr;
457 : 0 : Buffer buffer;
458 : 0 : Page page;
459 : 0 : XLogRedoAction action;
460 : :
461 : 0 : ptr += sizeof(spgxlogSplitTuple);
462 : 0 : prefixTuple = ptr;
463 : : /* the prefix tuple is unaligned, so make a copy to access its header */
464 : 0 : memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
465 : 0 : ptr += prefixTupleHdr.size;
466 : 0 : postfixTuple = ptr;
467 : : /* postfix tuple is also unaligned */
468 : 0 : memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
469 : :
470 : : /*
471 : : * In normal operation we would have both pages locked simultaneously; but
472 : : * in WAL replay it should be safe to update them one at a time, as long
473 : : * as we do it in the right order.
474 : : */
475 : :
476 : : /* insert postfix tuple first to avoid dangling link */
477 [ # # ]: 0 : if (!xldata->postfixBlkSame)
478 : : {
479 [ # # ]: 0 : if (xldata->newPage)
480 : : {
481 : 0 : buffer = XLogInitBufferForRedo(record, 1);
482 : : /* SplitTuple is not used for nulls pages */
483 : 0 : SpGistInitBuffer(buffer, 0);
484 : 0 : action = BLK_NEEDS_REDO;
485 : 0 : }
486 : : else
487 : 0 : action = XLogReadBufferForRedo(record, 1, &buffer);
488 [ # # ]: 0 : if (action == BLK_NEEDS_REDO)
489 : : {
490 : 0 : page = BufferGetPage(buffer);
491 : :
492 : 0 : addOrReplaceTuple(page, postfixTuple, postfixTupleHdr.size, xldata->offnumPostfix);
493 : :
494 : 0 : PageSetLSN(page, lsn);
495 : 0 : MarkBufferDirty(buffer);
496 : 0 : }
497 [ # # ]: 0 : if (BufferIsValid(buffer))
498 : 0 : UnlockReleaseBuffer(buffer);
499 : 0 : }
500 : :
501 : : /* now handle the original page */
502 [ # # ]: 0 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
503 : : {
504 : 0 : page = BufferGetPage(buffer);
505 : :
506 : 0 : PageIndexTupleDelete(page, xldata->offnumPrefix);
507 : 0 : if (PageAddItem(page, prefixTuple, prefixTupleHdr.size,
508 [ # # ]: 0 : xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
509 [ # # # # ]: 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
510 : : prefixTupleHdr.size);
511 : :
512 [ # # ]: 0 : if (xldata->postfixBlkSame)
513 : 0 : addOrReplaceTuple(page, postfixTuple, postfixTupleHdr.size, xldata->offnumPostfix);
514 : :
515 : 0 : PageSetLSN(page, lsn);
516 : 0 : MarkBufferDirty(buffer);
517 : 0 : }
518 [ # # ]: 0 : if (BufferIsValid(buffer))
519 : 0 : UnlockReleaseBuffer(buffer);
520 : 0 : }
521 : :
522 : : static void
523 : 0 : spgRedoPickSplit(XLogReaderState *record)
524 : : {
525 : 0 : XLogRecPtr lsn = record->EndRecPtr;
526 : 0 : char *ptr = XLogRecGetData(record);
527 : 0 : spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
528 : 0 : char *innerTuple;
529 : 0 : SpGistInnerTupleData innerTupleHdr;
530 : 0 : SpGistState state;
531 : 0 : OffsetNumber *toDelete;
532 : 0 : OffsetNumber *toInsert;
533 : 0 : uint8 *leafPageSelect;
534 : 0 : Buffer srcBuffer;
535 : 0 : Buffer destBuffer;
536 : 0 : Buffer innerBuffer;
537 : 0 : Page srcPage;
538 : 0 : Page destPage;
539 : 0 : Page page;
540 : 0 : int i;
541 : 0 : BlockNumber blknoInner;
542 : 0 : XLogRedoAction action;
543 : :
544 : 0 : XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
545 : :
546 : 0 : fillFakeState(&state, xldata->stateSrc);
547 : :
548 : 0 : ptr += SizeOfSpgxlogPickSplit;
549 : 0 : toDelete = (OffsetNumber *) ptr;
550 : 0 : ptr += sizeof(OffsetNumber) * xldata->nDelete;
551 : 0 : toInsert = (OffsetNumber *) ptr;
552 : 0 : ptr += sizeof(OffsetNumber) * xldata->nInsert;
553 : 0 : leafPageSelect = (uint8 *) ptr;
554 : 0 : ptr += sizeof(uint8) * xldata->nInsert;
555 : :
556 : 0 : innerTuple = ptr;
557 : : /* the inner tuple is unaligned, so make a copy to access its header */
558 : 0 : memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
559 : 0 : ptr += innerTupleHdr.size;
560 : :
561 : : /* now ptr points to the list of leaf tuples */
562 : :
563 [ # # ]: 0 : if (xldata->isRootSplit)
564 : : {
565 : : /* when splitting root, we touch it only in the guise of new inner */
566 : 0 : srcBuffer = InvalidBuffer;
567 : 0 : srcPage = NULL;
568 : 0 : }
569 [ # # ]: 0 : else if (xldata->initSrc)
570 : : {
571 : : /* just re-init the source page */
572 : 0 : srcBuffer = XLogInitBufferForRedo(record, 0);
573 : 0 : srcPage = BufferGetPage(srcBuffer);
574 : :
575 : 0 : SpGistInitBuffer(srcBuffer,
576 : 0 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
577 : : /* don't update LSN etc till we're done with it */
578 : 0 : }
579 : : else
580 : : {
581 : : /*
582 : : * Delete the specified tuples from source page. (In case we're in
583 : : * Hot Standby, we need to hold lock on the page till we're done
584 : : * inserting leaf tuples and the new inner tuple, else the added
585 : : * redirect tuple will be a dangling link.)
586 : : */
587 : 0 : srcPage = NULL;
588 [ # # ]: 0 : if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
589 : : {
590 : 0 : srcPage = BufferGetPage(srcBuffer);
591 : :
592 : : /*
593 : : * We have it a bit easier here than in doPickSplit(), because we
594 : : * know the inner tuple's location already, so we can inject the
595 : : * correct redirection tuple now.
596 : : */
597 [ # # ]: 0 : if (!state.isBuild)
598 : 0 : spgPageIndexMultiDelete(&state, srcPage,
599 : 0 : toDelete, xldata->nDelete,
600 : : SPGIST_REDIRECT,
601 : : SPGIST_PLACEHOLDER,
602 : 0 : blknoInner,
603 : 0 : xldata->offnumInner);
604 : : else
605 : 0 : spgPageIndexMultiDelete(&state, srcPage,
606 : 0 : toDelete, xldata->nDelete,
607 : : SPGIST_PLACEHOLDER,
608 : : SPGIST_PLACEHOLDER,
609 : : InvalidBlockNumber,
610 : : InvalidOffsetNumber);
611 : :
612 : : /* don't update LSN etc till we're done with it */
613 : 0 : }
614 : : }
615 : :
616 : : /* try to access dest page if any */
617 [ # # # # ]: 0 : if (!XLogRecHasBlockRef(record, 1))
618 : : {
619 : 0 : destBuffer = InvalidBuffer;
620 : 0 : destPage = NULL;
621 : 0 : }
622 [ # # ]: 0 : else if (xldata->initDest)
623 : : {
624 : : /* just re-init the dest page */
625 : 0 : destBuffer = XLogInitBufferForRedo(record, 1);
626 : 0 : destPage = BufferGetPage(destBuffer);
627 : :
628 : 0 : SpGistInitBuffer(destBuffer,
629 : 0 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
630 : : /* don't update LSN etc till we're done with it */
631 : 0 : }
632 : : else
633 : : {
634 : : /*
635 : : * We could probably release the page lock immediately in the
636 : : * full-page-image case, but for safety let's hold it till later.
637 : : */
638 [ # # ]: 0 : if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
639 : 0 : destPage = BufferGetPage(destBuffer);
640 : : else
641 : 0 : destPage = NULL; /* don't do any page updates */
642 : : }
643 : :
644 : : /* restore leaf tuples to src and/or dest page */
645 [ # # ]: 0 : for (i = 0; i < xldata->nInsert; i++)
646 : : {
647 : 0 : char *leafTuple;
648 : 0 : SpGistLeafTupleData leafTupleHdr;
649 : :
650 : : /* the tuples are not aligned, so must copy to access the size field. */
651 : 0 : leafTuple = ptr;
652 : 0 : memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
653 : 0 : ptr += leafTupleHdr.size;
654 : :
655 [ # # ]: 0 : page = leafPageSelect[i] ? destPage : srcPage;
656 [ # # ]: 0 : if (page == NULL)
657 : 0 : continue; /* no need to touch this page */
658 : :
659 : 0 : addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, toInsert[i]);
660 [ # # # ]: 0 : }
661 : :
662 : : /* Now update src and dest page LSNs if needed */
663 [ # # ]: 0 : if (srcPage != NULL)
664 : : {
665 : 0 : PageSetLSN(srcPage, lsn);
666 : 0 : MarkBufferDirty(srcBuffer);
667 : 0 : }
668 [ # # ]: 0 : if (destPage != NULL)
669 : : {
670 : 0 : PageSetLSN(destPage, lsn);
671 : 0 : MarkBufferDirty(destBuffer);
672 : 0 : }
673 : :
674 : : /* restore new inner tuple */
675 [ # # ]: 0 : if (xldata->initInner)
676 : : {
677 : 0 : innerBuffer = XLogInitBufferForRedo(record, 2);
678 : 0 : SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
679 : 0 : action = BLK_NEEDS_REDO;
680 : 0 : }
681 : : else
682 : 0 : action = XLogReadBufferForRedo(record, 2, &innerBuffer);
683 : :
684 [ # # ]: 0 : if (action == BLK_NEEDS_REDO)
685 : : {
686 : 0 : page = BufferGetPage(innerBuffer);
687 : :
688 : 0 : addOrReplaceTuple(page, innerTuple, innerTupleHdr.size, xldata->offnumInner);
689 : :
690 : : /* if inner is also parent, update link while we're here */
691 [ # # ]: 0 : if (xldata->innerIsParent)
692 : : {
693 : 0 : SpGistInnerTuple parent;
694 : :
695 : 0 : parent = (SpGistInnerTuple) PageGetItem(page,
696 : 0 : PageGetItemId(page, xldata->offnumParent));
697 : 0 : spgUpdateNodeLink(parent, xldata->nodeI,
698 : 0 : blknoInner, xldata->offnumInner);
699 : 0 : }
700 : :
701 : 0 : PageSetLSN(page, lsn);
702 : 0 : MarkBufferDirty(innerBuffer);
703 : 0 : }
704 [ # # ]: 0 : if (BufferIsValid(innerBuffer))
705 : 0 : UnlockReleaseBuffer(innerBuffer);
706 : :
707 : : /*
708 : : * Now we can release the leaf-page locks. It's okay to do this before
709 : : * updating the parent downlink.
710 : : */
711 [ # # ]: 0 : if (BufferIsValid(srcBuffer))
712 : 0 : UnlockReleaseBuffer(srcBuffer);
713 [ # # ]: 0 : if (BufferIsValid(destBuffer))
714 : 0 : UnlockReleaseBuffer(destBuffer);
715 : :
716 : : /* update parent downlink, unless we did it above */
717 [ # # # # ]: 0 : if (XLogRecHasBlockRef(record, 3))
718 : : {
719 : 0 : Buffer parentBuffer;
720 : :
721 [ # # ]: 0 : if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
722 : : {
723 : 0 : SpGistInnerTuple parent;
724 : :
725 : 0 : page = BufferGetPage(parentBuffer);
726 : :
727 : 0 : parent = (SpGistInnerTuple) PageGetItem(page,
728 : 0 : PageGetItemId(page, xldata->offnumParent));
729 : 0 : spgUpdateNodeLink(parent, xldata->nodeI,
730 : 0 : blknoInner, xldata->offnumInner);
731 : :
732 : 0 : PageSetLSN(page, lsn);
733 : 0 : MarkBufferDirty(parentBuffer);
734 : 0 : }
735 [ # # ]: 0 : if (BufferIsValid(parentBuffer))
736 : 0 : UnlockReleaseBuffer(parentBuffer);
737 : 0 : }
738 : : else
739 [ # # # # ]: 0 : Assert(xldata->innerIsParent || xldata->isRootSplit);
740 : 0 : }
741 : :
742 : : static void
743 : 0 : spgRedoVacuumLeaf(XLogReaderState *record)
744 : : {
745 : 0 : XLogRecPtr lsn = record->EndRecPtr;
746 : 0 : char *ptr = XLogRecGetData(record);
747 : 0 : spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
748 : 0 : OffsetNumber *toDead;
749 : 0 : OffsetNumber *toPlaceholder;
750 : 0 : OffsetNumber *moveSrc;
751 : 0 : OffsetNumber *moveDest;
752 : 0 : OffsetNumber *chainSrc;
753 : 0 : OffsetNumber *chainDest;
754 : 0 : SpGistState state;
755 : 0 : Buffer buffer;
756 : 0 : Page page;
757 : 0 : int i;
758 : :
759 : 0 : fillFakeState(&state, xldata->stateSrc);
760 : :
761 : 0 : ptr += SizeOfSpgxlogVacuumLeaf;
762 : 0 : toDead = (OffsetNumber *) ptr;
763 : 0 : ptr += sizeof(OffsetNumber) * xldata->nDead;
764 : 0 : toPlaceholder = (OffsetNumber *) ptr;
765 : 0 : ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
766 : 0 : moveSrc = (OffsetNumber *) ptr;
767 : 0 : ptr += sizeof(OffsetNumber) * xldata->nMove;
768 : 0 : moveDest = (OffsetNumber *) ptr;
769 : 0 : ptr += sizeof(OffsetNumber) * xldata->nMove;
770 : 0 : chainSrc = (OffsetNumber *) ptr;
771 : 0 : ptr += sizeof(OffsetNumber) * xldata->nChain;
772 : 0 : chainDest = (OffsetNumber *) ptr;
773 : :
774 [ # # ]: 0 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
775 : : {
776 : 0 : page = BufferGetPage(buffer);
777 : :
778 : 0 : spgPageIndexMultiDelete(&state, page,
779 : 0 : toDead, xldata->nDead,
780 : : SPGIST_DEAD, SPGIST_DEAD,
781 : : InvalidBlockNumber,
782 : : InvalidOffsetNumber);
783 : :
784 : 0 : spgPageIndexMultiDelete(&state, page,
785 : 0 : toPlaceholder, xldata->nPlaceholder,
786 : : SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
787 : : InvalidBlockNumber,
788 : : InvalidOffsetNumber);
789 : :
790 : : /* see comments in vacuumLeafPage() */
791 [ # # ]: 0 : for (i = 0; i < xldata->nMove; i++)
792 : : {
793 : 0 : ItemId idSrc = PageGetItemId(page, moveSrc[i]);
794 : 0 : ItemId idDest = PageGetItemId(page, moveDest[i]);
795 : 0 : ItemIdData tmp;
796 : :
797 : 0 : tmp = *idSrc;
798 : 0 : *idSrc = *idDest;
799 : 0 : *idDest = tmp;
800 : 0 : }
801 : :
802 : 0 : spgPageIndexMultiDelete(&state, page,
803 : 0 : moveSrc, xldata->nMove,
804 : : SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
805 : : InvalidBlockNumber,
806 : : InvalidOffsetNumber);
807 : :
808 [ # # ]: 0 : for (i = 0; i < xldata->nChain; i++)
809 : : {
810 : 0 : SpGistLeafTuple lt;
811 : :
812 : 0 : lt = (SpGistLeafTuple) PageGetItem(page,
813 : 0 : PageGetItemId(page, chainSrc[i]));
814 [ # # ]: 0 : Assert(lt->tupstate == SPGIST_LIVE);
815 : 0 : SGLT_SET_NEXTOFFSET(lt, chainDest[i]);
816 : 0 : }
817 : :
818 : 0 : PageSetLSN(page, lsn);
819 : 0 : MarkBufferDirty(buffer);
820 : 0 : }
821 [ # # ]: 0 : if (BufferIsValid(buffer))
822 : 0 : UnlockReleaseBuffer(buffer);
823 : 0 : }
824 : :
825 : : static void
826 : 0 : spgRedoVacuumRoot(XLogReaderState *record)
827 : : {
828 : 0 : XLogRecPtr lsn = record->EndRecPtr;
829 : 0 : char *ptr = XLogRecGetData(record);
830 : 0 : spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
831 : 0 : OffsetNumber *toDelete;
832 : 0 : Buffer buffer;
833 : 0 : Page page;
834 : :
835 : 0 : toDelete = xldata->offsets;
836 : :
837 [ # # ]: 0 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
838 : : {
839 : 0 : page = BufferGetPage(buffer);
840 : :
841 : : /* The tuple numbers are in order */
842 : 0 : PageIndexMultiDelete(page, toDelete, xldata->nDelete);
843 : :
844 : 0 : PageSetLSN(page, lsn);
845 : 0 : MarkBufferDirty(buffer);
846 : 0 : }
847 [ # # ]: 0 : if (BufferIsValid(buffer))
848 : 0 : UnlockReleaseBuffer(buffer);
849 : 0 : }
850 : :
851 : : static void
852 : 0 : spgRedoVacuumRedirect(XLogReaderState *record)
853 : : {
854 : 0 : XLogRecPtr lsn = record->EndRecPtr;
855 : 0 : char *ptr = XLogRecGetData(record);
856 : 0 : spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
857 : 0 : OffsetNumber *itemToPlaceholder;
858 : 0 : Buffer buffer;
859 : :
860 : 0 : itemToPlaceholder = xldata->offsets;
861 : :
862 : : /*
863 : : * If any redirection tuples are being removed, make sure there are no
864 : : * live Hot Standby transactions that might need to see them.
865 : : */
866 [ # # ]: 0 : if (InHotStandby)
867 : : {
868 : 0 : RelFileLocator locator;
869 : :
870 : 0 : XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
871 : 0 : ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
872 : 0 : xldata->isCatalogRel,
873 : : locator);
874 : 0 : }
875 : :
876 [ # # ]: 0 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
877 : : {
878 : 0 : Page page = BufferGetPage(buffer);
879 : 0 : SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
880 : 0 : int i;
881 : :
882 : : /* Convert redirect pointers to plain placeholders */
883 [ # # ]: 0 : for (i = 0; i < xldata->nToPlaceholder; i++)
884 : : {
885 : 0 : SpGistDeadTuple dt;
886 : :
887 : 0 : dt = (SpGistDeadTuple) PageGetItem(page,
888 : 0 : PageGetItemId(page, itemToPlaceholder[i]));
889 [ # # ]: 0 : Assert(dt->tupstate == SPGIST_REDIRECT);
890 : 0 : dt->tupstate = SPGIST_PLACEHOLDER;
891 : 0 : ItemPointerSetInvalid(&dt->pointer);
892 : 0 : }
893 : :
894 [ # # ]: 0 : Assert(opaque->nRedirection >= xldata->nToPlaceholder);
895 : 0 : opaque->nRedirection -= xldata->nToPlaceholder;
896 : 0 : opaque->nPlaceholder += xldata->nToPlaceholder;
897 : :
898 : : /* Remove placeholder tuples at end of page */
899 [ # # ]: 0 : if (xldata->firstPlaceholder != InvalidOffsetNumber)
900 : : {
901 : 0 : int max = PageGetMaxOffsetNumber(page);
902 : 0 : OffsetNumber *toDelete;
903 : :
904 : 0 : toDelete = palloc_array(OffsetNumber, max);
905 : :
906 [ # # ]: 0 : for (i = xldata->firstPlaceholder; i <= max; i++)
907 : 0 : toDelete[i - xldata->firstPlaceholder] = i;
908 : :
909 : 0 : i = max - xldata->firstPlaceholder + 1;
910 [ # # ]: 0 : Assert(opaque->nPlaceholder >= i);
911 : 0 : opaque->nPlaceholder -= i;
912 : :
913 : : /* The array is sorted, so can use PageIndexMultiDelete */
914 : 0 : PageIndexMultiDelete(page, toDelete, i);
915 : :
916 : 0 : pfree(toDelete);
917 : 0 : }
918 : :
919 : 0 : PageSetLSN(page, lsn);
920 : 0 : MarkBufferDirty(buffer);
921 : 0 : }
922 [ # # ]: 0 : if (BufferIsValid(buffer))
923 : 0 : UnlockReleaseBuffer(buffer);
924 : 0 : }
925 : :
926 : : void
927 : 0 : spg_redo(XLogReaderState *record)
928 : : {
929 : 0 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
930 : 0 : MemoryContext oldCxt;
931 : :
932 : 0 : oldCxt = MemoryContextSwitchTo(opCtx);
933 [ # # # # : 0 : switch (info)
# # # #
# ]
934 : : {
935 : : case XLOG_SPGIST_ADD_LEAF:
936 : 0 : spgRedoAddLeaf(record);
937 : 0 : break;
938 : : case XLOG_SPGIST_MOVE_LEAFS:
939 : 0 : spgRedoMoveLeafs(record);
940 : 0 : break;
941 : : case XLOG_SPGIST_ADD_NODE:
942 : 0 : spgRedoAddNode(record);
943 : 0 : break;
944 : : case XLOG_SPGIST_SPLIT_TUPLE:
945 : 0 : spgRedoSplitTuple(record);
946 : 0 : break;
947 : : case XLOG_SPGIST_PICKSPLIT:
948 : 0 : spgRedoPickSplit(record);
949 : 0 : break;
950 : : case XLOG_SPGIST_VACUUM_LEAF:
951 : 0 : spgRedoVacuumLeaf(record);
952 : 0 : break;
953 : : case XLOG_SPGIST_VACUUM_ROOT:
954 : 0 : spgRedoVacuumRoot(record);
955 : 0 : break;
956 : : case XLOG_SPGIST_VACUUM_REDIRECT:
957 : 0 : spgRedoVacuumRedirect(record);
958 : 0 : break;
959 : : default:
960 [ # # # # ]: 0 : elog(PANIC, "spg_redo: unknown op code %u", info);
961 : 0 : }
962 : :
963 : 0 : MemoryContextSwitchTo(oldCxt);
964 : 0 : MemoryContextReset(opCtx);
965 : 0 : }
966 : :
967 : : void
968 : 0 : spg_xlog_startup(void)
969 : : {
970 : 0 : opCtx = AllocSetContextCreate(CurrentMemoryContext,
971 : : "SP-GiST temporary context",
972 : : ALLOCSET_DEFAULT_SIZES);
973 : 0 : }
974 : :
975 : : void
976 : 0 : spg_xlog_cleanup(void)
977 : : {
978 : 0 : MemoryContextDelete(opCtx);
979 : 0 : opCtx = NULL;
980 : 0 : }
981 : :
982 : : /*
983 : : * Mask a SpGist page before performing consistency checks on it.
984 : : */
985 : : void
986 : 0 : spg_mask(char *pagedata, BlockNumber blkno)
987 : : {
988 : 0 : Page page = (Page) pagedata;
989 : 0 : PageHeader pagehdr = (PageHeader) page;
990 : :
991 : 0 : mask_page_lsn_and_checksum(page);
992 : :
993 : 0 : mask_page_hint_bits(page);
994 : :
995 : : /*
996 : : * Mask the unused space, but only if the page's pd_lower appears to have
997 : : * been set correctly.
998 : : */
999 [ # # ]: 0 : if (pagehdr->pd_lower >= SizeOfPageHeaderData)
1000 : 0 : mask_unused_space(page);
1001 : 0 : }
|