LCOV - code coverage report
Current view: top level - src/backend/executor - execReplication.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 5.5 % 452 25
Test Date: 2026-01-26 10:56:24 Functions: 6.2 % 16 1
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Branches: 15.2 % 330 50

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * execReplication.c
       4                 :             :  *        miscellaneous executor routines for logical replication
       5                 :             :  *
       6                 :             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7                 :             :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :             :  *
       9                 :             :  * IDENTIFICATION
      10                 :             :  *        src/backend/executor/execReplication.c
      11                 :             :  *
      12                 :             :  *-------------------------------------------------------------------------
      13                 :             :  */
      14                 :             : 
      15                 :             : #include "postgres.h"
      16                 :             : 
      17                 :             : #include "access/amapi.h"
      18                 :             : #include "access/commit_ts.h"
      19                 :             : #include "access/genam.h"
      20                 :             : #include "access/gist.h"
      21                 :             : #include "access/relscan.h"
      22                 :             : #include "access/tableam.h"
      23                 :             : #include "access/transam.h"
      24                 :             : #include "access/xact.h"
      25                 :             : #include "access/heapam.h"
      26                 :             : #include "catalog/pg_am_d.h"
      27                 :             : #include "commands/trigger.h"
      28                 :             : #include "executor/executor.h"
      29                 :             : #include "executor/nodeModifyTable.h"
      30                 :             : #include "replication/conflict.h"
      31                 :             : #include "replication/logicalrelation.h"
      32                 :             : #include "storage/lmgr.h"
      33                 :             : #include "utils/builtins.h"
      34                 :             : #include "utils/lsyscache.h"
      35                 :             : #include "utils/rel.h"
      36                 :             : #include "utils/snapmgr.h"
      37                 :             : #include "utils/syscache.h"
      38                 :             : #include "utils/typcache.h"
      39                 :             : 
      40                 :             : 
      41                 :             : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
      42                 :             :                                                  TypeCacheEntry **eq, Bitmapset *columns);
      43                 :             : 
      44                 :             : /*
      45                 :             :  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
      46                 :             :  * is setup to match 'rel' (*NOT* idxrel!).
      47                 :             :  *
      48                 :             :  * Returns how many columns to use for the index scan.
      49                 :             :  *
      50                 :             :  * This is not a generic routine, idxrel must be PK, RI, or an index that can be
      51                 :             :  * used for a REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull()
      52                 :             :  * for details.
      53                 :             :  *
      54                 :             :  * By definition, replication identity of a rel meets all limitations associated
      55                 :             :  * with that. Note that any other index could also meet these limitations.
      56                 :             :  */
      57                 :             : static int
      58                 :           0 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
      59                 :             :                                                  TupleTableSlot *searchslot)
      60                 :             : {
      61                 :           0 :         int                     index_attoff;
      62                 :           0 :         int                     skey_attoff = 0;
      63                 :           0 :         Datum           indclassDatum;
      64                 :           0 :         oidvector  *opclass;
      65                 :           0 :         int2vector *indkey = &idxrel->rd_index->indkey;
      66                 :             : 
      67                 :           0 :         indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
      68                 :             :                                                                                    Anum_pg_index_indclass);
      69                 :           0 :         opclass = (oidvector *) DatumGetPointer(indclassDatum);
      70                 :             : 
      71                 :             :         /* Build scankey for every non-expression attribute in the index. */
      72         [ #  # ]:           0 :         for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
      73                 :           0 :                  index_attoff++)
      74                 :             :         {
      75                 :           0 :                 Oid                     operator;
      76                 :           0 :                 Oid                     optype;
      77                 :           0 :                 Oid                     opfamily;
      78                 :           0 :                 RegProcedure regop;
      79                 :           0 :                 int                     table_attno = indkey->values[index_attoff];
      80                 :           0 :                 StrategyNumber eq_strategy;
      81                 :             : 
      82         [ #  # ]:           0 :                 if (!AttributeNumberIsValid(table_attno))
      83                 :             :                 {
      84                 :             :                         /*
      85                 :             :                          * XXX: Currently, we don't support expressions in the scan key,
      86                 :             :                          * see code below.
      87                 :             :                          */
      88                 :           0 :                         continue;
      89                 :             :                 }
      90                 :             : 
      91                 :             :                 /*
      92                 :             :                  * Load the operator info.  We need this to get the equality operator
      93                 :             :                  * function for the scan key.
      94                 :             :                  */
      95                 :           0 :                 optype = get_opclass_input_type(opclass->values[index_attoff]);
      96                 :           0 :                 opfamily = get_opclass_family(opclass->values[index_attoff]);
      97                 :           0 :                 eq_strategy = IndexAmTranslateCompareType(COMPARE_EQ, idxrel->rd_rel->relam, opfamily, false);
      98                 :           0 :                 operator = get_opfamily_member(opfamily, optype,
      99                 :           0 :                                                                            optype,
     100                 :           0 :                                                                            eq_strategy);
     101                 :             : 
     102         [ #  # ]:           0 :                 if (!OidIsValid(operator))
     103   [ #  #  #  # ]:           0 :                         elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
     104                 :             :                                  eq_strategy, optype, optype, opfamily);
     105                 :             : 
     106                 :           0 :                 regop = get_opcode(operator);
     107                 :             : 
     108                 :             :                 /* Initialize the scankey. */
     109                 :           0 :                 ScanKeyInit(&skey[skey_attoff],
     110                 :           0 :                                         index_attoff + 1,
     111                 :           0 :                                         eq_strategy,
     112                 :           0 :                                         regop,
     113                 :           0 :                                         searchslot->tts_values[table_attno - 1]);
     114                 :             : 
     115                 :           0 :                 skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
     116                 :             : 
     117                 :             :                 /* Check for null value. */
     118         [ #  # ]:           0 :                 if (searchslot->tts_isnull[table_attno - 1])
     119                 :           0 :                         skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
     120                 :             : 
     121                 :           0 :                 skey_attoff++;
     122      [ #  #  # ]:           0 :         }
     123                 :             : 
     124                 :             :         /* There must always be at least one attribute for the index scan. */
     125         [ #  # ]:           0 :         Assert(skey_attoff > 0);
     126                 :             : 
     127                 :           0 :         return skey_attoff;
     128                 :           0 : }
     129                 :             : 
     130                 :             : 
     131                 :             : /*
     132                 :             :  * Helper function to check if it is necessary to re-fetch and lock the tuple
     133                 :             :  * due to concurrent modifications. This function should be called after
     134                 :             :  * invoking table_tuple_lock.
     135                 :             :  */
     136                 :             : static bool
     137                 :           0 : should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
     138                 :             : {
     139                 :           0 :         bool            refetch = false;
     140                 :             : 
     141   [ #  #  #  #  :           0 :         switch (res)
                      # ]
     142                 :             :         {
     143                 :             :                 case TM_Ok:
     144                 :             :                         break;
     145                 :             :                 case TM_Updated:
     146                 :             :                         /* XXX: Improve handling here */
     147         [ #  # ]:           0 :                         if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
     148   [ #  #  #  # ]:           0 :                                 ereport(LOG,
     149                 :             :                                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     150                 :             :                                                  errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     151                 :             :                         else
     152   [ #  #  #  # ]:           0 :                                 ereport(LOG,
     153                 :             :                                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     154                 :             :                                                  errmsg("concurrent update, retrying")));
     155                 :           0 :                         refetch = true;
     156                 :           0 :                         break;
     157                 :             :                 case TM_Deleted:
     158                 :             :                         /* XXX: Improve handling here */
     159   [ #  #  #  # ]:           0 :                         ereport(LOG,
     160                 :             :                                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     161                 :             :                                          errmsg("concurrent delete, retrying")));
     162                 :           0 :                         refetch = true;
     163                 :           0 :                         break;
     164                 :             :                 case TM_Invisible:
     165   [ #  #  #  # ]:           0 :                         elog(ERROR, "attempted to lock invisible tuple");
     166                 :           0 :                         break;
     167                 :             :                 default:
     168   [ #  #  #  # ]:           0 :                         elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     169                 :           0 :                         break;
     170                 :             :         }
     171                 :             : 
     172                 :           0 :         return refetch;
     173                 :           0 : }
     174                 :             : 
     175                 :             : /*
     176                 :             :  * Search the relation 'rel' for tuple using the index.
     177                 :             :  *
     178                 :             :  * If a matching tuple is found, lock it with lockmode, fill the slot with its
     179                 :             :  * contents, and return true.  Return false otherwise.
     180                 :             :  */
     181                 :             : bool
     182                 :           0 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
     183                 :             :                                                          LockTupleMode lockmode,
     184                 :             :                                                          TupleTableSlot *searchslot,
     185                 :             :                                                          TupleTableSlot *outslot)
     186                 :             : {
     187                 :           0 :         ScanKeyData skey[INDEX_MAX_KEYS];
     188                 :           0 :         int                     skey_attoff;
     189                 :           0 :         IndexScanDesc scan;
     190                 :           0 :         SnapshotData snap;
     191                 :           0 :         TransactionId xwait;
     192                 :           0 :         Relation        idxrel;
     193                 :           0 :         bool            found;
     194                 :           0 :         TypeCacheEntry **eq = NULL;
     195                 :           0 :         bool            isIdxSafeToSkipDuplicates;
     196                 :             : 
     197                 :             :         /* Open the index. */
     198                 :           0 :         idxrel = index_open(idxoid, RowExclusiveLock);
     199                 :             : 
     200                 :           0 :         isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
     201                 :             : 
     202                 :           0 :         InitDirtySnapshot(snap);
     203                 :             : 
     204                 :             :         /* Build scan key. */
     205                 :           0 :         skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
     206                 :             : 
     207                 :             :         /* Start an index scan. */
     208                 :           0 :         scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
     209                 :             : 
     210                 :             : retry:
     211                 :           0 :         found = false;
     212                 :             : 
     213                 :           0 :         index_rescan(scan, skey, skey_attoff, NULL, 0);
     214                 :             : 
     215                 :             :         /* Try to find the tuple */
     216         [ #  # ]:           0 :         while (index_getnext_slot(scan, ForwardScanDirection, outslot))
     217                 :             :         {
     218                 :             :                 /*
     219                 :             :                  * Avoid expensive equality check if the index is primary key or
     220                 :             :                  * replica identity index.
     221                 :             :                  */
     222         [ #  # ]:           0 :                 if (!isIdxSafeToSkipDuplicates)
     223                 :             :                 {
     224         [ #  # ]:           0 :                         if (eq == NULL)
     225                 :           0 :                                 eq = palloc0_array(TypeCacheEntry *, outslot->tts_tupleDescriptor->natts);
     226                 :             : 
     227         [ #  # ]:           0 :                         if (!tuples_equal(outslot, searchslot, eq, NULL))
     228                 :           0 :                                 continue;
     229                 :           0 :                 }
     230                 :             : 
     231                 :           0 :                 ExecMaterializeSlot(outslot);
     232                 :             : 
     233         [ #  # ]:           0 :                 xwait = TransactionIdIsValid(snap.xmin) ?
     234                 :           0 :                         snap.xmin : snap.xmax;
     235                 :             : 
     236                 :             :                 /*
     237                 :             :                  * If the tuple is locked, wait for locking transaction to finish and
     238                 :             :                  * retry.
     239                 :             :                  */
     240         [ #  # ]:           0 :                 if (TransactionIdIsValid(xwait))
     241                 :             :                 {
     242                 :           0 :                         XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     243                 :           0 :                         goto retry;
     244                 :             :                 }
     245                 :             : 
     246                 :             :                 /* Found our tuple and it's not locked */
     247                 :           0 :                 found = true;
     248                 :           0 :                 break;
     249                 :             :         }
     250                 :             : 
     251                 :             :         /* Found tuple, try to lock it in the lockmode. */
     252         [ #  # ]:           0 :         if (found)
     253                 :             :         {
     254                 :           0 :                 TM_FailureData tmfd;
     255                 :           0 :                 TM_Result       res;
     256                 :             : 
     257                 :           0 :                 PushActiveSnapshot(GetLatestSnapshot());
     258                 :             : 
     259                 :           0 :                 res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
     260                 :           0 :                                                            outslot,
     261                 :           0 :                                                            GetCurrentCommandId(false),
     262                 :           0 :                                                            lockmode,
     263                 :             :                                                            LockWaitBlock,
     264                 :             :                                                            0 /* don't follow updates */ ,
     265                 :             :                                                            &tmfd);
     266                 :             : 
     267                 :           0 :                 PopActiveSnapshot();
     268                 :             : 
     269         [ #  # ]:           0 :                 if (should_refetch_tuple(res, &tmfd))
     270                 :           0 :                         goto retry;
     271      [ #  #  # ]:           0 :         }
     272                 :             : 
     273                 :           0 :         index_endscan(scan);
     274                 :             : 
     275                 :             :         /* Don't release lock until commit. */
     276                 :           0 :         index_close(idxrel, NoLock);
     277                 :             : 
     278                 :           0 :         return found;
     279                 :           0 : }
     280                 :             : 
     281                 :             : /*
     282                 :             :  * Compare the tuples in the slots by checking if they have equal values.
     283                 :             :  *
     284                 :             :  * If 'columns' is not null, only the columns specified within it will be
     285                 :             :  * considered for the equality check, ignoring all other columns.
     286                 :             :  */
     287                 :             : static bool
     288                 :           0 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
     289                 :             :                          TypeCacheEntry **eq, Bitmapset *columns)
     290                 :             : {
     291                 :           0 :         int                     attrnum;
     292                 :             : 
     293         [ #  # ]:           0 :         Assert(slot1->tts_tupleDescriptor->natts ==
     294                 :             :                    slot2->tts_tupleDescriptor->natts);
     295                 :             : 
     296                 :           0 :         slot_getallattrs(slot1);
     297                 :           0 :         slot_getallattrs(slot2);
     298                 :             : 
     299                 :             :         /* Check equality of the attributes. */
     300         [ #  # ]:           0 :         for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
     301                 :             :         {
     302                 :           0 :                 Form_pg_attribute att;
     303                 :           0 :                 TypeCacheEntry *typentry;
     304                 :             : 
     305                 :           0 :                 att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
     306                 :             : 
     307                 :             :                 /*
     308                 :             :                  * Ignore dropped and generated columns as the publisher doesn't send
     309                 :             :                  * those
     310                 :             :                  */
     311   [ #  #  #  # ]:           0 :                 if (att->attisdropped || att->attgenerated)
     312                 :           0 :                         continue;
     313                 :             : 
     314                 :             :                 /*
     315                 :             :                  * Ignore columns that are not listed for checking.
     316                 :             :                  */
     317   [ #  #  #  # ]:           0 :                 if (columns &&
     318                 :           0 :                         !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
     319                 :           0 :                                                    columns))
     320                 :           0 :                         continue;
     321                 :             : 
     322                 :             :                 /*
     323                 :             :                  * If one value is NULL and other is not, then they are certainly not
     324                 :             :                  * equal
     325                 :             :                  */
     326         [ #  # ]:           0 :                 if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
     327                 :           0 :                         return false;
     328                 :             : 
     329                 :             :                 /*
     330                 :             :                  * If both are NULL, they can be considered equal.
     331                 :             :                  */
     332   [ #  #  #  # ]:           0 :                 if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
     333                 :           0 :                         continue;
     334                 :             : 
     335                 :           0 :                 typentry = eq[attrnum];
     336         [ #  # ]:           0 :                 if (typentry == NULL)
     337                 :             :                 {
     338                 :           0 :                         typentry = lookup_type_cache(att->atttypid,
     339                 :             :                                                                                  TYPECACHE_EQ_OPR_FINFO);
     340         [ #  # ]:           0 :                         if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
     341   [ #  #  #  # ]:           0 :                                 ereport(ERROR,
     342                 :             :                                                 (errcode(ERRCODE_UNDEFINED_FUNCTION),
     343                 :             :                                                  errmsg("could not identify an equality operator for type %s",
     344                 :             :                                                                 format_type_be(att->atttypid))));
     345                 :           0 :                         eq[attrnum] = typentry;
     346                 :           0 :                 }
     347                 :             : 
     348   [ #  #  #  # ]:           0 :                 if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
     349                 :           0 :                                                                                         att->attcollation,
     350                 :           0 :                                                                                         slot1->tts_values[attrnum],
     351                 :           0 :                                                                                         slot2->tts_values[attrnum])))
     352                 :           0 :                         return false;
     353      [ #  #  # ]:           0 :         }
     354                 :             : 
     355                 :           0 :         return true;
     356                 :           0 : }
     357                 :             : 
     358                 :             : /*
     359                 :             :  * Search the relation 'rel' for tuple using the sequential scan.
     360                 :             :  *
     361                 :             :  * If a matching tuple is found, lock it with lockmode, fill the slot with its
     362                 :             :  * contents, and return true.  Return false otherwise.
     363                 :             :  *
     364                 :             :  * Note that this stops on the first matching tuple.
     365                 :             :  *
     366                 :             :  * This can obviously be quite slow on tables that have more than few rows.
     367                 :             :  */
     368                 :             : bool
     369                 :           0 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
     370                 :             :                                                  TupleTableSlot *searchslot, TupleTableSlot *outslot)
     371                 :             : {
     372                 :           0 :         TupleTableSlot *scanslot;
     373                 :           0 :         TableScanDesc scan;
     374                 :           0 :         SnapshotData snap;
     375                 :           0 :         TypeCacheEntry **eq;
     376                 :           0 :         TransactionId xwait;
     377                 :           0 :         bool            found;
     378                 :           0 :         TupleDesc       desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     379                 :             : 
     380         [ #  # ]:           0 :         Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
     381                 :             : 
     382                 :           0 :         eq = palloc0_array(TypeCacheEntry *, outslot->tts_tupleDescriptor->natts);
     383                 :             : 
     384                 :             :         /* Start a heap scan. */
     385                 :           0 :         InitDirtySnapshot(snap);
     386                 :           0 :         scan = table_beginscan(rel, &snap, 0, NULL);
     387                 :           0 :         scanslot = table_slot_create(rel, NULL);
     388                 :             : 
     389                 :             : retry:
     390                 :           0 :         found = false;
     391                 :             : 
     392                 :           0 :         table_rescan(scan, NULL);
     393                 :             : 
     394                 :             :         /* Try to find the tuple */
     395         [ #  # ]:           0 :         while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
     396                 :             :         {
     397         [ #  # ]:           0 :                 if (!tuples_equal(scanslot, searchslot, eq, NULL))
     398                 :           0 :                         continue;
     399                 :             : 
     400                 :           0 :                 found = true;
     401                 :           0 :                 ExecCopySlot(outslot, scanslot);
     402                 :             : 
     403         [ #  # ]:           0 :                 xwait = TransactionIdIsValid(snap.xmin) ?
     404                 :           0 :                         snap.xmin : snap.xmax;
     405                 :             : 
     406                 :             :                 /*
     407                 :             :                  * If the tuple is locked, wait for the locking transaction to finish and
     408                 :             :                  * retry.
     409                 :             :                  */
     410         [ #  # ]:           0 :                 if (TransactionIdIsValid(xwait))
     411                 :             :                 {
     412                 :           0 :                         XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     413                 :           0 :                         goto retry;
     414                 :             :                 }
     415                 :             : 
     416                 :             :                 /* Found our tuple and it's not locked */
     417                 :           0 :                 break;
     418                 :             :         }
     419                 :             : 
     420                 :             :         /* Found the tuple; try to lock it using the given lockmode. */
     421         [ #  # ]:           0 :         if (found)
     422                 :             :         {
     423                 :           0 :                 TM_FailureData tmfd;
     424                 :           0 :                 TM_Result       res;
     425                 :             : 
     426                 :           0 :                 PushActiveSnapshot(GetLatestSnapshot());
     427                 :             : 
     428                 :           0 :                 res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
     429                 :           0 :                                                            outslot,
     430                 :           0 :                                                            GetCurrentCommandId(false),
     431                 :           0 :                                                            lockmode,
     432                 :             :                                                            LockWaitBlock,
     433                 :             :                                                            0 /* don't follow updates */ ,
     434                 :             :                                                            &tmfd);
     435                 :             : 
     436                 :           0 :                 PopActiveSnapshot();
     437                 :             : 
     438         [ #  # ]:           0 :                 if (should_refetch_tuple(res, &tmfd))
     439                 :           0 :                         goto retry;
     440      [ #  #  # ]:           0 :         }
     441                 :             : 
     442                 :           0 :         table_endscan(scan);
     443                 :           0 :         ExecDropSingleTupleTableSlot(scanslot);
     444                 :             : 
     445                 :           0 :         return found;
     446                 :           0 : }
     447                 :             : 
     448                 :             : /*
     449                 :             :  * Build additional index information necessary for conflict detection.
     450                 :             :  */
     451                 :             : static void
     452                 :           0 : BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
     453                 :             : {
     454         [ #  # ]:           0 :         for (int i = 0; i < resultRelInfo->ri_NumIndices; i++)
     455                 :             :         {
     456                 :           0 :                 Relation        indexRelation = resultRelInfo->ri_IndexRelationDescs[i];
     457                 :           0 :                 IndexInfo  *indexRelationInfo = resultRelInfo->ri_IndexRelationInfo[i];
     458                 :             : 
     459         [ #  # ]:           0 :                 if (conflictindex != RelationGetRelid(indexRelation))
     460                 :           0 :                         continue;
     461                 :             : 
     462                 :             :                 /*
     463                 :             :                  * This Assert will fail if BuildSpeculativeIndexInfo() is called
     464                 :             :                  * twice for the given index.
     465                 :             :                  */
     466         [ #  # ]:           0 :                 Assert(indexRelationInfo->ii_UniqueOps == NULL);
     467                 :             : 
     468                 :           0 :                 BuildSpeculativeIndexInfo(indexRelation, indexRelationInfo);
     469      [ #  #  # ]:           0 :         }
     470                 :           0 : }
     471                 :             : 
     472                 :             : /*
     473                 :             :  * If the tuple is recently dead and was deleted by a transaction with a newer
     474                 :             :  * commit timestamp than previously recorded, update the associated transaction
     475                 :             :  * ID, commit time, and origin. This helps ensure that conflict detection uses
     476                 :             :  * the most recent and relevant deletion metadata.
     477                 :             :  */
     478                 :             : static void
     479                 :           0 : update_most_recent_deletion_info(TupleTableSlot *scanslot,
     480                 :             :                                                                  TransactionId oldestxmin,
     481                 :             :                                                                  TransactionId *delete_xid,
     482                 :             :                                                                  TimestampTz *delete_time,
     483                 :             :                                                                  RepOriginId *delete_origin)
     484                 :             : {
     485                 :           0 :         BufferHeapTupleTableSlot *hslot;
     486                 :           0 :         HeapTuple       tuple;
     487                 :           0 :         Buffer          buf;
     488                 :           0 :         bool            recently_dead = false;
     489                 :           0 :         TransactionId xmax;
     490                 :           0 :         TimestampTz localts;
     491                 :           0 :         RepOriginId localorigin;
     492                 :             : 
     493                 :           0 :         hslot = (BufferHeapTupleTableSlot *) scanslot;
     494                 :             : 
     495                 :           0 :         tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL);
     496                 :           0 :         buf = hslot->buffer;
     497                 :             : 
     498                 :           0 :         LockBuffer(buf, BUFFER_LOCK_SHARE);
     499                 :             : 
     500                 :             :         /*
     501                 :             :          * We do not consider HEAPTUPLE_DEAD status because it indicates either
     502                 :             :          * tuples whose inserting transaction was aborted (meaning there is no
     503                 :             :          * commit timestamp or origin), or tuples deleted by a transaction older
     504                 :             :          * than oldestxmin, making it safe to ignore them during conflict
     505                 :             :          * detection (See comments atop worker.c for details).
     506                 :             :          */
     507         [ #  # ]:           0 :         if (HeapTupleSatisfiesVacuum(tuple, oldestxmin, buf) == HEAPTUPLE_RECENTLY_DEAD)
     508                 :           0 :                 recently_dead = true;
     509                 :             : 
     510                 :           0 :         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     511                 :             : 
     512         [ #  # ]:           0 :         if (!recently_dead)
     513                 :           0 :                 return;
     514                 :             : 
     515                 :           0 :         xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
     516         [ #  # ]:           0 :         if (!TransactionIdIsValid(xmax))
     517                 :           0 :                 return;
     518                 :             : 
     519                 :             :         /* Select the dead tuple with the most recent commit timestamp */
     520   [ #  #  #  # ]:           0 :         if (TransactionIdGetCommitTsData(xmax, &localts, &localorigin) &&
     521                 :           0 :                 TimestampDifferenceExceeds(*delete_time, localts, 0))
     522                 :             :         {
     523                 :           0 :                 *delete_xid = xmax;
     524                 :           0 :                 *delete_time = localts;
     525                 :           0 :                 *delete_origin = localorigin;
     526                 :           0 :         }
     527         [ #  # ]:           0 : }
     528                 :             : 
     529                 :             : /*
     530                 :             :  * Searches the relation 'rel' for the most recently deleted tuple that matches
     531                 :             :  * the values in 'searchslot' and is not yet removable by VACUUM. The function
     532                 :             :  * returns the transaction ID, origin, and commit timestamp of the transaction
     533                 :             :  * that deleted this tuple.
     534                 :             :  *
     535                 :             :  * 'oldestxmin' acts as a cutoff transaction ID. Tuples deleted by transactions
     536                 :             :  * with IDs >= 'oldestxmin' are considered recently dead and are eligible for
     537                 :             :  * conflict detection.
     538                 :             :  *
     539                 :             :  * Instead of stopping at the first match, we scan all matching dead tuples to
     540                 :             :  * identify the most recent deletion. This is crucial because only the latest
     541                 :             :  * deletion is relevant for resolving conflicts.
     542                 :             :  *
     543                 :             :  * For example, consider a scenario on the subscriber where a row is deleted,
     544                 :             :  * re-inserted, and then deleted again only on the subscriber:
     545                 :             :  *
     546                 :             :  *   - (pk, 1) - deleted at 9:00,
     547                 :             :  *   - (pk, 1) - deleted at 9:02,
     548                 :             :  *
     549                 :             :  * Now, a remote update arrives: (pk, 1) -> (pk, 2), timestamped at 9:01.
     550                 :             :  *
     551                 :             :  * If we mistakenly return the older deletion (9:00), the system may wrongly
     552                 :             :  * apply the remote update using a last-update-wins strategy. Instead, we must
     553                 :             :  * recognize the more recent deletion at 9:02 and skip the update. See
     554                 :             :  * comments atop worker.c for details. Note, as of now, conflict resolution
     555                 :             :  * is not implemented. Consequently, the system may incorrectly report the
     556                 :             :  * older tuple as the conflicted one, leading to misleading results.
     557                 :             :  *
     558                 :             :  * The commit timestamp of the deleting transaction is used to determine which
     559                 :             :  * tuple was deleted most recently.
     560                 :             :  */
     561                 :             : bool
     562                 :           0 : RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot,
     563                 :             :                                                                 TransactionId oldestxmin,
     564                 :             :                                                                 TransactionId *delete_xid,
     565                 :             :                                                                 RepOriginId *delete_origin,
     566                 :             :                                                                 TimestampTz *delete_time)
     567                 :             : {
     568                 :           0 :         TupleTableSlot *scanslot;
     569                 :           0 :         TableScanDesc scan;
     570                 :           0 :         TypeCacheEntry **eq;
     571                 :           0 :         Bitmapset  *indexbitmap;
     572                 :           0 :         TupleDesc       desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     573                 :             : 
     574         [ #  # ]:           0 :         Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
     575                 :             : 
     576                 :           0 :         *delete_xid = InvalidTransactionId;
     577                 :           0 :         *delete_origin = InvalidRepOriginId;
     578                 :           0 :         *delete_time = 0;
     579                 :             : 
     580                 :             :         /*
     581                 :             :          * If the relation has a replica identity key or a primary key that is
     582                 :             :          * unusable for locating deleted tuples (see
     583                 :             :          * IsIndexUsableForFindingDeletedTuple), a full table scan becomes
     584                 :             :          * necessary. In such cases, comparing the entire tuple is not required,
     585                 :             :          * since the remote tuple might not include all column values. Instead,
     586                 :             :          * the indexed columns alone are sufficient to identify the target tuple
     587                 :             :          * (see logicalrep_rel_mark_updatable).
     588                 :             :          */
     589                 :           0 :         indexbitmap = RelationGetIndexAttrBitmap(rel,
     590                 :             :                                                                                          INDEX_ATTR_BITMAP_IDENTITY_KEY);
     591                 :             : 
     592                 :             :         /* Fall back to the PK if there is no replica identity */
     593         [ #  # ]:           0 :         if (!indexbitmap)
     594                 :           0 :                 indexbitmap = RelationGetIndexAttrBitmap(rel,
     595                 :             :                                                                                                  INDEX_ATTR_BITMAP_PRIMARY_KEY);
     596                 :             : 
     597                 :           0 :         eq = palloc0_array(TypeCacheEntry *, searchslot->tts_tupleDescriptor->natts);
     598                 :             : 
     599                 :             :         /*
     600                 :             :          * Start a heap scan using SnapshotAny to identify dead tuples that are
     601                 :             :          * not visible under a standard MVCC snapshot. Tuples from transactions
     602                 :             :          * not yet committed or those just committed prior to the scan are
     603                 :             :          * excluded in update_most_recent_deletion_info().
     604                 :             :          */
     605                 :           0 :         scan = table_beginscan(rel, SnapshotAny, 0, NULL);
     606                 :           0 :         scanslot = table_slot_create(rel, NULL);
     607                 :             : 
     608                 :           0 :         table_rescan(scan, NULL);
     609                 :             : 
     610                 :             :         /* Try to find the tuple */
     611         [ #  # ]:           0 :         while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
     612                 :             :         {
     613         [ #  # ]:           0 :                 if (!tuples_equal(scanslot, searchslot, eq, indexbitmap))
     614                 :           0 :                         continue;
     615                 :             : 
     616                 :           0 :                 update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
     617                 :           0 :                                                                                  delete_time, delete_origin);
     618                 :             :         }
     619                 :             : 
     620                 :           0 :         table_endscan(scan);
     621                 :           0 :         ExecDropSingleTupleTableSlot(scanslot);
     622                 :             : 
     623                 :           0 :         return *delete_time != 0;
     624                 :           0 : }
     625                 :             : 
     626                 :             : /*
     627                 :             :  * Similar to RelationFindDeletedTupleInfoSeq() but using index scan to locate
     628                 :             :  * the deleted tuple.
     629                 :             :  */
     630                 :             : bool
     631                 :           0 : RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
     632                 :             :                                                                         TupleTableSlot *searchslot,
     633                 :             :                                                                         TransactionId oldestxmin,
     634                 :             :                                                                         TransactionId *delete_xid,
     635                 :             :                                                                         RepOriginId *delete_origin,
     636                 :             :                                                                         TimestampTz *delete_time)
     637                 :             : {
     638                 :           0 :         Relation        idxrel;
     639                 :           0 :         ScanKeyData skey[INDEX_MAX_KEYS];
     640                 :           0 :         int                     skey_attoff;
     641                 :           0 :         IndexScanDesc scan;
     642                 :           0 :         TupleTableSlot *scanslot;
     643                 :           0 :         TypeCacheEntry **eq = NULL;
     644                 :           0 :         bool            isIdxSafeToSkipDuplicates;
     645                 :           0 :         TupleDesc       desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     646                 :             : 
     647         [ #  # ]:           0 :         Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
     648         [ #  # ]:           0 :         Assert(OidIsValid(idxoid));
     649                 :             : 
     650                 :           0 :         *delete_xid = InvalidTransactionId;
     651                 :           0 :         *delete_time = 0;
     652                 :           0 :         *delete_origin = InvalidRepOriginId;
     653                 :             : 
     654                 :           0 :         isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
     655                 :             : 
     656                 :           0 :         scanslot = table_slot_create(rel, NULL);
     657                 :             : 
     658                 :           0 :         idxrel = index_open(idxoid, RowExclusiveLock);
     659                 :             : 
     660                 :             :         /* Build scan key. */
     661                 :           0 :         skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
     662                 :             : 
     663                 :             :         /*
     664                 :             :          * Start an index scan using SnapshotAny to identify dead tuples that are
     665                 :             :          * not visible under a standard MVCC snapshot. Tuples from transactions
     666                 :             :          * not yet committed or those just committed prior to the scan are
     667                 :             :          * excluded in update_most_recent_deletion_info().
     668                 :             :          */
     669                 :           0 :         scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0);
     670                 :             : 
     671                 :           0 :         index_rescan(scan, skey, skey_attoff, NULL, 0);
     672                 :             : 
     673                 :             :         /* Try to find the tuple */
     674         [ #  # ]:           0 :         while (index_getnext_slot(scan, ForwardScanDirection, scanslot))
     675                 :             :         {
     676                 :             :                 /*
     677                 :             :                  * Avoid expensive equality check if the index is primary key or
     678                 :             :                  * replica identity index.
     679                 :             :                  */
     680         [ #  # ]:           0 :                 if (!isIdxSafeToSkipDuplicates)
     681                 :             :                 {
     682         [ #  # ]:           0 :                         if (eq == NULL)
     683                 :           0 :                                 eq = palloc0_array(TypeCacheEntry *, scanslot->tts_tupleDescriptor->natts);
     684                 :             : 
     685         [ #  # ]:           0 :                         if (!tuples_equal(scanslot, searchslot, eq, NULL))
     686                 :           0 :                                 continue;
     687                 :           0 :                 }
     688                 :             : 
     689                 :           0 :                 update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
     690                 :           0 :                                                                                  delete_time, delete_origin);
     691                 :             :         }
     692                 :             : 
     693                 :           0 :         index_endscan(scan);
     694                 :             : 
     695                 :           0 :         index_close(idxrel, NoLock);
     696                 :             : 
     697                 :           0 :         ExecDropSingleTupleTableSlot(scanslot);
     698                 :             : 
     699                 :           0 :         return *delete_time != 0;
     700                 :           0 : }
     701                 :             : 
     702                 :             : /*
     703                 :             :  * Find the tuple that violates the passed unique index (conflictindex).
     704                 :             :  *
     705                 :             :  * If the conflicting tuple is found return true, otherwise false.
     706                 :             :  *
     707                 :             :  * We lock the tuple to avoid getting it deleted before the caller can fetch
     708                 :             :  * the required information. Note that if the tuple is deleted before a lock
     709                 :             :  * is acquired, we will retry the search for the conflicting tuple.
     710                 :             :  */
     711                 :             : static bool
     712                 :           0 : FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
     713                 :             :                                   Oid conflictindex, TupleTableSlot *slot,
     714                 :             :                                   TupleTableSlot **conflictslot)
     715                 :             : {
     716                 :           0 :         Relation        rel = resultRelInfo->ri_RelationDesc;
     717                 :           0 :         ItemPointerData conflictTid;
     718                 :           0 :         TM_FailureData tmfd;
     719                 :           0 :         TM_Result       res;
     720                 :             : 
     721                 :           0 :         *conflictslot = NULL;
     722                 :             : 
     723                 :             :         /*
     724                 :             :          * Build additional information required to check constraint violations.
     725                 :             :          * See check_exclusion_or_unique_constraint().
     726                 :             :          */
     727                 :           0 :         BuildConflictIndexInfo(resultRelInfo, conflictindex);
     728                 :             : 
     729                 :             : retry:
     730   [ #  #  #  # ]:           0 :         if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
     731                 :           0 :                                                                   &conflictTid, &slot->tts_tid,
     732                 :           0 :                                                                   list_make1_oid(conflictindex)))
     733                 :             :         {
     734         [ #  # ]:           0 :                 if (*conflictslot)
     735                 :           0 :                         ExecDropSingleTupleTableSlot(*conflictslot);
     736                 :             : 
     737                 :           0 :                 *conflictslot = NULL;
     738                 :           0 :                 return false;
     739                 :             :         }
     740                 :             : 
     741                 :           0 :         *conflictslot = table_slot_create(rel, NULL);
     742                 :             : 
     743                 :           0 :         PushActiveSnapshot(GetLatestSnapshot());
     744                 :             : 
     745                 :           0 :         res = table_tuple_lock(rel, &conflictTid, GetActiveSnapshot(),
     746                 :           0 :                                                    *conflictslot,
     747                 :           0 :                                                    GetCurrentCommandId(false),
     748                 :             :                                                    LockTupleShare,
     749                 :             :                                                    LockWaitBlock,
     750                 :             :                                                    0 /* don't follow updates */ ,
     751                 :             :                                                    &tmfd);
     752                 :             : 
     753                 :           0 :         PopActiveSnapshot();
     754                 :             : 
     755         [ #  # ]:           0 :         if (should_refetch_tuple(res, &tmfd))
     756                 :           0 :                 goto retry;
     757                 :             : 
     758                 :           0 :         return true;
     759                 :           0 : }
     760                 :             : 
     761                 :             : /*
     762                 :             :  * Check all the unique indexes in 'recheckIndexes' for conflict with the
     763                 :             :  * tuple in 'remoteslot' and report if found.
     764                 :             :  */
     765                 :             : static void
     766                 :           0 : CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
     767                 :             :                                            ConflictType type, List *recheckIndexes,
     768                 :             :                                            TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
     769                 :             : {
     770                 :           0 :         List       *conflicttuples = NIL;
     771                 :           0 :         TupleTableSlot *conflictslot;
     772                 :             : 
     773                 :             :         /* Check all the unique indexes for conflicts */
     774   [ #  #  #  #  :           0 :         foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
             #  #  #  # ]
     775                 :             :         {
     776   [ #  #  #  # ]:           0 :                 if (list_member_oid(recheckIndexes, uniqueidx) &&
     777                 :           0 :                         FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
     778                 :             :                                                           &conflictslot))
     779                 :             :                 {
     780                 :           0 :                         ConflictTupleInfo *conflicttuple = palloc0_object(ConflictTupleInfo);
     781                 :             : 
     782                 :           0 :                         conflicttuple->slot = conflictslot;
     783                 :           0 :                         conflicttuple->indexoid = uniqueidx;
     784                 :             : 
     785                 :           0 :                         GetTupleTransactionInfo(conflictslot, &conflicttuple->xmin,
     786                 :           0 :                                                                         &conflicttuple->origin, &conflicttuple->ts);
     787                 :             : 
     788                 :           0 :                         conflicttuples = lappend(conflicttuples, conflicttuple);
     789                 :           0 :                 }
     790                 :           0 :         }
     791                 :             : 
     792                 :             :         /* Report the conflict, if found */
     793         [ #  # ]:           0 :         if (conflicttuples)
     794                 :           0 :                 ReportApplyConflict(estate, resultRelInfo, ERROR,
     795         [ #  # ]:           0 :                                                         list_length(conflicttuples) > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
     796                 :           0 :                                                         searchslot, remoteslot, conflicttuples);
     797                 :           0 : }
     798                 :             : 
     799                 :             : /*
     800                 :             :  * Insert tuple represented in the slot to the relation, update the indexes,
     801                 :             :  * and execute any constraints and per-row triggers.
     802                 :             :  *
     803                 :             :  * Caller is responsible for opening the indexes.
     804                 :             :  */
     805                 :             : void
     806                 :           0 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
     807                 :             :                                                  EState *estate, TupleTableSlot *slot)
     808                 :             : {
     809                 :           0 :         bool            skip_tuple = false;
     810                 :           0 :         Relation        rel = resultRelInfo->ri_RelationDesc;
     811                 :             : 
     812                 :             :         /* For now we support only tables. */
     813         [ #  # ]:           0 :         Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     814                 :             : 
     815                 :           0 :         CheckCmdReplicaIdentity(rel, CMD_INSERT);
     816                 :             : 
     817                 :             :         /* BEFORE ROW INSERT Triggers */
     818   [ #  #  #  # ]:           0 :         if (resultRelInfo->ri_TrigDesc &&
     819                 :           0 :                 resultRelInfo->ri_TrigDesc->trig_insert_before_row)
     820                 :             :         {
     821         [ #  # ]:           0 :                 if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
     822                 :           0 :                         skip_tuple = true;      /* "do nothing" */
     823                 :           0 :         }
     824                 :             : 
     825         [ #  # ]:           0 :         if (!skip_tuple)
     826                 :             :         {
     827                 :           0 :                 List       *recheckIndexes = NIL;
     828                 :           0 :                 List       *conflictindexes;
     829                 :           0 :                 bool            conflict = false;
     830                 :             : 
     831                 :             :                 /* Compute stored generated columns */
     832   [ #  #  #  # ]:           0 :                 if (rel->rd_att->constr &&
     833                 :           0 :                         rel->rd_att->constr->has_generated_stored)
     834                 :           0 :                         ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     835                 :             :                                                                            CMD_INSERT);
     836                 :             : 
     837                 :             :                 /* Check the constraints of the tuple */
     838         [ #  # ]:           0 :                 if (rel->rd_att->constr)
     839                 :           0 :                         ExecConstraints(resultRelInfo, slot, estate);
     840         [ #  # ]:           0 :                 if (rel->rd_rel->relispartition)
     841                 :           0 :                         ExecPartitionCheck(resultRelInfo, slot, estate, true);
     842                 :             : 
     843                 :             :                 /* OK, store the tuple and create index entries for it */
     844                 :           0 :                 simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
     845                 :             : 
     846                 :           0 :                 conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
     847                 :             : 
     848         [ #  # ]:           0 :                 if (resultRelInfo->ri_NumIndices > 0)
     849                 :           0 :                         recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     850                 :           0 :                                                                                                    slot, estate, false,
     851                 :           0 :                                                                                                    conflictindexes ? true : false,
     852                 :             :                                                                                                    &conflict,
     853                 :           0 :                                                                                                    conflictindexes, false);
     854                 :             : 
     855                 :             :                 /*
     856                 :             :                  * Checks the conflict indexes to fetch the conflicting local row and
     857                 :             :                  * reports the conflict. We perform this check here, instead of
     858                 :             :                  * performing an additional index scan before the actual insertion and
     859                 :             :                  * reporting the conflict if any conflicting rows are found. This is
     860                 :             :                  * to avoid the overhead of executing the extra scan for each INSERT
     861                 :             :                  * operation, even when no conflict arises, which could introduce
     862                 :             :                  * significant overhead to replication, particularly in cases where
     863                 :             :                  * conflicts are rare.
     864                 :             :                  *
     865                 :             :                  * XXX OTOH, this could lead to clean-up effort for dead tuples added
     866                 :             :                  * in heap and index in case of conflicts. But as conflicts shouldn't
     867                 :             :                  * be a frequent thing so we preferred to save the performance
     868                 :             :                  * overhead of extra scan before each insertion.
     869                 :             :                  */
     870         [ #  # ]:           0 :                 if (conflict)
     871                 :           0 :                         CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
     872                 :           0 :                                                                    recheckIndexes, NULL, slot);
     873                 :             : 
     874                 :             :                 /* AFTER ROW INSERT Triggers */
     875                 :           0 :                 ExecARInsertTriggers(estate, resultRelInfo, slot,
     876                 :           0 :                                                          recheckIndexes, NULL);
     877                 :             : 
     878                 :             :                 /*
     879                 :             :                  * XXX we should in theory pass a TransitionCaptureState object to the
     880                 :             :                  * above to capture transition tuples, but after statement triggers
     881                 :             :                  * don't actually get fired by replication yet anyway
     882                 :             :                  */
     883                 :             : 
     884                 :           0 :                 list_free(recheckIndexes);
     885                 :           0 :         }
     886                 :           0 : }
     887                 :             : 
     888                 :             : /*
     889                 :             :  * Find the searchslot tuple and update it with data in the slot,
     890                 :             :  * update the indexes, and execute any constraints and per-row triggers.
     891                 :             :  *
     892                 :             :  * Caller is responsible for opening the indexes.
     893                 :             :  */
     894                 :             : void
     895                 :           0 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
     896                 :             :                                                  EState *estate, EPQState *epqstate,
     897                 :             :                                                  TupleTableSlot *searchslot, TupleTableSlot *slot)
     898                 :             : {
     899                 :           0 :         bool            skip_tuple = false;
     900                 :           0 :         Relation        rel = resultRelInfo->ri_RelationDesc;
     901                 :           0 :         ItemPointer tid = &(searchslot->tts_tid);
     902                 :             : 
     903                 :             :         /*
     904                 :             :          * We support only non-system tables, with
     905                 :             :          * check_publication_add_relation() accountable.
     906                 :             :          */
     907         [ #  # ]:           0 :         Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     908         [ #  # ]:           0 :         Assert(!IsCatalogRelation(rel));
     909                 :             : 
     910                 :           0 :         CheckCmdReplicaIdentity(rel, CMD_UPDATE);
     911                 :             : 
     912                 :             :         /* BEFORE ROW UPDATE Triggers */
     913   [ #  #  #  # ]:           0 :         if (resultRelInfo->ri_TrigDesc &&
     914                 :           0 :                 resultRelInfo->ri_TrigDesc->trig_update_before_row)
     915                 :             :         {
     916   [ #  #  #  # ]:           0 :                 if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
     917                 :           0 :                                                                   tid, NULL, slot, NULL, NULL, false))
     918                 :           0 :                         skip_tuple = true;      /* "do nothing" */
     919                 :           0 :         }
     920                 :             : 
     921         [ #  # ]:           0 :         if (!skip_tuple)
     922                 :             :         {
     923                 :           0 :                 List       *recheckIndexes = NIL;
     924                 :           0 :                 TU_UpdateIndexes update_indexes;
     925                 :           0 :                 List       *conflictindexes;
     926                 :           0 :                 bool            conflict = false;
     927                 :             : 
     928                 :             :                 /* Compute stored generated columns */
     929   [ #  #  #  # ]:           0 :                 if (rel->rd_att->constr &&
     930                 :           0 :                         rel->rd_att->constr->has_generated_stored)
     931                 :           0 :                         ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     932                 :             :                                                                            CMD_UPDATE);
     933                 :             : 
     934                 :             :                 /* Check the constraints of the tuple */
     935         [ #  # ]:           0 :                 if (rel->rd_att->constr)
     936                 :           0 :                         ExecConstraints(resultRelInfo, slot, estate);
     937         [ #  # ]:           0 :                 if (rel->rd_rel->relispartition)
     938                 :           0 :                         ExecPartitionCheck(resultRelInfo, slot, estate, true);
     939                 :             : 
     940                 :           0 :                 simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
     941                 :             :                                                                   &update_indexes);
     942                 :             : 
     943                 :           0 :                 conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
     944                 :             : 
     945   [ #  #  #  # ]:           0 :                 if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
     946                 :           0 :                         recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     947                 :           0 :                                                                                                    slot, estate, true,
     948                 :           0 :                                                                                                    conflictindexes ? true : false,
     949                 :           0 :                                                                                                    &conflict, conflictindexes,
     950                 :           0 :                                                                                                    (update_indexes == TU_Summarizing));
     951                 :             : 
     952                 :             :                 /*
     953                 :             :                  * Refer to the comments above the call to CheckAndReportConflict() in
     954                 :             :                  * ExecSimpleRelationInsert to understand why this check is done at
     955                 :             :                  * this point.
     956                 :             :                  */
     957         [ #  # ]:           0 :                 if (conflict)
     958                 :           0 :                         CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
     959                 :           0 :                                                                    recheckIndexes, searchslot, slot);
     960                 :             : 
     961                 :             :                 /* AFTER ROW UPDATE Triggers */
     962                 :           0 :                 ExecARUpdateTriggers(estate, resultRelInfo,
     963                 :             :                                                          NULL, NULL,
     964                 :           0 :                                                          tid, NULL, slot,
     965                 :           0 :                                                          recheckIndexes, NULL, false);
     966                 :             : 
     967                 :           0 :                 list_free(recheckIndexes);
     968                 :           0 :         }
     969                 :           0 : }
     970                 :             : 
     971                 :             : /*
     972                 :             :  * Find the searchslot tuple and delete it, and execute any constraints
     973                 :             :  * and per-row triggers.
     974                 :             :  *
     975                 :             :  * Caller is responsible for opening the indexes.
     976                 :             :  */
     977                 :             : void
     978                 :           0 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
     979                 :             :                                                  EState *estate, EPQState *epqstate,
     980                 :             :                                                  TupleTableSlot *searchslot)
     981                 :             : {
     982                 :           0 :         bool            skip_tuple = false;
     983                 :           0 :         Relation        rel = resultRelInfo->ri_RelationDesc;
     984                 :           0 :         ItemPointer tid = &searchslot->tts_tid;
     985                 :             : 
     986                 :           0 :         CheckCmdReplicaIdentity(rel, CMD_DELETE);
     987                 :             : 
     988                 :             :         /* BEFORE ROW DELETE Triggers */
     989   [ #  #  #  # ]:           0 :         if (resultRelInfo->ri_TrigDesc &&
     990                 :           0 :                 resultRelInfo->ri_TrigDesc->trig_delete_before_row)
     991                 :             :         {
     992                 :           0 :                 skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
     993                 :           0 :                                                                                    tid, NULL, NULL, NULL, NULL, false);
     994                 :           0 :         }
     995                 :             : 
     996         [ #  # ]:           0 :         if (!skip_tuple)
     997                 :             :         {
     998                 :             :                 /* OK, delete the tuple */
     999                 :           0 :                 simple_table_tuple_delete(rel, tid, estate->es_snapshot);
    1000                 :             : 
    1001                 :             :                 /* AFTER ROW DELETE Triggers */
    1002                 :           0 :                 ExecARDeleteTriggers(estate, resultRelInfo,
    1003                 :           0 :                                                          tid, NULL, NULL, false);
    1004                 :           0 :         }
    1005                 :           0 : }
    1006                 :             : 
    1007                 :             : /*
    1008                 :             :  * Check if command can be executed with current replica identity.
    1009                 :             :  */
    1010                 :             : void
    1011                 :       11958 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
    1012                 :             : {
    1013                 :       11958 :         PublicationDesc pubdesc;
    1014                 :             : 
    1015                 :             :         /*
    1016                 :             :          * Skip checking the replica identity for partitioned tables, because the
    1017                 :             :          * operations are actually performed on the leaf partitions.
    1018                 :             :          */
    1019         [ +  + ]:       11958 :         if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    1020                 :         572 :                 return;
    1021                 :             : 
    1022                 :             :         /* We only need to do checks for UPDATE and DELETE. */
    1023   [ +  +  +  + ]:       11386 :         if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
    1024                 :        7643 :                 return;
    1025                 :             : 
    1026                 :             :         /*
    1027                 :             :          * It is only safe to execute UPDATE/DELETE if the relation does not
    1028                 :             :          * publish UPDATEs or DELETEs, or all the following conditions are
    1029                 :             :          * satisfied:
    1030                 :             :          *
    1031                 :             :          * 1. All columns, referenced in the row filters from publications which
    1032                 :             :          * the relation is in, are valid - i.e. when all referenced columns are
    1033                 :             :          * part of REPLICA IDENTITY.
    1034                 :             :          *
    1035                 :             :          * 2. All columns, referenced in the column lists are valid - i.e. when
    1036                 :             :          * all columns referenced in the REPLICA IDENTITY are covered by the
    1037                 :             :          * column list.
    1038                 :             :          *
    1039                 :             :          * 3. All generated columns in REPLICA IDENTITY of the relation, are valid
    1040                 :             :          * - i.e. when all these generated columns are published.
    1041                 :             :          *
    1042                 :             :          * XXX We could optimize it by first checking whether any of the
    1043                 :             :          * publications have a row filter or column list for this relation, or if
    1044                 :             :          * the relation contains a generated column. If none of these exist and
    1045                 :             :          * the relation has replica identity then we can avoid building the
    1046                 :             :          * descriptor but as this happens only one time it doesn't seem worth the
    1047                 :             :          * additional complexity.
    1048                 :             :          */
    1049                 :        3743 :         RelationBuildPublicationDesc(rel, &pubdesc);
    1050   [ +  +  +  + ]:        3743 :         if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
    1051   [ +  -  +  - ]:          10 :                 ereport(ERROR,
    1052                 :             :                                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1053                 :             :                                  errmsg("cannot update table \"%s\"",
    1054                 :             :                                                 RelationGetRelationName(rel)),
    1055                 :             :                                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
    1056   [ +  +  +  + ]:        3733 :         else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
    1057   [ +  -  +  - ]:          18 :                 ereport(ERROR,
    1058                 :             :                                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1059                 :             :                                  errmsg("cannot update table \"%s\"",
    1060                 :             :                                                 RelationGetRelationName(rel)),
    1061                 :             :                                  errdetail("Column list used by the publication does not cover the replica identity.")));
    1062   [ +  +  +  + ]:        3715 :         else if (cmd == CMD_UPDATE && !pubdesc.gencols_valid_for_update)
    1063   [ +  -  +  - ]:           4 :                 ereport(ERROR,
    1064                 :             :                                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1065                 :             :                                  errmsg("cannot update table \"%s\"",
    1066                 :             :                                                 RelationGetRelationName(rel)),
    1067                 :             :                                  errdetail("Replica identity must not contain unpublished generated columns.")));
    1068   [ +  +  +  - ]:        3711 :         else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
    1069   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1070                 :             :                                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1071                 :             :                                  errmsg("cannot delete from table \"%s\"",
    1072                 :             :                                                 RelationGetRelationName(rel)),
    1073                 :             :                                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
    1074   [ +  +  +  - ]:        3711 :         else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
    1075   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1076                 :             :                                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1077                 :             :                                  errmsg("cannot delete from table \"%s\"",
    1078                 :             :                                                 RelationGetRelationName(rel)),
    1079                 :             :                                  errdetail("Column list used by the publication does not cover the replica identity.")));
    1080   [ +  +  +  - ]:        3711 :         else if (cmd == CMD_DELETE && !pubdesc.gencols_valid_for_delete)
    1081   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1082                 :             :                                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1083                 :             :                                  errmsg("cannot delete from table \"%s\"",
    1084                 :             :                                                 RelationGetRelationName(rel)),
    1085                 :             :                                  errdetail("Replica identity must not contain unpublished generated columns.")));
    1086                 :             : 
    1087                 :             :         /* If relation has replica identity we are always good. */
    1088         [ +  + ]:        3711 :         if (OidIsValid(RelationGetReplicaIndex(rel)))
    1089                 :         659 :                 return;
    1090                 :             : 
    1091                 :             :         /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
    1092         [ +  + ]:        3052 :         if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
    1093                 :           5 :                 return;
    1094                 :             : 
    1095                 :             :         /*
    1096                 :             :          * This is UPDATE/DELETE and there is no replica identity.
    1097                 :             :          *
    1098                 :             :          * Check if the table publishes UPDATES or DELETES.
    1099                 :             :          */
    1100   [ +  +  +  + ]:        3047 :         if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
    1101   [ +  -  +  - ]:          19 :                 ereport(ERROR,
    1102                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1103                 :             :                                  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
    1104                 :             :                                                 RelationGetRelationName(rel)),
    1105                 :             :                                  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
    1106   [ +  +  +  + ]:        3028 :         else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
    1107   [ +  -  +  - ]:           1 :                 ereport(ERROR,
    1108                 :             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1109                 :             :                                  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
    1110                 :             :                                                 RelationGetRelationName(rel)),
    1111                 :             :                                  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
    1112         [ -  + ]:       11906 : }
    1113                 :             : 
    1114                 :             : 
    1115                 :             : /*
    1116                 :             :  * Check if we support writing into specific relkind of local relation and check
    1117                 :             :  * if it aligns with the relkind of the relation on the publisher.
    1118                 :             :  *
    1119                 :             :  * The nspname and relname are only needed for error reporting.
    1120                 :             :  */
    1121                 :             : void
    1122                 :           0 : CheckSubscriptionRelkind(char localrelkind, char remoterelkind,
    1123                 :             :                                                  const char *nspname, const char *relname)
    1124                 :             : {
    1125         [ #  # ]:           0 :         if (localrelkind != RELKIND_RELATION &&
    1126   [ #  #  #  # ]:           0 :                 localrelkind != RELKIND_PARTITIONED_TABLE &&
    1127                 :           0 :                 localrelkind != RELKIND_SEQUENCE)
    1128   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1129                 :             :                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1130                 :             :                                  errmsg("cannot use relation \"%s.%s\" as logical replication target",
    1131                 :             :                                                 nspname, relname),
    1132                 :             :                                  errdetail_relkind_not_supported(localrelkind)));
    1133                 :             : 
    1134                 :             :         /*
    1135                 :             :          * Allow RELKIND_RELATION and RELKIND_PARTITIONED_TABLE to be treated
    1136                 :             :          * interchangeably, but ensure that sequences (RELKIND_SEQUENCE) match
    1137                 :             :          * exactly on both publisher and subscriber.
    1138                 :             :          */
    1139   [ #  #  #  # ]:           0 :         if ((localrelkind == RELKIND_SEQUENCE && remoterelkind != RELKIND_SEQUENCE) ||
    1140         [ #  # ]:           0 :                 (localrelkind != RELKIND_SEQUENCE && remoterelkind == RELKIND_SEQUENCE))
    1141   [ #  #  #  # ]:           0 :                 ereport(ERROR,
    1142                 :             :                                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1143                 :             :                 /* translator: 3rd and 4th %s are "sequence" or "table" */
    1144                 :             :                                 errmsg("relation \"%s.%s\" type mismatch: source \"%s\", target \"%s\"",
    1145                 :             :                                            nspname, relname,
    1146                 :             :                                            remoterelkind == RELKIND_SEQUENCE ? "sequence" : "table",
    1147                 :             :                                            localrelkind == RELKIND_SEQUENCE ? "sequence" : "table"));
    1148                 :           0 : }
        

Generated by: LCOV version 2.3.2-1