Extend the output plugin API to allow decoding of prepared xacts.
authorAmit Kapila
Wed, 30 Dec 2020 10:47:26 +0000 (16:17 +0530)
committerAmit Kapila
Wed, 30 Dec 2020 10:47:26 +0000 (16:17 +0530)
This adds six methods to the output plugin API, adding support for
streaming changes of two-phase transactions at prepare time.

* begin_prepare
* filter_prepare
* prepare
* commit_prepared
* rollback_prepared
* stream_prepare

Most of this is a simple extension of the existing methods, with the
semantic difference that the transaction is not yet committed and maybe
aborted later.

Until now two-phase transactions were translated into regular transactions
on the subscriber, and the GID was not forwarded to it. None of the
two-phase commands were communicated to the subscriber.

This patch provides the infrastructure for logical decoding plugins to be
informed of two-phase commands Like PREPARE TRANSACTION, COMMIT PREPARED
and ROLLBACK PREPARED commands with the corresponding GID.

This also extends the 'test_decoding' plugin, implementing these new
methods.

This commit simply adds these new APIs and the upcoming patch to "allow
the decoding at prepare time in ReorderBuffer" will use these APIs.

Author: Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich
Reviewed-by: Amit Kapila, Peter Smith, Sawada Masahiko, and Dilip Kumar
Discussion:
https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru
https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com

contrib/test_decoding/test_decoding.c
doc/src/sgml/logicaldecoding.sgml
src/backend/replication/logical/logical.c
src/include/replication/logical.h
src/include/replication/output_plugin.h
src/include/replication/reorderbuffer.h
src/tools/pgindent/typedefs.list

index e12278beb581702d31845e6d788a3c941f032c21..05763553a40eecb80ea3858e3b73379911168457 100644 (file)
@@ -76,6 +76,20 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
                              ReorderBufferTXN *txn, XLogRecPtr message_lsn,
                              bool transactional, const char *prefix,
                              Size sz, const char *message);
+static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+                                    const char *gid);
+static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
+                                       ReorderBufferTXN *txn);
+static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
+                                 ReorderBufferTXN *txn,
+                                 XLogRecPtr prepare_lsn);
+static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
+                                         ReorderBufferTXN *txn,
+                                         XLogRecPtr commit_lsn);
+static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
+                                           ReorderBufferTXN *txn,
+                                           XLogRecPtr prepare_end_lsn,
+                                           TimestampTz prepare_time);
 static void pg_decode_stream_start(LogicalDecodingContext *ctx,
                                   ReorderBufferTXN *txn);
 static void pg_output_stream_start(LogicalDecodingContext *ctx,
@@ -87,6 +101,9 @@ static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
 static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
                                   ReorderBufferTXN *txn,
                                   XLogRecPtr abort_lsn);
+static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
+                                    ReorderBufferTXN *txn,
+                                    XLogRecPtr prepare_lsn);
 static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
                                    ReorderBufferTXN *txn,
                                    XLogRecPtr commit_lsn);
@@ -123,9 +140,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
    cb->filter_by_origin_cb = pg_decode_filter;
    cb->shutdown_cb = pg_decode_shutdown;
    cb->message_cb = pg_decode_message;
+   cb->filter_prepare_cb = pg_decode_filter_prepare;
+   cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
+   cb->prepare_cb = pg_decode_prepare_txn;
+   cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
+   cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
    cb->stream_start_cb = pg_decode_stream_start;
    cb->stream_stop_cb = pg_decode_stream_stop;
    cb->stream_abort_cb = pg_decode_stream_abort;
+   cb->stream_prepare_cb = pg_decode_stream_prepare;
    cb->stream_commit_cb = pg_decode_stream_commit;
    cb->stream_change_cb = pg_decode_stream_change;
    cb->stream_message_cb = pg_decode_stream_message;
@@ -141,6 +164,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
    ListCell   *option;
    TestDecodingData *data;
    bool        enable_streaming = false;
+   bool        enable_twophase = false;
 
    data = palloc0(sizeof(TestDecodingData));
    data->context = AllocSetContextCreate(ctx->context,
@@ -241,6 +265,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                         errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                strVal(elem->arg), elem->defname)));
        }
+       else if (strcmp(elem->defname, "two-phase-commit") == 0)
+       {
+           if (elem->arg == NULL)
+               continue;
+           else if (!parse_bool(strVal(elem->arg), &enable_twophase))
+               ereport(ERROR,
+                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                        errmsg("could not parse value \"%s\" for parameter \"%s\"",
+                               strVal(elem->arg), elem->defname)));
+       }
        else
        {
            ereport(ERROR,
@@ -252,6 +286,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
    }
 
    ctx->streaming &= enable_streaming;
+   ctx->twophase &= enable_twophase;
 }
 
 /* cleanup this plugin's resources */
@@ -320,6 +355,111 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    OutputPluginWrite(ctx, true);
 }
 
+/* BEGIN PREPARE callback */
+static void
+pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
+   TestDecodingData *data = ctx->output_plugin_private;
+   TestDecodingTxnData *txndata =
+   MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+
+   txndata->xact_wrote_changes = false;
+   txn->output_plugin_private = txndata;
+
+   if (data->skip_empty_xacts)
+       return;
+
+   pg_output_begin(ctx, data, txn, true);
+}
+
+/* PREPARE callback */
+static void
+pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+                     XLogRecPtr prepare_lsn)
+{
+   TestDecodingData *data = ctx->output_plugin_private;
+   TestDecodingTxnData *txndata = txn->output_plugin_private;
+
+   if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
+       return;
+
+   OutputPluginPrepareWrite(ctx, true);
+
+   appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
+                    quote_literal_cstr(txn->gid));
+
+   if (data->include_xids)
+       appendStringInfo(ctx->out, ", txid %u", txn->xid);
+
+   if (data->include_timestamp)
+       appendStringInfo(ctx->out, " (at %s)",
+                        timestamptz_to_str(txn->commit_time));
+
+   OutputPluginWrite(ctx, true);
+}
+
+/* COMMIT PREPARED callback */
+static void
+pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+                             XLogRecPtr commit_lsn)
+{
+   TestDecodingData *data = ctx->output_plugin_private;
+
+   OutputPluginPrepareWrite(ctx, true);
+
+   appendStringInfo(ctx->out, "COMMIT PREPARED %s",
+                    quote_literal_cstr(txn->gid));
+
+   if (data->include_xids)
+       appendStringInfo(ctx->out, ", txid %u", txn->xid);
+
+   if (data->include_timestamp)
+       appendStringInfo(ctx->out, " (at %s)",
+                        timestamptz_to_str(txn->commit_time));
+
+   OutputPluginWrite(ctx, true);
+}
+
+/* ROLLBACK PREPARED callback */
+static void
+pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
+                               ReorderBufferTXN *txn,
+                               XLogRecPtr prepare_end_lsn,
+                               TimestampTz prepare_time)
+{
+   TestDecodingData *data = ctx->output_plugin_private;
+
+   OutputPluginPrepareWrite(ctx, true);
+
+   appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
+                    quote_literal_cstr(txn->gid));
+
+   if (data->include_xids)
+       appendStringInfo(ctx->out, ", txid %u", txn->xid);
+
+   if (data->include_timestamp)
+       appendStringInfo(ctx->out, " (at %s)",
+                        timestamptz_to_str(txn->commit_time));
+
+   OutputPluginWrite(ctx, true);
+}
+
+/*
+ * Filter out two-phase transactions.
+ *
+ * Each plugin can implement its own filtering logic. Here we demonstrate a
+ * simple logic by checking the GID. If the GID contains the "_nodecode"
+ * substring, then we filter it out.
+ */
+static bool
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+{
+   if (strstr(gid, "_nodecode") != NULL)
+       return true;
+
+   return false;
+}
+
 static bool
 pg_decode_filter(LogicalDecodingContext *ctx,
                 RepOriginId origin_id)
@@ -701,6 +841,33 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
    OutputPluginWrite(ctx, true);
 }
 
+static void
+pg_decode_stream_prepare(LogicalDecodingContext *ctx,
+                        ReorderBufferTXN *txn,
+                        XLogRecPtr prepare_lsn)
+{
+   TestDecodingData *data = ctx->output_plugin_private;
+   TestDecodingTxnData *txndata = txn->output_plugin_private;
+
+   if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
+       return;
+
+   OutputPluginPrepareWrite(ctx, true);
+
+   if (data->include_xids)
+       appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
+                        quote_literal_cstr(txn->gid), txn->xid);
+   else
+       appendStringInfo(ctx->out, "preparing streamed transaction %s",
+                        quote_literal_cstr(txn->gid));
+
+   if (data->include_timestamp)
+       appendStringInfo(ctx->out, " (at %s)",
+                        timestamptz_to_str(txn->commit_time));
+
+   OutputPluginWrite(ctx, true);
+}
+
 static void
 pg_decode_stream_commit(LogicalDecodingContext *ctx,
                        ReorderBufferTXN *txn,
index ca78a81e9c545a42e0d375ba55bca8fdc5ee18a2..d63f90ff282b8a67b0b672afbe40e9b68ad68d0d 100644 (file)
@@ -389,9 +389,15 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeMessageCB message_cb;
     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
     LogicalDecodeShutdownCB shutdown_cb;
+    LogicalDecodeFilterPrepareCB filter_prepare_cb;
+    LogicalDecodeBeginPrepareCB begin_prepare_cb;
+    LogicalDecodePrepareCB prepare_cb;
+    LogicalDecodeCommitPreparedCB commit_prepared_cb;
+    LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
     LogicalDecodeStreamStartCB stream_start_cb;
     LogicalDecodeStreamStopCB stream_stop_cb;
     LogicalDecodeStreamAbortCB stream_abort_cb;
+    LogicalDecodeStreamPrepareCB stream_prepare_cb;
     LogicalDecodeStreamCommitCB stream_commit_cb;
     LogicalDecodeStreamChangeCB stream_change_cb;
     LogicalDecodeStreamMessageCB stream_message_cb;
@@ -413,10 +419,20 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
      An output plugin may also define functions to support streaming of large,
      in-progress transactions. The stream_start_cb,
      stream_stop_cbstream_abort_cb,
-     stream_commit_cb and stream_change_cb
+     stream_commit_cbstream_change_cb,
+     and stream_prepare_cb
      are required, while stream_message_cb and
      stream_truncate_cb are optional.
     
+
+    
+    An output plugin may also define functions to support two-phase commits,
+    which allows actions to be decoded on the PREPARE TRANSACTION.
+    The begin_prepare_cbprepare_cb
+    stream_prepare_cb,
+    commit_prepared_cb and rollback_prepared_cb
+    callbacks are required, while filter_prepare_cb is optional.
+    
    
 
    
@@ -477,7 +493,15 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
      never get
      decoded. Successful savepoints are
      folded into the transaction containing them in the order they were
-     executed within that transaction.
+     executed within that transaction. A transaction that is prepared for
+     a two-phase commit using PREPARE TRANSACTION will
+     also be decoded if the output plugin callbacks needed for decoding
+     them are provided. It is possible that the current transaction which
+     is being decoded is aborted concurrently via a ROLLBACK PREPARED
+     command. In that case, the logical decoding of this transaction will
+     be aborted too. We will skip all the changes of such a transaction once
+     the abort is detected and abort the transaction when we read WAL for
+     ROLLBACK PREPARED.
     
 
     
@@ -587,7 +611,13 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
       an INSERTUPDATE,
       or DELETE. Even if the original command modified
       several rows at once the callback will be called individually for each
-      row.
+      row. The change_cb callback may access system or
+      user catalog tables to aid in the process of outputting the row
+      modification details. In case of decoding a prepared (but yet
+      uncommitted) transaction or decoding of an uncommitted transaction, this
+      change callback might also error out due to simultaneous rollback of
+      this very same transaction. In that case, the logical decoding of this
+      aborted transaction is stopped gracefully.
 
 typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
@@ -685,7 +715,13 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
       non-transactional and the XID was not assigned yet in the transaction
       which logged the message. The lsn has WAL
       location of the message. The transactional says
-      if the message was sent as transactional or not.
+      if the message was sent as transactional or not. Similar to the change
+      callback, in case of decoding a prepared (but yet uncommitted)
+      transaction or decoding of an uncommitted transaction, this message
+      callback might also error out due to simultaneous rollback of
+      this very same transaction. In that case, the logical decoding of this
+      aborted transaction is stopped gracefully.
+
       The prefix is arbitrary null-terminated prefix
       which can be used for identifying interesting messages for the current
       plugin. And finally the message parameter holds
@@ -698,6 +734,111 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
      
     
 
+    
+     Prepare Filter Callback
+
+     
+       The optional filter_prepare_cb callback
+       is called to determine whether data that is part of the current
+       two-phase commit transaction should be considered for decode
+       at this prepare stage or as a regular one-phase transaction at
+       COMMIT PREPARED time later. To signal that
+       decoding should be skipped, return true;
+       false otherwise. When the callback is not
+       defined, false is assumed (i.e. nothing is
+       filtered).
+
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+                                              const char *gid);
+
+      The ctx parameter has the same contents as for the
+      other callbacks. The gid is the identifier that later
+      identifies this transaction for COMMIT PREPARED or
+      ROLLBACK PREPARED.
+     
+     
+      The callback has to provide the same static answer for a given
+      gid every time it is called.
+     
+     
+
+    
+     Transaction Begin Prepare Callback
+
+     
+      The required begin_prepare_cb callback is called
+      whenever the start of a prepared transaction has been decoded. The
+      gid field, which is part of the
+      txn parameter can be used in this callback to
+      check if the plugin has already received this prepare in which case it
+      can skip the remaining changes of the transaction. This can only happen
+      if the user restarts the decoding after receiving the prepare for a
+      transaction but before receiving the commit prepared say because of some
+      error.
+      
+       typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
+                                                    ReorderBufferTXN *txn);
+      
+     
+    
+
+    
+     Transaction Prepare Callback
+
+     
+      The required prepare_cb callback is called whenever
+      a transaction which is prepared for two-phase commit has been
+      decoded. The change_cb callback for all modified
+      rows will have been called before this, if there have been any modified
+      rows. The gid field, which is part of the
+      txn parameter can be used in this callback.
+      
+       typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+                                               ReorderBufferTXN *txn,
+                                               XLogRecPtr prepare_lsn);
+      
+     
+    
+
+    
+     Transaction Commit Prepared Callback
+
+     
+      The required commit_prepared_cb callback is called
+      whenever a transaction commit prepared has been decoded. The
+      gid field, which is part of the
+      txn parameter can be used in this callback.
+      
+       typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+                                                      ReorderBufferTXN *txn,
+                                                      XLogRecPtr commit_lsn);
+      
+     
+    
+
+    
+     Transaction Rollback Prepared Callback
+
+     
+      The required rollback_prepared_cb callback is called
+      whenever a transaction rollback prepared has been decoded. The
+      gid field, which is part of the
+      txn parameter can be used in this callback. The
+      parameters prepare_end_lsn and
+      prepare_time can be used to check if the plugin
+      has received this prepare transaction in which case it can apply the
+      rollback, otherwise, it can skip the rollback operation. The
+      gid alone is not sufficient because the downstream
+      node can have prepared transaction with same identifier.
+      
+       typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
+                                                        ReorderBufferTXN *txn,
+                                                        XLogRecPtr preapre_end_lsn,
+                                                        TimestampTz prepare_time);
+      
+     
+    
+
     
      Stream Start Callback
      
@@ -735,6 +876,19 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
      
     
 
+    
+     Stream Prepare Callback
+     
+      The stream_prepare_cb callback is called to prepare
+      a previously streamed transaction as part of a two-phase commit.
+
+typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
+                                              ReorderBufferTXN *txn,
+                                              XLogRecPtr prepare_lsn);
+
+     
+    
+
     
      Stream Commit Callback
      
@@ -913,9 +1067,13 @@ OutputPluginWrite(ctx, true);
     When streaming an in-progress transaction, the changes (and messages) are
     streamed in blocks demarcated by stream_start_cb
     and stream_stop_cb callbacks. Once all the decoded
-    changes are transmitted, the transaction is committed using the
-    stream_commit_cb callback (or possibly aborted using
-    the stream_abort_cb callback).
+    changes are transmitted, the transaction can be committed using the
+    the stream_commit_cb callback
+    (or possibly aborted using the stream_abort_cb callback).
+    If two-phase commits are supported, the transaction can be prepared using the
+    stream_prepare_cb callback, commit prepared using the
+    commit_prepared_cb callback or aborted using the
+    rollback_prepared_cb.
    
 
    
index f1f4df7d70f286ad4d4a06efda0a82fbd133a7a8..6e3de92a67c54f0c460d46b2134eeaced171594a 100644 (file)
@@ -59,6 +59,13 @@ static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
 static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
 static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                              XLogRecPtr commit_lsn);
+static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
+static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                              XLogRecPtr prepare_lsn);
+static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                                      XLogRecPtr commit_lsn);
+static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                                        XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                              Relation relation, ReorderBufferChange *change);
 static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -74,6 +81,8 @@ static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                                   XLogRecPtr last_lsn);
 static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                                    XLogRecPtr abort_lsn);
+static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                                     XLogRecPtr prepare_lsn);
 static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                                     XLogRecPtr commit_lsn);
 static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -237,11 +246,37 @@ StartupDecodingContext(List *output_plugin_options,
    ctx->reorder->stream_start = stream_start_cb_wrapper;
    ctx->reorder->stream_stop = stream_stop_cb_wrapper;
    ctx->reorder->stream_abort = stream_abort_cb_wrapper;
+   ctx->reorder->stream_prepare = stream_prepare_cb_wrapper;
    ctx->reorder->stream_commit = stream_commit_cb_wrapper;
    ctx->reorder->stream_change = stream_change_cb_wrapper;
    ctx->reorder->stream_message = stream_message_cb_wrapper;
    ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
 
+
+   /*
+    * To support two-phase logical decoding, we require
+    * begin_prepare/prepare/commit-prepare/abort-prepare callbacks. The
+    * filter_prepare callback is optional. We however enable two-phase
+    * logical decoding when at least one of the methods is enabled so that we
+    * can easily identify missing methods.
+    *
+    * We decide it here, but only check it later in the wrappers.
+    */
+   ctx->twophase = (ctx->callbacks.begin_prepare_cb != NULL) ||
+       (ctx->callbacks.prepare_cb != NULL) ||
+       (ctx->callbacks.commit_prepared_cb != NULL) ||
+       (ctx->callbacks.rollback_prepared_cb != NULL) ||
+       (ctx->callbacks.stream_prepare_cb != NULL) ||
+       (ctx->callbacks.filter_prepare_cb != NULL);
+
+   /*
+    * Callback to support decoding at prepare time.
+    */
+   ctx->reorder->begin_prepare = begin_prepare_cb_wrapper;
+   ctx->reorder->prepare = prepare_cb_wrapper;
+   ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
+   ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
+
    ctx->out = makeStringInfo();
    ctx->prepare_write = prepare_write;
    ctx->write = do_write;
@@ -782,6 +817,186 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    error_context_stack = errcallback.previous;
 }
 
+/*
+ * The functionality of begin_prepare is quite similar to begin with the
+ * exception that this will have gid (global transaction id) information which
+ * can be used by plugin. Now, we thought about extending the existing begin
+ * but that would break the replication protocol and additionally this looks
+ * cleaner.
+ */
+static void
+begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
+{
+   LogicalDecodingContext *ctx = cache->private_data;
+   LogicalErrorCallbackState state;
+   ErrorContextCallback errcallback;
+
+   Assert(!ctx->fast_forward);
+
+   /* We're only supposed to call this when two-phase commits are supported */
+   Assert(ctx->twophase);
+
+   /* Push callback + info on the error context stack */
+   state.ctx = ctx;
+   state.callback_name = "begin_prepare";
+   state.report_location = txn->first_lsn;
+   errcallback.callback = output_plugin_error_callback;
+   errcallback.arg = (void *) &state;
+   errcallback.previous = error_context_stack;
+   error_context_stack = &errcallback;
+
+   /* set output state */
+   ctx->accept_writes = true;
+   ctx->write_xid = txn->xid;
+   ctx->write_location = txn->first_lsn;
+
+   /*
+    * If the plugin supports two-phase commits then begin prepare callback is
+    * mandatory
+    */
+   if (ctx->callbacks.begin_prepare_cb == NULL)
+       ereport(ERROR,
+               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                errmsg("logical replication at prepare time requires begin_prepare_cb callback")));
+
+   /* do the actual work: call callback */
+   ctx->callbacks.begin_prepare_cb(ctx, txn);
+
+   /* Pop the error context stack */
+   error_context_stack = errcallback.previous;
+}
+
+static void
+prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                  XLogRecPtr prepare_lsn)
+{
+   LogicalDecodingContext *ctx = cache->private_data;
+   LogicalErrorCallbackState state;
+   ErrorContextCallback errcallback;
+
+   Assert(!ctx->fast_forward);
+
+   /* We're only supposed to call this when two-phase commits are supported */
+   Assert(ctx->twophase);
+
+   /* Push callback + info on the error context stack */
+   state.ctx = ctx;
+   state.callback_name = "prepare";
+   state.report_location = txn->final_lsn; /* beginning of prepare record */
+   errcallback.callback = output_plugin_error_callback;
+   errcallback.arg = (void *) &state;
+   errcallback.previous = error_context_stack;
+   error_context_stack = &errcallback;
+
+   /* set output state */
+   ctx->accept_writes = true;
+   ctx->write_xid = txn->xid;
+   ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+   /*
+    * If the plugin supports two-phase commits then prepare callback is
+    * mandatory
+    */
+   if (ctx->callbacks.prepare_cb == NULL)
+       ereport(ERROR,
+               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                errmsg("logical replication at prepare time requires prepare_cb callback")));
+
+   /* do the actual work: call callback */
+   ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
+
+   /* Pop the error context stack */
+   error_context_stack = errcallback.previous;
+}
+
+static void
+commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                          XLogRecPtr commit_lsn)
+{
+   LogicalDecodingContext *ctx = cache->private_data;
+   LogicalErrorCallbackState state;
+   ErrorContextCallback errcallback;
+
+   Assert(!ctx->fast_forward);
+
+   /* We're only supposed to call this when two-phase commits are supported */
+   Assert(ctx->twophase);
+
+   /* Push callback + info on the error context stack */
+   state.ctx = ctx;
+   state.callback_name = "commit_prepared";
+   state.report_location = txn->final_lsn; /* beginning of commit record */
+   errcallback.callback = output_plugin_error_callback;
+   errcallback.arg = (void *) &state;
+   errcallback.previous = error_context_stack;
+   error_context_stack = &errcallback;
+
+   /* set output state */
+   ctx->accept_writes = true;
+   ctx->write_xid = txn->xid;
+   ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+   /*
+    * If the plugin support two-phase commits then commit prepared callback
+    * is mandatory
+    */
+   if (ctx->callbacks.commit_prepared_cb == NULL)
+       ereport(ERROR,
+               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                errmsg("logical replication at prepare time requires commit_prepared_cb callback")));
+
+   /* do the actual work: call callback */
+   ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
+
+   /* Pop the error context stack */
+   error_context_stack = errcallback.previous;
+}
+
+static void
+rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                            XLogRecPtr prepare_end_lsn,
+                            TimestampTz prepare_time)
+{
+   LogicalDecodingContext *ctx = cache->private_data;
+   LogicalErrorCallbackState state;
+   ErrorContextCallback errcallback;
+
+   Assert(!ctx->fast_forward);
+
+   /* We're only supposed to call this when two-phase commits are supported */
+   Assert(ctx->twophase);
+
+   /* Push callback + info on the error context stack */
+   state.ctx = ctx;
+   state.callback_name = "rollback_prepared";
+   state.report_location = txn->final_lsn; /* beginning of commit record */
+   errcallback.callback = output_plugin_error_callback;
+   errcallback.arg = (void *) &state;
+   errcallback.previous = error_context_stack;
+   error_context_stack = &errcallback;
+
+   /* set output state */
+   ctx->accept_writes = true;
+   ctx->write_xid = txn->xid;
+   ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+   /*
+    * If the plugin support two-phase commits then rollback prepared callback
+    * is mandatory
+    */
+   if (ctx->callbacks.rollback_prepared_cb == NULL)
+       ereport(ERROR,
+               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                errmsg("logical replication at prepare time requires rollback_prepared_cb callback")));
+
+   /* do the actual work: call callback */
+   ctx->callbacks.rollback_prepared_cb(ctx, txn, prepare_end_lsn,
+                                       prepare_time);
+
+   /* Pop the error context stack */
+   error_context_stack = errcallback.previous;
+}
+
 static void
 change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                  Relation relation, ReorderBufferChange *change)
@@ -859,6 +1074,45 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    error_context_stack = errcallback.previous;
 }
 
+bool
+filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
+{
+   LogicalErrorCallbackState state;
+   ErrorContextCallback errcallback;
+   bool        ret;
+
+   Assert(!ctx->fast_forward);
+
+   /*
+    * Skip if decoding of two-phase transactions at PREPARE time is not
+    * enabled. In that case, all two-phase transactions are considered
+    * filtered out and will be applied as regular transactions at COMMIT
+    * PREPARED.
+    */
+   if (!ctx->twophase)
+       return true;
+
+   /* Push callback + info on the error context stack */
+   state.ctx = ctx;
+   state.callback_name = "filter_prepare";
+   state.report_location = InvalidXLogRecPtr;
+   errcallback.callback = output_plugin_error_callback;
+   errcallback.arg = (void *) &state;
+   errcallback.previous = error_context_stack;
+   error_context_stack = &errcallback;
+
+   /* set output state */
+   ctx->accept_writes = false;
+
+   /* do the actual work: call callback */
+   ret = ctx->callbacks.filter_prepare_cb(ctx, gid);
+
+   /* Pop the error context stack */
+   error_context_stack = errcallback.previous;
+
+   return ret;
+}
+
 bool
 filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
@@ -1056,6 +1310,49 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    error_context_stack = errcallback.previous;
 }
 
+static void
+stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                         XLogRecPtr prepare_lsn)
+{
+   LogicalDecodingContext *ctx = cache->private_data;
+   LogicalErrorCallbackState state;
+   ErrorContextCallback errcallback;
+
+   Assert(!ctx->fast_forward);
+
+   /*
+    * We're only supposed to call this when streaming and two-phase commits
+    * are supported.
+    */
+   Assert(ctx->streaming);
+   Assert(ctx->twophase);
+
+   /* Push callback + info on the error context stack */
+   state.ctx = ctx;
+   state.callback_name = "stream_prepare";
+   state.report_location = txn->final_lsn;
+   errcallback.callback = output_plugin_error_callback;
+   errcallback.arg = (void *) &state;
+   errcallback.previous = error_context_stack;
+   error_context_stack = &errcallback;
+
+   /* set output state */
+   ctx->accept_writes = true;
+   ctx->write_xid = txn->xid;
+   ctx->write_location = txn->end_lsn;
+
+   /* in streaming mode with two-phase commits, stream_prepare_cb is required */
+   if (ctx->callbacks.stream_prepare_cb == NULL)
+       ereport(ERROR,
+               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                errmsg("logical streaming at prepare time requires a stream_prepare_cb callback")));
+
+   ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn);
+
+   /* Pop the error context stack */
+   error_context_stack = errcallback.previous;
+}
+
 static void
 stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                         XLogRecPtr commit_lsn)
index 40bab7ee02df48ed4d203af1b61f1f23c10fbf68..28c9c1f474eb3cf03e5df07eba47dea359955d22 100644 (file)
@@ -84,6 +84,11 @@ typedef struct LogicalDecodingContext
     */
    bool        streaming;
 
+   /*
+    * Does the output plugin support two-phase decoding, and is it enabled?
+    */
+   bool        twophase;
+
    /*
     * State for writing output.
     */
@@ -120,6 +125,7 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
                                                  XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
+extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
index b78c796450a18330eda89a1bde95396670ebb9b4..89e1dc3517d66b134b3c2cf3b6680c9627fa7b9c 100644 (file)
@@ -99,6 +99,45 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
  */
 typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
 
+/*
+ * Called before decoding of PREPARE record to decide whether this
+ * transaction should be decoded with separate calls to prepare and
+ * commit_prepared/rollback_prepared callbacks or wait till COMMIT PREPARED
+ * and sent as usual transaction.
+ */
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+                                             const char *gid);
+
+/*
+ * Callback called for every BEGIN of a prepared trnsaction.
+ */
+typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
+                                            ReorderBufferTXN *txn);
+
+/*
+ * Called for PREPARE record unless it was filtered by filter_prepare()
+ * callback.
+ */
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+                                       ReorderBufferTXN *txn,
+                                       XLogRecPtr prepare_lsn);
+
+/*
+ * Called for COMMIT PREPARED.
+ */
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+                                              ReorderBufferTXN *txn,
+                                              XLogRecPtr commit_lsn);
+
+/*
+ * Called for ROLLBACK PREPARED.
+ */
+typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
+                                                ReorderBufferTXN *txn,
+                                                XLogRecPtr prepare_end_lsn,
+                                                TimestampTz prepare_time);
+
+
 /*
  * Called when starting to stream a block of changes from in-progress
  * transaction (may be called repeatedly, if it's streamed in multiple
@@ -123,6 +162,14 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
                                            ReorderBufferTXN *txn,
                                            XLogRecPtr abort_lsn);
 
+/*
+ * Called to prepare changes streamed to remote node from in-progress
+ * transaction. This is called as part of a two-phase commit.
+ */
+typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
+                                             ReorderBufferTXN *txn,
+                                             XLogRecPtr prepare_lsn);
+
 /*
  * Called to apply changes streamed to remote node from in-progress
  * transaction.
@@ -173,10 +220,19 @@ typedef struct OutputPluginCallbacks
    LogicalDecodeMessageCB message_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
    LogicalDecodeShutdownCB shutdown_cb;
+
+   /* streaming of changes at prepare time */
+   LogicalDecodeFilterPrepareCB filter_prepare_cb;
+   LogicalDecodeBeginPrepareCB begin_prepare_cb;
+   LogicalDecodePrepareCB prepare_cb;
+   LogicalDecodeCommitPreparedCB commit_prepared_cb;
+   LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
+
    /* streaming of changes */
    LogicalDecodeStreamStartCB stream_start_cb;
    LogicalDecodeStreamStopCB stream_stop_cb;
    LogicalDecodeStreamAbortCB stream_abort_cb;
+   LogicalDecodeStreamPrepareCB stream_prepare_cb;
    LogicalDecodeStreamCommitCB stream_commit_cb;
    LogicalDecodeStreamChangeCB stream_change_cb;
    LogicalDecodeStreamMessageCB stream_message_cb;
index bd9dd7ec6764f34ce8ba6e3a796764222d60f68c..1e60afe70f4a07aa268579e27aea60045190ae19 100644 (file)
@@ -244,6 +244,12 @@ typedef struct ReorderBufferTXN
    /* Xid of top-level transaction, if known */
    TransactionId toplevel_xid;
 
+   /*
+    * Global transaction id required for identification of prepared
+    * transactions.
+    */
+   char       *gid;
+
    /*
     * LSN of the first data carrying, WAL record with knowledge about this
     * xid. This is allowed to *not* be first record adorned with this xid, if
@@ -418,6 +424,26 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
                                        const char *prefix, Size sz,
                                        const char *message);
 
+/* begin prepare callback signature */
+typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
+                                            ReorderBufferTXN *txn);
+
+/* prepare callback signature */
+typedef void (*ReorderBufferPrepareCB) (ReorderBuffer *rb,
+                                       ReorderBufferTXN *txn,
+                                       XLogRecPtr prepare_lsn);
+
+/* commit prepared callback signature */
+typedef void (*ReorderBufferCommitPreparedCB) (ReorderBuffer *rb,
+                                              ReorderBufferTXN *txn,
+                                              XLogRecPtr commit_lsn);
+
+/* rollback  prepared callback signature */
+typedef void (*ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb,
+                                                ReorderBufferTXN *txn,
+                                                XLogRecPtr prepare_end_lsn,
+                                                TimestampTz prepare_time);
+
 /* start streaming transaction callback signature */
 typedef void (*ReorderBufferStreamStartCB) (
                                            ReorderBuffer *rb,
@@ -436,6 +462,12 @@ typedef void (*ReorderBufferStreamAbortCB) (
                                            ReorderBufferTXN *txn,
                                            XLogRecPtr abort_lsn);
 
+/* prepare streamed transaction callback signature */
+typedef void (*ReorderBufferStreamPrepareCB) (
+                                             ReorderBuffer *rb,
+                                             ReorderBufferTXN *txn,
+                                             XLogRecPtr prepare_lsn);
+
 /* commit streamed transaction callback signature */
 typedef void (*ReorderBufferStreamCommitCB) (
                                             ReorderBuffer *rb,
@@ -504,12 +536,21 @@ struct ReorderBuffer
    ReorderBufferCommitCB commit;
    ReorderBufferMessageCB message;
 
+   /*
+    * Callbacks to be called when streaming a transaction at prepare time.
+    */
+   ReorderBufferBeginCB begin_prepare;
+   ReorderBufferPrepareCB prepare;
+   ReorderBufferCommitPreparedCB commit_prepared;
+   ReorderBufferRollbackPreparedCB rollback_prepared;
+
    /*
     * Callbacks to be called when streaming a transaction.
     */
    ReorderBufferStreamStartCB stream_start;
    ReorderBufferStreamStopCB stream_stop;
    ReorderBufferStreamAbortCB stream_abort;
+   ReorderBufferStreamPrepareCB stream_prepare;
    ReorderBufferStreamCommitCB stream_commit;
    ReorderBufferStreamChangeCB stream_change;
    ReorderBufferStreamMessageCB stream_message;
index bca37c536eef1d88db88bd69d25116df284370db..9cd047ba25ea688fa41df6d266a23783f9642b20 100644 (file)
@@ -1315,9 +1315,21 @@ LogStmtLevel
 LogicalDecodeBeginCB
 LogicalDecodeChangeCB
 LogicalDecodeCommitCB
+LogicalDecodeFilterPrepareCB
+LogicalDecodeBeginPrepareCB
+LogicalDecodePrepareCB
+LogicalDecodeCommitPreparedCB
+LogicalDecodeRollbackPreparedCB
 LogicalDecodeFilterByOriginCB
 LogicalDecodeMessageCB
 LogicalDecodeShutdownCB
+LogicalDecodeStreamStartCB
+LogicalDecodeStreamStopCB
+LogicalDecodeStreamAbortCB
+LogicalDecodeStreamPrepareCB
+LogicalDecodeStreamCommitCB
+LogicalDecodeStreamChangeCB
+LogicalDecodeStreamMessageCB
 LogicalDecodeStartupCB
 LogicalDecodeTruncateCB
 LogicalDecodingContext