Arrange to recycle old XLOG log segment files as new segment files,
authorTom Lane
Thu, 19 Jul 2001 02:12:35 +0000 (02:12 +0000)
committerTom Lane
Thu, 19 Jul 2001 02:12:35 +0000 (02:12 +0000)
rather than deleting them only to have to create more.  Steady state
is 2*CHECKPOINT_SEGMENTS + WAL_FILES + 1 segment files, which will
simply be renamed rather than constantly deleted and recreated.
To make this safe, added current XLOG file/offset number to page
header of XLOG pages, so that an un-overwritten page from an old
incarnation of a logfile can be reliably told from a valid page.
This change means that if you try to restart postmaster in a CVS-tip
database after installing the change, you'll get a complaint about
bad XLOG page magic number.  If you don't want to initdb, run
contrib/pg_resetxlog (and be sure you shut down the old postmaster
cleanly).

contrib/pg_resetxlog/pg_resetxlog.c
src/backend/access/transam/xlog.c
src/include/access/xlog.h

index f8c81b5c55dddcb1ea3983f35eb1cf574dc5f40d..6d32160905d4834c2a0f9e5f392495f6f1b1a4e3 100644 (file)
@@ -23,7 +23,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Header: /cvsroot/pgsql/contrib/pg_resetxlog/Attic/pg_resetxlog.c,v 1.5 2001/06/06 17:07:38 tgl Exp $
+ * $Header: /cvsroot/pgsql/contrib/pg_resetxlog/Attic/pg_resetxlog.c,v 1.6 2001/07/19 02:12:34 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -857,6 +857,10 @@ WriteEmptyXLOG(void)
    page->xlp_magic = XLOG_PAGE_MAGIC;
    page->xlp_info = 0;
    page->xlp_sui = ControlFile.checkPointCopy.ThisStartUpID;
+   page->xlp_pageaddr.xlogid =
+       ControlFile.checkPointCopy.redo.xlogid;
+   page->xlp_pageaddr.xrecoff =
+       ControlFile.checkPointCopy.redo.xrecoff - SizeOfXLogPHD;
    record = (XLogRecord *) ((char *) page + SizeOfXLogPHD);
    record->xl_prev.xlogid = 0;
    record->xl_prev.xrecoff = 0;
index 94ba140b3ad3fa35aab037031e0231d0f2572961..3251fb2afdbe5713b5068cafaa77eeb28d5f9035 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.70 2001/06/21 19:45:45 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.71 2001/07/19 02:12:34 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 /* User-settable parameters */
 int            CheckPointSegments = 3;
 int            XLOGbuffers = 8;
-int            XLOGfiles = 0;      /* how many files to pre-allocate during
-                                * ckpt */
+int            XLOGfiles = 0;      /* # of files to preallocate during ckpt */
 int            XLOG_DEBUG = 0;
 char      *XLOG_sync_method = NULL;
 const char XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
 char       XLOG_archive_dir[MAXPGPATH];        /* null string means
                                                 * delete 'em */
 
+/*
+ * XLOGfileslop is used in the code as the allowed "fuzz" in the number of 
+ * preallocated XLOG segments --- we try to have at least XLOGfiles advance
+ * segments but no more than XLOGfiles+XLOGfileslop segments.  This could
+ * be made a separate GUC variable, but at present I think it's sufficient
+ * to hardwire it as 2*CheckPointSegments+1.  Under normal conditions, a
+ * checkpoint will free no more than 2*CheckPointSegments log segments, and
+ * we want to recycle all of them; the +1 allows boundary cases to happen
+ * without wasting a delete/create-segment cycle.
+ */
+
+#define XLOGfileslop   (2*CheckPointSegments + 1)
+
+
 /* these are derived from XLOG_sync_method by assign_xlog_sync_method */
 static int sync_method = DEFAULT_SYNC_METHOD;
 static int open_sync_bit = DEFAULT_SYNC_FLAGBIT;
 
-#define MinXLOGbuffers 4
-
 #define XLOG_SYNC_BIT  (enableFsync ? open_sync_bit : 0)
 
+#define MinXLOGbuffers 4
+
 
 /*
  * ThisStartUpID will be same in all backends --- it identifies current
@@ -405,9 +418,12 @@ static bool AdvanceXLInsertBuffer(void);
 static void XLogWrite(XLogwrtRqst WriteRqst);
 static int XLogFileInit(uint32 log, uint32 seg,
             bool *use_existent, bool use_lock);
+static bool InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
+                                  bool find_free, int max_advance,
+                                  bool use_lock);
 static int XLogFileOpen(uint32 log, uint32 seg, bool econt);
 static void PreallocXlogFiles(XLogRecPtr endptr);
-static void MoveOfflineLogs(uint32 log, uint32 seg);
+static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr);
 static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer);
 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI);
 static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr,
@@ -856,6 +872,8 @@ AdvanceXLInsertBuffer(void)
    bool        update_needed = true;
    XLogRecPtr  OldPageRqstPtr;
    XLogwrtRqst WriteRqst;
+   XLogRecPtr  NewPageEndPtr;
+   XLogPageHeader NewPage;
 
    /* Use Insert->LogwrtResult copy if it's more fresh */
    if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
@@ -930,32 +948,35 @@ AdvanceXLInsertBuffer(void)
     * Now the next buffer slot is free and we can set it up to be the
     * next output page.
     */
-   if (XLogCtl->xlblocks[Insert->curridx].xrecoff >= XLogFileSize)
+   NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
+   if (NewPageEndPtr.xrecoff >= XLogFileSize)
    {
        /* crossing a logid boundary */
-       XLogCtl->xlblocks[nextidx].xlogid =
-           XLogCtl->xlblocks[Insert->curridx].xlogid + 1;
-       XLogCtl->xlblocks[nextidx].xrecoff = BLCKSZ;
+       NewPageEndPtr.xlogid += 1;
+       NewPageEndPtr.xrecoff = BLCKSZ;
    }
    else
    {
-       XLogCtl->xlblocks[nextidx].xlogid =
-           XLogCtl->xlblocks[Insert->curridx].xlogid;
-       XLogCtl->xlblocks[nextidx].xrecoff =
-           XLogCtl->xlblocks[Insert->curridx].xrecoff + BLCKSZ;
+       NewPageEndPtr.xrecoff += BLCKSZ;
    }
+   XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
+   NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * BLCKSZ);
    Insert->curridx = nextidx;
-   Insert->currpage = (XLogPageHeader) (XLogCtl->pages + nextidx * BLCKSZ);
-   Insert->currpos = ((char *) Insert->currpage) + SizeOfXLogPHD;
+   Insert->currpage = NewPage;
+   Insert->currpos = ((char *) NewPage) + SizeOfXLogPHD;
 
    /*
     * Be sure to re-zero the buffer so that bytes beyond what we've
     * written will look like zeroes and not valid XLOG records...
     */
-   MemSet((char *) Insert->currpage, 0, BLCKSZ);
-   Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC;
-   /* Insert->currpage->xlp_info = 0; *//* done by memset */
-   Insert->currpage->xlp_sui = ThisStartUpID;
+   MemSet((char *) NewPage, 0, BLCKSZ);
+
+   /* And fill the new page's header */
+   NewPage->xlp_magic = XLOG_PAGE_MAGIC;
+   /* NewPage->xlp_info = 0; */            /* done by memset */
+   NewPage->xlp_sui = ThisStartUpID;
+   NewPage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
+   NewPage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ;
 
    return update_needed;
 }
@@ -1273,10 +1294,7 @@ XLogFileInit(uint32 log, uint32 seg,
 {
    char        path[MAXPGPATH];
    char        tmppath[MAXPGPATH];
-   char        targpath[MAXPGPATH];
    char        zbuffer[BLCKSZ];
-   uint32      targlog,
-               targseg;
    int         fd;
    int         nbytes;
 
@@ -1352,32 +1370,96 @@ XLogFileInit(uint32 log, uint32 seg,
    close(fd);
 
    /*
-    * Now move the segment into place with its final name.  We want to be
-    * sure that only one process does this at a time.
-    */
-   if (use_lock)
-       SpinAcquire(ControlFileLockId);
-
-   /*
+    * Now move the segment into place with its final name.
+    *
     * If caller didn't want to use a pre-existing file, get rid of any
     * pre-existing file.  Otherwise, cope with possibility that someone
     * else has created the file while we were filling ours: if so, use
     * ours to pre-create a future log segment.
     */
-   targlog = log;
-   targseg = seg;
-   strcpy(targpath, path);
+   if (!InstallXLogFileSegment(log, seg, tmppath,
+                               *use_existent, XLOGfiles + XLOGfileslop,
+                               use_lock))
+   {
+       /* No need for any more future segments... */
+       unlink(tmppath);
+   }
+
+   /* Set flag to tell caller there was no existent file */
+   *use_existent = false;
+
+   /* Now open original target segment (might not be file I just made) */
+   fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
+                      S_IRUSR | S_IWUSR);
+   if (fd < 0)
+       elog(STOP, "open of %s (log file %u, segment %u) failed: %m",
+            path, log, seg);
+
+   return (fd);
+}
+
+/*
+ * Install a new XLOG segment file as a current or future log segment.
+ *
+ * This is used both to install a newly-created segment (which has a temp
+ * filename while it's being created) and to recycle an old segment.
+ *
+ * log, seg: identify segment to install as (or first possible target).
+ *
+ * tmppath: initial name of file to install.  It will be renamed into place.
+ *
+ * find_free: if TRUE, install the new segment at the first empty log/seg
+ * number at or after the passed numbers.  If FALSE, install the new segment
+ * exactly where specified, deleting any existing segment file there.
+ *
+ * max_advance: maximum number of log/seg slots to advance past the starting
+ * point.  Fail if no free slot is found in this range.  (Irrelevant if
+ * find_free is FALSE.)
+ *
+ * use_lock: if TRUE, acquire ControlFileLock spinlock while moving file into
+ * place.  This should be TRUE except during bootstrap log creation.  The
+ * caller must *not* hold the spinlock at call.
+ *
+ * Returns TRUE if file installed, FALSE if not installed because of
+ * exceeding max_advance limit.  (Any other kind of failure causes elog().)
+ */
+static bool
+InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
+                      bool find_free, int max_advance,
+                      bool use_lock)
+{
+   char        path[MAXPGPATH];
+   int         fd;
+
+   XLogFileName(path, log, seg);
+
+   /*
+    * We want to be sure that only one process does this at a time.
+    */
+   if (use_lock)
+       SpinAcquire(ControlFileLockId);
 
-   if (!*use_existent)
-       unlink(targpath);
+   if (!find_free)
+   {
+       /* Force installation: get rid of any pre-existing segment file */
+       unlink(path);
+   }
    else
    {
-       while ((fd = BasicOpenFile(targpath, O_RDWR | PG_BINARY,
+       /* Find a free slot to put it in */
+       while ((fd = BasicOpenFile(path, O_RDWR | PG_BINARY,
                                   S_IRUSR | S_IWUSR)) >= 0)
        {
            close(fd);
-           NextLogSeg(targlog, targseg);
-           XLogFileName(targpath, targlog, targseg);
+           if (--max_advance < 0)
+           {
+               /* Failed to find a free slot within specified range */
+               if (use_lock)
+                   SpinRelease(ControlFileLockId);
+               return false;
+           }
+           NextLogSeg(log, seg);
+           XLogFileName(path, log, seg);
        }
    }
 
@@ -1387,30 +1469,20 @@ XLogFileInit(uint32 log, uint32 seg,
     * rename() is an acceptable substitute except for the truly paranoid.
     */
 #ifndef __BEOS__
-   if (link(tmppath, targpath) < 0)
+   if (link(tmppath, path) < 0)
        elog(STOP, "link from %s to %s (initialization of log file %u, segment %u) failed: %m",
-            tmppath, targpath, targlog, targseg);
+            tmppath, path, log, seg);
    unlink(tmppath);
 #else
-   if (rename(tmppath, targpath) < 0)
+   if (rename(tmppath, path) < 0)
        elog(STOP, "rename from %s to %s (initialization of log file %u, segment %u) failed: %m",
-            tmppath, targpath targlog, targseg);
+            tmppath, path, log, seg);
 #endif
 
    if (use_lock)
        SpinRelease(ControlFileLockId);
 
-   /* Set flag to tell caller there was no existent file */
-   *use_existent = false;
-
-   /* Now open original target segment (might not be file I just made) */
-   fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
-                      S_IRUSR | S_IWUSR);
-   if (fd < 0)
-       elog(STOP, "open of %s (log file %u, segment %u) failed: %m",
-            path, log, seg);
-
-   return (fd);
+   return true;
 }
 
 /*
@@ -1477,20 +1549,26 @@ PreallocXlogFiles(XLogRecPtr endptr)
 
 /*
  * Remove or move offline all log files older or equal to passed log/seg#
+ *
+ * endptr is current (or recent) end of xlog; this is used to determine
+ * whether we want to recycle rather than delete no-longer-wanted log files.
  */
 static void
-MoveOfflineLogs(uint32 log, uint32 seg)
+MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr)
 {
+   uint32      endlogId;
+   uint32      endlogSeg;
    DIR        *xldir;
    struct dirent *xlde;
    char        lastoff[32];
    char        path[MAXPGPATH];
 
-   Assert(XLOG_archive_dir[0] == 0);   /* not implemented yet */
+   XLByteToPrevSeg(endptr, endlogId, endlogSeg);
 
    xldir = opendir(XLogDir);
    if (xldir == NULL)
-       elog(STOP, "could not open transaction log directory (%s): %m", XLogDir);
+       elog(STOP, "could not open transaction log directory (%s): %m",
+            XLogDir);
 
    sprintf(lastoff, "%08X%08X", log, seg);
 
@@ -1501,19 +1579,42 @@ MoveOfflineLogs(uint32 log, uint32 seg)
            strspn(xlde->d_name, "0123456789ABCDEF") == 16 &&
            strcmp(xlde->d_name, lastoff) <= 0)
        {
+           sprintf(path, "%s/%s", XLogDir, xlde->d_name);
            if (XLOG_archive_dir[0])
-               elog(LOG, "archiving transaction log file %s", xlde->d_name);
+           {
+               elog(LOG, "archiving transaction log file %s",
+                    xlde->d_name);
+               elog(NOTICE, "archiving log files is not implemented!");
+           }
            else
-               elog(LOG, "removing transaction log file %s", xlde->d_name);
-
-           sprintf(path, "%s/%s", XLogDir, xlde->d_name);
-           if (XLOG_archive_dir[0] == 0)
-               unlink(path);
+           {
+               /*
+                * Before deleting the file, see if it can be recycled as
+                * a future log segment.  We allow recycling segments up to
+                * XLOGfiles + XLOGfileslop segments beyond the current
+                * XLOG location.
+                */
+               if (InstallXLogFileSegment(endlogId, endlogSeg, path,
+                                          true, XLOGfiles + XLOGfileslop,
+                                          true))
+               {
+                   elog(LOG, "recycled transaction log file %s",
+                        xlde->d_name);
+               }
+               else
+               {
+                   /* No need for any more future segments... */
+                   elog(LOG, "removing transaction log file %s",
+                        xlde->d_name);
+                   unlink(path);
+               }
+           }
        }
        errno = 0;
    }
    if (errno)
-       elog(STOP, "could not read transaction log directory (%s): %m", XLogDir);
+       elog(STOP, "could not read transaction log directory (%s): %m",
+            XLogDir);
    closedir(xldir);
 }
 
@@ -1866,6 +1967,8 @@ next_record_is_invalid:;
 static bool
 ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI)
 {
+   XLogRecPtr  recaddr;
+
    if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
    {
        elog(emode, "ReadRecord: invalid magic number %04X in log file %u, segment %u, offset %u",
@@ -1878,6 +1981,15 @@ ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI)
             hdr->xlp_info, readId, readSeg, readOff);
        return false;
    }
+   recaddr.xlogid = readId;
+   recaddr.xrecoff = readSeg * XLogSegSize + readOff;
+   if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
+   {
+       elog(emode, "ReadRecord: unexpected pageaddr (%u, %u) in log file %u, segment %u, offset %u",
+            hdr->xlp_pageaddr.xlogid, hdr->xlp_pageaddr.xrecoff,
+            readId, readSeg, readOff);
+       return false;
+   }
 
    /*
     * We disbelieve a SUI less than the previous page's SUI, or more than
@@ -2248,6 +2360,8 @@ BootStrapXLOG(void)
    page->xlp_magic = XLOG_PAGE_MAGIC;
    page->xlp_info = 0;
    page->xlp_sui = checkPoint.ThisStartUpID;
+   page->xlp_pageaddr.xlogid = 0;
+   page->xlp_pageaddr.xrecoff = 0;
    record = (XLogRecord *) ((char *) page + SizeOfXLogPHD);
    record->xl_prev.xlogid = 0;
    record->xl_prev.xrecoff = 0;
@@ -2500,23 +2614,29 @@ StartupXLOG(void)
        EndOfLog.xrecoff += (BLCKSZ - EndOfLog.xrecoff % BLCKSZ);
    if (EndOfLog.xrecoff % BLCKSZ == 0)
    {
-       if (EndOfLog.xrecoff >= XLogFileSize)
+       XLogRecPtr  NewPageEndPtr;
+
+       NewPageEndPtr = EndOfLog;
+       if (NewPageEndPtr.xrecoff >= XLogFileSize)
        {
-           XLogCtl->xlblocks[0].xlogid = EndOfLog.xlogid + 1;
-           XLogCtl->xlblocks[0].xrecoff = BLCKSZ;
+           /* crossing a logid boundary */
+           NewPageEndPtr.xlogid += 1;
+           NewPageEndPtr.xrecoff = BLCKSZ;
        }
        else
        {
-           XLogCtl->xlblocks[0].xlogid = EndOfLog.xlogid;
-           XLogCtl->xlblocks[0].xrecoff = EndOfLog.xrecoff + BLCKSZ;
+           NewPageEndPtr.xrecoff += BLCKSZ;
        }
-       Insert->currpos = (char *) Insert->currpage + SizeOfXLogPHD;
+       XLogCtl->xlblocks[0] = NewPageEndPtr;
        Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC;
        if (InRecovery)
            Insert->currpage->xlp_sui = ThisStartUpID;
        else
            Insert->currpage->xlp_sui = ThisStartUpID + 1;
+       Insert->currpage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
+       Insert->currpage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ;
        /* rest of buffer was zeroed in XLOGShmemInit */
+       Insert->currpos = (char *) Insert->currpage + SizeOfXLogPHD;
    }
    else
    {
@@ -2916,7 +3036,7 @@ CreateCheckPoint(bool shutdown)
    if (_logId || _logSeg)
    {
        PrevLogSeg(_logId, _logSeg);
-       MoveOfflineLogs(_logId, _logSeg);
+       MoveOfflineLogs(_logId, _logSeg, recptr);
    }
 
    /*
index 41a8d84dade4a667df4a8fffd5ef4904f22f3983..73a60b2e0ce018e19b31aab40594f378d3831dff 100644 (file)
@@ -6,7 +6,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: xlog.h,v 1.23 2001/03/22 04:00:32 momjian Exp $
+ * $Id: xlog.h,v 1.24 2001/07/19 02:12:35 tgl Exp $
  */
 #ifndef XLOG_H
 #define XLOG_H
@@ -109,13 +109,14 @@ typedef struct XLogContRecord
 /*
  * Each page of XLOG file has a header like this:
  */
-#define XLOG_PAGE_MAGIC 0xD058 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD059 /* can be used as WAL version indicator */
 
 typedef struct XLogPageHeaderData
 {
    uint16      xlp_magic;      /* magic value for correctness checks */
    uint16      xlp_info;       /* flag bits, see below */
    StartUpID   xlp_sui;        /* StartUpID of first record on page */
+   XLogRecPtr  xlp_pageaddr;   /* XLOG address of this page */
 } XLogPageHeaderData;
 
 #define SizeOfXLogPHD  MAXALIGN(sizeof(XLogPageHeaderData))