Lag tracking for logical replication
author: Simon Riggs
Fri, 12 May 2017 09:50:56 +0000 (10:50 +0100)
committer: Simon Riggs
Fri, 12 May 2017 09:50:56 +0000 (10:50 +0100)
Lag tracking is called for each commit, but we introduce
a pacing delay to ensure we don't swamp the lag tracker.

Author: Petr Jelinek, with minor pacing delay code from me

src/backend/replication/logical/logical.c
src/backend/replication/logical/logicalfuncs.c
src/backend/replication/pgoutput/pgoutput.c
src/backend/replication/slotfuncs.c
src/backend/replication/walsender.c
src/include/replication/logical.h
src/include/replication/output_plugin.h

index ab963c53456a7fe716bfa3da9e93f7ce47476494..7409e5ce3de759d2acead457824c4f1af49330ea 100644 (file)
@@ -117,7 +117,8 @@ StartupDecodingContext(List *output_plugin_options,
                       bool need_full_snapshot,
                       XLogPageReadCB read_page,
                       LogicalOutputPluginWriterPrepareWrite prepare_write,
-                      LogicalOutputPluginWriterWrite do_write)
+                      LogicalOutputPluginWriterWrite do_write,
+                      LogicalOutputPluginWriterUpdateProgress update_progress)
 {
    ReplicationSlot *slot;
    MemoryContext context,
@@ -186,6 +187,7 @@ StartupDecodingContext(List *output_plugin_options,
    ctx->out = makeStringInfo();
    ctx->prepare_write = prepare_write;
    ctx->write = do_write;
+   ctx->update_progress = update_progress;
 
    ctx->output_plugin_options = output_plugin_options;
 
@@ -199,8 +201,9 @@ StartupDecodingContext(List *output_plugin_options,
  *
  * plugin contains the name of the output plugin
  * output_plugin_options contains options passed to the output plugin
- * read_page, prepare_write, do_write are callbacks that have to be filled to
- *     perform the use-case dependent, actual, work.
+ * read_page, prepare_write, do_write, update_progress
+ *     callbacks that have to be filled to perform the use-case dependent,
+ *     actual, work.
  *
  * Needs to be called while in a memory context that's at least as long lived
  * as the decoding context because further memory contexts will be created
@@ -215,7 +218,8 @@ CreateInitDecodingContext(char *plugin,
                          bool need_full_snapshot,
                          XLogPageReadCB read_page,
                          LogicalOutputPluginWriterPrepareWrite prepare_write,
-                         LogicalOutputPluginWriterWrite do_write)
+                         LogicalOutputPluginWriterWrite do_write,
+                         LogicalOutputPluginWriterUpdateProgress update_progress)
 {
    TransactionId xmin_horizon = InvalidTransactionId;
    ReplicationSlot *slot;
@@ -300,7 +304,7 @@ CreateInitDecodingContext(char *plugin,
 
    ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
                                 need_full_snapshot, read_page, prepare_write,
-                                do_write);
+                                do_write, update_progress);
 
    /* call output plugin initialization callback */
    old_context = MemoryContextSwitchTo(ctx->context);
@@ -324,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
  * output_plugin_options
  *     contains options passed to the output plugin.
  *
- * read_page, prepare_write, do_write
+ * read_page, prepare_write, do_write, update_progress
  *     callbacks that have to be filled to perform the use-case dependent,
  *     actual work.
  *
@@ -340,7 +344,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
                      List *output_plugin_options,
                      XLogPageReadCB read_page,
                      LogicalOutputPluginWriterPrepareWrite prepare_write,
-                     LogicalOutputPluginWriterWrite do_write)
+                     LogicalOutputPluginWriterWrite do_write,
+                     LogicalOutputPluginWriterUpdateProgress update_progress)
 {
    LogicalDecodingContext *ctx;
    ReplicationSlot *slot;
@@ -390,7 +395,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
    ctx = StartupDecodingContext(output_plugin_options,
                                 start_lsn, InvalidTransactionId, false,
-                                read_page, prepare_write, do_write);
+                                read_page, prepare_write, do_write,
+                                update_progress);
 
    /* call output plugin initialization callback */
    old_context = MemoryContextSwitchTo(ctx->context);
@@ -503,6 +509,18 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
    ctx->prepared_write = false;
 }
 
+/*
+ * Update progress tracking (no-op if no update_progress callback was given).
+ */
+void
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+{
+   if (!ctx->update_progress)
+       return;
+
+   ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+}
+
 /*
  * Load the output plugin, lookup its output plugin init function, and check
  * that it provides the required callbacks.
index c251b92f57bcbc248a9afde17b47328a03c55978..27164de093dd8de2874cb16edf7b4ca9dae75c20 100644 (file)
@@ -253,7 +253,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
                                    options,
                                    logical_read_local_xlog_page,
                                    LogicalOutputPrepareWrite,
-                                   LogicalOutputWrite);
+                                   LogicalOutputWrite, NULL);
 
        MemoryContextSwitchTo(oldcontext);
 
index f3eaccffd5b8b8be0bcad857b6a93b2befa9a145..4ddfbf7a98b14e7d7a7a081dc5129cc6c863828a 100644 (file)
@@ -244,6 +244,8 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                     XLogRecPtr commit_lsn)
 {
+   OutputPluginUpdateProgress(ctx);
+
    OutputPluginPrepareWrite(ctx, true);
    logicalrep_write_commit(ctx->out, txn, commit_lsn);
    OutputPluginWrite(ctx, true);
index 6ee1e68819afb0c578020ada73081fb6276f6a00..56a9ca965172e17df5600780201759f9585da50f 100644 (file)
@@ -133,7 +133,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
     */
    ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
                                    false, /* do not build snapshot */
-                                   logical_read_local_xlog_page, NULL, NULL);
+                                   logical_read_local_xlog_page, NULL, NULL,
+                                   NULL);
 
    /* build initial snapshot, might take a while */
    DecodingContextFindStartpoint(ctx);
index 45d027803ab997494828d7f99490ca12ea620ad4..e4e5337d549b9066924aabdf2dc0c1e3211fbaa7 100644 (file)
@@ -245,7 +245,9 @@ static void WalSndCheckTimeOut(TimestampTz now);
 static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
+static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
@@ -923,7 +925,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
        ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
                                        logical_read_xlog_page,
-                                       WalSndPrepareWrite, WalSndWriteData);
+                                       WalSndPrepareWrite, WalSndWriteData,
+                                       WalSndUpdateProgress);
 
        /*
         * Signal that we don't need the timeout mechanism. We're just
@@ -1077,10 +1080,11 @@ StartLogicalReplication(StartReplicationCmd *cmd)
     * Initialize position to the last ack'ed one, then the xlog records begin
     * to be shipped from that position.
     */
-   logical_decoding_ctx = CreateDecodingContext(
-                                              cmd->startpoint, cmd->options,
+   logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
                                                 logical_read_xlog_page,
-                                       WalSndPrepareWrite, WalSndWriteData);
+                                                WalSndPrepareWrite,
+                                                WalSndWriteData,
+                                                WalSndUpdateProgress);
 
    /* Start reading WAL from the oldest required WAL. */
    logical_startptr = MyReplicationSlot->data.restart_lsn;
@@ -1239,6 +1243,30 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
    SetLatch(MyLatch);
 }
 
+/*
+ * LogicalDecodingContext 'update_progress' callback.
+ *
+ * Write the current position to the lag tracker (see XLogSendPhysical).
+ */
+static void
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+{
+   static TimestampTz sendTime = 0;
+   TimestampTz now = GetCurrentTimestamp();
+
+   /*
+    * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
+    * to avoid flooding the lag tracker when we commit frequently.
+    */
+#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
+   if (!TimestampDifferenceExceeds(sendTime, now,
+                                   WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
+       return;
+
+   LagTrackerWrite(lsn, now);
+   sendTime = now;
+}
+
 /*
  * Wait till WAL < loc is flushed to disk so it can be safely read.
  */
@@ -2730,9 +2758,9 @@ XLogSendLogical(void)
    if (record != NULL)
    {
        /*
-        * Note the lack of any call to LagTrackerWrite() which is the responsibility
-        * of the logical decoding plugin. Response messages are handled normally,
-        * so this responsibility does not extend to needing to call LagTrackerRead().
+        * Note the lack of any call to LagTrackerWrite() here; that is handled
+        * by WalSndUpdateProgress(), which the output plugin calls through
+        * the logical decoding write API.
         */
        LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
 
@@ -3328,9 +3356,8 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
  * LagTrackerRead can compute the elapsed time (lag) when this WAL position is
  * eventually reported to have been written, flushed and applied by the
  * standby in a reply message.
- * Exported to allow logical decoding plugins to call this when they choose.
  */
-void
+static void
 LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
 {
    bool buffer_full;
index d0b2e0bbaefff5433659a80f60818ec4ba99aa69..090f9c82680584acf47d597dbb9c08748aa3439a 100644 (file)
@@ -26,6 +26,12 @@ typedef void (*LogicalOutputPluginWriterWrite) (
 
 typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
+typedef void (*LogicalOutputPluginWriterUpdateProgress) (
+                                          struct LogicalDecodingContext *lr,
+                                                           XLogRecPtr Ptr,
+                                                           TransactionId xid
+);
+
 typedef struct LogicalDecodingContext
 {
    /* memory context this is all allocated in */
@@ -52,6 +58,7 @@ typedef struct LogicalDecodingContext
     */
    LogicalOutputPluginWriterPrepareWrite prepare_write;
    LogicalOutputPluginWriterWrite write;
+   LogicalOutputPluginWriterUpdateProgress update_progress;
 
    /*
     * Output buffer.
@@ -85,13 +92,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
                          bool need_full_snapshot,
                          XLogPageReadCB read_page,
                          LogicalOutputPluginWriterPrepareWrite prepare_write,
-                         LogicalOutputPluginWriterWrite do_write);
+                         LogicalOutputPluginWriterWrite do_write,
+                         LogicalOutputPluginWriterUpdateProgress update_progress);
 extern LogicalDecodingContext *CreateDecodingContext(
                      XLogRecPtr start_lsn,
                      List *output_plugin_options,
                      XLogPageReadCB read_page,
                      LogicalOutputPluginWriterPrepareWrite prepare_write,
-                     LogicalOutputPluginWriterWrite do_write);
+                     LogicalOutputPluginWriterWrite do_write,
+                     LogicalOutputPluginWriterUpdateProgress update_progress);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
@@ -101,8 +110,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
                                      XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
-extern void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
-
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 
 #endif
index 08e962d0c0c9376fb7b80a2dcb4fb3499d09029a..2435e2be2d2d4ad94bd54aa632c7d67765a5485e 100644 (file)
@@ -106,5 +106,6 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
 
 #endif   /* OUTPUT_PLUGIN_H */