Branch data Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * xlogwait.c
4 : : * Implements waiting for WAL operations to reach specific LSNs.
5 : : *
6 : : * Copyright (c) 2025-2026, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/access/transam/xlogwait.c
10 : : *
11 : : * NOTES
12 : : * This file implements waiting for WAL operations to reach specific LSNs
13 : : * on both physical standby and primary servers. The core idea is simple:
14 : : * every process that wants to wait publishes the LSN it needs to the
15 : : * shared memory, and the appropriate process (startup on standby,
16 : : * walreceiver on standby, or WAL writer/backend on primary) wakes it
17 : : * once that LSN has been reached.
18 : : *
19 : : * The shared memory used by this module comprises a procInfos
20 : : * per-backend array with the information of the awaited LSN for each
21 : : * of the backend processes. The elements of that array are organized
22 : : * into pairing heaps (waitersHeap), one for each WaitLSNType, which
23 : : * allows for very fast finding of the least awaited LSN for each type.
24 : : *
25 : : * In addition, the least-awaited LSN for each type is cached in the
26 : : * minWaitedLSN array. The waiter process publishes information about
27 : : * itself to the shared memory and waits on the latch until it is woken up
28 : : * by the appropriate process, the standby is promoted, the timeout expires,
29 : : * or the postmaster dies. Then, it cleans up its entry in the shared memory.
30 : : *
31 : : * On standby servers:
32 : : * - After replaying a WAL record, the startup process performs a fast
33 : : * path check minWaitedLSN[REPLAY] > replayLSN. If this check is
34 : : * negative, it checks waitersHeap[REPLAY] and wakes up the backends
35 : : * whose awaited LSNs are reached.
36 : : * - After receiving WAL, the walreceiver process performs similar checks
37 : : * against the flush and write LSNs, waking up waiters in the FLUSH
38 : : * and WRITE heaps, respectively.
39 : : *
40 : : * On primary servers: After flushing WAL, the WAL writer or backend
41 : : * process performs a similar check against the flush LSN and wakes up
42 : : * waiters whose target flush LSNs have been reached.
43 : : *
44 : : *-------------------------------------------------------------------------
45 : : */
46 : :
47 : : #include "postgres.h"
48 : :
49 : : #include <float.h>
50 : :
51 : : #include "access/xlog.h"
52 : : #include "access/xlogrecovery.h"
53 : : #include "access/xlogwait.h"
54 : : #include "miscadmin.h"
55 : : #include "pgstat.h"
56 : : #include "replication/walreceiver.h"
57 : : #include "storage/latch.h"
58 : : #include "storage/proc.h"
59 : : #include "storage/shmem.h"
60 : : #include "utils/fmgrprotos.h"
61 : : #include "utils/pg_lsn.h"
62 : : #include "utils/snapmgr.h"
63 : :
64 : :
65 : : static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
66 : : void *arg);
67 : :
68 : : struct WaitLSNState *waitLSNState = NULL;
69 : :
70 : : /*
71 : : * Wait event for each WaitLSNType, used with WaitLatch() to report
72 : : * the wait in pg_stat_activity.
73 : : */
74 : : static const uint32 WaitLSNWaitEvents[] = {
75 : : [WAIT_LSN_TYPE_STANDBY_REPLAY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
76 : : [WAIT_LSN_TYPE_STANDBY_WRITE] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
77 : : [WAIT_LSN_TYPE_STANDBY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
78 : : [WAIT_LSN_TYPE_PRIMARY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
79 : : };
80 : :
81 : : StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
82 : : "WaitLSNWaitEvents must match WaitLSNType enum");
83 : :
84 : : /*
85 : : * Get the current LSN for the specified wait type.
86 : : */
87 : : XLogRecPtr
88 : 0 : GetCurrentLSNForWaitType(WaitLSNType lsnType)
89 : : {
90 [ # # ]: 0 : Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
91 : :
92 [ # # # # : 0 : switch (lsnType)
# ]
93 : : {
94 : : case WAIT_LSN_TYPE_STANDBY_REPLAY:
95 : 0 : return GetXLogReplayRecPtr(NULL);
96 : :
97 : : case WAIT_LSN_TYPE_STANDBY_WRITE:
98 : 0 : return GetWalRcvWriteRecPtr();
99 : :
100 : : case WAIT_LSN_TYPE_STANDBY_FLUSH:
101 : 0 : return GetWalRcvFlushRecPtr(NULL, NULL);
102 : :
103 : : case WAIT_LSN_TYPE_PRIMARY_FLUSH:
104 : 0 : return GetFlushRecPtr(NULL);
105 : : }
106 : :
107 [ # # # # ]: 0 : elog(ERROR, "invalid LSN wait type: %d", lsnType);
108 : 0 : pg_unreachable();
109 : 0 : }
110 : :
111 : : /* Report the amount of shared memory space needed for WaitLSNState. */
112 : : Size
113 : 15 : WaitLSNShmemSize(void)
114 : : {
115 : 15 : Size size;
116 : :
117 : 15 : size = offsetof(WaitLSNState, procInfos);
118 : 15 : size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo)));
119 : 30 : return size;
120 : 15 : }
121 : :
122 : : /* Initialize the WaitLSNState in the shared memory. */
123 : : void
124 : 6 : WaitLSNShmemInit(void)
125 : : {
126 : 6 : bool found;
127 : :
128 : 6 : waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
129 : 6 : WaitLSNShmemSize(),
130 : : &found);
131 [ - + ]: 6 : if (!found)
132 : : {
133 : 6 : int i;
134 : :
135 : : /* Initialize heaps and tracking */
136 [ + + ]: 30 : for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
137 : : {
138 : 24 : pg_atomic_init_u64(&waitLSNState->minWaitedLSN[i], PG_UINT64_MAX);
139 : 24 : pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, NULL);
140 : 24 : }
141 : :
142 : : /* Initialize process info array */
143 : 6 : memset(&waitLSNState->procInfos, 0,
144 : : (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
145 : 6 : }
146 : 6 : }
147 : :
148 : : /*
149 : : * Comparison function for LSN waiters heaps. Waiting processes are ordered by
150 : : * LSN, so that the waiter with smallest LSN is at the top.
151 : : */
152 : : static int
153 : 0 : waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
154 : : {
155 : 0 : const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, a);
156 : 0 : const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, b);
157 : :
158 [ # # ]: 0 : if (aproc->waitLSN < bproc->waitLSN)
159 : 0 : return 1;
160 [ # # ]: 0 : else if (aproc->waitLSN > bproc->waitLSN)
161 : 0 : return -1;
162 : : else
163 : 0 : return 0;
164 : 0 : }
165 : :
166 : : /*
167 : : * Update minimum waited LSN for the specified LSN type
168 : : */
169 : : static void
170 : 12 : updateMinWaitedLSN(WaitLSNType lsnType)
171 : : {
172 : 12 : XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
173 : 12 : int i = (int) lsnType;
174 : :
175 [ + - ]: 12 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
176 : :
177 [ + - ]: 12 : if (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
178 : : {
179 : 0 : pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
180 : 0 : WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
181 : :
182 : 0 : minWaitedLSN = procInfo->waitLSN;
183 : 0 : }
184 : 12 : pg_atomic_write_u64(&waitLSNState->minWaitedLSN[i], minWaitedLSN);
185 : 12 : }
186 : :
187 : : /*
188 : : * Add current process to appropriate waiters heap based on LSN type
189 : : */
190 : : static void
191 : 0 : addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
192 : : {
193 : 0 : WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
194 : 0 : int i = (int) lsnType;
195 : :
196 [ # # ]: 0 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
197 : :
198 : 0 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
199 : :
200 : 0 : procInfo->procno = MyProcNumber;
201 : 0 : procInfo->waitLSN = lsn;
202 : 0 : procInfo->lsnType = lsnType;
203 : :
204 [ # # ]: 0 : Assert(!procInfo->inHeap);
205 : 0 : pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
206 : 0 : procInfo->inHeap = true;
207 : 0 : updateMinWaitedLSN(lsnType);
208 : :
209 : 0 : LWLockRelease(WaitLSNLock);
210 : 0 : }
211 : :
212 : : /*
213 : : * Remove current process from appropriate waiters heap based on LSN type
214 : : */
215 : : static void
216 : 0 : deleteLSNWaiter(WaitLSNType lsnType)
217 : : {
218 : 0 : WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
219 : 0 : int i = (int) lsnType;
220 : :
221 [ # # ]: 0 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
222 : :
223 : 0 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
224 : :
225 [ # # ]: 0 : Assert(procInfo->lsnType == lsnType);
226 : :
227 [ # # ]: 0 : if (procInfo->inHeap)
228 : : {
229 : 0 : pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
230 : 0 : procInfo->inHeap = false;
231 : 0 : updateMinWaitedLSN(lsnType);
232 : 0 : }
233 : :
234 : 0 : LWLockRelease(WaitLSNLock);
235 : 0 : }
236 : :
237 : : /*
238 : : * Size of a static array of procs to wakeup by WaitLSNWakeup() allocated
239 : : * on the stack. It should be enough to take single iteration for most cases.
240 : : */
241 : : #define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
242 : :
243 : : /*
244 : : * Remove waiters whose LSN has been reached from the heap and set their
245 : : * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
246 : : * and set latches for all waiters.
247 : : *
248 : : * This function first accumulates waiters to wake up into an array, then
249 : : * wakes them up without holding a WaitLSNLock. The array size is static and
250 : : * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE. That should be more than enough
251 : : * to wake up all the waiters at once in the vast majority of cases. However,
252 : : * if there are more waiters, this function will loop to process them in
253 : : * multiple chunks.
254 : : */
255 : : static void
256 : 12 : wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
257 : : {
258 : 12 : ProcNumber wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
259 : 12 : int numWakeUpProcs;
260 : 12 : int i = (int) lsnType;
261 : :
262 [ + - ]: 12 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
263 : :
264 : 12 : do
265 : : {
266 : 12 : int j;
267 : :
268 : 12 : numWakeUpProcs = 0;
269 : 12 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
270 : :
271 : : /*
272 : : * Iterate the waiters heap until we find LSN not yet reached. Record
273 : : * process numbers to wake up, but send wakeups after releasing lock.
274 : : */
275 [ + - ]: 12 : while (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
276 : : {
277 : 0 : pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
278 : 0 : WaitLSNProcInfo *procInfo;
279 : :
280 : : /* Get procInfo using appropriate heap node */
281 : 0 : procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
282 : :
283 [ # # # # ]: 0 : if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
284 : 0 : break;
285 : :
286 [ # # ]: 0 : Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
287 : 0 : wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
288 : 0 : (void) pairingheap_remove_first(&waitLSNState->waitersHeap[i]);
289 : :
290 : : /* Update appropriate flag */
291 : 0 : procInfo->inHeap = false;
292 : :
293 [ # # ]: 0 : if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
294 : 0 : break;
295 [ # # # ]: 0 : }
296 : :
297 : 12 : updateMinWaitedLSN(lsnType);
298 : 12 : LWLockRelease(WaitLSNLock);
299 : :
300 : : /*
301 : : * Set latches for processes whose waited LSNs have been reached.
302 : : * Since SetLatch() is a time-consuming operation, we do this outside
303 : : * of WaitLSNLock. This is safe because procLatch is never freed, so
304 : : * at worst we may set a latch for the wrong process or for no process
305 : : * at all, which is harmless.
306 : : */
307 [ - + ]: 12 : for (j = 0; j < numWakeUpProcs; j++)
308 : 0 : SetLatch(&GetPGProcByNumber(wakeUpProcs[j])->procLatch);
309 : :
310 [ - + ]: 12 : } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
311 : 12 : }
312 : :
313 : : /*
314 : : * Wake up processes waiting for LSN to reach currentLSN
315 : : */
316 : : void
317 : 12 : WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
318 : : {
319 : 12 : int i = (int) lsnType;
320 : :
321 [ + - ]: 12 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
322 : :
323 : : /*
324 : : * Fast path check. Skip if currentLSN is InvalidXLogRecPtr, which means
325 : : * "wake all waiters" (e.g., during promotion when recovery ends).
326 : : */
327 [ - + # # ]: 12 : if (XLogRecPtrIsValid(currentLSN) &&
328 : 0 : pg_atomic_read_u64(&waitLSNState->minWaitedLSN[i]) > currentLSN)
329 : 0 : return;
330 : :
331 : 12 : wakeupWaiters(lsnType, currentLSN);
332 [ - + ]: 12 : }
333 : :
334 : : /*
335 : : * Clean up LSN waiters for exiting process
336 : : */
337 : : void
338 : 7815 : WaitLSNCleanup(void)
339 : : {
340 [ - + ]: 7815 : if (waitLSNState)
341 : : {
342 : : /*
343 : : * We do a fast-path check of the inHeap flag without the lock. This
344 : : * flag is set to true only by the process itself. So, it's only
345 : : * possible to get a false positive. But that will be eliminated by a
346 : : * recheck inside deleteLSNWaiter().
347 : : */
348 [ + - ]: 7815 : if (waitLSNState->procInfos[MyProcNumber].inHeap)
349 : 0 : deleteLSNWaiter(waitLSNState->procInfos[MyProcNumber].lsnType);
350 : 7815 : }
351 : 7815 : }
352 : :
353 : : /*
354 : : * Check if the given LSN type requires recovery to be in progress.
355 : : * Standby wait types (replay, write, flush) require recovery;
356 : : * primary wait types (flush) do not.
357 : : */
358 : : static inline bool
359 : 0 : WaitLSNTypeRequiresRecovery(WaitLSNType t)
360 : : {
361 [ # # ]: 0 : return t == WAIT_LSN_TYPE_STANDBY_REPLAY ||
362 [ # # ]: 0 : t == WAIT_LSN_TYPE_STANDBY_WRITE ||
363 : 0 : t == WAIT_LSN_TYPE_STANDBY_FLUSH;
364 : : }
365 : :
366 : : /*
367 : : * Wait using MyLatch till the given LSN is reached, the replica gets
368 : : * promoted, or the postmaster dies.
369 : : *
370 : : * Returns WAIT_LSN_RESULT_SUCCESS if target LSN was reached.
371 : : * Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if run not in recovery,
372 : : * or replica got promoted before the target LSN reached.
373 : : */
374 : : WaitLSNResult
375 : 0 : WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
376 : : {
377 : 0 : XLogRecPtr currentLSN;
378 : 0 : TimestampTz endtime = 0;
379 : 0 : int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
380 : :
381 : : /* Shouldn't be called when shmem isn't initialized */
382 [ # # ]: 0 : Assert(waitLSNState);
383 : :
384 : : /* Should have a valid proc number */
385 [ # # ]: 0 : Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS);
386 : :
387 [ # # ]: 0 : if (timeout > 0)
388 : : {
389 : 0 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
390 : 0 : wake_events |= WL_TIMEOUT;
391 : 0 : }
392 : :
393 : : /*
394 : : * Add our process to the waiters heap. It might happen that target LSN
395 : : * gets reached before we do. The check at the beginning of the loop
396 : : * below prevents the race condition.
397 : : */
398 : 0 : addLSNWaiter(targetLSN, lsnType);
399 : :
400 : 0 : for (;;)
401 : : {
402 : 0 : int rc;
403 : 0 : long delay_ms = -1;
404 : :
405 : : /* Get current LSN for the wait type */
406 : 0 : currentLSN = GetCurrentLSNForWaitType(lsnType);
407 : :
408 : : /* Check that recovery is still in-progress */
409 [ # # # # ]: 0 : if (WaitLSNTypeRequiresRecovery(lsnType) && !RecoveryInProgress())
410 : : {
411 : : /*
412 : : * Recovery was ended, but check if target LSN was already
413 : : * reached.
414 : : */
415 : 0 : deleteLSNWaiter(lsnType);
416 : :
417 [ # # # # ]: 0 : if (PromoteIsTriggered() && targetLSN <= currentLSN)
418 : 0 : return WAIT_LSN_RESULT_SUCCESS;
419 : 0 : return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
420 : : }
421 : : else
422 : : {
423 : : /* Check if the waited LSN has been reached */
424 [ # # ]: 0 : if (targetLSN <= currentLSN)
425 : 0 : break;
426 : : }
427 : :
428 [ # # ]: 0 : if (timeout > 0)
429 : : {
430 : 0 : delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
431 [ # # ]: 0 : if (delay_ms <= 0)
432 : 0 : break;
433 : 0 : }
434 : :
435 [ # # ]: 0 : CHECK_FOR_INTERRUPTS();
436 : :
437 : 0 : rc = WaitLatch(MyLatch, wake_events, delay_ms,
438 : 0 : WaitLSNWaitEvents[lsnType]);
439 : :
440 : : /*
441 : : * Emergency bailout if postmaster has died. This is to avoid the
442 : : * necessity for manual cleanup of all postmaster children.
443 : : */
444 [ # # ]: 0 : if (rc & WL_POSTMASTER_DEATH)
445 [ # # # # ]: 0 : ereport(FATAL,
446 : : errcode(ERRCODE_ADMIN_SHUTDOWN),
447 : : errmsg("terminating connection due to unexpected postmaster exit"),
448 : : errcontext("while waiting for LSN"));
449 : :
450 [ # # ]: 0 : if (rc & WL_LATCH_SET)
451 : 0 : ResetLatch(MyLatch);
452 [ # # # ]: 0 : }
453 : :
454 : : /*
455 : : * Delete our process from the shared memory heap. We might already be
456 : : * deleted by the startup process. The 'inHeap' flags prevents us from
457 : : * the double deletion.
458 : : */
459 : 0 : deleteLSNWaiter(lsnType);
460 : :
461 : : /*
462 : : * If we didn't reach the target LSN, we must be exited by timeout.
463 : : */
464 [ # # ]: 0 : if (targetLSN > currentLSN)
465 : 0 : return WAIT_LSN_RESULT_TIMEOUT;
466 : :
467 : 0 : return WAIT_LSN_RESULT_SUCCESS;
468 : 0 : }
|