Report progress of COPY commands
authorTomas Vondra
Wed, 6 Jan 2021 20:46:26 +0000 (21:46 +0100)
committerTomas Vondra
Wed, 6 Jan 2021 20:51:06 +0000 (21:51 +0100)
This commit introduces a view pg_stat_progress_copy, reporting progress
of COPY commands.  This allows rough estimates of how far a running COPY
has progressed, with the caveat that the total number of bytes may not be
available in some cases (e.g. when the input comes from the client).

Author: Josef Šimánek
Reviewed-by: Fujii Masao, Bharath Rupireddy, Vignesh C, Matthias van de Meent
Discussion: https://postgr.es/m/CAFp7QwqMGEi4OyyaLEK9DR0+E+oK3UtA4bEjDVCa4bNkwUY2PQ@mail.gmail.com
Discussion: https://postgr.es/m/CAFp7Qwr6_FmRM6pCO0x_a0mymOfX_Gg+FEKet4XaTGSW=LitKQ@mail.gmail.com

doc/src/sgml/monitoring.sgml
src/backend/catalog/system_views.sql
src/backend/commands/copyfrom.c
src/backend/commands/copyfromparse.c
src/backend/commands/copyto.c
src/backend/utils/adt/pgstatfuncs.c
src/include/commands/copyfrom_internal.h
src/include/commands/progress.h
src/include/pgstat.h
src/test/regress/expected/rules.out

index 3d6c901306777caa56b6dfa6276d501f09b3d763..43fe8ae383eb923bdc6a7c31ece1178ec625cf3a 100644 (file)
@@ -399,6 +399,12 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       
      
 
+     
+      pg_stat_progress_copy
+      One row for each backend running COPY, showing current progress.
+       See the COPY Progress Reporting section.
+      
+     
     
    
   
@@ -5247,6 +5253,7 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
    which support progress reporting are ANALYZE,
    CLUSTER,
    CREATE INDEX, VACUUM,
+   COPY,
    and BASE_BACKUP (i.e., replication
    command that pg_basebackup issues to take
    a base backup).
@@ -6396,6 +6403,106 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
   
 
  
+
+  COPY Progress Reporting
+
+  
+   pg_stat_progress_copy
+  
+
+  
+   Whenever COPY is running, the
+   pg_stat_progress_copy view will contain one row
+   for each backend that is currently running a COPY command.
+   The table below describes the information that will be reported and provides
+   information about how to interpret it.
+  
+
+  
+   <structname>pg_stat_progress_copy</structname> View
+   
+    
+     
+      
+       Column Type
+      
+      
+       Description
+      
+     
+    
+
+    
+     
+      
+       pid integer
+      
+      
+       Process ID of backend.
+      
+     
+
+     
+      
+       datid oid
+      
+      
+       OID of the database to which this backend is connected.
+      
+     
+
+     
+      
+       datname name
+      
+      
+       Name of the database to which this backend is connected.
+      
+     
+
+     
+      
+       relid oid
+      
+      
+       OID of the table on which the COPY command is executed.
+       It is set to 0 if copying from a SELECT query.
+      
+     
+
+     
+      
+       bytes_processed bigint
+      
+      
+       Number of bytes already processed by the COPY command.
+      
+     
+
+     
+      
+       bytes_total bigint
+      
+      
+       Size of the source file for a COPY FROM command, in bytes.
+       It is set to 0 if not available.
+      
+     
+
+     
+      
+       lines_processed bigint
+      
+      
+       Number of lines already processed by the COPY command.
+      
+     
+    
+   
+  
+
  
 
  
index ab4603c69b8e84141ab4dbff933890be7dc2e5a3..5d89e77dbe2f56982dc382bfc20f397130e4c7a8 100644 (file)
@@ -1117,6 +1117,17 @@ CREATE VIEW pg_stat_progress_basebackup AS
         S.param5 AS tablespaces_streamed
     FROM pg_stat_get_progress_info('BASEBACKUP') AS S;
 
+
+CREATE VIEW pg_stat_progress_copy AS
+    SELECT S.pid AS pid,
+           S.datid AS datid,
+           D.datname AS datname,
+           S.relid AS relid,
+           S.param1 AS bytes_processed,
+           S.param2 AS bytes_total,
+           S.param3 AS lines_processed
+    FROM pg_stat_get_progress_info('COPY') AS S LEFT JOIN pg_database AS D ON S.datid = D.oid;
+
 CREATE VIEW pg_user_mappings AS
     SELECT
         U.oid       AS umid,
index 84a5045215b076034d384989a4b6fc3e602dd69d..08b6f782c735879f4f93e3c0a2945316eb7eab55 100644 (file)
@@ -25,6 +25,7 @@
 #include "access/xlog.h"
 #include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
+#include "commands/progress.h"
 #include "commands/trigger.h"
 #include "executor/execPartition.h"
 #include "executor/executor.h"
@@ -35,6 +36,7 @@
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
+#include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
 #include "tcop/tcopprot.h"
@@ -1100,9 +1102,10 @@ CopyFrom(CopyFromState cstate)
            /*
             * We count only tuples not suppressed by a BEFORE INSERT trigger
             * or FDW; this is the same definition used by nodeModifyTable.c
-            * for counting tuples inserted by an INSERT command.
+            * for counting tuples inserted by an INSERT command. Update
+            * progress of the COPY command as well.
             */
-           processed++;
+           pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
        }
    }
 
@@ -1415,6 +1418,12 @@ BeginCopyFrom(ParseState *pstate,
        }
    }
 
+
+   /* initialize progress */
+   pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
+                                 cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+   cstate->bytes_processed = 0;
+
    /* We keep those variables in cstate. */
    cstate->in_functions = in_functions;
    cstate->typioparams = typioparams;
@@ -1479,6 +1488,8 @@ BeginCopyFrom(ParseState *pstate,
                ereport(ERROR,
                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                         errmsg("\"%s\" is a directory", cstate->filename)));
+
+           pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size);
        }
    }
 
@@ -1522,6 +1533,8 @@ EndCopyFrom(CopyFromState cstate)
                            cstate->filename)));
    }
 
+   pgstat_progress_end_command();
+
    MemoryContextDelete(cstate->copycontext);
    pfree(cstate);
 }
index 4360b7788ea0bd9d140cc4272aa2d67f25b66789..4c74067f849ccda40470b0ea752f351ff7b6212e 100644 (file)
 
 #include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
+#include "commands/progress.h"
 #include "executor/executor.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "port/pg_bswap.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -384,6 +386,8 @@ CopyLoadRawBuf(CopyFromState cstate)
    cstate->raw_buf[nbytes] = '\0';
    cstate->raw_buf_index = 0;
    cstate->raw_buf_len = nbytes;
+   cstate->bytes_processed += nbytes;
+   pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
    return (inbytes > 0);
 }
 
index 51597ae523d1a746b4d527592d986c83196dc6d0..e04ec1e331b4b1ab4c698275e832fa83db530434 100644 (file)
@@ -24,6 +24,7 @@
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "commands/copy.h"
+#include "commands/progress.h"
 #include "executor/execdesc.h"
 #include "executor/executor.h"
 #include "executor/tuptable.h"
@@ -32,6 +33,7 @@
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
+#include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
 #include "tcop/tcopprot.h"
@@ -95,6 +97,7 @@ typedef struct CopyToStateData
 
    FmgrInfo   *out_functions;  /* lookup info for output functions */
    MemoryContext rowcontext;   /* per-row evaluation context */
+   uint64      bytes_processed;    /* number of bytes processed so far */
 
 } CopyToStateData;
 
@@ -288,6 +291,10 @@ CopySendEndOfRow(CopyToState cstate)
            break;
    }
 
+   /* Update the progress */
+   cstate->bytes_processed += fe_msgbuf->len;
+   pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
+
    resetStringInfo(fe_msgbuf);
 }
 
@@ -363,6 +370,8 @@ EndCopy(CopyToState cstate)
                            cstate->filename)));
    }
 
+   pgstat_progress_end_command();
+
    MemoryContextDelete(cstate->copycontext);
    pfree(cstate);
 }
@@ -760,6 +769,11 @@ BeginCopyTo(ParseState *pstate,
        }
    }
 
+   /* initialize progress */
+   pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
+                                 cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+   cstate->bytes_processed = 0;
+
    MemoryContextSwitchTo(oldcontext);
 
    return cstate;
@@ -938,7 +952,9 @@ CopyTo(CopyToState cstate)
 
            /* Format and send the data */
            CopyOneRowTo(cstate, slot);
-           processed++;
+
+           /* Increment amount of processed tuples and update the progress */
+           pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
        }
 
        ExecDropSingleTupleTableSlot(slot);
@@ -1303,7 +1319,9 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
 
    /* Send the data */
    CopyOneRowTo(cstate, slot);
-   myState->processed++;
+
+   /* Increment amount of processed tuples and update the progress */
+   pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed);
 
    return true;
 }
index c9a1d4c56d9780c57ea55de7027a1fd9a44b5628..5c12a165a1535e8d90e12d8f2d31670054b5350c 100644 (file)
@@ -494,6 +494,8 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
        cmdtype = PROGRESS_COMMAND_CREATE_INDEX;
    else if (pg_strcasecmp(cmd, "BASEBACKUP") == 0)
        cmdtype = PROGRESS_COMMAND_BASEBACKUP;
+   else if (pg_strcasecmp(cmd, "COPY") == 0)
+       cmdtype = PROGRESS_COMMAND_COPY;
    else
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
index 5401a966d270777b868369781d34ea3f7de6723b..e37942df391e7bd24c655a21193b4a96693f294f 100644 (file)
@@ -154,6 +154,7 @@ typedef struct CopyFromStateData
    char       *raw_buf;
    int         raw_buf_index;  /* next byte to process */
    int         raw_buf_len;    /* total # of bytes stored */
+   uint64      bytes_processed;/* number of bytes processed so far */
    /* Shorthand for number of unconsumed bytes available in raw_buf */
 #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
 } CopyFromStateData;
index 49a158aabbf92dcc60193682b5942e70b8611998..95ec5d02e9cc47df7295dc6117db350604b8eeec 100644 (file)
 #define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE     4
 #define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL         5
 
+/* Progress parameters for PROGRESS_COPY */
+#define PROGRESS_COPY_BYTES_PROCESSED 0
+#define PROGRESS_COPY_BYTES_TOTAL 1
+#define PROGRESS_COPY_LINES_PROCESSED 2
+
 #endif
index 3a7e1997506e28abff317decf56e8aec4a7509b1..c38b68971019c2969a783461f7c79cf3f1b757e5 100644 (file)
@@ -1077,7 +1077,8 @@ typedef enum ProgressCommandType
    PROGRESS_COMMAND_ANALYZE,
    PROGRESS_COMMAND_CLUSTER,
    PROGRESS_COMMAND_CREATE_INDEX,
-   PROGRESS_COMMAND_BASEBACKUP
+   PROGRESS_COMMAND_BASEBACKUP,
+   PROGRESS_COMMAND_COPY
 } ProgressCommandType;
 
 #define PGSTAT_NUM_PROGRESS_PARAM  20
index 6293ab57bcf61fe3bad1d1a89bbd605d0e77925b..a687e99d1e4fe18d0c0cc4c71351b432b29080d3 100644 (file)
@@ -1937,6 +1937,15 @@ pg_stat_progress_cluster| SELECT s.pid,
     s.param8 AS index_rebuild_count
    FROM (pg_stat_get_progress_info('CLUSTER'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)));
+pg_stat_progress_copy| SELECT s.pid,
+    s.datid,
+    d.datname,
+    s.relid,
+    s.param1 AS bytes_processed,
+    s.param2 AS bytes_total,
+    s.param3 AS lines_processed
+   FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
+     LEFT JOIN pg_database d ON ((s.datid = d.oid)));
 pg_stat_progress_create_index| SELECT s.pid,
     s.datid,
     d.datname,