For inplace update durability, make heap_update() callers wait.
authorNoah Misch
Tue, 24 Sep 2024 22:25:18 +0000 (15:25 -0700)
committerNoah Misch
Tue, 24 Sep 2024 22:25:18 +0000 (15:25 -0700)
The previous commit fixed some ways of losing an inplace update.  It
remained possible to lose one when a backend working toward a
heap_update() copied a tuple into memory just before inplace update of
that tuple.  In catalogs eligible for inplace update, use LOCKTAG_TUPLE
to govern admission to the steps of copying an old tuple, modifying it,
and issuing heap_update().  This includes MERGE commands.  To avoid
changing most of the pg_class DDL, don't require LOCKTAG_TUPLE when
holding a relation lock sufficient to exclude inplace updaters.
Back-patch to v12 (all supported versions).  In v13 and v12, "UPDATE
pg_class" or "UPDATE pg_database" can still lose an inplace update.  The
v14+ UPDATE fix needs commit 86dc90056dfdbd9d1b891718d2e5614e3e432f35,
and it wasn't worth reimplementing that fix without such infrastructure.

Reviewed by Nitin Motiani and (in earlier versions) Heikki Linnakangas.

Discussion: https://postgr.es/m/20231027214946[email protected]

20 files changed:
src/backend/access/heap/README.tuplock
src/backend/access/heap/heapam.c
src/backend/access/index/genam.c
src/backend/catalog/aclchk.c
src/backend/catalog/catalog.c
src/backend/commands/dbcommands.c
src/backend/commands/event_trigger.c
src/backend/commands/indexcmds.c
src/backend/commands/tablecmds.c
src/backend/executor/execMain.c
src/backend/executor/execReplication.c
src/backend/executor/nodeModifyTable.c
src/backend/utils/cache/relcache.c
src/backend/utils/cache/syscache.c
src/include/nodes/execnodes.h
src/include/storage/lockdefs.h
src/include/utils/syscache.h
src/test/isolation/expected/intra-grant-inplace.out
src/test/isolation/specs/eval-plan-qual.spec
src/test/isolation/specs/intra-grant-inplace.spec

index ddb2defd28bb2c79e620d3231d23218d92c0a468..818cd7f980601926ae98293dbcfbb6dd99a14172 100644 (file)
@@ -154,6 +154,48 @@ The following infomask bits are applicable:
 We currently never set the HEAP_XMAX_COMMITTED when the HEAP_XMAX_IS_MULTI bit
 is set.
 
+Locking to write inplace-updated tables
+---------------------------------------
+
+If IsInplaceUpdateRelation() returns true for a table, the table is a system
+catalog that receives systable_inplace_update_begin() calls.  Preparing a
+heap_update() of these tables follows additional locking rules, to ensure we
+don't lose the effects of an inplace update.  In particular, consider a moment
+when a backend has fetched the old tuple to modify, not yet having called
+heap_update().  Another backend's inplace update starting then can't conclude
+until the heap_update() places its new tuple in a buffer.  We enforce that
+using locktags as follows.  While DDL code is the main audience, the executor
+follows these rules to make e.g. "MERGE INTO pg_class" safer.  Locking rules
+are per-catalog:
+
+  pg_class systable_inplace_update_begin() callers: before the call, acquire a
+  lock on the relation in mode ShareUpdateExclusiveLock or stricter.  If the
+  update targets a row of RELKIND_INDEX (but not RELKIND_PARTITIONED_INDEX),
+  that lock must be on the table.  Locking the index rel is not necessary.
+  (This allows VACUUM to overwrite per-index pg_class while holding a lock on
+  the table alone.) systable_inplace_update_begin() acquires and releases
+  LOCKTAG_TUPLE in InplaceUpdateTupleLock, an alias for ExclusiveLock, on each
+  tuple it overwrites.
+
+  pg_class heap_update() callers: before copying the tuple to modify, take a
+  lock on the tuple, a ShareUpdateExclusiveLock on the relation, or a
+  ShareRowExclusiveLock or stricter on the relation.
+
+  SearchSysCacheLocked1() is one convenient way to acquire the tuple lock.
+  Most heap_update() callers already hold a suitable lock on the relation for
+  other reasons and can skip the tuple lock.  If you do acquire the tuple
+  lock, release it immediately after the update.
+
+
+  pg_database: before copying the tuple to modify, all updaters of pg_database
+  rows acquire LOCKTAG_TUPLE.  (Few updaters acquire LOCKTAG_OBJECT on the
+  database OID, so it wasn't worth extending that as a second option.)
+
+Ideally, DDL might want to perform permissions checks before LockTuple(), as
+we do with RangeVarGetRelidExtended() callbacks.  We typically don't bother.
+LOCKTAG_TUPLE acquirers release it after each row, so the potential
+inconvenience is lower.
+
 Reading inplace-updated columns
 -------------------------------
 
index ecc6eaa27c7185e40f8f6e03d7ad94030a0c8e53..da5e656a08de40ccbfdb68d783995a273b73b5fa 100644 (file)
@@ -40,6 +40,8 @@
 #include "access/valid.h"
 #include "access/visibilitymap.h"
 #include "access/xloginsert.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_database_d.h"
 #include "commands/vacuum.h"
 #include "pgstat.h"
 #include "port/pg_bitutils.h"
@@ -57,6 +59,12 @@ static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
                                  Buffer newbuf, HeapTuple oldtup,
                                  HeapTuple newtup, HeapTuple old_key_tuple,
                                  bool all_visible_cleared, bool new_all_visible_cleared);
+#ifdef USE_ASSERT_CHECKING
+static void check_lock_if_inplace_updateable_rel(Relation relation,
+                                                ItemPointer otid,
+                                                HeapTuple newtup);
+static void check_inplace_rel_lock(HeapTuple oldtup);
+#endif
 static Bitmapset *HeapDetermineColumnsInfo(Relation relation,
                                           Bitmapset *interesting_cols,
                                           Bitmapset *external_cols,
@@ -103,6 +111,8 @@ static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool ke
  * heavyweight lock mode and MultiXactStatus values to use for any particular
  * tuple lock strength.
  *
+ * These interact with InplaceUpdateTupleLock, an alias for ExclusiveLock.
+ *
  * Don't look at lockstatus/updstatus directly!  Use get_mxact_status_for_lock
  * instead.
  */
@@ -3189,6 +3199,10 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
                (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
                 errmsg("cannot update tuples during a parallel operation")));
 
+#ifdef USE_ASSERT_CHECKING
+   check_lock_if_inplace_updateable_rel(relation, otid, newtup);
+#endif
+
    /*
     * Fetch the list of attributes to be checked for various operations.
     *
@@ -4053,6 +4067,128 @@ l2:
    return TM_Ok;
 }
 
+#ifdef USE_ASSERT_CHECKING
+/*
+ * Confirm adequate lock held during heap_update(), per rules from
+ * README.tuplock section "Locking to write inplace-updated tables".
+ */
+static void
+check_lock_if_inplace_updateable_rel(Relation relation,
+                                    ItemPointer otid,
+                                    HeapTuple newtup)
+{
+   /* LOCKTAG_TUPLE acceptable for any catalog */
+   switch (RelationGetRelid(relation))
+   {
+       case RelationRelationId:
+       case DatabaseRelationId:
+           {
+               LOCKTAG     tuptag;
+
+               SET_LOCKTAG_TUPLE(tuptag,
+                                 relation->rd_lockInfo.lockRelId.dbId,
+                                 relation->rd_lockInfo.lockRelId.relId,
+                                 ItemPointerGetBlockNumber(otid),
+                                 ItemPointerGetOffsetNumber(otid));
+               if (LockHeldByMe(&tuptag, InplaceUpdateTupleLock, false))
+                   return;
+           }
+           break;
+       default:
+           Assert(!IsInplaceUpdateRelation(relation));
+           return;
+   }
+
+   switch (RelationGetRelid(relation))
+   {
+       case RelationRelationId:
+           {
+               /* LOCKTAG_TUPLE or LOCKTAG_RELATION ok */
+               Form_pg_class classForm = (Form_pg_class) GETSTRUCT(newtup);
+               Oid         relid = classForm->oid;
+               Oid         dbid;
+               LOCKTAG     tag;
+
+               if (IsSharedRelation(relid))
+                   dbid = InvalidOid;
+               else
+                   dbid = MyDatabaseId;
+
+               if (classForm->relkind == RELKIND_INDEX)
+               {
+                   Relation    irel = index_open(relid, AccessShareLock);
+
+                   SET_LOCKTAG_RELATION(tag, dbid, irel->rd_index->indrelid);
+                   index_close(irel, AccessShareLock);
+               }
+               else
+                   SET_LOCKTAG_RELATION(tag, dbid, relid);
+
+               if (!LockHeldByMe(&tag, ShareUpdateExclusiveLock, false) &&
+                   !LockHeldByMe(&tag, ShareRowExclusiveLock, true))
+                   elog(WARNING,
+                        "missing lock for relation \"%s\" (OID %u, relkind %c) @ TID (%u,%u)",
+                        NameStr(classForm->relname),
+                        relid,
+                        classForm->relkind,
+                        ItemPointerGetBlockNumber(otid),
+                        ItemPointerGetOffsetNumber(otid));
+           }
+           break;
+       case DatabaseRelationId:
+           {
+               /* LOCKTAG_TUPLE required */
+               Form_pg_database dbForm = (Form_pg_database) GETSTRUCT(newtup);
+
+               elog(WARNING,
+                    "missing lock on database \"%s\" (OID %u) @ TID (%u,%u)",
+                    NameStr(dbForm->datname),
+                    dbForm->oid,
+                    ItemPointerGetBlockNumber(otid),
+                    ItemPointerGetOffsetNumber(otid));
+           }
+           break;
+   }
+}
+
+/*
+ * Confirm adequate relation lock held, per rules from README.tuplock section
+ * "Locking to write inplace-updated tables".
+ */
+static void
+check_inplace_rel_lock(HeapTuple oldtup)
+{
+   Form_pg_class classForm = (Form_pg_class) GETSTRUCT(oldtup);
+   Oid         relid = classForm->oid;
+   Oid         dbid;
+   LOCKTAG     tag;
+
+   if (IsSharedRelation(relid))
+       dbid = InvalidOid;
+   else
+       dbid = MyDatabaseId;
+
+   if (classForm->relkind == RELKIND_INDEX)
+   {
+       Relation    irel = index_open(relid, AccessShareLock);
+
+       SET_LOCKTAG_RELATION(tag, dbid, irel->rd_index->indrelid);
+       index_close(irel, AccessShareLock);
+   }
+   else
+       SET_LOCKTAG_RELATION(tag, dbid, relid);
+
+   if (!LockHeldByMe(&tag, ShareUpdateExclusiveLock, true))
+       elog(WARNING,
+            "missing lock for relation \"%s\" (OID %u, relkind %c) @ TID (%u,%u)",
+            NameStr(classForm->relname),
+            relid,
+            classForm->relkind,
+            ItemPointerGetBlockNumber(&oldtup->t_self),
+            ItemPointerGetOffsetNumber(&oldtup->t_self));
+}
+#endif
+
 /*
  * Check if the specified attribute's values are the same.  Subroutine for
  * HeapDetermineColumnsInfo.
@@ -6070,15 +6206,21 @@ heap_inplace_lock(Relation relation,
    TM_Result   result;
    bool        ret;
 
+#ifdef USE_ASSERT_CHECKING
+   if (RelationGetRelid(relation) == RelationRelationId)
+       check_inplace_rel_lock(oldtup_ptr);
+#endif
+
    Assert(BufferIsValid(buffer));
 
+   LockTuple(relation, &oldtup.t_self, InplaceUpdateTupleLock);
    LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
    /*----------
     * Interpret HeapTupleSatisfiesUpdate() like heap_update() does, except:
     *
     * - wait unconditionally
-    * - no tuple locks
+    * - already locked tuple above, since inplace needs that unconditionally
     * - don't recheck header after wait: simpler to defer to next iteration
     * - don't try to continue even if the updater aborts: likewise
     * - no crosscheck
@@ -6162,7 +6304,10 @@ heap_inplace_lock(Relation relation,
     * don't bother optimizing that.
     */
    if (!ret)
+   {
+       UnlockTuple(relation, &oldtup.t_self, InplaceUpdateTupleLock);
        InvalidateCatalogSnapshot();
+   }
    return ret;
 }
 
@@ -6171,6 +6316,8 @@ heap_inplace_lock(Relation relation,
  *
  * The tuple cannot change size, and therefore its header fields and null
  * bitmap (if any) don't change either.
+ *
+ * Since we hold LOCKTAG_TUPLE, no updater has a local copy of this tuple.
  */
 void
 heap_inplace_update_and_unlock(Relation relation,
@@ -6254,6 +6401,7 @@ heap_inplace_unlock(Relation relation,
                    HeapTuple oldtup, Buffer buffer)
 {
    LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+   UnlockTuple(relation, &oldtup->t_self, InplaceUpdateTupleLock);
 }
 
 #define        FRM_NOOP                0x0001
index f3f6d4754e9279588ce0da88f236f432c5763cf0..52fde5cc4d4ce6dc4edd0e3b1b2a4cba06b66534 100644 (file)
@@ -765,7 +765,9 @@ systable_endscan_ordered(SysScanDesc sysscan)
  *
  * Overwriting violates both MVCC and transactional safety, so the uses of
  * this function in Postgres are extremely limited.  Nonetheless we find some
- * places to use it.  Standard flow:
+ * places to use it.  See README.tuplock section "Locking to write
+ * inplace-updated tables" and later sections for expectations of readers and
+ * writers of a table that gets inplace updates.  Standard flow:
  *
  * ... [any slow preparation not requiring oldtup] ...
  * systable_inplace_update_begin([...], &tup, &inplace_state);
index d2abc48fd8d1f586fa92d6c1e76e0355c78a1a19..819045203dd1b0cd752ed9078280e7b8df591248 100644 (file)
@@ -75,6 +75,7 @@
 #include "nodes/makefuncs.h"
 #include "parser/parse_func.h"
 #include "parser/parse_type.h"
+#include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/aclchk_internal.h"
 #include "utils/builtins.h"
@@ -1848,7 +1849,7 @@ ExecGrant_Relation(InternalGrant *istmt)
        HeapTuple   tuple;
        ListCell   *cell_colprivs;
 
-       tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
+       tuple = SearchSysCacheLocked1(RELOID, ObjectIdGetDatum(relOid));
        if (!HeapTupleIsValid(tuple))
            elog(ERROR, "cache lookup failed for relation %u", relOid);
        pg_class_tuple = (Form_pg_class) GETSTRUCT(tuple);
@@ -2060,6 +2061,7 @@ ExecGrant_Relation(InternalGrant *istmt)
                                         values, nulls, replaces);
 
            CatalogTupleUpdate(relation, &newtuple->t_self, newtuple);
+           UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock);
 
            /* Update initial privileges for extensions */
            recordExtensionInitPriv(relOid, RelationRelationId, 0, new_acl);
@@ -2072,6 +2074,8 @@ ExecGrant_Relation(InternalGrant *istmt)
 
            pfree(new_acl);
        }
+       else
+           UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock);
 
        /*
         * Handle column-level privileges, if any were specified or implied.
@@ -2185,7 +2189,7 @@ ExecGrant_common(InternalGrant *istmt, Oid classid, AclMode default_privs,
        Oid        *oldmembers;
        Oid        *newmembers;
 
-       tuple = SearchSysCache1(cacheid, ObjectIdGetDatum(objectid));
+       tuple = SearchSysCacheLocked1(cacheid, ObjectIdGetDatum(objectid));
        if (!HeapTupleIsValid(tuple))
            elog(ERROR, "cache lookup failed for %s %u", get_object_class_descr(classid), objectid);
 
@@ -2261,6 +2265,7 @@ ExecGrant_common(InternalGrant *istmt, Oid classid, AclMode default_privs,
                                     nulls, replaces);
 
        CatalogTupleUpdate(relation, &newtuple->t_self, newtuple);
+       UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock);
 
        /* Update initial privileges for extensions */
        recordExtensionInitPriv(objectid, classid, 0, new_acl);
index d6b07a786550897107ca638fff889203d7428923..cfe8c5104b6535ce882b709cdd93a3735c743729 100644 (file)
@@ -138,6 +138,15 @@ IsCatalogRelationOid(Oid relid)
 /*
  * IsInplaceUpdateRelation
  *     True iff core code performs inplace updates on the relation.
+ *
+ *     This is used for assertions and for making the executor follow the
+ *     locking protocol described at README.tuplock section "Locking to write
+ *     inplace-updated tables".  Extensions may inplace-update other heap
+ *     tables, but concurrent SQL UPDATE on the same table may overwrite
+ *     those modifications.
+ *
+ *     The executor can assume these are not partitions or partitioned and
+ *     have no triggers.
  */
 bool
 IsInplaceUpdateRelation(Relation relation)
index 40bfd09333764b0e4b77ca5259d2d868c0446b2f..aa91a396967c8a80c807023bf6d097aa68fe5d2f 100644 (file)
@@ -1877,6 +1877,7 @@ RenameDatabase(const char *oldname, const char *newname)
 {
    Oid         db_id;
    HeapTuple   newtup;
+   ItemPointerData otid;
    Relation    rel;
    int         notherbackends;
    int         npreparedxacts;
@@ -1948,11 +1949,13 @@ RenameDatabase(const char *oldname, const char *newname)
                 errdetail_busy_db(notherbackends, npreparedxacts)));
 
    /* rename */
-   newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
+   newtup = SearchSysCacheLockedCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
    if (!HeapTupleIsValid(newtup))
        elog(ERROR, "cache lookup failed for database %u", db_id);
+   otid = newtup->t_self;
    namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
-   CatalogTupleUpdate(rel, &newtup->t_self, newtup);
+   CatalogTupleUpdate(rel, &otid, newtup);
+   UnlockTuple(rel, &otid, InplaceUpdateTupleLock);
 
    InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2201,6 +2204,7 @@ movedb(const char *dbname, const char *tblspcname)
            ereport(ERROR,
                    (errcode(ERRCODE_UNDEFINED_DATABASE),
                     errmsg("database \"%s\" does not exist", dbname)));
+       LockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
 
        new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
        new_record_repl[Anum_pg_database_dattablespace - 1] = true;
@@ -2209,6 +2213,7 @@ movedb(const char *dbname, const char *tblspcname)
                                     new_record,
                                     new_record_nulls, new_record_repl);
        CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
+       UnlockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2439,6 +2444,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
        ereport(ERROR,
                (errcode(ERRCODE_UNDEFINED_DATABASE),
                 errmsg("database \"%s\" does not exist", stmt->dbname)));
+   LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
    datform = (Form_pg_database) GETSTRUCT(tuple);
    dboid = datform->oid;
@@ -2488,6 +2494,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
    newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
                                 new_record_nulls, new_record_repl);
    CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
+   UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
    InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
 
@@ -2537,6 +2544,7 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
    if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
        aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
                       stmt->dbname);
+   LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
    datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
    oldversion = isnull ? NULL : TextDatumGetCString(datum);
@@ -2565,6 +2573,7 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
        bool        nulls[Natts_pg_database] = {0};
        bool        replaces[Natts_pg_database] = {0};
        Datum       values[Natts_pg_database] = {0};
+       HeapTuple   newtuple;
 
        ereport(NOTICE,
                (errmsg("changing version from %s to %s",
@@ -2573,14 +2582,15 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
        values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
        replaces[Anum_pg_database_datcollversion - 1] = true;
 
-       tuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
-                                 values, nulls, replaces);
-       CatalogTupleUpdate(rel, &tuple->t_self, tuple);
-       heap_freetuple(tuple);
+       newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
+                                    values, nulls, replaces);
+       CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
+       heap_freetuple(newtuple);
    }
    else
        ereport(NOTICE,
                (errmsg("version has not changed")));
+   UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
    InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2692,6 +2702,8 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
                    (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                     errmsg("permission denied to change owner of database")));
 
+       LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
+
        repl_repl[Anum_pg_database_datdba - 1] = true;
        repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
 
@@ -2713,6 +2725,7 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
 
        newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
        CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
+       UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        heap_freetuple(newtuple);
 
index 55baf10c8566eed15c1a0e8fef10554675083998..05a6de68ba3b036c7d8559c4516a042a8209a35b 100644 (file)
@@ -388,6 +388,7 @@ SetDatabaseHasLoginEventTriggers(void)
    /* Set dathasloginevt flag in pg_database */
    Form_pg_database db;
    Relation    pg_db = table_open(DatabaseRelationId, RowExclusiveLock);
+   ItemPointerData otid;
    HeapTuple   tuple;
 
    /*
@@ -399,16 +400,18 @@ SetDatabaseHasLoginEventTriggers(void)
     */
    LockSharedObject(DatabaseRelationId, MyDatabaseId, 0, AccessExclusiveLock);
 
-   tuple = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(MyDatabaseId));
+   tuple = SearchSysCacheLockedCopy1(DATABASEOID, ObjectIdGetDatum(MyDatabaseId));
    if (!HeapTupleIsValid(tuple))
        elog(ERROR, "cache lookup failed for database %u", MyDatabaseId);
+   otid = tuple->t_self;
    db = (Form_pg_database) GETSTRUCT(tuple);
    if (!db->dathasloginevt)
    {
        db->dathasloginevt = true;
-       CatalogTupleUpdate(pg_db, &tuple->t_self, tuple);
+       CatalogTupleUpdate(pg_db, &otid, tuple);
        CommandCounterIncrement();
    }
+   UnlockTuple(pg_db, &otid, InplaceUpdateTupleLock);
    table_close(pg_db, RowExclusiveLock);
    heap_freetuple(tuple);
 }
index f99c2d2deec91387bd06eeb4bb85ca6c048c981b..ec5f253b28fe32b38d13292ad47aa035c0eda6b0 100644 (file)
@@ -4557,14 +4557,17 @@ update_relispartition(Oid relationId, bool newval)
 {
    HeapTuple   tup;
    Relation    classRel;
+   ItemPointerData otid;
 
    classRel = table_open(RelationRelationId, RowExclusiveLock);
-   tup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relationId));
+   tup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relationId));
    if (!HeapTupleIsValid(tup))
        elog(ERROR, "cache lookup failed for relation %u", relationId);
+   otid = tup->t_self;
    Assert(((Form_pg_class) GETSTRUCT(tup))->relispartition != newval);
    ((Form_pg_class) GETSTRUCT(tup))->relispartition = newval;
-   CatalogTupleUpdate(classRel, &tup->t_self, tup);
+   CatalogTupleUpdate(classRel, &otid, tup);
+   UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);
    heap_freetuple(tup);
    table_close(classRel, RowExclusiveLock);
 }
index d27e6cf3451a49bd907a42976310fbf1439140d8..87232e013760b9c0063a0bb9523f7f10bb12a739 100644 (file)
@@ -3593,6 +3593,7 @@ SetRelationTableSpace(Relation rel,
 {
    Relation    pg_class;
    HeapTuple   tuple;
+   ItemPointerData otid;
    Form_pg_class rd_rel;
    Oid         reloid = RelationGetRelid(rel);
 
@@ -3601,9 +3602,10 @@ SetRelationTableSpace(Relation rel,
    /* Get a modifiable copy of the relation's pg_class row. */
    pg_class = table_open(RelationRelationId, RowExclusiveLock);
 
-   tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(reloid));
+   tuple = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(reloid));
    if (!HeapTupleIsValid(tuple))
        elog(ERROR, "cache lookup failed for relation %u", reloid);
+   otid = tuple->t_self;
    rd_rel = (Form_pg_class) GETSTRUCT(tuple);
 
    /* Update the pg_class row. */
@@ -3611,7 +3613,8 @@ SetRelationTableSpace(Relation rel,
        InvalidOid : newTableSpaceId;
    if (RelFileNumberIsValid(newRelFilenumber))
        rd_rel->relfilenode = newRelFilenumber;
-   CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
+   CatalogTupleUpdate(pg_class, &otid, tuple);
+   UnlockTuple(pg_class, &otid, InplaceUpdateTupleLock);
 
    /*
     * Record dependency on tablespace.  This is only required for relations
@@ -4105,6 +4108,7 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bo
 {
    Relation    targetrelation;
    Relation    relrelation;    /* for RELATION relation */
+   ItemPointerData otid;
    HeapTuple   reltup;
    Form_pg_class relform;
    Oid         namespaceId;
@@ -4127,7 +4131,8 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bo
     */
    relrelation = table_open(RelationRelationId, RowExclusiveLock);
 
-   reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(myrelid));
+   reltup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(myrelid));
+   otid = reltup->t_self;
    if (!HeapTupleIsValid(reltup))  /* shouldn't happen */
        elog(ERROR, "cache lookup failed for relation %u", myrelid);
    relform = (Form_pg_class) GETSTRUCT(reltup);
@@ -4154,7 +4159,8 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bo
     */
    namestrcpy(&(relform->relname), newrelname);
 
-   CatalogTupleUpdate(relrelation, &reltup->t_self, reltup);
+   CatalogTupleUpdate(relrelation, &otid, reltup);
+   UnlockTuple(relrelation, &otid, InplaceUpdateTupleLock);
 
    InvokeObjectPostAlterHookArg(RelationRelationId, myrelid, 0,
                                 InvalidOid, is_internal);
@@ -15072,7 +15078,7 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
 
    /* Fetch heap tuple */
    relid = RelationGetRelid(rel);
-   tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+   tuple = SearchSysCacheLocked1(RELOID, ObjectIdGetDatum(relid));
    if (!HeapTupleIsValid(tuple))
        elog(ERROR, "cache lookup failed for relation %u", relid);
 
@@ -15176,6 +15182,7 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
                                 repl_val, repl_null, repl_repl);
 
    CatalogTupleUpdate(pgclass, &newtuple->t_self, newtuple);
+   UnlockTuple(pgclass, &tuple->t_self, InplaceUpdateTupleLock);
 
    InvokeObjectPostAlterHook(RelationRelationId, RelationGetRelid(rel), 0);
 
@@ -17329,7 +17336,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
    ObjectAddress thisobj;
    bool        already_done = false;
 
-   classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relOid));
+   /* no rel lock for relkind=c so use LOCKTAG_TUPLE */
+   classTup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relOid));
    if (!HeapTupleIsValid(classTup))
        elog(ERROR, "cache lookup failed for relation %u", relOid);
    classForm = (Form_pg_class) GETSTRUCT(classTup);
@@ -17348,6 +17356,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
    already_done = object_address_present(&thisobj, objsMoved);
    if (!already_done && oldNspOid != newNspOid)
    {
+       ItemPointerData otid = classTup->t_self;
+
        /* check for duplicate name (more friendly than unique-index failure) */
        if (get_relname_relid(NameStr(classForm->relname),
                              newNspOid) != InvalidOid)
@@ -17360,7 +17370,9 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
        /* classTup is a copy, so OK to scribble on */
        classForm->relnamespace = newNspOid;
 
-       CatalogTupleUpdate(classRel, &classTup->t_self, classTup);
+       CatalogTupleUpdate(classRel, &otid, classTup);
+       UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);
+
 
        /* Update dependency on schema if caller said so */
        if (hasDependEntry &&
@@ -17372,6 +17384,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
            elog(ERROR, "could not change schema dependency for relation \"%s\"",
                 NameStr(classForm->relname));
    }
+   else
+       UnlockTuple(classRel, &classTup->t_self, InplaceUpdateTupleLock);
    if (!already_done)
    {
        add_exact_object_address(&thisobj, objsMoved);
index 713cf3e802c10283e4bc7c7045c8a81a2810c0a9..728cdee480ba81f2b7e1ae38c67850c6d92d45df 100644 (file)
@@ -1036,6 +1036,10 @@ CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation,
    Relation    resultRel = resultRelInfo->ri_RelationDesc;
    FdwRoutine *fdwroutine;
 
+   /* Expect a fully-formed ResultRelInfo from InitResultRelInfo(). */
+   Assert(resultRelInfo->ri_needLockTagTuple ==
+          IsInplaceUpdateRelation(resultRel));
+
    switch (resultRel->rd_rel->relkind)
    {
        case RELKIND_RELATION:
@@ -1216,6 +1220,8 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
    resultRelInfo->ri_NumIndices = 0;
    resultRelInfo->ri_IndexRelationDescs = NULL;
    resultRelInfo->ri_IndexRelationInfo = NULL;
+   resultRelInfo->ri_needLockTagTuple =
+       IsInplaceUpdateRelation(resultRelationDesc);
    /* make a copy so as not to depend on relcache info not changing... */
    resultRelInfo->ri_TrigDesc = CopyTriggerDesc(resultRelationDesc->trigdesc);
    if (resultRelInfo->ri_TrigDesc)
index 1086cbc9624766c739589517731a59ea4a91a45d..54025c9f150903cbd6ec7b4b50222079898d4e3f 100644 (file)
@@ -661,8 +661,12 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
    Relation    rel = resultRelInfo->ri_RelationDesc;
    ItemPointer tid = &(searchslot->tts_tid);
 
-   /* For now we support only tables. */
+   /*
+    * We support only non-system tables, with
+    * check_publication_add_relation() accountable.
+    */
    Assert(rel->rd_rel->relkind == RELKIND_RELATION);
+   Assert(!IsCatalogRelation(rel));
 
    CheckCmdReplicaIdentity(rel, CMD_UPDATE);
 
index 8bf4c80d4a01b99869f951708785959718d1ab94..1161520f76bbd2a7821c4ecb99155c1a61de9b96 100644 (file)
@@ -2324,6 +2324,8 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
    }
    else
    {
+       ItemPointerData lockedtid;
+
        /*
         * If we generate a new candidate tuple after EvalPlanQual testing, we
         * must loop back here to try again.  (We don't need to redo triggers,
@@ -2332,6 +2334,7 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
         * to do them again.)
         */
 redo_act:
+       lockedtid = *tupleid;
        result = ExecUpdateAct(context, resultRelInfo, tupleid, oldtuple, slot,
                               canSetTag, &updateCxt);
 
@@ -2425,6 +2428,14 @@ redo_act:
                                ExecInitUpdateProjection(context->mtstate,
                                                         resultRelInfo);
 
+                           if (resultRelInfo->ri_needLockTagTuple)
+                           {
+                               UnlockTuple(resultRelationDesc,
+                                           &lockedtid, InplaceUpdateTupleLock);
+                               LockTuple(resultRelationDesc,
+                                         tupleid, InplaceUpdateTupleLock);
+                           }
+
                            /* Fetch the most recent version of old tuple. */
                            oldSlot = resultRelInfo->ri_oldTupleSlot;
                            if (!table_tuple_fetch_row_version(resultRelationDesc,
@@ -2529,6 +2540,14 @@ ExecOnConflictUpdate(ModifyTableContext *context,
    TransactionId xmin;
    bool        isnull;
 
+   /*
+    * Parse analysis should have blocked ON CONFLICT for all system
+    * relations, which includes these.  There's no fundamental obstacle to
+    * supporting this; we'd just need to handle LOCKTAG_TUPLE like the other
+    * ExecUpdate() caller.
+    */
+   Assert(!resultRelInfo->ri_needLockTagTuple);
+
    /* Determine lock mode to use */
    lockmode = ExecUpdateLockMode(context->estate, resultRelInfo);
 
@@ -2854,6 +2873,7 @@ ExecMergeMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 {
    ModifyTableState *mtstate = context->mtstate;
    List      **mergeActions = resultRelInfo->ri_MergeActions;
+   ItemPointerData lockedtid;
    List       *actionStates;
    TupleTableSlot *newslot = NULL;
    TupleTableSlot *rslot = NULL;
@@ -2890,14 +2910,32 @@ ExecMergeMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
     * target wholerow junk attr.
     */
    Assert(tupleid != NULL || oldtuple != NULL);
+   ItemPointerSetInvalid(&lockedtid);
    if (oldtuple != NULL)
+   {
+       Assert(!resultRelInfo->ri_needLockTagTuple);
        ExecForceStoreHeapTuple(oldtuple, resultRelInfo->ri_oldTupleSlot,
                                false);
-   else if (!table_tuple_fetch_row_version(resultRelInfo->ri_RelationDesc,
-                                           tupleid,
-                                           SnapshotAny,
-                                           resultRelInfo->ri_oldTupleSlot))
-       elog(ERROR, "failed to fetch the target tuple");
+   }
+   else
+   {
+       if (resultRelInfo->ri_needLockTagTuple)
+       {
+           /*
+            * This locks even for CMD_DELETE, for CMD_NOTHING, and for tuples
+            * that don't match mas_whenqual.  MERGE on system catalogs is a
+            * minor use case, so don't bother optimizing those.
+            */
+           LockTuple(resultRelInfo->ri_RelationDesc, tupleid,
+                     InplaceUpdateTupleLock);
+           lockedtid = *tupleid;
+       }
+       if (!table_tuple_fetch_row_version(resultRelInfo->ri_RelationDesc,
+                                          tupleid,
+                                          SnapshotAny,
+                                          resultRelInfo->ri_oldTupleSlot))
+           elog(ERROR, "failed to fetch the target tuple");
+   }
 
    /*
     * Test the join condition.  If it's satisfied, perform a MATCHED action.
@@ -2969,7 +3007,7 @@ lmerge_matched:
                                        tupleid, NULL, newslot, &result))
                {
                    if (result == TM_Ok)
-                       return NULL;    /* "do nothing" */
+                       goto out;   /* "do nothing" */
 
                    break;      /* concurrent update/delete */
                }
@@ -2980,11 +3018,11 @@ lmerge_matched:
                {
                    if (!ExecIRUpdateTriggers(estate, resultRelInfo,
                                              oldtuple, newslot))
-                       return NULL;    /* "do nothing" */
+                       goto out;   /* "do nothing" */
                }
                else
                {
-                   /* called table_tuple_fetch_row_version() above */
+                   /* checked ri_needLockTagTuple above */
                    Assert(oldtuple == NULL);
 
                    result = ExecUpdateAct(context, resultRelInfo, tupleid,
@@ -3003,7 +3041,8 @@ lmerge_matched:
                    if (updateCxt.crossPartUpdate)
                    {
                        mtstate->mt_merge_updated += 1;
-                       return context->cpUpdateReturningSlot;
+                       rslot = context->cpUpdateReturningSlot;
+                       goto out;
                    }
                }
 
@@ -3021,7 +3060,7 @@ lmerge_matched:
                                        NULL, NULL, &result))
                {
                    if (result == TM_Ok)
-                       return NULL;    /* "do nothing" */
+                       goto out;   /* "do nothing" */
 
                    break;      /* concurrent update/delete */
                }
@@ -3032,11 +3071,11 @@ lmerge_matched:
                {
                    if (!ExecIRDeleteTriggers(estate, resultRelInfo,
                                              oldtuple))
-                       return NULL;    /* "do nothing" */
+                       goto out;   /* "do nothing" */
                }
                else
                {
-                   /* called table_tuple_fetch_row_version() above */
+                   /* checked ri_needLockTagTuple above */
                    Assert(oldtuple == NULL);
 
                    result = ExecDeleteAct(context, resultRelInfo, tupleid,
@@ -3118,7 +3157,7 @@ lmerge_matched:
                 * let caller handle it under NOT MATCHED [BY TARGET] clauses.
                 */
                *matched = false;
-               return NULL;
+               goto out;
 
            case TM_Updated:
                {
@@ -3192,7 +3231,7 @@ lmerge_matched:
                                 * more to do.
                                 */
                                if (TupIsNull(epqslot))
-                                   return NULL;
+                                   goto out;
 
                                /*
                                 * If we got a NULL ctid from the subplan, the
@@ -3210,6 +3249,15 @@ lmerge_matched:
                                 * we need to switch to the NOT MATCHED BY
                                 * SOURCE case.
                                 */
+                               if (resultRelInfo->ri_needLockTagTuple)
+                               {
+                                   if (ItemPointerIsValid(&lockedtid))
+                                       UnlockTuple(resultRelInfo->ri_RelationDesc, &lockedtid,
+                                                   InplaceUpdateTupleLock);
+                                   LockTuple(resultRelInfo->ri_RelationDesc, &context->tmfd.ctid,
+                                             InplaceUpdateTupleLock);
+                                   lockedtid = context->tmfd.ctid;
+                               }
                                if (!table_tuple_fetch_row_version(resultRelationDesc,
                                                                   &context->tmfd.ctid,
                                                                   SnapshotAny,
@@ -3238,7 +3286,7 @@ lmerge_matched:
                             * MATCHED [BY TARGET] actions
                             */
                            *matched = false;
-                           return NULL;
+                           goto out;
 
                        case TM_SelfModified:
 
@@ -3266,13 +3314,13 @@ lmerge_matched:
 
                            /* This shouldn't happen */
                            elog(ERROR, "attempted to update or delete invisible tuple");
-                           return NULL;
+                           goto out;
 
                        default:
                            /* see table_tuple_lock call in ExecDelete() */
                            elog(ERROR, "unexpected table_tuple_lock status: %u",
                                 result);
-                           return NULL;
+                           goto out;
                    }
                }
 
@@ -3319,6 +3367,10 @@ lmerge_matched:
    /*
     * Successfully executed an action or no qualifying action was found.
     */
+out:
+   if (ItemPointerIsValid(&lockedtid))
+       UnlockTuple(resultRelInfo->ri_RelationDesc, &lockedtid,
+                   InplaceUpdateTupleLock);
    return rslot;
 }
 
@@ -3770,6 +3822,7 @@ ExecModifyTable(PlanState *pstate)
    HeapTupleData oldtupdata;
    HeapTuple   oldtuple;
    ItemPointer tupleid;
+   bool        tuplock;
 
    CHECK_FOR_INTERRUPTS();
 
@@ -4082,6 +4135,8 @@ ExecModifyTable(PlanState *pstate)
                break;
 
            case CMD_UPDATE:
+               tuplock = false;
+
                /* Initialize projection info if first time for this table */
                if (unlikely(!resultRelInfo->ri_projectNewInfoValid))
                    ExecInitUpdateProjection(node, resultRelInfo);
@@ -4093,6 +4148,7 @@ ExecModifyTable(PlanState *pstate)
                oldSlot = resultRelInfo->ri_oldTupleSlot;
                if (oldtuple != NULL)
                {
+                   Assert(!resultRelInfo->ri_needLockTagTuple);
                    /* Use the wholerow junk attr as the old tuple. */
                    ExecForceStoreHeapTuple(oldtuple, oldSlot, false);
                }
@@ -4101,6 +4157,11 @@ ExecModifyTable(PlanState *pstate)
                    /* Fetch the most recent version of old tuple. */
                    Relation    relation = resultRelInfo->ri_RelationDesc;
 
+                   if (resultRelInfo->ri_needLockTagTuple)
+                   {
+                       LockTuple(relation, tupleid, InplaceUpdateTupleLock);
+                       tuplock = true;
+                   }
                    if (!table_tuple_fetch_row_version(relation, tupleid,
                                                       SnapshotAny,
                                                       oldSlot))
@@ -4112,6 +4173,9 @@ ExecModifyTable(PlanState *pstate)
                /* Now apply the update. */
                slot = ExecUpdate(&context, resultRelInfo, tupleid, oldtuple,
                                  slot, node->canSetTag);
+               if (tuplock)
+                   UnlockTuple(resultRelInfo->ri_RelationDesc, tupleid,
+                               InplaceUpdateTupleLock);
                break;
 
            case CMD_DELETE:
index 5b6b7b809c02fbea2f658cd256e5992658b7a490..c326f687eb4f5ee65a8e19059dc86ce8c816a54c 100644 (file)
@@ -3768,6 +3768,7 @@ RelationSetNewRelfilenumber(Relation relation, char persistence)
 {
    RelFileNumber newrelfilenumber;
    Relation    pg_class;
+   ItemPointerData otid;
    HeapTuple   tuple;
    Form_pg_class classform;
    MultiXactId minmulti = InvalidMultiXactId;
@@ -3810,11 +3811,12 @@ RelationSetNewRelfilenumber(Relation relation, char persistence)
     */
    pg_class = table_open(RelationRelationId, RowExclusiveLock);
 
-   tuple = SearchSysCacheCopy1(RELOID,
-                               ObjectIdGetDatum(RelationGetRelid(relation)));
+   tuple = SearchSysCacheLockedCopy1(RELOID,
+                                     ObjectIdGetDatum(RelationGetRelid(relation)));
    if (!HeapTupleIsValid(tuple))
        elog(ERROR, "could not find tuple for relation %u",
             RelationGetRelid(relation));
+   otid = tuple->t_self;
    classform = (Form_pg_class) GETSTRUCT(tuple);
 
    /*
@@ -3934,9 +3936,10 @@ RelationSetNewRelfilenumber(Relation relation, char persistence)
        classform->relminmxid = minmulti;
        classform->relpersistence = persistence;
 
-       CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
+       CatalogTupleUpdate(pg_class, &otid, tuple);
    }
 
+   UnlockTuple(pg_class, &otid, InplaceUpdateTupleLock);
    heap_freetuple(tuple);
 
    table_close(pg_class, RowExclusiveLock);
index 3e03dfc9910e3d5c316f844638286e4831985f68..50c9440f7923e8a3b473b8fdefe6d2e47f13c385 100644 (file)
 #include "catalog/pg_shseclabel_d.h"
 #include "common/int.h"
 #include "lib/qunique.h"
+#include "miscadmin.h"
+#include "storage/lmgr.h"
 #include "utils/catcache.h"
+#include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
@@ -268,6 +271,98 @@ ReleaseSysCache(HeapTuple tuple)
    ReleaseCatCache(tuple);
 }
 
+/*
+ * SearchSysCacheLocked1
+ *
+ * Combine SearchSysCache1() with acquiring a LOCKTAG_TUPLE at mode
+ * InplaceUpdateTupleLock.  This is a tool for complying with the
+ * README.tuplock section "Locking to write inplace-updated tables".  After
+ * the caller's heap_update(), it should UnlockTuple(InplaceUpdateTupleLock)
+ * and ReleaseSysCache().
+ *
+ * The returned tuple may be the subject of an uncommitted update, so this
+ * doesn't prevent the "tuple concurrently updated" error.
+ */
+HeapTuple
+SearchSysCacheLocked1(int cacheId,
+                     Datum key1)
+{
+   ItemPointerData tid;
+   LOCKTAG     tag;
+   Oid         dboid =
+       SysCache[cacheId]->cc_relisshared ? InvalidOid : MyDatabaseId;
+   Oid         reloid = cacheinfo[cacheId].reloid;
+
+   /*----------
+    * Since inplace updates may happen just before our LockTuple(), we must
+    * return content acquired after LockTuple() of the TID we return.  If we
+    * just fetched twice instead of looping, the following sequence would
+    * defeat our locking:
+    *
+    * GRANT:   SearchSysCache1() = TID (1,5)
+    * GRANT:   LockTuple(pg_class, (1,5))
+    * [no more inplace update of (1,5) until we release the lock]
+    * CLUSTER: SearchSysCache1() = TID (1,5)
+    * CLUSTER: heap_update() = TID (1,8)
+    * CLUSTER: COMMIT
+    * GRANT:   SearchSysCache1() = TID (1,8)
+    * GRANT:   return (1,8) from SearchSysCacheLocked1()
+    * VACUUM:  SearchSysCache1() = TID (1,8)
+    * VACUUM:  LockTuple(pg_class, (1,8))  # two TIDs now locked for one rel
+    * VACUUM:  inplace update
+    * GRANT:   heap_update() = (1,9)  # lose inplace update
+    *
+    * In the happy case, this takes two fetches, one to determine the TID to
+    * lock and another to get the content and confirm the TID didn't change.
+    *
+    * This is valid even if the row gets updated to a new TID, the old TID
+    * becomes LP_UNUSED, and the row gets updated back to its old TID.  We'd
+    * still hold the right LOCKTAG_TUPLE and a copy of the row captured after
+    * the LOCKTAG_TUPLE.
+    */
+   ItemPointerSetInvalid(&tid);
+   for (;;)
+   {
+       HeapTuple   tuple;
+       LOCKMODE    lockmode = InplaceUpdateTupleLock;
+
+       tuple = SearchSysCache1(cacheId, key1);
+       if (ItemPointerIsValid(&tid))
+       {
+           if (!HeapTupleIsValid(tuple))
+           {
+               LockRelease(&tag, lockmode, false);
+               return tuple;
+           }
+           if (ItemPointerEquals(&tid, &tuple->t_self))
+               return tuple;
+           LockRelease(&tag, lockmode, false);
+       }
+       else if (!HeapTupleIsValid(tuple))
+           return tuple;
+
+       tid = tuple->t_self;
+       ReleaseSysCache(tuple);
+       /* like: LockTuple(rel, &tid, lockmode) */
+       SET_LOCKTAG_TUPLE(tag, dboid, reloid,
+                         ItemPointerGetBlockNumber(&tid),
+                         ItemPointerGetOffsetNumber(&tid));
+       (void) LockAcquire(&tag, lockmode, false, false);
+
+       /*
+        * If an inplace update just finished, ensure we process the syscache
+        * inval.  XXX this is insufficient: the inplace updater may not yet
+        * have reached AtEOXact_Inval().  See test at inplace-inval.spec.
+        *
+        * If a heap_update() call just released its LOCKTAG_TUPLE, we'll
+        * probably find the old tuple and reach "tuple concurrently updated".
+        * If that heap_update() aborts, our LOCKTAG_TUPLE blocks inplace
+        * updates while our caller works.
+        */
+       AcceptInvalidationMessages();
+   }
+}
+
 /*
  * SearchSysCacheCopy
  *
@@ -294,6 +389,28 @@ SearchSysCacheCopy(int cacheId,
    return newtuple;
 }
 
+/*
+ * SearchSysCacheLockedCopy1
+ *
+ * Meld SearchSysCacheLockedCopy1 with SearchSysCacheCopy().  After the
+ * caller's heap_update(), it should UnlockTuple(InplaceUpdateTupleLock) and
+ * heap_freetuple().
+ */
+HeapTuple
+SearchSysCacheLockedCopy1(int cacheId,
+                         Datum key1)
+{
+   HeapTuple   tuple,
+               newtuple;
+
+   tuple = SearchSysCacheLocked1(cacheId, key1);
+   if (!HeapTupleIsValid(tuple))
+       return tuple;
+   newtuple = heap_copytuple(tuple);
+   ReleaseSysCache(tuple);
+   return newtuple;
+}
+
 /*
  * SearchSysCacheExists
  *
index 88467977f89de09c334b815140a8d28524fc63c6..aab59d681cf5c0fd6908b23b868cd0b018ce2957 100644 (file)
@@ -485,6 +485,9 @@ typedef struct ResultRelInfo
    /* Have the projection and the slots above been initialized? */
    bool        ri_projectNewInfoValid;
 
+   /* updates do LockTuple() before oldtup read; see README.tuplock */
+   bool        ri_needLockTagTuple;
+
    /* triggers to be fired, if any */
    TriggerDesc *ri_TrigDesc;
 
index 934ba84f6a26e21740322801981eddac14cbc09d..810b297edf95b2e7cfdf041694ed51ac97d1a91b 100644 (file)
@@ -47,6 +47,8 @@ typedef int LOCKMODE;
 
 #define MaxLockMode                8   /* highest standard lock mode */
 
+/* See README.tuplock section "Locking to write inplace-updated tables" */
+#define InplaceUpdateTupleLock ExclusiveLock
 
 /* WAL representation of an AccessExclusiveLock on a table */
 typedef struct xl_standby_lock
index 03a27dd0a83f4134e6638cdc42cf8c7014e1a38b..b541911c8fc27c10cc6bdf7d24645ab554a30881 100644 (file)
@@ -43,9 +43,14 @@ extern HeapTuple SearchSysCache4(int cacheId,
 
 extern void ReleaseSysCache(HeapTuple tuple);
 
+extern HeapTuple SearchSysCacheLocked1(int cacheId,
+                                      Datum key1);
+
 /* convenience routines */
 extern HeapTuple SearchSysCacheCopy(int cacheId,
                                    Datum key1, Datum key2, Datum key3, Datum key4);
+extern HeapTuple SearchSysCacheLockedCopy1(int cacheId,
+                                          Datum key1);
 extern bool SearchSysCacheExists(int cacheId,
                                 Datum key1, Datum key2, Datum key3, Datum key4);
 extern Oid GetSysCacheOid(int cacheId, AttrNumber oidcol,
index fe26984c0e04700c5013ed85fbe4719dbbf1fb6a..b5fe8b06f76184148bac4efeb9736c0d3da0d5ab 100644 (file)
@@ -100,7 +100,7 @@ f
 step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
 step c2: COMMIT;
 
-starting permutation: b3 sfu3 b1 grant1 read2 as3 addk2 r3 c1 read2
+starting permutation: b3 sfu3 b1 grant1 read2 addk2 r3 c1 read2
 step b3: BEGIN ISOLATION LEVEL READ COMMITTED;
 step sfu3: 
    SELECT relhasindex FROM pg_class
@@ -124,7 +124,6 @@ relhasindex
 f          
 (1 row)
 
-step as3: LOCK TABLE intra_grant_inplace IN ACCESS SHARE MODE;
 step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); 
 step r3: ROLLBACK;
 step grant1: <... completed>
@@ -155,9 +154,11 @@ step b1: BEGIN;
 step grant1: 
    GRANT SELECT ON intra_grant_inplace TO PUBLIC;
  
-step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
-step c2: COMMIT;
+step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); 
+step addk2: <... completed>
+ERROR:  deadlock detected
 step grant1: <... completed>
+step c2: COMMIT;
 step c1: COMMIT;
 step read2: 
    SELECT relhasindex FROM pg_class
@@ -195,9 +196,8 @@ relhasindex
 f          
 (1 row)
 
-s4: WARNING:  got: tuple concurrently updated
-step revoke4: <... completed>
 step r3: ROLLBACK;
+step revoke4: <... completed>
 
 starting permutation: b1 drop1 b3 sfu3 revoke4 c1 r3
 step b1: BEGIN;
@@ -224,6 +224,6 @@ relhasindex
 -----------
 (0 rows)
 
-s4: WARNING:  got: tuple concurrently deleted
+s4: WARNING:  got: cache lookup failed for relation REDACTED
 step revoke4: <... completed>
 step r3: ROLLBACK;
index 3a74406f4d98a8af6046aa93885f0a142734bf8b..07307e623e47314a5d109608ab3f5f89ae5d661b 100644 (file)
@@ -194,7 +194,7 @@ step simplepartupdate_noroute {
    update parttbl set b = 2 where c = 1 returning *;
 }
 
-# test system class updates
+# test system class LockTuple()
 
 step sys1  {
    UPDATE pg_class SET reltuples = 123 WHERE oid = 'accounts'::regclass;
index d07ed3bb2cc8f9d3b04c14c80364de0f822e3142..2992c85b44ddaf46ba0476a6a2199e3db63c213e 100644 (file)
@@ -14,6 +14,7 @@ teardown
 
 # heap_update()
 session s1
+setup  { SET deadlock_timeout = '100s'; }
 step b1    { BEGIN; }
 step grant1    {
    GRANT SELECT ON intra_grant_inplace TO PUBLIC;
@@ -25,6 +26,7 @@ step c1   { COMMIT; }
 
 # inplace update
 session s2
+setup  { SET deadlock_timeout = '10ms'; }
 step read2 {
    SELECT relhasindex FROM pg_class
    WHERE oid = 'intra_grant_inplace'::regclass;
@@ -48,7 +50,6 @@ step sfu3 {
    SELECT relhasindex FROM pg_class
    WHERE oid = 'intra_grant_inplace'::regclass FOR UPDATE;
 }
-step as3   { LOCK TABLE intra_grant_inplace IN ACCESS SHARE MODE; }
 step r3    { ROLLBACK; }
 
 # Additional heap_update()
@@ -74,8 +75,6 @@ step keyshr5  {
 teardown   { ROLLBACK; }
 
 
-# XXX extant bugs: permutation comments refer to planned future LockTuple()
-
 permutation
    b1
    grant1
@@ -118,7 +117,6 @@ permutation
    b1
    grant1(r3)  # acquire LockTuple(), await sfu3 xmax
    read2
-   as3         # XXX temporary until patch adds locking to addk2
    addk2(c1)   # block in LockTuple() behind grant1
    r3          # unblock grant1; addk2 now awaits grant1 xmax
    c1
@@ -128,8 +126,8 @@ permutation
    b2
    sfnku2
    b1
-   grant1(c2)      # acquire LockTuple(), await sfnku2 xmax
-   addk2           # block in LockTuple() behind grant1 = deadlock
+   grant1(addk2)   # acquire LockTuple(), await sfnku2 xmax
+   addk2(*)        # block in LockTuple() behind grant1 = deadlock
    c2
    c1
    read2
@@ -140,7 +138,7 @@ permutation
    grant1
    b3
    sfu3(c1)    # acquire LockTuple(), await grant1 xmax
-   revoke4(sfu3)   # block in LockTuple() behind sfu3
+   revoke4(r3) # block in LockTuple() behind sfu3
    c1
    r3          # revoke4 unlocks old tuple and finds new