Send status updates back from standby server to master, indicating how far
authorHeikki Linnakangas
Thu, 10 Feb 2011 19:00:29 +0000 (21:00 +0200)
committerHeikki Linnakangas
Thu, 10 Feb 2011 19:04:02 +0000 (21:04 +0200)
the standby has written, flushed, and applied the WAL. At the moment, this
is for informational purposes only, the values are only shown in
pg_stat_replication system view, but in the future they will also be needed
for synchronous replication.

Extracted from Simon riggs' synchronous replication patch by Robert Haas, with
some tweaking by me.

15 files changed:
doc/src/sgml/config.sgml
doc/src/sgml/monitoring.sgml
doc/src/sgml/protocol.sgml
src/backend/access/transam/xlog.c
src/backend/catalog/system_views.sql
src/backend/replication/walreceiver.c
src/backend/replication/walsender.c
src/backend/utils/misc/guc.c
src/backend/utils/misc/postgresql.conf.sample
src/include/access/xlog.h
src/include/catalog/pg_proc.h
src/include/replication/walprotocol.h
src/include/replication/walreceiver.h
src/include/replication/walsender.h
src/test/regress/expected/rules.out

index 5a43774b33dc73d6a066874ac30ed22705638172..63c6283f915bf92a43db94816fec55b165b10311 100644 (file)
@@ -1984,6 +1984,29 @@ SET ENABLE_SEQSCAN TO OFF;
        
       
 
+      
+       wal_receiver_status_interval (integer)
+       
+        wal_receiver_status_interval configuration parameter
+       
+       
+       
+        Specifies the minimum frequency, in seconds, for the WAL receiver
+        process on the standby to send information about replication progress
+        to the primary, where they can be seen using the
+        pg_stat_replication view.  The standby will report
+        the last transaction log position it has written, the last position it
+        has flushed to disk, and the last position it has applied.  Updates are
+        sent each time the write or flush positions changed, or at least as
+        often as specified by this parameter.  Thus, the apply position may
+        lag slightly behind the true position.  Setting this parameter to zero
+        disables status updates completely.  This parameter can only be set in
+        the postgresql.conf file or on the server command line.
+        The default value is 10 seconds.
+       
+       
+      
+
      
       vacuum_defer_cleanup_age (integer)
       
index 2e8427a40f1913d2bade6c4210cf5ac5d917963b..58e3459e6785bc7294a51cf12adc0bc4c1c4d57b 100644 (file)
@@ -298,8 +298,11 @@ postgres: user database host 
       pg_stat_replicationpg_stat_replication
       One row per WAL sender process, showing process ID,
       user OID, user name, application name, client's address and port number,
-      time at which the server process began execution, current WAL sender
-      state and transaction log location. The columns detailing what exactly
+      time at which the server process began execution, and the current WAL
+      sender state and transaction log location.  In addition, the standby
+      reports the last transaction log position it received and wrote, the last
+      position it flushed to disk, and the last position it replayed, and this
+      information is also displayed here.  The columns detailing what exactly
       the connection is doing are only visible if the user examining the view
       is a superuser.
      
index c09d961d67e22748b5807313aecf6d63d071b3a2..c923d3b154c5a05b22389dc66fc0fb9379d20977 100644 (file)
@@ -1469,6 +1469,82 @@ The commands accepted in walsender mode are:
        shutdown), it will send a CommandComplete message before exiting.
        This might not happen during an abnormal shutdown, of course.
      
+
+     
+       The receiving process can send a status update back to the sender at
+       any time, using the following message format (also in the payload of
+       a CopyData message):
+     
+
+     
+      
+      
+      
+          Standby status update (F)
+      
+      
+      
+      
+      
+      
+          Byte1('r')
+      
+      
+      
+          Identifies the message as a receiver status update.
+      
+      
+      
+      
+      
+          Byte8
+      
+      
+      
+          The location of the last WAL byte + 1 received and written to disk
+          in the standby, in XLogRecPtr format.
+      
+      
+      
+      
+      
+          Byte8
+      
+      
+      
+          The location of the last WAL byte + 1 flushed to disk in
+          the standby, in XLogRecPtr format.
+      
+      
+      
+      
+      
+          Byte8
+      
+      
+      
+          The location of the last WAL byte + 1 applied in the standby, in
+          XLogRecPtr format.
+      
+      
+      
+      
+      
+          Byte8
+      
+      
+      
+          The server's system clock at the time of transmission,
+          given in TimestampTz format.
+      
+      
+      
+      
+      
+      
+      
+      
+     
     
   
 
index 8e9dc7ba92a954345953971a0eee1cc87d584765..f5cb6576de4bd00ab4d87ed3b30fb3977d98c21a 100644 (file)
@@ -9317,6 +9317,25 @@ pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
    PG_RETURN_TEXT_P(cstring_to_text(location));
 }
 
+/*
+ * Get latest redo apply position.
+ *
+ * Exported to allow WALReceiver to read the pointer directly.
+ */
+XLogRecPtr
+GetXLogReplayRecPtr(void)
+{
+   /* use volatile pointer to prevent code rearrangement */
+   volatile XLogCtlData *xlogctl = XLogCtl;
+   XLogRecPtr  recptr;
+
+   SpinLockAcquire(&xlogctl->info_lck);
+   recptr = xlogctl->recoveryLastRecPtr;
+   SpinLockRelease(&xlogctl->info_lck);
+
+   return recptr;
+}
+
 /*
  * Report the last WAL replay location (same format as pg_start_backup etc)
  *
@@ -9326,14 +9345,10 @@ pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
 Datum
 pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
 {
-   /* use volatile pointer to prevent code rearrangement */
-   volatile XLogCtlData *xlogctl = XLogCtl;
    XLogRecPtr  recptr;
    char        location[MAXFNAMELEN];
 
-   SpinLockAcquire(&xlogctl->info_lck);
-   recptr = xlogctl->recoveryLastRecPtr;
-   SpinLockRelease(&xlogctl->info_lck);
+   recptr = GetXLogReplayRecPtr();
 
    if (recptr.xlogid == 0 && recptr.xrecoff == 0)
        PG_RETURN_NULL();
index e1d91afbd360579a96f46f2172f3043b1227e7e8..408e174658cd6303bbf29fe0d8cb31f7d5d8ea80 100644 (file)
@@ -509,7 +509,10 @@ CREATE VIEW pg_stat_replication AS
             S.client_port,
             S.backend_start,
             W.state,
-            W.sent_location
+            W.sent_location,
+            W.write_location,
+            W.flush_location,
+            W.apply_location
     FROM pg_stat_get_activity(NULL) AS S, pg_authid U,
             pg_stat_get_wal_senders() AS W
     WHERE S.usesysid = U.oid AND
index 7005307dc250da66e88e03d7da05ece6734e0e48..30e35dbd28ad976f05e8a259961ecb4fd8a3cd4e 100644 (file)
@@ -54,6 +54,9 @@
 /* Global variable to indicate if this process is a walreceiver process */
 bool       am_walreceiver;
 
+/* GUC variable */
+int            wal_receiver_status_interval;
+
 /* libpqreceiver hooks to these when loaded */
 walrcv_connect_type walrcv_connect = NULL;
 walrcv_receive_type walrcv_receive = NULL;
@@ -88,6 +91,8 @@ static struct
    XLogRecPtr  Flush;          /* last byte + 1 flushed in the standby */
 }  LogstreamResult;
 
+static StandbyReplyMessage reply_message;
+
 /*
  * About SIGTERM handling:
  *
@@ -114,6 +119,7 @@ static void WalRcvDie(int code, Datum arg);
 static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(void);
+static void XLogWalRcvSendReply(void);
 
 /* Signal handlers */
 static void WalRcvSigHupHandler(SIGNAL_ARGS);
@@ -306,12 +312,23 @@ WalReceiverMain(void)
            while (walrcv_receive(0, &type, &buf, &len))
                XLogWalRcvProcessMsg(type, buf, len);
 
+           /* Let the master know that we received some data. */
+           XLogWalRcvSendReply();
+
            /*
             * If we've written some records, flush them to disk and let the
             * startup process know about them.
             */
            XLogWalRcvFlush();
        }
+       else
+       {
+           /*
+            * We didn't receive anything new, but send a status update to
+            * the master anyway, to report any progress in applying WAL.
+            */
+           XLogWalRcvSendReply();
+       }
    }
 }
 
@@ -546,5 +563,60 @@ XLogWalRcvFlush(void)
                     LogstreamResult.Write.xrecoff);
            set_ps_display(activitymsg, false);
        }
+
+       /* Also let the master know that we made some progress */
+       XLogWalRcvSendReply();
    }
 }
+
+/*
+ * Send reply message to primary, indicating our current XLOG positions and
+ * the current time.
+ */
+static void
+XLogWalRcvSendReply(void)
+{
+   char        buf[sizeof(StandbyReplyMessage) + 1];
+   TimestampTz now;
+
+   /*
+    * If the user doesn't want status to be reported to the master, be sure
+    * to exit before doing anything at all.
+    */
+   if (wal_receiver_status_interval <= 0)
+       return;
+
+   /* Get current timestamp. */
+   now = GetCurrentTimestamp();
+
+   /*
+    * We can compare the write and flush positions to the last message we
+    * sent without taking any lock, but the apply position requires a spin
+    * lock, so we don't check that unless something else has changed or 10
+    * seconds have passed.  This means that the apply log position will
+    * appear, from the master's point of view, to lag slightly, but since
+    * this is only for reporting purposes and only on idle systems, that's
+    * probably OK.
+    */
+   if (XLByteEQ(reply_message.write, LogstreamResult.Write)
+       && XLByteEQ(reply_message.flush, LogstreamResult.Flush)
+       && !TimestampDifferenceExceeds(reply_message.sendTime, now,
+           wal_receiver_status_interval * 1000))
+       return;
+
+   /* Construct a new message. */
+   reply_message.write = LogstreamResult.Write;
+   reply_message.flush = LogstreamResult.Flush;
+   reply_message.apply = GetXLogReplayRecPtr();
+   reply_message.sendTime = now;
+
+   elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
+                reply_message.write.xlogid, reply_message.write.xrecoff,
+                reply_message.flush.xlogid, reply_message.flush.xrecoff,
+                reply_message.apply.xlogid, reply_message.apply.xrecoff);
+
+   /* Prepend with the message type and send it. */
+   buf[0] = 'r';
+   memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage));
+   walrcv_send(buf, sizeof(StandbyReplyMessage) + 1);
+}
index 78963c1e6be895d8c2985024afcf3ba562436f31..3ad95b495ec8dbe86d218959d8336b8576847d24 100644 (file)
@@ -39,6 +39,7 @@
 
 #include "funcapi.h"
 #include "access/xlog_internal.h"
+#include "access/transam.h"
 #include "catalog/pg_type.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -51,6 +52,7 @@
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/pmsignal.h"
+#include "storage/proc.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
@@ -106,9 +108,10 @@ static void InitWalSnd(void);
 static void WalSndHandshake(void);
 static void WalSndKill(int code, Datum arg);
 static bool XLogSend(char *msgbuf, bool *caughtup);
-static void CheckClosedConnection(void);
 static void IdentifySystem(void);
 static void StartReplication(StartReplicationCmd * cmd);
+static void ProcessStandbyReplyMessage(void);
+static void ProcessRepliesIfAny(void);
 
 
 /* Main entry point for walsender process */
@@ -442,7 +445,7 @@ HandleReplicationCommand(const char *cmd_string)
  * Check if the remote end has closed the connection.
  */
 static void
-CheckClosedConnection(void)
+ProcessRepliesIfAny(void)
 {
    unsigned char firstchar;
    int         r;
@@ -465,6 +468,13 @@ CheckClosedConnection(void)
    /* Handle the very limited subset of commands expected in this phase */
    switch (firstchar)
    {
+           /*
+            * 'd' means a standby reply wrapped in a COPY BOTH packet.
+            */
+       case 'd':
+           ProcessStandbyReplyMessage();
+           break;
+
            /*
             * 'X' means that the standby is closing down the socket.
             */
@@ -479,6 +489,62 @@ CheckClosedConnection(void)
    }
 }
 
+/*
+ * Process a status update message received from standby.
+ */
+static void
+ProcessStandbyReplyMessage(void)
+{
+   static StringInfoData input_message;
+   StandbyReplyMessage reply;
+   char msgtype;
+
+   initStringInfo(&input_message);
+
+   /*
+    * Read the message contents.
+    */
+   if (pq_getmessage(&input_message, 0))
+   {
+       ereport(COMMERROR,
+               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                errmsg("unexpected EOF on standby connection")));
+       proc_exit(0);
+   }
+
+   /*
+    * Check message type from the first byte. At the moment, there is only
+    * one type.
+    */
+   msgtype = pq_getmsgbyte(&input_message);
+   if (msgtype != 'r')
+       ereport(COMMERROR,
+               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                errmsg("unexpected message type %c", msgtype)));
+
+   pq_copymsgbytes(&input_message, (char *) &reply, sizeof(StandbyReplyMessage));
+
+   elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X ",
+        reply.write.xlogid, reply.write.xrecoff,
+        reply.flush.xlogid, reply.flush.xrecoff,
+        reply.apply.xlogid, reply.apply.xrecoff);
+
+   /*
+    * Update shared state for this WalSender process
+    * based on reply data from standby.
+    */
+   {
+       /* use volatile pointer to prevent code rearrangement */
+       volatile WalSnd *walsnd = MyWalSnd;
+
+       SpinLockAcquire(&walsnd->mutex);
+       walsnd->write = reply.write;
+       walsnd->flush = reply.flush;
+       walsnd->apply = reply.apply;
+       SpinLockRelease(&walsnd->mutex);
+   }
+}
+
 /* Main loop of walsender process */
 static int
 WalSndLoop(void)
@@ -518,6 +584,7 @@ WalSndLoop(void)
        {
            if (!XLogSend(output_message, &caughtup))
                break;
+           ProcessRepliesIfAny();
            if (caughtup)
                walsender_shutdown_requested = true;
        }
@@ -561,9 +628,6 @@ WalSndLoop(void)
                WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
                                  WalSndDelay * 1000L);
            }
-
-           /* Check if the connection was closed */
-           CheckClosedConnection();
        }
        else
        {
@@ -574,6 +638,7 @@ WalSndLoop(void)
 
        /* Update our state to indicate if we're behind or not */
        WalSndSetState(caughtup ? WALSNDSTATE_STREAMING : WALSNDSTATE_CATCHUP);
+       ProcessRepliesIfAny();
    }
 
    /*
@@ -1104,7 +1169,7 @@ WalSndGetStateString(WalSndState state)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS   3
+#define PG_STAT_GET_WAL_SENDERS_COLS   6
    ReturnSetInfo      *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    TupleDesc           tupdesc;
    Tuplestorestate    *tupstore;
@@ -1141,8 +1206,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
-       char        sent_location[MAXFNAMELEN];
+       char        location[MAXFNAMELEN];
        XLogRecPtr  sentPtr;
+       XLogRecPtr  write;
+       XLogRecPtr  flush;
+       XLogRecPtr  apply;
        WalSndState state;
        Datum       values[PG_STAT_GET_WAL_SENDERS_COLS];
        bool        nulls[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -1153,13 +1221,14 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
        SpinLockAcquire(&walsnd->mutex);
        sentPtr = walsnd->sentPtr;
        state = walsnd->state;
+       write = walsnd->write;
+       flush = walsnd->flush;
+       apply = walsnd->apply;
        SpinLockRelease(&walsnd->mutex);
 
-       snprintf(sent_location, sizeof(sent_location), "%X/%X",
-                   sentPtr.xlogid, sentPtr.xrecoff);
-
        memset(nulls, 0, sizeof(nulls));
        values[0] = Int32GetDatum(walsnd->pid);
+
        if (!superuser())
        {
            /*
@@ -1168,11 +1237,35 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
             */
            nulls[1] = true;
            nulls[2] = true;
+           nulls[3] = true;
+           nulls[4] = true;
+           nulls[5] = true;
        }
        else
        {
            values[1] = CStringGetTextDatum(WalSndGetStateString(state));
-           values[2] = CStringGetTextDatum(sent_location);
+
+           snprintf(location, sizeof(location), "%X/%X",
+                    sentPtr.xlogid, sentPtr.xrecoff);
+           values[2] = CStringGetTextDatum(location);
+
+           if (write.xlogid == 0 && write.xrecoff == 0)
+               nulls[3] = true;
+           snprintf(location, sizeof(location), "%X/%X",
+                    write.xlogid, write.xrecoff);
+           values[3] = CStringGetTextDatum(location);
+
+           if (flush.xlogid == 0 && flush.xrecoff == 0)
+               nulls[4] = true;
+           snprintf(location, sizeof(location), "%X/%X",
+                   flush.xlogid, flush.xrecoff);
+           values[4] = CStringGetTextDatum(location);
+
+           if (apply.xlogid == 0 && apply.xrecoff == 0)
+               nulls[5] = true;
+           snprintf(location, sizeof(location), "%X/%X",
+                    apply.xlogid, apply.xrecoff);
+           values[5] = CStringGetTextDatum(location);
        }
 
        tuplestore_putvalues(tupstore, tupdesc, values, nulls);
index 216236b52945eeaec805d689ba9fb284c1748cce..470183d4abad4ce8af8424c27380da6a01ee57c9 100644 (file)
@@ -55,6 +55,7 @@
 #include "postmaster/postmaster.h"
 #include "postmaster/syslogger.h"
 #include "postmaster/walwriter.h"
+#include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
 #include "storage/standby.h"
@@ -1440,6 +1441,16 @@ static struct config_int ConfigureNamesInt[] =
        30 * 1000, -1, INT_MAX / 1000, NULL, NULL
    },
 
+   {
+       {"wal_receiver_status_interval", PGC_SIGHUP, WAL_STANDBY_SERVERS,
+           gettext_noop("Sets the maximum interval between WAL receiver status reports to the master."),
+           NULL,
+           GUC_UNIT_S
+       },
+       &wal_receiver_status_interval,
+       10, 0, INT_MAX/1000, NULL, NULL
+   },
+
    {
        {"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
            gettext_noop("Sets the maximum number of concurrent connections."),
index fe80c4dc23938ce7f96919eb6d12ee51e21f4e07..5d31365dab44001c81cb2db4b6cda8f207fcfc0d 100644 (file)
 #max_standby_streaming_delay = 30s # max delay before canceling queries
                    # when reading streaming WAL;
                    # -1 allows indefinite delay
+#wal_receiver_status_interval = 10s    # replies at least this often, 0 disables
 
 
 #------------------------------------------------------------------------------
index ff73272f755dd505bc119fb74e6474cb119767a6..1803d5ab2010962ea66411e62c74b694625dd327 100644 (file)
@@ -291,6 +291,7 @@ extern void issue_xlog_fsync(int fd, uint32 log, uint32 seg);
 extern bool RecoveryInProgress(void);
 extern bool XLogInsertAllowed(void);
 extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
+extern XLogRecPtr GetXLogReplayRecPtr(void);
 
 extern void UpdateControlFile(void);
 extern uint64 GetSystemIdentifier(void);
index 01b002a1454b405f3861054e76a25a35e906db8c..cb275b8b5817a7515de1d5870170026e06a59917 100644 (file)
@@ -3075,7 +3075,7 @@ DATA(insert OID = 1936 (  pg_stat_get_backend_idset       PGNSP PGUID 12 1 100 0 f f
 DESCR("statistics: currently active backend IDs");
 DATA(insert OID = 2022 (  pg_stat_get_activity         PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,23}" "{i,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ ));
 DESCR("statistics: information about currently active backends");
-DATA(insert OID = 3099 (  pg_stat_get_wal_senders  PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25}" "{o,o,o}" "{procpid,state,sent_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
+DATA(insert OID = 3099 (  pg_stat_get_wal_senders  PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25,25,25,25}" "{o,o,o,o,o,o}" "{procpid,state,sent_location,write_location,flush_location,apply_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
 DESCR("statistics: information about currently active replication");
 DATA(insert OID = 2026 (  pg_backend_pid               PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ ));
 DESCR("statistics: current backend PID");
index 199385120a5e3d89b39ca058fcb5b278b4528c6d..32c49620c1da58a1ea2d191a3b19c821249e918d 100644 (file)
@@ -39,6 +39,27 @@ typedef struct
    TimestampTz sendTime;
 } WalDataMessageHeader;
 
+/*
+ * Reply message from standby (message type 'r').  This is wrapped within
+ * a CopyData message at the FE/BE protocol level.
+ *
+ * Note that the data length is not specified here.
+ */
+typedef struct
+{
+   /*
+    * The xlog locations that have been written, flushed, and applied
+    * by standby-side. These may be invalid if the standby-side is unable
+    * to or chooses not to report these.
+    */
+   XLogRecPtr  write;
+   XLogRecPtr  flush;
+   XLogRecPtr  apply;
+
+   /* Sender's system clock at the time of transmission */
+   TimestampTz sendTime;
+} StandbyReplyMessage;
+
 /*
  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
  *
index 24ad43839f96eed7d2df934a6c4998adea2d3476..aa5bfb7aea1e82e88ea0f4cca2d64a52c025db0a 100644 (file)
@@ -17,6 +17,7 @@
 #include "pgtime.h"
 
 extern bool am_walreceiver;
+extern int wal_receiver_status_interval;
 
 /*
  * MAXCONNINFO: maximum size of a connection string.
index 9a196ab1c8bb69d4a7013e416d7a23792a7bb954..5843307c9dc0f8f28b18cf45af8d0016ec722645 100644 (file)
@@ -35,7 +35,17 @@ typedef struct WalSnd
    WalSndState state;          /* this walsender's state */
    XLogRecPtr  sentPtr;        /* WAL has been sent up to this point */
 
-   slock_t     mutex;          /* locks shared variables shown above */
+   /*
+    * The xlog locations that have been written, flushed, and applied
+    * by standby-side. These may be invalid if the standby-side has not
+    * offered values yet.
+    */
+   XLogRecPtr  write;
+   XLogRecPtr  flush;
+   XLogRecPtr  apply;
+
+   /* Protects shared variables shown above. */
+   slock_t     mutex;
 
    /*
     * Latch used by backends to wake up this walsender when it has work
index b49467870efd86418adeac482e1c3e5efff6d8c4..c0142c25772a09e25617ca66a0acc4714fa1995b 100644 (file)
@@ -1297,7 +1297,7 @@ SELECT viewname, definition FROM pg_views WHERE schemaname <> 'information_schem
  pg_stat_bgwriter            | SELECT pg_stat_get_bgwriter_timed_checkpoints() AS checkpoints_timed, pg_stat_get_bgwriter_requested_checkpoints() AS checkpoints_req, pg_stat_get_bgwriter_buf_written_checkpoints() AS buffers_checkpoint, pg_stat_get_bgwriter_buf_written_clean() AS buffers_clean, pg_stat_get_bgwriter_maxwritten_clean() AS maxwritten_clean, pg_stat_get_buf_written_backend() AS buffers_backend, pg_stat_get_buf_fsync_backend() AS buffers_backend_fsync, pg_stat_get_buf_alloc() AS buffers_alloc, pg_stat_get_bgwriter_stat_reset_time() AS stats_reset;
  pg_stat_database            | SELECT d.oid AS datid, d.datname, pg_stat_get_db_numbackends(d.oid) AS numbackends, pg_stat_get_db_xact_commit(d.oid) AS xact_commit, pg_stat_get_db_xact_rollback(d.oid) AS xact_rollback, (pg_stat_get_db_blocks_fetched(d.oid) - pg_stat_get_db_blocks_hit(d.oid)) AS blks_read, pg_stat_get_db_blocks_hit(d.oid) AS blks_hit, pg_stat_get_db_tuples_returned(d.oid) AS tup_returned, pg_stat_get_db_tuples_fetched(d.oid) AS tup_fetched, pg_stat_get_db_tuples_inserted(d.oid) AS tup_inserted, pg_stat_get_db_tuples_updated(d.oid) AS tup_updated, pg_stat_get_db_tuples_deleted(d.oid) AS tup_deleted, pg_stat_get_db_conflict_all(d.oid) AS conflicts, pg_stat_get_db_stat_reset_time(d.oid) AS stats_reset FROM pg_database d;
  pg_stat_database_conflicts  | SELECT d.oid AS datid, d.datname, pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace, pg_stat_get_db_conflict_lock(d.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock FROM pg_database d;
- pg_stat_replication         | SELECT s.procpid, s.usesysid, u.rolname AS usename, s.application_name, s.client_addr, s.client_port, s.backend_start, w.state, w.sent_location FROM pg_stat_get_activity(NULL::integer) s(datid, procpid, usesysid, application_name, current_query, waiting, xact_start, query_start, backend_start, client_addr, client_port), pg_authid u, pg_stat_get_wal_senders() w(procpid, state, sent_location) WHERE ((s.usesysid = u.oid) AND (s.procpid = w.procpid));
+ pg_stat_replication         | SELECT s.procpid, s.usesysid, u.rolname AS usename, s.application_name, s.client_addr, s.client_port, s.backend_start, w.state, w.sent_location, w.write_location, w.flush_location, w.apply_location FROM pg_stat_get_activity(NULL::integer) s(datid, procpid, usesysid, application_name, current_query, waiting, xact_start, query_start, backend_start, client_addr, client_port), pg_authid u, pg_stat_get_wal_senders() w(procpid, state, sent_location, write_location, flush_location, apply_location) WHERE ((s.usesysid = u.oid) AND (s.procpid = w.procpid));
  pg_stat_sys_indexes         | SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, pg_stat_all_indexes.relname, pg_stat_all_indexes.indexrelname, pg_stat_all_indexes.idx_scan, pg_stat_all_indexes.idx_tup_read, pg_stat_all_indexes.idx_tup_fetch FROM pg_stat_all_indexes WHERE ((pg_stat_all_indexes.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_indexes.schemaname ~ '^pg_toast'::text));
  pg_stat_sys_tables          | SELECT pg_stat_all_tables.relid, pg_stat_all_tables.schemaname, pg_stat_all_tables.relname, pg_stat_all_tables.seq_scan, pg_stat_all_tables.seq_tup_read, pg_stat_all_tables.idx_scan, pg_stat_all_tables.idx_tup_fetch, pg_stat_all_tables.n_tup_ins, pg_stat_all_tables.n_tup_upd, pg_stat_all_tables.n_tup_del, pg_stat_all_tables.n_tup_hot_upd, pg_stat_all_tables.n_live_tup, pg_stat_all_tables.n_dead_tup, pg_stat_all_tables.last_vacuum, pg_stat_all_tables.last_autovacuum, pg_stat_all_tables.last_analyze, pg_stat_all_tables.last_autoanalyze, pg_stat_all_tables.vacuum_count, pg_stat_all_tables.autovacuum_count, pg_stat_all_tables.analyze_count, pg_stat_all_tables.autoanalyze_count FROM pg_stat_all_tables WHERE ((pg_stat_all_tables.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_tables.schemaname ~ '^pg_toast'::text));
  pg_stat_user_functions      | SELECT p.oid AS funcid, n.nspname AS schemaname, p.proname AS funcname, pg_stat_get_function_calls(p.oid) AS calls, (pg_stat_get_function_time(p.oid) / 1000) AS total_time, (pg_stat_get_function_self_time(p.oid) / 1000) AS self_time FROM (pg_proc p LEFT JOIN pg_namespace n ON ((n.oid = p.pronamespace))) WHERE ((p.prolang <> (12)::oid) AND (pg_stat_get_function_calls(p.oid) IS NOT NULL));