COMMIT PREPARED 'test_prepared#1';
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
- data
-----------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:1
- table public.test_prepared1: INSERT: id[integer]:2
- PREPARE TRANSACTION 'test_prepared#1'
+ data
+-----------------------------------
COMMIT PREPARED 'test_prepared#1'
-(5 rows)
+(1 row)
-- Test that rollback of a prepared xact is decoded.
BEGIN;
COMMIT PREPARED 'test_prepared#3';
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
- data
--------------------------------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
- PREPARE TRANSACTION 'test_prepared#3'
+ data
+-----------------------------------
COMMIT PREPARED 'test_prepared#3'
-(4 rows)
+(1 row)
-- make sure stuff still works
INSERT INTO test_prepared1 VALUES (6);
COMMIT PREPARED 'test_prepared_lock';
-- consume the commit
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
- data
----------------------------------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
- table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
- PREPARE TRANSACTION 'test_prepared_lock'
+ data
+--------------------------------------
COMMIT PREPARED 'test_prepared_lock'
-(5 rows)
+(1 row)
-- Test savepoints and sub-xacts. Creating savepoints will create
-- sub-xacts implicitly.
COMMIT PREPARED 'test_prepared_savepoint';
-- consume the commit
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
- data
-------------------------------------------------------------
- BEGIN
- table public.test_prepared_savepoint: INSERT: a[integer]:1
- PREPARE TRANSACTION 'test_prepared_savepoint'
+ data
+-------------------------------------------
COMMIT PREPARED 'test_prepared_savepoint'
-(4 rows)
+(1 row)
-- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
BEGIN;
COMMIT PREPARED 'test1';
--should show the COMMIT PREPARED and the other changes in the transaction
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
- data
--------------------------------------------------------------
- BEGIN
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
- PREPARE TRANSACTION 'test1'
+ data
+-------------------------
COMMIT PREPARED 'test1'
-(23 rows)
+(1 row)
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
lsn | xid | data
-----------+-----+--------------------------------------------
- 0/1689DC0 | 529 | BEGIN 529
- 0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5'
- 0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
(4 row)
gid field, which is part of the
txn parameter, can be used in this callback to
check if the plugin has already received this PREPARE
- in which case it can skip the remaining changes of the transaction.
- This can only happen if the user restarts the decoding after receiving
- the PREPARE for a transaction but before receiving
- the COMMIT PREPARED, say because of some error.
+ in which case it can either error out or skip the remaining changes of
+ the transaction.
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
if (two_phase)
{
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+ SnapBuildInitialConsistentPoint(ctx->snapshot_builder),
commit_time, origin_id, origin_lsn,
parsed->twophase_gid, true);
}
{
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
abort_time, origin_id, origin_lsn,
+ InvalidXLogRecPtr,
parsed->twophase_gid, false);
}
else
ctx->reorder = ReorderBufferAllocate();
ctx->snapshot_builder =
AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
- need_full_snapshot);
+ need_full_snapshot, slot->data.initial_consistent_point);
ctx->reorder->private_data = ctx;
SpinLockAcquire(&slot->mutex);
slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+ slot->data.initial_consistent_point = ctx->reader->EndRecPtr;
SpinLockRelease(&slot->mutex);
}
void
ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ XLogRecPtr initial_consistent_point,
TimestampTz commit_time, RepOriginId origin_id,
XLogRecPtr origin_lsn, char *gid, bool is_commit)
{
/*
* It is possible that this transaction is not decoded at prepare time
* either because by that time we didn't have a consistent snapshot or it
- * was decoded earlier but we have restarted. We can't distinguish between
- * those two cases so we send the prepare in both the cases and let
- * downstream decide whether to process or skip it. We don't need to
- * decode the xact for aborts if it is not done already.
+ * was decoded earlier but we have restarted. We only need to send the
+ * prepare if it was not decoded earlier. We don't need to decode the xact
+ * for aborts if it is not done already.
*/
- if (!rbtxn_prepared(txn) && is_commit)
+ if ((txn->final_lsn < initial_consistent_point) && is_commit)
{
txn->txn_flags |= RBTXN_PREPARE;
*/
XLogRecPtr start_decoding_at;
+ /*
+ * LSN at which we found a consistent point at the time of slot creation.
+ * This is also the point where we have exported a snapshot for the
+ * initial copy.
+ *
+ * The prepared transactions that are not covered by initial snapshot
+ * needs to be sent later along with commit prepared and they must be
+ * before this point.
+ */
+ XLogRecPtr initial_consistent_point;
+
/*
* Don't start decoding WAL until the "xl_running_xacts" information
* indicates there are no running xids with an xid smaller than this.
AllocateSnapshotBuilder(ReorderBuffer *reorder,
TransactionId xmin_horizon,
XLogRecPtr start_lsn,
- bool need_full_snapshot)
+ bool need_full_snapshot,
+ XLogRecPtr initial_consistent_point)
{
MemoryContext context;
MemoryContext oldcontext;
builder->initial_xmin_horizon = xmin_horizon;
builder->start_decoding_at = start_lsn;
builder->building_full_snapshot = need_full_snapshot;
+ builder->initial_consistent_point = initial_consistent_point;
MemoryContextSwitchTo(oldcontext);
return builder->state;
}
+/*
+ * Return the LSN at which the snapshot was exported
+ */
+XLogRecPtr
+SnapBuildInitialConsistentPoint(SnapBuild *builder)
+{
+ return builder->initial_consistent_point;
+}
+
/*
* Should the contents of transaction ending at 'ptr' be decoded?
*/
offsetof(SnapBuildOnDisk, version)
#define SNAPBUILD_MAGIC 0x51A1E001
-#define SNAPBUILD_VERSION 3
+#define SNAPBUILD_VERSION 4
/*
* Store/Load a snapshot from disk, depending on the snapshot builder's state.
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ XLogRecPtr initial_consistent_point,
TimestampTz commit_time,
RepOriginId origin_id, XLogRecPtr origin_lsn,
char *gid, bool is_commit);
*/
XLogRecPtr confirmed_flush;
+ /*
+ * LSN at which we found a consistent point at the time of slot creation.
+ * This is also the point where we have exported a snapshot for the
+ * initial copy.
+ */
+ XLogRecPtr initial_consistent_point;
+
/* plugin name */
NameData plugin;
} ReplicationSlotPersistentData;
extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
TransactionId xmin_horizon, XLogRecPtr start_lsn,
- bool need_full_snapshot);
+ bool need_full_snapshot,
+ XLogRecPtr initial_consistent_point);
extern void FreeSnapshotBuilder(SnapBuild *cache);
extern void SnapBuildSnapDecRefcount(Snapshot snap);
TransactionId xid);
extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
+extern XLogRecPtr SnapBuildInitialConsistentPoint(SnapBuild *builder);
extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
TransactionId xid, int nsubxacts,