Advance old-segment horizon properly after slot invalidation
authorAlvaro Herrera
Fri, 16 Jul 2021 16:07:30 +0000 (12:07 -0400)
committerAlvaro Herrera
Fri, 16 Jul 2021 16:07:30 +0000 (12:07 -0400)
When some slots are invalidated due to the max_slot_wal_keep_size limit,
the old segment horizon should move forward to stay within the limit.
However, in commit c6550776394e we forgot to call KeepLogSeg again to
recompute the horizon after invalidating replication slots.  In cases
where other slots remained, the limits would be recomputed eventually
for other reasons, but if all slots were invalidated, the limits would
not move at all afterwards.  Repair.

Backpatch to 13 where the feature was introduced.

Author: Kyotaro Horiguchi 
Reported-by: Marcin Krupowicz
Discussion: https://postgr.es/m/17103-004130e8f27782c9@postgresql.org

src/backend/access/transam/xlog.c
src/backend/replication/slot.c
src/include/replication/slot.h
src/test/recovery/t/019_replslot_limit.pl

index 87fd095d0cff064e0d11a1abaca5cdc8f333a7b7..d2bcccaf0ae640188b5d1e138ecc666a9c3b86df 100644 (file)
@@ -9056,7 +9056,15 @@ CreateCheckPoint(int flags)
     */
    XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
    KeepLogSeg(recptr, &_logSegNo);
-   InvalidateObsoleteReplicationSlots(_logSegNo);
+   if (InvalidateObsoleteReplicationSlots(_logSegNo))
+   {
+       /*
+        * Some slots have been invalidated; recalculate the old-segment
+        * horizon, starting again from RedoRecPtr.
+        */
+       XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
+       KeepLogSeg(recptr, &_logSegNo);
+   }
    _logSegNo--;
    RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
 
@@ -9391,7 +9399,15 @@ CreateRestartPoint(int flags)
    replayPtr = GetXLogReplayRecPtr(&replayTLI);
    endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
    KeepLogSeg(endptr, &_logSegNo);
-   InvalidateObsoleteReplicationSlots(_logSegNo);
+   if (InvalidateObsoleteReplicationSlots(_logSegNo))
+   {
+       /*
+        * Some slots have been invalidated; recalculate the old-segment
+        * horizon, starting again from RedoRecPtr.
+        */
+       XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
+       KeepLogSeg(endptr, &_logSegNo);
+   }
    _logSegNo--;
 
    /*
@@ -9559,6 +9575,12 @@ GetWALAvailability(XLogRecPtr targetLSN)
  * requirement of replication slots.  For the latter criterion we do consider
  * the effects of max_slot_wal_keep_size: reserve at most that much space back
  * from recptr.
+ *
+ * Note about replication slots: if this function calculates a value
+ * that's further ahead than what slots need reserved, then affected
+ * slots need to be invalidated and this function invoked again.
+ * XXX it might be a good idea to rewrite this function so that
+ * invalidation is optionally done here, instead.
  */
 static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
index 1dc7ec64e4647adfd775f2ed7cbd767459f14657..02047ea92072d30bdb050165a139b7757d9fab39 100644 (file)
@@ -1130,11 +1130,14 @@ ReplicationSlotReserveWal(void)
  * Returns whether ReplicationSlotControlLock was released in the interim (and
  * in that case we're not holding the lock at return, otherwise we are).
  *
+ * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
+ *
  * This is inherently racy, because we release the LWLock
  * for syscalls, so caller must restart if we return true.
  */
 static bool
-InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
+InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
+                              bool *invalidated)
 {
    int         last_signaled_pid = 0;
    bool        released_lock = false;
@@ -1191,6 +1194,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
            s->active_pid = MyProcPid;
            s->data.invalidated_at = restart_lsn;
            s->data.restart_lsn = InvalidXLogRecPtr;
+
+           /* Let caller know */
+           *invalidated = true;
        }
 
        SpinLockRelease(&s->mutex);
@@ -1279,12 +1285,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
  * Mark any slot that points to an LSN older than the given segment
  * as invalid; it requires WAL that's about to be removed.
  *
+ * Returns true when any slot have got invalidated.
+ *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
-void
+bool
 InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
 {
    XLogRecPtr  oldestLSN;
+   bool        invalidated = false;
 
    XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
 
@@ -1297,13 +1306,24 @@ restart:
        if (!s->in_use)
            continue;
 
-       if (InvalidatePossiblyObsoleteSlot(s, oldestLSN))
+       if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
        {
            /* if the lock was released, start from scratch */
            goto restart;
        }
    }
    LWLockRelease(ReplicationSlotControlLock);
+
+   /*
+    * If any slots have been invalidated, recalculate the resource limits.
+    */
+   if (invalidated)
+   {
+       ReplicationSlotsComputeRequiredXmin(false);
+       ReplicationSlotsComputeRequiredLSN();
+   }
+
+   return invalidated;
 }
 
 /*
index 31362585ecb1e2d0c3eabf25f6043cceb534b6af..12c68ddbd2eff379d1735602a7b094c289503d24 100644 (file)
@@ -209,7 +209,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
-extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
+extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
index a7231dcd47af4027d8e96dcf6c8878b60edac61a..319a41b27ba3a3fbcf299754a37ad0f148029610 100644 (file)
@@ -8,7 +8,7 @@ use TestLib;
 use PostgresNode;
 
 use File::Path qw(rmtree);
-use Test::More tests => 14;
+use Test::More tests => 15;
 use Time::HiRes qw(usleep);
 
 $ENV{PGDATABASE} = 'postgres';
@@ -173,7 +173,12 @@ ok( !find_in_log(
 # Advance WAL again, the slot loses the oldest segment.
 my $logstart = get_log_size($node_master);
 advance_wal($node_master, 7);
-$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# This slot should be broken, wait for that to happen
+$node_master->poll_query_until(
+   'postgres',
+   qq[SELECT wal_status = 'lost' FROM pg_replication_slots
+   WHERE slot_name = 'rep1']);
 
 # WARNING should be issued
 ok( find_in_log(
@@ -182,13 +187,28 @@ ok( find_in_log(
        $logstart),
    'check that the warning is logged');
 
-# This slot should be broken
-$result = $node_master->safe_psql('postgres',
-   "SELECT slot_name, active, restart_lsn IS NULL, wal_status, safe_wal_size FROM pg_replication_slots WHERE slot_name = 'rep1'"
-);
+$result = $node_master->safe_psql(
+   'postgres',
+   qq[
+   SELECT slot_name, active, restart_lsn IS NULL, wal_status, safe_wal_size
+   FROM pg_replication_slots WHERE slot_name = 'rep1']);
 is($result, "rep1|f|t|lost|",
    'check that the slot became inactive and the state "lost" persists');
 
+# The invalidated slot shouldn't keep the old-segment horizon back;
+# see bug #17103: https://postgr.es/m/[email protected]
+# Test for this by creating a new slot and comparing its restart LSN
+# to the oldest existing file.
+my $redoseg = $node_master->safe_psql('postgres',
+   "SELECT pg_walfile_name(lsn) FROM pg_create_physical_replication_slot('s2', true)"
+);
+my $oldestseg = $node_master->safe_psql('postgres',
+   "SELECT pg_ls_dir AS f FROM pg_ls_dir('pg_wal') WHERE pg_ls_dir ~ '^[0-9A-F]{24}\$' ORDER BY 1 LIMIT 1"
+);
+$node_master->safe_psql('postgres',
+   qq[SELECT pg_drop_replication_slot('s2')]);
+is($oldestseg, $redoseg, "check that segments have been removed");
+
 # The standby no longer can connect to the master
 $logstart = get_log_size($node_standby);
 $node_standby->start;