Re-allow specification of a new default value for an inherited column
authorTom Lane
Fri, 30 Mar 2001 20:50:36 +0000 (20:50 +0000)
committerTom Lane
Fri, 30 Mar 2001 20:50:36 +0000 (20:50 +0000)
in CREATE TABLE, but give a warning notice.  Clean up inconsistent
handling of defaults and NOT NULL flags from multiply-inherited columns.
Per pghackers discussion 28-Mar through 30-Mar.

src/backend/commands/creatinh.c
src/test/regress/expected/create_table.out
src/test/regress/expected/inherit.out

index 602ceb54328f46547c3c5bd9e552b07d890eb74e..7aaaa3cdebf181768dae88168ff1094eaba29320 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/commands/Attic/creatinh.c,v 1.74 2001/03/22 06:16:11 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/commands/Attic/creatinh.c,v 1.75 2001/03/30 20:50:36 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
  * ----------------
  */
 
-static int checkAttrExists(const char *attributeName,
-               const char *attributeType, List *schema);
 static List *MergeAttributes(List *schema, List *supers, bool istemp,
                List **supOids, List **supconstr);
+static bool change_varattnos_of_a_node(Node *node, const AttrNumber *newattno);
 static void StoreCatalogInheritance(Oid relationId, List *supers);
+static int findAttrByName(const char *attributeName, List *schema);
 static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass);
 
 
@@ -63,9 +63,10 @@ DefineRelation(CreateStmt *stmt, char relkind)
    int         i;
    AttrNumber  attnum;
 
-   if (strlen(stmt->relname) >= NAMEDATALEN)
-       elog(ERROR, "the relation name %s is >= %d characters long",
-            stmt->relname, NAMEDATALEN);
+   /*
+    * Truncate relname to appropriate length (probably a waste of time,
+    * as parser should have done this already).
+    */
    StrNCpy(relname, stmt->relname, NAMEDATALEN);
 
    /*
@@ -80,7 +81,7 @@ DefineRelation(CreateStmt *stmt, char relkind)
        elog(ERROR, "DefineRelation: please inherit from a relation or define an attribute");
 
    /*
-    * create a relation descriptor from the relation schema and create
+    * Create a relation descriptor from the relation schema and create
     * the relation.  Note that in this stage only inherited (pre-cooked)
     * defaults and constraints will be included into the new relation.
     * (BuildDescForRelation takes care of the inherited defaults, but we
@@ -234,51 +235,11 @@ TruncateRelation(char *name)
    heap_truncate(name);
 }
 
-/*
- * complementary static functions for MergeAttributes().
- * Varattnos of pg_relcheck.rcbin should be rewritten when
- * subclasses inherit the constraints from the super class.
- * Note that these functions rewrite varattnos while walking
- * through a node tree.
- */
-static bool
-change_varattnos_walker(Node *node, const AttrNumber *newattno)
-{
-   if (node == NULL)
-       return false;
-   if (IsA(node, Var))
-   {
-       Var        *var = (Var *) node;
-
-       if (var->varlevelsup == 0 && var->varno == 1)
-       {
-
-           /*
-            * ??? the following may be a problem when the node is
-            * multiply referenced though stringToNode() doesn't create
-            * such a node currently.
-            */
-           Assert(newattno[var->varattno - 1] > 0);
-           var->varattno = newattno[var->varattno - 1];
-       }
-       return false;
-   }
-   return expression_tree_walker(node, change_varattnos_walker,
-                                 (void *) newattno);
-}
-
-static bool
-change_varattnos_of_a_node(Node *node, const AttrNumber *newattno)
-{
-   return change_varattnos_walker(node, newattno);
-}
-
-/*
+/*----------
  * MergeAttributes
- *     Returns new schema given initial schema and supers.
+ *     Returns new schema given initial schema and superclasses.
  *
  * Input arguments:
- *
  * 'schema' is the column/attribute definition for the table. (It's a list
  *     of ColumnDef's.) It is destructively changed.
  * 'supers' is a list of names (as Value objects) of parent relations.
@@ -286,7 +247,11 @@ change_varattnos_of_a_node(Node *node, const AttrNumber *newattno)
  *
  * Output arguments:
  * 'supOids' receives an integer list of the OIDs of the parent relations.
- * 'supconstr' receives a list of constraints belonging to the parents.
+ * 'supconstr' receives a list of constraints belonging to the parents,
+ *     updated as necessary to be valid for the child.
+ *
+ * Return value:
+ * Completed schema list.
  *
  * Notes:
  *   The order in which the attributes are inherited is very important.
@@ -301,14 +266,17 @@ change_varattnos_of_a_node(Node *node, const AttrNumber *newattno)
  *     create table student (gpa float8) inherits (person);
  *     create table stud_emp (percent int4) inherits (emp, student);
  *
- *   the order of the attributes of stud_emp is as follow:
- *
+ *   The order of the attributes of stud_emp is:
  *
  *                         person {1:name, 2:age, 3:location}
  *                         /    \
  *            {6:gpa}  student   emp {4:salary, 5:manager}
  *                         \    /
  *                        stud_emp {7:percent}
+ *
+ *    If the same attribute name appears multiple times, then it appears
+ *    in the result table in the proper location for its first appearance.
+ *----------
  */
 static List *
 MergeAttributes(List *schema, List *supers, bool istemp,
@@ -318,11 +286,14 @@ MergeAttributes(List *schema, List *supers, bool istemp,
    List       *inhSchema = NIL;
    List       *parentOids = NIL;
    List       *constraints = NIL;
-   int         attnums;
+   int         child_attno;
 
    /*
-    * Validates that there are no duplications. Validity checking of
-    * types occurs later.
+    * Check for duplicate names in the explicit list of attributes.
+    *
+    * Although we might consider merging such entries in the same way that
+    * we handle name conflicts for inherited attributes, it seems to make
+    * more sense to assume such conflicts are errors.
     */
    foreach(entry, schema)
    {
@@ -331,10 +302,6 @@ MergeAttributes(List *schema, List *supers, bool istemp,
 
        foreach(rest, lnext(entry))
        {
-
-           /*
-            * check for duplicated names within the new relation
-            */
            ColumnDef  *restdef = lfirst(rest);
 
            if (strcmp(coldef->colname, restdef->colname) == 0)
@@ -344,6 +311,9 @@ MergeAttributes(List *schema, List *supers, bool istemp,
            }
        }
    }
+   /*
+    * Reject duplicate names in the list of parents, too.
+    */
    foreach(entry, supers)
    {
        List       *rest;
@@ -359,23 +329,18 @@ MergeAttributes(List *schema, List *supers, bool istemp,
    }
 
    /*
-    * merge the inherited attributes into the schema
+    * Scan the parents left-to-right, and merge their attributes to form
+    * a list of inherited attributes (inhSchema).
     */
-   attnums = 0;
+   child_attno = 0;
    foreach(entry, supers)
    {
        char       *name = strVal(lfirst(entry));
        Relation    relation;
-       List       *partialResult = NIL;
-       AttrNumber  attrno;
        TupleDesc   tupleDesc;
        TupleConstr *constr;
-       AttrNumber *newattno,
-                  *partialAttidx;
-       Node       *expr;
-       int         i,
-                   attidx,
-                   attno_exist;
+       AttrNumber *newattno;
+       AttrNumber  parent_attno;
 
        relation = heap_openr(name, AccessShareLock);
 
@@ -394,33 +359,30 @@ MergeAttributes(List *schema, List *supers, bool istemp,
 
        parentOids = lappendi(parentOids, relation->rd_id);
        setRelhassubclassInRelation(relation->rd_id, true);
+
        tupleDesc = RelationGetDescr(relation);
-       /* allocate a new attribute number table and initialize */
-       newattno = (AttrNumber *) palloc(tupleDesc->natts * sizeof(AttrNumber));
-       for (i = 0; i < tupleDesc->natts; i++)
-           newattno[i] = 0;
+       constr = tupleDesc->constr;
 
        /*
-        * searching and storing order are different. another table is
-        * needed.
+        * newattno[] will contain the child-table attribute numbers for
+        * the attributes of this parent table.  (They are not the same
+        * for parents after the first one.)
         */
-       partialAttidx = (AttrNumber *) palloc(tupleDesc->natts * sizeof(AttrNumber));
-       for (i = 0; i < tupleDesc->natts; i++)
-           partialAttidx[i] = 0;
-       constr = tupleDesc->constr;
+       newattno = (AttrNumber *) palloc(tupleDesc->natts*sizeof(AttrNumber));
 
-       attidx = 0;
-       for (attrno = relation->rd_rel->relnatts - 1; attrno >= 0; attrno--)
+       for (parent_attno = 1; parent_attno <= tupleDesc->natts;
+            parent_attno++)
        {
-           Form_pg_attribute attribute = tupleDesc->attrs[attrno];
+           Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
            char       *attributeName;
            char       *attributeType;
            HeapTuple   tuple;
+           int         exist_attno;
            ColumnDef  *def;
            TypeName   *typename;
 
            /*
-            * form name, type and constraints
+            * Get name and type name of attribute
             */
            attributeName = NameStr(attribute->attname);
            tuple = SearchSysCache(TYPEOID,
@@ -433,65 +395,78 @@ MergeAttributes(List *schema, List *supers, bool istemp,
            ReleaseSysCache(tuple);
 
            /*
-            * check validity
-            *
+            * Does it conflict with some previously inherited column?
             */
-           if (checkAttrExists(attributeName, attributeType, schema) != 0)
-               elog(ERROR, "CREATE TABLE: attribute \"%s\" already exists in inherited schema",
+           exist_attno = findAttrByName(attributeName, inhSchema);
+           if (exist_attno > 0)
+           {
+               /*
+                * Yes, try to merge the two column definitions.
+                * They must have the same type and typmod.
+                */
+               elog(NOTICE, "CREATE TABLE: merging multiple inherited definitions of attribute \"%s\"",
                     attributeName);
-
-           if (0 < (attno_exist = checkAttrExists(attributeName, attributeType, inhSchema)))
+               def = (ColumnDef *) nth(exist_attno - 1, inhSchema);
+               if (strcmp(def->typename->name, attributeType) != 0 ||
+                   def->typename->typmod != attribute->atttypmod)
+                   elog(ERROR, "CREATE TABLE: inherited attribute \"%s\" type conflict (%s and %s)",
+                        attributeName, def->typename->name, attributeType);
+               /* Merge of NOT NULL constraints = OR 'em together */
+               def->is_not_null |= attribute->attnotnull;
+               /* Default and other constraints are handled below */
+               newattno[parent_attno - 1] = exist_attno;
+           }
+           else
            {
-
                /*
-                * this entry already exists
+                * No, create a new inherited column
                 */
-               newattno[attribute->attnum - 1] = attno_exist;
-               continue;
+               def = makeNode(ColumnDef);
+               def->colname = pstrdup(attributeName);
+               typename = makeNode(TypeName);
+               typename->name = attributeType;
+               typename->typmod = attribute->atttypmod;
+               def->typename = typename;
+               def->is_not_null = attribute->attnotnull;
+               def->is_sequence = false;
+               def->raw_default = NULL;
+               def->cooked_default = NULL;
+               def->constraints = NIL;
+               inhSchema = lappend(inhSchema, def);
+               newattno[parent_attno - 1] = ++child_attno;
            }
-           attidx++;
-           partialAttidx[attribute->attnum - 1] = attidx;
-
            /*
-            * add an entry to the schema
+            * Copy default if any, overriding any default from earlier parent
             */
-           def = makeNode(ColumnDef);
-           typename = makeNode(TypeName);
-           def->colname = pstrdup(attributeName);
-           typename->name = pstrdup(attributeType);
-           typename->typmod = attribute->atttypmod;
-           def->typename = typename;
-           def->is_not_null = attribute->attnotnull;
-           def->raw_default = NULL;
-           def->cooked_default = NULL;
            if (attribute->atthasdef)
            {
                AttrDefault *attrdef;
                int         i;
 
-               Assert(constr != NULL);
+               def->raw_default = NULL;
+               def->cooked_default = NULL;
 
+               Assert(constr != NULL);
                attrdef = constr->defval;
                for (i = 0; i < constr->num_defval; i++)
                {
-                   if (attrdef[i].adnum == attrno + 1)
+                   if (attrdef[i].adnum == parent_attno)
                    {
+                       /*
+                        * if default expr could contain any vars, we'd
+                        * need to fix 'em, but it can't ...
+                        */
                        def->cooked_default = pstrdup(attrdef[i].adbin);
                        break;
                    }
                }
                Assert(def->cooked_default != NULL);
            }
-           partialResult = lcons(def, partialResult);
        }
-       for (i = 0; i < tupleDesc->natts; i++)
-       {
-           if (partialAttidx[i] > 0)
-               newattno[i] = attnums + attidx + 1 - partialAttidx[i];
-       }
-       attnums += attidx;
-       pfree(partialAttidx);
-
+       /*
+        * Now copy the constraints of this parent, adjusting attnos using
+        * the completed newattno[] map
+        */
        if (constr && constr->num_check > 0)
        {
            ConstrCheck *check = constr->check;
@@ -500,6 +475,7 @@ MergeAttributes(List *schema, List *supers, bool istemp,
            for (i = 0; i < constr->num_check; i++)
            {
                Constraint *cdef = makeNode(Constraint);
+               Node       *expr;
 
                cdef->contype = CONSTR_CHECK;
                if (check[i].ccname[0] == '$')
@@ -514,6 +490,7 @@ MergeAttributes(List *schema, List *supers, bool istemp,
                constraints = lappend(constraints, cdef);
            }
        }
+
        pfree(newattno);
 
        /*
@@ -522,24 +499,109 @@ MergeAttributes(List *schema, List *supers, bool istemp,
         * ALTERing the parent before the child is committed.
         */
        heap_close(relation, NoLock);
-
-       /*
-        * wants the inherited schema to appear in the order they are
-        * specified in CREATE TABLE
-        */
-       inhSchema = nconc(inhSchema, partialResult);
    }
 
    /*
-    * put the inherited schema before our the schema for this table
+    * If we had no inherited attributes, the result schema is just the
+    * explicitly declared columns.  Otherwise, we need to merge the
+    * declared columns into the inherited schema list.
     */
-   schema = nconc(inhSchema, schema);
+   if (inhSchema != NIL)
+   {
+       foreach(entry, schema)
+       {
+           ColumnDef  *newdef = lfirst(entry);
+           char       *attributeName = newdef->colname;
+           char       *attributeType = newdef->typename->name;
+           int         exist_attno;
+
+           /*
+            * Does it conflict with some previously inherited column?
+            */
+           exist_attno = findAttrByName(attributeName, inhSchema);
+           if (exist_attno > 0)
+           {
+               ColumnDef  *def;
+
+               /*
+                * Yes, try to merge the two column definitions.
+                * They must have the same type and typmod.
+                */
+               elog(NOTICE, "CREATE TABLE: merging attribute \"%s\" with inherited definition",
+                    attributeName);
+               def = (ColumnDef *) nth(exist_attno - 1, inhSchema);
+               if (strcmp(def->typename->name, attributeType) != 0 ||
+                   def->typename->typmod != newdef->typename->typmod)
+                   elog(ERROR, "CREATE TABLE: attribute \"%s\" type conflict (%s and %s)",
+                        attributeName, def->typename->name, attributeType);
+               /* Merge of NOT NULL constraints = OR 'em together */
+               def->is_not_null |= newdef->is_not_null;
+               /* If new def has a default, override previous default */
+               if (newdef->raw_default != NULL)
+               {
+                   def->raw_default = newdef->raw_default;
+                   def->cooked_default = newdef->cooked_default;
+               }
+           }
+           else
+           {
+               /*
+                * No, attach new column to result schema
+                */
+               inhSchema = lappend(inhSchema, newdef);
+           }
+       }
+
+       schema = inhSchema;
+   }
 
    *supOids = parentOids;
    *supconstr = constraints;
    return schema;
 }
 
+/*
+ * complementary static functions for MergeAttributes().
+ *
+ * Varattnos of pg_relcheck.rcbin must be rewritten when subclasses inherit
+ * constraints from parent classes, since the inherited attributes could
+ * be given different column numbers in multiple-inheritance cases.
+ *
+ * Note that the passed node tree is modified in place!
+ */
+static bool
+change_varattnos_walker(Node *node, const AttrNumber *newattno)
+{
+   if (node == NULL)
+       return false;
+   if (IsA(node, Var))
+   {
+       Var        *var = (Var *) node;
+
+       if (var->varlevelsup == 0 && var->varno == 1 &&
+           var->varattno > 0)
+       {
+
+           /*
+            * ??? the following may be a problem when the node is
+            * multiply referenced though stringToNode() doesn't create
+            * such a node currently.
+            */
+           Assert(newattno[var->varattno - 1] > 0);
+           var->varattno = newattno[var->varattno - 1];
+       }
+       return false;
+   }
+   return expression_tree_walker(node, change_varattnos_walker,
+                                 (void *) newattno);
+}
+
+static bool
+change_varattnos_of_a_node(Node *node, const AttrNumber *newattno)
+{
+   return change_varattnos_walker(node, newattno);
+}
+
 /*
  * StoreCatalogInheritance
  *     Updates the system catalogs with proper inheritance information.
@@ -716,15 +778,14 @@ again:
    heap_close(relation, RowExclusiveLock);
 }
 
-
-
 /*
- * returns the index (starting with 1) if attribute already exists in schema,
+ * Look for an existing schema entry with the given name.
+ *
+ * Returns the index (starting with 1) if attribute already exists in schema,
  * 0 if it doesn't.
  */
 static int
-checkAttrExists(const char *attributeName, const char *attributeType,
-               List *schema)
+findAttrByName(const char *attributeName, List *schema)
 {
    List       *s;
    int         i = 0;
@@ -735,21 +796,14 @@ checkAttrExists(const char *attributeName, const char *attributeType,
 
        ++i;
        if (strcmp(attributeName, def->colname) == 0)
-       {
-
-           /*
-            * attribute exists. Make sure the types are the same.
-            */
-           if (strcmp(attributeType, def->typename->name) != 0)
-               elog(ERROR, "CREATE TABLE: attribute \"%s\" type conflict (%s and %s)",
-                    attributeName, attributeType, def->typename->name);
            return i;
-       }
    }
    return 0;
 }
 
-
+/*
+ * Update a relation's pg_class.relhassubclass entry to the given value
+ */
 static void
 setRelhassubclassInRelation(Oid relationId, bool relhassubclass)
 {
index 6a3fc96ff136add146089df96f229d40c79057b7..a0ddd5ef609a3367d567673255ea75877381b1e7 100644 (file)
@@ -81,6 +81,9 @@ CREATE TABLE student (
 CREATE TABLE stud_emp (
    percent     int4
 ) INHERITS (emp, student);
+NOTICE:  CREATE TABLE: merging multiple inherited definitions of attribute "name"
+NOTICE:  CREATE TABLE: merging multiple inherited definitions of attribute "age"
+NOTICE:  CREATE TABLE: merging multiple inherited definitions of attribute "location"
 CREATE TABLE city (
    name        name,
    location    box,
@@ -132,6 +135,8 @@ CREATE TABLE c_star (
 CREATE TABLE d_star (
    d           float8
 ) INHERITS (b_star, c_star);
+NOTICE:  CREATE TABLE: merging multiple inherited definitions of attribute "class"
+NOTICE:  CREATE TABLE: merging multiple inherited definitions of attribute "a"
 CREATE TABLE e_star (
    e           int2
 ) INHERITS (c_star);
index 3bc4199e1183c6df85fb0a9586f85e119ff987fe..b56798124f3bf90bba895a227dd211edd9aba51b 100644 (file)
@@ -5,6 +5,8 @@ CREATE TABLE a (aa TEXT);
 CREATE TABLE b (bb TEXT) INHERITS (a);
 CREATE TABLE c (cc TEXT) INHERITS (a);
 CREATE TABLE d (dd TEXT) INHERITS (b,c,a);
+NOTICE:  CREATE TABLE: merging multiple inherited definitions of attribute "aa"
+NOTICE:  CREATE TABLE: merging multiple inherited definitions of attribute "aa"
 INSERT INTO a(aa) VALUES('aaa');
 INSERT INTO a(aa) VALUES('aaaa');
 INSERT INTO a(aa) VALUES('aaaaa');