Rearrange XLogFileInit so that control-file spinlock is not held while filling
authorTom Lane
Sat, 17 Mar 2001 20:54:13 +0000 (20:54 +0000)
committerTom Lane
Sat, 17 Mar 2001 20:54:13 +0000 (20:54 +0000)
the new log file with zeroes, only while renaming it into place.  This should
prevent problems with 'stuck spinlock' errors under heavy load.

src/backend/access/transam/xlog.c

index d2d75d652c911fb553c04f783029078a6d29e0ee..5bbc4812ff8922700bc7d4798f568525e652fe4b 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.59 2001/03/16 05:44:33 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.60 2001/03/17 20:54:13 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -86,7 +86,7 @@
 /* Max time to wait to acquire XLog activity locks */
 #define XLOG_LOCK_TIMEOUT          (5*60*1000000) /* 5 minutes */
 /* Max time to wait to acquire checkpoint lock */
-#define CHECKPOINT_LOCK_TIMEOUT        (10*60*1000000) /* 10 minutes */
+#define CHECKPOINT_LOCK_TIMEOUT        (20*60*1000000) /* 20 minutes */
 
 /* User-settable parameters */
 int            CheckPointSegments = 3;
@@ -335,10 +335,6 @@ static ControlFileData *ControlFile = NULL;
            snprintf(path, MAXPGPATH, "%s%c%08X%08X",   \
                     XLogDir, SEP_CHAR, log, seg)
 
-#define XLogTempFileName(path, log, seg)   \
-           snprintf(path, MAXPGPATH, "%s%cT%08X%08X",  \
-                    XLogDir, SEP_CHAR, log, seg)
-
 #define PrevBufIdx(idx)        \
        (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
 
@@ -401,7 +397,8 @@ static bool InRedo = false;
 
 static bool AdvanceXLInsertBuffer(void);
 static void XLogWrite(XLogwrtRqst WriteRqst);
-static int XLogFileInit(uint32 log, uint32 seg, bool *usexistent);
+static int XLogFileInit(uint32 log, uint32 seg,
+                        bool *use_existent, bool use_lock);
 static int XLogFileOpen(uint32 log, uint32 seg, bool econt);
 static void PreallocXlogFiles(XLogRecPtr endptr);
 static void MoveOfflineLogs(uint32 log, uint32 seg);
@@ -960,7 +957,7 @@ XLogWrite(XLogwrtRqst WriteRqst)
    XLogCtlWrite *Write = &XLogCtl->Write;
    char       *from;
    bool        ispartialpage;
-   bool        usexistent;
+   bool        use_existent;
 
    /* Update local LogwrtResult (caller probably did this already, but...) */
    LogwrtResult = Write->LogwrtResult;
@@ -994,12 +991,18 @@ XLogWrite(XLogwrtRqst WriteRqst)
            }
            XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
 
-           /* create/use new log file; need lock in case creating */
-           SpinAcquire(ControlFileLockId);
-           usexistent = true;
-           openLogFile = XLogFileInit(openLogId, openLogSeg, &usexistent);
+           /* create/use new log file */
+           use_existent = true;
+           openLogFile = XLogFileInit(openLogId, openLogSeg,
+                                      &use_existent, true);
            openLogOff = 0;
+
+           if (!use_existent)  /* there was no precreated file */
+               elog(LOG, "XLogWrite: new log file created - "
+                    "consider increasing WAL_FILES");
+
            /* update pg_control, unless someone else already did */
+           SpinAcquire(ControlFileLockId);
            if (ControlFile->logId != openLogId ||
                ControlFile->logSeg != openLogSeg + 1)
            {
@@ -1007,28 +1010,23 @@ XLogWrite(XLogwrtRqst WriteRqst)
                ControlFile->logSeg = openLogSeg + 1;
                ControlFile->time = time(NULL);
                UpdateControlFile();
+               /*
+                * Signal postmaster to start a checkpoint if it's been too
+                * long since the last one.  (We look at local copy of
+                * RedoRecPtr which might be a little out of date, but should
+                * be close enough for this purpose.)
+                */
+               if (IsUnderPostmaster &&
+                   (openLogId != RedoRecPtr.xlogid ||
+                    openLogSeg >= (RedoRecPtr.xrecoff / XLogSegSize) +
+                    (uint32) CheckPointSegments))
+               {
+                   if (XLOG_DEBUG)
+                       fprintf(stderr, "XLogWrite: time for a checkpoint, signaling postmaster\n");
+                   kill(getppid(), SIGUSR1);
+               }
            }
            SpinRelease(ControlFileLockId);
-
-           if (!usexistent)    /* there was no precreated file */
-               elog(LOG, "XLogWrite: new log file created - "
-                    "consider increasing WAL_FILES");
-
-           /*
-            * Signal postmaster to start a checkpoint if it's been too
-            * long since the last one.  (We look at local copy of RedoRecPtr
-            * which might be a little out of date, but should be close enough
-            * for this purpose.)
-            */
-           if (IsUnderPostmaster &&
-               (openLogId != RedoRecPtr.xlogid ||
-                openLogSeg >= (RedoRecPtr.xrecoff / XLogSegSize) +
-                (uint32) CheckPointSegments))
-           {
-               if (XLOG_DEBUG)
-                   fprintf(stderr, "XLogWrite: time for a checkpoint, signaling postmaster\n");
-               kill(getppid(), SIGUSR1);
-           }
        }
 
        if (openLogFile < 0)
@@ -1230,14 +1228,28 @@ XLogFlush(XLogRecPtr record)
 /*
  * Create a new XLOG file segment, or open a pre-existing one.
  *
+ * log, seg: identify segment to be created/opened.
+ *
+ * *use_existent: if TRUE, OK to use a pre-existing file (else, any
+ * pre-existing file will be deleted).  On return, TRUE if a pre-existing
+ * file was used.
+ *
+ * use_lock: if TRUE, acquire ControlFileLock spinlock while moving file into
+ * place.  This should be TRUE except during bootstrap log creation.  The
+ * caller must *not* hold the spinlock at call.
+ *
  * Returns FD of opened file.
  */
 static int
-XLogFileInit(uint32 log, uint32 seg, bool *usexistent)
+XLogFileInit(uint32 log, uint32 seg,
+            bool *use_existent, bool use_lock)
 {
    char        path[MAXPGPATH];
-   char        tpath[MAXPGPATH];
+   char        tmppath[MAXPGPATH];
+   char        targpath[MAXPGPATH];
    char        zbuffer[BLCKSZ];
+   uint32      targlog,
+               targseg;
    int         fd;
    int         nbytes;
 
@@ -1246,7 +1258,7 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent)
    /*
     * Try to use existent file (checkpoint maker may have created it already)
     */
-   if (*usexistent)
+   if (*use_existent)
    {
        fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
                           S_IRUSR | S_IWUSR);
@@ -1258,20 +1270,24 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent)
        }
        else
            return(fd);
-       /* Set flag to tell caller there was no existent file */
-       *usexistent = false;
    }
 
-   XLogTempFileName(tpath, log, seg);
-   unlink(tpath);
-   unlink(path);
+   /*
+    * Initialize an empty (all zeroes) segment.  NOTE: it is possible that
+    * another process is doing the same thing.  If so, we will end up
+    * pre-creating an extra log segment.  That seems OK, and better than
+    * holding the spinlock throughout this lengthy process.
+    */
+   snprintf(tmppath, MAXPGPATH, "%s%cxlogtemp.%d",
+            XLogDir, SEP_CHAR, (int) getpid());
+
+   unlink(tmppath);
 
    /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
-   fd = BasicOpenFile(tpath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
+   fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
                       S_IRUSR | S_IWUSR);
    if (fd < 0)
-       elog(STOP, "InitCreate(logfile %u seg %u) failed: %m",
-            log, seg);
+       elog(STOP, "InitCreate(%s) failed: %m", tmppath);
 
    /*
     * Zero-fill the file.  We have to do this the hard way to ensure that
@@ -1290,36 +1306,73 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent)
            int     save_errno = errno;
 
            /* If we fail to make the file, delete it to release disk space */
-           unlink(tpath);
+           unlink(tmppath);
            errno = save_errno;
 
-           elog(STOP, "ZeroFill(logfile %u seg %u) failed: %m",
-                log, seg);
+           elog(STOP, "ZeroFill(%s) failed: %m", tmppath);
        }
    }
 
    if (pg_fsync(fd) != 0)
-       elog(STOP, "fsync(logfile %u seg %u) failed: %m",
-            log, seg);
+       elog(STOP, "fsync(%s) failed: %m", tmppath);
 
    close(fd);
 
    /*
-    * Prefer link() to rename() here just to be sure that we don't overwrite
-    * an existing logfile.  However, there shouldn't be one, so rename()
-    * is an acceptable substitute except for the truly paranoid.
+    * Now move the segment into place with its final name.  We want to be
+    * sure that only one process does this at a time.
+    */
+   if (use_lock)
+       SpinAcquire(ControlFileLockId);
+
+   /*
+    * If caller didn't want to use a pre-existing file, get rid of any
+    * pre-existing file.  Otherwise, cope with possibility that someone
+    * else has created the file while we were filling ours: if so, use
+    * ours to pre-create a future log segment.
+    */
+   targlog = log;
+   targseg = seg;
+   strcpy(targpath, path);
+
+   if (! *use_existent)
+   {
+       unlink(targpath);
+   }
+   else
+   {
+       while ((fd = BasicOpenFile(targpath, O_RDWR | PG_BINARY,
+                                  S_IRUSR | S_IWUSR)) >= 0)
+       {
+           close(fd);
+           NextLogSeg(targlog, targseg);
+           XLogFileName(targpath, targlog, targseg);
+       }
+   }
+
+   /*
+    * Prefer link() to rename() here just to be really sure that we don't
+    * overwrite an existing logfile.  However, there shouldn't be one, so
+    * rename() is an acceptable substitute except for the truly paranoid.
     */
 #ifndef __BEOS__
-   if (link(tpath, path) < 0)
+   if (link(tmppath, targpath) < 0)
        elog(STOP, "InitRelink(logfile %u seg %u) failed: %m",
-            log, seg);
-   unlink(tpath);
+            targlog, targseg);
+   unlink(tmppath);
 #else
-   if (rename(tpath, path) < 0)
+   if (rename(tmppath, targpath) < 0)
        elog(STOP, "InitRelink(logfile %u seg %u) failed: %m",
-            log, seg);
+            targlog, targseg);
 #endif
 
+   if (use_lock)
+       SpinRelease(ControlFileLockId);
+
+   /* Set flag to tell caller there was no existent file */
+   *use_existent = false;
+
+   /* Now open original target segment (might not be file I just made) */
    fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
                       S_IRUSR | S_IWUSR);
    if (fd < 0)
@@ -1367,8 +1420,7 @@ PreallocXlogFiles(XLogRecPtr endptr)
    uint32      _logId;
    uint32      _logSeg;
    int         lf;
-   bool        usexistent;
-   struct timeval delay;
+   bool        use_existent;
    int         i;
 
    XLByteToPrevSeg(endptr, _logId, _logSeg);
@@ -1376,30 +1428,19 @@ PreallocXlogFiles(XLogRecPtr endptr)
    {
        for (i = 1; i <= XLOGfiles; i++)
        {
-           usexistent = true;
            NextLogSeg(_logId, _logSeg);
-           SpinAcquire(ControlFileLockId);
-           lf = XLogFileInit(_logId, _logSeg, &usexistent);
+           use_existent = true;
+           lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
            close(lf);
-           SpinRelease(ControlFileLockId);
-           /*
-            * Give up ControlFileLockId for 1/50 sec to let other
-            * backends switch to new log file in XLogWrite()
-            */
-           delay.tv_sec = 0;
-           delay.tv_usec = 20000;
-           (void) select(0, NULL, NULL, NULL, &delay);
        }
    }
    else if ((endptr.xrecoff - 1) % XLogSegSize >=
             (uint32) (0.75 * XLogSegSize))
    {
-       usexistent = true;
        NextLogSeg(_logId, _logSeg);
-       SpinAcquire(ControlFileLockId);
-       lf = XLogFileInit(_logId, _logSeg, &usexistent);
+       use_existent = true;
+       lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
        close(lf);
-       SpinRelease(ControlFileLockId);
    }
 }
 
@@ -2103,7 +2144,7 @@ BootStrapXLOG(void)
    char       *buffer;
    XLogPageHeader page;
    XLogRecord *record;
-   bool        usexistent = false;
+   bool        use_existent;
    crc64       crc;
 
    /* Use malloc() to ensure buffer is MAXALIGNED */
@@ -2144,7 +2185,8 @@ BootStrapXLOG(void)
    FIN_CRC64(crc);
    record->xl_crc = crc;
 
-   openLogFile = XLogFileInit(0, 0, &usexistent);
+   use_existent = false;
+   openLogFile = XLogFileInit(0, 0, &use_existent, false);
 
    if (write(openLogFile, buffer, BLCKSZ) != BLCKSZ)
        elog(STOP, "BootStrapXLOG failed to write logfile: %m");