Modify sequence state storage to eliminate dangling-pointer problem
authorTom Lane
Wed, 22 May 2002 21:40:55 +0000 (21:40 +0000)
committerTom Lane
Wed, 22 May 2002 21:40:55 +0000 (21:40 +0000)
exemplified by bug #671.  Moving the storage to relcache turned out to
be a bad idea because relcache might decide to discard the info.  Instead,
open and close the relcache entry on each sequence operation, and use
a record of the current XID to discover whether we already hold
AccessShareLock on the sequence.

src/backend/access/transam/xact.c
src/backend/commands/define.c
src/backend/commands/sequence.c
src/include/commands/defrem.h
src/include/commands/sequence.h

index 10de2f8a6d539eec2b56983aa325cf0b56afaa13..b874b4790ad37b53f32634df1a11cb8b880ec2de 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.123 2002/05/21 22:05:53 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.124 2002/05/22 21:40:55 tgl Exp $
  *
  * NOTES
  *     Transaction aborts can now occur two ways:
 #include "catalog/index.h"
 #include "catalog/namespace.h"
 #include "commands/async.h"
-#include "commands/sequence.h"
 #include "commands/trigger.h"
 #include "executor/spi.h"
 #include "libpq/be-fsstubs.h"
@@ -947,7 +946,6 @@ CommitTransaction(void)
    /* NOTIFY commit must also come before lower-level cleanup */
    AtCommit_Notify();
 
-   CloseSequences();
    AtEOXact_portals();
 
    /* Here is where we really truly commit. */
@@ -1063,7 +1061,6 @@ AbortTransaction(void)
    DeferredTriggerAbortXact();
    lo_commit(false);           /* 'false' means it's abort */
    AtAbort_Notify();
-   CloseSequences();
    AtEOXact_portals();
 
    /* Advertise the fact that we aborted in pg_clog. */
index 889ddd0f44026a578d78c8cfb1d67d890cd0ec54..20d2a61de4795049e8bc10825932515329bc8ce2 100644 (file)
@@ -9,7 +9,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/commands/define.c,v 1.76 2002/04/15 05:22:03 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/commands/define.c,v 1.77 2002/05/22 21:40:55 tgl Exp $
  *
  * DESCRIPTION
  *   The "DefineFoo" routines take the parse tree and pick out the
@@ -37,6 +37,7 @@
 
 #include "commands/defrem.h"
 #include "parser/parse_type.h"
+#include "utils/int8.h"
 
 
 /*
@@ -114,6 +115,34 @@ defGetNumeric(DefElem *def)
    return 0;                   /* keep compiler quiet */
 }
 
+/*
+ * Extract an int64 value from a DefElem.
+ */
+int64
+defGetInt64(DefElem *def)
+{
+   if (def->arg == NULL)
+       elog(ERROR, "Define: \"%s\" requires a numeric value",
+            def->defname);
+   switch (nodeTag(def->arg))
+   {
+       case T_Integer:
+           return (int64) intVal(def->arg);
+       case T_Float:
+           /*
+            * Values too large for int4 will be represented as Float
+            * constants by the lexer.  Accept these if they are valid int8
+            * strings.
+            */
+           return DatumGetInt64(DirectFunctionCall1(int8in,
+                                    CStringGetDatum(strVal(def->arg))));
+       default:
+           elog(ERROR, "Define: \"%s\" requires a numeric value",
+                def->defname);
+   }
+   return 0;                   /* keep compiler quiet */
+}
+
 /*
  * Extract a possibly-qualified name (as a List of Strings) from a DefElem.
  */
index c79e6f97e1ba6bfd5b39ca453664ce2af4bc72fe..8d90c81d3cbc847be21f381b6ec75994204dd320 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.78 2002/05/21 22:05:54 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.79 2002/05/22 21:40:55 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "access/heapam.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_type.h"
+#include "commands/defrem.h"
 #include "commands/tablecmds.h"
 #include "commands/sequence.h"
 #include "miscadmin.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
-#include "utils/int8.h"
 
 
-#define SEQ_MAGIC    0x1717
-
 #ifndef INT64_IS_BUSTED
 #ifdef HAVE_LL_CONSTANTS
 #define SEQ_MAXVALUE   ((int64) 0x7FFFFFFFFFFFFFFFLL)
  */
 #define SEQ_LOG_VALS   32
 
+/*
+ * The "special area" of a sequence's buffer page looks like this.
+ */
+#define SEQ_MAGIC    0x1717
+
 typedef struct sequence_magic
 {
    uint32      magic;
 } sequence_magic;
 
+/*
+ * We store a SeqTable item for every sequence we have touched in the current
+ * session.  This is needed to hold onto nextval/currval state.  (We can't
+ * rely on the relcache, since it's only, well, a cache, and may decide to
+ * discard entries.)
+ *
+ * XXX We use linear search to find pre-existing SeqTable entries.  This is
+ * good when only a small number of sequences are touched in a session, but
+ * would suck with many different sequences.  Perhaps use a hashtable someday.
+ */
 typedef struct SeqTableData
 {
-   Oid         relid;
-   Relation    rel;            /* NULL if rel is not open in cur xact */
-   int64       cached;
-   int64       last;
-   int64       increment;
-   struct SeqTableData *next;
+   struct SeqTableData *next;  /* link to next SeqTable object */
+   Oid         relid;          /* pg_class OID of this sequence */
+   TransactionId xid;          /* xact in which we last did a seq op */
+   int64       last;           /* value last returned by nextval */
+   int64       cached;         /* last value already cached for nextval */
+   /* if last != cached, we have not used up all the cached values */
+   int64       increment;      /* copy of sequence's increment field */
 } SeqTableData;
 
 typedef SeqTableData *SeqTable;
 
-static SeqTable seqtab = NULL;
+static SeqTable seqtab = NULL; /* Head of list of SeqTable items */
 
-static SeqTable init_sequence(char *caller, RangeVar *relation);
-static Form_pg_sequence read_info(char *caller, SeqTable elm, Buffer *buf);
+
+static void init_sequence(const char *caller, RangeVar *relation,
+                         SeqTable *p_elm, Relation *p_rel);
+static Form_pg_sequence read_info(const char *caller, SeqTable elm,
+                                 Relation rel, Buffer *buf);
 static void init_params(CreateSeqStmt *seq, Form_pg_sequence new);
-static int64 get_param(DefElem *def);
 static void do_setval(RangeVar *sequence, int64 next, bool iscalled);
 
 /*
@@ -281,6 +297,7 @@ nextval(PG_FUNCTION_ARGS)
    text       *seqin = PG_GETARG_TEXT_P(0);
    RangeVar   *sequence;
    SeqTable    elm;
+   Relation    seqrel;
    Buffer      buf;
    Page        page;
    Form_pg_sequence seq;
@@ -300,7 +317,7 @@ nextval(PG_FUNCTION_ARGS)
                                                                "nextval"));
 
    /* open and AccessShareLock sequence */
-   elm = init_sequence("nextval", sequence);
+   init_sequence("nextval", sequence, &elm, &seqrel);
 
    if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
        elog(ERROR, "%s.nextval: you don't have permissions to set sequence %s",
@@ -309,11 +326,12 @@ nextval(PG_FUNCTION_ARGS)
    if (elm->last != elm->cached)       /* some numbers were cached */
    {
        elm->last += elm->increment;
+       relation_close(seqrel, NoLock);
        PG_RETURN_INT64(elm->last);
    }
 
-   seq = read_info("nextval", elm, &buf);      /* lock page' buffer and
-                                                * read tuple */
+   /* lock page' buffer and read tuple */
+   seq = read_info("nextval", elm, seqrel, &buf);
    page = BufferGetPage(buf);
 
    last = next = result = seq->last_value;
@@ -421,7 +439,7 @@ nextval(PG_FUNCTION_ARGS)
        XLogRecPtr  recptr;
        XLogRecData rdata[2];
 
-       xlrec.node = elm->rel->rd_node;
+       xlrec.node = seqrel->rd_node;
        rdata[0].buffer = InvalidBuffer;
        rdata[0].data = (char *) &xlrec;
        rdata[0].len = sizeof(xl_seq_rec);
@@ -453,6 +471,8 @@ nextval(PG_FUNCTION_ARGS)
    if (WriteBuffer(buf) == STATUS_ERROR)
        elog(ERROR, "%s.nextval: WriteBuffer failed", sequence->relname);
 
+   relation_close(seqrel, NoLock);
+
    PG_RETURN_INT64(result);
 }
 
@@ -462,13 +482,14 @@ currval(PG_FUNCTION_ARGS)
    text       *seqin = PG_GETARG_TEXT_P(0);
    RangeVar   *sequence;
    SeqTable    elm;
+   Relation    seqrel;
    int64       result;
 
    sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin,
                                                                "currval"));
 
    /* open and AccessShareLock sequence */
-   elm = init_sequence("currval", sequence);
+   init_sequence("currval", sequence, &elm, &seqrel);
 
    if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK)
        elog(ERROR, "%s.currval: you don't have permissions to read sequence %s",
@@ -480,6 +501,8 @@ currval(PG_FUNCTION_ARGS)
 
    result = elm->last;
 
+   relation_close(seqrel, NoLock);
+
    PG_RETURN_INT64(result);
 }
 
@@ -500,18 +523,19 @@ static void
 do_setval(RangeVar *sequence, int64 next, bool iscalled)
 {
    SeqTable    elm;
+   Relation    seqrel;
    Buffer      buf;
    Form_pg_sequence seq;
 
    /* open and AccessShareLock sequence */
-   elm = init_sequence("setval", sequence);
+   init_sequence("setval", sequence, &elm, &seqrel);
 
    if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
        elog(ERROR, "%s.setval: you don't have permissions to set sequence %s",
             sequence->relname, sequence->relname);
 
    /* lock page' buffer and read tuple */
-   seq = read_info("setval", elm, &buf);
+   seq = read_info("setval", elm, seqrel, &buf);
 
    if ((next < seq->min_value) || (next > seq->max_value))
        elog(ERROR, "%s.setval: value " INT64_FORMAT " is out of bounds (" INT64_FORMAT "," INT64_FORMAT ")",
@@ -529,7 +553,7 @@ do_setval(RangeVar *sequence, int64 next, bool iscalled)
        XLogRecData rdata[2];
        Page        page = BufferGetPage(buf);
 
-       xlrec.node = elm->rel->rd_node;
+       xlrec.node = seqrel->rd_node;
        rdata[0].buffer = InvalidBuffer;
        rdata[0].data = (char *) &xlrec;
        rdata[0].len = sizeof(xl_seq_rec);
@@ -559,6 +583,8 @@ do_setval(RangeVar *sequence, int64 next, bool iscalled)
 
    if (WriteBuffer(buf) == STATUS_ERROR)
        elog(ERROR, "%s.setval: WriteBuffer failed", sequence->relname);
+
+   relation_close(seqrel, NoLock);
 }
 
 /*
@@ -600,84 +626,48 @@ setval_and_iscalled(PG_FUNCTION_ARGS)
    PG_RETURN_INT64(next);
 }
 
-static Form_pg_sequence
-read_info(char *caller, SeqTable elm, Buffer *buf)
-{
-   PageHeader  page;
-   ItemId      lp;
-   HeapTupleData tuple;
-   sequence_magic *sm;
-   Form_pg_sequence seq;
 
-   if (elm->rel->rd_nblocks > 1)
-       elog(ERROR, "%s.%s: invalid number of blocks in sequence",
-            RelationGetRelationName(elm->rel), caller);
-
-   *buf = ReadBuffer(elm->rel, 0);
-   if (!BufferIsValid(*buf))
-       elog(ERROR, "%s.%s: ReadBuffer failed",
-            RelationGetRelationName(elm->rel), caller);
-
-   LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
-
-   page = (PageHeader) BufferGetPage(*buf);
-   sm = (sequence_magic *) PageGetSpecialPointer(page);
-
-   if (sm->magic != SEQ_MAGIC)
-       elog(ERROR, "%s.%s: bad magic (%08X)",
-            RelationGetRelationName(elm->rel), caller, sm->magic);
-
-   lp = PageGetItemId(page, FirstOffsetNumber);
-   Assert(ItemIdIsUsed(lp));
-   tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
-
-   seq = (Form_pg_sequence) GETSTRUCT(&tuple);
-
-   elm->increment = seq->increment_by;
-
-   return seq;
-}
-
-
-static SeqTable
-init_sequence(char *caller, RangeVar *relation)
+/*
+ * Given a relation name, open and lock the sequence.  p_elm and p_rel are
+ * output parameters.
+ */
+static void
+init_sequence(const char *caller, RangeVar *relation,
+             SeqTable *p_elm, Relation *p_rel)
 {
    Oid         relid = RangeVarGetRelid(relation, false);
-   SeqTable    elm,
-               prev = (SeqTable) NULL;
+   TransactionId thisxid = GetCurrentTransactionId();
+   SeqTable    elm;
    Relation    seqrel;
    
    /* Look to see if we already have a seqtable entry for relation */
-   for (elm = seqtab; elm != (SeqTable) NULL; elm = elm->next)
+   for (elm = seqtab; elm != NULL; elm = elm->next)
    {
        if (elm->relid == relid)
            break;
-       prev = elm;
    }
 
-   /* If so, and if it's already been opened in this xact, just return it */
-   if (elm != (SeqTable) NULL && elm->rel != (Relation) NULL)
-       return elm;
+   /*
+    * Open the sequence relation, acquiring AccessShareLock if we don't
+    * already have a lock in the current xact.
+    */
+   if (elm == NULL || elm->xid != thisxid)
+       seqrel = relation_open(relid, AccessShareLock);
+   else
+       seqrel = relation_open(relid, NoLock);
 
-   /* Else open and check it */
-   seqrel = heap_open(relid, AccessShareLock);
    if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE)
        elog(ERROR, "%s.%s: %s is not a sequence",
             relation->relname, caller, relation->relname);
 
    /*
-    * If elm exists but elm->rel is NULL, the seqtable entry is left over
-    * from a previous xact -- update the entry and reuse it.
+    * Allocate new seqtable entry if we didn't find one.
     *
     * NOTE: seqtable entries remain in the list for the life of a backend.
     * If the sequence itself is deleted then the entry becomes wasted memory,
     * but it's small enough that this should not matter.
     */ 
-   if (elm != (SeqTable) NULL)
-   {
-       elm->rel = seqrel;
-   }
-   else
+   if (elm == NULL)
    {
        /*
         * Time to make a new seqtable entry.  These entries live as long
@@ -686,40 +676,59 @@ init_sequence(char *caller, RangeVar *relation)
        elm = (SeqTable) malloc(sizeof(SeqTableData));
        if (elm == NULL)
            elog(ERROR, "Memory exhausted in init_sequence");
-       elm->rel = seqrel;
        elm->relid = relid;
-       elm->cached = elm->last = elm->increment = 0;
-       elm->next = (SeqTable) NULL;
-
-       if (seqtab == (SeqTable) NULL)
-           seqtab = elm;
-       else
-           prev->next = elm;
+       /* increment is set to 0 until we do read_info (see currval) */
+       elm->last = elm->cached = elm->increment = 0;
+       elm->next = seqtab;
+       seqtab = elm;
    }
 
-   return elm;
+   /* Flag that we have a lock in the current xact. */
+   elm->xid = thisxid;
+
+   *p_elm = elm;
+   *p_rel = seqrel;
 }
 
 
-/*
- * CloseSequences
- *             is called by xact mgr at commit/abort.
- */
-void
-CloseSequences(void)
+/* Given an opened relation, lock the page buffer and find the tuple */
+static Form_pg_sequence
+read_info(const char *caller, SeqTable elm,
+         Relation rel, Buffer *buf)
 {
-   SeqTable    elm;
-   Relation    rel;
+   PageHeader  page;
+   ItemId      lp;
+   HeapTupleData tuple;
+   sequence_magic *sm;
+   Form_pg_sequence seq;
 
-   for (elm = seqtab; elm != (SeqTable) NULL; elm = elm->next)
-   {
-       rel = elm->rel;
-       if (rel != (Relation) NULL)     /* opened in current xact */
-       {
-           elm->rel = (Relation) NULL;
-           heap_close(rel, AccessShareLock);
-       }
-   }
+   if (rel->rd_nblocks > 1)
+       elog(ERROR, "%s.%s: invalid number of blocks in sequence",
+            RelationGetRelationName(rel), caller);
+
+   *buf = ReadBuffer(rel, 0);
+   if (!BufferIsValid(*buf))
+       elog(ERROR, "%s.%s: ReadBuffer failed",
+            RelationGetRelationName(rel), caller);
+
+   LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
+
+   page = (PageHeader) BufferGetPage(*buf);
+   sm = (sequence_magic *) PageGetSpecialPointer(page);
+
+   if (sm->magic != SEQ_MAGIC)
+       elog(ERROR, "%s.%s: bad magic (%08X)",
+            RelationGetRelationName(rel), caller, sm->magic);
+
+   lp = PageGetItemId(page, FirstOffsetNumber);
+   Assert(ItemIdIsUsed(lp));
+   tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
+
+   seq = (Form_pg_sequence) GETSTRUCT(&tuple);
+
+   elm->increment = seq->increment_by;
+
+   return seq;
 }
 
 
@@ -761,7 +770,7 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new)
 
    if (increment_by == (DefElem *) NULL)       /* INCREMENT BY */
        new->increment_by = 1;
-   else if ((new->increment_by = get_param(increment_by)) == 0)
+   else if ((new->increment_by = defGetInt64(increment_by)) == 0)
        elog(ERROR, "DefineSequence: can't INCREMENT by 0");
 
    if (max_value == (DefElem *) NULL)  /* MAXVALUE */
@@ -772,7 +781,7 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new)
            new->max_value = -1;    /* descending seq */
    }
    else
-       new->max_value = get_param(max_value);
+       new->max_value = defGetInt64(max_value);
 
    if (min_value == (DefElem *) NULL)  /* MINVALUE */
    {
@@ -782,7 +791,7 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new)
            new->min_value = SEQ_MINVALUE;      /* descending seq */
    }
    else
-       new->min_value = get_param(min_value);
+       new->min_value = defGetInt64(min_value);
 
    if (new->min_value >= new->max_value)
        elog(ERROR, "DefineSequence: MINVALUE (" INT64_FORMAT ") can't be >= MAXVALUE (" INT64_FORMAT ")",
@@ -796,7 +805,7 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new)
            new->last_value = new->max_value;   /* descending seq */
    }
    else
-       new->last_value = get_param(last_value);
+       new->last_value = defGetInt64(last_value);
 
    if (new->last_value < new->min_value)
        elog(ERROR, "DefineSequence: START value (" INT64_FORMAT ") can't be < MINVALUE (" INT64_FORMAT ")",
@@ -807,33 +816,12 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new)
 
    if (cache_value == (DefElem *) NULL)        /* CACHE */
        new->cache_value = 1;
-   else if ((new->cache_value = get_param(cache_value)) <= 0)
+   else if ((new->cache_value = defGetInt64(cache_value)) <= 0)
        elog(ERROR, "DefineSequence: CACHE (" INT64_FORMAT ") can't be <= 0",
             new->cache_value);
 
 }
 
-static int64
-get_param(DefElem *def)
-{
-   if (def->arg == (Node *) NULL)
-       elog(ERROR, "DefineSequence: \"%s\" value unspecified", def->defname);
-
-   if (IsA(def->arg, Integer))
-       return (int64) intVal(def->arg);
-
-   /*
-    * Values too large for int4 will be represented as Float constants by
-    * the lexer.  Accept these if they are valid int8 strings.
-    */
-   if (IsA(def->arg, Float))
-       return DatumGetInt64(DirectFunctionCall1(int8in,
-                                    CStringGetDatum(strVal(def->arg))));
-
-   /* Shouldn't get here unless parser messed up */
-   elog(ERROR, "DefineSequence: \"%s\" value must be integer", def->defname);
-   return 0;                   /* not reached; keep compiler quiet */
-}
 
 void
 seq_redo(XLogRecPtr lsn, XLogRecord *record)
index cf9048c714be3e01ba235eeac58c517b063651ec..5ebead414d10ed19a5f0f1c2ba07b2d478088719 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: defrem.h,v 1.37 2002/05/17 18:32:52 petere Exp $
+ * $Id: defrem.h,v 1.38 2002/05/22 21:40:55 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -54,12 +54,13 @@ extern void DefineDomain(CreateDomainStmt *stmt);
 extern void RemoveDomain(List *names, int behavior);
 
 
-/* support routines in define.c */
+/* support routines in commands/define.c */
 
 extern void case_translate_language_name(const char *input, char *output);
 
 extern char *defGetString(DefElem *def);
 extern double defGetNumeric(DefElem *def);
+extern int64 defGetInt64(DefElem *def);
 extern List *defGetQualifiedName(DefElem *def);
 extern TypeName *defGetTypeName(DefElem *def);
 extern int defGetTypeLength(DefElem *def);
index 48c0673cdf94e2c6af58b903fa5d1055fffaa251..e91523ac47d7fa81c7d66bd4121b8f05b7f7707a 100644 (file)
@@ -6,7 +6,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: sequence.h,v 1.19 2001/11/05 17:46:33 momjian Exp $
+ * $Id: sequence.h,v 1.20 2002/05/22 21:40:55 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -84,7 +84,6 @@ extern Datum setval(PG_FUNCTION_ARGS);
 extern Datum setval_and_iscalled(PG_FUNCTION_ARGS);
 
 extern void DefineSequence(CreateSeqStmt *stmt);
-extern void CloseSequences(void);
 
 extern void seq_redo(XLogRecPtr lsn, XLogRecord *rptr);
 extern void seq_undo(XLogRecPtr lsn, XLogRecord *rptr);