LCOV - code coverage report
Current view: top level - src/test/modules/libpq_pipeline - libpq_pipeline.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 1244 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 27 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * libpq_pipeline.c
       4              :  *              Verify libpq pipeline execution functionality
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *              src/test/modules/libpq_pipeline/libpq_pipeline.c
      12              :  *
      13              :  *-------------------------------------------------------------------------
      14              :  */
      15              : 
      16              : #include "postgres_fe.h"
      17              : 
      18              : #include <sys/select.h>
      19              : #include <sys/time.h>
      20              : 
      21              : #include "catalog/pg_type_d.h"
      22              : #include "libpq-fe.h"
      23              : #include "pg_getopt.h"
      24              : 
      25              : 
      26              : static void exit_nicely(PGconn *conn);
      27              : pg_noreturn static void pg_fatal_impl(int line, const char *fmt,...)
      28              :                         pg_attribute_printf(2, 3);
      29              : static bool process_result(PGconn *conn, PGresult *res, int results,
      30              :                                                    int numsent);
      31              : 
      32              : static const char *const progname = "libpq_pipeline";
      33              : 
      34              : /* Options and defaults */
      35              : static char *tracefile = NULL;  /* path to PQtrace() file */
      36              : 
      37              : 
      38              : #ifdef DEBUG_OUTPUT
      39              : #define pg_debug(...)  do { fprintf(stderr, __VA_ARGS__); } while (0)
      40              : #else
      41              : #define pg_debug(...)
      42              : #endif
      43              : 
      44              : static const char *const drop_table_sql =
      45              : "DROP TABLE IF EXISTS pq_pipeline_demo";
      46              : static const char *const create_table_sql =
      47              : "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
      48              : "int8filler int8);";
      49              : static const char *const insert_sql =
      50              : "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
      51              : static const char *const insert_sql2 =
      52              : "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
      53              : 
      /*
       * Buffer sizes for the decimal text form of an int32 / int64: the maximum
       * number of digits, plus one byte for a sign and one for the null
       * terminator.
       *
       *   int32: 10 digits + sign + terminator = 12
       *   int64: 19 digits + sign + terminator = 21
       */
      #define MAXINTLEN 12
      #define MAXINT8LEN 21
      57              : 
      58              : static void
      59            0 : exit_nicely(PGconn *conn)
      60              : {
      61            0 :         PQfinish(conn);
      62            0 :         exit(1);
      63              : }
      64              : 
      65              : /*
      66              :  * The following few functions are wrapped in macros to make the reported line
      67              :  * number in an error match the line number of the invocation.
      68              :  */
      69              : 
      70              : /*
      71              :  * Print an error to stderr and terminate the program.
      72              :  */
      73              : #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
      74              : pg_noreturn static void
      75            0 : pg_fatal_impl(int line, const char *fmt,...)
      76              : {
      77            0 :         va_list         args;
      78              : 
      79            0 :         fflush(stdout);
      80              : 
      81            0 :         fprintf(stderr, "\n%s:%d: ", progname, line);
      82            0 :         va_start(args, fmt);
      83            0 :         vfprintf(stderr, fmt, args);
      84            0 :         va_end(args);
      85            0 :         Assert(fmt[strlen(fmt) - 1] != '\n');
      86            0 :         fprintf(stderr, "\n");
      87            0 :         exit(1);
      88              : }
      89              : 
      90              : /*
      91              :  * Check that libpq next returns a PGresult with the specified status,
      92              :  * returning the PGresult so that caller can perform additional checks.
      93              :  */
      94              : #define confirm_result_status(conn, status) confirm_result_status_impl(__LINE__, conn, status)
      95              : static PGresult *
      96            0 : confirm_result_status_impl(int line, PGconn *conn, ExecStatusType status)
      97              : {
      98            0 :         PGresult   *res;
      99              : 
     100            0 :         res = PQgetResult(conn);
     101            0 :         if (res == NULL)
     102            0 :                 pg_fatal_impl(line, "PQgetResult returned null unexpectedly: %s",
     103            0 :                                           PQerrorMessage(conn));
     104            0 :         if (PQresultStatus(res) != status)
     105            0 :                 pg_fatal_impl(line, "PQgetResult returned status %s, expected %s: %s",
     106            0 :                                           PQresStatus(PQresultStatus(res)),
     107            0 :                                           PQresStatus(status),
     108            0 :                                           PQerrorMessage(conn));
     109            0 :         return res;
     110            0 : }
     111              : 
     112              : /*
     113              :  * Check that libpq next returns a PGresult with the specified status,
     114              :  * then free the PGresult.
     115              :  */
     116              : #define consume_result_status(conn, status) consume_result_status_impl(__LINE__, conn, status)
     117              : static void
     118            0 : consume_result_status_impl(int line, PGconn *conn, ExecStatusType status)
     119              : {
     120            0 :         PGresult   *res;
     121              : 
     122            0 :         res = confirm_result_status_impl(line, conn, status);
     123            0 :         PQclear(res);
     124            0 : }
     125              : 
     126              : /*
     127              :  * Check that libpq next returns a null PGresult.
     128              :  */
     129              : #define consume_null_result(conn) consume_null_result_impl(__LINE__, conn)
     130              : static void
     131            0 : consume_null_result_impl(int line, PGconn *conn)
     132              : {
     133            0 :         PGresult   *res;
     134              : 
     135            0 :         res = PQgetResult(conn);
     136            0 :         if (res != NULL)
     137            0 :                 pg_fatal_impl(line, "expected NULL PGresult, got %s: %s",
     138            0 :                                           PQresStatus(PQresultStatus(res)),
     139            0 :                                           PQerrorMessage(conn));
     140            0 : }
     141              : 
     142              : /*
     143              :  * Check that the query on the given connection got canceled.
     144              :  */
     145              : #define consume_query_cancel(conn) consume_query_cancel_impl(__LINE__, conn)
     146              : static void
     147            0 : consume_query_cancel_impl(int line, PGconn *conn)
     148              : {
     149            0 :         PGresult   *res;
     150              : 
     151            0 :         res = confirm_result_status_impl(line, conn, PGRES_FATAL_ERROR);
     152            0 :         if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
     153            0 :                 pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
     154            0 :                                           PQerrorMessage(conn));
     155            0 :         PQclear(res);
     156              : 
     157            0 :         while (PQisBusy(conn))
     158            0 :                 PQconsumeInput(conn);
     159            0 : }
     160              : 
     161              : /*
     162              :  * Using monitorConn, query pg_stat_activity to see that the connection with
     163              :  * the given PID is either in the given state, or waiting on the given event
     164              :  * (only one of them can be given).
     165              :  */
     166              : static void
     167            0 : wait_for_connection_state(int line, PGconn *monitorConn, int procpid,
     168              :                                                   char *state, char *event)
     169              : {
     170            0 :         const Oid       paramTypes[] = {INT4OID, TEXTOID};
     171            0 :         const char *paramValues[2];
     172            0 :         char       *pidstr = psprintf("%d", procpid);
     173              : 
     174            0 :         Assert((state == NULL) ^ (event == NULL));
     175              : 
     176            0 :         paramValues[0] = pidstr;
     177            0 :         paramValues[1] = state ? state : event;
     178              : 
     179            0 :         while (true)
     180              :         {
     181            0 :                 PGresult   *res;
     182            0 :                 char       *value;
     183              : 
     184            0 :                 if (state != NULL)
     185            0 :                         res = PQexecParams(monitorConn,
     186              :                                                            "SELECT count(*) FROM pg_stat_activity WHERE "
     187              :                                                            "pid = $1 AND state = $2",
     188            0 :                                                            2, paramTypes, paramValues, NULL, NULL, 0);
     189              :                 else
     190            0 :                         res = PQexecParams(monitorConn,
     191              :                                                            "SELECT count(*) FROM pg_stat_activity WHERE "
     192              :                                                            "pid = $1 AND wait_event = $2",
     193            0 :                                                            2, paramTypes, paramValues, NULL, NULL, 0);
     194              : 
     195            0 :                 if (PQresultStatus(res) != PGRES_TUPLES_OK)
     196            0 :                         pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
     197            0 :                 if (PQntuples(res) != 1)
     198            0 :                         pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
     199            0 :                 if (PQnfields(res) != 1)
     200            0 :                         pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
     201            0 :                 value = PQgetvalue(res, 0, 0);
     202            0 :                 if (strcmp(value, "0") != 0)
     203              :                 {
     204            0 :                         PQclear(res);
     205            0 :                         break;
     206              :                 }
     207            0 :                 PQclear(res);
     208              : 
     209              :                 /* wait 10ms before polling again */
     210            0 :                 pg_usleep(10000);
     211            0 :         }
     212              : 
     213            0 :         pfree(pidstr);
     214            0 : }
     215              : 
     216              : #define send_cancellable_query(conn, monitorConn) \
     217              :         send_cancellable_query_impl(__LINE__, conn, monitorConn)
     218              : static void
     219            0 : send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
     220              : {
     221            0 :         const char *env_wait;
     222            0 :         const Oid       paramTypes[1] = {INT4OID};
     223              : 
     224              :         /*
     225              :          * Wait for the connection to be idle, so that our check for an active
     226              :          * connection below is reliable, instead of possibly seeing an outdated
     227              :          * state.
     228              :          */
     229            0 :         wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle", NULL);
     230              : 
     231            0 :         env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
     232            0 :         if (env_wait == NULL)
     233            0 :                 env_wait = "180";
     234              : 
     235            0 :         if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
     236            0 :                                                   &env_wait, NULL, NULL, 0) != 1)
     237            0 :                 pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
     238              : 
     239              :         /*
     240              :          * Wait for the sleep to be active, because if the query is not running
     241              :          * yet, the cancel request that we send won't have any effect.
     242              :          */
     243            0 :         wait_for_connection_state(line, monitorConn, PQbackendPID(conn), NULL, "PgSleep");
     244            0 : }
     245              : 
     246              : /*
     247              :  * Create a new connection with the same conninfo as the given one.
     248              :  */
     249              : static PGconn *
     250            0 : copy_connection(PGconn *conn)
     251              : {
     252            0 :         PGconn     *copyConn;
     253            0 :         PQconninfoOption *opts = PQconninfo(conn);
     254            0 :         const char **keywords;
     255            0 :         const char **vals;
     256            0 :         int                     nopts = 0;
     257            0 :         int                     i;
     258              : 
     259            0 :         for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
     260            0 :                 nopts++;
     261            0 :         nopts++;                                        /* for the NULL terminator */
     262              : 
     263            0 :         keywords = pg_malloc(sizeof(char *) * nopts);
     264            0 :         vals = pg_malloc(sizeof(char *) * nopts);
     265              : 
     266            0 :         i = 0;
     267            0 :         for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
     268              :         {
     269            0 :                 if (opt->val)
     270              :                 {
     271            0 :                         keywords[i] = opt->keyword;
     272            0 :                         vals[i] = opt->val;
     273            0 :                         i++;
     274            0 :                 }
     275            0 :         }
     276            0 :         keywords[i] = vals[i] = NULL;
     277              : 
     278            0 :         copyConn = PQconnectdbParams(keywords, vals, false);
     279              : 
     280            0 :         if (PQstatus(copyConn) != CONNECTION_OK)
     281            0 :                 pg_fatal("Connection to database failed: %s",
     282              :                                  PQerrorMessage(copyConn));
     283              : 
     284            0 :         pfree(keywords);
     285            0 :         pfree(vals);
     286            0 :         PQconninfoFree(opts);
     287              : 
     288            0 :         return copyConn;
     289            0 : }
     290              : 
     291              : /*
     292              :  * Test query cancellation routines
     293              :  */
     294              : static void
     295            0 : test_cancel(PGconn *conn)
     296              : {
     297            0 :         PGcancel   *cancel;
     298            0 :         PGcancelConn *cancelConn;
     299            0 :         PGconn     *monitorConn;
     300            0 :         char            errorbuf[256];
     301              : 
     302            0 :         fprintf(stderr, "test cancellations... ");
     303              : 
     304            0 :         if (PQsetnonblocking(conn, 1) != 0)
     305            0 :                 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
     306              : 
     307              :         /*
     308              :          * Make a separate connection to the database to monitor the query on the
     309              :          * main connection.
     310              :          */
     311            0 :         monitorConn = copy_connection(conn);
     312            0 :         Assert(PQstatus(monitorConn) == CONNECTION_OK);
     313              : 
     314              :         /* test PQcancel */
     315            0 :         send_cancellable_query(conn, monitorConn);
     316            0 :         cancel = PQgetCancel(conn);
     317            0 :         if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
     318            0 :                 pg_fatal("failed to run PQcancel: %s", errorbuf);
     319            0 :         consume_query_cancel(conn);
     320              : 
     321              :         /* PGcancel object can be reused for the next query */
     322            0 :         send_cancellable_query(conn, monitorConn);
     323            0 :         if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
     324            0 :                 pg_fatal("failed to run PQcancel: %s", errorbuf);
     325            0 :         consume_query_cancel(conn);
     326              : 
     327            0 :         PQfreeCancel(cancel);
     328              : 
     329              :         /* test PQrequestCancel */
     330            0 :         send_cancellable_query(conn, monitorConn);
     331            0 :         if (!PQrequestCancel(conn))
     332            0 :                 pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
     333            0 :         consume_query_cancel(conn);
     334              : 
     335              :         /* test PQcancelBlocking */
     336            0 :         send_cancellable_query(conn, monitorConn);
     337            0 :         cancelConn = PQcancelCreate(conn);
     338            0 :         if (!PQcancelBlocking(cancelConn))
     339            0 :                 pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
     340            0 :         consume_query_cancel(conn);
     341            0 :         PQcancelFinish(cancelConn);
     342              : 
     343              :         /* test PQcancelCreate and then polling with PQcancelPoll */
     344            0 :         send_cancellable_query(conn, monitorConn);
     345            0 :         cancelConn = PQcancelCreate(conn);
     346            0 :         if (!PQcancelStart(cancelConn))
     347            0 :                 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
     348            0 :         while (true)
     349              :         {
     350            0 :                 struct timeval tv;
     351            0 :                 fd_set          input_mask;
     352            0 :                 fd_set          output_mask;
     353            0 :                 PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
     354            0 :                 int                     sock = PQcancelSocket(cancelConn);
     355              : 
     356            0 :                 if (pollres == PGRES_POLLING_OK)
     357            0 :                         break;
     358              : 
     359            0 :                 FD_ZERO(&input_mask);
     360            0 :                 FD_ZERO(&output_mask);
     361            0 :                 switch (pollres)
     362              :                 {
     363              :                         case PGRES_POLLING_READING:
     364              :                                 pg_debug("polling for reads\n");
     365            0 :                                 FD_SET(sock, &input_mask);
     366            0 :                                 break;
     367              :                         case PGRES_POLLING_WRITING:
     368              :                                 pg_debug("polling for writes\n");
     369            0 :                                 FD_SET(sock, &output_mask);
     370            0 :                                 break;
     371              :                         default:
     372            0 :                                 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
     373              :                 }
     374              : 
     375            0 :                 if (sock < 0)
     376            0 :                         pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
     377              : 
     378            0 :                 tv.tv_sec = 3;
     379            0 :                 tv.tv_usec = 0;
     380              : 
     381            0 :                 while (true)
     382              :                 {
     383            0 :                         if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
     384              :                         {
     385            0 :                                 if (errno == EINTR)
     386            0 :                                         continue;
     387            0 :                                 pg_fatal("select() failed: %m");
     388              :                         }
     389            0 :                         break;
     390              :                 }
     391            0 :         }
     392            0 :         if (PQcancelStatus(cancelConn) != CONNECTION_OK)
     393            0 :                 pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
     394            0 :         consume_query_cancel(conn);
     395              : 
     396              :         /*
     397              :          * test PQcancelReset works on the cancel connection and it can be reused
     398              :          * afterwards
     399              :          */
     400            0 :         PQcancelReset(cancelConn);
     401              : 
     402            0 :         send_cancellable_query(conn, monitorConn);
     403            0 :         if (!PQcancelStart(cancelConn))
     404            0 :                 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
     405            0 :         while (true)
     406              :         {
     407            0 :                 struct timeval tv;
     408            0 :                 fd_set          input_mask;
     409            0 :                 fd_set          output_mask;
     410            0 :                 PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
     411            0 :                 int                     sock = PQcancelSocket(cancelConn);
     412              : 
     413            0 :                 if (pollres == PGRES_POLLING_OK)
     414            0 :                         break;
     415              : 
     416            0 :                 FD_ZERO(&input_mask);
     417            0 :                 FD_ZERO(&output_mask);
     418            0 :                 switch (pollres)
     419              :                 {
     420              :                         case PGRES_POLLING_READING:
     421              :                                 pg_debug("polling for reads\n");
     422            0 :                                 FD_SET(sock, &input_mask);
     423            0 :                                 break;
     424              :                         case PGRES_POLLING_WRITING:
     425              :                                 pg_debug("polling for writes\n");
     426            0 :                                 FD_SET(sock, &output_mask);
     427            0 :                                 break;
     428              :                         default:
     429            0 :                                 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
     430              :                 }
     431              : 
     432            0 :                 if (sock < 0)
     433            0 :                         pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
     434              : 
     435            0 :                 tv.tv_sec = 3;
     436            0 :                 tv.tv_usec = 0;
     437              : 
     438            0 :                 while (true)
     439              :                 {
     440            0 :                         if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
     441              :                         {
     442            0 :                                 if (errno == EINTR)
     443            0 :                                         continue;
     444            0 :                                 pg_fatal("select() failed: %m");
     445              :                         }
     446            0 :                         break;
     447              :                 }
     448            0 :         }
     449            0 :         if (PQcancelStatus(cancelConn) != CONNECTION_OK)
     450            0 :                 pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
     451            0 :         consume_query_cancel(conn);
     452              : 
     453            0 :         PQcancelFinish(cancelConn);
     454            0 :         PQfinish(monitorConn);
     455              : 
     456            0 :         fprintf(stderr, "ok\n");
     457            0 : }
     458              : 
     459              : static void
     460            0 : test_disallowed_in_pipeline(PGconn *conn)
     461              : {
     462            0 :         PGresult   *res = NULL;
     463              : 
     464            0 :         fprintf(stderr, "test error cases... ");
     465              : 
     466            0 :         if (PQisnonblocking(conn))
     467            0 :                 pg_fatal("Expected blocking connection mode");
     468              : 
     469            0 :         if (PQenterPipelineMode(conn) != 1)
     470            0 :                 pg_fatal("Unable to enter pipeline mode");
     471              : 
     472            0 :         if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     473            0 :                 pg_fatal("Pipeline mode not activated properly");
     474              : 
     475              :         /* PQexec should fail in pipeline mode */
     476            0 :         res = PQexec(conn, "SELECT 1");
     477            0 :         if (PQresultStatus(res) != PGRES_FATAL_ERROR)
     478            0 :                 pg_fatal("PQexec should fail in pipeline mode but succeeded");
     479            0 :         if (strcmp(PQerrorMessage(conn),
     480            0 :                            "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
     481            0 :                 pg_fatal("did not get expected error message; got: \"%s\"",
     482              :                                  PQerrorMessage(conn));
     483            0 :         PQclear(res);
     484              : 
     485              :         /* PQsendQuery should fail in pipeline mode */
     486            0 :         if (PQsendQuery(conn, "SELECT 1") != 0)
     487            0 :                 pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
     488            0 :         if (strcmp(PQerrorMessage(conn),
     489            0 :                            "PQsendQuery not allowed in pipeline mode\n") != 0)
     490            0 :                 pg_fatal("did not get expected error message; got: \"%s\"",
     491              :                                  PQerrorMessage(conn));
     492              : 
     493              :         /* Entering pipeline mode when already in pipeline mode is OK */
     494            0 :         if (PQenterPipelineMode(conn) != 1)
     495            0 :                 pg_fatal("re-entering pipeline mode should be a no-op but failed");
     496              : 
     497            0 :         if (PQisBusy(conn) != 0)
     498            0 :                 pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
     499              : 
     500              :         /* ok, back to normal command mode */
     501            0 :         if (PQexitPipelineMode(conn) != 1)
     502            0 :                 pg_fatal("couldn't exit idle empty pipeline mode");
     503              : 
     504            0 :         if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     505            0 :                 pg_fatal("Pipeline mode not terminated properly");
     506              : 
     507              :         /* exiting pipeline mode when not in pipeline mode should be a no-op */
     508            0 :         if (PQexitPipelineMode(conn) != 1)
     509            0 :                 pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
     510              : 
     511              :         /* can now PQexec again */
     512            0 :         res = PQexec(conn, "SELECT 1");
     513            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     514            0 :                 pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
     515              :                                  PQerrorMessage(conn));
     516            0 :         PQclear(res);
     517              : 
     518            0 :         fprintf(stderr, "ok\n");
     519            0 : }
     520              : 
     521              : static void
     522            0 : test_multi_pipelines(PGconn *conn)
     523              : {
     524            0 :         const char *dummy_params[1] = {"1"};
     525            0 :         Oid                     dummy_param_oids[1] = {INT4OID};
     526              : 
     527            0 :         fprintf(stderr, "multi pipeline... ");
     528              : 
     529              :         /*
     530              :          * Queue up a couple of small pipelines and process each without returning
     531              :          * to command mode first.
     532              :          */
     533            0 :         if (PQenterPipelineMode(conn) != 1)
     534            0 :                 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     535              : 
     536              :         /* first pipeline */
     537            0 :         if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
     538            0 :                                                   dummy_params, NULL, NULL, 0) != 1)
     539            0 :                 pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
     540              : 
     541            0 :         if (PQpipelineSync(conn) != 1)
     542            0 :                 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
     543              : 
     544              :         /* second pipeline */
     545            0 :         if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
     546            0 :                                                   dummy_params, NULL, NULL, 0) != 1)
     547            0 :                 pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
     548              : 
     549              :         /* Skip flushing once. */
     550            0 :         if (PQsendPipelineSync(conn) != 1)
     551            0 :                 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
     552              : 
     553              :         /* third pipeline */
     554            0 :         if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
     555            0 :                                                   dummy_params, NULL, NULL, 0) != 1)
     556            0 :                 pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
     557              : 
     558            0 :         if (PQpipelineSync(conn) != 1)
     559            0 :                 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     560              : 
     561              :         /* OK, start processing the results */
     562              : 
     563              :         /* first pipeline */
     564            0 :         consume_result_status(conn, PGRES_TUPLES_OK);
     565              : 
     566            0 :         consume_null_result(conn);
     567              : 
     568            0 :         if (PQexitPipelineMode(conn) != 0)
     569            0 :                 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
     570              : 
     571            0 :         consume_result_status(conn, PGRES_PIPELINE_SYNC);
     572              : 
     573              :         /* second pipeline */
     574            0 :         consume_result_status(conn, PGRES_TUPLES_OK);
     575              : 
     576            0 :         consume_null_result(conn);
     577              : 
     578            0 :         if (PQexitPipelineMode(conn) != 0)
     579            0 :                 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
     580              : 
     581            0 :         consume_result_status(conn, PGRES_PIPELINE_SYNC);
     582              : 
     583              :         /* third pipeline */
     584            0 :         consume_result_status(conn, PGRES_TUPLES_OK);
     585              : 
     586            0 :         consume_null_result(conn);
     587              : 
     588            0 :         consume_result_status(conn, PGRES_PIPELINE_SYNC);
     589              : 
     590              :         /* We're still in pipeline mode ... */
     591            0 :         if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     592            0 :                 pg_fatal("Fell out of pipeline mode somehow");
     593              : 
     594              :         /* until we end it, which we can safely do now */
     595            0 :         if (PQexitPipelineMode(conn) != 1)
     596            0 :                 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
     597              :                                  PQerrorMessage(conn));
     598              : 
     599            0 :         if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     600            0 :                 pg_fatal("exiting pipeline mode didn't seem to work");
     601              : 
     602            0 :         fprintf(stderr, "ok\n");
     603            0 : }
     604              : 
     605              : /*
     606              :  * Test behavior when a pipeline dispatches a number of commands that are
     607              :  * not flushed by a sync point.
     608              :  */
     609              : static void
     610            0 : test_nosync(PGconn *conn)
     611              : {
     612            0 :         int                     numqueries = 10;
     613            0 :         int                     results = 0;
     614            0 :         int                     sock = PQsocket(conn);
     615              : 
     616            0 :         fprintf(stderr, "nosync... ");
     617              : 
     618            0 :         if (sock < 0)
     619            0 :                 pg_fatal("invalid socket");
     620              : 
     621            0 :         if (PQenterPipelineMode(conn) != 1)
     622            0 :                 pg_fatal("could not enter pipeline mode");
     623            0 :         for (int i = 0; i < numqueries; i++)
     624              :         {
     625            0 :                 fd_set          input_mask;
     626            0 :                 struct timeval tv;
     627              : 
     628            0 :                 if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
     629            0 :                                                           0, NULL, NULL, NULL, NULL, 0) != 1)
     630            0 :                         pg_fatal("error sending select: %s", PQerrorMessage(conn));
     631            0 :                 PQflush(conn);
     632              : 
     633              :                 /*
     634              :                  * If the server has written anything to us, read (some of) it now.
     635              :                  */
     636            0 :                 FD_ZERO(&input_mask);
     637            0 :                 FD_SET(sock, &input_mask);
     638            0 :                 tv.tv_sec = 0;
     639            0 :                 tv.tv_usec = 0;
     640            0 :                 if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
     641              :                 {
     642            0 :                         fprintf(stderr, "select() failed: %m\n");
     643            0 :                         exit_nicely(conn);
     644            0 :                 }
     645            0 :                 if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
     646            0 :                         pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
     647            0 :         }
     648              : 
     649              :         /* tell server to flush its output buffer */
     650            0 :         if (PQsendFlushRequest(conn) != 1)
     651            0 :                 pg_fatal("failed to send flush request");
     652            0 :         PQflush(conn);
     653              : 
     654              :         /* Now read all results */
     655            0 :         for (;;)
     656              :         {
     657              :                 /* We expect exactly one TUPLES_OK result for each query we sent */
     658            0 :                 consume_result_status(conn, PGRES_TUPLES_OK);
     659              : 
     660              :                 /* and one NULL result should follow each */
     661            0 :                 consume_null_result(conn);
     662              : 
     663            0 :                 results++;
     664              : 
     665              :                 /* if we're done, we're done */
     666            0 :                 if (results == numqueries)
     667            0 :                         break;
     668              :         }
     669              : 
     670            0 :         fprintf(stderr, "ok\n");
     671            0 : }
     672              : 
     673              : /*
     674              :  * When an operation in a pipeline fails the rest of the pipeline is flushed. We
     675              :  * still have to get results for each pipeline item, but the item will just be
     676              :  * a PGRES_PIPELINE_ABORTED code.
     677              :  *
     678              :  * This intentionally doesn't use a transaction to wrap the pipeline. You should
     679              :  * usually use an xact, but in this case we want to observe the effects of each
     680              :  * statement.
     681              :  */
     682              : static void
     683            0 : test_pipeline_abort(PGconn *conn)
     684              : {
     685            0 :         PGresult   *res = NULL;
     686            0 :         const char *dummy_params[1] = {"1"};
     687            0 :         Oid                     dummy_param_oids[1] = {INT4OID};
     688            0 :         int                     i;
     689            0 :         int                     gotrows;
     690            0 :         bool            goterror;
     691              : 
     692            0 :         fprintf(stderr, "aborted pipeline... ");
     693              : 
     694            0 :         res = PQexec(conn, drop_table_sql);
     695            0 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
     696            0 :                 pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
     697            0 :         PQclear(res);
     698              : 
     699            0 :         res = PQexec(conn, create_table_sql);
     700            0 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
     701            0 :                 pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
     702            0 :         PQclear(res);
     703              : 
     704              :         /*
     705              :          * Queue up a couple of small pipelines and process each without returning
     706              :          * to command mode first. Make sure the second operation in the first
     707              :          * pipeline ERRORs.
     708              :          */
     709            0 :         if (PQenterPipelineMode(conn) != 1)
     710            0 :                 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     711              : 
     712            0 :         dummy_params[0] = "1";
     713            0 :         if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     714            0 :                                                   dummy_params, NULL, NULL, 0) != 1)
     715            0 :                 pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
     716              : 
     717            0 :         if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
     718            0 :                                                   1, dummy_param_oids, dummy_params,
     719            0 :                                                   NULL, NULL, 0) != 1)
     720            0 :                 pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
     721              : 
     722            0 :         dummy_params[0] = "2";
     723            0 :         if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     724            0 :                                                   dummy_params, NULL, NULL, 0) != 1)
     725            0 :                 pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
     726              : 
     727            0 :         if (PQpipelineSync(conn) != 1)
     728            0 :                 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     729              : 
     730            0 :         dummy_params[0] = "3";
     731            0 :         if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     732            0 :                                                   dummy_params, NULL, NULL, 0) != 1)
     733            0 :                 pg_fatal("dispatching second-pipeline insert failed: %s",
     734              :                                  PQerrorMessage(conn));
     735              : 
     736            0 :         if (PQpipelineSync(conn) != 1)
     737            0 :                 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     738              : 
     739              :         /*
     740              :          * OK, start processing the pipeline results.
     741              :          *
     742              :          * We should get a command-ok for the first query, then a fatal error for
     743              :          * the failing select, a pipeline-aborted result for the second insert, a
     744              :          * pipeline sync, then a command-ok and a sync for the second pipeline.
     745              :          */
     746            0 :         consume_result_status(conn, PGRES_COMMAND_OK);
     747              : 
     748              :         /* NULL result to signal end-of-results for this command */
     749            0 :         consume_null_result(conn);
     750              : 
     751              :         /* Second query caused error, so we expect an error next */
     752            0 :         consume_result_status(conn, PGRES_FATAL_ERROR);
     753              : 
     754              :         /* NULL result to signal end-of-results for this command */
     755            0 :         consume_null_result(conn);
     756              : 
     757              :         /*
     758              :          * pipeline should now be aborted.
     759              :          *
     760              :          * Note that we could still queue more queries at this point if we wanted;
     761              :          * they'd get added to a new third pipeline since we've already sent a
     762              :          * second. The aborted flag relates only to the pipeline being received.
     763              :          */
     764            0 :         if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
     765            0 :                 pg_fatal("pipeline should be flagged as aborted but isn't");
     766              : 
     767              :         /* third query in pipeline, the second insert */
     768            0 :         consume_result_status(conn, PGRES_PIPELINE_ABORTED);
     769              : 
     770              :         /* NULL result to signal end-of-results for this command */
     771            0 :         consume_null_result(conn);
     772              : 
     773            0 :         if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
     774            0 :                 pg_fatal("pipeline should be flagged as aborted but isn't");
     775              : 
     776              :         /* Ensure we're still in pipeline */
     777            0 :         if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     778            0 :                 pg_fatal("Fell out of pipeline mode somehow");
     779              : 
     780              :         /*
     781              :          * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
     782              :          *
     783              :          * (This is so clients know to start processing results normally again and
     784              :          * can tell the difference between skipped commands and the sync.)
     785              :          */
     786            0 :         consume_result_status(conn, PGRES_PIPELINE_SYNC);
     787              : 
     788            0 :         if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
     789            0 :                 pg_fatal("sync should've cleared the aborted flag but didn't");
     790              : 
     791              :         /* We're still in pipeline mode... */
     792            0 :         if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     793            0 :                 pg_fatal("Fell out of pipeline mode somehow");
     794              : 
     795              :         /* the insert from the second pipeline */
     796            0 :         consume_result_status(conn, PGRES_COMMAND_OK);
     797              : 
     798              :         /* Read the NULL result at the end of the command */
     799            0 :         consume_null_result(conn);
     800              : 
     801              :         /* the second pipeline sync */
     802            0 :         consume_result_status(conn, PGRES_PIPELINE_SYNC);
     803              : 
     804              :         /* Read the NULL result at the end of the command */
     805            0 :         consume_null_result(conn);
     806              : 
     807              :         /* Try to send two queries in one command */
     808            0 :         if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
     809            0 :                 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
     810            0 :         if (PQpipelineSync(conn) != 1)
     811            0 :                 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     812            0 :         goterror = false;
     813            0 :         while ((res = PQgetResult(conn)) != NULL)
     814              :         {
     815            0 :                 switch (PQresultStatus(res))
     816              :                 {
     817              :                         case PGRES_FATAL_ERROR:
     818            0 :                                 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
     819            0 :                                         pg_fatal("expected error about multiple commands, got %s",
     820              :                                                          PQerrorMessage(conn));
     821            0 :                                 printf("got expected %s", PQerrorMessage(conn));
     822            0 :                                 goterror = true;
     823            0 :                                 break;
     824              :                         default:
     825            0 :                                 pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
     826              :                                 break;
     827              :                 }
     828            0 :                 PQclear(res);
     829              :         }
     830            0 :         if (!goterror)
     831            0 :                 pg_fatal("did not get cannot-insert-multiple-commands error");
     832              : 
     833              :         /* the sync that ended the multiple-command pipeline */
     834            0 :         consume_result_status(conn, PGRES_PIPELINE_SYNC);
     835              : 
     836            0 :         fprintf(stderr, "ok\n");
     837              : 
     838              :         /* Test single-row mode with an error partway */
     839            0 :         if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
     840            0 :                                                   0, NULL, NULL, NULL, NULL, 0) != 1)
     841            0 :                 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
     842            0 :         if (PQpipelineSync(conn) != 1)
     843            0 :                 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     844            0 :         PQsetSingleRowMode(conn);
     845            0 :         goterror = false;
     846            0 :         gotrows = 0;
     847            0 :         while ((res = PQgetResult(conn)) != NULL)
     848              :         {
     849            0 :                 switch (PQresultStatus(res))
     850              :                 {
     851              :                         case PGRES_SINGLE_TUPLE:
     852            0 :                                 printf("got row: %s\n", PQgetvalue(res, 0, 0));
     853            0 :                                 gotrows++;
     854            0 :                                 break;
     855              :                         case PGRES_FATAL_ERROR:
     856            0 :                                 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
     857            0 :                                         pg_fatal("expected division-by-zero, got: %s (%s)",
     858              :                                                          PQerrorMessage(conn),
     859              :                                                          PQresultErrorField(res, PG_DIAG_SQLSTATE));
     860            0 :                                 printf("got expected division-by-zero\n");
     861            0 :                                 goterror = true;
     862            0 :                                 break;
     863              :                         default:
     864            0 :                                 pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
     865              :                 }
     866            0 :                 PQclear(res);
     867              :         }
     868            0 :         if (!goterror)
     869            0 :                 pg_fatal("did not get division-by-zero error");
     870            0 :         if (gotrows != 3)
     871            0 :                 pg_fatal("did not get three rows");
     872              : 
     873              :         /* the sync that ended the single-row-mode pipeline */
     874            0 :         consume_result_status(conn, PGRES_PIPELINE_SYNC);
     875              : 
     876              :         /* We're still in pipeline mode... */
     877            0 :         if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     878            0 :                 pg_fatal("Fell out of pipeline mode somehow");
     879              : 
     880              :         /* until we end it, which we can safely do now */
     881            0 :         if (PQexitPipelineMode(conn) != 1)
     882            0 :                 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
     883              :                                  PQerrorMessage(conn));
     884              : 
     885            0 :         if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     886            0 :                 pg_fatal("exiting pipeline mode didn't seem to work");
     887              : 
     888              :         /*-
     889              :          * Since we fired the pipelines off without a surrounding xact, the results
     890              :          * should be:
     891              :          *
     892              :          * - Implicit xact started by server around 1st pipeline
     893              :          * - First insert applied
     894              :          * - Second statement aborted xact
     895              :          * - Third insert skipped
     896              :          * - Sync rolled back first implicit xact
     897              :          * - Implicit xact created by server around 2nd pipeline
     898              :          * - insert applied from 2nd pipeline
     899              :          * - Sync commits 2nd xact
     900              :          *
     901              :          * So we should only have the value 3 that we inserted.
     902              :          */
     903            0 :         res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
     904              : 
     905            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     906            0 :                 pg_fatal("Expected tuples, got %s: %s",
     907              :                                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
     908            0 :         if (PQntuples(res) != 1)
     909            0 :                 pg_fatal("expected 1 result, got %d", PQntuples(res));
     910            0 :         for (i = 0; i < PQntuples(res); i++)
     911              :         {
     912            0 :                 const char *val = PQgetvalue(res, i, 0);
     913              : 
     914            0 :                 if (strcmp(val, "3") != 0)
     915            0 :                         pg_fatal("expected only insert with value 3, got %s", val);
     916            0 :         }
     917              : 
     918            0 :         PQclear(res);
     919              : 
     920            0 :         fprintf(stderr, "ok\n");
     921            0 : }
     922              : 
     923              : /* State machine enum for test_pipelined_insert */
     924              : enum PipelineInsertStep
     925              : {
     926              :         BI_BEGIN_TX,
     927              :         BI_DROP_TABLE,
     928              :         BI_CREATE_TABLE,
     929              :         BI_PREPARE,
     930              :         BI_INSERT_ROWS,
     931              :         BI_COMMIT_TX,
     932              :         BI_SYNC,
     933              :         BI_DONE,
     934              : };
     935              : 
     936              : static void
     937            0 : test_pipelined_insert(PGconn *conn, int n_rows)
     938              : {
     939            0 :         Oid                     insert_param_oids[2] = {INT4OID, INT8OID};
     940            0 :         const char *insert_params[2];
     941            0 :         char            insert_param_0[MAXINTLEN];
     942            0 :         char            insert_param_1[MAXINT8LEN];
     943            0 :         enum PipelineInsertStep send_step = BI_BEGIN_TX,
     944            0 :                                 recv_step = BI_BEGIN_TX;
     945            0 :         int                     rows_to_send,
     946              :                                 rows_to_receive;
     947              : 
     948            0 :         insert_params[0] = insert_param_0;
     949            0 :         insert_params[1] = insert_param_1;
     950              : 
     951            0 :         rows_to_send = rows_to_receive = n_rows;
     952              : 
     953              :         /*
     954              :          * Do a pipelined insert into a table created at the start of the pipeline
     955              :          */
     956            0 :         if (PQenterPipelineMode(conn) != 1)
     957            0 :                 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     958              : 
     959            0 :         while (send_step != BI_PREPARE)
     960              :         {
     961            0 :                 const char *sql;
     962              : 
     963            0 :                 switch (send_step)
     964              :                 {
     965              :                         case BI_BEGIN_TX:
     966            0 :                                 sql = "BEGIN TRANSACTION";
     967            0 :                                 send_step = BI_DROP_TABLE;
     968            0 :                                 break;
     969              : 
     970              :                         case BI_DROP_TABLE:
     971            0 :                                 sql = drop_table_sql;
     972            0 :                                 send_step = BI_CREATE_TABLE;
     973            0 :                                 break;
     974              : 
     975              :                         case BI_CREATE_TABLE:
     976            0 :                                 sql = create_table_sql;
     977            0 :                                 send_step = BI_PREPARE;
     978            0 :                                 break;
     979              : 
     980              :                         default:
     981            0 :                                 pg_fatal("invalid state");
     982              :                                 sql = NULL;             /* keep compiler quiet */
     983              :                 }
     984              : 
     985              :                 pg_debug("sending: %s\n", sql);
     986            0 :                 if (PQsendQueryParams(conn, sql,
     987            0 :                                                           0, NULL, NULL, NULL, NULL, 0) != 1)
     988            0 :                         pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
     989            0 :         }
     990              : 
     991            0 :         Assert(send_step == BI_PREPARE);
     992              :         pg_debug("sending: %s\n", insert_sql2);
     993            0 :         if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
     994            0 :                 pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
     995            0 :         send_step = BI_INSERT_ROWS;
     996              : 
     997              :         /*
     998              :          * Now we start inserting. We'll be sending enough data that we could fill
     999              :          * our output buffer, so to avoid deadlocking we need to enter nonblocking
    1000              :          * mode and consume input while we send more output. As results of each
    1001              :          * query are processed we should pop them to allow processing of the next
    1002              :          * query. There's no need to finish the pipeline before processing
    1003              :          * results.
    1004              :          */
    1005            0 :         if (PQsetnonblocking(conn, 1) != 0)
    1006            0 :                 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
    1007              : 
    1008            0 :         while (recv_step != BI_DONE)
    1009              :         {
    1010            0 :                 int                     sock;
    1011            0 :                 fd_set          input_mask;
    1012            0 :                 fd_set          output_mask;
    1013              : 
    1014            0 :                 sock = PQsocket(conn);
    1015              : 
    1016            0 :                 if (sock < 0)
    1017            0 :                         break;                          /* shouldn't happen */
    1018              : 
    1019            0 :                 FD_ZERO(&input_mask);
    1020            0 :                 FD_SET(sock, &input_mask);
    1021            0 :                 FD_ZERO(&output_mask);
    1022            0 :                 FD_SET(sock, &output_mask);
    1023              : 
    1024            0 :                 if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
    1025              :                 {
    1026            0 :                         fprintf(stderr, "select() failed: %m\n");
    1027            0 :                         exit_nicely(conn);
    1028            0 :                 }
    1029              : 
    1030              :                 /*
    1031              :                  * Process any results, so we keep the server's output buffer free
    1032              :                  * flowing and it can continue to process input
    1033              :                  */
    1034            0 :                 if (FD_ISSET(sock, &input_mask))
    1035              :                 {
    1036            0 :                         PQconsumeInput(conn);
    1037              : 
    1038              :                         /* Read until we'd block if we tried to read */
    1039            0 :                         while (!PQisBusy(conn) && recv_step < BI_DONE)
    1040              :                         {
    1041            0 :                                 PGresult   *res;
    1042            0 :                                 const char *cmdtag = "";
    1043            0 :                                 const char *description = "";
    1044            0 :                                 int                     status;
    1045              : 
    1046              :                                 /*
    1047              :                                  * Read next result.  If no more results from this query,
    1048              :                                  * advance to the next query
    1049              :                                  */
    1050            0 :                                 res = PQgetResult(conn);
    1051            0 :                                 if (res == NULL)
    1052            0 :                                         continue;
    1053              : 
    1054            0 :                                 status = PGRES_COMMAND_OK;
    1055            0 :                                 switch (recv_step)
    1056              :                                 {
    1057              :                                         case BI_BEGIN_TX:
    1058            0 :                                                 cmdtag = "BEGIN";
    1059            0 :                                                 recv_step++;
    1060            0 :                                                 break;
    1061              :                                         case BI_DROP_TABLE:
    1062            0 :                                                 cmdtag = "DROP TABLE";
    1063            0 :                                                 recv_step++;
    1064            0 :                                                 break;
    1065              :                                         case BI_CREATE_TABLE:
    1066            0 :                                                 cmdtag = "CREATE TABLE";
    1067            0 :                                                 recv_step++;
    1068            0 :                                                 break;
    1069              :                                         case BI_PREPARE:
    1070            0 :                                                 cmdtag = "";
    1071            0 :                                                 description = "PREPARE";
    1072            0 :                                                 recv_step++;
    1073            0 :                                                 break;
    1074              :                                         case BI_INSERT_ROWS:
    1075            0 :                                                 cmdtag = "INSERT";
    1076            0 :                                                 rows_to_receive--;
    1077            0 :                                                 if (rows_to_receive == 0)
    1078            0 :                                                         recv_step++;
    1079            0 :                                                 break;
    1080              :                                         case BI_COMMIT_TX:
    1081            0 :                                                 cmdtag = "COMMIT";
    1082            0 :                                                 recv_step++;
    1083            0 :                                                 break;
    1084              :                                         case BI_SYNC:
    1085            0 :                                                 cmdtag = "";
    1086            0 :                                                 description = "SYNC";
    1087            0 :                                                 status = PGRES_PIPELINE_SYNC;
    1088            0 :                                                 recv_step++;
    1089            0 :                                                 break;
    1090              :                                         case BI_DONE:
    1091              :                                                 /* unreachable */
    1092            0 :                                                 pg_fatal("unreachable state");
    1093              :                                 }
    1094              : 
    1095            0 :                                 if (PQresultStatus(res) != status)
    1096            0 :                                         pg_fatal("%s reported status %s, expected %s\n"
    1097              :                                                          "Error message: \"%s\"",
    1098              :                                                          description, PQresStatus(PQresultStatus(res)),
    1099              :                                                          PQresStatus(status), PQerrorMessage(conn));
    1100              : 
    1101            0 :                                 if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
    1102            0 :                                         pg_fatal("%s expected command tag '%s', got '%s'",
    1103              :                                                          description, cmdtag, PQcmdStatus(res));
    1104              : 
    1105              :                                 pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
    1106              : 
    1107            0 :                                 PQclear(res);
    1108            0 :                         }
    1109            0 :                 }
    1110              : 
    1111              :                 /* Write more rows and/or the end pipeline message, if needed */
    1112            0 :                 if (FD_ISSET(sock, &output_mask))
    1113              :                 {
    1114            0 :                         PQflush(conn);
    1115              : 
    1116            0 :                         if (send_step == BI_INSERT_ROWS)
    1117              :                         {
    1118            0 :                                 snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
    1119              :                                 /* use up some buffer space with a wide value */
    1120            0 :                                 snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
    1121              : 
    1122            0 :                                 if (PQsendQueryPrepared(conn, "my_insert",
    1123            0 :                                                                                 2, insert_params, NULL, NULL, 0) == 1)
    1124              :                                 {
    1125              :                                         pg_debug("sent row %d\n", rows_to_send);
    1126              : 
    1127            0 :                                         rows_to_send--;
    1128            0 :                                         if (rows_to_send == 0)
    1129            0 :                                                 send_step++;
    1130            0 :                                 }
    1131              :                                 else
    1132              :                                 {
    1133              :                                         /*
    1134              :                                          * in nonblocking mode, so it's OK for an insert to fail
    1135              :                                          * to send
    1136              :                                          */
    1137            0 :                                         fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
    1138            0 :                                                         rows_to_send, PQerrorMessage(conn));
    1139              :                                 }
    1140            0 :                         }
    1141            0 :                         else if (send_step == BI_COMMIT_TX)
    1142              :                         {
    1143            0 :                                 if (PQsendQueryParams(conn, "COMMIT",
    1144            0 :                                                                           0, NULL, NULL, NULL, NULL, 0) == 1)
    1145              :                                 {
    1146              :                                         pg_debug("sent COMMIT\n");
    1147            0 :                                         send_step++;
    1148            0 :                                 }
    1149              :                                 else
    1150              :                                 {
    1151            0 :                                         fprintf(stderr, "WARNING: failed to send commit: %s\n",
    1152            0 :                                                         PQerrorMessage(conn));
    1153              :                                 }
    1154            0 :                         }
    1155            0 :                         else if (send_step == BI_SYNC)
    1156              :                         {
    1157            0 :                                 if (PQpipelineSync(conn) == 1)
    1158              :                                 {
    1159            0 :                                         fprintf(stdout, "pipeline sync sent\n");
    1160            0 :                                         send_step++;
    1161            0 :                                 }
    1162              :                                 else
    1163              :                                 {
    1164            0 :                                         fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
    1165            0 :                                                         PQerrorMessage(conn));
    1166              :                                 }
    1167            0 :                         }
    1168            0 :                 }
    1169            0 :         }
    1170              : 
    1171              :         /* We've got the sync message and the pipeline should be done */
    1172            0 :         if (PQexitPipelineMode(conn) != 1)
    1173            0 :                 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
    1174              :                                  PQerrorMessage(conn));
    1175              : 
    1176            0 :         if (PQsetnonblocking(conn, 0) != 0)
    1177            0 :                 pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
    1178              : 
    1179            0 :         fprintf(stderr, "ok\n");
    1180            0 : }
    1181              : 
    1182              : static void
    1183            0 : test_prepared(PGconn *conn)
    1184              : {
    1185            0 :         PGresult   *res = NULL;
    1186            0 :         Oid                     param_oids[1] = {INT4OID};
    1187            0 :         Oid                     expected_oids[4];
    1188            0 :         Oid                     typ;
    1189              : 
    1190            0 :         fprintf(stderr, "prepared... ");
    1191              : 
    1192            0 :         if (PQenterPipelineMode(conn) != 1)
    1193            0 :                 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
    1194            0 :         if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
    1195              :                                           "interval '1 sec'",
    1196            0 :                                           1, param_oids) != 1)
    1197            0 :                 pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
    1198            0 :         expected_oids[0] = INT4OID;
    1199            0 :         expected_oids[1] = TEXTOID;
    1200            0 :         expected_oids[2] = NUMERICOID;
    1201            0 :         expected_oids[3] = INTERVALOID;
    1202            0 :         if (PQsendDescribePrepared(conn, "select_one") != 1)
    1203            0 :                 pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
    1204            0 :         if (PQpipelineSync(conn) != 1)
    1205            0 :                 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1206              : 
    1207            0 :         consume_result_status(conn, PGRES_COMMAND_OK);
    1208              : 
    1209            0 :         consume_null_result(conn);
    1210              : 
    1211            0 :         res = confirm_result_status(conn, PGRES_COMMAND_OK);
    1212            0 :         if (PQnfields(res) != lengthof(expected_oids))
    1213            0 :                 pg_fatal("expected %zu columns, got %d",
    1214              :                                  lengthof(expected_oids), PQnfields(res));
    1215            0 :         for (int i = 0; i < PQnfields(res); i++)
    1216              :         {
    1217            0 :                 typ = PQftype(res, i);
    1218            0 :                 if (typ != expected_oids[i])
    1219            0 :                         pg_fatal("field %d: expected type %u, got %u",
    1220              :                                          i, expected_oids[i], typ);
    1221            0 :         }
    1222            0 :         PQclear(res);
    1223              : 
    1224            0 :         consume_null_result(conn);
    1225              : 
    1226            0 :         consume_result_status(conn, PGRES_PIPELINE_SYNC);
    1227              : 
    1228            0 :         fprintf(stderr, "closing statement..");
    1229            0 :         if (PQsendClosePrepared(conn, "select_one") != 1)
    1230            0 :                 pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
    1231            0 :         if (PQpipelineSync(conn) != 1)
    1232            0 :                 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1233              : 
    1234            0 :         consume_result_status(conn, PGRES_COMMAND_OK);
    1235              : 
    1236            0 :         consume_null_result(conn);
    1237              : 
    1238            0 :         consume_result_status(conn, PGRES_PIPELINE_SYNC);
    1239              : 
    1240            0 :         if (PQexitPipelineMode(conn) != 1)
    1241            0 :                 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
    1242              : 
    1243              :         /* Now that it's closed we should get an error when describing */
    1244            0 :         res = PQdescribePrepared(conn, "select_one");
    1245            0 :         if (PQresultStatus(res) != PGRES_FATAL_ERROR)
    1246            0 :                 pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
    1247            0 :         PQclear(res);
    1248              : 
    1249              :         /*
    1250              :          * Also test the blocking close; this should not fail, since closing a
    1251              :          * non-existent prepared statement is a no-op.
    1252              :          */
    1253            0 :         res = PQclosePrepared(conn, "select_one");
    1254            0 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1255            0 :                 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1256            0 :         PQclear(res);
    1257              : 
    1258            0 :         fprintf(stderr, "creating portal... ");
    1259              : 
    1260            0 :         res = PQexec(conn, "BEGIN");
    1261            0 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1262            0 :                 pg_fatal("BEGIN failed: %s", PQerrorMessage(conn));
    1263            0 :         PQclear(res);
    1264              : 
    1265            0 :         res = PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
    1266            0 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1267            0 :                 pg_fatal("DECLARE CURSOR failed: %s", PQerrorMessage(conn));
    1268            0 :         PQclear(res);
    1269              : 
    1270            0 :         PQenterPipelineMode(conn);
    1271            0 :         if (PQsendDescribePortal(conn, "cursor_one") != 1)
    1272            0 :                 pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
    1273            0 :         if (PQpipelineSync(conn) != 1)
    1274            0 :                 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1275              : 
    1276            0 :         res = confirm_result_status(conn, PGRES_COMMAND_OK);
    1277            0 :         typ = PQftype(res, 0);
    1278            0 :         if (typ != INT4OID)
    1279            0 :                 pg_fatal("portal: expected type %u, got %u",
    1280              :                                  INT4OID, typ);
    1281            0 :         PQclear(res);
    1282              : 
    1283            0 :         consume_null_result(conn);
    1284              : 
    1285            0 :         consume_result_status(conn, PGRES_PIPELINE_SYNC);
    1286              : 
    1287            0 :         fprintf(stderr, "closing portal... ");
    1288            0 :         if (PQsendClosePortal(conn, "cursor_one") != 1)
    1289            0 :                 pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
    1290            0 :         if (PQpipelineSync(conn) != 1)
    1291            0 :                 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1292              : 
    1293            0 :         consume_result_status(conn, PGRES_COMMAND_OK);
    1294              : 
    1295            0 :         consume_null_result(conn);
    1296              : 
    1297            0 :         consume_result_status(conn, PGRES_PIPELINE_SYNC);
    1298              : 
    1299            0 :         if (PQexitPipelineMode(conn) != 1)
    1300            0 :                 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
    1301              : 
    1302              :         /* Now that it's closed we should get an error when describing */
    1303            0 :         res = PQdescribePortal(conn, "cursor_one");
    1304            0 :         if (PQresultStatus(res) != PGRES_FATAL_ERROR)
    1305            0 :                 pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
    1306            0 :         PQclear(res);
    1307              : 
    1308              :         /*
    1309              :          * Also test the blocking close; this should not fail, since closing a
    1310              :          * non-existent portal is a no-op.
    1311              :          */
    1312            0 :         res = PQclosePortal(conn, "cursor_one");
    1313            0 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1314            0 :                 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1315            0 :         PQclear(res);
    1316              : 
    1317            0 :         fprintf(stderr, "ok\n");
    1318            0 : }
    1319              : 
    1320              : /*
    1321              :  * Test max_protocol_version options.
    1322              :  */
    1323              : static void
    1324            0 : test_protocol_version(PGconn *conn)
    1325              : {
    1326            0 :         const char **keywords;
    1327            0 :         const char **vals;
    1328            0 :         int                     nopts;
    1329            0 :         PQconninfoOption *opts = PQconninfo(conn);
    1330            0 :         int                     protocol_version;
    1331            0 :         int                     max_protocol_version_index = -1;
    1332            0 :         int                     i;
    1333              : 
    1334              :         /* Prepare keywords/vals arrays, copied from the existing connection. */
    1335            0 :         nopts = 0;
    1336            0 :         for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
    1337            0 :                 nopts++;
    1338            0 :         nopts++;                                        /* NULL terminator */
    1339              : 
    1340            0 :         keywords = pg_malloc0(sizeof(char *) * nopts);
    1341            0 :         vals = pg_malloc0(sizeof(char *) * nopts);
    1342              : 
    1343            0 :         i = 0;
    1344            0 :         for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
    1345              :         {
    1346              :                 /*
    1347              :                  * If the test already specified max_protocol_version, we want to
    1348              :                  * replace it rather than attempting to override it. This matters when
    1349              :                  * testing defaults, because empty option values at the end of the
    1350              :                  * connection string won't replace earlier settings.
    1351              :                  */
    1352            0 :                 if (strcmp(opt->keyword, "max_protocol_version") == 0)
    1353            0 :                         max_protocol_version_index = i;
    1354            0 :                 else if (!opt->val)
    1355            0 :                         continue;
    1356              : 
    1357            0 :                 keywords[i] = opt->keyword;
    1358            0 :                 vals[i] = opt->val;
    1359              : 
    1360            0 :                 i++;
    1361            0 :         }
    1362              : 
    1363            0 :         Assert(max_protocol_version_index >= 0);
    1364              : 
    1365              :         /*
    1366              :          * Test default protocol_version
    1367              :          */
    1368            0 :         vals[max_protocol_version_index] = "";
    1369            0 :         conn = PQconnectdbParams(keywords, vals, false);
    1370              : 
    1371            0 :         if (PQstatus(conn) != CONNECTION_OK)
    1372            0 :                 pg_fatal("Connection to database failed: %s",
    1373              :                                  PQerrorMessage(conn));
    1374              : 
    1375            0 :         protocol_version = PQfullProtocolVersion(conn);
    1376            0 :         if (protocol_version != 30000)
    1377            0 :                 pg_fatal("expected 30000, got %d", protocol_version);
    1378              : 
    1379            0 :         PQfinish(conn);
    1380              : 
    1381              :         /*
    1382              :          * Test max_protocol_version=3.0
    1383              :          */
    1384            0 :         vals[max_protocol_version_index] = "3.0";
    1385            0 :         conn = PQconnectdbParams(keywords, vals, false);
    1386              : 
    1387            0 :         if (PQstatus(conn) != CONNECTION_OK)
    1388            0 :                 pg_fatal("Connection to database failed: %s",
    1389              :                                  PQerrorMessage(conn));
    1390              : 
    1391            0 :         protocol_version = PQfullProtocolVersion(conn);
    1392            0 :         if (protocol_version != 30000)
    1393            0 :                 pg_fatal("expected 30000, got %d", protocol_version);
    1394              : 
    1395            0 :         PQfinish(conn);
    1396              : 
    1397              :         /*
    1398              :          * Test max_protocol_version=3.1. It's not valid: the protocol version
    1399              :          * went straight from 3.0 to 3.2.
    1400              :          */
    1401            0 :         vals[max_protocol_version_index] = "3.1";
    1402            0 :         conn = PQconnectdbParams(keywords, vals, false);
    1403              : 
    1404            0 :         if (PQstatus(conn) != CONNECTION_BAD)
    1405            0 :                 pg_fatal("Connecting with max_protocol_version 3.1 should have failed.");
    1406              : 
    1407            0 :         PQfinish(conn);
    1408              : 
    1409              :         /*
    1410              :          * Test max_protocol_version=3.2
    1411              :          */
    1412            0 :         vals[max_protocol_version_index] = "3.2";
    1413            0 :         conn = PQconnectdbParams(keywords, vals, false);
    1414              : 
    1415            0 :         if (PQstatus(conn) != CONNECTION_OK)
    1416            0 :                 pg_fatal("Connection to database failed: %s",
    1417              :                                  PQerrorMessage(conn));
    1418              : 
    1419            0 :         protocol_version = PQfullProtocolVersion(conn);
    1420            0 :         if (protocol_version != 30002)
    1421            0 :                 pg_fatal("expected 30002, got %d", protocol_version);
    1422              : 
    1423            0 :         PQfinish(conn);
    1424              : 
    1425              :         /*
    1426              :          * Test max_protocol_version=latest. 'latest' currently means '3.2'.
    1427              :          */
    1428            0 :         vals[max_protocol_version_index] = "latest";
    1429            0 :         conn = PQconnectdbParams(keywords, vals, false);
    1430              : 
    1431            0 :         if (PQstatus(conn) != CONNECTION_OK)
    1432            0 :                 pg_fatal("Connection to database failed: %s",
    1433              :                                  PQerrorMessage(conn));
    1434              : 
    1435            0 :         protocol_version = PQfullProtocolVersion(conn);
    1436            0 :         if (protocol_version != 30002)
    1437            0 :                 pg_fatal("expected 30002, got %d", protocol_version);
    1438              : 
    1439            0 :         PQfinish(conn);
    1440              : 
    1441            0 :         pfree(keywords);
    1442            0 :         pfree(vals);
    1443            0 :         PQconninfoFree(opts);
    1444            0 : }
    1445              : 
    1446              : /* Notice processor: print notices, and count how many we got */
    1447              : static void
    1448            0 : notice_processor(void *arg, const char *message)
    1449              : {
    1450            0 :         int                *n_notices = (int *) arg;
    1451              : 
    1452            0 :         (*n_notices)++;
    1453            0 :         fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
    1454            0 : }
    1455              : 
    1456              : /* Verify behavior in "idle" state */
    1457              : static void
    1458            0 : test_pipeline_idle(PGconn *conn)
    1459              : {
    1460            0 :         int                     n_notices = 0;
    1461              : 
    1462            0 :         fprintf(stderr, "\npipeline idle...\n");
    1463              : 
    1464            0 :         PQsetNoticeProcessor(conn, notice_processor, &n_notices);
    1465              : 
    1466              :         /* Try to exit pipeline mode in pipeline-idle state */
    1467            0 :         if (PQenterPipelineMode(conn) != 1)
    1468            0 :                 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
    1469            0 :         if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
    1470            0 :                 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
    1471            0 :         PQsendFlushRequest(conn);
    1472              : 
    1473            0 :         consume_result_status(conn, PGRES_TUPLES_OK);
    1474              : 
    1475            0 :         consume_null_result(conn);
    1476              : 
    1477            0 :         if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
    1478            0 :                 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
    1479            0 :         if (PQexitPipelineMode(conn) == 1)
    1480            0 :                 pg_fatal("exiting pipeline succeeded when it shouldn't");
    1481            0 :         if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
    1482            0 :                                 strlen("cannot exit pipeline mode")) != 0)
    1483            0 :                 pg_fatal("did not get expected error; got: %s",
    1484              :                                  PQerrorMessage(conn));
    1485            0 :         PQsendFlushRequest(conn);
    1486              : 
    1487            0 :         consume_result_status(conn, PGRES_TUPLES_OK);
    1488              : 
    1489            0 :         consume_null_result(conn);
    1490              : 
    1491            0 :         if (PQexitPipelineMode(conn) != 1)
    1492            0 :                 pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
    1493              : 
    1494            0 :         if (n_notices > 0)
    1495            0 :                 pg_fatal("got %d notice(s)", n_notices);
    1496            0 :         fprintf(stderr, "ok - 1\n");
    1497              : 
    1498              :         /* Have a WARNING in the middle of a resultset */
    1499            0 :         if (PQenterPipelineMode(conn) != 1)
    1500            0 :                 pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
    1501            0 :         if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
    1502            0 :                 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
    1503            0 :         PQsendFlushRequest(conn);
    1504              : 
    1505            0 :         consume_result_status(conn, PGRES_TUPLES_OK);
    1506              : 
    1507            0 :         if (PQexitPipelineMode(conn) != 1)
    1508            0 :                 pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
    1509            0 :         fprintf(stderr, "ok - 2\n");
    1510            0 : }
    1511              : /* Send one parameterized SELECT in pipeline mode and verify that PQexitPipelineMode() is refused until both the query's result and the sync result have been consumed. */
    1512              : static void
    1513            0 : test_simple_pipeline(PGconn *conn)
    1514              : {
    1515            0 :         const char *dummy_params[1] = {"1"};  /* text value passed for $1 */
    1516            0 :         Oid                     dummy_param_oids[1] = {INT4OID};  /* type OID passed for $1 */
    1517              : 
    1518            0 :         fprintf(stderr, "simple pipeline... ");
    1519              : 
    1520              :         /*
    1521              :          * Enter pipeline mode and dispatch a set of operations, which we'll then
    1522              :          * process the results of as they come in.
    1523              :          *
    1524              :          * For a simple case we should be able to do this without interim
    1525              :          * processing of results since our output buffer will give us enough slush
    1526              :          * to work with and we won't block on sending. So blocking mode is fine.
    1527              :          */
    1528            0 :         if (PQisnonblocking(conn))
    1529            0 :                 pg_fatal("Expected blocking connection mode");
    1530              : 
    1531            0 :         if (PQenterPipelineMode(conn) != 1)
    1532            0 :                 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
    1533              : 
    1534            0 :         if (PQsendQueryParams(conn, "SELECT $1",
    1535            0 :                                                   1, dummy_param_oids, dummy_params,
    1536            0 :                                                   NULL, NULL, 0) != 1)
    1537            0 :                 pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
    1538              : 
    1539            0 :         if (PQexitPipelineMode(conn) != 0)  /* first refusal check: query sent, no result consumed yet */
    1540            0 :                 pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
    1541              : 
    1542            0 :         if (PQpipelineSync(conn) != 1)
    1543            0 :                 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1544              : 
    1545            0 :         consume_result_status(conn, PGRES_TUPLES_OK);  /* the SELECT's result; helper is defined elsewhere, presumably fetches one result and checks its status */
    1546              : 
    1547            0 :         consume_null_result(conn);  /* helper defined elsewhere; presumably expects PQgetResult() to return NULL */
    1548              : 
    1549              :         /*
    1550              :          * Even though we've processed the result there's still a sync to come and
    1551              :          * we can't exit pipeline mode yet
    1552              :          */
    1553            0 :         if (PQexitPipelineMode(conn) != 0)  /* second refusal check: sync result not consumed yet */
    1554            0 :                 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
    1555              : 
    1556            0 :         consume_result_status(conn, PGRES_PIPELINE_SYNC);
    1557              : 
    1558            0 :         consume_null_result(conn);
    1559              : 
    1560              :         /* We're still in pipeline mode... */
    1561            0 :         if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
    1562            0 :                 pg_fatal("Fell out of pipeline mode somehow");
    1563              : 
    1564              :         /* ... until we end it, which we can safely do now */
    1565            0 :         if (PQexitPipelineMode(conn) != 1)
    1566            0 :                 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
    1567              :                                  PQerrorMessage(conn));
    1568              : 
    1569            0 :         if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)  /* confirm the exit actually took effect */
    1570            0 :                 pg_fatal("Exiting pipeline mode didn't seem to work");
    1571              : 
    1572            0 :         fprintf(stderr, "ok\n");
    1573            0 : }
    1574              : /* Exercise PQsetSingleRowMode() and PQsetChunkedRowsMode() on queries sent in pipeline mode, checking the result statuses and row counts that come back. */
    1575              : static void
    1576            0 : test_singlerowmode(PGconn *conn)
    1577              : {
    1578            0 :         PGresult   *res;
    1579            0 :         int                     i;
    1580            0 :         bool            pipeline_ended = false;  /* set once PGRES_PIPELINE_SYNC is seen */
    1581              : 
    1582            0 :         if (PQenterPipelineMode(conn) != 1)
    1583            0 :                 pg_fatal("failed to enter pipeline mode: %s",
    1584              :                                  PQerrorMessage(conn));
    1585              : 
    1586              :         /* One series of three commands, using single-row mode for the first two. */
    1587            0 :         for (i = 0; i < 3; i++)
    1588              :         {
    1589            0 :                 char       *param[1];
    1590              : 
    1591            0 :                 param[0] = psprintf("%d", 44 + i);  /* $1 is "44", "45", "46" in turn */
    1592              : 
    1593            0 :                 if (PQsendQueryParams(conn,
    1594              :                                                           "SELECT generate_series(42, $1)",
    1595              :                                                           1,
    1596              :                                                           NULL,
    1597            0 :                                                           (const char **) param,
    1598              :                                                           NULL,
    1599              :                                                           NULL,
    1600            0 :                                                           0) != 1)
    1601            0 :                         pg_fatal("failed to send query: %s",
    1602              :                                          PQerrorMessage(conn));
    1603            0 :                 pfree(param[0]);
    1604            0 :         }
    1605            0 :         if (PQpipelineSync(conn) != 1)
    1606            0 :                 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1607              : 
    1608            0 :         for (i = 0; !pipeline_ended; i++)  /* one pass per query, plus a final pass (i == 3) that reads the sync */
    1609              :         {
    1610            0 :                 bool            first = true;  /* true until this pass's first result has been checked */
    1611            0 :                 bool            saw_ending_tuplesok;
    1612            0 :                 bool            isSingleTuple = false;  /* set once a PGRES_SINGLE_TUPLE result is seen in this pass */
    1613              : 
    1614              :                 /* Set single row mode for only first 2 SELECT queries */
    1615            0 :                 if (i < 2)
    1616              :                 {
    1617            0 :                         if (PQsetSingleRowMode(conn) != 1)
    1618            0 :                                 pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
    1619            0 :                 }
    1620              : 
    1621              :                 /* Consume rows for this query */
    1622            0 :                 saw_ending_tuplesok = false;
    1623            0 :                 while ((res = PQgetResult(conn)) != NULL)
    1624              :                 {
    1625            0 :                         ExecStatusType est = PQresultStatus(res);
    1626              : 
    1627            0 :                         if (est == PGRES_PIPELINE_SYNC)
    1628              :                         {
    1629            0 :                                 fprintf(stderr, "end of pipeline reached\n");
    1630            0 :                                 pipeline_ended = true;
    1631            0 :                                 PQclear(res);
    1632            0 :                                 if (i != 3)  /* the sync must arrive only after all three queries were read */
    1633            0 :                                         pg_fatal("Expected three results, got %d", i);
    1634            0 :                                 break;
    1635              :                         }
    1636              : 
    1637              :                         /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
    1638            0 :                         if (first)
    1639              :                         {
    1640            0 :                                 if (i <= 1 && est != PGRES_SINGLE_TUPLE)
    1641            0 :                                         pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
    1642              :                                                          i, PQresStatus(est));
    1643            0 :                                 if (i >= 2 && est != PGRES_TUPLES_OK)
    1644            0 :                                         pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
    1645              :                                                          i, PQresStatus(est));
    1646            0 :                                 first = false;
    1647            0 :                         }
    1648              : 
    1649            0 :                         fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
    1650            0 :                         switch (est)
    1651              :                         {
    1652              :                                 case PGRES_TUPLES_OK:
    1653            0 :                                         fprintf(stderr, ", tuples: %d\n", PQntuples(res));
    1654            0 :                                         saw_ending_tuplesok = true;
    1655            0 :                                         if (isSingleTuple)
    1656              :                                         {
    1657            0 :                                                 if (PQntuples(res) == 0)  /* after single-row results the closing TUPLES_OK must be empty */
    1658            0 :                                                         fprintf(stderr, "all tuples received in query %d\n", i);
    1659              :                                                 else
    1660            0 :                                                         pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
    1661            0 :                                         }
    1662            0 :                                         break;
    1663              : 
    1664              :                                 case PGRES_SINGLE_TUPLE:
    1665            0 :                                         isSingleTuple = true;
    1666            0 :                                         fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
    1667            0 :                                         break;
    1668              : 
    1669              :                                 default:
    1670            0 :                                         pg_fatal("unexpected");
    1671              :                         }
    1672            0 :                         PQclear(res);
    1673            0 :                 }
    1674            0 :                 if (!pipeline_ended && !saw_ending_tuplesok)  /* every query must finish with a TUPLES_OK before the NULL */
    1675            0 :                         pg_fatal("didn't get expected terminating TUPLES_OK");
    1676            0 :         }
    1677              : 
    1678              :         /*
    1679              :          * Now issue one command, get its results in with single-row mode, then
    1680              :          * issue another command, and get its results in normal mode; make sure
    1681              :          * the single-row mode flag is reset as expected.
    1682              :          */
    1683            0 :         if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
    1684            0 :                                                   0, NULL, NULL, NULL, NULL, 0) != 1)
    1685            0 :                 pg_fatal("failed to send query: %s",
    1686              :                                  PQerrorMessage(conn));
    1687            0 :         if (PQsendFlushRequest(conn) != 1)
    1688            0 :                 pg_fatal("failed to send flush request");
    1689            0 :         if (PQsetSingleRowMode(conn) != 1)
    1690            0 :                 pg_fatal("PQsetSingleRowMode() failed");
    1691              : 
    1692            0 :         consume_result_status(conn, PGRES_SINGLE_TUPLE);  /* helper defined elsewhere; presumably fetches one result and checks its status */
    1693              : 
    1694            0 :         consume_result_status(conn, PGRES_TUPLES_OK);
    1695              : 
    1696            0 :         consume_null_result(conn);
    1697              : 
    1698            0 :         if (PQsendQueryParams(conn, "SELECT 1",
    1699            0 :                                                   0, NULL, NULL, NULL, NULL, 0) != 1)
    1700            0 :                 pg_fatal("failed to send query: %s",
    1701              :                                  PQerrorMessage(conn));
    1702            0 :         if (PQsendFlushRequest(conn) != 1)
    1703            0 :                 pg_fatal("failed to send flush request");
    1704              : 
    1705            0 :         consume_result_status(conn, PGRES_TUPLES_OK);  /* single-row mode was not requested here, so TUPLES_OK is expected straight away */
    1706              : 
    1707            0 :         consume_null_result(conn);
    1708              : 
    1709              :         /*
    1710              :          * Try chunked mode as well; make sure that it correctly delivers a
    1711              :          * partial final chunk.
    1712              :          */
    1713            0 :         if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
    1714            0 :                                                   0, NULL, NULL, NULL, NULL, 0) != 1)
    1715            0 :                 pg_fatal("failed to send query: %s",
    1716              :                                  PQerrorMessage(conn));
    1717            0 :         if (PQsendFlushRequest(conn) != 1)
    1718            0 :                 pg_fatal("failed to send flush request");
    1719            0 :         if (PQsetChunkedRowsMode(conn, 3) != 1)  /* expected below: chunks of 3 and 2 rows, then an empty TUPLES_OK */
    1720            0 :                 pg_fatal("PQsetChunkedRowsMode() failed");
    1721              : 
    1722            0 :         res = confirm_result_status(conn, PGRES_TUPLES_CHUNK);  /* helper defined elsewhere; its return value is used as the PGresult and cleared below */
    1723            0 :         if (PQntuples(res) != 3)
    1724            0 :                 pg_fatal("Expected 3 rows, got %d", PQntuples(res));
    1725            0 :         PQclear(res);
    1726              : 
    1727            0 :         res = confirm_result_status(conn, PGRES_TUPLES_CHUNK);  /* the partial final chunk */
    1728            0 :         if (PQntuples(res) != 2)
    1729            0 :                 pg_fatal("Expected 2 rows, got %d", PQntuples(res));
    1730            0 :         PQclear(res);
    1731              : 
    1732            0 :         res = confirm_result_status(conn, PGRES_TUPLES_OK);  /* terminating result must carry no rows */
    1733            0 :         if (PQntuples(res) != 0)
    1734            0 :                 pg_fatal("Expected 0 rows, got %d", PQntuples(res));
    1735            0 :         PQclear(res);
    1736              : 
    1737            0 :         consume_null_result(conn);
    1738              : 
    1739            0 :         if (PQexitPipelineMode(conn) != 1)
    1740            0 :                 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
    1741              : 
    1742            0 :         fprintf(stderr, "ok\n");
    1743            0 : }
    1744              : 
    1745              : /*
    1746              :  * Simple test to verify that a pipeline is discarded as a whole when there's
    1747              :  * an error, ignoring transaction commands.  Four syncs are sent in total; once all results are read, the table must hold exactly one row, with value 3.
    1748              :  */
    1749              : static void
    1750            0 : test_transaction(PGconn *conn)
    1751              : {
    1752            0 :         PGresult   *res;
    1753            0 :         bool            expect_null;  /* true when the next PQgetResult() must return NULL */
    1754            0 :         int                     num_syncs = 0;  /* syncs sent whose PGRES_PIPELINE_SYNC result has not been read yet */
    1755              : 
    1756            0 :         res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
    1757              :                                  "CREATE TABLE pq_pipeline_tst (id int)");
    1758            0 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1759            0 :                 pg_fatal("failed to create test table: %s",
    1760              :                                  PQerrorMessage(conn));
    1761            0 :         PQclear(res);
    1762              : 
    1763            0 :         if (PQenterPipelineMode(conn) != 1)
    1764            0 :                 pg_fatal("failed to enter pipeline mode: %s",
    1765              :                                  PQerrorMessage(conn));
    1766            0 :         if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)  /* the statement named "rollback" is sent twice below */
    1767            0 :                 pg_fatal("could not send prepare on pipeline: %s",
    1768              :                                  PQerrorMessage(conn));
    1769              : 
    1770            0 :         if (PQsendQueryParams(conn,
    1771              :                                                   "BEGIN",
    1772            0 :                                                   0, NULL, NULL, NULL, NULL, 0) != 1)
    1773            0 :                 pg_fatal("failed to send query: %s",
    1774              :                                  PQerrorMessage(conn));
    1775            0 :         if (PQsendQueryParams(conn,  /* the deliberately failing statement */
    1776              :                                                   "SELECT 0/0",
    1777            0 :                                                   0, NULL, NULL, NULL, NULL, 0) != 1)
    1778            0 :                 pg_fatal("failed to send query: %s",
    1779              :                                  PQerrorMessage(conn));
    1780              : 
    1781              :         /*
    1782              :          * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
    1783              :          * get out of the pipeline-aborted state first.
    1784              :          */
    1785            0 :         if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
    1786            0 :                 pg_fatal("failed to execute prepared: %s",
    1787              :                                  PQerrorMessage(conn));
    1788              : 
    1789              :         /* This insert fails because we're in pipeline-aborted state */
    1790            0 :         if (PQsendQueryParams(conn,
    1791              :                                                   "INSERT INTO pq_pipeline_tst VALUES (1)",
    1792            0 :                                                   0, NULL, NULL, NULL, NULL, 0) != 1)
    1793            0 :                 pg_fatal("failed to send query: %s",
    1794              :                                  PQerrorMessage(conn));
    1795            0 :         if (PQpipelineSync(conn) != 1)
    1796            0 :                 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1797            0 :         num_syncs++;
    1798              : 
    1799              :         /*
    1800              :          * This insert fails even though the pipeline got a SYNC, because we're in
    1801              :          * an aborted transaction
    1802              :          */
    1803            0 :         if (PQsendQueryParams(conn,
    1804              :                                                   "INSERT INTO pq_pipeline_tst VALUES (2)",
    1805            0 :                                                   0, NULL, NULL, NULL, NULL, 0) != 1)
    1806            0 :                 pg_fatal("failed to send query: %s",
    1807              :                                  PQerrorMessage(conn));
    1808            0 :         if (PQpipelineSync(conn) != 1)
    1809            0 :                 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1810            0 :         num_syncs++;
    1811              : 
    1812              :         /*
    1813              :          * Send ROLLBACK using prepared stmt. This one works because we just did
    1814              :          * PQpipelineSync above.
    1815              :          */
    1816            0 :         if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
    1817            0 :                 pg_fatal("failed to execute prepared: %s",
    1818              :                                  PQerrorMessage(conn));
    1819              : 
    1820              :         /*
    1821              :          * Now that we're out of a transaction and in pipeline-good mode, this
    1822              :          * insert works
    1823              :          */
    1824            0 :         if (PQsendQueryParams(conn,
    1825              :                                                   "INSERT INTO pq_pipeline_tst VALUES (3)",
    1826            0 :                                                   0, NULL, NULL, NULL, NULL, 0) != 1)
    1827            0 :                 pg_fatal("failed to send query: %s",
    1828              :                                  PQerrorMessage(conn));
    1829              :         /* Send two syncs now -- match up to SYNC messages below */
    1830            0 :         if (PQpipelineSync(conn) != 1)
    1831            0 :                 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1832            0 :         num_syncs++;
    1833            0 :         if (PQpipelineSync(conn) != 1)
    1834            0 :                 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1835            0 :         num_syncs++;
    1836              : 
    1837            0 :         expect_null = false;
    1838            0 :         for (int i = 0;; i++)  /* i only numbers the printed lines; the loop exits via the break below */
    1839              :         {
    1840            0 :                 ExecStatusType restype;
    1841              : 
    1842            0 :                 res = PQgetResult(conn);
    1843            0 :                 if (res == NULL)
    1844              :                 {
    1845            0 :                         printf("%d: got NULL result\n", i);
    1846            0 :                         if (!expect_null)
    1847            0 :                                 pg_fatal("did not expect NULL here");
    1848            0 :                         expect_null = false;
    1849            0 :                         continue;
    1850              :                 }
    1851            0 :                 restype = PQresultStatus(res);
    1852            0 :                 printf("%d: got status %s", i, PQresStatus(restype));
    1853            0 :                 if (expect_null)
    1854            0 :                         pg_fatal("expected NULL");
    1855            0 :                 if (restype == PGRES_FATAL_ERROR)
    1856            0 :                         printf("; error: %s", PQerrorMessage(conn));  /* no "\n" in the format; presumably the error message supplies the line end */
    1857            0 :                 else if (restype == PGRES_PIPELINE_ABORTED)
    1858              :                 {
    1859            0 :                         printf(": command didn't run because pipeline aborted\n");
    1860            0 :                 }
    1861              :                 else
    1862            0 :                         printf("\n");
    1863            0 :                 PQclear(res);
    1864              : 
    1865            0 :                 if (restype == PGRES_PIPELINE_SYNC)  /* a sync result is not followed by a NULL, so expect_null stays false */
    1866            0 :                         num_syncs--;
    1867              :                 else
    1868            0 :                         expect_null = true;
    1869            0 :                 if (num_syncs <= 0)  /* every sync sent above has been matched */
    1870            0 :                         break;
    1871            0 :         }
    1872              : 
    1873            0 :         consume_null_result(conn);  /* helper defined elsewhere; presumably expects PQgetResult() to return NULL */
    1874              : 
    1875            0 :         if (PQexitPipelineMode(conn) != 1)
    1876            0 :                 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
    1877              : 
    1878              :         /* We expect to find one tuple containing the value "3" */
    1879            0 :         res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
    1880            0 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1881            0 :                 pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
    1882            0 :         if (PQntuples(res) != 1)
    1883            0 :                 pg_fatal("did not get 1 tuple");
    1884            0 :         if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
    1885            0 :                 pg_fatal("did not get expected tuple");
    1886            0 :         PQclear(res);
    1887              : 
    1888            0 :         fprintf(stderr, "ok\n");
    1889            0 : }
    1890              : 
    1891              : /*
    1892              :  * In this test mode we send a stream of queries, with one in the middle
    1893              :  * causing an error.  Verify that we can still send some more after the
    1894              :  * error and have libpq work properly.
    1895              :  */
    1896              : static void
    1897            0 : test_uniqviol(PGconn *conn)
    1898              : {
    1899            0 :         int                     sock = PQsocket(conn);
    1900            0 :         PGresult   *res;
    1901            0 :         Oid                     paramTypes[2] = {INT8OID, INT8OID};
    1902            0 :         const char *paramValues[2];
    1903            0 :         char            paramValue0[MAXINT8LEN];
    1904            0 :         char            paramValue1[MAXINT8LEN];
    1905            0 :         int                     ctr = 0;
    1906            0 :         int                     numsent = 0;
    1907            0 :         int                     results = 0;
    1908            0 :         bool            read_done = false;
    1909            0 :         bool            write_done = false;
    1910            0 :         bool            error_sent = false;
    1911            0 :         bool            got_error = false;
    1912            0 :         int                     switched = 0;
    1913            0 :         int                     socketful = 0;
    1914            0 :         fd_set          in_fds;
    1915            0 :         fd_set          out_fds;
    1916              : 
    1917            0 :         fprintf(stderr, "uniqviol ...");
    1918              : 
    1919            0 :         PQsetnonblocking(conn, 1);
    1920              : 
    1921            0 :         paramValues[0] = paramValue0;
    1922            0 :         paramValues[1] = paramValue1;
    1923            0 :         sprintf(paramValue1, "42");
    1924              : 
    1925            0 :         res = PQexec(conn, "drop table if exists ppln_uniqviol;"
    1926              :                                  "create table ppln_uniqviol(id bigint primary key, idata bigint)");
    1927            0 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1928            0 :                 pg_fatal("failed to create table: %s", PQerrorMessage(conn));
    1929            0 :         PQclear(res);
    1930              : 
    1931            0 :         res = PQexec(conn, "begin");
    1932            0 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1933            0 :                 pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
    1934            0 :         PQclear(res);
    1935              : 
    1936            0 :         res = PQprepare(conn, "insertion",
    1937              :                                         "insert into ppln_uniqviol values ($1, $2) returning id",
    1938            0 :                                         2, paramTypes);
    1939            0 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1940            0 :                 pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
    1941            0 :         PQclear(res);
    1942              : 
    1943            0 :         if (PQenterPipelineMode(conn) != 1)
    1944            0 :                 pg_fatal("failed to enter pipeline mode");
    1945              : 
    1946            0 :         while (!read_done)
    1947              :         {
    1948              :                 /*
    1949              :                  * Avoid deadlocks by reading everything the server has sent before
    1950              :                  * sending anything.  (Special precaution is needed here to process
    1951              :                  * PQisBusy before testing the socket for read-readiness, because the
    1952              :                  * socket does not turn read-ready after "sending" queries in aborted
    1953              :                  * pipeline mode.)
    1954              :                  */
    1955            0 :                 while (PQisBusy(conn) == 0)
    1956              :                 {
    1957            0 :                         bool            new_error;
    1958              : 
    1959            0 :                         if (results >= numsent)
    1960              :                         {
    1961            0 :                                 if (write_done)
    1962            0 :                                         read_done = true;
    1963            0 :                                 break;
    1964              :                         }
    1965              : 
    1966            0 :                         res = PQgetResult(conn);
    1967            0 :                         new_error = process_result(conn, res, results, numsent);
    1968            0 :                         if (new_error && got_error)
    1969            0 :                                 pg_fatal("got two errors");
    1970            0 :                         got_error |= new_error;
    1971            0 :                         if (results++ >= numsent - 1)
    1972              :                         {
    1973            0 :                                 if (write_done)
    1974            0 :                                         read_done = true;
    1975            0 :                                 break;
    1976              :                         }
    1977            0 :                 }
    1978              : 
    1979            0 :                 if (read_done)
    1980            0 :                         break;
    1981              : 
    1982            0 :                 FD_ZERO(&out_fds);
    1983            0 :                 FD_SET(sock, &out_fds);
    1984              : 
    1985            0 :                 FD_ZERO(&in_fds);
    1986            0 :                 FD_SET(sock, &in_fds);
    1987              : 
    1988            0 :                 if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
    1989              :                 {
    1990            0 :                         if (errno == EINTR)
    1991            0 :                                 continue;
    1992            0 :                         pg_fatal("select() failed: %m");
    1993              :                 }
    1994              : 
    1995            0 :                 if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
    1996            0 :                         pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
    1997              : 
    1998              :                 /*
    1999              :                  * If the socket is writable and we haven't finished sending queries,
    2000              :                  * send some.
    2001              :                  */
    2002            0 :                 if (!write_done && FD_ISSET(sock, &out_fds))
    2003              :                 {
    2004            0 :                         for (;;)
    2005              :                         {
    2006            0 :                                 int                     flush;
    2007              : 
    2008              :                                 /*
    2009              :                                  * provoke uniqueness violation exactly once after having
    2010              :                                  * switched to read mode.
    2011              :                                  */
    2012            0 :                                 if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
    2013              :                                 {
    2014            0 :                                         sprintf(paramValue0, "%d", numsent / 2);
    2015            0 :                                         fprintf(stderr, "E");
    2016            0 :                                         error_sent = true;
    2017            0 :                                 }
    2018              :                                 else
    2019              :                                 {
    2020            0 :                                         fprintf(stderr, ".");
    2021            0 :                                         sprintf(paramValue0, "%d", ctr++);
    2022              :                                 }
    2023              : 
    2024            0 :                                 if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
    2025            0 :                                         pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
    2026            0 :                                 numsent++;
    2027              : 
    2028              :                                 /* Are we done writing? */
    2029            0 :                                 if (socketful != 0 && numsent % socketful == 42 && error_sent)
    2030              :                                 {
    2031            0 :                                         if (PQsendFlushRequest(conn) != 1)
    2032            0 :                                                 pg_fatal("failed to send flush request");
    2033            0 :                                         write_done = true;
    2034            0 :                                         fprintf(stderr, "\ndone writing\n");
    2035            0 :                                         PQflush(conn);
    2036            0 :                                         break;
    2037              :                                 }
    2038              : 
    2039              :                                 /* is the outgoing socket full? */
    2040            0 :                                 flush = PQflush(conn);
    2041            0 :                                 if (flush == -1)
    2042            0 :                                         pg_fatal("failed to flush: %s", PQerrorMessage(conn));
    2043            0 :                                 if (flush == 1)
    2044              :                                 {
    2045            0 :                                         if (socketful == 0)
    2046            0 :                                                 socketful = numsent;
    2047            0 :                                         fprintf(stderr, "\nswitch to reading\n");
    2048            0 :                                         switched++;
    2049            0 :                                         break;
    2050              :                                 }
    2051            0 :                         }
    2052            0 :                 }
    2053              :         }
    2054              : 
    2055            0 :         if (!got_error)
    2056            0 :                 pg_fatal("did not get expected error");
    2057              : 
    2058            0 :         fprintf(stderr, "ok\n");
    2059            0 : }
    2060              : 
    2061              : /*
    2062              :  * Subroutine for test_uniqviol; given a PGresult, print it out and consume
    2063              :  * the expected NULL that should follow it.
    2064              :  *
    2065              :  * Returns true if we read a fatal error message, otherwise false.
                         *
                         * "results" and "numsent" are only echoed in the progress messages written
                         * to stderr; they do not influence control flow in this function.  A NULL
                         * "res", or a result status other than the three handled in the switch
                         * below, is reported through pg_fatal.  The result is PQclear'ed here in
                         * every handled case, so the caller must not use it afterwards.
    2066              :  */
    2067              : static bool
    2068            0 : process_result(PGconn *conn, PGresult *res, int results, int numsent)
    2069              : {
    2070            0 :         bool            got_error = false;
    2071              : 
                                /* The caller must hand us an actual result, never NULL. */
    2072            0 :         if (res == NULL)
    2073            0 :                 pg_fatal("got unexpected NULL");
    2074              : 
    2075            0 :         switch (PQresultStatus(res))
    2076              :         {
    2077              :                 case PGRES_FATAL_ERROR:
                                                /* Remember the failure; it becomes the return value. */
    2078            0 :                         got_error = true;
    2079            0 :                         fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
    2080            0 :                         PQclear(res);
                                                /*
                                                 * consume_null_result is defined elsewhere in this file;
                                                 * presumably it reads the NULL result mentioned in the
                                                 * header comment -- confirm against its definition.
                                                 */
    2081            0 :                         consume_null_result(conn);
    2082            0 :                         break;
    2083              : 
    2084              :                 case PGRES_TUPLES_OK:
                                                /* Log the first column of the first row of the result. */
    2085            0 :                         fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
    2086            0 :                         PQclear(res);
    2087            0 :                         consume_null_result(conn);
    2088            0 :                         break;
    2089              : 
    2090              :                 case PGRES_PIPELINE_ABORTED:
                                                /* Logged, but not counted as an error for the return value. */
    2091            0 :                         fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
    2092            0 :                         PQclear(res);
    2093            0 :                         consume_null_result(conn);
    2094            0 :                         break;
    2095              : 
    2096              :                 default:
                                                /* Any other status is unexpected in this test. */
    2097            0 :                         pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
    2098              :         }
    2099              : 
    2100            0 :         return got_error;
    2101            0 : }
    2102              : 
    2103              : 
                        /*
                         * Print a short help text to stderr: a one-line description, the two
                         * accepted command-line forms, and the -t and -r options.
                         *
                         * The "progname" parameter shadows the file-level "progname" constant;
                         * main passes argv[0] here, so the name shown is the one the program was
                         * invoked with.  This function only prints; it does not exit.
                         */
    2104              : static void
    2105            0 : usage(const char *progname)
    2106              : {
    2107            0 :         fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
    2108            0 :         fprintf(stderr, "Usage:\n");
    2109            0 :         fprintf(stderr, "  %s [OPTION] tests\n", progname);
    2110            0 :         fprintf(stderr, "  %s [OPTION] TESTNAME [CONNINFO]\n", progname);
    2111            0 :         fprintf(stderr, "\nOptions:\n");
    2112            0 :         fprintf(stderr, "  -t TRACEFILE       generate a libpq trace to TRACEFILE\n");
    2113            0 :         fprintf(stderr, "  -r NUMROWS         use NUMROWS as the test size\n");
    2114            0 : }
    2115              : 
                        /*
                         * Print the name of every available test to stdout, one per line.
                         *
                         * main calls this when the test name argument is "tests".  The names
                         * printed here are the same ones main dispatches on, so a test added to
                         * one place needs to be added to the other as well.
                         */
    2116              : static void
    2117            0 : print_test_list(void)
    2118              : {
    2119            0 :         printf("cancel\n");
    2120            0 :         printf("disallowed_in_pipeline\n");
    2121            0 :         printf("multi_pipelines\n");
    2122            0 :         printf("nosync\n");
    2123            0 :         printf("pipeline_abort\n");
    2124            0 :         printf("pipeline_idle\n");
    2125            0 :         printf("pipelined_insert\n");
    2126            0 :         printf("prepared\n");
    2127            0 :         printf("protocol_version\n");
    2128            0 :         printf("simple_pipeline\n");
    2129            0 :         printf("singlerow\n");
    2130            0 :         printf("transaction\n");
    2131            0 :         printf("uniqviol\n");
    2132            0 : }
    2133              : 
                        /*
                         * Program entry point.
                         *
                         * Command line: [-r NUMROWS] [-t TRACEFILE] TESTNAME [CONNINFO]
                         *
                         * Parses the options, connects to the database described by CONNINFO
                         * (an empty string if not given), applies two session settings,
                         * optionally enables libpq tracing, and then runs the single test
                         * selected by TESTNAME.  The special name "tests" prints the list of test
                         * names and exits with status 0 without connecting.
                         *
                         * Returns 0 once the selected test function has returned.  Exits with
                         * status 1 on an invalid -r value, a missing TESTNAME, or an unrecognized
                         * test name.  Other failures go through exit_nicely or pg_fatal, which are
                         * defined elsewhere in this file.
                         */
    2134              : int
    2135            0 : main(int argc, char **argv)
    2136              : {
    2137            0 :         const char *conninfo = "";
    2138            0 :         PGconn     *conn;
    2139            0 :         FILE       *trace = NULL;
    2140            0 :         char       *testname;
    2141            0 :         int                     numrows = 10000;
    2142            0 :         PGresult   *res;
    2143            0 :         int                     c;
    2144              : 
                                /*
                                 * Both options take an argument.  The switch has no default case, so
                                 * any other value returned by getopt is not acted upon here.
                                 */
    2145            0 :         while ((c = getopt(argc, argv, "r:t:")) != -1)
    2146              :         {
    2147            0 :                 switch (c)
    2148              :                 {
    2149              :                         case 'r':                       /* numrows */
                                                        /*
                                                         * NOTE(review): strtol returns long but the value is
                                                         * stored in an int, and no end pointer is passed, so
                                                         * trailing non-digit characters are not rejected here.
                                                         */
    2150            0 :                                 errno = 0;
    2151            0 :                                 numrows = strtol(optarg, NULL, 10);
    2152            0 :                                 if (errno != 0 || numrows <= 0)
    2153              :                                 {
    2154            0 :                                         fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
    2155            0 :                                                         optarg);
    2156            0 :                                         exit(1);
    2157              :                                 }
    2158            0 :                                 break;
    2159              :                         case 't':                       /* trace file */
                                                        /* tracefile is not a local; it is declared outside main. */
    2160            0 :                                 tracefile = pg_strdup(optarg);
    2161            0 :                                 break;
    2162              :                 }
    2163              :         }
    2164              : 
                                /* The first non-option argument is the test name; it is required. */
    2165            0 :         if (optind < argc)
    2166              :         {
    2167            0 :                 testname = pg_strdup(argv[optind]);
    2168            0 :                 optind++;
    2169            0 :         }
    2170              :         else
    2171              :         {
    2172            0 :                 usage(argv[0]);
    2173            0 :                 exit(1);
    2174              :         }
    2175              : 
                                /* "tests" is not a test: list the test names and stop before connecting. */
    2176            0 :         if (strcmp(testname, "tests") == 0)
    2177              :         {
    2178            0 :                 print_test_list();
    2179            0 :                 exit(0);
    2180              :         }
    2181              : 
                                /* An optional second argument overrides the empty default conninfo. */
    2182            0 :         if (optind < argc)
    2183              :         {
    2184            0 :                 conninfo = pg_strdup(argv[optind]);
    2185            0 :                 optind++;
    2186            0 :         }
    2187              : 
    2188              :         /* Make a connection to the database */
    2189            0 :         conn = PQconnectdb(conninfo);
    2190            0 :         if (PQstatus(conn) != CONNECTION_OK)
    2191              :         {
    2192            0 :                 fprintf(stderr, "Connection to database failed: %s\n",
    2193            0 :                                 PQerrorMessage(conn));
    2194            0 :                 exit_nicely(conn);
    2195            0 :         }
    2196              : 
                                /*
                                 * Session settings applied before any test runs; presumably these keep
                                 * server messages and plans stable for the tests -- confirm.
                                 */
    2197            0 :         res = PQexec(conn, "SET lc_messages TO \"C\"");
    2198            0 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2199            0 :                 pg_fatal("failed to set \"lc_messages\": %s", PQerrorMessage(conn));
    2200            0 :         PQclear(res);
    2201            0 :         res = PQexec(conn, "SET debug_parallel_query = off");
    2202            0 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2203            0 :                 pg_fatal("failed to set \"debug_parallel_query\": %s", PQerrorMessage(conn));
    2204            0 :         PQclear(res);
    2205              : 
    2206              :         /* Set the trace file, if requested */
    2207            0 :         if (tracefile != NULL)
    2208              :         {
                                        /* A trace file name of "-" sends the trace to stdout. */
    2209            0 :                 if (strcmp(tracefile, "-") == 0)
    2210            0 :                         trace = stdout;
    2211              :                 else
    2212            0 :                         trace = fopen(tracefile, "w");
    2213            0 :                 if (trace == NULL)
    2214            0 :                         pg_fatal("could not open file \"%s\": %m", tracefile);
    2215              : 
    2216              :                 /* Make it line-buffered */
    2217            0 :                 setvbuf(trace, NULL, PG_IOLBF, 0);
    2218              : 
    2219            0 :                 PQtrace(conn, trace);
    2220            0 :                 PQsetTraceFlags(conn,
    2221              :                                                 PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
    2222            0 :         }
    2223              : 
                                /*
                                 * Dispatch on the test name.  Only pipelined_insert receives numrows;
                                 * every other test takes just the connection.
                                 */
    2224            0 :         if (strcmp(testname, "cancel") == 0)
    2225            0 :                 test_cancel(conn);
    2226            0 :         else if (strcmp(testname, "disallowed_in_pipeline") == 0)
    2227            0 :                 test_disallowed_in_pipeline(conn);
    2228            0 :         else if (strcmp(testname, "multi_pipelines") == 0)
    2229            0 :                 test_multi_pipelines(conn);
    2230            0 :         else if (strcmp(testname, "nosync") == 0)
    2231            0 :                 test_nosync(conn);
    2232            0 :         else if (strcmp(testname, "pipeline_abort") == 0)
    2233            0 :                 test_pipeline_abort(conn);
    2234            0 :         else if (strcmp(testname, "pipeline_idle") == 0)
    2235            0 :                 test_pipeline_idle(conn);
    2236            0 :         else if (strcmp(testname, "pipelined_insert") == 0)
    2237            0 :                 test_pipelined_insert(conn, numrows);
    2238            0 :         else if (strcmp(testname, "prepared") == 0)
    2239            0 :                 test_prepared(conn);
    2240            0 :         else if (strcmp(testname, "protocol_version") == 0)
    2241            0 :                 test_protocol_version(conn);
    2242            0 :         else if (strcmp(testname, "simple_pipeline") == 0)
    2243            0 :                 test_simple_pipeline(conn);
    2244            0 :         else if (strcmp(testname, "singlerow") == 0)
    2245            0 :                 test_singlerowmode(conn);
    2246            0 :         else if (strcmp(testname, "transaction") == 0)
    2247            0 :                 test_transaction(conn);
    2248            0 :         else if (strcmp(testname, "uniqviol") == 0)
    2249            0 :                 test_uniqviol(conn);
    2250              :         else
    2251              :         {
                                        /* Note: this path exits directly, without calling PQfinish. */
    2252            0 :                 fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
    2253            0 :                 exit(1);
    2254              :         }
    2255              : 
    2256              :         /* close the connection to the database and cleanup */
    2257            0 :         PQfinish(conn);
    2258              : 
                                /* Only close a trace file opened above; stdout is left open. */
    2259            0 :         if (trace && trace != stdout)
    2260            0 :                 fclose(trace);
    2261              : 
    2262            0 :         return 0;
    2263            0 : }
        

Generated by: LCOV version 2.3.2-1