Fix walsender failure at promotion.
authorHeikki Linnakangas
Wed, 8 May 2013 17:10:17 +0000 (20:10 +0300)
committerHeikki Linnakangas
Wed, 8 May 2013 17:30:17 +0000 (20:30 +0300)
If a standby server has a cascading standby server connected to it, it's
possible that WAL has already been sent up to the next WAL page boundary,
splitting a WAL record in the middle, when the first standby server is
promoted. Don't throw an assertion failure or error in walsender if that
happens.

Also, fix a variant of the same bug in pg_receivexlog: if it had already
received WAL on previous timeline up to a segment boundary, when the
upstream standby server is promoted so that the timeline switch record falls
on the previous segment, pg_receivexlog would miss the segment containing
the timeline switch. To fix that, have walsender send the position of the
timeline switch at end-of-streaming, in addition to the next timeline's ID.
It was previously assumed that the switch happened exactly where the
streaming stopped.

Note: this is an incompatible change in the streaming protocol. You might
get an error if you try to stream over timeline switches, if the client is
running 9.3beta1 and the server is more recent. It should be fine after a
reconnect, however.

Reported by Fujii Masao.

doc/src/sgml/protocol.sgml
src/backend/access/transam/xlog.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/walreceiverfuncs.c
src/backend/replication/walsender.c
src/bin/pg_basebackup/pg_receivexlog.c
src/bin/pg_basebackup/receivelog.c

index 1e2604b8326c0fb8fe41ec2e020a762219e3b4d8..70165115f5c302c551fade23ba34eee6d4d19023 100644 (file)
@@ -1423,10 +1423,15 @@ The commands accepted in walsender mode are:
      
       After streaming all the WAL on a timeline that is not the latest one,
       the server will end streaming by exiting the COPY mode. When the client
-      acknowledges this by also exiting COPY mode, the server sends a
-      single-row, single-column result set indicating the next timeline in
-      this server's history. That is followed by a CommandComplete message,
-      and the server is ready to accept a new command.
+      acknowledges this by also exiting COPY mode, the server sends a result
+      set with one row and two columns, indicating the next timeline in this
+      server's history. The first column is the next timeline's ID, and the
+      second column is the XLOG position where the switch happened. Usually,
+      the switch position is the end of the WAL that was streamed, but there
+      are corner cases where the server can send some WAL from the old
+      timeline that it has not itself replayed before promoting. Finally, the
+      server sends CommandComplete message, and is ready to accept a new
+      command.
      
 
      
index 959f4231873cbd8f9fe72049a715d9c48a7a5f2b..f7dd61c4c7538fe99cef58bf8e27be9a1ede0052 100644 (file)
@@ -9598,7 +9598,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                        }
                        else
                        {
-                           ptr = RecPtr;
+                           ptr = tliRecPtr;
                            tli = tliOfPointInHistory(tliRecPtr, expectedTLEs);
 
                            if (curFileTLI > 0 && tli < curFileTLI)
@@ -9607,7 +9607,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                     tli, curFileTLI);
                        }
                        curFileTLI = tli;
-                       RequestXLogStreaming(curFileTLI, ptr, PrimaryConnInfo);
+                       RequestXLogStreaming(tli, ptr, PrimaryConnInfo);
+                       receivedUpto = 0;
                    }
                    /*
                     * Move to XLOG_FROM_STREAM state in either case. We'll get
index e6e670e9e4bc8f8002d5b06b672fb46c2a79765b..f7cc6e3c2f56f91cdfa8cacdc95aa3ef76c73021 100644 (file)
@@ -224,8 +224,11 @@ libpqrcv_endstreaming(TimeLineID *next_tli)
    res = PQgetResult(streamConn);
    if (PQresultStatus(res) == PGRES_TUPLES_OK)
    {
-       /* Read the next timeline's ID */
-       if (PQnfields(res) != 1 || PQntuples(res) != 1)
+       /*
+        * Read the next timeline's ID. The server also sends the timeline's
+        * starting point, but it is ignored.
+        */
+       if (PQnfields(res) < 2 || PQntuples(res) != 1)
            ereport(ERROR,
                    (errmsg("unexpected result set after end-of-streaming")));
        *next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);
index d414808c9f2f464053adbffb5d6b9ee32b991f80..e5ad84393fa598910a27c8ec0e5fa367e93b993e 100644 (file)
@@ -260,12 +260,13 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
    walrcv->startTime = now;
 
    /*
-    * If this is the first startup of walreceiver, we initialize receivedUpto
-    * and latestChunkStart to receiveStart.
+    * If this is the first startup of walreceiver (on this timeline),
+    * initialize receivedUpto and latestChunkStart to the starting point.
     */
-   if (walrcv->receiveStart == 0)
+   if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
    {
        walrcv->receivedUpto = recptr;
+       walrcv->receivedTLI = tli;
        walrcv->latestChunkStart = recptr;
    }
    walrcv->receiveStart = recptr;
index c05bb1e081841c11e70e37bdac0ddb1d2c55c4d4..1dcb0f57f446b0f88cd41d3a8d724aa4af36d10b 100644 (file)
@@ -567,16 +567,21 @@ StartReplication(StartReplicationCmd *cmd)
     */
    if (sendTimeLineIsHistoric)
    {
-       char        str[11];
-       snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI);
+       char        tli_str[11];
+       char        startpos_str[8+1+8+1];
 
-       pq_beginmessage(&buf, 'T'); /* RowDescription */
-       pq_sendint(&buf, 1, 2);     /* 1 field */
+       snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI);
+       snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
+                (uint32) (sendTimeLineValidUpto >> 32),
+                (uint32) sendTimeLineValidUpto);
+
+       pq_beginmessage(&buf, 'T');     /* RowDescription */
+       pq_sendint(&buf, 2, 2);         /* 2 fields */
 
        /* Field header */
        pq_sendstring(&buf, "next_tli");
-       pq_sendint(&buf, 0, 4);     /* table oid */
-       pq_sendint(&buf, 0, 2);     /* attnum */
+       pq_sendint(&buf, 0, 4);         /* table oid */
+       pq_sendint(&buf, 0, 2);         /* attnum */
        /*
         * int8 may seem like a surprising data type for this, but in theory
         * int4 would not be wide enough for this, as TimeLineID is unsigned.
@@ -585,13 +590,26 @@ StartReplication(StartReplicationCmd *cmd)
        pq_sendint(&buf, -1, 2);
        pq_sendint(&buf, 0, 4);
        pq_sendint(&buf, 0, 2);
+
+       pq_sendstring(&buf, "next_tli_startpos");
+       pq_sendint(&buf, 0, 4);         /* table oid */
+       pq_sendint(&buf, 0, 2);         /* attnum */
+       pq_sendint(&buf, TEXTOID, 4);   /* type oid */
+       pq_sendint(&buf, -1, 2);
+       pq_sendint(&buf, 0, 4);
+       pq_sendint(&buf, 0, 2);
        pq_endmessage(&buf);
 
        /* Data row */
        pq_beginmessage(&buf, 'D');
-       pq_sendint(&buf, 1, 2);     /* number of columns */
-       pq_sendint(&buf, strlen(str), 4);   /* length */
-       pq_sendbytes(&buf, str, strlen(str));
+       pq_sendint(&buf, 2, 2);         /* number of columns */
+
+       pq_sendint(&buf, strlen(tli_str), 4);   /* length */
+       pq_sendbytes(&buf, tli_str, strlen(tli_str));
+
+       pq_sendint(&buf, strlen(startpos_str), 4);  /* length */
+       pq_sendbytes(&buf, startpos_str, strlen(startpos_str));
+
        pq_endmessage(&buf);
    }
 
@@ -1462,19 +1480,10 @@ XLogSend(bool *caughtup)
 
            history = readTimeLineHistory(ThisTimeLineID);
            sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
-           Assert(sentPtr <= sendTimeLineValidUpto);
+
            Assert(sendTimeLine < sendTimeLineNextTLI);
            list_free_deep(history);
 
-           /* the current send pointer should be <= the switchpoint */
-           if (!(sentPtr <= sendTimeLineValidUpto))
-               elog(ERROR, "server switched off timeline %u at %X/%X, but walsender already streamed up to %X/%X",
-                    sendTimeLine,
-                    (uint32) (sendTimeLineValidUpto >> 32),
-                    (uint32) sendTimeLineValidUpto,
-                    (uint32) (sentPtr >> 32),
-                    (uint32) sentPtr);
-
            sendTimeLineIsHistoric = true;
 
            SendRqstPtr = sendTimeLineValidUpto;
@@ -1498,6 +1507,15 @@ XLogSend(bool *caughtup)
    /*
     * If this is a historic timeline and we've reached the point where we
     * forked to the next timeline, stop streaming.
+    *
+    * Note: We might already have sent WAL > sendTimeLineValidUpto. The
+    * startup process will normally replay all WAL that has been received from
+    * the master, before promoting, but if the WAL streaming is terminated at
+    * a WAL page boundary, the valid portion of the timeline might end in the
+    * middle of a WAL record. We might've already sent the first half of that
+    * partial WAL record to the cascading standby, so that sentPtr >
+    * sendTimeLineValidUpto. That's OK; the cascading standby can't replay the
+    * partial WAL record either, so it can still follow our timeline switch.
     */
    if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
    {
@@ -1511,6 +1529,10 @@ XLogSend(bool *caughtup)
        streamingDoneSending = true;
 
        *caughtup = true;
+
+       elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
+            (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
+            (uint32) (sentPtr >> 32), (uint32) sentPtr);
        return;
    }
 
index e4da799d1fddb28617d46c92f4176618427a71b5..fa0ac5184c1384eb122305c5cc28d3b52d31c1c5 100644 (file)
@@ -83,10 +83,12 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
                timeline);
 
    /*
-    * Note that we report the previous, not current, position here. That's
-    * the exact location where the timeline switch happend. After the switch,
-    * we restart streaming from the beginning of the segment, so xlogpos can
-    * smaller than prevpos if we just switched to new timeline.
+    * Note that we report the previous, not current, position here. After a
+    * timeline switch, xlogpos points to the beginning of the segment because
+    * that's where we always begin streaming. Reporting the end of previous
+    * timeline isn't totally accurate, because the next timeline can begin
+    * slightly before the end of the WAL that we received on the previous
+    * timeline, but it's close enough for reporting purposes.
     */
    if (prevtimeline != 0 && prevtimeline != timeline)
        fprintf(stderr, _("%s: switched to timeline %u at %X/%X\n"),
index f297003c62f81a608e4f47a0cf139af9e1e8fd86..98e874f4ffe266da08d09f8fbf9aef4f60555b7f 100644 (file)
@@ -37,6 +37,9 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
                 stream_stop_callback stream_stop, int standby_message_timeout,
                 char *partial_suffix, XLogRecPtr *stoppos);
 
+static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
+                        uint32 *timeline);
+
 /*
  * Open a new WAL file in the specified directory.
  *
@@ -627,26 +630,44 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
         * There are two possible reasons for that: a controlled shutdown,
         * or we reached the end of the current timeline. In case of
         * end-of-timeline, the server sends a result set after Copy has
-        * finished, containing the next timeline's ID. Read that, and
-        * restart streaming from the next timeline.
+        * finished, containing information about the next timeline. Read
+        * that, and restart streaming from the next timeline. In case of
+        * controlled shutdown, stop here.
         */
-
        if (PQresultStatus(res) == PGRES_TUPLES_OK)
        {
            /*
-            * End-of-timeline. Read the next timeline's ID.
+            * End-of-timeline. Read the next timeline's ID and starting
+            * position. Usually, the starting position will match the end of
+            * the previous timeline, but there are corner cases like if the
+            * server had sent us half of a WAL record, when it was promoted.
+            * The new timeline will begin at the end of the last complete
+            * record in that case, overlapping the partial WAL record on the
+            * the old timeline.
             */
            uint32      newtimeline;
+           bool        parsed;
 
-           newtimeline = atoi(PQgetvalue(res, 0, 0));
+           parsed = ReadEndOfStreamingResult(res, &startpos, &newtimeline);
            PQclear(res);
+           if (!parsed)
+               goto error;
 
+           /* Sanity check the values the server gave us */
            if (newtimeline <= timeline)
            {
-               /* shouldn't happen */
                fprintf(stderr,
-                       "server reported unexpected next timeline %u, following timeline %u\n",
-                       newtimeline, timeline);
+                       _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
+                       progname, newtimeline, timeline);
+               goto error;
+           }
+           if (startpos > stoppos)
+           {
+               fprintf(stderr,
+                       _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
+                       progname,
+                       timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
+                       newtimeline, (uint32) (startpos >> 32), (uint32) startpos);
                goto error;
            }
 
@@ -666,7 +687,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
             * Always start streaming at the beginning of a segment.
             */
            timeline = newtimeline;
-           startpos = stoppos - (stoppos % XLOG_SEG_SIZE);
+           startpos = startpos - (startpos % XLOG_SEG_SIZE);
            continue;
        }
        else if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -704,6 +725,50 @@ error:
    return false;
 }
 
+/*
+ * Helper function to parse the result set returned by server after streaming
+ * has finished. On failure, prints an error to stderr and returns false.
+ */
+static bool
+ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
+{
+   uint32      startpos_xlogid,
+               startpos_xrecoff;
+
+   /*----------
+    * The result set consists of one row and two columns, e.g:
+    *
+    *  next_tli | next_tli_startpos
+    * ----------+-------------------
+    *         4 | 0/9949AE0
+    *
+    * next_tli is the timeline ID of the next timeline after the one that
+    * just finished streaming. next_tli_startpos is the XLOG position where
+    * the server switched to it.
+    *----------
+    */
+   if (PQnfields(res) < 2 || PQntuples(res) != 1)
+   {
+       fprintf(stderr,
+               _("%s: unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields\n"),
+               progname, PQntuples(res), PQnfields(res), 1, 2);
+       return false;
+   }
+
+   *timeline = atoi(PQgetvalue(res, 0, 0));
+   if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
+              &startpos_xrecoff) != 2)
+   {
+       fprintf(stderr,
+               _("%s: could not parse next timeline's starting point \"%s\"\n"),
+               progname, PQgetvalue(res, 0, 1));
+       return false;
+   }
+   *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
+
+   return true;
+}
+
 /*
  * The main loop of ReceiveXLogStream. Handles the COPY stream after
  * initiating streaming with the START_STREAMING command.