Branch data Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * wait.c
4 : : * Implements WAIT FOR, which allows waiting for events such as
5 : : * time passing or LSN having been replayed, flushed, or written.
6 : : *
7 : : * Portions Copyright (c) 2025-2026, PostgreSQL Global Development Group
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/commands/wait.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : : #include "postgres.h"
15 : :
16 : : #include <math.h>
17 : :
18 : : #include "access/xlog.h"
19 : : #include "access/xlogrecovery.h"
20 : : #include "access/xlogwait.h"
21 : : #include "commands/defrem.h"
22 : : #include "commands/wait.h"
23 : : #include "executor/executor.h"
24 : : #include "parser/parse_node.h"
25 : : #include "storage/proc.h"
26 : : #include "utils/builtins.h"
27 : : #include "utils/guc.h"
28 : : #include "utils/pg_lsn.h"
29 : : #include "utils/snapmgr.h"
30 : :
31 : :
32 : : void
33 : 0 : ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
34 : : {
35 : 0 : XLogRecPtr lsn;
36 : 0 : int64 timeout = 0;
37 : 0 : WaitLSNResult waitLSNResult;
38 : 0 : WaitLSNType lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY; /* default */
39 : 0 : bool throw = true;
40 : 0 : TupleDesc tupdesc;
41 : 0 : TupOutputState *tstate;
42 : 0 : const char *result = "<unset>";
43 : 0 : bool timeout_specified = false;
44 : 0 : bool no_throw_specified = false;
45 : 0 : bool mode_specified = false;
46 : :
47 : : /* Parse and validate the mandatory LSN */
48 : 0 : lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
49 : : CStringGetDatum(stmt->lsn_literal)));
50 : :
51 [ # # # # : 0 : foreach_node(DefElem, defel, stmt->options)
# # # # ]
52 : : {
53 [ # # ]: 0 : if (strcmp(defel->defname, "mode") == 0)
54 : : {
55 : 0 : char *mode_str;
56 : :
57 [ # # ]: 0 : if (mode_specified)
58 : 0 : errorConflictingDefElem(defel, pstate);
59 : 0 : mode_specified = true;
60 : :
61 : 0 : mode_str = defGetString(defel);
62 : :
63 [ # # ]: 0 : if (pg_strcasecmp(mode_str, "standby_replay") == 0)
64 : 0 : lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY;
65 [ # # ]: 0 : else if (pg_strcasecmp(mode_str, "standby_write") == 0)
66 : 0 : lsnType = WAIT_LSN_TYPE_STANDBY_WRITE;
67 [ # # ]: 0 : else if (pg_strcasecmp(mode_str, "standby_flush") == 0)
68 : 0 : lsnType = WAIT_LSN_TYPE_STANDBY_FLUSH;
69 [ # # ]: 0 : else if (pg_strcasecmp(mode_str, "primary_flush") == 0)
70 : 0 : lsnType = WAIT_LSN_TYPE_PRIMARY_FLUSH;
71 : : else
72 [ # # # # ]: 0 : ereport(ERROR,
73 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
74 : : errmsg("unrecognized value for %s option \"%s\": \"%s\"",
75 : : "WAIT", defel->defname, mode_str),
76 : : parser_errposition(pstate, defel->location)));
77 : 0 : }
78 [ # # ]: 0 : else if (strcmp(defel->defname, "timeout") == 0)
79 : : {
80 : 0 : char *timeout_str;
81 : 0 : const char *hintmsg;
82 : 0 : double result;
83 : :
84 [ # # ]: 0 : if (timeout_specified)
85 : 0 : errorConflictingDefElem(defel, pstate);
86 : 0 : timeout_specified = true;
87 : :
88 : 0 : timeout_str = defGetString(defel);
89 : :
90 [ # # ]: 0 : if (!parse_real(timeout_str, &result, GUC_UNIT_MS, &hintmsg))
91 : : {
92 [ # # # # : 0 : ereport(ERROR,
# # ]
93 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
94 : : errmsg("invalid timeout value: \"%s\"", timeout_str),
95 : : hintmsg ? errhint("%s", _(hintmsg)) : 0);
96 : 0 : }
97 : :
98 : : /*
99 : : * Get rid of any fractional part in the input. This is so we
100 : : * don't fail on just-out-of-range values that would round into
101 : : * range.
102 : : */
103 : 0 : result = rint(result);
104 : :
105 : : /* Range check */
106 [ # # # # : 0 : if (unlikely(isnan(result) || !FLOAT8_FITS_IN_INT64(result)))
# # # # #
# ]
107 [ # # # # ]: 0 : ereport(ERROR,
108 : : errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
109 : : errmsg("timeout value is out of range"));
110 : :
111 [ # # ]: 0 : if (result < 0)
112 [ # # # # ]: 0 : ereport(ERROR,
113 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
114 : : errmsg("timeout cannot be negative"));
115 : :
116 : 0 : timeout = (int64) result;
117 : 0 : }
118 [ # # ]: 0 : else if (strcmp(defel->defname, "no_throw") == 0)
119 : : {
120 [ # # ]: 0 : if (no_throw_specified)
121 : 0 : errorConflictingDefElem(defel, pstate);
122 : :
123 : 0 : no_throw_specified = true;
124 : :
125 : 0 : throw = !defGetBoolean(defel);
126 : 0 : }
127 : : else
128 : : {
129 [ # # # # ]: 0 : ereport(ERROR,
130 : : errcode(ERRCODE_SYNTAX_ERROR),
131 : : errmsg("option \"%s\" not recognized",
132 : : defel->defname),
133 : : parser_errposition(pstate, defel->location));
134 : : }
135 : 0 : }
136 : :
137 : : /*
138 : : * We are going to wait for the LSN. We should first care that we don't
139 : : * hold a snapshot and correspondingly our MyProc->xmin is invalid.
140 : : * Otherwise, our snapshot could prevent the replay of WAL records
141 : : * implying a kind of self-deadlock. This is the reason why WAIT FOR is a
142 : : * command, not a procedure or function.
143 : : *
144 : : * At first, we should check there is no active snapshot. According to
145 : : * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
146 : : * processed with a snapshot. Thankfully, we can pop this snapshot,
147 : : * because PortalRunUtility() can tolerate this.
148 : : */
149 [ # # ]: 0 : if (ActiveSnapshotSet())
150 : 0 : PopActiveSnapshot();
151 : :
152 : : /*
153 : : * At second, invalidate a catalog snapshot if any. And we should be done
154 : : * with the preparation.
155 : : */
156 : 0 : InvalidateCatalogSnapshot();
157 : :
158 : : /* Give up if there is still an active or registered snapshot. */
159 [ # # ]: 0 : if (HaveRegisteredOrActiveSnapshot())
160 [ # # # # ]: 0 : ereport(ERROR,
161 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
162 : : errmsg("WAIT FOR must be called without an active or registered snapshot"),
163 : : errdetail("WAIT FOR cannot be executed from a function or procedure, nor within a transaction with an isolation level higher than READ COMMITTED."));
164 : :
165 : : /*
166 : : * As the result we should hold no snapshot, and correspondingly our xmin
167 : : * should be unset.
168 : : */
169 [ # # ]: 0 : Assert(MyProc->xmin == InvalidTransactionId);
170 : :
171 : : /*
172 : : * Validate that the requested mode matches the current server state.
173 : : * Primary modes can only be used on a primary.
174 : : */
175 [ # # ]: 0 : if (lsnType == WAIT_LSN_TYPE_PRIMARY_FLUSH)
176 : : {
177 [ # # ]: 0 : if (RecoveryInProgress())
178 [ # # # # ]: 0 : ereport(ERROR,
179 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
180 : : errmsg("recovery is in progress"),
181 : : errhint("Waiting for primary_flush can only be done on a primary server. "
182 : : "Use standby_flush mode on a standby server.")));
183 : 0 : }
184 : :
185 : : /* Now wait for the LSN */
186 : 0 : waitLSNResult = WaitForLSN(lsnType, lsn, timeout);
187 : :
188 : : /*
189 : : * Process the result of WaitForLSN(). Throw appropriate error if needed.
190 : : */
191 [ # # # # ]: 0 : switch (waitLSNResult)
192 : : {
193 : : case WAIT_LSN_RESULT_SUCCESS:
194 : : /* Nothing to do on success */
195 : 0 : result = "success";
196 : 0 : break;
197 : :
198 : : case WAIT_LSN_RESULT_TIMEOUT:
199 [ # # ]: 0 : if (throw)
200 : : {
201 : 0 : XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType);
202 : :
203 [ # # # # : 0 : switch (lsnType)
# ]
204 : : {
205 : : case WAIT_LSN_TYPE_STANDBY_REPLAY:
206 [ # # # # ]: 0 : ereport(ERROR,
207 : : errcode(ERRCODE_QUERY_CANCELED),
208 : : errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current standby_replay LSN %X/%08X",
209 : : LSN_FORMAT_ARGS(lsn),
210 : : LSN_FORMAT_ARGS(currentLSN)));
211 : 0 : break;
212 : :
213 : : case WAIT_LSN_TYPE_STANDBY_WRITE:
214 [ # # # # ]: 0 : ereport(ERROR,
215 : : errcode(ERRCODE_QUERY_CANCELED),
216 : : errmsg("timed out while waiting for target LSN %X/%08X to be written; current standby_write LSN %X/%08X",
217 : : LSN_FORMAT_ARGS(lsn),
218 : : LSN_FORMAT_ARGS(currentLSN)));
219 : 0 : break;
220 : :
221 : : case WAIT_LSN_TYPE_STANDBY_FLUSH:
222 [ # # # # ]: 0 : ereport(ERROR,
223 : : errcode(ERRCODE_QUERY_CANCELED),
224 : : errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current standby_flush LSN %X/%08X",
225 : : LSN_FORMAT_ARGS(lsn),
226 : : LSN_FORMAT_ARGS(currentLSN)));
227 : 0 : break;
228 : :
229 : : case WAIT_LSN_TYPE_PRIMARY_FLUSH:
230 [ # # # # ]: 0 : ereport(ERROR,
231 : : errcode(ERRCODE_QUERY_CANCELED),
232 : : errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current primary_flush LSN %X/%08X",
233 : : LSN_FORMAT_ARGS(lsn),
234 : : LSN_FORMAT_ARGS(currentLSN)));
235 : 0 : break;
236 : :
237 : : default:
238 [ # # # # ]: 0 : elog(ERROR, "unexpected wait LSN type %d", lsnType);
239 : 0 : }
240 : 0 : }
241 : : else
242 : 0 : result = "timeout";
243 : 0 : break;
244 : :
245 : : case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
246 [ # # ]: 0 : if (throw)
247 : : {
248 [ # # ]: 0 : if (PromoteIsTriggered())
249 : : {
250 : 0 : XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType);
251 : :
252 [ # # # # ]: 0 : switch (lsnType)
253 : : {
254 : : case WAIT_LSN_TYPE_STANDBY_REPLAY:
255 [ # # # # ]: 0 : ereport(ERROR,
256 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
257 : : errmsg("recovery is not in progress"),
258 : : errdetail("Recovery ended before target LSN %X/%08X was replayed; last standby_replay LSN %X/%08X.",
259 : : LSN_FORMAT_ARGS(lsn),
260 : : LSN_FORMAT_ARGS(currentLSN)));
261 : 0 : break;
262 : :
263 : : case WAIT_LSN_TYPE_STANDBY_WRITE:
264 [ # # # # ]: 0 : ereport(ERROR,
265 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
266 : : errmsg("recovery is not in progress"),
267 : : errdetail("Recovery ended before target LSN %X/%08X was written; last standby_write LSN %X/%08X.",
268 : : LSN_FORMAT_ARGS(lsn),
269 : : LSN_FORMAT_ARGS(currentLSN)));
270 : 0 : break;
271 : :
272 : : case WAIT_LSN_TYPE_STANDBY_FLUSH:
273 [ # # # # ]: 0 : ereport(ERROR,
274 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
275 : : errmsg("recovery is not in progress"),
276 : : errdetail("Recovery ended before target LSN %X/%08X was flushed; last standby_flush LSN %X/%08X.",
277 : : LSN_FORMAT_ARGS(lsn),
278 : : LSN_FORMAT_ARGS(currentLSN)));
279 : 0 : break;
280 : :
281 : : default:
282 [ # # # # ]: 0 : elog(ERROR, "unexpected wait LSN type %d", lsnType);
283 : 0 : }
284 : 0 : }
285 : : else
286 : : {
287 [ # # # # ]: 0 : switch (lsnType)
288 : : {
289 : : case WAIT_LSN_TYPE_STANDBY_REPLAY:
290 [ # # # # ]: 0 : ereport(ERROR,
291 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
292 : : errmsg("recovery is not in progress"),
293 : : errhint("Waiting for the standby_replay LSN can only be executed during recovery."));
294 : 0 : break;
295 : :
296 : : case WAIT_LSN_TYPE_STANDBY_WRITE:
297 [ # # # # ]: 0 : ereport(ERROR,
298 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
299 : : errmsg("recovery is not in progress"),
300 : : errhint("Waiting for the standby_write LSN can only be executed during recovery."));
301 : 0 : break;
302 : :
303 : : case WAIT_LSN_TYPE_STANDBY_FLUSH:
304 [ # # # # ]: 0 : ereport(ERROR,
305 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
306 : : errmsg("recovery is not in progress"),
307 : : errhint("Waiting for the standby_flush LSN can only be executed during recovery."));
308 : 0 : break;
309 : :
310 : : default:
311 [ # # # # ]: 0 : elog(ERROR, "unexpected wait LSN type %d", lsnType);
312 : 0 : }
313 : : }
314 : 0 : }
315 : : else
316 : 0 : result = "not in recovery";
317 : 0 : break;
318 : : }
319 : :
320 : : /* need a tuple descriptor representing a single TEXT column */
321 : 0 : tupdesc = WaitStmtResultDesc(stmt);
322 : :
323 : : /* prepare for projection of tuples */
324 : 0 : tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
325 : :
326 : : /* Send it */
327 : 0 : do_text_output_oneline(tstate, result);
328 : :
329 : 0 : end_tup_output(tstate);
330 : 0 : }
331 : :
332 : : TupleDesc
333 : 0 : WaitStmtResultDesc(WaitStmt *stmt)
334 : : {
335 : 0 : TupleDesc tupdesc;
336 : :
337 : : /* Need a tuple descriptor representing a single TEXT column */
338 : 0 : tupdesc = CreateTemplateTupleDesc(1);
339 : 0 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
340 : : TEXTOID, -1, 0);
341 : 0 : return tupdesc;
342 : 0 : }
|