/* Otherwise, drop anything that's selected and has a dropStmt */
if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
{
+ bool not_allowed_in_txn = false;
+
pg_log_info("dropping %s %s", te->desc, te->tag);
+
+ /*
+ * In --transaction-size mode, we have to temporarily exit our
+ * transaction block to drop objects that can't be dropped
+ * within a transaction.
+ */
+ if (ropt->txn_size > 0)
+ {
+ if (strcmp(te->desc, "DATABASE") == 0 ||
+ strcmp(te->desc, "DATABASE PROPERTIES") == 0)
+ {
+ not_allowed_in_txn = true;
+ if (AH->connection)
+ CommitTransaction(AHX);
+ else
+ ahprintf(AH, "COMMIT;\n");
+ }
+ }
+
/* Select owner and schema as necessary */
_becomeOwner(AH, te);
_selectOutputSchema(AH, te->namespace);
}
}
}
+
+ /*
+ * In --transaction-size mode, re-establish the transaction
+ * block if needed; otherwise, commit after every N drops.
+ */
+ if (ropt->txn_size > 0)
+ {
+ if (not_allowed_in_txn)
+ {
+ if (AH->connection)
+ StartTransaction(AHX);
+ else
+ ahprintf(AH, "BEGIN;\n");
+ AH->txnCount = 0;
+ }
+ else if (++AH->txnCount >= ropt->txn_size)
+ {
+ if (AH->connection)
+ {
+ CommitTransaction(AHX);
+ StartTransaction(AHX);
+ }
+ else
+ ahprintf(AH, "COMMIT;\nBEGIN;\n");
+ AH->txnCount = 0;
+ }
+ }
}
}
}
}
- if (ropt->single_txn)
+ /*
+ * Close out any persistent transaction we may have. While these two
+ * cases are started in different places, we can end both cases here.
+ */
+ if (ropt->single_txn || ropt->txn_size > 0)
{
if (AH->connection)
CommitTransaction(AHX);
*/
if ((reqs & REQ_SCHEMA) != 0)
{
+ bool object_is_db = false;
+
+ /*
+ * In --transaction-size mode, must exit our transaction block to
+ * create a database or set its properties.
+ */
+ if (strcmp(te->desc, "DATABASE") == 0 ||
+ strcmp(te->desc, "DATABASE PROPERTIES") == 0)
+ {
+ object_is_db = true;
+ if (ropt->txn_size > 0)
+ {
+ if (AH->connection)
+ CommitTransaction(&AH->public);
+ else
+ ahprintf(AH, "COMMIT;\n\n");
+ }
+ }
+
/* Show namespace in log message if available */
if (te->namespace)
pg_log_info("creating %s \"%s.%s\"",
/*
* If we created a DB, connect to it. Also, if we changed DB
* properties, reconnect to ensure that relevant GUC settings are
- * applied to our session.
+ * applied to our session. (That also restarts the transaction block
+ * in --transaction-size mode.)
*/
- if (strcmp(te->desc, "DATABASE") == 0 ||
- strcmp(te->desc, "DATABASE PROPERTIES") == 0)
+ if (object_is_db)
{
pg_log_info("connecting to new database \"%s\"", te->tag);
_reconnectToDB(AH, te->tag);
}
}
+ /*
+ * If we emitted anything for this TOC entry, that counts as one action
+ * against the transaction-size limit. Commit if it's time to.
+ */
+ if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0 && ropt->txn_size > 0)
+ {
+ if (++AH->txnCount >= ropt->txn_size)
+ {
+ if (AH->connection)
+ {
+ CommitTransaction(&AH->public);
+ StartTransaction(&AH->public);
+ }
+ else
+ ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
+ AH->txnCount = 0;
+ }
+ }
+
if (AH->public.n_errors > 0 && status == WORKER_OK)
status = WORKER_IGNORED_ERRORS;
{
RestoreOptions *ropt = AH->public.ropt;
- if (!ropt->single_txn)
+ /*
+ * LOs must be restored within a transaction block, since we need the LO
+ * handle to stay open while we write it. Establish a transaction unless
+ * there's one being used globally.
+ */
+ if (!(ropt->single_txn || ropt->txn_size > 0))
{
if (AH->connection)
StartTransaction(&AH->public);
{
RestoreOptions *ropt = AH->public.ropt;
- if (!ropt->single_txn)
+ if (!(ropt->single_txn || ropt->txn_size > 0))
{
if (AH->connection)
CommitTransaction(&AH->public);
else
ahprintf(AH, "SET row_security = off;\n");
+ /*
+ * In --transaction-size mode, we should always be in a transaction when
+ * we begin to restore objects.
+ */
+ if (ropt && ropt->txn_size > 0)
+ {
+ if (AH->connection)
+ StartTransaction(&AH->public);
+ else
+ ahprintf(AH, "\nBEGIN;\n");
+ AH->txnCount = 0;
+ }
+
ahprintf(AH, "\n");
}
}
}
+ /*
+ * In --transaction-size mode, we must commit the open transaction before
+ * dropping the database connection. This also ensures that child workers
+ * can see the objects we've created so far.
+ */
+ if (AH->public.ropt->txn_size > 0)
+ CommitTransaction(&AH->public);
+
/*
* Now close parent connection in prep for parallel steps. We do this
* mainly to ensure that we don't exceed the specified number of parallel
clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
memcpy(clone, AH, sizeof(ArchiveHandle));
+ /* Likewise flat-copy the RestoreOptions, so we can alter them locally */
+ clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions));
+ memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions));
+
/* Handle format-independent fields */
memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
/* clones should not share lo_buf */
clone->lo_buf = NULL;
+ /*
+ * Clone connections disregard --transaction-size; they must commit after
+ * each command so that the results are immediately visible to other
+ * workers.
+ */
+ clone->public.ropt->txn_size = 0;
+
/*
* Connect our new clone object to the database, using the same connection
* parameters used for the original connection.
{"role", required_argument, NULL, 2},
{"section", required_argument, NULL, 3},
{"strict-names", no_argument, &strict_names, 1},
+ {"transaction-size", required_argument, NULL, 5},
{"use-set-session-authorization", no_argument, &use_setsessauth, 1},
{"no-comments", no_argument, &no_comments, 1},
{"no-publications", no_argument, &no_publications, 1},
set_dump_section(optarg, &(opts->dumpSections));
break;
- case 4:
+ case 4: /* filter */
read_restore_filters(optarg, opts);
break;
+ case 5: /* transaction-size */
+ if (!option_parse_int(optarg, "--transaction-size",
+ 1, INT_MAX,
+ &opts->txn_size))
+ exit(1);
+ opts->exit_on_error = true;
+ break;
+
default:
/* getopt_long already emitted a complaint */
pg_log_error_hint("Try \"%s --help\" for more information.", progname);
if (opts->dataOnly && opts->dropSchema)
pg_fatal("options -c/--clean and -a/--data-only cannot be used together");
+ if (opts->single_txn && opts->txn_size > 0)
+ pg_fatal("options -1/--single-transaction and --transaction-size cannot be used together");
+
/*
* -C is not compatible with -1, because we can't create a database inside
* a transaction block.
printf(_(" --section=SECTION restore named section (pre-data, data, or post-data)\n"));
printf(_(" --strict-names require table and/or schema include patterns to\n"
" match at least one entity each\n"));
+ printf(_(" --transaction-size=N commit after every N objects\n"));
printf(_(" --use-set-session-authorization\n"
" use SET SESSION AUTHORIZATION commands instead of\n"
" ALTER OWNER commands to set ownership\n"));
#include "fe_utils/string_utils.h"
#include "pg_upgrade.h"
+/*
+ * Maximum number of pg_restore actions (TOC entries) to process within one
+ * transaction. At some point we might want to make this user-controllable,
+ * but for now a hard-wired setting will suffice.
+ */
+#define RESTORE_TRANSACTION_SIZE 1000
+
static void set_locale_and_encoding(void);
static void prepare_new_cluster(void);
static void prepare_new_globals(void);
true,
true,
"\"%s/pg_restore\" %s %s --exit-on-error --verbose "
+ "--transaction-size=%d "
"--dbname postgres \"%s/%s\"",
new_cluster.bindir,
cluster_conn_opts(&new_cluster),
create_opts,
+ RESTORE_TRANSACTION_SIZE,
log_opts.dumpdir,
sql_file_name);
log_file_name[MAXPGPATH];
DbInfo *old_db = &old_cluster.dbarr.dbs[dbnum];
const char *create_opts;
+ int txn_size;
/* Skip template1 in this pass */
if (strcmp(old_db->db_name, "template1") == 0)
else
create_opts = "--create";
+ /*
+ * In parallel mode, reduce the --transaction-size of each restore job
+ * so that the total number of locks that could be held across all the
+ * jobs stays in bounds.
+ */
+ txn_size = RESTORE_TRANSACTION_SIZE;
+ if (user_opts.jobs > 1)
+ {
+ txn_size /= user_opts.jobs;
+ /* Keep some sanity if -j is huge */
+ txn_size = Max(txn_size, 10);
+ }
+
parallel_exec_prog(log_file_name,
NULL,
"\"%s/pg_restore\" %s %s --exit-on-error --verbose "
+ "--transaction-size=%d "
"--dbname template1 \"%s/%s\"",
new_cluster.bindir,
cluster_conn_opts(&new_cluster),
create_opts,
+ txn_size,
log_opts.dumpdir,
sql_file_name);
}