Fix 'skip-empty-xacts' option in test_decoding for streaming mode.
authorAmit Kapila
Tue, 17 Nov 2020 06:44:53 +0000 (12:14 +0530)
committerAmit Kapila
Tue, 17 Nov 2020 06:44:53 +0000 (12:14 +0530)
In streaming mode, the transaction can be decoded in multiple streams and
those streams can be interleaved with streams of other transactions. So,
we can't remember the transaction's write status in the logical decoding
context because that might get changed due to some other transactions and
lead to wrong answers for 'skip-empty-xacts' option. We decided to keep
each transaction's write status in the ReorderBufferTxn to avoid
interleaved streams changing the status of some unrelated transactions.

Diagnosed-by: Amit Kapila
Author: Dilip Kumar
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/CAA4eK1LR7=XNM_TLmpZMFuV8ZQpoxkem--NZJYf8YXmesbvwLA@mail.gmail.com

contrib/test_decoding/expected/concurrent_stream.out
contrib/test_decoding/specs/concurrent_stream.spec
contrib/test_decoding/test_decoding.c
src/backend/replication/logical/reorderbuffer.c
src/include/replication/reorderbuffer.h
src/tools/pgindent/typedefs.list

index e731d13d8fa974fe04aa05848cf61bc4dbbf5401..6f8b2176db50e32294b54b4e832cfd640bd37757 100644 (file)
@@ -1,11 +1,12 @@
-Parsed test spec with 2 sessions
+Parsed test spec with 3 sessions
 
-starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s1_commit s1_get_stream_changes
+starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s2_ddl s1_commit s1_get_stream_changes
 step s0_begin: BEGIN;
 step s0_ddl: CREATE TABLE stream_test1(data text);
 step s1_ddl: CREATE TABLE stream_test(data text);
 step s1_begin: BEGIN;
 step s1_toast_insert: INSERT INTO stream_test SELECT large_val();
+step s2_ddl: CREATE TABLE stream_test2(data text);
 step s1_commit: COMMIT;
 step s1_get_stream_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 data           
index ad9fde9c284470af317930b7fe2e77ee73c20356..54218a4b3f6551b2048a333935597317d09a754d 100644 (file)
@@ -23,9 +23,15 @@ setup { SET synchronous_commit=on; }
 step "s0_begin" { BEGIN; }
 step "s0_ddl"   {CREATE TABLE stream_test1(data text);}
 
+session "s2"
+setup { SET synchronous_commit=on; }
+step "s2_ddl"   {CREATE TABLE stream_test2(data text);}
+
 # The transaction commit for s1_ddl will add the INTERNAL_SNAPSHOT change to
 # the currently running s0_ddl and we want to test that s0_ddl should not get
-# streamed when user asked to skip-empty-xacts.
+# streamed when user asked to skip-empty-xacts. Similarly, the
+# INTERNAL_SNAPSHOT change added by s2_ddl should not change the results for
+# what gets streamed.
 session "s1"
 setup { SET synchronous_commit=on; }
 step "s1_ddl"   { CREATE TABLE stream_test(data text); }
@@ -34,4 +40,4 @@ step "s1_toast_insert" {INSERT INTO stream_test SELECT large_val();}
 step "s1_commit" { COMMIT; }
 step "s1_get_stream_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');}
 
-permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s1_commit" "s1_get_stream_changes"
+permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s2_ddl" "s1_commit" "s1_get_stream_changes"
index 8e33614f14424a101611baea09305f879a6f077c..e12278beb581702d31845e6d788a3c941f032c21 100644 (file)
@@ -34,10 +34,24 @@ typedef struct
    bool        include_xids;
    bool        include_timestamp;
    bool        skip_empty_xacts;
-   bool        xact_wrote_changes;
    bool        only_local;
 } TestDecodingData;
 
+/*
+ * Maintain the per-transaction level variables to track whether the
+ * transaction and or streams have written any changes. In streaming mode the
+ * transaction can be decoded in streams so along with maintaining whether the
+ * transaction has written any changes, we also need to track whether the
+ * current stream has written any changes. This is required so that if user
+ * has requested to skip the empty transactions we can skip the empty streams
+ * even though the transaction has written some changes.
+ */
+typedef struct
+{
+   bool        xact_wrote_changes;
+   bool        stream_wrote_changes;
+} TestDecodingTxnData;
+
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                              bool is_init);
 static void pg_decode_shutdown(LogicalDecodingContext *ctx);
@@ -255,8 +269,12 @@ static void
 pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
    TestDecodingData *data = ctx->output_plugin_private;
+   TestDecodingTxnData *txndata =
+   MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+
+   txndata->xact_wrote_changes = false;
+   txn->output_plugin_private = txndata;
 
-   data->xact_wrote_changes = false;
    if (data->skip_empty_xacts)
        return;
 
@@ -280,8 +298,13 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                     XLogRecPtr commit_lsn)
 {
    TestDecodingData *data = ctx->output_plugin_private;
+   TestDecodingTxnData *txndata = txn->output_plugin_private;
+   bool        xact_wrote_changes = txndata->xact_wrote_changes;
+
+   pfree(txndata);
+   txn->output_plugin_private = NULL;
 
-   if (data->skip_empty_xacts && !data->xact_wrote_changes)
+   if (data->skip_empty_xacts && !xact_wrote_changes)
        return;
 
    OutputPluginPrepareWrite(ctx, true);
@@ -442,18 +465,20 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                 Relation relation, ReorderBufferChange *change)
 {
    TestDecodingData *data;
+   TestDecodingTxnData *txndata;
    Form_pg_class class_form;
    TupleDesc   tupdesc;
    MemoryContext old;
 
    data = ctx->output_plugin_private;
+   txndata = txn->output_plugin_private;
 
    /* output BEGIN if we haven't yet */
-   if (data->skip_empty_xacts && !data->xact_wrote_changes)
+   if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
    {
        pg_output_begin(ctx, data, txn, false);
    }
-   data->xact_wrote_changes = true;
+   txndata->xact_wrote_changes = true;
 
    class_form = RelationGetForm(relation);
    tupdesc = RelationGetDescr(relation);
@@ -527,17 +552,19 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                   int nrelations, Relation relations[], ReorderBufferChange *change)
 {
    TestDecodingData *data;
+   TestDecodingTxnData *txndata;
    MemoryContext old;
    int         i;
 
    data = ctx->output_plugin_private;
+   txndata = txn->output_plugin_private;
 
    /* output BEGIN if we haven't yet */
-   if (data->skip_empty_xacts && !data->xact_wrote_changes)
+   if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
    {
        pg_output_begin(ctx, data, txn, false);
    }
-   data->xact_wrote_changes = true;
+   txndata->xact_wrote_changes = true;
 
    /* Avoid leaking memory by using and resetting our own context */
    old = MemoryContextSwitchTo(data->context);
@@ -592,8 +619,20 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
                       ReorderBufferTXN *txn)
 {
    TestDecodingData *data = ctx->output_plugin_private;
+   TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-   data->xact_wrote_changes = false;
+   /*
+    * Allocate the txn plugin data for the first stream in the transaction.
+    */
+   if (txndata == NULL)
+   {
+       txndata =
+           MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+       txndata->xact_wrote_changes = false;
+       txn->output_plugin_private = txndata;
+   }
+
+   txndata->stream_wrote_changes = false;
    if (data->skip_empty_xacts)
        return;
    pg_output_stream_start(ctx, data, txn, true);
@@ -615,8 +654,9 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
                      ReorderBufferTXN *txn)
 {
    TestDecodingData *data = ctx->output_plugin_private;
+   TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-   if (data->skip_empty_xacts && !data->xact_wrote_changes)
+   if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
        return;
 
    OutputPluginPrepareWrite(ctx, true);
@@ -634,7 +674,23 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
 {
    TestDecodingData *data = ctx->output_plugin_private;
 
-   if (data->skip_empty_xacts && !data->xact_wrote_changes)
+   /*
+    * stream abort can be sent for an individual subtransaction but we
+    * maintain the output_plugin_private only under the toptxn so if this is
+    * not the toptxn then fetch the toptxn.
+    */
+   ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
+   TestDecodingTxnData *txndata = toptxn->output_plugin_private;
+   bool        xact_wrote_changes = txndata->xact_wrote_changes;
+
+   if (txn->toptxn == NULL)
+   {
+       Assert(txn->output_plugin_private != NULL);
+       pfree(txndata);
+       txn->output_plugin_private = NULL;
+   }
+
+   if (data->skip_empty_xacts && !xact_wrote_changes)
        return;
 
    OutputPluginPrepareWrite(ctx, true);
@@ -651,8 +707,13 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
                        XLogRecPtr commit_lsn)
 {
    TestDecodingData *data = ctx->output_plugin_private;
+   TestDecodingTxnData *txndata = txn->output_plugin_private;
+   bool        xact_wrote_changes = txndata->xact_wrote_changes;
+
+   pfree(txndata);
+   txn->output_plugin_private = NULL;
 
-   if (data->skip_empty_xacts && !data->xact_wrote_changes)
+   if (data->skip_empty_xacts && !xact_wrote_changes)
        return;
 
    OutputPluginPrepareWrite(ctx, true);
@@ -681,13 +742,14 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
                        ReorderBufferChange *change)
 {
    TestDecodingData *data = ctx->output_plugin_private;
+   TestDecodingTxnData *txndata = txn->output_plugin_private;
 
    /* output stream start if we haven't yet */
-   if (data->skip_empty_xacts && !data->xact_wrote_changes)
+   if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
    {
        pg_output_stream_start(ctx, data, txn, false);
    }
-   data->xact_wrote_changes = true;
+   txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
 
    OutputPluginPrepareWrite(ctx, true);
    if (data->include_xids)
@@ -734,12 +796,13 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                          ReorderBufferChange *change)
 {
    TestDecodingData *data = ctx->output_plugin_private;
+   TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-   if (data->skip_empty_xacts && !data->xact_wrote_changes)
+   if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
    {
        pg_output_stream_start(ctx, data, txn, false);
    }
-   data->xact_wrote_changes = true;
+   txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
 
    OutputPluginPrepareWrite(ctx, true);
    if (data->include_xids)
index c1bd68011c5935cfbd68a8ba50829a8c67c785e1..301baff24466766a7cf072c704aad6e907433c25 100644 (file)
@@ -402,6 +402,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
 
    /* InvalidCommandId is not zero, so set it explicitly */
    txn->command_id = InvalidCommandId;
+   txn->output_plugin_private = NULL;
 
    return txn;
 }
index dfdda938b2a9c7a451febbc83b895e49bbb0868b..bd9dd7ec6764f34ce8ba6e3a796764222d60f68c 100644 (file)
@@ -378,6 +378,11 @@ typedef struct ReorderBufferTXN
 
    /* If we have detected concurrent abort then ignore future changes. */
    bool        concurrent_abort;
+
+   /*
+    * Private data pointer of the output plugin.
+    */
+   void       *output_plugin_private;
 } ReorderBufferTXN;
 
 /* so we can define the callbacks used inside struct ReorderBuffer itself */
index b146b3ea73dde6a1f01795ca8cfef62f5240c36f..fde701bfd4d16059dc070f0e3809f89b515f7c49 100644 (file)
@@ -2505,6 +2505,7 @@ Tcl_Obj
 Tcl_Time
 TempNamespaceStatus
 TestDecodingData
+TestDecodingTxnData
 TestSpec
 TextFreq
 TextPositionState