Branch data Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * twophase.c
4 : : * Two-phase commit support functions.
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/access/transam/twophase.c
11 : : *
12 : : * NOTES
13 : : * Each global transaction is associated with a global transaction
14 : : * identifier (GID). The client assigns a GID to a postgres
15 : : * transaction with the PREPARE TRANSACTION command.
16 : : *
17 : : * We keep all active global transactions in a shared memory array.
18 : : * When the PREPARE TRANSACTION command is issued, the GID is
19 : : * reserved for the transaction in the array. This is done before
20 : : * a WAL entry is made, because the reservation checks for duplicate
21 : : * GIDs and aborts the transaction if there already is a global
22 : : * transaction in prepared state with the same GID.
23 : : *
24 : : * A global transaction (gxact) also has dummy PGPROC; this is what keeps
25 : : * the XID considered running by TransactionIdIsInProgress. It is also
26 : : * convenient as a PGPROC to hook the gxact's locks to.
27 : : *
28 : : * Information to recover prepared transactions in case of crash is
29 : : * now stored in WAL for the common case. In some cases there will be
30 : : * an extended period between preparing a GXACT and commit/abort, in
31 : : * which case we need to separately record prepared transaction data
32 : : * in permanent storage. This includes locking information, pending
33 : : * notifications etc. All that state information is written to the
34 : : * per-transaction state file in the pg_twophase directory.
35 : : * All prepared transactions will be written prior to shutdown.
36 : : *
37 : : * Life track of state data is following:
38 : : *
39 : : * * On PREPARE TRANSACTION backend writes state data only to the WAL and
40 : : * stores pointer to the start of the WAL record in
41 : : * gxact->prepare_start_lsn.
42 : : * * If COMMIT occurs before checkpoint then backend reads data from WAL
43 : : * using prepare_start_lsn.
44 : : * * On checkpoint state data copied to files in pg_twophase directory and
45 : : * fsynced
46 : : * * If COMMIT happens after checkpoint then backend reads state data from
47 : : * files
48 : : *
49 : : * During replay and replication, TwoPhaseState also holds information
50 : : * about active prepared transactions that haven't been moved to disk yet.
51 : : *
52 : : * Replay of twophase records happens by the following rules:
53 : : *
54 : : * * At the beginning of recovery, pg_twophase is scanned once, filling
55 : : * TwoPhaseState with entries marked with gxact->inredo and
56 : : * gxact->ondisk. Two-phase file data older than the XID horizon of
57 : : * the redo position are discarded.
58 : : * * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
59 : : * gxact->inredo is set to true for such entries.
60 : : * * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
61 : : * that have gxact->inredo set and are behind the redo_horizon. We
62 : : * save them to disk and then switch gxact->ondisk to true.
63 : : * * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
64 : : * If gxact->ondisk is true, the corresponding entry from the disk
65 : : * is additionally deleted.
66 : : * * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
67 : : * and PrescanPreparedTransactions() have been modified to go through
68 : : * gxact->inredo entries that have not made it to disk.
69 : : *
70 : : *-------------------------------------------------------------------------
71 : : */
72 : : #include "postgres.h"
73 : :
74 : : #include <fcntl.h>
75 : : #include <sys/stat.h>
76 : : #include <time.h>
77 : : #include <unistd.h>
78 : :
79 : : #include "access/commit_ts.h"
80 : : #include "access/htup_details.h"
81 : : #include "access/subtrans.h"
82 : : #include "access/transam.h"
83 : : #include "access/twophase.h"
84 : : #include "access/twophase_rmgr.h"
85 : : #include "access/xact.h"
86 : : #include "access/xlog.h"
87 : : #include "access/xloginsert.h"
88 : : #include "access/xlogreader.h"
89 : : #include "access/xlogrecovery.h"
90 : : #include "access/xlogutils.h"
91 : : #include "catalog/pg_type.h"
92 : : #include "catalog/storage.h"
93 : : #include "funcapi.h"
94 : : #include "miscadmin.h"
95 : : #include "pg_trace.h"
96 : : #include "pgstat.h"
97 : : #include "replication/origin.h"
98 : : #include "replication/syncrep.h"
99 : : #include "storage/fd.h"
100 : : #include "storage/ipc.h"
101 : : #include "storage/md.h"
102 : : #include "storage/predicate.h"
103 : : #include "storage/proc.h"
104 : : #include "storage/procarray.h"
105 : : #include "utils/builtins.h"
106 : : #include "utils/injection_point.h"
107 : : #include "utils/memutils.h"
108 : : #include "utils/timestamp.h"
109 : :
110 : : /*
111 : : * Directory where Two-phase commit files reside within PGDATA
112 : : */
113 : : #define TWOPHASE_DIR "pg_twophase"
114 : :
115 : : /* GUC variable, can't be changed after startup */
116 : : int max_prepared_xacts = 0;
117 : :
118 : : /*
119 : : * This struct describes one global transaction that is in prepared state
120 : : * or attempting to become prepared.
121 : : *
122 : : * The lifecycle of a global transaction is:
123 : : *
124 : : * 1. After checking that the requested GID is not in use, set up an entry in
125 : : * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
126 : : * and mark it as locked by my backend.
127 : : *
128 : : * 2. After successfully completing prepare, set valid = true and enter the
129 : : * referenced PGPROC into the global ProcArray.
130 : : *
131 : : * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
132 : : * valid and not locked, then mark the entry as locked by storing my current
133 : : * proc number into locking_backend. This prevents concurrent attempts to
134 : : * commit or rollback the same prepared xact.
135 : : *
136 : : * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
137 : : * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
138 : : * the freelist.
139 : : *
140 : : * Note that if the preparing transaction fails between steps 1 and 2, the
141 : : * entry must be removed so that the GID and the GlobalTransaction struct
142 : : * can be reused. See AtAbort_Twophase().
143 : : *
144 : : * typedef struct GlobalTransactionData *GlobalTransaction appears in
145 : : * twophase.h
146 : : */
147 : :
148 : : typedef struct GlobalTransactionData
149 : : {
150 : : GlobalTransaction next; /* list link for free list */
151 : : int pgprocno; /* ID of associated dummy PGPROC */
152 : : TimestampTz prepared_at; /* time of preparation */
153 : :
154 : : /*
155 : : * Note that we need to keep track of two LSNs for each GXACT. We keep
156 : : * track of the start LSN because this is the address we must use to read
157 : : * state data back from WAL when committing a prepared GXACT. We keep
158 : : * track of the end LSN because that is the LSN we need to wait for prior
159 : : * to commit.
160 : : */
161 : : XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */
162 : : XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */
163 : : FullTransactionId fxid; /* The GXACT full xid */
164 : :
165 : : Oid owner; /* ID of user that executed the xact */
166 : : ProcNumber locking_backend; /* backend currently working on the xact */
167 : : bool valid; /* true if PGPROC entry is in proc array */
168 : : bool ondisk; /* true if prepare state file is on disk */
169 : : bool inredo; /* true if entry was added via xlog_redo */
170 : : char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
171 : : } GlobalTransactionData;
172 : :
173 : : /*
174 : : * Two Phase Commit shared state. Access to this struct is protected
175 : : * by TwoPhaseStateLock.
176 : : */
177 : : typedef struct TwoPhaseStateData
178 : : {
179 : : /* Head of linked list of free GlobalTransactionData structs */
180 : : GlobalTransaction freeGXacts;
181 : :
182 : : /* Number of valid prepXacts entries. */
183 : : int numPrepXacts;
184 : :
185 : : /* There are max_prepared_xacts items in this array */
186 : : GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
187 : : } TwoPhaseStateData;
188 : :
189 : : static TwoPhaseStateData *TwoPhaseState;
190 : :
191 : : /*
192 : : * Global transaction entry currently locked by us, if any. Note that any
193 : : * access to the entry pointed to by this variable must be protected by
194 : : * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
195 : : * (since it's just local memory).
196 : : */
197 : : static GlobalTransaction MyLockedGxact = NULL;
198 : :
199 : : static bool twophaseExitRegistered = false;
200 : :
201 : : static void PrepareRedoRemoveFull(FullTransactionId fxid, bool giveWarning);
202 : : static void RecordTransactionCommitPrepared(TransactionId xid,
203 : : int nchildren,
204 : : TransactionId *children,
205 : : int nrels,
206 : : RelFileLocator *rels,
207 : : int nstats,
208 : : xl_xact_stats_item *stats,
209 : : int ninvalmsgs,
210 : : SharedInvalidationMessage *invalmsgs,
211 : : bool initfileinval,
212 : : const char *gid);
213 : : static void RecordTransactionAbortPrepared(TransactionId xid,
214 : : int nchildren,
215 : : TransactionId *children,
216 : : int nrels,
217 : : RelFileLocator *rels,
218 : : int nstats,
219 : : xl_xact_stats_item *stats,
220 : : const char *gid);
221 : : static void ProcessRecords(char *bufptr, FullTransactionId fxid,
222 : : const TwoPhaseCallback callbacks[]);
223 : : static void RemoveGXact(GlobalTransaction gxact);
224 : :
225 : : static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
226 : : static char *ProcessTwoPhaseBuffer(FullTransactionId fxid,
227 : : XLogRecPtr prepare_start_lsn,
228 : : bool fromdisk, bool setParent, bool setNextXid);
229 : : static void MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid,
230 : : const char *gid, TimestampTz prepared_at, Oid owner,
231 : : Oid databaseid);
232 : : static void RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning);
233 : : static void RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len);
234 : :
235 : : /*
236 : : * Initialization of shared memory
237 : : */
238 : : Size
239 : 15 : TwoPhaseShmemSize(void)
240 : : {
241 : 15 : Size size;
242 : :
243 : : /* Need the fixed struct, the array of pointers, and the GTD structs */
244 : 15 : size = offsetof(TwoPhaseStateData, prepXacts);
245 : 15 : size = add_size(size, mul_size(max_prepared_xacts,
246 : : sizeof(GlobalTransaction)));
247 : 15 : size = MAXALIGN(size);
248 : 15 : size = add_size(size, mul_size(max_prepared_xacts,
249 : : sizeof(GlobalTransactionData)));
250 : :
251 : 30 : return size;
252 : 15 : }
253 : :
254 : : void
255 : 6 : TwoPhaseShmemInit(void)
256 : : {
257 : 6 : bool found;
258 : :
259 : 6 : TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
260 : 6 : TwoPhaseShmemSize(),
261 : : &found);
262 [ + - ]: 6 : if (!IsUnderPostmaster)
263 : : {
264 : 6 : GlobalTransaction gxacts;
265 : 6 : int i;
266 : :
267 [ + - ]: 6 : Assert(!found);
268 : 6 : TwoPhaseState->freeGXacts = NULL;
269 : 6 : TwoPhaseState->numPrepXacts = 0;
270 : :
271 : : /*
272 : : * Initialize the linked list of free GlobalTransactionData structs
273 : : */
274 : 6 : gxacts = (GlobalTransaction)
275 : 12 : ((char *) TwoPhaseState +
276 : 6 : MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
277 : : sizeof(GlobalTransaction) * max_prepared_xacts));
278 [ + + ]: 8 : for (i = 0; i < max_prepared_xacts; i++)
279 : : {
280 : : /* insert into linked list */
281 : 2 : gxacts[i].next = TwoPhaseState->freeGXacts;
282 : 2 : TwoPhaseState->freeGXacts = &gxacts[i];
283 : :
284 : : /* associate it with a PGPROC assigned by InitProcGlobal */
285 : 2 : gxacts[i].pgprocno = GetNumberFromPGProc(&PreparedXactProcs[i]);
286 : 2 : }
287 : 6 : }
288 : : else
289 [ # # ]: 0 : Assert(found);
290 : 6 : }
291 : :
292 : : /*
293 : : * Exit hook to unlock the global transaction entry we're working on.
294 : : */
295 : : static void
296 : 3 : AtProcExit_Twophase(int code, Datum arg)
297 : : {
298 : : /* same logic as abort */
299 : 3 : AtAbort_Twophase();
300 : 3 : }
301 : :
302 : : /*
303 : : * Abort hook to unlock the global transaction entry we're working on.
304 : : */
305 : : void
306 : 7019 : AtAbort_Twophase(void)
307 : : {
308 [ - + ]: 7019 : if (MyLockedGxact == NULL)
309 : 7019 : return;
310 : :
311 : : /*
312 : : * What to do with the locked global transaction entry? If we were in the
313 : : * process of preparing the transaction, but haven't written the WAL
314 : : * record and state file yet, the transaction must not be considered as
315 : : * prepared. Likewise, if we are in the process of finishing an
316 : : * already-prepared transaction, and fail after having already written the
317 : : * 2nd phase commit or rollback record to the WAL, the transaction should
318 : : * not be considered as prepared anymore. In those cases, just remove the
319 : : * entry from shared memory.
320 : : *
321 : : * Otherwise, the entry must be left in place so that the transaction can
322 : : * be finished later, so just unlock it.
323 : : *
324 : : * If we abort during prepare, after having written the WAL record, we
325 : : * might not have transferred all locks and other state to the prepared
326 : : * transaction yet. Likewise, if we abort during commit or rollback,
327 : : * after having written the WAL record, we might not have released all the
328 : : * resources held by the transaction yet. In those cases, the in-memory
329 : : * state can be wrong, but it's too late to back out.
330 : : */
331 : 0 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
332 [ # # ]: 0 : if (!MyLockedGxact->valid)
333 : 0 : RemoveGXact(MyLockedGxact);
334 : : else
335 : 0 : MyLockedGxact->locking_backend = INVALID_PROC_NUMBER;
336 : 0 : LWLockRelease(TwoPhaseStateLock);
337 : :
338 : 0 : MyLockedGxact = NULL;
339 : 7019 : }
340 : :
341 : : /*
342 : : * This is called after we have finished transferring state to the prepared
343 : : * PGPROC entry.
344 : : */
345 : : void
346 : 0 : PostPrepare_Twophase(void)
347 : : {
348 : 0 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
349 : 0 : MyLockedGxact->locking_backend = INVALID_PROC_NUMBER;
350 : 0 : LWLockRelease(TwoPhaseStateLock);
351 : :
352 : 0 : MyLockedGxact = NULL;
353 : 0 : }
354 : :
355 : :
356 : : /*
357 : : * MarkAsPreparing
358 : : * Reserve the GID for the given transaction.
359 : : */
360 : : GlobalTransaction
361 : 16 : MarkAsPreparing(FullTransactionId fxid, const char *gid,
362 : : TimestampTz prepared_at, Oid owner, Oid databaseid)
363 : : {
364 : 16 : GlobalTransaction gxact;
365 : 16 : int i;
366 : :
367 [ + - ]: 16 : if (strlen(gid) >= GIDSIZE)
368 [ # # # # ]: 0 : ereport(ERROR,
369 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
370 : : errmsg("transaction identifier \"%s\" is too long",
371 : : gid)));
372 : :
373 : : /* fail immediately if feature is disabled */
374 [ - + ]: 16 : if (max_prepared_xacts == 0)
375 [ + - + - ]: 16 : ereport(ERROR,
376 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
377 : : errmsg("prepared transactions are disabled"),
378 : : errhint("Set \"max_prepared_transactions\" to a nonzero value.")));
379 : :
380 : : /* on first call, register the exit hook */
381 [ # # ]: 0 : if (!twophaseExitRegistered)
382 : : {
383 : 0 : before_shmem_exit(AtProcExit_Twophase, 0);
384 : 0 : twophaseExitRegistered = true;
385 : 0 : }
386 : :
387 : 0 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
388 : :
389 : : /* Check for conflicting GID */
390 [ # # ]: 0 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
391 : : {
392 : 0 : gxact = TwoPhaseState->prepXacts[i];
393 [ # # ]: 0 : if (strcmp(gxact->gid, gid) == 0)
394 : : {
395 [ # # # # ]: 0 : ereport(ERROR,
396 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
397 : : errmsg("transaction identifier \"%s\" is already in use",
398 : : gid)));
399 : 0 : }
400 : 0 : }
401 : :
402 : : /* Get a free gxact from the freelist */
403 [ # # ]: 0 : if (TwoPhaseState->freeGXacts == NULL)
404 [ # # # # ]: 0 : ereport(ERROR,
405 : : (errcode(ERRCODE_OUT_OF_MEMORY),
406 : : errmsg("maximum number of prepared transactions reached"),
407 : : errhint("Increase \"max_prepared_transactions\" (currently %d).",
408 : : max_prepared_xacts)));
409 : 0 : gxact = TwoPhaseState->freeGXacts;
410 : 0 : TwoPhaseState->freeGXacts = gxact->next;
411 : :
412 : 0 : MarkAsPreparingGuts(gxact, fxid, gid, prepared_at, owner, databaseid);
413 : :
414 : 0 : gxact->ondisk = false;
415 : :
416 : : /* And insert it into the active array */
417 [ # # ]: 0 : Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
418 : 0 : TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
419 : :
420 : 0 : LWLockRelease(TwoPhaseStateLock);
421 : :
422 : 0 : return gxact;
423 : 0 : }
424 : :
425 : : /*
426 : : * MarkAsPreparingGuts
427 : : *
428 : : * This uses a gxact struct and puts it into the active array.
429 : : * NOTE: this is also used when reloading a gxact after a crash; so avoid
430 : : * assuming that we can use very much backend context.
431 : : *
432 : : * Note: This function should be called with appropriate locks held.
433 : : */
434 : : static void
435 : 0 : MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid,
436 : : const char *gid, TimestampTz prepared_at, Oid owner,
437 : : Oid databaseid)
438 : : {
439 : 0 : PGPROC *proc;
440 : 0 : int i;
441 : 0 : TransactionId xid = XidFromFullTransactionId(fxid);
442 : :
443 [ # # ]: 0 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
444 : :
445 [ # # ]: 0 : Assert(gxact != NULL);
446 : 0 : proc = GetPGProcByNumber(gxact->pgprocno);
447 : :
448 : : /* Initialize the PGPROC entry */
449 [ # # # # : 0 : MemSet(proc, 0, sizeof(PGPROC));
# # # # #
# ]
450 : 0 : dlist_node_init(&proc->links);
451 : 0 : proc->waitStatus = PROC_WAIT_STATUS_OK;
452 [ # # ]: 0 : if (LocalTransactionIdIsValid(MyProc->vxid.lxid))
453 : : {
454 : : /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
455 : 0 : proc->vxid.lxid = MyProc->vxid.lxid;
456 : 0 : proc->vxid.procNumber = MyProcNumber;
457 : 0 : }
458 : : else
459 : : {
460 [ # # # # ]: 0 : Assert(AmStartupProcess() || !IsPostmasterEnvironment);
461 : : /* GetLockConflicts() uses this to specify a wait on the XID */
462 : 0 : proc->vxid.lxid = xid;
463 : 0 : proc->vxid.procNumber = INVALID_PROC_NUMBER;
464 : : }
465 : 0 : proc->xid = xid;
466 [ # # ]: 0 : Assert(proc->xmin == InvalidTransactionId);
467 : 0 : proc->delayChkptFlags = 0;
468 : 0 : proc->statusFlags = 0;
469 : 0 : proc->pid = 0;
470 : 0 : proc->databaseId = databaseid;
471 : 0 : proc->roleId = owner;
472 : 0 : proc->tempNamespaceId = InvalidOid;
473 : 0 : proc->isRegularBackend = false;
474 : 0 : proc->lwWaiting = LW_WS_NOT_WAITING;
475 : 0 : proc->lwWaitMode = 0;
476 : 0 : proc->waitLock = NULL;
477 : 0 : proc->waitProcLock = NULL;
478 : 0 : pg_atomic_init_u64(&proc->waitStart, 0);
479 [ # # ]: 0 : for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
480 : 0 : dlist_init(&proc->myProcLocks[i]);
481 : : /* subxid data must be filled later by GXactLoadSubxactData */
482 : 0 : proc->subxidStatus.overflowed = false;
483 : 0 : proc->subxidStatus.count = 0;
484 : :
485 : 0 : gxact->prepared_at = prepared_at;
486 : 0 : gxact->fxid = fxid;
487 : 0 : gxact->owner = owner;
488 : 0 : gxact->locking_backend = MyProcNumber;
489 : 0 : gxact->valid = false;
490 : 0 : gxact->inredo = false;
491 : 0 : strcpy(gxact->gid, gid);
492 : :
493 : : /*
494 : : * Remember that we have this GlobalTransaction entry locked for us. If we
495 : : * abort after this, we must release it.
496 : : */
497 : 0 : MyLockedGxact = gxact;
498 : 0 : }
499 : :
500 : : /*
501 : : * GXactLoadSubxactData
502 : : *
503 : : * If the transaction being persisted had any subtransactions, this must
504 : : * be called before MarkAsPrepared() to load information into the dummy
505 : : * PGPROC.
506 : : */
507 : : static void
508 : 0 : GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
509 : : TransactionId *children)
510 : : {
511 : 0 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
512 : :
513 : : /* We need no extra lock since the GXACT isn't valid yet */
514 [ # # ]: 0 : if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
515 : : {
516 : 0 : proc->subxidStatus.overflowed = true;
517 : 0 : nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
518 : 0 : }
519 [ # # ]: 0 : if (nsubxacts > 0)
520 : : {
521 : 0 : memcpy(proc->subxids.xids, children,
522 : : nsubxacts * sizeof(TransactionId));
523 : 0 : proc->subxidStatus.count = nsubxacts;
524 : 0 : }
525 : 0 : }
526 : :
527 : : /*
528 : : * MarkAsPrepared
529 : : * Mark the GXACT as fully valid, and enter it into the global ProcArray.
530 : : *
531 : : * lock_held indicates whether caller already holds TwoPhaseStateLock.
532 : : */
533 : : static void
534 : 0 : MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
535 : : {
536 : : /* Lock here may be overkill, but I'm not convinced of that ... */
537 [ # # ]: 0 : if (!lock_held)
538 : 0 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
539 [ # # ]: 0 : Assert(!gxact->valid);
540 : 0 : gxact->valid = true;
541 [ # # ]: 0 : if (!lock_held)
542 : 0 : LWLockRelease(TwoPhaseStateLock);
543 : :
544 : : /*
545 : : * Put it into the global ProcArray so TransactionIdIsInProgress considers
546 : : * the XID as still running.
547 : : */
548 : 0 : ProcArrayAdd(GetPGProcByNumber(gxact->pgprocno));
549 : 0 : }
550 : :
551 : : /*
552 : : * LockGXact
553 : : * Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
554 : : */
555 : : static GlobalTransaction
556 : 13 : LockGXact(const char *gid, Oid user)
557 : : {
558 : 13 : int i;
559 : :
560 : : /* on first call, register the exit hook */
561 [ + + ]: 13 : if (!twophaseExitRegistered)
562 : : {
563 : 3 : before_shmem_exit(AtProcExit_Twophase, 0);
564 : 3 : twophaseExitRegistered = true;
565 : 3 : }
566 : :
567 : 13 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
568 : :
569 [ - + ]: 13 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
570 : : {
571 : 0 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
572 : 0 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
573 : :
574 : : /* Ignore not-yet-valid GIDs */
575 [ # # ]: 0 : if (!gxact->valid)
576 : 0 : continue;
577 [ # # ]: 0 : if (strcmp(gxact->gid, gid) != 0)
578 : 0 : continue;
579 : :
580 : : /* Found it, but has someone else got it locked? */
581 [ # # ]: 0 : if (gxact->locking_backend != INVALID_PROC_NUMBER)
582 [ # # # # ]: 0 : ereport(ERROR,
583 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
584 : : errmsg("prepared transaction with identifier \"%s\" is busy",
585 : : gid)));
586 : :
587 [ # # # # ]: 0 : if (user != gxact->owner && !superuser_arg(user))
588 [ # # # # ]: 0 : ereport(ERROR,
589 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
590 : : errmsg("permission denied to finish prepared transaction"),
591 : : errhint("Must be superuser or the user that prepared the transaction.")));
592 : :
593 : : /*
594 : : * Note: it probably would be possible to allow committing from
595 : : * another database; but at the moment NOTIFY is known not to work and
596 : : * there may be some other issues as well. Hence disallow until
597 : : * someone gets motivated to make it work.
598 : : */
599 [ # # ]: 0 : if (MyDatabaseId != proc->databaseId)
600 [ # # # # ]: 0 : ereport(ERROR,
601 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
602 : : errmsg("prepared transaction belongs to another database"),
603 : : errhint("Connect to the database where the transaction was prepared to finish it.")));
604 : :
605 : : /* OK for me to lock it */
606 : 0 : gxact->locking_backend = MyProcNumber;
607 : 0 : MyLockedGxact = gxact;
608 : :
609 : 0 : LWLockRelease(TwoPhaseStateLock);
610 : :
611 : 0 : return gxact;
612 [ # # ]: 0 : }
613 : :
614 : 13 : LWLockRelease(TwoPhaseStateLock);
615 : :
616 [ + - + - ]: 13 : ereport(ERROR,
617 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
618 : : errmsg("prepared transaction with identifier \"%s\" does not exist",
619 : : gid)));
620 : :
621 : : /* NOTREACHED */
622 : 0 : return NULL;
623 : 0 : }
624 : :
625 : : /*
626 : : * RemoveGXact
627 : : * Remove the prepared transaction from the shared memory array.
628 : : *
629 : : * NB: caller should have already removed it from ProcArray
630 : : */
631 : : static void
632 : 0 : RemoveGXact(GlobalTransaction gxact)
633 : : {
634 : 0 : int i;
635 : :
636 [ # # ]: 0 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
637 : :
638 [ # # ]: 0 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
639 : : {
640 [ # # ]: 0 : if (gxact == TwoPhaseState->prepXacts[i])
641 : : {
642 : : /* remove from the active array */
643 : 0 : TwoPhaseState->numPrepXacts--;
644 : 0 : TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
645 : :
646 : : /* and put it back in the freelist */
647 : 0 : gxact->next = TwoPhaseState->freeGXacts;
648 : 0 : TwoPhaseState->freeGXacts = gxact;
649 : :
650 : 0 : return;
651 : : }
652 : 0 : }
653 : :
654 [ # # # # ]: 0 : elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
655 [ # # ]: 0 : }
656 : :
657 : : /*
658 : : * Returns an array of all prepared transactions for the user-level
659 : : * function pg_prepared_xact.
660 : : *
661 : : * The returned array and all its elements are copies of internal data
662 : : * structures, to minimize the time we need to hold the TwoPhaseStateLock.
663 : : *
664 : : * WARNING -- we return even those transactions that are not fully prepared
665 : : * yet. The caller should filter them out if he doesn't want them.
666 : : *
667 : : * The returned array is palloc'd.
668 : : */
669 : : static int
670 : 11 : GetPreparedTransactionList(GlobalTransaction *gxacts)
671 : : {
672 : 11 : GlobalTransaction array;
673 : 11 : int num;
674 : 11 : int i;
675 : :
676 : 11 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
677 : :
678 [ - + ]: 11 : if (TwoPhaseState->numPrepXacts == 0)
679 : : {
680 : 11 : LWLockRelease(TwoPhaseStateLock);
681 : :
682 : 11 : *gxacts = NULL;
683 : 11 : return 0;
684 : : }
685 : :
686 : 0 : num = TwoPhaseState->numPrepXacts;
687 : 0 : array = palloc_array(GlobalTransactionData, num);
688 : 0 : *gxacts = array;
689 [ # # ]: 0 : for (i = 0; i < num; i++)
690 : 0 : memcpy(array + i, TwoPhaseState->prepXacts[i],
691 : : sizeof(GlobalTransactionData));
692 : :
693 : 0 : LWLockRelease(TwoPhaseStateLock);
694 : :
695 : 0 : return num;
696 : 11 : }
697 : :
698 : :
699 : : /* Working status for pg_prepared_xact */
700 : : typedef struct
701 : : {
702 : : GlobalTransaction array;
703 : : int ngxacts;
704 : : int currIdx;
705 : : } Working_State;
706 : :
707 : : /*
708 : : * pg_prepared_xact
709 : : * Produce a view with one row per prepared transaction.
710 : : *
711 : : * This function is here so we don't have to export the
712 : : * GlobalTransactionData struct definition.
713 : : */
714 : : Datum
715 : 11 : pg_prepared_xact(PG_FUNCTION_ARGS)
716 : : {
717 : 11 : FuncCallContext *funcctx;
718 : 11 : Working_State *status;
719 : :
720 [ - + ]: 11 : if (SRF_IS_FIRSTCALL())
721 : : {
722 : 11 : TupleDesc tupdesc;
723 : 11 : MemoryContext oldcontext;
724 : :
725 : : /* create a function context for cross-call persistence */
726 : 11 : funcctx = SRF_FIRSTCALL_INIT();
727 : :
728 : : /*
729 : : * Switch to memory context appropriate for multiple function calls
730 : : */
731 : 11 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
732 : :
733 : : /* build tupdesc for result tuples */
734 : : /* this had better match pg_prepared_xacts view in system_views.sql */
735 : 11 : tupdesc = CreateTemplateTupleDesc(5);
736 : 11 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
737 : : XIDOID, -1, 0);
738 : 11 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
739 : : TEXTOID, -1, 0);
740 : 11 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
741 : : TIMESTAMPTZOID, -1, 0);
742 : 11 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
743 : : OIDOID, -1, 0);
744 : 11 : TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
745 : : OIDOID, -1, 0);
746 : :
747 : 11 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
748 : :
749 : : /*
750 : : * Collect all the 2PC status information that we will format and send
751 : : * out as a result set.
752 : : */
753 : 11 : status = palloc_object(Working_State);
754 : 11 : funcctx->user_fctx = status;
755 : :
756 : 11 : status->ngxacts = GetPreparedTransactionList(&status->array);
757 : 11 : status->currIdx = 0;
758 : :
759 : 11 : MemoryContextSwitchTo(oldcontext);
760 : 11 : }
761 : :
762 : 11 : funcctx = SRF_PERCALL_SETUP();
763 : 11 : status = (Working_State *) funcctx->user_fctx;
764 : :
765 [ + - + - ]: 11 : while (status->array != NULL && status->currIdx < status->ngxacts)
766 : : {
767 : 0 : GlobalTransaction gxact = &status->array[status->currIdx++];
768 : 0 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
769 : 0 : Datum values[5] = {0};
770 : 0 : bool nulls[5] = {0};
771 : 0 : HeapTuple tuple;
772 : 0 : Datum result;
773 : :
774 [ # # ]: 0 : if (!gxact->valid)
775 : 0 : continue;
776 : :
777 : : /*
778 : : * Form tuple with appropriate data.
779 : : */
780 : :
781 : 0 : values[0] = TransactionIdGetDatum(proc->xid);
782 : 0 : values[1] = CStringGetTextDatum(gxact->gid);
783 : 0 : values[2] = TimestampTzGetDatum(gxact->prepared_at);
784 : 0 : values[3] = ObjectIdGetDatum(gxact->owner);
785 : 0 : values[4] = ObjectIdGetDatum(proc->databaseId);
786 : :
787 : 0 : tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
788 : 0 : result = HeapTupleGetDatum(tuple);
789 : 0 : SRF_RETURN_NEXT(funcctx, result);
790 [ # # # ]: 0 : }
791 : :
792 [ + - ]: 11 : SRF_RETURN_DONE(funcctx);
793 [ - + ]: 11 : }
794 : :
795 : : /*
796 : : * TwoPhaseGetGXact
797 : : * Get the GlobalTransaction struct for a prepared transaction
798 : : * specified by XID
799 : : *
800 : : * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
801 : : * caller had better hold it.
802 : : */
803 : : static GlobalTransaction
804 : 0 : TwoPhaseGetGXact(FullTransactionId fxid, bool lock_held)
805 : : {
806 : 0 : GlobalTransaction result = NULL;
807 : 0 : int i;
808 : :
809 : : static FullTransactionId cached_fxid = {InvalidTransactionId};
810 : : static GlobalTransaction cached_gxact = NULL;
811 : :
812 [ # # # # ]: 0 : Assert(!lock_held || LWLockHeldByMe(TwoPhaseStateLock));
813 : :
814 : : /*
815 : : * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
816 : : * repeatedly for the same XID. We can save work with a simple cache.
817 : : */
818 [ # # ]: 0 : if (FullTransactionIdEquals(fxid, cached_fxid))
819 : 0 : return cached_gxact;
820 : :
821 [ # # ]: 0 : if (!lock_held)
822 : 0 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
823 : :
824 [ # # ]: 0 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
825 : : {
826 : 0 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
827 : :
828 [ # # ]: 0 : if (FullTransactionIdEquals(gxact->fxid, fxid))
829 : : {
830 : 0 : result = gxact;
831 : 0 : break;
832 : : }
833 [ # # # ]: 0 : }
834 : :
835 [ # # ]: 0 : if (!lock_held)
836 : 0 : LWLockRelease(TwoPhaseStateLock);
837 : :
838 [ # # ]: 0 : if (result == NULL) /* should not happen */
839 [ # # # # ]: 0 : elog(ERROR, "failed to find GlobalTransaction for xid %u",
840 : : XidFromFullTransactionId(fxid));
841 : :
842 : 0 : cached_fxid = fxid;
843 : 0 : cached_gxact = result;
844 : :
845 : 0 : return result;
846 : 0 : }
847 : :
848 : : /*
849 : : * TwoPhaseGetXidByVirtualXID
850 : : * Lookup VXID among xacts prepared since last startup.
851 : : *
852 : : * (This won't find recovered xacts.) If more than one matches, return any
853 : : * and set "have_more" to true. To witness multiple matches, a single
854 : : * proc number must consume 2^32 LXIDs, with no intervening database restart.
855 : : */
856 : : TransactionId
857 : 0 : TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
858 : : bool *have_more)
859 : : {
860 : 0 : int i;
861 : 0 : TransactionId result = InvalidTransactionId;
862 : :
863 [ # # ]: 0 : Assert(VirtualTransactionIdIsValid(vxid));
864 : 0 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
865 : :
866 [ # # ]: 0 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
867 : : {
868 : 0 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
869 : 0 : PGPROC *proc;
870 : 0 : VirtualTransactionId proc_vxid;
871 : :
872 [ # # ]: 0 : if (!gxact->valid)
873 : 0 : continue;
874 : 0 : proc = GetPGProcByNumber(gxact->pgprocno);
875 : 0 : GET_VXID_FROM_PGPROC(proc_vxid, *proc);
876 [ # # # # ]: 0 : if (VirtualTransactionIdEquals(vxid, proc_vxid))
877 : : {
878 : : /*
879 : : * Startup process sets proc->vxid.procNumber to
880 : : * INVALID_PROC_NUMBER.
881 : : */
882 [ # # ]: 0 : Assert(!gxact->inredo);
883 : :
884 [ # # ]: 0 : if (result != InvalidTransactionId)
885 : : {
886 : 0 : *have_more = true;
887 : 0 : break;
888 : : }
889 : 0 : result = XidFromFullTransactionId(gxact->fxid);
890 : 0 : }
891 [ # # # # ]: 0 : }
892 : :
893 : 0 : LWLockRelease(TwoPhaseStateLock);
894 : :
895 : 0 : return result;
896 : 0 : }
897 : :
898 : : /*
899 : : * TwoPhaseGetDummyProcNumber
900 : : * Get the dummy proc number for prepared transaction
901 : : *
902 : : * Dummy proc numbers are similar to proc numbers of real backends. They
903 : : * start at MaxBackends, and are unique across all currently active real
904 : : * backends and prepared transactions. If lock_held is set to true,
905 : : * TwoPhaseStateLock will not be taken, so the caller had better hold it.
906 : : */
907 : : ProcNumber
908 : 0 : TwoPhaseGetDummyProcNumber(FullTransactionId fxid, bool lock_held)
909 : : {
910 : 0 : GlobalTransaction gxact = TwoPhaseGetGXact(fxid, lock_held);
911 : :
912 : 0 : return gxact->pgprocno;
913 : 0 : }
914 : :
915 : : /*
916 : : * TwoPhaseGetDummyProc
917 : : * Get the PGPROC that represents a prepared transaction
918 : : *
919 : : * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
920 : : * caller had better hold it.
921 : : */
922 : : PGPROC *
923 : 0 : TwoPhaseGetDummyProc(FullTransactionId fxid, bool lock_held)
924 : : {
925 : 0 : GlobalTransaction gxact = TwoPhaseGetGXact(fxid, lock_held);
926 : :
927 : 0 : return GetPGProcByNumber(gxact->pgprocno);
928 : 0 : }
929 : :
930 : : /************************************************************************/
931 : : /* State file support */
932 : : /************************************************************************/
933 : :
934 : : /*
935 : : * Compute the FullTransactionId for the given TransactionId.
936 : : *
937 : : * This is safe if the xid has not yet reached COMMIT PREPARED or ROLLBACK
938 : : * PREPARED. After those commands, concurrent vac_truncate_clog() may make
939 : : * the xid cease to qualify as allowable. XXX Not all callers limit their
940 : : * calls accordingly.
941 : : */
942 : : static inline FullTransactionId
943 : 0 : AdjustToFullTransactionId(TransactionId xid)
944 : : {
945 [ # # ]: 0 : Assert(TransactionIdIsValid(xid));
946 : 0 : return FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid);
947 : : }
948 : :
949 : : static inline int
950 : 0 : TwoPhaseFilePath(char *path, FullTransactionId fxid)
951 : : {
952 : 0 : return snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X%08X",
953 : 0 : EpochFromFullTransactionId(fxid),
954 : 0 : XidFromFullTransactionId(fxid));
955 : : }
956 : :
/*
 * 2PC state file format (the same layout is carried in the PREPARE WAL
 * record, minus the trailing checksum):
 *
 *	1. TwoPhaseFileHeader
 *	2. GID (hdr.gidlen bytes, including the terminating '\0')
 *	3. TransactionId[] (subtransactions)
 *	4. RelFileLocator[] (files to be deleted at commit)
 *	5. RelFileLocator[] (files to be deleted at abort)
 *	6. xl_xact_stats_item[] (stats to be dropped at commit)
 *	7. xl_xact_stats_item[] (stats to be dropped at abort)
 *	8. SharedInvalidationMessage[] (inval messages to be sent at commit)
 *	9. TwoPhaseRecordOnDisk
 *	10. ...
 *	11. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
 *	12. checksum (CRC-32C)
 *
 * Each segment except the final checksum is MAXALIGN'd.  Array segments
 * whose count in the header is zero are omitted entirely.
 */

/*
 * Header for a 2PC state file
 */
#define TWOPHASE_MAGIC	0x57F94534	/* format identifier */

typedef xl_xact_prepare TwoPhaseFileHeader;

/*
 * Header for each record in a state file
 *
 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
 * The rmgr data will be stored starting on a MAXALIGN boundary.
 */
typedef struct TwoPhaseRecordOnDisk
{
	uint32		len;			/* length of rmgr data */
	TwoPhaseRmgrId rmid;		/* resource manager for this record */
	uint16		info;			/* flag bits for use by rmgr */
} TwoPhaseRecordOnDisk;

/*
 * During prepare, the state file is assembled in memory before writing it
 * to WAL and the actual state file.  We use a chain of StateFileChunk blocks
 * for that.
 */
typedef struct StateFileChunk
{
	char	   *data;			/* chunk buffer */
	uint32		len;			/* bytes used in data (MAXALIGN'd blocks) */
	struct StateFileChunk *next;	/* next chunk, or NULL for the tail */
} StateFileChunk;

/*
 * The chunk chain for the transaction currently being prepared.  Set up by
 * StartPrepare(), appended to by save_state_data(), and written to WAL and
 * cleared by EndPrepare().
 */
static struct xllist
{
	StateFileChunk *head;		/* first data block in the chain */
	StateFileChunk *tail;		/* last block in chain */
	uint32		num_chunks;
	uint32		bytes_free;		/* free bytes left in tail block */
	uint32		total_len;		/* total data bytes in chain */
}			records;
1013 : :
1014 : :
1015 : : /*
1016 : : * Append a block of data to records data structure.
1017 : : *
1018 : : * NB: each block is padded to a MAXALIGN multiple. This must be
1019 : : * accounted for when the file is later read!
1020 : : *
1021 : : * The data is copied, so the caller is free to modify it afterwards.
1022 : : */
1023 : : static void
1024 : 0 : save_state_data(const void *data, uint32 len)
1025 : : {
1026 : 0 : uint32 padlen = MAXALIGN(len);
1027 : :
1028 [ # # ]: 0 : if (padlen > records.bytes_free)
1029 : : {
1030 : 0 : records.tail->next = palloc0_object(StateFileChunk);
1031 : 0 : records.tail = records.tail->next;
1032 : 0 : records.tail->len = 0;
1033 : 0 : records.tail->next = NULL;
1034 : 0 : records.num_chunks++;
1035 : :
1036 [ # # ]: 0 : records.bytes_free = Max(padlen, 512);
1037 : 0 : records.tail->data = palloc(records.bytes_free);
1038 : 0 : }
1039 : :
1040 : 0 : memcpy(records.tail->data + records.tail->len, data, len);
1041 : 0 : records.tail->len += padlen;
1042 : 0 : records.bytes_free -= padlen;
1043 : 0 : records.total_len += padlen;
1044 : 0 : }
1045 : :
1046 : : /*
1047 : : * Start preparing a state file.
1048 : : *
1049 : : * Initializes data structure and inserts the 2PC file header record.
1050 : : */
1051 : : void
1052 : 0 : StartPrepare(GlobalTransaction gxact)
1053 : : {
1054 : 0 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
1055 : 0 : TransactionId xid = XidFromFullTransactionId(gxact->fxid);
1056 : 0 : TwoPhaseFileHeader hdr;
1057 : 0 : TransactionId *children;
1058 : 0 : RelFileLocator *commitrels;
1059 : 0 : RelFileLocator *abortrels;
1060 : 0 : xl_xact_stats_item *abortstats = NULL;
1061 : 0 : xl_xact_stats_item *commitstats = NULL;
1062 : 0 : SharedInvalidationMessage *invalmsgs;
1063 : :
1064 : : /* Initialize linked list */
1065 : 0 : records.head = palloc0_object(StateFileChunk);
1066 : 0 : records.head->len = 0;
1067 : 0 : records.head->next = NULL;
1068 : :
1069 : 0 : records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
1070 : 0 : records.head->data = palloc(records.bytes_free);
1071 : :
1072 : 0 : records.tail = records.head;
1073 : 0 : records.num_chunks = 1;
1074 : :
1075 : 0 : records.total_len = 0;
1076 : :
1077 : : /* Create header */
1078 : 0 : hdr.magic = TWOPHASE_MAGIC;
1079 : 0 : hdr.total_len = 0; /* EndPrepare will fill this in */
1080 : 0 : hdr.xid = xid;
1081 : 0 : hdr.database = proc->databaseId;
1082 : 0 : hdr.prepared_at = gxact->prepared_at;
1083 : 0 : hdr.owner = gxact->owner;
1084 : 0 : hdr.nsubxacts = xactGetCommittedChildren(&children);
1085 : 0 : hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
1086 : 0 : hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
1087 : 0 : hdr.ncommitstats =
1088 : 0 : pgstat_get_transactional_drops(true, &commitstats);
1089 : 0 : hdr.nabortstats =
1090 : 0 : pgstat_get_transactional_drops(false, &abortstats);
1091 : 0 : hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
1092 : 0 : &hdr.initfileinval);
1093 : 0 : hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */
1094 : : /* EndPrepare will fill the origin data, if necessary */
1095 : 0 : hdr.origin_lsn = InvalidXLogRecPtr;
1096 : 0 : hdr.origin_timestamp = 0;
1097 : :
1098 : 0 : save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
1099 : 0 : save_state_data(gxact->gid, hdr.gidlen);
1100 : :
1101 : : /*
1102 : : * Add the additional info about subxacts, deletable files and cache
1103 : : * invalidation messages.
1104 : : */
1105 [ # # ]: 0 : if (hdr.nsubxacts > 0)
1106 : : {
1107 : 0 : save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
1108 : : /* While we have the child-xact data, stuff it in the gxact too */
1109 : 0 : GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
1110 : 0 : }
1111 [ # # ]: 0 : if (hdr.ncommitrels > 0)
1112 : : {
1113 : 0 : save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileLocator));
1114 : 0 : pfree(commitrels);
1115 : 0 : }
1116 [ # # ]: 0 : if (hdr.nabortrels > 0)
1117 : : {
1118 : 0 : save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileLocator));
1119 : 0 : pfree(abortrels);
1120 : 0 : }
1121 [ # # ]: 0 : if (hdr.ncommitstats > 0)
1122 : : {
1123 : 0 : save_state_data(commitstats,
1124 : 0 : hdr.ncommitstats * sizeof(xl_xact_stats_item));
1125 : 0 : pfree(commitstats);
1126 : 0 : }
1127 [ # # ]: 0 : if (hdr.nabortstats > 0)
1128 : : {
1129 : 0 : save_state_data(abortstats,
1130 : 0 : hdr.nabortstats * sizeof(xl_xact_stats_item));
1131 : 0 : pfree(abortstats);
1132 : 0 : }
1133 [ # # ]: 0 : if (hdr.ninvalmsgs > 0)
1134 : : {
1135 : 0 : save_state_data(invalmsgs,
1136 : 0 : hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
1137 : 0 : pfree(invalmsgs);
1138 : 0 : }
1139 : 0 : }
1140 : :
/*
 * Finish preparing state data and writing it to WAL.
 *
 * Expects StartPrepare() to have set up the records chunk chain for gxact
 * (the header record must be at the start of records.head).  This function
 * does the following, in order:
 *	- appends the end-sentinel record;
 *	- fills in the header's total_len and, when a session replication
 *	  origin is active, its origin LSN and timestamp;
 *	- raises ERROR if the data would exceed MaxAllocSize;
 *	- inside a critical section with DELAY_CHKPT_START set, inserts the
 *	  chain as one XLOG_XACT_PREPARE record, flushes it, and marks the gxact
 *	  prepared;
 *	- waits for synchronous replication of the PREPARE record;
 *	- resets the chunk list.
 */
void
EndPrepare(GlobalTransaction gxact)
{
	TwoPhaseFileHeader *hdr;
	StateFileChunk *record;
	bool		replorigin;

	/* Add the end sentinel to the list of 2PC records */
	RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
						   NULL, 0);

	/* Go back and fill in total_len in the file header record */
	hdr = (TwoPhaseFileHeader *) records.head->data;
	Assert(hdr->magic == TWOPHASE_MAGIC);
	/*
	 * total_len also counts the CRC that trails the state file;
	 * ReadTwoPhaseFile compares it against the file size.
	 */
	hdr->total_len = records.total_len + sizeof(pg_crc32c);

	/* Is a real (replicable) replication origin set for this session? */
	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
				  replorigin_session_origin != DoNotReplicateId);

	if (replorigin)
	{
		hdr->origin_lsn = replorigin_session_origin_lsn;
		hdr->origin_timestamp = replorigin_session_origin_timestamp;
	}

	/*
	 * If the data size exceeds MaxAllocSize, we won't be able to read it in
	 * ReadTwoPhaseFile. Check for that now, rather than fail in the case
	 * where we write data to file and then re-read at commit time.
	 */
	if (hdr->total_len > MaxAllocSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("two-phase state file maximum length exceeded")));

	/*
	 * Now writing 2PC state data to WAL. We let the WAL's CRC protection
	 * cover us, so no need to calculate a separate CRC.
	 *
	 * We have to set DELAY_CHKPT_START here, too; otherwise a checkpoint
	 * starting immediately after the WAL record is inserted could complete
	 * without fsync'ing our state file. (This is essentially the same kind
	 * of race condition as the COMMIT-to-clog-write case that
	 * RecordTransactionCommit uses DELAY_CHKPT_IN_COMMIT for; see notes
	 * there.) Note that DELAY_CHKPT_IN_COMMIT is used to find transactions in
	 * the critical commit section. We need to know about such transactions
	 * for conflict detection in logical replication. See
	 * GetOldestActiveTransactionId(true, false) and its use.
	 *
	 * We save the PREPARE record's location in the gxact for later use by
	 * CheckPointTwoPhase.
	 */
	/*
	 * Reserve room for one XLogRegisterData() call per chunk before
	 * entering the critical section.
	 */
	XLogEnsureRecordSpace(0, records.num_chunks);

	START_CRIT_SECTION();

	Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
	MyProc->delayChkptFlags |= DELAY_CHKPT_START;

	/* Register every chunk of the assembled state data as record data. */
	XLogBeginInsert();
	for (record = records.head; record != NULL; record = record->next)
		XLogRegisterData(record->data, record->len);

	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);

	/* XLogInsert returns the end position of the inserted record. */
	gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);

	if (replorigin)
	{
		/* Move LSNs forward for this replication origin */
		replorigin_session_advance(replorigin_session_origin_lsn,
								   gxact->prepare_end_lsn);
	}

	XLogFlush(gxact->prepare_end_lsn);

	/* If we crash now, we have prepared: WAL replay will fix things */

	/* Store record's start location to read that later on Commit */
	gxact->prepare_start_lsn = ProcLastRecPtr;

	/*
	 * Mark the prepared transaction as valid.  As soon as xact.c marks MyProc
	 * as not running our XID (which it will do immediately after this
	 * function returns), others can commit/rollback the xact.
	 *
	 * NB: a side effect of this is to make a dummy ProcArray entry for the
	 * prepared XID.  This must happen before we clear the XID from MyProc /
	 * ProcGlobal->xids[], else there is a window where the XID is not running
	 * according to TransactionIdIsInProgress, and onlookers would be entitled
	 * to assume the xact crashed.  Instead we have a window where the same
	 * XID appears twice in ProcArray, which is OK.
	 */
	MarkAsPrepared(gxact, false);

	/*
	 * Now we can mark ourselves as out of the commit critical section: a
	 * checkpoint starting after this will certainly see the gxact as a
	 * candidate for fsyncing.
	 */
	MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;

	/*
	 * Remember that we have this GlobalTransaction entry locked for us.  If
	 * we crash after this point, it's too late to abort, but we must unlock
	 * it so that the prepared transaction can be committed or rolled back.
	 */
	MyLockedGxact = gxact;

	END_CRIT_SECTION();

	/*
	 * Wait for synchronous replication, if required.
	 *
	 * Note that at this stage we have marked the prepare, but still show as
	 * running in the procarray (twice!) and continue to hold locks.
	 */
	SyncRepWaitForLSN(gxact->prepare_end_lsn, false);

	/* Forget the chunk chain; the next StartPrepare() builds a new one. */
	records.tail = records.head = NULL;
	records.num_chunks = 0;
}
1266 : :
1267 : : /*
1268 : : * Register a 2PC record to be written to state file.
1269 : : */
1270 : : void
1271 : 0 : RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1272 : : const void *data, uint32 len)
1273 : : {
1274 : 0 : TwoPhaseRecordOnDisk record;
1275 : :
1276 : 0 : record.rmid = rmid;
1277 : 0 : record.info = info;
1278 : 0 : record.len = len;
1279 : 0 : save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1280 [ # # ]: 0 : if (len > 0)
1281 : 0 : save_state_data(data, len);
1282 : 0 : }
1283 : :
1284 : :
1285 : : /*
1286 : : * Read and validate the state file for xid.
1287 : : *
1288 : : * If it looks OK (has a valid magic number and CRC), return the palloc'd
1289 : : * contents of the file, issuing an error when finding corrupted data. If
1290 : : * missing_ok is true, which indicates that missing files can be safely
1291 : : * ignored, then return NULL. This state can be reached when doing recovery
1292 : : * after discarding two-phase files from frozen epochs.
1293 : : */
1294 : : static char *
1295 : 0 : ReadTwoPhaseFile(FullTransactionId fxid, bool missing_ok)
1296 : : {
1297 : 0 : char path[MAXPGPATH];
1298 : 0 : char *buf;
1299 : 0 : TwoPhaseFileHeader *hdr;
1300 : 0 : int fd;
1301 : 0 : struct stat stat;
1302 : 0 : uint32 crc_offset;
1303 : 0 : pg_crc32c calc_crc,
1304 : : file_crc;
1305 : 0 : int r;
1306 : :
1307 : 0 : TwoPhaseFilePath(path, fxid);
1308 : :
1309 : 0 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1310 [ # # ]: 0 : if (fd < 0)
1311 : : {
1312 [ # # ]: 0 : if (missing_ok && errno == ENOENT)
1313 : 0 : return NULL;
1314 : :
1315 [ # # # # ]: 0 : ereport(ERROR,
1316 : : (errcode_for_file_access(),
1317 : : errmsg("could not open file \"%s\": %m", path)));
1318 : 0 : }
1319 : :
1320 : : /*
1321 : : * Check file length. We can determine a lower bound pretty easily. We
1322 : : * set an upper bound to avoid palloc() failure on a corrupt file, though
1323 : : * we can't guarantee that we won't get an out of memory error anyway,
1324 : : * even on a valid file.
1325 : : */
1326 [ # # ]: 0 : if (fstat(fd, &stat))
1327 [ # # # # ]: 0 : ereport(ERROR,
1328 : : (errcode_for_file_access(),
1329 : : errmsg("could not stat file \"%s\": %m", path)));
1330 : :
1331 : 0 : if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1332 : : MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1333 [ # # ]: 0 : sizeof(pg_crc32c)) ||
1334 : 0 : stat.st_size > MaxAllocSize)
1335 [ # # # # ]: 0 : ereport(ERROR,
1336 : : (errcode(ERRCODE_DATA_CORRUPTED),
1337 : : errmsg_plural("incorrect size of file \"%s\": %lld byte",
1338 : : "incorrect size of file \"%s\": %lld bytes",
1339 : : (long long int) stat.st_size, path,
1340 : : (long long int) stat.st_size)));
1341 : :
1342 : 0 : crc_offset = stat.st_size - sizeof(pg_crc32c);
1343 [ # # ]: 0 : if (crc_offset != MAXALIGN(crc_offset))
1344 [ # # # # ]: 0 : ereport(ERROR,
1345 : : (errcode(ERRCODE_DATA_CORRUPTED),
1346 : : errmsg("incorrect alignment of CRC offset for file \"%s\"",
1347 : : path)));
1348 : :
1349 : : /*
1350 : : * OK, slurp in the file.
1351 : : */
1352 : 0 : buf = (char *) palloc(stat.st_size);
1353 : :
1354 : 0 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
1355 : 0 : r = read(fd, buf, stat.st_size);
1356 [ # # ]: 0 : if (r != stat.st_size)
1357 : : {
1358 [ # # ]: 0 : if (r < 0)
1359 [ # # # # ]: 0 : ereport(ERROR,
1360 : : (errcode_for_file_access(),
1361 : : errmsg("could not read file \"%s\": %m", path)));
1362 : : else
1363 [ # # # # ]: 0 : ereport(ERROR,
1364 : : (errmsg("could not read file \"%s\": read %d of %lld",
1365 : : path, r, (long long int) stat.st_size)));
1366 : 0 : }
1367 : :
1368 : 0 : pgstat_report_wait_end();
1369 : :
1370 [ # # ]: 0 : if (CloseTransientFile(fd) != 0)
1371 [ # # # # ]: 0 : ereport(ERROR,
1372 : : (errcode_for_file_access(),
1373 : : errmsg("could not close file \"%s\": %m", path)));
1374 : :
1375 : 0 : hdr = (TwoPhaseFileHeader *) buf;
1376 [ # # ]: 0 : if (hdr->magic != TWOPHASE_MAGIC)
1377 [ # # # # ]: 0 : ereport(ERROR,
1378 : : (errcode(ERRCODE_DATA_CORRUPTED),
1379 : : errmsg("invalid magic number stored in file \"%s\"",
1380 : : path)));
1381 : :
1382 [ # # ]: 0 : if (hdr->total_len != stat.st_size)
1383 [ # # # # ]: 0 : ereport(ERROR,
1384 : : (errcode(ERRCODE_DATA_CORRUPTED),
1385 : : errmsg("invalid size stored in file \"%s\"",
1386 : : path)));
1387 : :
1388 : 0 : INIT_CRC32C(calc_crc);
1389 : 0 : COMP_CRC32C(calc_crc, buf, crc_offset);
1390 : 0 : FIN_CRC32C(calc_crc);
1391 : :
1392 : 0 : file_crc = *((pg_crc32c *) (buf + crc_offset));
1393 : :
1394 [ # # ]: 0 : if (!EQ_CRC32C(calc_crc, file_crc))
1395 [ # # # # ]: 0 : ereport(ERROR,
1396 : : (errcode(ERRCODE_DATA_CORRUPTED),
1397 : : errmsg("calculated CRC checksum does not match value stored in file \"%s\"",
1398 : : path)));
1399 : :
1400 : 0 : return buf;
1401 : 0 : }
1402 : :
1403 : :
1404 : : /*
1405 : : * Reads 2PC data from xlog. During checkpoint this data will be moved to
1406 : : * twophase files and ReadTwoPhaseFile should be used instead.
1407 : : *
1408 : : * Note clearly that this function can access WAL during normal operation,
1409 : : * similarly to the way WALSender or Logical Decoding would do.
1410 : : */
1411 : : static void
1412 : 0 : XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
1413 : : {
1414 : 0 : XLogRecord *record;
1415 : 0 : XLogReaderState *xlogreader;
1416 : 0 : char *errormsg;
1417 : :
1418 : 0 : xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
1419 : 0 : XL_ROUTINE(.page_read = &read_local_xlog_page,
1420 : : .segment_open = &wal_segment_open,
1421 : : .segment_close = &wal_segment_close),
1422 : : NULL);
1423 [ # # ]: 0 : if (!xlogreader)
1424 [ # # # # ]: 0 : ereport(ERROR,
1425 : : (errcode(ERRCODE_OUT_OF_MEMORY),
1426 : : errmsg("out of memory"),
1427 : : errdetail("Failed while allocating a WAL reading processor.")));
1428 : :
1429 : 0 : XLogBeginRead(xlogreader, lsn);
1430 : 0 : record = XLogReadRecord(xlogreader, &errormsg);
1431 : :
1432 [ # # ]: 0 : if (record == NULL)
1433 : : {
1434 [ # # ]: 0 : if (errormsg)
1435 [ # # # # ]: 0 : ereport(ERROR,
1436 : : (errcode_for_file_access(),
1437 : : errmsg("could not read two-phase state from WAL at %X/%08X: %s",
1438 : : LSN_FORMAT_ARGS(lsn), errormsg)));
1439 : : else
1440 [ # # # # ]: 0 : ereport(ERROR,
1441 : : (errcode_for_file_access(),
1442 : : errmsg("could not read two-phase state from WAL at %X/%08X",
1443 : : LSN_FORMAT_ARGS(lsn))));
1444 : 0 : }
1445 : :
1446 [ # # ]: 0 : if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
1447 : 0 : (XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
1448 [ # # # # ]: 0 : ereport(ERROR,
1449 : : (errcode_for_file_access(),
1450 : : errmsg("expected two-phase state data is not present in WAL at %X/%08X",
1451 : : LSN_FORMAT_ARGS(lsn))));
1452 : :
1453 [ # # ]: 0 : if (len != NULL)
1454 : 0 : *len = XLogRecGetDataLen(xlogreader);
1455 : :
1456 : 0 : *buf = palloc_array(char, XLogRecGetDataLen(xlogreader));
1457 : 0 : memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
1458 : :
1459 : 0 : XLogReaderFree(xlogreader);
1460 : 0 : }
1461 : :
1462 : :
1463 : : /*
1464 : : * Confirms an xid is prepared, during recovery
1465 : : */
1466 : : bool
1467 : 0 : StandbyTransactionIdIsPrepared(TransactionId xid)
1468 : : {
1469 : 0 : char *buf;
1470 : 0 : TwoPhaseFileHeader *hdr;
1471 : 0 : bool result;
1472 : 0 : FullTransactionId fxid;
1473 : :
1474 [ # # ]: 0 : Assert(TransactionIdIsValid(xid));
1475 : :
1476 [ # # ]: 0 : if (max_prepared_xacts <= 0)
1477 : 0 : return false; /* nothing to do */
1478 : :
1479 : : /* Read and validate file */
1480 : 0 : fxid = AdjustToFullTransactionId(xid);
1481 : 0 : buf = ReadTwoPhaseFile(fxid, true);
1482 [ # # ]: 0 : if (buf == NULL)
1483 : 0 : return false;
1484 : :
1485 : : /* Check header also */
1486 : 0 : hdr = (TwoPhaseFileHeader *) buf;
1487 : 0 : result = TransactionIdEquals(hdr->xid, xid);
1488 : 0 : pfree(buf);
1489 : :
1490 : 0 : return result;
1491 : 0 : }
1492 : :
/*
 * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
 *
 * gid names the prepared transaction.  It is looked up and locked through
 * LockGXact(gid, GetUserId()) so that two backends cannot finish it at the
 * same time.  isCommit selects COMMIT PREPARED (true) or ROLLBACK PREPARED
 * (false).
 *
 * The 2PC state data is read from the state file when gxact->ondisk is set,
 * otherwise from the PREPARE WAL record.  Then, in order:
 *	- the commit/abort WAL record is written;
 *	- the dummy proc is removed from the ProcArray;
 *	- relation files and stats scheduled for dropping are dropped;
 *	- invalidations are sent (commit only);
 *	- the rmgr post-commit/post-abort callbacks run;
 *	- the shared gxact entry and any state file are removed.
 * Cancel/die interrupts are held from the WAL record through cleanup.
 */
void
FinishPreparedTransaction(const char *gid, bool isCommit)
{
	GlobalTransaction gxact;
	PGPROC	   *proc;
	FullTransactionId fxid;
	TransactionId xid;
	bool		ondisk;
	char	   *buf;
	char	   *bufptr;
	TwoPhaseFileHeader *hdr;
	TransactionId latestXid;
	TransactionId *children;
	RelFileLocator *commitrels;
	RelFileLocator *abortrels;
	RelFileLocator *delrels;
	int			ndelrels;
	xl_xact_stats_item *commitstats;
	xl_xact_stats_item *abortstats;
	SharedInvalidationMessage *invalmsgs;

	/*
	 * Validate the GID, and lock the GXACT to ensure that two backends do not
	 * try to commit the same GID at once.
	 */
	gxact = LockGXact(gid, GetUserId());
	proc = GetPGProcByNumber(gxact->pgprocno);
	fxid = gxact->fxid;
	xid = XidFromFullTransactionId(fxid);

	/*
	 * Read and validate 2PC state data. State data will typically be stored
	 * in WAL files if the LSN is after the last checkpoint record, or moved
	 * to disk if for some reason they have lived for a long time.
	 *
	 * Either way buf is palloc'd; it is freed at the end of this function.
	 */
	if (gxact->ondisk)
		buf = ReadTwoPhaseFile(fxid, false);
	else
		XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);


	/*
	 * Disassemble the header area
	 *
	 * The sections follow the header in the order StartPrepare() wrote them,
	 * each padded to MAXALIGN.  Afterwards bufptr points at the first
	 * TwoPhaseRecordOnDisk, which ProcessRecords() consumes below.
	 */
	hdr = (TwoPhaseFileHeader *) buf;
	Assert(TransactionIdEquals(hdr->xid, xid));
	bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
	bufptr += MAXALIGN(hdr->gidlen);
	children = (TransactionId *) bufptr;
	bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
	commitrels = (RelFileLocator *) bufptr;
	bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
	abortrels = (RelFileLocator *) bufptr;
	bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
	commitstats = (xl_xact_stats_item *) bufptr;
	bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
	abortstats = (xl_xact_stats_item *) bufptr;
	bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
	invalmsgs = (SharedInvalidationMessage *) bufptr;
	bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));

	/* compute latestXid among all children */
	latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);

	/* Prevent cancel/die interrupt while cleaning up */
	HOLD_INTERRUPTS();

	/*
	 * The order of operations here is critical: make the XLOG entry for
	 * commit or abort, then mark the transaction committed or aborted in
	 * pg_xact, then remove its PGPROC from the global ProcArray (which means
	 * TransactionIdIsInProgress will stop saying the prepared xact is in
	 * progress), then run the post-commit or post-abort callbacks. The
	 * callbacks will release the locks the transaction held.
	 */
	if (isCommit)
		RecordTransactionCommitPrepared(xid,
										hdr->nsubxacts, children,
										hdr->ncommitrels, commitrels,
										hdr->ncommitstats,
										commitstats,
										hdr->ninvalmsgs, invalmsgs,
										hdr->initfileinval, gid);
	else
		RecordTransactionAbortPrepared(xid,
									   hdr->nsubxacts, children,
									   hdr->nabortrels, abortrels,
									   hdr->nabortstats,
									   abortstats,
									   gid);

	ProcArrayRemove(proc, latestXid);

	/*
	 * In case we fail while running the callbacks, mark the gxact invalid so
	 * no one else will try to commit/rollback, and so it will be recycled if
	 * we fail after this point.  It is still locked by our backend so it
	 * won't go away yet.
	 *
	 * (We assume it's safe to do this without taking TwoPhaseStateLock.)
	 */
	gxact->valid = false;

	/*
	 * We have to remove any files that were supposed to be dropped. For
	 * consistency with the regular xact.c code paths, must do this before
	 * releasing locks, so do it before running the callbacks.
	 *
	 * NB: this code knows that we couldn't be dropping any temp rels ...
	 */
	if (isCommit)
	{
		delrels = commitrels;
		ndelrels = hdr->ncommitrels;
	}
	else
	{
		delrels = abortrels;
		ndelrels = hdr->nabortrels;
	}

	/* Make sure files supposed to be dropped are dropped */
	DropRelationFiles(delrels, ndelrels, false);

	/* Likewise drop the stats entries scheduled for this outcome. */
	if (isCommit)
		pgstat_execute_transactional_drops(hdr->ncommitstats, commitstats, false);
	else
		pgstat_execute_transactional_drops(hdr->nabortstats, abortstats, false);

	/*
	 * Handle cache invalidation messages.
	 *
	 * Relcache init file invalidation requires processing both before and
	 * after we send the SI messages, only when committing.  See
	 * AtEOXact_Inval().
	 */
	if (isCommit)
	{
		if (hdr->initfileinval)
			RelationCacheInitFilePreInvalidate();
		SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
		if (hdr->initfileinval)
			RelationCacheInitFilePostInvalidate();
	}

	/*
	 * Acquire the two-phase lock.  We want to work on the two-phase callbacks
	 * while holding it to avoid potential conflicts with other transactions
	 * attempting to use the same GID, so the lock is released once the shared
	 * memory state is cleared.
	 */
	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);

	/* And now do the callbacks */
	if (isCommit)
		ProcessRecords(bufptr, fxid, twophase_postcommit_callbacks);
	else
		ProcessRecords(bufptr, fxid, twophase_postabort_callbacks);

	PredicateLockTwoPhaseFinish(fxid, isCommit);

	/*
	 * Read this value while holding the two-phase lock, as the on-disk 2PC
	 * file is physically removed after the lock is released.
	 */
	ondisk = gxact->ondisk;

	/* Clear shared memory state */
	RemoveGXact(gxact);

	/*
	 * Release the lock as all callbacks are called and shared memory cleanup
	 * is done.
	 */
	LWLockRelease(TwoPhaseStateLock);

	/* Count the prepared xact as committed or aborted */
	AtEOXact_PgStat(isCommit, false);

	/*
	 * And now we can clean up any files we may have left.
	 */
	if (ondisk)
		RemoveTwoPhaseFile(fxid, true);

	MyLockedGxact = NULL;

	RESUME_INTERRUPTS();

	pfree(buf);
}
1687 : :
1688 : : /*
1689 : : * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
1690 : : */
1691 : : static void
1692 : 0 : ProcessRecords(char *bufptr, FullTransactionId fxid,
1693 : : const TwoPhaseCallback callbacks[])
1694 : : {
1695 : 0 : for (;;)
1696 : : {
1697 : 0 : TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1698 : :
1699 [ # # ]: 0 : Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1700 [ # # ]: 0 : if (record->rmid == TWOPHASE_RM_END_ID)
1701 : 0 : break;
1702 : :
1703 : 0 : bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1704 : :
1705 [ # # ]: 0 : if (callbacks[record->rmid] != NULL)
1706 : 0 : callbacks[record->rmid] (fxid, record->info, bufptr, record->len);
1707 : :
1708 : 0 : bufptr += MAXALIGN(record->len);
1709 [ # # # ]: 0 : }
1710 : 0 : }
1711 : :
1712 : : /*
1713 : : * Remove the 2PC file.
1714 : : *
1715 : : * If giveWarning is false, do not complain about file-not-present;
1716 : : * this is an expected case during WAL replay.
1717 : : *
1718 : : * This routine is used at early stages at recovery where future and
1719 : : * past orphaned files are checked, hence the FullTransactionId to build
1720 : : * a complete file name fit for the removal.
1721 : : */
1722 : : static void
1723 : 0 : RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning)
1724 : : {
1725 : 0 : char path[MAXPGPATH];
1726 : :
1727 : 0 : TwoPhaseFilePath(path, fxid);
1728 [ # # ]: 0 : if (unlink(path))
1729 [ # # # # ]: 0 : if (errno != ENOENT || giveWarning)
1730 [ # # # # ]: 0 : ereport(WARNING,
1731 : : (errcode_for_file_access(),
1732 : : errmsg("could not remove file \"%s\": %m", path)));
1733 : 0 : }
1734 : :
1735 : : /*
1736 : : * Recreates a state file. This is used in WAL replay and during
1737 : : * checkpoint creation.
1738 : : *
1739 : : * Note: content and len don't include CRC.
1740 : : */
1741 : : static void
1742 : 0 : RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len)
1743 : : {
1744 : 0 : char path[MAXPGPATH];
1745 : 0 : pg_crc32c statefile_crc;
1746 : 0 : int fd;
1747 : :
1748 : : /* Recompute CRC */
1749 : 0 : INIT_CRC32C(statefile_crc);
1750 : 0 : COMP_CRC32C(statefile_crc, content, len);
1751 : 0 : FIN_CRC32C(statefile_crc);
1752 : :
1753 : 0 : TwoPhaseFilePath(path, fxid);
1754 : :
1755 : 0 : fd = OpenTransientFile(path,
1756 : : O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
1757 [ # # ]: 0 : if (fd < 0)
1758 [ # # # # ]: 0 : ereport(ERROR,
1759 : : (errcode_for_file_access(),
1760 : : errmsg("could not recreate file \"%s\": %m", path)));
1761 : :
1762 : : /* Write content and CRC */
1763 : 0 : errno = 0;
1764 : 0 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
1765 [ # # ]: 0 : if (write(fd, content, len) != len)
1766 : : {
1767 : : /* if write didn't set errno, assume problem is no disk space */
1768 [ # # ]: 0 : if (errno == 0)
1769 : 0 : errno = ENOSPC;
1770 [ # # # # ]: 0 : ereport(ERROR,
1771 : : (errcode_for_file_access(),
1772 : : errmsg("could not write file \"%s\": %m", path)));
1773 : 0 : }
1774 [ # # ]: 0 : if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
1775 : : {
1776 : : /* if write didn't set errno, assume problem is no disk space */
1777 [ # # ]: 0 : if (errno == 0)
1778 : 0 : errno = ENOSPC;
1779 [ # # # # ]: 0 : ereport(ERROR,
1780 : : (errcode_for_file_access(),
1781 : : errmsg("could not write file \"%s\": %m", path)));
1782 : 0 : }
1783 : 0 : pgstat_report_wait_end();
1784 : :
1785 : : /*
1786 : : * We must fsync the file because the end-of-replay checkpoint will not do
1787 : : * so, there being no GXACT in shared memory yet to tell it to.
1788 : : */
1789 : 0 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
1790 [ # # ]: 0 : if (pg_fsync(fd) != 0)
1791 [ # # # # ]: 0 : ereport(ERROR,
1792 : : (errcode_for_file_access(),
1793 : : errmsg("could not fsync file \"%s\": %m", path)));
1794 : 0 : pgstat_report_wait_end();
1795 : :
1796 [ # # ]: 0 : if (CloseTransientFile(fd) != 0)
1797 [ # # # # ]: 0 : ereport(ERROR,
1798 : : (errcode_for_file_access(),
1799 : : errmsg("could not close file \"%s\": %m", path)));
1800 : 0 : }
1801 : :
1802 : : /*
1803 : : * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1804 : : *
1805 : : * We must fsync the state file of any GXACT that is valid or has been
1806 : : * generated during redo and has a PREPARE LSN <= the checkpoint's redo
1807 : : * horizon. (If the gxact isn't valid yet, has not been generated in
1808 : : * redo, or has a later LSN, this checkpoint is not responsible for
1809 : : * fsyncing it.)
1810 : : *
1811 : : * This is deliberately run as late as possible in the checkpoint sequence,
1812 : : * because GXACTs ordinarily have short lifespans, and so it is quite
1813 : : * possible that GXACTs that were valid at checkpoint start will no longer
1814 : : * exist if we wait a little bit. With typical checkpoint settings this
1815 : : * will be about 3 minutes for an online checkpoint, so as a result we
1816 : : * expect that there will be no GXACTs that need to be copied to disk.
1817 : : *
1818 : : * If a GXACT remains valid across multiple checkpoints, it will already
1819 : : * be on disk so we don't bother to repeat that write.
1820 : : */
1821 : : void
1822 : 7 : CheckPointTwoPhase(XLogRecPtr redo_horizon)
1823 : : {
1824 : 7 : int i;
1825 : 7 : int serialized_xacts = 0;
1826 : :
1827 [ + + ]: 7 : if (max_prepared_xacts <= 0)
1828 : 6 : return; /* nothing to do */
1829 : :
1830 : 1 : TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1831 : :
1832 : : /*
1833 : : * We are expecting there to be zero GXACTs that need to be copied to
1834 : : * disk, so we perform all I/O while holding TwoPhaseStateLock for
1835 : : * simplicity. This prevents any new xacts from preparing while this
1836 : : * occurs, which shouldn't be a problem since the presence of long-lived
1837 : : * prepared xacts indicates the transaction manager isn't active.
1838 : : *
1839 : : * It's also possible to move I/O out of the lock, but on every error we
1840 : : * should check whether somebody committed our transaction in different
1841 : : * backend. Let's leave this optimization for future, if somebody will
1842 : : * spot that this place cause bottleneck.
1843 : : *
1844 : : * Note that it isn't possible for there to be a GXACT with a
1845 : : * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
1846 : : * because of the efforts with delayChkptFlags.
1847 : : */
1848 : 1 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1849 [ - + ]: 1 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1850 : : {
1851 : : /*
1852 : : * Note that we are using gxact not PGPROC so this works in recovery
1853 : : * also
1854 : : */
1855 : 0 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1856 : :
1857 [ # # ]: 0 : if ((gxact->valid || gxact->inredo) &&
1858 [ # # # # ]: 0 : !gxact->ondisk &&
1859 : 0 : gxact->prepare_end_lsn <= redo_horizon)
1860 : : {
1861 : 0 : char *buf;
1862 : 0 : int len;
1863 : :
1864 : 0 : XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
1865 : 0 : RecreateTwoPhaseFile(gxact->fxid, buf, len);
1866 : 0 : gxact->ondisk = true;
1867 : 0 : gxact->prepare_start_lsn = InvalidXLogRecPtr;
1868 : 0 : gxact->prepare_end_lsn = InvalidXLogRecPtr;
1869 : 0 : pfree(buf);
1870 : 0 : serialized_xacts++;
1871 : 0 : }
1872 : 0 : }
1873 : 1 : LWLockRelease(TwoPhaseStateLock);
1874 : :
1875 : : /*
1876 : : * Flush unconditionally the parent directory to make any information
1877 : : * durable on disk. Two-phase files could have been removed and those
1878 : : * removals need to be made persistent as well as any files newly created
1879 : : * previously since the last checkpoint.
1880 : : */
1881 : 1 : fsync_fname(TWOPHASE_DIR, true);
1882 : :
1883 : 1 : TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1884 : :
1885 [ + - + - ]: 1 : if (log_checkpoints && serialized_xacts > 0)
1886 [ # # # # ]: 0 : ereport(LOG,
1887 : : (errmsg_plural("%u two-phase state file was written "
1888 : : "for a long-running prepared transaction",
1889 : : "%u two-phase state files were written "
1890 : : "for long-running prepared transactions",
1891 : : serialized_xacts,
1892 : : serialized_xacts)));
1893 [ - + ]: 7 : }
1894 : :
1895 : : /*
1896 : : * restoreTwoPhaseData
1897 : : *
1898 : : * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1899 : : * This is called once at the beginning of recovery, saving any extra
1900 : : * lookups in the future. Two-phase files that are newer than the
1901 : : * minimum XID horizon are discarded on the way.
1902 : : */
1903 : : void
1904 : 4 : restoreTwoPhaseData(void)
1905 : : {
1906 : 4 : DIR *cldir;
1907 : 4 : struct dirent *clde;
1908 : :
1909 : 4 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1910 : 4 : cldir = AllocateDir(TWOPHASE_DIR);
1911 [ + + ]: 12 : while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1912 : : {
1913 [ - + # # ]: 8 : if (strlen(clde->d_name) == 16 &&
1914 : 0 : strspn(clde->d_name, "0123456789ABCDEF") == 16)
1915 : : {
1916 : 0 : FullTransactionId fxid;
1917 : 0 : char *buf;
1918 : :
1919 : 0 : fxid = FullTransactionIdFromU64(strtou64(clde->d_name, NULL, 16));
1920 : :
1921 : 0 : buf = ProcessTwoPhaseBuffer(fxid, InvalidXLogRecPtr,
1922 : : true, false, false);
1923 [ # # ]: 0 : if (buf == NULL)
1924 : 0 : continue;
1925 : :
1926 : 0 : PrepareRedoAdd(fxid, buf, InvalidXLogRecPtr,
1927 : : InvalidXLogRecPtr, InvalidRepOriginId);
1928 [ # # # ]: 0 : }
1929 : : }
1930 : 4 : LWLockRelease(TwoPhaseStateLock);
1931 : 4 : FreeDir(cldir);
1932 : 4 : }
1933 : :
1934 : : /*
1935 : : * PrescanPreparedTransactions
1936 : : *
1937 : : * Scan the shared memory entries of TwoPhaseState and determine the range
1938 : : * of valid XIDs present. This is run during database startup, after we
1939 : : * have completed reading WAL. TransamVariables->nextXid has been set to
1940 : : * one more than the highest XID for which evidence exists in WAL.
1941 : : *
1942 : : * We throw away any prepared xacts with main XID beyond nextXid --- if any
1943 : : * are present, it suggests that the DBA has done a PITR recovery to an
1944 : : * earlier point in time without cleaning out pg_twophase. We dare not
1945 : : * try to recover such prepared xacts since they likely depend on database
1946 : : * state that doesn't exist now.
1947 : : *
1948 : : * However, we will advance nextXid beyond any subxact XIDs belonging to
1949 : : * valid prepared xacts. We need to do this since subxact commit doesn't
1950 : : * write a WAL entry, and so there might be no evidence in WAL of those
1951 : : * subxact XIDs.
1952 : : *
1953 : : * On corrupted two-phase files, fail immediately. Keeping around broken
1954 : : * entries and let replay continue causes harm on the system, and a new
1955 : : * backup should be rolled in.
1956 : : *
1957 : : * Our other responsibility is to determine and return the oldest valid XID
1958 : : * among the prepared xacts (if none, return TransamVariables->nextXid).
1959 : : * This is needed to synchronize pg_subtrans startup properly.
1960 : : *
1961 : : * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1962 : : * top-level xids is stored in *xids_p. The number of entries in the array
1963 : : * is returned in *nxids_p.
1964 : : */
1965 : : TransactionId
1966 : 4 : PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1967 : : {
1968 : 4 : FullTransactionId nextXid = TransamVariables->nextXid;
1969 : 4 : TransactionId origNextXid = XidFromFullTransactionId(nextXid);
1970 : 4 : TransactionId result = origNextXid;
1971 : 4 : TransactionId *xids = NULL;
1972 : 4 : int nxids = 0;
1973 : 4 : int allocsize = 0;
1974 : 4 : int i;
1975 : :
1976 : 4 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1977 [ + - ]: 4 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1978 : : {
1979 : 0 : TransactionId xid;
1980 : 0 : char *buf;
1981 : 0 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1982 : :
1983 [ # # ]: 0 : Assert(gxact->inredo);
1984 : :
1985 : 0 : buf = ProcessTwoPhaseBuffer(gxact->fxid,
1986 : 0 : gxact->prepare_start_lsn,
1987 : 0 : gxact->ondisk, false, true);
1988 : :
1989 [ # # ]: 0 : if (buf == NULL)
1990 : 0 : continue;
1991 : :
1992 : : /*
1993 : : * OK, we think this file is valid. Incorporate xid into the
1994 : : * running-minimum result.
1995 : : */
1996 : 0 : xid = XidFromFullTransactionId(gxact->fxid);
1997 [ # # ]: 0 : if (TransactionIdPrecedes(xid, result))
1998 : 0 : result = xid;
1999 : :
2000 [ # # ]: 0 : if (xids_p)
2001 : : {
2002 [ # # ]: 0 : if (nxids == allocsize)
2003 : : {
2004 [ # # ]: 0 : if (nxids == 0)
2005 : : {
2006 : 0 : allocsize = 10;
2007 : 0 : xids = palloc(allocsize * sizeof(TransactionId));
2008 : 0 : }
2009 : : else
2010 : : {
2011 : 0 : allocsize = allocsize * 2;
2012 : 0 : xids = repalloc(xids, allocsize * sizeof(TransactionId));
2013 : : }
2014 : 0 : }
2015 : 0 : xids[nxids++] = xid;
2016 : 0 : }
2017 : :
2018 : 0 : pfree(buf);
2019 [ # # # ]: 0 : }
2020 : 4 : LWLockRelease(TwoPhaseStateLock);
2021 : :
2022 [ - + ]: 4 : if (xids_p)
2023 : : {
2024 : 0 : *xids_p = xids;
2025 : 0 : *nxids_p = nxids;
2026 : 0 : }
2027 : :
2028 : 8 : return result;
2029 : 4 : }
2030 : :
2031 : : /*
2032 : : * StandbyRecoverPreparedTransactions
2033 : : *
2034 : : * Scan the shared memory entries of TwoPhaseState and setup all the required
2035 : : * information to allow standby queries to treat prepared transactions as still
2036 : : * active.
2037 : : *
2038 : : * This is never called at the end of recovery - we use
2039 : : * RecoverPreparedTransactions() at that point.
2040 : : *
2041 : : * This updates pg_subtrans, so that any subtransactions will be correctly
2042 : : * seen as in-progress in snapshots taken during recovery.
2043 : : */
2044 : : void
2045 : 0 : StandbyRecoverPreparedTransactions(void)
2046 : : {
2047 : 0 : int i;
2048 : :
2049 : 0 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2050 [ # # ]: 0 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2051 : : {
2052 : 0 : char *buf;
2053 : 0 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2054 : :
2055 [ # # ]: 0 : Assert(gxact->inredo);
2056 : :
2057 : 0 : buf = ProcessTwoPhaseBuffer(gxact->fxid,
2058 : 0 : gxact->prepare_start_lsn,
2059 : 0 : gxact->ondisk, true, false);
2060 [ # # ]: 0 : if (buf != NULL)
2061 : 0 : pfree(buf);
2062 : 0 : }
2063 : 0 : LWLockRelease(TwoPhaseStateLock);
2064 : 0 : }
2065 : :
2066 : : /*
2067 : : * RecoverPreparedTransactions
2068 : : *
2069 : : * Scan the shared memory entries of TwoPhaseState and reload the state for
2070 : : * each prepared transaction (reacquire locks, etc).
2071 : : *
2072 : : * This is run at the end of recovery, but before we allow backends to write
2073 : : * WAL.
2074 : : *
2075 : : * At the end of recovery the way we take snapshots will change. We now need
2076 : : * to mark all running transactions with their full SubTransSetParent() info
2077 : : * to allow normal snapshots to work correctly if snapshots overflow.
2078 : : * We do this here because by definition prepared transactions are the only
2079 : : * type of write transaction still running, so this is necessary and
2080 : : * complete.
2081 : : */
2082 : : void
2083 : 4 : RecoverPreparedTransactions(void)
2084 : : {
2085 : 4 : int i;
2086 : :
2087 : 4 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2088 [ - + ]: 4 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2089 : : {
2090 : 0 : char *buf;
2091 : 0 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2092 : 0 : FullTransactionId fxid = gxact->fxid;
2093 : 0 : char *bufptr;
2094 : 0 : TwoPhaseFileHeader *hdr;
2095 : 0 : TransactionId *subxids;
2096 : 0 : const char *gid;
2097 : :
2098 : : /*
2099 : : * Reconstruct subtrans state for the transaction --- needed because
2100 : : * pg_subtrans is not preserved over a restart. Note that we are
2101 : : * linking all the subtransactions directly to the top-level XID;
2102 : : * there may originally have been a more complex hierarchy, but
2103 : : * there's no need to restore that exactly. It's possible that
2104 : : * SubTransSetParent has been set before, if the prepared transaction
2105 : : * generated xid assignment records.
2106 : : */
2107 : 0 : buf = ProcessTwoPhaseBuffer(gxact->fxid,
2108 : 0 : gxact->prepare_start_lsn,
2109 : 0 : gxact->ondisk, true, false);
2110 [ # # ]: 0 : if (buf == NULL)
2111 : 0 : continue;
2112 : :
2113 [ # # # # ]: 0 : ereport(LOG,
2114 : : (errmsg("recovering prepared transaction %u of epoch %u from shared memory",
2115 : : XidFromFullTransactionId(gxact->fxid),
2116 : : EpochFromFullTransactionId(gxact->fxid))));
2117 : :
2118 : 0 : hdr = (TwoPhaseFileHeader *) buf;
2119 [ # # ]: 0 : Assert(TransactionIdEquals(hdr->xid,
2120 : : XidFromFullTransactionId(gxact->fxid)));
2121 : 0 : bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2122 : 0 : gid = (const char *) bufptr;
2123 : 0 : bufptr += MAXALIGN(hdr->gidlen);
2124 : 0 : subxids = (TransactionId *) bufptr;
2125 : 0 : bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
2126 : 0 : bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
2127 : 0 : bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
2128 : 0 : bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
2129 : 0 : bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
2130 : 0 : bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2131 : :
2132 : : /*
2133 : : * Recreate its GXACT and dummy PGPROC. But, check whether it was
2134 : : * added in redo and already has a shmem entry for it.
2135 : : */
2136 : 0 : MarkAsPreparingGuts(gxact, gxact->fxid, gid,
2137 : 0 : hdr->prepared_at,
2138 : 0 : hdr->owner, hdr->database);
2139 : :
2140 : : /* recovered, so reset the flag for entries generated by redo */
2141 : 0 : gxact->inredo = false;
2142 : :
2143 : 0 : GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
2144 : 0 : MarkAsPrepared(gxact, true);
2145 : :
2146 : 0 : LWLockRelease(TwoPhaseStateLock);
2147 : :
2148 : : /*
2149 : : * Recover other state (notably locks) using resource managers.
2150 : : */
2151 : 0 : ProcessRecords(bufptr, fxid, twophase_recover_callbacks);
2152 : :
2153 : : /*
2154 : : * Release locks held by the standby process after we process each
2155 : : * prepared transaction. As a result, we don't need too many
2156 : : * additional locks at any one time.
2157 : : */
2158 [ # # ]: 0 : if (InHotStandby)
2159 : 0 : StandbyReleaseLockTree(hdr->xid, hdr->nsubxacts, subxids);
2160 : :
2161 : : /*
2162 : : * We're done with recovering this transaction. Clear MyLockedGxact,
2163 : : * like we do in PrepareTransaction() during normal operation.
2164 : : */
2165 : 0 : PostPrepare_Twophase();
2166 : :
2167 : 0 : pfree(buf);
2168 : :
2169 : 0 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2170 [ # # # ]: 0 : }
2171 : :
2172 : 4 : LWLockRelease(TwoPhaseStateLock);
2173 : 4 : }
2174 : :
2175 : : /*
2176 : : * ProcessTwoPhaseBuffer
2177 : : *
2178 : : * Given a FullTransactionId, read it either from disk or read it directly
2179 : : * via shmem xlog record pointer using the provided "prepare_start_lsn".
2180 : : *
2181 : : * If setParent is true, set up subtransaction parent linkages.
2182 : : *
2183 : : * If setNextXid is true, set TransamVariables->nextXid to the newest
2184 : : * value scanned.
2185 : : */
2186 : : static char *
2187 : 0 : ProcessTwoPhaseBuffer(FullTransactionId fxid,
2188 : : XLogRecPtr prepare_start_lsn,
2189 : : bool fromdisk,
2190 : : bool setParent, bool setNextXid)
2191 : : {
2192 : 0 : FullTransactionId nextXid = TransamVariables->nextXid;
2193 : 0 : TransactionId *subxids;
2194 : 0 : char *buf;
2195 : 0 : TwoPhaseFileHeader *hdr;
2196 : 0 : int i;
2197 : :
2198 [ # # ]: 0 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2199 : :
2200 [ # # ]: 0 : if (!fromdisk)
2201 [ # # ]: 0 : Assert(XLogRecPtrIsValid(prepare_start_lsn));
2202 : :
2203 : : /* Already processed? */
2204 [ # # # # ]: 0 : if (TransactionIdDidCommit(XidFromFullTransactionId(fxid)) ||
2205 : 0 : TransactionIdDidAbort(XidFromFullTransactionId(fxid)))
2206 : : {
2207 [ # # ]: 0 : if (fromdisk)
2208 : : {
2209 [ # # # # ]: 0 : ereport(WARNING,
2210 : : (errmsg("removing stale two-phase state file for transaction %u of epoch %u",
2211 : : XidFromFullTransactionId(fxid),
2212 : : EpochFromFullTransactionId(fxid))));
2213 : 0 : RemoveTwoPhaseFile(fxid, true);
2214 : 0 : }
2215 : : else
2216 : : {
2217 [ # # # # ]: 0 : ereport(WARNING,
2218 : : (errmsg("removing stale two-phase state from memory for transaction %u of epoch %u",
2219 : : XidFromFullTransactionId(fxid),
2220 : : EpochFromFullTransactionId(fxid))));
2221 : 0 : PrepareRedoRemoveFull(fxid, true);
2222 : : }
2223 : 0 : return NULL;
2224 : : }
2225 : :
2226 : : /* Reject XID if too new */
2227 [ # # ]: 0 : if (FullTransactionIdFollowsOrEquals(fxid, nextXid))
2228 : : {
2229 [ # # ]: 0 : if (fromdisk)
2230 : : {
2231 [ # # # # ]: 0 : ereport(WARNING,
2232 : : (errmsg("removing future two-phase state file for transaction %u of epoch %u",
2233 : : XidFromFullTransactionId(fxid),
2234 : : EpochFromFullTransactionId(fxid))));
2235 : 0 : RemoveTwoPhaseFile(fxid, true);
2236 : 0 : }
2237 : : else
2238 : : {
2239 [ # # # # ]: 0 : ereport(WARNING,
2240 : : (errmsg("removing future two-phase state from memory for transaction %u of epoch %u",
2241 : : XidFromFullTransactionId(fxid),
2242 : : EpochFromFullTransactionId(fxid))));
2243 : 0 : PrepareRedoRemoveFull(fxid, true);
2244 : : }
2245 : 0 : return NULL;
2246 : : }
2247 : :
2248 [ # # ]: 0 : if (fromdisk)
2249 : : {
2250 : : /* Read and validate file */
2251 : 0 : buf = ReadTwoPhaseFile(fxid, false);
2252 : 0 : }
2253 : : else
2254 : : {
2255 : : /* Read xlog data */
2256 : 0 : XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
2257 : : }
2258 : :
2259 : : /* Deconstruct header */
2260 : 0 : hdr = (TwoPhaseFileHeader *) buf;
2261 [ # # ]: 0 : if (!TransactionIdEquals(hdr->xid, XidFromFullTransactionId(fxid)))
2262 : : {
2263 [ # # ]: 0 : if (fromdisk)
2264 [ # # # # ]: 0 : ereport(ERROR,
2265 : : (errcode(ERRCODE_DATA_CORRUPTED),
2266 : : errmsg("corrupted two-phase state file for transaction %u of epoch %u",
2267 : : XidFromFullTransactionId(fxid),
2268 : : EpochFromFullTransactionId(fxid))));
2269 : : else
2270 [ # # # # ]: 0 : ereport(ERROR,
2271 : : (errcode(ERRCODE_DATA_CORRUPTED),
2272 : : errmsg("corrupted two-phase state in memory for transaction %u of epoch %u",
2273 : : XidFromFullTransactionId(fxid),
2274 : : EpochFromFullTransactionId(fxid))));
2275 : 0 : }
2276 : :
2277 : : /*
2278 : : * Examine subtransaction XIDs ... they should all follow main XID, and
2279 : : * they may force us to advance nextXid.
2280 : : */
2281 : 0 : subxids = (TransactionId *) (buf +
2282 : 0 : MAXALIGN(sizeof(TwoPhaseFileHeader)) +
2283 : 0 : MAXALIGN(hdr->gidlen));
2284 [ # # ]: 0 : for (i = 0; i < hdr->nsubxacts; i++)
2285 : : {
2286 : 0 : TransactionId subxid = subxids[i];
2287 : :
2288 [ # # ]: 0 : Assert(TransactionIdFollows(subxid, XidFromFullTransactionId(fxid)));
2289 : :
2290 : : /* update nextXid if needed */
2291 [ # # ]: 0 : if (setNextXid)
2292 : 0 : AdvanceNextFullTransactionIdPastXid(subxid);
2293 : :
2294 [ # # ]: 0 : if (setParent)
2295 : 0 : SubTransSetParent(subxid, XidFromFullTransactionId(fxid));
2296 : 0 : }
2297 : :
2298 : 0 : return buf;
2299 : 0 : }
2300 : :
2301 : :
2302 : : /*
2303 : : * RecordTransactionCommitPrepared
2304 : : *
2305 : : * This is basically the same as RecordTransactionCommit (q.v. if you change
2306 : : * this function): in particular, we must set DELAY_CHKPT_IN_COMMIT to avoid a
2307 : : * race condition.
2308 : : *
2309 : : * We know the transaction made at least one XLOG entry (its PREPARE),
2310 : : * so it is never possible to optimize out the commit record.
2311 : : */
2312 : : static void
2313 : 0 : RecordTransactionCommitPrepared(TransactionId xid,
2314 : : int nchildren,
2315 : : TransactionId *children,
2316 : : int nrels,
2317 : : RelFileLocator *rels,
2318 : : int nstats,
2319 : : xl_xact_stats_item *stats,
2320 : : int ninvalmsgs,
2321 : : SharedInvalidationMessage *invalmsgs,
2322 : : bool initfileinval,
2323 : : const char *gid)
2324 : : {
2325 : 0 : XLogRecPtr recptr;
2326 : 0 : TimestampTz committs;
2327 : 0 : bool replorigin;
2328 : :
2329 : : /*
2330 : : * Are we using the replication origins feature? Or, in other words, are
2331 : : * we replaying remote actions?
2332 : : */
2333 [ # # ]: 0 : replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2334 : 0 : replorigin_session_origin != DoNotReplicateId);
2335 : :
2336 : : /* Load the injection point before entering the critical section */
2337 : : INJECTION_POINT_LOAD("commit-after-delay-checkpoint");
2338 : :
2339 : 0 : START_CRIT_SECTION();
2340 : :
2341 : : /* See notes in RecordTransactionCommit */
2342 [ # # ]: 0 : Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0);
2343 : 0 : MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT;
2344 : :
2345 : : INJECTION_POINT_CACHED("commit-after-delay-checkpoint", NULL);
2346 : :
2347 : : /*
2348 : : * Ensures the DELAY_CHKPT_IN_COMMIT flag write is globally visible before
2349 : : * commit time is written.
2350 : : */
2351 : 0 : pg_write_barrier();
2352 : :
2353 : : /*
2354 : : * Note it is important to set committs value after marking ourselves as
2355 : : * in the commit critical section (DELAY_CHKPT_IN_COMMIT). This is because
2356 : : * we want to ensure all transactions that have acquired commit timestamp
2357 : : * are finished before we allow the logical replication client to advance
2358 : : * its xid which is used to hold back dead rows for conflict detection.
2359 : : * See comments atop worker.c.
2360 : : */
2361 : 0 : committs = GetCurrentTimestamp();
2362 : :
2363 : : /*
2364 : : * Emit the XLOG commit record. Note that we mark 2PC commits as
2365 : : * potentially having AccessExclusiveLocks since we don't know whether or
2366 : : * not they do.
2367 : : */
2368 : 0 : recptr = XactLogCommitRecord(committs,
2369 : 0 : nchildren, children, nrels, rels,
2370 : 0 : nstats, stats,
2371 : 0 : ninvalmsgs, invalmsgs,
2372 : 0 : initfileinval,
2373 : 0 : MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2374 : 0 : xid, gid);
2375 : :
2376 : :
2377 [ # # ]: 0 : if (replorigin)
2378 : : /* Move LSNs forward for this replication origin */
2379 : 0 : replorigin_session_advance(replorigin_session_origin_lsn,
2380 : 0 : XactLastRecEnd);
2381 : :
2382 : : /*
2383 : : * Record commit timestamp. The value comes from plain commit timestamp
2384 : : * if replorigin is not enabled, or replorigin already set a value for us
2385 : : * in replorigin_session_origin_timestamp otherwise.
2386 : : *
2387 : : * We don't need to WAL-log anything here, as the commit record written
2388 : : * above already contains the data.
2389 : : */
2390 [ # # # # ]: 0 : if (!replorigin || replorigin_session_origin_timestamp == 0)
2391 : 0 : replorigin_session_origin_timestamp = committs;
2392 : :
2393 : 0 : TransactionTreeSetCommitTsData(xid, nchildren, children,
2394 : 0 : replorigin_session_origin_timestamp,
2395 : 0 : replorigin_session_origin);
2396 : :
2397 : : /*
2398 : : * We don't currently try to sleep before flush here ... nor is there any
2399 : : * support for async commit of a prepared xact (the very idea is probably
2400 : : * a contradiction)
2401 : : */
2402 : :
2403 : : /* Flush XLOG to disk */
2404 : 0 : XLogFlush(recptr);
2405 : :
2406 : : /* Mark the transaction committed in pg_xact */
2407 : 0 : TransactionIdCommitTree(xid, nchildren, children);
2408 : :
2409 : : /* Checkpoint can proceed now */
2410 : 0 : MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT;
2411 : :
2412 [ # # ]: 0 : END_CRIT_SECTION();
2413 : :
2414 : : /*
2415 : : * Wait for synchronous replication, if required.
2416 : : *
2417 : : * Note that at this stage we have marked clog, but still show as running
2418 : : * in the procarray and continue to hold locks.
2419 : : */
2420 : 0 : SyncRepWaitForLSN(recptr, true);
2421 : 0 : }
2422 : :
2423 : : /*
2424 : : * RecordTransactionAbortPrepared
2425 : : *
2426 : : * This is basically the same as RecordTransactionAbort.
2427 : : *
2428 : : * We know the transaction made at least one XLOG entry (its PREPARE),
2429 : : * so it is never possible to optimize out the abort record.
2430 : : */
2431 : : static void
2432 : 0 : RecordTransactionAbortPrepared(TransactionId xid,
2433 : : int nchildren,
2434 : : TransactionId *children,
2435 : : int nrels,
2436 : : RelFileLocator *rels,
2437 : : int nstats,
2438 : : xl_xact_stats_item *stats,
2439 : : const char *gid)
2440 : : {
2441 : 0 : XLogRecPtr recptr;
2442 : 0 : bool replorigin;
2443 : :
2444 : : /*
2445 : : * Are we using the replication origins feature? Or, in other words, are
2446 : : * we replaying remote actions?
2447 : : */
2448 [ # # ]: 0 : replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2449 : 0 : replorigin_session_origin != DoNotReplicateId);
2450 : :
2451 : : /*
2452 : : * Catch the scenario where we aborted partway through
2453 : : * RecordTransactionCommitPrepared ...
2454 : : */
2455 [ # # ]: 0 : if (TransactionIdDidCommit(xid))
2456 [ # # # # ]: 0 : elog(PANIC, "cannot abort transaction %u, it was already committed",
2457 : : xid);
2458 : :
2459 : 0 : START_CRIT_SECTION();
2460 : :
2461 : : /*
2462 : : * Emit the XLOG commit record. Note that we mark 2PC aborts as
2463 : : * potentially having AccessExclusiveLocks since we don't know whether or
2464 : : * not they do.
2465 : : */
2466 : 0 : recptr = XactLogAbortRecord(GetCurrentTimestamp(),
2467 : 0 : nchildren, children,
2468 : 0 : nrels, rels,
2469 : 0 : nstats, stats,
2470 : 0 : MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2471 : 0 : xid, gid);
2472 : :
2473 [ # # ]: 0 : if (replorigin)
2474 : : /* Move LSNs forward for this replication origin */
2475 : 0 : replorigin_session_advance(replorigin_session_origin_lsn,
2476 : 0 : XactLastRecEnd);
2477 : :
2478 : : /* Always flush, since we're about to remove the 2PC state file */
2479 : 0 : XLogFlush(recptr);
2480 : :
2481 : : /*
2482 : : * Mark the transaction aborted in clog. This is not absolutely necessary
2483 : : * but we may as well do it while we are here.
2484 : : */
2485 : 0 : TransactionIdAbortTree(xid, nchildren, children);
2486 : :
2487 [ # # ]: 0 : END_CRIT_SECTION();
2488 : :
2489 : : /*
2490 : : * Wait for synchronous replication, if required.
2491 : : *
2492 : : * Note that at this stage we have marked clog, but still show as running
2493 : : * in the procarray and continue to hold locks.
2494 : : */
2495 : 0 : SyncRepWaitForLSN(recptr, false);
2496 : 0 : }
2497 : :
2498 : : /*
2499 : : * PrepareRedoAdd
2500 : : *
2501 : : * Store pointers to the start/end of the WAL record along with the xid in
2502 : : * a gxact entry in shared memory TwoPhaseState structure. If caller
2503 : : * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
2504 : : * data, the entry is marked as located on disk.
2505 : : */
2506 : : void
2507 : 0 : PrepareRedoAdd(FullTransactionId fxid, char *buf,
2508 : : XLogRecPtr start_lsn, XLogRecPtr end_lsn,
2509 : : RepOriginId origin_id)
2510 : : {
2511 : 0 : TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
2512 : 0 : char *bufptr;
2513 : 0 : const char *gid;
2514 : 0 : GlobalTransaction gxact;
2515 : :
2516 [ # # ]: 0 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2517 [ # # ]: 0 : Assert(RecoveryInProgress());
2518 : :
2519 [ # # ]: 0 : if (!FullTransactionIdIsValid(fxid))
2520 : : {
2521 [ # # ]: 0 : Assert(InRecovery);
2522 : 0 : fxid = FullTransactionIdFromAllowableAt(TransamVariables->nextXid,
2523 : 0 : hdr->xid);
2524 : 0 : }
2525 : :
2526 : 0 : bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2527 : 0 : gid = (const char *) bufptr;
2528 : :
2529 : : /*
2530 : : * Reserve the GID for the given transaction in the redo code path.
2531 : : *
2532 : : * This creates a gxact struct and puts it into the active array.
2533 : : *
2534 : : * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
2535 : : * shared memory. Hence, we only fill up the bare minimum contents here.
2536 : : * The gxact also gets marked with gxact->inredo set to true to indicate
2537 : : * that it got added in the redo phase
2538 : : */
2539 : :
2540 : : /*
2541 : : * In the event of a crash while a checkpoint was running, it may be
2542 : : * possible that some two-phase data found its way to disk while its
2543 : : * corresponding record needs to be replayed in the follow-up recovery. As
2544 : : * the 2PC data was on disk, it has already been restored at the beginning
2545 : : * of recovery with restoreTwoPhaseData(), so skip this record to avoid
2546 : : * duplicates in TwoPhaseState. If a consistent state has been reached,
2547 : : * the record is added to TwoPhaseState and it should have no
2548 : : * corresponding file in pg_twophase.
2549 : : */
2550 [ # # ]: 0 : if (XLogRecPtrIsValid(start_lsn))
2551 : : {
2552 : 0 : char path[MAXPGPATH];
2553 : :
2554 [ # # ]: 0 : Assert(InRecovery);
2555 : 0 : TwoPhaseFilePath(path, fxid);
2556 : :
2557 [ # # ]: 0 : if (access(path, F_OK) == 0)
2558 : : {
2559 [ # # # # : 0 : ereport(reachedConsistency ? ERROR : WARNING,
# # # # #
# ]
2560 : : (errmsg("could not recover two-phase state file for transaction %u",
2561 : : hdr->xid),
2562 : : errdetail("Two-phase state file has been found in WAL record %X/%08X, but this transaction has already been restored from disk.",
2563 : : LSN_FORMAT_ARGS(start_lsn))));
2564 : 0 : return;
2565 : : }
2566 : :
2567 [ # # ]: 0 : if (errno != ENOENT)
2568 [ # # # # ]: 0 : ereport(ERROR,
2569 : : (errcode_for_file_access(),
2570 : : errmsg("could not access file \"%s\": %m", path)));
2571 [ # # ]: 0 : }
2572 : :
2573 : : /* Get a free gxact from the freelist */
2574 [ # # ]: 0 : if (TwoPhaseState->freeGXacts == NULL)
2575 [ # # # # ]: 0 : ereport(ERROR,
2576 : : (errcode(ERRCODE_OUT_OF_MEMORY),
2577 : : errmsg("maximum number of prepared transactions reached"),
2578 : : errhint("Increase \"max_prepared_transactions\" (currently %d).",
2579 : : max_prepared_xacts)));
2580 : 0 : gxact = TwoPhaseState->freeGXacts;
2581 : 0 : TwoPhaseState->freeGXacts = gxact->next;
2582 : :
2583 : 0 : gxact->prepared_at = hdr->prepared_at;
2584 : 0 : gxact->prepare_start_lsn = start_lsn;
2585 : 0 : gxact->prepare_end_lsn = end_lsn;
2586 : 0 : gxact->fxid = fxid;
2587 : 0 : gxact->owner = hdr->owner;
2588 : 0 : gxact->locking_backend = INVALID_PROC_NUMBER;
2589 : 0 : gxact->valid = false;
2590 : 0 : gxact->ondisk = !XLogRecPtrIsValid(start_lsn);
2591 : 0 : gxact->inredo = true; /* yes, added in redo */
2592 : 0 : strcpy(gxact->gid, gid);
2593 : :
2594 : : /* And insert it into the active array */
2595 [ # # ]: 0 : Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
2596 : 0 : TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
2597 : :
2598 [ # # ]: 0 : if (origin_id != InvalidRepOriginId)
2599 : : {
2600 : : /* recover apply progress */
2601 : 0 : replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
2602 : : false /* backward */ , false /* WAL */ );
2603 : 0 : }
2604 : :
2605 [ # # # # ]: 0 : elog(DEBUG2, "added 2PC data in shared memory for transaction %u of epoch %u",
2606 : : XidFromFullTransactionId(gxact->fxid),
2607 : : EpochFromFullTransactionId(gxact->fxid));
2608 [ # # ]: 0 : }
2609 : :
2610 : : /*
2611 : : * PrepareRedoRemoveFull
2612 : : *
2613 : : * Remove the corresponding gxact entry from TwoPhaseState. Also remove
2614 : : * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2615 : : *
2616 : : * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2617 : : * is updated.
2618 : : */
2619 : : static void
2620 : 0 : PrepareRedoRemoveFull(FullTransactionId fxid, bool giveWarning)
2621 : : {
2622 : 0 : GlobalTransaction gxact = NULL;
2623 : 0 : int i;
2624 : 0 : bool found = false;
2625 : :
2626 [ # # ]: 0 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2627 [ # # ]: 0 : Assert(RecoveryInProgress());
2628 : :
2629 [ # # ]: 0 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2630 : : {
2631 : 0 : gxact = TwoPhaseState->prepXacts[i];
2632 : :
2633 [ # # ]: 0 : if (FullTransactionIdEquals(gxact->fxid, fxid))
2634 : : {
2635 [ # # ]: 0 : Assert(gxact->inredo);
2636 : 0 : found = true;
2637 : 0 : break;
2638 : : }
2639 : 0 : }
2640 : :
2641 : : /*
2642 : : * Just leave if there is nothing, this is expected during WAL replay.
2643 : : */
2644 [ # # ]: 0 : if (!found)
2645 : 0 : return;
2646 : :
2647 : : /*
2648 : : * And now we can clean up any files we may have left.
2649 : : */
2650 [ # # # # ]: 0 : elog(DEBUG2, "removing 2PC data for transaction %u of epoch %u ",
2651 : : XidFromFullTransactionId(fxid),
2652 : : EpochFromFullTransactionId(fxid));
2653 : :
2654 [ # # ]: 0 : if (gxact->ondisk)
2655 : 0 : RemoveTwoPhaseFile(fxid, giveWarning);
2656 : :
2657 : 0 : RemoveGXact(gxact);
2658 [ # # ]: 0 : }
2659 : :
2660 : : /*
2661 : : * Wrapper of PrepareRedoRemoveFull(), for TransactionIds.
2662 : : */
2663 : : void
2664 : 0 : PrepareRedoRemove(TransactionId xid, bool giveWarning)
2665 : : {
2666 : 0 : FullTransactionId fxid =
2667 : 0 : FullTransactionIdFromAllowableAt(TransamVariables->nextXid, xid);
2668 : :
2669 : 0 : PrepareRedoRemoveFull(fxid, giveWarning);
2670 : 0 : }
2671 : :
2672 : : /*
2673 : : * LookupGXact
2674 : : * Check if the prepared transaction with the given GID, lsn and timestamp
2675 : : * exists.
2676 : : *
2677 : : * Note that we always compare with the LSN where prepare ends because that is
2678 : : * what is stored as origin_lsn in the 2PC file.
2679 : : *
2680 : : * This function is primarily used to check if the prepared transaction
2681 : : * received from the upstream (remote node) already exists. Checking only GID
2682 : : * is not sufficient because a different prepared xact with the same GID can
2683 : : * exist on the same node. So, we are ensuring to match origin_lsn and
2684 : : * origin_timestamp of prepared xact to avoid the possibility of a match of
2685 : : * prepared xact from two different nodes.
2686 : : */
2687 : : bool
2688 : 0 : LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
2689 : : TimestampTz origin_prepare_timestamp)
2690 : : {
2691 : 0 : int i;
2692 : 0 : bool found = false;
2693 : :
2694 : 0 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2695 [ # # ]: 0 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2696 : : {
2697 : 0 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2698 : :
2699 : : /* Ignore not-yet-valid GIDs. */
2700 [ # # # # ]: 0 : if (gxact->valid && strcmp(gxact->gid, gid) == 0)
2701 : : {
2702 : 0 : char *buf;
2703 : 0 : TwoPhaseFileHeader *hdr;
2704 : :
2705 : : /*
2706 : : * We are not expecting collisions of GXACTs (same gid) between
2707 : : * publisher and subscribers, so we perform all I/O while holding
2708 : : * TwoPhaseStateLock for simplicity.
2709 : : *
2710 : : * To move the I/O out of the lock, we need to ensure that no
2711 : : * other backend commits the prepared xact in the meantime. We can
2712 : : * do this optimization if we encounter many collisions in GID
2713 : : * between publisher and subscriber.
2714 : : */
2715 [ # # ]: 0 : if (gxact->ondisk)
2716 : 0 : buf = ReadTwoPhaseFile(gxact->fxid, false);
2717 : : else
2718 : : {
2719 [ # # ]: 0 : Assert(gxact->prepare_start_lsn);
2720 : 0 : XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
2721 : : }
2722 : :
2723 : 0 : hdr = (TwoPhaseFileHeader *) buf;
2724 : :
2725 [ # # # # ]: 0 : if (hdr->origin_lsn == prepare_end_lsn &&
2726 : 0 : hdr->origin_timestamp == origin_prepare_timestamp)
2727 : : {
2728 : 0 : found = true;
2729 : 0 : pfree(buf);
2730 : 0 : break;
2731 : : }
2732 : :
2733 : 0 : pfree(buf);
2734 [ # # ]: 0 : }
2735 [ # # # ]: 0 : }
2736 : 0 : LWLockRelease(TwoPhaseStateLock);
2737 : 0 : return found;
2738 : 0 : }
2739 : :
2740 : : /*
2741 : : * TwoPhaseTransactionGid
2742 : : * Form the prepared transaction GID for two_phase transactions.
2743 : : *
2744 : : * Return the GID in the supplied buffer.
2745 : : */
2746 : : void
2747 : 0 : TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
2748 : : {
2749 [ # # ]: 0 : Assert(OidIsValid(subid));
2750 : :
2751 [ # # ]: 0 : if (!TransactionIdIsValid(xid))
2752 [ # # # # ]: 0 : ereport(ERROR,
2753 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2754 : : errmsg_internal("invalid two-phase transaction ID")));
2755 : :
2756 : 0 : snprintf(gid_res, szgid, "pg_gid_%u_%u", subid, xid);
2757 : 0 : }
2758 : :
2759 : : /*
2760 : : * IsTwoPhaseTransactionGidForSubid
2761 : : * Check whether the given GID (as formed by TwoPhaseTransactionGid) is
2762 : : * for the specified 'subid'.
2763 : : */
2764 : : static bool
2765 : 0 : IsTwoPhaseTransactionGidForSubid(Oid subid, char *gid)
2766 : : {
2767 : 0 : int ret;
2768 : 0 : Oid subid_from_gid;
2769 : 0 : TransactionId xid_from_gid;
2770 : 0 : char gid_tmp[GIDSIZE];
2771 : :
2772 : : /* Extract the subid and xid from the given GID */
2773 : 0 : ret = sscanf(gid, "pg_gid_%u_%u", &subid_from_gid, &xid_from_gid);
2774 : :
2775 : : /*
2776 : : * Check that the given GID has expected format, and at least the subid
2777 : : * matches.
2778 : : */
2779 [ # # # # ]: 0 : if (ret != 2 || subid != subid_from_gid)
2780 : 0 : return false;
2781 : :
2782 : : /*
2783 : : * Reconstruct a temporary GID based on the subid and xid extracted from
2784 : : * the given GID and check whether the temporary GID and the given GID
2785 : : * match.
2786 : : */
2787 : 0 : TwoPhaseTransactionGid(subid, xid_from_gid, gid_tmp, sizeof(gid_tmp));
2788 : :
2789 : 0 : return strcmp(gid, gid_tmp) == 0;
2790 : 0 : }
2791 : :
2792 : : /*
2793 : : * LookupGXactBySubid
2794 : : * Check if the prepared transaction done by apply worker exists.
2795 : : */
2796 : : bool
2797 : 0 : LookupGXactBySubid(Oid subid)
2798 : : {
2799 : 0 : bool found = false;
2800 : :
2801 : 0 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2802 [ # # ]: 0 : for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
2803 : : {
2804 : 0 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2805 : :
2806 : : /* Ignore not-yet-valid GIDs. */
2807 [ # # # # ]: 0 : if (gxact->valid &&
2808 : 0 : IsTwoPhaseTransactionGidForSubid(subid, gxact->gid))
2809 : : {
2810 : 0 : found = true;
2811 : 0 : break;
2812 : : }
2813 [ # # ]: 0 : }
2814 : 0 : LWLockRelease(TwoPhaseStateLock);
2815 : :
2816 : 0 : return found;
2817 : 0 : }
2818 : :
2819 : : /*
2820 : : * TwoPhaseGetOldestXidInCommit
2821 : : * Return the oldest transaction ID from prepared transactions that are
2822 : : * currently in the commit critical section.
2823 : : *
2824 : : * This function only considers transactions in the currently connected
2825 : : * database. If no matching transactions are found, it returns
2826 : : * InvalidTransactionId.
2827 : : */
2828 : : TransactionId
2829 : 0 : TwoPhaseGetOldestXidInCommit(void)
2830 : : {
2831 : 0 : TransactionId oldestRunningXid = InvalidTransactionId;
2832 : :
2833 : 0 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2834 : :
2835 [ # # ]: 0 : for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
2836 : : {
2837 : 0 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2838 : 0 : PGPROC *commitproc;
2839 : 0 : TransactionId xid;
2840 : :
2841 [ # # ]: 0 : if (!gxact->valid)
2842 : 0 : continue;
2843 : :
2844 [ # # ]: 0 : if (gxact->locking_backend == INVALID_PROC_NUMBER)
2845 : 0 : continue;
2846 : :
2847 : : /*
2848 : : * Get the backend that is handling the transaction. It's safe to
2849 : : * access this backend while holding TwoPhaseStateLock, as the backend
2850 : : * can only be destroyed after either removing or unlocking the
2851 : : * current global transaction, both of which require an exclusive
2852 : : * TwoPhaseStateLock.
2853 : : */
2854 : 0 : commitproc = GetPGProcByNumber(gxact->locking_backend);
2855 : :
2856 [ # # ]: 0 : if (MyDatabaseId != commitproc->databaseId)
2857 : 0 : continue;
2858 : :
2859 [ # # ]: 0 : if ((commitproc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0)
2860 : 0 : continue;
2861 : :
2862 : 0 : xid = XidFromFullTransactionId(gxact->fxid);
2863 : :
2864 [ # # # # ]: 0 : if (!TransactionIdIsValid(oldestRunningXid) ||
2865 : 0 : TransactionIdPrecedes(xid, oldestRunningXid))
2866 : 0 : oldestRunningXid = xid;
2867 [ # # # ]: 0 : }
2868 : :
2869 : 0 : LWLockRelease(TwoPhaseStateLock);
2870 : :
2871 : 0 : return oldestRunningXid;
2872 : 0 : }
|