Send new protocol keepalive messages to standby servers.
authorSimon Riggs
Sat, 31 Dec 2011 13:30:26 +0000 (13:30 +0000)
committerSimon Riggs
Sat, 31 Dec 2011 13:30:26 +0000 (13:30 +0000)
Allows streaming replication users to calculate transfer latency
and apply delay via internal functions. No external functions yet.

doc/src/sgml/protocol.sgml
src/backend/access/transam/xlog.c
src/backend/replication/walreceiver.c
src/backend/replication/walreceiverfuncs.c
src/backend/replication/walsender.c
src/include/access/xlog.h
src/include/replication/walprotocol.h
src/include/replication/walreceiver.h

index d6332e58cf79f981bf7396df782686fc0b0738d7..71c40cc592e2b0cd5075075886c2d948af254f56 100644 (file)
@@ -1463,6 +1463,54 @@ The commands accepted in walsender mode are:
        CopyData message):
      
 
+     
+      
+      
+      
+          Primary keepalive message (B)
+      
+      
+      
+      
+      
+      
+          Byte1('k')
+      
+      
+      
+          Identifies the message as a sender keepalive.
+      
+      
+      
+      
+      
+          Byte8
+      
+      
+      
+          The current end of WAL on the server, given in
+          XLogRecPtr format.
+      
+      
+      
+      
+      
+          Byte8
+      
+      
+      
+          The server's system clock at the time of transmission,
+          given in TimestampTz format.
+      
+      
+      
+      
+      
+      
+      
+      
+     
+
      
       
       
index 41800a46040b56f911a950ef87df293a11ae1560..d98a763fda67606e16bd01a91ab85701598acd7e 100644 (file)
@@ -452,6 +452,9 @@ typedef struct XLogCtlData
    XLogRecPtr  recoveryLastRecPtr;
    /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
    TimestampTz recoveryLastXTime;
+   /* timestamp of when we started replaying the current chunk of WAL data,
+    * only relevant for replication or archive recovery */
+   TimestampTz currentChunkStartTime;
    /* end of the last record restored from the archive */
    XLogRecPtr  restoreLastRecPtr;
    /* Are we requested to pause recovery? */
@@ -606,6 +609,7 @@ static void exitArchiveRecovery(TimeLineID endTLI,
 static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
 static void recoveryPausesHere(void);
 static void SetLatestXTime(TimestampTz xtime);
+static void SetCurrentChunkStartTime(TimestampTz xtime);
 static void CheckRequiredParameterValues(void);
 static void XLogReportParameters(void);
 static void LocalSetXLogInsertAllowed(void);
@@ -5847,6 +5851,41 @@ GetLatestXTime(void)
    return xtime;
 }
 
+/*
+ * Save the time at which replay of the current chunk of WAL data started.
+ *
+ * We keep this in XLogCtl, not a simple static variable, so that it can be
+ * seen by all backends.
+ */
+static void
+SetCurrentChunkStartTime(TimestampTz xtime)
+{
+   /* use volatile pointer to prevent code rearrangement */
+   volatile XLogCtlData *xlogctl = XLogCtl;
+
+   SpinLockAcquire(&xlogctl->info_lck);
+   xlogctl->currentChunkStartTime = xtime;
+   SpinLockRelease(&xlogctl->info_lck);
+}
+
+/*
+ * Fetch the time at which replay of the current chunk of WAL data started,
+ * i.e. the currentChunkStartTime value kept in XLogCtl under info_lck.
+ */
+TimestampTz
+GetCurrentChunkReplayStartTime(void)
+{
+   /* use volatile pointer to prevent code rearrangement */
+   volatile XLogCtlData *xlogctl = XLogCtl;
+   TimestampTz xtime;
+
+   SpinLockAcquire(&xlogctl->info_lck);
+   xtime = xlogctl->currentChunkStartTime;
+   SpinLockRelease(&xlogctl->info_lck);
+
+   return xtime;
+}
+
 /*
  * Returns time of receipt of current chunk of XLOG data, as well as
  * whether it was received from streaming replication or from archives.
@@ -6390,6 +6429,7 @@ StartupXLOG(void)
        xlogctl->replayEndRecPtr = ReadRecPtr;
        xlogctl->recoveryLastRecPtr = ReadRecPtr;
        xlogctl->recoveryLastXTime = 0;
+       xlogctl->currentChunkStartTime = 0;
        xlogctl->recoveryPause = false;
        SpinLockRelease(&xlogctl->info_lck);
 
@@ -9696,7 +9736,10 @@ retry:
                        {
                            havedata = true;
                            if (!XLByteLT(*RecPtr, latestChunkStart))
+                           {
                                XLogReceiptTime = GetCurrentTimestamp();
+                               SetCurrentChunkStartTime(XLogReceiptTime);
+                           }
                        }
                        else
                            havedata = false;
index 1f12dcb62aa0fba4dfd6f859caa1163e5f47ed4e..8106d6b3a41ea680fa7e76e0c135753adc75b686 100644 (file)
@@ -124,6 +124,7 @@ static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(bool dying);
 static void XLogWalRcvSendReply(void);
 static void XLogWalRcvSendHSFeedback(void);
+static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 
 /* Signal handlers */
 static void WalRcvSigHupHandler(SIGNAL_ARGS);
@@ -218,6 +219,10 @@ WalReceiverMain(void)
    /* Fetch information required to start streaming */
    strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
    startpoint = walrcv->receiveStart;
+
+   /* Initialise to a sanish value */
+   walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = GetCurrentTimestamp();
+
    SpinLockRelease(&walrcv->mutex);
 
    /* Arrange to clean up at walreceiver exit */
@@ -433,12 +438,28 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
                             errmsg_internal("invalid WAL message received from primary")));
                /* memcpy is required here for alignment reasons */
                memcpy(&msghdr, buf, sizeof(WalDataMessageHeader));
+
+               ProcessWalSndrMessage(msghdr.walEnd, msghdr.sendTime);
+
                buf += sizeof(WalDataMessageHeader);
                len -= sizeof(WalDataMessageHeader);
-
                XLogWalRcvWrite(buf, len, msghdr.dataStart);
                break;
            }
+       case 'k':               /* Keepalive */
+           {
+               PrimaryKeepaliveMessage keepalive;
+
+               if (len != sizeof(PrimaryKeepaliveMessage))
+                   ereport(ERROR,
+                           (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                            errmsg_internal("invalid keepalive message received from primary")));
+               /* memcpy is required here for alignment reasons */
+               memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage));
+
+               ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime);
+               break;
+           }
        default:
            ereport(ERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -711,3 +732,27 @@ XLogWalRcvSendHSFeedback(void)
    memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage));
    walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1);
 }
+
+/*
+ * Record the send time and local receipt time of a message from the primary.
+ */
+static void
+ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
+{
+   /* use volatile pointer to prevent code rearrangement */
+   volatile WalRcvData *walrcv = WalRcv;
+
+   TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
+
+   /* Publish both timestamps in shared memory while holding the mutex */
+   SpinLockAcquire(&walrcv->mutex);
+   walrcv->lastMsgSendTime = sendTime;
+   walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
+   SpinLockRelease(&walrcv->mutex);
+
+   elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d transfer latency %d",
+                   timestamptz_to_str(sendTime),
+                   timestamptz_to_str(lastMsgReceiptTime),
+                   GetReplicationApplyDelay(),
+                   GetReplicationTransferLatency());
+}
index 5bce1c34a1b5f06670b83efce9636fe49d3eff75..054355b2c59f0b9ec895030aa12e199c9e81e866 100644 (file)
@@ -28,6 +28,7 @@
 #include "replication/walreceiver.h"
 #include "storage/pmsignal.h"
 #include "storage/shmem.h"
+#include "utils/timestamp.h"
 
 WalRcvData *WalRcv = NULL;
 
@@ -238,3 +239,65 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
 
    return recptr;
 }
+
+/*
+ * Returns the replication apply delay in ms, or 0 if replay has caught up.
+ */
+int
+GetReplicationApplyDelay(void)
+{
+   /* use volatile pointer to prevent code rearrangement */
+   volatile WalRcvData *walrcv = WalRcv;
+
+   XLogRecPtr  receivePtr;
+   XLogRecPtr  replayPtr;
+
+   long    secs;
+   int     usecs;
+
+   SpinLockAcquire(&walrcv->mutex);
+   receivePtr = walrcv->receivedUpto;
+   SpinLockRelease(&walrcv->mutex);
+
+   replayPtr = GetXLogReplayRecPtr(NULL);
+
+   if (XLByteLE(receivePtr, replayPtr))
+       return 0;
+
+   TimestampDifference(GetCurrentChunkReplayStartTime(),
+                       GetCurrentTimestamp(),
+                       &secs, &usecs);
+
+   return (((int) secs * 1000) + (usecs / 1000));
+}
+
+/*
+ * Returns the transfer latency in ms, measured from the last message's send
+ * time to its receipt time; includes any clock or timezone difference.
+ */
+int
+GetReplicationTransferLatency(void)
+{
+   /* use volatile pointer to prevent code rearrangement */
+   volatile WalRcvData *walrcv = WalRcv;
+
+   TimestampTz lastMsgSendTime;
+   TimestampTz lastMsgReceiptTime;
+
+   long    secs = 0;
+   int     usecs = 0;
+   int     ms;
+
+   SpinLockAcquire(&walrcv->mutex);
+   lastMsgSendTime = walrcv->lastMsgSendTime;
+   lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
+   SpinLockRelease(&walrcv->mutex);
+
+   TimestampDifference(lastMsgSendTime,
+                       lastMsgReceiptTime,
+                       &secs, &usecs);
+
+   ms = ((int) secs * 1000) + (usecs / 1000);
+
+   return ms;
+}
index ea865204172263fe0dc143c52c994a0bde562845..ed7298b6ee8fc5580630466096e24ee16cafd9b2 100644 (file)
@@ -131,6 +131,7 @@ static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
 static void ProcessStandbyHSFeedbackMessage(void);
 static void ProcessRepliesIfAny(void);
+static void WalSndKeepalive(char *msgbuf);
 
 
 /* Main entry point for walsender process */
@@ -823,30 +824,24 @@ WalSndLoop(void)
         */
        if (caughtup || pq_is_send_pending())
        {
-           TimestampTz finish_time = 0;
-           long        sleeptime = -1;
+           TimestampTz timeout = 0;
+           long        sleeptime = 10000; /* 10 s */
            int         wakeEvents;
 
            wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
-               WL_SOCKET_READABLE;
+               WL_SOCKET_READABLE | WL_TIMEOUT;
+
            if (pq_is_send_pending())
                wakeEvents |= WL_SOCKET_WRITEABLE;
+           else
+               WalSndKeepalive(output_message);
 
            /* Determine time until replication timeout */
            if (replication_timeout > 0)
            {
-               long        secs;
-               int         usecs;
-
-               finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+               timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
                                                          replication_timeout);
-               TimestampDifference(GetCurrentTimestamp(),
-                                   finish_time, &secs, &usecs);
-               sleeptime = secs * 1000 + usecs / 1000;
-               /* Avoid Assert in WaitLatchOrSocket if timeout is past */
-               if (sleeptime < 0)
-                   sleeptime = 0;
-               wakeEvents |= WL_TIMEOUT;
+               sleeptime = 1 + (replication_timeout / 10);
            }
 
            /* Sleep until something happens or replication timeout */
@@ -859,7 +854,7 @@ WalSndLoop(void)
             * timeout ... he's supposed to reply *before* that.
             */
            if (replication_timeout > 0 &&
-               GetCurrentTimestamp() >= finish_time)
+               GetCurrentTimestamp() >= timeout)
            {
                /*
                 * Since typically expiration of replication timeout means
@@ -1627,6 +1622,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
    return (Datum) 0;
 }
 
+static void
+WalSndKeepalive(char *msgbuf)
+{
+   PrimaryKeepaliveMessage keepalive_message;
+
+   /* Construct a new message: our sent position and current clock */
+   keepalive_message.walEnd = sentPtr;
+   keepalive_message.sendTime = GetCurrentTimestamp();
+
+   elog(DEBUG2, "sending replication keepalive");
+
+   /* Prepend the message type 'k' and send it as a 'd' message. */
+   msgbuf[0] = 'k';
+   memcpy(msgbuf + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage));
+   pq_putmessage_noblock('d', msgbuf, sizeof(PrimaryKeepaliveMessage) + 1);
+}
+
 /*
  * This isn't currently used for anything. Monitoring tools might be
  * interested in the future, and we'll need something like this in the
index 86ab3276caf125e79b380173c7fff840e98a67cf..4b1f8b8c2f34cdd6990c72f3e75246424ae2939a 100644 (file)
@@ -293,6 +293,7 @@ extern XLogRecPtr GetXLogWriteRecPtr(void);
 extern bool RecoveryIsPaused(void);
 extern void SetRecoveryPause(bool recoveryPause);
 extern TimestampTz GetLatestXTime(void);
+extern TimestampTz GetCurrentChunkReplayStartTime(void);
 
 extern void UpdateControlFile(void);
 extern uint64 GetSystemIdentifier(void);
index 656c8fc17fdf5973d4869657d38958b0724c9c3e..053376d3774d945385113c4e7c1515fff6e71186 100644 (file)
 #include "datatype/timestamp.h"
 
 
+/*
+ * Every message sent by WalSender must carry these fields, so that the
+ * receiver can correctly calculate the replication delay.
+ */
+typedef struct
+{
+   /* Current end of WAL on the sender */
+   XLogRecPtr  walEnd;
+
+   /* Sender's system clock at the time of transmission */
+   TimestampTz sendTime;
+} WalSndrMessage;
+
+
 /*
  * Header for a WAL data message (message type 'w').  This is wrapped within
  * a CopyData message at the FE/BE protocol level.
@@ -39,6 +53,14 @@ typedef struct
    TimestampTz sendTime;
 } WalDataMessageHeader;
 
+/*
+ * Keepalive message from primary (message type 'k'). (lowercase k)
+ * This is wrapped within a CopyData message at the FE/BE protocol level.
+ *
+ * Note that the data length is not specified here.
+ */
+typedef WalSndrMessage PrimaryKeepaliveMessage;
+
 /*
  * Reply message from standby (message type 'r').  This is wrapped within
  * a CopyData message at the FE/BE protocol level.
index 77f525209170cbeeb0eaa10f77cc8f18a6fff334..926730c9f823a6917e0943ae5279a3452d1d15fb 100644 (file)
@@ -78,6 +78,12 @@ typedef struct
     */
    XLogRecPtr  latestChunkStart;
 
+   /*
+    * Time of send and receive of any message received.
+    */
+   TimestampTz lastMsgSendTime;
+   TimestampTz lastMsgReceiptTime;
+
    /*
     * connection string; is used for walreceiver to connect with the primary.
     */
@@ -112,5 +118,7 @@ extern void ShutdownWalRcv(void);
 extern bool WalRcvInProgress(void);
 extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
 extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
+extern int GetReplicationApplyDelay(void);
+extern int GetReplicationTransferLatency(void);
 
 #endif   /* _WALRECEIVER_H */