+ |
+
pg_stat_progress_copypg_stat_progress_copy
+ One row for each backend running COPY, showing current progress.
+ See .
+
+
which support progress reporting are ANALYZE,
CLUSTER,
CREATE INDEX, VACUUM,
+ COPY,
and (i.e., replication
command that issues to take
a base backup).
+
+
+
COPY Progress Reporting
+
+
+
+
+ Whenever COPY is running, the
+ pg_stat_progress_copy view will contain one row
+ for each backend that is currently running COPY command.
+ The table bellow describes the information that will be reported and provide
+ information how to interpret it.
+
+
+
+
pg_stat_progress_copy View
+
+
+ |
+ Column Type
+
+ Description
+
+
+
+
+
+ |
+ pid integer
+
+ Process ID of backend.
+
+
+
+ |
+ datid text
+
+ OID of the database to which this backend is connected.
+
+
+
+ |
+ datname name
+
+ Name of the database to which this backend is connected.
+
+
+
+ |
+ relid oid
+
+ OID of the table on which the COPY command is executed.
+ It is set to 0 if SELECT query is provided.
+
+
+
+ |
+ bytes_processed bigint
+
+ Number of bytes already processed by COPY command.
+
+
+
+ |
+ bytes_total bigint
+
+ Size of source file for COPY FROM command in bytes.
+ It is set to 0 if not available.
+
+
+
+ |
+ lines_processed bigint
+
+ Number of lines already processed by COPY command.
+
+
+
+
+
+
+
S.param5 AS tablespaces_streamed
FROM pg_stat_get_progress_info('BASEBACKUP') AS S;
+
+CREATE VIEW pg_stat_progress_copy AS
+ SELECT
+ S.pid AS pid, S.datid AS datid, D.datname AS datname,
+ S.relid AS relid,
+ S.param1 AS bytes_processed,
+ S.param2 AS bytes_total,
+ S.param3 AS lines_processed
+ FROM pg_stat_get_progress_info('COPY') AS S
+ LEFT JOIN pg_database D ON S.datid = D.oid;
+
CREATE VIEW pg_user_mappings AS
SELECT
U.oid AS umid,
#include "access/xlog.h"
#include "commands/copy.h"
#include "commands/copyfrom_internal.h"
+#include "commands/progress.h"
#include "commands/trigger.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
+#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
/*
* We count only tuples not suppressed by a BEFORE INSERT trigger
* or FDW; this is the same definition used by nodeModifyTable.c
- * for counting tuples inserted by an INSERT command.
+ * for counting tuples inserted by an INSERT command. Update
+ * progress of the COPY command as well.
*/
- processed++;
+ pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
}
}
}
}
+
+ /* initialize progress */
+ pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
+ cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+ cstate->bytes_processed = 0;
+
/* We keep those variables in cstate. */
cstate->in_functions = in_functions;
cstate->typioparams = typioparams;
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a directory", cstate->filename)));
+
+ pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size);
}
}
cstate->filename)));
}
+ pgstat_progress_end_command();
+
MemoryContextDelete(cstate->copycontext);
pfree(cstate);
}
#include "commands/copy.h"
#include "commands/copyfrom_internal.h"
+#include "commands/progress.h"
#include "executor/executor.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
+#include "pgstat.h"
#include "port/pg_bswap.h"
#include "utils/memutils.h"
#include "utils/rel.h"
cstate->raw_buf[nbytes] = '\0';
cstate->raw_buf_index = 0;
cstate->raw_buf_len = nbytes;
+ cstate->bytes_processed += nbytes;
+ pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
return (inbytes > 0);
}
#include "access/xact.h"
#include "access/xlog.h"
#include "commands/copy.h"
+#include "commands/progress.h"
#include "executor/execdesc.h"
#include "executor/executor.h"
#include "executor/tuptable.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
+#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
FmgrInfo *out_functions; /* lookup info for output functions */
MemoryContext rowcontext; /* per-row evaluation context */
+ uint64 bytes_processed; /* number of bytes processed so far */
} CopyToStateData;
break;
}
+ /* Update the progress */
+ cstate->bytes_processed += fe_msgbuf->len;
+ pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
+
resetStringInfo(fe_msgbuf);
}
cstate->filename)));
}
+ pgstat_progress_end_command();
+
MemoryContextDelete(cstate->copycontext);
pfree(cstate);
}
}
}
+ /* initialize progress */
+ pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
+ cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+ cstate->bytes_processed = 0;
+
MemoryContextSwitchTo(oldcontext);
return cstate;
/* Format and send the data */
CopyOneRowTo(cstate, slot);
- processed++;
+
+ /* Increment amount of processed tuples and update the progress */
+ pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
}
ExecDropSingleTupleTableSlot(slot);
/* Send the data */
CopyOneRowTo(cstate, slot);
- myState->processed++;
+
+ /* Increment amount of processed tuples and update the progress */
+ pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed);
return true;
}
cmdtype = PROGRESS_COMMAND_CREATE_INDEX;
else if (pg_strcasecmp(cmd, "BASEBACKUP") == 0)
cmdtype = PROGRESS_COMMAND_BASEBACKUP;
+ else if (pg_strcasecmp(cmd, "COPY") == 0)
+ cmdtype = PROGRESS_COMMAND_COPY;
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
char *raw_buf;
int raw_buf_index; /* next byte to process */
int raw_buf_len; /* total # of bytes stored */
+ uint64 bytes_processed;/* number of bytes processed so far */
/* Shorthand for number of unconsumed bytes available in raw_buf */
#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
} CopyFromStateData;
#define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE 4
#define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL 5
+/* Commands of PROGRESS_COPY */
+#define PROGRESS_COPY_BYTES_PROCESSED 0
+#define PROGRESS_COPY_BYTES_TOTAL 1
+#define PROGRESS_COPY_LINES_PROCESSED 2
+
#endif
PROGRESS_COMMAND_ANALYZE,
PROGRESS_COMMAND_CLUSTER,
PROGRESS_COMMAND_CREATE_INDEX,
- PROGRESS_COMMAND_BASEBACKUP
+ PROGRESS_COMMAND_BASEBACKUP,
+ PROGRESS_COMMAND_COPY
} ProgressCommandType;
#define PGSTAT_NUM_PROGRESS_PARAM 20
s.param8 AS index_rebuild_count
FROM (pg_stat_get_progress_info('CLUSTER'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
LEFT JOIN pg_database d ON ((s.datid = d.oid)));
+pg_stat_progress_copy| SELECT s.pid,
+ s.datid,
+ d.datname,
+ s.relid,
+ s.param1 AS bytes_processed,
+ s.param2 AS bytes_total,
+ s.param3 AS lines_processed
+ FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
+ LEFT JOIN pg_database d ON ((s.datid = d.oid)));
pg_stat_progress_create_index| SELECT s.pid,
s.datid,
d.datname,