Branch data Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * tuplestore.c
4 : : * Generalized routines for temporary tuple storage.
5 : : *
6 : : * This module handles temporary storage of tuples for purposes such
7 : : * as Materialize nodes, hashjoin batch files, etc. It is essentially
8 : : * a dumbed-down version of tuplesort.c; it does no sorting of tuples
9 : : * but can only store and regurgitate a sequence of tuples. However,
10 : : * because no sort is required, it is allowed to start reading the sequence
11 : : * before it has all been written. This is particularly useful for cursors,
12 : : * because it allows random access within the already-scanned portion of
13 : : * a query without having to process the underlying scan to completion.
14 : : * Also, it is possible to support multiple independent read pointers.
15 : : *
16 : : * A temporary file is used to handle the data if it exceeds the
17 : : * space limit specified by the caller.
18 : : *
19 : : * The (approximate) amount of memory allowed to the tuplestore is specified
20 : : * in kilobytes by the caller. We absorb tuples and simply store them in an
21 : : * in-memory array as long as we haven't exceeded maxKBytes. If we do exceed
22 : : * maxKBytes, we dump all the tuples into a temp file and then read from that
23 : : * when needed.
24 : : *
25 : : * Upon creation, a tuplestore supports a single read pointer, numbered 0.
26 : : * Additional read pointers can be created using tuplestore_alloc_read_pointer.
27 : : * Mark/restore behavior is supported by copying read pointers.
28 : : *
29 : : * When the caller requests backward-scan capability, we write the temp file
30 : : * in a format that allows either forward or backward scan. Otherwise, only
31 : : * forward scan is allowed. A request for backward scan must be made before
32 : : * putting any tuples into the tuplestore. Rewind is normally allowed but
33 : : * can be turned off via tuplestore_set_eflags; turning off rewind for all
34 : : * read pointers enables truncation of the tuplestore at the oldest read point
35 : : * for minimal memory usage. (The caller must explicitly call tuplestore_trim
36 : : * at appropriate times for truncation to actually happen.)
37 : : *
38 : : * Note: in TSS_WRITEFILE state, the temp file's seek position is the
39 : : * current write position, and the write-position variables in the tuplestore
40 : : * aren't kept up to date. Similarly, in TSS_READFILE state the temp file's
41 : : * seek position is the active read pointer's position, and that read pointer
42 : : * isn't kept up to date. We update the appropriate variables using ftell()
43 : : * before switching to the other state or activating a different read pointer.
44 : : *
45 : : *
46 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
47 : : * Portions Copyright (c) 1994, Regents of the University of California
48 : : *
49 : : * IDENTIFICATION
50 : : * src/backend/utils/sort/tuplestore.c
51 : : *
52 : : *-------------------------------------------------------------------------
53 : : */
54 : :
55 : : #include "postgres.h"
56 : :
57 : : #include <limits.h>
58 : :
59 : : #include "access/htup_details.h"
60 : : #include "commands/tablespace.h"
61 : : #include "executor/executor.h"
62 : : #include "miscadmin.h"
63 : : #include "storage/buffile.h"
64 : : #include "utils/memutils.h"
65 : : #include "utils/resowner.h"
66 : :
67 : :
68 : : /*
69 : : * Possible states of a Tuplestore object. These denote the states that
70 : : * persist between calls of Tuplestore routines.
71 : : */
72 : : typedef enum
73 : : {
74 : : TSS_INMEM, /* Tuples still fit in memory */
75 : : TSS_WRITEFILE, /* Writing to temp file */
76 : : TSS_READFILE, /* Reading from temp file */
77 : : } TupStoreStatus;
78 : :
79 : : /*
80 : : * State for a single read pointer. If we are in state INMEM then all the
81 : : * read pointers' "current" fields denote the read positions. In state
82 : : * WRITEFILE, the file/offset fields denote the read positions. In state
83 : : * READFILE, inactive read pointers have valid file/offset, but the active
84 : : * read pointer implicitly has position equal to the temp file's seek position.
85 : : *
86 : : * Special case: if eof_reached is true, then the pointer's read position is
87 : : * implicitly equal to the write position, and current/file/offset aren't
88 : : * maintained. This way we need not update all the read pointers each time
89 : : * we write.
90 : : */
91 : : typedef struct
92 : : {
93 : : int eflags; /* capability flags */
94 : : bool eof_reached; /* read has reached EOF */
95 : : int current; /* next array index to read */
96 : : int file; /* temp file# */
97 : : pgoff_t offset; /* byte offset in file */
98 : : } TSReadPointer;
99 : :
100 : : /*
101 : : * Private state of a Tuplestore operation.
102 : : */
103 : : struct Tuplestorestate
104 : : {
105 : : TupStoreStatus status; /* enumerated value as shown above */
106 : : int eflags; /* capability flags (OR of pointers' flags) */
107 : : bool backward; /* store extra length words in file? */
108 : : bool interXact; /* keep open through transactions? */
109 : : bool truncated; /* tuplestore_trim has removed tuples? */
110 : : bool usedDisk; /* used by tuplestore_get_stats() */
111 : : int64 maxSpace; /* used by tuplestore_get_stats() */
112 : : int64 availMem; /* remaining memory available, in bytes */
113 : : int64 allowedMem; /* total memory allowed, in bytes */
114 : : int64 tuples; /* number of tuples added */
115 : : BufFile *myfile; /* underlying file, or NULL if none */
116 : : MemoryContext context; /* memory context for holding tuples */
117 : : ResourceOwner resowner; /* resowner for holding temp files */
118 : :
119 : : /*
120 : : * These function pointers decouple the routines that must know what kind
121 : : * of tuple we are handling from the routines that don't need to know it.
122 : : * They are set up by the tuplestore_begin_xxx routines.
123 : : *
124 : : * (Although tuplestore.c currently only supports heap tuples, I've copied
125 : : * this part of tuplesort.c so that extension to other kinds of objects
126 : : * will be easy if it's ever needed.)
127 : : *
128 : : * Function to copy a supplied input tuple into palloc'd space. (NB: we
129 : : * assume that a single pfree() is enough to release the tuple later, so
130 : : * the representation must be "flat" in one palloc chunk.) state->availMem
131 : : * must be decreased by the amount of space used.
132 : : */
133 : : void *(*copytup) (Tuplestorestate *state, void *tup);
134 : :
135 : : /*
136 : : * Function to write a stored tuple onto tape. The representation of the
137 : : * tuple on tape need not be the same as it is in memory; requirements on
138 : : * the tape representation are given below. After writing the tuple,
139 : : * pfree() it, and increase state->availMem by the amount of memory space
140 : : * thereby released.
141 : : */
142 : : void (*writetup) (Tuplestorestate *state, void *tup);
143 : :
144 : : /*
145 : : * Function to read a stored tuple from tape back into memory. 'len' is
146 : : * the already-read length of the stored tuple. Create and return a
147 : : * palloc'd copy, and decrease state->availMem by the amount of memory
148 : : * space consumed.
149 : : */
150 : : void *(*readtup) (Tuplestorestate *state, unsigned int len);
151 : :
152 : : /*
153 : : * This array holds pointers to tuples in memory if we are in state INMEM.
154 : : * In states WRITEFILE and READFILE it's not used.
155 : : *
156 : : * When memtupdeleted > 0, the first memtupdeleted pointers are already
157 : : * released due to a tuplestore_trim() operation, but we haven't expended
158 : : * the effort to slide the remaining pointers down. These unused pointers
159 : : * are set to NULL to catch any invalid accesses. Note that memtupcount
160 : : * includes the deleted pointers.
161 : : */
162 : : void **memtuples; /* array of pointers to palloc'd tuples */
163 : : int memtupdeleted; /* the first N slots are currently unused */
164 : : int memtupcount; /* number of tuples currently present */
165 : : int memtupsize; /* allocated length of memtuples array */
166 : : bool growmemtuples; /* memtuples' growth still underway? */
167 : :
168 : : /*
169 : : * These variables are used to keep track of the current positions.
170 : : *
171 : : * In state WRITEFILE, the current file seek position is the write point;
172 : : * in state READFILE, the write position is remembered in writepos_xxx.
173 : : * (The write position is the same as EOF, but since BufFileSeek doesn't
174 : : * currently implement SEEK_END, we have to remember it explicitly.)
175 : : */
176 : : TSReadPointer *readptrs; /* array of read pointers */
177 : : int activeptr; /* index of the active read pointer */
178 : : int readptrcount; /* number of pointers currently valid */
179 : : int readptrsize; /* allocated length of readptrs array */
180 : :
181 : : int writepos_file; /* file# (valid if READFILE state) */
182 : : pgoff_t writepos_offset; /* offset (valid if READFILE state) */
183 : : };
184 : :
185 : : #define COPYTUP(state,tup) ((*(state)->copytup) (state, tup))
186 : : #define WRITETUP(state,tup) ((*(state)->writetup) (state, tup))
187 : : #define READTUP(state,len) ((*(state)->readtup) (state, len))
188 : : #define LACKMEM(state) ((state)->availMem < 0)
189 : : #define USEMEM(state,amt) ((state)->availMem -= (amt))
190 : : #define FREEMEM(state,amt) ((state)->availMem += (amt))
191 : :
192 : : /*--------------------
193 : : *
194 : : * NOTES about on-tape representation of tuples:
195 : : *
196 : : * We require the first "unsigned int" of a stored tuple to be the total size
197 : : * on-tape of the tuple, including itself (so it is never zero).
198 : : * The remainder of the stored tuple
199 : : * may or may not match the in-memory representation of the tuple ---
200 : : * any conversion needed is the job of the writetup and readtup routines.
201 : : *
202 : : * If state->backward is true, then the stored representation of
203 : : * the tuple must be followed by another "unsigned int" that is a copy of the
204 : : * length --- so the total tape space used is actually sizeof(unsigned int)
205 : : * more than the stored length value. This allows read-backwards. When
206 : : * state->backward is not set, the write/read routines may omit the extra
207 : : * length word.
208 : : *
209 : : * writetup is expected to write both length words as well as the tuple
210 : : * data. When readtup is called, the tape is positioned just after the
211 : : * front length word; readtup must read the tuple data and advance past
212 : : * the back length word (if present).
213 : : *
214 : : * The write/read routines can make use of the tuple description data
215 : : * stored in the Tuplestorestate record, if needed. They are also expected
216 : : * to adjust state->availMem by the amount of memory space (not tape space!)
217 : : * released or consumed. There is no error return from either writetup
218 : : * or readtup; they should ereport() on failure.
219 : : *
220 : : *
221 : : * NOTES about memory consumption calculations:
222 : : *
223 : : * We count space allocated for tuples against the maxKBytes limit,
224 : : * plus the space used by the variable-size array memtuples.
225 : : * Fixed-size space (primarily the BufFile I/O buffer) is not counted.
226 : : * We don't worry about the size of the read pointer array, either.
227 : : *
228 : : * Note that we count actual space used (as shown by GetMemoryChunkSpace)
229 : : * rather than the originally-requested size. This is important since
230 : : * palloc can add substantial overhead. It's not a complete answer since
231 : : * we won't count any wasted space in palloc allocation blocks, but it's
232 : : * a lot better than what we were doing before 7.3.
233 : : *
234 : : *--------------------
235 : : */
236 : :
237 : :
238 : : static Tuplestorestate *tuplestore_begin_common(int eflags,
239 : : bool interXact,
240 : : int maxKBytes);
241 : : static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple);
242 : : static void dumptuples(Tuplestorestate *state);
243 : : static void tuplestore_updatemax(Tuplestorestate *state);
244 : : static unsigned int getlen(Tuplestorestate *state, bool eofOK);
245 : : static void *copytup_heap(Tuplestorestate *state, void *tup);
246 : : static void writetup_heap(Tuplestorestate *state, void *tup);
247 : : static void *readtup_heap(Tuplestorestate *state, unsigned int len);
248 : :
249 : :
250 : : /*
251 : : * tuplestore_begin_xxx
252 : : *
253 : : * Initialize for a tuple store operation.
254 : : */
255 : : static Tuplestorestate *
256 : 14525 : tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
257 : : {
258 : 14525 : Tuplestorestate *state;
259 : :
260 : 14525 : state = palloc0_object(Tuplestorestate);
261 : :
262 : 14525 : state->status = TSS_INMEM;
263 : 14525 : state->eflags = eflags;
264 : 14525 : state->interXact = interXact;
265 : 14525 : state->truncated = false;
266 : 14525 : state->usedDisk = false;
267 : 14525 : state->maxSpace = 0;
268 : 14525 : state->allowedMem = maxKBytes * (int64) 1024;
269 : 14525 : state->availMem = state->allowedMem;
270 : 14525 : state->myfile = NULL;
271 : :
272 : : /*
273 : : * The palloc/pfree pattern for tuple memory is in a FIFO pattern. A
274 : : * generation context is perfectly suited for this.
275 : : */
276 : 14525 : state->context = GenerationContextCreate(CurrentMemoryContext,
277 : : "tuplestore tuples",
278 : : ALLOCSET_DEFAULT_SIZES);
279 : 14525 : state->resowner = CurrentResourceOwner;
280 : :
281 : 14525 : state->memtupdeleted = 0;
282 : 14525 : state->memtupcount = 0;
283 : 14525 : state->tuples = 0;
284 : :
285 : : /*
286 : : * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
287 : : * see comments in grow_memtuples().
288 : : */
289 : 14525 : state->memtupsize = Max(16384 / sizeof(void *),
290 : : ALLOCSET_SEPARATE_THRESHOLD / sizeof(void *) + 1);
291 : :
292 : 14525 : state->growmemtuples = true;
293 : 14525 : state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
294 : :
295 : 14525 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
296 : :
297 : 14525 : state->activeptr = 0;
298 : 14525 : state->readptrcount = 1;
299 : 14525 : state->readptrsize = 8; /* arbitrary */
300 : 14525 : state->readptrs = (TSReadPointer *)
301 : 14525 : palloc(state->readptrsize * sizeof(TSReadPointer));
302 : :
303 : 14525 : state->readptrs[0].eflags = eflags;
304 : 14525 : state->readptrs[0].eof_reached = false;
305 : 14525 : state->readptrs[0].current = 0;
306 : :
307 : 29050 : return state;
308 : 14525 : }
309 : :
310 : : /*
311 : : * tuplestore_begin_heap
312 : : *
313 : : * Create a new tuplestore; other types of tuple stores (other than
314 : : * "heap" tuple stores, for heap tuples) are possible, but not presently
315 : : * implemented.
316 : : *
317 : : * randomAccess: if true, both forward and backward accesses to the
318 : : * tuple store are allowed.
319 : : *
320 : : * interXact: if true, the files used for on-disk storage persist beyond the
321 : : * end of the current transaction. NOTE: It's the caller's responsibility to
322 : : * create such a tuplestore in a memory context and resource owner that will
323 : : * also survive transaction boundaries, and to ensure the tuplestore is closed
324 : : * when it's no longer wanted.
325 : : *
326 : : * maxKBytes: how much data to store in memory (any data beyond this
327 : : * amount is paged to disk). When in doubt, use work_mem.
328 : : */
329 : : Tuplestorestate *
330 : 14525 : tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
331 : : {
332 : 14525 : Tuplestorestate *state;
333 : 14525 : int eflags;
334 : :
335 : : /*
336 : : * This interpretation of the meaning of randomAccess is compatible with
337 : : * the pre-8.3 behavior of tuplestores.
338 : : */
339 : 14525 : eflags = randomAccess ?
340 : : (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) :
341 : : (EXEC_FLAG_REWIND);
342 : :
343 : 14525 : state = tuplestore_begin_common(eflags, interXact, maxKBytes);
344 : :
345 : 14525 : state->copytup = copytup_heap;
346 : 14525 : state->writetup = writetup_heap;
347 : 14525 : state->readtup = readtup_heap;
348 : :
349 : 29050 : return state;
350 : 14525 : }
351 : :
352 : : /*
353 : : * tuplestore_set_eflags
354 : : *
355 : : * Set the capability flags for read pointer 0 at a finer grain than is
356 : : * allowed by tuplestore_begin_xxx. This must be called before inserting
357 : : * any data into the tuplestore.
358 : : *
359 : : * eflags is a bitmask following the meanings used for executor node
360 : : * startup flags (see executor.h). tuplestore pays attention to these bits:
361 : : * EXEC_FLAG_REWIND need rewind to start
362 : : * EXEC_FLAG_BACKWARD need backward fetch
363 : : * If tuplestore_set_eflags is not called, REWIND is allowed, and BACKWARD
364 : : * is set per "randomAccess" in the tuplestore_begin_xxx call.
365 : : *
366 : : * NOTE: setting BACKWARD without REWIND means the pointer can read backwards,
367 : : * but not further than the truncation point (the furthest-back read pointer
368 : : * position at the time of the last tuplestore_trim call).
369 : : */
370 : : void
371 : 883 : tuplestore_set_eflags(Tuplestorestate *state, int eflags)
372 : : {
373 : 883 : int i;
374 : :
375 [ + - ]: 883 : if (state->status != TSS_INMEM || state->memtupcount != 0)
376 [ # # # # ]: 0 : elog(ERROR, "too late to call tuplestore_set_eflags");
377 : :
378 : 883 : state->readptrs[0].eflags = eflags;
379 [ - + ]: 883 : for (i = 1; i < state->readptrcount; i++)
380 : 0 : eflags |= state->readptrs[i].eflags;
381 : 883 : state->eflags = eflags;
382 : 883 : }
383 : :
384 : : /*
385 : : * tuplestore_alloc_read_pointer - allocate another read pointer.
386 : : *
387 : : * Returns the pointer's index.
388 : : *
389 : : * The new pointer initially copies the position of read pointer 0.
390 : : * It can have its own eflags, but if any data has been inserted into
391 : : * the tuplestore, these eflags must not represent an increase in
392 : : * requirements.
393 : : */
394 : : int
395 : 1413 : tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags)
396 : : {
397 : : /* Check for possible increase of requirements */
398 [ + - + + ]: 1413 : if (state->status != TSS_INMEM || state->memtupcount != 0)
399 : : {
400 [ + - ]: 104 : if ((state->eflags | eflags) != state->eflags)
401 [ # # # # ]: 0 : elog(ERROR, "too late to require new tuplestore eflags");
402 : 104 : }
403 : :
404 : : /* Make room for another read pointer if needed */
405 [ + + ]: 1413 : if (state->readptrcount >= state->readptrsize)
406 : : {
407 : 5 : int newcnt = state->readptrsize * 2;
408 : :
409 : 5 : state->readptrs = (TSReadPointer *)
410 : 5 : repalloc(state->readptrs, newcnt * sizeof(TSReadPointer));
411 : 5 : state->readptrsize = newcnt;
412 : 5 : }
413 : :
414 : : /* And set it up */
415 : 1413 : state->readptrs[state->readptrcount] = state->readptrs[0];
416 : 1413 : state->readptrs[state->readptrcount].eflags = eflags;
417 : :
418 : 1413 : state->eflags |= eflags;
419 : :
420 : 1413 : return state->readptrcount++;
421 : : }
422 : :
423 : : /*
424 : : * tuplestore_clear
425 : : *
426 : : * Delete all the contents of a tuplestore, and reset its read pointers
427 : : * to the start.
428 : : */
429 : : void
430 : 1565 : tuplestore_clear(Tuplestorestate *state)
431 : : {
432 : 1565 : int i;
433 : 1565 : TSReadPointer *readptr;
434 : :
435 : : /* update the maxSpace before doing any USEMEM/FREEMEM adjustments */
436 : 1565 : tuplestore_updatemax(state);
437 : :
438 [ + + ]: 1565 : if (state->myfile)
439 : 2 : BufFileClose(state->myfile);
440 : 1565 : state->myfile = NULL;
441 : :
442 : : #ifdef USE_ASSERT_CHECKING
443 : : {
444 : 1565 : int64 availMem = state->availMem;
445 : :
446 : : /*
447 : : * Below, we reset the memory context for storing tuples. To save
448 : : * from having to always call GetMemoryChunkSpace() on all stored
449 : : * tuples, we adjust the availMem to forget all the tuples and just
450 : : * recall USEMEM for the space used by the memtuples array. Here we
451 : : * just Assert that's correct and the memory tracking hasn't gone
452 : : * wrong anywhere.
453 : : */
454 [ + + ]: 4046 : for (i = state->memtupdeleted; i < state->memtupcount; i++)
455 : 2481 : availMem += GetMemoryChunkSpace(state->memtuples[i]);
456 : :
457 : 1565 : availMem += GetMemoryChunkSpace(state->memtuples);
458 : :
459 [ + - ]: 1565 : Assert(availMem == state->allowedMem);
460 : 1565 : }
461 : : #endif
462 : :
463 : : /* clear the memory consumed by the memory tuples */
464 : 1565 : MemoryContextReset(state->context);
465 : :
466 : : /*
467 : : * Zero the used memory and re-consume the space for the memtuples array.
468 : : * This saves having to FREEMEM for each stored tuple.
469 : : */
470 : 1565 : state->availMem = state->allowedMem;
471 : 1565 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
472 : :
473 : 1565 : state->status = TSS_INMEM;
474 : 1565 : state->truncated = false;
475 : 1565 : state->memtupdeleted = 0;
476 : 1565 : state->memtupcount = 0;
477 : 1565 : state->tuples = 0;
478 : 1565 : readptr = state->readptrs;
479 [ + + ]: 4864 : for (i = 0; i < state->readptrcount; readptr++, i++)
480 : : {
481 : 3299 : readptr->eof_reached = false;
482 : 3299 : readptr->current = 0;
483 : 3299 : }
484 : 1565 : }
485 : :
486 : : /*
487 : : * tuplestore_end
488 : : *
489 : : * Release resources and clean up.
490 : : */
491 : : void
492 : 14403 : tuplestore_end(Tuplestorestate *state)
493 : : {
494 [ + + ]: 14403 : if (state->myfile)
495 : 39 : BufFileClose(state->myfile);
496 : :
497 : 14403 : MemoryContextDelete(state->context);
498 : 14403 : pfree(state->memtuples);
499 : 14403 : pfree(state->readptrs);
500 : 14403 : pfree(state);
501 : 14403 : }
502 : :
503 : : /*
504 : : * tuplestore_select_read_pointer - make the specified read pointer active
505 : : */
506 : : void
507 : 657689 : tuplestore_select_read_pointer(Tuplestorestate *state, int ptr)
508 : : {
509 : 657689 : TSReadPointer *readptr;
510 : 657689 : TSReadPointer *oldptr;
511 : :
512 [ + - ]: 657689 : Assert(ptr >= 0 && ptr < state->readptrcount);
513 : :
514 : : /* No work if already active */
515 [ + + ]: 657689 : if (ptr == state->activeptr)
516 : 118685 : return;
517 : :
518 : 539004 : readptr = &state->readptrs[ptr];
519 : 539004 : oldptr = &state->readptrs[state->activeptr];
520 : :
521 [ + + - ]: 539004 : switch (state->status)
522 : : {
523 : : case TSS_INMEM:
524 : : case TSS_WRITEFILE:
525 : : /* no work */
526 : 539002 : break;
527 : : case TSS_READFILE:
528 : :
529 : : /*
530 : : * First, save the current read position in the pointer about to
531 : : * become inactive.
532 : : */
533 [ - + ]: 2 : if (!oldptr->eof_reached)
534 : 4 : BufFileTell(state->myfile,
535 : 2 : &oldptr->file,
536 : 2 : &oldptr->offset);
537 : :
538 : : /*
539 : : * We have to make the temp file's seek position equal to the
540 : : * logical position of the new read pointer. In eof_reached
541 : : * state, that's the EOF, which we have available from the saved
542 : : * write position.
543 : : */
544 [ - + ]: 2 : if (readptr->eof_reached)
545 : : {
546 : 0 : if (BufFileSeek(state->myfile,
547 : 0 : state->writepos_file,
548 : 0 : state->writepos_offset,
549 [ # # ]: 0 : SEEK_SET) != 0)
550 [ # # # # ]: 0 : ereport(ERROR,
551 : : (errcode_for_file_access(),
552 : : errmsg("could not seek in tuplestore temporary file")));
553 : 0 : }
554 : : else
555 : : {
556 : 4 : if (BufFileSeek(state->myfile,
557 : 2 : readptr->file,
558 : 2 : readptr->offset,
559 [ + - ]: 2 : SEEK_SET) != 0)
560 [ # # # # ]: 0 : ereport(ERROR,
561 : : (errcode_for_file_access(),
562 : : errmsg("could not seek in tuplestore temporary file")));
563 : : }
564 : 2 : break;
565 : : default:
566 [ # # # # ]: 0 : elog(ERROR, "invalid tuplestore state");
567 : 0 : break;
568 : : }
569 : :
570 : 539004 : state->activeptr = ptr;
571 [ - + ]: 657689 : }
572 : :
573 : : /*
574 : : * tuplestore_tuple_count
575 : : *
576 : : * Returns the number of tuples added since creation or the last
577 : : * tuplestore_clear().
578 : : */
579 : : int64
580 : 1012 : tuplestore_tuple_count(Tuplestorestate *state)
581 : : {
582 : 1012 : return state->tuples;
583 : : }
584 : :
585 : : /*
586 : : * tuplestore_ateof
587 : : *
588 : : * Returns the active read pointer's eof_reached state.
589 : : */
590 : : bool
591 : 303485 : tuplestore_ateof(Tuplestorestate *state)
592 : : {
593 : 303485 : return state->readptrs[state->activeptr].eof_reached;
594 : : }
595 : :
596 : : /*
597 : : * Grow the memtuples[] array, if possible within our memory constraint. We
598 : : * must not exceed INT_MAX tuples in memory or the caller-provided memory
599 : : * limit. Return true if we were able to enlarge the array, false if not.
600 : : *
601 : : * Normally, at each increment we double the size of the array. When doing
602 : : * that would exceed a limit, we attempt one last, smaller increase (and then
603 : : * clear the growmemtuples flag so we don't try any more). That allows us to
604 : : * use memory as fully as permitted; sticking to the pure doubling rule could
605 : : * result in almost half going unused. Because availMem moves around with
606 : : * tuple addition/removal, we need some rule to prevent making repeated small
607 : : * increases in memtupsize, which would just be useless thrashing. The
608 : : * growmemtuples flag accomplishes that and also prevents useless
609 : : * recalculations in this function.
610 : : */
611 : : static bool
612 : 301 : grow_memtuples(Tuplestorestate *state)
613 : : {
614 : 301 : int newmemtupsize;
615 : 301 : int memtupsize = state->memtupsize;
616 : 301 : int64 memNowUsed = state->allowedMem - state->availMem;
617 : :
618 : : /* Forget it if we've already maxed out memtuples, per comment above */
619 [ + + ]: 301 : if (!state->growmemtuples)
620 : 27 : return false;
621 : :
622 : : /* Select new value of memtupsize */
623 [ + + ]: 274 : if (memNowUsed <= state->availMem)
624 : : {
625 : : /*
626 : : * We've used no more than half of allowedMem; double our usage,
627 : : * clamping at INT_MAX tuples.
628 : : */
629 [ + - ]: 243 : if (memtupsize < INT_MAX / 2)
630 : 243 : newmemtupsize = memtupsize * 2;
631 : : else
632 : : {
633 : 0 : newmemtupsize = INT_MAX;
634 : 0 : state->growmemtuples = false;
635 : : }
636 : 243 : }
637 : : else
638 : : {
639 : : /*
640 : : * This will be the last increment of memtupsize. Abandon doubling
641 : : * strategy and instead increase as much as we safely can.
642 : : *
643 : : * To stay within allowedMem, we can't increase memtupsize by more
644 : : * than availMem / sizeof(void *) elements. In practice, we want to
645 : : * increase it by considerably less, because we need to leave some
646 : : * space for the tuples to which the new array slots will refer. We
647 : : * assume the new tuples will be about the same size as the tuples
648 : : * we've already seen, and thus we can extrapolate from the space
649 : : * consumption so far to estimate an appropriate new size for the
650 : : * memtuples array. The optimal value might be higher or lower than
651 : : * this estimate, but it's hard to know that in advance. We again
652 : : * clamp at INT_MAX tuples.
653 : : *
654 : : * This calculation is safe against enlarging the array so much that
655 : : * LACKMEM becomes true, because the memory currently used includes
656 : : * the present array; thus, there would be enough allowedMem for the
657 : : * new array elements even if no other memory were currently used.
658 : : *
659 : : * We do the arithmetic in float8, because otherwise the product of
660 : : * memtupsize and allowedMem could overflow. Any inaccuracy in the
661 : : * result should be insignificant; but even if we computed a
662 : : * completely insane result, the checks below will prevent anything
663 : : * really bad from happening.
664 : : */
665 : 31 : double grow_ratio;
666 : :
667 : 31 : grow_ratio = (double) state->allowedMem / (double) memNowUsed;
668 [ + - ]: 31 : if (memtupsize * grow_ratio < INT_MAX)
669 : 31 : newmemtupsize = (int) (memtupsize * grow_ratio);
670 : : else
671 : 0 : newmemtupsize = INT_MAX;
672 : :
673 : : /* We won't make any further enlargement attempts */
674 : 31 : state->growmemtuples = false;
675 : 31 : }
676 : :
677 : : /* Must enlarge array by at least one element, else report failure */
678 [ - + ]: 274 : if (newmemtupsize <= memtupsize)
679 : 0 : goto noalloc;
680 : :
681 : : /*
682 : : * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
683 : : * to ensure our request won't be rejected. Note that we can easily
684 : : * exhaust address space before facing this outcome. (This is presently
685 : : * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
686 : : * don't rely on that at this distance.)
687 : : */
688 [ + - ]: 274 : if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(void *))
689 : : {
690 : 0 : newmemtupsize = (int) (MaxAllocHugeSize / sizeof(void *));
691 : 0 : state->growmemtuples = false; /* can't grow any more */
692 : 0 : }
693 : :
694 : : /*
695 : : * We need to be sure that we do not cause LACKMEM to become true, else
696 : : * the space management algorithm will go nuts. The code above should
697 : : * never generate a dangerous request, but to be safe, check explicitly
698 : : * that the array growth fits within availMem. (We could still cause
699 : : * LACKMEM if the memory chunk overhead associated with the memtuples
700 : : * array were to increase. That shouldn't happen because we chose the
701 : : * initial array size large enough to ensure that palloc will be treating
702 : : * both old and new arrays as separate chunks. But we'll check LACKMEM
703 : : * explicitly below just in case.)
704 : : */
705 [ - + ]: 274 : if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(void *)))
706 : 0 : goto noalloc;
707 : :
708 : : /* OK, do it */
709 : 274 : FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
710 : 274 : state->memtupsize = newmemtupsize;
711 : 274 : state->memtuples = (void **)
712 : 548 : repalloc_huge(state->memtuples,
713 : 274 : state->memtupsize * sizeof(void *));
714 : 274 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
715 [ + - ]: 274 : if (LACKMEM(state))
716 [ # # # # ]: 0 : elog(ERROR, "unexpected out-of-memory situation in tuplestore");
717 : 274 : return true;
718 : :
719 : : noalloc:
720 : : /* If for any reason we didn't realloc, shut off future attempts */
721 : 0 : state->growmemtuples = false;
722 : 0 : return false;
723 : 301 : }
724 : :
725 : : /*
726 : : * Accept one tuple and append it to the tuplestore.
727 : : *
728 : : * Note that the input tuple is always copied; the caller need not save it.
729 : : *
730 : : * If the active read pointer is currently "at EOF", it remains so (the read
731 : : * pointer implicitly advances along with the write pointer); otherwise the
732 : : * read pointer is unchanged. Non-active read pointers do not move, which
733 : : * means they are certain to not be "at EOF" immediately after puttuple.
734 : : * This curious-seeming behavior is for the convenience of nodeMaterial.c and
735 : : * nodeCtescan.c, which would otherwise need to do extra pointer repositioning
736 : : * steps.
737 : : *
738 : : * tuplestore_puttupleslot() is a convenience routine to collect data from
739 : : * a TupleTableSlot without an extra copy operation.
740 : : */
741 : : void
742 : 276098 : tuplestore_puttupleslot(Tuplestorestate *state,
743 : : TupleTableSlot *slot)
744 : : {
745 : 276098 : MinimalTuple tuple;
746 : 276098 : MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
747 : :
748 : : /*
749 : : * Form a MinimalTuple in working memory
750 : : */
751 : 276098 : tuple = ExecCopySlotMinimalTuple(slot);
752 : 276098 : USEMEM(state, GetMemoryChunkSpace(tuple));
753 : :
754 : 276098 : tuplestore_puttuple_common(state, tuple);
755 : :
756 : 276098 : MemoryContextSwitchTo(oldcxt);
757 : 276098 : }
758 : :
759 : : /*
760 : : * "Standard" case to copy from a HeapTuple. This is actually now somewhat
761 : : * deprecated, but not worth getting rid of in view of the number of callers.
762 : : */
763 : : void
764 : 9590 : tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
765 : : {
766 : 9590 : MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
767 : :
768 : : /*
769 : : * Copy the tuple. (Must do this even in WRITEFILE case. Note that
770 : : * COPYTUP includes USEMEM, so we needn't do that here.)
771 : : */
772 : 9590 : tuple = COPYTUP(state, tuple);
773 : :
774 : 9590 : tuplestore_puttuple_common(state, tuple);
775 : :
776 : 9590 : MemoryContextSwitchTo(oldcxt);
777 : 9590 : }
778 : :
779 : : /*
780 : : * Similar to tuplestore_puttuple(), but work from values + nulls arrays.
781 : : * This avoids an extra tuple-construction operation.
782 : : */
783 : : void
784 : 2095415 : tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc,
785 : : const Datum *values, const bool *isnull)
786 : : {
787 : 2095415 : MinimalTuple tuple;
788 : 2095415 : MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
789 : :
790 : 2095415 : tuple = heap_form_minimal_tuple(tdesc, values, isnull, 0);
791 : 2095415 : USEMEM(state, GetMemoryChunkSpace(tuple));
792 : :
793 : 2095415 : tuplestore_puttuple_common(state, tuple);
794 : :
795 : 2095415 : MemoryContextSwitchTo(oldcxt);
796 : 2095415 : }
797 : :
798 : : static void
799 : 2381103 : tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
800 : : {
801 : 2381103 : TSReadPointer *readptr;
802 : 2381103 : int i;
803 : 2381103 : ResourceOwner oldowner;
804 : 2381103 : MemoryContext oldcxt;
805 : :
806 : 2381103 : state->tuples++;
807 : :
808 [ + + + - ]: 2381103 : switch (state->status)
809 : : {
810 : : case TSS_INMEM:
811 : :
812 : : /*
813 : : * Update read pointers as needed; see API spec above.
814 : : */
815 : 1891106 : readptr = state->readptrs;
816 [ + + ]: 4175160 : for (i = 0; i < state->readptrcount; readptr++, i++)
817 : : {
818 [ + + + + ]: 2284054 : if (readptr->eof_reached && i != state->activeptr)
819 : : {
820 : 58 : readptr->eof_reached = false;
821 : 58 : readptr->current = state->memtupcount;
822 : 58 : }
823 : 2284054 : }
824 : :
825 : : /*
826 : : * Grow the array as needed. Note that we try to grow the array
827 : : * when there is still one free slot remaining --- if we fail,
828 : : * there'll still be room to store the incoming tuple, and then
829 : : * we'll switch to tape-based operation.
830 : : */
831 [ + + ]: 1891106 : if (state->memtupcount >= state->memtupsize - 1)
832 : : {
833 : 301 : (void) grow_memtuples(state);
834 [ + - ]: 301 : Assert(state->memtupcount < state->memtupsize);
835 : 301 : }
836 : :
837 : : /* Stash the tuple in the in-memory array */
838 : 1891106 : state->memtuples[state->memtupcount++] = tuple;
839 : :
840 : : /*
841 : : * Done if we still fit in available memory and have array slots.
842 : : */
843 [ + + + + ]: 1891106 : if (state->memtupcount < state->memtupsize && !LACKMEM(state))
844 : 1891065 : return;
845 : :
846 : : /*
847 : : * Nope; time to switch to tape-based operation. Make sure that
848 : : * the temp file(s) are created in suitable temp tablespaces.
849 : : */
850 : 41 : PrepareTempTablespaces();
851 : :
852 : : /* associate the file with the store's resource owner */
853 : 41 : oldowner = CurrentResourceOwner;
854 : 41 : CurrentResourceOwner = state->resowner;
855 : :
856 : : /*
857 : : * We switch out of the state->context as this is a generation
858 : : * context, which isn't ideal for allocations relating to the
859 : : * BufFile.
860 : : */
861 : 41 : oldcxt = MemoryContextSwitchTo(state->context->parent);
862 : :
863 : 41 : state->myfile = BufFileCreateTemp(state->interXact);
864 : :
865 : 41 : MemoryContextSwitchTo(oldcxt);
866 : :
867 : 41 : CurrentResourceOwner = oldowner;
868 : :
869 : : /*
870 : : * Freeze the decision about whether trailing length words will be
871 : : * used. We can't change this choice once data is on tape, even
872 : : * though callers might drop the requirement.
873 : : */
874 : 41 : state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
875 : :
876 : : /*
877 : : * Update the maximum space used before dumping the tuples. It's
878 : : * possible that more space will be used by the tuples in memory
879 : : * than the space that will be used on disk.
880 : : */
881 : 41 : tuplestore_updatemax(state);
882 : :
883 : 41 : state->status = TSS_WRITEFILE;
884 : 41 : dumptuples(state);
885 : 41 : break;
886 : : case TSS_WRITEFILE:
887 : :
888 : : /*
889 : : * Update read pointers as needed; see API spec above. Note:
890 : : * BufFileTell is quite cheap, so not worth trying to avoid
891 : : * multiple calls.
892 : : */
893 : 489995 : readptr = state->readptrs;
894 [ + + ]: 982733 : for (i = 0; i < state->readptrcount; readptr++, i++)
895 : : {
896 [ - + # # ]: 492738 : if (readptr->eof_reached && i != state->activeptr)
897 : : {
898 : 0 : readptr->eof_reached = false;
899 : 0 : BufFileTell(state->myfile,
900 : 0 : &readptr->file,
901 : 0 : &readptr->offset);
902 : 0 : }
903 : 492738 : }
904 : :
905 : 489995 : WRITETUP(state, tuple);
906 : 489995 : break;
907 : : case TSS_READFILE:
908 : :
909 : : /*
910 : : * Switch from reading to writing.
911 : : */
912 [ - + ]: 2 : if (!state->readptrs[state->activeptr].eof_reached)
913 : 4 : BufFileTell(state->myfile,
914 : 2 : &state->readptrs[state->activeptr].file,
915 : 2 : &state->readptrs[state->activeptr].offset);
916 : 4 : if (BufFileSeek(state->myfile,
917 : 2 : state->writepos_file, state->writepos_offset,
918 [ + - ]: 2 : SEEK_SET) != 0)
919 [ # # # # ]: 0 : ereport(ERROR,
920 : : (errcode_for_file_access(),
921 : : errmsg("could not seek in tuplestore temporary file")));
922 : 2 : state->status = TSS_WRITEFILE;
923 : :
924 : : /*
925 : : * Update read pointers as needed; see API spec above.
926 : : */
927 : 2 : readptr = state->readptrs;
928 [ + + ]: 6 : for (i = 0; i < state->readptrcount; readptr++, i++)
929 : : {
930 [ - + # # ]: 4 : if (readptr->eof_reached && i != state->activeptr)
931 : : {
932 : 0 : readptr->eof_reached = false;
933 : 0 : readptr->file = state->writepos_file;
934 : 0 : readptr->offset = state->writepos_offset;
935 : 0 : }
936 : 4 : }
937 : :
938 : 2 : WRITETUP(state, tuple);
939 : 2 : break;
940 : : default:
941 [ # # # # ]: 0 : elog(ERROR, "invalid tuplestore state");
942 : 0 : break;
943 : : }
944 [ - + ]: 2381103 : }
945 : :
946 : : /*
947 : : * Fetch the next tuple in either forward or back direction.
948 : : * Returns NULL if no more tuples. If should_free is set, the
949 : : * caller must pfree the returned tuple when done with it.
950 : : *
951 : : * Backward scan is only allowed if randomAccess was set true or
952 : : * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags().
953 : : */
954 : : static void *
955 : 2820007 : tuplestore_gettuple(Tuplestorestate *state, bool forward,
956 : : bool *should_free)
957 : : {
958 : 2820007 : TSReadPointer *readptr = &state->readptrs[state->activeptr];
959 : 2820007 : unsigned int tuplen;
960 : 2820007 : void *tup;
961 : :
962 [ + + + - ]: 2820007 : Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
963 : :
964 [ + + + - ]: 2820007 : switch (state->status)
965 : : {
966 : : case TSS_INMEM:
967 : 1564757 : *should_free = false;
968 [ + + ]: 1564757 : if (forward)
969 : : {
970 [ + + ]: 1535366 : if (readptr->eof_reached)
971 : 30 : return NULL;
972 [ + + ]: 1535336 : if (readptr->current < state->memtupcount)
973 : : {
974 : : /* We have another tuple, so return it */
975 : 1509233 : return state->memtuples[readptr->current++];
976 : : }
977 : 26103 : readptr->eof_reached = true;
978 : 26103 : return NULL;
979 : : }
980 : : else
981 : : {
982 : : /*
983 : : * if all tuples are fetched already then we return last
984 : : * tuple, else tuple before last returned.
985 : : */
986 [ + + ]: 29391 : if (readptr->eof_reached)
987 : : {
988 : 474 : readptr->current = state->memtupcount;
989 : 474 : readptr->eof_reached = false;
990 : 474 : }
991 : : else
992 : : {
993 [ - + ]: 28917 : if (readptr->current <= state->memtupdeleted)
994 : : {
995 [ # # ]: 0 : Assert(!state->truncated);
996 : 0 : return NULL;
997 : : }
998 : 28917 : readptr->current--; /* last returned tuple */
999 : : }
1000 [ + + ]: 29391 : if (readptr->current <= state->memtupdeleted)
1001 : : {
1002 [ + - ]: 5 : Assert(!state->truncated);
1003 : 5 : return NULL;
1004 : : }
1005 : 29386 : return state->memtuples[readptr->current - 1];
1006 : : }
1007 : : break;
1008 : :
1009 : : case TSS_WRITEFILE:
1010 : : /* Skip state change if we'll just return NULL */
1011 [ - + # # ]: 43 : if (readptr->eof_reached && forward)
1012 : 0 : return NULL;
1013 : :
1014 : : /*
1015 : : * Switch from writing to reading.
1016 : : */
1017 : 86 : BufFileTell(state->myfile,
1018 : 43 : &state->writepos_file, &state->writepos_offset);
1019 [ - + ]: 43 : if (!readptr->eof_reached)
1020 : 86 : if (BufFileSeek(state->myfile,
1021 : 43 : readptr->file, readptr->offset,
1022 [ + - ]: 43 : SEEK_SET) != 0)
1023 [ # # # # ]: 0 : ereport(ERROR,
1024 : : (errcode_for_file_access(),
1025 : : errmsg("could not seek in tuplestore temporary file")));
1026 : 43 : state->status = TSS_READFILE;
1027 : : /* FALLTHROUGH */
1028 : :
1029 : : case TSS_READFILE:
1030 : 1255250 : *should_free = true;
1031 [ + - ]: 1255250 : if (forward)
1032 : : {
1033 [ + + ]: 1255250 : if ((tuplen = getlen(state, true)) != 0)
1034 : : {
1035 : 1255212 : tup = READTUP(state, tuplen);
1036 : 1255212 : return tup;
1037 : : }
1038 : : else
1039 : : {
1040 : 38 : readptr->eof_reached = true;
1041 : 38 : return NULL;
1042 : : }
1043 : : }
1044 : :
1045 : : /*
1046 : : * Backward.
1047 : : *
1048 : : * if all tuples are fetched already then we return last tuple,
1049 : : * else tuple before last returned.
1050 : : *
1051 : : * Back up to fetch previously-returned tuple's ending length
1052 : : * word. If seek fails, assume we are at start of file.
1053 : : */
1054 : 0 : if (BufFileSeek(state->myfile, 0, -(pgoff_t) sizeof(unsigned int),
1055 [ # # ]: 0 : SEEK_CUR) != 0)
1056 : : {
1057 : : /* even a failed backwards fetch gets you out of eof state */
1058 : 0 : readptr->eof_reached = false;
1059 [ # # ]: 0 : Assert(!state->truncated);
1060 : 0 : return NULL;
1061 : : }
1062 : 0 : tuplen = getlen(state, false);
1063 : :
1064 [ # # ]: 0 : if (readptr->eof_reached)
1065 : : {
1066 : 0 : readptr->eof_reached = false;
1067 : : /* We will return the tuple returned before returning NULL */
1068 : 0 : }
1069 : : else
1070 : : {
1071 : : /*
1072 : : * Back up to get ending length word of tuple before it.
1073 : : */
1074 : 0 : if (BufFileSeek(state->myfile, 0,
1075 : 0 : -(pgoff_t) (tuplen + 2 * sizeof(unsigned int)),
1076 [ # # ]: 0 : SEEK_CUR) != 0)
1077 : : {
1078 : : /*
1079 : : * If that fails, presumably the prev tuple is the first
1080 : : * in the file. Back up so that it becomes next to read
1081 : : * in forward direction (not obviously right, but that is
1082 : : * what in-memory case does).
1083 : : */
1084 : 0 : if (BufFileSeek(state->myfile, 0,
1085 : 0 : -(pgoff_t) (tuplen + sizeof(unsigned int)),
1086 [ # # ]: 0 : SEEK_CUR) != 0)
1087 [ # # # # ]: 0 : ereport(ERROR,
1088 : : (errcode_for_file_access(),
1089 : : errmsg("could not seek in tuplestore temporary file")));
1090 [ # # ]: 0 : Assert(!state->truncated);
1091 : 0 : return NULL;
1092 : : }
1093 : 0 : tuplen = getlen(state, false);
1094 : : }
1095 : :
1096 : : /*
1097 : : * Now we have the length of the prior tuple, back up and read it.
1098 : : * Note: READTUP expects we are positioned after the initial
1099 : : * length word of the tuple, so back up to that point.
1100 : : */
1101 : 0 : if (BufFileSeek(state->myfile, 0,
1102 : 0 : -(pgoff_t) tuplen,
1103 [ # # ]: 0 : SEEK_CUR) != 0)
1104 [ # # # # ]: 0 : ereport(ERROR,
1105 : : (errcode_for_file_access(),
1106 : : errmsg("could not seek in tuplestore temporary file")));
1107 : 0 : tup = READTUP(state, tuplen);
1108 : 0 : return tup;
1109 : :
1110 : : default:
1111 [ # # # # ]: 0 : elog(ERROR, "invalid tuplestore state");
1112 : 0 : return NULL; /* keep compiler quiet */
1113 : : }
1114 : 2820007 : }
1115 : :
1116 : : /*
1117 : : * tuplestore_gettupleslot - exported function to fetch a MinimalTuple
1118 : : *
1119 : : * If successful, put tuple in slot and return true; else, clear the slot
1120 : : * and return false.
1121 : : *
1122 : : * If copy is true, the slot receives a copied tuple (allocated in current
1123 : : * memory context) that will stay valid regardless of future manipulations of
1124 : : * the tuplestore's state. If copy is false, the slot may just receive a
1125 : : * pointer to a tuple held within the tuplestore. The latter is more
1126 : : * efficient but the slot contents may be corrupted if additional writes to
1127 : : * the tuplestore occur. (If using tuplestore_trim, see comments therein.)
1128 : : */
1129 : : bool
1130 : 2791155 : tuplestore_gettupleslot(Tuplestorestate *state, bool forward,
1131 : : bool copy, TupleTableSlot *slot)
1132 : : {
1133 : 2791155 : MinimalTuple tuple;
1134 : 2791155 : bool should_free;
1135 : :
1136 : 2791155 : tuple = (MinimalTuple) tuplestore_gettuple(state, forward, &should_free);
1137 : :
1138 [ + + ]: 2791155 : if (tuple)
1139 : : {
1140 [ + + + + ]: 2765444 : if (copy && !should_free)
1141 : : {
1142 : 306385 : tuple = heap_copy_minimal_tuple(tuple, 0);
1143 : 306385 : should_free = true;
1144 : 306385 : }
1145 : 2765444 : ExecStoreMinimalTuple(tuple, slot, should_free);
1146 : 2765444 : return true;
1147 : : }
1148 : : else
1149 : : {
1150 : 25711 : ExecClearTuple(slot);
1151 : 25711 : return false;
1152 : : }
1153 : 2791155 : }
1154 : :
1155 : : /*
1156 : : * tuplestore_advance - exported function to adjust position without fetching
1157 : : *
1158 : : * We could optimize this case to avoid palloc/pfree overhead, but for the
1159 : : * moment it doesn't seem worthwhile.
1160 : : */
1161 : : bool
1162 : 28852 : tuplestore_advance(Tuplestorestate *state, bool forward)
1163 : : {
1164 : 28852 : void *tuple;
1165 : 28852 : bool should_free;
1166 : :
1167 : 28852 : tuple = tuplestore_gettuple(state, forward, &should_free);
1168 : :
1169 [ + + ]: 28852 : if (tuple)
1170 : : {
1171 [ + - ]: 28387 : if (should_free)
1172 : 0 : pfree(tuple);
1173 : 28387 : return true;
1174 : : }
1175 : : else
1176 : : {
1177 : 465 : return false;
1178 : : }
1179 : 28852 : }
1180 : :
1181 : : /*
1182 : : * Advance over N tuples in either forward or back direction,
1183 : : * without returning any data. N<=0 is a no-op.
1184 : : * Returns true if successful, false if ran out of tuples.
1185 : : */
1186 : : bool
1187 : 222773 : tuplestore_skiptuples(Tuplestorestate *state, int64 ntuples, bool forward)
1188 : : {
1189 : 222773 : TSReadPointer *readptr = &state->readptrs[state->activeptr];
1190 : :
1191 [ + + + - ]: 222773 : Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
1192 : :
1193 [ + + ]: 222773 : if (ntuples <= 0)
1194 : 3 : return true;
1195 : :
1196 [ - + ]: 222770 : switch (state->status)
1197 : : {
1198 : : case TSS_INMEM:
1199 [ + + ]: 222770 : if (forward)
1200 : : {
1201 [ - + ]: 222305 : if (readptr->eof_reached)
1202 : 0 : return false;
1203 [ + + ]: 222305 : if (state->memtupcount - readptr->current >= ntuples)
1204 : : {
1205 : 222290 : readptr->current += ntuples;
1206 : 222290 : return true;
1207 : : }
1208 : 15 : readptr->current = state->memtupcount;
1209 : 15 : readptr->eof_reached = true;
1210 : 15 : return false;
1211 : : }
1212 : : else
1213 : : {
1214 [ + - ]: 465 : if (readptr->eof_reached)
1215 : : {
1216 : 0 : readptr->current = state->memtupcount;
1217 : 0 : readptr->eof_reached = false;
1218 : 0 : ntuples--;
1219 : 0 : }
1220 [ + - ]: 465 : if (readptr->current - state->memtupdeleted > ntuples)
1221 : : {
1222 : 465 : readptr->current -= ntuples;
1223 : 465 : return true;
1224 : : }
1225 [ # # ]: 0 : Assert(!state->truncated);
1226 : 0 : readptr->current = state->memtupdeleted;
1227 : 0 : return false;
1228 : : }
1229 : : break;
1230 : :
1231 : : default:
1232 : : /* We don't currently try hard to optimize other cases */
1233 [ # # ]: 0 : while (ntuples-- > 0)
1234 : : {
1235 : 0 : void *tuple;
1236 : 0 : bool should_free;
1237 : :
1238 : 0 : tuple = tuplestore_gettuple(state, forward, &should_free);
1239 : :
1240 [ # # ]: 0 : if (tuple == NULL)
1241 : 0 : return false;
1242 [ # # ]: 0 : if (should_free)
1243 : 0 : pfree(tuple);
1244 [ # # ]: 0 : CHECK_FOR_INTERRUPTS();
1245 [ # # ]: 0 : }
1246 : 0 : return true;
1247 : : }
1248 : 222773 : }
1249 : :
1250 : : /*
1251 : : * dumptuples - remove tuples from memory and write to tape
1252 : : *
1253 : : * As a side effect, we must convert each read pointer's position from
1254 : : * "current" to file/offset format. But eof_reached pointers don't
1255 : : * need to change state.
1256 : : */
1257 : : static void
1258 : 41 : dumptuples(Tuplestorestate *state)
1259 : : {
1260 : 41 : int i;
1261 : :
1262 : 772510 : for (i = state->memtupdeleted;; i++)
1263 : : {
1264 : 772510 : TSReadPointer *readptr = state->readptrs;
1265 : 772510 : int j;
1266 : :
1267 [ + + ]: 1547275 : for (j = 0; j < state->readptrcount; readptr++, j++)
1268 : : {
1269 [ + + - + ]: 774765 : if (i == readptr->current && !readptr->eof_reached)
1270 : 86 : BufFileTell(state->myfile,
1271 : 43 : &readptr->file, &readptr->offset);
1272 : 774765 : }
1273 [ + + ]: 772510 : if (i >= state->memtupcount)
1274 : 41 : break;
1275 : 772469 : WRITETUP(state, state->memtuples[i]);
1276 [ - + + ]: 772510 : }
1277 : 41 : state->memtupdeleted = 0;
1278 : 41 : state->memtupcount = 0;
1279 : 41 : }
1280 : :
1281 : : /*
1282 : : * tuplestore_rescan - rewind the active read pointer to start
1283 : : */
1284 : : void
1285 : 18251 : tuplestore_rescan(Tuplestorestate *state)
1286 : : {
1287 : 18251 : TSReadPointer *readptr = &state->readptrs[state->activeptr];
1288 : :
1289 [ + - ]: 18251 : Assert(readptr->eflags & EXEC_FLAG_REWIND);
1290 [ + - ]: 18251 : Assert(!state->truncated);
1291 : :
1292 [ + + - - ]: 18251 : switch (state->status)
1293 : : {
1294 : : case TSS_INMEM:
1295 : 18212 : readptr->eof_reached = false;
1296 : 18212 : readptr->current = 0;
1297 : 18212 : break;
1298 : : case TSS_WRITEFILE:
1299 : 39 : readptr->eof_reached = false;
1300 : 39 : readptr->file = 0;
1301 : 39 : readptr->offset = 0;
1302 : 39 : break;
1303 : : case TSS_READFILE:
1304 : 0 : readptr->eof_reached = false;
1305 [ # # ]: 0 : if (BufFileSeek(state->myfile, 0, 0, SEEK_SET) != 0)
1306 [ # # # # ]: 0 : ereport(ERROR,
1307 : : (errcode_for_file_access(),
1308 : : errmsg("could not seek in tuplestore temporary file")));
1309 : 0 : break;
1310 : : default:
1311 [ # # # # ]: 0 : elog(ERROR, "invalid tuplestore state");
1312 : 0 : break;
1313 : : }
1314 : 18251 : }
1315 : :
1316 : : /*
1317 : : * tuplestore_copy_read_pointer - copy a read pointer's state to another
1318 : : */
1319 : : void
1320 : 10064 : tuplestore_copy_read_pointer(Tuplestorestate *state,
1321 : : int srcptr, int destptr)
1322 : : {
1323 : 10064 : TSReadPointer *sptr = &state->readptrs[srcptr];
1324 : 10064 : TSReadPointer *dptr = &state->readptrs[destptr];
1325 : :
1326 [ + - ]: 10064 : Assert(srcptr >= 0 && srcptr < state->readptrcount);
1327 [ + - ]: 10064 : Assert(destptr >= 0 && destptr < state->readptrcount);
1328 : :
1329 : : /* Assigning to self is a no-op */
1330 [ - + ]: 10064 : if (srcptr == destptr)
1331 : 0 : return;
1332 : :
1333 [ - + ]: 10064 : if (dptr->eflags != sptr->eflags)
1334 : : {
1335 : : /* Possible change of overall eflags, so copy and then recompute */
1336 : 0 : int eflags;
1337 : 0 : int i;
1338 : :
1339 : 0 : *dptr = *sptr;
1340 : 0 : eflags = state->readptrs[0].eflags;
1341 [ # # ]: 0 : for (i = 1; i < state->readptrcount; i++)
1342 : 0 : eflags |= state->readptrs[i].eflags;
1343 : 0 : state->eflags = eflags;
1344 : 0 : }
1345 : : else
1346 : 10064 : *dptr = *sptr;
1347 : :
1348 [ + - - ]: 10064 : switch (state->status)
1349 : : {
1350 : : case TSS_INMEM:
1351 : : case TSS_WRITEFILE:
1352 : : /* no work */
1353 : 10064 : break;
1354 : : case TSS_READFILE:
1355 : :
1356 : : /*
1357 : : * This case is a bit tricky since the active read pointer's
1358 : : * position corresponds to the seek point, not what is in its
1359 : : * variables. Assigning to the active requires a seek, and
1360 : : * assigning from the active requires a tell, except when
1361 : : * eof_reached.
1362 : : */
1363 [ # # ]: 0 : if (destptr == state->activeptr)
1364 : : {
1365 [ # # ]: 0 : if (dptr->eof_reached)
1366 : : {
1367 : 0 : if (BufFileSeek(state->myfile,
1368 : 0 : state->writepos_file,
1369 : 0 : state->writepos_offset,
1370 [ # # ]: 0 : SEEK_SET) != 0)
1371 [ # # # # ]: 0 : ereport(ERROR,
1372 : : (errcode_for_file_access(),
1373 : : errmsg("could not seek in tuplestore temporary file")));
1374 : 0 : }
1375 : : else
1376 : : {
1377 : 0 : if (BufFileSeek(state->myfile,
1378 : 0 : dptr->file, dptr->offset,
1379 [ # # ]: 0 : SEEK_SET) != 0)
1380 [ # # # # ]: 0 : ereport(ERROR,
1381 : : (errcode_for_file_access(),
1382 : : errmsg("could not seek in tuplestore temporary file")));
1383 : : }
1384 : 0 : }
1385 [ # # ]: 0 : else if (srcptr == state->activeptr)
1386 : : {
1387 [ # # ]: 0 : if (!dptr->eof_reached)
1388 : 0 : BufFileTell(state->myfile,
1389 : 0 : &dptr->file,
1390 : 0 : &dptr->offset);
1391 : 0 : }
1392 : 0 : break;
1393 : : default:
1394 [ # # # # ]: 0 : elog(ERROR, "invalid tuplestore state");
1395 : 0 : break;
1396 : : }
1397 [ - + ]: 10064 : }
1398 : :
1399 : : /*
1400 : : * tuplestore_trim - remove all no-longer-needed tuples
1401 : : *
1402 : : * Calling this function authorizes the tuplestore to delete all tuples
1403 : : * before the oldest read pointer, if no read pointer is marked as requiring
1404 : : * REWIND capability.
1405 : : *
1406 : : * Note: this is obviously safe if no pointer has BACKWARD capability either.
1407 : : * If a pointer is marked as BACKWARD but not REWIND capable, it means that
1408 : : * the pointer can be moved backward but not before the oldest other read
1409 : : * pointer.
1410 : : */
1411 : : void
1412 : 151890 : tuplestore_trim(Tuplestorestate *state)
1413 : : {
1414 : 151890 : int oldest;
1415 : 151890 : int nremove;
1416 : 151890 : int i;
1417 : :
1418 : : /*
1419 : : * Truncation is disallowed if any read pointer requires rewind
1420 : : * capability.
1421 : : */
1422 [ - + ]: 151890 : if (state->eflags & EXEC_FLAG_REWIND)
1423 : 0 : return;
1424 : :
1425 : : /*
1426 : : * We don't bother trimming temp files since it usually would mean more
1427 : : * work than just letting them sit in kernel buffers until they age out.
1428 : : */
1429 [ + + ]: 151890 : if (state->status != TSS_INMEM)
1430 : 4998 : return;
1431 : :
1432 : : /* Find the oldest read pointer */
1433 : 146892 : oldest = state->memtupcount;
1434 [ + + ]: 639845 : for (i = 0; i < state->readptrcount; i++)
1435 : : {
1436 [ + + ]: 492953 : if (!state->readptrs[i].eof_reached)
1437 [ + + ]: 488118 : oldest = Min(oldest, state->readptrs[i].current);
1438 : 492953 : }
1439 : :
1440 : : /*
1441 : : * Note: you might think we could remove all the tuples before the oldest
1442 : : * "current", since that one is the next to be returned. However, since
1443 : : * tuplestore_gettuple returns a direct pointer to our internal copy of
1444 : : * the tuple, it's likely that the caller has still got the tuple just
1445 : : * before "current" referenced in a slot. So we keep one extra tuple
1446 : : * before the oldest "current". (Strictly speaking, we could require such
1447 : : * callers to use the "copy" flag to tuplestore_gettupleslot, but for
1448 : : * efficiency we allow this one case to not use "copy".)
1449 : : */
1450 : 146892 : nremove = oldest - 1;
1451 [ + + ]: 146892 : if (nremove <= 0)
1452 : 1304 : return; /* nothing to do */
1453 : :
1454 [ + - ]: 145588 : Assert(nremove >= state->memtupdeleted);
1455 [ + - ]: 145588 : Assert(nremove <= state->memtupcount);
1456 : :
1457 : : /* before freeing any memory, update the statistics */
1458 : 145588 : tuplestore_updatemax(state);
1459 : :
1460 : : /* Release no-longer-needed tuples */
1461 [ + + ]: 291345 : for (i = state->memtupdeleted; i < nremove; i++)
1462 : : {
1463 : 145757 : FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
1464 : 145757 : pfree(state->memtuples[i]);
1465 : 145757 : state->memtuples[i] = NULL;
1466 : 145757 : }
1467 : 145588 : state->memtupdeleted = nremove;
1468 : :
1469 : : /* mark tuplestore as truncated (used for Assert crosschecks only) */
1470 : 145588 : state->truncated = true;
1471 : :
1472 : : /*
1473 : : * If nremove is less than 1/8th memtupcount, just stop here, leaving the
1474 : : * "deleted" slots as NULL. This prevents us from expending O(N^2) time
1475 : : * repeatedly memmove-ing a large pointer array. The worst case space
1476 : : * wastage is pretty small, since it's just pointers and not whole tuples.
1477 : : */
1478 [ + + ]: 145588 : if (nremove < state->memtupcount / 8)
1479 : 18780 : return;
1480 : :
1481 : : /*
1482 : : * Slide the array down and readjust pointers.
1483 : : *
1484 : : * In mergejoin's current usage, it's demonstrable that there will always
1485 : : * be exactly one non-removed tuple; so optimize that case.
1486 : : */
1487 [ + + ]: 126808 : if (nremove + 1 == state->memtupcount)
1488 : 105031 : state->memtuples[0] = state->memtuples[nremove];
1489 : : else
1490 : 21777 : memmove(state->memtuples, state->memtuples + nremove,
1491 : : (state->memtupcount - nremove) * sizeof(void *));
1492 : :
1493 : 126808 : state->memtupdeleted = 0;
1494 : 126808 : state->memtupcount -= nremove;
1495 [ + + ]: 556841 : for (i = 0; i < state->readptrcount; i++)
1496 : : {
1497 [ + + ]: 430033 : if (!state->readptrs[i].eof_reached)
1498 : 429359 : state->readptrs[i].current -= nremove;
1499 : 430033 : }
1500 [ - + ]: 151890 : }
1501 : :
1502 : : /*
1503 : : * tuplestore_updatemax
1504 : : * Update the maximum space used by this tuplestore and the method used
1505 : : * for storage.
1506 : : */
1507 : : static void
1508 : 147199 : tuplestore_updatemax(Tuplestorestate *state)
1509 : : {
1510 [ + + ]: 147199 : if (state->status == TSS_INMEM)
1511 [ + + ]: 147197 : state->maxSpace = Max(state->maxSpace,
1512 : : state->allowedMem - state->availMem);
1513 : : else
1514 : : {
1515 [ + - ]: 2 : state->maxSpace = Max(state->maxSpace,
1516 : : BufFileSize(state->myfile));
1517 : :
1518 : : /*
1519 : : * usedDisk never gets set to false again after spilling to disk, even
1520 : : * if tuplestore_clear() is called and new tuples go to memory again.
1521 : : */
1522 : 2 : state->usedDisk = true;
1523 : : }
1524 : 147199 : }
1525 : :
1526 : : /*
1527 : : * tuplestore_get_stats
1528 : : * Obtain statistics about the maximum space used by the tuplestore.
1529 : : * These statistics are the maximums and are not reset by calls to
1530 : : * tuplestore_trim() or tuplestore_clear().
1531 : : */
1532 : : void
1533 : 5 : tuplestore_get_stats(Tuplestorestate *state, char **max_storage_type,
1534 : : int64 *max_space)
1535 : : {
1536 : 5 : tuplestore_updatemax(state);
1537 : :
1538 [ + + ]: 5 : if (state->usedDisk)
1539 : 2 : *max_storage_type = "Disk";
1540 : : else
1541 : 3 : *max_storage_type = "Memory";
1542 : :
1543 : 5 : *max_space = state->maxSpace;
1544 : 5 : }
1545 : :
1546 : : /*
1547 : : * tuplestore_in_memory
1548 : : *
1549 : : * Returns true if the tuplestore has not spilled to disk.
1550 : : *
1551 : : * XXX exposing this is a violation of modularity ... should get rid of it.
1552 : : */
1553 : : bool
1554 : 288555 : tuplestore_in_memory(Tuplestorestate *state)
1555 : : {
1556 : 288555 : return (state->status == TSS_INMEM);
1557 : : }
1558 : :
1559 : :
1560 : : /*
1561 : : * Tape interface routines
1562 : : */
1563 : :
1564 : : static unsigned int
1565 : 1255250 : getlen(Tuplestorestate *state, bool eofOK)
1566 : : {
1567 : 1255250 : unsigned int len;
1568 : 1255250 : size_t nbytes;
1569 : :
1570 : 1255250 : nbytes = BufFileReadMaybeEOF(state->myfile, &len, sizeof(len), eofOK);
1571 [ + + ]: 1255250 : if (nbytes == 0)
1572 : 38 : return 0;
1573 : : else
1574 : 1255212 : return len;
1575 : 1255250 : }
1576 : :
1577 : :
1578 : : /*
1579 : : * Routines specialized for HeapTuple case
1580 : : *
1581 : : * The stored form is actually a MinimalTuple, but for largely historical
1582 : : * reasons we allow COPYTUP to work from a HeapTuple.
1583 : : *
1584 : : * Since MinimalTuple already has length in its first word, we don't need
1585 : : * to write that separately.
1586 : : */
1587 : :
1588 : : static void *
1589 : 9590 : copytup_heap(Tuplestorestate *state, void *tup)
1590 : : {
1591 : 9590 : MinimalTuple tuple;
1592 : :
1593 : 9590 : tuple = minimal_tuple_from_heap_tuple((HeapTuple) tup, 0);
1594 : 9590 : USEMEM(state, GetMemoryChunkSpace(tuple));
1595 : 19180 : return tuple;
1596 : 9590 : }
1597 : :
1598 : : static void
1599 : 1262466 : writetup_heap(Tuplestorestate *state, void *tup)
1600 : : {
1601 : 1262466 : MinimalTuple tuple = (MinimalTuple) tup;
1602 : :
1603 : : /* the part of the MinimalTuple we'll write: */
1604 : 1262466 : char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
1605 : 1262466 : unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
1606 : :
1607 : : /* total on-disk footprint: */
1608 : 1262466 : unsigned int tuplen = tupbodylen + sizeof(int);
1609 : :
1610 : 1262466 : BufFileWrite(state->myfile, &tuplen, sizeof(tuplen));
1611 : 1262466 : BufFileWrite(state->myfile, tupbody, tupbodylen);
1612 [ + - ]: 1262466 : if (state->backward) /* need trailing length word? */
1613 : 0 : BufFileWrite(state->myfile, &tuplen, sizeof(tuplen));
1614 : :
1615 : 1262466 : FREEMEM(state, GetMemoryChunkSpace(tuple));
1616 : 1262466 : heap_free_minimal_tuple(tuple);
1617 : 1262466 : }
1618 : :
1619 : : static void *
1620 : 1255212 : readtup_heap(Tuplestorestate *state, unsigned int len)
1621 : : {
1622 : 1255212 : unsigned int tupbodylen = len - sizeof(int);
1623 : 1255212 : unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
1624 : 1255212 : MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
1625 : 1255212 : char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
1626 : :
1627 : : /* read in the tuple proper */
1628 : 1255212 : tuple->t_len = tuplen;
1629 : 1255212 : BufFileReadExact(state->myfile, tupbody, tupbodylen);
1630 [ + - ]: 1255212 : if (state->backward) /* need trailing length word? */
1631 : 0 : BufFileReadExact(state->myfile, &tuplen, sizeof(tuplen));
1632 : 2510424 : return tuple;
1633 : 1255212 : }
|