LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_subscription.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 34.8 % 325 113
Test Date: 2026-01-26 10:56:24 Functions: 38.5 % 13 5
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Branches: 12.3 % 122 15

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * pg_subscription.c
       4                 :             :  *              replication subscriptions
       5                 :             :  *
       6                 :             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7                 :             :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :             :  *
       9                 :             :  * IDENTIFICATION
      10                 :             :  *              src/backend/catalog/pg_subscription.c
      11                 :             :  *
      12                 :             :  *-------------------------------------------------------------------------
      13                 :             :  */
      14                 :             : 
      15                 :             : #include "postgres.h"
      16                 :             : 
      17                 :             : #include "access/genam.h"
      18                 :             : #include "access/heapam.h"
      19                 :             : #include "access/htup_details.h"
      20                 :             : #include "access/tableam.h"
      21                 :             : #include "catalog/indexing.h"
      22                 :             : #include "catalog/pg_subscription.h"
      23                 :             : #include "catalog/pg_subscription_rel.h"
      24                 :             : #include "catalog/pg_type.h"
      25                 :             : #include "miscadmin.h"
      26                 :             : #include "storage/lmgr.h"
      27                 :             : #include "utils/array.h"
      28                 :             : #include "utils/builtins.h"
      29                 :             : #include "utils/fmgroids.h"
      30                 :             : #include "utils/lsyscache.h"
      31                 :             : #include "utils/pg_lsn.h"
      32                 :             : #include "utils/rel.h"
      33                 :             : #include "utils/syscache.h"
      34                 :             : 
      35                 :             : static List *textarray_to_stringlist(ArrayType *textarray);
      36                 :             : 
      37                 :             : /*
      38                 :             :  * Add a comma-separated list of publication names to the 'dest' string.
      39                 :             :  */
      40                 :             : void
      41                 :           0 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
      42                 :             : {
      43                 :           0 :         ListCell   *lc;
      44                 :           0 :         bool            first = true;
      45                 :             : 
      46         [ #  # ]:           0 :         Assert(publications != NIL);
      47                 :             : 
      48   [ #  #  #  #  :           0 :         foreach(lc, publications)
                   #  # ]
      49                 :             :         {
      50                 :           0 :                 char       *pubname = strVal(lfirst(lc));
      51                 :             : 
      52         [ #  # ]:           0 :                 if (first)
      53                 :           0 :                         first = false;
      54                 :             :                 else
      55                 :           0 :                         appendStringInfoString(dest, ", ");
      56                 :             : 
      57         [ #  # ]:           0 :                 if (quote_literal)
      58                 :           0 :                         appendStringInfoString(dest, quote_literal_cstr(pubname));
      59                 :             :                 else
      60                 :             :                 {
      61                 :           0 :                         appendStringInfoChar(dest, '"');
      62                 :           0 :                         appendStringInfoString(dest, pubname);
      63                 :           0 :                         appendStringInfoChar(dest, '"');
      64                 :             :                 }
      65                 :           0 :         }
      66                 :           0 : }
      67                 :             : 
      68                 :             : /*
      69                 :             :  * Fetch the subscription from the syscache.
      70                 :             :  */
      71                 :             : Subscription *
      72                 :          50 : GetSubscription(Oid subid, bool missing_ok)
      73                 :             : {
      74                 :          50 :         HeapTuple       tup;
      75                 :          50 :         Subscription *sub;
      76                 :          50 :         Form_pg_subscription subform;
      77                 :          50 :         Datum           datum;
      78                 :          50 :         bool            isnull;
      79                 :             : 
      80                 :          50 :         tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
      81                 :             : 
      82         [ +  - ]:          50 :         if (!HeapTupleIsValid(tup))
      83                 :             :         {
      84         [ #  # ]:           0 :                 if (missing_ok)
      85                 :           0 :                         return NULL;
      86                 :             : 
      87   [ #  #  #  # ]:           0 :                 elog(ERROR, "cache lookup failed for subscription %u", subid);
      88                 :           0 :         }
      89                 :             : 
      90                 :          50 :         subform = (Form_pg_subscription) GETSTRUCT(tup);
      91                 :             : 
      92                 :          50 :         sub = palloc_object(Subscription);
      93                 :          50 :         sub->oid = subid;
      94                 :          50 :         sub->dbid = subform->subdbid;
      95                 :          50 :         sub->skiplsn = subform->subskiplsn;
      96                 :          50 :         sub->name = pstrdup(NameStr(subform->subname));
      97                 :          50 :         sub->owner = subform->subowner;
      98                 :          50 :         sub->enabled = subform->subenabled;
      99                 :          50 :         sub->binary = subform->subbinary;
     100                 :          50 :         sub->stream = subform->substream;
     101                 :          50 :         sub->twophasestate = subform->subtwophasestate;
     102                 :          50 :         sub->disableonerr = subform->subdisableonerr;
     103                 :          50 :         sub->passwordrequired = subform->subpasswordrequired;
     104                 :          50 :         sub->runasowner = subform->subrunasowner;
     105                 :          50 :         sub->failover = subform->subfailover;
     106                 :          50 :         sub->retaindeadtuples = subform->subretaindeadtuples;
     107                 :          50 :         sub->maxretention = subform->submaxretention;
     108                 :          50 :         sub->retentionactive = subform->subretentionactive;
     109                 :             : 
     110                 :             :         /* Get conninfo */
     111                 :          50 :         datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     112                 :          50 :                                                                    tup,
     113                 :             :                                                                    Anum_pg_subscription_subconninfo);
     114                 :          50 :         sub->conninfo = TextDatumGetCString(datum);
     115                 :             : 
     116                 :             :         /* Get slotname */
     117                 :          50 :         datum = SysCacheGetAttr(SUBSCRIPTIONOID,
     118                 :          50 :                                                         tup,
     119                 :             :                                                         Anum_pg_subscription_subslotname,
     120                 :             :                                                         &isnull);
     121         [ +  + ]:          50 :         if (!isnull)
     122                 :          39 :                 sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
     123                 :             :         else
     124                 :          11 :                 sub->slotname = NULL;
     125                 :             : 
     126                 :             :         /* Get synccommit */
     127                 :          50 :         datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     128                 :          50 :                                                                    tup,
     129                 :             :                                                                    Anum_pg_subscription_subsynccommit);
     130                 :          50 :         sub->synccommit = TextDatumGetCString(datum);
     131                 :             : 
     132                 :             :         /* Get publications */
     133                 :          50 :         datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     134                 :          50 :                                                                    tup,
     135                 :             :                                                                    Anum_pg_subscription_subpublications);
     136                 :          50 :         sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
     137                 :             : 
     138                 :             :         /* Get origin */
     139                 :          50 :         datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     140                 :          50 :                                                                    tup,
     141                 :             :                                                                    Anum_pg_subscription_suborigin);
     142                 :          50 :         sub->origin = TextDatumGetCString(datum);
     143                 :             : 
     144                 :             :         /* Is the subscription owner a superuser? */
     145                 :          50 :         sub->ownersuperuser = superuser_arg(sub->owner);
     146                 :             : 
     147                 :          50 :         ReleaseSysCache(tup);
     148                 :             : 
     149                 :          50 :         return sub;
     150                 :          50 : }
     151                 :             : 
     152                 :             : /*
     153                 :             :  * Return number of subscriptions defined in given database.
     154                 :             :  * Used by dropdb() to check if database can indeed be dropped.
     155                 :             :  */
     156                 :             : int
     157                 :           1 : CountDBSubscriptions(Oid dbid)
     158                 :             : {
     159                 :           1 :         int                     nsubs = 0;
     160                 :           1 :         Relation        rel;
     161                 :           1 :         ScanKeyData scankey;
     162                 :           1 :         SysScanDesc scan;
     163                 :           1 :         HeapTuple       tup;
     164                 :             : 
     165                 :           1 :         rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     166                 :             : 
     167                 :           1 :         ScanKeyInit(&scankey,
     168                 :             :                                 Anum_pg_subscription_subdbid,
     169                 :             :                                 BTEqualStrategyNumber, F_OIDEQ,
     170                 :           1 :                                 ObjectIdGetDatum(dbid));
     171                 :             : 
     172                 :           1 :         scan = systable_beginscan(rel, InvalidOid, false,
     173                 :             :                                                           NULL, 1, &scankey);
     174                 :             : 
     175         [ -  + ]:           1 :         while (HeapTupleIsValid(tup = systable_getnext(scan)))
     176                 :           0 :                 nsubs++;
     177                 :             : 
     178                 :           1 :         systable_endscan(scan);
     179                 :             : 
     180                 :           1 :         table_close(rel, NoLock);
     181                 :             : 
     182                 :           2 :         return nsubs;
     183                 :           1 : }
     184                 :             : 
     185                 :             : /*
     186                 :             :  * Free memory allocated by subscription struct.
     187                 :             :  */
     188                 :             : void
     189                 :           0 : FreeSubscription(Subscription *sub)
     190                 :             : {
     191                 :           0 :         pfree(sub->name);
     192                 :           0 :         pfree(sub->conninfo);
     193         [ #  # ]:           0 :         if (sub->slotname)
     194                 :           0 :                 pfree(sub->slotname);
     195                 :           0 :         list_free_deep(sub->publications);
     196                 :           0 :         pfree(sub);
     197                 :           0 : }
     198                 :             : 
     199                 :             : /*
     200                 :             :  * Disable the given subscription.
     201                 :             :  */
     202                 :             : void
     203                 :           0 : DisableSubscription(Oid subid)
     204                 :             : {
     205                 :           0 :         Relation        rel;
     206                 :           0 :         bool            nulls[Natts_pg_subscription];
     207                 :           0 :         bool            replaces[Natts_pg_subscription];
     208                 :           0 :         Datum           values[Natts_pg_subscription];
     209                 :           0 :         HeapTuple       tup;
     210                 :             : 
     211                 :             :         /* Look up the subscription in the catalog */
     212                 :           0 :         rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     213                 :           0 :         tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     214                 :             : 
     215         [ #  # ]:           0 :         if (!HeapTupleIsValid(tup))
     216   [ #  #  #  # ]:           0 :                 elog(ERROR, "cache lookup failed for subscription %u", subid);
     217                 :             : 
     218                 :           0 :         LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     219                 :             : 
     220                 :             :         /* Form a new tuple. */
     221                 :           0 :         memset(values, 0, sizeof(values));
     222                 :           0 :         memset(nulls, false, sizeof(nulls));
     223                 :           0 :         memset(replaces, false, sizeof(replaces));
     224                 :             : 
     225                 :             :         /* Set the subscription to disabled. */
     226                 :           0 :         values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
     227                 :           0 :         replaces[Anum_pg_subscription_subenabled - 1] = true;
     228                 :             : 
     229                 :             :         /* Update the catalog */
     230                 :           0 :         tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     231                 :           0 :                                                         replaces);
     232                 :           0 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
     233                 :           0 :         heap_freetuple(tup);
     234                 :             : 
     235                 :           0 :         table_close(rel, NoLock);
     236                 :           0 : }
     237                 :             : 
     238                 :             : /*
     239                 :             :  * Convert text array to list of strings.
     240                 :             :  *
     241                 :             :  * Note: the resulting list of strings is pallocated here.
     242                 :             :  */
     243                 :             : static List *
     244                 :          50 : textarray_to_stringlist(ArrayType *textarray)
     245                 :             : {
     246                 :          50 :         Datum      *elems;
     247                 :          50 :         int                     nelems,
     248                 :             :                                 i;
     249                 :          50 :         List       *res = NIL;
     250                 :             : 
     251                 :          50 :         deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
     252                 :             : 
     253         [ +  - ]:          50 :         if (nelems == 0)
     254                 :           0 :                 return NIL;
     255                 :             : 
     256         [ +  + ]:         126 :         for (i = 0; i < nelems; i++)
     257                 :          76 :                 res = lappend(res, makeString(TextDatumGetCString(elems[i])));
     258                 :             : 
     259                 :          50 :         return res;
     260                 :          50 : }
     261                 :             : 
     262                 :             : /*
     263                 :             :  * Add new state record for a subscription table.
     264                 :             :  *
     265                 :             :  * If retain_lock is true, then don't release the locks taken in this function.
     266                 :             :  * We normally release the locks at the end of transaction but in binary-upgrade
     267                 :             :  * mode, we expect to release those immediately.
     268                 :             :  */
     269                 :             : void
     270                 :           0 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
     271                 :             :                                                 XLogRecPtr sublsn, bool retain_lock)
     272                 :             : {
     273                 :           0 :         Relation        rel;
     274                 :           0 :         HeapTuple       tup;
     275                 :           0 :         bool            nulls[Natts_pg_subscription_rel];
     276                 :           0 :         Datum           values[Natts_pg_subscription_rel];
     277                 :             : 
     278                 :           0 :         LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     279                 :             : 
     280                 :           0 :         rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     281                 :             : 
     282                 :             :         /* Try finding existing mapping. */
     283                 :           0 :         tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     284                 :             :                                                           ObjectIdGetDatum(relid),
     285                 :             :                                                           ObjectIdGetDatum(subid));
     286         [ #  # ]:           0 :         if (HeapTupleIsValid(tup))
     287   [ #  #  #  # ]:           0 :                 elog(ERROR, "subscription relation %u in subscription %u already exists",
     288                 :             :                          relid, subid);
     289                 :             : 
     290                 :             :         /* Form the tuple. */
     291                 :           0 :         memset(values, 0, sizeof(values));
     292                 :           0 :         memset(nulls, false, sizeof(nulls));
     293                 :           0 :         values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
     294                 :           0 :         values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
     295                 :           0 :         values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     296         [ #  # ]:           0 :         if (XLogRecPtrIsValid(sublsn))
     297                 :           0 :                 values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     298                 :             :         else
     299                 :           0 :                 nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     300                 :             : 
     301                 :           0 :         tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     302                 :             : 
     303                 :             :         /* Insert tuple into catalog. */
     304                 :           0 :         CatalogTupleInsert(rel, tup);
     305                 :             : 
     306                 :           0 :         heap_freetuple(tup);
     307                 :             : 
     308                 :             :         /* Cleanup. */
     309         [ #  # ]:           0 :         if (retain_lock)
     310                 :             :         {
     311                 :           0 :                 table_close(rel, NoLock);
     312                 :           0 :         }
     313                 :             :         else
     314                 :             :         {
     315                 :           0 :                 table_close(rel, RowExclusiveLock);
     316                 :           0 :                 UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     317                 :             :         }
     318                 :           0 : }
     319                 :             : 
     320                 :             : /*
     321                 :             :  * Update the state of a subscription table.
     322                 :             :  */
     323                 :             : void
     324                 :           0 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
     325                 :             :                                                    XLogRecPtr sublsn, bool already_locked)
     326                 :             : {
     327                 :           0 :         Relation        rel;
     328                 :           0 :         HeapTuple       tup;
     329                 :           0 :         bool            nulls[Natts_pg_subscription_rel];
     330                 :           0 :         Datum           values[Natts_pg_subscription_rel];
     331                 :           0 :         bool            replaces[Natts_pg_subscription_rel];
     332                 :             : 
     333         [ #  # ]:           0 :         if (already_locked)
     334                 :             :         {
     335                 :             : #ifdef USE_ASSERT_CHECKING
     336                 :           0 :                 LOCKTAG         tag;
     337                 :             : 
     338         [ #  # ]:           0 :                 Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
     339                 :             :                                                                                   RowExclusiveLock, true));
     340                 :           0 :                 SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
     341         [ #  # ]:           0 :                 Assert(LockHeldByMe(&tag, AccessShareLock, true));
     342                 :             : #endif
     343                 :             : 
     344                 :           0 :                 rel = table_open(SubscriptionRelRelationId, NoLock);
     345                 :           0 :         }
     346                 :             :         else
     347                 :             :         {
     348                 :           0 :                 LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     349                 :           0 :                 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     350                 :             :         }
     351                 :             : 
     352                 :             :         /* Try finding existing mapping. */
     353                 :           0 :         tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     354                 :             :                                                           ObjectIdGetDatum(relid),
     355                 :             :                                                           ObjectIdGetDatum(subid));
     356         [ #  # ]:           0 :         if (!HeapTupleIsValid(tup))
     357   [ #  #  #  # ]:           0 :                 elog(ERROR, "subscription relation %u in subscription %u does not exist",
     358                 :             :                          relid, subid);
     359                 :             : 
     360                 :             :         /* Update the tuple. */
     361                 :           0 :         memset(values, 0, sizeof(values));
     362                 :           0 :         memset(nulls, false, sizeof(nulls));
     363                 :           0 :         memset(replaces, false, sizeof(replaces));
     364                 :             : 
     365                 :           0 :         replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
     366                 :           0 :         values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     367                 :             : 
     368                 :           0 :         replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
     369         [ #  # ]:           0 :         if (XLogRecPtrIsValid(sublsn))
     370                 :           0 :                 values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     371                 :             :         else
     372                 :           0 :                 nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     373                 :             : 
     374                 :           0 :         tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     375                 :           0 :                                                         replaces);
     376                 :             : 
     377                 :             :         /* Update the catalog. */
     378                 :           0 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
     379                 :             : 
     380                 :             :         /* Cleanup. */
     381                 :           0 :         table_close(rel, NoLock);
     382                 :           0 : }
     383                 :             : 
     384                 :             : /*
     385                 :             :  * Get state of subscription table.
     386                 :             :  *
     387                 :             :  * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
     388                 :             :  */
     389                 :             : char
     390                 :           0 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
     391                 :             : {
     392                 :           0 :         HeapTuple       tup;
     393                 :           0 :         char            substate;
     394                 :           0 :         bool            isnull;
     395                 :           0 :         Datum           d;
     396                 :           0 :         Relation        rel;
     397                 :             : 
     398                 :             :         /*
     399                 :             :          * This is to avoid the race condition with AlterSubscription which tries
     400                 :             :          * to remove this relstate.
     401                 :             :          */
     402                 :           0 :         rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     403                 :             : 
     404                 :             :         /* Try finding the mapping. */
     405                 :           0 :         tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
     406                 :           0 :                                                   ObjectIdGetDatum(relid),
     407                 :           0 :                                                   ObjectIdGetDatum(subid));
     408                 :             : 
     409         [ #  # ]:           0 :         if (!HeapTupleIsValid(tup))
     410                 :             :         {
     411                 :           0 :                 table_close(rel, AccessShareLock);
     412                 :           0 :                 *sublsn = InvalidXLogRecPtr;
     413                 :           0 :                 return SUBREL_STATE_UNKNOWN;
     414                 :             :         }
     415                 :             : 
     416                 :             :         /* Get the state. */
     417                 :           0 :         substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
     418                 :             : 
     419                 :             :         /* Get the LSN */
     420                 :           0 :         d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     421                 :             :                                                 Anum_pg_subscription_rel_srsublsn, &isnull);
     422         [ #  # ]:           0 :         if (isnull)
     423                 :           0 :                 *sublsn = InvalidXLogRecPtr;
     424                 :             :         else
     425                 :           0 :                 *sublsn = DatumGetLSN(d);
     426                 :             : 
     427                 :             :         /* Cleanup */
     428                 :           0 :         ReleaseSysCache(tup);
     429                 :             : 
     430                 :           0 :         table_close(rel, AccessShareLock);
     431                 :             : 
     432                 :           0 :         return substate;
     433                 :           0 : }
     434                 :             : 
     435                 :             : /*
     436                 :             :  * Drop subscription relation mapping. These can be for a particular
     437                 :             :  * subscription, or for a particular relation, or both.
     438                 :             :  */
     439                 :             : void
     440                 :        5559 : RemoveSubscriptionRel(Oid subid, Oid relid)
     441                 :             : {
     442                 :        5559 :         Relation        rel;
     443                 :        5559 :         TableScanDesc scan;
     444                 :        5559 :         ScanKeyData skey[2];
     445                 :        5559 :         HeapTuple       tup;
     446                 :        5559 :         int                     nkeys = 0;
     447                 :             : 
     448                 :        5559 :         rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     449                 :             : 
     450         [ +  + ]:        5559 :         if (OidIsValid(subid))
     451                 :             :         {
     452                 :          26 :                 ScanKeyInit(&skey[nkeys++],
     453                 :             :                                         Anum_pg_subscription_rel_srsubid,
     454                 :             :                                         BTEqualStrategyNumber,
     455                 :             :                                         F_OIDEQ,
     456                 :          13 :                                         ObjectIdGetDatum(subid));
     457                 :          13 :         }
     458                 :             : 
     459         [ +  + ]:        5559 :         if (OidIsValid(relid))
     460                 :             :         {
     461                 :       11092 :                 ScanKeyInit(&skey[nkeys++],
     462                 :             :                                         Anum_pg_subscription_rel_srrelid,
     463                 :             :                                         BTEqualStrategyNumber,
     464                 :             :                                         F_OIDEQ,
     465                 :        5546 :                                         ObjectIdGetDatum(relid));
     466                 :        5546 :         }
     467                 :             : 
     468                 :             :         /* Do the search and delete what we found. */
     469                 :        5559 :         scan = table_beginscan_catalog(rel, nkeys, skey);
     470         [ -  + ]:        5559 :         while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     471                 :             :         {
     472                 :           0 :                 Form_pg_subscription_rel subrel;
     473                 :             : 
     474                 :           0 :                 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     475                 :             : 
     476                 :             :                 /*
     477                 :             :                  * We don't allow to drop the relation mapping when the table
     478                 :             :                  * synchronization is in progress unless the caller updates the
     479                 :             :                  * corresponding subscription as well. This is to ensure that we don't
     480                 :             :                  * leave tablesync slots or origins in the system when the
     481                 :             :                  * corresponding table is dropped. For sequences, however, it's ok to
     482                 :             :                  * drop them since no separate slots or origins are created during
     483                 :             :                  * synchronization.
     484                 :             :                  */
     485         [ #  # ]:           0 :                 if (!OidIsValid(subid) &&
     486   [ #  #  #  # ]:           0 :                         subrel->srsubstate != SUBREL_STATE_READY &&
     487                 :           0 :                         get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
     488                 :             :                 {
     489   [ #  #  #  # ]:           0 :                         ereport(ERROR,
     490                 :             :                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     491                 :             :                                          errmsg("could not drop relation mapping for subscription \"%s\"",
     492                 :             :                                                         get_subscription_name(subrel->srsubid, false)),
     493                 :             :                                          errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
     494                 :             :                                                            get_rel_name(relid), subrel->srsubstate),
     495                 :             : 
     496                 :             :                         /*
     497                 :             :                          * translator: first %s is a SQL ALTER command and second %s is a
     498                 :             :                          * SQL DROP command
     499                 :             :                          */
     500                 :             :                                          errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
     501                 :             :                                                          "ALTER SUBSCRIPTION ... ENABLE",
     502                 :             :                                                          "DROP SUBSCRIPTION ...")));
     503                 :           0 :                 }
     504                 :             : 
     505                 :           0 :                 CatalogTupleDelete(rel, &tup->t_self);
     506                 :           0 :         }
     507                 :        5559 :         table_endscan(scan);
     508                 :             : 
     509                 :        5559 :         table_close(rel, RowExclusiveLock);
     510                 :        5559 : }
     511                 :             : 
     512                 :             : /*
     513                 :             :  * Does the subscription have any tables?
     514                 :             :  *
     515                 :             :  * Use this function only to know true/false, and when you have no need for the
     516                 :             :  * List returned by GetSubscriptionRelations.
     517                 :             :  */
     518                 :             : bool
     519                 :           0 : HasSubscriptionTables(Oid subid)
     520                 :             : {
     521                 :           0 :         Relation        rel;
     522                 :           0 :         ScanKeyData skey[1];
     523                 :           0 :         SysScanDesc scan;
     524                 :           0 :         HeapTuple       tup;
     525                 :           0 :         bool            has_subtables = false;
     526                 :             : 
     527                 :           0 :         rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     528                 :             : 
     529                 :           0 :         ScanKeyInit(&skey[0],
     530                 :             :                                 Anum_pg_subscription_rel_srsubid,
     531                 :             :                                 BTEqualStrategyNumber, F_OIDEQ,
     532                 :           0 :                                 ObjectIdGetDatum(subid));
     533                 :             : 
     534                 :           0 :         scan = systable_beginscan(rel, InvalidOid, false,
     535                 :           0 :                                                           NULL, 1, skey);
     536                 :             : 
     537         [ #  # ]:           0 :         while (HeapTupleIsValid(tup = systable_getnext(scan)))
     538                 :             :         {
     539                 :           0 :                 Form_pg_subscription_rel subrel;
     540                 :           0 :                 char            relkind;
     541                 :             : 
     542                 :           0 :                 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     543                 :           0 :                 relkind = get_rel_relkind(subrel->srrelid);
     544                 :             : 
     545   [ #  #  #  # ]:           0 :                 if (relkind == RELKIND_RELATION ||
     546                 :           0 :                         relkind == RELKIND_PARTITIONED_TABLE)
     547                 :             :                 {
     548                 :           0 :                         has_subtables = true;
     549                 :           0 :                         break;
     550                 :             :                 }
     551      [ #  #  # ]:           0 :         }
     552                 :             : 
     553                 :             :         /* Cleanup */
     554                 :           0 :         systable_endscan(scan);
     555                 :           0 :         table_close(rel, AccessShareLock);
     556                 :             : 
     557                 :           0 :         return has_subtables;
     558                 :           0 : }
     559                 :             : 
     560                 :             : /*
     561                 :             :  * Get the relations for the subscription.
     562                 :             :  *
     563                 :             :  * If not_ready is true, return only the relations that are not in a ready
     564                 :             :  * state, otherwise return all the relations of the subscription.  The
     565                 :             :  * returned list is palloc'ed in the current memory context.
     566                 :             :  */
     567                 :             : List *
     568                 :          13 : GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
     569                 :             :                                                  bool not_ready)
     570                 :             : {
     571                 :          13 :         List       *res = NIL;
     572                 :          13 :         Relation        rel;
     573                 :          13 :         HeapTuple       tup;
     574                 :          13 :         int                     nkeys = 0;
     575                 :          13 :         ScanKeyData skey[2];
     576                 :          13 :         SysScanDesc scan;
     577                 :             : 
     578                 :             :         /* One or both of 'tables' and 'sequences' must be true. */
     579   [ -  +  #  # ]:          13 :         Assert(tables || sequences);
     580                 :             : 
     581                 :          13 :         rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     582                 :             : 
     583                 :          26 :         ScanKeyInit(&skey[nkeys++],
     584                 :             :                                 Anum_pg_subscription_rel_srsubid,
     585                 :             :                                 BTEqualStrategyNumber, F_OIDEQ,
     586                 :          13 :                                 ObjectIdGetDatum(subid));
     587                 :             : 
     588         [ -  + ]:          13 :         if (not_ready)
     589                 :          26 :                 ScanKeyInit(&skey[nkeys++],
     590                 :             :                                         Anum_pg_subscription_rel_srsubstate,
     591                 :             :                                         BTEqualStrategyNumber, F_CHARNE,
     592                 :          13 :                                         CharGetDatum(SUBREL_STATE_READY));
     593                 :             : 
     594                 :          26 :         scan = systable_beginscan(rel, InvalidOid, false,
     595                 :          13 :                                                           NULL, nkeys, skey);
     596                 :             : 
     597         [ -  + ]:          13 :         while (HeapTupleIsValid(tup = systable_getnext(scan)))
     598                 :             :         {
     599                 :           0 :                 Form_pg_subscription_rel subrel;
     600                 :           0 :                 SubscriptionRelState *relstate;
     601                 :           0 :                 Datum           d;
     602                 :           0 :                 bool            isnull;
     603                 :           0 :                 char            relkind;
     604                 :             : 
     605                 :           0 :                 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     606                 :             : 
     607                 :             :                 /* Relation is either a sequence or a table */
     608                 :           0 :                 relkind = get_rel_relkind(subrel->srrelid);
     609   [ #  #  #  #  :           0 :                 Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION ||
                   #  # ]
     610                 :             :                            relkind == RELKIND_PARTITIONED_TABLE);
     611                 :             : 
     612                 :             :                 /* Skip sequences if they were not requested */
     613   [ #  #  #  # ]:           0 :                 if ((relkind == RELKIND_SEQUENCE) && !sequences)
     614                 :           0 :                         continue;
     615                 :             : 
     616                 :             :                 /* Skip tables if they were not requested */
     617         [ #  # ]:           0 :                 if ((relkind == RELKIND_RELATION ||
     618         [ #  # ]:           0 :                          relkind == RELKIND_PARTITIONED_TABLE) && !tables)
     619                 :           0 :                         continue;
     620                 :             : 
     621                 :           0 :                 relstate = palloc_object(SubscriptionRelState);
     622                 :           0 :                 relstate->relid = subrel->srrelid;
     623                 :           0 :                 relstate->state = subrel->srsubstate;
     624                 :           0 :                 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     625                 :             :                                                         Anum_pg_subscription_rel_srsublsn, &isnull);
     626         [ #  # ]:           0 :                 if (isnull)
     627                 :           0 :                         relstate->lsn = InvalidXLogRecPtr;
     628                 :             :                 else
     629                 :           0 :                         relstate->lsn = DatumGetLSN(d);
     630                 :             : 
     631                 :           0 :                 res = lappend(res, relstate);
     632      [ #  #  # ]:           0 :         }
     633                 :             : 
     634                 :             :         /* Cleanup */
     635                 :          13 :         systable_endscan(scan);
     636                 :          13 :         table_close(rel, AccessShareLock);
     637                 :             : 
     638                 :          26 :         return res;
     639                 :          13 : }
     640                 :             : 
     641                 :             : /*
     642                 :             :  * Update the dead tuple retention status for the given subscription.
     643                 :             :  */
     644                 :             : void
     645                 :           0 : UpdateDeadTupleRetentionStatus(Oid subid, bool active)
     646                 :             : {
     647                 :           0 :         Relation        rel;
     648                 :           0 :         bool            nulls[Natts_pg_subscription];
     649                 :           0 :         bool            replaces[Natts_pg_subscription];
     650                 :           0 :         Datum           values[Natts_pg_subscription];
     651                 :           0 :         HeapTuple       tup;
     652                 :             : 
     653                 :             :         /* Look up the subscription in the catalog */
     654                 :           0 :         rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     655                 :           0 :         tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     656                 :             : 
     657         [ #  # ]:           0 :         if (!HeapTupleIsValid(tup))
     658   [ #  #  #  # ]:           0 :                 elog(ERROR, "cache lookup failed for subscription %u", subid);
     659                 :             : 
     660                 :           0 :         LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     661                 :             : 
     662                 :             :         /* Form a new tuple. */
     663                 :           0 :         memset(values, 0, sizeof(values));
     664                 :           0 :         memset(nulls, false, sizeof(nulls));
     665                 :           0 :         memset(replaces, false, sizeof(replaces));
     666                 :             : 
     667                 :             :         /* Set the subscription to disabled. */
     668                 :           0 :         values[Anum_pg_subscription_subretentionactive - 1] = active;
     669                 :           0 :         replaces[Anum_pg_subscription_subretentionactive - 1] = true;
     670                 :             : 
     671                 :             :         /* Update the catalog */
     672                 :           0 :         tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     673                 :           0 :                                                         replaces);
     674                 :           0 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
     675                 :           0 :         heap_freetuple(tup);
     676                 :             : 
     677                 :           0 :         table_close(rel, NoLock);
     678                 :           0 : }
        

Generated by: LCOV version 2.3.2-1