For inplace update durability, make heap_update() callers wait.
authorNoah Misch
Tue, 24 Sep 2024 22:25:18 +0000 (15:25 -0700)
committerNoah Misch
Tue, 24 Sep 2024 22:25:23 +0000 (15:25 -0700)
The previous commit fixed some ways of losing an inplace update.  It
remained possible to lose one when a backend working toward a
heap_update() copied a tuple into memory just before inplace update of
that tuple.  In catalogs eligible for inplace update, use LOCKTAG_TUPLE
to govern admission to the steps of copying an old tuple, modifying it,
and issuing heap_update().  This includes MERGE commands.  To avoid
changing most of the pg_class DDL, don't require LOCKTAG_TUPLE when
holding a relation lock sufficient to exclude inplace updaters.
Back-patch to v12 (all supported versions).  In v13 and v12, "UPDATE
pg_class" or "UPDATE pg_database" can still lose an inplace update.  The
v14+ UPDATE fix needs commit 86dc90056dfdbd9d1b891718d2e5614e3e432f35,
and it wasn't worth reimplementing that fix without such infrastructure.

Reviewed by Nitin Motiani and (in earlier versions) Heikki Linnakangas.

Discussion: https://postgr.es/m/20231027214946[email protected]

19 files changed:
src/backend/access/heap/README.tuplock
src/backend/access/heap/heapam.c
src/backend/access/index/genam.c
src/backend/catalog/aclchk.c
src/backend/catalog/catalog.c
src/backend/commands/dbcommands.c
src/backend/commands/indexcmds.c
src/backend/commands/tablecmds.c
src/backend/executor/execMain.c
src/backend/executor/execReplication.c
src/backend/executor/nodeModifyTable.c
src/backend/utils/cache/relcache.c
src/backend/utils/cache/syscache.c
src/include/nodes/execnodes.h
src/include/storage/lockdefs.h
src/include/utils/syscache.h
src/test/isolation/expected/intra-grant-inplace.out
src/test/isolation/specs/eval-plan-qual.spec
src/test/isolation/specs/intra-grant-inplace.spec

index ddb2defd28bb2c79e620d3231d23218d92c0a468..818cd7f980601926ae98293dbcfbb6dd99a14172 100644 (file)
@@ -154,6 +154,48 @@ The following infomask bits are applicable:
 We currently never set the HEAP_XMAX_COMMITTED when the HEAP_XMAX_IS_MULTI bit
 is set.
 
+Locking to write inplace-updated tables
+---------------------------------------
+
+If IsInplaceUpdateRelation() returns true for a table, the table is a system
+catalog that receives systable_inplace_update_begin() calls.  Preparing a
+heap_update() of these tables follows additional locking rules, to ensure we
+don't lose the effects of an inplace update.  In particular, consider a moment
+when a backend has fetched the old tuple to modify, not yet having called
+heap_update().  Another backend's inplace update starting then can't conclude
+until the heap_update() places its new tuple in a buffer.  We enforce that
+using locktags as follows.  While DDL code is the main audience, the executor
+follows these rules to make e.g. "MERGE INTO pg_class" safer.  Locking rules
+are per-catalog:
+
+  pg_class systable_inplace_update_begin() callers: before the call, acquire a
+  lock on the relation in mode ShareUpdateExclusiveLock or stricter.  If the
+  update targets a row of RELKIND_INDEX (but not RELKIND_PARTITIONED_INDEX),
+  that lock must be on the table.  Locking the index rel is not necessary.
+  (This allows VACUUM to overwrite per-index pg_class while holding a lock on
+  the table alone.) systable_inplace_update_begin() acquires and releases
+  LOCKTAG_TUPLE in InplaceUpdateTupleLock, an alias for ExclusiveLock, on each
+  tuple it overwrites.
+
+  pg_class heap_update() callers: before copying the tuple to modify, take a
+  lock on the tuple, a ShareUpdateExclusiveLock on the relation, or a
+  ShareRowExclusiveLock or stricter on the relation.
+
+  SearchSysCacheLocked1() is one convenient way to acquire the tuple lock.
+  Most heap_update() callers already hold a suitable lock on the relation for
+  other reasons and can skip the tuple lock.  If you do acquire the tuple
+  lock, release it immediately after the update.
+
+
+  pg_database: before copying the tuple to modify, all updaters of pg_database
+  rows acquire LOCKTAG_TUPLE.  (Few updaters acquire LOCKTAG_OBJECT on the
+  database OID, so it wasn't worth extending that as a second option.)
+
+Ideally, DDL might want to perform permissions checks before LockTuple(), as
+we do with RangeVarGetRelidExtended() callbacks.  We typically don't bother.
+LOCKTAG_TUPLE acquirers release it after each row, so the potential
+inconvenience is lower.
+
 Reading inplace-updated columns
 -------------------------------
 
index f7fc0fcf6a31c50a7c3b0441dbd0973bcdd19280..febb38db15b11a065d4c1693e74907f85f4c206c 100644 (file)
@@ -52,6 +52,8 @@
 #include "access/xloginsert.h"
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_database_d.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "port/atomics.h"
@@ -78,6 +80,12 @@ static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
                                  Buffer newbuf, HeapTuple oldtup,
                                  HeapTuple newtup, HeapTuple old_key_tuple,
                                  bool all_visible_cleared, bool new_all_visible_cleared);
+#ifdef USE_ASSERT_CHECKING
+static void check_lock_if_inplace_updateable_rel(Relation relation,
+                                                ItemPointer otid,
+                                                HeapTuple newtup);
+static void check_inplace_rel_lock(HeapTuple oldtup);
+#endif
 static Bitmapset *HeapDetermineColumnsInfo(Relation relation,
                                           Bitmapset *interesting_cols,
                                           Bitmapset *external_cols,
@@ -119,6 +127,8 @@ static HeapTuple ExtractReplicaIdentity(Relation rel, HeapTuple tup, bool key_re
  * heavyweight lock mode and MultiXactStatus values to use for any particular
  * tuple lock strength.
  *
+ * These interact with InplaceUpdateTupleLock, an alias for ExclusiveLock.
+ *
  * Don't look at lockstatus/updstatus directly!  Use get_mxact_status_for_lock
  * instead.
  */
@@ -3187,6 +3197,10 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
                (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
                 errmsg("cannot update tuples during a parallel operation")));
 
+#ifdef USE_ASSERT_CHECKING
+   check_lock_if_inplace_updateable_rel(relation, otid, newtup);
+#endif
+
    /*
     * Fetch the list of attributes to be checked for various operations.
     *
@@ -4014,6 +4028,128 @@ l2:
    return TM_Ok;
 }
 
+#ifdef USE_ASSERT_CHECKING
+/*
+ * Confirm adequate lock held during heap_update(), per rules from
+ * README.tuplock section "Locking to write inplace-updated tables".
+ */
+static void
+check_lock_if_inplace_updateable_rel(Relation relation,
+                                    ItemPointer otid,
+                                    HeapTuple newtup)
+{
+   /* LOCKTAG_TUPLE acceptable for any catalog */
+   switch (RelationGetRelid(relation))
+   {
+       case RelationRelationId:
+       case DatabaseRelationId:
+           {
+               LOCKTAG     tuptag;
+
+               SET_LOCKTAG_TUPLE(tuptag,
+                                 relation->rd_lockInfo.lockRelId.dbId,
+                                 relation->rd_lockInfo.lockRelId.relId,
+                                 ItemPointerGetBlockNumber(otid),
+                                 ItemPointerGetOffsetNumber(otid));
+               if (LockHeldByMe(&tuptag, InplaceUpdateTupleLock))
+                   return;
+           }
+           break;
+       default:
+           Assert(!IsInplaceUpdateRelation(relation));
+           return;
+   }
+
+   switch (RelationGetRelid(relation))
+   {
+       case RelationRelationId:
+           {
+               /* LOCKTAG_TUPLE or LOCKTAG_RELATION ok */
+               Form_pg_class classForm = (Form_pg_class) GETSTRUCT(newtup);
+               Oid         relid = classForm->oid;
+               Oid         dbid;
+               LOCKTAG     tag;
+
+               if (IsSharedRelation(relid))
+                   dbid = InvalidOid;
+               else
+                   dbid = MyDatabaseId;
+
+               if (classForm->relkind == RELKIND_INDEX)
+               {
+                   Relation    irel = index_open(relid, AccessShareLock);
+
+                   SET_LOCKTAG_RELATION(tag, dbid, irel->rd_index->indrelid);
+                   index_close(irel, AccessShareLock);
+               }
+               else
+                   SET_LOCKTAG_RELATION(tag, dbid, relid);
+
+               if (!LockHeldByMe(&tag, ShareUpdateExclusiveLock) &&
+                   !LockOrStrongerHeldByMe(&tag, ShareRowExclusiveLock))
+                   elog(WARNING,
+                        "missing lock for relation \"%s\" (OID %u, relkind %c) @ TID (%u,%u)",
+                        NameStr(classForm->relname),
+                        relid,
+                        classForm->relkind,
+                        ItemPointerGetBlockNumber(otid),
+                        ItemPointerGetOffsetNumber(otid));
+           }
+           break;
+       case DatabaseRelationId:
+           {
+               /* LOCKTAG_TUPLE required */
+               Form_pg_database dbForm = (Form_pg_database) GETSTRUCT(newtup);
+
+               elog(WARNING,
+                    "missing lock on database \"%s\" (OID %u) @ TID (%u,%u)",
+                    NameStr(dbForm->datname),
+                    dbForm->oid,
+                    ItemPointerGetBlockNumber(otid),
+                    ItemPointerGetOffsetNumber(otid));
+           }
+           break;
+   }
+}
+
+/*
+ * Confirm adequate relation lock held, per rules from README.tuplock section
+ * "Locking to write inplace-updated tables".
+ */
+static void
+check_inplace_rel_lock(HeapTuple oldtup)
+{
+   Form_pg_class classForm = (Form_pg_class) GETSTRUCT(oldtup);
+   Oid         relid = classForm->oid;
+   Oid         dbid;
+   LOCKTAG     tag;
+
+   if (IsSharedRelation(relid))
+       dbid = InvalidOid;
+   else
+       dbid = MyDatabaseId;
+
+   if (classForm->relkind == RELKIND_INDEX)
+   {
+       Relation    irel = index_open(relid, AccessShareLock);
+
+       SET_LOCKTAG_RELATION(tag, dbid, irel->rd_index->indrelid);
+       index_close(irel, AccessShareLock);
+   }
+   else
+       SET_LOCKTAG_RELATION(tag, dbid, relid);
+
+   if (!LockOrStrongerHeldByMe(&tag, ShareUpdateExclusiveLock))
+       elog(WARNING,
+            "missing lock for relation \"%s\" (OID %u, relkind %c) @ TID (%u,%u)",
+            NameStr(classForm->relname),
+            relid,
+            classForm->relkind,
+            ItemPointerGetBlockNumber(&oldtup->t_self),
+            ItemPointerGetOffsetNumber(&oldtup->t_self));
+}
+#endif
+
 /*
  * Check if the specified attribute's values are the same.  Subroutine for
  * HeapDetermineColumnsInfo.
@@ -6039,15 +6175,21 @@ heap_inplace_lock(Relation relation,
    TM_Result   result;
    bool        ret;
 
+#ifdef USE_ASSERT_CHECKING
+   if (RelationGetRelid(relation) == RelationRelationId)
+       check_inplace_rel_lock(oldtup_ptr);
+#endif
+
    Assert(BufferIsValid(buffer));
 
+   LockTuple(relation, &oldtup.t_self, InplaceUpdateTupleLock);
    LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
    /*----------
     * Interpret HeapTupleSatisfiesUpdate() like heap_update() does, except:
     *
     * - wait unconditionally
-    * - no tuple locks
+    * - already locked tuple above, since inplace needs that unconditionally
     * - don't recheck header after wait: simpler to defer to next iteration
     * - don't try to continue even if the updater aborts: likewise
     * - no crosscheck
@@ -6131,7 +6273,10 @@ heap_inplace_lock(Relation relation,
     * don't bother optimizing that.
     */
    if (!ret)
+   {
+       UnlockTuple(relation, &oldtup.t_self, InplaceUpdateTupleLock);
        InvalidateCatalogSnapshot();
+   }
    return ret;
 }
 
@@ -6140,6 +6285,8 @@ heap_inplace_lock(Relation relation,
  *
  * The tuple cannot change size, and therefore its header fields and null
  * bitmap (if any) don't change either.
+ *
+ * Since we hold LOCKTAG_TUPLE, no updater has a local copy of this tuple.
  */
 void
 heap_inplace_update_and_unlock(Relation relation,
@@ -6223,6 +6370,7 @@ heap_inplace_unlock(Relation relation,
                    HeapTuple oldtup, Buffer buffer)
 {
    LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+   UnlockTuple(relation, &oldtup->t_self, InplaceUpdateTupleLock);
 }
 
 /*
index c846bf30d500187c5f67300156aba7bf42cfd7b4..ede9660c9ebae88d25137563d38a4329d49dede8 100644 (file)
@@ -755,7 +755,9 @@ systable_endscan_ordered(SysScanDesc sysscan)
  *
  * Overwriting violates both MVCC and transactional safety, so the uses of
  * this function in Postgres are extremely limited.  Nonetheless we find some
- * places to use it.  Standard flow:
+ * places to use it.  See README.tuplock section "Locking to write
+ * inplace-updated tables" and later sections for expectations of readers and
+ * writers of a table that gets inplace updates.  Standard flow:
  *
  * ... [any slow preparation not requiring oldtup] ...
  * systable_inplace_update_begin([...], &tup, &inplace_state);
index 567807b02803785f610d5b44a206f1c82ea0bf02..81a711691a5e407de780a1a9554b6544eae4d5a5 100644 (file)
@@ -70,6 +70,7 @@
 #include "nodes/makefuncs.h"
 #include "parser/parse_func.h"
 #include "parser/parse_type.h"
+#include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/aclchk_internal.h"
 #include "utils/builtins.h"
@@ -1822,7 +1823,7 @@ ExecGrant_Relation(InternalGrant *istmt)
        HeapTuple   tuple;
        ListCell   *cell_colprivs;
 
-       tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
+       tuple = SearchSysCacheLocked1(RELOID, ObjectIdGetDatum(relOid));
        if (!HeapTupleIsValid(tuple))
            elog(ERROR, "cache lookup failed for relation %u", relOid);
        pg_class_tuple = (Form_pg_class) GETSTRUCT(tuple);
@@ -2038,6 +2039,7 @@ ExecGrant_Relation(InternalGrant *istmt)
                                         values, nulls, replaces);
 
            CatalogTupleUpdate(relation, &newtuple->t_self, newtuple);
+           UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock);
 
            /* Update initial privileges for extensions */
            recordExtensionInitPriv(relOid, RelationRelationId, 0, new_acl);
@@ -2050,6 +2052,8 @@ ExecGrant_Relation(InternalGrant *istmt)
 
            pfree(new_acl);
        }
+       else
+           UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock);
 
        /*
         * Handle column-level privileges, if any were specified or implied.
@@ -2159,7 +2163,7 @@ ExecGrant_Database(InternalGrant *istmt)
        Oid        *newmembers;
        HeapTuple   tuple;
 
-       tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(datId));
+       tuple = SearchSysCacheLocked1(DATABASEOID, ObjectIdGetDatum(datId));
        if (!HeapTupleIsValid(tuple))
            elog(ERROR, "cache lookup failed for database %u", datId);
 
@@ -2228,6 +2232,7 @@ ExecGrant_Database(InternalGrant *istmt)
                                     nulls, replaces);
 
        CatalogTupleUpdate(relation, &newtuple->t_self, newtuple);
+       UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock);
 
        /* Update the shared dependency ACL info */
        updateAclDependencies(DatabaseRelationId, pg_database_tuple->oid, 0,
index 1cf998ee97591ac90b2a2f21ba81510919508a45..56defca760d1eae2386fb9c86d693b06efa6f490 100644 (file)
@@ -140,6 +140,15 @@ IsCatalogRelationOid(Oid relid)
 /*
  * IsInplaceUpdateRelation
  *     True iff core code performs inplace updates on the relation.
+ *
+ *     This is used for assertions and for making the executor follow the
+ *     locking protocol described at README.tuplock section "Locking to write
+ *     inplace-updated tables".  Extensions may inplace-update other heap
+ *     tables, but concurrent SQL UPDATE on the same table may overwrite
+ *     those modifications.
+ *
+ *     The executor can assume these are not partitions or partitioned and
+ *     have no triggers.
  */
 bool
 IsInplaceUpdateRelation(Relation relation)
index 3e581d62efb5593e08184cb149186d52f5a8ff20..1904f96136430f09e8d7610cad81b214b75f76d8 100644 (file)
@@ -1750,6 +1750,7 @@ RenameDatabase(const char *oldname, const char *newname)
 {
    Oid         db_id;
    HeapTuple   newtup;
+   ItemPointerData otid;
    Relation    rel;
    int         notherbackends;
    int         npreparedxacts;
@@ -1821,11 +1822,13 @@ RenameDatabase(const char *oldname, const char *newname)
                 errdetail_busy_db(notherbackends, npreparedxacts)));
 
    /* rename */
-   newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
+   newtup = SearchSysCacheLockedCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
    if (!HeapTupleIsValid(newtup))
        elog(ERROR, "cache lookup failed for database %u", db_id);
+   otid = newtup->t_self;
    namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
-   CatalogTupleUpdate(rel, &newtup->t_self, newtup);
+   CatalogTupleUpdate(rel, &otid, newtup);
+   UnlockTuple(rel, &otid, InplaceUpdateTupleLock);
 
    InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2073,6 +2076,7 @@ movedb(const char *dbname, const char *tblspcname)
            ereport(ERROR,
                    (errcode(ERRCODE_UNDEFINED_DATABASE),
                     errmsg("database \"%s\" does not exist", dbname)));
+       LockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
 
        MemSet(new_record, 0, sizeof(new_record));
        MemSet(new_record_nulls, false, sizeof(new_record_nulls));
@@ -2085,6 +2089,7 @@ movedb(const char *dbname, const char *tblspcname)
                                     new_record,
                                     new_record_nulls, new_record_repl);
        CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
+       UnlockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2315,6 +2320,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
        ereport(ERROR,
                (errcode(ERRCODE_UNDEFINED_DATABASE),
                 errmsg("database \"%s\" does not exist", stmt->dbname)));
+   LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
    datform = (Form_pg_database) GETSTRUCT(tuple);
    dboid = datform->oid;
@@ -2368,6 +2374,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
    newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
                                 new_record_nulls, new_record_repl);
    CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
+   UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
    InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
 
@@ -2417,6 +2424,7 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
    if (!pg_database_ownercheck(db_id, GetUserId()))
        aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
                       stmt->dbname);
+   LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
    datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
    oldversion = isnull ? NULL : TextDatumGetCString(datum);
@@ -2434,6 +2442,7 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
        bool        nulls[Natts_pg_database] = {0};
        bool        replaces[Natts_pg_database] = {0};
        Datum       values[Natts_pg_database] = {0};
+       HeapTuple   newtuple;
 
        ereport(NOTICE,
                (errmsg("changing version from %s to %s",
@@ -2442,14 +2451,15 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
        values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
        replaces[Anum_pg_database_datcollversion - 1] = true;
 
-       tuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
-                                 values, nulls, replaces);
-       CatalogTupleUpdate(rel, &tuple->t_self, tuple);
-       heap_freetuple(tuple);
+       newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
+                                    values, nulls, replaces);
+       CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
+       heap_freetuple(newtuple);
    }
    else
        ereport(NOTICE,
                (errmsg("version has not changed")));
+   UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
    InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2561,6 +2571,8 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
                    (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                     errmsg("permission denied to change owner of database")));
 
+       LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
+
        memset(repl_null, false, sizeof(repl_null));
        memset(repl_repl, false, sizeof(repl_repl));
 
@@ -2585,6 +2597,7 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
 
        newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
        CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
+       UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        heap_freetuple(newtuple);
 
index ee3f4c2f9d3f805036d9c0dc7a212776598cd484..157455c52b4f5da258099af598f3d8fab9fe12ff 100644 (file)
@@ -4287,14 +4287,17 @@ update_relispartition(Oid relationId, bool newval)
 {
    HeapTuple   tup;
    Relation    classRel;
+   ItemPointerData otid;
 
    classRel = table_open(RelationRelationId, RowExclusiveLock);
-   tup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relationId));
+   tup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relationId));
    if (!HeapTupleIsValid(tup))
        elog(ERROR, "cache lookup failed for relation %u", relationId);
+   otid = tup->t_self;
    Assert(((Form_pg_class) GETSTRUCT(tup))->relispartition != newval);
    ((Form_pg_class) GETSTRUCT(tup))->relispartition = newval;
-   CatalogTupleUpdate(classRel, &tup->t_self, tup);
+   CatalogTupleUpdate(classRel, &otid, tup);
+   UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);
    heap_freetuple(tup);
    table_close(classRel, RowExclusiveLock);
 }
index 4f6bf4c959c6cdd25c3da1690e6f9fcda5bf191d..f3050d8a1d241c1841054e7e88065c50a1c5038f 100644 (file)
@@ -3354,6 +3354,7 @@ SetRelationTableSpace(Relation rel,
 {
    Relation    pg_class;
    HeapTuple   tuple;
+   ItemPointerData otid;
    Form_pg_class rd_rel;
    Oid         reloid = RelationGetRelid(rel);
 
@@ -3362,9 +3363,10 @@ SetRelationTableSpace(Relation rel,
    /* Get a modifiable copy of the relation's pg_class row. */
    pg_class = table_open(RelationRelationId, RowExclusiveLock);
 
-   tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(reloid));
+   tuple = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(reloid));
    if (!HeapTupleIsValid(tuple))
        elog(ERROR, "cache lookup failed for relation %u", reloid);
+   otid = tuple->t_self;
    rd_rel = (Form_pg_class) GETSTRUCT(tuple);
 
    /* Update the pg_class row. */
@@ -3372,7 +3374,8 @@ SetRelationTableSpace(Relation rel,
        InvalidOid : newTableSpaceId;
    if (OidIsValid(newRelFileNode))
        rd_rel->relfilenode = newRelFileNode;
-   CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
+   CatalogTupleUpdate(pg_class, &otid, tuple);
+   UnlockTuple(pg_class, &otid, InplaceUpdateTupleLock);
 
    /*
     * Record dependency on tablespace.  This is only required for relations
@@ -3866,6 +3869,7 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bo
 {
    Relation    targetrelation;
    Relation    relrelation;    /* for RELATION relation */
+   ItemPointerData otid;
    HeapTuple   reltup;
    Form_pg_class relform;
    Oid         namespaceId;
@@ -3888,7 +3892,8 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bo
     */
    relrelation = table_open(RelationRelationId, RowExclusiveLock);
 
-   reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(myrelid));
+   reltup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(myrelid));
+   otid = reltup->t_self;
    if (!HeapTupleIsValid(reltup))  /* shouldn't happen */
        elog(ERROR, "cache lookup failed for relation %u", myrelid);
    relform = (Form_pg_class) GETSTRUCT(reltup);
@@ -3915,7 +3920,8 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bo
     */
    namestrcpy(&(relform->relname), newrelname);
 
-   CatalogTupleUpdate(relrelation, &reltup->t_self, reltup);
+   CatalogTupleUpdate(relrelation, &otid, reltup);
+   UnlockTuple(relrelation, &otid, InplaceUpdateTupleLock);
 
    InvokeObjectPostAlterHookArg(RelationRelationId, myrelid, 0,
                                 InvalidOid, is_internal);
@@ -14405,7 +14411,7 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
 
    /* Fetch heap tuple */
    relid = RelationGetRelid(rel);
-   tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+   tuple = SearchSysCacheLocked1(RELOID, ObjectIdGetDatum(relid));
    if (!HeapTupleIsValid(tuple))
        elog(ERROR, "cache lookup failed for relation %u", relid);
 
@@ -14509,6 +14515,7 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
                                 repl_val, repl_null, repl_repl);
 
    CatalogTupleUpdate(pgclass, &newtuple->t_self, newtuple);
+   UnlockTuple(pgclass, &tuple->t_self, InplaceUpdateTupleLock);
 
    InvokeObjectPostAlterHook(RelationRelationId, RelationGetRelid(rel), 0);
 
@@ -16705,7 +16712,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
    ObjectAddress thisobj;
    bool        already_done = false;
 
-   classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relOid));
+   /* no rel lock for relkind=c so use LOCKTAG_TUPLE */
+   classTup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relOid));
    if (!HeapTupleIsValid(classTup))
        elog(ERROR, "cache lookup failed for relation %u", relOid);
    classForm = (Form_pg_class) GETSTRUCT(classTup);
@@ -16724,6 +16732,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
    already_done = object_address_present(&thisobj, objsMoved);
    if (!already_done && oldNspOid != newNspOid)
    {
+       ItemPointerData otid = classTup->t_self;
+
        /* check for duplicate name (more friendly than unique-index failure) */
        if (get_relname_relid(NameStr(classForm->relname),
                              newNspOid) != InvalidOid)
@@ -16736,7 +16746,9 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
        /* classTup is a copy, so OK to scribble on */
        classForm->relnamespace = newNspOid;
 
-       CatalogTupleUpdate(classRel, &classTup->t_self, classTup);
+       CatalogTupleUpdate(classRel, &otid, classTup);
+       UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);
+
 
        /* Update dependency on schema if caller said so */
        if (hasDependEntry &&
@@ -16748,6 +16760,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
            elog(ERROR, "failed to change schema dependency for relation \"%s\"",
                 NameStr(classForm->relname));
    }
+   else
+       UnlockTuple(classRel, &classTup->t_self, InplaceUpdateTupleLock);
    if (!already_done)
    {
        add_exact_object_address(&thisobj, objsMoved);
index b24a05f156a15f2cae25af55a4d714c0407de7bc..2c7e35196ad972a628c93cc08ed5dc903452387a 100644 (file)
@@ -43,6 +43,7 @@
 #include "access/tableam.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "catalog/catalog.h"
 #include "catalog/namespace.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
@@ -999,6 +1000,10 @@ CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation)
    TriggerDesc *trigDesc = resultRel->trigdesc;
    FdwRoutine *fdwroutine;
 
+   /* Expect a fully-formed ResultRelInfo from InitResultRelInfo(). */
+   Assert(resultRelInfo->ri_needLockTagTuple ==
+          IsInplaceUpdateRelation(resultRel));
+
    switch (resultRel->rd_rel->relkind)
    {
        case RELKIND_RELATION:
@@ -1207,6 +1212,8 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
    resultRelInfo->ri_NumIndices = 0;
    resultRelInfo->ri_IndexRelationDescs = NULL;
    resultRelInfo->ri_IndexRelationInfo = NULL;
+   resultRelInfo->ri_needLockTagTuple =
+       IsInplaceUpdateRelation(resultRelationDesc);
    /* make a copy so as not to depend on relcache info not changing... */
    resultRelInfo->ri_TrigDesc = CopyTriggerDesc(resultRelationDesc->trigdesc);
    if (resultRelInfo->ri_TrigDesc)
index 125f1367d1a651274c7f17ab3f0b929a01e08f09..3ecb7efe365ae17685c065500a9f959ea2061c35 100644 (file)
@@ -19,6 +19,7 @@
 #include "access/tableam.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "catalog/catalog.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/nodeModifyTable.h"
@@ -483,8 +484,12 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
    Relation    rel = resultRelInfo->ri_RelationDesc;
    ItemPointer tid = &(searchslot->tts_tid);
 
-   /* For now we support only tables. */
+   /*
+    * We support only non-system tables, with
+    * check_publication_add_relation() accountable.
+    */
    Assert(rel->rd_rel->relkind == RELKIND_RELATION);
+   Assert(!IsCatalogRelation(rel));
 
    CheckCmdReplicaIdentity(rel, CMD_UPDATE);
 
index 13849fd23481b07762aaeedae7f707934cade378..bcb46c8e781b1f5beccb408cc2bc406311e28892 100644 (file)
@@ -2307,6 +2307,8 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
    }
    else
    {
+       ItemPointerData lockedtid;
+
        /*
         * If we generate a new candidate tuple after EvalPlanQual testing, we
         * must loop back here to try again.  (We don't need to redo triggers,
@@ -2315,6 +2317,7 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
         * to do them again.)
         */
 redo_act:
+       lockedtid = *tupleid;
        result = ExecUpdateAct(context, resultRelInfo, tupleid, oldtuple, slot,
                               canSetTag, &updateCxt);
 
@@ -2408,6 +2411,14 @@ redo_act:
                                ExecInitUpdateProjection(context->mtstate,
                                                         resultRelInfo);
 
+                           if (resultRelInfo->ri_needLockTagTuple)
+                           {
+                               UnlockTuple(resultRelationDesc,
+                                           &lockedtid, InplaceUpdateTupleLock);
+                               LockTuple(resultRelationDesc,
+                                         tupleid, InplaceUpdateTupleLock);
+                           }
+
                            /* Fetch the most recent version of old tuple. */
                            oldSlot = resultRelInfo->ri_oldTupleSlot;
                            if (!table_tuple_fetch_row_version(resultRelationDesc,
@@ -2512,6 +2523,14 @@ ExecOnConflictUpdate(ModifyTableContext *context,
    TransactionId xmin;
    bool        isnull;
 
+   /*
+    * Parse analysis should have blocked ON CONFLICT for all system
+    * relations, which includes these.  There's no fundamental obstacle to
+    * supporting this; we'd just need to handle LOCKTAG_TUPLE like the other
+    * ExecUpdate() caller.
+    */
+   Assert(!resultRelInfo->ri_needLockTagTuple);
+
    /* Determine lock mode to use */
    lockmode = ExecUpdateLockMode(context->estate, resultRelInfo);
 
@@ -2795,18 +2814,20 @@ ExecMergeMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
                 ItemPointer tupleid, bool canSetTag)
 {
    ModifyTableState *mtstate = context->mtstate;
+   ItemPointerData lockedtid;
    TupleTableSlot *newslot;
    EState     *estate = context->estate;
    ExprContext *econtext = mtstate->ps.ps_ExprContext;
    bool        isNull;
    EPQState   *epqstate = &mtstate->mt_epqstate;
    ListCell   *l;
+   bool        no_further_action = true;
 
    /*
     * If there are no WHEN MATCHED actions, we are done.
     */
    if (resultRelInfo->ri_matchedMergeAction == NIL)
-       return true;
+       goto out;
 
    /*
     * Make tuple and any needed join variables available to ExecQual and
@@ -2820,6 +2841,20 @@ ExecMergeMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 
 lmerge_matched:;
 
+   if (resultRelInfo->ri_needLockTagTuple)
+   {
+       /*
+        * This locks even for CMD_DELETE, for CMD_NOTHING, and for tuples
+        * that don't match mas_whenqual.  MERGE on system catalogs is a minor
+        * use case, so don't bother optimizing those.
+        */
+       LockTuple(resultRelInfo->ri_RelationDesc, tupleid,
+                 InplaceUpdateTupleLock);
+       lockedtid = *tupleid;
+   }
+   else
+       ItemPointerSetInvalid(&lockedtid);
+
    /*
     * This routine is only invoked for matched rows, and we must have found
     * the tupleid of the target row in that case; fetch that tuple.
@@ -2890,7 +2925,7 @@ lmerge_matched:;
                                        tupleid, NULL, newslot, &result))
                {
                    if (result == TM_Ok)
-                       return true;    /* "do nothing" */
+                       goto out;   /* "do nothing" */
                    break;      /* concurrent update/delete */
                }
                result = ExecUpdateAct(context, resultRelInfo, tupleid, NULL,
@@ -2906,7 +2941,7 @@ lmerge_matched:;
                if (updateCxt.crossPartUpdate)
                {
                    mtstate->mt_merge_updated += 1;
-                   return true;
+                   goto out;
                }
 
                if (result == TM_Ok && updateCxt.updated)
@@ -2923,7 +2958,7 @@ lmerge_matched:;
                                        NULL, NULL, &result))
                {
                    if (result == TM_Ok)
-                       return true;    /* "do nothing" */
+                       goto out;   /* "do nothing" */
                    break;      /* concurrent update/delete */
                }
                result = ExecDeleteAct(context, resultRelInfo, tupleid, false);
@@ -3001,7 +3036,8 @@ lmerge_matched:;
                 * If the tuple was already deleted, return to let caller
                 * handle it under NOT MATCHED clauses.
                 */
-               return false;
+               no_further_action = false;
+               goto out;
 
            case TM_Updated:
                {
@@ -3047,13 +3083,19 @@ lmerge_matched:;
                             * NOT MATCHED actions.
                             */
                            if (TupIsNull(epqslot))
-                               return false;
+                           {
+                               no_further_action = false;
+                               goto out;
+                           }
 
                            (void) ExecGetJunkAttribute(epqslot,
                                                        resultRelInfo->ri_RowIdAttNo,
                                                        &isNull);
                            if (isNull)
-                               return false;
+                           {
+                               no_further_action = false;
+                               goto out;
+                           }
 
                            /*
                             * When a tuple was updated and migrated to
@@ -3079,6 +3121,10 @@ lmerge_matched:;
                             * Update tupleid to that of the new tuple, for
                             * the refetch we do at the top.
                             */
+                           if (resultRelInfo->ri_needLockTagTuple)
+                               UnlockTuple(resultRelInfo->ri_RelationDesc,
+                                           &lockedtid,
+                                           InplaceUpdateTupleLock);
                            ItemPointerCopy(&context->tmfd.ctid, tupleid);
                            goto lmerge_matched;
 
@@ -3088,7 +3134,8 @@ lmerge_matched:;
                             * tuple already deleted; tell caller to run NOT
                             * MATCHED actions
                             */
-                           return false;
+                           no_further_action = false;
+                           goto out;
 
                        case TM_SelfModified:
 
@@ -3116,13 +3163,15 @@ lmerge_matched:;
 
                            /* This shouldn't happen */
                            elog(ERROR, "attempted to update or delete invisible tuple");
-                           return false;
+                           no_further_action = false;
+                           goto out;
 
                        default:
                            /* see table_tuple_lock call in ExecDelete() */
                            elog(ERROR, "unexpected table_tuple_lock status: %u",
                                 result);
-                           return false;
+                           no_further_action = false;
+                           goto out;
                    }
                }
 
@@ -3144,7 +3193,11 @@ lmerge_matched:;
    /*
     * Successfully executed an action or no qualifying action was found.
     */
-   return true;
+out:
+   if (ItemPointerIsValid(&lockedtid))
+       UnlockTuple(resultRelInfo->ri_RelationDesc, &lockedtid,
+                   InplaceUpdateTupleLock);
+   return no_further_action;
 }
 
 /*
@@ -3590,6 +3643,7 @@ ExecModifyTable(PlanState *pstate)
    HeapTupleData oldtupdata;
    HeapTuple   oldtuple;
    ItemPointer tupleid;
+   bool        tuplock;
 
    CHECK_FOR_INTERRUPTS();
 
@@ -3832,6 +3886,8 @@ ExecModifyTable(PlanState *pstate)
                break;
 
            case CMD_UPDATE:
+               tuplock = false;
+
                /* Initialize projection info if first time for this table */
                if (unlikely(!resultRelInfo->ri_projectNewInfoValid))
                    ExecInitUpdateProjection(node, resultRelInfo);
@@ -3843,6 +3899,7 @@ ExecModifyTable(PlanState *pstate)
                oldSlot = resultRelInfo->ri_oldTupleSlot;
                if (oldtuple != NULL)
                {
+                   Assert(!resultRelInfo->ri_needLockTagTuple);
                    /* Use the wholerow junk attr as the old tuple. */
                    ExecForceStoreHeapTuple(oldtuple, oldSlot, false);
                }
@@ -3851,6 +3908,11 @@ ExecModifyTable(PlanState *pstate)
                    /* Fetch the most recent version of old tuple. */
                    Relation    relation = resultRelInfo->ri_RelationDesc;
 
+                   if (resultRelInfo->ri_needLockTagTuple)
+                   {
+                       LockTuple(relation, tupleid, InplaceUpdateTupleLock);
+                       tuplock = true;
+                   }
                    if (!table_tuple_fetch_row_version(relation, tupleid,
                                                       SnapshotAny,
                                                       oldSlot))
@@ -3863,6 +3925,9 @@ ExecModifyTable(PlanState *pstate)
                /* Now apply the update. */
                slot = ExecUpdate(&context, resultRelInfo, tupleid, oldtuple,
                                  slot, node->canSetTag);
+               if (tuplock)
+                   UnlockTuple(resultRelInfo->ri_RelationDesc, tupleid,
+                               InplaceUpdateTupleLock);
                break;
 
            case CMD_DELETE:
index 0ce4400b3210333eb4742590087e8edb715bdd62..658c249b23a04bef8ab0be9126ae2e27189204bd 100644 (file)
@@ -3724,6 +3724,7 @@ RelationSetNewRelfilenode(Relation relation, char persistence)
 {
    Oid         newrelfilenode;
    Relation    pg_class;
+   ItemPointerData otid;
    HeapTuple   tuple;
    Form_pg_class classform;
    MultiXactId minmulti = InvalidMultiXactId;
@@ -3766,11 +3767,12 @@ RelationSetNewRelfilenode(Relation relation, char persistence)
     */
    pg_class = table_open(RelationRelationId, RowExclusiveLock);
 
-   tuple = SearchSysCacheCopy1(RELOID,
-                               ObjectIdGetDatum(RelationGetRelid(relation)));
+   tuple = SearchSysCacheLockedCopy1(RELOID,
+                                     ObjectIdGetDatum(RelationGetRelid(relation)));
    if (!HeapTupleIsValid(tuple))
        elog(ERROR, "could not find tuple for relation %u",
             RelationGetRelid(relation));
+   otid = tuple->t_self;
    classform = (Form_pg_class) GETSTRUCT(tuple);
 
    /*
@@ -3890,9 +3892,10 @@ RelationSetNewRelfilenode(Relation relation, char persistence)
        classform->relminmxid = minmulti;
        classform->relpersistence = persistence;
 
-       CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
+       CatalogTupleUpdate(pg_class, &otid, tuple);
    }
 
+   UnlockTuple(pg_class, &otid, InplaceUpdateTupleLock);
    heap_freetuple(tuple);
 
    table_close(pg_class, RowExclusiveLock);
index 1912b121463d4b1076e06acf5f51fb632fbb757d..2f619f61807bebd7e859dd241291d786e0caebac 100644 (file)
 #include "catalog/pg_type.h"
 #include "catalog/pg_user_mapping.h"
 #include "lib/qunique.h"
+#include "miscadmin.h"
+#include "storage/lmgr.h"
 #include "utils/catcache.h"
+#include "utils/inval.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
 
@@ -1223,6 +1226,98 @@ ReleaseSysCache(HeapTuple tuple)
    ReleaseCatCache(tuple);
 }
 
+/*
+ * SearchSysCacheLocked1
+ *
+ * Combine SearchSysCache1() with acquiring a LOCKTAG_TUPLE at mode
+ * InplaceUpdateTupleLock.  This is a tool for complying with the
+ * README.tuplock section "Locking to write inplace-updated tables".  After
+ * the caller's heap_update(), it should UnlockTuple(InplaceUpdateTupleLock)
+ * and ReleaseSysCache().
+ *
+ * The returned tuple may be the subject of an uncommitted update, so this
+ * doesn't prevent the "tuple concurrently updated" error.
+ */
+HeapTuple
+SearchSysCacheLocked1(int cacheId,
+                     Datum key1)
+{
+   ItemPointerData tid;
+   LOCKTAG     tag;
+   Oid         dboid =
+       SysCache[cacheId]->cc_relisshared ? InvalidOid : MyDatabaseId;
+   Oid         reloid = cacheinfo[cacheId].reloid;
+
+   /*----------
+    * Since inplace updates may happen just before our LockTuple(), we must
+    * return content acquired after LockTuple() of the TID we return.  If we
+    * just fetched twice instead of looping, the following sequence would
+    * defeat our locking:
+    *
+    * GRANT:   SearchSysCache1() = TID (1,5)
+    * GRANT:   LockTuple(pg_class, (1,5))
+    * [no more inplace update of (1,5) until we release the lock]
+    * CLUSTER: SearchSysCache1() = TID (1,5)
+    * CLUSTER: heap_update() = TID (1,8)
+    * CLUSTER: COMMIT
+    * GRANT:   SearchSysCache1() = TID (1,8)
+    * GRANT:   return (1,8) from SearchSysCacheLocked1()
+    * VACUUM:  SearchSysCache1() = TID (1,8)
+    * VACUUM:  LockTuple(pg_class, (1,8))  # two TIDs now locked for one rel
+    * VACUUM:  inplace update
+    * GRANT:   heap_update() = (1,9)  # lose inplace update
+    *
+    * In the happy case, this takes two fetches, one to determine the TID to
+    * lock and another to get the content and confirm the TID didn't change.
+    *
+    * This is valid even if the row gets updated to a new TID, the old TID
+    * becomes LP_UNUSED, and the row gets updated back to its old TID.  We'd
+    * still hold the right LOCKTAG_TUPLE and a copy of the row captured after
+    * the LOCKTAG_TUPLE.
+    */
+   ItemPointerSetInvalid(&tid);
+   for (;;)
+   {
+       HeapTuple   tuple;
+       LOCKMODE    lockmode = InplaceUpdateTupleLock;
+
+       tuple = SearchSysCache1(cacheId, key1);
+       if (ItemPointerIsValid(&tid))
+       {
+           if (!HeapTupleIsValid(tuple))
+           {
+               LockRelease(&tag, lockmode, false);
+               return tuple;
+           }
+           if (ItemPointerEquals(&tid, &tuple->t_self))
+               return tuple;
+           LockRelease(&tag, lockmode, false);
+       }
+       else if (!HeapTupleIsValid(tuple))
+           return tuple;
+
+       tid = tuple->t_self;
+       ReleaseSysCache(tuple);
+       /* like: LockTuple(rel, &tid, lockmode) */
+       SET_LOCKTAG_TUPLE(tag, dboid, reloid,
+                         ItemPointerGetBlockNumber(&tid),
+                         ItemPointerGetOffsetNumber(&tid));
+       (void) LockAcquire(&tag, lockmode, false, false);
+
+       /*
+        * If an inplace update just finished, ensure we process the syscache
+        * inval.  XXX this is insufficient: the inplace updater may not yet
+        * have reached AtEOXact_Inval().  See test at inplace-inval.spec.
+        *
+        * If a heap_update() call just released its LOCKTAG_TUPLE, we'll
+        * probably find the old tuple and reach "tuple concurrently updated".
+        * If that heap_update() aborts, our LOCKTAG_TUPLE blocks inplace
+        * updates while our caller works.
+        */
+       AcceptInvalidationMessages();
+   }
+}
+
 /*
  * SearchSysCacheCopy
  *
@@ -1249,6 +1344,28 @@ SearchSysCacheCopy(int cacheId,
    return newtuple;
 }
 
+/*
+ * SearchSysCacheLockedCopy1
+ *
+ * Meld SearchSysCacheLockedCopy1 with SearchSysCacheCopy().  After the
+ * caller's heap_update(), it should UnlockTuple(InplaceUpdateTupleLock) and
+ * heap_freetuple().
+ */
+HeapTuple
+SearchSysCacheLockedCopy1(int cacheId,
+                         Datum key1)
+{
+   HeapTuple   tuple,
+               newtuple;
+
+   tuple = SearchSysCacheLocked1(cacheId, key1);
+   if (!HeapTupleIsValid(tuple))
+       return tuple;
+   newtuple = heap_copytuple(tuple);
+   ReleaseSysCache(tuple);
+   return newtuple;
+}
+
 /*
  * SearchSysCacheExists
  *
index 8b4d38257fb8b3447896ca1e6ff9a2549ec5f693..a1a6a564f19563adf58816f9d917f5f94588b3a5 100644 (file)
@@ -556,6 +556,9 @@ typedef struct ResultRelInfo
     * one of its ancestors; see ExecCrossPartitionUpdateForeignKey().
     */
    List       *ri_ancestorResultRels;
+
+   /* updates do LockTuple() before oldtup read; see README.tuplock */
+   bool        ri_needLockTagTuple;
 } ResultRelInfo;
 
 /*
index 350ddd4da49912b7dfd171b167fa571069ee2e71..1b99533762869e2fb3238b2499f1d3bfadf35b92 100644 (file)
@@ -47,6 +47,8 @@ typedef int LOCKMODE;
 
 #define MaxLockMode                8   /* highest standard lock mode */
 
+/* See README.tuplock section "Locking to write inplace-updated tables" */
+#define InplaceUpdateTupleLock ExclusiveLock
 
 /* WAL representation of an AccessExclusiveLock on a table */
 typedef struct xl_standby_lock
index 4463ea66bea53db5865351353c0feaad43a22262..8860a72b7116fefd3b0720a13fd4db3a35a61857 100644 (file)
@@ -139,9 +139,14 @@ extern HeapTuple SearchSysCache4(int cacheId,
 
 extern void ReleaseSysCache(HeapTuple tuple);
 
+extern HeapTuple SearchSysCacheLocked1(int cacheId,
+                                      Datum key1);
+
 /* convenience routines */
 extern HeapTuple SearchSysCacheCopy(int cacheId,
                                    Datum key1, Datum key2, Datum key3, Datum key4);
+extern HeapTuple SearchSysCacheLockedCopy1(int cacheId,
+                                          Datum key1);
 extern bool SearchSysCacheExists(int cacheId,
                                 Datum key1, Datum key2, Datum key3, Datum key4);
 extern Oid GetSysCacheOid(int cacheId, AttrNumber oidcol,
index fe26984c0e04700c5013ed85fbe4719dbbf1fb6a..b5fe8b06f76184148bac4efeb9736c0d3da0d5ab 100644 (file)
@@ -100,7 +100,7 @@ f
 step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
 step c2: COMMIT;
 
-starting permutation: b3 sfu3 b1 grant1 read2 as3 addk2 r3 c1 read2
+starting permutation: b3 sfu3 b1 grant1 read2 addk2 r3 c1 read2
 step b3: BEGIN ISOLATION LEVEL READ COMMITTED;
 step sfu3: 
    SELECT relhasindex FROM pg_class
@@ -124,7 +124,6 @@ relhasindex
 f          
 (1 row)
 
-step as3: LOCK TABLE intra_grant_inplace IN ACCESS SHARE MODE;
 step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); 
 step r3: ROLLBACK;
 step grant1: <... completed>
@@ -155,9 +154,11 @@ step b1: BEGIN;
 step grant1: 
    GRANT SELECT ON intra_grant_inplace TO PUBLIC;
  
-step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
-step c2: COMMIT;
+step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); 
+step addk2: <... completed>
+ERROR:  deadlock detected
 step grant1: <... completed>
+step c2: COMMIT;
 step c1: COMMIT;
 step read2: 
    SELECT relhasindex FROM pg_class
@@ -195,9 +196,8 @@ relhasindex
 f          
 (1 row)
 
-s4: WARNING:  got: tuple concurrently updated
-step revoke4: <... completed>
 step r3: ROLLBACK;
+step revoke4: <... completed>
 
 starting permutation: b1 drop1 b3 sfu3 revoke4 c1 r3
 step b1: BEGIN;
@@ -224,6 +224,6 @@ relhasindex
 -----------
 (0 rows)
 
-s4: WARNING:  got: tuple concurrently deleted
+s4: WARNING:  got: cache lookup failed for relation REDACTED
 step revoke4: <... completed>
 step r3: ROLLBACK;
index 1bcf5d3eca4bd40ffe1955f568c8dca837dbc5c0..76d12326b50ba09977f7b7fb471d2145304bf78d 100644 (file)
@@ -190,7 +190,7 @@ step simplepartupdate_noroute {
    update parttbl set b = 2 where c = 1 returning *;
 }
 
-# test system class updates
+# test system class LockTuple()
 
 step sys1  {
    UPDATE pg_class SET reltuples = 123 WHERE oid = 'accounts'::regclass;
index d07ed3bb2cc8f9d3b04c14c80364de0f822e3142..2992c85b44ddaf46ba0476a6a2199e3db63c213e 100644 (file)
@@ -14,6 +14,7 @@ teardown
 
 # heap_update()
 session s1
+setup  { SET deadlock_timeout = '100s'; }
 step b1    { BEGIN; }
 step grant1    {
    GRANT SELECT ON intra_grant_inplace TO PUBLIC;
@@ -25,6 +26,7 @@ step c1   { COMMIT; }
 
 # inplace update
 session s2
+setup  { SET deadlock_timeout = '10ms'; }
 step read2 {
    SELECT relhasindex FROM pg_class
    WHERE oid = 'intra_grant_inplace'::regclass;
@@ -48,7 +50,6 @@ step sfu3 {
    SELECT relhasindex FROM pg_class
    WHERE oid = 'intra_grant_inplace'::regclass FOR UPDATE;
 }
-step as3   { LOCK TABLE intra_grant_inplace IN ACCESS SHARE MODE; }
 step r3    { ROLLBACK; }
 
 # Additional heap_update()
@@ -74,8 +75,6 @@ step keyshr5  {
 teardown   { ROLLBACK; }
 
 
-# XXX extant bugs: permutation comments refer to planned future LockTuple()
-
 permutation
    b1
    grant1
@@ -118,7 +117,6 @@ permutation
    b1
    grant1(r3)  # acquire LockTuple(), await sfu3 xmax
    read2
-   as3         # XXX temporary until patch adds locking to addk2
    addk2(c1)   # block in LockTuple() behind grant1
    r3          # unblock grant1; addk2 now awaits grant1 xmax
    c1
@@ -128,8 +126,8 @@ permutation
    b2
    sfnku2
    b1
-   grant1(c2)      # acquire LockTuple(), await sfnku2 xmax
-   addk2           # block in LockTuple() behind grant1 = deadlock
+   grant1(addk2)   # acquire LockTuple(), await sfnku2 xmax
+   addk2(*)        # block in LockTuple() behind grant1 = deadlock
    c2
    c1
    read2
@@ -140,7 +138,7 @@ permutation
    grant1
    b3
    sfu3(c1)    # acquire LockTuple(), await grant1 xmax
-   revoke4(sfu3)   # block in LockTuple() behind sfu3
+   revoke4(r3) # block in LockTuple() behind sfu3
    c1
    r3          # revoke4 unlocks old tuple and finds new