bool include_xids;
bool include_timestamp;
bool skip_empty_xacts;
- bool xact_wrote_changes;
bool only_local;
} TestDecodingData;
+/*
+ * Maintain the per-transaction level variables to track whether the
+ * transaction and or streams have written any changes. In streaming mode the
+ * transaction can be decoded in streams so along with maintaining whether the
+ * transaction has written any changes, we also need to track whether the
+ * current stream has written any changes. This is required so that if user
+ * has requested to skip the empty transactions we can skip the empty streams
+ * even though the transaction has written some changes.
+ */
+typedef struct
+{
+ bool xact_wrote_changes;
+ bool stream_wrote_changes;
+} TestDecodingTxnData;
+
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
bool is_init);
static void pg_decode_shutdown(LogicalDecodingContext *ctx);
pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata =
+ MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+
+ txndata->xact_wrote_changes = false;
+ txn->output_plugin_private = txndata;
- data->xact_wrote_changes = false;
if (data->skip_empty_xacts)
return;
XLogRecPtr commit_lsn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
+ bool xact_wrote_changes = txndata->xact_wrote_changes;
+
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !xact_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true);
Relation relation, ReorderBufferChange *change)
{
TestDecodingData *data;
+ TestDecodingTxnData *txndata;
Form_pg_class class_form;
TupleDesc tupdesc;
MemoryContext old;
data = ctx->output_plugin_private;
+ txndata = txn->output_plugin_private;
/* output BEGIN if we haven't yet */
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
{
pg_output_begin(ctx, data, txn, false);
}
- data->xact_wrote_changes = true;
+ txndata->xact_wrote_changes = true;
class_form = RelationGetForm(relation);
tupdesc = RelationGetDescr(relation);
int nrelations, Relation relations[], ReorderBufferChange *change)
{
TestDecodingData *data;
+ TestDecodingTxnData *txndata;
MemoryContext old;
int i;
data = ctx->output_plugin_private;
+ txndata = txn->output_plugin_private;
/* output BEGIN if we haven't yet */
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
{
pg_output_begin(ctx, data, txn, false);
}
- data->xact_wrote_changes = true;
+ txndata->xact_wrote_changes = true;
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
- data->xact_wrote_changes = false;
+ /*
+ * Allocate the txn plugin data for the first stream in the transaction.
+ */
+ if (txndata == NULL)
+ {
+ txndata =
+ MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+ txndata->xact_wrote_changes = false;
+ txn->output_plugin_private = txndata;
+ }
+
+ txndata->stream_wrote_changes = false;
if (data->skip_empty_xacts)
return;
pg_output_stream_start(ctx, data, txn, true);
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true);
{
TestDecodingData *data = ctx->output_plugin_private;
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ /*
+ * stream abort can be sent for an individual subtransaction but we
+ * maintain the output_plugin_private only under the toptxn so if this is
+ * not the toptxn then fetch the toptxn.
+ */
+ ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
+ TestDecodingTxnData *txndata = toptxn->output_plugin_private;
+ bool xact_wrote_changes = txndata->xact_wrote_changes;
+
+ if (txn->toptxn == NULL)
+ {
+ Assert(txn->output_plugin_private != NULL);
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
+ }
+
+ if (data->skip_empty_xacts && !xact_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true);
XLogRecPtr commit_lsn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
+ bool xact_wrote_changes = txndata->xact_wrote_changes;
+
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !xact_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true);
ReorderBufferChange *change)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
/* output stream start if we haven't yet */
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
{
pg_output_stream_start(ctx, data, txn, false);
}
- data->xact_wrote_changes = true;
+ txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
ReorderBufferChange *change)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
{
pg_output_stream_start(ctx, data, txn, false);
}
- data->xact_wrote_changes = true;
+ txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)