Branch data Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pgpa_collector.c
4 : : * collect advice into backend-local or shared memory
5 : : *
6 : : * Copyright (c) 2016-2025, PostgreSQL Global Development Group
7 : : *
8 : : * contrib/pg_plan_advice/pgpa_collector.c
9 : : *
10 : : *-------------------------------------------------------------------------
11 : : */
12 : :
13 : : #include "postgres.h"
14 : :
15 : : #include "pg_plan_advice.h"
16 : : #include "pgpa_collector.h"
17 : :
18 : : #include "datatype/timestamp.h"
19 : : #include "funcapi.h"
20 : : #include "miscadmin.h"
21 : : #include "nodes/pg_list.h"
22 : : #include "utils/acl.h"
23 : : #include "utils/builtins.h"
24 : : #include "utils/timestamp.h"
25 : :
/* Function-manager declarations for the SQL-callable functions in this file */
PG_FUNCTION_INFO_V1(pg_clear_collected_local_advice);
PG_FUNCTION_INFO_V1(pg_clear_collected_shared_advice);
PG_FUNCTION_INFO_V1(pg_get_collected_local_advice);
PG_FUNCTION_INFO_V1(pg_get_collected_shared_advice);

/* Number of advice entries held by a single chunk */
#define ADVICE_CHUNK_SIZE 1024
/* Initial length of a chunk array, and the increment by which it grows */
#define ADVICE_CHUNK_ARRAY_SIZE 64

/* Number of columns in each row built by the pg_get_collected_*_advice SRFs */
#define PG_GET_ADVICE_COLUMNS 7
35 : :
/*
 * Advice extracted from one query plan, together with the query string
 * and various other identifying details.
 *
 * textual_data holds two NUL-terminated strings back to back: the query
 * string starts at offset 0 and the advice string starts at advice_offset.
 * The object is allocated with enough trailing space for both strings.
 */
typedef struct pgpa_collected_advice
{
	Oid userid; /* user OID */
	Oid dbid; /* database OID */
	uint64 queryid; /* query identifier */
	TimestampTz timestamp; /* query timestamp */
	int advice_offset; /* start of advice in textual data */
	char textual_data[FLEXIBLE_ARRAY_MEMBER];
} pgpa_collected_advice;
49 : :
/*
 * A bunch of pointers to pgpa_collected_advice objects, stored in
 * backend-local memory.
 *
 * A NULL entry means that the slot is currently unused.
 */
typedef struct pgpa_local_advice_chunk
{
	pgpa_collected_advice *entries[ADVICE_CHUNK_SIZE];
} pgpa_local_advice_chunk;
58 : :
/*
 * Information about all of the pgpa_collected_advice objects that we're
 * storing in local memory.
 *
 * We assign consecutive IDs, starting from 0, to each pgpa_collected_advice
 * object that we store. The actual storage is an array of chunks, which
 * helps keep memcpy() overhead low when we start discarding older data.
 *
 * The entry with a given ID lives in chunk (ID - base_id) / ADVICE_CHUNK_SIZE
 * at offset (ID - base_id) % ADVICE_CHUNK_SIZE. IDs in the range
 * [oldest_id, next_id) are the ones currently stored.
 */
typedef struct pgpa_local_advice
{
	uint64 next_id; /* ID that the next stored entry will receive */
	uint64 oldest_id; /* ID of the oldest entry not yet discarded */
	uint64 base_id; /* ID corresponding to slot 0 of chunks[0] */
	int chunk_array_allocated_size; /* allocated length of chunks[] */
	pgpa_local_advice_chunk **chunks; /* chunk pointers; NULL if unallocated */
} pgpa_local_advice;
75 : :
/*
 * Just like pgpa_local_advice_chunk, but stored in a dynamic shared area,
 * so we must use dsa_pointer instead of native pointers.
 *
 * An InvalidDsaPointer entry means that the slot is currently unused.
 */
typedef struct pgpa_shared_advice_chunk
{
	dsa_pointer entries[ADVICE_CHUNK_SIZE];
} pgpa_shared_advice_chunk;
84 : :
/*
 * Just like pgpa_local_advice, but stored in a dynamic shared area, so
 * we must use dsa_pointer instead of native pointers.
 *
 * 'chunks' refers to an array of dsa_pointer values, each of which is either
 * InvalidDsaPointer or refers to a pgpa_shared_advice_chunk.
 */
typedef struct pgpa_shared_advice
{
	uint64 next_id; /* ID that the next stored entry will receive */
	uint64 oldest_id; /* ID of the oldest entry not yet discarded */
	uint64 base_id; /* ID corresponding to slot 0 of the first chunk */
	int chunk_array_allocated_size; /* allocated length of chunk array;
									 * 0 if not yet allocated */
	dsa_pointer chunks; /* the chunk array itself */
} pgpa_shared_advice;
97 : :
/*
 * Pointers to local and shared collectors.
 *
 * Both start out NULL. local_collector is set when the first piece of advice
 * is stored locally; shared_collector is this backend's mapped address of the
 * collector in the DSA area, set the first time a function in this file needs
 * it and finds that the shared collector exists (or creates it).
 */
static pgpa_local_advice *local_collector = NULL;
static pgpa_shared_advice *shared_collector = NULL;
101 : :
/* Static functions: entry construction, then storing and trimming */
static pgpa_collected_advice *pgpa_make_collected_advice(Oid userid,
														 Oid dbid,
														 uint64 queryId,
														 TimestampTz timestamp,
														 const char *query_string,
														 const char *advice_string,
														 dsa_area *area,
														 dsa_pointer *result);
static void pgpa_store_local_advice(pgpa_collected_advice *ca);
static void pgpa_trim_local_advice(int limit);
static void pgpa_store_shared_advice(dsa_pointer ca_pointer);
static void pgpa_trim_shared_advice(dsa_area *area, int limit);
115 : :
116 : : /* Helper function to extract the query string from pgpa_collected_advice */
117 : : static inline const char *
118 : 470470 : query_string(pgpa_collected_advice *ca)
119 : : {
120 : 470470 : return ca->textual_data;
121 : : }
122 : :
123 : : /* Helper function to extract the advice string from pgpa_collected_advice */
124 : : static inline const char *
125 : 470470 : advice_string(pgpa_collected_advice *ca)
126 : : {
127 : 470470 : return ca->textual_data + ca->advice_offset;
128 : : }
129 : :
130 : : /*
131 : : * Store collected query advice into the local or shared advice collector,
132 : : * as appropriate.
133 : : */
134 : : void
135 : 24430 : pgpa_collect_advice(uint64 queryId, const char *query_string,
136 : : const char *advice_string)
137 : : {
138 : 24430 : Oid userid = GetUserId();
139 : 24430 : Oid dbid = MyDatabaseId;
140 : 24430 : TimestampTz now = GetCurrentTimestamp();
141 : :
142 [ + + - + ]: 24430 : if (pg_plan_advice_local_collector &&
143 : 2002 : pg_plan_advice_local_collection_limit > 0)
144 : : {
145 : 2002 : pgpa_collected_advice *ca;
146 : 2002 : MemoryContext oldcontext;
147 : :
148 : 2002 : oldcontext = MemoryContextSwitchTo(pg_plan_advice_get_mcxt());
149 : 4004 : ca = pgpa_make_collected_advice(userid, dbid, queryId, now,
150 : 2002 : query_string, advice_string,
151 : : NULL, NULL);
152 : 2002 : pgpa_store_local_advice(ca);
153 : 2002 : MemoryContextSwitchTo(oldcontext);
154 : 2002 : }
155 : :
156 [ + + - + ]: 24430 : if (pg_plan_advice_shared_collector &&
157 : 22308 : pg_plan_advice_shared_collection_limit > 0)
158 : : {
159 : 22308 : dsa_area *area = pg_plan_advice_dsa_area();
160 : 22308 : dsa_pointer ca_pointer = InvalidDsaPointer; /* placate compiler */
161 : :
162 : 44616 : pgpa_make_collected_advice(userid, dbid, queryId, now,
163 : 22308 : query_string, advice_string, area,
164 : : &ca_pointer);
165 : 22308 : pgpa_store_shared_advice(ca_pointer);
166 : 22308 : }
167 : 24430 : }
168 : :
169 : : /*
170 : : * Allocate and fill a new pgpa_collected_advice object.
171 : : *
172 : : * If area != NULL, it is used to allocate the new object, and the resulting
173 : : * dsa_pointer is returned via *result.
174 : : *
175 : : * If area == NULL, the new object is allocated in the current memory context,
176 : : * and result is not examined or modified.
177 : : */
178 : : static pgpa_collected_advice *
179 : 24310 : pgpa_make_collected_advice(Oid userid, Oid dbid, uint64 queryId,
180 : : TimestampTz timestamp,
181 : : const char *query_string,
182 : : const char *advice_string,
183 : : dsa_area *area, dsa_pointer *result)
184 : : {
185 : 24310 : size_t query_string_length = strlen(query_string) + 1;
186 : 24310 : size_t advice_string_length = strlen(advice_string) + 1;
187 : 24310 : size_t total_length;
188 : 24310 : pgpa_collected_advice *ca;
189 : :
190 : 24310 : total_length = offsetof(pgpa_collected_advice, textual_data)
191 : 24310 : + query_string_length + advice_string_length;
192 : :
193 [ + + ]: 24310 : if (area == NULL)
194 : 2002 : ca = palloc(total_length);
195 : : else
196 : : {
197 : 22308 : *result = dsa_allocate(area, total_length);
198 : 22308 : ca = dsa_get_address(area, *result);
199 : : }
200 : :
201 : 24310 : ca->userid = userid;
202 : 24310 : ca->dbid = dbid;
203 : 24310 : ca->queryid = queryId;
204 : 24310 : ca->timestamp = timestamp;
205 : 24310 : ca->advice_offset = query_string_length;
206 : :
207 : 24310 : memcpy(ca->textual_data, query_string, query_string_length);
208 : 24310 : memcpy(&ca->textual_data[ca->advice_offset],
209 : : advice_string, advice_string_length);
210 : :
211 : 48620 : return ca;
212 : 24310 : }
213 : :
214 : : /*
215 : : * Add a pg_collected_advice object to our backend-local advice collection.
216 : : *
217 : : * Caller is responsible for switching to the appropriate memory context;
218 : : * the provided object should have been allocated in that same context.
219 : : */
220 : : static void
221 : 2002 : pgpa_store_local_advice(pgpa_collected_advice *ca)
222 : : {
223 : 2002 : uint64 chunk_number;
224 : 2002 : uint64 chunk_offset;
225 : 2002 : pgpa_local_advice *la = local_collector;
226 : :
227 : : /* If the local advice collector isn't initialized yet, do that now. */
228 [ + + ]: 2002 : if (la == NULL)
229 : : {
230 : 1 : la = palloc0(sizeof(pgpa_local_advice));
231 : 1 : la->chunk_array_allocated_size = ADVICE_CHUNK_ARRAY_SIZE;
232 : 1 : la->chunks = palloc0_array(pgpa_local_advice_chunk *,
233 : : la->chunk_array_allocated_size);
234 : 1 : local_collector = la;
235 : 1 : }
236 : :
237 : : /* Compute chunk and offset at which to store this advice. */
238 : 2002 : chunk_number = (la->next_id - la->base_id) / ADVICE_CHUNK_SIZE;
239 : 2002 : chunk_offset = (la->next_id - la->base_id) % ADVICE_CHUNK_SIZE;
240 : :
241 : : /* Extend chunk array, if needed. */
242 [ + - ]: 2002 : if (chunk_number >= la->chunk_array_allocated_size)
243 : : {
244 : 0 : int new_size;
245 : :
246 : 0 : new_size = la->chunk_array_allocated_size + ADVICE_CHUNK_ARRAY_SIZE;
247 : 0 : la->chunks = repalloc0_array(la->chunks,
248 : : pgpa_local_advice_chunk *,
249 : : la->chunk_array_allocated_size,
250 : : new_size);
251 : 0 : la->chunk_array_allocated_size = new_size;
252 : 0 : }
253 : :
254 : : /* Allocate new chunk, if needed. */
255 [ + + ]: 2002 : if (la->chunks[chunk_number] == NULL)
256 : 2 : la->chunks[chunk_number] = palloc0_object(pgpa_local_advice_chunk);
257 : :
258 : : /* Save pointer and bump next-id counter. */
259 [ + - ]: 2002 : Assert(la->chunks[chunk_number]->entries[chunk_offset] == NULL);
260 : 2002 : la->chunks[chunk_number]->entries[chunk_offset] = ca;
261 : 2002 : ++la->next_id;
262 : :
263 : : /* If we've exceeded the storage limit, discard old data. */
264 : 2002 : pgpa_trim_local_advice(pg_plan_advice_local_collection_limit);
265 : 2002 : }
266 : :
267 : : /*
268 : : * Add a pg_collected_advice object to the shared advice collection.
269 : : *
270 : : * 'ca_pointer' should have been allocated from the pg_plan_advice DSA area
271 : : * and should point to an object of type pgpa_collected_advice.
272 : : */
273 : : static void
274 : 22308 : pgpa_store_shared_advice(dsa_pointer ca_pointer)
275 : : {
276 : 22308 : uint64 chunk_number;
277 : 22308 : uint64 chunk_offset;
278 : 22308 : pgpa_shared_state *state = pg_plan_advice_attach();
279 : 22308 : dsa_area *area = pg_plan_advice_dsa_area();
280 : 22308 : pgpa_shared_advice *sa = shared_collector;
281 : 22308 : dsa_pointer *chunk_array;
282 : 22308 : pgpa_shared_advice_chunk *chunk;
283 : :
284 : : /* Lock the shared state. */
285 : 22308 : LWLockAcquire(&state->lock, LW_EXCLUSIVE);
286 : :
287 : : /*
288 : : * If we're not attached to the shared advice collector yet, fix that now.
289 : : * If we're the first ones to attach, we may need to create the object.
290 : : */
291 [ + + ]: 22308 : if (sa == NULL)
292 : : {
293 [ + + ]: 232 : if (state->shared_collector == InvalidDsaPointer)
294 : 1 : state->shared_collector =
295 : 1 : dsa_allocate0(area, sizeof(pgpa_shared_advice));
296 : 232 : shared_collector = sa = dsa_get_address(area, state->shared_collector);
297 : 232 : }
298 : :
299 : : /*
300 : : * It's possible that some other backend may have succeeded in creating
301 : : * the main collector object but failed to allocate an initial chunk
302 : : * array, so we must be prepared to allocate the chunk array here whether
303 : : * or not we created the collector object.
304 : : */
305 [ + + ]: 22308 : if (shared_collector->chunk_array_allocated_size == 0)
306 : : {
307 : 1 : sa->chunks =
308 : 1 : dsa_allocate0(area,
309 : : sizeof(dsa_pointer) * ADVICE_CHUNK_ARRAY_SIZE);
310 : 1 : sa->chunk_array_allocated_size = ADVICE_CHUNK_ARRAY_SIZE;
311 : 1 : }
312 : :
313 : : /* Compute chunk and offset at which to store this advice. */
314 : 22308 : chunk_number = (sa->next_id - sa->base_id) / ADVICE_CHUNK_SIZE;
315 : 22308 : chunk_offset = (sa->next_id - sa->base_id) % ADVICE_CHUNK_SIZE;
316 : :
317 : : /* Get the address of the chunk array and, if needed, extend it. */
318 [ - + ]: 22308 : if (chunk_number >= sa->chunk_array_allocated_size)
319 : : {
320 : 0 : int new_size;
321 : 0 : dsa_pointer new_chunks;
322 : :
323 : : /*
324 : : * DSA can't enlarge an existing allocation, so we must make a new
325 : : * allocation and copy data over.
326 : : */
327 : 0 : new_size = sa->chunk_array_allocated_size + ADVICE_CHUNK_ARRAY_SIZE;
328 : 0 : new_chunks = dsa_allocate0(area, sizeof(dsa_pointer) * new_size);
329 : 0 : chunk_array = dsa_get_address(area, new_chunks);
330 : 0 : memcpy(chunk_array, dsa_get_address(area, sa->chunks),
331 : : sizeof(dsa_pointer) * sa->chunk_array_allocated_size);
332 : 0 : dsa_free(area, sa->chunks);
333 : 0 : sa->chunks = new_chunks;
334 : 0 : sa->chunk_array_allocated_size = new_size;
335 : 0 : }
336 : : else
337 : 22308 : chunk_array = dsa_get_address(area, sa->chunks);
338 : :
339 : : /* Get the address of the desired chunk, allocating it if needed. */
340 [ + + ]: 22308 : if (chunk_array[chunk_number] == InvalidDsaPointer)
341 : 22 : chunk_array[chunk_number] =
342 : 22 : dsa_allocate0(area, sizeof(pgpa_shared_advice_chunk));
343 : 22308 : chunk = dsa_get_address(area, chunk_array[chunk_number]);
344 : :
345 : : /* Save pointer and bump next-id counter. */
346 [ + - ]: 22308 : Assert(chunk->entries[chunk_offset] == InvalidDsaPointer);
347 : 22308 : chunk->entries[chunk_offset] = ca_pointer;
348 : 22308 : ++sa->next_id;
349 : :
350 : : /* If we've exceeded the storage limit, discard old data. */
351 : 22308 : pgpa_trim_shared_advice(area, pg_plan_advice_shared_collection_limit);
352 : :
353 : : /* Release lock on shared state. */
354 : 22308 : LWLockRelease(&state->lock);
355 : 22308 : }
356 : :
357 : : /*
358 : : * Discard collected advice stored in backend-local memory in excess of the
359 : : * specified limit.
360 : : */
361 : : static void
362 : 2004 : pgpa_trim_local_advice(int limit)
363 : : {
364 : 2004 : pgpa_local_advice *la = local_collector;
365 : 2004 : uint64 current_count;
366 : 2004 : uint64 trim_count;
367 : 2004 : uint64 total_chunk_count;
368 : 2004 : uint64 trim_chunk_count;
369 : 2004 : uint64 remaining_chunk_count;
370 : :
371 : : /* If we haven't yet reached the limit, there's nothing to do. */
372 : 2004 : current_count = la->next_id - la->oldest_id;
373 [ + + ]: 2004 : if (current_count <= limit)
374 : 2002 : return;
375 : :
376 : : /* Free enough entries to get us back down to the limit. */
377 : 2 : trim_count = current_count - limit;
378 [ + + ]: 2004 : while (trim_count > 0)
379 : : {
380 : 2002 : uint64 chunk_number;
381 : 2002 : uint64 chunk_offset;
382 : :
383 : 2002 : chunk_number = (la->oldest_id - la->base_id) / ADVICE_CHUNK_SIZE;
384 : 2002 : chunk_offset = (la->oldest_id - la->base_id) % ADVICE_CHUNK_SIZE;
385 : :
386 [ + - ]: 2002 : Assert(la->chunks[chunk_number]->entries[chunk_offset] != NULL);
387 : 2002 : pfree(la->chunks[chunk_number]->entries[chunk_offset]);
388 : 2002 : la->chunks[chunk_number]->entries[chunk_offset] = NULL;
389 : 2002 : ++la->oldest_id;
390 : 2002 : --trim_count;
391 : 2002 : }
392 : :
393 : : /* Free any chunks that are now entirely unused. */
394 : 2 : trim_chunk_count = (la->oldest_id - la->base_id) / ADVICE_CHUNK_SIZE;
395 [ + + ]: 3 : for (uint64 n = 0; n < trim_chunk_count; ++n)
396 : 1 : pfree(la->chunks[n]);
397 : :
398 : : /* Slide remaining chunk pointers back toward the base of the array. */
399 : 4 : total_chunk_count = (la->next_id - la->base_id +
400 : 2 : ADVICE_CHUNK_SIZE - 1) / ADVICE_CHUNK_SIZE;
401 : 2 : remaining_chunk_count = total_chunk_count - trim_chunk_count;
402 [ - + ]: 2 : if (remaining_chunk_count > 0)
403 : 2 : memmove(&la->chunks[0], &la->chunks[trim_chunk_count],
404 : : sizeof(pgpa_local_advice_chunk *) * remaining_chunk_count);
405 : :
406 : : /* Don't leave stale pointers around. */
407 : 2 : memset(&la->chunks[remaining_chunk_count], 0,
408 : : sizeof(pgpa_local_advice_chunk *)
409 : : * (total_chunk_count - remaining_chunk_count));
410 : :
411 : : /* Adjust base ID value accordingly. */
412 : 2 : la->base_id += trim_chunk_count * ADVICE_CHUNK_SIZE;
413 [ - + ]: 2004 : }
414 : :
415 : : /*
416 : : * Discard collected advice stored in shared memory in excess of the
417 : : * specified limit.
418 : : */
419 : : static void
420 : 22309 : pgpa_trim_shared_advice(dsa_area *area, int limit)
421 : : {
422 : 22309 : pgpa_shared_advice *sa = shared_collector;
423 : 22309 : uint64 current_count;
424 : 22309 : uint64 trim_count;
425 : 22309 : uint64 total_chunk_count;
426 : 22309 : uint64 trim_chunk_count;
427 : 22309 : uint64 remaining_chunk_count;
428 : 22309 : dsa_pointer *chunk_array;
429 : :
430 : : /* If we haven't yet reached the limit, there's nothing to do. */
431 : 22309 : current_count = sa->next_id - sa->oldest_id;
432 [ + + ]: 22309 : if (current_count <= limit)
433 : 22308 : return;
434 : :
435 : : /* Get a pointer to the chunk array. */
436 : 1 : chunk_array = dsa_get_address(area, sa->chunks);
437 : :
438 : : /* Free enough entries to get us back down to the limit. */
439 : 1 : trim_count = current_count - limit;
440 [ + + ]: 22309 : while (trim_count > 0)
441 : : {
442 : 22308 : uint64 chunk_number;
443 : 22308 : uint64 chunk_offset;
444 : 22308 : pgpa_shared_advice_chunk *chunk;
445 : :
446 : 22308 : chunk_number = (sa->oldest_id - sa->base_id) / ADVICE_CHUNK_SIZE;
447 : 22308 : chunk_offset = (sa->oldest_id - sa->base_id) % ADVICE_CHUNK_SIZE;
448 : :
449 : 22308 : chunk = dsa_get_address(area, chunk_array[chunk_number]);
450 [ + - ]: 22308 : Assert(chunk->entries[chunk_offset] != InvalidDsaPointer);
451 : 22308 : dsa_free(area, chunk->entries[chunk_offset]);
452 : 22308 : chunk->entries[chunk_offset] = InvalidDsaPointer;
453 : 22308 : ++sa->oldest_id;
454 : 22308 : --trim_count;
455 : 22308 : }
456 : :
457 : : /* Free any chunks that are now entirely unused. */
458 : 1 : trim_chunk_count = (sa->oldest_id - sa->base_id) / ADVICE_CHUNK_SIZE;
459 [ + + ]: 22 : for (uint64 n = 0; n < trim_chunk_count; ++n)
460 : 21 : dsa_free(area, chunk_array[n]);
461 : :
462 : : /* Slide remaining chunk pointers back toward the base of the array. */
463 : 2 : total_chunk_count = (sa->next_id - sa->base_id +
464 : 1 : ADVICE_CHUNK_SIZE - 1) / ADVICE_CHUNK_SIZE;
465 : 1 : remaining_chunk_count = total_chunk_count - trim_chunk_count;
466 [ - + ]: 1 : if (remaining_chunk_count > 0)
467 : 1 : memmove(&chunk_array[0], &chunk_array[trim_chunk_count],
468 : : sizeof(dsa_pointer) * remaining_chunk_count);
469 : :
470 : : /* Don't leave stale pointers around. */
471 : 1 : memset(&chunk_array[remaining_chunk_count], 0,
472 : : sizeof(pgpa_shared_advice_chunk *)
473 : : * (total_chunk_count - remaining_chunk_count));
474 : :
475 : : /* Adjust base ID value accordingly. */
476 : 1 : sa->base_id += trim_chunk_count * ADVICE_CHUNK_SIZE;
477 [ - + ]: 22309 : }
478 : :
479 : : /*
480 : : * SQL-callable function to discard advice collected in backend-local memory
481 : : */
482 : : Datum
483 : 3 : pg_clear_collected_local_advice(PG_FUNCTION_ARGS)
484 : : {
485 [ + + ]: 3 : if (local_collector != NULL)
486 : 2 : pgpa_trim_local_advice(0);
487 : :
488 : 3 : PG_RETURN_VOID();
489 : : }
490 : :
491 : : /*
492 : : * SQL-callable function to discard advice collected in backend-local memory
493 : : */
494 : : Datum
495 : 1 : pg_clear_collected_shared_advice(PG_FUNCTION_ARGS)
496 : : {
497 : 1 : pgpa_shared_state *state = pg_plan_advice_attach();
498 : 1 : dsa_area *area = pg_plan_advice_dsa_area();
499 : :
500 : 1 : LWLockAcquire(&state->lock, LW_EXCLUSIVE);
501 : :
502 : : /*
503 : : * If we're not attached to the shared advice collector yet, fix that now;
504 : : * but if the collector doesn't even exist, we can return without doing
505 : : * anything else.
506 : : */
507 [ - + ]: 1 : if (shared_collector == NULL)
508 : : {
509 [ + - ]: 1 : if (state->shared_collector == InvalidDsaPointer)
510 : : {
511 : 0 : LWLockRelease(&state->lock);
512 : 0 : return (Datum) 0;
513 : : }
514 : 1 : shared_collector = dsa_get_address(area, state->shared_collector);
515 : 1 : }
516 : :
517 : : /* Do the real work */
518 : 1 : pgpa_trim_shared_advice(area, 0);
519 : :
520 : 1 : LWLockRelease(&state->lock);
521 : :
522 : 1 : PG_RETURN_VOID();
523 : 1 : }
524 : :
525 : : /*
526 : : * SQL-callable SRF to return advice collected in backend-local memory
527 : : */
528 : : Datum
529 : 2 : pg_get_collected_local_advice(PG_FUNCTION_ARGS)
530 : : {
531 : 2 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
532 : 2 : pgpa_local_advice *la = local_collector;
533 : 2 : Oid userid = GetUserId();
534 : :
535 : 2 : InitMaterializedSRF(fcinfo, 0);
536 : :
537 [ + - ]: 2 : if (la == NULL)
538 : 0 : return (Datum) 0;
539 : :
540 : : /* Loop over all entries. */
541 [ + + ]: 2004 : for (uint64 id = la->oldest_id; id < la->next_id; ++id)
542 : : {
543 : 2002 : uint64 chunk_number;
544 : 2002 : uint64 chunk_offset;
545 : 2002 : pgpa_collected_advice *ca;
546 : 2002 : Datum values[PG_GET_ADVICE_COLUMNS];
547 : 2002 : bool nulls[PG_GET_ADVICE_COLUMNS] = {0};
548 : :
549 : 2002 : chunk_number = (id - la->base_id) / ADVICE_CHUNK_SIZE;
550 : 2002 : chunk_offset = (id - la->base_id) % ADVICE_CHUNK_SIZE;
551 : :
552 : 2002 : ca = la->chunks[chunk_number]->entries[chunk_offset];
553 : :
554 [ + - ]: 2002 : if (!member_can_set_role(userid, ca->userid))
555 : 0 : continue;
556 : :
557 : 2002 : values[0] = UInt64GetDatum(id);
558 : 2002 : values[1] = ObjectIdGetDatum(ca->userid);
559 : 2002 : values[2] = ObjectIdGetDatum(ca->dbid);
560 : 2002 : values[3] = UInt64GetDatum(ca->queryid);
561 : 2002 : values[4] = TimestampGetDatum(ca->timestamp);
562 : 2002 : values[5] = CStringGetTextDatum(query_string(ca));
563 : 2002 : values[6] = CStringGetTextDatum(advice_string(ca));
564 : :
565 : 4004 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
566 : 2002 : values, nulls);
567 [ - - + ]: 2002 : }
568 : :
569 : 2 : return (Datum) 0;
570 : 2 : }
571 : :
572 : : /*
573 : : * SQL-callable SRF to return advice collected in shared memory
574 : : */
575 : : Datum
576 : 21 : pg_get_collected_shared_advice(PG_FUNCTION_ARGS)
577 : : {
578 : 21 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
579 : 21 : pgpa_shared_state *state = pg_plan_advice_attach();
580 : 21 : dsa_area *area = pg_plan_advice_dsa_area();
581 : 21 : dsa_pointer *chunk_array;
582 : 21 : pgpa_shared_advice *sa = shared_collector;
583 : :
584 : 21 : InitMaterializedSRF(fcinfo, 0);
585 : :
586 : : /* Lock the shared state. */
587 : 21 : LWLockAcquire(&state->lock, LW_SHARED);
588 : :
589 : : /*
590 : : * If we're not attached to the shared advice collector yet, fix that now;
591 : : * but if the collector doesn't even exist, we can return without doing
592 : : * anything else.
593 : : */
594 [ - + ]: 21 : if (sa == NULL)
595 : : {
596 [ + - ]: 21 : if (state->shared_collector == InvalidDsaPointer)
597 : : {
598 : 0 : LWLockRelease(&state->lock);
599 : 0 : return (Datum) 0;
600 : : }
601 : 21 : shared_collector = sa = dsa_get_address(area, state->shared_collector);
602 : 21 : }
603 : :
604 : : /* Get a pointer to the chunk array. */
605 : 21 : chunk_array = dsa_get_address(area, sa->chunks);
606 : :
607 : : /* Loop over all entries. */
608 [ + + ]: 468489 : for (uint64 id = sa->oldest_id; id < sa->next_id; ++id)
609 : : {
610 : 468468 : uint64 chunk_number;
611 : 468468 : uint64 chunk_offset;
612 : 468468 : pgpa_shared_advice_chunk *chunk;
613 : 468468 : pgpa_collected_advice *ca;
614 : 468468 : Datum values[PG_GET_ADVICE_COLUMNS];
615 : 468468 : bool nulls[PG_GET_ADVICE_COLUMNS] = {0};
616 : :
617 : 468468 : chunk_number = (id - sa->base_id) / ADVICE_CHUNK_SIZE;
618 : 468468 : chunk_offset = (id - sa->base_id) % ADVICE_CHUNK_SIZE;
619 : :
620 : 468468 : chunk = dsa_get_address(area, chunk_array[chunk_number]);
621 : 468468 : ca = dsa_get_address(area, chunk->entries[chunk_offset]);
622 : :
623 : 468468 : values[0] = UInt64GetDatum(id);
624 : 468468 : values[1] = ObjectIdGetDatum(ca->userid);
625 : 468468 : values[2] = ObjectIdGetDatum(ca->dbid);
626 : 468468 : values[3] = UInt64GetDatum(ca->queryid);
627 : 468468 : values[4] = TimestampGetDatum(ca->timestamp);
628 : 468468 : values[5] = CStringGetTextDatum(query_string(ca));
629 : 468468 : values[6] = CStringGetTextDatum(advice_string(ca));
630 : :
631 : 936936 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
632 : 468468 : values, nulls);
633 : 468468 : }
634 : :
635 : : /* Release lock on shared state. */
636 : 21 : LWLockRelease(&state->lock);
637 : :
638 : 21 : return (Datum) 0;
639 : 21 : }
|