Optimize fileset usage in apply worker.
authorAmit Kapila
Thu, 2 Sep 2021 02:43:46 +0000 (08:13 +0530)
committerAmit Kapila
Thu, 2 Sep 2021 02:43:46 +0000 (08:13 +0530)
Use one fileset for the entire worker lifetime instead of using
separate filesets for each streaming transaction. Now, the
changes/subxacts files for every streaming transaction will be
created under the same fileset and the files will be deleted
after the transaction is completed.

This patch extends the BufFileOpenFileSet and BufFileDeleteFileSet
APIs to allow users to specify whether to give an error on missing
files.

Author: Dilip Kumar, based on suggestion by Thomas Munro
Reviewed-by: Hou Zhijie, Masahiko Sawada, Amit Kapila
Discussion: https://postgr.es/m/[email protected]

src/backend/replication/logical/launcher.c
src/backend/replication/logical/worker.c
src/backend/storage/file/buffile.c
src/backend/utils/sort/logtape.c
src/backend/utils/sort/sharedtuplestore.c
src/include/replication/worker_internal.h
src/include/storage/buffile.h

index 8b1772db69eb1494053cb204a5049ca249590647..3fb4caa8033c662db866fe42a27c3c10957d3038 100644 (file)
@@ -379,6 +379,7 @@ retry:
    worker->relid = relid;
    worker->relstate = SUBREL_STATE_UNKNOWN;
    worker->relstate_lsn = InvalidXLogRecPtr;
+   worker->stream_fileset = NULL;
    worker->last_lsn = InvalidXLogRecPtr;
    TIMESTAMP_NOBEGIN(worker->last_send_time);
    TIMESTAMP_NOBEGIN(worker->last_recv_time);
@@ -648,8 +649,9 @@ logicalrep_worker_onexit(int code, Datum arg)
 
    logicalrep_worker_detach();
 
-   /* Cleanup filesets used for streaming transactions. */
-   logicalrep_worker_cleanupfileset();
+   /* Cleanup fileset used for streaming transactions. */
+   if (MyLogicalRepWorker->stream_fileset != NULL)
+       FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
 
    ApplyLauncherWakeup();
 }
index bfb7d1a261ca0e49f99a3bc0c98fce4e8c25f641..8d96c926b4f2e35ba5fed9fd30422f7712569487 100644 (file)
@@ -236,20 +236,6 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
    .ts = 0,
 };
 
-/*
- * Stream xid hash entry. Whenever we see a new xid we create this entry in the
- * xidhash and along with it create the streaming file and store the fileset handle.
- * The subxact file is created iff there is any subxact info under this xid. This
- * entry is used on the subsequent streams for the xid to get the corresponding
- * fileset handles, so storing them in hash makes the search faster.
- */
-typedef struct StreamXidHash
-{
-   TransactionId xid;          /* xid is the hash key and must be first */
-   FileSet    *stream_fileset; /* file set for stream data */
-   FileSet    *subxact_fileset;    /* file set for subxact info */
-} StreamXidHash;
-
 static MemoryContext ApplyMessageContext = NULL;
 MemoryContext ApplyContext = NULL;
 
@@ -269,12 +255,6 @@ static bool in_streamed_transaction = false;
 
 static TransactionId stream_xid = InvalidTransactionId;
 
-/*
- * Hash table for storing the streaming xid information along with filesets
- * for streaming and subxact files.
- */
-static HTAB *xidhash = NULL;
-
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
 
@@ -1118,7 +1098,6 @@ static void
 apply_handle_stream_start(StringInfo s)
 {
    bool        first_segment;
-   HASHCTL     hash_ctl;
 
    if (in_streamed_transaction)
        ereport(ERROR,
@@ -1148,17 +1127,23 @@ apply_handle_stream_start(StringInfo s)
    set_apply_error_context_xact(stream_xid, 0);
 
    /*
-    * Initialize the xidhash table if we haven't yet. This will be used for
-    * the entire duration of the apply worker so create it in permanent
-    * context.
+    * Initialize the worker's stream_fileset if we haven't yet. This will be
+    * used for the entire duration of the worker so create it in a permanent
+    * context. We create this on the very first streaming message from any
+    * transaction and then use it for this and other streaming transactions.
+    * Now, we could create a fileset at the start of the worker as well but
+    * then we won't be sure that it will ever be used.
     */
-   if (xidhash == NULL)
+   if (MyLogicalRepWorker->stream_fileset == NULL)
    {
-       hash_ctl.keysize = sizeof(TransactionId);
-       hash_ctl.entrysize = sizeof(StreamXidHash);
-       hash_ctl.hcxt = ApplyContext;
-       xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
-                             HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+       MemoryContext oldctx;
+
+       oldctx = MemoryContextSwitchTo(ApplyContext);
+
+       MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
+       FileSetInit(MyLogicalRepWorker->stream_fileset);
+
+       MemoryContextSwitchTo(oldctx);
    }
 
    /* open the spool file for this transaction */
@@ -1253,7 +1238,6 @@ apply_handle_stream_abort(StringInfo s)
        BufFile    *fd;
        bool        found = false;
        char        path[MAXPGPATH];
-       StreamXidHash *ent;
 
        set_apply_error_context_xact(subxid, 0);
 
@@ -1285,19 +1269,10 @@ apply_handle_stream_abort(StringInfo s)
            return;
        }
 
-       ent = (StreamXidHash *) hash_search(xidhash,
-                                           (void *) &xid,
-                                           HASH_FIND,
-                                           NULL);
-       if (!ent)
-           ereport(ERROR,
-                   (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                    errmsg_internal("transaction %u not found in stream XID hash table",
-                                    xid)));
-
        /* open the changes file */
        changes_filename(path, MyLogicalRepWorker->subid, xid);
-       fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
+       fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
+                               O_RDWR, false);
 
        /* OK, truncate the file at the right offset */
        BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
@@ -1327,7 +1302,6 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
    int         nchanges;
    char        path[MAXPGPATH];
    char       *buffer = NULL;
-   StreamXidHash *ent;
    MemoryContext oldcxt;
    BufFile    *fd;
 
@@ -1345,17 +1319,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
    changes_filename(path, MyLogicalRepWorker->subid, xid);
    elog(DEBUG1, "replaying changes from file \"%s\"", path);
 
-   ent = (StreamXidHash *) hash_search(xidhash,
-                                       (void *) &xid,
-                                       HASH_FIND,
-                                       NULL);
-   if (!ent)
-       ereport(ERROR,
-               (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                errmsg_internal("transaction %u not found in stream XID hash table",
-                                xid)));
-
-   fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDONLY);
+   fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
+                           false);
 
    buffer = palloc(BLCKSZ);
    initStringInfo(&s2);
@@ -2541,30 +2506,6 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
    }
 }
 
-/*
- * Cleanup filesets.
- */
-void
-logicalrep_worker_cleanupfileset(void)
-{
-   HASH_SEQ_STATUS status;
-   StreamXidHash *hentry;
-
-   /* Remove all the pending stream and subxact filesets. */
-   if (xidhash)
-   {
-       hash_seq_init(&status, xidhash);
-       while ((hentry = (StreamXidHash *) hash_seq_search(&status)) != NULL)
-       {
-           FileSetDeleteAll(hentry->stream_fileset);
-
-           /* Delete the subxact fileset iff it is created. */
-           if (hentry->subxact_fileset)
-               FileSetDeleteAll(hentry->subxact_fileset);
-       }
-   }
-}
-
 /*
  * Apply main loop.
  */
@@ -3026,58 +2967,30 @@ subxact_info_write(Oid subid, TransactionId xid)
 {
    char        path[MAXPGPATH];
    Size        len;
-   StreamXidHash *ent;
    BufFile    *fd;
 
    Assert(TransactionIdIsValid(xid));
 
-   /* Find the xid entry in the xidhash */
-   ent = (StreamXidHash *) hash_search(xidhash,
-                                       (void *) &xid,
-                                       HASH_FIND,
-                                       NULL);
-   /* By this time we must have created the transaction entry */
-   Assert(ent);
+   /* construct the subxact filename */
+   subxact_filename(path, subid, xid);
 
-   /*
-    * If there is no subtransaction then nothing to do, but if already have
-    * subxact file then delete that.
-    */
+   /* Delete the subxacts file, if exists. */
    if (subxact_data.nsubxacts == 0)
    {
-       if (ent->subxact_fileset)
-       {
-           cleanup_subxact_info();
-           FileSetDeleteAll(ent->subxact_fileset);
-           pfree(ent->subxact_fileset);
-           ent->subxact_fileset = NULL;
-       }
+       cleanup_subxact_info();
+       BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
+
        return;
    }
 
-   subxact_filename(path, subid, xid);
-
    /*
     * Create the subxact file if it not already created, otherwise open the
     * existing file.
     */
-   if (ent->subxact_fileset == NULL)
-   {
-       MemoryContext oldctx;
-
-       /*
-        * We need to maintain fileset across multiple stream start/stop
-        * calls. So, need to allocate it in a persistent context.
-        */
-       oldctx = MemoryContextSwitchTo(ApplyContext);
-       ent->subxact_fileset = palloc(sizeof(FileSet));
-       FileSetInit(ent->subxact_fileset);
-       MemoryContextSwitchTo(oldctx);
-
-       fd = BufFileCreateFileSet(ent->subxact_fileset, path);
-   }
-   else
-       fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDWR);
+   fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
+                           true);
+   if (fd == NULL)
+       fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
 
    len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
 
@@ -3104,34 +3017,21 @@ subxact_info_read(Oid subid, TransactionId xid)
    char        path[MAXPGPATH];
    Size        len;
    BufFile    *fd;
-   StreamXidHash *ent;
    MemoryContext oldctx;
 
    Assert(!subxact_data.subxacts);
    Assert(subxact_data.nsubxacts == 0);
    Assert(subxact_data.nsubxacts_max == 0);
 
-   /* Find the stream xid entry in the xidhash */
-   ent = (StreamXidHash *) hash_search(xidhash,
-                                       (void *) &xid,
-                                       HASH_FIND,
-                                       NULL);
-   if (!ent)
-       ereport(ERROR,
-               (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                errmsg_internal("transaction %u not found in stream XID hash table",
-                                xid)));
-
    /*
-    * If subxact_fileset is not valid that mean we don't have any subxact
-    * info
+    * If the subxact file doesn't exist that means we don't have any subxact
+    * info.
     */
-   if (ent->subxact_fileset == NULL)
-       return;
-
    subxact_filename(path, subid, xid);
-
-   fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDONLY);
+   fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
+                           true);
+   if (fd == NULL)
+       return;
 
    /* read number of subxact items */
    if (BufFileRead(fd, &subxact_data.nsubxacts,
@@ -3267,42 +3167,21 @@ changes_filename(char *path, Oid subid, TransactionId xid)
  *   Cleanup files for a subscription / toplevel transaction.
  *
  * Remove files with serialized changes and subxact info for a particular
- * toplevel transaction. Each subscription has a separate set of files.
+ * toplevel transaction. Each subscription has a separate set of files
+ * for any toplevel transaction.
  */
 static void
 stream_cleanup_files(Oid subid, TransactionId xid)
 {
    char        path[MAXPGPATH];
-   StreamXidHash *ent;
-
-   /* Find the xid entry in the xidhash */
-   ent = (StreamXidHash *) hash_search(xidhash,
-                                       (void *) &xid,
-                                       HASH_FIND,
-                                       NULL);
-   if (!ent)
-       ereport(ERROR,
-               (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                errmsg_internal("transaction %u not found in stream XID hash table",
-                                xid)));
 
-   /* Delete the change file and release the stream fileset memory */
+   /* Delete the changes file. */
    changes_filename(path, subid, xid);
-   FileSetDeleteAll(ent->stream_fileset);
-   pfree(ent->stream_fileset);
-   ent->stream_fileset = NULL;
+   BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
 
-   /* Delete the subxact file and release the memory, if it exist */
-   if (ent->subxact_fileset)
-   {
-       subxact_filename(path, subid, xid);
-       FileSetDeleteAll(ent->subxact_fileset);
-       pfree(ent->subxact_fileset);
-       ent->subxact_fileset = NULL;
-   }
-
-   /* Remove the xid entry from the stream xid hash */
-   hash_search(xidhash, (void *) &xid, HASH_REMOVE, NULL);
+   /* Delete the subxact file, if it exists. */
+   subxact_filename(path, subid, xid);
+   BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
 }
 
 /*
@@ -3312,8 +3191,8 @@ stream_cleanup_files(Oid subid, TransactionId xid)
  *
  * Open a file for streamed changes from a toplevel transaction identified
  * by stream_xid (global variable). If it's the first chunk of streamed
- * changes for this transaction, initialize the fileset and create the buffile,
- * otherwise open the previously created file.
+ * changes for this transaction, create the buffile, otherwise open the
+ * previously created file.
  *
  * This can only be called at the beginning of a "streaming" block, i.e.
  * between stream_start/stream_stop messages from the upstream.
@@ -3322,20 +3201,13 @@ static void
 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
 {
    char        path[MAXPGPATH];
-   bool        found;
    MemoryContext oldcxt;
-   StreamXidHash *ent;
 
    Assert(in_streamed_transaction);
    Assert(OidIsValid(subid));
    Assert(TransactionIdIsValid(xid));
    Assert(stream_fd == NULL);
 
-   /* create or find the xid entry in the xidhash */
-   ent = (StreamXidHash *) hash_search(xidhash,
-                                       (void *) &xid,
-                                       HASH_ENTER,
-                                       &found);
 
    changes_filename(path, subid, xid);
    elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
@@ -3347,49 +3219,20 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
    oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
 
    /*
-    * If this is the first streamed segment, the file must not exist, so make
-    * sure we're the ones creating it. Otherwise just open the file for
-    * writing, in append mode.
+    * If this is the first streamed segment, create the changes file.
+    * Otherwise, just open the file for writing, in append mode.
     */
    if (first_segment)
-   {
-       MemoryContext savectx;
-       FileSet    *fileset;
-
-       if (found)
-           ereport(ERROR,
-                   (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                    errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
-
-       /*
-        * We need to maintain fileset across multiple stream start/stop
-        * calls. So, need to allocate it in a persistent context.
-        */
-       savectx = MemoryContextSwitchTo(ApplyContext);
-       fileset = palloc(sizeof(FileSet));
-
-       FileSetInit(fileset);
-       MemoryContextSwitchTo(savectx);
-
-       stream_fd = BufFileCreateFileSet(fileset, path);
-
-       /* Remember the fileset for the next stream of the same transaction */
-       ent->xid = xid;
-       ent->stream_fileset = fileset;
-       ent->subxact_fileset = NULL;
-   }
+       stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
+                                        path);
    else
    {
-       if (!found)
-           ereport(ERROR,
-                   (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                    errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
-
        /*
         * Open the file and seek to the end of the file because we always
         * append the changes file.
         */
-       stream_fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
+       stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
+                                      path, O_RDWR, false);
        BufFileSeek(stream_fd, 0, 0, SEEK_END);
    }
 
index 5e5409d84d932770f15048d17c3a8da1b1d2904e..ff3aa67cde0aa8016690f45bf8d73c439fca7eb6 100644 (file)
@@ -278,10 +278,13 @@ BufFileCreateFileSet(FileSet *fileset, const char *name)
  * with BufFileCreateFileSet in the same FileSet using the same name.
  * The backend that created the file must have called BufFileClose() or
  * BufFileExportFileSet() to make sure that it is ready to be opened by other
- * backends and render it read-only.
+ * backends and render it read-only.  If missing_ok is true, which indicates
+ * that missing files can be safely ignored, then return NULL if the BufFile
+ * with the given name is not found, otherwise, throw an error.
  */
 BufFile *
-BufFileOpenFileSet(FileSet *fileset, const char *name, int mode)
+BufFileOpenFileSet(FileSet *fileset, const char *name, int mode,
+                  bool missing_ok)
 {
    BufFile    *file;
    char        segment_name[MAXPGPATH];
@@ -318,10 +321,18 @@ BufFileOpenFileSet(FileSet *fileset, const char *name, int mode)
     * name.
     */
    if (nfiles == 0)
+   {
+       /* free the memory */
+       pfree(files);
+
+       if (missing_ok)
+           return NULL;
+
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not open temporary file \"%s\" from BufFile \"%s\": %m",
                        segment_name, name)));
+   }
 
    file = makeBufFileCommon(nfiles);
    file->files = files;
@@ -341,10 +352,11 @@ BufFileOpenFileSet(FileSet *fileset, const char *name, int mode)
  * the FileSet to be cleaned up.
  *
  * Only one backend should attempt to delete a given name, and should know
- * that it exists and has been exported or closed.
+ * that it exists and has been exported or closed otherwise missing_ok should
+ * be passed true.
  */
 void
-BufFileDeleteFileSet(FileSet *fileset, const char *name)
+BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
 {
    char        segment_name[MAXPGPATH];
    int         segment = 0;
@@ -366,7 +378,7 @@ BufFileDeleteFileSet(FileSet *fileset, const char *name)
        CHECK_FOR_INTERRUPTS();
    }
 
-   if (!found)
+   if (!found && !missing_ok)
        elog(ERROR, "could not delete unknown BufFile \"%s\"", name);
 }
 
index f7994d771d6ff2e7404b889d73a873f6e1a097c1..debf12e1b0bd4e2cc12489e5e9e46766b06aa0c0 100644 (file)
@@ -564,7 +564,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
        lt = <s->tapes[i];
 
        pg_itoa(i, filename);
-       file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY);
+       file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY, false);
        filesize = BufFileSize(file);
 
        /*
index 504ef1c2869d0e93b2082e7c4b5acb305b0ed93d..033088f9bc1d1e8259b4f4206d3ef503cfbfae0c 100644 (file)
@@ -560,7 +560,8 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
 
                sts_filename(name, accessor, accessor->read_participant);
                accessor->read_file =
-                   BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY);
+                   BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
+                                      false);
            }
 
            /* Seek and load the chunk header. */
index a6c9d4e2a107124a8862e5ae01b8e5d63d91d43a..c00be2a2b6f4af7b7ba079eef279e4b1d6483c8c 100644 (file)
@@ -50,6 +50,15 @@ typedef struct LogicalRepWorker
    XLogRecPtr  relstate_lsn;
    slock_t     relmutex;
 
+   /*
+    * Used to create the changes and subxact files for the streaming
+    * transactions.  Upon the arrival of the first streaming transaction, the
+    * fileset will be initialized, and it will be deleted when the worker
+    * exits.  Under this, separate buffiles would be created for each
+    * transaction which will be deleted after the transaction is finished.
+    */
+   FileSet    *stream_fileset;
+
    /* Stats. */
    XLogRecPtr  last_lsn;
    TimestampTz last_send_time;
@@ -79,7 +88,6 @@ extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
-extern void logicalrep_worker_cleanupfileset(void);
 
 extern int logicalrep_sync_worker_count(Oid subid);
 
index 143eada85fe5bb21a6bd334a9967b0f21d517954..7ae5ea2dde0b0c028ec22d6e3b89e2b4967c5a89 100644 (file)
@@ -49,8 +49,9 @@ extern long BufFileAppend(BufFile *target, BufFile *source);
 extern BufFile *BufFileCreateFileSet(FileSet *fileset, const char *name);
 extern void BufFileExportFileSet(BufFile *file);
 extern BufFile *BufFileOpenFileSet(FileSet *fileset, const char *name,
-                                  int mode);
-extern void BufFileDeleteFileSet(FileSet *fileset, const char *name);
+                                  int mode, bool missing_ok);
+extern void BufFileDeleteFileSet(FileSet *fileset, const char *name,
+                                bool missing_ok);
 extern void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset);
 
 #endif                         /* BUFFILE_H */