* expects that it can move them around when resizing the table. So we
* cannot make the DumpableObjects be elements of the hash table directly;
* instead, the hash table elements contain pointers to DumpableObjects.
+ * This does have the advantage of letting us map multiple CatalogIds
+ * to one DumpableObject, which is useful for blobs.
*
* It turns out to be convenient to also use this data structure to map
* CatalogIds to owning extensions, if any. Since extension membership
}
}
+/*
+ * recordAdditionalCatalogID
+ * Record an additional catalog ID for the given DumpableObject
+ */
+void
+recordAdditionalCatalogID(CatalogId catId, DumpableObject *dobj)
+{
+ CatalogIdMapEntry *entry;
+ bool found;
+
+ /* CatalogId hash table must exist, if we have a DumpableObject */
+ Assert(catalogIdHash != NULL);
+
+ /* Add reference to CatalogId hash */
+ entry = catalogid_insert(catalogIdHash, catId, &found);
+ if (!found)
+ {
+ entry->dobj = NULL;
+ entry->ext = NULL;
+ }
+ Assert(entry->dobj == NULL);
+ entry->dobj = dobj;
+}
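+
+/*
+ * For illustration (mirroring its caller, getLOs): after AssignDumpId()
+ * has registered a LoInfo group under its first blob's OID, the group's
+ * remaining blob OIDs are made findable too:
+ *
+ *     extraID.tableoid = LargeObjectRelationId;
+ *     extraID.oid = loinfo->looids[k];
+ *     recordAdditionalCatalogID(extraID, &loinfo->dobj);
+ */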
+
/*
* Assign a DumpId that's not tied to a DumpableObject.
*
* don't necessarily emit it verbatim; at this point we add an
* appropriate IF EXISTS clause, if the user requested it.
*/
- if (*te->dropStmt != '\0')
+ if (strcmp(te->desc, "BLOB METADATA") == 0)
+ {
+ /* We must generate the per-blob commands */
+ if (ropt->if_exists)
+ IssueCommandPerBlob(AH, te,
+ "SELECT pg_catalog.lo_unlink(oid) "
+ "FROM pg_catalog.pg_largeobject_metadata "
+ "WHERE oid = '", "'");
+ else
+ IssueCommandPerBlob(AH, te,
+ "SELECT pg_catalog.lo_unlink('",
+ "')");
+ }
+ else if (*te->dropStmt != '\0')
{
if (!ropt->if_exists ||
strncmp(te->dropStmt, "--", 2) == 0)
{
/*
* Inject an appropriate spelling of "if exists". For
- * large objects, we have a separate routine that
+ * old-style large objects, we have a routine that
* knows how to do it, without depending on
* te->dropStmt; use that. For other objects we need
* to parse the command.
*/
- if (strncmp(te->desc, "BLOB", 4) == 0)
+ if (strcmp(te->desc, "BLOB") == 0)
{
DropLOIfExists(AH, te->catalogId.oid);
}
**********/
/*
- * Called by a format handler before any LOs are restored
+ * Called by a format handler before a group of LOs is restored
*/
void
StartRestoreLOs(ArchiveHandle *AH)
}
/*
- * Called by a format handler after all LOs are restored
+ * Called by a format handler after a group of LOs is restored
*/
void
EndRestoreLOs(ArchiveHandle *AH)
AH->loCount++;
/* Initialize the LO Buffer */
+ if (AH->lo_buf == NULL)
+ {
+ /* First time through (in this process) so allocate the buffer */
+ AH->lo_buf_size = LOBBUFSIZE;
+ AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
+ }
AH->lo_buf_used = 0;
pg_log_info("restoring large object with OID %u", oid);
{
/*
* Special Case: If 'SEQUENCE SET' or anything to do with LOs, then it
- * is considered a data entry. We don't need to check for the BLOBS
- * entry or old-style BLOB COMMENTS, because they will have hadDumper
- * = true ... but we do need to check new-style BLOB ACLs, comments,
+ * is considered a data entry. We don't need to check for BLOBS or
+ * old-style BLOB COMMENTS entries, because they will have hadDumper =
+ * true ... but we do need to check new-style BLOB ACLs, comments,
* etc.
*/
if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
strcmp(te->desc, "BLOB") == 0 ||
+ strcmp(te->desc, "BLOB METADATA") == 0 ||
(strcmp(te->desc, "ACL") == 0 &&
- strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
+ strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
(strcmp(te->desc, "COMMENT") == 0 &&
- strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
+ strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
(strcmp(te->desc, "SECURITY LABEL") == 0 &&
- strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
+ strncmp(te->tag, "LARGE OBJECT", 12) == 0))
res = res & REQ_DATA;
else
res = res & ~REQ_DATA;
if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
!(ropt->binary_upgrade &&
(strcmp(te->desc, "BLOB") == 0 ||
+ strcmp(te->desc, "BLOB METADATA") == 0 ||
(strcmp(te->desc, "ACL") == 0 &&
- strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
+ strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
(strcmp(te->desc, "COMMENT") == 0 &&
- strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
+ strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
(strcmp(te->desc, "SECURITY LABEL") == 0 &&
- strncmp(te->tag, "LARGE OBJECT ", 13) == 0))))
+ strncmp(te->tag, "LARGE OBJECT", 12) == 0))))
res = res & REQ_SCHEMA;
}
}
/*
- * Actually print the definition.
+ * Actually print the definition. Normally we can just print the defn
+ * string if any, but we have three special cases:
*
- * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
+ * 1. A crude hack for suppressing AUTHORIZATION clause that old pg_dump
* versions put into CREATE SCHEMA. Don't mutate the variant for schema
* "public" that is a comment. We have to do this when --no-owner mode is
* selected. This is ugly, but I see no other good way ...
+ *
+ * 2. BLOB METADATA entries need special processing since their defn
+ * strings are just lists of OIDs, not complete SQL commands.
+ *
+ * 3. ACL LARGE OBJECTS entries need special processing because they
+ * contain only one copy of the ACL GRANT/REVOKE commands, which we must
+ * apply to each large object listed in the associated BLOB METADATA
+ * entry.
*/
if (ropt->noOwner &&
strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
{
ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
}
+ else if (strcmp(te->desc, "BLOB METADATA") == 0)
+ {
+ IssueCommandPerBlob(AH, te, "SELECT pg_catalog.lo_create('", "')");
+ }
+ else if (strcmp(te->desc, "ACL") == 0 &&
+ strncmp(te->tag, "LARGE OBJECTS", 13) == 0)
+ {
+ IssueACLPerBlob(AH, te);
+ }
else
{
if (te->defn && strlen(te->defn) > 0)
te->owner && strlen(te->owner) > 0 &&
te->dropStmt && strlen(te->dropStmt) > 0)
{
- PQExpBufferData temp;
+ if (strcmp(te->desc, "BLOB METADATA") == 0)
+ {
+ /* BLOB METADATA needs special code to handle multiple LOs */
+ char *cmdEnd = psprintf(" OWNER TO %s", fmtId(te->owner));
- initPQExpBuffer(&temp);
- _getObjectDescription(&temp, te);
+ IssueCommandPerBlob(AH, te, "ALTER LARGE OBJECT ", cmdEnd);
+ pg_free(cmdEnd);
+ }
+ else
+ {
+ /* For all other cases, we can use _getObjectDescription */
+ PQExpBufferData temp;
- /*
- * If _getObjectDescription() didn't fill the buffer, then there is no
- * owner.
- */
- if (temp.data[0])
- ahprintf(AH, "ALTER %s OWNER TO %s;\n\n", temp.data, fmtId(te->owner));
- termPQExpBuffer(&temp);
+ initPQExpBuffer(&temp);
+ _getObjectDescription(&temp, te);
+
+ /*
+ * If _getObjectDescription() didn't fill the buffer, then there
+ * is no owner.
+ */
+ if (temp.data[0])
+ ahprintf(AH, "ALTER %s OWNER TO %s;\n\n",
+ temp.data, fmtId(te->owner));
+ termPQExpBuffer(&temp);
+ }
}
/*
/* clone has its own error count, too */
clone->public.n_errors = 0;
+ /* clones should not share lo_buf */
+ clone->lo_buf = NULL;
+
/*
* Connect our new clone object to the database, using the same connection
* parameters used for the original connection.
#define K_VERS_1_15 MAKE_ARCHIVE_VERSION(1, 15, 0) /* add
* compression_algorithm
* in header */
+#define K_VERS_1_16 MAKE_ARCHIVE_VERSION(1, 16, 0) /* BLOB METADATA entries
+ * and multiple BLOBS */
/* Current archive version number (the format we can output) */
#define K_VERS_MAJOR 1
-#define K_VERS_MINOR 15
+#define K_VERS_MINOR 16
#define K_VERS_REV 0
#define K_VERS_SELF MAKE_ARCHIVE_VERSION(K_VERS_MAJOR, K_VERS_MINOR, K_VERS_REV)
extern bool isValidTarHeader(char *header);
extern void ReconnectToServer(ArchiveHandle *AH, const char *dbname);
+extern void IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te,
+ const char *cmdBegin, const char *cmdEnd);
+extern void IssueACLPerBlob(ArchiveHandle *AH, TocEntry *te);
extern void DropLOIfExists(ArchiveHandle *AH, Oid oid);
void ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH);
ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
AH->formatData = (void *) ctx;
- /* Initialize LO buffering */
- AH->lo_buf_size = LOBBUFSIZE;
- AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
-
/*
* Now open the file
*/
}
/*
- * Called by the archiver when starting to save all BLOB DATA (not schema).
+ * Called by the archiver when starting to save BLOB DATA (not schema).
* This routine should save whatever format-specific information is needed
* to read the LOs back into memory.
*
}
/*
- * Called by the archiver when finishing saving all BLOB DATA.
+ * Called by the archiver when finishing saving BLOB DATA.
*
* Optional.
*/
* share knowledge about where the data blocks are across threads.
* _PrintTocData has to be careful about the order of operations on that
* state, though.
- *
- * Note: we do not make a local lo_buf because we expect at most one BLOBS
- * entry per archive, so no parallelism is possible.
*/
}
ExecuteSqlCommand(AH, "COMMIT", "could not commit database transaction");
}
+/*
+ * Issue per-blob commands for the large object(s) listed in the TocEntry
+ *
+ * The TocEntry's defn string is assumed to consist of large object OIDs,
+ * one per line. Wrap these in the given SQL command fragments and issue
+ * the commands. (cmdEnd need not include a semicolon.)
+ */
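+/*
+ * For example (hypothetical OIDs): for a defn of "1001\n1002\n", a call like
+ *     IssueCommandPerBlob(AH, te, "SELECT pg_catalog.lo_create('", "')");
+ * emits
+ *     SELECT pg_catalog.lo_create('1001');
+ *     SELECT pg_catalog.lo_create('1002');
+ */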
void
-DropLOIfExists(ArchiveHandle *AH, Oid oid)
+IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te,
+ const char *cmdBegin, const char *cmdEnd)
{
- /*
- * If we are not restoring to a direct database connection, we have to
- * guess about how to detect whether the LO exists. Assume new-style.
- */
- if (AH->connection == NULL ||
- PQserverVersion(AH->connection) >= 90000)
+ /* Make a writable copy of the command string */
+ char *buf = pg_strdup(te->defn);
+ char *st;
+ char *en;
+
+ st = buf;
+ while ((en = strchr(st, '\n')) != NULL)
{
- ahprintf(AH,
- "SELECT pg_catalog.lo_unlink(oid) "
- "FROM pg_catalog.pg_largeobject_metadata "
- "WHERE oid = '%u';\n",
- oid);
+ *en++ = '\0';
+ ahprintf(AH, "%s%s%s;\n", cmdBegin, st, cmdEnd);
+ st = en;
}
- else
+ ahprintf(AH, "\n");
+ pg_free(buf);
+}
+
+/*
+ * Process a "LARGE OBJECTS" ACL TocEntry.
+ *
+ * To save space in the dump file, the TocEntry contains only one copy
+ * of the required GRANT/REVOKE commands, written to apply to the first
+ * blob in the group (although we do not depend on that detail here).
+ * We must expand the text to generate commands for all the blobs listed
+ * in the associated BLOB METADATA entry.
+ */
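+/*
+ * For example (hypothetical OIDs and role name): given the ACL command
+ *     GRANT SELECT ON LARGE OBJECT 1001 TO "some user";
+ * and an associated BLOB METADATA entry listing OIDs 1001 and 1002, we split
+ * the text around the OID, and IssueCommandPerBlob() then emits
+ *     GRANT SELECT ON LARGE OBJECT 1001 TO "some user";
+ *     GRANT SELECT ON LARGE OBJECT 1002 TO "some user";
+ */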
+void
+IssueACLPerBlob(ArchiveHandle *AH, TocEntry *te)
+{
+ TocEntry *blobte = getTocEntryByDumpId(AH, te->dependencies[0]);
+ char *buf;
+ char *st;
+ char *st2;
+ char *en;
+ bool inquotes;
+
+ if (!blobte)
+ pg_fatal("could not find entry for ID %d", te->dependencies[0]);
+ Assert(strcmp(blobte->desc, "BLOB METADATA") == 0);
+
+ /* Make a writable copy of the ACL commands string */
+ buf = pg_strdup(te->defn);
+
+ /*
+ * We have to parse out the commands sufficiently to locate the blob OIDs
+ * and find the command-ending semicolons. The commands should not
+ * contain anything hard to parse except for double-quoted role names,
+ * which are easy to ignore. Once we've split apart the first and second
+ * halves of a command, apply IssueCommandPerBlob. (This means the
+ * updates on the blobs are interleaved if there are multiple commands, but
+ * that should cause no trouble.)
+ */
+ inquotes = false;
+ st = en = buf;
+ st2 = NULL;
+ while (*en)
{
- /* Restoring to pre-9.0 server, so do it the old way */
- ahprintf(AH,
- "SELECT CASE WHEN EXISTS("
- "SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u'"
- ") THEN pg_catalog.lo_unlink('%u') END;\n",
- oid, oid);
+ /* Ignore double-quoted material */
+ if (*en == '"')
+ inquotes = !inquotes;
+ if (inquotes)
+ {
+ en++;
+ continue;
+ }
+ /* If we found "LARGE OBJECT", that's the end of the first half */
+ if (strncmp(en, "LARGE OBJECT ", 13) == 0)
+ {
+ /* Terminate the first-half string */
+ en += 13;
+ Assert(isdigit((unsigned char) *en));
+ *en++ = '\0';
+ /* Skip the rest of the blob OID */
+ while (isdigit((unsigned char) *en))
+ en++;
+ /* Second half starts here */
+ Assert(st2 == NULL);
+ st2 = en;
+ }
+ /* If we found semicolon, that's the end of the second half */
+ else if (*en == ';')
+ {
+ /* Terminate the second-half string */
+ *en++ = '\0';
+ Assert(st2 != NULL);
+ /* Issue this command for each blob */
+ IssueCommandPerBlob(AH, blobte, st, st2);
+ /* For neatness, skip whitespace before the next command */
+ while (isspace((unsigned char) *en))
+ en++;
+ /* Reset for new command */
+ st = en;
+ st2 = NULL;
+ }
+ else
+ en++;
}
+ pg_free(buf);
+}
+
+void
+DropLOIfExists(ArchiveHandle *AH, Oid oid)
+{
+ ahprintf(AH,
+ "SELECT pg_catalog.lo_unlink(oid) "
+ "FROM pg_catalog.pg_largeobject_metadata "
+ "WHERE oid = '%u';\n",
+ oid);
}
* A directory format dump is a directory, which contains a "toc.dat" file
* for the TOC, and a separate file for each data entry, named "<oid>.dat".
* Large objects are stored in separate files named "blob_<oid>.dat",
- * and there's a plain-text TOC file for them called "blobs.toc". If
- * compression is used, each data file is individually compressed and the
+ * and there's a plain-text TOC file for each BLOBS TOC entry named
+ * "blobs_.toc" (or just "blobs.toc" in archive versions before 16).
+ *
+ * If compression is used, each data file is individually compressed and the
* ".gz" suffix is added to the filenames. The TOC files are never
* compressed by pg_dump, however they are accepted with the .gz suffix too,
* in case the user has manually compressed them with 'gzip'.
char *directory;
CompressFileHandle *dataFH; /* currently open data file */
- CompressFileHandle *LOsTocFH; /* file handle for blobs.toc */
+ CompressFileHandle *LOsTocFH; /* file handle for blobs_NNN.toc */
ParallelState *pstate; /* for parallel backup / restore */
} lclContext;
static void _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndLOs(ArchiveHandle *AH, TocEntry *te);
-static void _LoadLOs(ArchiveHandle *AH);
+static void _LoadLOs(ArchiveHandle *AH, TocEntry *te);
static void _PrepParallelRestore(ArchiveHandle *AH);
static void _Clone(ArchiveHandle *AH);
ctx->dataFH = NULL;
ctx->LOsTocFH = NULL;
- /* Initialize LO buffering */
- AH->lo_buf_size = LOBBUFSIZE;
- AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
-
/*
* Now open the TOC file
*/
tctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
if (strcmp(te->desc, "BLOBS") == 0)
- tctx->filename = pg_strdup("blobs.toc");
+ {
+ snprintf(fn, MAXPGPATH, "blobs_%d.toc", te->dumpId);
+ tctx->filename = pg_strdup(fn);
+ }
else if (te->dataDumper)
{
snprintf(fn, MAXPGPATH, "%d.dat", te->dumpId);
return;
if (strcmp(te->desc, "BLOBS") == 0)
- _LoadLOs(AH);
+ _LoadLOs(AH, te);
else
{
char fname[MAXPGPATH];
}
static void
-_LoadLOs(ArchiveHandle *AH)
+_LoadLOs(ArchiveHandle *AH, TocEntry *te)
{
Oid oid;
lclContext *ctx = (lclContext *) AH->formatData;
+ lclTocEntry *tctx = (lclTocEntry *) te->formatData;
CompressFileHandle *CFH;
char tocfname[MAXPGPATH];
char line[MAXPGPATH];
StartRestoreLOs(AH);
- setFilePath(AH, tocfname, "blobs.toc");
+ /*
+ * Note: before archive v16, there was always only one BLOBS TOC entry;
+ * now there can be multiple. We don't need to worry about which version
+ * we are reading, though, because tctx->filename should be correct
+ * either way.
+ */
+ setFilePath(AH, tocfname, tctx->filename);
CFH = ctx->LOsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R);
*/
/*
- * Called by the archiver when starting to save all BLOB DATA (not schema).
+ * Called by the archiver when starting to save BLOB DATA (not schema).
* It is called just prior to the dumper's DataDumper routine.
*
* We open the large object TOC file here, so that we can append a line to
_StartLOs(ArchiveHandle *AH, TocEntry *te)
{
lclContext *ctx = (lclContext *) AH->formatData;
+ lclTocEntry *tctx = (lclTocEntry *) te->formatData;
pg_compress_specification compression_spec = {0};
char fname[MAXPGPATH];
- setFilePath(AH, fname, "blobs.toc");
+ setFilePath(AH, fname, tctx->filename);
/* The LO TOC file is never compressed */
compression_spec.algorithm = PG_COMPRESSION_NONE;
pg_fatal("could not close LO data file: %m");
ctx->dataFH = NULL;
- /* register the LO in blobs.toc */
+ /* register the LO in blobs_NNN.toc */
len = snprintf(buf, sizeof(buf), "%u blob_%u.dat\n", oid, oid);
if (!CFH->write_func(buf, len, CFH))
{
}
/*
- * Called by the archiver when finishing saving all BLOB DATA.
+ * Called by the archiver when finishing saving BLOB DATA.
*
* We close the LOs TOC file.
*/
}
/*
- * If this is the BLOBS entry, what we stat'd was blobs.toc, which
+ * If this is a BLOBS entry, what we stat'd was blobs_NNN.toc, which
* most likely is a lot smaller than the actual blob data. We don't
* have a cheap way to estimate how much smaller, but fortunately it
* doesn't matter too much as long as we get the LOs processed
ctx = (lclContext *) AH->formatData;
/*
- * Note: we do not make a local lo_buf because we expect at most one BLOBS
- * entry per archive, so no parallelism is possible. Likewise,
* TOC-entry-local state isn't an issue because any one TOC entry is
* touched by just one worker child.
*/
AH->ClonePtr = NULL;
AH->DeClonePtr = NULL;
- /* Initialize LO buffering */
- AH->lo_buf_size = LOBBUFSIZE;
- AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
-
/*
* Now prevent reading...
*/
}
/*
- * Called by the archiver when starting to save all BLOB DATA (not schema).
+ * Called by the archiver when starting to save BLOB DATA (not schema).
* This routine should save whatever format-specific information is needed
* to read the LOs back into memory.
*
}
/*
- * Called by the archiver when finishing saving all BLOB DATA.
+ * Called by the archiver when finishing saving BLOB DATA.
*
* Optional.
*/
char *filename;
} lclTocEntry;
-static void _LoadLOs(ArchiveHandle *AH);
+static void _LoadLOs(ArchiveHandle *AH, TocEntry *te);
static TAR_MEMBER *tarOpen(ArchiveHandle *AH, const char *filename, char mode);
static void tarClose(ArchiveHandle *AH, TAR_MEMBER *th);
ctx->filePos = 0;
ctx->isSpecialScript = 0;
- /* Initialize LO buffering */
- AH->lo_buf_size = LOBBUFSIZE;
- AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
-
/*
* Now open the tar file, and load the TOC if we're in read mode.
*/
}
if (strcmp(te->desc, "BLOBS") == 0)
- _LoadLOs(AH);
+ _LoadLOs(AH, te);
else
_PrintFileData(AH, tctx->filename);
}
static void
-_LoadLOs(ArchiveHandle *AH)
+_LoadLOs(ArchiveHandle *AH, TocEntry *te)
{
Oid oid;
lclContext *ctx = (lclContext *) AH->formatData;
StartRestoreLOs(AH);
- th = tarOpen(AH, NULL, 'r'); /* Open next file */
+ /*
+ * The blobs_NNN.toc or blobs.toc file is fairly useless to us because it
+ * will appear only after the associated blob_NNN.dat files. For archive
+ * versions >= 16 we can look at the BLOBS entry's te->tag to discover the
+ * OID of the first blob we want to restore, and then search forward to
+ * find the appropriate blob_<oid>.dat file.  For older versions we rely
+ * on the knowledge that there was only one BLOBS entry and just search
+ * for the first blob_<oid>.dat file.  Once we find the first blob file to
+ * restore, restore all blobs until we reach the blobs[_NNN].toc file.
+ */
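+ /*
+ * For example (hypothetical names): a v16 tar archive might contain the
+ * members blob_1001.dat, blob_1002.dat, and blobs_45.toc in that order;
+ * with te->tag = "1001..1002" we advance to blob_1001.dat and then restore
+ * members until the first non-blob member (here blobs_45.toc) stops the
+ * loop below.
+ */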
+ if (AH->version >= K_VERS_1_16)
+ {
+ /* We rely on atooid to not complain about nnnn..nnnn tags */
+ oid = atooid(te->tag);
+ snprintf(buf, sizeof(buf), "blob_%u.dat", oid);
+ th = tarOpen(AH, buf, 'r'); /* Advance to first desired file */
+ }
+ else
+ th = tarOpen(AH, NULL, 'r'); /* Open next file */
+
while (th != NULL)
{
ctx->FH = th;
/*
* Once we have found the first LO, stop at the first non-LO entry
- * (which will be 'blobs.toc'). This coding would eat all the
- * rest of the archive if there are no LOs ... but this function
- * shouldn't be called at all in that case.
+ * (which will be 'blobs[_NNN].toc'). This coding would eat all
+ * the rest of the archive if there are no LOs ... but this
+ * function shouldn't be called at all in that case.
*/
if (foundLO)
break;
*/
/*
- * Called by the archiver when starting to save all BLOB DATA (not schema).
+ * Called by the archiver when starting to save BLOB DATA (not schema).
* This routine should save whatever format-specific information is needed
* to read the LOs back into memory.
*
lclContext *ctx = (lclContext *) AH->formatData;
char fname[K_STD_BUF_SIZE];
- sprintf(fname, "blobs.toc");
+ sprintf(fname, "blobs_%d.toc", te->dumpId);
ctx->loToc = tarOpen(AH, fname, 'w');
}
}
/*
- * Called by the archiver when finishing saving all BLOB DATA.
+ * Called by the archiver when finishing saving BLOB DATA.
*
* Optional.
*
*/
#define DUMP_DEFAULT_ROWS_PER_INSERT 1
+/*
+ * Maximum number of large objects to group into a single ArchiveEntry.
+ * At some point we might want to make this user-controllable, but for now
+ * a hard-wired setting will suffice.
+ */
+#define MAX_BLOBS_PER_ARCHIVE_ENTRY 1000
+
/*
* Macro for producing quoted, schema-qualified name of a dumpable object.
*/
static DumpId dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
const char *type, const char *name, const char *subname,
- const char *nspname, const char *owner,
+ const char *nspname, const char *tag, const char *owner,
const DumpableAcl *dacl);
static void getDependencies(Archive *fout);
dumpACL(fout, dbDumpId, InvalidDumpId, "DATABASE",
qdatname, NULL, NULL,
- dba, &dbdacl);
+ NULL, dba, &dbdacl);
/*
* Now construct a DATABASE PROPERTIES archive entry to restore any
{
DumpOptions *dopt = fout->dopt;
PQExpBuffer loQry = createPQExpBuffer();
- LoInfo *loinfo;
- DumpableObject *lodata;
PGresult *res;
int ntups;
int i;
+ int n;
int i_oid;
int i_lomowner;
int i_lomacl;
pg_log_info("reading large objects");
- /* Fetch LO OIDs, and owner/ACL data */
+ /*
+ * Fetch LO OIDs and owner/ACL data. Order the data so that all the blobs
+ * with the same owner/ACL appear together.
+ */
appendPQExpBufferStr(loQry,
"SELECT oid, lomowner, lomacl, "
"acldefault('L', lomowner) AS acldefault "
- "FROM pg_largeobject_metadata");
+ "FROM pg_largeobject_metadata "
+ "ORDER BY lomowner, lomacl::pg_catalog.text, oid");
res = ExecuteSqlQuery(fout, loQry->data, PGRES_TUPLES_OK);
ntups = PQntuples(res);
/*
- * Each large object has its own "BLOB" archive entry.
+ * Group the blobs into suitably-sized groups that have the same owner and
+ * ACL setting, and build a metadata and a data DumpableObject for each
+ * group. (If we supported initprivs for blobs, we'd have to insist that
+ * groups also share initprivs settings, since the DumpableObject only has
+ * room for one.) i is the index of the first tuple in the current group,
+ * and n is the number of tuples we include in the group.
*/
- loinfo = (LoInfo *) pg_malloc(ntups * sizeof(LoInfo));
+ for (i = 0; i < ntups; i += n)
+ {
+ Oid thisoid = atooid(PQgetvalue(res, i, i_oid));
+ char *thisowner = PQgetvalue(res, i, i_lomowner);
+ char *thisacl = PQgetvalue(res, i, i_lomacl);
+ LoInfo *loinfo;
+ DumpableObject *lodata;
+ char namebuf[64];
+
+ /* Scan to find first tuple not to be included in group */
+ n = 1;
+ while (n < MAX_BLOBS_PER_ARCHIVE_ENTRY && i + n < ntups)
+ {
+ if (strcmp(thisowner, PQgetvalue(res, i + n, i_lomowner)) != 0 ||
+ strcmp(thisacl, PQgetvalue(res, i + n, i_lomacl)) != 0)
+ break;
+ n++;
+ }
- for (i = 0; i < ntups; i++)
- {
- loinfo[i].dobj.objType = DO_LARGE_OBJECT;
- loinfo[i].dobj.catId.tableoid = LargeObjectRelationId;
- loinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
- AssignDumpId(&loinfo[i].dobj);
+ /* Build the metadata DumpableObject */
+ loinfo = (LoInfo *) pg_malloc(offsetof(LoInfo, looids) + n * sizeof(Oid));
- loinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_oid));
- loinfo[i].dacl.acl = pg_strdup(PQgetvalue(res, i, i_lomacl));
- loinfo[i].dacl.acldefault = pg_strdup(PQgetvalue(res, i, i_acldefault));
- loinfo[i].dacl.privtype = 0;
- loinfo[i].dacl.initprivs = NULL;
- loinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_lomowner));
+ loinfo->dobj.objType = DO_LARGE_OBJECT;
+ loinfo->dobj.catId.tableoid = LargeObjectRelationId;
+ loinfo->dobj.catId.oid = thisoid;
+ AssignDumpId(&loinfo->dobj);
+
+ if (n > 1)
+ snprintf(namebuf, sizeof(namebuf), "%u..%u", thisoid,
+ atooid(PQgetvalue(res, i + n - 1, i_oid)));
+ else
+ snprintf(namebuf, sizeof(namebuf), "%u", thisoid);
+ loinfo->dobj.name = pg_strdup(namebuf);
+ loinfo->dacl.acl = pg_strdup(thisacl);
+ loinfo->dacl.acldefault = pg_strdup(PQgetvalue(res, i, i_acldefault));
+ loinfo->dacl.privtype = 0;
+ loinfo->dacl.initprivs = NULL;
+ loinfo->rolname = getRoleName(thisowner);
+ loinfo->numlos = n;
+ loinfo->looids[0] = thisoid;
+ /* Collect OIDs of the remaining blobs in this group */
+ for (int k = 1; k < n; k++)
+ {
+ CatalogId extraID;
+
+ loinfo->looids[k] = atooid(PQgetvalue(res, i + k, i_oid));
+
+ /* Make sure we can look up loinfo by any of the blobs' OIDs */
+ extraID.tableoid = LargeObjectRelationId;
+ extraID.oid = loinfo->looids[k];
+ recordAdditionalCatalogID(extraID, &loinfo->dobj);
+ }
/* LOs have data */
- loinfo[i].dobj.components |= DUMP_COMPONENT_DATA;
+ loinfo->dobj.components |= DUMP_COMPONENT_DATA;
- /* Mark whether LO has an ACL */
+ /* Mark whether LO group has a non-empty ACL */
if (!PQgetisnull(res, i, i_lomacl))
- loinfo[i].dobj.components |= DUMP_COMPONENT_ACL;
+ loinfo->dobj.components |= DUMP_COMPONENT_ACL;
/*
* In binary-upgrade mode for LOs, we do *not* dump out the LO data,
* pg_largeobject_metadata, after the dump is restored.
*/
if (dopt->binary_upgrade)
- loinfo[i].dobj.dump &= ~DUMP_COMPONENT_DATA;
- }
+ loinfo->dobj.dump &= ~DUMP_COMPONENT_DATA;
- /*
- * If we have any large objects, a "BLOBS" archive entry is needed. This
- * is just a placeholder for sorting; it carries no data now.
- */
- if (ntups > 0)
- {
+ /*
+ * Create a "BLOBS" data item for the group, too. This is just a
+ * placeholder for sorting; it carries no data now.
+ */
lodata = (DumpableObject *) pg_malloc(sizeof(DumpableObject));
lodata->objType = DO_LARGE_OBJECT_DATA;
lodata->catId = nilCatalogId;
AssignDumpId(lodata);
- lodata->name = pg_strdup("BLOBS");
+ lodata->name = pg_strdup(namebuf);
lodata->components |= DUMP_COMPONENT_DATA;
+ /* Set up explicit dependency from data to metadata */
+ lodata->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
+ lodata->dependencies[0] = loinfo->dobj.dumpId;
+ lodata->nDeps = lodata->allocDeps = 1;
}
PQclear(res);
/*
* dumpLO
*
- * dump the definition (metadata) of the given large object
+ * dump the definition (metadata) of the given large object group
*/
static void
dumpLO(Archive *fout, const LoInfo *loinfo)
{
PQExpBuffer cquery = createPQExpBuffer();
- PQExpBuffer dquery = createPQExpBuffer();
-
- appendPQExpBuffer(cquery,
- "SELECT pg_catalog.lo_create('%s');\n",
- loinfo->dobj.name);
- appendPQExpBuffer(dquery,
- "SELECT pg_catalog.lo_unlink('%s');\n",
- loinfo->dobj.name);
+ /*
+ * The "definition" is just a newline-separated list of OIDs. We need to
+ * put something into the dropStmt too, but it can just be a comment.
+ */
+ for (int i = 0; i < loinfo->numlos; i++)
+ appendPQExpBuffer(cquery, "%u\n", loinfo->looids[i]);
if (loinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
ArchiveEntry(fout, loinfo->dobj.catId, loinfo->dobj.dumpId,
ARCHIVE_OPTS(.tag = loinfo->dobj.name,
.owner = loinfo->rolname,
- .description = "BLOB",
- .section = SECTION_PRE_DATA,
+ .description = "BLOB METADATA",
+ .section = SECTION_DATA,
.createStmt = cquery->data,
- .dropStmt = dquery->data));
-
- /* Dump comment if any */
- if (loinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
- dumpComment(fout, "LARGE OBJECT", loinfo->dobj.name,
- NULL, loinfo->rolname,
- loinfo->dobj.catId, 0, loinfo->dobj.dumpId);
+ .dropStmt = "-- dummy"));
- /* Dump security label if any */
- if (loinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
- dumpSecLabel(fout, "LARGE OBJECT", loinfo->dobj.name,
- NULL, loinfo->rolname,
- loinfo->dobj.catId, 0, loinfo->dobj.dumpId);
+ /*
+ * Dump per-blob comments and seclabels if any. We assume these are rare
+ * enough that it's okay to generate retail TOC entries for them.
+ */
+ if (loinfo->dobj.dump & (DUMP_COMPONENT_COMMENT |
+ DUMP_COMPONENT_SECLABEL))
+ {
+ for (int i = 0; i < loinfo->numlos; i++)
+ {
+ CatalogId catId;
+ char namebuf[32];
+
+ /* Build identifying info for this blob */
+ catId.tableoid = loinfo->dobj.catId.tableoid;
+ catId.oid = loinfo->looids[i];
+ snprintf(namebuf, sizeof(namebuf), "%u", loinfo->looids[i]);
+
+ if (loinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
+ dumpComment(fout, "LARGE OBJECT", namebuf,
+ NULL, loinfo->rolname,
+ catId, 0, loinfo->dobj.dumpId);
+
+ if (loinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
+ dumpSecLabel(fout, "LARGE OBJECT", namebuf,
+ NULL, loinfo->rolname,
+ catId, 0, loinfo->dobj.dumpId);
+ }
+ }
- /* Dump ACL if any */
+ /*
+ * Dump the ACLs if any (remember that all blobs in the group will have
+ * the same ACL). If there's just one blob, dump a simple ACL entry; if
+ * there are more, make a "LARGE OBJECTS" entry that really contains only
+ * the ACL for the first blob. _printTocEntry() will be cued by the tag
+ * string to emit a mutated version for each blob.
+ */
if (loinfo->dobj.dump & DUMP_COMPONENT_ACL)
- dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId, "LARGE OBJECT",
- loinfo->dobj.name, NULL,
- NULL, loinfo->rolname, &loinfo->dacl);
+ {
+ char namebuf[32];
+
+ /* Build identifying info for the first blob */
+ snprintf(namebuf, sizeof(namebuf), "%u", loinfo->looids[0]);
+
+ if (loinfo->numlos > 1)
+ {
+ char tagbuf[64];
+
+ snprintf(tagbuf, sizeof(tagbuf), "LARGE OBJECTS %u..%u",
+ loinfo->looids[0], loinfo->looids[loinfo->numlos - 1]);
+
+ dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId,
+ "LARGE OBJECT", namebuf, NULL, NULL,
+ tagbuf, loinfo->rolname, &loinfo->dacl);
+ }
+ else
+ {
+ dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId,
+ "LARGE OBJECT", namebuf, NULL, NULL,
+ NULL, loinfo->rolname, &loinfo->dacl);
+ }
+ }
destroyPQExpBuffer(cquery);
- destroyPQExpBuffer(dquery);
}
/*
* dumpLOs:
- * dump the data contents of all large objects
+ * dump the data contents of the large objects in the given group
*/
static int
dumpLOs(Archive *fout, const void *arg)
{
- const char *loQry;
- const char *loFetchQry;
+ const LoInfo *loinfo = (const LoInfo *) arg;
PGconn *conn = GetConnection(fout);
- PGresult *res;
char buf[LOBBUFSIZE];
- int ntups;
- int i;
- int cnt;
- pg_log_info("saving large objects");
+ pg_log_info("saving large objects \"%s\"", loinfo->dobj.name);
- /*
- * Currently, we re-fetch all LO OIDs using a cursor. Consider scanning
- * the already-in-memory dumpable objects instead...
- */
- loQry =
- "DECLARE looid CURSOR FOR "
- "SELECT oid FROM pg_largeobject_metadata ORDER BY 1";
-
- ExecuteSqlStatement(fout, loQry);
+ for (int i = 0; i < loinfo->numlos; i++)
+ {
+ Oid loOid = loinfo->looids[i];
+ int loFd;
+ int cnt;
- /* Command to fetch from cursor */
- loFetchQry = "FETCH 1000 IN looid";
+ /* Open the LO */
+ loFd = lo_open(conn, loOid, INV_READ);
+ if (loFd == -1)
+ pg_fatal("could not open large object %u: %s",
+ loOid, PQerrorMessage(conn));
- do
- {
- /* Do a fetch */
- res = ExecuteSqlQuery(fout, loFetchQry, PGRES_TUPLES_OK);
+ StartLO(fout, loOid);
- /* Process the tuples, if any */
- ntups = PQntuples(res);
- for (i = 0; i < ntups; i++)
+ /* Now read it in chunks, sending data to archive */
+ do
{
- Oid loOid;
- int loFd;
-
- loOid = atooid(PQgetvalue(res, i, 0));
- /* Open the LO */
- loFd = lo_open(conn, loOid, INV_READ);
- if (loFd == -1)
- pg_fatal("could not open large object %u: %s",
+ cnt = lo_read(conn, loFd, buf, LOBBUFSIZE);
+ if (cnt < 0)
+ pg_fatal("error reading large object %u: %s",
loOid, PQerrorMessage(conn));
- StartLO(fout, loOid);
-
- /* Now read it in chunks, sending data to archive */
- do
- {
- cnt = lo_read(conn, loFd, buf, LOBBUFSIZE);
- if (cnt < 0)
- pg_fatal("error reading large object %u: %s",
- loOid, PQerrorMessage(conn));
-
- WriteData(fout, buf, cnt);
- } while (cnt > 0);
+ WriteData(fout, buf, cnt);
+ } while (cnt > 0);
- lo_close(conn, loFd);
+ lo_close(conn, loFd);
- EndLO(fout, loOid);
- }
-
- PQclear(res);
- } while (ntups > 0);
+ EndLO(fout, loOid);
+ }
return 1;
}
case DO_LARGE_OBJECT_DATA:
if (dobj->dump & DUMP_COMPONENT_DATA)
{
+ LoInfo *loinfo;
TocEntry *te;
+ loinfo = (LoInfo *) findObjectByDumpId(dobj->dependencies[0]);
+ if (loinfo == NULL)
+ pg_fatal("missing metadata for large objects \"%s\"",
+ dobj->name);
+
te = ArchiveEntry(fout, dobj->catId, dobj->dumpId,
ARCHIVE_OPTS(.tag = dobj->name,
+ .owner = loinfo->rolname,
.description = "BLOBS",
.section = SECTION_DATA,
- .dumpFn = dumpLOs));
+ .deps = dobj->dependencies,
+ .nDeps = dobj->nDeps,
+ .dumpFn = dumpLOs,
+ .dumpArg = loinfo));
/*
* Set the TocEntry's dataLength in case we are doing a
* parallel dump and want to order dump jobs by table size.
* (We need some size estimate for every TocEntry with a
* DataDumper function.) We don't currently have any cheap
- * way to estimate the size of LOs, but it doesn't matter;
- * let's just set the size to a large value so parallel dumps
- * will launch this job first. If there's lots of LOs, we
- * win, and if there aren't, we don't lose much. (If you want
- * to improve on this, really what you should be thinking
- * about is allowing LO dumping to be parallelized, not just
- * getting a smarter estimate for the single TOC entry.)
+ * way to estimate the size of LOs, but fortunately it doesn't
+ * matter too much as long as we get large batches of LOs
+ * processed reasonably early. Assume 8K per blob.
*/
- te->dataLength = INT_MAX;
+ te->dataLength = loinfo->numlos * (pgoff_t) 8192;
}
break;
case DO_POLICY:
if (nspinfo->dobj.dump & DUMP_COMPONENT_ACL)
dumpACL(fout, nspinfo->dobj.dumpId, InvalidDumpId, "SCHEMA",
qnspname, NULL, NULL,
- nspinfo->rolname, &nspinfo->dacl);
+ NULL, nspinfo->rolname, &nspinfo->dacl);
free(qnspname);
dumpACL(fout, tyinfo->dobj.dumpId, InvalidDumpId, "TYPE",
qtypname, NULL,
tyinfo->dobj.namespace->dobj.name,
- tyinfo->rolname, &tyinfo->dacl);
+ NULL, tyinfo->rolname, &tyinfo->dacl);
PQclear(res);
destroyPQExpBuffer(q);
dumpACL(fout, tyinfo->dobj.dumpId, InvalidDumpId, "TYPE",
qtypname, NULL,
tyinfo->dobj.namespace->dobj.name,
- tyinfo->rolname, &tyinfo->dacl);
+ NULL, tyinfo->rolname, &tyinfo->dacl);
PQclear(res);
destroyPQExpBuffer(q);
dumpACL(fout, tyinfo->dobj.dumpId, InvalidDumpId, "TYPE",
qtypname, NULL,
tyinfo->dobj.namespace->dobj.name,
- tyinfo->rolname, &tyinfo->dacl);
+ NULL, tyinfo->rolname, &tyinfo->dacl);
destroyPQExpBuffer(q);
destroyPQExpBuffer(delq);
dumpACL(fout, tyinfo->dobj.dumpId, InvalidDumpId, "TYPE",
qtypname, NULL,
tyinfo->dobj.namespace->dobj.name,
- tyinfo->rolname, &tyinfo->dacl);
+ NULL, tyinfo->rolname, &tyinfo->dacl);
PQclear(res);
destroyPQExpBuffer(q);
dumpACL(fout, tyinfo->dobj.dumpId, InvalidDumpId, "TYPE",
qtypname, NULL,
tyinfo->dobj.namespace->dobj.name,
- tyinfo->rolname, &tyinfo->dacl);
+ NULL, tyinfo->rolname, &tyinfo->dacl);
/* Dump any per-constraint comments */
for (i = 0; i < tyinfo->nDomChecks; i++)
dumpACL(fout, tyinfo->dobj.dumpId, InvalidDumpId, "TYPE",
qtypname, NULL,
tyinfo->dobj.namespace->dobj.name,
- tyinfo->rolname, &tyinfo->dacl);
+ NULL, tyinfo->rolname, &tyinfo->dacl);
/* Dump any per-column comments */
if (tyinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
if (plang->lanpltrusted && plang->dobj.dump & DUMP_COMPONENT_ACL)
dumpACL(fout, plang->dobj.dumpId, InvalidDumpId, "LANGUAGE",
qlanname, NULL, NULL,
- plang->lanowner, &plang->dacl);
+ NULL, plang->lanowner, &plang->dacl);
free(qlanname);
dumpACL(fout, finfo->dobj.dumpId, InvalidDumpId, keyword,
funcsig, NULL,
finfo->dobj.namespace->dobj.name,
- finfo->rolname, &finfo->dacl);
+ NULL, finfo->rolname, &finfo->dacl);
PQclear(res);
dumpACL(fout, agginfo->aggfn.dobj.dumpId, InvalidDumpId,
"FUNCTION", aggsig, NULL,
agginfo->aggfn.dobj.namespace->dobj.name,
- agginfo->aggfn.rolname, &agginfo->aggfn.dacl);
+ NULL, agginfo->aggfn.rolname, &agginfo->aggfn.dacl);
free(aggsig);
free(aggfullsig);
/* Handle the ACL */
if (fdwinfo->dobj.dump & DUMP_COMPONENT_ACL)
dumpACL(fout, fdwinfo->dobj.dumpId, InvalidDumpId,
- "FOREIGN DATA WRAPPER", qfdwname, NULL,
+ "FOREIGN DATA WRAPPER", qfdwname, NULL, NULL,
NULL, fdwinfo->rolname, &fdwinfo->dacl);
free(qfdwname);
/* Handle the ACL */
if (srvinfo->dobj.dump & DUMP_COMPONENT_ACL)
dumpACL(fout, srvinfo->dobj.dumpId, InvalidDumpId,
- "FOREIGN SERVER", qsrvname, NULL,
+ "FOREIGN SERVER", qsrvname, NULL, NULL,
NULL, srvinfo->rolname, &srvinfo->dacl);
/* Dump user mappings */
* 'subname' is the formatted name of the sub-object, if any. Must be quoted.
* (Currently we assume that subname is only provided for table columns.)
* 'nspname' is the namespace the object is in (NULL if none).
+ * 'tag' is the tag to use for the ACL TOC entry; typically, this is NULL
+ * to use the default for the object type.
* 'owner' is the owner, NULL if there is no owner (for languages).
* 'dacl' is the DumpableAcl struct for the object.
*
static DumpId
dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
const char *type, const char *name, const char *subname,
- const char *nspname, const char *owner,
+ const char *nspname, const char *tag, const char *owner,
const DumpableAcl *dacl)
{
DumpId aclDumpId = InvalidDumpId;
if (sql->len > 0)
{
- PQExpBuffer tag = createPQExpBuffer();
+ PQExpBuffer tagbuf = createPQExpBuffer();
DumpId aclDeps[2];
int nDeps = 0;
- if (subname)
- appendPQExpBuffer(tag, "COLUMN %s.%s", name, subname);
+ if (tag)
+ appendPQExpBufferStr(tagbuf, tag);
+ else if (subname)
+ appendPQExpBuffer(tagbuf, "COLUMN %s.%s", name, subname);
else
- appendPQExpBuffer(tag, "%s %s", type, name);
+ appendPQExpBuffer(tagbuf, "%s %s", type, name);
aclDeps[nDeps++] = objDumpId;
if (altDumpId != InvalidDumpId)
aclDumpId = createDumpId();
ArchiveEntry(fout, nilCatalogId, aclDumpId,
- ARCHIVE_OPTS(.tag = tag->data,
+ ARCHIVE_OPTS(.tag = tagbuf->data,
.namespace = nspname,
.owner = owner,
.description = "ACL",
.deps = aclDeps,
.nDeps = nDeps));
- destroyPQExpBuffer(tag);
+ destroyPQExpBuffer(tagbuf);
}
destroyPQExpBuffer(sql);
tableAclDumpId =
dumpACL(fout, tbinfo->dobj.dumpId, InvalidDumpId,
objtype, namecopy, NULL,
- tbinfo->dobj.namespace->dobj.name, tbinfo->rolname,
- &tbinfo->dacl);
+ tbinfo->dobj.namespace->dobj.name,
+ NULL, tbinfo->rolname, &tbinfo->dacl);
}
/*
*/
dumpACL(fout, tbinfo->dobj.dumpId, tableAclDumpId,
"TABLE", namecopy, attnamecopy,
- tbinfo->dobj.namespace->dobj.name, tbinfo->rolname,
- &coldacl);
+ tbinfo->dobj.namespace->dobj.name,
+ NULL, tbinfo->rolname, &coldacl);
free(attnamecopy);
}
PQclear(res);
case DO_FDW:
case DO_FOREIGN_SERVER:
case DO_TRANSFORM:
- case DO_LARGE_OBJECT:
/* Pre-data objects: must come before the pre-data boundary */
addObjectDependency(preDataBound, dobj->dumpId);
break;
case DO_TABLE_DATA:
case DO_SEQUENCE_SET:
+ case DO_LARGE_OBJECT:
case DO_LARGE_OBJECT_DATA:
/* Data objects: must come between the boundaries */
addObjectDependency(dobj, preDataBound->dumpId);
char defaclobjtype;
} DefaultACLInfo;
+/*
+ * LoInfo represents a group of large objects (blobs) that share the same
+ * owner and ACL setting. dobj.components has the DUMP_COMPONENT_COMMENT bit
+ * set if any blob in the group has a comment; similarly for sec labels.
+ * If there are many blobs with the same owner/ACL, we can divide them into
+ * multiple LoInfo groups, which will each spawn a BLOB METADATA and a BLOBS
+ * (data) TOC entry. This allows more parallelism during restore.
+ */
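+/*
+ * For example (hypothetical counts): 2500 blobs sharing one owner and ACL
+ * would be split, per MAX_BLOBS_PER_ARCHIVE_ENTRY, into groups of 1000,
+ * 1000, and 500 blobs, each getting its own LoInfo plus BLOB METADATA and
+ * BLOBS TOC entries.
+ */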
typedef struct _loInfo
{
DumpableObject dobj;
DumpableAcl dacl;
const char *rolname;
+ int numlos;
+ Oid looids[FLEXIBLE_ARRAY_MEMBER];
} LoInfo;
/*
extern TableInfo *getSchemaData(Archive *fout, int *numTablesPtr);
extern void AssignDumpId(DumpableObject *dobj);
+extern void recordAdditionalCatalogID(CatalogId catId, DumpableObject *dobj);
extern DumpId createDumpId(void);
extern DumpId getMaxDumpId(void);
extern DumpableObject *findObjectByDumpId(DumpId dumpId);
'--format=directory', '--compress=gzip:1',
"--file=$tempdir/compression_gzip_dir", 'postgres',
],
- # Give coverage for manually compressed blob.toc files during
+ # Give coverage for manually compressed blobs.toc files during
# restore.
compress_cmd => {
program => $ENV{'GZIP_PROGRAM'},
- args => [ '-f', "$tempdir/compression_gzip_dir/blobs.toc", ],
+ args => [ '-f', "$tempdir/compression_gzip_dir/blobs_*.toc", ],
},
# Verify that only data files were compressed
glob_patterns => [
'--format=directory', '--compress=lz4:1',
"--file=$tempdir/compression_lz4_dir", 'postgres',
],
- # Give coverage for manually compressed blob.toc files during
- # restore.
- compress_cmd => {
- program => $ENV{'LZ4'},
- args => [
- '-z', '-f', '--rm',
- "$tempdir/compression_lz4_dir/blobs.toc",
- "$tempdir/compression_lz4_dir/blobs.toc.lz4",
- ],
- },
# Verify that data files were compressed
glob_patterns => [
"$tempdir/compression_lz4_dir/toc.dat",
'--format=directory', '--compress=zstd:1',
"--file=$tempdir/compression_zstd_dir", 'postgres',
],
- # Give coverage for manually compressed blob.toc files during
+ # Give coverage for manually compressed blobs.toc files during
# restore.
compress_cmd => {
program => $ENV{'ZSTD'},
args => [
'-z', '-f',
- '--rm', "$tempdir/compression_zstd_dir/blobs.toc",
- "-o", "$tempdir/compression_zstd_dir/blobs.toc.zst",
+ '--rm', "$tempdir/compression_zstd_dir/blobs_*.toc",
],
},
# Verify that data files were compressed
},
glob_patterns => [
"$tempdir/defaults_dir_format/toc.dat",
- "$tempdir/defaults_dir_format/blobs.toc",
+ "$tempdir/defaults_dir_format/blobs_*.toc",
$supports_gzip ? "$tempdir/defaults_dir_format/*.dat.gz"
: "$tempdir/defaults_dir_format/*.dat",
],
column_inserts => 1,
data_only => 1,
inserts => 1,
- section_pre_data => 1,
+ section_data => 1,
test_schema_plus_large_objects => 1,
},
unlike => {
column_inserts => 1,
data_only => 1,
inserts => 1,
- section_pre_data => 1,
+ section_data => 1,
test_schema_plus_large_objects => 1,
},
unlike => {
column_inserts => 1,
data_only => 1,
inserts => 1,
- section_pre_data => 1,
+ section_data => 1,
test_schema_plus_large_objects => 1,
},
unlike => {
column_inserts => 1,
data_only => 1,
inserts => 1,
- section_pre_data => 1,
+ section_data => 1,
test_schema_plus_large_objects => 1,
binary_upgrade => 1,
},
# not defined.
next if (!defined($compress_program) || $compress_program eq '');
- my @full_compress_cmd =
- ($compress_cmd->{program}, @{ $compress_cmd->{args} });
+ # Arguments may require globbing.
+ my @full_compress_cmd = ($compress_program);
+ foreach my $arg (@{ $compress_cmd->{args} })
+ {
+ push @full_compress_cmd, glob($arg);
+ }
+
command_ok(\@full_compress_cmd, "$run: compression commands");
}