Export the external file reader used in COPY FROM as APIs.
authorItagaki Takahiro
Wed, 16 Feb 2011 02:19:11 +0000 (11:19 +0900)
committerItagaki Takahiro
Wed, 16 Feb 2011 02:19:11 +0000 (11:19 +0900)
They are expected to be used by extension modules like file_fdw.
There are no user-visible changes.

Itagaki Takahiro
Reviewed and tested by Kevin Grittner and Noah Misch.

src/backend/commands/copy.c
src/include/commands/copy.h

index 3350ca0b6ef510999a266fd70badfd6b51ebfb01..9f7263d59a54107afdf0fd44a79154e32225ad2c 100644 (file)
@@ -93,13 +93,11 @@ typedef struct CopyStateData
    FILE       *copy_file;      /* used if copy_dest == COPY_FILE */
    StringInfo  fe_msgbuf;      /* used for all dests during COPY TO, only for
                                 * dest == COPY_NEW_FE in COPY FROM */
-   bool        fe_copy;        /* true for all FE copy dests */
    bool        fe_eof;         /* true if detected end of copy data */
    EolType     eol_type;       /* EOL type of input */
    int         client_encoding;    /* remote side's character encoding */
    bool        need_transcoding;       /* client encoding diff from server? */
    bool        encoding_embeds_ascii;  /* ASCII can be non-first byte? */
-   uint64      processed;      /* # of tuples processed */
 
    /* parameters from the COPY command */
    Relation    rel;            /* relation to copy to or from */
@@ -119,18 +117,35 @@ typedef struct CopyStateData
    bool       *force_quote_flags;      /* per-column CSV FQ flags */
    bool       *force_notnull_flags;    /* per-column CSV FNN flags */
 
-   /* these are just for error messages, see copy_in_error_callback */
+   /* these are just for error messages, see CopyFromErrorCallback */
    const char *cur_relname;    /* table name for error messages */
    int         cur_lineno;     /* line number for error messages */
    const char *cur_attname;    /* current att for error messages */
    const char *cur_attval;     /* current att value for error messages */
 
+   /*
+    * Working state for COPY TO/FROM
+    */
+   MemoryContext copycontext;  /* per-copy execution context */
+
    /*
     * Working state for COPY TO
     */
    FmgrInfo   *out_functions;  /* lookup info for output functions */
    MemoryContext rowcontext;   /* per-row evaluation context */
 
+   /*
+    * Working state for COPY FROM
+    */
+   AttrNumber  num_defaults;
+   bool        file_has_oids;
+   FmgrInfo    oid_in_function;
+   Oid         oid_typioparam;
+   FmgrInfo   *in_functions;   /* array of input functions for each attrs */
+   Oid        *typioparams;    /* array of element types for in_functions */
+   int        *defmap;         /* array of default att numbers */
+   ExprState **defexprs;       /* array of default att expressions */
+
    /*
     * These variables are used to reduce overhead in textual COPY FROM.
     *
@@ -169,13 +184,12 @@ typedef struct CopyStateData
    int         raw_buf_len;    /* total # of bytes stored */
 } CopyStateData;
 
-typedef CopyStateData *CopyState;
-
 /* DestReceiver for COPY (SELECT) TO */
 typedef struct
 {
    DestReceiver pub;           /* publicly-known function pointers */
    CopyState   cstate;         /* CopyStateData for the command */
+   uint64      processed;      /* # of tuples processed */
 } DR_copy;
 
 
@@ -248,11 +262,17 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 
 
 /* non-export function prototypes */
-static void DoCopyTo(CopyState cstate);
-static void CopyTo(CopyState cstate);
+static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query,
+               const char *queryString, List *attnamelist, List *options);
+static void EndCopy(CopyState cstate);
+static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
+               const char *filename, List *attnamelist, List *options);
+static void EndCopyTo(CopyState cstate);
+static uint64 DoCopyTo(CopyState cstate);
+static uint64 CopyTo(CopyState cstate);
 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
             Datum *values, bool *nulls);
-static void CopyFrom(CopyState cstate);
+static uint64 CopyFrom(CopyState cstate);
 static bool CopyReadLine(CopyState cstate);
 static bool CopyReadLineText(CopyState cstate);
 static int CopyReadAttributesText(CopyState cstate);
@@ -700,6 +720,102 @@ CopyLoadRawBuf(CopyState cstate)
  * input/output stream. The latter could be either stdin/stdout or a
  * socket, depending on whether we're running under Postmaster control.
  *
+ * Do not allow a Postgres user without superuser privilege to read from
+ * or write to a file.
+ *
+ * Do not allow the copy if user doesn't have proper permission to access
+ * the table or the specifically requested columns.
+ */
+uint64
+DoCopy(const CopyStmt *stmt, const char *queryString)
+{
+   CopyState   cstate;
+   bool        is_from = stmt->is_from;
+   bool        pipe = (stmt->filename == NULL);
+   Relation    rel;
+   uint64      processed;
+
+   /* Disallow file COPY except to superusers. */
+   if (!pipe && !superuser())
+       ereport(ERROR,
+               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                errmsg("must be superuser to COPY to or from a file"),
+                errhint("Anyone can COPY to stdout or from stdin. "
+                        "psql's \\copy command also works for anyone.")));
+
+   if (stmt->relation)
+   {
+       TupleDesc       tupDesc;
+       AclMode         required_access = (is_from ? ACL_INSERT : ACL_SELECT);
+       RangeTblEntry  *rte;
+       List           *attnums;
+       ListCell       *cur;
+
+       Assert(!stmt->query);
+
+       /* Open and lock the relation, using the appropriate lock type. */
+       rel = heap_openrv(stmt->relation,
+                            (is_from ? RowExclusiveLock : AccessShareLock));
+
+       rte = makeNode(RangeTblEntry);
+       rte->rtekind = RTE_RELATION;
+       rte->relid = RelationGetRelid(rel);
+       rte->requiredPerms = required_access;
+
+       tupDesc = RelationGetDescr(rel);
+       attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
+       foreach (cur, attnums)
+       {
+           int     attno = lfirst_int(cur) -
+                           FirstLowInvalidHeapAttributeNumber;
+
+           if (is_from)
+               rte->modifiedCols = bms_add_member(rte->modifiedCols, attno);
+           else
+               rte->selectedCols = bms_add_member(rte->selectedCols, attno);
+       }
+       ExecCheckRTPerms(list_make1(rte), true);
+   }
+   else
+   {
+       Assert(stmt->query);
+
+       rel = NULL;
+   }
+
+   if (is_from)
+   {
+       /* check read-only transaction */
+       if (XactReadOnly && rel->rd_backend != MyBackendId)
+           PreventCommandIfReadOnly("COPY FROM");
+
+       cstate = BeginCopyFrom(rel, stmt->filename,
+                              stmt->attlist, stmt->options);
+       processed = CopyFrom(cstate);   /* copy from file to database */
+       EndCopyFrom(cstate);
+   }
+   else
+   {
+       cstate = BeginCopyTo(rel, stmt->query, queryString, stmt->filename,
+                            stmt->attlist, stmt->options);
+       processed = DoCopyTo(cstate);   /* copy from database to file */
+       EndCopyTo(cstate);
+   }
+
+   /*
+    * Close the relation. If reading, we can release the AccessShareLock we
+    * got; if writing, we should hold the lock until end of transaction to
+    * ensure that updates will be committed before lock is released.
+    */
+   if (rel != NULL)
+       heap_close(rel, (is_from ? NoLock : AccessShareLock));
+
+   return processed;
+}
+
+/*
+ * Common setup routines used by BeginCopyFrom and BeginCopyTo.
+ *
  * Iff , unload or reload in the binary format, as opposed to the
  * more wasteful but more robust and portable text format.
  *
@@ -711,35 +827,42 @@ CopyLoadRawBuf(CopyState cstate)
  *
  * If in the text format, delimit columns with delimiter  and print
  * NULL values as .
- *
- * Do not allow a Postgres user without superuser privilege to read from
- * or write to a file.
- *
- * Do not allow the copy if user doesn't have proper permission to access
- * the table or the specifically requested columns.
  */
-uint64
-DoCopy(const CopyStmt *stmt, const char *queryString)
+static CopyState
+BeginCopy(bool is_from,
+         Relation rel,
+         Node *raw_query,
+         const char *queryString,
+         List *attnamelist,
+         List *options)
 {
    CopyState   cstate;
-   bool        is_from = stmt->is_from;
-   bool        pipe = (stmt->filename == NULL);
-   List       *attnamelist = stmt->attlist;
    List       *force_quote = NIL;
    List       *force_notnull = NIL;
    bool        force_quote_all = false;
    bool        format_specified = false;
-   AclMode     required_access = (is_from ? ACL_INSERT : ACL_SELECT);
    ListCell   *option;
    TupleDesc   tupDesc;
    int         num_phys_attrs;
-   uint64      processed;
+   MemoryContext oldcontext;
 
    /* Allocate workspace and zero all fields */
    cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
 
+   /*
+    * We allocate everything used by a cstate in a new memory context.
+    * This would avoid memory leaks repeated uses of COPY in a query.
+    */
+   cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
+                                               "COPY",
+                                               ALLOCSET_DEFAULT_MINSIZE,
+                                               ALLOCSET_DEFAULT_INITSIZE,
+                                               ALLOCSET_DEFAULT_MAXSIZE);
+
+   oldcontext = MemoryContextSwitchTo(cstate->copycontext);
+
    /* Extract options from the statement node tree */
-   foreach(option, stmt->options)
+   foreach(option, options)
    {
        DefElem    *defel = (DefElem *) lfirst(option);
 
@@ -980,51 +1103,14 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("CSV quote character must not appear in the NULL specification")));
 
-   /* Disallow file COPY except to superusers. */
-   if (!pipe && !superuser())
-       ereport(ERROR,
-               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-                errmsg("must be superuser to COPY to or from a file"),
-                errhint("Anyone can COPY to stdout or from stdin. "
-                        "psql's \\copy command also works for anyone.")));
-
-   if (stmt->relation)
+   if (rel)
    {
-       RangeTblEntry  *rte;
-       List           *attnums;
-       ListCell       *cur;
-
-       Assert(!stmt->query);
-       cstate->queryDesc = NULL;
+       Assert(!raw_query);
 
-       /* Open and lock the relation, using the appropriate lock type. */
-       cstate->rel = heap_openrv(stmt->relation,
-                            (is_from ? RowExclusiveLock : AccessShareLock));
+       cstate->rel = rel;
 
        tupDesc = RelationGetDescr(cstate->rel);
 
-       /* Check relation permissions. */
-       rte = makeNode(RangeTblEntry);
-       rte->rtekind = RTE_RELATION;
-       rte->relid = RelationGetRelid(cstate->rel);
-       rte->requiredPerms = required_access;
-
-       attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
-       foreach (cur, attnums)
-       {
-           int     attno = lfirst_int(cur) - FirstLowInvalidHeapAttributeNumber;
-
-           if (is_from)
-               rte->modifiedCols = bms_add_member(rte->modifiedCols, attno);
-           else
-               rte->selectedCols = bms_add_member(rte->selectedCols, attno);
-       }
-       ExecCheckRTPerms(list_make1(rte), true);
-
-       /* check read-only transaction */
-       if (XactReadOnly && is_from && cstate->rel->rd_backend != MyBackendId)
-           PreventCommandIfReadOnly("COPY FROM");
-
        /* Don't allow COPY w/ OIDs to or from a table without them */
        if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
            ereport(ERROR,
@@ -1058,7 +1144,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
         * function and is executed repeatedly.  (See also the same hack in
         * DECLARE CURSOR and PREPARE.)  XXX FIXME someday.
         */
-       rewritten = pg_analyze_and_rewrite((Node *) copyObject(stmt->query),
+       rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query),
                                           queryString, NULL, 0);
 
        /* We don't expect more or less than one result query */
@@ -1160,14 +1246,6 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
        }
    }
 
-   /* Set up variables to avoid per-attribute overhead. */
-   initStringInfo(&cstate->attribute_buf);
-   initStringInfo(&cstate->line_buf);
-   cstate->line_buf_converted = false;
-   cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
-   cstate->raw_buf_index = cstate->raw_buf_len = 0;
-   cstate->processed = 0;
-
    /*
     * Set up encoding conversion info.  Even if the client and server
     * encodings are the same, we must apply pg_client_to_server() to validate
@@ -1181,84 +1259,75 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
    cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding);
 
    cstate->copy_dest = COPY_FILE;      /* default */
-   cstate->filename = stmt->filename;
 
-   if (is_from)
-       CopyFrom(cstate);       /* copy from file to database */
-   else
-       DoCopyTo(cstate);       /* copy from database to file */
+   MemoryContextSwitchTo(oldcontext);
 
-   /*
-    * Close the relation or query.  If reading, we can release the
-    * AccessShareLock we got; if writing, we should hold the lock until end
-    * of transaction to ensure that updates will be committed before lock is
-    * released.
-    */
-   if (cstate->rel)
-       heap_close(cstate->rel, (is_from ? NoLock : AccessShareLock));
-   else
-   {
-       /* Close down the query and free resources. */
-       ExecutorEnd(cstate->queryDesc);
-       FreeQueryDesc(cstate->queryDesc);
-       PopActiveSnapshot();
-   }
+   return cstate;
+}
 
-   /* Clean up storage (probably not really necessary) */
-   processed = cstate->processed;
+/*
+ * Release resources allocated in a cstate for COPY TO/FROM.
+ */
+static void
+EndCopy(CopyState cstate)
+{
+   if (cstate->filename != NULL && FreeFile(cstate->copy_file))
+       ereport(ERROR,
+               (errcode_for_file_access(),
+                errmsg("could not close file \"%s\": %m",
+                       cstate->filename)));
 
-   pfree(cstate->attribute_buf.data);
-   pfree(cstate->line_buf.data);
-   pfree(cstate->raw_buf);
+   MemoryContextDelete(cstate->copycontext);
    pfree(cstate);
-
-   return processed;
 }
 
-
 /*
- * This intermediate routine exists mainly to localize the effects of setjmp
- * so we don't need to plaster a lot of variables with "volatile".
+ * Setup CopyState to read tuples from a table or a query for COPY TO.
  */
-static void
-DoCopyTo(CopyState cstate)
+static CopyState
+BeginCopyTo(Relation rel,
+           Node *query,
+           const char *queryString,
+           const char *filename,
+           List *attnamelist,
+           List *options)
 {
-   bool        pipe = (cstate->filename == NULL);
+   CopyState   cstate;
+   bool        pipe = (filename == NULL);
+   MemoryContext oldcontext;
 
-   if (cstate->rel)
+   if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
    {
-       if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
-       {
-           if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
-               ereport(ERROR,
-                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                        errmsg("cannot copy from view \"%s\"",
-                               RelationGetRelationName(cstate->rel)),
-                        errhint("Try the COPY (SELECT ...) TO variant.")));
-           else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
-               ereport(ERROR,
-                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                        errmsg("cannot copy from foreign table \"%s\"",
-                               RelationGetRelationName(cstate->rel)),
-                        errhint("Try the COPY (SELECT ...) TO variant.")));
-           else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
-               ereport(ERROR,
-                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                        errmsg("cannot copy from sequence \"%s\"",
-                               RelationGetRelationName(cstate->rel))));
-           else
-               ereport(ERROR,
-                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                        errmsg("cannot copy from non-table relation \"%s\"",
-                               RelationGetRelationName(cstate->rel))));
-       }
+       if (rel->rd_rel->relkind == RELKIND_VIEW)
+           ereport(ERROR,
+                   (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                    errmsg("cannot copy from view \"%s\"",
+                           RelationGetRelationName(rel)),
+                    errhint("Try the COPY (SELECT ...) TO variant.")));
+       else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+           ereport(ERROR,
+                   (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                    errmsg("cannot copy from foreign table \"%s\"",
+                           RelationGetRelationName(rel)),
+                    errhint("Try the COPY (SELECT ...) TO variant.")));
+       else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
+           ereport(ERROR,
+                   (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                    errmsg("cannot copy from sequence \"%s\"",
+                           RelationGetRelationName(rel))));
+       else
+           ereport(ERROR,
+                   (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                    errmsg("cannot copy from non-table relation \"%s\"",
+                           RelationGetRelationName(rel))));
    }
 
+   cstate = BeginCopy(false, rel, query, queryString, attnamelist, options);
+   oldcontext = MemoryContextSwitchTo(cstate->copycontext);
+
    if (pipe)
    {
-       if (whereToSendOutput == DestRemote)
-           cstate->fe_copy = true;
-       else
+       if (whereToSendOutput != DestRemote)
            cstate->copy_file = stdout;
    }
    else
@@ -1270,11 +1339,12 @@ DoCopyTo(CopyState cstate)
         * Prevent write to relative path ... too easy to shoot oneself in the
         * foot by overwriting a database file ...
         */
-       if (!is_absolute_path(cstate->filename))
+       if (!is_absolute_path(filename))
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_NAME),
                     errmsg("relative path not allowed for COPY to file")));
 
+       cstate->filename = pstrdup(filename);
        oumask = umask(S_IWGRP | S_IWOTH);
        cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
        umask(oumask);
@@ -1292,14 +1362,30 @@ DoCopyTo(CopyState cstate)
                     errmsg("\"%s\" is a directory", cstate->filename)));
    }
 
+   MemoryContextSwitchTo(oldcontext);
+
+   return cstate;
+}
+
+/*
+ * This intermediate routine exists mainly to localize the effects of setjmp
+ * so we don't need to plaster a lot of variables with "volatile".
+ */
+static uint64
+DoCopyTo(CopyState cstate)
+{
+   bool        pipe = (cstate->filename == NULL);
+   bool        fe_copy = (pipe && whereToSendOutput == DestRemote);
+   uint64      processed;
+
    PG_TRY();
    {
-       if (cstate->fe_copy)
+       if (fe_copy)
            SendCopyBegin(cstate);
 
-       CopyTo(cstate);
+       processed = CopyTo(cstate);
 
-       if (cstate->fe_copy)
+       if (fe_copy)
            SendCopyEnd(cstate);
    }
    PG_CATCH();
@@ -1314,26 +1400,38 @@ DoCopyTo(CopyState cstate)
    }
    PG_END_TRY();
 
-   if (!pipe)
+   return processed;
+}
+
+/*
+ * Clean up storage and release resources for COPY TO.
+ */
+static void
+EndCopyTo(CopyState cstate)
+{
+   if (cstate->queryDesc != NULL)
    {
-       if (FreeFile(cstate->copy_file))
-           ereport(ERROR,
-                   (errcode_for_file_access(),
-                    errmsg("could not close file \"%s\": %m",
-                           cstate->filename)));
+       /* Close down the query and free resources. */
+       ExecutorEnd(cstate->queryDesc);
+       FreeQueryDesc(cstate->queryDesc);
+       PopActiveSnapshot();
    }
+
+   /* Clean up storage */
+   EndCopy(cstate);
 }
 
 /*
  * Copy from relation or query TO file.
  */
-static void
+static uint64
 CopyTo(CopyState cstate)
 {
    TupleDesc   tupDesc;
    int         num_phys_attrs;
    Form_pg_attribute *attr;
    ListCell   *cur;
+   uint64      processed;
 
    if (cstate->rel)
        tupDesc = RelationGetDescr(cstate->rel);
@@ -1439,6 +1537,7 @@ CopyTo(CopyState cstate)
 
        scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
 
+       processed = 0;
        while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
        {
            CHECK_FOR_INTERRUPTS();
@@ -1448,14 +1547,19 @@ CopyTo(CopyState cstate)
 
            /* Format and send the data */
            CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
+           processed++;
        }
 
        heap_endscan(scandesc);
+
+       pfree(values);
+       pfree(nulls);
    }
    else
    {
        /* run the plan --- the dest receiver will send tuples */
        ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
+       processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
    }
 
    if (cstate->binary)
@@ -1467,6 +1571,8 @@ CopyTo(CopyState cstate)
    }
 
    MemoryContextDelete(cstate->rowcontext);
+
+   return processed;
 }
 
 /*
@@ -1558,16 +1664,16 @@ CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
    CopySendEndOfRow(cstate);
 
    MemoryContextSwitchTo(oldcontext);
-
-   cstate->processed++;
 }
 
 
 /*
  * error context callback for COPY FROM
+ *
+ * The argument for the error context must be CopyState.
  */
-static void
-copy_in_error_callback(void *arg)
+void
+CopyFromErrorCallback(void *arg)
 {
    CopyState   cstate = (CopyState) arg;
 
@@ -1669,41 +1775,23 @@ limit_printout_length(const char *str)
 /*
  * Copy FROM file to relation.
  */
-static void
+static uint64
 CopyFrom(CopyState cstate)
 {
-   bool        pipe = (cstate->filename == NULL);
    HeapTuple   tuple;
    TupleDesc   tupDesc;
-   Form_pg_attribute *attr;
-   AttrNumber  num_phys_attrs,
-               attr_count,
-               num_defaults;
-   FmgrInfo   *in_functions;
-   FmgrInfo    oid_in_function;
-   Oid        *typioparams;
-   Oid         oid_typioparam;
-   int         attnum;
-   int         i;
-   Oid         in_func_oid;
    Datum      *values;
    bool       *nulls;
-   int         nfields;
-   char      **field_strings;
-   bool        done = false;
-   bool        isnull;
    ResultRelInfo *resultRelInfo;
    EState     *estate = CreateExecutorState(); /* for ExecConstraints() */
+   ExprContext *econtext;
    TupleTableSlot *slot;
-   bool        file_has_oids;
-   int        *defmap;
-   ExprState **defexprs;       /* array of default att expressions */
-   ExprContext *econtext;      /* used for ExecEvalExpr for default atts */
    MemoryContext oldcontext = CurrentMemoryContext;
    ErrorContextCallback errcontext;
    CommandId   mycid = GetCurrentCommandId(true);
    int         hi_options = 0; /* start with default heap_insert options */
    BulkInsertState bistate;
+   uint64      processed = 0;
 
    Assert(cstate->rel);
 
@@ -1731,6 +1819,8 @@ CopyFrom(CopyState cstate)
                            RelationGetRelationName(cstate->rel))));
    }
 
+   tupDesc = RelationGetDescr(cstate->rel);
+
    /*----------
     * Check to see if we can avoid writing WAL
     *
@@ -1766,48 +1856,16 @@ CopyFrom(CopyState cstate)
            hi_options |= HEAP_INSERT_SKIP_WAL;
    }
 
-   if (pipe)
-   {
-       if (whereToSendOutput == DestRemote)
-           ReceiveCopyBegin(cstate);
-       else
-           cstate->copy_file = stdin;
-   }
-   else
-   {
-       struct stat st;
-
-       cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
-
-       if (cstate->copy_file == NULL)
-           ereport(ERROR,
-                   (errcode_for_file_access(),
-                    errmsg("could not open file \"%s\" for reading: %m",
-                           cstate->filename)));
-
-       fstat(fileno(cstate->copy_file), &st);
-       if (S_ISDIR(st.st_mode))
-           ereport(ERROR,
-                   (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                    errmsg("\"%s\" is a directory", cstate->filename)));
-   }
-
-   tupDesc = RelationGetDescr(cstate->rel);
-   attr = tupDesc->attrs;
-   num_phys_attrs = tupDesc->natts;
-   attr_count = list_length(cstate->attnumlist);
-   num_defaults = 0;
-
-   /*
-    * We need a ResultRelInfo so we can use the regular executor's
-    * index-entry-making machinery.  (There used to be a huge amount of code
-    * here that basically duplicated execUtils.c ...)
-    */
-   resultRelInfo = makeNode(ResultRelInfo);
-   resultRelInfo->ri_RangeTableIndex = 1;      /* dummy */
-   resultRelInfo->ri_RelationDesc = cstate->rel;
-   resultRelInfo->ri_TrigDesc = CopyTriggerDesc(cstate->rel->trigdesc);
-   if (resultRelInfo->ri_TrigDesc)
+   /*
+    * We need a ResultRelInfo so we can use the regular executor's
+    * index-entry-making machinery.  (There used to be a huge amount of code
+    * here that basically duplicated execUtils.c ...)
+    */
+   resultRelInfo = makeNode(ResultRelInfo);
+   resultRelInfo->ri_RangeTableIndex = 1;      /* dummy */
+   resultRelInfo->ri_RelationDesc = cstate->rel;
+   resultRelInfo->ri_TrigDesc = CopyTriggerDesc(cstate->rel->trigdesc);
+   if (resultRelInfo->ri_TrigDesc)
    {
        resultRelInfo->ri_TrigFunctions = (FmgrInfo *)
            palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(FmgrInfo));
@@ -1826,8 +1884,191 @@ CopyFrom(CopyState cstate)
    slot = ExecInitExtraTupleSlot(estate);
    ExecSetSlotDescriptor(slot, tupDesc);
 
+   /* Prepare to catch AFTER triggers. */
+   AfterTriggerBeginQuery();
+
+   /*
+    * Check BEFORE STATEMENT insertion triggers. It's debateable whether we
+    * should do this for COPY, since it's not really an "INSERT" statement as
+    * such. However, executing these triggers maintains consistency with the
+    * EACH ROW triggers that we already fire on COPY.
+    */
+   ExecBSInsertTriggers(estate, resultRelInfo);
+
+   values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
+   nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
+
+   bistate = GetBulkInsertState();
    econtext = GetPerTupleExprContext(estate);
 
+   /* Set up callback to identify error line number */
+   errcontext.callback = CopyFromErrorCallback;
+   errcontext.arg = (void *) cstate;
+   errcontext.previous = error_context_stack;
+   error_context_stack = &errcontext;
+
+   for (;;)
+   {
+       bool        skip_tuple;
+       Oid         loaded_oid = InvalidOid;
+
+       CHECK_FOR_INTERRUPTS();
+
+       /* Reset the per-tuple exprcontext */
+       ResetPerTupleExprContext(estate);
+
+       /* Switch into its memory context */
+       MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+
+       if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
+           break;
+
+       /* And now we can form the input tuple. */
+       tuple = heap_form_tuple(tupDesc, values, nulls);
+
+       if (loaded_oid != InvalidOid)
+           HeapTupleSetOid(tuple, loaded_oid);
+
+       /* Triggers and stuff need to be invoked in query context. */
+       MemoryContextSwitchTo(oldcontext);
+
+       skip_tuple = false;
+
+       /* BEFORE ROW INSERT Triggers */
+       if (resultRelInfo->ri_TrigDesc &&
+           resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+       {
+           HeapTuple   newtuple;
+
+           newtuple = ExecBRInsertTriggers(estate, resultRelInfo, tuple);
+
+           if (newtuple == NULL)       /* "do nothing" */
+               skip_tuple = true;
+           else if (newtuple != tuple) /* modified by Trigger(s) */
+           {
+               heap_freetuple(tuple);
+               tuple = newtuple;
+           }
+       }
+
+       if (!skip_tuple)
+       {
+           List       *recheckIndexes = NIL;
+
+           /* Place tuple in tuple slot */
+           ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+
+           /* Check the constraints of the tuple */
+           if (cstate->rel->rd_att->constr)
+               ExecConstraints(resultRelInfo, slot, estate);
+
+           /* OK, store the tuple and create index entries for it */
+           heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
+
+           if (resultRelInfo->ri_NumIndices > 0)
+               recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
+                                                      estate);
+
+           /* AFTER ROW INSERT Triggers */
+           ExecARInsertTriggers(estate, resultRelInfo, tuple,
+                                recheckIndexes);
+
+           list_free(recheckIndexes);
+
+           /*
+            * We count only tuples not suppressed by a BEFORE INSERT trigger;
+            * this is the same definition used by execMain.c for counting
+            * tuples inserted by an INSERT command.
+            */
+           processed++;
+       }
+   }
+
+   /* Done, clean up */
+   error_context_stack = errcontext.previous;
+
+   FreeBulkInsertState(bistate);
+
+   MemoryContextSwitchTo(oldcontext);
+
+   /* Execute AFTER STATEMENT insertion triggers */
+   ExecASInsertTriggers(estate, resultRelInfo);
+
+   /* Handle queued AFTER triggers */
+   AfterTriggerEndQuery(estate);
+
+   pfree(values);
+   pfree(nulls);
+
+   ExecResetTupleTable(estate->es_tupleTable, false);
+
+   ExecCloseIndices(resultRelInfo);
+
+   FreeExecutorState(estate);
+
+   /*
+    * If we skipped writing WAL, then we need to sync the heap (but not
+    * indexes since those use WAL anyway)
+    */
+   if (hi_options & HEAP_INSERT_SKIP_WAL)
+       heap_sync(cstate->rel);
+
+   return processed;
+}
+
+/*
+ * Setup to read tuples from a file for COPY FROM.
+ *
+ * 'rel': Used as a template for the tuples
+ * 'filename': Name of server-local file to read
+ * 'attnamelist': List of char *, columns to include. NIL selects all cols.
+ * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
+ *
+ * Returns a CopyState, to be passed to NextCopyFrom and related functions.
+ */
+CopyState
+BeginCopyFrom(Relation rel,
+             const char *filename,
+             List *attnamelist,
+             List *options)
+{
+   CopyState   cstate;
+   bool        pipe = (filename == NULL);
+   TupleDesc   tupDesc;
+   Form_pg_attribute *attr;
+   AttrNumber  num_phys_attrs,
+               num_defaults;
+   FmgrInfo   *in_functions;
+   Oid        *typioparams;
+   int         attnum;
+   Oid         in_func_oid;
+   int        *defmap;
+   ExprState **defexprs;
+   MemoryContext oldcontext;
+
+   cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options);
+   oldcontext = MemoryContextSwitchTo(cstate->copycontext);
+
+   /* Initialize state variables */
+   cstate->fe_eof = false;
+   cstate->eol_type = EOL_UNKNOWN;
+   cstate->cur_relname = RelationGetRelationName(cstate->rel);
+   cstate->cur_lineno = 0;
+   cstate->cur_attname = NULL;
+   cstate->cur_attval = NULL;
+
+   /* Set up variables to avoid per-attribute overhead. */
+   initStringInfo(&cstate->attribute_buf);
+   initStringInfo(&cstate->line_buf);
+   cstate->line_buf_converted = false;
+   cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
+   cstate->raw_buf_index = cstate->raw_buf_len = 0;
+
+   tupDesc = RelationGetDescr(cstate->rel);
+   attr = tupDesc->attrs;
+   num_phys_attrs = tupDesc->natts;
+   num_defaults = 0;
+
    /*
     * Pick up the required catalog information for each attribute in the
     * relation, including the input function, the element type (to pass to
@@ -1863,27 +2104,54 @@ CopyFrom(CopyState cstate)
 
            if (defexpr != NULL)
            {
-               defexprs[num_defaults] = ExecPrepareExpr((Expr *) defexpr,
-                                                        estate);
+               /* Initialize expressions in copycontext. */
+               defexprs[num_defaults] = ExecInitExpr(
+                               expression_planner((Expr *) defexpr), NULL);
                defmap[num_defaults] = attnum - 1;
                num_defaults++;
            }
        }
    }
 
-   /* Prepare to catch AFTER triggers. */
-   AfterTriggerBeginQuery();
+   /* We keep those variables in cstate. */
+   cstate->in_functions = in_functions;
+   cstate->typioparams = typioparams;
+   cstate->defmap = defmap;
+   cstate->defexprs = defexprs;
+   cstate->num_defaults = num_defaults;
 
-   /*
-    * Check BEFORE STATEMENT insertion triggers. It's debateable whether we
-    * should do this for COPY, since it's not really an "INSERT" statement as
-    * such. However, executing these triggers maintains consistency with the
-    * EACH ROW triggers that we already fire on COPY.
-    */
-   ExecBSInsertTriggers(estate, resultRelInfo);
+   if (pipe)
+   {
+       if (whereToSendOutput == DestRemote)
+           ReceiveCopyBegin(cstate);
+       else
+           cstate->copy_file = stdin;
+   }
+   else
+   {
+       struct stat st;
+
+       cstate->filename = pstrdup(filename);
+       cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
+
+       if (cstate->copy_file == NULL)
+           ereport(ERROR,
+                   (errcode_for_file_access(),
+                    errmsg("could not open file \"%s\" for reading: %m",
+                           cstate->filename)));
+
+       fstat(fileno(cstate->copy_file), &st);
+       if (S_ISDIR(st.st_mode))
+           ereport(ERROR,
+                   (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                    errmsg("\"%s\" is a directory", cstate->filename)));
+   }
 
    if (!cstate->binary)
-       file_has_oids = cstate->oids;   /* must rely on user to tell us... */
+   {
+       /* must rely on user to tell us... */
+       cstate->file_has_oids = cstate->oids;
+   }
    else
    {
        /* Read and verify binary header */
@@ -1901,7 +2169,7 @@ CopyFrom(CopyState cstate)
            ereport(ERROR,
                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                     errmsg("invalid COPY file header (missing flags)")));
-       file_has_oids = (tmp & (1 << 16)) != 0;
+       cstate->file_has_oids = (tmp & (1 << 16)) != 0;
        tmp &= ~(1 << 16);
        if ((tmp >> 16) != 0)
            ereport(ERROR,
@@ -1923,358 +2191,315 @@ CopyFrom(CopyState cstate)
        }
    }
 
-   if (file_has_oids && cstate->binary)
+   if (cstate->file_has_oids && cstate->binary)
    {
        getTypeBinaryInputInfo(OIDOID,
-                              &in_func_oid, &oid_typioparam);
-       fmgr_info(in_func_oid, &oid_in_function);
+                              &in_func_oid, &cstate->oid_typioparam);
+       fmgr_info(in_func_oid, &cstate->oid_in_function);
    }
 
-   values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
-   nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
-
    /* create workspace for CopyReadAttributes results */
-   nfields = file_has_oids ? (attr_count + 1) : attr_count;
-   if (! cstate->binary)
+   if (!cstate->binary)
    {
+       AttrNumber  attr_count = list_length(cstate->attnumlist);
+       int nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;
+
        cstate->max_fields = nfields;
        cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
    }
 
-   /* Initialize state variables */
-   cstate->fe_eof = false;
-   cstate->eol_type = EOL_UNKNOWN;
-   cstate->cur_relname = RelationGetRelationName(cstate->rel);
-   cstate->cur_lineno = 0;
-   cstate->cur_attname = NULL;
-   cstate->cur_attval = NULL;
+   MemoryContextSwitchTo(oldcontext);
 
-   bistate = GetBulkInsertState();
+   return cstate;
+}
 
-   /* Set up callback to identify error line number */
-   errcontext.callback = copy_in_error_callback;
-   errcontext.arg = (void *) cstate;
-   errcontext.previous = error_context_stack;
-   error_context_stack = &errcontext;
+/*
+ * Read raw fields in the next line for COPY FROM in text or csv mode.
+ * Return false if no more lines.
+ *
+ * An internal temporary buffer is returned via 'fields'. It is valid until
+ * the next call of the function. Since the function returns all raw fields
+ * in the input file, 'nfields' could be different from the number of columns
+ * in the relation.
+ *
+ * NOTE: force_not_null option are not applied to the returned fields.
+ */
+bool
+NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
+{
+   int         fldct;
+   bool        done;
+
+   /* only available for text or csv input */
+   Assert(!cstate->binary);
 
    /* on input just throw the header line away */
-   if (cstate->header_line)
+   if (cstate->cur_lineno == 0 && cstate->header_line)
    {
        cstate->cur_lineno++;
-       done = CopyReadLine(cstate);
+       if (CopyReadLine(cstate))
+           return false;   /* done */
    }
 
-   while (!done)
-   {
-       bool        skip_tuple;
-       Oid         loaded_oid = InvalidOid;
+   cstate->cur_lineno++;
 
-       CHECK_FOR_INTERRUPTS();
+   /* Actually read the line into memory here */
+   done = CopyReadLine(cstate);
 
-       cstate->cur_lineno++;
+   /*
+    * EOF at start of line means we're done.  If we see EOF after
+    * some characters, we act as though it was newline followed by
+    * EOF, ie, process the line and then exit loop on next iteration.
+    */
+   if (done && cstate->line_buf.len == 0)
+       return false;
 
-       /* Reset the per-tuple exprcontext */
-       ResetPerTupleExprContext(estate);
+   /* Parse the line into de-escaped field values */
+   if (cstate->csv_mode)
+       fldct = CopyReadAttributesCSV(cstate);
+   else
+       fldct = CopyReadAttributesText(cstate);
 
-       /* Switch into its memory context */
-       MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+   *fields = cstate->raw_fields;
+   *nfields = fldct;
+   return true;
+}
 
-       /* Initialize all values for row to NULL */
-       MemSet(values, 0, num_phys_attrs * sizeof(Datum));
-       MemSet(nulls, true, num_phys_attrs * sizeof(bool));
+/*
+ * Read next tuple from file for COPY FROM. Return false if no more tuples.
+ *
+ * 'econtext' is used to evaluate default expression for each columns not
+ * read from the file. It can be NULL when no default values are used, i.e.
+ * when all columns are read from the file.
+ *
+ * 'values' and 'nulls' arrays must be the same length as columns of the
+ * relation passed to BeginCopyFrom. This function fills the arrays.
+ * Oid of the tuple is returned with 'tupleOid' separately.
+ */
+bool
+NextCopyFrom(CopyState cstate, ExprContext *econtext,
+            Datum *values, bool *nulls, Oid *tupleOid)
+{
+   TupleDesc   tupDesc;
+   Form_pg_attribute *attr;
+   AttrNumber  num_phys_attrs,
+               attr_count,
+               num_defaults = cstate->num_defaults;
+   FmgrInfo   *in_functions = cstate->in_functions;
+   Oid        *typioparams = cstate->typioparams;
+   int         i;
+   int         nfields;
+   bool        isnull;
+   bool        file_has_oids = cstate->file_has_oids;
+   int        *defmap = cstate->defmap;
+   ExprState **defexprs = cstate->defexprs;
 
-       if (!cstate->binary)
-       {
-           ListCell   *cur;
-           int         fldct;
-           int         fieldno;
-           char       *string;
+   tupDesc = RelationGetDescr(cstate->rel);
+   attr = tupDesc->attrs;
+   num_phys_attrs = tupDesc->natts;
+   attr_count = list_length(cstate->attnumlist);
+   nfields = file_has_oids ? (attr_count + 1) : attr_count;
 
-           /* Actually read the line into memory here */
-           done = CopyReadLine(cstate);
+   /* Initialize all values for row to NULL */
+   MemSet(values, 0, num_phys_attrs * sizeof(Datum));
+   MemSet(nulls, true, num_phys_attrs * sizeof(bool));
 
-           /*
-            * EOF at start of line means we're done.  If we see EOF after
-            * some characters, we act as though it was newline followed by
-            * EOF, ie, process the line and then exit loop on next iteration.
-            */
-           if (done && cstate->line_buf.len == 0)
-               break;
+   if (!cstate->binary)
+   {
+       char      **field_strings;
+       ListCell   *cur;
+       int         fldct;
+       int         fieldno;
+       char       *string;
 
-           /* Parse the line into de-escaped field values */
-           if (cstate->csv_mode)
-               fldct = CopyReadAttributesCSV(cstate);
-           else
-               fldct = CopyReadAttributesText(cstate);
+       /* read raw fields in the next line */
+       if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
+           return false;
+
+       /* check for overflowing fields */
+       if (nfields > 0 && fldct > nfields)
+           ereport(ERROR,
+                   (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                    errmsg("extra data after last expected column")));
 
-           /* check for overflowing fields */
-           if (nfields > 0 && fldct > nfields)
+       fieldno = 0;
+
+       /* Read the OID field if present */
+       if (file_has_oids)
+       {
+           if (fieldno >= fldct)
                ereport(ERROR,
                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                        errmsg("extra data after last expected column")));
+                        errmsg("missing data for OID column")));
+           string = field_strings[fieldno++];
 
-           fieldno = 0;
-           field_strings = cstate->raw_fields;
-
-           /* Read the OID field if present */
-           if (file_has_oids)
+           if (string == NULL)
+               ereport(ERROR,
+                       (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                        errmsg("null OID in COPY data")));
+           else if (cstate->oids && tupleOid != NULL)
            {
-               if (fieldno >= fldct)
-                   ereport(ERROR,
-                           (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                            errmsg("missing data for OID column")));
-               string = field_strings[fieldno++];
-
-               if (string == NULL)
-                   ereport(ERROR,
-                           (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                            errmsg("null OID in COPY data")));
-               else
-               {
-                   cstate->cur_attname = "oid";
-                   cstate->cur_attval = string;
-                   loaded_oid = DatumGetObjectId(DirectFunctionCall1(oidin,
+               cstate->cur_attname = "oid";
+               cstate->cur_attval = string;
+               *tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
                                                   CStringGetDatum(string)));
-                   if (loaded_oid == InvalidOid)
-                       ereport(ERROR,
-                               (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                errmsg("invalid OID in COPY data")));
-                   cstate->cur_attname = NULL;
-                   cstate->cur_attval = NULL;
-               }
-           }
-
-           /* Loop to read the user attributes on the line. */
-           foreach(cur, cstate->attnumlist)
-           {
-               int         attnum = lfirst_int(cur);
-               int         m = attnum - 1;
-
-               if (fieldno >= fldct)
+               if (*tupleOid == InvalidOid)
                    ereport(ERROR,
                            (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                            errmsg("missing data for column \"%s\"",
-                                   NameStr(attr[m]->attname))));
-               string = field_strings[fieldno++];
-
-               if (cstate->csv_mode && string == NULL &&
-                   cstate->force_notnull_flags[m])
-               {
-                   /* Go ahead and read the NULL string */
-                   string = cstate->null_print;
-               }
-
-               cstate->cur_attname = NameStr(attr[m]->attname);
-               cstate->cur_attval = string;
-               values[m] = InputFunctionCall(&in_functions[m],
-                                             string,
-                                             typioparams[m],
-                                             attr[m]->atttypmod);
-               if (string != NULL)
-                   nulls[m] = false;
+                            errmsg("invalid OID in COPY data")));
                cstate->cur_attname = NULL;
                cstate->cur_attval = NULL;
            }
-
-           Assert(fieldno == nfields);
        }
-       else
-       {
-           /* binary */
-           int16       fld_count;
-           ListCell   *cur;
 
-           if (!CopyGetInt16(cstate, &fld_count))
-           {
-               /* EOF detected (end of file, or protocol-level EOF) */
-               done = true;
-               break;
-           }
-
-           if (fld_count == -1)
-           {
-               /*
-                * Received EOF marker.  In a V3-protocol copy, wait for
-                * the protocol-level EOF, and complain if it doesn't come
-                * immediately.  This ensures that we correctly handle
-                * CopyFail, if client chooses to send that now.
-                *
-                * Note that we MUST NOT try to read more data in an
-                * old-protocol copy, since there is no protocol-level EOF
-                * marker then.  We could go either way for copy from file,
-                * but choose to throw error if there's data after the EOF
-                * marker, for consistency with the new-protocol case.
-                */
-               char    dummy;
-
-               if (cstate->copy_dest != COPY_OLD_FE &&
-                   CopyGetData(cstate, &dummy, 1, 1) > 0)
-                   ereport(ERROR,
-                           (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                            errmsg("received copy data after EOF marker")));
-               done = true;
-               break;
-           }
+       /* Loop to read the user attributes on the line. */
+       foreach(cur, cstate->attnumlist)
+       {
+           int         attnum = lfirst_int(cur);
+           int         m = attnum - 1;
 
-           if (fld_count != attr_count)
+           if (fieldno >= fldct)
                ereport(ERROR,
                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                        errmsg("row field count is %d, expected %d",
-                               (int) fld_count, attr_count)));
-
-           if (file_has_oids)
-           {
-               cstate->cur_attname = "oid";
-               loaded_oid =
-                   DatumGetObjectId(CopyReadBinaryAttribute(cstate,
-                                                            0,
-                                                            &oid_in_function,
-                                                            oid_typioparam,
-                                                            -1,
-                                                            &isnull));
-               if (isnull || loaded_oid == InvalidOid)
-                   ereport(ERROR,
-                           (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                            errmsg("invalid OID in COPY data")));
-               cstate->cur_attname = NULL;
-           }
+                        errmsg("missing data for column \"%s\"",
+                               NameStr(attr[m]->attname))));
+           string = field_strings[fieldno++];
 
-           i = 0;
-           foreach(cur, cstate->attnumlist)
+           if (cstate->csv_mode && string == NULL &&
+               cstate->force_notnull_flags[m])
            {
-               int         attnum = lfirst_int(cur);
-               int         m = attnum - 1;
-
-               cstate->cur_attname = NameStr(attr[m]->attname);
-               i++;
-               values[m] = CopyReadBinaryAttribute(cstate,
-                                                   i,
-                                                   &in_functions[m],
-                                                   typioparams[m],
-                                                   attr[m]->atttypmod,
-                                                   &nulls[m]);
-               cstate->cur_attname = NULL;
+               /* Go ahead and read the NULL string */
+               string = cstate->null_print;
            }
-       }
 
-       /*
-        * Now compute and insert any defaults available for the columns not
-        * provided by the input data.  Anything not processed here or above
-        * will remain NULL.
-        */
-       for (i = 0; i < num_defaults; i++)
-       {
-           values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
-                                            &nulls[defmap[i]], NULL);
+           cstate->cur_attname = NameStr(attr[m]->attname);
+           cstate->cur_attval = string;
+           values[m] = InputFunctionCall(&in_functions[m],
+                                         string,
+                                         typioparams[m],
+                                         attr[m]->atttypmod);
+           if (string != NULL)
+               nulls[m] = false;
+           cstate->cur_attname = NULL;
+           cstate->cur_attval = NULL;
        }
 
-       /* And now we can form the input tuple. */
-       tuple = heap_form_tuple(tupDesc, values, nulls);
-
-       if (cstate->oids && file_has_oids)
-           HeapTupleSetOid(tuple, loaded_oid);
-
-       /* Triggers and stuff need to be invoked in query context. */
-       MemoryContextSwitchTo(oldcontext);
+       Assert(fieldno == nfields);
+   }
+   else
+   {
+       /* binary */
+       int16       fld_count;
+       ListCell   *cur;
 
-       skip_tuple = false;
+       cstate->cur_lineno++;
 
-       /* BEFORE ROW INSERT Triggers */
-       if (resultRelInfo->ri_TrigDesc &&
-           resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+       if (!CopyGetInt16(cstate, &fld_count))
        {
-           HeapTuple   newtuple;
-
-           newtuple = ExecBRInsertTriggers(estate, resultRelInfo, tuple);
-
-           if (newtuple == NULL)       /* "do nothing" */
-               skip_tuple = true;
-           else if (newtuple != tuple) /* modified by Trigger(s) */
-           {
-               heap_freetuple(tuple);
-               tuple = newtuple;
-           }
+           /* EOF detected (end of file, or protocol-level EOF) */
+           return false;
        }
 
-       if (!skip_tuple)
+       if (fld_count == -1)
        {
-           List       *recheckIndexes = NIL;
-
-           /* Place tuple in tuple slot */
-           ExecStoreTuple(tuple, slot, InvalidBuffer, false);
-
-           /* Check the constraints of the tuple */
-           if (cstate->rel->rd_att->constr)
-               ExecConstraints(resultRelInfo, slot, estate);
-
-           /* OK, store the tuple and create index entries for it */
-           heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
-
-           if (resultRelInfo->ri_NumIndices > 0)
-               recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
-                                                      estate);
-
-           /* AFTER ROW INSERT Triggers */
-           ExecARInsertTriggers(estate, resultRelInfo, tuple,
-                                recheckIndexes);
-
-           list_free(recheckIndexes);
-
            /*
-            * We count only tuples not suppressed by a BEFORE INSERT trigger;
-            * this is the same definition used by execMain.c for counting
-            * tuples inserted by an INSERT command.
+            * Received EOF marker.  In a V3-protocol copy, wait for
+            * the protocol-level EOF, and complain if it doesn't come
+            * immediately.  This ensures that we correctly handle
+            * CopyFail, if client chooses to send that now.
+            *
+            * Note that we MUST NOT try to read more data in an
+            * old-protocol copy, since there is no protocol-level EOF
+            * marker then.  We could go either way for copy from file,
+            * but choose to throw error if there's data after the EOF
+            * marker, for consistency with the new-protocol case.
             */
-           cstate->processed++;
-       }
-   }
-
-   /* Done, clean up */
-   error_context_stack = errcontext.previous;
-
-   FreeBulkInsertState(bistate);
-
-   MemoryContextSwitchTo(oldcontext);
-
-   /* Execute AFTER STATEMENT insertion triggers */
-   ExecASInsertTriggers(estate, resultRelInfo);
-
-   /* Handle queued AFTER triggers */
-   AfterTriggerEndQuery(estate);
-
-   pfree(values);
-   pfree(nulls);
-   if (! cstate->binary)
-       pfree(cstate->raw_fields);
+           char    dummy;
 
-   pfree(in_functions);
-   pfree(typioparams);
-   pfree(defmap);
-   pfree(defexprs);
-
-   ExecResetTupleTable(estate->es_tupleTable, false);
+           if (cstate->copy_dest != COPY_OLD_FE &&
+               CopyGetData(cstate, &dummy, 1, 1) > 0)
+               ereport(ERROR,
+                       (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                        errmsg("received copy data after EOF marker")));
+           return false;
+       }
 
-   ExecCloseIndices(resultRelInfo);
+       if (fld_count != attr_count)
+           ereport(ERROR,
+                   (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                    errmsg("row field count is %d, expected %d",
+                           (int) fld_count, attr_count)));
 
-   FreeExecutorState(estate);
+       if (file_has_oids)
+       {
+           Oid     loaded_oid;
+
+           cstate->cur_attname = "oid";
+           loaded_oid =
+               DatumGetObjectId(CopyReadBinaryAttribute(cstate,
+                                                        0,
+                                                        &cstate->oid_in_function,
+                                                        cstate->oid_typioparam,
+                                                        -1,
+                                                        &isnull));
+           if (isnull || loaded_oid == InvalidOid)
+               ereport(ERROR,
+                       (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                        errmsg("invalid OID in COPY data")));
+           cstate->cur_attname = NULL;
+           if (cstate->oids && tupleOid != NULL)
+               *tupleOid = loaded_oid;
+       }
 
-   if (!pipe)
-   {
-       if (FreeFile(cstate->copy_file))
-           ereport(ERROR,
-                   (errcode_for_file_access(),
-                    errmsg("could not close file \"%s\": %m",
-                           cstate->filename)));
+       i = 0;
+       foreach(cur, cstate->attnumlist)
+       {
+           int         attnum = lfirst_int(cur);
+           int         m = attnum - 1;
+
+           cstate->cur_attname = NameStr(attr[m]->attname);
+           i++;
+           values[m] = CopyReadBinaryAttribute(cstate,
+                                               i,
+                                               &in_functions[m],
+                                               typioparams[m],
+                                               attr[m]->atttypmod,
+                                               &nulls[m]);
+           cstate->cur_attname = NULL;
+       }
    }
 
    /*
-    * If we skipped writing WAL, then we need to sync the heap (but not
-    * indexes since those use WAL anyway)
+    * Now compute and insert any defaults available for the columns not
+    * provided by the input data.  Anything not processed here or above
+    * will remain NULL.
     */
-   if (hi_options & HEAP_INSERT_SKIP_WAL)
-       heap_sync(cstate->rel);
+   for (i = 0; i < num_defaults; i++)
+   {
+       /*
+        * The caller must supply econtext and have switched into the
+        * per-tuple memory context in it.
+        */
+       Assert(econtext != NULL);
+       Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
+
+       values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
+                                        &nulls[defmap[i]], NULL);
+   }
+
+   return true;
 }
 
+/*
+ * Clean up storage and release resources for COPY FROM.
+ */
+void
+EndCopyFrom(CopyState cstate)
+{
+   /* No COPY FROM related resources except memory. */
+
+   EndCopy(cstate);
+}
 
 /*
  * Read the next input line and stash it in line_buf, with conversion to
@@ -3537,6 +3762,7 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
 
    /* And send the data */
    CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
+   myState->processed++;
 }
 
 /*
index 9e2bbe8d8e7dd2e3e2b0e62db5f7b0ad33484ca1..afe4b5e4501d8191e0578bd8f474b65268f6919f 100644 (file)
 #ifndef COPY_H
 #define COPY_H
 
+#include "nodes/execnodes.h"
 #include "nodes/parsenodes.h"
 #include "tcop/dest.h"
 
 
+typedef struct CopyStateData  *CopyState;
+
 extern uint64 DoCopy(const CopyStmt *stmt, const char *queryString);
 
+extern CopyState BeginCopyFrom(Relation rel, const char *filename,
+                              List *attnamelist, List *options);
+extern void EndCopyFrom(CopyState cstate);
+extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext,
+                        Datum *values, bool *nulls, Oid *tupleOid);
+extern bool NextCopyFromRawFields(CopyState cstate,
+                                 char ***fields, int *nfields);
+extern void CopyFromErrorCallback(void *arg);
+
 extern DestReceiver *CreateCopyDestReceiver(void);
 
 #endif   /* COPY_H */