.ts = 0,
};
-/*
- * Stream xid hash entry. Whenever we see a new xid we create this entry in the
- * xidhash and along with it create the streaming file and store the fileset handle.
- * The subxact file is created iff there is any subxact info under this xid. This
- * entry is used on the subsequent streams for the xid to get the corresponding
- * fileset handles, so storing them in hash makes the search faster.
- */
-typedef struct StreamXidHash
-{
- TransactionId xid; /* xid is the hash key and must be first */
- FileSet *stream_fileset; /* file set for stream data */
- FileSet *subxact_fileset; /* file set for subxact info */
-} StreamXidHash;
-
static MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyContext = NULL;
static TransactionId stream_xid = InvalidTransactionId;
-/*
- * Hash table for storing the streaming xid information along with filesets
- * for streaming and subxact files.
- */
-static HTAB *xidhash = NULL;
-
/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;
apply_handle_stream_start(StringInfo s)
{
bool first_segment;
- HASHCTL hash_ctl;
if (in_streamed_transaction)
ereport(ERROR,
set_apply_error_context_xact(stream_xid, 0);
/*
- * Initialize the xidhash table if we haven't yet. This will be used for
- * the entire duration of the apply worker so create it in permanent
- * context.
+ * Initialize the worker's stream_fileset if we haven't yet. This will be
+ * used for the entire duration of the worker so create it in a permanent
+ * context. We create this on the very first streaming message from any
+ * transaction and then use it for this and other streaming transactions.
+ * Now, we could create a fileset at the start of the worker as well but
+ * then we won't be sure that it will ever be used.
*/
- if (xidhash == NULL)
+ if (MyLogicalRepWorker->stream_fileset == NULL)
{
- hash_ctl.keysize = sizeof(TransactionId);
- hash_ctl.entrysize = sizeof(StreamXidHash);
- hash_ctl.hcxt = ApplyContext;
- xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
- HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ MemoryContext oldctx;
+
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+
+ MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
+ FileSetInit(MyLogicalRepWorker->stream_fileset);
+
+ MemoryContextSwitchTo(oldctx);
}
/* open the spool file for this transaction */
BufFile *fd;
bool found = false;
char path[MAXPGPATH];
- StreamXidHash *ent;
set_apply_error_context_xact(subxid, 0);
return;
}
- ent = (StreamXidHash *) hash_search(xidhash,
- (void *) &xid,
- HASH_FIND,
- NULL);
- if (!ent)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("transaction %u not found in stream XID hash table",
- xid)));
-
/* open the changes file */
changes_filename(path, MyLogicalRepWorker->subid, xid);
- fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
+ fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
+ O_RDWR, false);
/* OK, truncate the file at the right offset */
BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
int nchanges;
char path[MAXPGPATH];
char *buffer = NULL;
- StreamXidHash *ent;
MemoryContext oldcxt;
BufFile *fd;
changes_filename(path, MyLogicalRepWorker->subid, xid);
elog(DEBUG1, "replaying changes from file \"%s\"", path);
- ent = (StreamXidHash *) hash_search(xidhash,
- (void *) &xid,
- HASH_FIND,
- NULL);
- if (!ent)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("transaction %u not found in stream XID hash table",
- xid)));
-
- fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDONLY);
+ fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
+ false);
buffer = palloc(BLCKSZ);
initStringInfo(&s2);
}
}
-/*
- * Cleanup filesets.
- */
-void
-logicalrep_worker_cleanupfileset(void)
-{
- HASH_SEQ_STATUS status;
- StreamXidHash *hentry;
-
- /* Remove all the pending stream and subxact filesets. */
- if (xidhash)
- {
- hash_seq_init(&status, xidhash);
- while ((hentry = (StreamXidHash *) hash_seq_search(&status)) != NULL)
- {
- FileSetDeleteAll(hentry->stream_fileset);
-
- /* Delete the subxact fileset iff it is created. */
- if (hentry->subxact_fileset)
- FileSetDeleteAll(hentry->subxact_fileset);
- }
- }
-}
-
/*
* Apply main loop.
*/
{
char path[MAXPGPATH];
Size len;
- StreamXidHash *ent;
BufFile *fd;
Assert(TransactionIdIsValid(xid));
- /* Find the xid entry in the xidhash */
- ent = (StreamXidHash *) hash_search(xidhash,
- (void *) &xid,
- HASH_FIND,
- NULL);
- /* By this time we must have created the transaction entry */
- Assert(ent);
+ /* construct the subxact filename */
+ subxact_filename(path, subid, xid);
- /*
- * If there is no subtransaction then nothing to do, but if already have
- * subxact file then delete that.
- */
+ /* Delete the subxacts file, if exists. */
if (subxact_data.nsubxacts == 0)
{
- if (ent->subxact_fileset)
- {
- cleanup_subxact_info();
- FileSetDeleteAll(ent->subxact_fileset);
- pfree(ent->subxact_fileset);
- ent->subxact_fileset = NULL;
- }
+ cleanup_subxact_info();
+ BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
+
return;
}
- subxact_filename(path, subid, xid);
-
/*
* Create the subxact file if it not already created, otherwise open the
* existing file.
*/
- if (ent->subxact_fileset == NULL)
- {
- MemoryContext oldctx;
-
- /*
- * We need to maintain fileset across multiple stream start/stop
- * calls. So, need to allocate it in a persistent context.
- */
- oldctx = MemoryContextSwitchTo(ApplyContext);
- ent->subxact_fileset = palloc(sizeof(FileSet));
- FileSetInit(ent->subxact_fileset);
- MemoryContextSwitchTo(oldctx);
-
- fd = BufFileCreateFileSet(ent->subxact_fileset, path);
- }
- else
- fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDWR);
+ fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
+ true);
+ if (fd == NULL)
+ fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
char path[MAXPGPATH];
Size len;
BufFile *fd;
- StreamXidHash *ent;
MemoryContext oldctx;
Assert(!subxact_data.subxacts);
Assert(subxact_data.nsubxacts == 0);
Assert(subxact_data.nsubxacts_max == 0);
- /* Find the stream xid entry in the xidhash */
- ent = (StreamXidHash *) hash_search(xidhash,
- (void *) &xid,
- HASH_FIND,
- NULL);
- if (!ent)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("transaction %u not found in stream XID hash table",
- xid)));
-
/*
- * If subxact_fileset is not valid that mean we don't have any subxact
- * info
+ * If the subxact file doesn't exist that means we don't have any subxact
+ * info.
*/
- if (ent->subxact_fileset == NULL)
- return;
-
subxact_filename(path, subid, xid);
-
- fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDONLY);
+ fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
+ true);
+ if (fd == NULL)
+ return;
/* read number of subxact items */
if (BufFileRead(fd, &subxact_data.nsubxacts,
* Cleanup files for a subscription / toplevel transaction.
*
* Remove files with serialized changes and subxact info for a particular
- * toplevel transaction. Each subscription has a separate set of files.
+ * toplevel transaction. Each subscription has a separate set of files
+ * for any toplevel transaction.
*/
static void
stream_cleanup_files(Oid subid, TransactionId xid)
{
char path[MAXPGPATH];
- StreamXidHash *ent;
-
- /* Find the xid entry in the xidhash */
- ent = (StreamXidHash *) hash_search(xidhash,
- (void *) &xid,
- HASH_FIND,
- NULL);
- if (!ent)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("transaction %u not found in stream XID hash table",
- xid)));
- /* Delete the change file and release the stream fileset memory */
+ /* Delete the changes file. */
changes_filename(path, subid, xid);
- FileSetDeleteAll(ent->stream_fileset);
- pfree(ent->stream_fileset);
- ent->stream_fileset = NULL;
+ BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
- /* Delete the subxact file and release the memory, if it exist */
- if (ent->subxact_fileset)
- {
- subxact_filename(path, subid, xid);
- FileSetDeleteAll(ent->subxact_fileset);
- pfree(ent->subxact_fileset);
- ent->subxact_fileset = NULL;
- }
-
- /* Remove the xid entry from the stream xid hash */
- hash_search(xidhash, (void *) &xid, HASH_REMOVE, NULL);
+ /* Delete the subxact file, if it exists. */
+ subxact_filename(path, subid, xid);
+ BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
}
/*
*
* Open a file for streamed changes from a toplevel transaction identified
* by stream_xid (global variable). If it's the first chunk of streamed
- * changes for this transaction, initialize the fileset and create the buffile,
- * otherwise open the previously created file.
+ * changes for this transaction, create the buffile, otherwise open the
+ * previously created file.
*
* This can only be called at the beginning of a "streaming" block, i.e.
* between stream_start/stream_stop messages from the upstream.
stream_open_file(Oid subid, TransactionId xid, bool first_segment)
{
char path[MAXPGPATH];
- bool found;
MemoryContext oldcxt;
- StreamXidHash *ent;
Assert(in_streamed_transaction);
Assert(OidIsValid(subid));
Assert(TransactionIdIsValid(xid));
Assert(stream_fd == NULL);
- /* create or find the xid entry in the xidhash */
- ent = (StreamXidHash *) hash_search(xidhash,
- (void *) &xid,
- HASH_ENTER,
- &found);
changes_filename(path, subid, xid);
elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
/*
- * If this is the first streamed segment, the file must not exist, so make
- * sure we're the ones creating it. Otherwise just open the file for
- * writing, in append mode.
+ * If this is the first streamed segment, create the changes file.
+ * Otherwise, just open the file for writing, in append mode.
*/
if (first_segment)
- {
- MemoryContext savectx;
- FileSet *fileset;
-
- if (found)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
-
- /*
- * We need to maintain fileset across multiple stream start/stop
- * calls. So, need to allocate it in a persistent context.
- */
- savectx = MemoryContextSwitchTo(ApplyContext);
- fileset = palloc(sizeof(FileSet));
-
- FileSetInit(fileset);
- MemoryContextSwitchTo(savectx);
-
- stream_fd = BufFileCreateFileSet(fileset, path);
-
- /* Remember the fileset for the next stream of the same transaction */
- ent->xid = xid;
- ent->stream_fileset = fileset;
- ent->subxact_fileset = NULL;
- }
+ stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
+ path);
else
{
- if (!found)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
-
/*
* Open the file and seek to the end of the file because we always
* append the changes file.
*/
- stream_fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
+ stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
+ path, O_RDWR, false);
BufFileSeek(stream_fd, 0, 0, SEEK_END);
}