Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * test_aio.c
4 : * Helpers to write tests for AIO
5 : *
6 : * This module provides interface functions for C functionality to SQL, to
7 : * make it possible to test AIO related behavior in a targeted way from SQL.
8 : * It'd not generally be safe to export these functions to SQL, but for a test
9 : * that's fine.
10 : *
11 : * Copyright (c) 2020-2026, PostgreSQL Global Development Group
12 : *
13 : * IDENTIFICATION
14 : * src/test/modules/test_aio/test_aio.c
15 : *
16 : *-------------------------------------------------------------------------
17 : */
18 :
19 : #include "postgres.h"
20 :
21 : #include "access/relation.h"
22 : #include "fmgr.h"
23 : #include "storage/aio.h"
24 : #include "storage/aio_internal.h"
25 : #include "storage/buf_internals.h"
26 : #include "storage/bufmgr.h"
27 : #include "storage/checksum.h"
28 : #include "storage/ipc.h"
29 : #include "storage/lwlock.h"
30 : #include "utils/builtins.h"
31 : #include "utils/injection_point.h"
32 : #include "utils/rel.h"
33 :
34 :
35 0 : PG_MODULE_MAGIC;
36 :
37 :
/*
 * State consulted by this module's injection point callbacks. A single
 * instance is placed in shared memory by test_aio_shmem_startup() and is
 * toggled by the inj_io_*_attach() / inj_io_*_detach() SQL functions.
 */
typedef struct InjIoErrorState
{
	/* if true, inj_io_short_read() may replace the result of a read */
	bool		enabled_short_read;
	/* if true, inj_io_reopen() raises an error */
	bool		enabled_reopen;

	/* whether short_read_result holds a caller-supplied value */
	bool		short_read_result_set;
	/* result that inj_io_short_read() substitutes for the real one */
	int			short_read_result;
} InjIoErrorState;
46 :
/*
 * Pointer to the shared InjIoErrorState; assigned in
 * test_aio_shmem_startup().
 */
static InjIoErrorState *inj_io_error_state;

/* Shared memory init callbacks */
static shmem_request_hook_type prev_shmem_request_hook = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;


/* Handle stored by handle_get(), released by handle_release_last() */
static PgAioHandle *last_handle;
55 :
56 :
57 :
58 : static void
59 0 : test_aio_shmem_request(void)
60 : {
61 0 : if (prev_shmem_request_hook)
62 0 : prev_shmem_request_hook();
63 :
64 0 : RequestAddinShmemSpace(sizeof(InjIoErrorState));
65 0 : }
66 :
67 : static void
68 0 : test_aio_shmem_startup(void)
69 : {
70 0 : bool found;
71 :
72 0 : if (prev_shmem_startup_hook)
73 0 : prev_shmem_startup_hook();
74 :
75 : /* Create or attach to the shared memory state */
76 0 : LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
77 :
78 0 : inj_io_error_state = ShmemInitStruct("injection_points",
79 : sizeof(InjIoErrorState),
80 : &found);
81 :
82 0 : if (!found)
83 : {
84 : /* First time through, initialize */
85 0 : inj_io_error_state->enabled_short_read = false;
86 0 : inj_io_error_state->enabled_reopen = false;
87 :
88 : #ifdef USE_INJECTION_POINTS
89 : InjectionPointAttach("aio-process-completion-before-shared",
90 : "test_aio",
91 : "inj_io_short_read",
92 : NULL,
93 : 0);
94 : InjectionPointLoad("aio-process-completion-before-shared");
95 :
96 : InjectionPointAttach("aio-worker-after-reopen",
97 : "test_aio",
98 : "inj_io_reopen",
99 : NULL,
100 : 0);
101 : InjectionPointLoad("aio-worker-after-reopen");
102 :
103 : #endif
104 0 : }
105 : else
106 : {
107 : /*
108 : * Pre-load the injection points now, so we can call them in a
109 : * critical section.
110 : */
111 : #ifdef USE_INJECTION_POINTS
112 : InjectionPointLoad("aio-process-completion-before-shared");
113 : InjectionPointLoad("aio-worker-after-reopen");
114 : elog(LOG, "injection point loaded");
115 : #endif
116 : }
117 :
118 0 : LWLockRelease(AddinShmemInitLock);
119 0 : }
120 :
121 : void
122 0 : _PG_init(void)
123 : {
124 0 : if (!process_shared_preload_libraries_in_progress)
125 0 : return;
126 :
127 0 : prev_shmem_request_hook = shmem_request_hook;
128 0 : shmem_request_hook = test_aio_shmem_request;
129 0 : prev_shmem_startup_hook = shmem_startup_hook;
130 0 : shmem_startup_hook = test_aio_shmem_startup;
131 0 : }
132 :
133 :
134 0 : PG_FUNCTION_INFO_V1(errno_from_string);
135 : Datum
136 0 : errno_from_string(PG_FUNCTION_ARGS)
137 : {
138 0 : const char *sym = text_to_cstring(PG_GETARG_TEXT_PP(0));
139 :
140 0 : if (strcmp(sym, "EIO") == 0)
141 0 : PG_RETURN_INT32(EIO);
142 0 : else if (strcmp(sym, "EAGAIN") == 0)
143 0 : PG_RETURN_INT32(EAGAIN);
144 0 : else if (strcmp(sym, "EINTR") == 0)
145 0 : PG_RETURN_INT32(EINTR);
146 0 : else if (strcmp(sym, "ENOSPC") == 0)
147 0 : PG_RETURN_INT32(ENOSPC);
148 0 : else if (strcmp(sym, "EROFS") == 0)
149 0 : PG_RETURN_INT32(EROFS);
150 :
151 0 : ereport(ERROR,
152 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
153 : errmsg_internal("%s is not a supported errno value", sym));
154 0 : PG_RETURN_INT32(0);
155 0 : }
156 :
157 0 : PG_FUNCTION_INFO_V1(grow_rel);
158 : Datum
159 0 : grow_rel(PG_FUNCTION_ARGS)
160 : {
161 0 : Oid relid = PG_GETARG_OID(0);
162 0 : uint32 nblocks = PG_GETARG_UINT32(1);
163 0 : Relation rel;
164 : #define MAX_BUFFERS_TO_EXTEND_BY 64
165 0 : Buffer victim_buffers[MAX_BUFFERS_TO_EXTEND_BY];
166 :
167 0 : rel = relation_open(relid, AccessExclusiveLock);
168 :
169 0 : while (nblocks > 0)
170 : {
171 0 : uint32 extend_by_pages;
172 :
173 0 : extend_by_pages = Min(nblocks, MAX_BUFFERS_TO_EXTEND_BY);
174 :
175 0 : ExtendBufferedRelBy(BMR_REL(rel),
176 : MAIN_FORKNUM,
177 : NULL,
178 : 0,
179 0 : extend_by_pages,
180 0 : victim_buffers,
181 : &extend_by_pages);
182 :
183 0 : nblocks -= extend_by_pages;
184 :
185 0 : for (uint32 i = 0; i < extend_by_pages; i++)
186 : {
187 0 : ReleaseBuffer(victim_buffers[i]);
188 0 : }
189 0 : }
190 :
191 0 : relation_close(rel, NoLock);
192 :
193 0 : PG_RETURN_VOID();
194 0 : }
195 :
196 0 : PG_FUNCTION_INFO_V1(modify_rel_block);
197 : Datum
198 0 : modify_rel_block(PG_FUNCTION_ARGS)
199 : {
200 0 : Oid relid = PG_GETARG_OID(0);
201 0 : BlockNumber blkno = PG_GETARG_UINT32(1);
202 0 : bool zero = PG_GETARG_BOOL(2);
203 0 : bool corrupt_header = PG_GETARG_BOOL(3);
204 0 : bool corrupt_checksum = PG_GETARG_BOOL(4);
205 0 : Page page = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
206 0 : bool flushed;
207 0 : Relation rel;
208 0 : Buffer buf;
209 0 : PageHeader ph;
210 :
211 0 : rel = relation_open(relid, AccessExclusiveLock);
212 :
213 0 : buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno,
214 : RBM_ZERO_ON_ERROR, NULL);
215 :
216 0 : LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
217 :
218 : /*
219 : * copy the page to local memory, seems nicer than to directly modify in
220 : * the buffer pool.
221 : */
222 0 : memcpy(page, BufferGetPage(buf), BLCKSZ);
223 :
224 0 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
225 :
226 0 : ReleaseBuffer(buf);
227 :
228 : /*
229 : * Don't want to have a buffer in-memory that's marked valid where the
230 : * on-disk contents are invalid. Particularly not if the in-memory buffer
231 : * could be dirty...
232 : *
233 : * While we hold an AEL on the relation nobody else should be able to read
234 : * the buffer in.
235 : *
236 : * NB: This is probably racy, better don't copy this to non-test code.
237 : */
238 0 : if (BufferIsLocal(buf))
239 0 : InvalidateLocalBuffer(GetLocalBufferDescriptor(-buf - 1), true);
240 : else
241 0 : EvictUnpinnedBuffer(buf, &flushed);
242 :
243 : /*
244 : * Now modify the page as asked for by the caller.
245 : */
246 0 : if (zero)
247 0 : memset(page, 0, BufferGetPageSize(buf));
248 :
249 0 : if (PageIsEmpty(page) && (corrupt_header || corrupt_checksum))
250 0 : PageInit(page, BufferGetPageSize(buf), 0);
251 :
252 0 : ph = (PageHeader) page;
253 :
254 0 : if (corrupt_header)
255 0 : ph->pd_special = BLCKSZ + 1;
256 :
257 0 : if (corrupt_checksum)
258 : {
259 0 : bool successfully_corrupted = 0;
260 :
261 : /*
262 : * Any single modification of the checksum could just end up being
263 : * valid again, due to e.g. corrupt_header changing the data in a way
264 : * that'd result in the "corrupted" checksum, or the checksum already
265 : * being invalid. Retry in that, unlikely, case.
266 : */
267 0 : for (int i = 0; i < 100; i++)
268 : {
269 0 : uint16 verify_checksum;
270 0 : uint16 old_checksum;
271 :
272 0 : old_checksum = ph->pd_checksum;
273 0 : ph->pd_checksum = old_checksum + 1;
274 :
275 0 : elog(LOG, "corrupting checksum of blk %u from %u to %u",
276 : blkno, old_checksum, ph->pd_checksum);
277 :
278 0 : verify_checksum = pg_checksum_page(page, blkno);
279 0 : if (verify_checksum != ph->pd_checksum)
280 : {
281 0 : successfully_corrupted = true;
282 0 : break;
283 : }
284 0 : }
285 :
286 0 : if (!successfully_corrupted)
287 0 : elog(ERROR, "could not corrupt checksum, what's going on?");
288 0 : }
289 : else
290 : {
291 0 : PageSetChecksumInplace(page, blkno);
292 : }
293 :
294 0 : smgrwrite(RelationGetSmgr(rel),
295 0 : MAIN_FORKNUM, blkno, page, true);
296 :
297 0 : relation_close(rel, NoLock);
298 :
299 0 : PG_RETURN_VOID();
300 0 : }
301 :
302 : /*
303 : * Ensures a buffer for rel & blkno is in shared buffers, without actually
304 : * caring about the buffer contents. Used to set up test scenarios.
305 : */
306 : static Buffer
307 0 : create_toy_buffer(Relation rel, BlockNumber blkno)
308 : {
309 0 : Buffer buf;
310 0 : BufferDesc *buf_hdr;
311 0 : uint64 buf_state;
312 0 : bool was_pinned = false;
313 0 : uint64 unset_bits = 0;
314 :
315 : /* place buffer in shared buffers without erroring out */
316 0 : buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_ZERO_AND_LOCK, NULL);
317 0 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
318 :
319 0 : if (RelationUsesLocalBuffers(rel))
320 : {
321 0 : buf_hdr = GetLocalBufferDescriptor(-buf - 1);
322 0 : buf_state = pg_atomic_read_u64(&buf_hdr->state);
323 0 : }
324 : else
325 : {
326 0 : buf_hdr = GetBufferDescriptor(buf - 1);
327 0 : buf_state = LockBufHdr(buf_hdr);
328 : }
329 :
330 : /*
331 : * We should be the only backend accessing this buffer. This is just a
332 : * small bit of belt-and-suspenders defense, none of this code should ever
333 : * run in a cluster with real data.
334 : */
335 0 : if (BUF_STATE_GET_REFCOUNT(buf_state) > 1)
336 0 : was_pinned = true;
337 : else
338 0 : unset_bits |= BM_VALID | BM_DIRTY;
339 :
340 0 : if (RelationUsesLocalBuffers(rel))
341 : {
342 0 : buf_state &= ~unset_bits;
343 0 : pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
344 0 : }
345 : else
346 : {
347 0 : UnlockBufHdrExt(buf_hdr, buf_state, 0, unset_bits, 0);
348 : }
349 :
350 0 : if (was_pinned)
351 0 : elog(ERROR, "toy buffer %d was already pinned",
352 : buf);
353 :
354 0 : return buf;
355 0 : }
356 :
357 : /*
358 : * A "low level" read. This does similar things to what
359 : * StartReadBuffers()/WaitReadBuffers() do, but provides more control (and
360 : * less sanity).
361 : */
362 0 : PG_FUNCTION_INFO_V1(read_rel_block_ll);
363 : Datum
364 0 : read_rel_block_ll(PG_FUNCTION_ARGS)
365 : {
366 0 : Oid relid = PG_GETARG_OID(0);
367 0 : BlockNumber blkno = PG_GETARG_UINT32(1);
368 0 : int nblocks = PG_GETARG_INT32(2);
369 0 : bool wait_complete = PG_GETARG_BOOL(3);
370 0 : bool batchmode_enter = PG_GETARG_BOOL(4);
371 0 : bool call_smgrreleaseall = PG_GETARG_BOOL(5);
372 0 : bool batchmode_exit = PG_GETARG_BOOL(6);
373 0 : bool zero_on_error = PG_GETARG_BOOL(7);
374 0 : Relation rel;
375 0 : Buffer bufs[PG_IOV_MAX];
376 0 : BufferDesc *buf_hdrs[PG_IOV_MAX];
377 0 : Page pages[PG_IOV_MAX];
378 0 : uint8 srb_flags = 0;
379 0 : PgAioReturn ior;
380 0 : PgAioHandle *ioh;
381 0 : PgAioWaitRef iow;
382 0 : SMgrRelation smgr;
383 :
384 0 : if (nblocks <= 0 || nblocks > PG_IOV_MAX)
385 0 : elog(ERROR, "nblocks is out of range");
386 :
387 0 : rel = relation_open(relid, AccessExclusiveLock);
388 :
389 0 : for (int i = 0; i < nblocks; i++)
390 : {
391 0 : bufs[i] = create_toy_buffer(rel, blkno + i);
392 0 : pages[i] = BufferGetBlock(bufs[i]);
393 0 : buf_hdrs[i] = BufferIsLocal(bufs[i]) ?
394 0 : GetLocalBufferDescriptor(-bufs[i] - 1) :
395 0 : GetBufferDescriptor(bufs[i] - 1);
396 0 : }
397 :
398 0 : smgr = RelationGetSmgr(rel);
399 :
400 0 : pgstat_prepare_report_checksum_failure(smgr->smgr_rlocator.locator.dbOid);
401 :
402 0 : ioh = pgaio_io_acquire(CurrentResourceOwner, &ior);
403 0 : pgaio_io_get_wref(ioh, &iow);
404 :
405 0 : if (RelationUsesLocalBuffers(rel))
406 : {
407 0 : for (int i = 0; i < nblocks; i++)
408 0 : StartLocalBufferIO(buf_hdrs[i], true, false);
409 0 : pgaio_io_set_flag(ioh, PGAIO_HF_REFERENCES_LOCAL);
410 0 : }
411 : else
412 : {
413 0 : for (int i = 0; i < nblocks; i++)
414 0 : StartBufferIO(buf_hdrs[i], true, false);
415 : }
416 :
417 0 : pgaio_io_set_handle_data_32(ioh, (uint32 *) bufs, nblocks);
418 :
419 0 : if (zero_on_error | zero_damaged_pages)
420 0 : srb_flags |= READ_BUFFERS_ZERO_ON_ERROR;
421 0 : if (ignore_checksum_failure)
422 0 : srb_flags |= READ_BUFFERS_IGNORE_CHECKSUM_FAILURES;
423 :
424 0 : pgaio_io_register_callbacks(ioh,
425 0 : RelationUsesLocalBuffers(rel) ?
426 : PGAIO_HCB_LOCAL_BUFFER_READV :
427 : PGAIO_HCB_SHARED_BUFFER_READV,
428 0 : srb_flags);
429 :
430 0 : if (batchmode_enter)
431 0 : pgaio_enter_batchmode();
432 :
433 0 : smgrstartreadv(ioh, smgr, MAIN_FORKNUM, blkno,
434 0 : (void *) pages, nblocks);
435 :
436 0 : if (call_smgrreleaseall)
437 0 : smgrreleaseall();
438 :
439 0 : if (batchmode_exit)
440 0 : pgaio_exit_batchmode();
441 :
442 0 : for (int i = 0; i < nblocks; i++)
443 0 : ReleaseBuffer(bufs[i]);
444 :
445 0 : if (wait_complete)
446 : {
447 0 : pgaio_wref_wait(&iow);
448 :
449 0 : if (ior.result.status != PGAIO_RS_OK)
450 0 : pgaio_result_report(ior.result,
451 0 : &ior.target_data,
452 0 : ior.result.status == PGAIO_RS_ERROR ?
453 : ERROR : WARNING);
454 0 : }
455 :
456 0 : relation_close(rel, NoLock);
457 :
458 0 : PG_RETURN_VOID();
459 0 : }
460 :
461 0 : PG_FUNCTION_INFO_V1(invalidate_rel_block);
462 : Datum
463 0 : invalidate_rel_block(PG_FUNCTION_ARGS)
464 : {
465 0 : Oid relid = PG_GETARG_OID(0);
466 0 : BlockNumber blkno = PG_GETARG_UINT32(1);
467 0 : Relation rel;
468 0 : PrefetchBufferResult pr;
469 0 : Buffer buf;
470 :
471 0 : rel = relation_open(relid, AccessExclusiveLock);
472 :
473 : /*
474 : * This is a gross hack, but there's no other API exposed that allows to
475 : * get a buffer ID without actually reading the block in.
476 : */
477 0 : pr = PrefetchBuffer(rel, MAIN_FORKNUM, blkno);
478 0 : buf = pr.recent_buffer;
479 :
480 0 : if (BufferIsValid(buf))
481 : {
482 : /* if the buffer contents aren't valid, this'll return false */
483 0 : if (ReadRecentBuffer(rel->rd_locator, MAIN_FORKNUM, blkno, buf))
484 : {
485 0 : BufferDesc *buf_hdr = BufferIsLocal(buf) ?
486 0 : GetLocalBufferDescriptor(-buf - 1)
487 0 : : GetBufferDescriptor(buf - 1);
488 0 : bool flushed;
489 :
490 0 : LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
491 :
492 0 : if (pg_atomic_read_u64(&buf_hdr->state) & BM_DIRTY)
493 : {
494 0 : if (BufferIsLocal(buf))
495 0 : FlushLocalBuffer(buf_hdr, NULL);
496 : else
497 0 : FlushOneBuffer(buf);
498 0 : }
499 0 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
500 0 : ReleaseBuffer(buf);
501 :
502 0 : if (BufferIsLocal(buf))
503 0 : InvalidateLocalBuffer(GetLocalBufferDescriptor(-buf - 1), true);
504 0 : else if (!EvictUnpinnedBuffer(buf, &flushed))
505 0 : elog(ERROR, "couldn't evict");
506 0 : }
507 0 : }
508 :
509 0 : relation_close(rel, AccessExclusiveLock);
510 :
511 0 : PG_RETURN_VOID();
512 0 : }
513 :
514 0 : PG_FUNCTION_INFO_V1(buffer_create_toy);
515 : Datum
516 0 : buffer_create_toy(PG_FUNCTION_ARGS)
517 : {
518 0 : Oid relid = PG_GETARG_OID(0);
519 0 : BlockNumber blkno = PG_GETARG_UINT32(1);
520 0 : Relation rel;
521 0 : Buffer buf;
522 :
523 0 : rel = relation_open(relid, AccessExclusiveLock);
524 :
525 0 : buf = create_toy_buffer(rel, blkno);
526 0 : ReleaseBuffer(buf);
527 :
528 0 : relation_close(rel, NoLock);
529 :
530 0 : PG_RETURN_INT32(buf);
531 0 : }
532 :
533 0 : PG_FUNCTION_INFO_V1(buffer_call_start_io);
534 : Datum
535 0 : buffer_call_start_io(PG_FUNCTION_ARGS)
536 : {
537 0 : Buffer buf = PG_GETARG_INT32(0);
538 0 : bool for_input = PG_GETARG_BOOL(1);
539 0 : bool nowait = PG_GETARG_BOOL(2);
540 0 : bool can_start;
541 :
542 0 : if (BufferIsLocal(buf))
543 0 : can_start = StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
544 0 : for_input, nowait);
545 : else
546 0 : can_start = StartBufferIO(GetBufferDescriptor(buf - 1),
547 0 : for_input, nowait);
548 :
549 : /*
550 : * For tests we don't want the resowner release preventing us from
551 : * orchestrating odd scenarios.
552 : */
553 0 : if (can_start && !BufferIsLocal(buf))
554 0 : ResourceOwnerForgetBufferIO(CurrentResourceOwner,
555 0 : buf);
556 :
557 0 : ereport(LOG,
558 : errmsg("buffer %d after StartBufferIO: %s",
559 : buf, DebugPrintBufferRefcount(buf)),
560 : errhidestmt(true), errhidecontext(true));
561 :
562 0 : PG_RETURN_BOOL(can_start);
563 0 : }
564 :
565 0 : PG_FUNCTION_INFO_V1(buffer_call_terminate_io);
566 : Datum
567 0 : buffer_call_terminate_io(PG_FUNCTION_ARGS)
568 : {
569 0 : Buffer buf = PG_GETARG_INT32(0);
570 0 : bool for_input = PG_GETARG_BOOL(1);
571 0 : bool succeed = PG_GETARG_BOOL(2);
572 0 : bool io_error = PG_GETARG_BOOL(3);
573 0 : bool release_aio = PG_GETARG_BOOL(4);
574 0 : bool clear_dirty = false;
575 0 : uint64 set_flag_bits = 0;
576 :
577 0 : if (io_error)
578 0 : set_flag_bits |= BM_IO_ERROR;
579 :
580 0 : if (for_input)
581 : {
582 0 : clear_dirty = false;
583 :
584 0 : if (succeed)
585 0 : set_flag_bits |= BM_VALID;
586 0 : }
587 : else
588 : {
589 0 : if (succeed)
590 0 : clear_dirty = true;
591 : }
592 :
593 0 : ereport(LOG,
594 : errmsg("buffer %d before Terminate[Local]BufferIO: %s",
595 : buf, DebugPrintBufferRefcount(buf)),
596 : errhidestmt(true), errhidecontext(true));
597 :
598 0 : if (BufferIsLocal(buf))
599 0 : TerminateLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
600 0 : clear_dirty, set_flag_bits, release_aio);
601 : else
602 0 : TerminateBufferIO(GetBufferDescriptor(buf - 1),
603 0 : clear_dirty, set_flag_bits, false, release_aio);
604 :
605 0 : ereport(LOG,
606 : errmsg("buffer %d after Terminate[Local]BufferIO: %s",
607 : buf, DebugPrintBufferRefcount(buf)),
608 : errhidestmt(true), errhidecontext(true));
609 :
610 0 : PG_RETURN_VOID();
611 0 : }
612 :
613 0 : PG_FUNCTION_INFO_V1(handle_get);
614 : Datum
615 0 : handle_get(PG_FUNCTION_ARGS)
616 : {
617 0 : last_handle = pgaio_io_acquire(CurrentResourceOwner, NULL);
618 :
619 0 : PG_RETURN_VOID();
620 : }
621 :
622 0 : PG_FUNCTION_INFO_V1(handle_release_last);
623 : Datum
624 0 : handle_release_last(PG_FUNCTION_ARGS)
625 : {
626 0 : if (!last_handle)
627 0 : elog(ERROR, "no handle");
628 :
629 0 : pgaio_io_release(last_handle);
630 :
631 0 : PG_RETURN_VOID();
632 : }
633 :
634 0 : PG_FUNCTION_INFO_V1(handle_get_and_error);
635 : Datum
636 0 : handle_get_and_error(PG_FUNCTION_ARGS)
637 : {
638 0 : pgaio_io_acquire(CurrentResourceOwner, NULL);
639 :
640 0 : elog(ERROR, "as you command");
641 0 : PG_RETURN_VOID();
642 : }
643 :
644 0 : PG_FUNCTION_INFO_V1(handle_get_twice);
645 : Datum
646 0 : handle_get_twice(PG_FUNCTION_ARGS)
647 : {
648 0 : pgaio_io_acquire(CurrentResourceOwner, NULL);
649 0 : pgaio_io_acquire(CurrentResourceOwner, NULL);
650 :
651 0 : PG_RETURN_VOID();
652 : }
653 :
654 0 : PG_FUNCTION_INFO_V1(handle_get_release);
655 : Datum
656 0 : handle_get_release(PG_FUNCTION_ARGS)
657 : {
658 0 : PgAioHandle *handle;
659 :
660 0 : handle = pgaio_io_acquire(CurrentResourceOwner, NULL);
661 0 : pgaio_io_release(handle);
662 :
663 0 : PG_RETURN_VOID();
664 0 : }
665 :
/*
 * SQL-callable: enter AIO batch mode by calling pgaio_enter_batchmode().
 */
PG_FUNCTION_INFO_V1(batch_start);
Datum
batch_start(PG_FUNCTION_ARGS)
{
	pgaio_enter_batchmode();
	PG_RETURN_VOID();
}
673 :
/*
 * SQL-callable: leave AIO batch mode by calling pgaio_exit_batchmode().
 */
PG_FUNCTION_INFO_V1(batch_end);
Datum
batch_end(PG_FUNCTION_ARGS)
{
	pgaio_exit_batchmode();
	PG_RETURN_VOID();
}
681 :
682 : #ifdef USE_INJECTION_POINTS
/*
 * Injection point callbacks. They are exported because
 * test_aio_shmem_startup() attaches them by library and function name
 * ("test_aio", "inj_io_short_read" / "inj_io_reopen").
 */
extern PGDLLEXPORT void inj_io_short_read(const char *name,
										  const void *private_data,
										  void *arg);
extern PGDLLEXPORT void inj_io_reopen(const char *name,
									  const void *private_data,
									  void *arg);
689 :
690 : void
691 : inj_io_short_read(const char *name, const void *private_data, void *arg)
692 : {
693 : PgAioHandle *ioh = (PgAioHandle *) arg;
694 :
695 : ereport(LOG,
696 : errmsg("short read injection point called, is enabled: %d",
697 : inj_io_error_state->enabled_reopen),
698 : errhidestmt(true), errhidecontext(true));
699 :
700 : if (inj_io_error_state->enabled_short_read)
701 : {
702 : /*
703 : * Only shorten reads that are actually longer than the target size,
704 : * otherwise we can trigger over-reads.
705 : */
706 : if (inj_io_error_state->short_read_result_set
707 : && ioh->op == PGAIO_OP_READV
708 : && inj_io_error_state->short_read_result <= ioh->result)
709 : {
710 : struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
711 : int32 old_result = ioh->result;
712 : int32 new_result = inj_io_error_state->short_read_result;
713 : int32 processed = 0;
714 :
715 : ereport(LOG,
716 : errmsg("short read inject point, changing result from %d to %d",
717 : old_result, new_result),
718 : errhidestmt(true), errhidecontext(true));
719 :
720 : /*
721 : * The underlying IO actually completed OK, and thus the "invalid"
722 : * portion of the IOV actually contains valid data. That can hide
723 : * a lot of problems, e.g. if we were to wrongly mark a buffer,
724 : * that wasn't read according to the shortened-read, IO as valid,
725 : * the contents would look valid and we might miss a bug.
726 : *
727 : * To avoid that, iterate through the IOV and zero out the
728 : * "failed" portion of the IO.
729 : */
730 : for (int i = 0; i < ioh->op_data.read.iov_length; i++)
731 : {
732 : if (processed + iov[i].iov_len <= new_result)
733 : processed += iov[i].iov_len;
734 : else if (processed <= new_result)
735 : {
736 : uint32 ok_part = new_result - processed;
737 :
738 : memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
739 : processed += iov[i].iov_len;
740 : }
741 : else
742 : {
743 : memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
744 : }
745 : }
746 :
747 : ioh->result = new_result;
748 : }
749 : }
750 : }
751 :
752 : void
753 : inj_io_reopen(const char *name, const void *private_data, void *arg)
754 : {
755 : ereport(LOG,
756 : errmsg("reopen injection point called, is enabled: %d",
757 : inj_io_error_state->enabled_reopen),
758 : errhidestmt(true), errhidecontext(true));
759 :
760 : if (inj_io_error_state->enabled_reopen)
761 : elog(ERROR, "injection point triggering failure to reopen ");
762 : }
763 : #endif
764 :
765 0 : PG_FUNCTION_INFO_V1(inj_io_short_read_attach);
766 : Datum
767 0 : inj_io_short_read_attach(PG_FUNCTION_ARGS)
768 : {
769 : #ifdef USE_INJECTION_POINTS
770 : inj_io_error_state->enabled_short_read = true;
771 : inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0);
772 : if (inj_io_error_state->short_read_result_set)
773 : inj_io_error_state->short_read_result = PG_GETARG_INT32(0);
774 : #else
775 0 : elog(ERROR, "injection points not supported");
776 : #endif
777 :
778 0 : PG_RETURN_VOID();
779 : }
780 :
781 0 : PG_FUNCTION_INFO_V1(inj_io_short_read_detach);
782 : Datum
783 0 : inj_io_short_read_detach(PG_FUNCTION_ARGS)
784 : {
785 : #ifdef USE_INJECTION_POINTS
786 : inj_io_error_state->enabled_short_read = false;
787 : #else
788 0 : elog(ERROR, "injection points not supported");
789 : #endif
790 0 : PG_RETURN_VOID();
791 : }
792 :
793 0 : PG_FUNCTION_INFO_V1(inj_io_reopen_attach);
794 : Datum
795 0 : inj_io_reopen_attach(PG_FUNCTION_ARGS)
796 : {
797 : #ifdef USE_INJECTION_POINTS
798 : inj_io_error_state->enabled_reopen = true;
799 : #else
800 0 : elog(ERROR, "injection points not supported");
801 : #endif
802 :
803 0 : PG_RETURN_VOID();
804 : }
805 :
806 0 : PG_FUNCTION_INFO_V1(inj_io_reopen_detach);
807 : Datum
808 0 : inj_io_reopen_detach(PG_FUNCTION_ARGS)
809 : {
810 : #ifdef USE_INJECTION_POINTS
811 : inj_io_error_state->enabled_reopen = false;
812 : #else
813 0 : elog(ERROR, "injection points not supported");
814 : #endif
815 0 : PG_RETURN_VOID();
816 : }
|