--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * xlogprefetch.c
+ * Prefetching support for recovery.
+ *
+ * Portions Copyright (c) 2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/transam/xlogprefetch.c
+ *
+ * The goal of this module is to read future WAL records and issue
+ * PrefetchSharedBuffer() calls for referenced blocks, so that we avoid I/O
+ * stalls in the main recovery loop.
+ *
+ * When examining a WAL record from the future, we need to consider that a
+ * referenced block or segment file might not exist on disk until this record
+ * or some earlier record has been replayed. After a crash, a file might also
+ * be missing because it was dropped by a later WAL record; in that case, it
+ * will be recreated when this record is replayed. These cases are handled by
+ * recognizing them and adding a "filter" that prevents all prefetching of a
+ * certain block range until the present WAL record has been replayed. Blocks
+ * skipped for these reasons are counted as "skip_new" (that is, cases where we
+ * didn't try to prefetch "new" blocks).
+ *
+ * Blocks found in the buffer pool already are counted as "skip_hit".
+ * Repeated access to the same buffer is detected and skipped, and this is
+ * counted with "skip_seq". Blocks that were logged with FPWs are skipped if
+ * recovery_prefetch_fpw is off, since on most systems there will be no I/O
+ * stall; this is counted with "skip_fpw".
+ *
+ * The only way we currently have to know that an I/O initiated with
+ * PrefetchSharedBuffer() has completed is that recovery will eventually call
+ * ReadBuffer() and perform a synchronous read.  Therefore, we track the
+ * number of potentially in-flight I/Os by using a circular buffer of LSNs.
+ * When it's full, we have to wait for recovery to replay records so that the
+ * queue depth can be reduced before we can do any more prefetching.  Ideally,
+ * this keeps us the right distance ahead to respect
+ * maintenance_io_concurrency.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xlog.h"
+#include "access/xlogprefetch.h"
+#include "access/xlogreader.h"
+#include "access/xlogutils.h"
+#include "catalog/storage_xlog.h"
+#include "utils/fmgrprotos.h"
+#include "utils/timestamp.h"
+#include "funcapi.h"
+#include "pgstat.h"
+#include "miscadmin.h"
+#include "port/atomics.h"
+#include "storage/bufmgr.h"
+#include "storage/shmem.h"
+#include "storage/smgr.h"
+#include "utils/guc.h"
+#include "utils/hsearch.h"
+
+/*
+ * Sample the queue depth and distance every time we replay this much WAL.
+ * This is used to compute avg_queue_depth and avg_distance for the log
+ * message that appears at the end of crash recovery. It's also used to send
+ * messages periodically to the stats collector, to save the counters on disk.
+ */
+#define XLOGPREFETCHER_SAMPLE_DISTANCE 0x40000
+
+/* GUCs */
+bool recovery_prefetch = false;
+bool recovery_prefetch_fpw = false;
+
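+/*
+ * Incremented by XLogPrefetchReconfigure() whenever a relevant setting
+ * changes, so that the recovery loop knows the prefetcher must be rebuilt.
+ */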
+int XLogPrefetchReconfigureCount;
+
+/*
+ * A prefetcher object. There is at most one of these in existence at a time,
+ * recreated whenever there is a configuration change.
+ */
+struct XLogPrefetcher
+{
+ /* Reader and current reading state. */
+ XLogReaderState *reader;
+ DecodedXLogRecord *record;
+ int next_block_id;
+ bool shutdown;
+
+ /* Details of last prefetch to skip repeats and seq scans. */
+ SMgrRelation last_reln;
+ RelFileNode last_rnode;
+ BlockNumber last_blkno;
+
+ /* Online averages. */
+ uint64 samples;
+ double avg_queue_depth;
+ double avg_distance;
+ XLogRecPtr next_sample_lsn;
+
+ /* Book-keeping required to avoid accessing non-existing blocks. */
+ HTAB *filter_table;
+ dlist_head filter_queue;
+
+ /* Book-keeping required to limit concurrent prefetches. */
+ int prefetch_head;
+ int prefetch_tail;
+ int prefetch_queue_size;
+ XLogRecPtr prefetch_queue[MAX_IO_CONCURRENCY + 1];
+};
+
+/*
+ * A temporary filter used to track block ranges that haven't been created
+ * yet, whole relations that haven't been created yet, and whole relations
+ * that we must assume have already been dropped.
+ */
+typedef struct XLogPrefetcherFilter
+{
+ RelFileNode rnode;
+ XLogRecPtr filter_until_replayed;
+ BlockNumber filter_from_block;
+ dlist_node link;
+} XLogPrefetcherFilter;
+
+/*
+ * Counters exposed in shared memory for pg_stat_prefetch_recovery.
+ */
+typedef struct XLogPrefetchStats
+{
+ pg_atomic_uint64 reset_time; /* Time of last reset. */
+ pg_atomic_uint64 prefetch; /* Prefetches initiated. */
+ pg_atomic_uint64 skip_hit; /* Blocks already buffered. */
+ pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */
+ pg_atomic_uint64 skip_fpw; /* FPWs skipped. */
+ pg_atomic_uint64 skip_seq; /* Repeat blocks skipped. */
+ float avg_distance;
+ float avg_queue_depth;
+
+ /* Reset counters */
+ pg_atomic_uint32 reset_request;
+ uint32 reset_handled;
+
+ /* Dynamic values */
+ int distance; /* Number of bytes ahead in the WAL. */
+ int queue_depth; /* Number of I/Os possibly in progress. */
+} XLogPrefetchStats;
+
+static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
+ RelFileNode rnode,
+ BlockNumber blockno,
+ XLogRecPtr lsn);
+static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
+ RelFileNode rnode,
+ BlockNumber blockno);
+static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
+ XLogRecPtr replaying_lsn);
+static inline void XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher,
+ XLogRecPtr prefetching_lsn);
+static inline void XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher,
+ XLogRecPtr replaying_lsn);
+static inline bool XLogPrefetcherSaturated(XLogPrefetcher *prefetcher);
+static bool XLogPrefetcherScanRecords(XLogPrefetcher *prefetcher,
+ XLogRecPtr replaying_lsn);
+static bool XLogPrefetcherScanBlocks(XLogPrefetcher *prefetcher);
+static void XLogPrefetchSaveStats(void);
+static void XLogPrefetchRestoreStats(void);
+
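+/* Pointer to the shared memory counters, set up by XLogPrefetchShmemInit(). */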
+static XLogPrefetchStats *SharedStats;
+
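+/*
+ * Report the amount of shared memory needed for recovery prefetch statistics.
+ */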
+size_t
+XLogPrefetchShmemSize(void)
+{
+ return sizeof(XLogPrefetchStats);
+}
+
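+/*
+ * Reset all counters to zero and remember the time of this reset.
+ */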
+static void
+XLogPrefetchResetStats(void)
+{
+ pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
+ pg_atomic_write_u64(&SharedStats->prefetch, 0);
+ pg_atomic_write_u64(&SharedStats->skip_hit, 0);
+ pg_atomic_write_u64(&SharedStats->skip_new, 0);
+ pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
+ pg_atomic_write_u64(&SharedStats->skip_seq, 0);
+ SharedStats->avg_distance = 0;
+ SharedStats->avg_queue_depth = 0;
+}
+
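+/*
+ * Allocate and zero-initialize the shared memory statistics struct during
+ * shared memory creation.
+ */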
+void
+XLogPrefetchShmemInit(void)
+{
+ bool found;
+
+ SharedStats = (XLogPrefetchStats *)
+ ShmemInitStruct("XLogPrefetchStats",
+ sizeof(XLogPrefetchStats),
+ &found);
+
+ if (!found)
+ {
+ pg_atomic_init_u32(&SharedStats->reset_request, 0);
+ SharedStats->reset_handled = 0;
+
+ pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
+ pg_atomic_init_u64(&SharedStats->prefetch, 0);
+ pg_atomic_init_u64(&SharedStats->skip_hit, 0);
+ pg_atomic_init_u64(&SharedStats->skip_new, 0);
+ pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
+ pg_atomic_init_u64(&SharedStats->skip_seq, 0);
+ SharedStats->avg_distance = 0;
+ SharedStats->avg_queue_depth = 0;
+ SharedStats->distance = 0;
+ SharedStats->queue_depth = 0;
+ }
+}
+
+/*
+ * Called when any GUC is changed that affects prefetching.
+ */
+void
+XLogPrefetchReconfigure(void)
+{
+ XLogPrefetchReconfigureCount++;
+}
+
+/*
+ * Called by any backend to request that the stats be reset.
+ */
+void
+XLogPrefetchRequestResetStats(void)
+{
+ pg_atomic_fetch_add_u32(&SharedStats->reset_request, 1);
+}
+
+/*
+ * Tell the stats collector to serialize the shared memory counters into the
+ * stats file.
+ */
+static void
+XLogPrefetchSaveStats(void)
+{
+ PgStat_RecoveryPrefetchStats serialized = {
+ .prefetch = pg_atomic_read_u64(&SharedStats->prefetch),
+ .skip_hit = pg_atomic_read_u64(&SharedStats->skip_hit),
+ .skip_new = pg_atomic_read_u64(&SharedStats->skip_new),
+ .skip_fpw = pg_atomic_read_u64(&SharedStats->skip_fpw),
+ .skip_seq = pg_atomic_read_u64(&SharedStats->skip_seq),
+ .stat_reset_timestamp = pg_atomic_read_u64(&SharedStats->reset_time)
+ };
+
+ pgstat_send_recoveryprefetch(&serialized);
+}
+
+/*
+ * Try to restore the shared memory counters from the stats file.
+ */
+static void
+XLogPrefetchRestoreStats(void)
+{
+ PgStat_RecoveryPrefetchStats *serialized = pgstat_fetch_recoveryprefetch();
+
+ if (serialized->stat_reset_timestamp != 0)
+ {
+ pg_atomic_write_u64(&SharedStats->prefetch, serialized->prefetch);
+ pg_atomic_write_u64(&SharedStats->skip_hit, serialized->skip_hit);
+ pg_atomic_write_u64(&SharedStats->skip_new, serialized->skip_new);
+ pg_atomic_write_u64(&SharedStats->skip_fpw, serialized->skip_fpw);
+ pg_atomic_write_u64(&SharedStats->skip_seq, serialized->skip_seq);
+ pg_atomic_write_u64(&SharedStats->reset_time, serialized->stat_reset_timestamp);
+ }
+}
+
+/*
+ * Increment a counter in shared memory.  This is equivalent to (*counter)++
+ * on a plain uint64 without any memory barrier or locking, except on
+ * platforms where readers can't read uint64 without possibly observing a
+ * torn value.
+ */
+static inline void
+XLogPrefetchIncrement(pg_atomic_uint64 *counter)
+{
+ Assert(AmStartupProcess() || !IsUnderPostmaster);
+ pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
+}
+
+/*
+ * Initialize an XLogPrefetchState object and restore the last saved
+ * statistics from disk.
+ */
+void
+XLogPrefetchBegin(XLogPrefetchState *state, XLogReaderState *reader)
+{
+ XLogPrefetchRestoreStats();
+
+ /* We'll reconfigure on the first call to XLogPrefetch(). */
+ state->reader = reader;
+ state->prefetcher = NULL;
+ state->reconfigure_count = XLogPrefetchReconfigureCount - 1;
+}
+
+/*
+ * Shut down the prefetching infrastructure, if configured.
+ */
+void
+XLogPrefetchEnd(XLogPrefetchState *state)
+{
+ XLogPrefetchSaveStats();
+
+ if (state->prefetcher)
+ XLogPrefetcherFree(state->prefetcher);
+ state->prefetcher = NULL;
+
+ SharedStats->queue_depth = 0;
+ SharedStats->distance = 0;
+}
+
+/*
+ * Create a prefetcher that is ready to begin prefetching blocks referenced by
+ * WAL records.
+ */
+XLogPrefetcher *
+XLogPrefetcherAllocate(XLogReaderState *reader)
+{
+ XLogPrefetcher *prefetcher;
+ static HASHCTL hash_table_ctl = {
+ .keysize = sizeof(RelFileNode),
+ .entrysize = sizeof(XLogPrefetcherFilter)
+ };
+
+ /*
+ * The size of the queue is based on the maintenance_io_concurrency
+ * setting. In theory we might have a separate queue for each tablespace,
+ * but it's not clear how that should work, so for now we'll just use the
+ * general GUC to rate-limit all prefetching. The queue has space for up
+ * to the highest possible value of the GUC + 1, because our circular buffer
+ * has a gap between head and tail when full.
+ */
+ prefetcher = palloc0(sizeof(XLogPrefetcher));
+ prefetcher->prefetch_queue_size = maintenance_io_concurrency + 1;
+ prefetcher->reader = reader;
+ prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
+ &hash_table_ctl,
+ HASH_ELEM | HASH_BLOBS);
+ dlist_init(&prefetcher->filter_queue);
+
+ SharedStats->queue_depth = 0;
+ SharedStats->distance = 0;
+
+ return prefetcher;
+}
+
+/*
+ * Destroy a prefetcher and release all resources.
+ */
+void
+XLogPrefetcherFree(XLogPrefetcher *prefetcher)
+{
+ /* Log final statistics. */
+ ereport(LOG,
+ (errmsg("recovery finished prefetching at %X/%X; "
+ "prefetch = " UINT64_FORMAT ", "
+ "skip_hit = " UINT64_FORMAT ", "
+ "skip_new = " UINT64_FORMAT ", "
+ "skip_fpw = " UINT64_FORMAT ", "
+ "skip_seq = " UINT64_FORMAT ", "
+ "avg_distance = %f, "
+ "avg_queue_depth = %f",
+ (uint32) (prefetcher->reader->EndRecPtr >> 32),
+ (uint32) (prefetcher->reader->EndRecPtr),
+ pg_atomic_read_u64(&SharedStats->prefetch),
+ pg_atomic_read_u64(&SharedStats->skip_hit),
+ pg_atomic_read_u64(&SharedStats->skip_new),
+ pg_atomic_read_u64(&SharedStats->skip_fpw),
+ pg_atomic_read_u64(&SharedStats->skip_seq),
+ SharedStats->avg_distance,
+ SharedStats->avg_queue_depth)));
+ hash_destroy(prefetcher->filter_table);
+ pfree(prefetcher);
+}
+
+/*
+ * Called when recovery is replaying a new LSN, to check if we can read ahead.
+ */
+bool
+XLogPrefetcherReadAhead(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+ uint32 reset_request;
+
+ /* If an error has occurred or we've hit the end of the WAL, do nothing. */
+ if (prefetcher->shutdown)
+ return false;
+
+ /*
+ * Have any in-flight prefetches definitely completed, judging by the LSN
+ * that is currently being replayed?
+ */
+ XLogPrefetcherCompletedIO(prefetcher, replaying_lsn);
+
+ /*
+ * Do we already have the maximum permitted number of I/Os running
+ * (according to the information we have)? If so, we have to wait for at
+ * least one to complete, so give up early and let recovery catch up.
+ */
+ if (XLogPrefetcherSaturated(prefetcher))
+ return false;
+
+ /*
+ * Can we drop any filters yet? This happens when the LSN that is
+ * currently being replayed has moved past a record that prevents
+ * prefetching of a block range, such as relation extension.
+ */
+ XLogPrefetcherCompleteFilters(prefetcher, replaying_lsn);
+
+ /*
+ * Have we been asked to reset our stats counters? This is checked with
+ * an unsynchronized memory read, but we'll see it eventually and we'll be
+ * accessing that cache line anyway.
+ */
+ reset_request = pg_atomic_read_u32(&SharedStats->reset_request);
+ if (reset_request != SharedStats->reset_handled)
+ {
+ XLogPrefetchResetStats();
+ SharedStats->reset_handled = reset_request;
+
+ prefetcher->avg_distance = 0;
+ prefetcher->avg_queue_depth = 0;
+ prefetcher->samples = 0;
+ }
+
+ /* OK, we can now try reading ahead. */
+ return XLogPrefetcherScanRecords(prefetcher, replaying_lsn);
+}
+
+/*
+ * Read ahead as far as we are allowed to, considering the LSN that recovery
+ * is currently replaying.
+ *
+ * Return true if the xlogreader would like more data.
+ */
+static bool
+XLogPrefetcherScanRecords(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+ XLogReaderState *reader = prefetcher->reader;
+ DecodedXLogRecord *record;
+
+ Assert(!XLogPrefetcherSaturated(prefetcher));
+
+ for (;;)
+ {
+ char *error;
+ int64 distance;
+
+ /* If we don't already have a record, then try to read one. */
+ if (prefetcher->record == NULL)
+ {
+ switch (XLogReadAhead(reader, &record, &error))
+ {
+ case XLREAD_NEED_DATA:
+ return true;
+ case XLREAD_FAIL:
+ if (error)
+ ereport(LOG,
+ (errmsg("recovery no longer prefetching: %s",
+ error)));
+ else
+ ereport(LOG,
+ (errmsg("recovery no longer prefetching")));
+ prefetcher->shutdown = true;
+ SharedStats->queue_depth = 0;
+ SharedStats->distance = 0;
+
+ return false;
+ case XLREAD_FULL:
+ return false;
+ case XLREAD_SUCCESS:
+ prefetcher->record = record;
+ prefetcher->next_block_id = 0;
+ break;
+ }
+ }
+ else
+ {
+ /*
+ * We ran out of I/O queue while part way through a record. We'll
+ * carry on where we left off, according to next_block_id.
+ */
+ record = prefetcher->record;
+ }
+
+ /* How far ahead of replay are we now? */
+ distance = record->lsn - replaying_lsn;
+
+ /* Update distance shown in shm. */
+ SharedStats->distance = distance;
+
+ /* Periodically recompute some statistics. */
+ if (unlikely(replaying_lsn >= prefetcher->next_sample_lsn))
+ {
+ /* Compute online averages. */
+ prefetcher->samples++;
+ if (prefetcher->samples == 1)
+ {
+ prefetcher->avg_distance = SharedStats->distance;
+ prefetcher->avg_queue_depth = SharedStats->queue_depth;
+ }
+ else
+ {
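+ /* Incremental mean update: avg += (x - avg) / n. */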
+ prefetcher->avg_distance +=
+ (SharedStats->distance - prefetcher->avg_distance) /
+ prefetcher->samples;
+ prefetcher->avg_queue_depth +=
+ (SharedStats->queue_depth - prefetcher->avg_queue_depth) /
+ prefetcher->samples;
+ }
+
+ /* Expose it in shared memory. */
+ SharedStats->avg_distance = prefetcher->avg_distance;
+ SharedStats->avg_queue_depth = prefetcher->avg_queue_depth;
+
+ /* Also periodically save the simple counters. */
+ XLogPrefetchSaveStats();
+
+ prefetcher->next_sample_lsn =
+ replaying_lsn + XLOGPREFETCHER_SAMPLE_DISTANCE;
+ }
+
+ /* Are we not far enough ahead? */
+ if (distance <= 0)
+ {
+ /* XXX Is this still possible? */
+ prefetcher->record = NULL; /* skip this record */
+ continue;
+ }
+
+ /*
+ * If this is a record that creates a new SMGR relation, we'll avoid
+ * prefetching anything from that rnode until it has been replayed.
+ */
+ if (replaying_lsn < record->lsn &&
+ record->header.xl_rmid == RM_SMGR_ID &&
+ (record->header.xl_info & ~XLR_INFO_MASK) == XLOG_SMGR_CREATE)
+ {
+ xl_smgr_create *xlrec = (xl_smgr_create *) record->main_data;
+
+ XLogPrefetcherAddFilter(prefetcher, xlrec->rnode, 0, record->lsn);
+ }
+
+ /* Scan the record's block references. */
+ if (!XLogPrefetcherScanBlocks(prefetcher))
+ return false;
+
+ /* Advance to the next record. */
+ prefetcher->record = NULL;
+ }
+}
+
+/*
+ * Scan the current record for block references, and consider prefetching.
+ *
+ * Return true if we processed the current record to completion and still have
+ * queue space to process a new record, and false if we saturated the I/O
+ * queue and need to wait for recovery to advance before we continue.
+ */
+static bool
+XLogPrefetcherScanBlocks(XLogPrefetcher *prefetcher)
+{
+ DecodedXLogRecord *record = prefetcher->record;
+
+ Assert(!XLogPrefetcherSaturated(prefetcher));
+
+ /*
+ * We might already have been partway through processing this record when
+ * our queue became saturated, so we need to start where we left off.
+ */
+ for (int block_id = prefetcher->next_block_id;
+ block_id <= record->max_block_id;
+ ++block_id)
+ {
+ DecodedBkpBlock *block = &record->blocks[block_id];
+ PrefetchBufferResult prefetch;
+ SMgrRelation reln;
+
+ /* Ignore everything but the main fork for now. */
+ if (block->forknum != MAIN_FORKNUM)
+ continue;
+
+ /*
+ * If there is a full page image attached, we won't be reading the
+ * page, so you might think we should skip it. However, if the
+ * underlying filesystem uses larger logical blocks than us, it might
+ * still need to perform a read-before-write some time later.
+ * Therefore, only prefetch if configured to do so.
+ */
+ if (block->has_image && !recovery_prefetch_fpw)
+ {
+ XLogPrefetchIncrement(&SharedStats->skip_fpw);
+ continue;
+ }
+
+ /*
+ * If this block will initialize a new page then it's probably a
+ * relation extension. Since that might create a new segment, we
+ * can't try to prefetch this block until the record has been
+ * replayed, or we might try to open a file that doesn't exist yet.
+ */
+ if (block->flags & BKPBLOCK_WILL_INIT)
+ {
+ XLogPrefetcherAddFilter(prefetcher, block->rnode, block->blkno,
+ record->lsn);
+ XLogPrefetchIncrement(&SharedStats->skip_new);
+ continue;
+ }
+
+ /* Should we skip this block due to a filter? */
+ if (XLogPrefetcherIsFiltered(prefetcher, block->rnode, block->blkno))
+ {
+ XLogPrefetchIncrement(&SharedStats->skip_new);
+ continue;
+ }
+
+ /* Fast path for repeated references to the same relation. */
+ if (RelFileNodeEquals(block->rnode, prefetcher->last_rnode))
+ {
+ /*
+ * If this is a repeat access to the same block, then skip it.
+ *
+ * XXX We could also check for last_blkno + 1 too, and also update
+ * last_blkno; it's not clear if the kernel would do a better job
+ * of sequential prefetching.
+ */
+ if (block->blkno == prefetcher->last_blkno)
+ {
+ XLogPrefetchIncrement(&SharedStats->skip_seq);
+ continue;
+ }
+
+ /* We can avoid calling smgropen(). */
+ reln = prefetcher->last_reln;
+ }
+ else
+ {
+ /* Otherwise we have to open it. */
+ reln = smgropen(block->rnode, InvalidBackendId);
+ prefetcher->last_rnode = block->rnode;
+ prefetcher->last_reln = reln;
+ }
+ prefetcher->last_blkno = block->blkno;
+
+ /* Try to prefetch this block! */
+ prefetch = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
+ if (BufferIsValid(prefetch.recent_buffer))
+ {
+ /*
+ * It was already cached, so do nothing. We'll remember the
+ * buffer, so that recovery can try to avoid looking it up again.
+ */
+ block->recent_buffer = prefetch.recent_buffer;
+ XLogPrefetchIncrement(&SharedStats->skip_hit);
+ }
+ else if (prefetch.initiated_io)
+ {
+ /*
+ * I/O has possibly been initiated (though we don't know if the block
+ * was already cached by the kernel, so for lack of better information
+ * we just have to assume that it was).  Record this as an I/O in
+ * progress until eventually we replay this LSN.
+ */
+ XLogPrefetchIncrement(&SharedStats->prefetch);
+ XLogPrefetcherInitiatedIO(prefetcher, record->lsn);
+
+ /*
+ * If the queue is now full, we'll have to wait before processing
+ * any more blocks from this record, or move to a new record if
+ * that was the last block.
+ */
+ if (XLogPrefetcherSaturated(prefetcher))
+ {
+ prefetcher->next_block_id = block_id + 1;
+ return false;
+ }
+ }
+ else
+ {
+ /*
+ * Neither cached nor initiated. The underlying segment file
+ * doesn't exist. Presumably it will be unlinked by a later WAL
+ * record. When recovery reads this block, it will use the
+ * EXTENSION_CREATE_RECOVERY flag. We certainly don't want to do
+ * that sort of thing while merely prefetching, so let's just
+ * ignore references to this relation until this record is
+ * replayed, and let recovery create the dummy file or complain if
+ * something is wrong.
+ */
+ XLogPrefetcherAddFilter(prefetcher, block->rnode, 0,
+ record->lsn);
+ XLogPrefetchIncrement(&SharedStats->skip_new);
+ }
+ }
+
+ return true;
+}
+
+/*
+ * Expose statistics about recovery prefetching.
+ */
+Datum
+pg_stat_get_prefetch_recovery(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_PREFETCH_RECOVERY_COLS 10
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ TupleDesc tupdesc;
+ Tuplestorestate *tupstore;
+ MemoryContext per_query_ctx;
+ MemoryContext oldcontext;
+ Datum values[PG_STAT_GET_PREFETCH_RECOVERY_COLS];
+ bool nulls[PG_STAT_GET_PREFETCH_RECOVERY_COLS];
+
+ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("set-valued function called in context that cannot accept a set")));
+ if (!(rsinfo->allowedModes & SFRM_Materialize))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("materialize mod required, but it is not allowed in this context")));
+
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+ oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+ tupstore = tuplestore_begin_heap(true, false, work_mem);
+ rsinfo->returnMode = SFRM_Materialize;
+ rsinfo->setResult = tupstore;
+ rsinfo->setDesc = tupdesc;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ if (pg_atomic_read_u32(&SharedStats->reset_request) != SharedStats->reset_handled)
+ {
+ /* There's an unhandled reset request, so just show NULLs */
+ for (int i = 0; i < PG_STAT_GET_PREFETCH_RECOVERY_COLS; ++i)
+ nulls[i] = true;
+ }
+ else
+ {
+ for (int i = 0; i < PG_STAT_GET_PREFETCH_RECOVERY_COLS; ++i)
+ nulls[i] = false;
+ }
+
+ values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
+ values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
+ values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_hit));
+ values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
+ values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
+ values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_seq));
+ values[6] = Int32GetDatum(SharedStats->distance);
+ values[7] = Int32GetDatum(SharedStats->queue_depth);
+ values[8] = Float4GetDatum(SharedStats->avg_distance);
+ values[9] = Float4GetDatum(SharedStats->avg_queue_depth);
+ tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+ tuplestore_donestoring(tupstore);
+
+ return (Datum) 0;
+}
+
+/*
+ * Compute (n + 1) % prefetch_queue_size, assuming n < prefetch_queue_size,
+ * without using division.
+ */
+static inline int
+XLogPrefetcherNext(XLogPrefetcher *prefetcher, int n)
+{
+ int next = n + 1;
+
+ return next == prefetcher->prefetch_queue_size ? 0 : next;
+}
+
+/*
+ * Don't prefetch any blocks >= 'blockno' from a given 'rnode', until 'lsn'
+ * has been replayed.
+ */
+static inline void
+XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileNode rnode,
+ BlockNumber blockno, XLogRecPtr lsn)
+{
+ XLogPrefetcherFilter *filter;
+ bool found;
+
+ filter = hash_search(prefetcher->filter_table, &rnode, HASH_ENTER, &found);
+ if (!found)
+ {
+ /*
+ * Don't allow any prefetching of this block or higher until replayed.
+ */
+ filter->filter_until_replayed = lsn;
+ filter->filter_from_block = blockno;
+ dlist_push_head(&prefetcher->filter_queue, &filter->link);
+ }
+ else
+ {
+ /*
+ * We were already filtering this rnode. Extend the filter's lifetime
+ * to cover this WAL record, but leave the (presumably lower) block
+ * number there because we don't want to have to track individual
+ * blocks.
+ */
+ filter->filter_until_replayed = lsn;
+ dlist_delete(&filter->link);
+ dlist_push_head(&prefetcher->filter_queue, &filter->link);
+ }
+}
+
+/*
+ * Have we replayed the records that caused us to begin filtering a block
+ * range? That means that relations should have been created, extended or
+ * dropped as required, so we can drop relevant filters.
+ */
+static inline void
+XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+ while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
+ {
+ XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
+ link,
+ &prefetcher->filter_queue);
+
+ if (filter->filter_until_replayed >= replaying_lsn)
+ break;
+ dlist_delete(&filter->link);
+ hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
+ }
+}
+
+/*
+ * Check if a given block should be skipped due to a filter.
+ */
+static inline bool
+XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileNode rnode,
+ BlockNumber blockno)
+{
+ /*
+ * Test for empty queue first, because we expect it to be empty most of
+ * the time and we can avoid the hash table lookup in that case.
+ */
+ if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
+ {
+ XLogPrefetcherFilter *filter = hash_search(prefetcher->filter_table, &rnode,
+ HASH_FIND, NULL);
+
+ if (filter && filter->filter_from_block <= blockno)
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * Insert an LSN into the queue. The queue must not be full already. This
+ * tracks the fact that we have (to the best of our knowledge) initiated an
+ * I/O, so that we can impose a cap on concurrent prefetching.
+ */
+static inline void
+XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher,
+ XLogRecPtr prefetching_lsn)
+{
+ Assert(!XLogPrefetcherSaturated(prefetcher));
+ prefetcher->prefetch_queue[prefetcher->prefetch_head] = prefetching_lsn;
+ prefetcher->prefetch_head =
+ XLogPrefetcherNext(prefetcher, prefetcher->prefetch_head);
+ SharedStats->queue_depth++;
+
+ Assert(SharedStats->queue_depth <= prefetcher->prefetch_queue_size);
+}
+
+/*
+ * Have we replayed the records that caused us to initiate the oldest
+ * prefetches yet? That means that they're definitely finished, so we can
+ * forget about them and allow ourselves to initiate more prefetches. For now
+ * we don't have any awareness of when I/O really completes.
+ */
+static inline void
+XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+ while (prefetcher->prefetch_head != prefetcher->prefetch_tail &&
+ prefetcher->prefetch_queue[prefetcher->prefetch_tail] < replaying_lsn)
+ {
+ prefetcher->prefetch_tail =
+ XLogPrefetcherNext(prefetcher, prefetcher->prefetch_tail);
+ SharedStats->queue_depth--;
+
+ Assert(SharedStats->queue_depth >= 0);
+ }
+}
+
+/*
+ * Check if the maximum allowed number of I/Os is already in flight.
+ */
+static inline bool
+XLogPrefetcherSaturated(XLogPrefetcher *prefetcher)
+{
+ int next = XLogPrefetcherNext(prefetcher, prefetcher->prefetch_head);
+
+ return next == prefetcher->prefetch_tail;
+}
+
+void
+assign_recovery_prefetch(bool new_value, void *extra)
+{
+ /* Reconfigure prefetching, because a setting it depends on changed. */
+ recovery_prefetch = new_value;
+ if (AmStartupProcess())
+ XLogPrefetchReconfigure();
+}
+
+void
+assign_recovery_prefetch_fpw(bool new_value, void *extra)
+{
+ /* Reconfigure prefetching, because a setting it depends on changed. */
+ recovery_prefetch_fpw = new_value;
+ if (AmStartupProcess())
+ XLogPrefetchReconfigure();
+}
#include "access/transam.h"
#include "access/twophase_rmgr.h"
#include "access/xact.h"
+#include "access/xlogprefetch.h"
#include "catalog/partition.h"
#include "catalog/pg_database.h"
#include "catalog/pg_proc.h"
static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
static PgStat_ReplSlotStats *replSlotStats;
static int nReplSlotStats;
+static PgStat_RecoveryPrefetchStats recoveryPrefetchStats;
/*
* List of OIDs of databases we need to write out. If an entry is InvalidOid,
static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len);
static void pgstat_recv_wal(PgStat_MsgWal *msg, int len);
static void pgstat_recv_slru(PgStat_MsgSLRU *msg, int len);
+static void pgstat_recv_recoveryprefetch(PgStat_MsgRecoveryPrefetch *msg, int len);
static void pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len);
static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len);
static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len);
msg.m_resettarget = RESET_BGWRITER;
else if (strcmp(target, "wal") == 0)
msg.m_resettarget = RESET_WAL;
+ else if (strcmp(target, "prefetch_recovery") == 0)
+ {
+ /*
+ * We can't ask the stats collector to do this for us as it is not
+ * attached to shared memory.
+ */
+ XLogPrefetchRequestResetStats();
+ return;
+ }
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("unrecognized reset target: \"%s\"", target),
- errhint("Target must be \"archiver\", \"bgwriter\" or \"wal\".")));
+ errhint("Target must be \"archiver\", \"bgwriter\", \"wal\" or \"prefetch_recovery\".")));
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSHAREDCOUNTER);
pgstat_send(&msg, sizeof(msg));
return replSlotStats;
}
+/*
+ * ---------
+ * pgstat_fetch_recoveryprefetch() -
+ *
+ * Support function for restoring the counters managed by xlogprefetch.c.
+ * ---------
+ */
+PgStat_RecoveryPrefetchStats *
+pgstat_fetch_recoveryprefetch(void)
+{
+ backend_read_statsfile();
+
+ return &recoveryPrefetchStats;
+}
+
+
/*
* Shut down a single backend's statistics reporting at process exit.
*
}
+/* ----------
+ * pgstat_send_recoveryprefetch() -
+ *
+ * Send recovery prefetch statistics to the collector
+ * ----------
+ */
+void
+pgstat_send_recoveryprefetch(PgStat_RecoveryPrefetchStats *stats)
+{
+ PgStat_MsgRecoveryPrefetch msg;
+
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RECOVERYPREFETCH);
+ msg.m_stats = *stats;
+ pgstat_send(&msg, sizeof(msg));
+}
+
+
/* ----------
* PgstatCollectorMain() -
*
pgstat_recv_slru(&msg.msg_slru, len);
break;
+ case PGSTAT_MTYPE_RECOVERYPREFETCH:
+ pgstat_recv_recoveryprefetch(&msg.msg_recoveryprefetch, len);
+ break;
+
case PGSTAT_MTYPE_FUNCSTAT:
pgstat_recv_funcstat(&msg.msg_funcstat, len);
break;
rc = fwrite(slruStats, sizeof(slruStats), 1, fpout);
(void) rc; /* we'll check for error with ferror */
+ /*
+ * Write recovery prefetch stats struct
+ */
+ rc = fwrite(&recoveryPrefetchStats, sizeof(recoveryPrefetchStats), 1,
+ fpout);
+ (void) rc; /* we'll check for error with ferror */
+
/*
* Walk through the database table.
*/
memset(&archiverStats, 0, sizeof(archiverStats));
memset(&walStats, 0, sizeof(walStats));
memset(&slruStats, 0, sizeof(slruStats));
+ memset(&recoveryPrefetchStats, 0, sizeof(recoveryPrefetchStats));
/*
* Set the current timestamp (will be kept only in case we can't load an
goto done;
}
+ /*
+ * Read recoveryPrefetchStats struct
+ */
+ if (fread(&recoveryPrefetchStats, 1, sizeof(recoveryPrefetchStats),
+ fpin) != sizeof(recoveryPrefetchStats))
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"", statfile)));
+ memset(&recoveryPrefetchStats, 0, sizeof(recoveryPrefetchStats));
+ goto done;
+ }
+
/*
* We found an existing collector stats file. Read it and put all the
* hashtable entries into place.
PgStat_WalStats myWalStats;
PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
PgStat_ReplSlotStats myReplSlotStats;
+ PgStat_RecoveryPrefetchStats myRecoveryPrefetchStats;
FILE *fpin;
int32 format_id;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
return false;
}
+ /*
+ * Read recovery prefetch stats struct
+ */
+ if (fread(&myRecoveryPrefetchStats, 1, sizeof(myRecoveryPrefetchStats),
+ fpin) != sizeof(myRecoveryPrefetchStats))
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"", statfile)));
+ FreeFile(fpin);
+ return false;
+ }
+
/* By default, we're going to return the timestamp of the global file. */
*ts = myGlobalStats.stats_timestamp;
if (ok && file_ts >= min_ts)
break;
+ /*
+ * If we're in crash recovery, the collector may not even be running,
+ * so work with what we have.
+ */
+ if (InRecovery)
+ break;
+
/* Not there or too old, so kick the collector and wait a bit */
if ((count % PGSTAT_INQ_LOOP_COUNT) == 0)
pgstat_send_inquiry(cur_ts, min_ts, inquiry_db);
slruStats[msg->m_index].truncate += msg->m_truncate;
}
+/* ----------
+ * pgstat_recv_recoveryprefetch() -
+ *
+ * Process a recovery prefetch message.
+ * ----------
+ */
+static void
+pgstat_recv_recoveryprefetch(PgStat_MsgRecoveryPrefetch *msg, int len)
+{
+ recoveryPrefetchStats = msg->m_stats;
+}
+
/* ----------
* pgstat_recv_recoveryconflict() -
*