Code review for check-domain-constraints-in-COPY patch. Do correct thing
authorTom Lane
Fri, 20 Sep 2002 16:56:02 +0000 (16:56 +0000)
committerTom Lane
Fri, 20 Sep 2002 16:56:02 +0000 (16:56 +0000)
when default expression for a domain is being used.  Avoid repetitive
catalog lookups.

src/backend/commands/copy.c

index becb66d7fc4358e3e6593026514dfb717693c3e6..b4de718eae993228d8c436c590572dc3fa527313 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.174 2002/09/20 15:43:03 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.175 2002/09/20 16:56:02 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -43,6 +43,7 @@
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
 
+
 #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
 #define OCTVALUE(c) ((c) - '0')
 
@@ -746,8 +747,9 @@ CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
                num_defaults;
    FmgrInfo   *in_functions;
    Oid        *elements;
-   bool       *isDomain;
-   bool        hasDomain = false;
+   Node      **constraintexprs;
+   Const     **constraintconsts;
+   bool        hasConstraints = false;
    int         i;
    List       *cur;
    Oid         in_func_oid;
@@ -792,15 +794,18 @@ CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
    ExecSetSlotDescriptor(slot, tupDesc, false);
 
    /*
-    * pick up the input function and default expression (if any) for each
-    * attribute in the relation.  (We don't actually use the input
-    * function if it's a binary copy.)
+    * Pick up the required catalog information for each attribute in the
+    * relation, including the input function, the element type (to pass
+    * to the input function), and info about defaults and constraints.
+    * (We don't actually use the input function if it's a binary copy.)
     */
-   defmap = (int *) palloc(sizeof(int) * num_phys_attrs);
-   defexprs = (Node **) palloc(sizeof(Node *) * num_phys_attrs);
    in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
    elements = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
-   isDomain = (bool *) palloc(num_phys_attrs * sizeof(bool));
+   defmap = (int *) palloc(num_phys_attrs * sizeof(int));
+   defexprs = (Node **) palloc(num_phys_attrs * sizeof(Node *));
+   constraintexprs = (Node **) palloc(num_phys_attrs * sizeof(Node *));
+   constraintconsts = (Const **) palloc(num_phys_attrs * sizeof(Const *));
+   MemSet(constraintexprs, 0, num_phys_attrs * sizeof(Node *));
 
    for (i = 0; i < num_phys_attrs; i++)
    {
@@ -808,20 +813,12 @@ CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
        if (attr[i]->attisdropped)
            continue;
 
-       /* Test for the base type */
-       if (getBaseType(attr[i]->atttypid) != attr[i]->atttypid)
-       {
-           hasDomain = true;
-           isDomain[i] = true;
-       }
-       else
-           isDomain[i] = false;
-
        /* Fetch the input function */
        in_func_oid = (Oid) GetInputFunction(attr[i]->atttypid);
        fmgr_info(in_func_oid, &in_functions[i]);
        elements[i] = GetTypeElement(attr[i]->atttypid);
 
+       /* Get default info if needed */
        if (intMember(i + 1, attnumlist))
        {
            /* attribute is to be copied */
@@ -839,6 +836,45 @@ CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
                num_defaults++;
            }
        }
+
+       /* If it's a domain type, get info on domain constraints */
+       if (get_typtype(attr[i]->atttypid) == 'd')
+       {
+           Const      *con;
+           Node       *node;
+
+           /*
+            * Easiest way to do this is to use parse_coerce.c to set up
+            * an expression that checks the constraints.  (At present,
+            * the expression might contain a length-coercion-function call
+            * and/or ConstraintTest nodes.)  The bottom of the expression
+            * is a Const node that we fill in with the actual datum during
+            * the data input loop.
+            *
+            * XXX to prevent premature constant folding in parse_coerce,
+            * pass in a NULL constant to start with.  See the comments in
+            * coerce_type; this should be changed someday to use some sort
+            * of Param node instead of a Const.
+            */
+           con = makeConst(attr[i]->atttypid,
+                           attr[i]->attlen,
+                           (Datum) 0,
+                           true,        /* is null */
+                           attr[i]->attbyval,
+                           false,      /* not a set */
+                           false);     /* not coerced */
+
+           node = coerce_type_constraints((Node *) con, attr[i]->atttypid,
+                                          COERCE_IMPLICIT_CAST);
+
+           /* check whether any constraints actually found */
+           if (node != (Node *) con)
+           {
+               constraintexprs[i] = node;
+               constraintconsts[i] = con;
+               hasConstraints = true;
+           }
+       }
    }
 
    if (!binary)
@@ -1090,52 +1126,6 @@ CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
            }
        }
 
-       /* Deal with domains */
-       if (hasDomain)
-       {
-           ParseState *pstate;
-           pstate = make_parsestate(NULL);
-
-           foreach(cur, attnumlist)
-           {
-               int         attnum = lfirsti(cur);
-               int         m = attnum - 1;
-
-               Const      *con;
-               Node       *node;
-               bool       isNull = (nulls[m] == 'n');
-
-               /* This is not a domain, so lets skip it */
-               if (!isDomain[m])
-                   continue;
-
-               /*
-                * This is a domain.  As such, we must process it's input
-                * function and coerce_type_constraints.  The simplest way
-                * of doing that is to allow coerce_type to accomplish its
-                * job from an unknown constant
-                */
-
-               /* Create a constant */
-               con = makeConst(attr[m]->atttypid,
-                               attr[m]->attlen,
-                               values[m],
-                               isNull,
-                               attr[m]->attbyval,
-                               false,      /* not a set */
-                               false);     /* not coerced */
-
-               /* Process constraints */
-               node = coerce_type_constraints((Node *) con, attr[m]->atttypid,
-                                               COERCE_IMPLICIT_CAST);
-
-               values[m] = ExecEvalExpr(node, econtext,
-                                        &isNull, NULL);
-
-               nulls[m] = isNull ? 'n' : ' ';
-           }
-       }
-
        /*
         * Now compute and insert any defaults available for the columns
         * not provided by the input data.  Anything not processed here or
@@ -1151,6 +1141,36 @@ CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
                nulls[defmap[i]] = ' ';
        }
 
+       /*
+        * Next apply any domain constraints
+        */
+       if (hasConstraints)
+       {
+           for (i = 0; i < num_phys_attrs; i++)
+           {
+               Node       *node = constraintexprs[i];
+               Const      *con;
+               bool        isnull;
+
+               if (node == NULL)
+                   continue;   /* no constraint for this attr */
+
+               /* Insert current row's value into the Const node */
+               con = constraintconsts[i];
+               con->constvalue = values[i];
+               con->constisnull = (nulls[i] == 'n');
+
+               /*
+                * Execute the constraint expression.  Allow the expression
+                * to replace the value (consider e.g. a timestamp precision
+                * restriction).
+                */
+               values[i] = ExecEvalExpr(node, econtext,
+                                        &isnull, NULL);
+               nulls[i] = isnull ? 'n' : ' ';
+           }
+       }
+
        /*
         * And now we can form the input tuple.
         */