LCOV - code coverage report
Current view: top level - contrib/postgres_fdw - connection.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 884 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 51 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * connection.c
       4              :  *                Connection management functions for postgres_fdw
       5              :  *
       6              :  * Portions Copyright (c) 2012-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *                contrib/postgres_fdw/connection.c
      10              :  *
      11              :  *-------------------------------------------------------------------------
      12              :  */
      13              : #include "postgres.h"
      14              : 
      15              : #if HAVE_POLL_H
      16              : #include <poll.h>
      17              : #endif
      18              : 
      19              : #include "access/htup_details.h"
      20              : #include "access/xact.h"
      21              : #include "catalog/pg_user_mapping.h"
      22              : #include "commands/defrem.h"
      23              : #include "common/base64.h"
      24              : #include "funcapi.h"
      25              : #include "libpq/libpq-be.h"
      26              : #include "libpq/libpq-be-fe-helpers.h"
      27              : #include "mb/pg_wchar.h"
      28              : #include "miscadmin.h"
      29              : #include "pgstat.h"
      30              : #include "postgres_fdw.h"
      31              : #include "storage/latch.h"
      32              : #include "utils/builtins.h"
      33              : #include "utils/hsearch.h"
      34              : #include "utils/inval.h"
      35              : #include "utils/syscache.h"
      36              : 
      37              : /*
      38              :  * Connection cache hash table entry
      39              :  *
      40              :  * The lookup key in this hash table is the user mapping OID. We use just one
      41              :  * connection per user mapping ID, which ensures that all the scans use the
      42              :  * same snapshot during a query.  Using the user mapping OID rather than
      43              :  * the foreign server OID + user OID avoids creating multiple connections when
      44              :  * the public user mapping applies to all user OIDs.
      45              :  *
      46              :  * The "conn" pointer can be NULL if we don't currently have a live connection.
      47              :  * When we do have a connection, xact_depth tracks the current depth of
      48              :  * transactions and subtransactions open on the remote side.  We need to issue
      49              :  * commands at the same nesting depth on the remote as we're executing at
      50              :  * ourselves, so that rolling back a subtransaction will kill the right
      51              :  * queries and not the wrong ones.
      52              :  */
      53              : typedef Oid ConnCacheKey;
      54              : 
      55              : typedef struct ConnCacheEntry
      56              : {
      57              :         ConnCacheKey key;                       /* hash key (must be first) */
      58              :         PGconn     *conn;                       /* connection to foreign server, or NULL */
      59              :         /* Remaining fields are invalid when conn is NULL: */
      60              :         int                     xact_depth;             /* 0 = no xact open, 1 = main xact open, 2 =
      61              :                                                                  * one level of subxact open, etc */
      62              :         bool            have_prep_stmt; /* have we prepared any stmts in this xact? */
      63              :         bool            have_error;             /* have any subxacts aborted in this xact? */
      64              :         bool            changing_xact_state;    /* xact state change in process */
      65              :         bool            parallel_commit;        /* do we commit (sub)xacts in parallel? */
      66              :         bool            parallel_abort; /* do we abort (sub)xacts in parallel? */
      67              :         bool            invalidated;    /* true if reconnect is pending */
      68              :         bool            keep_connections;       /* setting value of keep_connections
      69              :                                                                          * server option */
      70              :         Oid                     serverid;               /* foreign server OID used to get server name */
      71              :         uint32          server_hashvalue;       /* hash value of foreign server OID */
      72              :         uint32          mapping_hashvalue;      /* hash value of user mapping OID */
      73              :         PgFdwConnState state;           /* extra per-connection state */
      74              : } ConnCacheEntry;
      75              : 
      76              : /*
      77              :  * Connection cache (initialized on first use)
      78              :  */
      79              : static HTAB *ConnectionHash = NULL;
      80              : 
      81              : /* for assigning cursor numbers and prepared statement numbers */
      82              : static unsigned int cursor_number = 0;
      83              : static unsigned int prep_stmt_number = 0;
      84              : 
      85              : /* tracks whether any work is needed in callback functions */
      86              : static bool xact_got_connection = false;
      87              : 
      88              : /* custom wait event values, retrieved from shared memory */
      89              : static uint32 pgfdw_we_cleanup_result = 0;
      90              : static uint32 pgfdw_we_connect = 0;
      91              : static uint32 pgfdw_we_get_result = 0;
      92              : 
      93              : /*
      94              :  * Milliseconds to wait to cancel an in-progress query or execute a cleanup
      95              :  * query; if it takes longer than 30 seconds to do these, we assume the
      96              :  * connection is dead.
      97              :  */
      98              : #define CONNECTION_CLEANUP_TIMEOUT      30000
      99              : 
     100              : /*
     101              :  * Milliseconds to wait before issuing another cancel request.  This covers
     102              :  * the race condition where the remote session ignored our cancel request
     103              :  * because it arrived while idle.
     104              :  */
     105              : #define RETRY_CANCEL_TIMEOUT    1000
     106              : 
     107              : /* Macro for constructing abort command to be sent */
     108              : #define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
     109              :         do { \
     110              :                 if (toplevel) \
     111              :                         snprintf((sql), sizeof(sql), \
     112              :                                          "ABORT TRANSACTION"); \
     113              :                 else \
     114              :                         snprintf((sql), sizeof(sql), \
     115              :                                          "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
     116              :                                          (entry)->xact_depth, (entry)->xact_depth); \
     117              :         } while(0)
     118              : 
     119              : /*
     120              :  * Extension version number, for supporting older extension versions' objects
     121              :  */
     122              : enum pgfdwVersion
     123              : {
     124              :         PGFDW_V1_1 = 0,
     125              :         PGFDW_V1_2,
     126              : };
     127              : 
     128              : /*
     129              :  * SQL functions
     130              :  */
     131            0 : PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
     132            0 : PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2);
     133            0 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
     134            0 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
     135              : 
     136              : /* prototypes of private functions */
     137              : static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
     138              : static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
     139              : static void disconnect_pg_server(ConnCacheEntry *entry);
     140              : static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
     141              : static void configure_remote_session(PGconn *conn);
     142              : static void do_sql_command_begin(PGconn *conn, const char *sql);
     143              : static void do_sql_command_end(PGconn *conn, const char *sql,
     144              :                                                            bool consume_input);
     145              : static void begin_remote_xact(ConnCacheEntry *entry);
     146              : static void pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn,
     147              :                                                                   const char *sql);
     148              : static void pgfdw_xact_callback(XactEvent event, void *arg);
     149              : static void pgfdw_subxact_callback(SubXactEvent event,
     150              :                                                                    SubTransactionId mySubid,
     151              :                                                                    SubTransactionId parentSubid,
     152              :                                                                    void *arg);
     153              : static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
     154              : static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
     155              : static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
     156              : static bool pgfdw_cancel_query(PGconn *conn);
     157              : static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
     158              : static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
     159              :                                                                    TimestampTz retrycanceltime,
     160              :                                                                    bool consume_input);
     161              : static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
     162              :                                                                          bool ignore_errors);
     163              : static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
     164              : static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
     165              :                                                                                  TimestampTz endtime,
     166              :                                                                                  bool consume_input,
     167              :                                                                                  bool ignore_errors);
     168              : static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
     169              :                                                                          TimestampTz retrycanceltime,
     170              :                                                                          PGresult **result, bool *timed_out);
     171              : static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
     172              : static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
     173              :                                                                           List **pending_entries,
     174              :                                                                           List **cancel_requested);
     175              : static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
     176              : static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
     177              :                                                                                            int curlevel);
     178              : static void pgfdw_finish_abort_cleanup(List *pending_entries,
     179              :                                                                            List *cancel_requested,
     180              :                                                                            bool toplevel);
     181              : static void pgfdw_security_check(const char **keywords, const char **values,
     182              :                                                                  UserMapping *user, PGconn *conn);
     183              : static bool UserMappingPasswordRequired(UserMapping *user);
     184              : static bool UseScramPassthrough(ForeignServer *server, UserMapping *user);
     185              : static bool disconnect_cached_connections(Oid serverid);
     186              : static void postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
     187              :                                                                                                   enum pgfdwVersion api_version);
     188              : static int      pgfdw_conn_check(PGconn *conn);
     189              : static bool pgfdw_conn_checkable(void);
     190              : static bool pgfdw_has_required_scram_options(const char **keywords, const char **values);
     191              : 
     192              : /*
     193              :  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
     194              :  * server with the user's authorization.  A new connection is established
     195              :  * if we don't already have a suitable one, and a transaction is opened at
     196              :  * the right subtransaction nesting depth if we didn't do that already.
     197              :  *
     198              :  * will_prep_stmt must be true if caller intends to create any prepared
     199              :  * statements.  Since those don't go away automatically at transaction end
     200              :  * (not even on error), we need this flag to cue manual cleanup.
     201              :  *
     202              :  * If state is not NULL, *state receives the per-connection state associated
     203              :  * with the PGconn.
     204              :  */
     205              : PGconn *
     206            0 : GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
     207              : {
     208            0 :         bool            found;
     209            0 :         bool            retry = false;
     210            0 :         ConnCacheEntry *entry;
     211            0 :         ConnCacheKey key;
     212            0 :         MemoryContext ccxt = CurrentMemoryContext;
     213              : 
     214              :         /* First time through, initialize connection cache hashtable */
     215            0 :         if (ConnectionHash == NULL)
     216              :         {
     217            0 :                 HASHCTL         ctl;
     218              : 
     219            0 :                 if (pgfdw_we_get_result == 0)
     220            0 :                         pgfdw_we_get_result =
     221            0 :                                 WaitEventExtensionNew("PostgresFdwGetResult");
     222              : 
     223            0 :                 ctl.keysize = sizeof(ConnCacheKey);
     224            0 :                 ctl.entrysize = sizeof(ConnCacheEntry);
     225            0 :                 ConnectionHash = hash_create("postgres_fdw connections", 8,
     226              :                                                                          &ctl,
     227              :                                                                          HASH_ELEM | HASH_BLOBS);
     228              : 
     229              :                 /*
     230              :                  * Register some callback functions that manage connection cleanup.
     231              :                  * This should be done just once in each backend.
     232              :                  */
     233            0 :                 RegisterXactCallback(pgfdw_xact_callback, NULL);
     234            0 :                 RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
     235            0 :                 CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
     236              :                                                                           pgfdw_inval_callback, (Datum) 0);
     237            0 :                 CacheRegisterSyscacheCallback(USERMAPPINGOID,
     238              :                                                                           pgfdw_inval_callback, (Datum) 0);
     239            0 :         }
     240              : 
     241              :         /* Set flag that we did GetConnection during the current transaction */
     242            0 :         xact_got_connection = true;
     243              : 
     244              :         /* Create hash key for the entry.  Assume no pad bytes in key struct */
     245            0 :         key = user->umid;
     246              : 
     247              :         /*
     248              :          * Find or create cached entry for requested connection.
     249              :          */
     250            0 :         entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
     251            0 :         if (!found)
     252              :         {
     253              :                 /*
     254              :                  * We need only clear "conn" here; remaining fields will be filled
     255              :                  * later when "conn" is set.
     256              :                  */
     257            0 :                 entry->conn = NULL;
     258            0 :         }
     259              : 
     260              :         /* Reject further use of connections which failed abort cleanup. */
     261            0 :         pgfdw_reject_incomplete_xact_state_change(entry);
     262              : 
     263              :         /*
     264              :          * If the connection needs to be remade due to invalidation, disconnect as
     265              :          * soon as we're out of all transactions.
     266              :          */
     267            0 :         if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
     268              :         {
     269            0 :                 elog(DEBUG3, "closing connection %p for option changes to take effect",
     270              :                          entry->conn);
     271            0 :                 disconnect_pg_server(entry);
     272            0 :         }
     273              : 
     274              :         /*
     275              :          * If cache entry doesn't have a connection, we have to establish a new
     276              :          * connection.  (If connect_pg_server throws an error, the cache entry
     277              :          * will remain in a valid empty state, ie conn == NULL.)
     278              :          */
     279            0 :         if (entry->conn == NULL)
     280            0 :                 make_new_connection(entry, user);
     281              : 
     282              :         /*
     283              :          * We check the health of the cached connection here when using it.  In
     284              :          * cases where we're out of all transactions, if a broken connection is
     285              :          * detected, we try to reestablish a new connection later.
     286              :          */
     287            0 :         PG_TRY();
     288              :         {
     289              :                 /* Process a pending asynchronous request if any. */
     290            0 :                 if (entry->state.pendingAreq)
     291            0 :                         process_pending_request(entry->state.pendingAreq);
     292              :                 /* Start a new transaction or subtransaction if needed. */
     293            0 :                 begin_remote_xact(entry);
     294              :         }
     295            0 :         PG_CATCH();
     296              :         {
     297            0 :                 MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
     298            0 :                 ErrorData  *errdata = CopyErrorData();
     299              : 
     300              :                 /*
     301              :                  * Determine whether to try to reestablish the connection.
     302              :                  *
     303              :                  * After a broken connection is detected in libpq, any error other
     304              :                  * than connection failure (e.g., out-of-memory) can be thrown
     305              :                  * somewhere between return from libpq and the expected ereport() call
     306              :                  * in pgfdw_report_error(). In this case, since PQstatus() indicates
     307              :                  * CONNECTION_BAD, checking only PQstatus() causes the false detection
     308              :                  * of connection failure. To avoid this, we also verify that the
     309              :                  * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
     310              :                  * checking only the sqlstate can cause another false detection
     311              :                  * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
     312              :                  * for any libpq-originated error condition.
     313              :                  */
     314            0 :                 if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
     315            0 :                         PQstatus(entry->conn) != CONNECTION_BAD ||
     316            0 :                         entry->xact_depth > 0)
     317              :                 {
     318            0 :                         MemoryContextSwitchTo(ecxt);
     319            0 :                         PG_RE_THROW();
     320              :                 }
     321              : 
     322              :                 /* Clean up the error state */
     323            0 :                 FlushErrorState();
     324            0 :                 FreeErrorData(errdata);
     325            0 :                 errdata = NULL;
     326              : 
     327            0 :                 retry = true;
     328            0 :         }
     329            0 :         PG_END_TRY();
     330              : 
     331              :         /*
     332              :          * If a broken connection is detected, disconnect it, reestablish a new
     333              :          * connection and retry a new remote transaction. If connection failure is
     334              :          * reported again, we give up getting a connection.
     335              :          */
     336            0 :         if (retry)
     337              :         {
     338            0 :                 Assert(entry->xact_depth == 0);
     339              : 
     340            0 :                 ereport(DEBUG3,
     341              :                                 (errmsg_internal("could not start remote transaction on connection %p",
     342              :                                                                  entry->conn)),
     343              :                                 errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
     344              : 
     345            0 :                 elog(DEBUG3, "closing connection %p to reestablish a new one",
     346              :                          entry->conn);
     347            0 :                 disconnect_pg_server(entry);
     348              : 
     349            0 :                 make_new_connection(entry, user);
     350              : 
     351            0 :                 begin_remote_xact(entry);
     352            0 :         }
     353              : 
     354              :         /* Remember if caller will prepare statements */
     355            0 :         entry->have_prep_stmt |= will_prep_stmt;
     356              : 
     357              :         /* If caller needs access to the per-connection state, return it. */
     358            0 :         if (state)
     359            0 :                 *state = &entry->state;
     360              : 
     361            0 :         return entry->conn;
     362            0 : }
     363              : 
     364              : /*
     365              :  * Reset all transient state fields in the cached connection entry and
     366              :  * establish new connection to the remote server.
     367              :  */
     368              : static void
     369            0 : make_new_connection(ConnCacheEntry *entry, UserMapping *user)
     370              : {
     371            0 :         ForeignServer *server = GetForeignServer(user->serverid);
     372            0 :         ListCell   *lc;
     373              : 
     374            0 :         Assert(entry->conn == NULL);
     375              : 
     376              :         /* Reset all transient state fields, to be sure all are clean */
     377            0 :         entry->xact_depth = 0;
     378            0 :         entry->have_prep_stmt = false;
     379            0 :         entry->have_error = false;
     380            0 :         entry->changing_xact_state = false;
     381            0 :         entry->invalidated = false;
     382            0 :         entry->serverid = server->serverid;
     383            0 :         entry->server_hashvalue =
     384            0 :                 GetSysCacheHashValue1(FOREIGNSERVEROID,
     385              :                                                           ObjectIdGetDatum(server->serverid));
     386            0 :         entry->mapping_hashvalue =
     387            0 :                 GetSysCacheHashValue1(USERMAPPINGOID,
     388              :                                                           ObjectIdGetDatum(user->umid));
     389            0 :         memset(&entry->state, 0, sizeof(entry->state));
     390              : 
     391              :         /*
     392              :          * Determine whether to keep the connection that we're about to make here
     393              :          * open even after the transaction using it ends, so that the subsequent
     394              :          * transactions can re-use it.
     395              :          *
     396              :          * By default, all the connections to any foreign servers are kept open.
     397              :          *
     398              :          * Also determine whether to commit/abort (sub)transactions opened on the
     399              :          * remote server in parallel at (sub)transaction end, which is disabled by
     400              :          * default.
     401              :          *
     402              :          * Note: it's enough to determine these only when making a new connection
     403              :          * because if these settings for it are changed, it will be closed and
     404              :          * re-made later.
     405              :          */
     406            0 :         entry->keep_connections = true;
     407            0 :         entry->parallel_commit = false;
     408            0 :         entry->parallel_abort = false;
     409            0 :         foreach(lc, server->options)
     410              :         {
     411            0 :                 DefElem    *def = (DefElem *) lfirst(lc);
     412              : 
     413            0 :                 if (strcmp(def->defname, "keep_connections") == 0)
     414            0 :                         entry->keep_connections = defGetBoolean(def);
     415            0 :                 else if (strcmp(def->defname, "parallel_commit") == 0)
     416            0 :                         entry->parallel_commit = defGetBoolean(def);
     417            0 :                 else if (strcmp(def->defname, "parallel_abort") == 0)
     418            0 :                         entry->parallel_abort = defGetBoolean(def);
     419            0 :         }
     420              : 
     421              :         /* Now try to make the connection */
     422            0 :         entry->conn = connect_pg_server(server, user);
     423              : 
     424            0 :         elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
     425              :                  entry->conn, server->servername, user->umid, user->userid);
     426            0 : }
     427              : 
     428              : /*
     429              :  * Check that non-superuser has used password or delegated credentials
     430              :  * to establish connection; otherwise, he's piggybacking on the
     431              :  * postgres server's user identity. See also dblink_security_check()
     432              :  * in contrib/dblink and check_conn_params.
     433              :  */
     434              : static void
     435            0 : pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
     436              : {
     437              :         /* Superusers bypass the check */
     438            0 :         if (superuser_arg(user->userid))
     439            0 :                 return;
     440              : 
     441              : #ifdef ENABLE_GSS
     442              :         /* Connected via GSSAPI with delegated credentials- all good. */
     443            0 :         if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
     444            0 :                 return;
     445              : #endif
     446              : 
     447              :         /* Ok if superuser set PW required false. */
     448            0 :         if (!UserMappingPasswordRequired(user))
     449            0 :                 return;
     450              : 
     451              :         /* Connected via PW, with PW required true, and provided non-empty PW. */
     452            0 :         if (PQconnectionUsedPassword(conn))
     453              :         {
     454              :                 /* ok if params contain a non-empty password */
     455            0 :                 for (int i = 0; keywords[i] != NULL; i++)
     456              :                 {
     457            0 :                         if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
     458            0 :                                 return;
     459            0 :                 }
     460            0 :         }
     461              : 
     462              :         /*
     463              :          * Ok if SCRAM pass-through is being used and all required SCRAM options
     464              :          * are set correctly. If pgfdw_has_required_scram_options returns true we
     465              :          * assume that UseScramPassthrough is also true since SCRAM options are
     466              :          * only set when UseScramPassthrough is enabled.
     467              :          */
     468            0 :         if (MyProcPort != NULL && MyProcPort->has_scram_keys && pgfdw_has_required_scram_options(keywords, values))
     469            0 :                 return;
     470              : 
     471            0 :         ereport(ERROR,
     472              :                         (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
     473              :                          errmsg("password or GSSAPI delegated credentials required"),
     474              :                          errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
     475              :                          errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
     476            0 : }
     477              : 
     478              : /*
     479              :  * Connect to remote server using specified server and user mapping properties.
     480              :  */
     481              : static PGconn *
     482            0 : connect_pg_server(ForeignServer *server, UserMapping *user)
     483              : {
     484            0 :         PGconn     *volatile conn = NULL;
     485              : 
     486              :         /*
     487              :          * Use PG_TRY block to ensure closing connection on error.
     488              :          */
     489            0 :         PG_TRY();
     490              :         {
     491            0 :                 const char **keywords;
     492            0 :                 const char **values;
     493            0 :                 char       *appname = NULL;
     494            0 :                 int                     n;
     495              : 
     496              :                 /*
     497              :                  * Construct connection params from generic options of ForeignServer
     498              :                  * and UserMapping.  (Some of them might not be libpq options, in
     499              :                  * which case we'll just waste a few array slots.)  Add 4 extra slots
     500              :                  * for application_name, fallback_application_name, client_encoding,
     501              :                  * end marker, and 3 extra slots for scram keys and required scram
     502              :                  * pass-through options.
     503              :                  */
     504            0 :                 n = list_length(server->options) + list_length(user->options) + 4 + 3;
     505            0 :                 keywords = (const char **) palloc(n * sizeof(char *));
     506            0 :                 values = (const char **) palloc(n * sizeof(char *));
     507              : 
     508            0 :                 n = 0;
     509            0 :                 n += ExtractConnectionOptions(server->options,
     510            0 :                                                                           keywords + n, values + n);
     511            0 :                 n += ExtractConnectionOptions(user->options,
     512            0 :                                                                           keywords + n, values + n);
     513              : 
     514              :                 /*
     515              :                  * Use pgfdw_application_name as application_name if set.
     516              :                  *
     517              :                  * PQconnectdbParams() processes the parameter arrays from start to
     518              :                  * end. If any key word is repeated, the last value is used. Therefore
     519              :                  * note that pgfdw_application_name must be added to the arrays after
     520              :                  * options of ForeignServer are, so that it can override
     521              :                  * application_name set in ForeignServer.
     522              :                  */
     523            0 :                 if (pgfdw_application_name && *pgfdw_application_name != '\0')
     524              :                 {
     525            0 :                         keywords[n] = "application_name";
     526            0 :                         values[n] = pgfdw_application_name;
     527            0 :                         n++;
     528            0 :                 }
     529              : 
     530              :                 /*
     531              :                  * Search the parameter arrays to find application_name setting, and
     532              :                  * replace escape sequences in it with status information if found.
     533              :                  * The arrays are searched backwards because the last value is used if
     534              :                  * application_name is repeatedly set.
     535              :                  */
     536            0 :                 for (int i = n - 1; i >= 0; i--)
     537              :                 {
     538            0 :                         if (strcmp(keywords[i], "application_name") == 0 &&
     539            0 :                                 *(values[i]) != '\0')
     540              :                         {
     541              :                                 /*
     542              :                                  * Use this application_name setting if it's not empty string
     543              :                                  * even after any escape sequences in it are replaced.
     544              :                                  */
     545            0 :                                 appname = process_pgfdw_appname(values[i]);
     546            0 :                                 if (appname[0] != '\0')
     547              :                                 {
     548            0 :                                         values[i] = appname;
     549            0 :                                         break;
     550              :                                 }
     551              : 
     552              :                                 /*
     553              :                                  * This empty application_name is not used, so we set
     554              :                                  * values[i] to NULL and keep searching the array to find the
     555              :                                  * next one.
     556              :                                  */
     557            0 :                                 values[i] = NULL;
     558            0 :                                 pfree(appname);
     559            0 :                                 appname = NULL;
     560            0 :                         }
     561            0 :                 }
     562              : 
     563              :                 /* Use "postgres_fdw" as fallback_application_name */
     564            0 :                 keywords[n] = "fallback_application_name";
     565            0 :                 values[n] = "postgres_fdw";
     566            0 :                 n++;
     567              : 
     568              :                 /* Set client_encoding so that libpq can convert encoding properly. */
     569            0 :                 keywords[n] = "client_encoding";
     570            0 :                 values[n] = GetDatabaseEncodingName();
     571            0 :                 n++;
     572              : 
     573              :                 /* Add required SCRAM pass-through connection options if it's enabled. */
     574            0 :                 if (MyProcPort != NULL && MyProcPort->has_scram_keys && UseScramPassthrough(server, user))
     575              :                 {
     576            0 :                         int                     len;
     577            0 :                         int                     encoded_len;
     578              : 
     579            0 :                         keywords[n] = "scram_client_key";
     580            0 :                         len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
     581              :                         /* don't forget the zero-terminator */
     582            0 :                         values[n] = palloc0(len + 1);
     583            0 :                         encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
     584              :                                                                                 sizeof(MyProcPort->scram_ClientKey),
     585            0 :                                                                                 (char *) values[n], len);
     586            0 :                         if (encoded_len < 0)
     587            0 :                                 elog(ERROR, "could not encode SCRAM client key");
     588            0 :                         n++;
     589              : 
     590            0 :                         keywords[n] = "scram_server_key";
     591            0 :                         len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
     592              :                         /* don't forget the zero-terminator */
     593            0 :                         values[n] = palloc0(len + 1);
     594            0 :                         encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
     595              :                                                                                 sizeof(MyProcPort->scram_ServerKey),
     596            0 :                                                                                 (char *) values[n], len);
     597            0 :                         if (encoded_len < 0)
     598            0 :                                 elog(ERROR, "could not encode SCRAM server key");
     599            0 :                         n++;
     600              : 
     601              :                         /*
     602              :                          * Require scram-sha-256 to ensure that no other auth method is
     603              :                          * used when connecting with foreign server.
     604              :                          */
     605            0 :                         keywords[n] = "require_auth";
     606            0 :                         values[n] = "scram-sha-256";
     607            0 :                         n++;
     608            0 :                 }
     609              : 
     610            0 :                 keywords[n] = values[n] = NULL;
     611              : 
     612              :                 /* Verify the set of connection parameters. */
     613            0 :                 check_conn_params(keywords, values, user);
     614              : 
     615              :                 /* first time, allocate or get the custom wait event */
     616            0 :                 if (pgfdw_we_connect == 0)
     617            0 :                         pgfdw_we_connect = WaitEventExtensionNew("PostgresFdwConnect");
     618              : 
     619              :                 /* OK to make connection */
     620            0 :                 conn = libpqsrv_connect_params(keywords, values,
     621              :                                                                            false,       /* expand_dbname */
     622            0 :                                                                            pgfdw_we_connect);
     623              : 
     624            0 :                 if (!conn || PQstatus(conn) != CONNECTION_OK)
     625            0 :                         ereport(ERROR,
     626              :                                         (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     627              :                                          errmsg("could not connect to server \"%s\"",
     628              :                                                         server->servername),
     629              :                                          errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
     630              : 
     631            0 :                 PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
     632              :                                                         "received message via remote connection");
     633              : 
     634              :                 /* Perform post-connection security checks. */
     635            0 :                 pgfdw_security_check(keywords, values, user, conn);
     636              : 
     637              :                 /* Prepare new session for use */
     638            0 :                 configure_remote_session(conn);
     639              : 
     640            0 :                 if (appname != NULL)
     641            0 :                         pfree(appname);
     642            0 :                 pfree(keywords);
     643            0 :                 pfree(values);
     644            0 :         }
     645            0 :         PG_CATCH();
     646              :         {
     647            0 :                 libpqsrv_disconnect(conn);
     648            0 :                 PG_RE_THROW();
     649              :         }
     650            0 :         PG_END_TRY();
     651              : 
     652            0 :         return conn;
     653            0 : }
     654              : 
     655              : /*
     656              :  * Disconnect any open connection for a connection cache entry.
     657              :  */
     658              : static void
     659            0 : disconnect_pg_server(ConnCacheEntry *entry)
     660              : {
     661            0 :         if (entry->conn != NULL)
     662              :         {
     663            0 :                 libpqsrv_disconnect(entry->conn);
     664            0 :                 entry->conn = NULL;
     665            0 :         }
     666            0 : }
     667              : 
     668              : /*
     669              :  * Return true if the password_required is defined and false for this user
     670              :  * mapping, otherwise false. The mapping has been pre-validated.
     671              :  */
     672              : static bool
     673            0 : UserMappingPasswordRequired(UserMapping *user)
     674              : {
     675            0 :         ListCell   *cell;
     676              : 
     677            0 :         foreach(cell, user->options)
     678              :         {
     679            0 :                 DefElem    *def = (DefElem *) lfirst(cell);
     680              : 
     681            0 :                 if (strcmp(def->defname, "password_required") == 0)
     682            0 :                         return defGetBoolean(def);
     683            0 :         }
     684              : 
     685            0 :         return true;
     686            0 : }
     687              : 
     688              : static bool
     689            0 : UseScramPassthrough(ForeignServer *server, UserMapping *user)
     690              : {
     691            0 :         ListCell   *cell;
     692              : 
     693            0 :         foreach(cell, server->options)
     694              :         {
     695            0 :                 DefElem    *def = (DefElem *) lfirst(cell);
     696              : 
     697            0 :                 if (strcmp(def->defname, "use_scram_passthrough") == 0)
     698            0 :                         return defGetBoolean(def);
     699            0 :         }
     700              : 
     701            0 :         foreach(cell, user->options)
     702              :         {
     703            0 :                 DefElem    *def = (DefElem *) lfirst(cell);
     704              : 
     705            0 :                 if (strcmp(def->defname, "use_scram_passthrough") == 0)
     706            0 :                         return defGetBoolean(def);
     707            0 :         }
     708              : 
     709            0 :         return false;
     710            0 : }
     711              : 
     712              : /*
     713              :  * For non-superusers, insist that the connstr specify a password or that the
     714              :  * user provided their own GSSAPI delegated credentials.  This
     715              :  * prevents a password from being picked up from .pgpass, a service file, the
     716              :  * environment, etc.  We don't want the postgres user's passwords,
     717              :  * certificates, etc to be accessible to non-superusers.  (See also
     718              :  * dblink_connstr_check in contrib/dblink.)
     719              :  */
     720              : static void
     721            0 : check_conn_params(const char **keywords, const char **values, UserMapping *user)
     722              : {
     723            0 :         int                     i;
     724              : 
     725              :         /* no check required if superuser */
     726            0 :         if (superuser_arg(user->userid))
     727            0 :                 return;
     728              : 
     729              : #ifdef ENABLE_GSS
     730              :         /* ok if the user provided their own delegated credentials */
     731            0 :         if (be_gssapi_get_delegation(MyProcPort))
     732            0 :                 return;
     733              : #endif
     734              : 
     735              :         /* ok if params contain a non-empty password */
     736            0 :         for (i = 0; keywords[i] != NULL; i++)
     737              :         {
     738            0 :                 if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
     739            0 :                         return;
     740            0 :         }
     741              : 
     742              :         /* ok if the superuser explicitly said so at user mapping creation time */
     743            0 :         if (!UserMappingPasswordRequired(user))
     744            0 :                 return;
     745              : 
     746              :         /*
     747              :          * Ok if SCRAM pass-through is being used and all required scram options
     748              :          * are set correctly. If pgfdw_has_required_scram_options returns true we
     749              :          * assume that UseScramPassthrough is also true since SCRAM options are
     750              :          * only set when UseScramPassthrough is enabled.
     751              :          */
     752            0 :         if (MyProcPort != NULL && MyProcPort->has_scram_keys && pgfdw_has_required_scram_options(keywords, values))
     753            0 :                 return;
     754              : 
     755            0 :         ereport(ERROR,
     756              :                         (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
     757              :                          errmsg("password or GSSAPI delegated credentials required"),
     758              :                          errdetail("Non-superusers must delegate GSSAPI credentials, provide a password, or enable SCRAM pass-through in user mapping.")));
     759            0 : }
     760              : 
     761              : /*
     762              :  * Issue SET commands to make sure remote session is configured properly.
     763              :  *
     764              :  * We do this just once at connection, assuming nothing will change the
     765              :  * values later.  Since we'll never send volatile function calls to the
     766              :  * remote, there shouldn't be any way to break this assumption from our end.
     767              :  * It's possible to think of ways to break it at the remote end, eg making
     768              :  * a foreign table point to a view that includes a set_config call ---
     769              :  * but once you admit the possibility of a malicious view definition,
     770              :  * there are any number of ways to break things.
     771              :  */
     772              : static void
     773            0 : configure_remote_session(PGconn *conn)
     774              : {
     775            0 :         int                     remoteversion = PQserverVersion(conn);
     776              : 
     777              :         /* Force the search path to contain only pg_catalog (see deparse.c) */
     778            0 :         do_sql_command(conn, "SET search_path = pg_catalog");
     779              : 
     780              :         /*
     781              :          * Set remote timezone; this is basically just cosmetic, since all
     782              :          * transmitted and returned timestamptzs should specify a zone explicitly
     783              :          * anyway.  However it makes the regression test outputs more predictable.
     784              :          *
     785              :          * We don't risk setting remote zone equal to ours, since the remote
     786              :          * server might use a different timezone database.  Instead, use GMT
     787              :          * (quoted, because very old servers are picky about case).  That's
     788              :          * guaranteed to work regardless of the remote's timezone database,
     789              :          * because pg_tzset() hard-wires it (at least in PG 9.2 and later).
     790              :          */
     791            0 :         do_sql_command(conn, "SET timezone = 'GMT'");
     792              : 
     793              :         /*
     794              :          * Set values needed to ensure unambiguous data output from remote.  (This
     795              :          * logic should match what pg_dump does.  See also set_transmission_modes
     796              :          * in postgres_fdw.c.)
     797              :          */
     798            0 :         do_sql_command(conn, "SET datestyle = ISO");
     799            0 :         if (remoteversion >= 80400)
     800            0 :                 do_sql_command(conn, "SET intervalstyle = postgres");
     801            0 :         if (remoteversion >= 90000)
     802            0 :                 do_sql_command(conn, "SET extra_float_digits = 3");
     803              :         else
     804            0 :                 do_sql_command(conn, "SET extra_float_digits = 2");
     805            0 : }
     806              : 
     807              : /*
     808              :  * Convenience subroutine to issue a non-data-returning SQL command to remote
     809              :  */
     810              : void
     811            0 : do_sql_command(PGconn *conn, const char *sql)
     812              : {
     813            0 :         do_sql_command_begin(conn, sql);
     814            0 :         do_sql_command_end(conn, sql, false);
     815            0 : }
     816              : 
     817              : static void
     818            0 : do_sql_command_begin(PGconn *conn, const char *sql)
     819              : {
     820            0 :         if (!PQsendQuery(conn, sql))
     821            0 :                 pgfdw_report_error(NULL, conn, sql);
     822            0 : }
     823              : 
     824              : static void
     825            0 : do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
     826              : {
     827            0 :         PGresult   *res;
     828              : 
     829              :         /*
     830              :          * If requested, consume whatever data is available from the socket. (Note
     831              :          * that if all data is available, this allows pgfdw_get_result to call
     832              :          * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
     833              :          * would be large compared to the overhead of PQconsumeInput.)
     834              :          */
     835            0 :         if (consume_input && !PQconsumeInput(conn))
     836            0 :                 pgfdw_report_error(NULL, conn, sql);
     837            0 :         res = pgfdw_get_result(conn);
     838            0 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
     839            0 :                 pgfdw_report_error(res, conn, sql);
     840            0 :         PQclear(res);
     841            0 : }
     842              : 
     843              : /*
     844              :  * Start remote transaction or subtransaction, if needed.
     845              :  *
     846              :  * Note that we always use at least REPEATABLE READ in the remote session.
     847              :  * This is so that, if a query initiates multiple scans of the same or
     848              :  * different foreign tables, we will get snapshot-consistent results from
     849              :  * those scans.  A disadvantage is that we can't provide sane emulation of
     850              :  * READ COMMITTED behavior --- it would be nice if we had some other way to
     851              :  * control which remote queries share a snapshot.
     852              :  */
     853              : static void
     854            0 : begin_remote_xact(ConnCacheEntry *entry)
     855              : {
     856            0 :         int                     curlevel = GetCurrentTransactionNestLevel();
     857              : 
     858              :         /* Start main transaction if we haven't yet */
     859            0 :         if (entry->xact_depth <= 0)
     860              :         {
     861            0 :                 const char *sql;
     862              : 
     863            0 :                 elog(DEBUG3, "starting remote transaction on connection %p",
     864              :                          entry->conn);
     865              : 
     866            0 :                 if (IsolationIsSerializable())
     867            0 :                         sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
     868              :                 else
     869            0 :                         sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
     870            0 :                 entry->changing_xact_state = true;
     871            0 :                 do_sql_command(entry->conn, sql);
     872            0 :                 entry->xact_depth = 1;
     873            0 :                 entry->changing_xact_state = false;
     874            0 :         }
     875              : 
     876              :         /*
     877              :          * If we're in a subtransaction, stack up savepoints to match our level.
     878              :          * This ensures we can rollback just the desired effects when a
     879              :          * subtransaction aborts.
     880              :          */
     881            0 :         while (entry->xact_depth < curlevel)
     882              :         {
     883            0 :                 char            sql[64];
     884              : 
     885            0 :                 snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
     886            0 :                 entry->changing_xact_state = true;
     887            0 :                 do_sql_command(entry->conn, sql);
     888            0 :                 entry->xact_depth++;
     889            0 :                 entry->changing_xact_state = false;
     890            0 :         }
     891            0 : }
     892              : 
     893              : /*
     894              :  * Release connection reference count created by calling GetConnection.
     895              :  */
     896              : void
     897            0 : ReleaseConnection(PGconn *conn)
     898              : {
     899              :         /*
     900              :          * Currently, we don't actually track connection references because all
     901              :          * cleanup is managed on a transaction or subtransaction basis instead. So
     902              :          * there's nothing to do here.
     903              :          */
     904            0 : }
     905              : 
     906              : /*
     907              :  * Assign a "unique" number for a cursor.
     908              :  *
     909              :  * These really only need to be unique per connection within a transaction.
     910              :  * For the moment we ignore the per-connection point and assign them across
     911              :  * all connections in the transaction, but we ask for the connection to be
     912              :  * supplied in case we want to refine that.
     913              :  *
     914              :  * Note that even if wraparound happens in a very long transaction, actual
     915              :  * collisions are highly improbable; just be sure to use %u not %d to print.
     916              :  */
     917              : unsigned int
     918            0 : GetCursorNumber(PGconn *conn)
     919              : {
     920            0 :         return ++cursor_number;
     921              : }
     922              : 
     923              : /*
     924              :  * Assign a "unique" number for a prepared statement.
     925              :  *
     926              :  * This works much like GetCursorNumber, except that we never reset the counter
     927              :  * within a session.  That's because we can't be 100% sure we've gotten rid
     928              :  * of all prepared statements on all connections, and it's not really worth
     929              :  * increasing the risk of prepared-statement name collisions by resetting.
     930              :  */
     931              : unsigned int
     932            0 : GetPrepStmtNumber(PGconn *conn)
     933              : {
     934            0 :         return ++prep_stmt_number;
     935              : }
     936              : 
     937              : /*
     938              :  * Submit a query and wait for the result.
     939              :  *
     940              :  * Since we don't use non-blocking mode, this can't process interrupts while
     941              :  * pushing the query text to the server.  That risk is relatively small, so we
     942              :  * ignore that for now.
     943              :  *
     944              :  * Caller is responsible for the error handling on the result.
     945              :  */
     946              : PGresult *
     947            0 : pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
     948              : {
     949              :         /* First, process a pending asynchronous request, if any. */
     950            0 :         if (state && state->pendingAreq)
     951            0 :                 process_pending_request(state->pendingAreq);
     952              : 
     953            0 :         if (!PQsendQuery(conn, query))
     954            0 :                 return NULL;
     955            0 :         return pgfdw_get_result(conn);
     956            0 : }
     957              : 
     958              : /*
     959              :  * Wrap libpqsrv_get_result_last(), adding wait event.
     960              :  *
     961              :  * Caller is responsible for the error handling on the result.
     962              :  */
     963              : PGresult *
     964            0 : pgfdw_get_result(PGconn *conn)
     965              : {
     966            0 :         return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
     967              : }
     968              : 
     969              : /*
     970              :  * Report an error we got from the remote server.
     971              :  *
     972              :  * Callers should use pgfdw_report_error() to throw an error, or use
     973              :  * pgfdw_report() for lesser message levels.  (We make this distinction
     974              :  * so that pgfdw_report_error() can be marked noreturn.)
     975              :  *
     976              :  * res: PGresult containing the error (might be NULL)
     977              :  * conn: connection we did the query on
     978              :  * sql: NULL, or text of remote command we tried to execute
     979              :  *
     980              :  * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error,
     981              :  * in which case memory context cleanup will clear it eventually).
     982              :  *
     983              :  * Note: callers that choose not to throw ERROR for a remote error are
     984              :  * responsible for making sure that the associated ConnCacheEntry gets
     985              :  * marked with have_error = true.
     986              :  */
     987              : void
     988            0 : pgfdw_report_error(PGresult *res, PGconn *conn, const char *sql)
     989              : {
     990            0 :         pgfdw_report_internal(ERROR, res, conn, sql);
     991            0 :         pg_unreachable();
     992              : }
     993              : 
     994              : void
     995            0 : pgfdw_report(int elevel, PGresult *res, PGconn *conn, const char *sql)
     996              : {
     997            0 :         Assert(elevel < ERROR);              /* use pgfdw_report_error for that */
     998            0 :         pgfdw_report_internal(elevel, res, conn, sql);
     999            0 : }
    1000              : 
    1001              : static void
    1002            0 : pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn,
    1003              :                                           const char *sql)
    1004              : {
    1005            0 :         char       *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
    1006            0 :         char       *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
    1007            0 :         char       *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
    1008            0 :         char       *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
    1009            0 :         char       *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
    1010            0 :         int                     sqlstate;
    1011              : 
    1012            0 :         if (diag_sqlstate)
    1013            0 :                 sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
    1014              :                                                                  diag_sqlstate[1],
    1015              :                                                                  diag_sqlstate[2],
    1016              :                                                                  diag_sqlstate[3],
    1017              :                                                                  diag_sqlstate[4]);
    1018              :         else
    1019            0 :                 sqlstate = ERRCODE_CONNECTION_FAILURE;
    1020              : 
    1021              :         /*
    1022              :          * If we don't get a message from the PGresult, try the PGconn.  This is
    1023              :          * needed because for connection-level failures, PQgetResult may just
    1024              :          * return NULL, not a PGresult at all.
    1025              :          */
    1026            0 :         if (message_primary == NULL)
    1027            0 :                 message_primary = pchomp(PQerrorMessage(conn));
    1028              : 
    1029            0 :         ereport(elevel,
    1030              :                         (errcode(sqlstate),
    1031              :                          (message_primary != NULL && message_primary[0] != '\0') ?
    1032              :                          errmsg_internal("%s", message_primary) :
    1033              :                          errmsg("could not obtain message string for remote error"),
    1034              :                          message_detail ? errdetail_internal("%s", message_detail) : 0,
    1035              :                          message_hint ? errhint("%s", message_hint) : 0,
    1036              :                          message_context ? errcontext("%s", message_context) : 0,
    1037              :                          sql ? errcontext("remote SQL command: %s", sql) : 0));
    1038            0 :         PQclear(res);
    1039            0 : }
    1040              : 
    /*
 * pgfdw_xact_callback --- cleanup at main-transaction end.
 *
 * Transaction-event callback; the "arg" pointer is not used.  For every
 * cached connection that has an open remote transaction (xact_depth > 0):
 *   - PRE_COMMIT / PARALLEL_PRE_COMMIT: commit the remote transaction.  This
 *     happens synchronously, or, when parallel_commit is set, by sending
 *     COMMIT now and completing it after the scan through
 *     pgfdw_finish_pre_commit_cleanup();
 *   - PRE_PREPARE: throw an error, since remote transactions cannot be kept
 *     open across PREPARE TRANSACTION;
 *   - COMMIT / PARALLEL_COMMIT / PREPARE: throw an internal error, because
 *     pre-commit should already have closed the remote transaction;
 *   - ABORT / PARALLEL_ABORT: roll the remote transaction back, possibly in
 *     parallel when parallel_abort is set (finished after the scan through
 *     pgfdw_finish_abort_cleanup()).
 * Entries handled synchronously are reset with pgfdw_reset_xact_state().
 * Finally xact_got_connection and cursor_number are reset.
 *
 * This runs just late enough that it must not enter user-defined code
 * locally.  (Entering such code on the remote side is fine.  Its remote
 * COMMIT TRANSACTION may run deferred triggers.)
 */
static void
pgfdw_xact_callback(XactEvent event, void *arg)
{
	HASH_SEQ_STATUS scan;
	ConnCacheEntry *entry;
	/* entries whose COMMIT/abort was started but not yet completed */
	List	   *pending_entries = NIL;
	/* entries that had a cancel request issued during parallel abort */
	List	   *cancel_requested = NIL;

	/* Quick exit if no connections were touched in this transaction. */
	if (!xact_got_connection)
		return;

	/*
	 * Scan all connection cache entries to find open remote transactions, and
	 * close them.
	 */
	hash_seq_init(&scan, ConnectionHash);
	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
	{
		PGresult   *res;

		/* Ignore cache entry if no open connection right now */
		if (entry->conn == NULL)
			continue;

		/* If it has an open remote transaction, try to close it */
		if (entry->xact_depth > 0)
		{
			elog(DEBUG3, "closing remote transaction on connection %p",
				 entry->conn);

			/*
			 * NB: a "continue" inside this switch continues the enclosing
			 * while loop.  It therefore skips pgfdw_reset_xact_state() below,
			 * leaving the entry to be completed after the scan.
			 */
			switch (event)
			{
				case XACT_EVENT_PARALLEL_PRE_COMMIT:
				case XACT_EVENT_PRE_COMMIT:

					/*
					 * If abort cleanup previously failed for this connection,
					 * we can't issue any more commands against it.
					 */
					pgfdw_reject_incomplete_xact_state_change(entry);

					/*
					 * Commit all remote transactions during pre-commit.  The
					 * changing_xact_state flag stays set until COMMIT is known
					 * to have finished; if we fail in between, the flag is
					 * still set when this entry is next examined.
					 */
					entry->changing_xact_state = true;
					if (entry->parallel_commit)
					{
						/* Send COMMIT now; its result is collected after the scan. */
						do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
						pending_entries = lappend(pending_entries, entry);
						continue;
					}
					do_sql_command(entry->conn, "COMMIT TRANSACTION");
					entry->changing_xact_state = false;

					/*
					 * If there were any errors in subtransactions, and we
					 * made prepared statements, do a DEALLOCATE ALL to make
					 * sure we get rid of all prepared statements. This is
					 * annoying and not terribly bulletproof, but it's
					 * probably not worth trying harder.
					 *
					 * DEALLOCATE ALL only exists in 8.3 and later, so this
					 * constrains how old a server postgres_fdw can
					 * communicate with.  We intentionally ignore errors in
					 * the DEALLOCATE, so that we can hobble along to some
					 * extent with older servers (leaking prepared statements
					 * as we go; but we don't really support update operations
					 * pre-8.3 anyway).
					 */
					if (entry->have_prep_stmt && entry->have_error)
					{
						/* result deliberately not checked; see above */
						res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
											   NULL);
						PQclear(res);
					}
					entry->have_prep_stmt = false;
					entry->have_error = false;
					break;
				case XACT_EVENT_PRE_PREPARE:

					/*
					 * We disallow any remote transactions, since it's not
					 * very reasonable to hold them open until the prepared
					 * transaction is committed.  For the moment, throw error
					 * unconditionally; later we might allow read-only cases.
					 * Note that the error will cause us to come right back
					 * here with event == XACT_EVENT_ABORT, so we'll clean up
					 * the connection state at that point.
					 */
					ereport(ERROR,
							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							 errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
					break;
				case XACT_EVENT_PARALLEL_COMMIT:
				case XACT_EVENT_COMMIT:
				case XACT_EVENT_PREPARE:
					/* Pre-commit should have closed the open transaction */
					elog(ERROR, "missed cleaning up connection during pre-commit");
					break;
				case XACT_EVENT_PARALLEL_ABORT:
				case XACT_EVENT_ABORT:
					/* Rollback all remote transactions during abort */
					if (entry->parallel_abort)
					{
						/*
						 * A true result means the abort is still in flight.
						 * Presumably the entry has been queued on
						 * pending_entries or cancel_requested (TODO confirm
						 * in pgfdw_abort_cleanup_begin), so skip the reset
						 * here.
						 */
						if (pgfdw_abort_cleanup_begin(entry, true,
													  &pending_entries,
													  &cancel_requested))
							continue;
					}
					else
						pgfdw_abort_cleanup(entry, true);
					break;
			}
		}

		/* Reset state to show we're out of a transaction */
		pgfdw_reset_xact_state(entry, true);
	}

	/* If there are any pending connections, finish cleaning them up */
	if (pending_entries || cancel_requested)
	{
		if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
			event == XACT_EVENT_PRE_COMMIT)
		{
			/* only the abort path issues cancel requests */
			Assert(cancel_requested == NIL);
			pgfdw_finish_pre_commit_cleanup(pending_entries);
		}
		else
		{
			Assert(event == XACT_EVENT_PARALLEL_ABORT ||
				   event == XACT_EVENT_ABORT);
			pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
									   true);
		}
	}

	/*
	 * Regardless of the event type, we can now mark ourselves as out of the
	 * transaction.  (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
	 * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
	 */
	xact_got_connection = false;

	/* Also reset cursor numbering for next transaction */
	cursor_number = 0;
}
    1194              : 
    /*
 * pgfdw_subxact_callback --- cleanup at subtransaction end.
 *
 * Acts only on SUBXACT_EVENT_PRE_COMMIT_SUB and SUBXACT_EVENT_ABORT_SUB;
 * "mySubid", "parentSubid" and "arg" are not used.  For each cached
 * connection whose remote nesting depth equals the local level being
 * closed, the remote subtransaction is released on pre-commit (RELEASE
 * SAVEPOINT s<level>) or rolled back on abort.  Either step can be done
 * synchronously or, with parallel_commit / parallel_abort, started now and
 * finished after the scan.
 */
static void
pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
					   SubTransactionId parentSubid, void *arg)
{
	HASH_SEQ_STATUS scan;
	ConnCacheEntry *entry;
	int			curlevel;
	/* entries whose RELEASE/rollback was started but not yet completed */
	List	   *pending_entries = NIL;
	/* entries that had a cancel request issued during parallel abort */
	List	   *cancel_requested = NIL;

	/* Nothing to do at subxact start, nor after commit. */
	if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
		  event == SUBXACT_EVENT_ABORT_SUB))
		return;

	/* Quick exit if no connections were touched in this transaction. */
	if (!xact_got_connection)
		return;

	/*
	 * Scan all connection cache entries to find open remote subtransactions
	 * of the current level, and close them.
	 */
	curlevel = GetCurrentTransactionNestLevel();
	hash_seq_init(&scan, ConnectionHash);
	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
	{
		/* ample for "RELEASE SAVEPOINT s" followed by any int */
		char		sql[100];

		/*
		 * We only care about connections with open remote subtransactions of
		 * the current level.
		 */
		if (entry->conn == NULL || entry->xact_depth < curlevel)
			continue;

		/* A deeper remote level means an inner subxact was never closed. */
		if (entry->xact_depth > curlevel)
			elog(ERROR, "missed cleaning up remote subtransaction at level %d",
				 entry->xact_depth);

		if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
		{
			/*
			 * If abort cleanup previously failed for this connection, we
			 * can't issue any more commands against it.
			 */
			pgfdw_reject_incomplete_xact_state_change(entry);

			/*
			 * Commit all remote subtransactions during pre-commit.  The
			 * savepoint name is derived from the local nesting level.
			 */
			snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
			entry->changing_xact_state = true;
			if (entry->parallel_commit)
			{
				/*
				 * Send the command now and finish it after the scan.  The
				 * "continue" skips the depth reset below for this entry.
				 */
				do_sql_command_begin(entry->conn, sql);
				pending_entries = lappend(pending_entries, entry);
				continue;
			}
			do_sql_command(entry->conn, sql);
			entry->changing_xact_state = false;
		}
		else
		{
			/* Rollback all remote subtransactions during abort */
			if (entry->parallel_abort)
			{
				/*
				 * A true result means the rollback is still in flight, so
				 * skip the depth reset here.  Presumably the entry was queued
				 * on pending_entries or cancel_requested (TODO confirm in
				 * pgfdw_abort_cleanup_begin).
				 */
				if (pgfdw_abort_cleanup_begin(entry, false,
											  &pending_entries,
											  &cancel_requested))
					continue;
			}
			else
				pgfdw_abort_cleanup(entry, false);
		}

		/* OK, we're outta that level of subtransaction */
		pgfdw_reset_xact_state(entry, false);
	}

	/* If there are any pending connections, finish cleaning them up */
	if (pending_entries || cancel_requested)
	{
		if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
		{
			/* only the abort path issues cancel requests */
			Assert(cancel_requested == NIL);
			pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
		}
		else
		{
			Assert(event == SUBXACT_EVENT_ABORT_SUB);
			pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
									   false);
		}
	}
}
    1292              : 
    1293              : /*
    1294              :  * Connection invalidation callback function
    1295              :  *
    1296              :  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
    1297              :  * close connections depending on that entry immediately if current transaction
    1298              :  * has not used those connections yet. Otherwise, mark those connections as
    1299              :  * invalid and then make pgfdw_xact_callback() close them at the end of current
    1300              :  * transaction, since they cannot be closed in the midst of the transaction
    1301              :  * using them. Closed connections will be remade at the next opportunity if
    1302              :  * necessary.
    1303              :  *
    1304              :  * Although most cache invalidation callbacks blow away all the related stuff
    1305              :  * regardless of the given hashvalue, connections are expensive enough that
    1306              :  * it's worth trying to avoid that.
    1307              :  *
    1308              :  * NB: We could avoid unnecessary disconnection more strictly by examining
    1309              :  * individual option values, but it seems too much effort for the gain.
    1310              :  */
    1311              : static void
    1312            0 : pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
    1313              : {
    1314            0 :         HASH_SEQ_STATUS scan;
    1315            0 :         ConnCacheEntry *entry;
    1316              : 
    1317            0 :         Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
    1318              : 
    1319              :         /* ConnectionHash must exist already, if we're registered */
    1320            0 :         hash_seq_init(&scan, ConnectionHash);
    1321            0 :         while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    1322              :         {
    1323              :                 /* Ignore invalid entries */
    1324            0 :                 if (entry->conn == NULL)
    1325            0 :                         continue;
    1326              : 
    1327              :                 /* hashvalue == 0 means a cache reset, must clear all state */
    1328            0 :                 if (hashvalue == 0 ||
    1329            0 :                         (cacheid == FOREIGNSERVEROID &&
    1330            0 :                          entry->server_hashvalue == hashvalue) ||
    1331            0 :                         (cacheid == USERMAPPINGOID &&
    1332            0 :                          entry->mapping_hashvalue == hashvalue))
    1333              :                 {
    1334              :                         /*
    1335              :                          * Close the connection immediately if it's not used yet in this
    1336              :                          * transaction. Otherwise mark it as invalid so that
    1337              :                          * pgfdw_xact_callback() can close it at the end of this
    1338              :                          * transaction.
    1339              :                          */
    1340            0 :                         if (entry->xact_depth == 0)
    1341              :                         {
    1342            0 :                                 elog(DEBUG3, "discarding connection %p", entry->conn);
    1343            0 :                                 disconnect_pg_server(entry);
    1344            0 :                         }
    1345              :                         else
    1346            0 :                                 entry->invalidated = true;
    1347            0 :                 }
    1348              :         }
    1349            0 : }
    1350              : 
    1351              : /*
    1352              :  * Raise an error if the given connection cache entry is marked as being
    1353              :  * in the middle of an xact state change.  This should be called at which no
    1354              :  * such change is expected to be in progress; if one is found to be in
    1355              :  * progress, it means that we aborted in the middle of a previous state change
    1356              :  * and now don't know what the remote transaction state actually is.
    1357              :  * Such connections can't safely be further used.  Re-establishing the
    1358              :  * connection would change the snapshot and roll back any writes already
    1359              :  * performed, so that's not an option, either. Thus, we must abort.
    1360              :  */
    1361              : static void
    1362            0 : pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
    1363              : {
    1364            0 :         ForeignServer *server;
    1365              : 
    1366              :         /* nothing to do for inactive entries and entries of sane state */
    1367            0 :         if (entry->conn == NULL || !entry->changing_xact_state)
    1368            0 :                 return;
    1369              : 
    1370              :         /* make sure this entry is inactive */
    1371            0 :         disconnect_pg_server(entry);
    1372              : 
    1373              :         /* find server name to be shown in the message below */
    1374            0 :         server = GetForeignServer(entry->serverid);
    1375              : 
    1376            0 :         ereport(ERROR,
    1377              :                         (errcode(ERRCODE_CONNECTION_EXCEPTION),
    1378              :                          errmsg("connection to server \"%s\" was lost",
    1379              :                                         server->servername)));
    1380            0 : }
    1381              : 
    1382              : /*
    1383              :  * Reset state to show we're out of a (sub)transaction.
    1384              :  */
    1385              : static void
    1386            0 : pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
    1387              : {
    1388            0 :         if (toplevel)
    1389              :         {
    1390              :                 /* Reset state to show we're out of a transaction */
    1391            0 :                 entry->xact_depth = 0;
    1392              : 
    1393              :                 /*
    1394              :                  * If the connection isn't in a good idle state, it is marked as
    1395              :                  * invalid or keep_connections option of its server is disabled, then
    1396              :                  * discard it to recover. Next GetConnection will open a new
    1397              :                  * connection.
    1398              :                  */
    1399            0 :                 if (PQstatus(entry->conn) != CONNECTION_OK ||
    1400            0 :                         PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
    1401            0 :                         entry->changing_xact_state ||
    1402            0 :                         entry->invalidated ||
    1403            0 :                         !entry->keep_connections)
    1404              :                 {
    1405            0 :                         elog(DEBUG3, "discarding connection %p", entry->conn);
    1406            0 :                         disconnect_pg_server(entry);
    1407            0 :                 }
    1408            0 :         }
    1409              :         else
    1410              :         {
    1411              :                 /* Reset state to show we're out of a subtransaction */
    1412            0 :                 entry->xact_depth--;
    1413              :         }
    1414            0 : }
    1415              : 
    1416              : /*
    1417              :  * Cancel the currently-in-progress query (whose query text we do not have)
    1418              :  * and ignore the result.  Returns true if we successfully cancel the query
    1419              :  * and discard any pending result, and false if not.
    1420              :  *
    1421              :  * It's not a huge problem if we throw an ERROR here, but if we get into error
    1422              :  * recursion trouble, we'll end up slamming the connection shut, which will
    1423              :  * necessitate failing the entire toplevel transaction even if subtransactions
    1424              :  * were used.  Try to use WARNING where we can.
    1425              :  *
    1426              :  * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
    1427              :  * query text from the pendingAreq saved in the per-connection state, then
    1428              :  * report the query using it.
    1429              :  */
    1430              : static bool
    1431            0 : pgfdw_cancel_query(PGconn *conn)
    1432              : {
    1433            0 :         TimestampTz now = GetCurrentTimestamp();
    1434            0 :         TimestampTz endtime;
    1435            0 :         TimestampTz retrycanceltime;
    1436              : 
    1437              :         /*
    1438              :          * If it takes too long to cancel the query and discard the result, assume
    1439              :          * the connection is dead.
    1440              :          */
    1441            0 :         endtime = TimestampTzPlusMilliseconds(now, CONNECTION_CLEANUP_TIMEOUT);
    1442              : 
    1443              :         /*
    1444              :          * Also, lose patience and re-issue the cancel request after a little bit.
    1445              :          * (This serves to close some race conditions.)
    1446              :          */
    1447            0 :         retrycanceltime = TimestampTzPlusMilliseconds(now, RETRY_CANCEL_TIMEOUT);
    1448              : 
    1449            0 :         if (!pgfdw_cancel_query_begin(conn, endtime))
    1450            0 :                 return false;
    1451            0 :         return pgfdw_cancel_query_end(conn, endtime, retrycanceltime, false);
    1452            0 : }
    1453              : 
    1454              : /*
    1455              :  * Submit a cancel request to the given connection, waiting only until
    1456              :  * the given time.
    1457              :  *
    1458              :  * We sleep interruptibly until we receive confirmation that the cancel
    1459              :  * request has been accepted, and if it is, return true; if the timeout
    1460              :  * lapses without that, or the request fails for whatever reason, return
    1461              :  * false.
    1462              :  */
    1463              : static bool
    1464            0 : pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
    1465              : {
    1466            0 :         const char *errormsg = libpqsrv_cancel(conn, endtime);
    1467              : 
    1468            0 :         if (errormsg != NULL)
    1469            0 :                 ereport(WARNING,
    1470              :                                 errcode(ERRCODE_CONNECTION_FAILURE),
    1471              :                                 errmsg("could not send cancel request: %s", errormsg));
    1472              : 
    1473            0 :         return errormsg == NULL;
    1474            0 : }
    1475              : 
    1476              : static bool
    1477            0 : pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
    1478              :                                            TimestampTz retrycanceltime, bool consume_input)
    1479              : {
    1480            0 :         PGresult   *result;
    1481            0 :         bool            timed_out;
    1482              : 
    1483              :         /*
    1484              :          * If requested, consume whatever data is available from the socket. (Note
    1485              :          * that if all data is available, this allows pgfdw_get_cleanup_result to
    1486              :          * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
    1487              :          * which would be large compared to the overhead of PQconsumeInput.)
    1488              :          */
    1489            0 :         if (consume_input && !PQconsumeInput(conn))
    1490              :         {
    1491            0 :                 ereport(WARNING,
    1492              :                                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1493              :                                  errmsg("could not get result of cancel request: %s",
    1494              :                                                 pchomp(PQerrorMessage(conn)))));
    1495            0 :                 return false;
    1496              :         }
    1497              : 
    1498              :         /* Get and discard the result of the query. */
    1499            0 :         if (pgfdw_get_cleanup_result(conn, endtime, retrycanceltime,
    1500              :                                                                  &result, &timed_out))
    1501              :         {
    1502            0 :                 if (timed_out)
    1503            0 :                         ereport(WARNING,
    1504              :                                         (errmsg("could not get result of cancel request due to timeout")));
    1505              :                 else
    1506            0 :                         ereport(WARNING,
    1507              :                                         (errcode(ERRCODE_CONNECTION_FAILURE),
    1508              :                                          errmsg("could not get result of cancel request: %s",
    1509              :                                                         pchomp(PQerrorMessage(conn)))));
    1510              : 
    1511            0 :                 return false;
    1512              :         }
    1513            0 :         PQclear(result);
    1514              : 
    1515            0 :         return true;
    1516            0 : }
    1517              : 
    1518              : /*
    1519              :  * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
    1520              :  * result.  If the query is executed without error, the return value is true.
    1521              :  * If the query is executed successfully but returns an error, the return
    1522              :  * value is true if and only if ignore_errors is set.  If the query can't be
    1523              :  * sent or times out, the return value is false.
    1524              :  *
    1525              :  * It's not a huge problem if we throw an ERROR here, but if we get into error
    1526              :  * recursion trouble, we'll end up slamming the connection shut, which will
    1527              :  * necessitate failing the entire toplevel transaction even if subtransactions
    1528              :  * were used.  Try to use WARNING where we can.
    1529              :  */
    1530              : static bool
    1531            0 : pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
    1532              : {
    1533            0 :         TimestampTz endtime;
    1534              : 
    1535              :         /*
    1536              :          * If it takes too long to execute a cleanup query, assume the connection
    1537              :          * is dead.  It's fairly likely that this is why we aborted in the first
    1538              :          * place (e.g. statement timeout, user cancel), so the timeout shouldn't
    1539              :          * be too long.
    1540              :          */
    1541            0 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1542              :                                                                                   CONNECTION_CLEANUP_TIMEOUT);
    1543              : 
    1544            0 :         if (!pgfdw_exec_cleanup_query_begin(conn, query))
    1545            0 :                 return false;
    1546            0 :         return pgfdw_exec_cleanup_query_end(conn, query, endtime,
    1547            0 :                                                                                 false, ignore_errors);
    1548            0 : }
    1549              : 
    1550              : static bool
    1551            0 : pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
    1552              : {
    1553            0 :         Assert(query != NULL);
    1554              : 
    1555              :         /*
    1556              :          * Submit a query.  Since we don't use non-blocking mode, this also can
    1557              :          * block.  But its risk is relatively small, so we ignore that for now.
    1558              :          */
    1559            0 :         if (!PQsendQuery(conn, query))
    1560              :         {
    1561            0 :                 pgfdw_report(WARNING, NULL, conn, query);
    1562            0 :                 return false;
    1563              :         }
    1564              : 
    1565            0 :         return true;
    1566            0 : }
    1567              : 
    1568              : static bool
    1569            0 : pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
    1570              :                                                          TimestampTz endtime, bool consume_input,
    1571              :                                                          bool ignore_errors)
    1572              : {
    1573            0 :         PGresult   *result;
    1574            0 :         bool            timed_out;
    1575              : 
    1576            0 :         Assert(query != NULL);
    1577              : 
    1578              :         /*
    1579              :          * If requested, consume whatever data is available from the socket. (Note
    1580              :          * that if all data is available, this allows pgfdw_get_cleanup_result to
    1581              :          * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
    1582              :          * which would be large compared to the overhead of PQconsumeInput.)
    1583              :          */
    1584            0 :         if (consume_input && !PQconsumeInput(conn))
    1585              :         {
    1586            0 :                 pgfdw_report(WARNING, NULL, conn, query);
    1587            0 :                 return false;
    1588              :         }
    1589              : 
    1590              :         /* Get the result of the query. */
    1591            0 :         if (pgfdw_get_cleanup_result(conn, endtime, endtime, &result, &timed_out))
    1592              :         {
    1593            0 :                 if (timed_out)
    1594            0 :                         ereport(WARNING,
    1595              :                                         (errmsg("could not get query result due to timeout"),
    1596              :                                          errcontext("remote SQL command: %s", query)));
    1597              :                 else
    1598            0 :                         pgfdw_report(WARNING, NULL, conn, query);
    1599              : 
    1600            0 :                 return false;
    1601              :         }
    1602              : 
    1603              :         /* Issue a warning if not successful. */
    1604            0 :         if (PQresultStatus(result) != PGRES_COMMAND_OK)
    1605              :         {
    1606            0 :                 pgfdw_report(WARNING, result, conn, query);
    1607            0 :                 return ignore_errors;
    1608              :         }
    1609            0 :         PQclear(result);
    1610              : 
    1611            0 :         return true;
    1612            0 : }
    1613              : 
    1614              : /*
    1615              :  * Get, during abort cleanup, the result of a query that is in progress.
    1616              :  * This might be a query that is being interrupted by a cancel request or by
    1617              :  * transaction abort, or it might be a query that was initiated as part of
    1618              :  * transaction abort to get the remote side back to the appropriate state.
    1619              :  *
    1620              :  * endtime is the time at which we should give up and assume the remote side
    1621              :  * is dead.  retrycanceltime is the time at which we should issue a fresh
    1622              :  * cancel request (pass the same value as endtime if this is not wanted).
    1623              :  *
    1624              :  * Returns true if the timeout expired or connection trouble occurred,
    1625              :  * false otherwise.  Sets *result except in case of a true result.
    1626              :  * Sets *timed_out to true only when the timeout expired.
    1627              :  */
    1628              : static bool
    1629            0 : pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
    1630              :                                                  TimestampTz retrycanceltime,
    1631              :                                                  PGresult **result,
    1632              :                                                  bool *timed_out)
    1633              : {
    1634            0 :         bool            failed = false;
    1635            0 :         PGresult   *last_res = NULL;
    1636            0 :         int                     canceldelta = RETRY_CANCEL_TIMEOUT * 2;
    1637              : 
    1638            0 :         *result = NULL;
    1639            0 :         *timed_out = false;
    1640            0 :         for (;;)
    1641              :         {
    1642            0 :                 PGresult   *res;
    1643              : 
    1644            0 :                 while (PQisBusy(conn))
    1645              :                 {
    1646            0 :                         int                     wc;
    1647            0 :                         TimestampTz now = GetCurrentTimestamp();
    1648            0 :                         long            cur_timeout;
    1649              : 
    1650              :                         /* If timeout has expired, give up. */
    1651            0 :                         if (now >= endtime)
    1652              :                         {
    1653            0 :                                 *timed_out = true;
    1654            0 :                                 failed = true;
    1655            0 :                                 goto exit;
    1656              :                         }
    1657              : 
    1658              :                         /* If we need to re-issue the cancel request, do that. */
    1659            0 :                         if (now >= retrycanceltime)
    1660              :                         {
    1661              :                                 /* We ignore failure to issue the repeated request. */
    1662            0 :                                 (void) libpqsrv_cancel(conn, endtime);
    1663              : 
    1664              :                                 /* Recompute "now" in case that took measurable time. */
    1665            0 :                                 now = GetCurrentTimestamp();
    1666              : 
    1667              :                                 /* Adjust re-cancel timeout in increasing steps. */
    1668            0 :                                 retrycanceltime = TimestampTzPlusMilliseconds(now,
    1669              :                                                                                                                           canceldelta);
    1670            0 :                                 canceldelta += canceldelta;
    1671            0 :                         }
    1672              : 
    1673              :                         /* If timeout has expired, give up, else get sleep time. */
    1674            0 :                         cur_timeout = TimestampDifferenceMilliseconds(now,
    1675            0 :                                                                                                                   Min(endtime,
    1676              :                                                                                                                           retrycanceltime));
    1677            0 :                         if (cur_timeout <= 0)
    1678              :                         {
    1679            0 :                                 *timed_out = true;
    1680            0 :                                 failed = true;
    1681            0 :                                 goto exit;
    1682              :                         }
    1683              : 
    1684              :                         /* first time, allocate or get the custom wait event */
    1685            0 :                         if (pgfdw_we_cleanup_result == 0)
    1686            0 :                                 pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
    1687              : 
    1688              :                         /* Sleep until there's something to do */
    1689            0 :                         wc = WaitLatchOrSocket(MyLatch,
    1690              :                                                                    WL_LATCH_SET | WL_SOCKET_READABLE |
    1691              :                                                                    WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1692            0 :                                                                    PQsocket(conn),
    1693            0 :                                                                    cur_timeout, pgfdw_we_cleanup_result);
    1694            0 :                         ResetLatch(MyLatch);
    1695              : 
    1696            0 :                         CHECK_FOR_INTERRUPTS();
    1697              : 
    1698              :                         /* Data available in socket? */
    1699            0 :                         if (wc & WL_SOCKET_READABLE)
    1700              :                         {
    1701            0 :                                 if (!PQconsumeInput(conn))
    1702              :                                 {
    1703              :                                         /* connection trouble */
    1704            0 :                                         failed = true;
    1705            0 :                                         goto exit;
    1706              :                                 }
    1707            0 :                         }
    1708            0 :                 }
    1709              : 
    1710            0 :                 res = PQgetResult(conn);
    1711            0 :                 if (res == NULL)
    1712            0 :                         break;                          /* query is complete */
    1713              : 
    1714            0 :                 PQclear(last_res);
    1715            0 :                 last_res = res;
    1716            0 :         }
    1717              : exit:
    1718            0 :         if (failed)
    1719            0 :                 PQclear(last_res);
    1720              :         else
    1721            0 :                 *result = last_res;
    1722            0 :         return failed;
    1723            0 : }
    1724              : 
    1725              : /*
    1726              :  * Abort remote transaction or subtransaction.
    1727              :  *
    1728              :  * "toplevel" should be set to true if toplevel (main) transaction is
    1729              :  * rollbacked, false otherwise.
    1730              :  *
    1731              :  * Set entry->changing_xact_state to false on success, true on failure.
    1732              :  */
    1733              : static void
    1734            0 : pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
    1735              : {
    1736            0 :         char            sql[100];
    1737              : 
    1738              :         /*
    1739              :          * Don't try to clean up the connection if we're already in error
    1740              :          * recursion trouble.
    1741              :          */
    1742            0 :         if (in_error_recursion_trouble())
    1743            0 :                 entry->changing_xact_state = true;
    1744              : 
    1745              :         /*
    1746              :          * If connection is already unsalvageable, don't touch it further.
    1747              :          */
    1748            0 :         if (entry->changing_xact_state)
    1749            0 :                 return;
    1750              : 
    1751              :         /*
    1752              :          * Mark this connection as in the process of changing transaction state.
    1753              :          */
    1754            0 :         entry->changing_xact_state = true;
    1755              : 
    1756              :         /* Assume we might have lost track of prepared statements */
    1757            0 :         entry->have_error = true;
    1758              : 
    1759              :         /*
    1760              :          * If a command has been submitted to the remote server by using an
    1761              :          * asynchronous execution function, the command might not have yet
    1762              :          * completed.  Check to see if a command is still being processed by the
    1763              :          * remote server, and if so, request cancellation of the command.
    1764              :          */
    1765            0 :         if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
    1766            0 :                 !pgfdw_cancel_query(entry->conn))
    1767            0 :                 return;                                 /* Unable to cancel running query */
    1768              : 
    1769            0 :         CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    1770            0 :         if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
    1771            0 :                 return;                                 /* Unable to abort remote (sub)transaction */
    1772              : 
    1773            0 :         if (toplevel)
    1774              :         {
    1775            0 :                 if (entry->have_prep_stmt && entry->have_error &&
    1776            0 :                         !pgfdw_exec_cleanup_query(entry->conn,
    1777              :                                                                           "DEALLOCATE ALL",
    1778              :                                                                           true))
    1779            0 :                         return;                         /* Trouble clearing prepared statements */
    1780              : 
    1781            0 :                 entry->have_prep_stmt = false;
    1782            0 :                 entry->have_error = false;
    1783            0 :         }
    1784              : 
    1785              :         /*
    1786              :          * If pendingAreq of the per-connection state is not NULL, it means that
    1787              :          * an asynchronous fetch begun by fetch_more_data_begin() was not done
    1788              :          * successfully and thus the per-connection state was not reset in
    1789              :          * fetch_more_data(); in that case reset the per-connection state here.
    1790              :          */
    1791            0 :         if (entry->state.pendingAreq)
    1792            0 :                 memset(&entry->state, 0, sizeof(entry->state));
    1793              : 
    1794              :         /* Disarm changing_xact_state if it all worked */
    1795            0 :         entry->changing_xact_state = false;
    1796            0 : }
    1797              : 
    1798              : /*
    1799              :  * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
    1800              :  * don't wait for the result.
    1801              :  *
    1802              :  * Returns true if the abort command or cancel request is successfully issued,
    1803              :  * false otherwise.  If the abort command is successfully issued, the given
    1804              :  * connection cache entry is appended to *pending_entries.  Otherwise, if the
    1805              :  * cancel request is successfully issued, it is appended to *cancel_requested.
    1806              :  */
    1807              : static bool
    1808            0 : pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
    1809              :                                                   List **pending_entries, List **cancel_requested)
    1810              : {
    1811              :         /*
    1812              :          * Don't try to clean up the connection if we're already in error
    1813              :          * recursion trouble.
    1814              :          */
    1815            0 :         if (in_error_recursion_trouble())
    1816            0 :                 entry->changing_xact_state = true;
    1817              : 
    1818              :         /*
    1819              :          * If connection is already unsalvageable, don't touch it further.
    1820              :          */
    1821            0 :         if (entry->changing_xact_state)
    1822            0 :                 return false;
    1823              : 
    1824              :         /*
    1825              :          * Mark this connection as in the process of changing transaction state.
    1826              :          */
    1827            0 :         entry->changing_xact_state = true;
    1828              : 
    1829              :         /* Assume we might have lost track of prepared statements */
    1830            0 :         entry->have_error = true;
    1831              : 
    1832              :         /*
    1833              :          * If a command has been submitted to the remote server by using an
    1834              :          * asynchronous execution function, the command might not have yet
    1835              :          * completed.  Check to see if a command is still being processed by the
    1836              :          * remote server, and if so, request cancellation of the command.
    1837              :          */
    1838            0 :         if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
    1839              :         {
    1840            0 :                 TimestampTz endtime;
    1841              : 
    1842            0 :                 endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1843              :                                                                                           CONNECTION_CLEANUP_TIMEOUT);
    1844            0 :                 if (!pgfdw_cancel_query_begin(entry->conn, endtime))
    1845            0 :                         return false;           /* Unable to cancel running query */
    1846            0 :                 *cancel_requested = lappend(*cancel_requested, entry);
    1847            0 :         }
    1848              :         else
    1849              :         {
    1850            0 :                 char            sql[100];
    1851              : 
    1852            0 :                 CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    1853            0 :                 if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
    1854            0 :                         return false;           /* Unable to abort remote transaction */
    1855            0 :                 *pending_entries = lappend(*pending_entries, entry);
    1856            0 :         }
    1857              : 
    1858            0 :         return true;
    1859            0 : }
    1860              : 
    1861              : /*
    1862              :  * Finish pre-commit cleanup of connections on each of which we've sent a
    1863              :  * COMMIT command to the remote server.
    1864              :  */
    1865              : static void
    1866            0 : pgfdw_finish_pre_commit_cleanup(List *pending_entries)
    1867              : {
    1868            0 :         ConnCacheEntry *entry;
    1869            0 :         List       *pending_deallocs = NIL;
    1870            0 :         ListCell   *lc;
    1871              : 
    1872            0 :         Assert(pending_entries);
    1873              : 
    1874              :         /*
    1875              :          * Get the result of the COMMIT command for each of the pending entries
    1876              :          */
    1877            0 :         foreach(lc, pending_entries)
    1878              :         {
    1879            0 :                 entry = (ConnCacheEntry *) lfirst(lc);
    1880              : 
    1881            0 :                 Assert(entry->changing_xact_state);
    1882              : 
    1883              :                 /*
    1884              :                  * We might already have received the result on the socket, so pass
    1885              :                  * consume_input=true to try to consume it first
    1886              :                  */
    1887            0 :                 do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
    1888            0 :                 entry->changing_xact_state = false;
    1889              : 
    1890              :                 /* Do a DEALLOCATE ALL in parallel if needed */
    1891            0 :                 if (entry->have_prep_stmt && entry->have_error)
    1892              :                 {
    1893              :                         /* Ignore errors (see notes in pgfdw_xact_callback) */
    1894            0 :                         if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
    1895              :                         {
    1896            0 :                                 pending_deallocs = lappend(pending_deallocs, entry);
    1897            0 :                                 continue;
    1898              :                         }
    1899            0 :                 }
    1900            0 :                 entry->have_prep_stmt = false;
    1901            0 :                 entry->have_error = false;
    1902              : 
    1903            0 :                 pgfdw_reset_xact_state(entry, true);
    1904            0 :         }
    1905              : 
    1906              :         /* No further work if no pending entries */
    1907            0 :         if (!pending_deallocs)
    1908            0 :                 return;
    1909              : 
    1910              :         /*
    1911              :          * Get the result of the DEALLOCATE command for each of the pending
    1912              :          * entries
    1913              :          */
    1914            0 :         foreach(lc, pending_deallocs)
    1915              :         {
    1916            0 :                 PGresult   *res;
    1917              : 
    1918            0 :                 entry = (ConnCacheEntry *) lfirst(lc);
    1919              : 
    1920              :                 /* Ignore errors (see notes in pgfdw_xact_callback) */
    1921            0 :                 while ((res = PQgetResult(entry->conn)) != NULL)
    1922              :                 {
    1923            0 :                         PQclear(res);
    1924              :                         /* Stop if the connection is lost (else we'll loop infinitely) */
    1925            0 :                         if (PQstatus(entry->conn) == CONNECTION_BAD)
    1926            0 :                                 break;
    1927              :                 }
    1928            0 :                 entry->have_prep_stmt = false;
    1929            0 :                 entry->have_error = false;
    1930              : 
    1931            0 :                 pgfdw_reset_xact_state(entry, true);
    1932            0 :         }
    1933            0 : }
    1934              : 
    1935              : /*
    1936              :  * Finish pre-subcommit cleanup of connections on each of which we've sent a
    1937              :  * RELEASE command to the remote server.
    1938              :  */
    1939              : static void
    1940            0 : pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
    1941              : {
    1942            0 :         ConnCacheEntry *entry;
    1943            0 :         char            sql[100];
    1944            0 :         ListCell   *lc;
    1945              : 
    1946            0 :         Assert(pending_entries);
    1947              : 
    1948              :         /*
    1949              :          * Get the result of the RELEASE command for each of the pending entries
    1950              :          */
    1951            0 :         snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
    1952            0 :         foreach(lc, pending_entries)
    1953              :         {
    1954            0 :                 entry = (ConnCacheEntry *) lfirst(lc);
    1955              : 
    1956            0 :                 Assert(entry->changing_xact_state);
    1957              : 
    1958              :                 /*
    1959              :                  * We might already have received the result on the socket, so pass
    1960              :                  * consume_input=true to try to consume it first
    1961              :                  */
    1962            0 :                 do_sql_command_end(entry->conn, sql, true);
    1963            0 :                 entry->changing_xact_state = false;
    1964              : 
    1965            0 :                 pgfdw_reset_xact_state(entry, false);
    1966            0 :         }
    1967            0 : }
    1968              : 
    1969              : /*
    1970              :  * Finish abort cleanup of connections on each of which we've sent an abort
    1971              :  * command or cancel request to the remote server.
    1972              :  */
    1973              : static void
    1974            0 : pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
    1975              :                                                    bool toplevel)
    1976              : {
    1977            0 :         List       *pending_deallocs = NIL;
    1978            0 :         ListCell   *lc;
    1979              : 
    1980              :         /*
    1981              :          * For each of the pending cancel requests (if any), get and discard the
    1982              :          * result of the query, and submit an abort command to the remote server.
    1983              :          */
    1984            0 :         if (cancel_requested)
    1985              :         {
    1986            0 :                 foreach(lc, cancel_requested)
    1987              :                 {
    1988            0 :                         ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
    1989            0 :                         TimestampTz now = GetCurrentTimestamp();
    1990            0 :                         TimestampTz endtime;
    1991            0 :                         TimestampTz retrycanceltime;
    1992            0 :                         char            sql[100];
    1993              : 
    1994            0 :                         Assert(entry->changing_xact_state);
    1995              : 
    1996              :                         /*
    1997              :                          * Set end time.  You might think we should do this before issuing
    1998              :                          * cancel request like in normal mode, but that is problematic,
    1999              :                          * because if, for example, it took longer than 30 seconds to
    2000              :                          * process the first few entries in the cancel_requested list, it
    2001              :                          * would cause a timeout error when processing each of the
    2002              :                          * remaining entries in the list, leading to slamming that entry's
    2003              :                          * connection shut.
    2004              :                          */
    2005            0 :                         endtime = TimestampTzPlusMilliseconds(now,
    2006              :                                                                                                   CONNECTION_CLEANUP_TIMEOUT);
    2007            0 :                         retrycanceltime = TimestampTzPlusMilliseconds(now,
    2008              :                                                                                                                   RETRY_CANCEL_TIMEOUT);
    2009              : 
    2010            0 :                         if (!pgfdw_cancel_query_end(entry->conn, endtime,
    2011            0 :                                                                                 retrycanceltime, true))
    2012              :                         {
    2013              :                                 /* Unable to cancel running query */
    2014            0 :                                 pgfdw_reset_xact_state(entry, toplevel);
    2015            0 :                                 continue;
    2016              :                         }
    2017              : 
    2018              :                         /* Send an abort command in parallel if needed */
    2019            0 :                         CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    2020            0 :                         if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
    2021              :                         {
    2022              :                                 /* Unable to abort remote (sub)transaction */
    2023            0 :                                 pgfdw_reset_xact_state(entry, toplevel);
    2024            0 :                         }
    2025              :                         else
    2026            0 :                                 pending_entries = lappend(pending_entries, entry);
    2027            0 :                 }
    2028            0 :         }
    2029              : 
    2030              :         /* No further work if no pending entries */
    2031            0 :         if (!pending_entries)
    2032            0 :                 return;
    2033              : 
    2034              :         /*
    2035              :          * Get the result of the abort command for each of the pending entries
    2036              :          */
    2037            0 :         foreach(lc, pending_entries)
    2038              :         {
    2039            0 :                 ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
    2040            0 :                 TimestampTz endtime;
    2041            0 :                 char            sql[100];
    2042              : 
    2043            0 :                 Assert(entry->changing_xact_state);
    2044              : 
    2045              :                 /*
    2046              :                  * Set end time.  We do this now, not before issuing the command like
    2047              :                  * in normal mode, for the same reason as for the cancel_requested
    2048              :                  * entries.
    2049              :                  */
    2050            0 :                 endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    2051              :                                                                                           CONNECTION_CLEANUP_TIMEOUT);
    2052              : 
    2053            0 :                 CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    2054            0 :                 if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
    2055              :                                                                                   true, false))
    2056              :                 {
    2057              :                         /* Unable to abort remote (sub)transaction */
    2058            0 :                         pgfdw_reset_xact_state(entry, toplevel);
    2059            0 :                         continue;
    2060              :                 }
    2061              : 
    2062            0 :                 if (toplevel)
    2063              :                 {
    2064              :                         /* Do a DEALLOCATE ALL in parallel if needed */
    2065            0 :                         if (entry->have_prep_stmt && entry->have_error)
    2066              :                         {
    2067            0 :                                 if (!pgfdw_exec_cleanup_query_begin(entry->conn,
    2068              :                                                                                                         "DEALLOCATE ALL"))
    2069              :                                 {
    2070              :                                         /* Trouble clearing prepared statements */
    2071            0 :                                         pgfdw_reset_xact_state(entry, toplevel);
    2072            0 :                                 }
    2073              :                                 else
    2074            0 :                                         pending_deallocs = lappend(pending_deallocs, entry);
    2075            0 :                                 continue;
    2076              :                         }
    2077            0 :                         entry->have_prep_stmt = false;
    2078            0 :                         entry->have_error = false;
    2079            0 :                 }
    2080              : 
    2081              :                 /* Reset the per-connection state if needed */
    2082            0 :                 if (entry->state.pendingAreq)
    2083            0 :                         memset(&entry->state, 0, sizeof(entry->state));
    2084              : 
    2085              :                 /* We're done with this entry; unset the changing_xact_state flag */
    2086            0 :                 entry->changing_xact_state = false;
    2087            0 :                 pgfdw_reset_xact_state(entry, toplevel);
    2088            0 :         }
    2089              : 
    2090              :         /* No further work if no pending entries */
    2091            0 :         if (!pending_deallocs)
    2092            0 :                 return;
    2093            0 :         Assert(toplevel);
    2094              : 
    2095              :         /*
    2096              :          * Get the result of the DEALLOCATE command for each of the pending
    2097              :          * entries
    2098              :          */
    2099            0 :         foreach(lc, pending_deallocs)
    2100              :         {
    2101            0 :                 ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
    2102            0 :                 TimestampTz endtime;
    2103              : 
    2104            0 :                 Assert(entry->changing_xact_state);
    2105            0 :                 Assert(entry->have_prep_stmt);
    2106            0 :                 Assert(entry->have_error);
    2107              : 
    2108              :                 /*
    2109              :                  * Set end time.  We do this now, not before issuing the command like
    2110              :                  * in normal mode, for the same reason as for the cancel_requested
    2111              :                  * entries.
    2112              :                  */
    2113            0 :                 endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    2114              :                                                                                           CONNECTION_CLEANUP_TIMEOUT);
    2115              : 
    2116            0 :                 if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
    2117            0 :                                                                                   endtime, true, true))
    2118              :                 {
    2119              :                         /* Trouble clearing prepared statements */
    2120            0 :                         pgfdw_reset_xact_state(entry, toplevel);
    2121            0 :                         continue;
    2122              :                 }
    2123            0 :                 entry->have_prep_stmt = false;
    2124            0 :                 entry->have_error = false;
    2125              : 
    2126              :                 /* Reset the per-connection state if needed */
    2127            0 :                 if (entry->state.pendingAreq)
    2128            0 :                         memset(&entry->state, 0, sizeof(entry->state));
    2129              : 
    2130              :                 /* We're done with this entry; unset the changing_xact_state flag */
    2131            0 :                 entry->changing_xact_state = false;
    2132            0 :                 pgfdw_reset_xact_state(entry, toplevel);
    2133            0 :         }
    2134            0 : }
    2135              : 
    2136              : /* Number of output arguments (columns) for various API versions */
    2137              : #define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1  2
    2138              : #define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2  6
    2139              : #define POSTGRES_FDW_GET_CONNECTIONS_COLS       6       /* maximum of above */
    2140              : 
    2141              : /*
    2142              :  * Internal function used by postgres_fdw_get_connections variants.
    2143              :  *
    2144              :  * For API version 1.1, this function takes no input parameter and
    2145              :  * returns a set of records with the following values:
    2146              :  *
    2147              :  * - server_name - server name of active connection. In case the foreign server
    2148              :  *   is dropped but still the connection is active, then the server name will
    2149              :  *   be NULL in output.
    2150              :  * - valid - true/false representing whether the connection is valid or not.
    2151              :  *   Note that connections can become invalid in pgfdw_inval_callback.
    2152              :  *
    2153              :  * For API version 1.2 and later, this function takes an input parameter
    2154              :  * to check a connection status and returns the following
    2155              :  * additional values along with the four values from version 1.1:
    2156              :  *
    2157              :  * - user_name - the local user name of the active connection. In case the
    2158              :  *   user mapping is dropped but the connection is still active, then the
    2159              :  *   user name will be NULL in the output.
    2160              :  * - used_in_xact - true if the connection is used in the current transaction.
    2161              :  * - closed - true if the connection is closed.
    2162              :  * - remote_backend_pid - process ID of the remote backend, on the foreign
    2163              :  *   server, handling the connection.
    2164              :  *
    2165              :  * No records are returned when there are no cached connections at all.
    2166              :  */
    2167              : static void
    2168            0 : postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
    2169              :                                                                           enum pgfdwVersion api_version)
    2170              : {
    2171            0 :         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    2172            0 :         HASH_SEQ_STATUS scan;
    2173            0 :         ConnCacheEntry *entry;
    2174              : 
    2175            0 :         InitMaterializedSRF(fcinfo, 0);
    2176              : 
    2177              :         /* If cache doesn't exist, we return no records */
    2178            0 :         if (!ConnectionHash)
    2179            0 :                 return;
    2180              : 
    2181              :         /* Check we have the expected number of output arguments */
    2182            0 :         switch (rsinfo->setDesc->natts)
    2183              :         {
    2184              :                 case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1:
    2185            0 :                         if (api_version != PGFDW_V1_1)
    2186            0 :                                 elog(ERROR, "incorrect number of output arguments");
    2187            0 :                         break;
    2188              :                 case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2:
    2189            0 :                         if (api_version != PGFDW_V1_2)
    2190            0 :                                 elog(ERROR, "incorrect number of output arguments");
    2191            0 :                         break;
    2192              :                 default:
    2193            0 :                         elog(ERROR, "incorrect number of output arguments");
    2194            0 :         }
    2195              : 
    2196            0 :         hash_seq_init(&scan, ConnectionHash);
    2197            0 :         while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    2198              :         {
    2199            0 :                 ForeignServer *server;
    2200            0 :                 Datum           values[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
    2201            0 :                 bool            nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
    2202            0 :                 int                     i = 0;
    2203              : 
    2204              :                 /* We only look for open remote connections */
    2205            0 :                 if (!entry->conn)
    2206            0 :                         continue;
    2207              : 
    2208            0 :                 server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
    2209              : 
    2210              :                 /*
    2211              :                  * The foreign server may have been dropped in current explicit
    2212              :                  * transaction. It is not possible to drop the server from another
    2213              :                  * session when the connection associated with it is in use in the
    2214              :                  * current transaction, if tried so, the drop query in another session
    2215              :                  * blocks until the current transaction finishes.
    2216              :                  *
    2217              :                  * Even though the server is dropped in the current transaction, the
    2218              :                  * cache can still have associated active connection entry, say we
    2219              :                  * call such connections dangling. Since we can not fetch the server
    2220              :                  * name from system catalogs for dangling connections, instead we show
    2221              :                  * NULL value for server name in output.
    2222              :                  *
    2223              :                  * We could have done better by storing the server name in the cache
    2224              :                  * entry instead of server oid so that it could be used in the output.
    2225              :                  * But the server name in each cache entry requires 64 bytes of
    2226              :                  * memory, which is huge, when there are many cached connections and
    2227              :                  * the use case i.e. dropping the foreign server within the explicit
    2228              :                  * current transaction seems rare. So, we chose to show NULL value for
    2229              :                  * server name in output.
    2230              :                  *
    2231              :                  * Such dangling connections get closed either in next use or at the
    2232              :                  * end of current explicit transaction in pgfdw_xact_callback.
    2233              :                  */
    2234            0 :                 if (!server)
    2235              :                 {
    2236              :                         /*
    2237              :                          * If the server has been dropped in the current explicit
    2238              :                          * transaction, then this entry would have been invalidated in
    2239              :                          * pgfdw_inval_callback at the end of drop server command. Note
    2240              :                          * that this connection would not have been closed in
    2241              :                          * pgfdw_inval_callback because it is still being used in the
    2242              :                          * current explicit transaction. So, assert that here.
    2243              :                          */
    2244            0 :                         Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
    2245              : 
    2246              :                         /* Show null, if no server name was found */
    2247            0 :                         nulls[i++] = true;
    2248            0 :                 }
    2249              :                 else
    2250            0 :                         values[i++] = CStringGetTextDatum(server->servername);
    2251              : 
    2252            0 :                 if (api_version >= PGFDW_V1_2)
    2253              :                 {
    2254            0 :                         HeapTuple       tp;
    2255              : 
    2256              :                         /* Use the system cache to obtain the user mapping */
    2257            0 :                         tp = SearchSysCache1(USERMAPPINGOID, ObjectIdGetDatum(entry->key));
    2258              : 
    2259              :                         /*
    2260              :                          * Just like in the foreign server case, user mappings can also be
    2261              :                          * dropped in the current explicit transaction. Therefore, the
    2262              :                          * similar check as in the server case is required.
    2263              :                          */
    2264            0 :                         if (!HeapTupleIsValid(tp))
    2265              :                         {
    2266              :                                 /*
    2267              :                                  * If we reach here, this entry must have been invalidated in
    2268              :                                  * pgfdw_inval_callback, same as in the server case.
    2269              :                                  */
    2270            0 :                                 Assert(entry->conn && entry->xact_depth > 0 &&
    2271              :                                            entry->invalidated);
    2272              : 
    2273            0 :                                 nulls[i++] = true;
    2274            0 :                         }
    2275              :                         else
    2276              :                         {
    2277            0 :                                 Oid                     userid;
    2278              : 
    2279            0 :                                 userid = ((Form_pg_user_mapping) GETSTRUCT(tp))->umuser;
    2280            0 :                                 values[i++] = CStringGetTextDatum(MappingUserName(userid));
    2281            0 :                                 ReleaseSysCache(tp);
    2282            0 :                         }
    2283            0 :                 }
    2284              : 
    2285            0 :                 values[i++] = BoolGetDatum(!entry->invalidated);
    2286              : 
    2287            0 :                 if (api_version >= PGFDW_V1_2)
    2288              :                 {
    2289            0 :                         bool            check_conn = PG_GETARG_BOOL(0);
    2290              : 
    2291              :                         /* Is this connection used in the current transaction? */
    2292            0 :                         values[i++] = BoolGetDatum(entry->xact_depth > 0);
    2293              : 
    2294              :                         /*
    2295              :                          * If a connection status check is requested and supported, return
    2296              :                          * whether the connection is closed. Otherwise, return NULL.
    2297              :                          */
    2298            0 :                         if (check_conn && pgfdw_conn_checkable())
    2299            0 :                                 values[i++] = BoolGetDatum(pgfdw_conn_check(entry->conn) != 0);
    2300              :                         else
    2301            0 :                                 nulls[i++] = true;
    2302              : 
    2303              :                         /* Return process ID of remote backend */
    2304            0 :                         values[i++] = Int32GetDatum(PQbackendPID(entry->conn));
    2305            0 :                 }
    2306              : 
    2307            0 :                 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
    2308            0 :         }
    2309            0 : }
    2310              : 
    2311              : /*
    2312              :  * List active foreign server connections.
    2313              :  *
    2314              :  * The SQL API of this function has changed multiple times, and will likely
    2315              :  * do so again in future.  To support the case where a newer version of this
    2316              :  * loadable module is being used with an old SQL declaration of the function,
    2317              :  * we continue to support the older API versions.
    2318              :  */
    2319              : Datum
    2320            0 : postgres_fdw_get_connections_1_2(PG_FUNCTION_ARGS)
    2321              : {
    2322            0 :         postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_2);
    2323              : 
    2324            0 :         PG_RETURN_VOID();
    2325              : }
    2326              : 
    2327              : Datum
    2328            0 : postgres_fdw_get_connections(PG_FUNCTION_ARGS)
    2329              : {
    2330            0 :         postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_1);
    2331              : 
    2332            0 :         PG_RETURN_VOID();
    2333              : }
    2334              : 
    2335              : /*
    2336              :  * Disconnect the specified cached connections.
    2337              :  *
    2338              :  * This function discards the open connections that are established by
    2339              :  * postgres_fdw from the local session to the foreign server with
    2340              :  * the given name. Note that there can be multiple connections to
    2341              :  * the given server using different user mappings. If the connections
    2342              :  * are used in the current local transaction, they are not disconnected
    2343              :  * and warning messages are reported. This function returns true
    2344              :  * if it disconnects at least one connection, otherwise false. If no
    2345              :  * foreign server with the given name is found, an error is reported.
    2346              :  */
    2347              : Datum
    2348            0 : postgres_fdw_disconnect(PG_FUNCTION_ARGS)
    2349              : {
    2350            0 :         ForeignServer *server;
    2351            0 :         char       *servername;
    2352              : 
    2353            0 :         servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
    2354            0 :         server = GetForeignServerByName(servername, false);
    2355              : 
    2356            0 :         PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
    2357            0 : }
    2358              : 
    2359              : /*
    2360              :  * Disconnect all the cached connections.
    2361              :  *
    2362              :  * This function discards all the open connections that are established by
    2363              :  * postgres_fdw from the local session to the foreign servers.
    2364              :  * If the connections are used in the current local transaction, they are
    2365              :  * not disconnected and warning messages are reported. This function
    2366              :  * returns true if it disconnects at least one connection, otherwise false.
    2367              :  */
    2368              : Datum
    2369            0 : postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
    2370              : {
    2371            0 :         PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
    2372              : }
    2373              : 
    2374              : /*
    2375              :  * Workhorse to disconnect cached connections.
    2376              :  *
    2377              :  * This function scans all the connection cache entries and disconnects
    2378              :  * the open connections whose foreign server OID matches with
    2379              :  * the specified one. If InvalidOid is specified, it disconnects all
    2380              :  * the cached connections.
    2381              :  *
    2382              :  * This function emits a warning for each connection that's used in
    2383              :  * the current transaction and doesn't close it. It returns true if
    2384              :  * it disconnects at least one connection, otherwise false.
    2385              :  *
    2386              :  * Note that this function disconnects even the connections that are
    2387              :  * established by other users in the same local session using different
    2388              :  * user mappings. This leads even non-superuser to be able to close
    2389              :  * the connections established by superusers in the same local session.
    2390              :  *
    2391              :  * XXX As of now we don't see any security risk doing this. But we should
    2392              :  * set some restrictions on that, for example, prevent non-superuser
    2393              :  * from closing the connections established by superusers even
    2394              :  * in the same session?
    2395              :  */
    2396              : static bool
    2397            0 : disconnect_cached_connections(Oid serverid)
    2398              : {
    2399            0 :         HASH_SEQ_STATUS scan;
    2400            0 :         ConnCacheEntry *entry;
    2401            0 :         bool            all = !OidIsValid(serverid);
    2402            0 :         bool            result = false;
    2403              : 
    2404              :         /*
    2405              :          * Connection cache hashtable has not been initialized yet in this
    2406              :          * session, so return false.
    2407              :          */
    2408            0 :         if (!ConnectionHash)
    2409            0 :                 return false;
    2410              : 
    2411            0 :         hash_seq_init(&scan, ConnectionHash);
    2412            0 :         while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    2413              :         {
    2414              :                 /* Ignore cache entry if no open connection right now. */
    2415            0 :                 if (!entry->conn)
    2416            0 :                         continue;
    2417              : 
    2418            0 :                 if (all || entry->serverid == serverid)
    2419              :                 {
    2420              :                         /*
    2421              :                          * Emit a warning because the connection to close is used in the
    2422              :                          * current transaction and cannot be disconnected right now.
    2423              :                          */
    2424            0 :                         if (entry->xact_depth > 0)
    2425              :                         {
    2426            0 :                                 ForeignServer *server;
    2427              : 
    2428            0 :                                 server = GetForeignServerExtended(entry->serverid,
    2429              :                                                                                                   FSV_MISSING_OK);
    2430              : 
    2431            0 :                                 if (!server)
    2432              :                                 {
    2433              :                                         /*
    2434              :                                          * If the foreign server was dropped while its connection
    2435              :                                          * was used in the current transaction, the connection
    2436              :                                          * must have been marked as invalid by
    2437              :                                          * pgfdw_inval_callback at the end of DROP SERVER command.
    2438              :                                          */
    2439            0 :                                         Assert(entry->invalidated);
    2440              : 
    2441            0 :                                         ereport(WARNING,
    2442              :                                                         (errmsg("cannot close dropped server connection because it is still in use")));
    2443            0 :                                 }
    2444              :                                 else
    2445            0 :                                         ereport(WARNING,
    2446              :                                                         (errmsg("cannot close connection for server \"%s\" because it is still in use",
    2447              :                                                                         server->servername)));
    2448            0 :                         }
    2449              :                         else
    2450              :                         {
    2451            0 :                                 elog(DEBUG3, "discarding connection %p", entry->conn);
    2452            0 :                                 disconnect_pg_server(entry);
    2453            0 :                                 result = true;
    2454              :                         }
    2455            0 :                 }
    2456              :         }
    2457              : 
    2458            0 :         return result;
    2459            0 : }
    2460              : 
    2461              : /*
    2462              :  * Check if the remote server closed the connection.
    2463              :  *
    2464              :  * Returns 1 if the connection is closed, -1 if an error occurred,
    2465              :  * and 0 if it's not closed or if the connection check is unavailable
    2466              :  * on this platform.
    2467              :  */
    2468              : static int
    2469            0 : pgfdw_conn_check(PGconn *conn)
    2470              : {
    2471            0 :         int                     sock = PQsocket(conn);
    2472              : 
    2473            0 :         if (PQstatus(conn) != CONNECTION_OK || sock == -1)
    2474            0 :                 return -1;
    2475              : 
    2476              : #if (defined(HAVE_POLL) && defined(POLLRDHUP))
    2477              :         {
    2478              :                 struct pollfd input_fd;
    2479              :                 int                     result;
    2480              : 
    2481              :                 input_fd.fd = sock;
    2482              :                 input_fd.events = POLLRDHUP;
    2483              :                 input_fd.revents = 0;
    2484              : 
    2485              :                 do
    2486              :                         result = poll(&input_fd, 1, 0);
    2487              :                 while (result < 0 && errno == EINTR);
    2488              : 
    2489              :                 if (result < 0)
    2490              :                         return -1;
    2491              : 
    2492              :                 return (input_fd.revents &
    2493              :                                 (POLLRDHUP | POLLHUP | POLLERR | POLLNVAL)) ? 1 : 0;
    2494              :         }
    2495              : #else
    2496            0 :         return 0;
    2497              : #endif
    2498            0 : }
    2499              : 
    2500              : /*
    2501              :  * Check if connection status checking is available on this platform.
    2502              :  *
    2503              :  * Returns true if available, false otherwise.
    2504              :  */
    2505              : static bool
    2506            0 : pgfdw_conn_checkable(void)
    2507              : {
    2508              : #if (defined(HAVE_POLL) && defined(POLLRDHUP))
    2509              :         return true;
    2510              : #else
    2511            0 :         return false;
    2512              : #endif
    2513              : }
    2514              : 
    2515              : /*
    2516              :  * Ensure that require_auth and SCRAM keys are correctly set on values. SCRAM
    2517              :  * keys used to pass-through are coming from the initial connection from the
    2518              :  * client with the server.
    2519              :  *
    2520              :  * All required SCRAM options are set by postgres_fdw, so we just need to
    2521              :  * ensure that these options are not overwritten by the user.
    2522              :  */
    2523              : static bool
    2524            0 : pgfdw_has_required_scram_options(const char **keywords, const char **values)
    2525              : {
    2526            0 :         bool            has_scram_server_key = false;
    2527            0 :         bool            has_scram_client_key = false;
    2528            0 :         bool            has_require_auth = false;
    2529            0 :         bool            has_scram_keys = false;
    2530              : 
    2531              :         /*
    2532              :          * Continue iterating even if we found the keys that we need to validate
    2533              :          * to make sure that there is no other declaration of these keys that can
    2534              :          * overwrite the first.
    2535              :          */
    2536            0 :         for (int i = 0; keywords[i] != NULL; i++)
    2537              :         {
    2538            0 :                 if (strcmp(keywords[i], "scram_client_key") == 0)
    2539              :                 {
    2540            0 :                         if (values[i] != NULL && values[i][0] != '\0')
    2541            0 :                                 has_scram_client_key = true;
    2542              :                         else
    2543            0 :                                 has_scram_client_key = false;
    2544            0 :                 }
    2545              : 
    2546            0 :                 if (strcmp(keywords[i], "scram_server_key") == 0)
    2547              :                 {
    2548            0 :                         if (values[i] != NULL && values[i][0] != '\0')
    2549            0 :                                 has_scram_server_key = true;
    2550              :                         else
    2551            0 :                                 has_scram_server_key = false;
    2552            0 :                 }
    2553              : 
    2554            0 :                 if (strcmp(keywords[i], "require_auth") == 0)
    2555              :                 {
    2556            0 :                         if (values[i] != NULL && strcmp(values[i], "scram-sha-256") == 0)
    2557            0 :                                 has_require_auth = true;
    2558              :                         else
    2559            0 :                                 has_require_auth = false;
    2560            0 :                 }
    2561            0 :         }
    2562              : 
    2563            0 :         has_scram_keys = has_scram_client_key && has_scram_server_key && MyProcPort != NULL && MyProcPort->has_scram_keys;
    2564              : 
    2565            0 :         return (has_scram_keys && has_require_auth);
    2566            0 : }
        

Generated by: LCOV version 2.3.2-1