Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * libpq_pipeline.c
4 : * Verify libpq pipeline execution functionality
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/test/modules/libpq_pipeline/libpq_pipeline.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres_fe.h"
17 :
18 : #include <sys/select.h>
19 : #include <sys/time.h>
20 :
21 : #include "catalog/pg_type_d.h"
22 : #include "libpq-fe.h"
23 : #include "pg_getopt.h"
24 :
25 :
26 : static void exit_nicely(PGconn *conn);
27 : pg_noreturn static void pg_fatal_impl(int line, const char *fmt,...)
28 : pg_attribute_printf(2, 3);
29 : static bool process_result(PGconn *conn, PGresult *res, int results,
30 : int numsent);
31 :
32 : static const char *const progname = "libpq_pipeline";
33 :
34 : /* Options and defaults */
35 : static char *tracefile = NULL; /* path to PQtrace() file */
36 :
37 :
38 : #ifdef DEBUG_OUTPUT
39 : #define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
40 : #else
41 : #define pg_debug(...)
42 : #endif
43 :
44 : static const char *const drop_table_sql =
45 : "DROP TABLE IF EXISTS pq_pipeline_demo";
46 : static const char *const create_table_sql =
47 : "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
48 : "int8filler int8);";
49 : static const char *const insert_sql =
50 : "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
51 : static const char *const insert_sql2 =
52 : "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
53 :
54 : /* max char length of an int32/64, plus sign and null terminator */
55 : #define MAXINTLEN 12
56 : #define MAXINT8LEN 20
57 :
58 : static void
59 0 : exit_nicely(PGconn *conn)
60 : {
61 0 : PQfinish(conn);
62 0 : exit(1);
63 : }
64 :
65 : /*
66 : * The following few functions are wrapped in macros to make the reported line
67 : * number in an error match the line number of the invocation.
68 : */
69 :
70 : /*
71 : * Print an error to stderr and terminate the program.
72 : */
73 : #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
74 : pg_noreturn static void
75 0 : pg_fatal_impl(int line, const char *fmt,...)
76 : {
77 0 : va_list args;
78 :
79 0 : fflush(stdout);
80 :
81 0 : fprintf(stderr, "\n%s:%d: ", progname, line);
82 0 : va_start(args, fmt);
83 0 : vfprintf(stderr, fmt, args);
84 0 : va_end(args);
85 0 : Assert(fmt[strlen(fmt) - 1] != '\n');
86 0 : fprintf(stderr, "\n");
87 0 : exit(1);
88 : }
89 :
90 : /*
91 : * Check that libpq next returns a PGresult with the specified status,
92 : * returning the PGresult so that caller can perform additional checks.
93 : */
94 : #define confirm_result_status(conn, status) confirm_result_status_impl(__LINE__, conn, status)
95 : static PGresult *
96 0 : confirm_result_status_impl(int line, PGconn *conn, ExecStatusType status)
97 : {
98 0 : PGresult *res;
99 :
100 0 : res = PQgetResult(conn);
101 0 : if (res == NULL)
102 0 : pg_fatal_impl(line, "PQgetResult returned null unexpectedly: %s",
103 0 : PQerrorMessage(conn));
104 0 : if (PQresultStatus(res) != status)
105 0 : pg_fatal_impl(line, "PQgetResult returned status %s, expected %s: %s",
106 0 : PQresStatus(PQresultStatus(res)),
107 0 : PQresStatus(status),
108 0 : PQerrorMessage(conn));
109 0 : return res;
110 0 : }
111 :
112 : /*
113 : * Check that libpq next returns a PGresult with the specified status,
114 : * then free the PGresult.
115 : */
116 : #define consume_result_status(conn, status) consume_result_status_impl(__LINE__, conn, status)
117 : static void
118 0 : consume_result_status_impl(int line, PGconn *conn, ExecStatusType status)
119 : {
120 0 : PGresult *res;
121 :
122 0 : res = confirm_result_status_impl(line, conn, status);
123 0 : PQclear(res);
124 0 : }
125 :
126 : /*
127 : * Check that libpq next returns a null PGresult.
128 : */
129 : #define consume_null_result(conn) consume_null_result_impl(__LINE__, conn)
130 : static void
131 0 : consume_null_result_impl(int line, PGconn *conn)
132 : {
133 0 : PGresult *res;
134 :
135 0 : res = PQgetResult(conn);
136 0 : if (res != NULL)
137 0 : pg_fatal_impl(line, "expected NULL PGresult, got %s: %s",
138 0 : PQresStatus(PQresultStatus(res)),
139 0 : PQerrorMessage(conn));
140 0 : }
141 :
142 : /*
143 : * Check that the query on the given connection got canceled.
144 : */
145 : #define consume_query_cancel(conn) consume_query_cancel_impl(__LINE__, conn)
146 : static void
147 0 : consume_query_cancel_impl(int line, PGconn *conn)
148 : {
149 0 : PGresult *res;
150 :
151 0 : res = confirm_result_status_impl(line, conn, PGRES_FATAL_ERROR);
152 0 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
153 0 : pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
154 0 : PQerrorMessage(conn));
155 0 : PQclear(res);
156 :
157 0 : while (PQisBusy(conn))
158 0 : PQconsumeInput(conn);
159 0 : }
160 :
161 : /*
162 : * Using monitorConn, query pg_stat_activity to see that the connection with
163 : * the given PID is either in the given state, or waiting on the given event
164 : * (only one of them can be given).
165 : */
166 : static void
167 0 : wait_for_connection_state(int line, PGconn *monitorConn, int procpid,
168 : char *state, char *event)
169 : {
170 0 : const Oid paramTypes[] = {INT4OID, TEXTOID};
171 0 : const char *paramValues[2];
172 0 : char *pidstr = psprintf("%d", procpid);
173 :
174 0 : Assert((state == NULL) ^ (event == NULL));
175 :
176 0 : paramValues[0] = pidstr;
177 0 : paramValues[1] = state ? state : event;
178 :
179 0 : while (true)
180 : {
181 0 : PGresult *res;
182 0 : char *value;
183 :
184 0 : if (state != NULL)
185 0 : res = PQexecParams(monitorConn,
186 : "SELECT count(*) FROM pg_stat_activity WHERE "
187 : "pid = $1 AND state = $2",
188 0 : 2, paramTypes, paramValues, NULL, NULL, 0);
189 : else
190 0 : res = PQexecParams(monitorConn,
191 : "SELECT count(*) FROM pg_stat_activity WHERE "
192 : "pid = $1 AND wait_event = $2",
193 0 : 2, paramTypes, paramValues, NULL, NULL, 0);
194 :
195 0 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
196 0 : pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
197 0 : if (PQntuples(res) != 1)
198 0 : pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
199 0 : if (PQnfields(res) != 1)
200 0 : pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
201 0 : value = PQgetvalue(res, 0, 0);
202 0 : if (strcmp(value, "0") != 0)
203 : {
204 0 : PQclear(res);
205 0 : break;
206 : }
207 0 : PQclear(res);
208 :
209 : /* wait 10ms before polling again */
210 0 : pg_usleep(10000);
211 0 : }
212 :
213 0 : pfree(pidstr);
214 0 : }
215 :
216 : #define send_cancellable_query(conn, monitorConn) \
217 : send_cancellable_query_impl(__LINE__, conn, monitorConn)
218 : static void
219 0 : send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
220 : {
221 0 : const char *env_wait;
222 0 : const Oid paramTypes[1] = {INT4OID};
223 :
224 : /*
225 : * Wait for the connection to be idle, so that our check for an active
226 : * connection below is reliable, instead of possibly seeing an outdated
227 : * state.
228 : */
229 0 : wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle", NULL);
230 :
231 0 : env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
232 0 : if (env_wait == NULL)
233 0 : env_wait = "180";
234 :
235 0 : if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
236 0 : &env_wait, NULL, NULL, 0) != 1)
237 0 : pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
238 :
239 : /*
240 : * Wait for the sleep to be active, because if the query is not running
241 : * yet, the cancel request that we send won't have any effect.
242 : */
243 0 : wait_for_connection_state(line, monitorConn, PQbackendPID(conn), NULL, "PgSleep");
244 0 : }
245 :
246 : /*
247 : * Create a new connection with the same conninfo as the given one.
248 : */
249 : static PGconn *
250 0 : copy_connection(PGconn *conn)
251 : {
252 0 : PGconn *copyConn;
253 0 : PQconninfoOption *opts = PQconninfo(conn);
254 0 : const char **keywords;
255 0 : const char **vals;
256 0 : int nopts = 0;
257 0 : int i;
258 :
259 0 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
260 0 : nopts++;
261 0 : nopts++; /* for the NULL terminator */
262 :
263 0 : keywords = pg_malloc(sizeof(char *) * nopts);
264 0 : vals = pg_malloc(sizeof(char *) * nopts);
265 :
266 0 : i = 0;
267 0 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
268 : {
269 0 : if (opt->val)
270 : {
271 0 : keywords[i] = opt->keyword;
272 0 : vals[i] = opt->val;
273 0 : i++;
274 0 : }
275 0 : }
276 0 : keywords[i] = vals[i] = NULL;
277 :
278 0 : copyConn = PQconnectdbParams(keywords, vals, false);
279 :
280 0 : if (PQstatus(copyConn) != CONNECTION_OK)
281 0 : pg_fatal("Connection to database failed: %s",
282 : PQerrorMessage(copyConn));
283 :
284 0 : pfree(keywords);
285 0 : pfree(vals);
286 0 : PQconninfoFree(opts);
287 :
288 0 : return copyConn;
289 0 : }
290 :
291 : /*
292 : * Test query cancellation routines
293 : */
294 : static void
295 0 : test_cancel(PGconn *conn)
296 : {
297 0 : PGcancel *cancel;
298 0 : PGcancelConn *cancelConn;
299 0 : PGconn *monitorConn;
300 0 : char errorbuf[256];
301 :
302 0 : fprintf(stderr, "test cancellations... ");
303 :
304 0 : if (PQsetnonblocking(conn, 1) != 0)
305 0 : pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
306 :
307 : /*
308 : * Make a separate connection to the database to monitor the query on the
309 : * main connection.
310 : */
311 0 : monitorConn = copy_connection(conn);
312 0 : Assert(PQstatus(monitorConn) == CONNECTION_OK);
313 :
314 : /* test PQcancel */
315 0 : send_cancellable_query(conn, monitorConn);
316 0 : cancel = PQgetCancel(conn);
317 0 : if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
318 0 : pg_fatal("failed to run PQcancel: %s", errorbuf);
319 0 : consume_query_cancel(conn);
320 :
321 : /* PGcancel object can be reused for the next query */
322 0 : send_cancellable_query(conn, monitorConn);
323 0 : if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
324 0 : pg_fatal("failed to run PQcancel: %s", errorbuf);
325 0 : consume_query_cancel(conn);
326 :
327 0 : PQfreeCancel(cancel);
328 :
329 : /* test PQrequestCancel */
330 0 : send_cancellable_query(conn, monitorConn);
331 0 : if (!PQrequestCancel(conn))
332 0 : pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
333 0 : consume_query_cancel(conn);
334 :
335 : /* test PQcancelBlocking */
336 0 : send_cancellable_query(conn, monitorConn);
337 0 : cancelConn = PQcancelCreate(conn);
338 0 : if (!PQcancelBlocking(cancelConn))
339 0 : pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
340 0 : consume_query_cancel(conn);
341 0 : PQcancelFinish(cancelConn);
342 :
343 : /* test PQcancelCreate and then polling with PQcancelPoll */
344 0 : send_cancellable_query(conn, monitorConn);
345 0 : cancelConn = PQcancelCreate(conn);
346 0 : if (!PQcancelStart(cancelConn))
347 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
348 0 : while (true)
349 : {
350 0 : struct timeval tv;
351 0 : fd_set input_mask;
352 0 : fd_set output_mask;
353 0 : PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
354 0 : int sock = PQcancelSocket(cancelConn);
355 :
356 0 : if (pollres == PGRES_POLLING_OK)
357 0 : break;
358 :
359 0 : FD_ZERO(&input_mask);
360 0 : FD_ZERO(&output_mask);
361 0 : switch (pollres)
362 : {
363 : case PGRES_POLLING_READING:
364 : pg_debug("polling for reads\n");
365 0 : FD_SET(sock, &input_mask);
366 0 : break;
367 : case PGRES_POLLING_WRITING:
368 : pg_debug("polling for writes\n");
369 0 : FD_SET(sock, &output_mask);
370 0 : break;
371 : default:
372 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
373 : }
374 :
375 0 : if (sock < 0)
376 0 : pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
377 :
378 0 : tv.tv_sec = 3;
379 0 : tv.tv_usec = 0;
380 :
381 0 : while (true)
382 : {
383 0 : if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
384 : {
385 0 : if (errno == EINTR)
386 0 : continue;
387 0 : pg_fatal("select() failed: %m");
388 : }
389 0 : break;
390 : }
391 0 : }
392 0 : if (PQcancelStatus(cancelConn) != CONNECTION_OK)
393 0 : pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
394 0 : consume_query_cancel(conn);
395 :
396 : /*
397 : * test PQcancelReset works on the cancel connection and it can be reused
398 : * afterwards
399 : */
400 0 : PQcancelReset(cancelConn);
401 :
402 0 : send_cancellable_query(conn, monitorConn);
403 0 : if (!PQcancelStart(cancelConn))
404 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
405 0 : while (true)
406 : {
407 0 : struct timeval tv;
408 0 : fd_set input_mask;
409 0 : fd_set output_mask;
410 0 : PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
411 0 : int sock = PQcancelSocket(cancelConn);
412 :
413 0 : if (pollres == PGRES_POLLING_OK)
414 0 : break;
415 :
416 0 : FD_ZERO(&input_mask);
417 0 : FD_ZERO(&output_mask);
418 0 : switch (pollres)
419 : {
420 : case PGRES_POLLING_READING:
421 : pg_debug("polling for reads\n");
422 0 : FD_SET(sock, &input_mask);
423 0 : break;
424 : case PGRES_POLLING_WRITING:
425 : pg_debug("polling for writes\n");
426 0 : FD_SET(sock, &output_mask);
427 0 : break;
428 : default:
429 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
430 : }
431 :
432 0 : if (sock < 0)
433 0 : pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
434 :
435 0 : tv.tv_sec = 3;
436 0 : tv.tv_usec = 0;
437 :
438 0 : while (true)
439 : {
440 0 : if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
441 : {
442 0 : if (errno == EINTR)
443 0 : continue;
444 0 : pg_fatal("select() failed: %m");
445 : }
446 0 : break;
447 : }
448 0 : }
449 0 : if (PQcancelStatus(cancelConn) != CONNECTION_OK)
450 0 : pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
451 0 : consume_query_cancel(conn);
452 :
453 0 : PQcancelFinish(cancelConn);
454 0 : PQfinish(monitorConn);
455 :
456 0 : fprintf(stderr, "ok\n");
457 0 : }
458 :
459 : static void
460 0 : test_disallowed_in_pipeline(PGconn *conn)
461 : {
462 0 : PGresult *res = NULL;
463 :
464 0 : fprintf(stderr, "test error cases... ");
465 :
466 0 : if (PQisnonblocking(conn))
467 0 : pg_fatal("Expected blocking connection mode");
468 :
469 0 : if (PQenterPipelineMode(conn) != 1)
470 0 : pg_fatal("Unable to enter pipeline mode");
471 :
472 0 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
473 0 : pg_fatal("Pipeline mode not activated properly");
474 :
475 : /* PQexec should fail in pipeline mode */
476 0 : res = PQexec(conn, "SELECT 1");
477 0 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
478 0 : pg_fatal("PQexec should fail in pipeline mode but succeeded");
479 0 : if (strcmp(PQerrorMessage(conn),
480 0 : "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
481 0 : pg_fatal("did not get expected error message; got: \"%s\"",
482 : PQerrorMessage(conn));
483 0 : PQclear(res);
484 :
485 : /* PQsendQuery should fail in pipeline mode */
486 0 : if (PQsendQuery(conn, "SELECT 1") != 0)
487 0 : pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
488 0 : if (strcmp(PQerrorMessage(conn),
489 0 : "PQsendQuery not allowed in pipeline mode\n") != 0)
490 0 : pg_fatal("did not get expected error message; got: \"%s\"",
491 : PQerrorMessage(conn));
492 :
493 : /* Entering pipeline mode when already in pipeline mode is OK */
494 0 : if (PQenterPipelineMode(conn) != 1)
495 0 : pg_fatal("re-entering pipeline mode should be a no-op but failed");
496 :
497 0 : if (PQisBusy(conn) != 0)
498 0 : pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
499 :
500 : /* ok, back to normal command mode */
501 0 : if (PQexitPipelineMode(conn) != 1)
502 0 : pg_fatal("couldn't exit idle empty pipeline mode");
503 :
504 0 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
505 0 : pg_fatal("Pipeline mode not terminated properly");
506 :
507 : /* exiting pipeline mode when not in pipeline mode should be a no-op */
508 0 : if (PQexitPipelineMode(conn) != 1)
509 0 : pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
510 :
511 : /* can now PQexec again */
512 0 : res = PQexec(conn, "SELECT 1");
513 0 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
514 0 : pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
515 : PQerrorMessage(conn));
516 0 : PQclear(res);
517 :
518 0 : fprintf(stderr, "ok\n");
519 0 : }
520 :
521 : static void
522 0 : test_multi_pipelines(PGconn *conn)
523 : {
524 0 : const char *dummy_params[1] = {"1"};
525 0 : Oid dummy_param_oids[1] = {INT4OID};
526 :
527 0 : fprintf(stderr, "multi pipeline... ");
528 :
529 : /*
530 : * Queue up a couple of small pipelines and process each without returning
531 : * to command mode first.
532 : */
533 0 : if (PQenterPipelineMode(conn) != 1)
534 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
535 :
536 : /* first pipeline */
537 0 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
538 0 : dummy_params, NULL, NULL, 0) != 1)
539 0 : pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
540 :
541 0 : if (PQpipelineSync(conn) != 1)
542 0 : pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
543 :
544 : /* second pipeline */
545 0 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
546 0 : dummy_params, NULL, NULL, 0) != 1)
547 0 : pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
548 :
549 : /* Skip flushing once. */
550 0 : if (PQsendPipelineSync(conn) != 1)
551 0 : pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
552 :
553 : /* third pipeline */
554 0 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
555 0 : dummy_params, NULL, NULL, 0) != 1)
556 0 : pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
557 :
558 0 : if (PQpipelineSync(conn) != 1)
559 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
560 :
561 : /* OK, start processing the results */
562 :
563 : /* first pipeline */
564 0 : consume_result_status(conn, PGRES_TUPLES_OK);
565 :
566 0 : consume_null_result(conn);
567 :
568 0 : if (PQexitPipelineMode(conn) != 0)
569 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
570 :
571 0 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
572 :
573 : /* second pipeline */
574 0 : consume_result_status(conn, PGRES_TUPLES_OK);
575 :
576 0 : consume_null_result(conn);
577 :
578 0 : if (PQexitPipelineMode(conn) != 0)
579 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
580 :
581 0 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
582 :
583 : /* third pipeline */
584 0 : consume_result_status(conn, PGRES_TUPLES_OK);
585 :
586 0 : consume_null_result(conn);
587 :
588 0 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
589 :
590 : /* We're still in pipeline mode ... */
591 0 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
592 0 : pg_fatal("Fell out of pipeline mode somehow");
593 :
594 : /* until we end it, which we can safely do now */
595 0 : if (PQexitPipelineMode(conn) != 1)
596 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
597 : PQerrorMessage(conn));
598 :
599 0 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
600 0 : pg_fatal("exiting pipeline mode didn't seem to work");
601 :
602 0 : fprintf(stderr, "ok\n");
603 0 : }
604 :
605 : /*
606 : * Test behavior when a pipeline dispatches a number of commands that are
607 : * not flushed by a sync point.
608 : */
609 : static void
610 0 : test_nosync(PGconn *conn)
611 : {
612 0 : int numqueries = 10;
613 0 : int results = 0;
614 0 : int sock = PQsocket(conn);
615 :
616 0 : fprintf(stderr, "nosync... ");
617 :
618 0 : if (sock < 0)
619 0 : pg_fatal("invalid socket");
620 :
621 0 : if (PQenterPipelineMode(conn) != 1)
622 0 : pg_fatal("could not enter pipeline mode");
623 0 : for (int i = 0; i < numqueries; i++)
624 : {
625 0 : fd_set input_mask;
626 0 : struct timeval tv;
627 :
628 0 : if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
629 0 : 0, NULL, NULL, NULL, NULL, 0) != 1)
630 0 : pg_fatal("error sending select: %s", PQerrorMessage(conn));
631 0 : PQflush(conn);
632 :
633 : /*
634 : * If the server has written anything to us, read (some of) it now.
635 : */
636 0 : FD_ZERO(&input_mask);
637 0 : FD_SET(sock, &input_mask);
638 0 : tv.tv_sec = 0;
639 0 : tv.tv_usec = 0;
640 0 : if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
641 : {
642 0 : fprintf(stderr, "select() failed: %m\n");
643 0 : exit_nicely(conn);
644 0 : }
645 0 : if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
646 0 : pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
647 0 : }
648 :
649 : /* tell server to flush its output buffer */
650 0 : if (PQsendFlushRequest(conn) != 1)
651 0 : pg_fatal("failed to send flush request");
652 0 : PQflush(conn);
653 :
654 : /* Now read all results */
655 0 : for (;;)
656 : {
657 : /* We expect exactly one TUPLES_OK result for each query we sent */
658 0 : consume_result_status(conn, PGRES_TUPLES_OK);
659 :
660 : /* and one NULL result should follow each */
661 0 : consume_null_result(conn);
662 :
663 0 : results++;
664 :
665 : /* if we're done, we're done */
666 0 : if (results == numqueries)
667 0 : break;
668 : }
669 :
670 0 : fprintf(stderr, "ok\n");
671 0 : }
672 :
673 : /*
674 : * When an operation in a pipeline fails the rest of the pipeline is flushed. We
675 : * still have to get results for each pipeline item, but the item will just be
676 : * a PGRES_PIPELINE_ABORTED code.
677 : *
678 : * This intentionally doesn't use a transaction to wrap the pipeline. You should
679 : * usually use an xact, but in this case we want to observe the effects of each
680 : * statement.
681 : */
682 : static void
683 0 : test_pipeline_abort(PGconn *conn)
684 : {
685 0 : PGresult *res = NULL;
686 0 : const char *dummy_params[1] = {"1"};
687 0 : Oid dummy_param_oids[1] = {INT4OID};
688 0 : int i;
689 0 : int gotrows;
690 0 : bool goterror;
691 :
692 0 : fprintf(stderr, "aborted pipeline... ");
693 :
694 0 : res = PQexec(conn, drop_table_sql);
695 0 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
696 0 : pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
697 0 : PQclear(res);
698 :
699 0 : res = PQexec(conn, create_table_sql);
700 0 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
701 0 : pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
702 0 : PQclear(res);
703 :
704 : /*
705 : * Queue up a couple of small pipelines and process each without returning
706 : * to command mode first. Make sure the second operation in the first
707 : * pipeline ERRORs.
708 : */
709 0 : if (PQenterPipelineMode(conn) != 1)
710 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
711 :
712 0 : dummy_params[0] = "1";
713 0 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
714 0 : dummy_params, NULL, NULL, 0) != 1)
715 0 : pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
716 :
717 0 : if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
718 0 : 1, dummy_param_oids, dummy_params,
719 0 : NULL, NULL, 0) != 1)
720 0 : pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
721 :
722 0 : dummy_params[0] = "2";
723 0 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
724 0 : dummy_params, NULL, NULL, 0) != 1)
725 0 : pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
726 :
727 0 : if (PQpipelineSync(conn) != 1)
728 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
729 :
730 0 : dummy_params[0] = "3";
731 0 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
732 0 : dummy_params, NULL, NULL, 0) != 1)
733 0 : pg_fatal("dispatching second-pipeline insert failed: %s",
734 : PQerrorMessage(conn));
735 :
736 0 : if (PQpipelineSync(conn) != 1)
737 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
738 :
739 : /*
740 : * OK, start processing the pipeline results.
741 : *
742 : * We should get a command-ok for the first query, then a fatal error and
743 : * a pipeline aborted message for the second insert, a pipeline-end, then
744 : * a command-ok and a pipeline-ok for the second pipeline operation.
745 : */
746 0 : consume_result_status(conn, PGRES_COMMAND_OK);
747 :
748 : /* NULL result to signal end-of-results for this command */
749 0 : consume_null_result(conn);
750 :
751 : /* Second query caused error, so we expect an error next */
752 0 : consume_result_status(conn, PGRES_FATAL_ERROR);
753 :
754 : /* NULL result to signal end-of-results for this command */
755 0 : consume_null_result(conn);
756 :
757 : /*
758 : * pipeline should now be aborted.
759 : *
760 : * Note that we could still queue more queries at this point if we wanted;
761 : * they'd get added to a new third pipeline since we've already sent a
762 : * second. The aborted flag relates only to the pipeline being received.
763 : */
764 0 : if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
765 0 : pg_fatal("pipeline should be flagged as aborted but isn't");
766 :
767 : /* third query in pipeline, the second insert */
768 0 : consume_result_status(conn, PGRES_PIPELINE_ABORTED);
769 :
770 : /* NULL result to signal end-of-results for this command */
771 0 : consume_null_result(conn);
772 :
773 0 : if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
774 0 : pg_fatal("pipeline should be flagged as aborted but isn't");
775 :
776 : /* Ensure we're still in pipeline */
777 0 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
778 0 : pg_fatal("Fell out of pipeline mode somehow");
779 :
780 : /*
781 : * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
782 : *
783 : * (This is so clients know to start processing results normally again and
784 : * can tell the difference between skipped commands and the sync.)
785 : */
786 0 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
787 :
788 0 : if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
789 0 : pg_fatal("sync should've cleared the aborted flag but didn't");
790 :
791 : /* We're still in pipeline mode... */
792 0 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
793 0 : pg_fatal("Fell out of pipeline mode somehow");
794 :
795 : /* the insert from the second pipeline */
796 0 : consume_result_status(conn, PGRES_COMMAND_OK);
797 :
798 : /* Read the NULL result at the end of the command */
799 0 : consume_null_result(conn);
800 :
801 : /* the second pipeline sync */
802 0 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
803 :
804 : /* Read the NULL result at the end of the command */
805 0 : consume_null_result(conn);
806 :
807 : /* Try to send two queries in one command */
808 0 : if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
809 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
810 0 : if (PQpipelineSync(conn) != 1)
811 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
812 0 : goterror = false;
813 0 : while ((res = PQgetResult(conn)) != NULL)
814 : {
815 0 : switch (PQresultStatus(res))
816 : {
817 : case PGRES_FATAL_ERROR:
818 0 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
819 0 : pg_fatal("expected error about multiple commands, got %s",
820 : PQerrorMessage(conn));
821 0 : printf("got expected %s", PQerrorMessage(conn));
822 0 : goterror = true;
823 0 : break;
824 : default:
825 0 : pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
826 : break;
827 : }
828 0 : PQclear(res);
829 : }
830 0 : if (!goterror)
831 0 : pg_fatal("did not get cannot-insert-multiple-commands error");
832 :
833 : /* the second pipeline sync */
834 0 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
835 :
836 0 : fprintf(stderr, "ok\n");
837 :
838 : /* Test single-row mode with an error partways */
839 0 : if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
840 0 : 0, NULL, NULL, NULL, NULL, 0) != 1)
841 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
842 0 : if (PQpipelineSync(conn) != 1)
843 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
844 0 : PQsetSingleRowMode(conn);
845 0 : goterror = false;
846 0 : gotrows = 0;
847 0 : while ((res = PQgetResult(conn)) != NULL)
848 : {
849 0 : switch (PQresultStatus(res))
850 : {
851 : case PGRES_SINGLE_TUPLE:
852 0 : printf("got row: %s\n", PQgetvalue(res, 0, 0));
853 0 : gotrows++;
854 0 : break;
855 : case PGRES_FATAL_ERROR:
856 0 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
857 0 : pg_fatal("expected division-by-zero, got: %s (%s)",
858 : PQerrorMessage(conn),
859 : PQresultErrorField(res, PG_DIAG_SQLSTATE));
860 0 : printf("got expected division-by-zero\n");
861 0 : goterror = true;
862 0 : break;
863 : default:
864 0 : pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
865 : }
866 0 : PQclear(res);
867 : }
868 0 : if (!goterror)
869 0 : pg_fatal("did not get division-by-zero error");
870 0 : if (gotrows != 3)
871 0 : pg_fatal("did not get three rows");
872 :
873 : /* the third pipeline sync */
874 0 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
875 :
876 : /* We're still in pipeline mode... */
877 0 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
878 0 : pg_fatal("Fell out of pipeline mode somehow");
879 :
880 : /* until we end it, which we can safely do now */
881 0 : if (PQexitPipelineMode(conn) != 1)
882 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
883 : PQerrorMessage(conn));
884 :
885 0 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
886 0 : pg_fatal("exiting pipeline mode didn't seem to work");
887 :
888 : /*-
889 : * Since we fired the pipelines off without a surrounding xact, the results
890 : * should be:
891 : *
892 : * - Implicit xact started by server around 1st pipeline
893 : * - First insert applied
894 : * - Second statement aborted xact
895 : * - Third insert skipped
896 : * - Sync rolled back first implicit xact
897 : * - Implicit xact created by server around 2nd pipeline
898 : * - insert applied from 2nd pipeline
899 : * - Sync commits 2nd xact
900 : *
901 : * So we should only have the value 3 that we inserted.
902 : */
903 0 : res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
904 :
905 0 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
906 0 : pg_fatal("Expected tuples, got %s: %s",
907 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
908 0 : if (PQntuples(res) != 1)
909 0 : pg_fatal("expected 1 result, got %d", PQntuples(res));
910 0 : for (i = 0; i < PQntuples(res); i++)
911 : {
912 0 : const char *val = PQgetvalue(res, i, 0);
913 :
914 0 : if (strcmp(val, "3") != 0)
915 0 : pg_fatal("expected only insert with value 3, got %s", val);
916 0 : }
917 :
918 0 : PQclear(res);
919 :
920 0 : fprintf(stderr, "ok\n");
921 0 : }
922 :
923 : /* State machine enum for test_pipelined_insert */
924 : enum PipelineInsertStep
925 : {
926 : BI_BEGIN_TX,
927 : BI_DROP_TABLE,
928 : BI_CREATE_TABLE,
929 : BI_PREPARE,
930 : BI_INSERT_ROWS,
931 : BI_COMMIT_TX,
932 : BI_SYNC,
933 : BI_DONE,
934 : };
935 :
936 : static void
937 0 : test_pipelined_insert(PGconn *conn, int n_rows)
938 : {
939 0 : Oid insert_param_oids[2] = {INT4OID, INT8OID};
940 0 : const char *insert_params[2];
941 0 : char insert_param_0[MAXINTLEN];
942 0 : char insert_param_1[MAXINT8LEN];
943 0 : enum PipelineInsertStep send_step = BI_BEGIN_TX,
944 0 : recv_step = BI_BEGIN_TX;
945 0 : int rows_to_send,
946 : rows_to_receive;
947 :
948 0 : insert_params[0] = insert_param_0;
949 0 : insert_params[1] = insert_param_1;
950 :
951 0 : rows_to_send = rows_to_receive = n_rows;
952 :
953 : /*
954 : * Do a pipelined insert into a table created at the start of the pipeline
955 : */
956 0 : if (PQenterPipelineMode(conn) != 1)
957 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
958 :
959 0 : while (send_step != BI_PREPARE)
960 : {
961 0 : const char *sql;
962 :
963 0 : switch (send_step)
964 : {
965 : case BI_BEGIN_TX:
966 0 : sql = "BEGIN TRANSACTION";
967 0 : send_step = BI_DROP_TABLE;
968 0 : break;
969 :
970 : case BI_DROP_TABLE:
971 0 : sql = drop_table_sql;
972 0 : send_step = BI_CREATE_TABLE;
973 0 : break;
974 :
975 : case BI_CREATE_TABLE:
976 0 : sql = create_table_sql;
977 0 : send_step = BI_PREPARE;
978 0 : break;
979 :
980 : default:
981 0 : pg_fatal("invalid state");
982 : sql = NULL; /* keep compiler quiet */
983 : }
984 :
985 : pg_debug("sending: %s\n", sql);
986 0 : if (PQsendQueryParams(conn, sql,
987 0 : 0, NULL, NULL, NULL, NULL, 0) != 1)
988 0 : pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
989 0 : }
990 :
991 0 : Assert(send_step == BI_PREPARE);
992 : pg_debug("sending: %s\n", insert_sql2);
993 0 : if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
994 0 : pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
995 0 : send_step = BI_INSERT_ROWS;
996 :
997 : /*
998 : * Now we start inserting. We'll be sending enough data that we could fill
999 : * our output buffer, so to avoid deadlocking we need to enter nonblocking
1000 : * mode and consume input while we send more output. As results of each
1001 : * query are processed we should pop them to allow processing of the next
1002 : * query. There's no need to finish the pipeline before processing
1003 : * results.
1004 : */
1005 0 : if (PQsetnonblocking(conn, 1) != 0)
1006 0 : pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
1007 :
1008 0 : while (recv_step != BI_DONE)
1009 : {
1010 0 : int sock;
1011 0 : fd_set input_mask;
1012 0 : fd_set output_mask;
1013 :
1014 0 : sock = PQsocket(conn);
1015 :
1016 0 : if (sock < 0)
1017 0 : break; /* shouldn't happen */
1018 :
1019 0 : FD_ZERO(&input_mask);
1020 0 : FD_SET(sock, &input_mask);
1021 0 : FD_ZERO(&output_mask);
1022 0 : FD_SET(sock, &output_mask);
1023 :
1024 0 : if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
1025 : {
1026 0 : fprintf(stderr, "select() failed: %m\n");
1027 0 : exit_nicely(conn);
1028 0 : }
1029 :
1030 : /*
1031 : * Process any results, so we keep the server's output buffer free
1032 : * flowing and it can continue to process input
1033 : */
1034 0 : if (FD_ISSET(sock, &input_mask))
1035 : {
1036 0 : PQconsumeInput(conn);
1037 :
1038 : /* Read until we'd block if we tried to read */
1039 0 : while (!PQisBusy(conn) && recv_step < BI_DONE)
1040 : {
1041 0 : PGresult *res;
1042 0 : const char *cmdtag = "";
1043 0 : const char *description = "";
1044 0 : int status;
1045 :
1046 : /*
1047 : * Read next result. If no more results from this query,
1048 : * advance to the next query
1049 : */
1050 0 : res = PQgetResult(conn);
1051 0 : if (res == NULL)
1052 0 : continue;
1053 :
1054 0 : status = PGRES_COMMAND_OK;
1055 0 : switch (recv_step)
1056 : {
1057 : case BI_BEGIN_TX:
1058 0 : cmdtag = "BEGIN";
1059 0 : recv_step++;
1060 0 : break;
1061 : case BI_DROP_TABLE:
1062 0 : cmdtag = "DROP TABLE";
1063 0 : recv_step++;
1064 0 : break;
1065 : case BI_CREATE_TABLE:
1066 0 : cmdtag = "CREATE TABLE";
1067 0 : recv_step++;
1068 0 : break;
1069 : case BI_PREPARE:
1070 0 : cmdtag = "";
1071 0 : description = "PREPARE";
1072 0 : recv_step++;
1073 0 : break;
1074 : case BI_INSERT_ROWS:
1075 0 : cmdtag = "INSERT";
1076 0 : rows_to_receive--;
1077 0 : if (rows_to_receive == 0)
1078 0 : recv_step++;
1079 0 : break;
1080 : case BI_COMMIT_TX:
1081 0 : cmdtag = "COMMIT";
1082 0 : recv_step++;
1083 0 : break;
1084 : case BI_SYNC:
1085 0 : cmdtag = "";
1086 0 : description = "SYNC";
1087 0 : status = PGRES_PIPELINE_SYNC;
1088 0 : recv_step++;
1089 0 : break;
1090 : case BI_DONE:
1091 : /* unreachable */
1092 0 : pg_fatal("unreachable state");
1093 : }
1094 :
1095 0 : if (PQresultStatus(res) != status)
1096 0 : pg_fatal("%s reported status %s, expected %s\n"
1097 : "Error message: \"%s\"",
1098 : description, PQresStatus(PQresultStatus(res)),
1099 : PQresStatus(status), PQerrorMessage(conn));
1100 :
1101 0 : if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
1102 0 : pg_fatal("%s expected command tag '%s', got '%s'",
1103 : description, cmdtag, PQcmdStatus(res));
1104 :
1105 : pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
1106 :
1107 0 : PQclear(res);
1108 0 : }
1109 0 : }
1110 :
1111 : /* Write more rows and/or the end pipeline message, if needed */
1112 0 : if (FD_ISSET(sock, &output_mask))
1113 : {
1114 0 : PQflush(conn);
1115 :
1116 0 : if (send_step == BI_INSERT_ROWS)
1117 : {
1118 0 : snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
1119 : /* use up some buffer space with a wide value */
1120 0 : snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
1121 :
1122 0 : if (PQsendQueryPrepared(conn, "my_insert",
1123 0 : 2, insert_params, NULL, NULL, 0) == 1)
1124 : {
1125 : pg_debug("sent row %d\n", rows_to_send);
1126 :
1127 0 : rows_to_send--;
1128 0 : if (rows_to_send == 0)
1129 0 : send_step++;
1130 0 : }
1131 : else
1132 : {
1133 : /*
1134 : * in nonblocking mode, so it's OK for an insert to fail
1135 : * to send
1136 : */
1137 0 : fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
1138 0 : rows_to_send, PQerrorMessage(conn));
1139 : }
1140 0 : }
1141 0 : else if (send_step == BI_COMMIT_TX)
1142 : {
1143 0 : if (PQsendQueryParams(conn, "COMMIT",
1144 0 : 0, NULL, NULL, NULL, NULL, 0) == 1)
1145 : {
1146 : pg_debug("sent COMMIT\n");
1147 0 : send_step++;
1148 0 : }
1149 : else
1150 : {
1151 0 : fprintf(stderr, "WARNING: failed to send commit: %s\n",
1152 0 : PQerrorMessage(conn));
1153 : }
1154 0 : }
1155 0 : else if (send_step == BI_SYNC)
1156 : {
1157 0 : if (PQpipelineSync(conn) == 1)
1158 : {
1159 0 : fprintf(stdout, "pipeline sync sent\n");
1160 0 : send_step++;
1161 0 : }
1162 : else
1163 : {
1164 0 : fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
1165 0 : PQerrorMessage(conn));
1166 : }
1167 0 : }
1168 0 : }
1169 0 : }
1170 :
1171 : /* We've got the sync message and the pipeline should be done */
1172 0 : if (PQexitPipelineMode(conn) != 1)
1173 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1174 : PQerrorMessage(conn));
1175 :
1176 0 : if (PQsetnonblocking(conn, 0) != 0)
1177 0 : pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
1178 :
1179 0 : fprintf(stderr, "ok\n");
1180 0 : }
1181 :
1182 : static void
1183 0 : test_prepared(PGconn *conn)
1184 : {
1185 0 : PGresult *res = NULL;
1186 0 : Oid param_oids[1] = {INT4OID};
1187 0 : Oid expected_oids[4];
1188 0 : Oid typ;
1189 :
1190 0 : fprintf(stderr, "prepared... ");
1191 :
1192 0 : if (PQenterPipelineMode(conn) != 1)
1193 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1194 0 : if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
1195 : "interval '1 sec'",
1196 0 : 1, param_oids) != 1)
1197 0 : pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
1198 0 : expected_oids[0] = INT4OID;
1199 0 : expected_oids[1] = TEXTOID;
1200 0 : expected_oids[2] = NUMERICOID;
1201 0 : expected_oids[3] = INTERVALOID;
1202 0 : if (PQsendDescribePrepared(conn, "select_one") != 1)
1203 0 : pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
1204 0 : if (PQpipelineSync(conn) != 1)
1205 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1206 :
1207 0 : consume_result_status(conn, PGRES_COMMAND_OK);
1208 :
1209 0 : consume_null_result(conn);
1210 :
1211 0 : res = confirm_result_status(conn, PGRES_COMMAND_OK);
1212 0 : if (PQnfields(res) != lengthof(expected_oids))
1213 0 : pg_fatal("expected %zu columns, got %d",
1214 : lengthof(expected_oids), PQnfields(res));
1215 0 : for (int i = 0; i < PQnfields(res); i++)
1216 : {
1217 0 : typ = PQftype(res, i);
1218 0 : if (typ != expected_oids[i])
1219 0 : pg_fatal("field %d: expected type %u, got %u",
1220 : i, expected_oids[i], typ);
1221 0 : }
1222 0 : PQclear(res);
1223 :
1224 0 : consume_null_result(conn);
1225 :
1226 0 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
1227 :
1228 0 : fprintf(stderr, "closing statement..");
1229 0 : if (PQsendClosePrepared(conn, "select_one") != 1)
1230 0 : pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
1231 0 : if (PQpipelineSync(conn) != 1)
1232 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1233 :
1234 0 : consume_result_status(conn, PGRES_COMMAND_OK);
1235 :
1236 0 : consume_null_result(conn);
1237 :
1238 0 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
1239 :
1240 0 : if (PQexitPipelineMode(conn) != 1)
1241 0 : pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1242 :
1243 : /* Now that it's closed we should get an error when describing */
1244 0 : res = PQdescribePrepared(conn, "select_one");
1245 0 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
1246 0 : pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1247 0 : PQclear(res);
1248 :
1249 : /*
1250 : * Also test the blocking close, this should not fail since closing a
1251 : * non-existent prepared statement is a no-op
1252 : */
1253 0 : res = PQclosePrepared(conn, "select_one");
1254 0 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1255 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1256 0 : PQclear(res);
1257 :
1258 0 : fprintf(stderr, "creating portal... ");
1259 :
1260 0 : res = PQexec(conn, "BEGIN");
1261 0 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1262 0 : pg_fatal("BEGIN failed: %s", PQerrorMessage(conn));
1263 0 : PQclear(res);
1264 :
1265 0 : res = PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
1266 0 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1267 0 : pg_fatal("DECLARE CURSOR failed: %s", PQerrorMessage(conn));
1268 0 : PQclear(res);
1269 :
1270 0 : PQenterPipelineMode(conn);
1271 0 : if (PQsendDescribePortal(conn, "cursor_one") != 1)
1272 0 : pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
1273 0 : if (PQpipelineSync(conn) != 1)
1274 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1275 :
1276 0 : res = confirm_result_status(conn, PGRES_COMMAND_OK);
1277 0 : typ = PQftype(res, 0);
1278 0 : if (typ != INT4OID)
1279 0 : pg_fatal("portal: expected type %u, got %u",
1280 : INT4OID, typ);
1281 0 : PQclear(res);
1282 :
1283 0 : consume_null_result(conn);
1284 :
1285 0 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
1286 :
1287 0 : fprintf(stderr, "closing portal... ");
1288 0 : if (PQsendClosePortal(conn, "cursor_one") != 1)
1289 0 : pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
1290 0 : if (PQpipelineSync(conn) != 1)
1291 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1292 :
1293 0 : consume_result_status(conn, PGRES_COMMAND_OK);
1294 :
1295 0 : consume_null_result(conn);
1296 :
1297 0 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
1298 :
1299 0 : if (PQexitPipelineMode(conn) != 1)
1300 0 : pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1301 :
1302 : /* Now that it's closed we should get an error when describing */
1303 0 : res = PQdescribePortal(conn, "cursor_one");
1304 0 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
1305 0 : pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1306 0 : PQclear(res);
1307 :
1308 : /*
1309 : * Also test the blocking close, this should not fail since closing a
1310 : * non-existent portal is a no-op
1311 : */
1312 0 : res = PQclosePortal(conn, "cursor_one");
1313 0 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1314 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1315 0 : PQclear(res);
1316 :
1317 0 : fprintf(stderr, "ok\n");
1318 0 : }
1319 :
1320 : /*
1321 : * Test max_protocol_version options.
1322 : */
1323 : static void
1324 0 : test_protocol_version(PGconn *conn)
1325 : {
1326 0 : const char **keywords;
1327 0 : const char **vals;
1328 0 : int nopts;
1329 0 : PQconninfoOption *opts = PQconninfo(conn);
1330 0 : int protocol_version;
1331 0 : int max_protocol_version_index = -1;
1332 0 : int i;
1333 :
1334 : /* Prepare keywords/vals arrays, copied from the existing connection. */
1335 0 : nopts = 0;
1336 0 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
1337 0 : nopts++;
1338 0 : nopts++; /* NULL terminator */
1339 :
1340 0 : keywords = pg_malloc0(sizeof(char *) * nopts);
1341 0 : vals = pg_malloc0(sizeof(char *) * nopts);
1342 :
1343 0 : i = 0;
1344 0 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
1345 : {
1346 : /*
1347 : * If the test already specified max_protocol_version, we want to
1348 : * replace it rather than attempting to override it. This matters when
1349 : * testing defaults, because empty option values at the end of the
1350 : * connection string won't replace earlier settings.
1351 : */
1352 0 : if (strcmp(opt->keyword, "max_protocol_version") == 0)
1353 0 : max_protocol_version_index = i;
1354 0 : else if (!opt->val)
1355 0 : continue;
1356 :
1357 0 : keywords[i] = opt->keyword;
1358 0 : vals[i] = opt->val;
1359 :
1360 0 : i++;
1361 0 : }
1362 :
1363 0 : Assert(max_protocol_version_index >= 0);
1364 :
1365 : /*
1366 : * Test default protocol_version
1367 : */
1368 0 : vals[max_protocol_version_index] = "";
1369 0 : conn = PQconnectdbParams(keywords, vals, false);
1370 :
1371 0 : if (PQstatus(conn) != CONNECTION_OK)
1372 0 : pg_fatal("Connection to database failed: %s",
1373 : PQerrorMessage(conn));
1374 :
1375 0 : protocol_version = PQfullProtocolVersion(conn);
1376 0 : if (protocol_version != 30000)
1377 0 : pg_fatal("expected 30000, got %d", protocol_version);
1378 :
1379 0 : PQfinish(conn);
1380 :
1381 : /*
1382 : * Test max_protocol_version=3.0
1383 : */
1384 0 : vals[max_protocol_version_index] = "3.0";
1385 0 : conn = PQconnectdbParams(keywords, vals, false);
1386 :
1387 0 : if (PQstatus(conn) != CONNECTION_OK)
1388 0 : pg_fatal("Connection to database failed: %s",
1389 : PQerrorMessage(conn));
1390 :
1391 0 : protocol_version = PQfullProtocolVersion(conn);
1392 0 : if (protocol_version != 30000)
1393 0 : pg_fatal("expected 30000, got %d", protocol_version);
1394 :
1395 0 : PQfinish(conn);
1396 :
1397 : /*
1398 : * Test max_protocol_version=3.1. It's not valid, we went straight from
1399 : * 3.0 to 3.2.
1400 : */
1401 0 : vals[max_protocol_version_index] = "3.1";
1402 0 : conn = PQconnectdbParams(keywords, vals, false);
1403 :
1404 0 : if (PQstatus(conn) != CONNECTION_BAD)
1405 0 : pg_fatal("Connecting with max_protocol_version 3.1 should have failed.");
1406 :
1407 0 : PQfinish(conn);
1408 :
1409 : /*
1410 : * Test max_protocol_version=3.2
1411 : */
1412 0 : vals[max_protocol_version_index] = "3.2";
1413 0 : conn = PQconnectdbParams(keywords, vals, false);
1414 :
1415 0 : if (PQstatus(conn) != CONNECTION_OK)
1416 0 : pg_fatal("Connection to database failed: %s",
1417 : PQerrorMessage(conn));
1418 :
1419 0 : protocol_version = PQfullProtocolVersion(conn);
1420 0 : if (protocol_version != 30002)
1421 0 : pg_fatal("expected 30002, got %d", protocol_version);
1422 :
1423 0 : PQfinish(conn);
1424 :
1425 : /*
1426 : * Test max_protocol_version=latest. 'latest' currently means '3.2'.
1427 : */
1428 0 : vals[max_protocol_version_index] = "latest";
1429 0 : conn = PQconnectdbParams(keywords, vals, false);
1430 :
1431 0 : if (PQstatus(conn) != CONNECTION_OK)
1432 0 : pg_fatal("Connection to database failed: %s",
1433 : PQerrorMessage(conn));
1434 :
1435 0 : protocol_version = PQfullProtocolVersion(conn);
1436 0 : if (protocol_version != 30002)
1437 0 : pg_fatal("expected 30002, got %d", protocol_version);
1438 :
1439 0 : PQfinish(conn);
1440 :
1441 0 : pfree(keywords);
1442 0 : pfree(vals);
1443 0 : PQconninfoFree(opts);
1444 0 : }
1445 :
1446 : /* Notice processor: print notices, and count how many we got */
1447 : static void
1448 0 : notice_processor(void *arg, const char *message)
1449 : {
1450 0 : int *n_notices = (int *) arg;
1451 :
1452 0 : (*n_notices)++;
1453 0 : fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
1454 0 : }
1455 :
1456 : /* Verify behavior in "idle" state */
1457 : static void
1458 0 : test_pipeline_idle(PGconn *conn)
1459 : {
1460 0 : int n_notices = 0;
1461 :
1462 0 : fprintf(stderr, "\npipeline idle...\n");
1463 :
1464 0 : PQsetNoticeProcessor(conn, notice_processor, &n_notices);
1465 :
1466 : /* Try to exit pipeline mode in pipeline-idle state */
1467 0 : if (PQenterPipelineMode(conn) != 1)
1468 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1469 0 : if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
1470 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1471 0 : PQsendFlushRequest(conn);
1472 :
1473 0 : consume_result_status(conn, PGRES_TUPLES_OK);
1474 :
1475 0 : consume_null_result(conn);
1476 :
1477 0 : if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1478 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1479 0 : if (PQexitPipelineMode(conn) == 1)
1480 0 : pg_fatal("exiting pipeline succeeded when it shouldn't");
1481 0 : if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1482 0 : strlen("cannot exit pipeline mode")) != 0)
1483 0 : pg_fatal("did not get expected error; got: %s",
1484 : PQerrorMessage(conn));
1485 0 : PQsendFlushRequest(conn);
1486 :
1487 0 : consume_result_status(conn, PGRES_TUPLES_OK);
1488 :
1489 0 : consume_null_result(conn);
1490 :
1491 0 : if (PQexitPipelineMode(conn) != 1)
1492 0 : pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1493 :
1494 0 : if (n_notices > 0)
1495 0 : pg_fatal("got %d notice(s)", n_notices);
1496 0 : fprintf(stderr, "ok - 1\n");
1497 :
1498 : /* Have a WARNING in the middle of a resultset */
1499 0 : if (PQenterPipelineMode(conn) != 1)
1500 0 : pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
1501 0 : if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1502 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1503 0 : PQsendFlushRequest(conn);
1504 :
1505 0 : consume_result_status(conn, PGRES_TUPLES_OK);
1506 :
1507 0 : if (PQexitPipelineMode(conn) != 1)
1508 0 : pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
1509 0 : fprintf(stderr, "ok - 2\n");
1510 0 : }
1511 :
1512 : static void
1513 0 : test_simple_pipeline(PGconn *conn)
1514 : {
1515 0 : const char *dummy_params[1] = {"1"};
1516 0 : Oid dummy_param_oids[1] = {INT4OID};
1517 :
1518 0 : fprintf(stderr, "simple pipeline... ");
1519 :
1520 : /*
1521 : * Enter pipeline mode and dispatch a set of operations, which we'll then
1522 : * process the results of as they come in.
1523 : *
1524 : * For a simple case we should be able to do this without interim
1525 : * processing of results since our output buffer will give us enough slush
1526 : * to work with and we won't block on sending. So blocking mode is fine.
1527 : */
1528 0 : if (PQisnonblocking(conn))
1529 0 : pg_fatal("Expected blocking connection mode");
1530 :
1531 0 : if (PQenterPipelineMode(conn) != 1)
1532 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1533 :
1534 0 : if (PQsendQueryParams(conn, "SELECT $1",
1535 0 : 1, dummy_param_oids, dummy_params,
1536 0 : NULL, NULL, 0) != 1)
1537 0 : pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1538 :
1539 0 : if (PQexitPipelineMode(conn) != 0)
1540 0 : pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1541 :
1542 0 : if (PQpipelineSync(conn) != 1)
1543 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1544 :
1545 0 : consume_result_status(conn, PGRES_TUPLES_OK);
1546 :
1547 0 : consume_null_result(conn);
1548 :
1549 : /*
1550 : * Even though we've processed the result there's still a sync to come and
1551 : * we can't exit pipeline mode yet
1552 : */
1553 0 : if (PQexitPipelineMode(conn) != 0)
1554 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1555 :
1556 0 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
1557 :
1558 0 : consume_null_result(conn);
1559 :
1560 : /* We're still in pipeline mode... */
1561 0 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
1562 0 : pg_fatal("Fell out of pipeline mode somehow");
1563 :
1564 : /* ... until we end it, which we can safely do now */
1565 0 : if (PQexitPipelineMode(conn) != 1)
1566 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1567 : PQerrorMessage(conn));
1568 :
1569 0 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
1570 0 : pg_fatal("Exiting pipeline mode didn't seem to work");
1571 :
1572 0 : fprintf(stderr, "ok\n");
1573 0 : }
1574 :
1575 : static void
1576 0 : test_singlerowmode(PGconn *conn)
1577 : {
1578 0 : PGresult *res;
1579 0 : int i;
1580 0 : bool pipeline_ended = false;
1581 :
1582 0 : if (PQenterPipelineMode(conn) != 1)
1583 0 : pg_fatal("failed to enter pipeline mode: %s",
1584 : PQerrorMessage(conn));
1585 :
1586 : /* One series of three commands, using single-row mode for the first two. */
1587 0 : for (i = 0; i < 3; i++)
1588 : {
1589 0 : char *param[1];
1590 :
1591 0 : param[0] = psprintf("%d", 44 + i);
1592 :
1593 0 : if (PQsendQueryParams(conn,
1594 : "SELECT generate_series(42, $1)",
1595 : 1,
1596 : NULL,
1597 0 : (const char **) param,
1598 : NULL,
1599 : NULL,
1600 0 : 0) != 1)
1601 0 : pg_fatal("failed to send query: %s",
1602 : PQerrorMessage(conn));
1603 0 : pfree(param[0]);
1604 0 : }
1605 0 : if (PQpipelineSync(conn) != 1)
1606 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1607 :
1608 0 : for (i = 0; !pipeline_ended; i++)
1609 : {
1610 0 : bool first = true;
1611 0 : bool saw_ending_tuplesok;
1612 0 : bool isSingleTuple = false;
1613 :
1614 : /* Set single row mode for only first 2 SELECT queries */
1615 0 : if (i < 2)
1616 : {
1617 0 : if (PQsetSingleRowMode(conn) != 1)
1618 0 : pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1619 0 : }
1620 :
1621 : /* Consume rows for this query */
1622 0 : saw_ending_tuplesok = false;
1623 0 : while ((res = PQgetResult(conn)) != NULL)
1624 : {
1625 0 : ExecStatusType est = PQresultStatus(res);
1626 :
1627 0 : if (est == PGRES_PIPELINE_SYNC)
1628 : {
1629 0 : fprintf(stderr, "end of pipeline reached\n");
1630 0 : pipeline_ended = true;
1631 0 : PQclear(res);
1632 0 : if (i != 3)
1633 0 : pg_fatal("Expected three results, got %d", i);
1634 0 : break;
1635 : }
1636 :
1637 : /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1638 0 : if (first)
1639 : {
1640 0 : if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1641 0 : pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1642 : i, PQresStatus(est));
1643 0 : if (i >= 2 && est != PGRES_TUPLES_OK)
1644 0 : pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1645 : i, PQresStatus(est));
1646 0 : first = false;
1647 0 : }
1648 :
1649 0 : fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1650 0 : switch (est)
1651 : {
1652 : case PGRES_TUPLES_OK:
1653 0 : fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1654 0 : saw_ending_tuplesok = true;
1655 0 : if (isSingleTuple)
1656 : {
1657 0 : if (PQntuples(res) == 0)
1658 0 : fprintf(stderr, "all tuples received in query %d\n", i);
1659 : else
1660 0 : pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1661 0 : }
1662 0 : break;
1663 :
1664 : case PGRES_SINGLE_TUPLE:
1665 0 : isSingleTuple = true;
1666 0 : fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1667 0 : break;
1668 :
1669 : default:
1670 0 : pg_fatal("unexpected");
1671 : }
1672 0 : PQclear(res);
1673 0 : }
1674 0 : if (!pipeline_ended && !saw_ending_tuplesok)
1675 0 : pg_fatal("didn't get expected terminating TUPLES_OK");
1676 0 : }
1677 :
1678 : /*
1679 : * Now issue one command, get its results in with single-row mode, then
1680 : * issue another command, and get its results in normal mode; make sure
1681 : * the single-row mode flag is reset as expected.
1682 : */
1683 0 : if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1684 0 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1685 0 : pg_fatal("failed to send query: %s",
1686 : PQerrorMessage(conn));
1687 0 : if (PQsendFlushRequest(conn) != 1)
1688 0 : pg_fatal("failed to send flush request");
1689 0 : if (PQsetSingleRowMode(conn) != 1)
1690 0 : pg_fatal("PQsetSingleRowMode() failed");
1691 :
1692 0 : consume_result_status(conn, PGRES_SINGLE_TUPLE);
1693 :
1694 0 : consume_result_status(conn, PGRES_TUPLES_OK);
1695 :
1696 0 : consume_null_result(conn);
1697 :
1698 0 : if (PQsendQueryParams(conn, "SELECT 1",
1699 0 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1700 0 : pg_fatal("failed to send query: %s",
1701 : PQerrorMessage(conn));
1702 0 : if (PQsendFlushRequest(conn) != 1)
1703 0 : pg_fatal("failed to send flush request");
1704 :
1705 0 : consume_result_status(conn, PGRES_TUPLES_OK);
1706 :
1707 0 : consume_null_result(conn);
1708 :
1709 : /*
1710 : * Try chunked mode as well; make sure that it correctly delivers a
1711 : * partial final chunk.
1712 : */
1713 0 : if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
1714 0 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1715 0 : pg_fatal("failed to send query: %s",
1716 : PQerrorMessage(conn));
1717 0 : if (PQsendFlushRequest(conn) != 1)
1718 0 : pg_fatal("failed to send flush request");
1719 0 : if (PQsetChunkedRowsMode(conn, 3) != 1)
1720 0 : pg_fatal("PQsetChunkedRowsMode() failed");
1721 :
1722 0 : res = confirm_result_status(conn, PGRES_TUPLES_CHUNK);
1723 0 : if (PQntuples(res) != 3)
1724 0 : pg_fatal("Expected 3 rows, got %d", PQntuples(res));
1725 0 : PQclear(res);
1726 :
1727 0 : res = confirm_result_status(conn, PGRES_TUPLES_CHUNK);
1728 0 : if (PQntuples(res) != 2)
1729 0 : pg_fatal("Expected 2 rows, got %d", PQntuples(res));
1730 0 : PQclear(res);
1731 :
1732 0 : res = confirm_result_status(conn, PGRES_TUPLES_OK);
1733 0 : if (PQntuples(res) != 0)
1734 0 : pg_fatal("Expected 0 rows, got %d", PQntuples(res));
1735 0 : PQclear(res);
1736 :
1737 0 : consume_null_result(conn);
1738 :
1739 0 : if (PQexitPipelineMode(conn) != 1)
1740 0 : pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1741 :
1742 0 : fprintf(stderr, "ok\n");
1743 0 : }
1744 :
1745 : /*
1746 : * Simple test to verify that a pipeline is discarded as a whole when there's
1747 : * an error, ignoring transaction commands.
1748 : */
1749 : static void
1750 0 : test_transaction(PGconn *conn)
1751 : {
1752 0 : PGresult *res;
1753 0 : bool expect_null;
1754 0 : int num_syncs = 0;
1755 :
1756 0 : res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1757 : "CREATE TABLE pq_pipeline_tst (id int)");
1758 0 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1759 0 : pg_fatal("failed to create test table: %s",
1760 : PQerrorMessage(conn));
1761 0 : PQclear(res);
1762 :
1763 0 : if (PQenterPipelineMode(conn) != 1)
1764 0 : pg_fatal("failed to enter pipeline mode: %s",
1765 : PQerrorMessage(conn));
1766 0 : if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1767 0 : pg_fatal("could not send prepare on pipeline: %s",
1768 : PQerrorMessage(conn));
1769 :
1770 0 : if (PQsendQueryParams(conn,
1771 : "BEGIN",
1772 0 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1773 0 : pg_fatal("failed to send query: %s",
1774 : PQerrorMessage(conn));
1775 0 : if (PQsendQueryParams(conn,
1776 : "SELECT 0/0",
1777 0 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1778 0 : pg_fatal("failed to send query: %s",
1779 : PQerrorMessage(conn));
1780 :
1781 : /*
1782 : * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1783 : * get out of the pipeline-aborted state first.
1784 : */
1785 0 : if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1786 0 : pg_fatal("failed to execute prepared: %s",
1787 : PQerrorMessage(conn));
1788 :
1789 : /* This insert fails because we're in pipeline-aborted state */
1790 0 : if (PQsendQueryParams(conn,
1791 : "INSERT INTO pq_pipeline_tst VALUES (1)",
1792 0 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1793 0 : pg_fatal("failed to send query: %s",
1794 : PQerrorMessage(conn));
1795 0 : if (PQpipelineSync(conn) != 1)
1796 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1797 0 : num_syncs++;
1798 :
1799 : /*
1800 : * This insert fails even though the pipeline got a SYNC, because we're in
1801 : * an aborted transaction
1802 : */
1803 0 : if (PQsendQueryParams(conn,
1804 : "INSERT INTO pq_pipeline_tst VALUES (2)",
1805 0 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1806 0 : pg_fatal("failed to send query: %s",
1807 : PQerrorMessage(conn));
1808 0 : if (PQpipelineSync(conn) != 1)
1809 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1810 0 : num_syncs++;
1811 :
1812 : /*
1813 : * Send ROLLBACK using prepared stmt. This one works because we just did
1814 : * PQpipelineSync above.
1815 : */
1816 0 : if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1817 0 : pg_fatal("failed to execute prepared: %s",
1818 : PQerrorMessage(conn));
1819 :
1820 : /*
1821 : * Now that we're out of a transaction and in pipeline-good mode, this
1822 : * insert works
1823 : */
1824 0 : if (PQsendQueryParams(conn,
1825 : "INSERT INTO pq_pipeline_tst VALUES (3)",
1826 0 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1827 0 : pg_fatal("failed to send query: %s",
1828 : PQerrorMessage(conn));
1829 : /* Send two syncs now -- match up to SYNC messages below */
1830 0 : if (PQpipelineSync(conn) != 1)
1831 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1832 0 : num_syncs++;
1833 0 : if (PQpipelineSync(conn) != 1)
1834 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1835 0 : num_syncs++;
1836 :
1837 0 : expect_null = false;
1838 0 : for (int i = 0;; i++)
1839 : {
1840 0 : ExecStatusType restype;
1841 :
1842 0 : res = PQgetResult(conn);
1843 0 : if (res == NULL)
1844 : {
1845 0 : printf("%d: got NULL result\n", i);
1846 0 : if (!expect_null)
1847 0 : pg_fatal("did not expect NULL here");
1848 0 : expect_null = false;
1849 0 : continue;
1850 : }
1851 0 : restype = PQresultStatus(res);
1852 0 : printf("%d: got status %s", i, PQresStatus(restype));
1853 0 : if (expect_null)
1854 0 : pg_fatal("expected NULL");
1855 0 : if (restype == PGRES_FATAL_ERROR)
1856 0 : printf("; error: %s", PQerrorMessage(conn));
1857 0 : else if (restype == PGRES_PIPELINE_ABORTED)
1858 : {
1859 0 : printf(": command didn't run because pipeline aborted\n");
1860 0 : }
1861 : else
1862 0 : printf("\n");
1863 0 : PQclear(res);
1864 :
1865 0 : if (restype == PGRES_PIPELINE_SYNC)
1866 0 : num_syncs--;
1867 : else
1868 0 : expect_null = true;
1869 0 : if (num_syncs <= 0)
1870 0 : break;
1871 0 : }
1872 :
1873 0 : consume_null_result(conn);
1874 :
1875 0 : if (PQexitPipelineMode(conn) != 1)
1876 0 : pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1877 :
1878 : /* We expect to find one tuple containing the value "3" */
1879 0 : res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1880 0 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1881 0 : pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1882 0 : if (PQntuples(res) != 1)
1883 0 : pg_fatal("did not get 1 tuple");
1884 0 : if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1885 0 : pg_fatal("did not get expected tuple");
1886 0 : PQclear(res);
1887 :
1888 0 : fprintf(stderr, "ok\n");
1889 0 : }
1890 :
1891 : /*
1892 : * In this test mode we send a stream of queries, with one in the middle
1893 : * causing an error. Verify that we can still send some more after the
1894 : * error and have libpq work properly.
1895 : */
1896 : static void
1897 0 : test_uniqviol(PGconn *conn)
1898 : {
1899 0 : int sock = PQsocket(conn);
1900 0 : PGresult *res;
1901 0 : Oid paramTypes[2] = {INT8OID, INT8OID};
1902 0 : const char *paramValues[2];
1903 0 : char paramValue0[MAXINT8LEN];
1904 0 : char paramValue1[MAXINT8LEN];
1905 0 : int ctr = 0;
1906 0 : int numsent = 0;
1907 0 : int results = 0;
1908 0 : bool read_done = false;
1909 0 : bool write_done = false;
1910 0 : bool error_sent = false;
1911 0 : bool got_error = false;
1912 0 : int switched = 0;
1913 0 : int socketful = 0;
1914 0 : fd_set in_fds;
1915 0 : fd_set out_fds;
1916 :
1917 0 : fprintf(stderr, "uniqviol ...");
1918 :
1919 0 : PQsetnonblocking(conn, 1);
1920 :
1921 0 : paramValues[0] = paramValue0;
1922 0 : paramValues[1] = paramValue1;
1923 0 : sprintf(paramValue1, "42");
1924 :
1925 0 : res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1926 : "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1927 0 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1928 0 : pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1929 0 : PQclear(res);
1930 :
1931 0 : res = PQexec(conn, "begin");
1932 0 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1933 0 : pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1934 0 : PQclear(res);
1935 :
1936 0 : res = PQprepare(conn, "insertion",
1937 : "insert into ppln_uniqviol values ($1, $2) returning id",
1938 0 : 2, paramTypes);
1939 0 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1940 0 : pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1941 0 : PQclear(res);
1942 :
1943 0 : if (PQenterPipelineMode(conn) != 1)
1944 0 : pg_fatal("failed to enter pipeline mode");
1945 :
1946 0 : while (!read_done)
1947 : {
1948 : /*
1949 : * Avoid deadlocks by reading everything the server has sent before
1950 : * sending anything. (Special precaution is needed here to process
1951 : * PQisBusy before testing the socket for read-readiness, because the
1952 : * socket does not turn read-ready after "sending" queries in aborted
1953 : * pipeline mode.)
1954 : */
1955 0 : while (PQisBusy(conn) == 0)
1956 : {
1957 0 : bool new_error;
1958 :
1959 0 : if (results >= numsent)
1960 : {
1961 0 : if (write_done)
1962 0 : read_done = true;
1963 0 : break;
1964 : }
1965 :
1966 0 : res = PQgetResult(conn);
1967 0 : new_error = process_result(conn, res, results, numsent);
1968 0 : if (new_error && got_error)
1969 0 : pg_fatal("got two errors");
1970 0 : got_error |= new_error;
1971 0 : if (results++ >= numsent - 1)
1972 : {
1973 0 : if (write_done)
1974 0 : read_done = true;
1975 0 : break;
1976 : }
1977 0 : }
1978 :
1979 0 : if (read_done)
1980 0 : break;
1981 :
1982 0 : FD_ZERO(&out_fds);
1983 0 : FD_SET(sock, &out_fds);
1984 :
1985 0 : FD_ZERO(&in_fds);
1986 0 : FD_SET(sock, &in_fds);
1987 :
1988 0 : if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
1989 : {
1990 0 : if (errno == EINTR)
1991 0 : continue;
1992 0 : pg_fatal("select() failed: %m");
1993 : }
1994 :
1995 0 : if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
1996 0 : pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
1997 :
1998 : /*
1999 : * If the socket is writable and we haven't finished sending queries,
2000 : * send some.
2001 : */
2002 0 : if (!write_done && FD_ISSET(sock, &out_fds))
2003 : {
2004 0 : for (;;)
2005 : {
2006 0 : int flush;
2007 :
2008 : /*
2009 : * provoke uniqueness violation exactly once after having
2010 : * switched to read mode.
2011 : */
2012 0 : if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
2013 : {
2014 0 : sprintf(paramValue0, "%d", numsent / 2);
2015 0 : fprintf(stderr, "E");
2016 0 : error_sent = true;
2017 0 : }
2018 : else
2019 : {
2020 0 : fprintf(stderr, ".");
2021 0 : sprintf(paramValue0, "%d", ctr++);
2022 : }
2023 :
2024 0 : if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
2025 0 : pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
2026 0 : numsent++;
2027 :
2028 : /* Are we done writing? */
2029 0 : if (socketful != 0 && numsent % socketful == 42 && error_sent)
2030 : {
2031 0 : if (PQsendFlushRequest(conn) != 1)
2032 0 : pg_fatal("failed to send flush request");
2033 0 : write_done = true;
2034 0 : fprintf(stderr, "\ndone writing\n");
2035 0 : PQflush(conn);
2036 0 : break;
2037 : }
2038 :
2039 : /* is the outgoing socket full? */
2040 0 : flush = PQflush(conn);
2041 0 : if (flush == -1)
2042 0 : pg_fatal("failed to flush: %s", PQerrorMessage(conn));
2043 0 : if (flush == 1)
2044 : {
2045 0 : if (socketful == 0)
2046 0 : socketful = numsent;
2047 0 : fprintf(stderr, "\nswitch to reading\n");
2048 0 : switched++;
2049 0 : break;
2050 : }
2051 0 : }
2052 0 : }
2053 : }
2054 :
2055 0 : if (!got_error)
2056 0 : pg_fatal("did not get expected error");
2057 :
2058 0 : fprintf(stderr, "ok\n");
2059 0 : }
2060 :
2061 : /*
2062 : * Subroutine for test_uniqviol; given a PGresult, print it out and consume
2063 : * the expected NULL that should follow it.
2064 : *
2065 : * Returns true if we read a fatal error message, otherwise false.
2066 : */
2067 : static bool
2068 0 : process_result(PGconn *conn, PGresult *res, int results, int numsent)
2069 : {
2070 0 : bool got_error = false;
2071 :
2072 0 : if (res == NULL)
2073 0 : pg_fatal("got unexpected NULL");
2074 :
2075 0 : switch (PQresultStatus(res))
2076 : {
2077 : case PGRES_FATAL_ERROR:
2078 0 : got_error = true;
2079 0 : fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
2080 0 : PQclear(res);
2081 0 : consume_null_result(conn);
2082 0 : break;
2083 :
2084 : case PGRES_TUPLES_OK:
2085 0 : fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
2086 0 : PQclear(res);
2087 0 : consume_null_result(conn);
2088 0 : break;
2089 :
2090 : case PGRES_PIPELINE_ABORTED:
2091 0 : fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
2092 0 : PQclear(res);
2093 0 : consume_null_result(conn);
2094 0 : break;
2095 :
2096 : default:
2097 0 : pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
2098 : }
2099 :
2100 0 : return got_error;
2101 0 : }
2102 :
2103 :
2104 : static void
2105 0 : usage(const char *progname)
2106 : {
2107 0 : fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
2108 0 : fprintf(stderr, "Usage:\n");
2109 0 : fprintf(stderr, " %s [OPTION] tests\n", progname);
2110 0 : fprintf(stderr, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);
2111 0 : fprintf(stderr, "\nOptions:\n");
2112 0 : fprintf(stderr, " -t TRACEFILE generate a libpq trace to TRACEFILE\n");
2113 0 : fprintf(stderr, " -r NUMROWS use NUMROWS as the test size\n");
2114 0 : }
2115 :
2116 : static void
2117 0 : print_test_list(void)
2118 : {
2119 0 : printf("cancel\n");
2120 0 : printf("disallowed_in_pipeline\n");
2121 0 : printf("multi_pipelines\n");
2122 0 : printf("nosync\n");
2123 0 : printf("pipeline_abort\n");
2124 0 : printf("pipeline_idle\n");
2125 0 : printf("pipelined_insert\n");
2126 0 : printf("prepared\n");
2127 0 : printf("protocol_version\n");
2128 0 : printf("simple_pipeline\n");
2129 0 : printf("singlerow\n");
2130 0 : printf("transaction\n");
2131 0 : printf("uniqviol\n");
2132 0 : }
2133 :
2134 : int
2135 0 : main(int argc, char **argv)
2136 : {
2137 0 : const char *conninfo = "";
2138 0 : PGconn *conn;
2139 0 : FILE *trace = NULL;
2140 0 : char *testname;
2141 0 : int numrows = 10000;
2142 0 : PGresult *res;
2143 0 : int c;
2144 :
2145 0 : while ((c = getopt(argc, argv, "r:t:")) != -1)
2146 : {
2147 0 : switch (c)
2148 : {
2149 : case 'r': /* numrows */
2150 0 : errno = 0;
2151 0 : numrows = strtol(optarg, NULL, 10);
2152 0 : if (errno != 0 || numrows <= 0)
2153 : {
2154 0 : fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
2155 0 : optarg);
2156 0 : exit(1);
2157 : }
2158 0 : break;
2159 : case 't': /* trace file */
2160 0 : tracefile = pg_strdup(optarg);
2161 0 : break;
2162 : }
2163 : }
2164 :
2165 0 : if (optind < argc)
2166 : {
2167 0 : testname = pg_strdup(argv[optind]);
2168 0 : optind++;
2169 0 : }
2170 : else
2171 : {
2172 0 : usage(argv[0]);
2173 0 : exit(1);
2174 : }
2175 :
2176 0 : if (strcmp(testname, "tests") == 0)
2177 : {
2178 0 : print_test_list();
2179 0 : exit(0);
2180 : }
2181 :
2182 0 : if (optind < argc)
2183 : {
2184 0 : conninfo = pg_strdup(argv[optind]);
2185 0 : optind++;
2186 0 : }
2187 :
2188 : /* Make a connection to the database */
2189 0 : conn = PQconnectdb(conninfo);
2190 0 : if (PQstatus(conn) != CONNECTION_OK)
2191 : {
2192 0 : fprintf(stderr, "Connection to database failed: %s\n",
2193 0 : PQerrorMessage(conn));
2194 0 : exit_nicely(conn);
2195 0 : }
2196 :
2197 0 : res = PQexec(conn, "SET lc_messages TO \"C\"");
2198 0 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2199 0 : pg_fatal("failed to set \"lc_messages\": %s", PQerrorMessage(conn));
2200 0 : PQclear(res);
2201 0 : res = PQexec(conn, "SET debug_parallel_query = off");
2202 0 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2203 0 : pg_fatal("failed to set \"debug_parallel_query\": %s", PQerrorMessage(conn));
2204 0 : PQclear(res);
2205 :
2206 : /* Set the trace file, if requested */
2207 0 : if (tracefile != NULL)
2208 : {
2209 0 : if (strcmp(tracefile, "-") == 0)
2210 0 : trace = stdout;
2211 : else
2212 0 : trace = fopen(tracefile, "w");
2213 0 : if (trace == NULL)
2214 0 : pg_fatal("could not open file \"%s\": %m", tracefile);
2215 :
2216 : /* Make it line-buffered */
2217 0 : setvbuf(trace, NULL, PG_IOLBF, 0);
2218 :
2219 0 : PQtrace(conn, trace);
2220 0 : PQsetTraceFlags(conn,
2221 : PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
2222 0 : }
2223 :
2224 0 : if (strcmp(testname, "cancel") == 0)
2225 0 : test_cancel(conn);
2226 0 : else if (strcmp(testname, "disallowed_in_pipeline") == 0)
2227 0 : test_disallowed_in_pipeline(conn);
2228 0 : else if (strcmp(testname, "multi_pipelines") == 0)
2229 0 : test_multi_pipelines(conn);
2230 0 : else if (strcmp(testname, "nosync") == 0)
2231 0 : test_nosync(conn);
2232 0 : else if (strcmp(testname, "pipeline_abort") == 0)
2233 0 : test_pipeline_abort(conn);
2234 0 : else if (strcmp(testname, "pipeline_idle") == 0)
2235 0 : test_pipeline_idle(conn);
2236 0 : else if (strcmp(testname, "pipelined_insert") == 0)
2237 0 : test_pipelined_insert(conn, numrows);
2238 0 : else if (strcmp(testname, "prepared") == 0)
2239 0 : test_prepared(conn);
2240 0 : else if (strcmp(testname, "protocol_version") == 0)
2241 0 : test_protocol_version(conn);
2242 0 : else if (strcmp(testname, "simple_pipeline") == 0)
2243 0 : test_simple_pipeline(conn);
2244 0 : else if (strcmp(testname, "singlerow") == 0)
2245 0 : test_singlerowmode(conn);
2246 0 : else if (strcmp(testname, "transaction") == 0)
2247 0 : test_transaction(conn);
2248 0 : else if (strcmp(testname, "uniqviol") == 0)
2249 0 : test_uniqviol(conn);
2250 : else
2251 : {
2252 0 : fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
2253 0 : exit(1);
2254 : }
2255 :
2256 : /* close the connection to the database and cleanup */
2257 0 : PQfinish(conn);
2258 :
2259 0 : if (trace && trace != stdout)
2260 0 : fclose(trace);
2261 :
2262 0 : return 0;
2263 0 : }
|