Branch data Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * execAsync.c
4 : : * Support routines for asynchronous execution
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/executor/execAsync.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "executor/execAsync.h"
18 : : #include "executor/executor.h"
19 : : #include "executor/nodeAppend.h"
20 : : #include "executor/nodeForeignscan.h"
21 : :
22 : : /*
23 : : * Asynchronously request a tuple from a designed async-capable node.
24 : : */
25 : : void
26 : 0 : ExecAsyncRequest(AsyncRequest *areq)
27 : : {
28 [ # # ]: 0 : if (areq->requestee->chgParam != NULL) /* something changed? */
29 : 0 : ExecReScan(areq->requestee); /* let ReScan handle this */
30 : :
31 : : /* must provide our own instrumentation support */
32 [ # # ]: 0 : if (areq->requestee->instrument)
33 : 0 : InstrStartNode(areq->requestee->instrument);
34 : :
35 [ # # ]: 0 : switch (nodeTag(areq->requestee))
36 : : {
37 : : case T_ForeignScanState:
38 : 0 : ExecAsyncForeignScanRequest(areq);
39 : 0 : break;
40 : : default:
41 : : /* If the node doesn't support async, caller messed up. */
42 [ # # # # ]: 0 : elog(ERROR, "unrecognized node type: %d",
43 : : (int) nodeTag(areq->requestee));
44 : 0 : }
45 : :
46 : 0 : ExecAsyncResponse(areq);
47 : :
48 : : /* must provide our own instrumentation support */
49 [ # # ]: 0 : if (areq->requestee->instrument)
50 : 0 : InstrStopNode(areq->requestee->instrument,
51 [ # # ]: 0 : TupIsNull(areq->result) ? 0.0 : 1.0);
52 : 0 : }
53 : :
54 : : /*
55 : : * Give the asynchronous node a chance to configure the file descriptor event
56 : : * for which it wishes to wait. We expect the node-type specific callback to
57 : : * make a single call of the following form:
58 : : *
59 : : * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
60 : : */
61 : : void
62 : 0 : ExecAsyncConfigureWait(AsyncRequest *areq)
63 : : {
64 : : /* must provide our own instrumentation support */
65 [ # # ]: 0 : if (areq->requestee->instrument)
66 : 0 : InstrStartNode(areq->requestee->instrument);
67 : :
68 [ # # ]: 0 : switch (nodeTag(areq->requestee))
69 : : {
70 : : case T_ForeignScanState:
71 : 0 : ExecAsyncForeignScanConfigureWait(areq);
72 : 0 : break;
73 : : default:
74 : : /* If the node doesn't support async, caller messed up. */
75 [ # # # # ]: 0 : elog(ERROR, "unrecognized node type: %d",
76 : : (int) nodeTag(areq->requestee));
77 : 0 : }
78 : :
79 : : /* must provide our own instrumentation support */
80 [ # # ]: 0 : if (areq->requestee->instrument)
81 : 0 : InstrStopNode(areq->requestee->instrument, 0.0);
82 : 0 : }
83 : :
84 : : /*
85 : : * Call the asynchronous node back when a relevant event has occurred.
86 : : */
87 : : void
88 : 0 : ExecAsyncNotify(AsyncRequest *areq)
89 : : {
90 : : /* must provide our own instrumentation support */
91 [ # # ]: 0 : if (areq->requestee->instrument)
92 : 0 : InstrStartNode(areq->requestee->instrument);
93 : :
94 [ # # ]: 0 : switch (nodeTag(areq->requestee))
95 : : {
96 : : case T_ForeignScanState:
97 : 0 : ExecAsyncForeignScanNotify(areq);
98 : 0 : break;
99 : : default:
100 : : /* If the node doesn't support async, caller messed up. */
101 [ # # # # ]: 0 : elog(ERROR, "unrecognized node type: %d",
102 : : (int) nodeTag(areq->requestee));
103 : 0 : }
104 : :
105 : 0 : ExecAsyncResponse(areq);
106 : :
107 : : /* must provide our own instrumentation support */
108 [ # # ]: 0 : if (areq->requestee->instrument)
109 : 0 : InstrStopNode(areq->requestee->instrument,
110 [ # # ]: 0 : TupIsNull(areq->result) ? 0.0 : 1.0);
111 : 0 : }
112 : :
113 : : /*
114 : : * Call the requestor back when an asynchronous node has produced a result.
115 : : */
116 : : void
117 : 0 : ExecAsyncResponse(AsyncRequest *areq)
118 : : {
119 [ # # ]: 0 : switch (nodeTag(areq->requestor))
120 : : {
121 : : case T_AppendState:
122 : 0 : ExecAsyncAppendResponse(areq);
123 : 0 : break;
124 : : default:
125 : : /* If the node doesn't support async, caller messed up. */
126 [ # # # # ]: 0 : elog(ERROR, "unrecognized node type: %d",
127 : : (int) nodeTag(areq->requestor));
128 : 0 : }
129 : 0 : }
130 : :
131 : : /*
132 : : * A requestee node should call this function to deliver the tuple to its
133 : : * requestor node. The requestee node can call this from its ExecAsyncRequest
134 : : * or ExecAsyncNotify callback.
135 : : */
136 : : void
137 : 0 : ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
138 : : {
139 : 0 : areq->request_complete = true;
140 : 0 : areq->result = result;
141 : 0 : }
142 : :
143 : : /*
144 : : * A requestee node should call this function to indicate that it is pending
145 : : * for a callback. The requestee node can call this from its ExecAsyncRequest
146 : : * or ExecAsyncNotify callback.
147 : : */
148 : : void
149 : 0 : ExecAsyncRequestPending(AsyncRequest *areq)
150 : : {
151 : 0 : areq->callback_pending = true;
152 : 0 : areq->request_complete = false;
153 : 0 : areq->result = NULL;
154 : 0 : }
|