LCOV - code coverage report
Current view: top level - contrib/dblink - dblink.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 1461 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 81 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*
       2              :  * dblink.c
       3              :  *
       4              :  * Functions returning results from a remote database
       5              :  *
       6              :  * Joe Conway <mail@joeconway.com>
       7              :  * And contributors:
       8              :  * Darko Prenosil <Darko.Prenosil@finteh.hr>
       9              :  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
      10              :  *
      11              :  * contrib/dblink/dblink.c
      12              :  * Copyright (c) 2001-2026, PostgreSQL Global Development Group
      13              :  * ALL RIGHTS RESERVED;
      14              :  *
      15              :  * Permission to use, copy, modify, and distribute this software and its
      16              :  * documentation for any purpose, without fee, and without a written agreement
      17              :  * is hereby granted, provided that the above copyright notice and this
      18              :  * paragraph and the following two paragraphs appear in all copies.
      19              :  *
      20              :  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
      21              :  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
      22              :  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
      23              :  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
      24              :  * POSSIBILITY OF SUCH DAMAGE.
      25              :  *
      26              :  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
      27              :  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
      28              :  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
      29              :  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
      30              :  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
      31              :  *
      32              :  */
      33              : #include "postgres.h"
      34              : 
      35              : #include <limits.h>
      36              : 
      37              : #include "access/htup_details.h"
      38              : #include "access/relation.h"
      39              : #include "access/reloptions.h"
      40              : #include "access/table.h"
      41              : #include "catalog/namespace.h"
      42              : #include "catalog/pg_foreign_data_wrapper.h"
      43              : #include "catalog/pg_foreign_server.h"
      44              : #include "catalog/pg_type.h"
      45              : #include "catalog/pg_user_mapping.h"
      46              : #include "commands/defrem.h"
      47              : #include "common/base64.h"
      48              : #include "executor/spi.h"
      49              : #include "foreign/foreign.h"
      50              : #include "funcapi.h"
      51              : #include "lib/stringinfo.h"
      52              : #include "libpq-fe.h"
      53              : #include "libpq/libpq-be.h"
      54              : #include "libpq/libpq-be-fe-helpers.h"
      55              : #include "mb/pg_wchar.h"
      56              : #include "miscadmin.h"
      57              : #include "parser/scansup.h"
      58              : #include "utils/acl.h"
      59              : #include "utils/builtins.h"
      60              : #include "utils/fmgroids.h"
      61              : #include "utils/guc.h"
      62              : #include "utils/lsyscache.h"
      63              : #include "utils/memutils.h"
      64              : #include "utils/rel.h"
      65              : #include "utils/varlena.h"
      66              : #include "utils/wait_event.h"
      67              : 
      68            0 : PG_MODULE_MAGIC_EXT(
      69              :                                         .name = "dblink",
      70              :                                         .version = PG_VERSION
      71              : );
      72              : 
      73              : typedef struct remoteConn
      74              : {
      75              :         PGconn     *conn;                       /* Hold the remote connection */
      76              :         int                     openCursorCount;        /* The number of open cursors */
      77              :         bool            newXactForCursor;       /* Opened a transaction for a cursor */
      78              : } remoteConn;
      79              : 
      80              : typedef struct storeInfo
      81              : {
      82              :         FunctionCallInfo fcinfo;
      83              :         Tuplestorestate *tuplestore;
      84              :         AttInMetadata *attinmeta;
      85              :         MemoryContext tmpcontext;
      86              :         char      **cstrs;
      87              :         /* temp storage for results to avoid leaks on exception */
      88              :         PGresult   *last_res;
      89              :         PGresult   *cur_res;
      90              : } storeInfo;
      91              : 
      92              : /*
      93              :  * Internal declarations
      94              :  */
      95              : static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
      96              : static void prepTuplestoreResult(FunctionCallInfo fcinfo);
      97              : static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
      98              :                                                           PGresult *res);
      99              : static void materializeQueryResult(FunctionCallInfo fcinfo,
     100              :                                                                    PGconn *conn,
     101              :                                                                    const char *conname,
     102              :                                                                    const char *sql,
     103              :                                                                    bool fail);
     104              : static PGresult *storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql);
     105              : static void storeRow(storeInfo *sinfo, PGresult *res, bool first);
     106              : static remoteConn *getConnectionByName(const char *name);
     107              : static HTAB *createConnHash(void);
     108              : static remoteConn *createNewConnection(const char *name);
     109              : static void deleteConnection(const char *name);
     110              : static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
     111              : static char **get_text_array_contents(ArrayType *array, int *numitems);
     112              : static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
     113              : static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
     114              : static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
     115              : static char *quote_ident_cstr(char *rawstr);
     116              : static int      get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
     117              : static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
     118              : static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
     119              : static char *generate_relation_name(Relation rel);
     120              : static void dblink_connstr_check(const char *connstr);
     121              : static bool dblink_connstr_has_pw(const char *connstr);
     122              : static void dblink_security_check(PGconn *conn, const char *connname,
     123              :                                                                   const char *connstr);
     124              : static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
     125              :                                                          bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
     126              : static char *get_connect_string(const char *servername);
     127              : static char *escape_param_str(const char *str);
     128              : static void validate_pkattnums(Relation rel,
     129              :                                                            int2vector *pkattnums_arg, int32 pknumatts_arg,
     130              :                                                            int **pkattnums, int *pknumatts);
     131              : static bool is_valid_dblink_option(const PQconninfoOption *options,
     132              :                                                                    const char *option, Oid context);
     133              : static int      applyRemoteGucs(PGconn *conn);
     134              : static void restoreLocalGucs(int nestlevel);
     135              : static bool UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user);
     136              : static void appendSCRAMKeysInfo(StringInfo buf);
     137              : static bool is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
     138              :                                                                            Oid context);
     139              : static bool dblink_connstr_has_required_scram_options(const char *connstr);
     140              : 
     141              : /* Global */
     142              : static remoteConn *pconn = NULL;
     143              : static HTAB *remoteConnHash = NULL;
     144              : 
     145              : /* custom wait event values, retrieved from shared memory */
     146              : static uint32 dblink_we_connect = 0;
     147              : static uint32 dblink_we_get_conn = 0;
     148              : static uint32 dblink_we_get_result = 0;
     149              : 
     150              : /*
      151              :  *      The following is a hash table that holds multiple remote connections.
      152              :  *      The calling convention of each dblink function changes to accept a
      153              :  *      connection name as the first parameter. The connection hash is
      154              :  *      much like ecpg's, i.e. a mapping between a name and a PGconn object.
     155              :  *
     156              :  *      To avoid potentially leaking a PGconn object in case of out-of-memory
     157              :  *      errors, we first create the hash entry, then open the PGconn.
     158              :  *      Hence, a hash entry whose rconn.conn pointer is NULL must be
     159              :  *      understood as a leftover from a failed create; it should be ignored
     160              :  *      by lookup operations, and silently replaced by create operations.
     161              :  */
     162              : 
     163              : typedef struct remoteConnHashEnt
     164              : {
     165              :         char            name[NAMEDATALEN];
     166              :         remoteConn      rconn;
     167              : } remoteConnHashEnt;
     168              : 
      169              : /* initial size of the connection hash table */
     170              : #define NUMCONN 16
     171              : 
     172              : pg_noreturn static void
     173            0 : dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
     174              : {
     175            0 :         char       *msg = pchomp(PQerrorMessage(conn));
     176              : 
     177            0 :         PQclear(res);
     178            0 :         elog(ERROR, "%s: %s", p2, msg);
     179            0 : }
     180              : 
     181              : pg_noreturn static void
     182            0 : dblink_conn_not_avail(const char *conname)
     183              : {
     184            0 :         if (conname)
     185            0 :                 ereport(ERROR,
     186              :                                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
     187              :                                  errmsg("connection \"%s\" not available", conname)));
     188              :         else
     189            0 :                 ereport(ERROR,
     190              :                                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
     191              :                                  errmsg("connection not available")));
     192            0 : }
     193              : 
     194              : static void
     195            0 : dblink_get_conn(char *conname_or_str,
     196              :                                 PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
     197              : {
     198            0 :         remoteConn *rconn = getConnectionByName(conname_or_str);
     199            0 :         PGconn     *conn;
     200            0 :         char       *conname;
     201            0 :         bool            freeconn;
     202              : 
     203            0 :         if (rconn)
     204              :         {
     205            0 :                 conn = rconn->conn;
     206            0 :                 conname = conname_or_str;
     207            0 :                 freeconn = false;
     208            0 :         }
     209              :         else
     210              :         {
     211            0 :                 const char *connstr;
     212              : 
     213            0 :                 connstr = get_connect_string(conname_or_str);
     214            0 :                 if (connstr == NULL)
     215            0 :                         connstr = conname_or_str;
     216            0 :                 dblink_connstr_check(connstr);
     217              : 
     218              :                 /* first time, allocate or get the custom wait event */
     219            0 :                 if (dblink_we_get_conn == 0)
     220            0 :                         dblink_we_get_conn = WaitEventExtensionNew("DblinkGetConnect");
     221              : 
     222              :                 /* OK to make connection */
     223            0 :                 conn = libpqsrv_connect(connstr, dblink_we_get_conn);
     224              : 
     225            0 :                 if (PQstatus(conn) == CONNECTION_BAD)
     226              :                 {
     227            0 :                         char       *msg = pchomp(PQerrorMessage(conn));
     228              : 
     229            0 :                         libpqsrv_disconnect(conn);
     230            0 :                         ereport(ERROR,
     231              :                                         (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     232              :                                          errmsg("could not establish connection"),
     233              :                                          errdetail_internal("%s", msg)));
     234            0 :                 }
     235              : 
     236            0 :                 PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
     237              :                                                         "received message via remote connection");
     238              : 
     239            0 :                 dblink_security_check(conn, NULL, connstr);
     240            0 :                 if (PQclientEncoding(conn) != GetDatabaseEncoding())
     241            0 :                         PQsetClientEncoding(conn, GetDatabaseEncodingName());
     242            0 :                 freeconn = true;
     243            0 :                 conname = NULL;
     244            0 :         }
     245              : 
     246            0 :         *conn_p = conn;
     247            0 :         *conname_p = conname;
     248            0 :         *freeconn_p = freeconn;
     249            0 : }
     250              : 
     251              : static PGconn *
     252            0 : dblink_get_named_conn(const char *conname)
     253              : {
     254            0 :         remoteConn *rconn = getConnectionByName(conname);
     255              : 
     256            0 :         if (rconn)
     257            0 :                 return rconn->conn;
     258              : 
     259            0 :         dblink_conn_not_avail(conname);
     260              :         return NULL;                            /* keep compiler quiet */
     261            0 : }
     262              : 
     263              : static void
     264            0 : dblink_init(void)
     265              : {
     266            0 :         if (!pconn)
     267              :         {
     268            0 :                 if (dblink_we_get_result == 0)
     269            0 :                         dblink_we_get_result = WaitEventExtensionNew("DblinkGetResult");
     270              : 
     271            0 :                 pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
     272            0 :                 pconn->conn = NULL;
     273            0 :                 pconn->openCursorCount = 0;
     274            0 :                 pconn->newXactForCursor = false;
     275            0 :         }
     276            0 : }
     277              : 
     278              : /*
     279              :  * Create a persistent connection to another database
     280              :  */
     281            0 : PG_FUNCTION_INFO_V1(dblink_connect);
     282              : Datum
     283            0 : dblink_connect(PG_FUNCTION_ARGS)
     284              : {
     285            0 :         char       *conname_or_str = NULL;
     286            0 :         char       *connstr = NULL;
     287            0 :         char       *connname = NULL;
     288            0 :         char       *msg;
     289            0 :         PGconn     *conn = NULL;
     290            0 :         remoteConn *rconn = NULL;
     291              : 
     292            0 :         dblink_init();
     293              : 
     294            0 :         if (PG_NARGS() == 2)
     295              :         {
     296            0 :                 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
     297            0 :                 connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     298            0 :         }
     299            0 :         else if (PG_NARGS() == 1)
     300            0 :                 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
     301              : 
     302              :         /* first check for valid foreign data server */
     303            0 :         connstr = get_connect_string(conname_or_str);
     304            0 :         if (connstr == NULL)
     305            0 :                 connstr = conname_or_str;
     306              : 
     307              :         /* check password in connection string if not superuser */
     308            0 :         dblink_connstr_check(connstr);
     309              : 
     310              :         /* first time, allocate or get the custom wait event */
     311            0 :         if (dblink_we_connect == 0)
     312            0 :                 dblink_we_connect = WaitEventExtensionNew("DblinkConnect");
     313              : 
     314              :         /* if we need a hashtable entry, make that first, since it might fail */
     315            0 :         if (connname)
     316              :         {
     317            0 :                 rconn = createNewConnection(connname);
     318            0 :                 Assert(rconn->conn == NULL);
     319            0 :         }
     320              : 
     321              :         /* OK to make connection */
     322            0 :         conn = libpqsrv_connect(connstr, dblink_we_connect);
     323              : 
     324            0 :         if (PQstatus(conn) == CONNECTION_BAD)
     325              :         {
     326            0 :                 msg = pchomp(PQerrorMessage(conn));
     327            0 :                 libpqsrv_disconnect(conn);
     328            0 :                 if (connname)
     329            0 :                         deleteConnection(connname);
     330              : 
     331            0 :                 ereport(ERROR,
     332              :                                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     333              :                                  errmsg("could not establish connection"),
     334              :                                  errdetail_internal("%s", msg)));
     335            0 :         }
     336              : 
     337            0 :         PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
     338              :                                                 "received message via remote connection");
     339              : 
     340              :         /* check password actually used if not superuser */
     341            0 :         dblink_security_check(conn, connname, connstr);
     342              : 
     343              :         /* attempt to set client encoding to match server encoding, if needed */
     344            0 :         if (PQclientEncoding(conn) != GetDatabaseEncoding())
     345            0 :                 PQsetClientEncoding(conn, GetDatabaseEncodingName());
     346              : 
     347              :         /* all OK, save away the conn */
     348            0 :         if (connname)
     349              :         {
     350            0 :                 rconn->conn = conn;
     351            0 :         }
     352              :         else
     353              :         {
     354            0 :                 if (pconn->conn)
     355            0 :                         libpqsrv_disconnect(pconn->conn);
     356            0 :                 pconn->conn = conn;
     357              :         }
     358              : 
     359            0 :         PG_RETURN_TEXT_P(cstring_to_text("OK"));
     360            0 : }
     361              : 
     362              : /*
     363              :  * Clear a persistent connection to another database
     364              :  */
     365            0 : PG_FUNCTION_INFO_V1(dblink_disconnect);
     366              : Datum
     367            0 : dblink_disconnect(PG_FUNCTION_ARGS)
     368              : {
     369            0 :         char       *conname = NULL;
     370            0 :         remoteConn *rconn = NULL;
     371            0 :         PGconn     *conn = NULL;
     372              : 
     373            0 :         dblink_init();
     374              : 
     375            0 :         if (PG_NARGS() == 1)
     376              :         {
     377            0 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     378            0 :                 rconn = getConnectionByName(conname);
     379            0 :                 if (rconn)
     380            0 :                         conn = rconn->conn;
     381            0 :         }
     382              :         else
     383            0 :                 conn = pconn->conn;
     384              : 
     385            0 :         if (!conn)
     386            0 :                 dblink_conn_not_avail(conname);
     387              : 
     388            0 :         libpqsrv_disconnect(conn);
     389            0 :         if (rconn)
     390            0 :                 deleteConnection(conname);
     391              :         else
     392            0 :                 pconn->conn = NULL;
     393              : 
     394            0 :         PG_RETURN_TEXT_P(cstring_to_text("OK"));
     395            0 : }
     396              : 
     397              : /*
     398              :  * opens a cursor using a persistent connection
     399              :  */
     400            0 : PG_FUNCTION_INFO_V1(dblink_open);
     401              : Datum
     402            0 : dblink_open(PG_FUNCTION_ARGS)
     403              : {
     404            0 :         PGresult   *res = NULL;
     405            0 :         PGconn     *conn;
     406            0 :         char       *curname = NULL;
     407            0 :         char       *sql = NULL;
     408            0 :         char       *conname = NULL;
     409            0 :         StringInfoData buf;
     410            0 :         remoteConn *rconn = NULL;
     411            0 :         bool            fail = true;    /* default to backward compatible behavior */
     412              : 
     413            0 :         dblink_init();
     414            0 :         initStringInfo(&buf);
     415              : 
     416            0 :         if (PG_NARGS() == 2)
     417              :         {
     418              :                 /* text,text */
     419            0 :                 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     420            0 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     421            0 :                 rconn = pconn;
     422            0 :         }
     423            0 :         else if (PG_NARGS() == 3)
     424              :         {
     425              :                 /* might be text,text,text or text,text,bool */
     426            0 :                 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
     427              :                 {
     428            0 :                         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     429            0 :                         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     430            0 :                         fail = PG_GETARG_BOOL(2);
     431            0 :                         rconn = pconn;
     432            0 :                 }
     433              :                 else
     434              :                 {
     435            0 :                         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     436            0 :                         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     437            0 :                         sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
     438            0 :                         rconn = getConnectionByName(conname);
     439              :                 }
     440            0 :         }
     441            0 :         else if (PG_NARGS() == 4)
     442              :         {
     443              :                 /* text,text,text,bool */
     444            0 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     445            0 :                 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     446            0 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
     447            0 :                 fail = PG_GETARG_BOOL(3);
     448            0 :                 rconn = getConnectionByName(conname);
     449            0 :         }
     450              : 
     451            0 :         if (!rconn || !rconn->conn)
     452            0 :                 dblink_conn_not_avail(conname);
     453              : 
     454            0 :         conn = rconn->conn;
     455              : 
     456              :         /* If we are not in a transaction, start one */
     457            0 :         if (PQtransactionStatus(conn) == PQTRANS_IDLE)
     458              :         {
     459            0 :                 res = libpqsrv_exec(conn, "BEGIN", dblink_we_get_result);
     460            0 :                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
     461            0 :                         dblink_res_internalerror(conn, res, "begin error");
     462            0 :                 PQclear(res);
     463            0 :                 rconn->newXactForCursor = true;
     464              : 
     465              :                 /*
     466              :                  * Since transaction state was IDLE, we force cursor count to
     467              :                  * initially be 0. This is needed as a previous ABORT might have wiped
     468              :                  * out our transaction without maintaining the cursor count for us.
     469              :                  */
     470            0 :                 rconn->openCursorCount = 0;
     471            0 :         }
     472              : 
     473              :         /* if we started a transaction, increment cursor count */
     474            0 :         if (rconn->newXactForCursor)
     475            0 :                 (rconn->openCursorCount)++;
     476              : 
     477            0 :         appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
     478            0 :         res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
     479            0 :         if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
     480              :         {
     481            0 :                 dblink_res_error(conn, conname, res, fail,
     482            0 :                                                  "while opening cursor \"%s\"", curname);
     483            0 :                 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
     484              :         }
     485              : 
     486            0 :         PQclear(res);
     487            0 :         PG_RETURN_TEXT_P(cstring_to_text("OK"));
     488            0 : }
     489              : 
     490              : /*
     491              :  * Close a cursor on a remote connection by sending "CLOSE <curname>"; returns the text "OK" on success.
     492              :  */
     493            0 : PG_FUNCTION_INFO_V1(dblink_close);
     494              : Datum
     495            0 : dblink_close(PG_FUNCTION_ARGS)
     496              : {
     497            0 :         PGconn     *conn;
     498            0 :         PGresult   *res = NULL;
     499            0 :         char       *curname = NULL;
     500            0 :         char       *conname = NULL;     /* stays NULL whenever pconn is used instead of a named connection */
     501            0 :         StringInfoData buf;
     502            0 :         remoteConn *rconn = NULL;
     503            0 :         bool            fail = true;    /* default to backward compatible behavior */
     504              : 
     505            0 :         dblink_init();
     506            0 :         initStringInfo(&buf);
     507              : 
     508            0 :         if (PG_NARGS() == 1)
     509              :         {
     510              :                 /* text: cursor name only, so the connection held in pconn is used */
     511            0 :                 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     512            0 :                 rconn = pconn;
     513            0 :         }
     514            0 :         else if (PG_NARGS() == 2)
     515              :         {
     516              :                 /* might be text,text or text,bool; the declared type of argument 1 decides */
     517            0 :                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
     518              :                 {
     519            0 :                         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     520            0 :                         fail = PG_GETARG_BOOL(1);
     521            0 :                         rconn = pconn;
     522            0 :                 }
     523              :                 else
     524              :                 {
     525            0 :                         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     526            0 :                         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     527            0 :                         rconn = getConnectionByName(conname);
     528              :                 }
     529            0 :         }
     530            0 :         if (PG_NARGS() == 3)    /* not chained with "else", but the counts tested (1, 2, 3) are mutually exclusive */
     531              :         {
     532              :                 /* text,text,bool */
     533            0 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     534            0 :                 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     535            0 :                 fail = PG_GETARG_BOOL(2);
     536            0 :                 rconn = getConnectionByName(conname);
     537            0 :         }
     538              : 
     539            0 :         if (!rconn || !rconn->conn)
     540            0 :                 dblink_conn_not_avail(conname);  /* presumably does not return: rconn is dereferenced right below */
     541              : 
     542            0 :         conn = rconn->conn;
     543              : 
     544            0 :         appendStringInfo(&buf, "CLOSE %s", curname);     /* curname is interpolated as given; no quoting is applied here */
     545              : 
     546              :         /* close the cursor */
     547            0 :         res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
     548            0 :         if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
     549              :         {
     550            0 :                 dblink_res_error(conn, conname, res, fail,
     551            0 :                                                  "while closing cursor \"%s\"", curname);
     552            0 :                 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));      /* reached only if dblink_res_error returns; presumably the fail == false case */
     553              :         }
     554              : 
     555            0 :         PQclear(res);
     556              : 
     557              :         /* if we started a transaction, decrement cursor count */
     558            0 :         if (rconn->newXactForCursor)
     559              :         {
     560            0 :                 (rconn->openCursorCount)--;
     561              : 
     562              :                 /* if count is zero, commit the transaction */
     563            0 :                 if (rconn->openCursorCount == 0)
     564              :                 {
     565            0 :                         rconn->newXactForCursor = false; /* cleared before the COMMIT is attempted */
     566              : 
     567            0 :                         res = libpqsrv_exec(conn, "COMMIT", dblink_we_get_result);
     568            0 :                         if (PQresultStatus(res) != PGRES_COMMAND_OK)     /* NOTE(review): res is not NULL-checked here, unlike the CLOSE above; confirm PQresultStatus accepts NULL */
     569            0 :                                 dblink_res_internalerror(conn, res, "commit error");
     570            0 :                         PQclear(res);
     571            0 :                 }
     572            0 :         }
     573              : 
     574            0 :         PG_RETURN_TEXT_P(cstring_to_text("OK"));
     575            0 : }
     576              : 
     577              : /*
     578              :  * Fetch results from an open cursor: sends "FETCH <howmany> FROM <curname>" and materializes the rows into a tuplestore
     579              :  */
     580            0 : PG_FUNCTION_INFO_V1(dblink_fetch);
     581              : Datum
     582            0 : dblink_fetch(PG_FUNCTION_ARGS)
     583              : {
     584            0 :         PGresult   *res = NULL;
     585            0 :         char       *conname = NULL;     /* stays NULL whenever pconn is used instead of a named connection */
     586            0 :         remoteConn *rconn = NULL;
     587            0 :         PGconn     *conn = NULL;
     588            0 :         StringInfoData buf;
     589            0 :         char       *curname = NULL;
     590            0 :         int                     howmany = 0;
     591            0 :         bool            fail = true;    /* default to backward compatible */
     592              : 
     593            0 :         prepTuplestoreResult(fcinfo);    /* must precede materializeResult, which asserts that this setup was done */
     594              : 
     595            0 :         dblink_init();
     596              : 
     597            0 :         if (PG_NARGS() == 4)
     598              :         {
     599              :                 /* text,text,int,bool */
     600            0 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     601            0 :                 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     602            0 :                 howmany = PG_GETARG_INT32(2);
     603            0 :                 fail = PG_GETARG_BOOL(3);
     604              : 
     605            0 :                 rconn = getConnectionByName(conname);
     606            0 :                 if (rconn)
     607            0 :                         conn = rconn->conn;
     608            0 :         }
     609            0 :         else if (PG_NARGS() == 3)
     610              :         {
     611              :                 /* text,text,int or text,int,bool; the declared type of argument 2 decides */
     612            0 :                 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
     613              :                 {
     614            0 :                         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     615            0 :                         howmany = PG_GETARG_INT32(1);
     616            0 :                         fail = PG_GETARG_BOOL(2);
     617            0 :                         conn = pconn->conn;      /* NOTE(review): pconn is dereferenced unchecked; presumably dblink_init() guarantees it is set - confirm */
     618            0 :                 }
     619              :                 else
     620              :                 {
     621            0 :                         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     622            0 :                         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     623            0 :                         howmany = PG_GETARG_INT32(2);
     624              : 
     625            0 :                         rconn = getConnectionByName(conname);
     626            0 :                         if (rconn)
     627            0 :                                 conn = rconn->conn;
     628              :                 }
     629            0 :         }
     630            0 :         else if (PG_NARGS() == 2)
     631              :         {
     632              :                 /* text,int */
     633            0 :                 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     634            0 :                 howmany = PG_GETARG_INT32(1);
     635            0 :                 conn = pconn->conn;      /* same unchecked pconn dereference as in the text,int,bool form */
     636            0 :         }
     637              : 
     638            0 :         if (!conn)
     639            0 :                 dblink_conn_not_avail(conname);  /* presumably does not return: conn is used right below */
     640              : 
     641            0 :         initStringInfo(&buf);
     642            0 :         appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);    /* curname is interpolated as given; no quoting is applied here */
     643              : 
     644              :         /*
     645              :          * Try to execute the query.  Note that since libpq uses malloc, the
     646              :          * PGresult will be long-lived even though we are still in a short-lived
     647              :          * memory context.
     648              :          */
     649            0 :         res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
     650            0 :         if (!res ||
     651            0 :                 (PQresultStatus(res) != PGRES_COMMAND_OK &&
     652            0 :                  PQresultStatus(res) != PGRES_TUPLES_OK))
     653              :         {
     654            0 :                 dblink_res_error(conn, conname, res, fail,
     655            0 :                                                  "while fetching from cursor \"%s\"", curname);
     656            0 :                 return (Datum) 0;        /* no tuplestore was created, so the executor sees an empty set */
     657              :         }
     658            0 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
     659              :         {
     660              :                 /* cursor does not exist - closed already or bad name */
     661            0 :                 PQclear(res);
     662            0 :                 ereport(ERROR,
     663              :                                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
     664              :                                  errmsg("cursor \"%s\" does not exist", curname)));
     665            0 :         }
     666              : 
     667            0 :         materializeResult(fcinfo, conn, res);    /* materializeResult also releases res */
     668            0 :         return (Datum) 0;        /* rows are handed back through the ReturnSetInfo, not the Datum */
     669            0 : }
     670              : 
     671              : /*
     672              :  * Note: this is the new preferred version of dblink; it delegates to dblink_record_internal with is_async = false
     673              :  */
     674            0 : PG_FUNCTION_INFO_V1(dblink_record);
     675              : Datum
     676            0 : dblink_record(PG_FUNCTION_ARGS)
     677              : {
     678            0 :         return dblink_record_internal(fcinfo, false);
     679              : }
     680              : 
     681            0 : PG_FUNCTION_INFO_V1(dblink_send_query);  /* send sql asynchronously on a named connection; returns PQsendQuery's result as int4 */
     682              : Datum
     683            0 : dblink_send_query(PG_FUNCTION_ARGS)
     684              : {
     685            0 :         PGconn     *conn;
     686            0 :         char       *sql;
     687            0 :         int                     retval;
     688              : 
     689            0 :         if (PG_NARGS() == 2)
     690              :         {
     691            0 :                 conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));     /* argument 0: connection name */
     692            0 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));     /* argument 1: query text */
     693            0 :         }
     694              :         else
     695              :                 /* shouldn't happen */
     696            0 :                 elog(ERROR, "wrong number of arguments");
     697              : 
     698              :         /* async query send */
     699            0 :         retval = PQsendQuery(conn, sql);
     700            0 :         if (retval != 1)
     701            0 :                 elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));  /* NOTICE only: the caller learns of the failure from the returned value */
     702              : 
     703            0 :         PG_RETURN_INT32(retval);
     704            0 : }
     705              : 
     706            0 : PG_FUNCTION_INFO_V1(dblink_get_result);  /* async result retrieval: delegates to dblink_record_internal with is_async = true */
     707              : Datum
     708            0 : dblink_get_result(PG_FUNCTION_ARGS)
     709              : {
     710            0 :         return dblink_record_internal(fcinfo, true);
     711              : }
     712              : 
     713              : static Datum     /* shared body of dblink_record (is_async = false) and dblink_get_result (is_async = true) */
     714            0 : dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     715              : {
     716            0 :         PGconn     *volatile conn = NULL;        /* volatile: assigned inside PG_TRY, read in PG_FINALLY */
     717            0 :         volatile bool freeconn = false;  /* only dblink_get_conn is given the chance to set this */
     718              : 
     719            0 :         prepTuplestoreResult(fcinfo);
     720              : 
     721            0 :         dblink_init();
     722              : 
     723            0 :         PG_TRY();
     724              :         {
     725            0 :                 char       *sql = NULL;
     726            0 :                 char       *conname = NULL;
     727            0 :                 bool            fail = true;    /* default to backward compatible */
     728              : 
     729            0 :                 if (!is_async)
     730              :                 {
     731            0 :                         if (PG_NARGS() == 3)
     732              :                         {
     733              :                                 /* text,text,bool */
     734            0 :                                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     735            0 :                                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     736            0 :                                 fail = PG_GETARG_BOOL(2);
     737            0 :                                 dblink_get_conn(conname, &conn, &conname, &freeconn);    /* receives pointers, so it can set conn, conname and freeconn */
     738            0 :                         }
     739            0 :                         else if (PG_NARGS() == 2)
     740              :                         {
     741              :                                 /* text,text or text,bool; the declared type of argument 1 decides */
     742            0 :                                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
     743              :                                 {
     744            0 :                                         sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
     745            0 :                                         fail = PG_GETARG_BOOL(1);
     746            0 :                                         conn = pconn->conn;      /* no connection name given: use the connection held in pconn */
     747            0 :                                 }
     748              :                                 else
     749              :                                 {
     750            0 :                                         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     751            0 :                                         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     752            0 :                                         dblink_get_conn(conname, &conn, &conname, &freeconn);
     753              :                                 }
     754            0 :                         }
     755            0 :                         else if (PG_NARGS() == 1)
     756              :                         {
     757              :                                 /* text */
     758            0 :                                 conn = pconn->conn;
     759            0 :                                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
     760            0 :                         }
     761              :                         else
     762              :                                 /* shouldn't happen */
     763            0 :                                 elog(ERROR, "wrong number of arguments");
     764            0 :                 }
     765              :                 else                                    /* is_async */
     766              :                 {
     767              :                         /* get async result; argument 0 is always the connection name, and sql stays NULL */
     768            0 :                         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     769              : 
     770            0 :                         if (PG_NARGS() == 2)
     771              :                         {
     772              :                                 /* text,bool */
     773            0 :                                 fail = PG_GETARG_BOOL(1);
     774            0 :                                 conn = dblink_get_named_conn(conname);
     775            0 :                         }
     776            0 :                         else if (PG_NARGS() == 1)
     777              :                         {
     778              :                                 /* text */
     779            0 :                                 conn = dblink_get_named_conn(conname);
     780            0 :                         }
     781              :                         else
     782              :                                 /* shouldn't happen */
     783            0 :                                 elog(ERROR, "wrong number of arguments");
     784              :                 }
     785              : 
     786            0 :                 if (!conn)
     787            0 :                         dblink_conn_not_avail(conname);  /* presumably does not return: conn is used right below */
     788              : 
     789            0 :                 if (!is_async)
     790              :                 {
     791              :                         /* synchronous query, use efficient tuple collection method */
     792            0 :                         materializeQueryResult(fcinfo, conn, conname, sql, fail);
     793            0 :                 }
     794              :                 else
     795              :                 {
     796              :                         /* async result retrieval, do it the old way */
     797            0 :                         PGresult   *res = libpqsrv_get_result(conn, dblink_we_get_result);
     798              : 
     799              :                         /* NULL means we're all done with the async results */
     800            0 :                         if (res)
     801              :                         {
     802            0 :                                 if (PQresultStatus(res) != PGRES_COMMAND_OK &&
     803            0 :                                         PQresultStatus(res) != PGRES_TUPLES_OK)
     804              :                                 {
     805            0 :                                         dblink_res_error(conn, conname, res, fail,
     806              :                                                                          "while executing query");
     807              :                                         /* if fail isn't set, we'll return an empty query result */
     808            0 :                                 }
     809              :                                 else
     810              :                                 {
     811            0 :                                         materializeResult(fcinfo, conn, res);    /* materializeResult also releases res */
     812              :                                 }
     813            0 :                         }
     814            0 :                 }
     815            0 :         }
     816            0 :         PG_FINALLY();
     817              :         {
     818              :                 /* if needed, close the connection to the database */
     819            0 :                 if (freeconn)
     820            0 :                         libpqsrv_disconnect(conn);
     821              :         }
     822            0 :         PG_END_TRY();
     823              : 
     824            0 :         return (Datum) 0;        /* rows, if any, are handed back through the ReturnSetInfo, not the Datum */
     825            0 : }
     826              : 
     827              : /*
     828              :  * Verify function caller can handle a tuplestore result, and set up for that.
     829              :  *
     830              :  * Note: if the caller returns without actually creating a tuplestore, the
     831              :  * executor will treat the function result as an empty set.
     832              :  */
     833              : static void
     834            0 : prepTuplestoreResult(FunctionCallInfo fcinfo)
     835              : {
     836            0 :         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;    /* cast first; the node type is verified by IsA below before any field is used */
     837              : 
     838              :         /* check to see if query supports us returning a tuplestore */
     839            0 :         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
     840            0 :                 ereport(ERROR,
     841              :                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     842              :                                  errmsg("set-valued function called in context that cannot accept a set")));
     843            0 :         if (!(rsinfo->allowedModes & SFRM_Materialize))
     844            0 :                 ereport(ERROR,
     845              :                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     846              :                                  errmsg("materialize mode required, but it is not allowed in this context")));
     847              : 
     848              :         /* let the executor know we're sending back a tuplestore */
     849            0 :         rsinfo->returnMode = SFRM_Materialize;
     850              : 
     851              :         /* caller must fill these to return a non-empty result */
     852            0 :         rsinfo->setResult = NULL;
     853            0 :         rsinfo->setDesc = NULL;
     854            0 : }
     855              : 
     856              : /*
     857              :  * Copy the contents of the PGresult into a tuplestore to be returned
     858              :  * as the result of the current function.
     859              :  * The PGresult will be released in this function.
     860              :  */
     861              : static void
     862            0 : materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
     863              : {
     864            0 :         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     865            0 :         TupleDesc       tupdesc;
     866            0 :         bool            is_sql_cmd;     /* true when res carries only a command status, no rows */
     867            0 :         int                     ntuples;
     868            0 :         int                     nfields;
     869              : 
     870              :         /* prepTuplestoreResult must have been called previously */
     871            0 :         Assert(rsinfo->returnMode == SFRM_Materialize);
     872              : 
     873            0 :         if (PQresultStatus(res) == PGRES_COMMAND_OK)
     874              :         {
     875            0 :                 is_sql_cmd = true;
     876              : 
     877              :                 /*
     878              :                  * need a tuple descriptor representing one TEXT column to return the
     879              :                  * command status string as our result tuple
     880              :                  */
     881            0 :                 tupdesc = CreateTemplateTupleDesc(1);
     882            0 :                 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
     883              :                                                    TEXTOID, -1, 0);
     884            0 :                 ntuples = 1;     /* exactly one row: the command status */
     885            0 :                 nfields = 1;
     886            0 :         }
     887              :         else
     888              :         {
     889            0 :                 Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
     890              : 
     891            0 :                 is_sql_cmd = false;
     892              : 
     893              :                 /* get a tuple descriptor for our result type */
     894            0 :                 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
     895              :                 {
     896              :                         case TYPEFUNC_COMPOSITE:
     897              :                                 /* success */
     898              :                                 break;
     899              :                         case TYPEFUNC_RECORD:
     900              :                                 /* failed to determine actual type of RECORD */
     901            0 :                                 ereport(ERROR,
     902              :                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     903              :                                                  errmsg("function returning record called in context "
     904              :                                                                 "that cannot accept type record")));
     905            0 :                                 break;
     906              :                         default:
     907              :                                 /* result type isn't composite */
     908            0 :                                 elog(ERROR, "return type must be a row type");
     909            0 :                                 break;
     910              :                 }
     911              : 
     912              :                 /* make sure we have a persistent copy of the tupdesc */
     913            0 :                 tupdesc = CreateTupleDescCopy(tupdesc);
     914            0 :                 ntuples = PQntuples(res);
     915            0 :                 nfields = PQnfields(res);
     916              :         }
     917              : 
     918              :         /*
     919              :          * check result and tuple descriptor have the same number of columns
     920              :          */
     921            0 :         if (nfields != tupdesc->natts)
     922            0 :                 ereport(ERROR,
     923              :                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
     924              :                                  errmsg("remote query result rowtype does not match "
     925              :                                                 "the specified FROM clause rowtype")));
     926              : 
     927            0 :         if (ntuples > 0)         /* with zero rows no tuplestore is created at all */
     928              :         {
     929            0 :                 AttInMetadata *attinmeta;
     930            0 :                 int                     nestlevel = -1;  /* -1 is what restoreLocalGucs receives when applyRemoteGucs is not called */
     931            0 :                 Tuplestorestate *tupstore;
     932            0 :                 MemoryContext oldcontext;
     933            0 :                 int                     row;
     934            0 :                 char      **values;
     935              : 
     936            0 :                 attinmeta = TupleDescGetAttInMetadata(tupdesc);
     937              : 
     938              :                 /* Set GUCs to ensure we read GUC-sensitive data types correctly */
     939            0 :                 if (!is_sql_cmd)
     940            0 :                         nestlevel = applyRemoteGucs(conn);
     941              : 
     942            0 :                 oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);     /* create the tuplestore in the per-query context */
     943            0 :                 tupstore = tuplestore_begin_heap(true, false, work_mem);
     944            0 :                 rsinfo->setResult = tupstore;
     945            0 :                 rsinfo->setDesc = tupdesc;
     946            0 :                 MemoryContextSwitchTo(oldcontext);
     947              : 
     948            0 :                 values = palloc_array(char *, nfields);  /* one slot per column, reused for every row */
     949              : 
     950              :                 /* put all tuples into the tuplestore */
     951            0 :                 for (row = 0; row < ntuples; row++)
     952              :                 {
     953            0 :                         HeapTuple       tuple;
     954              : 
     955            0 :                         if (!is_sql_cmd)
     956              :                         {
     957            0 :                                 int                     i;
     958              : 
     959            0 :                                 for (i = 0; i < nfields; i++)
     960              :                                 {
     961            0 :                                         if (PQgetisnull(res, row, i))
     962            0 :                                                 values[i] = NULL;        /* SQL NULL is passed on as a NULL C string */
     963              :                                         else
     964            0 :                                                 values[i] = PQgetvalue(res, row, i);     /* pointer returned by libpq is stored without copying */
     965            0 :                                 }
     966            0 :                         }
     967              :                         else
     968              :                         {
     969            0 :                                 values[0] = PQcmdStatus(res);
     970              :                         }
     971              : 
     972              :                         /* build the tuple and put it into the tuplestore. */
     973            0 :                         tuple = BuildTupleFromCStrings(attinmeta, values);
     974            0 :                         tuplestore_puttuple(tupstore, tuple);
     975            0 :                 }
     976              : 
     977              :                 /* clean up GUC settings, if we changed any */
     978            0 :                 restoreLocalGucs(nestlevel);
     979            0 :         }
     980              : 
     981            0 :         PQclear(res);    /* NOTE(review): the ereport(ERROR) paths above exit before this call - confirm res is reclaimed some other way */
     982            0 : }
     983              : 
     984              : /*
     985              :  * Execute the given SQL command and store its results into a tuplestore
     986              :  * to be returned as the result of the current function.
     987              :  *
     988              :  * This is equivalent to PQexec followed by materializeResult, but we make
     989              :  * use of libpq's single-row mode to avoid accumulating the whole result
     990              :  * inside libpq before it gets transferred to the tuplestore.
     991              :  */
     992              : static void
     993            0 : materializeQueryResult(FunctionCallInfo fcinfo,
     994              :                                            PGconn *conn,
     995              :                                            const char *conname,
     996              :                                            const char *sql,
     997              :                                            bool fail)
     998              : {
     999            0 :         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1000              : 
    1001              :         /* prepTuplestoreResult must have been called previously */
    1002            0 :         Assert(rsinfo->returnMode == SFRM_Materialize);
    1003              : 
    1004              :         /* Use a PG_TRY block to ensure we pump libpq dry of results */
    1005            0 :         PG_TRY();
    1006              :         {
    1007            0 :                 storeInfo       sinfo = {0};
    1008            0 :                 PGresult   *res;
    1009              : 
    1010            0 :                 sinfo.fcinfo = fcinfo;
    1011              :                 /* Create short-lived memory context for data conversions */
    1012            0 :                 sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
    1013              :                                                                                                  "dblink temporary context",
    1014              :                                                                                                  ALLOCSET_DEFAULT_SIZES);
    1015              : 
    1016              :                 /* execute query, collecting any tuples into the tuplestore */
    1017            0 :                 res = storeQueryResult(&sinfo, conn, sql);
    1018              : 
    1019            0 :                 if (!res ||
    1020            0 :                         (PQresultStatus(res) != PGRES_COMMAND_OK &&
    1021            0 :                          PQresultStatus(res) != PGRES_TUPLES_OK))
    1022              :                 {
    1023            0 :                         dblink_res_error(conn, conname, res, fail,
    1024              :                                                          "while executing query");
    1025              :                         /* if fail isn't set, we'll return an empty query result */
    1026            0 :                 }
    1027            0 :                 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
    1028              :                 {
    1029              :                         /*
    1030              :                          * storeRow didn't get called, so we need to convert the command
    1031              :                          * status string to a tuple manually
    1032              :                          */
    1033            0 :                         TupleDesc       tupdesc;
    1034            0 :                         AttInMetadata *attinmeta;
    1035            0 :                         Tuplestorestate *tupstore;
    1036            0 :                         HeapTuple       tuple;
    1037            0 :                         char       *values[1];
    1038            0 :                         MemoryContext oldcontext;
    1039              : 
    1040              :                         /*
    1041              :                          * need a tuple descriptor representing one TEXT column to return
    1042              :                          * the command status string as our result tuple
    1043              :                          */
    1044            0 :                         tupdesc = CreateTemplateTupleDesc(1);
    1045            0 :                         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
    1046              :                                                            TEXTOID, -1, 0);
    1047            0 :                         attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1048              : 
    1049            0 :                         oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
    1050            0 :                         tupstore = tuplestore_begin_heap(true, false, work_mem);
    1051            0 :                         rsinfo->setResult = tupstore;
    1052            0 :                         rsinfo->setDesc = tupdesc;
    1053            0 :                         MemoryContextSwitchTo(oldcontext);
    1054              : 
    1055            0 :                         values[0] = PQcmdStatus(res);
    1056              : 
    1057              :                         /* build the tuple and put it into the tuplestore. */
    1058            0 :                         tuple = BuildTupleFromCStrings(attinmeta, values);
    1059            0 :                         tuplestore_puttuple(tupstore, tuple);
    1060              : 
    1061            0 :                         PQclear(res);
    1062            0 :                 }
    1063              :                 else
    1064              :                 {
    1065            0 :                         Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
    1066              :                         /* storeRow should have created a tuplestore */
    1067            0 :                         Assert(rsinfo->setResult != NULL);
    1068              : 
    1069            0 :                         PQclear(res);
    1070              :                 }
    1071              : 
    1072              :                 /* clean up data conversion short-lived memory context */
    1073            0 :                 if (sinfo.tmpcontext != NULL)
    1074            0 :                         MemoryContextDelete(sinfo.tmpcontext);
    1075              : 
    1076            0 :                 PQclear(sinfo.last_res);
    1077            0 :                 PQclear(sinfo.cur_res);
    1078            0 :         }
    1079            0 :         PG_CATCH();
    1080              :         {
    1081            0 :                 PGresult   *res;
    1082              : 
    1083              :                 /* be sure to clear out any pending data in libpq */
    1084            0 :                 while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) !=
    1085              :                            NULL)
    1086            0 :                         PQclear(res);
    1087            0 :                 PG_RE_THROW();
    1088              :         }
    1089            0 :         PG_END_TRY();
    1090            0 : }
    1091              : 
    1092              : /*
    1093              :  * Execute query, and send any result rows to sinfo->tuplestore.
    1094              :  */
    1095              : static PGresult *
    1096            0 : storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql)
    1097              : {
    1098            0 :         bool            first = true;
    1099            0 :         int                     nestlevel = -1;
    1100            0 :         PGresult   *res;
    1101              : 
    1102            0 :         if (!PQsendQuery(conn, sql))
    1103            0 :                 elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
    1104              : 
    1105            0 :         if (!PQsetSingleRowMode(conn))  /* shouldn't fail */
    1106            0 :                 elog(ERROR, "failed to set single-row mode for dblink query");
    1107              : 
    1108            0 :         for (;;)
    1109              :         {
    1110            0 :                 CHECK_FOR_INTERRUPTS();
    1111              : 
    1112            0 :                 sinfo->cur_res = libpqsrv_get_result(conn, dblink_we_get_result);
    1113            0 :                 if (!sinfo->cur_res)
    1114            0 :                         break;
    1115              : 
    1116            0 :                 if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
    1117              :                 {
    1118              :                         /* got one row from possibly-bigger resultset */
    1119              : 
    1120              :                         /*
    1121              :                          * Set GUCs to ensure we read GUC-sensitive data types correctly.
    1122              :                          * We shouldn't do this until we have a row in hand, to ensure
    1123              :                          * libpq has seen any earlier ParameterStatus protocol messages.
    1124              :                          */
    1125            0 :                         if (first && nestlevel < 0)
    1126            0 :                                 nestlevel = applyRemoteGucs(conn);
    1127              : 
    1128            0 :                         storeRow(sinfo, sinfo->cur_res, first);
    1129              : 
    1130            0 :                         PQclear(sinfo->cur_res);
    1131            0 :                         sinfo->cur_res = NULL;
    1132            0 :                         first = false;
    1133            0 :                 }
    1134              :                 else
    1135              :                 {
    1136              :                         /* if empty resultset, fill tuplestore header */
    1137            0 :                         if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
    1138            0 :                                 storeRow(sinfo, sinfo->cur_res, first);
    1139              : 
    1140              :                         /* store completed result at last_res */
    1141            0 :                         PQclear(sinfo->last_res);
    1142            0 :                         sinfo->last_res = sinfo->cur_res;
    1143            0 :                         sinfo->cur_res = NULL;
    1144            0 :                         first = true;
    1145              :                 }
    1146              :         }
    1147              : 
    1148              :         /* clean up GUC settings, if we changed any */
    1149            0 :         restoreLocalGucs(nestlevel);
    1150              : 
    1151              :         /* return last_res */
    1152            0 :         res = sinfo->last_res;
    1153            0 :         sinfo->last_res = NULL;
    1154            0 :         return res;
    1155            0 : }
    1156              : 
    1157              : /*
    1158              :  * Send single row to sinfo->tuplestore.
    1159              :  *
    1160              :  * If "first" is true, create the tuplestore using PGresult's metadata
    1161              :  * (in this case the PGresult might contain either zero or one row).
    1162              :  */
    1163              : static void
    1164            0 : storeRow(storeInfo *sinfo, PGresult *res, bool first)
    1165              : {
    1166            0 :         int                     nfields = PQnfields(res);
    1167            0 :         HeapTuple       tuple;
    1168            0 :         int                     i;
    1169            0 :         MemoryContext oldcontext;
    1170              : 
    1171            0 :         if (first)
    1172              :         {
    1173              :                 /* Prepare for new result set */
    1174            0 :                 ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
    1175            0 :                 TupleDesc       tupdesc;
    1176              : 
    1177              :                 /*
    1178              :                  * It's possible to get more than one result set if the query string
    1179              :                  * contained multiple SQL commands.  In that case, we follow PQexec's
    1180              :                  * traditional behavior of throwing away all but the last result.
    1181              :                  */
    1182            0 :                 if (sinfo->tuplestore)
    1183            0 :                         tuplestore_end(sinfo->tuplestore);
    1184            0 :                 sinfo->tuplestore = NULL;
    1185              : 
    1186              :                 /* get a tuple descriptor for our result type */
    1187            0 :                 switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
    1188              :                 {
    1189              :                         case TYPEFUNC_COMPOSITE:
    1190              :                                 /* success */
    1191              :                                 break;
    1192              :                         case TYPEFUNC_RECORD:
    1193              :                                 /* failed to determine actual type of RECORD */
    1194            0 :                                 ereport(ERROR,
    1195              :                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1196              :                                                  errmsg("function returning record called in context "
    1197              :                                                                 "that cannot accept type record")));
    1198            0 :                                 break;
    1199              :                         default:
    1200              :                                 /* result type isn't composite */
    1201            0 :                                 elog(ERROR, "return type must be a row type");
    1202            0 :                                 break;
    1203              :                 }
    1204              : 
    1205              :                 /* make sure we have a persistent copy of the tupdesc */
    1206            0 :                 tupdesc = CreateTupleDescCopy(tupdesc);
    1207              : 
    1208              :                 /* check result and tuple descriptor have the same number of columns */
    1209            0 :                 if (nfields != tupdesc->natts)
    1210            0 :                         ereport(ERROR,
    1211              :                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
    1212              :                                          errmsg("remote query result rowtype does not match "
    1213              :                                                         "the specified FROM clause rowtype")));
    1214              : 
    1215              :                 /* Prepare attinmeta for later data conversions */
    1216            0 :                 sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1217              : 
    1218              :                 /* Create a new, empty tuplestore */
    1219            0 :                 oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
    1220            0 :                 sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
    1221            0 :                 rsinfo->setResult = sinfo->tuplestore;
    1222            0 :                 rsinfo->setDesc = tupdesc;
    1223            0 :                 MemoryContextSwitchTo(oldcontext);
    1224              : 
    1225              :                 /* Done if empty resultset */
    1226            0 :                 if (PQntuples(res) == 0)
    1227            0 :                         return;
    1228              : 
    1229              :                 /*
    1230              :                  * Set up sufficiently-wide string pointers array; this won't change
    1231              :                  * in size so it's easy to preallocate.
    1232              :                  */
    1233            0 :                 if (sinfo->cstrs)
    1234            0 :                         pfree(sinfo->cstrs);
    1235            0 :                 sinfo->cstrs = palloc_array(char *, nfields);
    1236            0 :         }
    1237              : 
    1238              :         /* Should have a single-row result if we get here */
    1239            0 :         Assert(PQntuples(res) == 1);
    1240              : 
    1241              :         /*
    1242              :          * Do the following work in a temp context that we reset after each tuple.
    1243              :          * This cleans up not only the data we have direct access to, but any
    1244              :          * cruft the I/O functions might leak.
    1245              :          */
    1246            0 :         oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
    1247              : 
    1248              :         /*
    1249              :          * Fill cstrs with null-terminated strings of column values.
    1250              :          */
    1251            0 :         for (i = 0; i < nfields; i++)
    1252              :         {
    1253            0 :                 if (PQgetisnull(res, 0, i))
    1254            0 :                         sinfo->cstrs[i] = NULL;
    1255              :                 else
    1256            0 :                         sinfo->cstrs[i] = PQgetvalue(res, 0, i);
    1257            0 :         }
    1258              : 
    1259              :         /* Convert row to a tuple, and add it to the tuplestore */
    1260            0 :         tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
    1261              : 
    1262            0 :         tuplestore_puttuple(sinfo->tuplestore, tuple);
    1263              : 
    1264              :         /* Clean up */
    1265            0 :         MemoryContextSwitchTo(oldcontext);
    1266            0 :         MemoryContextReset(sinfo->tmpcontext);
    1267            0 : }
    1268              : 
    1269              : /*
    1270              :  * List all open dblink connections by name.
    1271              :  * Returns an array of all connection names.
    1272              :  * Takes no params
    1273              :  */
    1274            0 : PG_FUNCTION_INFO_V1(dblink_get_connections);
    1275              : Datum
    1276            0 : dblink_get_connections(PG_FUNCTION_ARGS)
    1277              : {
    1278            0 :         HASH_SEQ_STATUS status;
    1279            0 :         remoteConnHashEnt *hentry;
    1280            0 :         ArrayBuildState *astate = NULL;
    1281              : 
    1282            0 :         if (remoteConnHash)
    1283              :         {
    1284            0 :                 hash_seq_init(&status, remoteConnHash);
    1285            0 :                 while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
    1286              :                 {
    1287              :                         /* ignore it if it's not an open connection */
    1288            0 :                         if (hentry->rconn.conn == NULL)
    1289            0 :                                 continue;
    1290              :                         /* stash away current value */
    1291            0 :                         astate = accumArrayResult(astate,
    1292            0 :                                                                           CStringGetTextDatum(hentry->name),
    1293            0 :                                                                           false, TEXTOID, CurrentMemoryContext);
    1294              :                 }
    1295            0 :         }
    1296              : 
    1297            0 :         if (astate)
    1298            0 :                 PG_RETURN_DATUM(makeArrayResult(astate,
    1299              :                                                                                 CurrentMemoryContext));
    1300              :         else
    1301            0 :                 PG_RETURN_NULL();
    1302            0 : }
    1303              : 
    1304              : /*
    1305              :  * Checks if a given remote connection is busy
    1306              :  *
    1307              :  * Returns 1 if the connection is busy, 0 otherwise
    1308              :  * Params:
    1309              :  *      text connection_name - name of the connection to check
    1310              :  *
    1311              :  */
    1312            0 : PG_FUNCTION_INFO_V1(dblink_is_busy);
    1313              : Datum
    1314            0 : dblink_is_busy(PG_FUNCTION_ARGS)
    1315              : {
    1316            0 :         PGconn     *conn;
    1317              : 
    1318            0 :         dblink_init();
    1319            0 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1320              : 
    1321            0 :         PQconsumeInput(conn);
    1322            0 :         PG_RETURN_INT32(PQisBusy(conn));
    1323            0 : }
    1324              : 
    1325              : /*
    1326              :  * Cancels a running request on a connection
    1327              :  *
    1328              :  * Returns text:
    1329              :  *      "OK" if the cancel request has been sent correctly,
    1330              :  *              an error message otherwise
    1331              :  *
    1332              :  * Params:
    1333              :  *      text connection_name - name of the connection to check
    1334              :  *
    1335              :  */
    1336            0 : PG_FUNCTION_INFO_V1(dblink_cancel_query);
    1337              : Datum
    1338            0 : dblink_cancel_query(PG_FUNCTION_ARGS)
    1339              : {
    1340            0 :         PGconn     *conn;
    1341            0 :         const char *msg;
    1342            0 :         TimestampTz endtime;
    1343              : 
    1344            0 :         dblink_init();
    1345            0 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1346            0 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1347              :                                                                                   30000);
    1348            0 :         msg = libpqsrv_cancel(conn, endtime);
    1349            0 :         if (msg == NULL)
    1350            0 :                 msg = "OK";
    1351              : 
    1352            0 :         PG_RETURN_TEXT_P(cstring_to_text(msg));
    1353            0 : }
    1354              : 
    1355              : 
    1356              : /*
    1357              :  * Get error message from a connection
    1358              :  *
    1359              :  * Returns text:
    1360              :  *      "OK" if no error, an error message otherwise
    1361              :  *
    1362              :  * Params:
    1363              :  *      text connection_name - name of the connection to check
    1364              :  *
    1365              :  */
    1366            0 : PG_FUNCTION_INFO_V1(dblink_error_message);
    1367              : Datum
    1368            0 : dblink_error_message(PG_FUNCTION_ARGS)
    1369              : {
    1370            0 :         char       *msg;
    1371            0 :         PGconn     *conn;
    1372              : 
    1373            0 :         dblink_init();
    1374            0 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1375              : 
    1376            0 :         msg = PQerrorMessage(conn);
    1377            0 :         if (msg == NULL || msg[0] == '\0')
    1378            0 :                 PG_RETURN_TEXT_P(cstring_to_text("OK"));
    1379              :         else
    1380            0 :                 PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
    1381            0 : }
    1382              : 
    1383              : /*
    1384              :  * Execute an SQL non-SELECT command
    1385              :  */
    1386            0 : PG_FUNCTION_INFO_V1(dblink_exec);
    1387              : Datum
    1388            0 : dblink_exec(PG_FUNCTION_ARGS)
    1389              : {
    1390            0 :         text       *volatile sql_cmd_status = NULL;
    1391            0 :         PGconn     *volatile conn = NULL;
    1392            0 :         volatile bool freeconn = false;
    1393              : 
    1394            0 :         dblink_init();
    1395              : 
    1396            0 :         PG_TRY();
    1397              :         {
    1398            0 :                 PGresult   *res = NULL;
    1399            0 :                 char       *sql = NULL;
    1400            0 :                 char       *conname = NULL;
    1401            0 :                 bool            fail = true;    /* default to backward compatible behavior */
    1402              : 
    1403            0 :                 if (PG_NARGS() == 3)
    1404              :                 {
    1405              :                         /* must be text,text,bool */
    1406            0 :                         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1407            0 :                         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
    1408            0 :                         fail = PG_GETARG_BOOL(2);
    1409            0 :                         dblink_get_conn(conname, &conn, &conname, &freeconn);
    1410            0 :                 }
    1411            0 :                 else if (PG_NARGS() == 2)
    1412              :                 {
    1413              :                         /* might be text,text or text,bool */
    1414            0 :                         if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
    1415              :                         {
    1416            0 :                                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1417            0 :                                 fail = PG_GETARG_BOOL(1);
    1418            0 :                                 conn = pconn->conn;
    1419            0 :                         }
    1420              :                         else
    1421              :                         {
    1422            0 :                                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1423            0 :                                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
    1424            0 :                                 dblink_get_conn(conname, &conn, &conname, &freeconn);
    1425              :                         }
    1426            0 :                 }
    1427            0 :                 else if (PG_NARGS() == 1)
    1428              :                 {
    1429              :                         /* must be single text argument */
    1430            0 :                         conn = pconn->conn;
    1431            0 :                         sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1432            0 :                 }
    1433              :                 else
    1434              :                         /* shouldn't happen */
    1435            0 :                         elog(ERROR, "wrong number of arguments");
    1436              : 
    1437            0 :                 if (!conn)
    1438            0 :                         dblink_conn_not_avail(conname);
    1439              : 
    1440            0 :                 res = libpqsrv_exec(conn, sql, dblink_we_get_result);
    1441            0 :                 if (!res ||
    1442            0 :                         (PQresultStatus(res) != PGRES_COMMAND_OK &&
    1443            0 :                          PQresultStatus(res) != PGRES_TUPLES_OK))
    1444              :                 {
    1445            0 :                         dblink_res_error(conn, conname, res, fail,
    1446              :                                                          "while executing command");
    1447              : 
    1448              :                         /*
    1449              :                          * and save a copy of the command status string to return as our
    1450              :                          * result tuple
    1451              :                          */
    1452            0 :                         sql_cmd_status = cstring_to_text("ERROR");
    1453            0 :                 }
    1454            0 :                 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
    1455              :                 {
    1456              :                         /*
    1457              :                          * and save a copy of the command status string to return as our
    1458              :                          * result tuple
    1459              :                          */
    1460            0 :                         sql_cmd_status = cstring_to_text(PQcmdStatus(res));
    1461            0 :                         PQclear(res);
    1462            0 :                 }
    1463              :                 else
    1464              :                 {
    1465            0 :                         PQclear(res);
    1466            0 :                         ereport(ERROR,
    1467              :                                         (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    1468              :                                          errmsg("statement returning results not allowed")));
    1469              :                 }
    1470            0 :         }
    1471            0 :         PG_FINALLY();
    1472              :         {
    1473              :                 /* if needed, close the connection to the database */
    1474            0 :                 if (freeconn)
    1475            0 :                         libpqsrv_disconnect(conn);
    1476              :         }
    1477            0 :         PG_END_TRY();
    1478              : 
    1479            0 :         PG_RETURN_TEXT_P(sql_cmd_status);
    1480            0 : }
    1481              : 
    1482              : 
    1483              : /*
    1484              :  * dblink_get_pkey
    1485              :  *
    1486              :  * Return list of primary key fields for the supplied relation,
    1487              :  * or NULL if none exists.
    1488              :  */
    1489            0 : PG_FUNCTION_INFO_V1(dblink_get_pkey);
    1490              : Datum
    1491            0 : dblink_get_pkey(PG_FUNCTION_ARGS)
    1492              : {
    1493            0 :         int16           indnkeyatts;
    1494            0 :         char      **results;
    1495            0 :         FuncCallContext *funcctx;
    1496            0 :         int32           call_cntr;
    1497            0 :         int32           max_calls;
    1498            0 :         AttInMetadata *attinmeta;
    1499            0 :         MemoryContext oldcontext;
    1500              : 
    1501              :         /* stuff done only on the first call of the function */
    1502            0 :         if (SRF_IS_FIRSTCALL())
    1503              :         {
    1504            0 :                 Relation        rel;
    1505            0 :                 TupleDesc       tupdesc;
    1506              : 
    1507              :                 /* create a function context for cross-call persistence */
    1508            0 :                 funcctx = SRF_FIRSTCALL_INIT();
    1509              : 
    1510              :                 /*
    1511              :                  * switch to memory context appropriate for multiple function calls
    1512              :                  */
    1513            0 :                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1514              : 
    1515              :                 /* open target relation */
    1516            0 :                 rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
    1517              : 
    1518              :                 /* get the array of attnums */
    1519            0 :                 results = get_pkey_attnames(rel, &indnkeyatts);
    1520              : 
    1521            0 :                 relation_close(rel, AccessShareLock);
    1522              : 
    1523              :                 /*
    1524              :                  * need a tuple descriptor representing one INT and one TEXT column
    1525              :                  */
    1526            0 :                 tupdesc = CreateTemplateTupleDesc(2);
    1527            0 :                 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
    1528              :                                                    INT4OID, -1, 0);
    1529            0 :                 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
    1530              :                                                    TEXTOID, -1, 0);
    1531              : 
    1532              :                 /*
    1533              :                  * Generate attribute metadata needed later to produce tuples from raw
    1534              :                  * C strings
    1535              :                  */
    1536            0 :                 attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1537            0 :                 funcctx->attinmeta = attinmeta;
    1538              : 
    1539            0 :                 if ((results != NULL) && (indnkeyatts > 0))
    1540              :                 {
    1541            0 :                         funcctx->max_calls = indnkeyatts;
    1542              : 
    1543              :                         /* got results, keep track of them */
    1544            0 :                         funcctx->user_fctx = results;
    1545            0 :                 }
    1546              :                 else
    1547              :                 {
    1548              :                         /* fast track when no results */
    1549            0 :                         MemoryContextSwitchTo(oldcontext);
    1550            0 :                         SRF_RETURN_DONE(funcctx);
    1551              :                 }
    1552              : 
    1553            0 :                 MemoryContextSwitchTo(oldcontext);
    1554            0 :         }
    1555              : 
    1556              :         /* stuff done on every call of the function */
    1557            0 :         funcctx = SRF_PERCALL_SETUP();
    1558              : 
    1559              :         /*
    1560              :          * initialize per-call variables
    1561              :          */
    1562            0 :         call_cntr = funcctx->call_cntr;
    1563            0 :         max_calls = funcctx->max_calls;
    1564              : 
    1565            0 :         results = (char **) funcctx->user_fctx;
    1566            0 :         attinmeta = funcctx->attinmeta;
    1567              : 
    1568            0 :         if (call_cntr < max_calls)   /* do when there is more left to send */
    1569              :         {
    1570            0 :                 char      **values;
    1571            0 :                 HeapTuple       tuple;
    1572            0 :                 Datum           result;
    1573              : 
    1574            0 :                 values = palloc_array(char *, 2);
    1575            0 :                 values[0] = psprintf("%d", call_cntr + 1);
    1576            0 :                 values[1] = results[call_cntr];
    1577              : 
    1578              :                 /* build the tuple */
    1579            0 :                 tuple = BuildTupleFromCStrings(attinmeta, values);
    1580              : 
    1581              :                 /* make the tuple into a datum */
    1582            0 :                 result = HeapTupleGetDatum(tuple);
    1583              : 
    1584            0 :                 SRF_RETURN_NEXT(funcctx, result);
    1585            0 :         }
    1586              :         else
    1587              :         {
    1588              :                 /* do when there is no more left */
    1589            0 :                 SRF_RETURN_DONE(funcctx);
    1590              :         }
    1591            0 : }
    1592              : 
    1593              : 
    1594              : /*
    1595              :  * dblink_build_sql_insert
    1596              :  *
    1597              :  * Used to generate an SQL insert statement
    1598              :  * based on an existing tuple in a local relation.
    1599              :  * This is useful for selectively replicating data
    1600              :  * to another server via dblink.
    1601              :  *
    1602              :  * API:
    1603              :  * <relname> - name of local table of interest
    1604              :  * <pkattnums> - an int2vector of attnums which will be used
    1605              :  * to identify the local tuple of interest
    1606              :  * <pknumatts> - number of attnums in pkattnums
    1607              :  * <src_pkattvals_arry> - text array of key values which will be used
    1608              :  * to identify the local tuple of interest
    1609              :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1610              :  * to build the string for execution remotely. These are substituted
    1611              :  * for their counterparts in src_pkattvals_arry
    1612              :  */
    1613            0 : PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
    1614              : Datum
    1615            0 : dblink_build_sql_insert(PG_FUNCTION_ARGS)
    1616              : {
    1617            0 :         text       *relname_text = PG_GETARG_TEXT_PP(0);
    1618            0 :         int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1619            0 :         int32           pknumatts_arg = PG_GETARG_INT32(2);
    1620            0 :         ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1621            0 :         ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    1622            0 :         Relation        rel;
    1623            0 :         int                *pkattnums;
    1624            0 :         int                     pknumatts;
    1625            0 :         char      **src_pkattvals;
    1626            0 :         char      **tgt_pkattvals;
    1627            0 :         int                     src_nitems;
    1628            0 :         int                     tgt_nitems;
    1629            0 :         char       *sql;
    1630              : 
    1631              :         /*
    1632              :          * Open target relation.
    1633              :          */
    1634            0 :         rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1635              : 
    1636              :         /*
    1637              :          * Process pkattnums argument.
    1638              :          */
    1639            0 :         validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1640              :                                            &pkattnums, &pknumatts);
    1641              : 
    1642              :         /*
    1643              :          * Source array is made up of key values that will be used to locate the
    1644              :          * tuple of interest from the local system.
    1645              :          */
    1646            0 :         src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
    1647              : 
    1648              :         /*
    1649              :          * There should be one source array key value for each key attnum
    1650              :          */
    1651            0 :         if (src_nitems != pknumatts)
    1652            0 :                 ereport(ERROR,
    1653              :                                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1654              :                                  errmsg("source key array length must match number of key attributes")));
    1655              : 
    1656              :         /*
    1657              :          * Target array is made up of key values that will be used to build the
    1658              :          * SQL string for use on the remote system.
    1659              :          */
    1660            0 :         tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1661              : 
    1662              :         /*
    1663              :          * There should be one target array key value for each key attnum
    1664              :          */
    1665            0 :         if (tgt_nitems != pknumatts)
    1666            0 :                 ereport(ERROR,
    1667              :                                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1668              :                                  errmsg("target key array length must match number of key attributes")));
    1669              : 
    1670              :         /*
    1671              :          * Prep work is finally done. Go get the SQL string.
    1672              :          */
    1673            0 :         sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
    1674              : 
    1675              :         /*
    1676              :          * Now we can close the relation.
    1677              :          */
    1678            0 :         relation_close(rel, AccessShareLock);
    1679              : 
    1680              :         /*
    1681              :          * And send it
    1682              :          */
    1683            0 :         PG_RETURN_TEXT_P(cstring_to_text(sql));
    1684            0 : }
    1685              : 
    1686              : 
    1687              : /*
    1688              :  * dblink_build_sql_delete
    1689              :  *
    1690              :  * Used to generate an SQL delete statement.
    1691              :  * This is useful for selectively replicating a
    1692              :  * delete to another server via dblink.
    1693              :  *
    1694              :  * API:
    1695              :  * <relname> - name of remote table of interest
    1696              :  * <pkattnums> - an int2vector of attnums which will be used
    1697              :  * to identify the remote tuple of interest
    1698              :  * <pknumatts> - number of attnums in pkattnums
    1699              :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1700              :  * to build the string for execution remotely.
    1701              :  */
    1702            0 : PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
    1703              : Datum
    1704            0 : dblink_build_sql_delete(PG_FUNCTION_ARGS)
    1705              : {
    1706            0 :         text       *relname_text = PG_GETARG_TEXT_PP(0);
    1707            0 :         int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1708            0 :         int32           pknumatts_arg = PG_GETARG_INT32(2);
    1709            0 :         ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1710            0 :         Relation        rel;
    1711            0 :         int                *pkattnums;
    1712            0 :         int                     pknumatts;
    1713            0 :         char      **tgt_pkattvals;
    1714            0 :         int                     tgt_nitems;
    1715            0 :         char       *sql;
    1716              : 
    1717              :         /*
    1718              :          * Open target relation.
    1719              :          */
    1720            0 :         rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1721              : 
    1722              :         /*
    1723              :          * Process pkattnums argument.
    1724              :          */
    1725            0 :         validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1726              :                                            &pkattnums, &pknumatts);
    1727              : 
    1728              :         /*
    1729              :          * Target array is made up of key values that will be used to build the
    1730              :          * SQL string for use on the remote system.
    1731              :          */
    1732            0 :         tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1733              : 
    1734              :         /*
    1735              :          * There should be one target array key value for each key attnum
    1736              :          */
    1737            0 :         if (tgt_nitems != pknumatts)
    1738            0 :                 ereport(ERROR,
    1739              :                                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1740              :                                  errmsg("target key array length must match number of key attributes")));
    1741              : 
    1742              :         /*
    1743              :          * Prep work is finally done. Go get the SQL string.
    1744              :          */
    1745            0 :         sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
    1746              : 
    1747              :         /*
    1748              :          * Now we can close the relation.
    1749              :          */
    1750            0 :         relation_close(rel, AccessShareLock);
    1751              : 
    1752              :         /*
    1753              :          * And send it
    1754              :          */
    1755            0 :         PG_RETURN_TEXT_P(cstring_to_text(sql));
    1756            0 : }
    1757              : 
    1758              : 
    1759              : /*
    1760              :  * dblink_build_sql_update
    1761              :  *
    1762              :  * Used to generate an SQL update statement
    1763              :  * based on an existing tuple in a local relation.
    1764              :  * This is useful for selectively replicating data
    1765              :  * to another server via dblink.
    1766              :  *
    1767              :  * API:
    1768              :  * <relname> - name of local table of interest
    1769              :  * <pkattnums> - an int2vector of attnums which will be used
    1770              :  * to identify the local tuple of interest
    1771              :  * <pknumatts> - number of attnums in pkattnums
    1772              :  * <src_pkattvals_arry> - text array of key values which will be used
    1773              :  * to identify the local tuple of interest
    1774              :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1775              :  * to build the string for execution remotely. These are substituted
    1776              :  * for their counterparts in src_pkattvals_arry
    1777              :  */
    1778            0 : PG_FUNCTION_INFO_V1(dblink_build_sql_update);
    1779              : Datum
    1780            0 : dblink_build_sql_update(PG_FUNCTION_ARGS)
    1781              : {
    1782            0 :         text       *relname_text = PG_GETARG_TEXT_PP(0);
    1783            0 :         int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1784            0 :         int32           pknumatts_arg = PG_GETARG_INT32(2);
    1785            0 :         ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1786            0 :         ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    1787            0 :         Relation        rel;
    1788            0 :         int                *pkattnums;
    1789            0 :         int                     pknumatts;
    1790            0 :         char      **src_pkattvals;
    1791            0 :         char      **tgt_pkattvals;
    1792            0 :         int                     src_nitems;
    1793            0 :         int                     tgt_nitems;
    1794            0 :         char       *sql;
    1795              : 
    1796              :         /*
    1797              :          * Open target relation.
    1798              :          */
    1799            0 :         rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1800              : 
    1801              :         /*
    1802              :          * Process pkattnums argument.
    1803              :          */
    1804            0 :         validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1805              :                                            &pkattnums, &pknumatts);
    1806              : 
    1807              :         /*
    1808              :          * Source array is made up of key values that will be used to locate the
    1809              :          * tuple of interest from the local system.
    1810              :          */
    1811            0 :         src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
    1812              : 
    1813              :         /*
    1814              :          * There should be one source array key value for each key attnum
    1815              :          */
    1816            0 :         if (src_nitems != pknumatts)
    1817            0 :                 ereport(ERROR,
    1818              :                                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1819              :                                  errmsg("source key array length must match number of key attributes")));
    1820              : 
    1821              :         /*
    1822              :          * Target array is made up of key values that will be used to build the
    1823              :          * SQL string for use on the remote system.
    1824              :          */
    1825            0 :         tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1826              : 
    1827              :         /*
    1828              :          * There should be one target array key value for each key attnum
    1829              :          */
    1830            0 :         if (tgt_nitems != pknumatts)
    1831            0 :                 ereport(ERROR,
    1832              :                                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1833              :                                  errmsg("target key array length must match number of key attributes")));
    1834              : 
    1835              :         /*
    1836              :          * Prep work is finally done. Go get the SQL string.
    1837              :          */
    1838            0 :         sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
    1839              : 
    1840              :         /*
    1841              :          * Now we can close the relation.
    1842              :          */
    1843            0 :         relation_close(rel, AccessShareLock);
    1844              : 
    1845              :         /*
    1846              :          * And send it
    1847              :          */
    1848            0 :         PG_RETURN_TEXT_P(cstring_to_text(sql));
    1849            0 : }
    1850              : 
    1851              : /*
    1852              :  * dblink_current_query
    1853              :  * return the current query string
    1854              :  * to allow its use in (among other things)
    1855              :  * rewrite rules
    1856              :  */
    1857            0 : PG_FUNCTION_INFO_V1(dblink_current_query);
    1858              : Datum
    1859            0 : dblink_current_query(PG_FUNCTION_ARGS)
    1860              : {
    1861              :         /* This is now just an alias for the built-in function current_query() */
    1862            0 :         PG_RETURN_DATUM(current_query(fcinfo));
    1863              : }
    1864              : 
    1865              : /*
    1866              :  * Retrieve async notifications for a connection.
    1867              :  *
    1868              :  * Returns a setof record of notifications, or an empty set if none received.
    1869              :  * Can optionally take a named connection as parameter, but uses the unnamed
    1870              :  * connection per default.
    1871              :  *
    1872              :  */
    1873              : #define DBLINK_NOTIFY_COLS              3
    1874              : 
    1875            0 : PG_FUNCTION_INFO_V1(dblink_get_notify);
    1876              : Datum
    1877            0 : dblink_get_notify(PG_FUNCTION_ARGS)
    1878              : {
    1879            0 :         PGconn     *conn;
    1880            0 :         PGnotify   *notify;
    1881            0 :         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1882              : 
    1883            0 :         dblink_init();
    1884            0 :         if (PG_NARGS() == 1)
    1885            0 :                 conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1886              :         else
    1887            0 :                 conn = pconn->conn;
    1888              : 
    1889            0 :         InitMaterializedSRF(fcinfo, 0);
    1890              : 
    1891            0 :         PQconsumeInput(conn);
    1892            0 :         while ((notify = PQnotifies(conn)) != NULL)
    1893              :         {
    1894            0 :                 Datum           values[DBLINK_NOTIFY_COLS];
    1895            0 :                 bool            nulls[DBLINK_NOTIFY_COLS];
    1896              : 
    1897            0 :                 memset(values, 0, sizeof(values));
    1898            0 :                 memset(nulls, 0, sizeof(nulls));
    1899              : 
    1900            0 :                 if (notify->relname != NULL)
    1901            0 :                         values[0] = CStringGetTextDatum(notify->relname);
    1902              :                 else
    1903            0 :                         nulls[0] = true;
    1904              : 
    1905            0 :                 values[1] = Int32GetDatum(notify->be_pid);
    1906              : 
    1907            0 :                 if (notify->extra != NULL)
    1908            0 :                         values[2] = CStringGetTextDatum(notify->extra);
    1909              :                 else
    1910            0 :                         nulls[2] = true;
    1911              : 
    1912            0 :                 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
    1913              : 
    1914            0 :                 PQfreemem(notify);
    1915            0 :                 PQconsumeInput(conn);
    1916            0 :         }
    1917              : 
    1918            0 :         return (Datum) 0;
    1919            0 : }
    1920              : 
    1921              : /*
    1922              :  * Validate the options given to a dblink foreign server or user mapping.
    1923              :  * Raise an error if any option is invalid.
    1924              :  *
    1925              :  * We just check the names of options here, so semantic errors in options,
    1926              :  * such as invalid numeric format, will be detected at the attempt to connect.
    1927              :  */
    1928            0 : PG_FUNCTION_INFO_V1(dblink_fdw_validator);
    1929              : Datum
    1930            0 : dblink_fdw_validator(PG_FUNCTION_ARGS)
    1931              : {
    1932            0 :         List       *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
    1933            0 :         Oid                     context = PG_GETARG_OID(1);
    1934            0 :         ListCell   *cell;
    1935              : 
    1936              :         static const PQconninfoOption *options = NULL;
    1937              : 
    1938              :         /*
    1939              :          * Get list of valid libpq options.
    1940              :          *
    1941              :          * To avoid unnecessary work, we get the list once and use it throughout
    1942              :          * the lifetime of this backend process.  We don't need to care about
    1943              :          * memory context issues, because PQconndefaults allocates with malloc.
    1944              :          */
    1945            0 :         if (!options)
    1946              :         {
    1947            0 :                 options = PQconndefaults();
    1948            0 :                 if (!options)                   /* assume reason for failure is OOM */
    1949            0 :                         ereport(ERROR,
    1950              :                                         (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
    1951              :                                          errmsg("out of memory"),
    1952              :                                          errdetail("Could not get libpq's default connection options.")));
    1953            0 :         }
    1954              : 
    1955              :         /* Validate each supplied option. */
    1956            0 :         foreach(cell, options_list)
    1957              :         {
    1958            0 :                 DefElem    *def = (DefElem *) lfirst(cell);
    1959              : 
    1960            0 :                 if (!is_valid_dblink_fdw_option(options, def->defname, context))
    1961              :                 {
    1962              :                         /*
    1963              :                          * Unknown option, or invalid option for the context specified, so
    1964              :                          * complain about it.  Provide a hint with a valid option that
    1965              :                          * looks similar, if there is one.
    1966              :                          */
    1967            0 :                         const PQconninfoOption *opt;
    1968            0 :                         const char *closest_match;
    1969            0 :                         ClosestMatchState match_state;
    1970            0 :                         bool            has_valid_options = false;
    1971              : 
    1972            0 :                         initClosestMatch(&match_state, def->defname, 4);
    1973            0 :                         for (opt = options; opt->keyword; opt++)
    1974              :                         {
    1975            0 :                                 if (is_valid_dblink_option(options, opt->keyword, context))
    1976              :                                 {
    1977            0 :                                         has_valid_options = true;
    1978            0 :                                         updateClosestMatch(&match_state, opt->keyword);
    1979            0 :                                 }
    1980            0 :                         }
    1981              : 
    1982            0 :                         closest_match = getClosestMatch(&match_state);
    1983            0 :                         ereport(ERROR,
    1984              :                                         (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
    1985              :                                          errmsg("invalid option \"%s\"", def->defname),
    1986              :                                          has_valid_options ? closest_match ?
    1987              :                                          errhint("Perhaps you meant the option \"%s\".",
    1988              :                                                          closest_match) : 0 :
    1989              :                                          errhint("There are no valid options in this context.")));
    1990            0 :                 }
    1991            0 :         }
    1992              : 
    1993            0 :         PG_RETURN_VOID();
    1994            0 : }
    1995              : 
    1996              : 
    1997              : /*************************************************************
    1998              :  * internal functions
    1999              :  */
    2000              : 
    2001              : 
    2002              : /*
    2003              :  * get_pkey_attnames
    2004              :  *
    2005              :  * Get the primary key attnames for the given relation.
    2006              :  * Return NULL, and set indnkeyatts = 0, if no primary key exists.
    2007              :  */
    2008              : static char **
    2009            0 : get_pkey_attnames(Relation rel, int16 *indnkeyatts)
    2010              : {
    2011            0 :         Relation        indexRelation;
    2012            0 :         ScanKeyData skey;
    2013            0 :         SysScanDesc scan;
    2014            0 :         HeapTuple       indexTuple;
    2015            0 :         int                     i;
    2016            0 :         char      **result = NULL;
    2017            0 :         TupleDesc       tupdesc;
    2018              : 
    2019              :         /* initialize indnkeyatts to 0 in case no primary key exists */
    2020            0 :         *indnkeyatts = 0;
    2021              : 
    2022            0 :         tupdesc = rel->rd_att;
    2023              : 
    2024              :         /* Prepare to scan pg_index for entries having indrelid = this rel. */
    2025            0 :         indexRelation = table_open(IndexRelationId, AccessShareLock);
    2026            0 :         ScanKeyInit(&skey,
    2027              :                                 Anum_pg_index_indrelid,
    2028              :                                 BTEqualStrategyNumber, F_OIDEQ,
    2029            0 :                                 ObjectIdGetDatum(RelationGetRelid(rel)));
    2030              : 
    2031            0 :         scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
    2032              :                                                           NULL, 1, &skey);
    2033              : 
    2034            0 :         while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
    2035              :         {
    2036            0 :                 Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
    2037              : 
    2038              :                 /* we're only interested if it is the primary key */
    2039            0 :                 if (index->indisprimary)
    2040              :                 {
    2041            0 :                         *indnkeyatts = index->indnkeyatts;
    2042            0 :                         if (*indnkeyatts > 0)
    2043              :                         {
    2044            0 :                                 result = palloc_array(char *, *indnkeyatts);
    2045              : 
    2046            0 :                                 for (i = 0; i < *indnkeyatts; i++)
    2047            0 :                                         result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
    2048            0 :                         }
    2049            0 :                         break;
    2050              :                 }
    2051            0 :         }
    2052              : 
    2053            0 :         systable_endscan(scan);
    2054            0 :         table_close(indexRelation, AccessShareLock);
    2055              : 
    2056            0 :         return result;
    2057            0 : }
    2058              : 
    2059              : /*
    2060              :  * Deconstruct a text[] into C-strings (note any NULL elements will be
    2061              :  * returned as NULL pointers)
    2062              :  */
    2063              : static char **
    2064            0 : get_text_array_contents(ArrayType *array, int *numitems)
    2065              : {
    2066            0 :         int                     ndim = ARR_NDIM(array);
    2067            0 :         int                *dims = ARR_DIMS(array);
    2068            0 :         int                     nitems;
    2069            0 :         int16           typlen;
    2070            0 :         bool            typbyval;
    2071            0 :         char            typalign;
    2072            0 :         char      **values;
    2073            0 :         char       *ptr;
    2074            0 :         bits8      *bitmap;
    2075            0 :         int                     bitmask;
    2076            0 :         int                     i;
    2077              : 
    2078            0 :         Assert(ARR_ELEMTYPE(array) == TEXTOID);
    2079              : 
    2080            0 :         *numitems = nitems = ArrayGetNItems(ndim, dims);
    2081              : 
    2082            0 :         get_typlenbyvalalign(ARR_ELEMTYPE(array),
    2083              :                                                  &typlen, &typbyval, &typalign);
    2084              : 
    2085            0 :         values = palloc_array(char *, nitems);
    2086              : 
    2087            0 :         ptr = ARR_DATA_PTR(array);
    2088            0 :         bitmap = ARR_NULLBITMAP(array);
    2089            0 :         bitmask = 1;
    2090              : 
    2091            0 :         for (i = 0; i < nitems; i++)
    2092              :         {
    2093            0 :                 if (bitmap && (*bitmap & bitmask) == 0)
    2094              :                 {
    2095            0 :                         values[i] = NULL;
    2096            0 :                 }
    2097              :                 else
    2098              :                 {
    2099            0 :                         values[i] = TextDatumGetCString(PointerGetDatum(ptr));
    2100            0 :                         ptr = att_addlength_pointer(ptr, typlen, ptr);
    2101            0 :                         ptr = (char *) att_align_nominal(ptr, typalign);
    2102              :                 }
    2103              : 
    2104              :                 /* advance bitmap pointer if any */
    2105            0 :                 if (bitmap)
    2106              :                 {
    2107            0 :                         bitmask <<= 1;
    2108            0 :                         if (bitmask == 0x100)
    2109              :                         {
    2110            0 :                                 bitmap++;
    2111            0 :                                 bitmask = 1;
    2112            0 :                         }
    2113            0 :                 }
    2114            0 :         }
    2115              : 
    2116            0 :         return values;
    2117            0 : }
    2118              : 
    2119              : static char *
    2120            0 : get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
    2121              : {
    2122            0 :         char       *relname;
    2123            0 :         HeapTuple       tuple;
    2124            0 :         TupleDesc       tupdesc;
    2125            0 :         int                     natts;
    2126            0 :         StringInfoData buf;
    2127            0 :         char       *val;
    2128            0 :         int                     key;
    2129            0 :         int                     i;
    2130            0 :         bool            needComma;
    2131              : 
    2132            0 :         initStringInfo(&buf);
    2133              : 
    2134              :         /* get relation name including any needed schema prefix and quoting */
    2135            0 :         relname = generate_relation_name(rel);
    2136              : 
    2137            0 :         tupdesc = rel->rd_att;
    2138            0 :         natts = tupdesc->natts;
    2139              : 
    2140            0 :         tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    2141            0 :         if (!tuple)
    2142            0 :                 ereport(ERROR,
    2143              :                                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2144              :                                  errmsg("source row not found")));
    2145              : 
    2146            0 :         appendStringInfo(&buf, "INSERT INTO %s(", relname);
    2147              : 
    2148            0 :         needComma = false;
    2149            0 :         for (i = 0; i < natts; i++)
    2150              :         {
    2151            0 :                 Form_pg_attribute att = TupleDescAttr(tupdesc, i);
    2152              : 
    2153            0 :                 if (att->attisdropped)
    2154            0 :                         continue;
    2155              : 
    2156            0 :                 if (needComma)
    2157            0 :                         appendStringInfoChar(&buf, ',');
    2158              : 
    2159            0 :                 appendStringInfoString(&buf,
    2160            0 :                                                            quote_ident_cstr(NameStr(att->attname)));
    2161            0 :                 needComma = true;
    2162            0 :         }
    2163              : 
    2164            0 :         appendStringInfoString(&buf, ") VALUES(");
    2165              : 
    2166              :         /*
    2167              :          * Note: i is physical column number (counting from 0).
    2168              :          */
    2169            0 :         needComma = false;
    2170            0 :         for (i = 0; i < natts; i++)
    2171              :         {
    2172            0 :                 if (TupleDescAttr(tupdesc, i)->attisdropped)
    2173            0 :                         continue;
    2174              : 
    2175            0 :                 if (needComma)
    2176            0 :                         appendStringInfoChar(&buf, ',');
    2177              : 
    2178            0 :                 key = get_attnum_pk_pos(pkattnums, pknumatts, i);
    2179              : 
    2180            0 :                 if (key >= 0)
    2181            0 :                         val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
    2182              :                 else
    2183            0 :                         val = SPI_getvalue(tuple, tupdesc, i + 1);
    2184              : 
    2185            0 :                 if (val != NULL)
    2186              :                 {
    2187            0 :                         appendStringInfoString(&buf, quote_literal_cstr(val));
    2188            0 :                         pfree(val);
    2189            0 :                 }
    2190              :                 else
    2191            0 :                         appendStringInfoString(&buf, "NULL");
    2192            0 :                 needComma = true;
    2193            0 :         }
    2194            0 :         appendStringInfoChar(&buf, ')');
    2195              : 
    2196            0 :         return buf.data;
    2197            0 : }
    2198              : 
    2199              : static char *
    2200            0 : get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
    2201              : {
    2202            0 :         char       *relname;
    2203            0 :         TupleDesc       tupdesc;
    2204            0 :         StringInfoData buf;
    2205            0 :         int                     i;
    2206              : 
    2207            0 :         initStringInfo(&buf);
    2208              : 
    2209              :         /* get relation name including any needed schema prefix and quoting */
    2210            0 :         relname = generate_relation_name(rel);
    2211              : 
    2212            0 :         tupdesc = rel->rd_att;
    2213              : 
    2214            0 :         appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
    2215            0 :         for (i = 0; i < pknumatts; i++)
    2216              :         {
    2217            0 :                 int                     pkattnum = pkattnums[i];
    2218            0 :                 Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2219              : 
    2220            0 :                 if (i > 0)
    2221            0 :                         appendStringInfoString(&buf, " AND ");
    2222              : 
    2223            0 :                 appendStringInfoString(&buf,
    2224            0 :                                                            quote_ident_cstr(NameStr(attr->attname)));
    2225              : 
    2226            0 :                 if (tgt_pkattvals[i] != NULL)
    2227            0 :                         appendStringInfo(&buf, " = %s",
    2228            0 :                                                          quote_literal_cstr(tgt_pkattvals[i]));
    2229              :                 else
    2230            0 :                         appendStringInfoString(&buf, " IS NULL");
    2231            0 :         }
    2232              : 
    2233            0 :         return buf.data;
    2234            0 : }
    2235              : 
    2236              : static char *
    2237            0 : get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
    2238              : {
    2239            0 :         char       *relname;
    2240            0 :         HeapTuple       tuple;
    2241            0 :         TupleDesc       tupdesc;
    2242            0 :         int                     natts;
    2243            0 :         StringInfoData buf;
    2244            0 :         char       *val;
    2245            0 :         int                     key;
    2246            0 :         int                     i;
    2247            0 :         bool            needComma;
    2248              : 
    2249            0 :         initStringInfo(&buf);
    2250              : 
    2251              :         /* get relation name including any needed schema prefix and quoting */
    2252            0 :         relname = generate_relation_name(rel);
    2253              : 
    2254            0 :         tupdesc = rel->rd_att;
    2255            0 :         natts = tupdesc->natts;
    2256              : 
    2257            0 :         tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    2258            0 :         if (!tuple)
    2259            0 :                 ereport(ERROR,
    2260              :                                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2261              :                                  errmsg("source row not found")));
    2262              : 
    2263            0 :         appendStringInfo(&buf, "UPDATE %s SET ", relname);
    2264              : 
    2265              :         /*
    2266              :          * Note: i is physical column number (counting from 0).
    2267              :          */
    2268            0 :         needComma = false;
    2269            0 :         for (i = 0; i < natts; i++)
    2270              :         {
    2271            0 :                 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
    2272              : 
    2273            0 :                 if (attr->attisdropped)
    2274            0 :                         continue;
    2275              : 
    2276            0 :                 if (needComma)
    2277            0 :                         appendStringInfoString(&buf, ", ");
    2278              : 
    2279            0 :                 appendStringInfo(&buf, "%s = ",
    2280            0 :                                                  quote_ident_cstr(NameStr(attr->attname)));
    2281              : 
    2282            0 :                 key = get_attnum_pk_pos(pkattnums, pknumatts, i);
    2283              : 
    2284            0 :                 if (key >= 0)
    2285            0 :                         val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
    2286              :                 else
    2287            0 :                         val = SPI_getvalue(tuple, tupdesc, i + 1);
    2288              : 
    2289            0 :                 if (val != NULL)
    2290              :                 {
    2291            0 :                         appendStringInfoString(&buf, quote_literal_cstr(val));
    2292            0 :                         pfree(val);
    2293            0 :                 }
    2294              :                 else
    2295            0 :                         appendStringInfoString(&buf, "NULL");
    2296            0 :                 needComma = true;
    2297            0 :         }
    2298              : 
    2299            0 :         appendStringInfoString(&buf, " WHERE ");
    2300              : 
    2301            0 :         for (i = 0; i < pknumatts; i++)
    2302              :         {
    2303            0 :                 int                     pkattnum = pkattnums[i];
    2304            0 :                 Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2305              : 
    2306            0 :                 if (i > 0)
    2307            0 :                         appendStringInfoString(&buf, " AND ");
    2308              : 
    2309            0 :                 appendStringInfoString(&buf,
    2310            0 :                                                            quote_ident_cstr(NameStr(attr->attname)));
    2311              : 
    2312            0 :                 val = tgt_pkattvals[i];
    2313              : 
    2314            0 :                 if (val != NULL)
    2315            0 :                         appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
    2316              :                 else
    2317            0 :                         appendStringInfoString(&buf, " IS NULL");
    2318            0 :         }
    2319              : 
    2320            0 :         return buf.data;
    2321            0 : }
    2322              : 
    2323              : /*
    2324              :  * Return a properly quoted identifier.
    2325              :  * Uses quote_ident in quote.c
    2326              :  */
    2327              : static char *
    2328            0 : quote_ident_cstr(char *rawstr)
    2329              : {
    2330            0 :         text       *rawstr_text;
    2331            0 :         text       *result_text;
    2332            0 :         char       *result;
    2333              : 
    2334            0 :         rawstr_text = cstring_to_text(rawstr);
    2335            0 :         result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
    2336              :                                                                                                          PointerGetDatum(rawstr_text)));
    2337            0 :         result = text_to_cstring(result_text);
    2338              : 
    2339            0 :         return result;
    2340            0 : }
    2341              : 
    2342              : static int
    2343            0 : get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
    2344              : {
    2345            0 :         int                     i;
    2346              : 
    2347              :         /*
    2348              :          * Not likely a long list anyway, so just scan for the value
    2349              :          */
    2350            0 :         for (i = 0; i < pknumatts; i++)
    2351            0 :                 if (key == pkattnums[i])
    2352            0 :                         return i;
    2353              : 
    2354            0 :         return -1;
    2355            0 : }
    2356              : 
    2357              : static HeapTuple
    2358            0 : get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
    2359              : {
    2360            0 :         char       *relname;
    2361            0 :         TupleDesc       tupdesc;
    2362            0 :         int                     natts;
    2363            0 :         StringInfoData buf;
    2364            0 :         int                     ret;
    2365            0 :         HeapTuple       tuple;
    2366            0 :         int                     i;
    2367              : 
    2368              :         /*
    2369              :          * Connect to SPI manager
    2370              :          */
    2371            0 :         SPI_connect();
    2372              : 
    2373            0 :         initStringInfo(&buf);
    2374              : 
    2375              :         /* get relation name including any needed schema prefix and quoting */
    2376            0 :         relname = generate_relation_name(rel);
    2377              : 
    2378            0 :         tupdesc = rel->rd_att;
    2379            0 :         natts = tupdesc->natts;
    2380              : 
    2381              :         /*
    2382              :          * Build sql statement to look up tuple of interest, ie, the one matching
    2383              :          * src_pkattvals.  We used to use "SELECT *" here, but it's simpler to
    2384              :          * generate a result tuple that matches the table's physical structure,
    2385              :          * with NULLs for any dropped columns.  Otherwise we have to deal with two
    2386              :          * different tupdescs and everything's very confusing.
    2387              :          */
    2388            0 :         appendStringInfoString(&buf, "SELECT ");
    2389              : 
    2390            0 :         for (i = 0; i < natts; i++)
    2391              :         {
    2392            0 :                 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
    2393              : 
    2394            0 :                 if (i > 0)
    2395            0 :                         appendStringInfoString(&buf, ", ");
    2396              : 
    2397            0 :                 if (attr->attisdropped)
    2398            0 :                         appendStringInfoString(&buf, "NULL");
    2399              :                 else
    2400            0 :                         appendStringInfoString(&buf,
    2401            0 :                                                                    quote_ident_cstr(NameStr(attr->attname)));
    2402            0 :         }
    2403              : 
    2404            0 :         appendStringInfo(&buf, " FROM %s WHERE ", relname);
    2405              : 
    2406            0 :         for (i = 0; i < pknumatts; i++)
    2407              :         {
    2408            0 :                 int                     pkattnum = pkattnums[i];
    2409            0 :                 Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2410              : 
    2411            0 :                 if (i > 0)
    2412            0 :                         appendStringInfoString(&buf, " AND ");
    2413              : 
    2414            0 :                 appendStringInfoString(&buf,
    2415            0 :                                                            quote_ident_cstr(NameStr(attr->attname)));
    2416              : 
    2417            0 :                 if (src_pkattvals[i] != NULL)
    2418            0 :                         appendStringInfo(&buf, " = %s",
    2419            0 :                                                          quote_literal_cstr(src_pkattvals[i]));
    2420              :                 else
    2421            0 :                         appendStringInfoString(&buf, " IS NULL");
    2422            0 :         }
    2423              : 
    2424              :         /*
    2425              :          * Retrieve the desired tuple
    2426              :          */
    2427            0 :         ret = SPI_exec(buf.data, 0);
    2428            0 :         pfree(buf.data);
    2429              : 
    2430              :         /*
    2431              :          * Only allow one qualifying tuple
    2432              :          */
    2433            0 :         if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
    2434            0 :                 ereport(ERROR,
    2435              :                                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2436              :                                  errmsg("source criteria matched more than one record")));
    2437              : 
    2438            0 :         else if (ret == SPI_OK_SELECT && SPI_processed == 1)
    2439              :         {
    2440            0 :                 SPITupleTable *tuptable = SPI_tuptable;
    2441              : 
    2442            0 :                 tuple = SPI_copytuple(tuptable->vals[0]);
    2443            0 :                 SPI_finish();
    2444              : 
    2445            0 :                 return tuple;
    2446            0 :         }
    2447              :         else
    2448              :         {
    2449              :                 /*
    2450              :                  * no qualifying tuples
    2451              :                  */
    2452            0 :                 SPI_finish();
    2453              : 
    2454            0 :                 return NULL;
    2455              :         }
    2456              : 
    2457              :         /*
    2458              :          * never reached, but keep compiler quiet
    2459              :          */
    2460            0 :         return NULL;
    2461            0 : }
    2462              : 
    2463              : static void
    2464            0 : RangeVarCallbackForDblink(const RangeVar *relation,
    2465              :                                                   Oid relId, Oid oldRelId, void *arg)
    2466              : {
    2467            0 :         AclResult       aclresult;
    2468              : 
    2469            0 :         if (!OidIsValid(relId))
    2470            0 :                 return;
    2471              : 
    2472            0 :         aclresult = pg_class_aclcheck(relId, GetUserId(), *((AclMode *) arg));
    2473            0 :         if (aclresult != ACLCHECK_OK)
    2474            0 :                 aclcheck_error(aclresult, get_relkind_objtype(get_rel_relkind(relId)),
    2475            0 :                                            relation->relname);
    2476            0 : }
    2477              : 
    2478              : /*
    2479              :  * Open the relation named by relname_text, acquire specified type of lock,
    2480              :  * verify we have specified permissions.
    2481              :  * Caller must close rel when done with it.
    2482              :  */
    2483              : static Relation
    2484            0 : get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
    2485              : {
    2486            0 :         RangeVar   *relvar;
    2487            0 :         Oid                     relid;
    2488              : 
    2489            0 :         relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
    2490            0 :         relid = RangeVarGetRelidExtended(relvar, lockmode, 0,
    2491              :                                                                          RangeVarCallbackForDblink, &aclmode);
    2492              : 
    2493            0 :         return table_open(relid, NoLock);
    2494            0 : }
    2495              : 
    2496              : /*
    2497              :  * generate_relation_name - copied from ruleutils.c
    2498              :  *              Compute the name to display for a relation
    2499              :  *
    2500              :  * The result includes all necessary quoting and schema-prefixing.
    2501              :  */
    2502              : static char *
    2503            0 : generate_relation_name(Relation rel)
    2504              : {
    2505            0 :         char       *nspname;
    2506            0 :         char       *result;
    2507              : 
    2508              :         /* Qualify the name if not visible in search path */
    2509            0 :         if (RelationIsVisible(RelationGetRelid(rel)))
    2510            0 :                 nspname = NULL;
    2511              :         else
    2512            0 :                 nspname = get_namespace_name(rel->rd_rel->relnamespace);
    2513              : 
    2514            0 :         result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
    2515              : 
    2516            0 :         return result;
    2517            0 : }
    2518              : 
    2519              : 
    2520              : static remoteConn *
    2521            0 : getConnectionByName(const char *name)
    2522              : {
    2523            0 :         remoteConnHashEnt *hentry;
    2524            0 :         char       *key;
    2525              : 
    2526            0 :         if (!remoteConnHash)
    2527            0 :                 remoteConnHash = createConnHash();
    2528              : 
    2529            0 :         key = pstrdup(name);
    2530            0 :         truncate_identifier(key, strlen(key), false);
    2531            0 :         hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
    2532            0 :                                                                                            key, HASH_FIND, NULL);
    2533              : 
    2534            0 :         if (hentry && hentry->rconn.conn != NULL)
    2535            0 :                 return &hentry->rconn;
    2536              : 
    2537            0 :         return NULL;
    2538            0 : }
    2539              : 
    2540              : static HTAB *
    2541            0 : createConnHash(void)
    2542              : {
    2543            0 :         HASHCTL         ctl;
    2544              : 
    2545            0 :         ctl.keysize = NAMEDATALEN;
    2546            0 :         ctl.entrysize = sizeof(remoteConnHashEnt);
    2547              : 
    2548            0 :         return hash_create("Remote Con hash", NUMCONN, &ctl,
    2549              :                                            HASH_ELEM | HASH_STRINGS);
    2550            0 : }
    2551              : 
    2552              : static remoteConn *
    2553            0 : createNewConnection(const char *name)
    2554              : {
    2555            0 :         remoteConnHashEnt *hentry;
    2556            0 :         bool            found;
    2557            0 :         char       *key;
    2558              : 
    2559            0 :         if (!remoteConnHash)
    2560            0 :                 remoteConnHash = createConnHash();
    2561              : 
    2562            0 :         key = pstrdup(name);
    2563            0 :         truncate_identifier(key, strlen(key), true);
    2564            0 :         hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
    2565              :                                                                                            HASH_ENTER, &found);
    2566              : 
    2567            0 :         if (found && hentry->rconn.conn != NULL)
    2568            0 :                 ereport(ERROR,
    2569              :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    2570              :                                  errmsg("duplicate connection name")));
    2571              : 
    2572              :         /* New, or reusable, so initialize the rconn struct to zeroes */
    2573            0 :         memset(&hentry->rconn, 0, sizeof(remoteConn));
    2574              : 
    2575            0 :         return &hentry->rconn;
    2576            0 : }
    2577              : 
    2578              : static void
    2579            0 : deleteConnection(const char *name)
    2580              : {
    2581            0 :         remoteConnHashEnt *hentry;
    2582            0 :         bool            found;
    2583            0 :         char       *key;
    2584              : 
    2585            0 :         if (!remoteConnHash)
    2586            0 :                 remoteConnHash = createConnHash();
    2587              : 
    2588            0 :         key = pstrdup(name);
    2589            0 :         truncate_identifier(key, strlen(key), false);
    2590            0 :         hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
    2591            0 :                                                                                            key, HASH_REMOVE, &found);
    2592              : 
    2593            0 :         if (!hentry)
    2594            0 :                 ereport(ERROR,
    2595              :                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2596              :                                  errmsg("undefined connection name")));
    2597            0 : }
    2598              : 
    2599              :  /*
    2600              :   * Ensure that require_auth and SCRAM keys are correctly set on connstr.
    2601              :   * SCRAM keys used to pass-through are coming from the initial connection
    2602              :   * from the client with the server.
    2603              :   *
    2604              :   * All required SCRAM options are set by dblink, so we just need to ensure
    2605              :   * that these options are not overwritten by the user.
    2606              :   *
    2607              :   * See appendSCRAMKeysInfo and its usage for more.
    2608              :   */
    2609              : bool
    2610            0 : dblink_connstr_has_required_scram_options(const char *connstr)
    2611              : {
    2612            0 :         PQconninfoOption *options;
    2613            0 :         bool            has_scram_server_key = false;
    2614            0 :         bool            has_scram_client_key = false;
    2615            0 :         bool            has_require_auth = false;
    2616            0 :         bool            has_scram_keys = false;
    2617              : 
    2618            0 :         options = PQconninfoParse(connstr, NULL);
    2619            0 :         if (options)
    2620              :         {
    2621              :                 /*
    2622              :                  * Continue iterating even if we found the keys that we need to
    2623              :                  * validate to make sure that there is no other declaration of these
    2624              :                  * keys that can overwrite the first.
    2625              :                  */
    2626            0 :                 for (PQconninfoOption *option = options; option->keyword != NULL; option++)
    2627              :                 {
    2628            0 :                         if (strcmp(option->keyword, "require_auth") == 0)
    2629              :                         {
    2630            0 :                                 if (option->val != NULL && strcmp(option->val, "scram-sha-256") == 0)
    2631            0 :                                         has_require_auth = true;
    2632              :                                 else
    2633            0 :                                         has_require_auth = false;
    2634            0 :                         }
    2635              : 
    2636            0 :                         if (strcmp(option->keyword, "scram_client_key") == 0)
    2637              :                         {
    2638            0 :                                 if (option->val != NULL && option->val[0] != '\0')
    2639            0 :                                         has_scram_client_key = true;
    2640              :                                 else
    2641            0 :                                         has_scram_client_key = false;
    2642            0 :                         }
    2643              : 
    2644            0 :                         if (strcmp(option->keyword, "scram_server_key") == 0)
    2645              :                         {
    2646            0 :                                 if (option->val != NULL && option->val[0] != '\0')
    2647            0 :                                         has_scram_server_key = true;
    2648              :                                 else
    2649            0 :                                         has_scram_server_key = false;
    2650            0 :                         }
    2651            0 :                 }
    2652            0 :                 PQconninfoFree(options);
    2653            0 :         }
    2654              : 
    2655            0 :         has_scram_keys = has_scram_client_key && has_scram_server_key && MyProcPort != NULL && MyProcPort->has_scram_keys;
    2656              : 
    2657            0 :         return (has_scram_keys && has_require_auth);
    2658            0 : }
    2659              : 
    2660              : /*
    2661              :  * We need to make sure that the connection made used credentials
    2662              :  * which were provided by the user, so check what credentials were
    2663              :  * used to connect and then make sure that they came from the user.
    2664              :  *
    2665              :  * On failure, we close "conn" and also delete the hashtable entry
    2666              :  * identified by "connname" (if that's not NULL).
    2667              :  */
    2668              : static void
    2669            0 : dblink_security_check(PGconn *conn, const char *connname, const char *connstr)
    2670              : {
    2671              :         /* Superuser bypasses security check */
    2672            0 :         if (superuser())
    2673            0 :                 return;
    2674              : 
    2675              :         /* If password was used to connect, make sure it was one provided */
    2676            0 :         if (PQconnectionUsedPassword(conn) && dblink_connstr_has_pw(connstr))
    2677            0 :                 return;
    2678              : 
    2679              :         /*
    2680              :          * Password was not used to connect, check if SCRAM pass-through is in
    2681              :          * use.
    2682              :          *
    2683              :          * If dblink_connstr_has_required_scram_options is true we assume that
    2684              :          * UseScramPassthrough is also true because the required SCRAM keys are
    2685              :          * only added if UseScramPassthrough is set, and the user is not allowed
    2686              :          * to add the SCRAM keys on fdw and user mapping options.
    2687              :          */
    2688            0 :         if (MyProcPort != NULL && MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
    2689            0 :                 return;
    2690              : 
    2691              : #ifdef ENABLE_GSS
    2692              :         /* If GSSAPI creds used to connect, make sure it was one delegated */
    2693            0 :         if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
    2694            0 :                 return;
    2695              : #endif
    2696              : 
    2697              :         /* Otherwise, fail out */
    2698            0 :         libpqsrv_disconnect(conn);
    2699            0 :         if (connname)
    2700            0 :                 deleteConnection(connname);
    2701              : 
    2702            0 :         ereport(ERROR,
    2703              :                         (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    2704              :                          errmsg("password or GSSAPI delegated credentials required"),
    2705              :                          errdetail("Non-superusers may only connect using credentials they provide, eg: password in connection string or delegated GSSAPI credentials"),
    2706              :                          errhint("Ensure provided credentials match target server's authentication method.")));
    2707            0 : }
    2708              : 
    2709              : /*
    2710              :  * Function to check if the connection string includes an explicit
    2711              :  * password, needed to ensure that non-superuser password-based auth
    2712              :  * is using a provided password and not one picked up from the
    2713              :  * environment.
    2714              :  */
    2715              : static bool
    2716            0 : dblink_connstr_has_pw(const char *connstr)
    2717              : {
    2718            0 :         PQconninfoOption *options;
    2719            0 :         PQconninfoOption *option;
    2720            0 :         bool            connstr_gives_password = false;
    2721              : 
    2722            0 :         options = PQconninfoParse(connstr, NULL);
    2723            0 :         if (options)
    2724              :         {
    2725            0 :                 for (option = options; option->keyword != NULL; option++)
    2726              :                 {
    2727            0 :                         if (strcmp(option->keyword, "password") == 0)
    2728              :                         {
    2729            0 :                                 if (option->val != NULL && option->val[0] != '\0')
    2730              :                                 {
    2731            0 :                                         connstr_gives_password = true;
    2732            0 :                                         break;
    2733              :                                 }
    2734            0 :                         }
    2735            0 :                 }
    2736            0 :                 PQconninfoFree(options);
    2737            0 :         }
    2738              : 
    2739            0 :         return connstr_gives_password;
    2740            0 : }
    2741              : 
    2742              : /*
    2743              :  * For non-superusers, insist that the connstr specify a password, except if
    2744              :  * GSSAPI credentials have been delegated (and we check that they are used for
    2745              :  * the connection in dblink_security_check later) or if SCRAM pass-through is
    2746              :  * being used.  This prevents a password or GSSAPI credentials from being
    2747              :  * picked up from .pgpass, a service file, the environment, etc.  We don't want
    2748              :  * the postgres user's passwords or Kerberos credentials to be accessible to
    2749              :  * non-superusers. In case of SCRAM pass-through insist that the connstr
    2750              :  * has the required SCRAM pass-through options.
    2751              :  */
    2752              : static void
    2753            0 : dblink_connstr_check(const char *connstr)
    2754              : {
    2755            0 :         if (superuser())
    2756            0 :                 return;
    2757              : 
    2758            0 :         if (dblink_connstr_has_pw(connstr))
    2759            0 :                 return;
    2760              : 
    2761            0 :         if (MyProcPort != NULL && MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
    2762            0 :                 return;
    2763              : 
    2764              : #ifdef ENABLE_GSS
    2765            0 :         if (be_gssapi_get_delegation(MyProcPort))
    2766            0 :                 return;
    2767              : #endif
    2768              : 
    2769            0 :         ereport(ERROR,
    2770              :                         (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    2771              :                          errmsg("password or GSSAPI delegated credentials required"),
    2772              :                          errdetail("Non-superusers must provide a password in the connection string or send delegated GSSAPI credentials.")));
    2773            0 : }
    2774              : 
    2775              : /*
    2776              :  * Report an error received from the remote server
    2777              :  *
    2778              :  * res: the received error result
    2779              :  * fail: true for ERROR ereport, false for NOTICE
    2780              :  * fmt and following args: sprintf-style format and values for errcontext;
    2781              :  * the resulting string should be worded like "while <some action>"
    2782              :  *
    2783              :  * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error,
    2784              :  * in which case memory context cleanup will clear it eventually).
    2785              :  */
    2786              : static void
    2787            0 : dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
    2788              :                                  bool fail, const char *fmt,...)
    2789              : {
    2790            0 :         int                     level;
    2791            0 :         char       *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
    2792            0 :         char       *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
    2793            0 :         char       *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
    2794            0 :         char       *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
    2795            0 :         char       *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
    2796            0 :         int                     sqlstate;
    2797            0 :         va_list         ap;
    2798            0 :         char            dblink_context_msg[512];
    2799              : 
    2800            0 :         if (fail)
    2801            0 :                 level = ERROR;
    2802              :         else
    2803            0 :                 level = NOTICE;
    2804              : 
    2805            0 :         if (pg_diag_sqlstate)
    2806            0 :                 sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
    2807              :                                                                  pg_diag_sqlstate[1],
    2808              :                                                                  pg_diag_sqlstate[2],
    2809              :                                                                  pg_diag_sqlstate[3],
    2810              :                                                                  pg_diag_sqlstate[4]);
    2811              :         else
    2812            0 :                 sqlstate = ERRCODE_CONNECTION_FAILURE;
    2813              : 
    2814              :         /*
    2815              :          * If we don't get a message from the PGresult, try the PGconn.  This is
    2816              :          * needed because for connection-level failures, PQgetResult may just
    2817              :          * return NULL, not a PGresult at all.
    2818              :          */
    2819            0 :         if (message_primary == NULL)
    2820            0 :                 message_primary = pchomp(PQerrorMessage(conn));
    2821              : 
    2822              :         /*
    2823              :          * Format the basic errcontext string.  Below, we'll add on something
    2824              :          * about the connection name.  That's a violation of the translatability
    2825              :          * guidelines about constructing error messages out of parts, but since
    2826              :          * there's no translation support for dblink, there's no need to worry
    2827              :          * about that (yet).
    2828              :          */
    2829            0 :         va_start(ap, fmt);
    2830            0 :         vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
    2831            0 :         va_end(ap);
    2832              : 
    2833            0 :         ereport(level,
    2834              :                         (errcode(sqlstate),
    2835              :                          (message_primary != NULL && message_primary[0] != '\0') ?
    2836              :                          errmsg_internal("%s", message_primary) :
    2837              :                          errmsg("could not obtain message string for remote error"),
    2838              :                          message_detail ? errdetail_internal("%s", message_detail) : 0,
    2839              :                          message_hint ? errhint("%s", message_hint) : 0,
    2840              :                          message_context ? (errcontext("%s", message_context)) : 0,
    2841              :                          conname ?
    2842              :                          (errcontext("%s on dblink connection named \"%s\"",
    2843              :                                                  dblink_context_msg, conname)) :
    2844              :                          (errcontext("%s on unnamed dblink connection",
    2845              :                                                  dblink_context_msg))));
    2846            0 :         PQclear(res);
    2847            0 : }
    2848              : 
    2849              : /*
    2850              :  * Obtain connection string for a foreign server
    2851              :  */
    2852              : static char *
    2853            0 : get_connect_string(const char *servername)
    2854              : {
    2855            0 :         ForeignServer *foreign_server = NULL;
    2856            0 :         UserMapping *user_mapping;
    2857            0 :         ListCell   *cell;
    2858            0 :         StringInfoData buf;
    2859            0 :         ForeignDataWrapper *fdw;
    2860            0 :         AclResult       aclresult;
    2861            0 :         char       *srvname;
    2862              : 
    2863              :         static const PQconninfoOption *options = NULL;
    2864              : 
    2865            0 :         initStringInfo(&buf);
    2866              : 
    2867              :         /*
    2868              :          * Get list of valid libpq options.
    2869              :          *
    2870              :          * To avoid unnecessary work, we get the list once and use it throughout
    2871              :          * the lifetime of this backend process.  We don't need to care about
    2872              :          * memory context issues, because PQconndefaults allocates with malloc.
    2873              :          */
    2874            0 :         if (!options)
    2875              :         {
    2876            0 :                 options = PQconndefaults();
    2877            0 :                 if (!options)                   /* assume reason for failure is OOM */
    2878            0 :                         ereport(ERROR,
    2879              :                                         (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
    2880              :                                          errmsg("out of memory"),
    2881              :                                          errdetail("Could not get libpq's default connection options.")));
    2882            0 :         }
    2883              : 
    2884              :         /* first gather the server connstr options */
    2885            0 :         srvname = pstrdup(servername);
    2886            0 :         truncate_identifier(srvname, strlen(srvname), false);
    2887            0 :         foreign_server = GetForeignServerByName(srvname, true);
    2888              : 
    2889            0 :         if (foreign_server)
    2890              :         {
    2891            0 :                 Oid                     serverid = foreign_server->serverid;
    2892            0 :                 Oid                     fdwid = foreign_server->fdwid;
    2893            0 :                 Oid                     userid = GetUserId();
    2894              : 
    2895            0 :                 user_mapping = GetUserMapping(userid, serverid);
    2896            0 :                 fdw = GetForeignDataWrapper(fdwid);
    2897              : 
    2898              :                 /* Check permissions, user must have usage on the server. */
    2899            0 :                 aclresult = object_aclcheck(ForeignServerRelationId, serverid, userid, ACL_USAGE);
    2900            0 :                 if (aclresult != ACLCHECK_OK)
    2901            0 :                         aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
    2902              : 
    2903              :                 /*
    2904              :                  * First append hardcoded options needed for SCRAM pass-through, so if
    2905              :                  * the user overwrites these options we can ereport on
    2906              :                  * dblink_connstr_check and dblink_security_check.
    2907              :                  */
    2908            0 :                 if (MyProcPort != NULL && MyProcPort->has_scram_keys && UseScramPassthrough(foreign_server, user_mapping))
    2909            0 :                         appendSCRAMKeysInfo(&buf);
    2910              : 
    2911            0 :                 foreach(cell, fdw->options)
    2912              :                 {
    2913            0 :                         DefElem    *def = lfirst(cell);
    2914              : 
    2915            0 :                         if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
    2916            0 :                                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2917            0 :                                                                  escape_param_str(strVal(def->arg)));
    2918            0 :                 }
    2919              : 
    2920            0 :                 foreach(cell, foreign_server->options)
    2921              :                 {
    2922            0 :                         DefElem    *def = lfirst(cell);
    2923              : 
    2924            0 :                         if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
    2925            0 :                                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2926            0 :                                                                  escape_param_str(strVal(def->arg)));
    2927            0 :                 }
    2928              : 
    2929            0 :                 foreach(cell, user_mapping->options)
    2930              :                 {
    2931              : 
    2932            0 :                         DefElem    *def = lfirst(cell);
    2933              : 
    2934            0 :                         if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
    2935            0 :                                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2936            0 :                                                                  escape_param_str(strVal(def->arg)));
    2937            0 :                 }
    2938              : 
    2939            0 :                 return buf.data;
    2940            0 :         }
    2941              :         else
    2942            0 :                 return NULL;
    2943            0 : }
    2944              : 
    2945              : /*
    2946              :  * Escaping libpq connect parameter strings.
    2947              :  *
    2948              :  * Replaces "'" with "\'" and "\" with "\\".
    2949              :  */
    2950              : static char *
    2951            0 : escape_param_str(const char *str)
    2952              : {
    2953            0 :         const char *cp;
    2954            0 :         StringInfoData buf;
    2955              : 
    2956            0 :         initStringInfo(&buf);
    2957              : 
    2958            0 :         for (cp = str; *cp; cp++)
    2959              :         {
    2960            0 :                 if (*cp == '\\' || *cp == '\'')
    2961            0 :                         appendStringInfoChar(&buf, '\\');
    2962            0 :                 appendStringInfoChar(&buf, *cp);
    2963            0 :         }
    2964              : 
    2965            0 :         return buf.data;
    2966            0 : }
    2967              : 
    2968              : /*
    2969              :  * Validate the PK-attnums argument for dblink_build_sql_insert() and related
    2970              :  * functions, and translate to the internal representation.
    2971              :  *
    2972              :  * The user supplies an int2vector of 1-based logical attnums, plus a count
    2973              :  * argument (the need for the separate count argument is historical, but we
    2974              :  * still check it).  We check that each attnum corresponds to a valid,
    2975              :  * non-dropped attribute of the rel.  We do *not* prevent attnums from being
    2976              :  * listed twice, though the actual use-case for such things is dubious.
    2977              :  * Note that before Postgres 9.0, the user's attnums were interpreted as
    2978              :  * physical not logical column numbers; this was changed for future-proofing.
    2979              :  *
    2980              :  * The internal representation is a palloc'd int array of 0-based physical
    2981              :  * attnums.
    2982              :  */
    2983              : static void
    2984            0 : validate_pkattnums(Relation rel,
    2985              :                                    int2vector *pkattnums_arg, int32 pknumatts_arg,
    2986              :                                    int **pkattnums, int *pknumatts)
    2987              : {
    2988            0 :         TupleDesc       tupdesc = rel->rd_att;
    2989            0 :         int                     natts = tupdesc->natts;
    2990            0 :         int                     i;
    2991              : 
    2992              :         /* Don't take more array elements than there are */
    2993            0 :         pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
    2994              : 
    2995              :         /* Must have at least one pk attnum selected */
    2996            0 :         if (pknumatts_arg <= 0)
    2997            0 :                 ereport(ERROR,
    2998              :                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2999              :                                  errmsg("number of key attributes must be > 0")));
    3000              : 
    3001              :         /* Allocate output array */
    3002            0 :         *pkattnums = palloc_array(int, pknumatts_arg);
    3003            0 :         *pknumatts = pknumatts_arg;
    3004              : 
    3005              :         /* Validate attnums and convert to internal form */
    3006            0 :         for (i = 0; i < pknumatts_arg; i++)
    3007              :         {
    3008            0 :                 int                     pkattnum = pkattnums_arg->values[i];
    3009            0 :                 int                     lnum;
    3010            0 :                 int                     j;
    3011              : 
    3012              :                 /* Can throw error immediately if out of range */
    3013            0 :                 if (pkattnum <= 0 || pkattnum > natts)
    3014            0 :                         ereport(ERROR,
    3015              :                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3016              :                                          errmsg("invalid attribute number %d", pkattnum)));
    3017              : 
    3018              :                 /* Identify which physical column has this logical number */
    3019            0 :                 lnum = 0;
    3020            0 :                 for (j = 0; j < natts; j++)
    3021              :                 {
    3022              :                         /* dropped columns don't count */
    3023            0 :                         if (TupleDescCompactAttr(tupdesc, j)->attisdropped)
    3024            0 :                                 continue;
    3025              : 
    3026            0 :                         if (++lnum == pkattnum)
    3027            0 :                                 break;
    3028            0 :                 }
    3029              : 
    3030            0 :                 if (j < natts)
    3031            0 :                         (*pkattnums)[i] = j;
    3032              :                 else
    3033            0 :                         ereport(ERROR,
    3034              :                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3035              :                                          errmsg("invalid attribute number %d", pkattnum)));
    3036            0 :         }
    3037            0 : }
    3038              : 
    3039              : /*
    3040              :  * Check if the specified connection option is valid.
    3041              :  *
    3042              :  * We basically allow whatever libpq thinks is an option, with these
    3043              :  * restrictions:
    3044              :  *              debug options: disallowed
    3045              :  *              "client_encoding": disallowed
    3046              :  *              "user": valid only in USER MAPPING options
    3047              :  *              secure options (eg password): valid only in USER MAPPING options
    3048              :  *              others: valid only in FOREIGN SERVER options
    3049              :  *
    3050              :  * We disallow client_encoding because it would be overridden anyway via
    3051              :  * PQclientEncoding; allowing it to be specified would merely promote
    3052              :  * confusion.
    3053              :  */
    3054              : static bool
    3055            0 : is_valid_dblink_option(const PQconninfoOption *options, const char *option,
    3056              :                                            Oid context)
    3057              : {
    3058            0 :         const PQconninfoOption *opt;
    3059              : 
    3060              :         /* Look up the option in libpq result */
    3061            0 :         for (opt = options; opt->keyword; opt++)
    3062              :         {
    3063            0 :                 if (strcmp(opt->keyword, option) == 0)
    3064            0 :                         break;
    3065            0 :         }
    3066            0 :         if (opt->keyword == NULL)
    3067            0 :                 return false;
    3068              : 
    3069              :         /* Disallow debug options (particularly "replication") */
    3070            0 :         if (strchr(opt->dispchar, 'D'))
    3071            0 :                 return false;
    3072              : 
    3073              :         /* Disallow "client_encoding" */
    3074            0 :         if (strcmp(opt->keyword, "client_encoding") == 0)
    3075            0 :                 return false;
    3076              : 
    3077              :         /*
    3078              :          * Disallow OAuth options for now, since the builtin flow communicates on
    3079              :          * stderr by default and can't cache tokens yet.
    3080              :          */
    3081            0 :         if (strncmp(opt->keyword, "oauth_", strlen("oauth_")) == 0)
    3082            0 :                 return false;
    3083              : 
    3084              :         /*
    3085              :          * If the option is "user" or marked secure, it should be specified only
    3086              :          * in USER MAPPING.  Others should be specified only in SERVER.
    3087              :          */
    3088            0 :         if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
    3089              :         {
    3090            0 :                 if (context != UserMappingRelationId)
    3091            0 :                         return false;
    3092            0 :         }
    3093              :         else
    3094              :         {
    3095            0 :                 if (context != ForeignServerRelationId)
    3096            0 :                         return false;
    3097              :         }
    3098              : 
    3099            0 :         return true;
    3100            0 : }
    3101              : 
    3102              : /*
    3103              :  * Same as is_valid_dblink_option but also check for only dblink_fdw specific
    3104              :  * options.
    3105              :  */
    3106              : static bool
    3107            0 : is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
    3108              :                                                    Oid context)
    3109              : {
    3110            0 :         if (strcmp(option, "use_scram_passthrough") == 0)
    3111            0 :                 return true;
    3112              : 
    3113            0 :         return is_valid_dblink_option(options, option, context);
    3114            0 : }
    3115              : 
    3116              : /*
    3117              :  * Copy the remote session's values of GUCs that affect datatype I/O
    3118              :  * and apply them locally in a new GUC nesting level.  Returns the new
    3119              :  * nestlevel (which is needed by restoreLocalGucs to undo the settings),
    3120              :  * or -1 if no new nestlevel was needed.
    3121              :  *
    3122              :  * We use the equivalent of a function SET option to allow the settings to
    3123              :  * persist only until the caller calls restoreLocalGucs.  If an error is
    3124              :  * thrown in between, guc.c will take care of undoing the settings.
    3125              :  */
    3126              : static int
    3127            0 : applyRemoteGucs(PGconn *conn)
    3128              : {
    3129              :         static const char *const GUCsAffectingIO[] = {
    3130              :                 "DateStyle",
    3131              :                 "IntervalStyle"
    3132              :         };
    3133              : 
    3134            0 :         int                     nestlevel = -1;
    3135            0 :         int                     i;
    3136              : 
    3137            0 :         for (i = 0; i < lengthof(GUCsAffectingIO); i++)
    3138              :         {
    3139            0 :                 const char *gucName = GUCsAffectingIO[i];
    3140            0 :                 const char *remoteVal = PQparameterStatus(conn, gucName);
    3141            0 :                 const char *localVal;
    3142              : 
    3143              :                 /*
    3144              :                  * If the remote server is pre-8.4, it won't have IntervalStyle, but
    3145              :                  * that's okay because its output format won't be ambiguous.  So just
    3146              :                  * skip the GUC if we don't get a value for it.  (We might eventually
    3147              :                  * need more complicated logic with remote-version checks here.)
    3148              :                  */
    3149            0 :                 if (remoteVal == NULL)
    3150            0 :                         continue;
    3151              : 
    3152              :                 /*
    3153              :                  * Avoid GUC-setting overhead if the remote and local GUCs already
    3154              :                  * have the same value.
    3155              :                  */
    3156            0 :                 localVal = GetConfigOption(gucName, false, false);
    3157            0 :                 Assert(localVal != NULL);
    3158              : 
    3159            0 :                 if (strcmp(remoteVal, localVal) == 0)
    3160            0 :                         continue;
    3161              : 
    3162              :                 /* Create new GUC nest level if we didn't already */
    3163            0 :                 if (nestlevel < 0)
    3164            0 :                         nestlevel = NewGUCNestLevel();
    3165              : 
    3166              :                 /* Apply the option (this will throw error on failure) */
    3167            0 :                 (void) set_config_option(gucName, remoteVal,
    3168              :                                                                  PGC_USERSET, PGC_S_SESSION,
    3169              :                                                                  GUC_ACTION_SAVE, true, 0, false);
    3170            0 :         }
    3171              : 
    3172            0 :         return nestlevel;
    3173            0 : }
    3174              : 
    3175              : /*
    3176              :  * Restore local GUCs after they have been overlaid with remote settings.
    3177              :  */
    3178              : static void
    3179            0 : restoreLocalGucs(int nestlevel)
    3180              : {
    3181              :         /* Do nothing if no new nestlevel was created */
    3182            0 :         if (nestlevel > 0)
    3183            0 :                 AtEOXact_GUC(true, nestlevel);
    3184            0 : }
    3185              : 
    3186              : /*
    3187              :  * Append SCRAM client key and server key information from the global
    3188              :  * MyProcPort into the given StringInfo buffer.
    3189              :  */
    3190              : static void
    3191            0 : appendSCRAMKeysInfo(StringInfo buf)
    3192              : {
    3193            0 :         int                     len;
    3194            0 :         int                     encoded_len;
    3195            0 :         char       *client_key;
    3196            0 :         char       *server_key;
    3197              : 
    3198            0 :         len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
    3199              :         /* don't forget the zero-terminator */
    3200            0 :         client_key = palloc0(len + 1);
    3201            0 :         encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
    3202              :                                                                 sizeof(MyProcPort->scram_ClientKey),
    3203            0 :                                                                 client_key, len);
    3204            0 :         if (encoded_len < 0)
    3205            0 :                 elog(ERROR, "could not encode SCRAM client key");
    3206              : 
    3207            0 :         len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
    3208              :         /* don't forget the zero-terminator */
    3209            0 :         server_key = palloc0(len + 1);
    3210            0 :         encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
    3211              :                                                                 sizeof(MyProcPort->scram_ServerKey),
    3212            0 :                                                                 server_key, len);
    3213            0 :         if (encoded_len < 0)
    3214            0 :                 elog(ERROR, "could not encode SCRAM server key");
    3215              : 
    3216            0 :         appendStringInfo(buf, "scram_client_key='%s' ", client_key);
    3217            0 :         appendStringInfo(buf, "scram_server_key='%s' ", server_key);
    3218            0 :         appendStringInfoString(buf, "require_auth='scram-sha-256' ");
    3219              : 
    3220            0 :         pfree(client_key);
    3221            0 :         pfree(server_key);
    3222            0 : }
    3223              : 
    3224              : 
    3225              : static bool
    3226            0 : UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user)
    3227              : {
    3228            0 :         ListCell   *cell;
    3229              : 
    3230            0 :         foreach(cell, foreign_server->options)
    3231              :         {
    3232            0 :                 DefElem    *def = lfirst(cell);
    3233              : 
    3234            0 :                 if (strcmp(def->defname, "use_scram_passthrough") == 0)
    3235            0 :                         return defGetBoolean(def);
    3236            0 :         }
    3237              : 
    3238            0 :         foreach(cell, user->options)
    3239              :         {
    3240            0 :                 DefElem    *def = (DefElem *) lfirst(cell);
    3241              : 
    3242            0 :                 if (strcmp(def->defname, "use_scram_passthrough") == 0)
    3243            0 :                         return defGetBoolean(def);
    3244            0 :         }
    3245              : 
    3246            0 :         return false;
    3247            0 : }
        

Generated by: LCOV version 2.3.2-1