Fix the logic in libpqrcv_receive() to determine if there's any incoming data
authorHeikki Linnakangas
Thu, 13 Jan 2011 15:51:28 +0000 (17:51 +0200)
committerHeikki Linnakangas
Thu, 13 Jan 2011 16:26:39 +0000 (18:26 +0200)
that can be read without blocking. It used to conclude that there isn't, even
though there was data in the socket receive buffer. That lead walreceiver to
flush the WAL after every received chunk, potentially causing big performance
issues.

Backpatch to 9.0, because the performance impact can be very significant.

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

index 5aac85d2ee7fc066517bacc7e152badd65c68029..c052df242f2452c094d5fdb54d17e678db6d84ac 100644 (file)
@@ -41,7 +41,6 @@ void      _PG_init(void);
 
 /* Current connection to the primary, if any */
 static PGconn *streamConn = NULL;
-static bool justconnected = false;
 
 /* Buffer for currently read records */
 static char *recvBuf = NULL;
@@ -168,7 +167,6 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
    }
    PQclear(res);
 
-   justconnected = true;
    ereport(LOG,
        (errmsg("streaming replication successfully connected to primary")));
 
@@ -321,7 +319,6 @@ libpqrcv_disconnect(void)
 {
    PQfinish(streamConn);
    streamConn = NULL;
-   justconnected = false;
 }
 
 /*
@@ -351,28 +348,30 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
        PQfreemem(recvBuf);
    recvBuf = NULL;
 
-   /*
-    * If the caller requested to block, wait for data to arrive. But if this
-    * is the first call after connecting, don't wait, because there might
-    * already be some data in libpq buffer that we haven't returned to
-    * caller.
-    */
-   if (timeout > 0 && !justconnected)
+   /* Try to receive a CopyData message */
+   rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
+   if (rawlen == 0)
    {
-       if (!libpq_select(timeout))
-           return false;
+       /*
+        * No data available yet. If the caller requested to block, wait for
+        * more data to arrive.
+        */
+       if (timeout > 0)
+       {
+           if (!libpq_select(timeout))
+               return false;
+       }
 
        if (PQconsumeInput(streamConn) == 0)
            ereport(ERROR,
                    (errmsg("could not receive data from WAL stream: %s",
                            PQerrorMessage(streamConn))));
-   }
-   justconnected = false;
 
-   /* Receive CopyData message */
-   rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
-   if (rawlen == 0)            /* no data available yet, then return */
-       return false;
+       /* Now that we've consumed some input, try again */
+       rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
+       if (rawlen == 0)
+           return false;
+   }
    if (rawlen == -1)           /* end-of-streaming or error */
    {
        PGresult   *res;