Implement CREATE TABLE LIKE ... INCLUDING INDEXES. Patch from NikhilS,
authorNeil Conway
Tue, 17 Jul 2007 05:02:03 +0000 (05:02 +0000)
committerNeil Conway
Tue, 17 Jul 2007 05:02:03 +0000 (05:02 +0000)
based in part on an earlier patch from Trevor Hardcastle, and reviewed
by myself.

15 files changed:
doc/src/sgml/ref/create_table.sgml
src/backend/bootstrap/bootparse.y
src/backend/commands/indexcmds.c
src/backend/commands/tablecmds.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/equalfuncs.c
src/backend/nodes/outfuncs.c
src/backend/parser/parse_utilcmd.c
src/backend/tcop/utility.c
src/backend/utils/adt/ruleutils.c
src/include/commands/defrem.h
src/include/nodes/parsenodes.h
src/include/utils/builtins.h
src/test/regress/expected/inherit.out
src/test/regress/sql/inherit.sql

index 064769cee0a8d96ff9c277774418a66173c1a1db..68e8f045e6aaf454f9829dbec75011c34e31f3dd 100644 (file)
@@ -1,5 +1,5 @@
 
 
@@ -23,7 +23,7 @@ PostgreSQL documentation
 CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } ] TABLE table_name ( [
   { column_name data_type [ DEFAULT default_expr ] [ column_constraint [ ... ] ]
     | table_constraint
-    | LIKE parent_table [ { INCLUDING | EXCLUDING } { DEFAULTS | CONSTRAINTS } ] ... }
+    | LIKE parent_table [ { INCLUDING | EXCLUDING } { DEFAULTS | CONSTRAINTS | INDEXES } ] ... }
     [, ... ]
 ] )
 [ INHERITS ( parent_table [, ... ] ) ]
@@ -237,7 +237,7 @@ and table_constraint is:
    
 
    
-    LIKE parent_table [ { INCLUDING | EXCLUDING } { DEFAULTS | CONSTRAINTS } ]
+    LIKE parent_table [ { INCLUDING | EXCLUDING } { DEFAULTS | CONSTRAINTS | INDEXES } ]
     
      
       The LIKE clause specifies a table from which
@@ -265,11 +265,16 @@ and table_constraint is:
       column constraints and table constraints — when constraints are
       requested, all check constraints are copied.
      
+     
+      Any indexes on the original table will not be created on the new
+      table, unless the INCLUDING INDEXES clause is
+      specified.
+     
      
       Note also that unlike INHERITS, copied columns and
       constraints are not merged with similarly named columns and constraints.
       If the same name is specified explicitly or in another
-      LIKE clause an error is signalled.
+      LIKE clause, an error is signalled.
      
     
    
index ff2f7f70c3cd55ca97a6f4db4d005c52517dd589..3fd29b7e97168894adf42eab40ba47fc28e37242 100644 (file)
@@ -9,7 +9,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/bootstrap/bootparse.y,v 1.88 2007/03/13 00:33:39 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/bootstrap/bootparse.y,v 1.89 2007/07/17 05:02:00 neilc Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -252,7 +252,7 @@ Boot_DeclareIndexStmt:
                                LexIDStr($8),
                                NULL,
                                $10,
-                               NULL, NIL,
+                               NULL, NIL, NULL,
                                false, false, false,
                                false, false, true, false, false);
                    do_end();
@@ -270,7 +270,7 @@ Boot_DeclareUniqueIndexStmt:
                                LexIDStr($9),
                                NULL,
                                $11,
-                               NULL, NIL,
+                               NULL, NIL, NULL,
                                true, false, false,
                                false, false, true, false, false);
                    do_end();
index 98dad7371334ba93ca74ea51eb60846155108c7a..943662f8f8b2665a454bcd30d8fe3923657c50ef 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.160 2007/06/23 22:12:50 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.161 2007/07/17 05:02:00 neilc Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -79,6 +79,8 @@ static bool relationHasPrimaryKey(Relation rel);
  *     to index on.
  * 'predicate': the partial-index condition, or NULL if none.
  * 'options': reloptions from WITH (in list-of-DefElem form).
+ * 'src_options': reloptions from the source index, if this is a cloned
+ *     index produced by CREATE TABLE LIKE ... INCLUDING INDEXES
  * 'unique': make the index enforce uniqueness.
  * 'primary': mark the index as a primary key in the catalogs.
  * 'isconstraint': index is for a PRIMARY KEY or UNIQUE constraint,
@@ -100,6 +102,7 @@ DefineIndex(RangeVar *heapRelation,
            List *attributeList,
            Expr *predicate,
            List *options,
+           char *src_options,
            bool unique,
            bool primary,
            bool isconstraint,
@@ -392,9 +395,17 @@ DefineIndex(RangeVar *heapRelation,
    }
 
    /*
-    * Parse AM-specific options, convert to text array form, validate.
+    * Parse AM-specific options, convert to text array form,
+    * validate.  The src_options introduced due to using indexes
+    * via the "CREATE LIKE INCLUDING INDEXES" statement also need to
+    * be merged here
     */
-   reloptions = transformRelOptions((Datum) 0, options, false, false);
+   if (src_options)
+       reloptions = unflatten_reloptions(src_options);
+   else
+       reloptions = (Datum) 0;
+
+   reloptions = transformRelOptions(reloptions, options, false, false);
 
    (void) index_reloptions(amoptions, reloptions, true);
 
index 4bc2a25fcddde07c275e10c50f4cddcd7097c543..07e566204280c75f53c5526108faf3e2b864b634 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.229 2007/07/03 01:30:36 neilc Exp $
+ *   $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.230 2007/07/17 05:02:00 neilc Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -3794,6 +3794,7 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
                stmt->indexParams,      /* parameters */
                (Expr *) stmt->whereClause,
                stmt->options,
+               stmt->src_options,
                stmt->unique,
                stmt->primary,
                stmt->isconstraint,
index ec3d6168897a817786ccad51e7e8b068739f3371..3bc6afe1df888b347399db7bd29067ad751c6ec0 100644 (file)
@@ -15,7 +15,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.379 2007/06/11 22:22:40 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.380 2007/07/17 05:02:01 neilc Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -2192,6 +2192,7 @@ _copyIndexStmt(IndexStmt *from)
    COPY_STRING_FIELD(tableSpace);
    COPY_NODE_FIELD(indexParams);
    COPY_NODE_FIELD(options);
+   COPY_STRING_FIELD(src_options);
    COPY_NODE_FIELD(whereClause);
    COPY_SCALAR_FIELD(unique);
    COPY_SCALAR_FIELD(primary);
index 114550f17da6f79959e2daa9c756f5b30e7cd122..317a5a29959d3eae7fadd8a059ad4f733f38f95f 100644 (file)
@@ -18,7 +18,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/nodes/equalfuncs.c,v 1.310 2007/06/11 22:22:40 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/nodes/equalfuncs.c,v 1.311 2007/07/17 05:02:01 neilc Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -1044,6 +1044,7 @@ _equalIndexStmt(IndexStmt *a, IndexStmt *b)
    COMPARE_STRING_FIELD(tableSpace);
    COMPARE_NODE_FIELD(indexParams);
    COMPARE_NODE_FIELD(options);
+   COMPARE_STRING_FIELD(src_options);
    COMPARE_NODE_FIELD(whereClause);
    COMPARE_SCALAR_FIELD(unique);
    COMPARE_SCALAR_FIELD(primary);
index d5d81eaae598d073460bb51eaccaae6498b54aef..2d2b229c9e8f78dd0c9ea7fbc49ba00df47f4128 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/nodes/outfuncs.c,v 1.312 2007/07/17 01:21:43 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/nodes/outfuncs.c,v 1.313 2007/07/17 05:02:01 neilc Exp $
  *
  * NOTES
  *   Every node type that can appear in stored rules' parsetrees *must*
@@ -1541,6 +1541,7 @@ _outIndexStmt(StringInfo str, IndexStmt *node)
    WRITE_STRING_FIELD(tableSpace);
    WRITE_NODE_FIELD(indexParams);
    WRITE_NODE_FIELD(options);
+   WRITE_STRING_FIELD(src_options);
    WRITE_NODE_FIELD(whereClause);
    WRITE_BOOL_FIELD(unique);
    WRITE_BOOL_FIELD(primary);
index 378222519436868e01a079d4b77b401be090daab..f17ad4802126daa3d84a738a22cc2ba2a3e4712e 100644 (file)
  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/backend/parser/parse_utilcmd.c,v 2.1 2007/06/23 22:12:51 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/parser/parse_utilcmd.c,v 2.2 2007/07/17 05:02:02 neilc Exp $
  *
  *-------------------------------------------------------------------------
  */
 
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/heapam.h"
 #include "catalog/heap.h"
 #include "catalog/index.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_opclass.h"
 #include "catalog/pg_type.h"
 #include "commands/defrem.h"
 #include "commands/tablecmds.h"
+#include "commands/tablespace.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "optimizer/clauses.h"
@@ -47,6 +50,7 @@
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
+#include "utils/relcache.h"
 #include "utils/syscache.h"
 
 
@@ -63,6 +67,7 @@ typedef struct
    List       *ckconstraints;  /* CHECK constraints */
    List       *fkconstraints;  /* FOREIGN KEY constraints */
    List       *ixconstraints;  /* index-creating constraints */
+   List       *inh_indexes;    /* cloned indexes from INCLUDING INDEXES */
    List       *blist;          /* "before list" of things to do before
                                 * creating the table */
    List       *alist;          /* "after list" of things to do after creating
@@ -93,8 +98,13 @@ static void transformTableConstraint(ParseState *pstate,
                         Constraint *constraint);
 static void transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
                     InhRelation *inhrelation);
+static IndexStmt *generateClonedIndexStmt(CreateStmtContext *cxt, 
+                           Relation parent_index, AttrNumber *attmap);
+static List *get_opclass(Oid opclass, Oid actual_datatype);
 static void transformIndexConstraints(ParseState *pstate,
                          CreateStmtContext *cxt);
+static IndexStmt *transformIndexConstraint(Constraint *constraint,
+                                          CreateStmtContext *cxt);
 static void transformFKConstraints(ParseState *pstate,
                       CreateStmtContext *cxt,
                       bool skipValidation,
@@ -146,6 +156,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
    cxt.ckconstraints = NIL;
    cxt.fkconstraints = NIL;
    cxt.ixconstraints = NIL;
+   cxt.inh_indexes = NIL;
    cxt.blist = NIL;
    cxt.alist = NIL;
    cxt.pkey = NULL;
@@ -555,11 +566,6 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
        }
    }
 
-   if (including_indexes)
-       ereport(ERROR,
-               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                errmsg("LIKE INCLUDING INDEXES is not implemented")));
-
    /*
     * Insert the copied attributes into the cxt for the new table
     * definition.
@@ -657,6 +663,35 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
        }
    }
 
+   if (including_indexes && relation->rd_rel->relhasindex)
+   {
+       AttrNumber *attmap;
+       List       *parent_indexes;
+       ListCell   *l;
+
+       attmap = varattnos_map_schema(tupleDesc, cxt->columns);
+       parent_indexes = RelationGetIndexList(relation);
+
+       foreach(l, parent_indexes)
+       {
+           Oid          parent_index_oid = lfirst_oid(l);
+           Relation     parent_index;
+           IndexStmt   *index_stmt;
+
+           parent_index = index_open(parent_index_oid, AccessShareLock);
+
+           /* Build CREATE INDEX statement to recreate the parent_index */
+           index_stmt = generateClonedIndexStmt(cxt, parent_index,
+                                                attmap);
+
+           /* Add the new IndexStmt to the create context */
+           cxt->inh_indexes = lappend(cxt->inh_indexes, index_stmt);
+
+           /* Keep our lock on the index till xact commit */
+           index_close(parent_index, NoLock);
+       }
+   }
+
    /*
     * Close the parent rel, but keep our AccessShareLock on it until xact
     * commit.  That will prevent someone else from deleting or ALTERing the
@@ -666,188 +701,254 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
 }
 
 /*
- * transformIndexConstraints
- *     Handle UNIQUE and PRIMARY KEY constraints, which create indexes
+ * Generate an IndexStmt entry using information from an already
+ * existing index "source_idx".
+ *
+ * Note: Much of this functionality is cribbed from pg_get_indexdef.
  */
-static void
-transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
+static IndexStmt *
+generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
+                       AttrNumber *attmap)
 {
-   IndexStmt  *index;
-   List       *indexlist = NIL;
-   ListCell   *listptr;
-   ListCell   *l;
+   HeapTuple            ht_idx;
+   HeapTuple            ht_idxrel;
+   HeapTuple            ht_am;
+   Form_pg_index        idxrec;
+   Form_pg_class        idxrelrec;
+   Form_pg_am           amrec;
+   List                *indexprs = NIL;
+   ListCell            *indexpr_item;
+   Oid                  indrelid;
+   Oid                  source_relid;
+   int                  keyno;
+   Oid                  keycoltype;
+   Datum                indclassDatum;
+   Datum                indoptionDatum;
+   bool                 isnull;
+   oidvector           *indclass;
+   int2vector          *indoption;
+   IndexStmt           *index;
+   Datum                reloptions;
+
+   source_relid = RelationGetRelid(source_idx);
+
+   /* Fetch pg_index tuple for source index */
+   ht_idx = SearchSysCache(INDEXRELID,
+                           ObjectIdGetDatum(source_relid),
+                           0, 0, 0);
+   if (!HeapTupleIsValid(ht_idx))
+       elog(ERROR, "cache lookup failed for index %u", source_relid);
+   idxrec = (Form_pg_index) GETSTRUCT(ht_idx);
+
+   Assert(source_relid == idxrec->indexrelid);
+   indrelid = idxrec->indrelid;
+
+   index = makeNode(IndexStmt);
+   index->unique = idxrec->indisunique;
+   index->concurrent = false;
+   index->primary = idxrec->indisprimary;
+   index->relation = cxt->relation;
+   index->isconstraint = false;
 
    /*
-    * Run through the constraints that need to generate an index. For PRIMARY
-    * KEY, mark each column as NOT NULL and create an index. For UNIQUE,
-    * create an index as for PRIMARY KEY, but do not insist on NOT NULL.
+    * We don't try to preserve the name of the source index; instead, just
+    * let DefineIndex() choose a reasonable name.
+    */
+   index->idxname = NULL;
+
+   /* Must get indclass and indoption the hard way */
+   indclassDatum = SysCacheGetAttr(INDEXRELID, ht_idx,
+                                   Anum_pg_index_indclass, &isnull);
+   Assert(!isnull);
+   indclass = (oidvector *) DatumGetPointer(indclassDatum);
+   indoptionDatum = SysCacheGetAttr(INDEXRELID, ht_idx,
+                                    Anum_pg_index_indoption, &isnull);
+   Assert(!isnull);
+   indoption = (int2vector *) DatumGetPointer(indoptionDatum);
+
+   /* Fetch pg_class tuple of source index */
+   ht_idxrel = SearchSysCache(RELOID,
+                              ObjectIdGetDatum(source_relid),
+                              0, 0, 0);
+   if (!HeapTupleIsValid(ht_idxrel))
+       elog(ERROR, "cache lookup failed for relation %u", source_relid);
+
+   /*
+    * Store the reloptions for later use by this new index
     */
-   foreach(listptr, cxt->ixconstraints)
+   reloptions = SysCacheGetAttr(RELOID, ht_idxrel,
+                                Anum_pg_class_reloptions, &isnull);
+   if (!isnull)
+       index->src_options = flatten_reloptions(source_relid);
+
+   idxrelrec = (Form_pg_class) GETSTRUCT(ht_idxrel);
+
+   /* Fetch pg_am tuple for the index's access method */
+   ht_am = SearchSysCache(AMOID,
+                          ObjectIdGetDatum(idxrelrec->relam),
+                          0, 0, 0);
+   if (!HeapTupleIsValid(ht_am))
+       elog(ERROR, "cache lookup failed for access method %u",
+            idxrelrec->relam);
+   amrec = (Form_pg_am) GETSTRUCT(ht_am);
+   index->accessMethod = pstrdup(NameStr(amrec->amname));
+
+   /* Get the index expressions, if any */
+   if (!heap_attisnull(ht_idx, Anum_pg_index_indexprs))
    {
-       Constraint *constraint = lfirst(listptr);
-       ListCell   *keys;
-       IndexElem  *iparam;
+       Datum       exprsDatum;
+       bool        isnull;
+       char       *exprsString;
+
+       exprsDatum = SysCacheGetAttr(INDEXRELID, ht_idx,
+                                    Anum_pg_index_indexprs, &isnull);
+       exprsString = DatumGetCString(DirectFunctionCall1(textout,
+                                                         exprsDatum));
+       Assert(!isnull);
+       indexprs = (List *) stringToNode(exprsString);
+   }
 
-       Assert(IsA(constraint, Constraint));
-       Assert((constraint->contype == CONSTR_PRIMARY)
-              || (constraint->contype == CONSTR_UNIQUE));
+   indexpr_item = list_head(indexprs);
 
-       index = makeNode(IndexStmt);
+   for (keyno = 0; keyno < idxrec->indnatts; keyno++)
+   {
+       IndexElem   *iparam;
+       AttrNumber  attnum = idxrec->indkey.values[keyno];
+       int16       opt = indoption->values[keyno];
 
-       index->unique = true;
-       index->primary = (constraint->contype == CONSTR_PRIMARY);
-       if (index->primary)
+       iparam = makeNode(IndexElem);
+
+       if (AttributeNumberIsValid(attnum))
        {
-           if (cxt->pkey != NULL)
-               ereport(ERROR,
-                       (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
-                        errmsg("multiple primary keys for table \"%s\" are not allowed",
-                               cxt->relation->relname)));
-           cxt->pkey = index;
+           /* Simple index column */
+           char       *attname;
 
-           /*
-            * In ALTER TABLE case, a primary index might already exist, but
-            * DefineIndex will check for it.
-            */
-       }
-       index->isconstraint = true;
+           attname = get_relid_attribute_name(indrelid, attnum);
+           keycoltype = get_atttype(indrelid, attnum);
 
-       if (constraint->name != NULL)
-           index->idxname = pstrdup(constraint->name);
+           iparam->name = attname;
+           iparam->expr = NULL;
+       }
        else
-           index->idxname = NULL;      /* DefineIndex will choose name */
+       {
+           /* Expressional index */
+           Node       *indexkey;
+
+           if (indexpr_item == NULL)
+               elog(ERROR, "too few entries in indexprs list");
+           indexkey = (Node *) lfirst(indexpr_item);
+           change_varattnos_of_a_node(indexkey, attmap);
+           iparam->name = NULL;
+           iparam->expr = indexkey;
+
+           indexpr_item = lnext(indexpr_item);
+           keycoltype = exprType(indexkey);
+       }
 
-       index->relation = cxt->relation;
-       index->accessMethod = DEFAULT_INDEX_TYPE;
-       index->options = constraint->options;
-       index->tableSpace = constraint->indexspace;
-       index->indexParams = NIL;
-       index->whereClause = NULL;
-       index->concurrent = false;
+       /* Add the operator class name, if non-default */
+       iparam->opclass = get_opclass(indclass->values[keyno], keycoltype);
 
-       /*
-        * Make sure referenced keys exist.  If we are making a PRIMARY KEY
-        * index, also make sure they are NOT NULL, if possible. (Although we
-        * could leave it to DefineIndex to mark the columns NOT NULL, it's
-        * more efficient to get it right the first time.)
-        */
-       foreach(keys, constraint->keys)
+       iparam->ordering = SORTBY_DEFAULT;
+       iparam->nulls_ordering = SORTBY_NULLS_DEFAULT;
+
+       /* Adjust options if necessary */
+       if (amrec->amcanorder)
        {
-           char       *key = strVal(lfirst(keys));
-           bool        found = false;
-           ColumnDef  *column = NULL;
-           ListCell   *columns;
+           /* If it supports sort ordering, report DESC and NULLS opts */
+           if (opt & INDOPTION_DESC)
+               iparam->ordering = SORTBY_DESC;
+           if (opt & INDOPTION_NULLS_FIRST)
+               iparam->nulls_ordering = SORTBY_NULLS_FIRST;
+       }
 
-           foreach(columns, cxt->columns)
-           {
-               column = (ColumnDef *) lfirst(columns);
-               Assert(IsA(column, ColumnDef));
-               if (strcmp(column->colname, key) == 0)
-               {
-                   found = true;
-                   break;
-               }
-           }
-           if (found)
-           {
-               /* found column in the new table; force it to be NOT NULL */
-               if (constraint->contype == CONSTR_PRIMARY)
-                   column->is_not_null = TRUE;
-           }
-           else if (SystemAttributeByName(key, cxt->hasoids) != NULL)
-           {
-               /*
-                * column will be a system column in the new table, so accept
-                * it.  System columns can't ever be null, so no need to worry
-                * about PRIMARY/NOT NULL constraint.
-                */
-               found = true;
-           }
-           else if (cxt->inhRelations)
-           {
-               /* try inherited tables */
-               ListCell   *inher;
+       index->indexParams = lappend(index->indexParams, iparam);
+   }
 
-               foreach(inher, cxt->inhRelations)
-               {
-                   RangeVar   *inh = (RangeVar *) lfirst(inher);
-                   Relation    rel;
-                   int         count;
+   /* Use the same tablespace as the source index */
+   index->tableSpace = get_tablespace_name(source_idx->rd_node.spcNode);
 
-                   Assert(IsA(inh, RangeVar));
-                   rel = heap_openrv(inh, AccessShareLock);
-                   if (rel->rd_rel->relkind != RELKIND_RELATION)
-                       ereport(ERROR,
-                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                          errmsg("inherited relation \"%s\" is not a table",
-                                 inh->relname)));
-                   for (count = 0; count < rel->rd_att->natts; count++)
-                   {
-                       Form_pg_attribute inhattr = rel->rd_att->attrs[count];
-                       char       *inhname = NameStr(inhattr->attname);
-
-                       if (inhattr->attisdropped)
-                           continue;
-                       if (strcmp(key, inhname) == 0)
-                       {
-                           found = true;
-
-                           /*
-                            * We currently have no easy way to force an
-                            * inherited column to be NOT NULL at creation, if
-                            * its parent wasn't so already. We leave it to
-                            * DefineIndex to fix things up in this case.
-                            */
-                           break;
-                       }
-                   }
-                   heap_close(rel, NoLock);
-                   if (found)
-                       break;
-               }
-           }
+   /* If it's a partial index, decompile and append the predicate */
+   if (!heap_attisnull(ht_idx, Anum_pg_index_indpred))
+   {
+       Datum       pred_datum;
+       bool        isnull;
+       char       *pred_str;
+
+       /* Convert text string to node tree */
+       pred_datum = SysCacheGetAttr(INDEXRELID, ht_idx,
+                                    Anum_pg_index_indpred, &isnull);
+       Assert(!isnull);
+       pred_str = DatumGetCString(DirectFunctionCall1(textout,
+                                                      pred_datum));
+       index->whereClause = (Node *) stringToNode(pred_str);
+       change_varattnos_of_a_node(index->whereClause, attmap);
+   }
 
-           /*
-            * In the ALTER TABLE case, don't complain about index keys not
-            * created in the command; they may well exist already.
-            * DefineIndex will complain about them if not, and will also take
-            * care of marking them NOT NULL.
-            */
-           if (!found && !cxt->isalter)
-               ereport(ERROR,
-                       (errcode(ERRCODE_UNDEFINED_COLUMN),
-                        errmsg("column \"%s\" named in key does not exist",
-                               key)));
+   /* Clean up */
+   ReleaseSysCache(ht_idx);
+   ReleaseSysCache(ht_idxrel);
+   ReleaseSysCache(ht_am);
 
-           /* Check for PRIMARY KEY(foo, foo) */
-           foreach(columns, index->indexParams)
-           {
-               iparam = (IndexElem *) lfirst(columns);
-               if (iparam->name && strcmp(key, iparam->name) == 0)
-               {
-                   if (index->primary)
-                       ereport(ERROR,
-                               (errcode(ERRCODE_DUPLICATE_COLUMN),
-                                errmsg("column \"%s\" appears twice in primary key constraint",
-                                       key)));
-                   else
-                       ereport(ERROR,
-                               (errcode(ERRCODE_DUPLICATE_COLUMN),
-                                errmsg("column \"%s\" appears twice in unique constraint",
-                                       key)));
-               }
-           }
+   return index;
+}
 
-           /* OK, add it to the index definition */
-           iparam = makeNode(IndexElem);
-           iparam->name = pstrdup(key);
-           iparam->expr = NULL;
-           iparam->opclass = NIL;
-           iparam->ordering = SORTBY_DEFAULT;
-           iparam->nulls_ordering = SORTBY_NULLS_DEFAULT;
-           index->indexParams = lappend(index->indexParams, iparam);
-       }
+/*
+ * get_opclass         - fetch name of an index operator class
+ *
+ * If the opclass is the default for the given actual_datatype, then
+ * the return value is NIL.
+ */
+static List *
+get_opclass(Oid opclass, Oid actual_datatype)
+{
+   HeapTuple            ht_opc;
+   Form_pg_opclass      opc_rec;
+   List                *result = NIL;
+
+   ht_opc = SearchSysCache(CLAOID,
+                           ObjectIdGetDatum(opclass),
+                           0, 0, 0);
+   if (!HeapTupleIsValid(ht_opc))
+       elog(ERROR, "cache lookup failed for opclass %u", opclass);
+   opc_rec = (Form_pg_opclass) GETSTRUCT(ht_opc);
+
+   if (!OidIsValid(actual_datatype) ||
+       GetDefaultOpClass(actual_datatype, opc_rec->opcmethod) != opclass)
+   {
+       char *nsp_name = get_namespace_name(opc_rec->opcnamespace);
+       char *opc_name = NameStr(opc_rec->opcname);
+
+       result = list_make2(makeString(nsp_name), makeString(opc_name));
+   }
 
+   ReleaseSysCache(ht_opc);
+   return result;
+}
+
+
+/*
+ * transformIndexConstraints
+ *     Handle UNIQUE and PRIMARY KEY constraints, which create
+ *     indexes. We also merge index definitions arising from
+ *     LIKE ... INCLUDING INDEXES.
+ */
+static void
+transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
+{
+   IndexStmt  *index;
+   List       *indexlist = NIL;
+   ListCell   *lc;
+
+   /*
+    * Run through the constraints that need to generate an index. For PRIMARY
+    * KEY, mark each column as NOT NULL and create an index. For UNIQUE,
+    * create an index as for PRIMARY KEY, but do not insist on NOT NULL.
+    */
+   foreach(lc, cxt->ixconstraints)
+   {
+       Constraint *constraint = (Constraint *) lfirst(lc);
+
+       index = transformIndexConstraint(constraint, cxt);
        indexlist = lappend(indexlist, index);
    }
 
@@ -867,12 +968,12 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
        cxt->alist = list_make1(cxt->pkey);
    }
 
-   foreach(l, indexlist)
+   foreach(lc, indexlist)
    {
        bool        keep = true;
        ListCell   *k;
 
-       index = lfirst(l);
+       index = lfirst(lc);
 
        /* if it's pkey, it's already in cxt->alist */
        if (index == cxt->pkey)
@@ -900,6 +1001,194 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
        if (keep)
            cxt->alist = lappend(cxt->alist, index);
    }
+
+   /* Copy indexes defined by LIKE ... INCLUDING INDEXES */
+   foreach(lc, cxt->inh_indexes)
+   {
+       index = (IndexStmt *) lfirst(lc);
+
+       if (index->primary)
+       {
+           if (cxt->pkey)
+               ereport(ERROR,
+                       (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+                        errmsg("multiple primary keys for table \"%s\" are not allowed",
+                               cxt->relation->relname)));
+
+           cxt->pkey = index;
+       }
+
+       cxt->alist = lappend(cxt->alist, index);
+   }
+}
+
+static IndexStmt *
+transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
+{
+   IndexStmt   *index;
+   ListCell    *keys;
+   IndexElem   *iparam;
+
+   Assert(constraint->contype == CONSTR_PRIMARY ||
+          constraint->contype == CONSTR_UNIQUE);
+
+   index = makeNode(IndexStmt);
+   index->unique = true;
+   index->primary = (constraint->contype == CONSTR_PRIMARY);
+
+   if (index->primary)
+   {
+       if (cxt->pkey != NULL)
+           ereport(ERROR,
+                   (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+                    errmsg("multiple primary keys for table \"%s\" are not allowed",
+                           cxt->relation->relname)));
+       cxt->pkey = index;
+
+       /*
+        * In ALTER TABLE case, a primary index might already exist, but
+        * DefineIndex will check for it.
+        */
+   }
+   index->isconstraint = true;
+
+   if (constraint->name != NULL)
+       index->idxname = pstrdup(constraint->name);
+   else
+       index->idxname = NULL;      /* DefineIndex will choose name */
+
+   index->relation = cxt->relation;
+   index->accessMethod = DEFAULT_INDEX_TYPE;
+   index->options = constraint->options;
+   index->tableSpace = constraint->indexspace;
+   index->indexParams = NIL;
+   index->whereClause = NULL;
+   index->concurrent = false;
+
+   /*
+    * Make sure referenced keys exist.  If we are making a PRIMARY KEY
+    * index, also make sure they are NOT NULL, if possible. (Although we
+    * could leave it to DefineIndex to mark the columns NOT NULL, it's
+    * more efficient to get it right the first time.)
+    */
+   foreach(keys, constraint->keys)
+   {
+       char       *key = strVal(lfirst(keys));
+       bool        found = false;
+       ColumnDef  *column = NULL;
+       ListCell   *columns;
+
+       foreach(columns, cxt->columns)
+       {
+           column = (ColumnDef *) lfirst(columns);
+           Assert(IsA(column, ColumnDef));
+           if (strcmp(column->colname, key) == 0)
+           {
+               found = true;
+               break;
+           }
+       }
+       if (found)
+       {
+           /* found column in the new table; force it to be NOT NULL */
+           if (constraint->contype == CONSTR_PRIMARY)
+               column->is_not_null = TRUE;
+       }
+       else if (SystemAttributeByName(key, cxt->hasoids) != NULL)
+       {
+           /*
+            * column will be a system column in the new table, so accept
+            * it.  System columns can't ever be null, so no need to worry
+            * about PRIMARY/NOT NULL constraint.
+            */
+           found = true;
+       }
+       else if (cxt->inhRelations)
+       {
+           /* try inherited tables */
+           ListCell   *inher;
+
+           foreach(inher, cxt->inhRelations)
+           {
+               RangeVar   *inh = (RangeVar *) lfirst(inher);
+               Relation    rel;
+               int         count;
+
+               Assert(IsA(inh, RangeVar));
+               rel = heap_openrv(inh, AccessShareLock);
+               if (rel->rd_rel->relkind != RELKIND_RELATION)
+                   ereport(ERROR,
+                           (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                            errmsg("inherited relation \"%s\" is not a table",
+                                   inh->relname)));
+               for (count = 0; count < rel->rd_att->natts; count++)
+               {
+                   Form_pg_attribute inhattr = rel->rd_att->attrs[count];
+                   char       *inhname = NameStr(inhattr->attname);
+
+                   if (inhattr->attisdropped)
+                       continue;
+                   if (strcmp(key, inhname) == 0)
+                   {
+                       found = true;
+
+                       /*
+                        * We currently have no easy way to force an
+                        * inherited column to be NOT NULL at creation, if
+                        * its parent wasn't so already. We leave it to
+                        * DefineIndex to fix things up in this case.
+                        */
+                       break;
+                   }
+               }
+               heap_close(rel, NoLock);
+               if (found)
+                   break;
+           }
+       }
+
+       /*
+        * In the ALTER TABLE case, don't complain about index keys not
+        * created in the command; they may well exist already.
+        * DefineIndex will complain about them if not, and will also take
+        * care of marking them NOT NULL.
+        */
+       if (!found && !cxt->isalter)
+           ereport(ERROR,
+                   (errcode(ERRCODE_UNDEFINED_COLUMN),
+                    errmsg("column \"%s\" named in key does not exist",
+                           key)));
+
+       /* Check for PRIMARY KEY(foo, foo) */
+       foreach(columns, index->indexParams)
+       {
+           iparam = (IndexElem *) lfirst(columns);
+           if (iparam->name && strcmp(key, iparam->name) == 0)
+           {
+               if (index->primary)
+                   ereport(ERROR,
+                           (errcode(ERRCODE_DUPLICATE_COLUMN),
+                            errmsg("column \"%s\" appears twice in primary key constraint",
+                                   key)));
+               else
+                   ereport(ERROR,
+                           (errcode(ERRCODE_DUPLICATE_COLUMN),
+                            errmsg("column \"%s\" appears twice in unique constraint",
+                                   key)));
+           }
+       }
+
+       /* OK, add it to the index definition */
+       iparam = makeNode(IndexElem);
+       iparam->name = pstrdup(key);
+       iparam->expr = NULL;
+       iparam->opclass = NIL;
+       iparam->ordering = SORTBY_DEFAULT;
+       iparam->nulls_ordering = SORTBY_NULLS_DEFAULT;
+       index->indexParams = lappend(index->indexParams, iparam);
+   }
+
+   return index;
 }
 
 /*
@@ -1376,6 +1665,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
    cxt.ckconstraints = NIL;
    cxt.fkconstraints = NIL;
    cxt.ixconstraints = NIL;
+   cxt.inh_indexes = NIL;
    cxt.blist = NIL;
    cxt.alist = NIL;
    cxt.pkey = NULL;
index ec9aa9d26374514b8837896396090c0a83674b2a..77e40674df915ab308554cf8765e6b9cfbc59866 100644 (file)
@@ -10,7 +10,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/tcop/utility.c,v 1.283 2007/07/03 01:30:37 neilc Exp $
+ *   $PostgreSQL: pgsql/src/backend/tcop/utility.c,v 1.284 2007/07/17 05:02:02 neilc Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -886,6 +886,7 @@ ProcessUtility(Node *parsetree,
                            stmt->indexParams,  /* parameters */
                            (Expr *) stmt->whereClause,
                            stmt->options,
+                           stmt->src_options,
                            stmt->unique,
                            stmt->primary,
                            stmt->isconstraint,
index df5dbec6078587d77ff249eb2a3843a7585a5b07..4870209d46bfd57fe6c6a36186a35959be456d17 100644 (file)
@@ -9,7 +9,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/utils/adt/ruleutils.c,v 1.262 2007/06/18 21:40:58 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/utils/adt/ruleutils.c,v 1.263 2007/07/17 05:02:02 neilc Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -193,7 +193,6 @@ static char *generate_relation_name(Oid relid);
 static char *generate_function_name(Oid funcid, int nargs, Oid *argtypes);
 static char *generate_operator_name(Oid operid, Oid arg1, Oid arg2);
 static text *string_to_text(char *str);
-static char *flatten_reloptions(Oid relid);
 
 #define only_marker(rte)  ((rte)->inh ? "" : "ONLY ")
 
@@ -763,8 +762,7 @@ pg_get_indexdef_worker(Oid indexrelid, int colno, int prettyFlags)
 
        /* Add the operator class name */
        if (!colno)
-           get_opclass_name(indclass->values[keyno], keycoltype,
-                            &buf);
+           get_opclass_name(indclass->values[keyno], keycoltype, &buf);
 
        /* Add options if relevant */
        if (amrec->amcanorder)
@@ -5417,7 +5415,7 @@ string_to_text(char *str)
 /*
  * Generate a C string representing a relation's reloptions, or NULL if none.
  */
-static char *
+char *
 flatten_reloptions(Oid relid)
 {
    char       *result = NULL;
@@ -5453,3 +5451,31 @@ flatten_reloptions(Oid relid)
 
    return result;
 }
+
+/*
+ * Generate an Array Datum representing a relation's reloptions using
+ * a C string
+ */
+Datum
+unflatten_reloptions(char *reloptstring)
+{
+   Datum       result = (Datum) 0;
+
+   if (reloptstring)
+   {
+       Datum       sep, relopts;
+
+       /*
+        * We want to use text_to_array(reloptstring, ', ') --- but
+        * DirectFunctionCall2(text_to_array) does not work, because
+        * text_to_array() relies on fcinfo to be valid.  So use
+        * OidFunctionCall2.
+        */
+       sep = DirectFunctionCall1(textin, CStringGetDatum(", "));
+       relopts = DirectFunctionCall1(textin, CStringGetDatum(reloptstring));
+
+       result = OidFunctionCall2(F_TEXT_TO_ARRAY, relopts, sep);
+   }
+
+   return result;
+}
index 5bb94a24f25294fb946a5f96465ea0760cf9bdfe..cf74692208b5e6b1eb3cffcab6e5e31d0f731ab1 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/commands/defrem.h,v 1.81 2007/03/13 00:33:43 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/commands/defrem.h,v 1.82 2007/07/17 05:02:02 neilc Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -26,6 +26,7 @@ extern void DefineIndex(RangeVar *heapRelation,
            List *attributeList,
            Expr *predicate,
            List *options,
+           char *src_options,
            bool unique,
            bool primary,
            bool isconstraint,
index 50bb6c2048fb736c3d37d60415d843e416ef09e3..a108759b76041f6168bb6923370c88d21d76964f 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/nodes/parsenodes.h,v 1.349 2007/06/23 22:12:52 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/nodes/parsenodes.h,v 1.350 2007/07/17 05:02:02 neilc Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -1501,6 +1501,7 @@ typedef struct IndexStmt
    char       *tableSpace;     /* tablespace, or NULL to use parent's */
    List       *indexParams;    /* a list of IndexElem */
    List       *options;        /* options from WITH clause */
+   char       *src_options;    /* relopts inherited from source index */
    Node       *whereClause;    /* qualification (partial-index predicate) */
    bool        unique;         /* is index unique? */
    bool        primary;        /* is index on primary key? */
index e35a287718b748ca63141ceea07059196a899ce6..3c3a9ed2d426605c66c18f9ebe1f84597262ecaa 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/utils/builtins.h,v 1.297 2007/06/26 16:48:09 alvherre Exp $
+ * $PostgreSQL: pgsql/src/include/utils/builtins.h,v 1.298 2007/07/17 05:02:02 neilc Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -560,6 +560,8 @@ extern List *deparse_context_for_plan(Node *outer_plan, Node *inner_plan,
 extern const char *quote_identifier(const char *ident);
 extern char *quote_qualified_identifier(const char *namespace,
                           const char *ident);
+extern char *flatten_reloptions(Oid relid);
+extern Datum unflatten_reloptions(char *reloptstring);
 
 /* tid.c */
 extern Datum tidin(PG_FUNCTION_ARGS);
index fa97f019b1d7aac435017153357b708879f7d40c..40dfaeda902a0c75e0f51b8e66fc86c2190f5319 100644 (file)
@@ -633,6 +633,26 @@ SELECT * FROM inhg; /* Two records with three columns in order x=x, xx=text, y=y
 (2 rows)
 
 DROP TABLE inhg;
+CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, y text); /* copies indexes */
+INSERT INTO inhg VALUES (5, 10);
+INSERT INTO inhg VALUES (20, 10); -- should fail
+ERROR:  duplicate key value violates unique constraint "inhg_pkey"
+DROP TABLE inhg;
+/* Multiple primary keys creation should fail */
+CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, PRIMARY KEY(x)); /* fails */
+ERROR:  multiple primary keys for table "inhg" are not allowed
+CREATE TABLE inhz (xx text DEFAULT 'text', yy int UNIQUE);
+NOTICE:  CREATE TABLE / UNIQUE will create implicit index "inhz_yy_key" for table "inhz"
+CREATE UNIQUE INDEX inhz_xx_idx on inhz (xx) WHERE xx <> 'test';
+/* Ok to create multiple unique indexes */
+CREATE TABLE inhg (x text UNIQUE, LIKE inhz INCLUDING INDEXES);
+NOTICE:  CREATE TABLE / UNIQUE will create implicit index "inhg_x_key" for table "inhg"
+INSERT INTO inhg (xx, yy, x) VALUES ('test', 5, 10);
+INSERT INTO inhg (xx, yy, x) VALUES ('test', 10, 15);
+INSERT INTO inhg (xx, yy, x) VALUES ('foo', 10, 15); -- should fail
+ERROR:  duplicate key value violates unique constraint "inhg_x_key"
+DROP TABLE inhg;
+DROP TABLE inhz;
 -- Test changing the type of inherited columns
 insert into d values('test','one','two','three');
 alter table a alter column aa type integer using bit_length(aa);
index cd4221f899db1f3e213505a15f332529e90eb77f..b0499a649284df366d8974fca70e92423d1a4fae 100644 (file)
@@ -156,6 +156,21 @@ INSERT INTO inhg VALUES ('x', 'foo',  'y');  /* fails due to constraint */
 SELECT * FROM inhg; /* Two records with three columns in order x=x, xx=text, y=y */
 DROP TABLE inhg;
 
+CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, y text); /* copies indexes */
+INSERT INTO inhg VALUES (5, 10);
+INSERT INTO inhg VALUES (20, 10); -- should fail
+DROP TABLE inhg;
+/* Multiple primary keys creation should fail */
+CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, PRIMARY KEY(x)); /* fails */
+CREATE TABLE inhz (xx text DEFAULT 'text', yy int UNIQUE);
+CREATE UNIQUE INDEX inhz_xx_idx on inhz (xx) WHERE xx <> 'test';
+/* Ok to create multiple unique indexes */
+CREATE TABLE inhg (x text UNIQUE, LIKE inhz INCLUDING INDEXES);
+INSERT INTO inhg (xx, yy, x) VALUES ('test', 5, 10);
+INSERT INTO inhg (xx, yy, x) VALUES ('test', 10, 15);
+INSERT INTO inhg (xx, yy, x) VALUES ('foo', 10, 15); -- should fail
+DROP TABLE inhg;
+DROP TABLE inhz;
 
 -- Test changing the type of inherited columns
 insert into d values('test','one','two','three');