Add support for piping COPY to/from an external program.
authorHeikki Linnakangas
Wed, 27 Feb 2013 16:17:21 +0000 (18:17 +0200)
committerHeikki Linnakangas
Wed, 27 Feb 2013 16:22:31 +0000 (18:22 +0200)
This includes backend "COPY TO/FROM PROGRAM '...'" syntax, and corresponding
psql \copy syntax. Like with reading/writing files, the backend version is
superuser-only, and in the psql version, the program is run in the client.

In passing, the psql \copy STDIN/STDOUT syntax is subtly changed: if
stdin/stdout is quoted, it's now interpreted as a filename. For example,
"\copy foo from 'stdin'" now reads from a file called 'stdin', not from
standard input. Before this, there was no way to specify a filename called
stdin, stdout, pstdin or pstdout.

This creates a new function in pgport, wait_result_to_str(), which can
be used to convert the exit status of a process, as returned by wait(3),
to a human-readable string.

Etsuro Fujita, reviewed by Amit Kapila.

21 files changed:
contrib/file_fdw/file_fdw.c
doc/src/sgml/keywords.sgml
doc/src/sgml/ref/copy.sgml
doc/src/sgml/ref/psql-ref.sgml
src/backend/commands/copy.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/equalfuncs.c
src/backend/parser/gram.y
src/backend/storage/file/fd.c
src/bin/psql/copy.c
src/bin/psql/stringutils.c
src/bin/psql/stringutils.h
src/include/commands/copy.h
src/include/nodes/parsenodes.h
src/include/parser/kwlist.h
src/include/port.h
src/include/storage/fd.h
src/interfaces/ecpg/preproc/ecpg.addons
src/port/Makefile
src/port/exec.c
src/port/wait_error.c [new file with mode: 0644]

index d644a46ea7dd8ba2560fdac4c2871bafbd592a33..d1cca1ec3ed66b091a95ec6cd258c0a9e8bd70bb 100644 (file)
@@ -588,6 +588,7 @@ fileBeginForeignScan(ForeignScanState *node, int eflags)
     */
    cstate = BeginCopyFrom(node->ss.ss_currentRelation,
                           filename,
+                          false,
                           NIL,
                           options);
 
@@ -660,6 +661,7 @@ fileReScanForeignScan(ForeignScanState *node)
 
    festate->cstate = BeginCopyFrom(node->ss.ss_currentRelation,
                                    festate->filename,
+                                   false,
                                    NIL,
                                    festate->options);
 }
@@ -993,7 +995,7 @@ file_acquire_sample_rows(Relation onerel, int elevel,
    /*
     * Create CopyState from FDW options.
     */
-   cstate = BeginCopyFrom(onerel, filename, NIL, options);
+   cstate = BeginCopyFrom(onerel, filename, false, NIL, options);
 
    /*
     * Use per-tuple memory context to prevent leak of memory used to read
index 0e7b32285145f8c16aafbed481341cfb59bf6008..576fd65f3160bf2f493acd0147e9b50c88ce6403 100644 (file)
     reserved
     reserved
    
+   
+    PROGRAM
+    non-reserved
+    
+    
+    
+   
    
     PUBLIC
     
index 2137c67cb4b542344017b08c26cf4a36d894a1c1..2854d9c0ca69e2157dd1377b972c7a7ac421abac 100644 (file)
@@ -23,11 +23,11 @@ PostgreSQL documentation
  
 
 COPY table_name [ ( column_name [, ...] ) ]
-    FROM { 'filename' | STDIN }
+    FROM { 'filename' | PROGRAM 'command' | STDIN }
     [ [ WITH ] ( option [, ...] ) ]
 
 COPY { table_name [ ( column_name [, ...] ) ] | ( query ) }
-    TO { 'filename' | STDOUT }
+    TO { 'filename' | PROGRAM 'command' | STDOUT }
     [ [ WITH ] ( option [, ...] ) ]
 
 where option can be one of:
@@ -72,6 +72,10 @@ COPY { table_name [ ( 
    PostgreSQL server to directly read from
    or write to a file. The file must be accessible to the server and
    the name must be specified from the viewpoint of the server. When
+   PROGRAM is specified, the server executes the
+   given command, and reads from its standard output, or writes to its
+   standard input. The command must be specified from the viewpoint of the
+   server, and be executable by the postgres user. When
    STDIN or STDOUT is
    specified, data is transmitted via the connection between the
    client and the server.
@@ -125,6 +129,25 @@ COPY { table_name [ ( 
     
    
 
+   
+    PROGRAM
+    
+     
+      A command to execute. In COPY FROM, the input is
+      read from standard output of the command, and in COPY TO,
+      the output is written to the standard input of the command.
+     
+     
+      Note that the command is invoked by the shell, so if you need to pass
+      any arguments to the shell command that come from an untrusted source, you
+      must be careful to strip or escape any special characters that might
+      have a special meaning for the shell. For security reasons, it is best
+      to use a fixed command string, or at least avoid passing any user input
+      in it.
+     
+    
+   
+
    
     STDIN
     
@@ -367,9 +390,13 @@ COPY count
     they must reside on or be accessible to the database server machine,
     not the client. They must be accessible to and readable or writable
     by the PostgreSQL user (the user ID the
-    server runs as), not the client. COPY naming a
-    file is only allowed to database superusers, since it allows reading
-    or writing any file that the server has privileges to access.
+    server runs as), not the client. Similarly,
+    the command specified with PROGRAM is executed directly
+    by the server, not by the client application, and must be executable by the
+    PostgreSQL user.
+    COPY naming a file or command is only allowed to
+    database superusers, since it allows reading or writing any file that the
+    server has privileges to access.
    
 
    
@@ -393,6 +420,11 @@ COPY count
     the cluster's data directory), not the client's working directory.
    
 
+   
+    Executing a command with PROGRAM might be restricted
+    by the operating system's access control mechanisms, such as SELinux.
+   
+
    
     COPY FROM will invoke any triggers and check
     constraints on the destination table. However, it will not invoke rules.
@@ -841,6 +873,14 @@ COPY (SELECT * FROM country WHERE country_name LIKE 'A%') TO '/usr1/proj/bray/sq
 
   
 
+  
+   To copy into a compressed file, you can pipe the output through an external
+   compression program:
+
+COPY country TO PROGRAM 'gzip > /usr1/proj/bray/sql/country_data.gz';
+
+  
+
   
    Here is a sample of data suitable for copying into a table from
    STDIN:
index 465d3a1882dd3dc1e6da27baae58233ba7628648..fb63845a2606b9cffb76df13f677a79f880b05db 100644 (file)
@@ -830,7 +830,7 @@ testdb=>
       
         \copy { table [ ( column_list ) ] | ( query ) }
         { from | to }
-        { filename | stdin | stdout | pstdin | pstdout }
+        { 'filename' | program 'command' | stdin | stdout | pstdin | pstdout }
         [ [ with ] ( option [, ...] ) ]
 
         
@@ -847,16 +847,14 @@ testdb=>
         
 
         
-        The syntax of the command is similar to that of the
-        SQL 
-        command, and
-        option
-        must indicate one of the options of the
-        SQL  command.
-        Note that, because of this,
-        special parsing rules apply to the \copy
-        command. In particular, the variable substitution rules and
-        backslash escapes do not apply.
+        When program is specified,
+        command is
+        executed by psql and the data from
+        or to command is
+        routed between the server and the client.
+        This means that the execution privileges are those of
+        the local user, not the server, and no SQL superuser
+        privileges are required.
         
 
         \copy ... from stdin | to stdout
@@ -870,6 +868,19 @@ testdb=>
         for populating tables in-line within a SQL script file.
         
 
+        
+        The syntax of the command is similar to that of the
+        SQL 
+        command, and
+        option
+        must indicate one of the options of the
+        SQL  command.
+        Note that, because of this,
+        special parsing rules apply to the \copy
+        command. In particular, the variable substitution rules and
+        backslash escapes do not apply.
+        
+
         
         
         This operation is not as efficient as the SQL
index 523c1e03315952c0abd41da625d0d6bdeb4b4ad3..c651ea302809a42c5c43e119577d1753725068bf 100644 (file)
@@ -58,7 +58,7 @@
  */
 typedef enum CopyDest
 {
-   COPY_FILE,                  /* to/from file */
+   COPY_FILE,                  /* to/from file (or a piped program) */
    COPY_OLD_FE,                /* to/from frontend (2.0 protocol) */
    COPY_NEW_FE                 /* to/from frontend (3.0 protocol) */
 } CopyDest;
@@ -108,6 +108,7 @@ typedef struct CopyStateData
    QueryDesc  *queryDesc;      /* executable query to copy from */
    List       *attnumlist;     /* integer list of attnums to copy */
    char       *filename;       /* filename, or NULL for STDIN/STDOUT */
+   bool        is_program;     /* is 'filename' a program to popen? */
    bool        binary;         /* binary format? */
    bool        oids;           /* include OIDs? */
    bool        freeze;         /* freeze rows on loading? */
@@ -277,8 +278,10 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query,
          const char *queryString, List *attnamelist, List *options);
 static void EndCopy(CopyState cstate);
+static void ClosePipeToProgram(CopyState cstate);
 static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
-           const char *filename, List *attnamelist, List *options);
+           const char *filename, bool is_program, List *attnamelist,
+           List *options);
 static void EndCopyTo(CopyState cstate);
 static uint64 DoCopyTo(CopyState cstate);
 static uint64 CopyTo(CopyState cstate);
@@ -482,9 +485,35 @@ CopySendEndOfRow(CopyState cstate)
            if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
                       cstate->copy_file) != 1 ||
                ferror(cstate->copy_file))
-               ereport(ERROR,
-                       (errcode_for_file_access(),
-                        errmsg("could not write to COPY file: %m")));
+           {
+               if (cstate->is_program)
+               {
+                   if (errno == EPIPE)
+                   {
+                       /*
+                        * The pipe will be closed automatically on error at
+                        * the end of transaction, but we might get a better
+                        * error message from the subprocess' exit code than
+                        * just "Broken Pipe"
+                        */
+                       ClosePipeToProgram(cstate);
+
+                       /*
+                        * If ClosePipeToProgram() didn't throw an error,
+                        * the program terminated normally, but closed the
+                        * pipe first. Restore errno, and throw an error.
+                        */
+                       errno = EPIPE;
+                   }
+                   ereport(ERROR,
+                           (errcode_for_file_access(),
+                            errmsg("could not write to COPY program: %m")));
+               }
+               else
+                   ereport(ERROR,
+                           (errcode_for_file_access(),
+                            errmsg("could not write to COPY file: %m")));
+           }
            break;
        case COPY_OLD_FE:
            /* The FE/BE protocol uses \n as newline for all platforms */
@@ -752,13 +781,22 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
    Relation    rel;
    Oid         relid;
 
-   /* Disallow file COPY except to superusers. */
+   /* Disallow COPY to/from file or program except to superusers. */
    if (!pipe && !superuser())
-       ereport(ERROR,
-               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-                errmsg("must be superuser to COPY to or from a file"),
-                errhint("Anyone can COPY to stdout or from stdin. "
-                        "psql's \\copy command also works for anyone.")));
+   {
+       if (stmt->is_program)
+           ereport(ERROR,
+                   (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+            errmsg("must be superuser to COPY to or from an external program"),
+                    errhint("Anyone can COPY to stdout or from stdin. "
+                            "psql's \\copy command also works for anyone.")));
+       else
+           ereport(ERROR,
+                   (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                    errmsg("must be superuser to COPY to or from a file"),
+                    errhint("Anyone can COPY to stdout or from stdin. "
+                            "psql's \\copy command also works for anyone.")));
+   }
 
    if (stmt->relation)
    {
@@ -812,14 +850,15 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
        if (XactReadOnly && !rel->rd_islocaltemp)
            PreventCommandIfReadOnly("COPY FROM");
 
-       cstate = BeginCopyFrom(rel, stmt->filename,
+       cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program,
                               stmt->attlist, stmt->options);
        *processed = CopyFrom(cstate);  /* copy from file to database */
        EndCopyFrom(cstate);
    }
    else
    {
-       cstate = BeginCopyTo(rel, stmt->query, queryString, stmt->filename,
+       cstate = BeginCopyTo(rel, stmt->query, queryString,
+                            stmt->filename, stmt->is_program,
                             stmt->attlist, stmt->options);
        *processed = DoCopyTo(cstate);  /* copy from database to file */
        EndCopyTo(cstate);
@@ -1389,17 +1428,45 @@ BeginCopy(bool is_from,
    return cstate;
 }
 
+/*
+ * Closes the pipe to an external program, checking the pclose() return code.
+ *
+ * cstate must describe a COPY to/from a program (is_program set); its
+ * copy_file is closed via ClosePipeStream().  Returns normally only if the
+ * stream was closed and the program's exit status was 0; in every other
+ * case an ERROR is raised.
+ */
+static void
+ClosePipeToProgram(CopyState cstate)
+{
+   int pclose_rc;
+
+   Assert(cstate->is_program);
+
+   pclose_rc = ClosePipeStream(cstate->copy_file);
+   if (pclose_rc == -1)
+       /* closing the stream itself failed; %m reports errno */
+       ereport(ERROR,
+               (errcode_for_file_access(),
+                errmsg("could not close pipe to external command: %m")));
+   else if (pclose_rc != 0)
+       /* the program ran but did not exit cleanly; decode its wait status */
+       ereport(ERROR,
+               (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
+                errmsg("program \"%s\" failed",
+                       cstate->filename),
+                errdetail_internal("%s", wait_result_to_str(pclose_rc))));
+}
+
 /*
  * Release resources allocated in a cstate for COPY TO/FROM.
  */
 static void
 EndCopy(CopyState cstate)
 {
-   if (cstate->filename != NULL && FreeFile(cstate->copy_file))
-       ereport(ERROR,
-               (errcode_for_file_access(),
-                errmsg("could not close file \"%s\": %m",
-                       cstate->filename)));
+   if (cstate->is_program)
+   {
+       ClosePipeToProgram(cstate);
+   }
+   else
+   {
+       if (cstate->filename != NULL && FreeFile(cstate->copy_file))
+           ereport(ERROR,
+                   (errcode_for_file_access(),
+                    errmsg("could not close file \"%s\": %m",
+                           cstate->filename)));
+   }
 
    MemoryContextDelete(cstate->copycontext);
    pfree(cstate);
@@ -1413,6 +1480,7 @@ BeginCopyTo(Relation rel,
            Node *query,
            const char *queryString,
            const char *filename,
+           bool  is_program,
            List *attnamelist,
            List *options)
 {
@@ -1451,39 +1519,52 @@ BeginCopyTo(Relation rel,
 
    if (pipe)
    {
+       Assert(!is_program);    /* the grammar does not allow this */
        if (whereToSendOutput != DestRemote)
            cstate->copy_file = stdout;
    }
    else
    {
-       mode_t      oumask;     /* Pre-existing umask value */
-       struct stat st;
+       cstate->filename = pstrdup(filename);
+       cstate->is_program = is_program;
 
-       /*
-        * Prevent write to relative path ... too easy to shoot oneself in the
-        * foot by overwriting a database file ...
-        */
-       if (!is_absolute_path(filename))
-           ereport(ERROR,
-                   (errcode(ERRCODE_INVALID_NAME),
-                    errmsg("relative path not allowed for COPY to file")));
+       if (is_program)
+       {
+           cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
+           if (cstate->copy_file == NULL)
+               ereport(ERROR,
+                       (errmsg("could not execute command \"%s\": %m",
+                               cstate->filename)));
+       }
+       else
+       {
+           mode_t      oumask;     /* Pre-existing umask value */
+           struct stat st;
 
-       cstate->filename = pstrdup(filename);
-       oumask = umask(S_IWGRP | S_IWOTH);
-       cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
-       umask(oumask);
+           /*
+            * Prevent write to relative path ... too easy to shoot oneself in
+            * the foot by overwriting a database file ...
+            */
+           if (!is_absolute_path(filename))
+               ereport(ERROR,
+                       (errcode(ERRCODE_INVALID_NAME),
+                        errmsg("relative path not allowed for COPY to file")));
 
-       if (cstate->copy_file == NULL)
-           ereport(ERROR,
-                   (errcode_for_file_access(),
-                    errmsg("could not open file \"%s\" for writing: %m",
-                           cstate->filename)));
+           oumask = umask(S_IWGRP | S_IWOTH);
+           cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
+           umask(oumask);
+           if (cstate->copy_file == NULL)
+               ereport(ERROR,
+                       (errcode_for_file_access(),
+                        errmsg("could not open file \"%s\" for writing: %m",
+                               cstate->filename)));
 
-       fstat(fileno(cstate->copy_file), &st);
-       if (S_ISDIR(st.st_mode))
-           ereport(ERROR,
-                   (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                    errmsg("\"%s\" is a directory", cstate->filename)));
+           fstat(fileno(cstate->copy_file), &st);
+           if (S_ISDIR(st.st_mode))
+               ereport(ERROR,
+                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                        errmsg("\"%s\" is a directory", cstate->filename)));
+       }
    }
 
    MemoryContextSwitchTo(oldcontext);
@@ -2317,6 +2398,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 CopyState
 BeginCopyFrom(Relation rel,
              const char *filename,
+             bool  is_program,
              List *attnamelist,
              List *options)
 {
@@ -2413,9 +2495,11 @@ BeginCopyFrom(Relation rel,
    cstate->defexprs = defexprs;
    cstate->volatile_defexprs = volatile_defexprs;
    cstate->num_defaults = num_defaults;
+   cstate->is_program = is_program;
 
    if (pipe)
    {
+       Assert(!is_program);    /* the grammar does not allow this */
        if (whereToSendOutput == DestRemote)
            ReceiveCopyBegin(cstate);
        else
@@ -2423,22 +2507,33 @@ BeginCopyFrom(Relation rel,
    }
    else
    {
-       struct stat st;
-
        cstate->filename = pstrdup(filename);
-       cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
 
-       if (cstate->copy_file == NULL)
-           ereport(ERROR,
-                   (errcode_for_file_access(),
-                    errmsg("could not open file \"%s\" for reading: %m",
-                           cstate->filename)));
+       if (cstate->is_program)
+       {
+           cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
+           if (cstate->copy_file == NULL)
+               ereport(ERROR,
+                       (errmsg("could not execute command \"%s\": %m",
+                               cstate->filename)));
+       }
+       else
+       {
+           struct stat st;
 
-       fstat(fileno(cstate->copy_file), &st);
-       if (S_ISDIR(st.st_mode))
-           ereport(ERROR,
-                   (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                    errmsg("\"%s\" is a directory", cstate->filename)));
+           cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
+           if (cstate->copy_file == NULL)
+               ereport(ERROR,
+                       (errcode_for_file_access(),
+                        errmsg("could not open file \"%s\" for reading: %m",
+                               cstate->filename)));
+
+           fstat(fileno(cstate->copy_file), &st);
+           if (S_ISDIR(st.st_mode))
+               ereport(ERROR,
+                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                        errmsg("\"%s\" is a directory", cstate->filename)));
+       }
    }
 
    if (!cstate->binary)
index 2da08d1cc154018a6a2deb681bf2ec9b09aeb8c3..23ec88d54c72a5a0a0a349b4412585d890dda189 100644 (file)
@@ -2703,6 +2703,7 @@ _copyCopyStmt(const CopyStmt *from)
    COPY_NODE_FIELD(query);
    COPY_NODE_FIELD(attlist);
    COPY_SCALAR_FIELD(is_from);
+   COPY_SCALAR_FIELD(is_program);
    COPY_STRING_FIELD(filename);
    COPY_NODE_FIELD(options);
 
index 9e313c8b1be18da46834a802a5f7cb18d481aeda..99c034ab684f1caa004775f3781d549ed57703a1 100644 (file)
@@ -1090,6 +1090,7 @@ _equalCopyStmt(const CopyStmt *a, const CopyStmt *b)
    COMPARE_NODE_FIELD(query);
    COMPARE_NODE_FIELD(attlist);
    COMPARE_SCALAR_FIELD(is_from);
+   COMPARE_SCALAR_FIELD(is_program);
    COMPARE_STRING_FIELD(filename);
    COMPARE_NODE_FIELD(options);
 
index b998431f5f36b3b62833db0e4d3c080810fd864f..d3009b67b411ed329ef2c20d37c00b25b89b1591 100644 (file)
@@ -381,7 +381,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type  opt_freeze opt_default opt_recheck
 %type  opt_binary opt_oids copy_delimiter
 
-%type  copy_from
+%type  copy_from opt_program
 
 %type    opt_column event cursor_options opt_hold opt_set_data
 %type     reindex_type drop_type comment_type security_label_type
@@ -568,7 +568,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
    PARSER PARTIAL PARTITION PASSING PASSWORD PLACING PLANS POSITION
    PRECEDING PRECISION PRESERVE PREPARE PREPARED PRIMARY
-   PRIOR PRIVILEGES PROCEDURAL PROCEDURE
+   PRIOR PRIVILEGES PROCEDURAL PROCEDURE PROGRAM
 
    QUOTE
 
@@ -2309,7 +2309,10 @@ ClosePortalStmt:
  *
  *     QUERY :
  *             COPY relname [(columnList)] FROM/TO file [WITH] [(options)]
- *             COPY ( SELECT ... ) TO file [WITH] [(options)]
+ *             COPY ( SELECT ... ) TO file [WITH] [(options)]
+ *
+ *             where 'file' can be one of:
+ *             { PROGRAM 'command' | STDIN | STDOUT | 'filename' }
  *
  *             In the preferred syntax the options are comma-separated
  *             and use generic identifiers instead of keywords.  The pre-9.0
@@ -2324,14 +2327,21 @@ ClosePortalStmt:
  *****************************************************************************/
 
 CopyStmt:  COPY opt_binary qualified_name opt_column_list opt_oids
-           copy_from copy_file_name copy_delimiter opt_with copy_options
+           copy_from opt_program copy_file_name copy_delimiter opt_with copy_options
                {
                    CopyStmt *n = makeNode(CopyStmt);
                    n->relation = $3;
                    n->query = NULL;
                    n->attlist = $4;
                    n->is_from = $6;
-                   n->filename = $7;
+                   n->is_program = $7;
+                   n->filename = $8;
+
+                   if (n->is_program && n->filename == NULL)
+                       ereport(ERROR,
+                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                errmsg("STDIN/STDOUT not allowed with PROGRAM"),
+                                parser_errposition(@8)));
 
                    n->options = NIL;
                    /* Concatenate user-supplied flags */
@@ -2339,21 +2349,29 @@ CopyStmt:   COPY opt_binary qualified_name opt_column_list opt_oids
                        n->options = lappend(n->options, $2);
                    if ($5)
                        n->options = lappend(n->options, $5);
-                   if ($8)
-                       n->options = lappend(n->options, $8);
-                   if ($10)
-                       n->options = list_concat(n->options, $10);
+                   if ($9)
+                       n->options = lappend(n->options, $9);
+                   if ($11)
+                       n->options = list_concat(n->options, $11);
                    $$ = (Node *)n;
                }
-           | COPY select_with_parens TO copy_file_name opt_with copy_options
+           | COPY select_with_parens TO opt_program copy_file_name opt_with copy_options
                {
                    CopyStmt *n = makeNode(CopyStmt);
                    n->relation = NULL;
                    n->query = $2;
                    n->attlist = NIL;
                    n->is_from = false;
-                   n->filename = $4;
-                   n->options = $6;
+                   n->is_program = $4;
+                   n->filename = $5;
+                   n->options = $7;
+
+                   if (n->is_program && n->filename == NULL)
+                       ereport(ERROR,
+                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                errmsg("STDIN/STDOUT not allowed with PROGRAM"),
+                                parser_errposition(@5)));
+
                    $$ = (Node *)n;
                }
        ;
@@ -2363,6 +2381,11 @@ copy_from:
            | TO                                    { $$ = FALSE; }
        ;
 
+opt_program:
+           PROGRAM                                 { $$ = TRUE; }
+           | /* EMPTY */                           { $$ = FALSE; }
+       ;
+
 /*
  * copy_file_name NULL indicates stdio is used. Whether stdin or stdout is
  * used depends on the direction. (It really doesn't make sense to copy from
@@ -12666,6 +12689,7 @@ unreserved_keyword:
            | PRIVILEGES
            | PROCEDURAL
            | PROCEDURE
+           | PROGRAM
            | QUOTE
            | RANGE
            | READ
index ba1b84eadeff288b130b6de8b5572219ae9dd561..c31a523857d3faa14b7450a8b215d2fdf0d8c89d 100644 (file)
  * for a long time, like relation files. It is the caller's responsibility
  * to close them, there is no automatic mechanism in fd.c for that.
  *
- * AllocateFile, AllocateDir and OpenTransientFile are wrappers around
- * fopen(3), opendir(3), and open(2), respectively. They behave like the
- * corresponding native functions, except that the handle is registered with
- * the current subtransaction, and will be automatically closed at abort.
- * These are intended for short operations like reading a configuration file,
- * and there is a fixed limit on the number of files that can be opened using
- * these functions at any one time.
+ * AllocateFile, AllocateDir, OpenPipeStream and OpenTransientFile are
+ * wrappers around fopen(3), opendir(3), popen(3) and open(2), respectively.
+ * They behave like the corresponding native functions, except that the handle
+ * is registered with the current subtransaction, and will be automatically
+ * closed at abort. These are intended for short operations like reading a
+ * configuration file, and there is a fixed limit on the number of files that
+ * can be opened using these functions at any one time.
  *
  * Finally, BasicOpenFile is just a thin wrapper around open() that can
  * release file descriptors in use by the virtual file descriptors if
@@ -202,6 +202,7 @@ static uint64 temporary_files_size = 0;
 typedef enum
 {
    AllocateDescFile,
+   AllocateDescPipe,
    AllocateDescDir,
    AllocateDescRawFD
 } AllocateDescKind;
@@ -1585,6 +1586,61 @@ OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
    return -1;                  /* failure */
 }
 
+/*
+ * Routines that want to initiate a pipe stream should use OpenPipeStream
+ * rather than plain popen().  This lets fd.c deal with freeing FDs if
+ * necessary.  When done, call ClosePipeStream rather than pclose.
+ *
+ * command and mode are passed to popen() unchanged.
+ *
+ * On success the stream is recorded in allocatedDescs[] as an
+ * AllocateDescPipe entry tagged with the current subtransaction ID, and the
+ * FILE pointer is returned.  On failure NULL is returned and errno is left
+ * as set by popen().  If popen() fails with EMFILE or ENFILE, a LOG message
+ * is emitted and the call is retried for as long as ReleaseLruFile()
+ * reports success.
+ *
+ * stdout and stderr are flushed before each popen() attempt (presumably so
+ * that buffered output is not emitted a second time by the child -- TODO
+ * confirm).
+ *
+ * Raises ERROR if no further allocatedDescs[] slot may be used.
+ */
+FILE *
+OpenPipeStream(const char *command, const char *mode)
+{
+   FILE       *file;
+
+   DO_DB(elog(LOG, "OpenPipeStream: Allocated %d (%s)",
+              numAllocatedDescs, command));
+
+   /*
+    * The test against MAX_ALLOCATED_DESCS prevents us from overflowing
+    * allocatedDescs[]; the test against max_safe_fds prevents OpenPipeStream
+    * from hogging every one of the available FDs, which'd lead to infinite
+    * looping.
+    */
+   if (numAllocatedDescs >= MAX_ALLOCATED_DESCS ||
+       numAllocatedDescs >= max_safe_fds - 1)
+       elog(ERROR, "exceeded MAX_ALLOCATED_DESCS while trying to execute command \"%s\"",
+            command);
+
+TryAgain:
+   fflush(stdout);
+   fflush(stderr);
+   errno = 0;
+   if ((file = popen(command, mode)) != NULL)
+   {
+       AllocateDesc *desc = &allocatedDescs[numAllocatedDescs];
+
+       desc->kind = AllocateDescPipe;
+       desc->desc.file = file;
+       desc->create_subid = GetCurrentSubTransactionId();
+       numAllocatedDescs++;
+       return desc->desc.file;
+   }
+
+   if (errno == EMFILE || errno == ENFILE)
+   {
+       int         save_errno = errno;
+
+       ereport(LOG,
+               (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+                errmsg("out of file descriptors: %m; release and retry")));
+       errno = 0;
+       if (ReleaseLruFile())
+           goto TryAgain;
+       errno = save_errno;
+   }
+
+   return NULL;
+}
+
 /*
  * Free an AllocateDesc of any type.
  *
@@ -1601,6 +1657,9 @@ FreeDesc(AllocateDesc *desc)
        case AllocateDescFile:
            result = fclose(desc->desc.file);
            break;
+       case AllocateDescPipe:
+           result = pclose(desc->desc.file);
+           break;
        case AllocateDescDir:
            result = closedir(desc->desc.dir);
            break;
@@ -1814,6 +1873,31 @@ FreeDir(DIR *dir)
 }
 
 
+/*
+ * Close a pipe stream returned by OpenPipeStream.
+ *
+ * allocatedDescs[] is searched from the last entry backwards for an
+ * AllocateDescPipe entry holding this FILE pointer; if one is found, the
+ * result of FreeDesc() on that entry is returned.  If no entry matches, a
+ * WARNING is logged and the stream is still closed with pclose(), whose
+ * result is returned.
+ */
+int
+ClosePipeStream(FILE *file)
+{
+   int         i;
+
+   DO_DB(elog(LOG, "ClosePipeStream: Allocated %d", numAllocatedDescs));
+
+   /* Remove file from list of allocated files, if it's present */
+   for (i = numAllocatedDescs; --i >= 0;)
+   {
+       AllocateDesc *desc = &allocatedDescs[i];
+
+       if (desc->kind == AllocateDescPipe && desc->desc.file == file)
+           return FreeDesc(desc);
+   }
+
+   /* Only get here if someone passes us a file not in allocatedDescs */
+   elog(WARNING, "file passed to ClosePipeStream was not obtained from OpenPipeStream");
+
+   return pclose(file);
+}
+
 /*
  * closeAllVfds
  *
index a31d789919ab83fd3f26f13aa6995ca48cc511ff..a97795f943e1c97e0c840eca2f83240a2f80d56b 100644 (file)
@@ -35,6 +35,9 @@
  * \copy tablename [(columnlist)] from|to filename [options]
  * \copy ( select stmt ) to filename [options]
  *
+ * where 'filename' can be one of the following:
+ *  'filename' | PROGRAM 'command' | stdin | stdout | pstdin | pstdout
+ *
  * An undocumented fact is that you can still write BINARY before the
  * tablename; this is a hangover from the pre-7.3 syntax.  The options
  * syntax varies across backend versions, but we avoid all that mess
@@ -43,6 +46,7 @@
  * table name can be double-quoted and can have a schema part.
  * column names can be double-quoted.
  * filename can be single-quoted like SQL literals.
+ * command must be single-quoted like SQL literals.
  *
  * returns a malloc'ed structure with the options, or NULL on parsing error
  */
@@ -52,6 +56,7 @@ struct copy_options
    char       *before_tofrom;  /* COPY string before TO/FROM */
    char       *after_tofrom;   /* COPY string after TO/FROM filename */
    char       *file;           /* NULL = stdin/stdout */
+   bool        program;        /* is 'file' a program to popen? */
    bool        psql_inout;     /* true = use psql stdin/stdout */
    bool        from;           /* true = FROM, false = TO */
 };
@@ -191,15 +196,37 @@ parse_slash_copy(const char *args)
    else
        goto error;
 
+   /* { 'filename' | PROGRAM 'command' | STDIN | STDOUT | PSTDIN | PSTDOUT } */
    token = strtokx(NULL, whitespace, NULL, "'",
-                   0, false, true, pset.encoding);
+                   0, false, false, pset.encoding);
    if (!token)
        goto error;
 
-   if (pg_strcasecmp(token, "stdin") == 0 ||
-       pg_strcasecmp(token, "stdout") == 0)
+   if (pg_strcasecmp(token, "program") == 0)
+   {
+       int toklen;
+
+       token = strtokx(NULL, whitespace, NULL, "'",
+                       0, false, false, pset.encoding);
+       if (!token)
+           goto error;
+
+       /*
+        * The shell command must be quoted. This isn't fool-proof, but catches
+        * most quoting errors.
+        */
+       toklen = strlen(token);
+       if (token[0] != '\'' || toklen < 2 || token[toklen - 1] != '\'')
+           goto error;
+
+       strip_quotes(token, '\'', 0, pset.encoding);
+
+       result->program = true;
+       result->file = pg_strdup(token);
+   }
+   else if (pg_strcasecmp(token, "stdin") == 0 ||
+            pg_strcasecmp(token, "stdout") == 0)
    {
-       result->psql_inout = false;
        result->file = NULL;
    }
    else if (pg_strcasecmp(token, "pstdin") == 0 ||
@@ -210,7 +237,8 @@ parse_slash_copy(const char *args)
    }
    else
    {
-       result->psql_inout = false;
+       /* filename can be optionally quoted */
+       strip_quotes(token, '\'', 0, pset.encoding);
        result->file = pg_strdup(token);
        expand_tilde(&result->file);
    }
@@ -235,9 +263,9 @@ error:
 
 
 /*
- * Execute a \copy command (frontend copy). We have to open a file, then
- * submit a COPY query to the backend and either feed it data from the
- * file or route its response into the file.
+ * Execute a \copy command (frontend copy). We have to open a file (or execute
+ * a command), then submit a COPY query to the backend and either feed it data
+ * from the file or route its response into the file.
  */
 bool
 do_copy(const char *args)
@@ -257,7 +285,7 @@ do_copy(const char *args)
        return false;
 
    /* prepare to read or write the target file */
-   if (options->file)
+   if (options->file && !options->program)
        canonicalize_path(options->file);
 
    if (options->from)
@@ -265,7 +293,17 @@ do_copy(const char *args)
        override_file = &pset.cur_cmd_source;
 
        if (options->file)
-           copystream = fopen(options->file, PG_BINARY_R);
+       {
+           if (options->program)
+           {
+               fflush(stdout);
+               fflush(stderr);
+               errno = 0;
+               copystream = popen(options->file, PG_BINARY_R);
+           }
+           else
+               copystream = fopen(options->file, PG_BINARY_R);
+       }
        else if (!options->psql_inout)
            copystream = pset.cur_cmd_source;
        else
@@ -276,7 +314,20 @@ do_copy(const char *args)
        override_file = &pset.queryFout;
 
        if (options->file)
-           copystream = fopen(options->file, PG_BINARY_W);
+       {
+           if (options->program)
+           {
+               fflush(stdout);
+               fflush(stderr);
+               errno = 0;
+#ifndef WIN32
+               pqsignal(SIGPIPE, SIG_IGN);
+#endif
+               copystream = popen(options->file, PG_BINARY_W);
+           }
+           else
+               copystream = fopen(options->file, PG_BINARY_W);
+       }
        else if (!options->psql_inout)
            copystream = pset.queryFout;
        else
@@ -285,21 +336,28 @@ do_copy(const char *args)
 
    if (!copystream)
    {
-       psql_error("%s: %s\n",
-                  options->file, strerror(errno));
+       if (options->program)
+           psql_error("could not execute command \"%s\": %s\n",
+                      options->file, strerror(errno));
+       else
+           psql_error("%s: %s\n",
+                      options->file, strerror(errno));
        free_copy_options(options);
        return false;
    }
 
-   /* make sure the specified file is not a directory */
-   fstat(fileno(copystream), &st);
-   if (S_ISDIR(st.st_mode))
+   if (!options->program)
    {
-       fclose(copystream);
-       psql_error("%s: cannot copy from/to a directory\n",
-                  options->file);
-       free_copy_options(options);
-       return false;
+       /* make sure the specified file is not a directory */
+       fstat(fileno(copystream), &st);
+       if (S_ISDIR(st.st_mode))
+       {
+           fclose(copystream);
+           psql_error("%s: cannot copy from/to a directory\n",
+                      options->file);
+           free_copy_options(options);
+           return false;
+       }
    }
 
    /* build the command we will send to the backend */
@@ -322,10 +380,35 @@ do_copy(const char *args)
 
    if (options->file != NULL)
    {
-       if (fclose(copystream) != 0)
+       if (options->program)
        {
-           psql_error("%s: %s\n", options->file, strerror(errno));
-           success = false;
+           int pclose_rc = pclose(copystream);
+           if (pclose_rc != 0)
+           {
+               if (pclose_rc < 0)
+                   psql_error("could not close pipe to external command: %s\n",
+                              strerror(errno));
+               else
+               {
+                   char *reason = wait_result_to_str(pclose_rc);
+                   psql_error("%s: %s\n", options->file,
+                              reason ? reason : "");
+                   if (reason)
+                       free(reason);
+               }
+               success = false;
+           }
+#ifndef WIN32
+           pqsignal(SIGPIPE, SIG_DFL);
+#endif
+       }
+       else
+       {
+           if (fclose(copystream) != 0)
+           {
+               psql_error("%s: %s\n", options->file, strerror(errno));
+               success = false;
+           }
        }
    }
    free_copy_options(options);
index 450240dd9c728a47096519b234542f2df3f9f4d4..99968a16f961604687a558737df144657a9f4efa 100644 (file)
@@ -13,9 +13,6 @@
 #include "stringutils.h"
 
 
-static void strip_quotes(char *source, char quote, char escape, int encoding);
-
-
 /*
  * Replacement for strtok() (a.k.a. poor man's flex)
  *
@@ -239,7 +236,7 @@ strtokx(const char *s,
  *
  * Note that the source string is overwritten in-place.
  */
-static void
+void
 strip_quotes(char *source, char quote, char escape, int encoding)
 {
    char       *src;
index b991376c11b4dd7f5c447b11ebb1a623da56d822..bb2a194463bc798e088560e18d17c519c9908951 100644 (file)
@@ -19,6 +19,8 @@ extern char *strtokx(const char *s,
        bool del_quotes,
        int encoding);
 
+extern void strip_quotes(char *source, char quote, char escape, int encoding);
+
 extern char *quote_if_needed(const char *source, const char *entails_quote,
                char quote, char escape, int encoding);
 
index 725c277ecfd6588a608e2a8f9ced124133920bd5..5860e4cf6b2a3c7829f9ef39263065d9055903fd 100644 (file)
@@ -26,7 +26,7 @@ extern Oid DoCopy(const CopyStmt *stmt, const char *queryString,
 
 extern void ProcessCopyOptions(CopyState cstate, bool is_from, List *options);
 extern CopyState BeginCopyFrom(Relation rel, const char *filename,
-             List *attnamelist, List *options);
+             bool is_program, List *attnamelist, List *options);
 extern void EndCopyFrom(CopyState cstate);
 extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext,
             Datum *values, bool *nulls, Oid *tupleOid);
index d8678e5b3fd7170cb972b64958b004fd29246d8f..d54990d39c170bc70333f68e98f5736c903f2f5e 100644 (file)
@@ -1407,6 +1407,7 @@ typedef struct CopyStmt
    List       *attlist;        /* List of column names (as Strings), or NIL
                                 * for all columns */
    bool        is_from;        /* TO or FROM */
+   bool        is_program;     /* is 'filename' a program to popen? */
    char       *filename;       /* filename, or NULL for STDIN/STDOUT */
    List       *options;        /* List of DefElem nodes */
 } CopyStmt;
index 03aa7616409164cd7f9bd399da742a595aca212d..6f67a65f3d1d1db1b9832d4cda7f9fe15ec6caa5 100644 (file)
@@ -292,6 +292,7 @@ PG_KEYWORD("prior", PRIOR, UNRESERVED_KEYWORD)
 PG_KEYWORD("privileges", PRIVILEGES, UNRESERVED_KEYWORD)
 PG_KEYWORD("procedural", PROCEDURAL, UNRESERVED_KEYWORD)
 PG_KEYWORD("procedure", PROCEDURE, UNRESERVED_KEYWORD)
+PG_KEYWORD("program", PROGRAM, UNRESERVED_KEYWORD)
 PG_KEYWORD("quote", QUOTE, UNRESERVED_KEYWORD)
 PG_KEYWORD("range", RANGE, UNRESERVED_KEYWORD)
 PG_KEYWORD("read", READ, UNRESERVED_KEYWORD)
index 99d3a9b7fecd2590bf9490f05c5c3476f6950282..c5d0e0a9709e197dbd99f7e26e8bec36e6e45b88 100644 (file)
@@ -465,4 +465,7 @@ extern int  pg_mkdir_p(char *path, int omode);
 /* port/quotes.c */
 extern char *escape_single_quotes_ascii(const char *src);
 
+/* port/wait_error.c */
+extern char *wait_result_to_str(int exit_status);
+
 #endif   /* PG_PORT_H */
index bd36c9d7fcadc3ece77a01e0a6901770ca4b0a0a..90b4933ecb528d034427422664807f8c51714fd2 100644 (file)
@@ -80,6 +80,10 @@ extern char *FilePathName(File file);
 extern FILE *AllocateFile(const char *name, const char *mode);
 extern int FreeFile(FILE *file);
 
+/* Operations that allow use of pipe streams (popen/pclose) */
+extern FILE *OpenPipeStream(const char *command, const char *mode);
+extern int ClosePipeStream(FILE *file);
+
 /* Operations to allow use of the <dirent.h> library routines */
 extern DIR *AllocateDir(const char *dirname);
 extern struct dirent *ReadDir(DIR *dir, const char *dirname);
index d8147cf43cbe4c3c608d8fbc67702a74a57a83c5..85ec391988c6feca38a84cc2e43ec9777cc2374e 100644 (file)
@@ -192,7 +192,7 @@ ECPG: where_or_current_clauseWHERECURRENT_POFcursor_name block
        char *cursor_marker = $4[0] == ':' ? mm_strdup("$0") : $4;
        $$ = cat_str(2,mm_strdup("where current of"), cursor_marker);
    }
-ECPG: CopyStmtCOPYopt_binaryqualified_nameopt_column_listopt_oidscopy_fromcopy_file_namecopy_delimiteropt_withcopy_options addon
+ECPG: CopyStmtCOPYopt_binaryqualified_nameopt_column_listopt_oidscopy_fromopt_programcopy_file_namecopy_delimiteropt_withcopy_options addon
            if (strcmp($6, "from") == 0 &&
               (strcmp($7, "stdin") == 0 || strcmp($7, "stdout") == 0))
                mmerror(PARSE_ERROR, ET_WARNING, "COPY FROM STDIN is not implemented");
index a3db615400e3d09f777395e205e0fbe302850c2d..0774e33f9fb4a2d3a275f3b0383e09a66e79d190 100644 (file)
@@ -32,7 +32,8 @@ LIBS += $(PTHREAD_LIBS)
 
 OBJS = $(LIBOBJS) chklocale.o dirmod.o erand48.o exec.o fls.o inet_net_ntop.o \
    noblock.o path.o pgcheckdir.o pg_crc.o pgmkdirp.o pgsleep.o \
-   pgstrcasecmp.o qsort.o qsort_arg.o quotes.o sprompt.o tar.o thread.o
+   pgstrcasecmp.o qsort.o qsort_arg.o quotes.o sprompt.o tar.o thread.o \
+   wait_error.o
 
 # foo_srv.o and foo.o are both built from foo.c, but only foo.o has -DFRONTEND
 OBJS_SRV = $(OBJS:%.o=%_srv.o)
index 18be1408f7aacccb975285ebf1d2ab51db541d69..01203c056cc068a30628908537456b08a7041c25 100644 (file)
@@ -505,14 +505,12 @@ pipe_read_line(char *cmd, char *line, int maxsize)
 
 /*
  * pclose() plus useful error reporting
- * Is this necessary?  bjm 2004-05-11
- * Originally this was stated to be here because pipe.c had backend linkage.
- * Perhaps that's no longer so now we have got rid of pipe.c amd 2012-03-28
  */
 int
 pclose_check(FILE *stream)
 {
    int         exitstatus;
+   char       *reason;
 
    exitstatus = pclose(stream);
 
@@ -522,36 +520,21 @@ pclose_check(FILE *stream)
    if (exitstatus == -1)
    {
        /* pclose() itself failed, and hopefully set errno */
-       perror("pclose failed");
+       log_error(_("pclose failed: %s"), strerror(errno));
    }
-   else if (WIFEXITED(exitstatus))
-       log_error(_("child process exited with exit code %d"),
-                 WEXITSTATUS(exitstatus));
-   else if (WIFSIGNALED(exitstatus))
-#if defined(WIN32)
-       log_error(_("child process was terminated by exception 0x%X"),
-                 WTERMSIG(exitstatus));
-#elif defined(HAVE_DECL_SYS_SIGLIST) && HAVE_DECL_SYS_SIGLIST
+   else
    {
-       char        str[256];
-
-       snprintf(str, sizeof(str), "%d: %s", WTERMSIG(exitstatus),
-                WTERMSIG(exitstatus) < NSIG ?
-                sys_siglist[WTERMSIG(exitstatus)] : "(unknown)");
-       log_error(_("child process was terminated by signal %s"), str);
-   }
+       reason = wait_result_to_str(exitstatus);
+       log_error("%s", reason);
+#ifdef FRONTEND
+       free(reason);
 #else
-       log_error(_("child process was terminated by signal %d"),
-                 WTERMSIG(exitstatus));
+       pfree(reason);
 #endif
-   else
-       log_error(_("child process exited with unrecognized status %d"),
-                 exitstatus);
-
-   return -1;
+   }
+   return exitstatus;
 }
 
-
 /*
  * set_pglocale_pgservice
  *
diff --git a/src/port/wait_error.c b/src/port/wait_error.c
new file mode 100644 (file)
index 0000000..ac9c52b
--- /dev/null
@@ -0,0 +1,92 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait_error.c
+ *     Convert a wait/waitpid(2) result code to a human-readable string
+ *
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *   src/port/wait_error.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef FRONTEND
+#include "postgres.h"
+#else
+#include "postgres_fe.h"
+#endif
+
+#include <signal.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/wait.h>
+
+/*
+ * Describe, in words, why a child process ended.
+ *
+ * 'exitstatus' is a status word as produced by wait(2) or waitpid(2).
+ *
+ * The returned message is translated and freshly allocated: pstrdup'd in
+ * the backend, strdup'd when built with FRONTEND.  Frontend callers should
+ * therefore be prepared for a NULL result if strdup() fails.
+ */
+char *
+wait_result_to_str(int exitstatus)
+{
+   char        msg[512];
+   char       *copy;
+
+   if (WIFEXITED(exitstatus))
+   {
+       int         code = WEXITSTATUS(exitstatus);
+
+       /* Exit codes 126 and 127 have a special meaning in shells */
+       if (code == 126)
+           snprintf(msg, sizeof(msg), _("command not executable"));
+       else if (code == 127)
+           snprintf(msg, sizeof(msg), _("command not found"));
+       else
+           snprintf(msg, sizeof(msg),
+                    _("child process exited with exit code %d"), code);
+   }
+   else if (WIFSIGNALED(exitstatus))
+   {
+#if defined(WIN32)
+       snprintf(msg, sizeof(msg),
+                _("child process was terminated by exception 0x%X"),
+                WTERMSIG(exitstatus));
+#elif defined(HAVE_DECL_SYS_SIGLIST) && HAVE_DECL_SYS_SIGLIST
+       char        sigdesc[256];
+
+       /* Signal number, plus its sys_siglist text when the number is < NSIG */
+       snprintf(sigdesc, sizeof(sigdesc), "%d: %s", WTERMSIG(exitstatus),
+                WTERMSIG(exitstatus) < NSIG ?
+                sys_siglist[WTERMSIG(exitstatus)] : "(unknown)");
+       snprintf(msg, sizeof(msg),
+                _("child process was terminated by signal %s"), sigdesc);
+#else
+       snprintf(msg, sizeof(msg),
+                _("child process was terminated by signal %d"),
+                WTERMSIG(exitstatus));
+#endif
+   }
+   else
+   {
+       /* Neither a normal exit nor a signal: report the raw status word */
+       snprintf(msg, sizeof(msg),
+                _("child process exited with unrecognized status %d"),
+                exitstatus);
+   }
+
+   /* Hand back a copy made with the allocator that matches this build */
+#ifndef FRONTEND
+   copy = pstrdup(msg);
+#else
+   copy = strdup(msg);
+#endif
+   return copy;
+}