FILE *copy_file; /* used if copy_dest == COPY_FILE */
StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
* dest == COPY_NEW_FE in COPY FROM */
- bool fe_copy; /* true for all FE copy dests */
bool fe_eof; /* true if detected end of copy data */
EolType eol_type; /* EOL type of input */
int client_encoding; /* remote side's character encoding */
bool need_transcoding; /* client encoding diff from server? */
bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
- uint64 processed; /* # of tuples processed */
/* parameters from the COPY command */
Relation rel; /* relation to copy to or from */
bool *force_quote_flags; /* per-column CSV FQ flags */
bool *force_notnull_flags; /* per-column CSV FNN flags */
- /* these are just for error messages, see copy_in_error_callback */
+ /* these are just for error messages, see CopyFromErrorCallback */
const char *cur_relname; /* table name for error messages */
int cur_lineno; /* line number for error messages */
const char *cur_attname; /* current att for error messages */
const char *cur_attval; /* current att value for error messages */
+ /*
+ * Working state for COPY TO/FROM
+ */
+ MemoryContext copycontext; /* per-copy execution context */
+
/*
* Working state for COPY TO
*/
FmgrInfo *out_functions; /* lookup info for output functions */
MemoryContext rowcontext; /* per-row evaluation context */
+ /*
+ * Working state for COPY FROM
+ */
+ AttrNumber num_defaults;
+ bool file_has_oids;
+ FmgrInfo oid_in_function;
+ Oid oid_typioparam;
+ FmgrInfo *in_functions; /* array of input functions for each attrs */
+ Oid *typioparams; /* array of element types for in_functions */
+ int *defmap; /* array of default att numbers */
+ ExprState **defexprs; /* array of default att expressions */
+
/*
* These variables are used to reduce overhead in textual COPY FROM.
*
int raw_buf_len; /* total # of bytes stored */
} CopyStateData;
-typedef CopyStateData *CopyState;
-
/* DestReceiver for COPY (SELECT) TO */
typedef struct
{
DestReceiver pub; /* publicly-known function pointers */
CopyState cstate; /* CopyStateData for the command */
+ uint64 processed; /* # of tuples processed */
} DR_copy;
/* non-export function prototypes */
-static void DoCopyTo(CopyState cstate);
-static void CopyTo(CopyState cstate);
+static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query,
+ const char *queryString, List *attnamelist, List *options);
+static void EndCopy(CopyState cstate);
+static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
+ const char *filename, List *attnamelist, List *options);
+static void EndCopyTo(CopyState cstate);
+static uint64 DoCopyTo(CopyState cstate);
+static uint64 CopyTo(CopyState cstate);
static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
Datum *values, bool *nulls);
-static void CopyFrom(CopyState cstate);
+static uint64 CopyFrom(CopyState cstate);
static bool CopyReadLine(CopyState cstate);
static bool CopyReadLineText(CopyState cstate);
static int CopyReadAttributesText(CopyState cstate);
* input/output stream. The latter could be either stdin/stdout or a
* socket, depending on whether we're running under Postmaster control.
*
+ * Do not allow a Postgres user without superuser privilege to read from
+ * or write to a file.
+ *
+ * Do not allow the copy if user doesn't have proper permission to access
+ * the table or the specifically requested columns.
+ */
+uint64
+DoCopy(const CopyStmt *stmt, const char *queryString)
+{
+ CopyState cstate;
+ bool is_from = stmt->is_from;
+ bool pipe = (stmt->filename == NULL);
+ Relation rel;
+ uint64 processed;
+
+ /* Disallow file COPY except to superusers. */
+ if (!pipe && !superuser())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser to COPY to or from a file"),
+ errhint("Anyone can COPY to stdout or from stdin. "
+ "psql's \\copy command also works for anyone.")));
+
+ if (stmt->relation)
+ {
+ TupleDesc tupDesc;
+ AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
+ RangeTblEntry *rte;
+ List *attnums;
+ ListCell *cur;
+
+ Assert(!stmt->query);
+
+ /* Open and lock the relation, using the appropriate lock type. */
+ rel = heap_openrv(stmt->relation,
+ (is_from ? RowExclusiveLock : AccessShareLock));
+
+ rte = makeNode(RangeTblEntry);
+ rte->rtekind = RTE_RELATION;
+ rte->relid = RelationGetRelid(rel);
+ rte->requiredPerms = required_access;
+
+ tupDesc = RelationGetDescr(rel);
+ attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
+ foreach (cur, attnums)
+ {
+ int attno = lfirst_int(cur) -
+ FirstLowInvalidHeapAttributeNumber;
+
+ if (is_from)
+ rte->modifiedCols = bms_add_member(rte->modifiedCols, attno);
+ else
+ rte->selectedCols = bms_add_member(rte->selectedCols, attno);
+ }
+ ExecCheckRTPerms(list_make1(rte), true);
+ }
+ else
+ {
+ Assert(stmt->query);
+
+ rel = NULL;
+ }
+
+ if (is_from)
+ {
+ /* check read-only transaction */
+ if (XactReadOnly && rel->rd_backend != MyBackendId)
+ PreventCommandIfReadOnly("COPY FROM");
+
+ cstate = BeginCopyFrom(rel, stmt->filename,
+ stmt->attlist, stmt->options);
+ processed = CopyFrom(cstate); /* copy from file to database */
+ EndCopyFrom(cstate);
+ }
+ else
+ {
+ cstate = BeginCopyTo(rel, stmt->query, queryString, stmt->filename,
+ stmt->attlist, stmt->options);
+ processed = DoCopyTo(cstate); /* copy from database to file */
+ EndCopyTo(cstate);
+ }
+
+ /*
+ * Close the relation. If reading, we can release the AccessShareLock we
+ * got; if writing, we should hold the lock until end of transaction to
+ * ensure that updates will be committed before lock is released.
+ */
+ if (rel != NULL)
+ heap_close(rel, (is_from ? NoLock : AccessShareLock));
+
+ return processed;
+}
+
+/*
+ * Common setup routines used by BeginCopyFrom and BeginCopyTo.
+ *
* Iff , unload or reload in the binary format, as opposed to the
* more wasteful but more robust and portable text format.
*
*
* If in the text format, delimit columns with delimiter and print
* NULL values as .
- *
- * Do not allow a Postgres user without superuser privilege to read from
- * or write to a file.
- *
- * Do not allow the copy if user doesn't have proper permission to access
- * the table or the specifically requested columns.
*/
-uint64
-DoCopy(const CopyStmt *stmt, const char *queryString)
+static CopyState
+BeginCopy(bool is_from,
+ Relation rel,
+ Node *raw_query,
+ const char *queryString,
+ List *attnamelist,
+ List *options)
{
CopyState cstate;
- bool is_from = stmt->is_from;
- bool pipe = (stmt->filename == NULL);
- List *attnamelist = stmt->attlist;
List *force_quote = NIL;
List *force_notnull = NIL;
bool force_quote_all = false;
bool format_specified = false;
- AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
ListCell *option;
TupleDesc tupDesc;
int num_phys_attrs;
- uint64 processed;
+ MemoryContext oldcontext;
/* Allocate workspace and zero all fields */
cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
+ /*
+ * We allocate everything used by a cstate in a new memory context.
+ * This would avoid memory leaks repeated uses of COPY in a query.
+ */
+ cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
+ "COPY",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ oldcontext = MemoryContextSwitchTo(cstate->copycontext);
+
/* Extract options from the statement node tree */
- foreach(option, stmt->options)
+ foreach(option, options)
{
DefElem *defel = (DefElem *) lfirst(option);
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("CSV quote character must not appear in the NULL specification")));
- /* Disallow file COPY except to superusers. */
- if (!pipe && !superuser())
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("must be superuser to COPY to or from a file"),
- errhint("Anyone can COPY to stdout or from stdin. "
- "psql's \\copy command also works for anyone.")));
-
- if (stmt->relation)
+ if (rel)
{
- RangeTblEntry *rte;
- List *attnums;
- ListCell *cur;
-
- Assert(!stmt->query);
- cstate->queryDesc = NULL;
+ Assert(!raw_query);
- /* Open and lock the relation, using the appropriate lock type. */
- cstate->rel = heap_openrv(stmt->relation,
- (is_from ? RowExclusiveLock : AccessShareLock));
+ cstate->rel = rel;
tupDesc = RelationGetDescr(cstate->rel);
- /* Check relation permissions. */
- rte = makeNode(RangeTblEntry);
- rte->rtekind = RTE_RELATION;
- rte->relid = RelationGetRelid(cstate->rel);
- rte->requiredPerms = required_access;
-
- attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
- foreach (cur, attnums)
- {
- int attno = lfirst_int(cur) - FirstLowInvalidHeapAttributeNumber;
-
- if (is_from)
- rte->modifiedCols = bms_add_member(rte->modifiedCols, attno);
- else
- rte->selectedCols = bms_add_member(rte->selectedCols, attno);
- }
- ExecCheckRTPerms(list_make1(rte), true);
-
- /* check read-only transaction */
- if (XactReadOnly && is_from && cstate->rel->rd_backend != MyBackendId)
- PreventCommandIfReadOnly("COPY FROM");
-
/* Don't allow COPY w/ OIDs to or from a table without them */
if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
ereport(ERROR,
* function and is executed repeatedly. (See also the same hack in
* DECLARE CURSOR and PREPARE.) XXX FIXME someday.
*/
- rewritten = pg_analyze_and_rewrite((Node *) copyObject(stmt->query),
+ rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query),
queryString, NULL, 0);
/* We don't expect more or less than one result query */
}
}
- /* Set up variables to avoid per-attribute overhead. */
- initStringInfo(&cstate->attribute_buf);
- initStringInfo(&cstate->line_buf);
- cstate->line_buf_converted = false;
- cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
- cstate->raw_buf_index = cstate->raw_buf_len = 0;
- cstate->processed = 0;
-
/*
* Set up encoding conversion info. Even if the client and server
* encodings are the same, we must apply pg_client_to_server() to validate
cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding);
cstate->copy_dest = COPY_FILE; /* default */
- cstate->filename = stmt->filename;
- if (is_from)
- CopyFrom(cstate); /* copy from file to database */
- else
- DoCopyTo(cstate); /* copy from database to file */
+ MemoryContextSwitchTo(oldcontext);
- /*
- * Close the relation or query. If reading, we can release the
- * AccessShareLock we got; if writing, we should hold the lock until end
- * of transaction to ensure that updates will be committed before lock is
- * released.
- */
- if (cstate->rel)
- heap_close(cstate->rel, (is_from ? NoLock : AccessShareLock));
- else
- {
- /* Close down the query and free resources. */
- ExecutorEnd(cstate->queryDesc);
- FreeQueryDesc(cstate->queryDesc);
- PopActiveSnapshot();
- }
+ return cstate;
+}
- /* Clean up storage (probably not really necessary) */
- processed = cstate->processed;
+/*
+ * Release resources allocated in a cstate for COPY TO/FROM.
+ */
+static void
+EndCopy(CopyState cstate)
+{
+ if (cstate->filename != NULL && FreeFile(cstate->copy_file))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not close file \"%s\": %m",
+ cstate->filename)));
- pfree(cstate->attribute_buf.data);
- pfree(cstate->line_buf.data);
- pfree(cstate->raw_buf);
+ MemoryContextDelete(cstate->copycontext);
pfree(cstate);
-
- return processed;
}
-
/*
- * This intermediate routine exists mainly to localize the effects of setjmp
- * so we don't need to plaster a lot of variables with "volatile".
+ * Setup CopyState to read tuples from a table or a query for COPY TO.
*/
-static void
-DoCopyTo(CopyState cstate)
+static CopyState
+BeginCopyTo(Relation rel,
+ Node *query,
+ const char *queryString,
+ const char *filename,
+ List *attnamelist,
+ List *options)
{
- bool pipe = (cstate->filename == NULL);
+ CopyState cstate;
+ bool pipe = (filename == NULL);
+ MemoryContext oldcontext;
- if (cstate->rel)
+ if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
{
- if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
- {
- if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot copy from view \"%s\"",
- RelationGetRelationName(cstate->rel)),
- errhint("Try the COPY (SELECT ...) TO variant.")));
- else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot copy from foreign table \"%s\"",
- RelationGetRelationName(cstate->rel)),
- errhint("Try the COPY (SELECT ...) TO variant.")));
- else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot copy from sequence \"%s\"",
- RelationGetRelationName(cstate->rel))));
- else
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot copy from non-table relation \"%s\"",
- RelationGetRelationName(cstate->rel))));
- }
+ if (rel->rd_rel->relkind == RELKIND_VIEW)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy from view \"%s\"",
+ RelationGetRelationName(rel)),
+ errhint("Try the COPY (SELECT ...) TO variant.")));
+ else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy from foreign table \"%s\"",
+ RelationGetRelationName(rel)),
+ errhint("Try the COPY (SELECT ...) TO variant.")));
+ else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy from sequence \"%s\"",
+ RelationGetRelationName(rel))));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy from non-table relation \"%s\"",
+ RelationGetRelationName(rel))));
}
+ cstate = BeginCopy(false, rel, query, queryString, attnamelist, options);
+ oldcontext = MemoryContextSwitchTo(cstate->copycontext);
+
if (pipe)
{
- if (whereToSendOutput == DestRemote)
- cstate->fe_copy = true;
- else
+ if (whereToSendOutput != DestRemote)
cstate->copy_file = stdout;
}
else
* Prevent write to relative path ... too easy to shoot oneself in the
* foot by overwriting a database file ...
*/
- if (!is_absolute_path(cstate->filename))
+ if (!is_absolute_path(filename))
ereport(ERROR,
(errcode(ERRCODE_INVALID_NAME),
errmsg("relative path not allowed for COPY to file")));
+ cstate->filename = pstrdup(filename);
oumask = umask(S_IWGRP | S_IWOTH);
cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
umask(oumask);
errmsg("\"%s\" is a directory", cstate->filename)));
}
+ MemoryContextSwitchTo(oldcontext);
+
+ return cstate;
+}
+
+/*
+ * This intermediate routine exists mainly to localize the effects of setjmp
+ * so we don't need to plaster a lot of variables with "volatile".
+ */
+static uint64
+DoCopyTo(CopyState cstate)
+{
+ bool pipe = (cstate->filename == NULL);
+ bool fe_copy = (pipe && whereToSendOutput == DestRemote);
+ uint64 processed;
+
PG_TRY();
{
- if (cstate->fe_copy)
+ if (fe_copy)
SendCopyBegin(cstate);
- CopyTo(cstate);
+ processed = CopyTo(cstate);
- if (cstate->fe_copy)
+ if (fe_copy)
SendCopyEnd(cstate);
}
PG_CATCH();
}
PG_END_TRY();
- if (!pipe)
+ return processed;
+}
+
+/*
+ * Clean up storage and release resources for COPY TO.
+ */
+static void
+EndCopyTo(CopyState cstate)
+{
+ if (cstate->queryDesc != NULL)
{
- if (FreeFile(cstate->copy_file))
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not close file \"%s\": %m",
- cstate->filename)));
+ /* Close down the query and free resources. */
+ ExecutorEnd(cstate->queryDesc);
+ FreeQueryDesc(cstate->queryDesc);
+ PopActiveSnapshot();
}
+
+ /* Clean up storage */
+ EndCopy(cstate);
}
/*
* Copy from relation or query TO file.
*/
-static void
+static uint64
CopyTo(CopyState cstate)
{
TupleDesc tupDesc;
int num_phys_attrs;
Form_pg_attribute *attr;
ListCell *cur;
+ uint64 processed;
if (cstate->rel)
tupDesc = RelationGetDescr(cstate->rel);
scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
+ processed = 0;
while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
{
CHECK_FOR_INTERRUPTS();
/* Format and send the data */
CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
+ processed++;
}
heap_endscan(scandesc);
+
+ pfree(values);
+ pfree(nulls);
}
else
{
/* run the plan --- the dest receiver will send tuples */
ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
+ processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
}
if (cstate->binary)
}
MemoryContextDelete(cstate->rowcontext);
+
+ return processed;
}
/*
CopySendEndOfRow(cstate);
MemoryContextSwitchTo(oldcontext);
-
- cstate->processed++;
}
/*
* error context callback for COPY FROM
+ *
+ * The argument for the error context must be CopyState.
*/
-static void
-copy_in_error_callback(void *arg)
+void
+CopyFromErrorCallback(void *arg)
{
CopyState cstate = (CopyState) arg;
/*
* Copy FROM file to relation.
*/
-static void
+static uint64
CopyFrom(CopyState cstate)
{
- bool pipe = (cstate->filename == NULL);
HeapTuple tuple;
TupleDesc tupDesc;
- Form_pg_attribute *attr;
- AttrNumber num_phys_attrs,
- attr_count,
- num_defaults;
- FmgrInfo *in_functions;
- FmgrInfo oid_in_function;
- Oid *typioparams;
- Oid oid_typioparam;
- int attnum;
- int i;
- Oid in_func_oid;
Datum *values;
bool *nulls;
- int nfields;
- char **field_strings;
- bool done = false;
- bool isnull;
ResultRelInfo *resultRelInfo;
EState *estate = CreateExecutorState(); /* for ExecConstraints() */
+ ExprContext *econtext;
TupleTableSlot *slot;
- bool file_has_oids;
- int *defmap;
- ExprState **defexprs; /* array of default att expressions */
- ExprContext *econtext; /* used for ExecEvalExpr for default atts */
MemoryContext oldcontext = CurrentMemoryContext;
ErrorContextCallback errcontext;
CommandId mycid = GetCurrentCommandId(true);
int hi_options = 0; /* start with default heap_insert options */
BulkInsertState bistate;
+ uint64 processed = 0;
Assert(cstate->rel);
RelationGetRelationName(cstate->rel))));
}
+ tupDesc = RelationGetDescr(cstate->rel);
+
/*----------
* Check to see if we can avoid writing WAL
*
hi_options |= HEAP_INSERT_SKIP_WAL;
}
- if (pipe)
- {
- if (whereToSendOutput == DestRemote)
- ReceiveCopyBegin(cstate);
- else
- cstate->copy_file = stdin;
- }
- else
- {
- struct stat st;
-
- cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
-
- if (cstate->copy_file == NULL)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\" for reading: %m",
- cstate->filename)));
-
- fstat(fileno(cstate->copy_file), &st);
- if (S_ISDIR(st.st_mode))
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is a directory", cstate->filename)));
- }
-
- tupDesc = RelationGetDescr(cstate->rel);
- attr = tupDesc->attrs;
- num_phys_attrs = tupDesc->natts;
- attr_count = list_length(cstate->attnumlist);
- num_defaults = 0;
-
- /*
- * We need a ResultRelInfo so we can use the regular executor's
- * index-entry-making machinery. (There used to be a huge amount of code
- * here that basically duplicated execUtils.c ...)
- */
- resultRelInfo = makeNode(ResultRelInfo);
- resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
- resultRelInfo->ri_RelationDesc = cstate->rel;
- resultRelInfo->ri_TrigDesc = CopyTriggerDesc(cstate->rel->trigdesc);
- if (resultRelInfo->ri_TrigDesc)
+ /*
+ * We need a ResultRelInfo so we can use the regular executor's
+ * index-entry-making machinery. (There used to be a huge amount of code
+ * here that basically duplicated execUtils.c ...)
+ */
+ resultRelInfo = makeNode(ResultRelInfo);
+ resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
+ resultRelInfo->ri_RelationDesc = cstate->rel;
+ resultRelInfo->ri_TrigDesc = CopyTriggerDesc(cstate->rel->trigdesc);
+ if (resultRelInfo->ri_TrigDesc)
{
resultRelInfo->ri_TrigFunctions = (FmgrInfo *)
palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(FmgrInfo));
slot = ExecInitExtraTupleSlot(estate);
ExecSetSlotDescriptor(slot, tupDesc);
+ /* Prepare to catch AFTER triggers. */
+ AfterTriggerBeginQuery();
+
+ /*
+ * Check BEFORE STATEMENT insertion triggers. It's debateable whether we
+ * should do this for COPY, since it's not really an "INSERT" statement as
+ * such. However, executing these triggers maintains consistency with the
+ * EACH ROW triggers that we already fire on COPY.
+ */
+ ExecBSInsertTriggers(estate, resultRelInfo);
+
+ values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
+ nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
+
+ bistate = GetBulkInsertState();
econtext = GetPerTupleExprContext(estate);
+ /* Set up callback to identify error line number */
+ errcontext.callback = CopyFromErrorCallback;
+ errcontext.arg = (void *) cstate;
+ errcontext.previous = error_context_stack;
+ error_context_stack = &errcontext;
+
+ for (;;)
+ {
+ bool skip_tuple;
+ Oid loaded_oid = InvalidOid;
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* Reset the per-tuple exprcontext */
+ ResetPerTupleExprContext(estate);
+
+ /* Switch into its memory context */
+ MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+
+ if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
+ break;
+
+ /* And now we can form the input tuple. */
+ tuple = heap_form_tuple(tupDesc, values, nulls);
+
+ if (loaded_oid != InvalidOid)
+ HeapTupleSetOid(tuple, loaded_oid);
+
+ /* Triggers and stuff need to be invoked in query context. */
+ MemoryContextSwitchTo(oldcontext);
+
+ skip_tuple = false;
+
+ /* BEFORE ROW INSERT Triggers */
+ if (resultRelInfo->ri_TrigDesc &&
+ resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+ {
+ HeapTuple newtuple;
+
+ newtuple = ExecBRInsertTriggers(estate, resultRelInfo, tuple);
+
+ if (newtuple == NULL) /* "do nothing" */
+ skip_tuple = true;
+ else if (newtuple != tuple) /* modified by Trigger(s) */
+ {
+ heap_freetuple(tuple);
+ tuple = newtuple;
+ }
+ }
+
+ if (!skip_tuple)
+ {
+ List *recheckIndexes = NIL;
+
+ /* Place tuple in tuple slot */
+ ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+
+ /* Check the constraints of the tuple */
+ if (cstate->rel->rd_att->constr)
+ ExecConstraints(resultRelInfo, slot, estate);
+
+ /* OK, store the tuple and create index entries for it */
+ heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
+
+ if (resultRelInfo->ri_NumIndices > 0)
+ recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
+ estate);
+
+ /* AFTER ROW INSERT Triggers */
+ ExecARInsertTriggers(estate, resultRelInfo, tuple,
+ recheckIndexes);
+
+ list_free(recheckIndexes);
+
+ /*
+ * We count only tuples not suppressed by a BEFORE INSERT trigger;
+ * this is the same definition used by execMain.c for counting
+ * tuples inserted by an INSERT command.
+ */
+ processed++;
+ }
+ }
+
+ /* Done, clean up */
+ error_context_stack = errcontext.previous;
+
+ FreeBulkInsertState(bistate);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ /* Execute AFTER STATEMENT insertion triggers */
+ ExecASInsertTriggers(estate, resultRelInfo);
+
+ /* Handle queued AFTER triggers */
+ AfterTriggerEndQuery(estate);
+
+ pfree(values);
+ pfree(nulls);
+
+ ExecResetTupleTable(estate->es_tupleTable, false);
+
+ ExecCloseIndices(resultRelInfo);
+
+ FreeExecutorState(estate);
+
+ /*
+ * If we skipped writing WAL, then we need to sync the heap (but not
+ * indexes since those use WAL anyway)
+ */
+ if (hi_options & HEAP_INSERT_SKIP_WAL)
+ heap_sync(cstate->rel);
+
+ return processed;
+}
+
+/*
+ * Setup to read tuples from a file for COPY FROM.
+ *
+ * 'rel': Used as a template for the tuples
+ * 'filename': Name of server-local file to read
+ * 'attnamelist': List of char *, columns to include. NIL selects all cols.
+ * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
+ *
+ * Returns a CopyState, to be passed to NextCopyFrom and related functions.
+ */
+CopyState
+BeginCopyFrom(Relation rel,
+ const char *filename,
+ List *attnamelist,
+ List *options)
+{
+ CopyState cstate;
+ bool pipe = (filename == NULL);
+ TupleDesc tupDesc;
+ Form_pg_attribute *attr;
+ AttrNumber num_phys_attrs,
+ num_defaults;
+ FmgrInfo *in_functions;
+ Oid *typioparams;
+ int attnum;
+ Oid in_func_oid;
+ int *defmap;
+ ExprState **defexprs;
+ MemoryContext oldcontext;
+
+ cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options);
+ oldcontext = MemoryContextSwitchTo(cstate->copycontext);
+
+ /* Initialize state variables */
+ cstate->fe_eof = false;
+ cstate->eol_type = EOL_UNKNOWN;
+ cstate->cur_relname = RelationGetRelationName(cstate->rel);
+ cstate->cur_lineno = 0;
+ cstate->cur_attname = NULL;
+ cstate->cur_attval = NULL;
+
+ /* Set up variables to avoid per-attribute overhead. */
+ initStringInfo(&cstate->attribute_buf);
+ initStringInfo(&cstate->line_buf);
+ cstate->line_buf_converted = false;
+ cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
+ cstate->raw_buf_index = cstate->raw_buf_len = 0;
+
+ tupDesc = RelationGetDescr(cstate->rel);
+ attr = tupDesc->attrs;
+ num_phys_attrs = tupDesc->natts;
+ num_defaults = 0;
+
/*
* Pick up the required catalog information for each attribute in the
* relation, including the input function, the element type (to pass to
if (defexpr != NULL)
{
- defexprs[num_defaults] = ExecPrepareExpr((Expr *) defexpr,
- estate);
+ /* Initialize expressions in copycontext. */
+ defexprs[num_defaults] = ExecInitExpr(
+ expression_planner((Expr *) defexpr), NULL);
defmap[num_defaults] = attnum - 1;
num_defaults++;
}
}
}
- /* Prepare to catch AFTER triggers. */
- AfterTriggerBeginQuery();
+ /* We keep those variables in cstate. */
+ cstate->in_functions = in_functions;
+ cstate->typioparams = typioparams;
+ cstate->defmap = defmap;
+ cstate->defexprs = defexprs;
+ cstate->num_defaults = num_defaults;
- /*
- * Check BEFORE STATEMENT insertion triggers. It's debateable whether we
- * should do this for COPY, since it's not really an "INSERT" statement as
- * such. However, executing these triggers maintains consistency with the
- * EACH ROW triggers that we already fire on COPY.
- */
- ExecBSInsertTriggers(estate, resultRelInfo);
+ if (pipe)
+ {
+ if (whereToSendOutput == DestRemote)
+ ReceiveCopyBegin(cstate);
+ else
+ cstate->copy_file = stdin;
+ }
+ else
+ {
+ struct stat st;
+
+ cstate->filename = pstrdup(filename);
+ cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
+
+ if (cstate->copy_file == NULL)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\" for reading: %m",
+ cstate->filename)));
+
+ fstat(fileno(cstate->copy_file), &st);
+ if (S_ISDIR(st.st_mode))
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a directory", cstate->filename)));
+ }
if (!cstate->binary)
- file_has_oids = cstate->oids; /* must rely on user to tell us... */
+ {
+ /* must rely on user to tell us... */
+ cstate->file_has_oids = cstate->oids;
+ }
else
{
/* Read and verify binary header */
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid COPY file header (missing flags)")));
- file_has_oids = (tmp & (1 << 16)) != 0;
+ cstate->file_has_oids = (tmp & (1 << 16)) != 0;
tmp &= ~(1 << 16);
if ((tmp >> 16) != 0)
ereport(ERROR,
}
}
- if (file_has_oids && cstate->binary)
+ if (cstate->file_has_oids && cstate->binary)
{
getTypeBinaryInputInfo(OIDOID,
- &in_func_oid, &oid_typioparam);
- fmgr_info(in_func_oid, &oid_in_function);
+ &in_func_oid, &cstate->oid_typioparam);
+ fmgr_info(in_func_oid, &cstate->oid_in_function);
}
- values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
- nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
-
/* create workspace for CopyReadAttributes results */
- nfields = file_has_oids ? (attr_count + 1) : attr_count;
- if (! cstate->binary)
+ if (!cstate->binary)
{
+ AttrNumber attr_count = list_length(cstate->attnumlist);
+ int nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;
+
cstate->max_fields = nfields;
cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
}
- /* Initialize state variables */
- cstate->fe_eof = false;
- cstate->eol_type = EOL_UNKNOWN;
- cstate->cur_relname = RelationGetRelationName(cstate->rel);
- cstate->cur_lineno = 0;
- cstate->cur_attname = NULL;
- cstate->cur_attval = NULL;
+ MemoryContextSwitchTo(oldcontext);
- bistate = GetBulkInsertState();
+ return cstate;
+}
- /* Set up callback to identify error line number */
- errcontext.callback = copy_in_error_callback;
- errcontext.arg = (void *) cstate;
- errcontext.previous = error_context_stack;
- error_context_stack = &errcontext;
+/*
+ * Read raw fields in the next line for COPY FROM in text or csv mode.
+ * Return false if no more lines.
+ *
+ * An internal temporary buffer is returned via 'fields'. It is valid until
+ * the next call of the function. Since the function returns all raw fields
+ * in the input file, 'nfields' could be different from the number of columns
+ * in the relation.
+ *
+ * NOTE: force_not_null option are not applied to the returned fields.
+ */
+bool
+NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
+{
+ int fldct;
+ bool done;
+
+ /* only available for text or csv input */
+ Assert(!cstate->binary);
/* on input just throw the header line away */
- if (cstate->header_line)
+ if (cstate->cur_lineno == 0 && cstate->header_line)
{
cstate->cur_lineno++;
- done = CopyReadLine(cstate);
+ if (CopyReadLine(cstate))
+ return false; /* done */
}
- while (!done)
- {
- bool skip_tuple;
- Oid loaded_oid = InvalidOid;
+ cstate->cur_lineno++;
- CHECK_FOR_INTERRUPTS();
+ /* Actually read the line into memory here */
+ done = CopyReadLine(cstate);
- cstate->cur_lineno++;
+ /*
+ * EOF at start of line means we're done. If we see EOF after
+ * some characters, we act as though it was newline followed by
+ * EOF, ie, process the line and then exit loop on next iteration.
+ */
+ if (done && cstate->line_buf.len == 0)
+ return false;
- /* Reset the per-tuple exprcontext */
- ResetPerTupleExprContext(estate);
+ /* Parse the line into de-escaped field values */
+ if (cstate->csv_mode)
+ fldct = CopyReadAttributesCSV(cstate);
+ else
+ fldct = CopyReadAttributesText(cstate);
- /* Switch into its memory context */
- MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ *fields = cstate->raw_fields;
+ *nfields = fldct;
+ return true;
+}
- /* Initialize all values for row to NULL */
- MemSet(values, 0, num_phys_attrs * sizeof(Datum));
- MemSet(nulls, true, num_phys_attrs * sizeof(bool));
+/*
+ * Read next tuple from file for COPY FROM. Return false if no more tuples.
+ *
+ * 'econtext' is used to evaluate default expression for each columns not
+ * read from the file. It can be NULL when no default values are used, i.e.
+ * when all columns are read from the file.
+ *
+ * 'values' and 'nulls' arrays must be the same length as columns of the
+ * relation passed to BeginCopyFrom. This function fills the arrays.
+ * Oid of the tuple is returned with 'tupleOid' separately.
+ */
+bool
+NextCopyFrom(CopyState cstate, ExprContext *econtext,
+ Datum *values, bool *nulls, Oid *tupleOid)
+{
+ TupleDesc tupDesc;
+ Form_pg_attribute *attr;
+ AttrNumber num_phys_attrs,
+ attr_count,
+ num_defaults = cstate->num_defaults;
+ FmgrInfo *in_functions = cstate->in_functions;
+ Oid *typioparams = cstate->typioparams;
+ int i;
+ int nfields;
+ bool isnull;
+ bool file_has_oids = cstate->file_has_oids;
+ int *defmap = cstate->defmap;
+ ExprState **defexprs = cstate->defexprs;
- if (!cstate->binary)
- {
- ListCell *cur;
- int fldct;
- int fieldno;
- char *string;
+ tupDesc = RelationGetDescr(cstate->rel);
+ attr = tupDesc->attrs;
+ num_phys_attrs = tupDesc->natts;
+ attr_count = list_length(cstate->attnumlist);
+ nfields = file_has_oids ? (attr_count + 1) : attr_count;
- /* Actually read the line into memory here */
- done = CopyReadLine(cstate);
+ /* Initialize all values for row to NULL */
+ MemSet(values, 0, num_phys_attrs * sizeof(Datum));
+ MemSet(nulls, true, num_phys_attrs * sizeof(bool));
- /*
- * EOF at start of line means we're done. If we see EOF after
- * some characters, we act as though it was newline followed by
- * EOF, ie, process the line and then exit loop on next iteration.
- */
- if (done && cstate->line_buf.len == 0)
- break;
+ if (!cstate->binary)
+ {
+ char **field_strings;
+ ListCell *cur;
+ int fldct;
+ int fieldno;
+ char *string;
- /* Parse the line into de-escaped field values */
- if (cstate->csv_mode)
- fldct = CopyReadAttributesCSV(cstate);
- else
- fldct = CopyReadAttributesText(cstate);
+ /* read raw fields in the next line */
+ if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
+ return false;
+
+ /* check for overflowing fields */
+ if (nfields > 0 && fldct > nfields)
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("extra data after last expected column")));
- /* check for overflowing fields */
- if (nfields > 0 && fldct > nfields)
+ fieldno = 0;
+
+ /* Read the OID field if present */
+ if (file_has_oids)
+ {
+ if (fieldno >= fldct)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("extra data after last expected column")));
+ errmsg("missing data for OID column")));
+ string = field_strings[fieldno++];
- fieldno = 0;
- field_strings = cstate->raw_fields;
-
- /* Read the OID field if present */
- if (file_has_oids)
+ if (string == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("null OID in COPY data")));
+ else if (cstate->oids && tupleOid != NULL)
{
- if (fieldno >= fldct)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("missing data for OID column")));
- string = field_strings[fieldno++];
-
- if (string == NULL)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("null OID in COPY data")));
- else
- {
- cstate->cur_attname = "oid";
- cstate->cur_attval = string;
- loaded_oid = DatumGetObjectId(DirectFunctionCall1(oidin,
+ cstate->cur_attname = "oid";
+ cstate->cur_attval = string;
+ *tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
CStringGetDatum(string)));
- if (loaded_oid == InvalidOid)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("invalid OID in COPY data")));
- cstate->cur_attname = NULL;
- cstate->cur_attval = NULL;
- }
- }
-
- /* Loop to read the user attributes on the line. */
- foreach(cur, cstate->attnumlist)
- {
- int attnum = lfirst_int(cur);
- int m = attnum - 1;
-
- if (fieldno >= fldct)
+ if (*tupleOid == InvalidOid)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("missing data for column \"%s\"",
- NameStr(attr[m]->attname))));
- string = field_strings[fieldno++];
-
- if (cstate->csv_mode && string == NULL &&
- cstate->force_notnull_flags[m])
- {
- /* Go ahead and read the NULL string */
- string = cstate->null_print;
- }
-
- cstate->cur_attname = NameStr(attr[m]->attname);
- cstate->cur_attval = string;
- values[m] = InputFunctionCall(&in_functions[m],
- string,
- typioparams[m],
- attr[m]->atttypmod);
- if (string != NULL)
- nulls[m] = false;
+ errmsg("invalid OID in COPY data")));
cstate->cur_attname = NULL;
cstate->cur_attval = NULL;
}
-
- Assert(fieldno == nfields);
}
- else
- {
- /* binary */
- int16 fld_count;
- ListCell *cur;
- if (!CopyGetInt16(cstate, &fld_count))
- {
- /* EOF detected (end of file, or protocol-level EOF) */
- done = true;
- break;
- }
-
- if (fld_count == -1)
- {
- /*
- * Received EOF marker. In a V3-protocol copy, wait for
- * the protocol-level EOF, and complain if it doesn't come
- * immediately. This ensures that we correctly handle
- * CopyFail, if client chooses to send that now.
- *
- * Note that we MUST NOT try to read more data in an
- * old-protocol copy, since there is no protocol-level EOF
- * marker then. We could go either way for copy from file,
- * but choose to throw error if there's data after the EOF
- * marker, for consistency with the new-protocol case.
- */
- char dummy;
-
- if (cstate->copy_dest != COPY_OLD_FE &&
- CopyGetData(cstate, &dummy, 1, 1) > 0)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("received copy data after EOF marker")));
- done = true;
- break;
- }
+ /* Loop to read the user attributes on the line. */
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ int m = attnum - 1;
- if (fld_count != attr_count)
+ if (fieldno >= fldct)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("row field count is %d, expected %d",
- (int) fld_count, attr_count)));
-
- if (file_has_oids)
- {
- cstate->cur_attname = "oid";
- loaded_oid =
- DatumGetObjectId(CopyReadBinaryAttribute(cstate,
- 0,
- &oid_in_function,
- oid_typioparam,
- -1,
- &isnull));
- if (isnull || loaded_oid == InvalidOid)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("invalid OID in COPY data")));
- cstate->cur_attname = NULL;
- }
+ errmsg("missing data for column \"%s\"",
+ NameStr(attr[m]->attname))));
+ string = field_strings[fieldno++];
- i = 0;
- foreach(cur, cstate->attnumlist)
+ if (cstate->csv_mode && string == NULL &&
+ cstate->force_notnull_flags[m])
{
- int attnum = lfirst_int(cur);
- int m = attnum - 1;
-
- cstate->cur_attname = NameStr(attr[m]->attname);
- i++;
- values[m] = CopyReadBinaryAttribute(cstate,
- i,
- &in_functions[m],
- typioparams[m],
- attr[m]->atttypmod,
- &nulls[m]);
- cstate->cur_attname = NULL;
+ /* Go ahead and read the NULL string */
+ string = cstate->null_print;
}
- }
- /*
- * Now compute and insert any defaults available for the columns not
- * provided by the input data. Anything not processed here or above
- * will remain NULL.
- */
- for (i = 0; i < num_defaults; i++)
- {
- values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
- &nulls[defmap[i]], NULL);
+ cstate->cur_attname = NameStr(attr[m]->attname);
+ cstate->cur_attval = string;
+ values[m] = InputFunctionCall(&in_functions[m],
+ string,
+ typioparams[m],
+ attr[m]->atttypmod);
+ if (string != NULL)
+ nulls[m] = false;
+ cstate->cur_attname = NULL;
+ cstate->cur_attval = NULL;
}
- /* And now we can form the input tuple. */
- tuple = heap_form_tuple(tupDesc, values, nulls);
-
- if (cstate->oids && file_has_oids)
- HeapTupleSetOid(tuple, loaded_oid);
-
- /* Triggers and stuff need to be invoked in query context. */
- MemoryContextSwitchTo(oldcontext);
+ Assert(fieldno == nfields);
+ }
+ else
+ {
+ /* binary */
+ int16 fld_count;
+ ListCell *cur;
- skip_tuple = false;
+ cstate->cur_lineno++;
- /* BEFORE ROW INSERT Triggers */
- if (resultRelInfo->ri_TrigDesc &&
- resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+ if (!CopyGetInt16(cstate, &fld_count))
{
- HeapTuple newtuple;
-
- newtuple = ExecBRInsertTriggers(estate, resultRelInfo, tuple);
-
- if (newtuple == NULL) /* "do nothing" */
- skip_tuple = true;
- else if (newtuple != tuple) /* modified by Trigger(s) */
- {
- heap_freetuple(tuple);
- tuple = newtuple;
- }
+ /* EOF detected (end of file, or protocol-level EOF) */
+ return false;
}
- if (!skip_tuple)
+ if (fld_count == -1)
{
- List *recheckIndexes = NIL;
-
- /* Place tuple in tuple slot */
- ExecStoreTuple(tuple, slot, InvalidBuffer, false);
-
- /* Check the constraints of the tuple */
- if (cstate->rel->rd_att->constr)
- ExecConstraints(resultRelInfo, slot, estate);
-
- /* OK, store the tuple and create index entries for it */
- heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
-
- if (resultRelInfo->ri_NumIndices > 0)
- recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
- estate);
-
- /* AFTER ROW INSERT Triggers */
- ExecARInsertTriggers(estate, resultRelInfo, tuple,
- recheckIndexes);
-
- list_free(recheckIndexes);
-
/*
- * We count only tuples not suppressed by a BEFORE INSERT trigger;
- * this is the same definition used by execMain.c for counting
- * tuples inserted by an INSERT command.
+ * Received EOF marker. In a V3-protocol copy, wait for
+ * the protocol-level EOF, and complain if it doesn't come
+ * immediately. This ensures that we correctly handle
+ * CopyFail, if client chooses to send that now.
+ *
+ * Note that we MUST NOT try to read more data in an
+ * old-protocol copy, since there is no protocol-level EOF
+ * marker then. We could go either way for copy from file,
+ * but choose to throw error if there's data after the EOF
+ * marker, for consistency with the new-protocol case.
*/
- cstate->processed++;
- }
- }
-
- /* Done, clean up */
- error_context_stack = errcontext.previous;
-
- FreeBulkInsertState(bistate);
-
- MemoryContextSwitchTo(oldcontext);
-
- /* Execute AFTER STATEMENT insertion triggers */
- ExecASInsertTriggers(estate, resultRelInfo);
-
- /* Handle queued AFTER triggers */
- AfterTriggerEndQuery(estate);
-
- pfree(values);
- pfree(nulls);
- if (! cstate->binary)
- pfree(cstate->raw_fields);
+ char dummy;
- pfree(in_functions);
- pfree(typioparams);
- pfree(defmap);
- pfree(defexprs);
-
- ExecResetTupleTable(estate->es_tupleTable, false);
+ if (cstate->copy_dest != COPY_OLD_FE &&
+ CopyGetData(cstate, &dummy, 1, 1) > 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("received copy data after EOF marker")));
+ return false;
+ }
- ExecCloseIndices(resultRelInfo);
+ if (fld_count != attr_count)
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("row field count is %d, expected %d",
+ (int) fld_count, attr_count)));
- FreeExecutorState(estate);
+ if (file_has_oids)
+ {
+ Oid loaded_oid;
+
+ cstate->cur_attname = "oid";
+ loaded_oid =
+ DatumGetObjectId(CopyReadBinaryAttribute(cstate,
+ 0,
+ &cstate->oid_in_function,
+ cstate->oid_typioparam,
+ -1,
+ &isnull));
+ if (isnull || loaded_oid == InvalidOid)
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("invalid OID in COPY data")));
+ cstate->cur_attname = NULL;
+ if (cstate->oids && tupleOid != NULL)
+ *tupleOid = loaded_oid;
+ }
- if (!pipe)
- {
- if (FreeFile(cstate->copy_file))
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not close file \"%s\": %m",
- cstate->filename)));
+ i = 0;
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ int m = attnum - 1;
+
+ cstate->cur_attname = NameStr(attr[m]->attname);
+ i++;
+ values[m] = CopyReadBinaryAttribute(cstate,
+ i,
+ &in_functions[m],
+ typioparams[m],
+ attr[m]->atttypmod,
+ &nulls[m]);
+ cstate->cur_attname = NULL;
+ }
}
/*
- * If we skipped writing WAL, then we need to sync the heap (but not
- * indexes since those use WAL anyway)
+ * Now compute and insert any defaults available for the columns not
+ * provided by the input data. Anything not processed here or above
+ * will remain NULL.
*/
- if (hi_options & HEAP_INSERT_SKIP_WAL)
- heap_sync(cstate->rel);
+ for (i = 0; i < num_defaults; i++)
+ {
+ /*
+ * The caller must supply econtext and have switched into the
+ * per-tuple memory context in it.
+ */
+ Assert(econtext != NULL);
+ Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
+
+ values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
+ &nulls[defmap[i]], NULL);
+ }
+
+ return true;
}
+/*
+ * Clean up storage and release resources for COPY FROM.
+ */
+void
+EndCopyFrom(CopyState cstate)
+{
+ /* No COPY FROM related resources except memory. */
+
+ EndCopy(cstate);
+}
/*
* Read the next input line and stash it in line_buf, with conversion to
/* And send the data */
CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
+ myState->processed++;
}
/*