Separate messages for standby replies and hot standby feedback.
authorSimon Riggs
Fri, 18 Feb 2011 11:31:49 +0000 (11:31 +0000)
committerSimon Riggs
Fri, 18 Feb 2011 11:31:49 +0000 (11:31 +0000)
Allow messages to be sent at different times, and greatly reduce
the frequency of hot standby feedback. Refactor to allow additional
message types.

src/backend/replication/walreceiver.c
src/backend/replication/walsender.c
src/include/replication/walprotocol.h

index ee09468db17610c235e9846bc8c8c4890d5bc842..c7f5bd5ea395aee4751f99746ea58ca1ea53f232 100644 (file)
@@ -95,6 +95,7 @@ static struct
 }  LogstreamResult;
 
 static StandbyReplyMessage reply_message;
+static StandbyHSFeedbackMessage    feedback_message;
 
 /*
  * About SIGTERM handling:
@@ -123,6 +124,7 @@ static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(bool dying);
 static void XLogWalRcvSendReply(void);
+static void XLogWalRcvSendHSFeedback(void);
 
 /* Signal handlers */
 static void WalRcvSigHupHandler(SIGNAL_ARGS);
@@ -317,6 +319,7 @@ WalReceiverMain(void)
 
            /* Let the master know that we received some data. */
            XLogWalRcvSendReply();
+           XLogWalRcvSendHSFeedback();
 
            /*
             * If we've written some records, flush them to disk and let the
@@ -331,6 +334,7 @@ WalReceiverMain(void)
             * the master anyway, to report any progress in applying WAL.
             */
            XLogWalRcvSendReply();
+           XLogWalRcvSendHSFeedback();
        }
    }
 }
@@ -619,40 +623,82 @@ XLogWalRcvSendReply(void)
    reply_message.apply = GetXLogReplayRecPtr();
    reply_message.sendTime = now;
 
-   /*
-    * Get the OldestXmin and its associated epoch
-    */
-   if (hot_standby_feedback && HotStandbyActive())
-   {
-       TransactionId   nextXid;
-       uint32          nextEpoch;
-
-       reply_message.xmin = GetOldestXmin(true, false);
-
-       /*
-        * Get epoch and adjust if nextXid and oldestXmin are different
-        * sides of the epoch boundary.
-        */
-       GetNextXidAndEpoch(&nextXid, &nextEpoch);
-       if (nextXid < reply_message.xmin)
-           nextEpoch--;
-       reply_message.epoch = nextEpoch;
-   }
-   else
-   {
-       reply_message.xmin = InvalidTransactionId;
-       reply_message.epoch = 0;
-   }
-
-   elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
+   elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
                 reply_message.write.xlogid, reply_message.write.xrecoff,
                 reply_message.flush.xlogid, reply_message.flush.xrecoff,
-                reply_message.apply.xlogid, reply_message.apply.xrecoff,
-                reply_message.xmin,
-                reply_message.epoch);
+                reply_message.apply.xlogid, reply_message.apply.xrecoff);
 
    /* Prepend with the message type and send it. */
    buf[0] = 'r';
    memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage));
    walrcv_send(buf, sizeof(StandbyReplyMessage) + 1);
 }
+
+/*
+ * Send hot standby feedback message to primary, plus the current time,
+ * in case they don't have a watch.
+ */
+static void
+XLogWalRcvSendHSFeedback(void)
+{
+   char            buf[sizeof(StandbyHSFeedbackMessage) + 1];
+   TimestampTz     now;
+   TransactionId   nextXid;
+   uint32          nextEpoch;
+   TransactionId   xmin;
+
+   /*
+    * If the user doesn't want status to be reported to the master, be sure
+    * to exit before doing anything at all.
+    */
+   if (!hot_standby_feedback)
+       return;
+
+   /* Get current timestamp. */
+   now = GetCurrentTimestamp();
+
+   /*
+    * Send feedback at most once per wal_receiver_status_interval.
+    */
+   if (!TimestampDifferenceExceeds(feedback_message.sendTime, now,
+           wal_receiver_status_interval * 1000))
+       return;
+
+   /*
+    * If Hot Standby is not yet active there is nothing to send.
+    * Check this after the interval has expired to reduce number of
+    * calls.
+    */
+   if (!HotStandbyActive())
+       return;
+
+   /*
+    * Make the expensive call to get the oldest xmin once we are
+    * certain everything else has been checked.
+    */
+   xmin = GetOldestXmin(true, false);
+
+   /*
+    * Get epoch and adjust if nextXid and oldestXmin are different
+    * sides of the epoch boundary.
+    */
+   GetNextXidAndEpoch(&nextXid, &nextEpoch);
+   if (nextXid < xmin)
+       nextEpoch--;
+
+   /*
+    * Always send feedback message.
+    */
+   feedback_message.sendTime = now;
+   feedback_message.xmin = xmin;
+   feedback_message.epoch = nextEpoch;
+
+   elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
+                feedback_message.xmin,
+                feedback_message.epoch);
+
+   /* Prepend with the message type and send it. */
+   buf[0] = 'h';
+   memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage));
+   walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1);
+}
index a6a7a1425be909eceb26f88f92d746f7a8280278..e04d59e1e774c69dde43e1cf8b1dccf294c8bdc2 100644 (file)
@@ -116,7 +116,9 @@ static void WalSndKill(int code, Datum arg);
 static bool XLogSend(char *msgbuf, bool *caughtup);
 static void IdentifySystem(void);
 static void StartReplication(StartReplicationCmd * cmd);
+static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
+static void ProcessStandbyHSFeedbackMessage(void);
 static void ProcessRepliesIfAny(void);
 
 
@@ -456,42 +458,45 @@ ProcessRepliesIfAny(void)
    unsigned char firstchar;
    int         r;
 
-   r = pq_getbyte_if_available(&firstchar);
-   if (r < 0)
-   {
-       /* unexpected error or EOF */
-       ereport(COMMERROR,
-               (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                errmsg("unexpected EOF on standby connection")));
-       proc_exit(0);
-   }
-   if (r == 0)
+   for (;;)
    {
-       /* no data available without blocking */
-       return;
-   }
+       r = pq_getbyte_if_available(&firstchar);
+       if (r < 0)
+       {
+           /* unexpected error or EOF */
+           ereport(COMMERROR,
+                   (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                    errmsg("unexpected EOF on standby connection")));
+           proc_exit(0);
+       }
+       if (r == 0)
+       {
+           /* no data available without blocking */
+           return;
+       }
 
-   /* Handle the very limited subset of commands expected in this phase */
-   switch (firstchar)
-   {
-           /*
-            * 'd' means a standby reply wrapped in a CopyData packet.
-            */
-       case 'd':
-           ProcessStandbyReplyMessage();
-           break;
+       /* Handle the very limited subset of commands expected in this phase */
+       switch (firstchar)
+       {
+               /*
+                * 'd' means a standby reply wrapped in a CopyData packet.
+                */
+           case 'd':
+               ProcessStandbyMessage();
+               break;
 
-           /*
-            * 'X' means that the standby is closing down the socket.
-            */
-       case 'X':
-           proc_exit(0);
+               /*
+                * 'X' means that the standby is closing down the socket.
+                */
+           case 'X':
+               proc_exit(0);
 
-       default:
-           ereport(FATAL,
-                   (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                    errmsg("invalid standby closing message type %d",
-                           firstchar)));
+           default:
+               ereport(FATAL,
+                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                        errmsg("invalid standby closing message type %d",
+                               firstchar)));
+       }
    }
 }
 
@@ -499,11 +504,9 @@ ProcessRepliesIfAny(void)
  * Process a status update message received from standby.
  */
 static void
-ProcessStandbyReplyMessage(void)
+ProcessStandbyMessage(void)
 {
-   StandbyReplyMessage reply;
    char msgtype;
-   TransactionId newxmin = InvalidTransactionId;
 
    resetStringInfo(&reply_message);
 
@@ -523,22 +526,39 @@ ProcessStandbyReplyMessage(void)
     * one type.
     */
    msgtype = pq_getmsgbyte(&reply_message);
-   if (msgtype != 'r')
+
+   switch (msgtype)
    {
-       ereport(COMMERROR,
-               (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                errmsg("unexpected message type %c", msgtype)));
-       proc_exit(0);
+       case 'r':
+           ProcessStandbyReplyMessage();
+           break;
+
+       case 'h':
+           ProcessStandbyHSFeedbackMessage();
+           break;
+
+       default:
+           ereport(COMMERROR,
+                   (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                    errmsg("unexpected message type %c", msgtype)));
+           proc_exit(0);
    }
+}
+
+/*
+ * Regular reply from standby advising of WAL positions on standby server.
+ */
+static void
+ProcessStandbyReplyMessage(void)
+{
+   StandbyReplyMessage reply;
 
    pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
 
-   elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
+   elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
         reply.write.xlogid, reply.write.xrecoff,
         reply.flush.xlogid, reply.flush.xrecoff,
-        reply.apply.xlogid, reply.apply.xrecoff,
-        reply.xmin,
-        reply.epoch);
+        reply.apply.xlogid, reply.apply.xrecoff);
 
    /*
     * Update shared state for this WalSender process
@@ -554,6 +574,22 @@ ProcessStandbyReplyMessage(void)
        walsnd->apply = reply.apply;
        SpinLockRelease(&walsnd->mutex);
    }
+}
+
+/*
+ * Hot Standby feedback
+ */
+static void
+ProcessStandbyHSFeedbackMessage(void)
+{
+   StandbyHSFeedbackMessage    reply;
+   TransactionId newxmin = InvalidTransactionId;
+
+   pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage));
+
+   elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
+        reply.xmin,
+        reply.epoch);
 
    /*
     * Update the WalSender's proc xmin to allow it to be visible
index da94b6b2f306607bcfb381c8a1174bdce2ab1f94..9baca948a238227af417aad1c2d0ad6f77737a56 100644 (file)
@@ -56,6 +56,18 @@ typedef struct
    XLogRecPtr  flush;
    XLogRecPtr  apply;
 
+   /* Sender's system clock at the time of transmission */
+   TimestampTz sendTime;
+} StandbyReplyMessage;
+
+/*
+ * Hot Standby feedback from standby (message type 'h').  This is wrapped within
+ * a CopyData message at the FE/BE protocol level.
+ *
+ * Note that the data length is not specified here.
+ */
+typedef struct
+{
    /*
     * The current xmin and epoch from the standby, for Hot Standby feedback.
     * This may be invalid if the standby-side does not support feedback,
@@ -64,10 +76,9 @@ typedef struct
    TransactionId   xmin;
    uint32          epoch;
 
-
    /* Sender's system clock at the time of transmission */
    TimestampTz sendTime;
-} StandbyReplyMessage;
+} StandbyHSFeedbackMessage;
 
 /*
  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.