Prefetch data referenced by the WAL, take II.
author: Thomas Munro
Thu, 7 Apr 2022 07:28:40 +0000 (19:28 +1200)
committer: Thomas Munro
Thu, 7 Apr 2022 07:42:14 +0000 (19:42 +1200)
Introduce a new GUC recovery_prefetch.  When enabled, look ahead in the
WAL and try to initiate asynchronous reading of referenced data blocks
that are not yet cached in our buffer pool.  For now, this is done with
posix_fadvise(), which has several caveats.  Since not all OSes have
that system call, "try" is provided so that it can be enabled where
available.  Better mechanisms for asynchronous I/O are possible in later
work.

Set to "try" for now for test coverage.  Default setting to be finalized
before release.

The GUC wal_decode_buffer_size limits how far ahead we can look in the
WAL, in bytes of decoded data.

The existing GUC maintenance_io_concurrency is used to limit the number
of concurrent I/Os allowed, based on pessimistic heuristics used to
infer that I/Os have begun and completed.  We'll also not look more than
maintenance_io_concurrency * 4 block references ahead.
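
As a sketch of typical usage (the values are illustrative only;
wal_decode_buffer_size can change only at server start, while
recovery_prefetch responds to a configuration reload):

    ALTER SYSTEM SET recovery_prefetch = 'on';
    ALTER SYSTEM SET wal_decode_buffer_size = '1MB';
    SELECT pg_reload_conf();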

Reviewed-by: Julien Rouhaud
Reviewed-by: Tomas Vondra
Reviewed-by: Alvaro Herrera (earlier version)
Reviewed-by: Andres Freund (earlier version)
Reviewed-by: Justin Pryzby (earlier version)
Tested-by: Tomas Vondra (earlier version)
Tested-by: Jakub Wartak (earlier version)
Tested-by: Dmitry Dolgov <[email protected]> (earlier version)
Tested-by: Sait Talha Nisanci (earlier version)
Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com

27 files changed:
doc/src/sgml/config.sgml
doc/src/sgml/monitoring.sgml
doc/src/sgml/wal.sgml
src/backend/access/transam/Makefile
src/backend/access/transam/xlog.c
src/backend/access/transam/xlogprefetcher.c [new file with mode: 0644]
src/backend/access/transam/xlogreader.c
src/backend/access/transam/xlogrecovery.c
src/backend/access/transam/xlogutils.c
src/backend/catalog/system_views.sql
src/backend/storage/buffer/bufmgr.c
src/backend/storage/freespace/freespace.c
src/backend/storage/ipc/ipci.c
src/backend/storage/smgr/md.c
src/backend/utils/adt/pgstatfuncs.c
src/backend/utils/misc/guc.c
src/backend/utils/misc/postgresql.conf.sample
src/include/access/xlog.h
src/include/access/xlogprefetcher.h [new file with mode: 0644]
src/include/access/xlogreader.h
src/include/access/xlogutils.h
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/utils/guc.h
src/include/utils/guc_tables.h
src/test/regress/expected/rules.out
src/tools/pgindent/typedefs.list

index 6901e71f9d3ef87ce0fa6fb5b5969f0c3e461022..ac533968a0ccf7e14ff3d2a82a3347902d48a008 100644 (file)
@@ -3657,6 +3657,70 @@ include_dir 'conf.d'
      
     
 
+   
+
+    Recovery
+
+     
+      configuration
+      of recovery
+      general settings
+     
+
+    
+     This section describes the settings that apply to recovery in general,
+     affecting crash recovery, streaming replication and archive-based
+     replication.
+    
+
+
+    
+     
+      recovery_prefetch (enum)
+      
+       recovery_prefetch configuration parameter
+      
+      
+      
+       
+        Whether to try to prefetch blocks that are referenced in the WAL and
+        are not yet in the buffer pool, during recovery.  Valid values are
+        off, on and
+        try (the default).  The setting try enables
+        prefetching only if the operating system provides the
+        posix_fadvise function, which is currently used
+        to implement prefetching.  Note that some operating systems provide the
+        function, but it doesn't do anything.
+       
+       
+        Prefetching blocks that will soon be needed can reduce I/O wait times
+        during recovery with some workloads.
+        See also the wal_decode_buffer_size and
+        maintenance_io_concurrency settings, which limit
+        prefetching activity.
+       
+      
+     
+
+     
+      wal_decode_buffer_size (integer)
+      
+       wal_decode_buffer_size configuration parameter
+      
+      
+      
+       
+        A limit on how far ahead the server can look in the WAL, to find
+        blocks to prefetch.  If this value is specified without units, it is
+        taken as bytes.
+        The default is 512kB.
+       
+      
+     
+
+    
+   
+
   
 
     Archive Recovery
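
The new settings' current values, units and required change context can be
checked with a query along these lines (a sketch):

    SELECT name, setting, unit, context
    FROM pg_settings
    WHERE name IN ('recovery_prefetch', 'wal_decode_buffer_size');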
index 24924647b5fc5c000502d8dbecf391ebd758b2e0..76766d28dd4f38f87783a7cdcc8c89a9c2e70046 100644 (file)
@@ -328,6 +328,13 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       
      
 
+     
+      pg_stat_recovery_prefetch
+      Only one row, showing statistics about blocks prefetched during recovery.
+       See  for details.
+      
+     
+
      
       pg_stat_subscription
       At least one row per subscription, showing information about
@@ -2979,6 +2986,78 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
    copy of the subscribed tables.
   
 
+  
+   <structname>pg_stat_recovery_prefetch</structname> View
+   
+    
+    
+      Column
+      Type
+      Description
+     
+    
+
+   
+    
+     prefetch
+     bigint
+     Number of blocks prefetched because they were not in the buffer pool
+    
+    
+     hit
+     bigint
+     Number of blocks not prefetched because they were already in the buffer pool
+    
+    
+     skip_init
+     bigint
+     Number of blocks not prefetched because they would be zero-initialized
+    
+    
+     skip_new
+     bigint
+     Number of blocks not prefetched because they didn't exist yet
+    
+    
+     skip_fpw
+     bigint
+     Number of blocks not prefetched because a full page image was included in the WAL
+    
+    
+     skip_rep
+     bigint
+     Number of blocks not prefetched because they were already recently prefetched
+    
+    
+     wal_distance
+     integer
+     How many bytes ahead the prefetcher is looking
+    
+    
+     block_distance
+     integer
+     How many blocks ahead the prefetcher is looking
+    
+    
+     io_depth
+     integer
+     How many prefetches have been initiated but are not yet known to have completed
+    
+    
+   
+  
+
+  
+   The pg_stat_recovery_prefetch view will contain
+   only one row.  It is filled with nulls if recovery has not run or
+   recovery_prefetch is not enabled.  The
+   columns wal_distance,
+   block_distance
+   and io_depth show current values, and the
+   other columns show cumulative counters that can be reset
+   with the pg_stat_reset_shared function.
+  
+
   
    <structname>pg_stat_subscription</structname> View
    
@@ -5199,8 +5278,11 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
         all the counters shown in
         the pg_stat_bgwriter
         view, archiver to reset all the counters shown in
-        the pg_stat_archiver view or wal
-        to reset all the counters shown in the pg_stat_wal view.
+        the pg_stat_archiver view,
+        wal to reset all the counters shown in the
+        pg_stat_wal view or
+        recovery_prefetch to reset all the counters shown
+        in the pg_stat_recovery_prefetch view.
        
        
         This function is restricted to superusers by default, but other users
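
A sketch of how the new view might be queried and its counters reset,
using only the columns documented above:

    SELECT prefetch, hit, skip_init, skip_new, skip_fpw, skip_rep
    FROM pg_stat_recovery_prefetch;

    -- Resets the cumulative counters; wal_distance, block_distance and
    -- io_depth report current values and are unaffected.
    SELECT pg_stat_reset_shared('recovery_prefetch');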
index 2bb27a846828aaabcad660d752c1732f955fe9bf..6b3406b7de6204af155bb979f297ca8b4fcfeaa0 100644 (file)
    counted as wal_write and wal_sync
    in pg_stat_wal, respectively.
   
+
+  
+   The recovery_prefetch parameter can be used to reduce
+   I/O wait times during recovery by instructing the kernel to initiate reads
+   of disk blocks that will soon be needed but are not currently in
+   PostgreSQL's buffer pool.
+   The maintenance_io_concurrency and
+   wal_decode_buffer_size settings limit prefetching
+   concurrency and distance, respectively.  By default, it is set to
+   try, which enables the feature on systems where
+   posix_fadvise is available.
+  
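
While recovery runs, whether readahead is keeping up can be gauged from the
dynamic columns of the view; a sketch (io_depth is bounded by
maintenance_io_concurrency, and block_distance by four times that number):

    SELECT wal_distance, block_distance, io_depth
    FROM pg_stat_recovery_prefetch;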
  
 
  
index 79314c69abc01e4b1dc0f9e1525dc38b183fc2ab..8c17c88dfc4806f38bcda7cdf2514342527e08d6 100644 (file)
@@ -31,6 +31,7 @@ OBJS = \
    xlogarchive.o \
    xlogfuncs.o \
    xloginsert.o \
+   xlogprefetcher.o \
    xlogreader.o \
    xlogrecovery.o \
    xlogutils.o
index c076e48445d054b716a50f3aeb4bd8e3056b93f5..6770c3ddbafb89ee9a1f328a85d6b3006a21d550 100644 (file)
@@ -59,6 +59,7 @@
 #include "access/xlog_internal.h"
 #include "access/xlogarchive.h"
 #include "access/xloginsert.h"
+#include "access/xlogprefetcher.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecovery.h"
 #include "access/xlogutils.h"
@@ -133,6 +134,7 @@ int         CommitDelay = 0;    /* precommit delay in microseconds */
 int            CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int            wal_retrieve_retry_interval = 5000;
 int            max_slot_wal_keep_size_mb = -1;
+int            wal_decode_buffer_size = 512 * 1024;
 bool       track_wal_io_timing = false;
 
 #ifdef WAL_DEBUG
diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c
new file mode 100644 (file)
index 0000000..f342888
--- /dev/null
@@ -0,0 +1,1082 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogprefetcher.c
+ *     Prefetching support for recovery.
+ *
+ * Portions Copyright (c) 2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *     src/backend/access/transam/xlogprefetcher.c
+ *
+ * This module provides a drop-in replacement for an XLogReader that tries to
+ * minimize I/O stalls by looking ahead in the WAL.  If blocks that will be
+ * accessed in the near future are not already in the buffer pool, it initiates
+ * I/Os that might complete before the caller eventually needs the data.  When
+ * referenced blocks are found in the buffer pool already, the buffer is
+ * recorded in the decoded record so that XLogReadBufferForRedo() can try to
+ * avoid a second buffer mapping table lookup.
+ *
+ * Currently, only the main fork is considered for prefetching, and
+ * prefetching is only effective on systems where PrefetchSharedBuffer()
+ * does something useful (mainly Linux).
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xlog.h"
+#include "access/xlogprefetcher.h"
+#include "access/xlogreader.h"
+#include "access/xlogutils.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_control.h"
+#include "catalog/storage_xlog.h"
+#include "commands/dbcommands_xlog.h"
+#include "utils/fmgrprotos.h"
+#include "utils/timestamp.h"
+#include "funcapi.h"
+#include "pgstat.h"
+#include "miscadmin.h"
+#include "port/atomics.h"
+#include "storage/bufmgr.h"
+#include "storage/shmem.h"
+#include "storage/smgr.h"
+#include "utils/guc.h"
+#include "utils/hsearch.h"
+
+/*
+ * Every time we process this much WAL, we'll update the values in
+ * pg_stat_recovery_prefetch.
+ */
+#define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
+
+/*
+ * To detect repeated access to the same block and skip useless extra system
+ * calls, we remember a small window of recently prefetched blocks.
+ */
+#define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
+
+/*
+ * When maintenance_io_concurrency is not saturated, we're prepared to look
+ * ahead up to N times that number of block references.
+ */
+#define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
+
+/* Define to log internal debugging messages. */
+/* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */
+
+/* GUCs */
+int            recovery_prefetch = RECOVERY_PREFETCH_TRY;
+
+#ifdef USE_PREFETCH
+#define RecoveryPrefetchEnabled() (recovery_prefetch != RECOVERY_PREFETCH_OFF)
+#else
+#define RecoveryPrefetchEnabled() false
+#endif
+
+static int XLogPrefetchReconfigureCount = 0;
+
+/*
+ * Enum used to report whether an IO should be started.
+ */
+typedef enum
+{
+   LRQ_NEXT_NO_IO,
+   LRQ_NEXT_IO,
+   LRQ_NEXT_AGAIN
+} LsnReadQueueNextStatus;
+
+/*
+ * Type of callback that can decide which block to prefetch next.  For now
+ * there is only one.
+ */
+typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
+                                                      XLogRecPtr *lsn);
+
+/*
+ * A simple circular queue of LSNs, used to control the number of
+ * (potentially) inflight IOs.  This stands in for a later more general IO
+ * control mechanism, which is why it has the apparently unnecessary
+ * indirection through a function pointer.
+ */
+typedef struct LsnReadQueue
+{
+   LsnReadQueueNextFun next;
+   uintptr_t   lrq_private;
+   uint32      max_inflight;
+   uint32      inflight;
+   uint32      completed;
+   uint32      head;
+   uint32      tail;
+   uint32      size;
+   struct
+   {
+       bool        io;
+       XLogRecPtr  lsn;
+   }           queue[FLEXIBLE_ARRAY_MEMBER];
+} LsnReadQueue;
+
+/*
+ * A prefetcher.  This is a mechanism that wraps an XLogReader, prefetching
+ * blocks that will soon be referenced, to try to avoid IO stalls.
+ */
+struct XLogPrefetcher
+{
+   /* WAL reader and current reading state. */
+   XLogReaderState *reader;
+   DecodedXLogRecord *record;
+   int         next_block_id;
+
+   /* When to publish stats. */
+   XLogRecPtr  next_stats_shm_lsn;
+
+   /* Book-keeping to avoid accessing blocks that don't exist yet. */
+   HTAB       *filter_table;
+   dlist_head  filter_queue;
+
+   /* Book-keeping to avoid repeat prefetches. */
+   RelFileNode recent_rnode[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
+   BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
+   int         recent_idx;
+
+   /* Book-keeping to disable prefetching temporarily. */
+   XLogRecPtr  no_readahead_until;
+
+   /* IO depth manager. */
+   LsnReadQueue *streaming_read;
+
+   XLogRecPtr  begin_ptr;
+
+   int         reconfigure_count;
+};
+
+/*
+ * A temporary filter used to track block ranges that haven't been created
+ * yet, whole relations that haven't been created yet, and whole relations
+ * that (we assume) have already been dropped, or will be created by bulk WAL
+ * operations.
+ */
+typedef struct XLogPrefetcherFilter
+{
+   RelFileNode rnode;
+   XLogRecPtr  filter_until_replayed;
+   BlockNumber filter_from_block;
+   dlist_node  link;
+} XLogPrefetcherFilter;
+
+/*
+ * Counters exposed in shared memory for pg_stat_recovery_prefetch.
+ */
+typedef struct XLogPrefetchStats
+{
+   pg_atomic_uint64 reset_time;    /* Time of last reset. */
+   pg_atomic_uint64 prefetch;  /* Prefetches initiated. */
+   pg_atomic_uint64 hit;       /* Blocks already in cache. */
+   pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */
+   pg_atomic_uint64 skip_new;  /* New/missing blocks filtered. */
+   pg_atomic_uint64 skip_fpw;  /* FPWs skipped. */
+   pg_atomic_uint64 skip_rep;  /* Repeat accesses skipped. */
+
+   /* Dynamic values */
+   int         wal_distance;   /* Number of WAL bytes ahead. */
+   int         block_distance; /* Number of block references ahead. */
+   int         io_depth;       /* Number of I/Os in progress. */
+} XLogPrefetchStats;
+
+static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
+                                          RelFileNode rnode,
+                                          BlockNumber blockno,
+                                          XLogRecPtr lsn);
+static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
+                                           RelFileNode rnode,
+                                           BlockNumber blockno);
+static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
+                                                XLogRecPtr replaying_lsn);
+static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
+                                                     XLogRecPtr *lsn);
+
+static XLogPrefetchStats *SharedStats;
+
+static inline LsnReadQueue *
+lrq_alloc(uint32 max_distance,
+         uint32 max_inflight,
+         uintptr_t lrq_private,
+         LsnReadQueueNextFun next)
+{
+   LsnReadQueue *lrq;
+   uint32      size;
+
+   Assert(max_distance >= max_inflight);
+
+   size = max_distance + 1;    /* full ring buffer has a gap */
+   lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
+   lrq->lrq_private = lrq_private;
+   lrq->max_inflight = max_inflight;
+   lrq->size = size;
+   lrq->next = next;
+   lrq->head = 0;
+   lrq->tail = 0;
+   lrq->inflight = 0;
+   lrq->completed = 0;
+
+   return lrq;
+}
+
+static inline void
+lrq_free(LsnReadQueue *lrq)
+{
+   pfree(lrq);
+}
+
+static inline uint32
+lrq_inflight(LsnReadQueue *lrq)
+{
+   return lrq->inflight;
+}
+
+static inline uint32
+lrq_completed(LsnReadQueue *lrq)
+{
+   return lrq->completed;
+}
+
+static inline void
+lrq_prefetch(LsnReadQueue *lrq)
+{
+   /* Try to start as many IOs as we can within our limits. */
+   while (lrq->inflight < lrq->max_inflight &&
+          lrq->inflight + lrq->completed < lrq->size - 1)
+   {
+       Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
+       switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
+       {
+           case LRQ_NEXT_AGAIN:
+               return;
+           case LRQ_NEXT_IO:
+               lrq->queue[lrq->head].io = true;
+               lrq->inflight++;
+               break;
+           case LRQ_NEXT_NO_IO:
+               lrq->queue[lrq->head].io = false;
+               lrq->completed++;
+               break;
+       }
+       lrq->head++;
+       if (lrq->head == lrq->size)
+           lrq->head = 0;
+   }
+}
+
+static inline void
+lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
+{
+   /*
+    * We know that LSNs before 'lsn' have been replayed, so we can now assume
+    * that any IOs that were started before then have finished.
+    */
+   while (lrq->tail != lrq->head &&
+          lrq->queue[lrq->tail].lsn < lsn)
+   {
+       if (lrq->queue[lrq->tail].io)
+           lrq->inflight--;
+       else
+           lrq->completed--;
+       lrq->tail++;
+       if (lrq->tail == lrq->size)
+           lrq->tail = 0;
+   }
+   if (RecoveryPrefetchEnabled())
+       lrq_prefetch(lrq);
+}
+
+size_t
+XLogPrefetchShmemSize(void)
+{
+   return sizeof(XLogPrefetchStats);
+}
+
+/*
+ * Reset all counters to zero.
+ */
+void
+XLogPrefetchResetStats(void)
+{
+   pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
+   pg_atomic_write_u64(&SharedStats->prefetch, 0);
+   pg_atomic_write_u64(&SharedStats->hit, 0);
+   pg_atomic_write_u64(&SharedStats->skip_init, 0);
+   pg_atomic_write_u64(&SharedStats->skip_new, 0);
+   pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
+   pg_atomic_write_u64(&SharedStats->skip_rep, 0);
+}
+
+void
+XLogPrefetchShmemInit(void)
+{
+   bool        found;
+
+   SharedStats = (XLogPrefetchStats *)
+       ShmemInitStruct("XLogPrefetchStats",
+                       sizeof(XLogPrefetchStats),
+                       &found);
+
+   if (!found)
+   {
+       pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
+       pg_atomic_init_u64(&SharedStats->prefetch, 0);
+       pg_atomic_init_u64(&SharedStats->hit, 0);
+       pg_atomic_init_u64(&SharedStats->skip_init, 0);
+       pg_atomic_init_u64(&SharedStats->skip_new, 0);
+       pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
+       pg_atomic_init_u64(&SharedStats->skip_rep, 0);
+   }
+}
+
+/*
+ * Called when any GUC is changed that affects prefetching.
+ */
+void
+XLogPrefetchReconfigure(void)
+{
+   XLogPrefetchReconfigureCount++;
+}
+
+/*
+ * Increment a counter in shared memory.  This is equivalent to *counter++ on a
+ * plain uint64 without any memory barrier or locking, except on platforms
+ * where readers can't read uint64 without possibly observing a torn value.
+ */
+static inline void
+XLogPrefetchIncrement(pg_atomic_uint64 *counter)
+{
+   Assert(AmStartupProcess() || !IsUnderPostmaster);
+   pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
+}
+
+/*
+ * Create a prefetcher that is ready to begin prefetching blocks referenced by
+ * WAL records.
+ */
+XLogPrefetcher *
+XLogPrefetcherAllocate(XLogReaderState *reader)
+{
+   XLogPrefetcher *prefetcher;
+   static HASHCTL hash_table_ctl = {
+       .keysize = sizeof(RelFileNode),
+       .entrysize = sizeof(XLogPrefetcherFilter)
+   };
+
+   prefetcher = palloc0(sizeof(XLogPrefetcher));
+
+   prefetcher->reader = reader;
+   prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
+                                          &hash_table_ctl,
+                                          HASH_ELEM | HASH_BLOBS);
+   dlist_init(&prefetcher->filter_queue);
+
+   SharedStats->wal_distance = 0;
+   SharedStats->block_distance = 0;
+   SharedStats->io_depth = 0;
+
+   /* First usage will cause streaming_read to be allocated. */
+   prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;
+
+   return prefetcher;
+}
+
+/*
+ * Destroy a prefetcher and release all resources.
+ */
+void
+XLogPrefetcherFree(XLogPrefetcher *prefetcher)
+{
+   lrq_free(prefetcher->streaming_read);
+   hash_destroy(prefetcher->filter_table);
+   pfree(prefetcher);
+}
+
+/*
+ * Provide access to the reader.
+ */
+XLogReaderState *
+XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
+{
+   return prefetcher->reader;
+}
+
+/*
+ * Update the statistics visible in the pg_stat_recovery_prefetch view.
+ */
+void
+XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
+{
+   uint32      io_depth;
+   uint32      completed;
+   int64       wal_distance;
+
+   /* How far ahead of replay are we now? */
+   if (prefetcher->reader->decode_queue_tail)
+   {
+       wal_distance =
+           prefetcher->reader->decode_queue_tail->lsn -
+           prefetcher->reader->decode_queue_head->lsn;
+   }
+   else
+   {
+       wal_distance = 0;
+   }
+
+   /* How many IOs are currently in flight and completed? */
+   io_depth = lrq_inflight(prefetcher->streaming_read);
+   completed = lrq_completed(prefetcher->streaming_read);
+
+   /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
+   SharedStats->io_depth = io_depth;
+   SharedStats->block_distance = io_depth + completed;
+   SharedStats->wal_distance = wal_distance;
+
+   prefetcher->next_stats_shm_lsn =
+       prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
+}
+
+/*
+ * A callback that examines the next block reference in the WAL, and possibly
+ * starts an IO so that a later read will be fast.
+ *
+ * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
+ *
+ * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
+ * that isn't in the buffer pool, and the kernel has been asked to start
+ * reading it to make a future read system call faster. An LSN is written to
+ * *lsn, and the I/O will be considered to have completed once that LSN is
+ * replayed.
+ *
+ * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found that it
+ * was already in the buffer pool, or we decided for various reasons not to
+ * prefetch.
+ */
+static LsnReadQueueNextStatus
+XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
+{
+   XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
+   XLogReaderState *reader = prefetcher->reader;
+   XLogRecPtr  replaying_lsn = reader->ReadRecPtr;
+
+   /*
+    * We keep track of the record and block we're up to between calls with
+    * prefetcher->record and prefetcher->next_block_id.
+    */
+   for (;;)
+   {
+       DecodedXLogRecord *record;
+
+       /* Try to read a new future record, if we don't already have one. */
+       if (prefetcher->record == NULL)
+       {
+           bool        nonblocking;
+
+           /*
+            * If there are already records or an error queued up that could
+            * be replayed, we don't want to block here.  Otherwise, it's OK
+            * to block waiting for more data: presumably the caller has
+            * nothing else to do.
+            */
+           nonblocking = XLogReaderHasQueuedRecordOrError(reader);
+
+           /* Certain records act as barriers for all readahead. */
+           if (nonblocking && replaying_lsn < prefetcher->no_readahead_until)
+               return LRQ_NEXT_AGAIN;
+
+           record = XLogReadAhead(prefetcher->reader, nonblocking);
+           if (record == NULL)
+           {
+               /*
+                * We can't read any more, due to an error or lack of data in
+                * nonblocking mode.
+                */
+               return LRQ_NEXT_AGAIN;
+           }
+
+           /*
+            * If prefetching is disabled, we don't need to analyze the record
+            * or issue any prefetches.  We just need to cause one record to
+            * be decoded.
+            */
+           if (!RecoveryPrefetchEnabled())
+           {
+               *lsn = InvalidXLogRecPtr;
+               return LRQ_NEXT_NO_IO;
+           }
+
+           /* We have a new record to process. */
+           prefetcher->record = record;
+           prefetcher->next_block_id = 0;
+       }
+       else
+       {
+           /* Continue to process from last call, or last loop. */
+           record = prefetcher->record;
+       }
+
+       /*
+        * Check for operations that require us to filter out block ranges, or
+        * pause readahead completely.
+        */
+       if (replaying_lsn < record->lsn)
+       {
+           uint8       rmid = record->header.xl_rmid;
+           uint8       record_type = record->header.xl_info & ~XLR_INFO_MASK;
+
+           if (rmid == RM_XLOG_ID)
+           {
+               if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
+                   record_type == XLOG_END_OF_RECOVERY)
+               {
+                   /*
+                    * These records might change the TLI.  Avoid potential
+                    * bugs if we were to allow "read TLI" and "replay TLI" to
+                    * differ without more analysis.
+                    */
+                   prefetcher->no_readahead_until = record->lsn;
+
+#ifdef XLOGPREFETCHER_DEBUG_LEVEL
+                   elog(XLOGPREFETCHER_DEBUG_LEVEL,
+                        "suppressing all readahead until %X/%X is replayed due to possible TLI change",
+                        LSN_FORMAT_ARGS(record->lsn));
+#endif
+
+                   /* Fall through so we move past this record. */
+               }
+           }
+           else if (rmid == RM_DBASE_ID)
+           {
+               /*
+                * When databases are created with the file-copy strategy,
+                * there are no WAL records to tell us about the creation of
+                * individual relations.
+                */
+               if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
+               {
+                   xl_dbase_create_file_copy_rec *xlrec =
+                   (xl_dbase_create_file_copy_rec *) record->main_data;
+                   RelFileNode rnode = {InvalidOid, xlrec->db_id, InvalidOid};
+
+                   /*
+                    * Don't try to prefetch anything in this database until
+                    * it has been created, or we might confuse the blocks of
+                    * different generations, if a database OID or relfilenode
+                    * is reused.  It's also more efficient than discovering
+                    * that relations don't exist on disk yet with ENOENT
+                    * errors.
+                    */
+                   XLogPrefetcherAddFilter(prefetcher, rnode, 0, record->lsn);
+
+#ifdef XLOGPREFETCHER_DEBUG_LEVEL
+                   elog(XLOGPREFETCHER_DEBUG_LEVEL,
+                        "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
+                        rnode.dbNode,
+                        LSN_FORMAT_ARGS(record->lsn));
+#endif
+               }
+           }
+           else if (rmid == RM_SMGR_ID)
+           {
+               if (record_type == XLOG_SMGR_CREATE)
+               {
+                   xl_smgr_create *xlrec = (xl_smgr_create *)
+                   record->main_data;
+
+                   if (xlrec->forkNum == MAIN_FORKNUM)
+                   {
+                       /*
+                        * Don't prefetch anything for this whole relation
+                        * until it has been created.  Otherwise we might
+                        * confuse the blocks of different generations, if a
+                        * relfilenode is reused.  This also avoids the need
+                        * to discover the problem via extra syscalls that
+                        * report ENOENT.
+                        */
+                       XLogPrefetcherAddFilter(prefetcher, xlrec->rnode, 0,
+                                               record->lsn);
+
+#ifdef XLOGPREFETCHER_DEBUG_LEVEL
+                       elog(XLOGPREFETCHER_DEBUG_LEVEL,
+                            "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",
+                            xlrec->rnode.spcNode,
+                            xlrec->rnode.dbNode,
+                            xlrec->rnode.relNode,
+                            LSN_FORMAT_ARGS(record->lsn));
+#endif
+                   }
+               }
+               else if (record_type == XLOG_SMGR_TRUNCATE)
+               {
+                   xl_smgr_truncate *xlrec = (xl_smgr_truncate *)
+                   record->main_data;
+
+                   /*
+                    * Don't consider prefetching anything in the truncated
+                    * range until the truncation has been performed.
+                    */
+                   XLogPrefetcherAddFilter(prefetcher, xlrec->rnode,
+                                           xlrec->blkno,
+                                           record->lsn);
+
+#ifdef XLOGPREFETCHER_DEBUG_LEVEL
+                   elog(XLOGPREFETCHER_DEBUG_LEVEL,
+                        "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",
+                        xlrec->rnode.spcNode,
+                        xlrec->rnode.dbNode,
+                        xlrec->rnode.relNode,
+                        xlrec->blkno,
+                        LSN_FORMAT_ARGS(record->lsn));
+#endif
+               }
+           }
+       }
+
+       /* Scan the block references, starting where we left off last time. */
+       while (prefetcher->next_block_id <= record->max_block_id)
+       {
+           int         block_id = prefetcher->next_block_id++;
+           DecodedBkpBlock *block = &record->blocks[block_id];
+           SMgrRelation reln;
+           PrefetchBufferResult result;
+
+           if (!block->in_use)
+               continue;
+
+           Assert(!BufferIsValid(block->prefetch_buffer));
+
+           /*
+            * Record the LSN of this record.  When it's replayed,
+            * LsnReadQueue will consider any IOs submitted for earlier LSNs
+            * to be finished.
+            */
+           *lsn = record->lsn;
+
+           /* We don't try to prefetch anything but the main fork for now. */
+           if (block->forknum != MAIN_FORKNUM)
+           {
+               return LRQ_NEXT_NO_IO;
+           }
+
+           /*
+            * If there is a full page image attached, we won't be reading the
+            * page, so don't bother trying to prefetch.
+            */
+           if (block->has_image)
+           {
+               XLogPrefetchIncrement(&SharedStats->skip_fpw);
+               return LRQ_NEXT_NO_IO;
+           }
+
+           /* There is no point in reading a page that will be zeroed. */
+           if (block->flags & BKPBLOCK_WILL_INIT)
+           {
+               XLogPrefetchIncrement(&SharedStats->skip_init);
+               return LRQ_NEXT_NO_IO;
+           }
+
+           /* Should we skip prefetching this block due to a filter? */
+           if (XLogPrefetcherIsFiltered(prefetcher, block->rnode, block->blkno))
+           {
+               XLogPrefetchIncrement(&SharedStats->skip_new);
+               return LRQ_NEXT_NO_IO;
+           }
+
+           /* There is no point in repeatedly prefetching the same block. */
+           for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
+           {
+               if (block->blkno == prefetcher->recent_block[i] &&
+                   RelFileNodeEquals(block->rnode, prefetcher->recent_rnode[i]))
+               {
+                   /*
+                    * XXX If we also remembered where it was, we could set
+                    * recent_buffer so that recovery could skip smgropen()
+                    * and a buffer table lookup.
+                    */
+                   XLogPrefetchIncrement(&SharedStats->skip_rep);
+                   return LRQ_NEXT_NO_IO;
+               }
+           }
+           prefetcher->recent_rnode[prefetcher->recent_idx] = block->rnode;
+           prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
+           prefetcher->recent_idx =
+               (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
+
+           /*
+            * We could try to have a fast path for repeated references to the
+            * same relation (with some scheme to handle invalidations
+            * safely), but for now we'll call smgropen() every time.
+            */
+           reln = smgropen(block->rnode, InvalidBackendId);
+
+           /*
+            * If the relation file doesn't exist on disk, for example because
+            * we're replaying after a crash and the file will be created and
+            * then unlinked by WAL that hasn't been replayed yet, suppress
+            * further prefetching in the relation until this record is
+            * replayed.
+            */
+           if (!smgrexists(reln, MAIN_FORKNUM))
+           {
+#ifdef XLOGPREFETCHER_DEBUG_LEVEL
+               elog(XLOGPREFETCHER_DEBUG_LEVEL,
+                    "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",
+                    reln->smgr_rnode.node.spcNode,
+                    reln->smgr_rnode.node.dbNode,
+                    reln->smgr_rnode.node.relNode,
+                    LSN_FORMAT_ARGS(record->lsn));
+#endif
+               XLogPrefetcherAddFilter(prefetcher, block->rnode, 0,
+                                       record->lsn);
+               XLogPrefetchIncrement(&SharedStats->skip_new);
+               return LRQ_NEXT_NO_IO;
+           }
+
+           /*
+            * If the relation isn't big enough to contain the referenced
+            * block yet, suppress prefetching of this block and higher until
+            * this record is replayed.
+            */
+           if (block->blkno >= smgrnblocks(reln, block->forknum))
+           {
+#ifdef XLOGPREFETCHER_DEBUG_LEVEL
+               elog(XLOGPREFETCHER_DEBUG_LEVEL,
+                    "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",
+                    reln->smgr_rnode.node.spcNode,
+                    reln->smgr_rnode.node.dbNode,
+                    reln->smgr_rnode.node.relNode,
+                    block->blkno,
+                    LSN_FORMAT_ARGS(record->lsn));
+#endif
+               XLogPrefetcherAddFilter(prefetcher, block->rnode, block->blkno,
+                                       record->lsn);
+               XLogPrefetchIncrement(&SharedStats->skip_new);
+               return LRQ_NEXT_NO_IO;
+           }
+
+           /* Try to initiate prefetching. */
+           result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
+           if (BufferIsValid(result.recent_buffer))
+           {
+               /* Cache hit, nothing to do. */
+               XLogPrefetchIncrement(&SharedStats->hit);
+               block->prefetch_buffer = result.recent_buffer;
+               return LRQ_NEXT_NO_IO;
+           }
+           else if (result.initiated_io)
+           {
+               /* Cache miss, I/O (presumably) started. */
+               XLogPrefetchIncrement(&SharedStats->prefetch);
+               block->prefetch_buffer = InvalidBuffer;
+               return LRQ_NEXT_IO;
+           }
+           else
+           {
+               /*
+                * This shouldn't be possible, because we already determined
+                * that the relation exists on disk and is big enough.
+                * Something is wrong with the cache invalidation for
+                * smgrexists(), smgrnblocks(), or the file was unlinked or
+                * truncated beneath our feet?
+                */
+               elog(ERROR,
+                    "could not prefetch relation %u/%u/%u block %u",
+                    reln->smgr_rnode.node.spcNode,
+                    reln->smgr_rnode.node.dbNode,
+                    reln->smgr_rnode.node.relNode,
+                    block->blkno);
+           }
+       }
+
+       /*
+        * Several callsites need to be able to read exactly one record
+        * without any internal readahead.  Examples: xlog.c reading
+        * checkpoint records with emode set to PANIC, which might otherwise
+        * cause XLogPageRead() to panic on some future page, and xlog.c
+        * determining where to start writing WAL next, which depends on the
+        * contents of the reader's internal buffer after reading one record.
+        * Therefore, don't even think about prefetching until the first
+        * record after XLogPrefetcherBeginRead() has been consumed.
+        */
+       if (prefetcher->reader->decode_queue_tail &&
+           prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
+           return LRQ_NEXT_AGAIN;
+
+       /* Advance to the next record. */
+       prefetcher->record = NULL;
+   }
+   pg_unreachable();
+}
+
+/*
+ * Expose statistics about recovery prefetching.
+ */
+Datum
+pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
+   ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+   Datum       values[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
+   bool        nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
+
+   SetSingleFuncCall(fcinfo, 0);
+
+   for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
+       nulls[i] = false;
+
+   values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
+   values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
+   values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit));
+   values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init));
+   values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
+   values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
+   values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep));
+   values[7] = Int32GetDatum(SharedStats->wal_distance);
+   values[8] = Int32GetDatum(SharedStats->block_distance);
+   values[9] = Int32GetDatum(SharedStats->io_depth);
+   tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+
+   return (Datum) 0;
+}
+
+/*
+ * Don't prefetch any blocks >= 'blockno' from a given 'rnode', until 'lsn'
+ * has been replayed.
+ */
+static inline void
+XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileNode rnode,
+                       BlockNumber blockno, XLogRecPtr lsn)
+{
+   XLogPrefetcherFilter *filter;
+   bool        found;
+
+   filter = hash_search(prefetcher->filter_table, &rnode, HASH_ENTER, &found);
+   if (!found)
+   {
+       /*
+        * Don't allow any prefetching of this block or higher until replayed.
+        */
+       filter->filter_until_replayed = lsn;
+       filter->filter_from_block = blockno;
+       dlist_push_head(&prefetcher->filter_queue, &filter->link);
+   }
+   else
+   {
+       /*
+        * We were already filtering this rnode.  Extend the filter's lifetime
+        * to cover this WAL record, but leave the lower of the block numbers
+        * there because we don't want to have to track individual blocks.
+        */
+       filter->filter_until_replayed = lsn;
+       dlist_delete(&filter->link);
+       dlist_push_head(&prefetcher->filter_queue, &filter->link);
+       filter->filter_from_block = Min(filter->filter_from_block, blockno);
+   }
+}
+
+/*
+ * Have we replayed any records that caused us to begin filtering a block
+ * range?  That means that relations should have been created, extended or
+ * dropped as required, so we can stop filtering out accesses to a given
+ * relfilenode.
+ */
+static inline void
+XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+   while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
+   {
+       XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
+                                                         link,
+                                                         &prefetcher->filter_queue);
+
+       if (filter->filter_until_replayed >= replaying_lsn)
+           break;
+
+       dlist_delete(&filter->link);
+       hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
+   }
+}
+
+/*
+ * Check if a given block should be skipped due to a filter.
+ */
+static inline bool
+XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileNode rnode,
+                        BlockNumber blockno)
+{
+   /*
+    * Test for empty queue first, because we expect it to be empty most of
+    * the time and we can avoid the hash table lookup in that case.
+    */
+   if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
+   {
+       XLogPrefetcherFilter *filter;
+
+       /* See if the block range is filtered. */
+       filter = hash_search(prefetcher->filter_table, &rnode, HASH_FIND, NULL);
+       if (filter && filter->filter_from_block <= blockno)
+       {
+#ifdef XLOGPREFETCHER_DEBUG_LEVEL
+           elog(XLOGPREFETCHER_DEBUG_LEVEL,
+                "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",
+                rnode.spcNode, rnode.dbNode, rnode.relNode, blockno,
+                LSN_FORMAT_ARGS(filter->filter_until_replayed),
+                filter->filter_from_block);
+#endif
+           return true;
+       }
+
+       /* See if the whole database is filtered. */
+       rnode.relNode = InvalidOid;
+       rnode.spcNode = InvalidOid;
+       filter = hash_search(prefetcher->filter_table, &rnode, HASH_FIND, NULL);
+       if (filter)
+       {
+#ifdef XLOGPREFETCHER_DEBUG_LEVEL
+           elog(XLOGPREFETCHER_DEBUG_LEVEL,
+                "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",
+                rnode.spcNode, rnode.dbNode, rnode.relNode, blockno,
+                LSN_FORMAT_ARGS(filter->filter_until_replayed));
+#endif
+           return true;
+       }
+   }
+
+   return false;
+}
+
+/*
+ * A wrapper for XLogBeginRead() that also resets the prefetcher.
+ */
+void
+XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
+{
+   /* This will forget about any in-flight IO. */
+   prefetcher->reconfigure_count--;
+
+   /* Book-keeping to avoid readahead on first read. */
+   prefetcher->begin_ptr = recPtr;
+
+   prefetcher->no_readahead_until = 0;
+
+   /* This will forget about any queued up records in the decoder. */
+   XLogBeginRead(prefetcher->reader, recPtr);
+}
+
+/*
+ * A wrapper for XLogReadRecord() that provides the same interface, but also
+ * tries to initiate I/O for blocks referenced in future WAL records.
+ */
+XLogRecord *
+XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
+{
+   DecodedXLogRecord *record;
+
+   /*
+    * See if it's time to reset the prefetching machinery, because a relevant
+    * GUC was changed.
+    */
+   if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
+   {
+       uint32      max_distance;
+       uint32      max_inflight;
+
+       if (prefetcher->streaming_read)
+           lrq_free(prefetcher->streaming_read);
+
+       if (RecoveryPrefetchEnabled())
+       {
+           max_inflight = Max(maintenance_io_concurrency, 2);
+           max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
+       }
+       else
+       {
+           max_inflight = 1;
+           max_distance = 1;
+       }
+
+       prefetcher->streaming_read = lrq_alloc(max_distance,
+                                              max_inflight,
+                                              (uintptr_t) prefetcher,
+                                              XLogPrefetcherNextBlock);
+
+       prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
+   }
+
+   /*
+    * Release last returned record, if there is one.  We need to do this so
+    * that we can check for empty decode queue accurately.
+    */
+   XLogReleasePreviousRecord(prefetcher->reader);
+
+   /* If there's nothing queued yet, then start prefetching. */
+   if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
+       lrq_prefetch(prefetcher->streaming_read);
+
+   /* Read the next record. */
+   record = XLogNextRecord(prefetcher->reader, errmsg);
+   if (!record)
+       return NULL;
+
+   /*
+    * The record we just got is the "current" one, for the benefit of the
+    * XLogRecXXX() macros.
+    */
+   Assert(record == prefetcher->reader->record);
+
+   /*
+    * Can we drop any prefetch filters yet, given the record we're about to
+    * return?  This assumes that any records with earlier LSNs have been
+    * replayed, so if we were waiting for a relation to be created or
+    * extended, it is now OK to access blocks in the covered range.
+    */
+   XLogPrefetcherCompleteFilters(prefetcher, record->lsn);
+
+   /*
+    * See if it's time to compute some statistics, because enough WAL has
+    * been processed.
+    */
+   if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
+       XLogPrefetcherComputeStats(prefetcher);
+
+   /*
+    * The caller is about to replay this record, so we can now report that
+    * all I/O initiated for earlier WAL must be finished.  This may
+    * trigger more readahead.
+    */
+   lrq_complete_lsn(prefetcher->streaming_read, record->lsn);
+
+   Assert(record == prefetcher->reader->record);
+
+   return &record->header;
+}
+
+bool
+check_recovery_prefetch(int *new_value, void **extra, GucSource source)
+{
+#ifndef USE_PREFETCH
+   if (*new_value == RECOVERY_PREFETCH_ON)
+   {
+       GUC_check_errdetail("recovery_prefetch is not supported on platforms that lack posix_fadvise().");
+       return false;
+   }
+#endif
+
+   return true;
+}
+
+void
+assign_recovery_prefetch(int new_value, void *extra)
+{
+   /* Reconfigure prefetching, because a setting it depends on changed. */
+   recovery_prefetch = new_value;
+   if (AmStartupProcess())
+       XLogPrefetchReconfigure();
+}
index e612aa933a579925041693cd08b34472f102f6ac..5862d9dc75f894779e9852b64c5fc1297047ef72 100644 (file)
@@ -1727,6 +1727,8 @@ DecodeXLogRecord(XLogReaderState *state,
            blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
            blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
 
+           blk->prefetch_buffer = InvalidBuffer;
+
            COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
            /* cross-check that the HAS_DATA flag is set iff data_length > 0 */
            if (blk->has_data && blk->data_len == 0)
@@ -1925,14 +1927,29 @@ err:
 
 /*
  * Returns information about the block that a block reference refers to.
- *
- * If the WAL record contains a block reference with the given ID, *rnode,
- * *forknum, and *blknum are filled in (if not NULL), and returns true.
- * Otherwise returns false.
+ * See XLogRecGetBlockTagExtended().
  */
 bool
 XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
                   RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
+{
+   return XLogRecGetBlockTagExtended(record, block_id, rnode, forknum, blknum,
+                                     NULL);
+}
+
+/*
+ * Returns information about the block that a block reference refers to,
+ * optionally including the buffer that the block may already be in.
+ *
+ * If the WAL record contains a block reference with the given ID, *rnode,
+ * *forknum, *blknum and *prefetch_buffer are filled in (if not NULL), and
+ * returns true.  Otherwise returns false.
+ */
+bool
+XLogRecGetBlockTagExtended(XLogReaderState *record, uint8 block_id,
+                          RelFileNode *rnode, ForkNumber *forknum,
+                          BlockNumber *blknum,
+                          Buffer *prefetch_buffer)
 {
    DecodedBkpBlock *bkpb;
 
@@ -1947,6 +1964,8 @@ XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
        *forknum = bkpb->forknum;
    if (blknum)
        *blknum = bkpb->blkno;
+   if (prefetch_buffer)
+       *prefetch_buffer = bkpb->prefetch_buffer;
    return true;
 }
 
index 79d38a837c4ecd6094a9f62fa644e31921ab86cd..54fd10475a709e9a48eed1872f0d33de6c044832 100644 (file)
@@ -36,6 +36,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "access/xlogarchive.h"
+#include "access/xlogprefetcher.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecovery.h"
 #include "access/xlogutils.h"
@@ -183,6 +184,9 @@ static bool doRequestWalReceiverReply;
 /* XLogReader object used to parse the WAL records */
 static XLogReaderState *xlogreader = NULL;
 
+/* XLogPrefetcher object used to consume WAL records with read-ahead */
+static XLogPrefetcher *xlogprefetcher = NULL;
+
 /* Parameters passed down from ReadRecord to the XLogPageRead callback. */
 typedef struct XLogPageReadPrivate
 {
@@ -404,18 +408,21 @@ static void recoveryPausesHere(bool endOfRecovery);
 static bool recoveryApplyDelay(XLogReaderState *record);
 static void ConfirmRecoveryPaused(void);
 
-static XLogRecord *ReadRecord(XLogReaderState *xlogreader,
-                             int emode, bool fetching_ckpt, TimeLineID replayTLI);
+static XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher,
+                             int emode, bool fetching_ckpt,
+                             TimeLineID replayTLI);
 
 static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
                         int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
-static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
-                                       bool fetching_ckpt,
-                                       XLogRecPtr tliRecPtr,
-                                       TimeLineID replayTLI,
-                                       XLogRecPtr replayLSN);
+static XLogPageReadResult WaitForWALToBecomeAvailable(XLogRecPtr RecPtr,
+                                                     bool randAccess,
+                                                     bool fetching_ckpt,
+                                                     XLogRecPtr tliRecPtr,
+                                                     TimeLineID replayTLI,
+                                                     XLogRecPtr replayLSN,
+                                                     bool nonblocking);
 static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
-static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
+static XLogRecord *ReadCheckpointRecord(XLogPrefetcher *xlogprefetcher, XLogRecPtr RecPtr,
                                        int whichChkpt, bool report, TimeLineID replayTLI);
 static bool rescanLatestTimeLine(TimeLineID replayTLI, XLogRecPtr replayLSN);
 static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
@@ -561,6 +568,15 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
                 errdetail("Failed while allocating a WAL reading processor.")));
    xlogreader->system_identifier = ControlFile->system_identifier;
 
+   /*
+    * Set the WAL decode buffer size.  This limits how far ahead we can read
+    * in the WAL.
+    */
+   XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size);
+
+   /* Create a WAL prefetcher. */
+   xlogprefetcher = XLogPrefetcherAllocate(xlogreader);
+
    /*
     * Allocate two page buffers dedicated to WAL consistency checks.  We do
     * it this way, rather than just making static arrays, for two reasons:
@@ -589,7 +605,8 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
         * When a backup_label file is present, we want to roll forward from
         * the checkpoint it identifies, rather than using pg_control.
         */
-       record = ReadCheckpointRecord(xlogreader, CheckPointLoc, 0, true, CheckPointTLI);
+       record = ReadCheckpointRecord(xlogprefetcher, CheckPointLoc, 0, true,
+                                     CheckPointTLI);
        if (record != NULL)
        {
            memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
@@ -607,8 +624,8 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
             */
            if (checkPoint.redo < CheckPointLoc)
            {
-               XLogBeginRead(xlogreader, checkPoint.redo);
-               if (!ReadRecord(xlogreader, LOG, false,
+               XLogPrefetcherBeginRead(xlogprefetcher, checkPoint.redo);
+               if (!ReadRecord(xlogprefetcher, LOG, false,
                                checkPoint.ThisTimeLineID))
                    ereport(FATAL,
                            (errmsg("could not find redo location referenced by checkpoint record"),
@@ -727,7 +744,7 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
        CheckPointTLI = ControlFile->checkPointCopy.ThisTimeLineID;
        RedoStartLSN = ControlFile->checkPointCopy.redo;
        RedoStartTLI = ControlFile->checkPointCopy.ThisTimeLineID;
-       record = ReadCheckpointRecord(xlogreader, CheckPointLoc, 1, true,
+       record = ReadCheckpointRecord(xlogprefetcher, CheckPointLoc, 1, true,
                                      CheckPointTLI);
        if (record != NULL)
        {
@@ -1413,8 +1430,8 @@ FinishWalRecovery(void)
        lastRec = XLogRecoveryCtl->lastReplayedReadRecPtr;
        lastRecTLI = XLogRecoveryCtl->lastReplayedTLI;
    }
-   XLogBeginRead(xlogreader, lastRec);
-   (void) ReadRecord(xlogreader, PANIC, false, lastRecTLI);
+   XLogPrefetcherBeginRead(xlogprefetcher, lastRec);
+   (void) ReadRecord(xlogprefetcher, PANIC, false, lastRecTLI);
    endOfLog = xlogreader->EndRecPtr;
 
    /*
@@ -1503,6 +1520,9 @@ ShutdownWalRecovery(void)
 {
    char        recoveryPath[MAXPGPATH];
 
+   /* Final update of pg_stat_recovery_prefetch. */
+   XLogPrefetcherComputeStats(xlogprefetcher);
+
    /* Shut down xlogreader */
    if (readFile >= 0)
    {
@@ -1510,6 +1530,7 @@ ShutdownWalRecovery(void)
        readFile = -1;
    }
    XLogReaderFree(xlogreader);
+   XLogPrefetcherFree(xlogprefetcher);
 
    if (ArchiveRecoveryRequested)
    {
@@ -1593,15 +1614,15 @@ PerformWalRecovery(void)
    {
        /* back up to find the record */
        replayTLI = RedoStartTLI;
-       XLogBeginRead(xlogreader, RedoStartLSN);
-       record = ReadRecord(xlogreader, PANIC, false, replayTLI);
+       XLogPrefetcherBeginRead(xlogprefetcher, RedoStartLSN);
+       record = ReadRecord(xlogprefetcher, PANIC, false, replayTLI);
    }
    else
    {
        /* just have to read next record after CheckPoint */
        Assert(xlogreader->ReadRecPtr == CheckPointLoc);
        replayTLI = CheckPointTLI;
-       record = ReadRecord(xlogreader, LOG, false, replayTLI);
+       record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
    }
 
    if (record != NULL)
@@ -1710,7 +1731,7 @@ PerformWalRecovery(void)
            }
 
            /* Else, try to fetch the next WAL record */
-           record = ReadRecord(xlogreader, LOG, false, replayTLI);
+           record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
        } while (record != NULL);
 
        /*
@@ -1921,6 +1942,9 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
         */
        if (AllowCascadeReplication())
            WalSndWakeup();
+
+       /* Reset the prefetcher. */
+       XLogPrefetchReconfigure();
    }
 }
 
@@ -2305,7 +2329,8 @@ verifyBackupPageConsistency(XLogReaderState *record)
         * temporary page.
         */
        buf = XLogReadBufferExtended(rnode, forknum, blkno,
-                                    RBM_NORMAL_NO_LOG);
+                                    RBM_NORMAL_NO_LOG,
+                                    InvalidBuffer);
        if (!BufferIsValid(buf))
            continue;
 
@@ -2917,17 +2942,18 @@ ConfirmRecoveryPaused(void)
  * Attempt to read the next XLOG record.
  *
  * Before first call, the reader needs to be positioned to the first record
- * by calling XLogBeginRead().
+ * by calling XLogPrefetcherBeginRead().
  *
  * If no valid record is available, returns NULL, or fails if emode is PANIC.
  * (emode must be either PANIC or LOG). In standby mode, retries until a valid
  * record is available.
  */
 static XLogRecord *
-ReadRecord(XLogReaderState *xlogreader, int emode,
+ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
           bool fetching_ckpt, TimeLineID replayTLI)
 {
    XLogRecord *record;
+   XLogReaderState *xlogreader = XLogPrefetcherGetReader(xlogprefetcher);
    XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
 
    /* Pass through parameters to XLogPageRead */
@@ -2943,7 +2969,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
    {
        char       *errormsg;
 
-       record = XLogReadRecord(xlogreader, &errormsg);
+       record = XLogPrefetcherReadRecord(xlogprefetcher, &errormsg);
        if (record == NULL)
        {
            /*
@@ -3056,9 +3082,12 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
 
 /*
  * Read the XLOG page containing RecPtr into readBuf (if not read already).
- * Returns number of bytes read, if the page is read successfully, or -1
- * in case of errors.  When errors occur, they are ereport'ed, but only
- * if they have not been previously reported.
+ * Returns number of bytes read, if the page is read successfully, or
+ * XLREAD_FAIL in case of errors.  When errors occur, they are ereport'ed, but
+ * only if they have not been previously reported.
+ *
+ * While prefetching, xlogreader->nonblocking may be set.  In that case,
+ * returns XLREAD_WOULDBLOCK if we'd otherwise have to wait for more WAL.
  *
  * This is responsible for restoring files from archive as needed, as well
  * as for waiting for the requested WAL record to arrive in standby mode.
@@ -3066,7 +3095,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
  * 'emode' specifies the log level used for reporting "file not found" or
  * "end of WAL" situations in archive recovery, or in standby mode when a
  * trigger file is found. If set to WARNING or below, XLogPageRead() returns
- * false in those situations, on higher log levels the ereport() won't
+ * XLREAD_FAIL in those situations; on higher log levels the ereport() won't
  * return.
  *
  * In standby mode, if after a successful return of XLogPageRead() the
@@ -3125,20 +3154,31 @@ retry:
        (readSource == XLOG_FROM_STREAM &&
         flushedUpto < targetPagePtr + reqLen))
    {
-       if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
-                                        private->randAccess,
-                                        private->fetching_ckpt,
-                                        targetRecPtr,
-                                        private->replayTLI,
-                                        xlogreader->EndRecPtr))
+       if (readFile >= 0 &&
+           xlogreader->nonblocking &&
+           readSource == XLOG_FROM_STREAM &&
+           flushedUpto < targetPagePtr + reqLen)
+           return XLREAD_WOULDBLOCK;
+
+       switch (WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
+                                           private->randAccess,
+                                           private->fetching_ckpt,
+                                           targetRecPtr,
+                                           private->replayTLI,
+                                           xlogreader->EndRecPtr,
+                                           xlogreader->nonblocking))
        {
-           if (readFile >= 0)
-               close(readFile);
-           readFile = -1;
-           readLen = 0;
-           readSource = XLOG_FROM_ANY;
-
-           return -1;
+           case XLREAD_WOULDBLOCK:
+               return XLREAD_WOULDBLOCK;
+           case XLREAD_FAIL:
+               if (readFile >= 0)
+                   close(readFile);
+               readFile = -1;
+               readLen = 0;
+               readSource = XLOG_FROM_ANY;
+               return XLREAD_FAIL;
+           case XLREAD_SUCCESS:
+               break;
        }
    }
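
For reference, the XLogPageReadResult values switched on above come from xlogreader.h; the type predates this commit (it also appears as an unchanged entry in the typedefs.list hunk at the end). Its shape is approximately the following; treat the exact comments as illustrative:

    typedef enum
    {
        XLREAD_SUCCESS = 0,         /* record is successfully read */
        XLREAD_FAIL = -1,           /* failed during reading a record */
        XLREAD_WOULDBLOCK = -2      /* nonblocking mode only, no data yet */
    } XLogPageReadResult;

Both failure codes are negative, consistent with XLogPageRead()'s documented contract of returning a non-negative byte count on success, so the old "return -1" sites could be converted to XLREAD_FAIL without disturbing that convention.
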
 
@@ -3263,7 +3303,7 @@ next_record_is_invalid:
    if (StandbyMode)
        goto retry;
    else
-       return -1;
+       return XLREAD_FAIL;
 }
 
 /*
@@ -3292,14 +3332,18 @@ next_record_is_invalid:
  * available.
  *
  * When the requested record becomes available, the function opens the file
- * containing it (if not open already), and returns true. When end of standby
- * mode is triggered by the user, and there is no more WAL available, returns
- * false.
+ * containing it (if not open already), and returns XLREAD_SUCCESS. When end
+ * of standby mode is triggered by the user, and there is no more WAL
+ * available, returns XLREAD_FAIL.
+ *
+ * If nonblocking is true, then give up immediately if we can't satisfy the
+ * request, returning XLREAD_WOULDBLOCK instead of waiting.
  */
-static bool
+static XLogPageReadResult
 WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                            bool fetching_ckpt, XLogRecPtr tliRecPtr,
-                           TimeLineID replayTLI, XLogRecPtr replayLSN)
+                           TimeLineID replayTLI, XLogRecPtr replayLSN,
+                           bool nonblocking)
 {
    static TimestampTz last_fail_time = 0;
    TimestampTz now;
@@ -3353,6 +3397,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
         */
        if (lastSourceFailed)
        {
+           /*
+            * Don't allow any retry loops to occur during nonblocking
+            * readahead.  Let the caller process everything that has already
+            * been decoded first.
+            */
+           if (nonblocking)
+               return XLREAD_WOULDBLOCK;
+
            switch (currentSource)
            {
                case XLOG_FROM_ARCHIVE:
@@ -3367,7 +3419,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                    if (StandbyMode && CheckForStandbyTrigger())
                    {
                        XLogShutdownWalRcv();
-                       return false;
+                       return XLREAD_FAIL;
                    }
 
                    /*
@@ -3375,7 +3427,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                     * and pg_wal.
                     */
                    if (!StandbyMode)
-                       return false;
+                       return XLREAD_FAIL;
 
                    /*
                     * Move to XLOG_FROM_STREAM state, and set to start a
@@ -3519,7 +3571,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                              currentSource == XLOG_FROM_ARCHIVE ? XLOG_FROM_ANY :
                                              currentSource);
                if (readFile >= 0)
-                   return true;    /* success! */
+                   return XLREAD_SUCCESS;  /* success! */
 
                /*
                 * Nope, not found in archive or pg_wal.
@@ -3674,11 +3726,15 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                            /* just make sure source info is correct... */
                            readSource = XLOG_FROM_STREAM;
                            XLogReceiptSource = XLOG_FROM_STREAM;
-                           return true;
+                           return XLREAD_SUCCESS;
                        }
                        break;
                    }
 
+                   /* In nonblocking mode, return rather than sleeping. */
+                   if (nonblocking)
+                       return XLREAD_WOULDBLOCK;
+
                    /*
                     * Data not here yet. Check for trigger, then wait for
                     * walreceiver to wake us up when new WAL arrives.
@@ -3686,13 +3742,13 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                    if (CheckForStandbyTrigger())
                    {
                        /*
-                        * Note that we don't "return false" immediately here.
-                        * After being triggered, we still want to replay all
-                        * the WAL that was already streamed. It's in pg_wal
-                        * now, so we just treat this as a failure, and the
-                        * state machine will move on to replay the streamed
-                        * WAL from pg_wal, and then recheck the trigger and
-                        * exit replay.
+                        * Note that we don't return XLREAD_FAIL immediately
+                        * here. After being triggered, we still want to
+                        * replay all the WAL that was already streamed. It's
+                        * in pg_wal now, so we just treat this as a failure,
+                        * and the state machine will move on to replay the
+                        * streamed WAL from pg_wal, and then recheck the
+                        * trigger and exit replay.
                         */
                        lastSourceFailed = true;
                        break;
@@ -3711,6 +3767,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                        streaming_reply_sent = true;
                    }
 
+                   /* Update pg_stat_recovery_prefetch before sleeping. */
+                   XLogPrefetcherComputeStats(xlogprefetcher);
+
                    /*
                     * Wait for more WAL to arrive. Time out after 5 seconds
                     * to react to a trigger file promptly and to check if the
@@ -3743,7 +3802,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
        HandleStartupProcInterrupts();
    }
 
-   return false;               /* not reached */
+   return XLREAD_FAIL;             /* not reached */
 }
 
 
@@ -3788,7 +3847,7 @@ emode_for_corrupt_record(int emode, XLogRecPtr RecPtr)
  * 1 for "primary", 0 for "other" (backup_label)
  */
 static XLogRecord *
-ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
+ReadCheckpointRecord(XLogPrefetcher *xlogprefetcher, XLogRecPtr RecPtr,
                     int whichChkpt, bool report, TimeLineID replayTLI)
 {
    XLogRecord *record;
@@ -3815,8 +3874,8 @@ ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
        return NULL;
    }
 
-   XLogBeginRead(xlogreader, RecPtr);
-   record = ReadRecord(xlogreader, LOG, true, replayTLI);
+   XLogPrefetcherBeginRead(xlogprefetcher, RecPtr);
+   record = ReadRecord(xlogprefetcher, LOG, true, replayTLI);
 
    if (record == NULL)
    {
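
Taken together, the xlogrecovery.c hunks amount to a simple substitution for callers: position and read through the prefetcher instead of the bare reader, leaving error reporting and redo untouched. A condensed sketch of the resulting replay loop, assuming the surrounding variables from PerformWalRecovery() (recovery-target checks, pauses, and interrupt handling omitted):

    XLogPrefetcherBeginRead(xlogprefetcher, RedoStartLSN);  /* was XLogBeginRead() */
    record = ReadRecord(xlogprefetcher, PANIC, false, replayTLI);

    while (record != NULL)
    {
        ApplyWalRecord(xlogreader, record, &replayTLI);     /* may call XLogPrefetchReconfigure() */
        record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
    }
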
index a4dedc58b71e0c6710a22c1005b80cb66b2671d4..bb2d3ec991c52041df56dcd162553d20608ff6f0 100644 (file)
@@ -22,6 +22,7 @@
 #include "access/timeline.h"
 #include "access/xlogrecovery.h"
 #include "access/xlog_internal.h"
+#include "access/xlogprefetcher.h"
 #include "access/xlogutils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -355,11 +356,13 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
    RelFileNode rnode;
    ForkNumber  forknum;
    BlockNumber blkno;
+   Buffer      prefetch_buffer;
    Page        page;
    bool        zeromode;
    bool        willinit;
 
-   if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
+   if (!XLogRecGetBlockTagExtended(record, block_id, &rnode, &forknum, &blkno,
+                                   &prefetch_buffer))
    {
        /* Caller specified a bogus block_id */
        elog(PANIC, "failed to locate backup block with ID %d", block_id);
@@ -381,7 +384,8 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
    {
        Assert(XLogRecHasBlockImage(record, block_id));
        *buf = XLogReadBufferExtended(rnode, forknum, blkno,
-                                     get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK);
+                                     get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK,
+                                     prefetch_buffer);
        page = BufferGetPage(*buf);
        if (!RestoreBlockImage(record, block_id, page))
            elog(ERROR, "failed to restore block image");
@@ -410,7 +414,7 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
    }
    else
    {
-       *buf = XLogReadBufferExtended(rnode, forknum, blkno, mode);
+       *buf = XLogReadBufferExtended(rnode, forknum, blkno, mode, prefetch_buffer);
        if (BufferIsValid(*buf))
        {
            if (mode != RBM_ZERO_AND_LOCK && mode != RBM_ZERO_AND_CLEANUP_LOCK)
@@ -450,6 +454,10 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
  * exist, and we don't check for all-zeroes.  Thus, no log entry is made
  * to imply that the page should be dropped or truncated later.
  *
+ * Optionally, recent_buffer can be used to provide a hint about the location
+ * of the page in the buffer pool; it does not have to be correct, but avoids
+ * a buffer mapping table probe if it is.
+ *
  * NB: A redo function should normally not call this directly. To get a page
  * to modify, use XLogReadBufferForRedoExtended instead. It is important that
  * all pages modified by a WAL record are registered in the WAL records, or
@@ -457,7 +465,8 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
  */
 Buffer
 XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
-                      BlockNumber blkno, ReadBufferMode mode)
+                      BlockNumber blkno, ReadBufferMode mode,
+                      Buffer recent_buffer)
 {
    BlockNumber lastblock;
    Buffer      buffer;
@@ -465,6 +474,15 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 
    Assert(blkno != P_NEW);
 
+   /* Do we have a clue where the buffer might be already? */
+   if (BufferIsValid(recent_buffer) &&
+       mode == RBM_NORMAL &&
+       ReadRecentBuffer(rnode, forknum, blkno, recent_buffer))
+   {
+       buffer = recent_buffer;
+       goto recent_buffer_fast_path;
+   }
+
    /* Open the relation at smgr level */
    smgr = smgropen(rnode, InvalidBackendId);
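
The fast path added above is the consumer side of the prefetcher's hint: when a block was prefetched, the buffer it landed in is remembered and handed back down as recent_buffer, so a still-valid hint costs only a pin instead of a buffer mapping table probe. Callers without a hint pass InvalidBuffer, as the freespace.c and verifyBackupPageConsistency() hunks in this commit do. A minimal sketch of the pattern, using a hypothetical helper name:

    static Buffer
    read_block_with_hint(RelFileNode rnode, ForkNumber forknum,
                         BlockNumber blkno, Buffer recent_buffer)
    {
        /* Revalidate cheaply: true only if the page is still in that buffer. */
        if (BufferIsValid(recent_buffer) &&
            ReadRecentBuffer(rnode, forknum, blkno, recent_buffer))
            return recent_buffer;       /* pinned; mapping probe avoided */

        /* Hint absent or stale: take the normal lookup path. */
        return XLogReadBufferExtended(rnode, forknum, blkno, RBM_NORMAL,
                                      InvalidBuffer);
    }
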
 
@@ -523,6 +541,7 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
        }
    }
 
+recent_buffer_fast_path:
    if (mode == RBM_NORMAL)
    {
        /* check that page has been initialized */
index e701d1c676a44fad7856693571b8503381e4bc72..b1a6df16ad33cf4893e1ee6f22844f65988d9909 100644 (file)
@@ -930,6 +930,20 @@ CREATE VIEW pg_stat_wal_receiver AS
     FROM pg_stat_get_wal_receiver() s
     WHERE s.pid IS NOT NULL;
 
+CREATE VIEW pg_stat_recovery_prefetch AS
+    SELECT
+            s.stats_reset,
+            s.prefetch,
+            s.hit,
+            s.skip_init,
+            s.skip_new,
+            s.skip_fpw,
+            s.skip_rep,
+            s.wal_distance,
+            s.block_distance,
+            s.io_depth
+     FROM pg_stat_get_recovery_prefetch() s;
+
 CREATE VIEW pg_stat_subscription AS
     SELECT
             su.oid AS subid,
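
The new view is cumulative like its neighbors: it can be read with a plain SELECT * FROM pg_stat_recovery_prefetch, and its counters are reset with pg_stat_reset_shared('recovery_prefetch'), wired up in the pgstatfuncs.c hunk below.
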
index f80f90ac3cc14270edc9c98a7e79ff4477f09d77..93c1ea2d9f7b434d6fe90d18dbc10dfa5077a70f 100644 (file)
@@ -649,6 +649,8 @@ ReadRecentBuffer(RelFileNode rnode, ForkNumber forkNum, BlockNumber blockNum,
                pg_atomic_write_u32(&bufHdr->state,
                                    buf_state + BUF_USAGECOUNT_ONE);
 
+           pgBufferUsage.local_blks_hit++;
+
            return true;
        }
    }
@@ -680,6 +682,8 @@ ReadRecentBuffer(RelFileNode rnode, ForkNumber forkNum, BlockNumber blockNum,
            else
                PinBuffer_Locked(bufHdr);   /* pin for first time */
 
+           pgBufferUsage.shared_blks_hit++;
+
            return true;
        }
 
index 78c073b7c98eea940d5d7cd9877fc2099407f357..d41ae37090add6317ea637c1182c41ae9c7199fe 100644 (file)
@@ -211,7 +211,8 @@ XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
    blkno = fsm_logical_to_physical(addr);
 
    /* If the page doesn't exist already, extend */
-   buf = XLogReadBufferExtended(rnode, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR);
+   buf = XLogReadBufferExtended(rnode, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR,
+                                InvalidBuffer);
    LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 
    page = BufferGetPage(buf);
index 88ff59c568fdc7d972bc26512591465dcaa15604..75e456360bef90c4cc210fc9495047c4cb2a884a 100644 (file)
@@ -22,6 +22,7 @@
 #include "access/subtrans.h"
 #include "access/syncscan.h"
 #include "access/twophase.h"
+#include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
 #include "commands/async.h"
 #include "miscadmin.h"
@@ -119,6 +120,7 @@ CalculateShmemSize(int *num_semaphores)
    size = add_size(size, LockShmemSize());
    size = add_size(size, PredicateLockShmemSize());
    size = add_size(size, ProcGlobalShmemSize());
+   size = add_size(size, XLogPrefetchShmemSize());
    size = add_size(size, XLOGShmemSize());
    size = add_size(size, XLogRecoveryShmemSize());
    size = add_size(size, CLOGShmemSize());
@@ -244,6 +246,7 @@ CreateSharedMemoryAndSemaphores(void)
     * Set up xlog, clog, and buffers
     */
    XLOGShmemInit();
+   XLogPrefetchShmemInit();
    XLogRecoveryShmemInit();
    CLOGShmemInit();
    CommitTsShmemInit();
index 879f647dbc22205e1ca46f9b36b40ead157e202e..286dd3f75518a1a7fcfba879b98f0e1cf8a5900c 100644 (file)
@@ -162,9 +162,11 @@ mdexists(SMgrRelation reln, ForkNumber forkNum)
 {
    /*
     * Close it first, to ensure that we notice if the fork has been unlinked
-    * since we opened it.
+    * since we opened it.  As an optimization, we can skip that in recovery,
+    * which already closes relations when dropping them.
     */
-   mdclose(reln, forkNum);
+   if (!InRecovery)
+       mdclose(reln, forkNum);
 
    return (mdopenfork(reln, forkNum, EXTENSION_RETURN_NULL) != NULL);
 }
index 2bf8ab8f9863c2723db8e2f3a578cefafa2dbfd4..d3ad795a6ea35630d196db01883c5c39bab916f1 100644 (file)
@@ -16,6 +16,7 @@
 
 #include "access/htup_details.h"
 #include "access/xlog.h"
+#include "access/xlogprefetcher.h"
 #include "catalog/pg_authid.h"
 #include "catalog/pg_type.h"
 #include "common/ip.h"
@@ -2103,13 +2104,15 @@ pg_stat_reset_shared(PG_FUNCTION_ARGS)
        pgstat_reset_of_kind(PGSTAT_KIND_BGWRITER);
        pgstat_reset_of_kind(PGSTAT_KIND_CHECKPOINTER);
    }
+   else if (strcmp(target, "recovery_prefetch") == 0)
+       XLogPrefetchResetStats();
    else if (strcmp(target, "wal") == 0)
        pgstat_reset_of_kind(PGSTAT_KIND_WAL);
    else
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("unrecognized reset target: \"%s\"", target),
-                errhint("Target must be \"archiver\", \"bgwriter\", or \"wal\".")));
+                errhint("Target must be \"archiver\", \"bgwriter\", \"recovery_prefetch\", or \"wal\".")));
 
    PG_RETURN_VOID();
 }
index 89f8259bac53838cdf036640e0bdf3a4d8d7d914..22b5571a704d463af2e8ab357fc8f4c268b60c13 100644 (file)
@@ -41,6 +41,7 @@
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
 #include "catalog/namespace.h"
 #include "catalog/objectaccess.h"
@@ -217,6 +218,7 @@ static bool check_effective_io_concurrency(int *newval, void **extra, GucSource
 static bool check_maintenance_io_concurrency(int *newval, void **extra, GucSource source);
 static bool check_huge_page_size(int *newval, void **extra, GucSource source);
 static bool check_client_connection_check_interval(int *newval, void **extra, GucSource source);
+static void assign_maintenance_io_concurrency(int newval, void *extra);
 static bool check_application_name(char **newval, void **extra, GucSource source);
 static void assign_application_name(const char *newval, void *extra);
 static bool check_cluster_name(char **newval, void **extra, GucSource source);
@@ -495,6 +497,19 @@ static const struct config_enum_entry huge_pages_options[] = {
    {NULL, 0, false}
 };
 
+static const struct config_enum_entry recovery_prefetch_options[] = {
+   {"off", RECOVERY_PREFETCH_OFF, false},
+   {"on", RECOVERY_PREFETCH_ON, false},
+   {"try", RECOVERY_PREFETCH_TRY, false},
+   {"true", RECOVERY_PREFETCH_ON, true},
+   {"false", RECOVERY_PREFETCH_OFF, true},
+   {"yes", RECOVERY_PREFETCH_ON, true},
+   {"no", RECOVERY_PREFETCH_OFF, true},
+   {"1", RECOVERY_PREFETCH_ON, true},
+   {"0", RECOVERY_PREFETCH_OFF, true},
+   {NULL, 0, false}
+};
+
 static const struct config_enum_entry force_parallel_mode_options[] = {
    {"off", FORCE_PARALLEL_OFF, false},
    {"on", FORCE_PARALLEL_ON, false},
@@ -785,6 +800,8 @@ const char *const config_group_names[] =
    gettext_noop("Write-Ahead Log / Checkpoints"),
    /* WAL_ARCHIVING */
    gettext_noop("Write-Ahead Log / Archiving"),
+   /* WAL_RECOVERY */
+   gettext_noop("Write-Ahead Log / Recovery"),
    /* WAL_ARCHIVE_RECOVERY */
    gettext_noop("Write-Ahead Log / Archive Recovery"),
    /* WAL_RECOVERY_TARGET */
@@ -2818,6 +2835,17 @@ static struct config_int ConfigureNamesInt[] =
        NULL, NULL, NULL
    },
 
+   {
+       {"wal_decode_buffer_size", PGC_POSTMASTER, WAL_RECOVERY,
+           gettext_noop("Maximum buffer size for reading ahead in the WAL during recovery."),
+           gettext_noop("This controls the maximum distance we can read ahead in the WAL to prefetch referenced blocks."),
+           GUC_UNIT_BYTE
+       },
+       &wal_decode_buffer_size,
+       512 * 1024, 64 * 1024, MaxAllocSize,
+       NULL, NULL, NULL
+   },
+
    {
        {"wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
            gettext_noop("Sets the size of WAL files held for standby servers."),
@@ -3141,7 +3169,8 @@ static struct config_int ConfigureNamesInt[] =
        0,
 #endif
        0, MAX_IO_CONCURRENCY,
-       check_maintenance_io_concurrency, NULL, NULL
+       check_maintenance_io_concurrency, assign_maintenance_io_concurrency,
+       NULL
    },
 
    {
@@ -5013,6 +5042,16 @@ static struct config_enum ConfigureNamesEnum[] =
        NULL, NULL, NULL
    },
 
+   {
+       {"recovery_prefetch", PGC_SIGHUP, WAL_RECOVERY,
+           gettext_noop("Prefetch referenced blocks during recovery."),
+           gettext_noop("Look ahead in the WAL to find references to uncached data.")
+       },
+       &recovery_prefetch,
+       RECOVERY_PREFETCH_TRY, recovery_prefetch_options,
+       check_recovery_prefetch, assign_recovery_prefetch, NULL
+   },
+
    {
        {"force_parallel_mode", PGC_USERSET, DEVELOPER_OPTIONS,
            gettext_noop("Forces use of parallel query facilities."),
@@ -12422,6 +12461,20 @@ check_client_connection_check_interval(int *newval, void **extra, GucSource sour
    return true;
 }
 
+static void
+assign_maintenance_io_concurrency(int newval, void *extra)
+{
+#ifdef USE_PREFETCH
+   /*
+    * Reconfigure recovery prefetching, because a setting it depends on
+    * has changed.
+    */
+   maintenance_io_concurrency = newval;
+   if (AmStartupProcess())
+       XLogPrefetchReconfigure();
+#endif
+}
+
 static bool
 check_application_name(char **newval, void **extra, GucSource source)
 {
index e75b7d63ea3139000df88c048890298a16024418..94270eb0ecbec33aaf8ffb13e10dacfdc468177d 100644 (file)
 #max_wal_size = 1GB
 #min_wal_size = 80MB
 
+# - Prefetching during recovery -
+
+#recovery_prefetch = try       # prefetch pages referenced in the WAL?
+#wal_decode_buffer_size = 512kB        # lookahead window used for prefetching
+                   # (change requires restart)
+
 # - Archiving -
 
 #archive_mode = off        # enables archiving; off, on, or always
index b81917f243d00b1872a3c93a7d20b859a8c78736..e302bd102cd8162c846592939fd3c58910f8bef8 100644 (file)
@@ -50,6 +50,7 @@ extern bool *wal_consistency_checking;
 extern char *wal_consistency_checking_string;
 extern bool log_checkpoints;
 extern bool track_wal_io_timing;
+extern int wal_decode_buffer_size;
 
 extern int CheckPointSegments;
 
diff --git a/src/include/access/xlogprefetcher.h b/src/include/access/xlogprefetcher.h
new file mode 100644 (file)
index 0000000..c30b09b
--- /dev/null
@@ -0,0 +1,53 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogprefetcher.h
+ *     Declarations for the recovery prefetching module.
+ *
+ * Portions Copyright (c) 2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *     src/include/access/xlogprefetcher.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef XLOGPREFETCHER_H
+#define XLOGPREFETCHER_H
+
+#include "access/xlogdefs.h"
+
+/* GUCs */
+extern int recovery_prefetch;
+
+/* Possible values for recovery_prefetch */
+typedef enum
+{
+   RECOVERY_PREFETCH_OFF,
+   RECOVERY_PREFETCH_ON,
+   RECOVERY_PREFETCH_TRY
+}          RecoveryPrefetchValue;
+
+struct XLogPrefetcher;
+typedef struct XLogPrefetcher XLogPrefetcher;
+
+
+extern void XLogPrefetchReconfigure(void);
+
+extern size_t XLogPrefetchShmemSize(void);
+extern void XLogPrefetchShmemInit(void);
+
+extern void XLogPrefetchResetStats(void);
+
+extern XLogPrefetcher *XLogPrefetcherAllocate(XLogReaderState *reader);
+extern void XLogPrefetcherFree(XLogPrefetcher *prefetcher);
+
+extern XLogReaderState *XLogPrefetcherGetReader(XLogPrefetcher *prefetcher);
+
+extern void XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher,
+                                   XLogRecPtr recPtr);
+
+extern XLogRecord *XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher,
+                                           char **errmsg);
+
+extern void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher);
+
+#endif
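
The header implies the following lifecycle for the startup process; this is a condensed sketch assembled from the xlogrecovery.c hunks earlier in the commit, not an additional API:

    /* InitWalRecovery(): wrap the existing xlogreader. */
    xlogprefetcher = XLogPrefetcherAllocate(xlogreader);

    /* All positioning and reading go through the prefetcher. */
    XLogPrefetcherBeginRead(xlogprefetcher, startLSN);
    record = XLogPrefetcherReadRecord(xlogprefetcher, &errormsg);

    /* ShutdownWalRecovery(): flush final stats, then free both objects. */
    XLogPrefetcherComputeStats(xlogprefetcher);
    XLogPrefetcherFree(xlogprefetcher);
    XLogReaderFree(xlogreader);
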
index f4388cc9be89ae42982798b196c1f90a2422e5fe..d8eb857611040de2f0f471ab14d9770dad2593ea 100644 (file)
@@ -39,6 +39,7 @@
 #endif
 
 #include "access/xlogrecord.h"
+#include "storage/buf.h"
 
 /* WALOpenSegment represents a WAL segment being read. */
 typedef struct WALOpenSegment
@@ -125,6 +126,9 @@ typedef struct
    ForkNumber  forknum;
    BlockNumber blkno;
 
+   /* Prefetching workspace. */
+   Buffer      prefetch_buffer;
+
    /* copy of the fork_flags field from the XLogRecordBlockHeader */
    uint8       flags;
 
@@ -430,5 +434,9 @@ extern char *XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *
 extern bool XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
                               RelFileNode *rnode, ForkNumber *forknum,
                               BlockNumber *blknum);
+extern bool XLogRecGetBlockTagExtended(XLogReaderState *record, uint8 block_id,
+                                      RelFileNode *rnode, ForkNumber *forknum,
+                                      BlockNumber *blknum,
+                                      Buffer *prefetch_buffer);
 
 #endif                         /* XLOGREADER_H */
index 64708949db957087230195f554861fc0876108da..ff40f96e423ed2d582cf462a2229b7656ac3ceb9 100644 (file)
@@ -84,7 +84,8 @@ extern XLogRedoAction XLogReadBufferForRedoExtended(XLogReaderState *record,
                                                    Buffer *buf);
 
 extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
-                                    BlockNumber blkno, ReadBufferMode mode);
+                                    BlockNumber blkno, ReadBufferMode mode,
+                                    Buffer recent_buffer);
 
 extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
 extern void FreeFakeRelcacheEntry(Relation fakerel);
index e133113543f2893dbc0bb2719eff8ed51f37cfce..67f3d8526cdd83669644d45e8398313c2a70e9e9 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202204073
+#define CATALOG_VERSION_NO 202204074
 
 #endif
index 776e31f3b58eb2c03460d4e1b9f7d38fb479f9d2..61876c4e8089a64831f604f53e8ce691ecd959bb 100644 (file)
   proargmodes => '{o,o,o,o,o,o,o,o,o}',
   proargnames => '{wal_records,wal_fpi,wal_bytes,wal_buffers_full,wal_write,wal_sync,wal_write_time,wal_sync_time,stats_reset}',
   prosrc => 'pg_stat_get_wal' },
+{ oid => '9085', descr => 'statistics: information about WAL prefetching',
+  proname => 'pg_stat_get_recovery_prefetch', prorows => '1', provolatile => 'v',
+  proretset => 't', prorettype => 'record', proargtypes => '',
+  proallargtypes => '{timestamptz,int8,int8,int8,int8,int8,int8,int4,int4,int4}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{stats_reset,prefetch,hit,skip_init,skip_new,skip_fpw,skip_rep,wal_distance,block_distance,io_depth}',
+  prosrc => 'pg_stat_get_recovery_prefetch' },
 
 { oid => '2306', descr => 'statistics: information about SLRU caches',
   proname => 'pg_stat_get_slru', prorows => '100', proisstrict => 'f',
index 74018ea27bc8dad7b084736125247249b8b7bc50..1189e1a2263646328926c6e81c427e07a59ba4ff 100644 (file)
@@ -453,4 +453,8 @@ extern void assign_search_path(const char *newval, void *extra);
 extern bool check_wal_buffers(int *newval, void **extra, GucSource source);
 extern void assign_xlog_sync_method(int new_sync_method, void *extra);
 
+/* in access/transam/xlogprefetcher.c */
+extern bool check_recovery_prefetch(int *new_value, void **extra, GucSource source);
+extern void assign_recovery_prefetch(int new_value, void *extra);
+
 #endif                         /* GUC_H */
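
check_recovery_prefetch() and assign_recovery_prefetch() are defined in the new xlogprefetcher.c, which this excerpt does not show. A sketch of the expected shape, following the usual GUC hook conventions; the guard and the error wording are illustrative:

    bool
    check_recovery_prefetch(int *new_value, void **extra, GucSource source)
    {
    #ifndef USE_PREFETCH
        /* Without posix_fadvise(), "on" cannot be honored; "try" and "off" can. */
        if (*new_value == RECOVERY_PREFETCH_ON)
        {
            GUC_check_errdetail("recovery_prefetch is not supported on platforms that lack posix_fadvise().");
            return false;
        }
    #endif
        return true;
    }

    void
    assign_recovery_prefetch(int new_value, void *extra)
    {
        /* Reconfigure prefetching, because a setting it depends on has changed. */
        recovery_prefetch = new_value;
        if (AmStartupProcess())
            XLogPrefetchReconfigure();
    }
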
index 1c5b3930a978784d800a5c1e1b155e3f91ccc433..63b56f18e0d5c13cb227cebf30f73331f2157582 100644 (file)
@@ -67,6 +67,7 @@ enum config_group
    WAL_SETTINGS,
    WAL_CHECKPOINTS,
    WAL_ARCHIVING,
+   WAL_RECOVERY,
    WAL_ARCHIVE_RECOVERY,
    WAL_RECOVERY_TARGET,
    REPLICATION_SENDING,
index 423b9b99fb6052bff5456e7bc46860d717514c1c..db652ea8d8931e0cdb7235daf0c821da457bb843 100644 (file)
@@ -2019,6 +2019,17 @@ pg_stat_progress_vacuum| SELECT s.pid,
     s.param7 AS num_dead_tuples
    FROM (pg_stat_get_progress_info('VACUUM'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)));
+pg_stat_recovery_prefetch| SELECT s.stats_reset,
+    s.prefetch,
+    s.hit,
+    s.skip_init,
+    s.skip_new,
+    s.skip_fpw,
+    s.skip_rep,
+    s.wal_distance,
+    s.block_distance,
+    s.io_depth
+   FROM pg_stat_get_recovery_prefetch() s(stats_reset, prefetch, hit, skip_init, skip_new, skip_fpw, skip_rep, wal_distance, block_distance, io_depth);
 pg_stat_replication| SELECT s.pid,
     s.usesysid,
     u.rolname AS usename,
index 566ecbf091679b438709e468b192ab52b47ee23c..be3fafadf88ccd48f093d92668d66b20abfe38e5 100644 (file)
@@ -1421,6 +1421,9 @@ LogicalRepWorker
 LogicalRewriteMappingData
 LogicalTape
 LogicalTapeSet
+LsnReadQueue
+LsnReadQueueNextFun
+LsnReadQueueNextStatus
 LtreeGistOptions
 LtreeSignature
 MAGIC
@@ -2949,6 +2952,9 @@ XLogPageHeaderData
 XLogPageReadCB
 XLogPageReadPrivate
 XLogPageReadResult
+XLogPrefetcher
+XLogPrefetcherFilter
+XLogPrefetchStats
 XLogReaderRoutine
 XLogReaderState
 XLogRecData