Add confirmed_flush column to pg_replication_slots.
authorAndres Freund
Mon, 10 Aug 2015 11:28:18 +0000 (13:28 +0200)
committerAndres Freund
Mon, 10 Aug 2015 11:28:18 +0000 (13:28 +0200)
There's no reason not to expose both restart_lsn and confirmed_flush
since they have rather distinct meanings. The former is the oldest WAL
still required and valid for both physical and logical slots, whereas
the latter is the location up to which a logical slot's consumer has
confirmed receiving data. Most of the time a slot will require older
WAL (i.e. restart_lsn) than the confirmed
position (i.e. confirmed_flush_lsn).

Author: Marko Tiikkaja, editorialized by me
Discussion: 559D110B.1020109@joh.to

contrib/test_decoding/expected/ddl.out
doc/src/sgml/catalogs.sgml
doc/src/sgml/high-availability.sgml
doc/src/sgml/logicaldecoding.sgml
src/backend/catalog/system_views.sql
src/backend/replication/slotfuncs.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.h
src/test/regress/expected/rules.out

index 728798b3b7344d039c53480dc5a7c3d94b9a9ef7..11b2d9c07f46aee54060b061c27fe7cea351a789 100644 (file)
@@ -673,7 +673,7 @@ SELECT pg_drop_replication_slot('regression_slot');
 
 /* check that the slot is gone */
 SELECT * FROM pg_replication_slots;
- slot_name | plugin | slot_type | datoid | database | active | active_pid | xmin | catalog_xmin | restart_lsn 
------------+--------+-----------+--------+----------+--------+------------+------+--------------+-------------
+ slot_name | plugin | slot_type | datoid | database | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn 
+-----------+--------+-----------+--------+----------+--------+------------+------+--------------+-------------+---------------------
 (0 rows)
 
index 12471d1c6e1c02662a8bd92de7fd3a5acff5ca8a..1e895e4999acfcc269474f5ec49325dc49e51e60 100644 (file)
       automatically removed during checkpoints.
       
      
+
+     
+      confirmed_flush
+      pg_lsn
+      
+      The address (LSN) up to which the logical
+      slot's consumer has confirmed receiving data. Data older than this is
+      not available anymore. NULL for physical slots.
+      
+     
+
     
    
   
index d2f7fec5234413b0bc1ca60b4ef5cdfc8d68ecf6..37aa0470ab8c4a4dc5b151f9b5c18c09c0af29e7 100644 (file)
@@ -934,9 +934,9 @@ postgres=# SELECT * FROM pg_create_physical_replication_slot('node_a_slot');
  node_a_slot |
 
 postgres=# SELECT * FROM pg_replication_slots;
-  slot_name  | slot_type | datoid | database | active | xmin | restart_lsn
--------------+-----------+--------+----------+--------+------+-------------
- node_a_slot | physical  |        |          | f      |      |
+  slot_name  | slot_type | datoid | database | active | xmin | restart_lsn | confirmed_flush_lsn
+-------------+-----------+--------+----------+--------+------+-------------+---------------------
+ node_a_slot | physical  |        |          | f      |      |             |
 (1 row)
 
      To configure the standby to use this slot, primary_slot_name
index 5fa2f77ea893e8012a43c4456e6efdfeb7b07cfc..4f57765e918adddd30ecc2fdb9120d8d4ac109b4 100644 (file)
@@ -62,10 +62,10 @@ postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', '
  regression_slot | 0/16B1970
 (1 row)
 
-postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn FROM pg_replication_slots;
-    slot_name    |    plugin     | slot_type | database | active | restart_lsn
------------------+---------------+-----------+----------+--------+-------------
- regression_slot | test_decoding | logical   | postgres | f      | 0/16A4408
+postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
+    slot_name    |    plugin     | slot_type | database | active | restart_lsn | confirmed_flush_lsn
+-----------------+---------------+-----------+----------+--------+-------------+-----------------
+ regression_slot | test_decoding | logical   | postgres | f      | 0/16A4408   | 0/16A4440
 (1 row)
 
 postgres=# -- There are no changes to see yet
index c0bd6fa96b750a07d5a2e970fc4dbb3f93243e85..3190c7f7e018965f6afb974547d99bc4b152bb14 100644 (file)
@@ -676,7 +676,8 @@ CREATE VIEW pg_replication_slots AS
             L.active_pid,
             L.xmin,
             L.catalog_xmin,
-            L.restart_lsn
+            L.restart_lsn,
+            L.confirmed_flush_lsn
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
index 348c7fe9fce08023f97f41fe81f783d31f82efb7..ecfcb0754bda3e84bdc6b8553a70f0247e47bc23 100644 (file)
@@ -158,7 +158,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 9
+#define PG_GET_REPLICATION_SLOTS_COLS 10
    ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    TupleDesc   tupdesc;
    Tuplestorestate *tupstore;
@@ -206,6 +206,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
        TransactionId xmin;
        TransactionId catalog_xmin;
        XLogRecPtr  restart_lsn;
+       XLogRecPtr  confirmed_flush_lsn;
        pid_t       active_pid;
        Oid         database;
        NameData    slot_name;
@@ -224,6 +225,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
            catalog_xmin = slot->data.catalog_xmin;
            database = slot->data.database;
            restart_lsn = slot->data.restart_lsn;
+           confirmed_flush_lsn = slot->data.confirmed_flush;
            namecpy(&slot_name, &slot->data.name);
            namecpy(&plugin, &slot->data.plugin);
 
@@ -273,6 +275,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
        else
            nulls[i++] = true;
 
+       if (confirmed_flush_lsn != InvalidXLogRecPtr)
+           values[i++] = LSNGetDatum(confirmed_flush_lsn);
+       else
+           nulls[i++] = true;
+
        tuplestore_putvalues(tupstore, tupdesc, values, nulls);
    }
 
index 19c40e4978957c826bcc7354c86b87f92c5cf122..8cd677298795a89323e75e854868792069519d26 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 201508062
+#define CATALOG_VERSION_NO 201508101
 
 #endif
index c9fe0f877825274338c7b94327a2599658085bf2..51639624a9eb0d8b9558fa0a6dba54d3476ff057 100644 (file)
@@ -5197,7 +5197,7 @@ DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0
 DESCR("create a physical replication slot");
 DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
 DESCR("drop a replication slot");
-DATA(insert OID = 3781 (  pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220}" "{o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,xmin,catalog_xmin,restart_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
+DATA(insert OID = 3781 (  pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
 DESCR("information about replication slots currently in use");
 DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
 DESCR("set up a logical replication slot");
index 6206c819cd872adf67db7133dd06ffb3db65641e..44c6740582aa4db335649ffd8ddd298f4b5d8021 100644 (file)
@@ -1416,8 +1416,9 @@ pg_replication_slots| SELECT l.slot_name,
     l.active_pid,
     l.xmin,
     l.catalog_xmin,
-    l.restart_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, active_pid, xmin, catalog_xmin, restart_lsn)
+    l.restart_lsn,
+    l.confirmed_flush_lsn
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,