Buffer manager modifications to keep a local buffer-dirtied bit as well
authorTom Lane
Sun, 9 Apr 2000 04:43:20 +0000 (04:43 +0000)
committerTom Lane
Sun, 9 Apr 2000 04:43:20 +0000 (04:43 +0000)
as a shared dirtybit for each shared buffer.  The shared dirtybit still
controls writing the buffer, but the local bit controls whether we need
to fsync the buffer's file.  This arrangement fixes a bug that allowed
some required fsyncs to be missed, and should improve performance as well.
For more info see my post of same date on pghackers.

13 files changed:
src/backend/access/transam/xact.c
src/backend/catalog/catalog.c
src/backend/storage/buffer/buf_init.c
src/backend/storage/buffer/bufmgr.c
src/backend/storage/buffer/freelist.c
src/backend/storage/file/fd.c
src/backend/storage/smgr/md.c
src/backend/storage/smgr/smgr.c
src/include/catalog/catalog.h
src/include/storage/buf_internals.h
src/include/storage/bufmgr.h
src/include/storage/fd.h
src/include/storage/smgr.h

index c3db87a187a063dfb57538b7bcc3cac530a485dd..2522cca46c12296a4abfe76bc6391c120fede8ae 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.62 2000/03/17 02:36:05 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.63 2000/04/09 04:43:16 tgl Exp $
  *
  * NOTES
  *     Transaction aborts can now occur two ways:
@@ -642,7 +642,7 @@ RecordTransactionCommit()
    {
        FlushBufferPool();
        if (leak)
-           ResetBufferPool();
+           ResetBufferPool(true);
 
        /*
         *  have the transaction access methods record the status
@@ -658,7 +658,7 @@ RecordTransactionCommit()
    }
 
    if (leak)
-       ResetBufferPool();
+       ResetBufferPool(true);
 }
 
 
@@ -759,7 +759,10 @@ RecordTransactionAbort()
    if (SharedBufferChanged && !TransactionIdDidCommit(xid))
        TransactionIdAbort(xid);
 
-   ResetBufferPool();
+   /*
+    * Tell bufmgr and smgr to release resources.
+    */
+   ResetBufferPool(false);     /* false -> is abort */
 }
 
 /* --------------------------------
index 1072877713e13e56cebef52c9e4569fe02625177..5425387b3fe32c5c1134d421ae83f06126cee8cb 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/catalog/catalog.c,v 1.30 2000/01/26 05:56:10 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/catalog/catalog.c,v 1.31 2000/04/09 04:43:15 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "utils/syscache.h"
 
 /*
- * relpath             - path to the relation
- *     Perhaps this should be in-line code in relopen().
+ * relpath             - construct path to a relation's file
+ *
+ * Note that this only works with relations that are visible to the current
+ * backend, ie, either in the current database or shared system relations.
+ *
+ * Result is a palloc'd string.
  */
 char *
 relpath(const char *relname)
 {
    char       *path;
-   size_t      bufsize = 0;
 
    if (IsSharedSystemRelationName(relname))
    {
-       bufsize = strlen(DataDir) + sizeof(NameData) + 2;
+       /* Shared system relations live in DataDir */
+       size_t      bufsize = strlen(DataDir) + sizeof(NameData) + 2;
+
        path = (char *) palloc(bufsize);
-       snprintf(path, bufsize, "%s/%s", DataDir, relname);
+       snprintf(path, bufsize, "%s%c%s", DataDir, SEP_CHAR, relname);
        return path;
    }
+   /*
+    * If it is in the current database, assume it is in current working
+    * directory.  NB: this does not work during bootstrap!
+    */
    return pstrdup(relname);
 }
 
+/*
+ * relpath_blind           - construct path to a relation's file
+ *
+ * Construct the path using only the info available to smgrblindwrt,
+ * namely the names and OIDs of the database and relation.  (Shared system
+ * relations are identified with dbid = 0.)  Note that we may have to
+ * access a relation belonging to a different database!
+ *
+ * Result is a palloc'd string.
+ */
+
+char *
+relpath_blind(const char *dbname, const char *relname,
+             Oid dbid, Oid relid)
+{
+   char       *path;
+
+   if (dbid == (Oid) 0)
+   {
+       /* Shared system relations live in DataDir */
+       path = (char *) palloc(strlen(DataDir) + sizeof(NameData) + 2);
+       sprintf(path, "%s%c%s", DataDir, SEP_CHAR, relname);
+   }
+   else if (dbid == MyDatabaseId)
+   {
+       /* XXX why is this inconsistent with relpath() ? */
+       path = (char *) palloc(strlen(DatabasePath) + sizeof(NameData) + 2);
+       sprintf(path, "%s%c%s", DatabasePath, SEP_CHAR, relname);
+   }
+   else
+   {
+       /* this is work around only !!! */
+       char        dbpathtmp[MAXPGPATH];
+       Oid         id;
+       char       *dbpath;
+
+       GetRawDatabaseInfo(dbname, &id, dbpathtmp);
+
+       if (id != dbid)
+           elog(FATAL, "relpath_blind: oid of db %s is not %u",
+                dbname, dbid);
+       dbpath = ExpandDatabasePath(dbpathtmp);
+       if (dbpath == NULL)
+           elog(FATAL, "relpath_blind: can't expand path for db %s",
+                dbname);
+       path = (char *) palloc(strlen(dbpath) + sizeof(NameData) + 2);
+       sprintf(path, "%s%c%s", dbpath, SEP_CHAR, relname);
+       pfree(dbpath);
+   }
+   return path;
+}
+
+
 /*
  * IsSystemRelationName
  *     True iff name is the name of a system catalog relation.
index 0a58033c97f28ee5c006d0d040e23658e5ac49a6..4f1d033bc42f2eb72dcd9f30b8b66453fb243690 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/buffer/buf_init.c,v 1.32 2000/01/26 05:56:50 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/buffer/buf_init.c,v 1.33 2000/04/09 04:43:18 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -65,9 +65,11 @@ long    *NWaitIOBackendP;
 extern IpcSemaphoreId WaitIOSemId;
 
 long      *PrivateRefCount;    /* also used in freelist.c */
-bits8     *BufferLocks;        /* */
-long      *CommitInfoNeedsSave;/* to write buffers where we have filled
-                                * in t_infomask */
+bits8     *BufferLocks;        /* flag bits showing locks I have set */
+BufferTag  *BufferTagLastDirtied; /* tag buffer had when last dirtied by me */
+BufferBlindId *BufferBlindLastDirtied; /* and its BlindId too */
+bool      *BufferDirtiedByMe;  /* T if buf has been dirtied in cur xact */
+
 
 /*
  * Data Structures:
@@ -247,7 +249,9 @@ InitBufferPool(IPCKey key)
 #endif
    PrivateRefCount = (long *) calloc(NBuffers, sizeof(long));
    BufferLocks = (bits8 *) calloc(NBuffers, sizeof(bits8));
-   CommitInfoNeedsSave = (long *) calloc(NBuffers, sizeof(long));
+   BufferTagLastDirtied = (BufferTag *) calloc(NBuffers, sizeof(BufferTag));
+   BufferBlindLastDirtied = (BufferBlindId *) calloc(NBuffers, sizeof(BufferBlindId));
+   BufferDirtiedByMe = (bool *) calloc(NBuffers, sizeof(bool));
 }
 
 /* -----------------------------------------------------
index acc719ca4b6acfae0db02882403c81d7a7cd1d63..0887f3d1ecda533c6988fc129e41ceb02724a18a 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.77 2000/03/31 02:43:31 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.78 2000/04/09 04:43:18 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -94,8 +94,10 @@ static Buffer ReadBufferWithBufferLock(Relation relation, BlockNumber blockNum,
                         bool bufferLockHeld);
 static BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum,
            bool *foundPtr, bool bufferLockHeld);
+static void SetBufferDirtiedByMe(Buffer buffer, BufferDesc *bufHdr);
+static void ClearBufferDirtiedByMe(Buffer buffer, BufferDesc *bufHdr);
 static void BufferSync(void);
-static int BufferReplace(BufferDesc *bufHdr, bool bufferLockHeld);
+static int BufferReplace(BufferDesc *bufHdr);
 void       PrintBufferDescs(void);
 
 /* ---------------------------------------------------
@@ -176,7 +178,7 @@ is_userbuffer(Buffer buffer)
 {
    BufferDesc *buf = &BufferDescriptors[buffer - 1];
 
-   if (IsSystemRelationName(buf->sb_relname))
+   if (IsSystemRelationName(buf->blind.relname))
        return false;
    return true;
 }
@@ -199,7 +201,7 @@ ReadBuffer_Debug(char *file,
 
        fprintf(stderr, "PIN(RD) %ld relname = %s, blockNum = %d, \
 refcount = %ld, file: %s, line: %d\n",
-               buffer, buf->sb_relname, buf->tag.blockNum,
+               buffer, buf->blind.relname, buf->tag.blockNum,
                PrivateRefCount[buffer - 1], file, line);
    }
    return buffer;
@@ -390,22 +392,21 @@ BufferAlloc(Relation reln,
             * If there's no IO for the buffer and the buffer
             * is BROKEN,it should be read again. So start a
             * new buffer IO here. 
-
-                *
-                * wierd race condition:
-                *
-                * We were waiting for someone else to read the buffer. While
-                * we were waiting, the reader boof'd in some way, so the
-                * contents of the buffer are still invalid.  By saying
-                * that we didn't find it, we can make the caller
-                * reinitialize the buffer.  If two processes are waiting
-                * for this block, both will read the block.  The second
-                * one to finish may overwrite any updates made by the
-                * first.  (Assume higher level synchronization prevents
-                * this from happening).
-                *
-                * This is never going to happen, don't worry about it.
-                */
+            *
+            * wierd race condition:
+            *
+            * We were waiting for someone else to read the buffer. While
+            * we were waiting, the reader boof'd in some way, so the
+            * contents of the buffer are still invalid.  By saying
+            * that we didn't find it, we can make the caller
+            * reinitialize the buffer.  If two processes are waiting
+            * for this block, both will read the block.  The second
+            * one to finish may overwrite any updates made by the
+            * first.  (Assume higher level synchronization prevents
+            * this from happening).
+            *
+            * This is never going to happen, don't worry about it.
+            */
            *foundPtr = FALSE;
        }
 #ifdef BMTRACE
@@ -465,33 +466,24 @@ BufferAlloc(Relation reln,
             * in WaitIO until we're done.
             */
            inProgress = TRUE;
-#ifdef HAS_TEST_AND_SET
 
            /*
             * All code paths that acquire this lock pin the buffer first;
             * since no one had it pinned (it just came off the free
             * list), no one else can have this lock.
             */
-#endif  /* HAS_TEST_AND_SET */
            StartBufferIO(buf, false);
 
            /*
             * Write the buffer out, being careful to release BufMgrLock
             * before starting the I/O.
-            *
-            * This #ifndef is here because a few extra semops REALLY kill
-            * you on machines that don't have spinlocks.  If you don't
-            * operate with much concurrency, well...
             */
-           smok = BufferReplace(buf, true);
-#ifndef OPTIMIZE_SINGLE
-           SpinAcquire(BufMgrLock);
-#endif  /* OPTIMIZE_SINGLE */
+           smok = BufferReplace(buf);
 
            if (smok == FALSE)
            {
                elog(NOTICE, "BufferAlloc: cannot write block %u for %s/%s",
-                    buf->tag.blockNum, buf->sb_dbname, buf->sb_relname);
+                    buf->tag.blockNum, buf->blind.dbname, buf->blind.relname);
                inProgress = FALSE;
                buf->flags |= BM_IO_ERROR;
                buf->flags &= ~BM_IO_IN_PROGRESS;
@@ -516,7 +508,7 @@ BufferAlloc(Relation reln,
                if (buf->flags & BM_JUST_DIRTIED)
                {
                    elog(FATAL, "BufferAlloc: content of block %u (%s) changed while flushing",
-                        buf->tag.blockNum, buf->sb_relname);
+                        buf->tag.blockNum, buf->blind.relname);
                }
                else
                    buf->flags &= ~BM_DIRTY;
@@ -562,6 +554,7 @@ BufferAlloc(Relation reln,
                 */
                if (buf != NULL)
                {
+                   buf->flags &= ~BM_IO_IN_PROGRESS;
                    TerminateBufferIO(buf);
                    /* give up the buffer since we don't need it any more */
                    PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
@@ -572,7 +565,6 @@ BufferAlloc(Relation reln,
                        AddBufferToFreelist(buf);
                        buf->flags |= BM_FREE;
                    }
-                   buf->flags &= ~BM_IO_IN_PROGRESS;
                }
 
                PinBuffer(buf2);
@@ -619,8 +611,8 @@ BufferAlloc(Relation reln,
    }
 
    /* record the database name and relation name for this buffer */
-   strcpy(buf->sb_relname, RelationGetPhysicalRelationName(reln));
-   strcpy(buf->sb_dbname, DatabaseName);
+   strcpy(buf->blind.dbname, DatabaseName);
+   strcpy(buf->blind.relname, RelationGetPhysicalRelationName(reln));
 
    INIT_BUFFERTAG(&(buf->tag), reln, blockNum);
    if (!BufTableInsert(buf))
@@ -683,9 +675,9 @@ WriteBuffer(Buffer buffer)
    SpinAcquire(BufMgrLock);
    Assert(bufHdr->refcount > 0);
    bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+   SetBufferDirtiedByMe(buffer, bufHdr);
    UnpinBuffer(bufHdr);
    SpinRelease(BufMgrLock);
-   CommitInfoNeedsSave[buffer - 1] = 0;
 
    return TRUE;
 }
@@ -702,7 +694,7 @@ WriteBuffer_Debug(char *file, int line, Buffer buffer)
        buf = &BufferDescriptors[buffer - 1];
        fprintf(stderr, "UNPIN(WR) %ld relname = %s, blockNum = %d, \
 refcount = %ld, file: %s, line: %d\n",
-               buffer, buf->sb_relname, buf->tag.blockNum,
+               buffer, buf->blind.relname, buf->tag.blockNum,
                PrivateRefCount[buffer - 1], file, line);
    }
 }
@@ -767,8 +759,9 @@ DirtyBufferCopy(Oid dbid, Oid relid, BlockNumber blkno, char *dest)
  *
  * 'buffer' is known to be dirty/pinned, so there should not be a
  * problem reading the BufferDesc members without the BufMgrLock
- * (nobody should be able to change tags, flags, etc. out from under
- * us).  Unpin if 'release' is TRUE.
+ * (nobody should be able to change tags out from under us).
+ *
+ * Unpin if 'release' is TRUE.
  */
 int
 FlushBuffer(Buffer buffer, bool release)
@@ -784,6 +777,8 @@ FlushBuffer(Buffer buffer, bool release)
    if (BAD_BUFFER_ID(buffer))
        return STATUS_ERROR;
 
+   Assert(PrivateRefCount[buffer - 1] > 0); /* else caller didn't pin */
+
    bufHdr = &BufferDescriptors[buffer - 1];
    bufdb = bufHdr->tag.relId.dbId;
 
@@ -809,7 +804,7 @@ FlushBuffer(Buffer buffer, bool release)
    if (status == SM_FAIL)
    {
        elog(ERROR, "FlushBuffer: cannot flush block %u of the relation %s",
-            bufHdr->tag.blockNum, bufHdr->sb_relname);
+            bufHdr->tag.blockNum, bufHdr->blind.relname);
        return STATUS_ERROR;
    }
    BufferFlushCount++;
@@ -820,19 +815,21 @@ FlushBuffer(Buffer buffer, bool release)
 
    /*
     * If this buffer was marked by someone as DIRTY while we were
-    * flushing it out we must not clear DIRTY flag - vadim 01/17/97
+    * flushing it out we must not clear shared DIRTY flag - vadim 01/17/97
+    *
+    * ... but we can clear BufferDirtiedByMe anyway - tgl 3/31/00
     */
    if (bufHdr->flags & BM_JUST_DIRTIED)
    {
        elog(NOTICE, "FlushBuffer: content of block %u (%s) changed while flushing",
-            bufHdr->tag.blockNum, bufHdr->sb_relname);
+            bufHdr->tag.blockNum, bufHdr->blind.relname);
    }
    else
        bufHdr->flags &= ~BM_DIRTY;
+   ClearBufferDirtiedByMe(buffer, bufHdr);
    if (release)
        UnpinBuffer(bufHdr);
    SpinRelease(BufMgrLock);
-   CommitInfoNeedsSave[buffer - 1] = 0;
 
    return STATUS_OK;
 }
@@ -857,9 +854,10 @@ WriteNoReleaseBuffer(Buffer buffer)
    SharedBufferChanged = true;
 
    SpinAcquire(BufMgrLock);
+   Assert(bufHdr->refcount > 0);
    bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+   SetBufferDirtiedByMe(buffer, bufHdr);
    SpinRelease(BufMgrLock);
-   CommitInfoNeedsSave[buffer - 1] = 0;
 
    return STATUS_OK;
 }
@@ -901,11 +899,6 @@ ReleaseAndReadBuffer(Buffer buffer,
                    AddBufferToFreelist(bufHdr);
                    bufHdr->flags |= BM_FREE;
                }
-               if (CommitInfoNeedsSave[buffer - 1])
-               {
-                   bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
-                   CommitInfoNeedsSave[buffer - 1] = 0;
-               }
                retbuf = ReadBufferWithBufferLock(relation, blockNum, true);
                return retbuf;
            }
@@ -915,13 +908,120 @@ ReleaseAndReadBuffer(Buffer buffer,
    return ReadBuffer(relation, blockNum);
 }
 
+/*
+ * SetBufferDirtiedByMe -- mark a shared buffer as being dirtied by this xact
+ *
+ * This flag essentially remembers that we need to write and fsync this buffer
+ * before we can commit the transaction.  The write might end up getting done
+ * by another backend, but we must do the fsync ourselves (else we could
+ * commit before the data actually reaches disk).  We do not issue fsync
+ * instantly upon write; the storage manager keeps track of which files need
+ * to be fsync'd before commit can occur.  A key aspect of this data structure
+ * is that we will be able to notify the storage manager that an fsync is
+ * needed even after another backend has done the physical write and replaced
+ * the buffer contents with something else!
+ *
+ * NB: we must be holding the bufmgr lock at entry, and the buffer must be
+ * pinned so that no other backend can take it away from us.
+ */
+static void
+SetBufferDirtiedByMe(Buffer buffer, BufferDesc *bufHdr)
+{
+   BufferTag  *tagLastDirtied = & BufferTagLastDirtied[buffer - 1];
+   Relation    reln;
+   int         status;
+
+   /*
+    * If the flag is already set, check to see whether the buffertag is
+    * the same.  If not, some other backend already wrote the buffer data
+    * that we dirtied.  We must tell the storage manager to make an fsync
+    * pending on that file before we can overwrite the old tag value.
+    */
+   if (BufferDirtiedByMe[buffer - 1])
+   {
+       if (bufHdr->tag.relId.dbId == tagLastDirtied->relId.dbId &&
+           bufHdr->tag.relId.relId == tagLastDirtied->relId.relId &&
+           bufHdr->tag.blockNum == tagLastDirtied->blockNum)
+           return;             /* Same tag already dirtied, so no work */
+
+#ifndef OPTIMIZE_SINGLE
+       SpinRelease(BufMgrLock);
+#endif  /* OPTIMIZE_SINGLE */
+
+       reln = RelationIdCacheGetRelation(tagLastDirtied->relId.relId);
+
+       if (reln == (Relation) NULL)
+       {
+           status = smgrblindmarkdirty(DEFAULT_SMGR,
+                                       BufferBlindLastDirtied[buffer - 1].dbname,
+                                       BufferBlindLastDirtied[buffer - 1].relname,
+                                       tagLastDirtied->relId.dbId,
+                                       tagLastDirtied->relId.relId,
+                                       tagLastDirtied->blockNum);
+       }
+       else
+       {
+           status = smgrmarkdirty(DEFAULT_SMGR, reln,
+                                  tagLastDirtied->blockNum);
+           /* drop relcache refcnt incremented by RelationIdCacheGetRelation */
+           RelationDecrementReferenceCount(reln);
+       }
+       if (status == SM_FAIL)
+       {
+           elog(ERROR, "SetBufferDirtiedByMe: cannot mark %u for %s",
+                tagLastDirtied->blockNum,
+                BufferBlindLastDirtied[buffer - 1].relname);
+       }
+
+#ifndef OPTIMIZE_SINGLE
+       SpinAcquire(BufMgrLock);
+#endif  /* OPTIMIZE_SINGLE */
+
+   }
+
+   *tagLastDirtied = bufHdr->tag;
+   BufferBlindLastDirtied[buffer - 1] = bufHdr->blind;
+   BufferDirtiedByMe[buffer - 1] = true;
+}
+
+/*
+ * ClearBufferDirtiedByMe -- mark a shared buffer as no longer needing fsync
+ *
+ * If we write out a buffer ourselves, then the storage manager will set its
+ * needs-fsync flag for that file automatically, and so we can clear our own
+ * flag that says it needs to be done later.
+ *
+ * NB: we must be holding the bufmgr lock at entry.
+ */
+static void
+ClearBufferDirtiedByMe(Buffer buffer, BufferDesc *bufHdr)
+{
+   BufferTag  *tagLastDirtied = & BufferTagLastDirtied[buffer - 1];
+
+   /*
+    * Do *not* clear the flag if it refers to some other buffertag than
+    * the data we just wrote.  This is unlikely, but possible if some
+    * other backend replaced the buffer contents since we set our flag.
+    */
+   if (bufHdr->tag.relId.dbId == tagLastDirtied->relId.dbId &&
+       bufHdr->tag.relId.relId == tagLastDirtied->relId.relId &&
+       bufHdr->tag.blockNum == tagLastDirtied->blockNum)
+   {
+       BufferDirtiedByMe[buffer - 1] = false;
+   }
+}
+
 /*
  * BufferSync -- Flush all dirty buffers in the pool.
  *
- *     This is called at transaction commit time.  It does the wrong thing,
- *     right now.  We should flush only our own changes to stable storage,
- *     and we should obey the lock protocol on the buffer manager metadata
- *     as we do it.  Also, we need to be sure that no other transaction is
+ *     This is called at transaction commit time.  We find all buffers
+ *     that have been dirtied by the current xact and flush them to disk.
+ *     We do *not* flush dirty buffers that have been dirtied by other xacts.
+ *     (This is a substantial change from pre-7.0 behavior.)
+ *
+ * OLD COMMENTS (do these still apply?)
+ *
+ *     Also, we need to be sure that no other transaction is
  *     modifying the page as we flush it.  This is only a problem for objects
  *     that use a non-two-phase locking protocol, like btree indices.  For
  *     those objects, we would like to set a write lock for the duration of
@@ -936,21 +1036,49 @@ static void
 BufferSync()
 {
    int         i;
-   Oid         bufdb;
-   Oid         bufrel;
-   Relation    reln;
    BufferDesc *bufHdr;
    int         status;
+   Relation    reln;
+   bool        didwrite;
 
-   SpinAcquire(BufMgrLock);
    for (i = 0, bufHdr = BufferDescriptors; i < NBuffers; i++, bufHdr++)
    {
+       /* Ignore buffers that were not dirtied by me */
+       if (! BufferDirtiedByMe[i])
+           continue;
+
+       SpinAcquire(BufMgrLock);
+
+       /*
+        * We only need to write if the buffer is still dirty and still
+        * contains the same disk page that it contained when we dirtied it.
+        * Otherwise, someone else has already written our changes for us,
+        * and we need only fsync.
+        *
+        * (NOTE: it's still possible to do an unnecessary write, if other
+        * xacts have written and then re-dirtied the page since our last
+        * change to it.  But that should be pretty uncommon, and there's
+        * no easy way to detect it anyway.)
+        */
+       reln = NULL;
+       didwrite = false;
        if ((bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
        {
+           Oid         bufdb;
+           Oid         bufrel;
+
            bufdb = bufHdr->tag.relId.dbId;
            bufrel = bufHdr->tag.relId.relId;
-           if (bufdb == MyDatabaseId || bufdb == (Oid) 0)
+           if (bufdb == BufferTagLastDirtied[i].relId.dbId &&
+               bufrel == BufferTagLastDirtied[i].relId.relId &&
+               bufHdr->tag.blockNum == BufferTagLastDirtied[i].blockNum)
            {
+               /*
+                * Try to find relation for buf.  This could fail, if the
+                * rel has been flushed from the relcache since we dirtied
+                * the page.  That should be uncommon, so paying the extra
+                * cost of a blind write when it happens seems OK.
+                */
                reln = RelationIdCacheGetRelation(bufrel);
 
                /*
@@ -970,74 +1098,114 @@ BufferSync()
                    if (bufHdr->flags & BM_IO_ERROR)
                    {
                        elog(ERROR, "BufferSync: write error %u for %s",
-                            bufHdr->tag.blockNum, bufHdr->sb_relname);
+                            bufHdr->tag.blockNum, bufHdr->blind.relname);
                    }
-                   /* drop refcnt from RelationIdCacheGetRelation */
-                   if (reln != (Relation) NULL)
-                       RelationDecrementReferenceCount(reln);
-                   continue;
-               }
-
-               /*
-                * To check if block content changed while flushing (see
-                * below). - vadim 01/17/97
-                */
-               WaitIO(bufHdr, BufMgrLock); /* confirm end of IO */
-               bufHdr->flags &= ~BM_JUST_DIRTIED;
-               StartBufferIO(bufHdr, false);   /* output IO start */
-
-               /*
-                * If we didn't have the reldesc in our local cache, flush
-                * this page out using the 'blind write' storage manager
-                * routine.  If we did find it, use the standard
-                * interface.
-                */
-
-#ifndef OPTIMIZE_SINGLE
-               SpinRelease(BufMgrLock);
-#endif  /* OPTIMIZE_SINGLE */
-               if (reln == (Relation) NULL)
-               {
-                   status = smgrblindwrt(DEFAULT_SMGR, bufHdr->sb_dbname,
-                                      bufHdr->sb_relname, bufdb, bufrel,
-                                         bufHdr->tag.blockNum,
-                                       (char *) MAKE_PTR(bufHdr->data));
                }
                else
                {
-                   status = smgrwrite(DEFAULT_SMGR, reln,
-                                      bufHdr->tag.blockNum,
-                                      (char *) MAKE_PTR(bufHdr->data));
-               }
+                   /*
+                    * To check if block content changed while flushing (see
+                    * below). - vadim 01/17/97
+                    */
+                   WaitIO(bufHdr, BufMgrLock); /* confirm end of IO */
+                   bufHdr->flags &= ~BM_JUST_DIRTIED;
+                   StartBufferIO(bufHdr, false); /* output IO start */
+
+                   /*
+                    * If we didn't have the reldesc in our local cache, write
+                    * this page out using the 'blind write' storage manager
+                    * routine.  If we did find it, use the standard
+                    * interface.
+                    */
 #ifndef OPTIMIZE_SINGLE
-               SpinAcquire(BufMgrLock);
+                   SpinRelease(BufMgrLock);
+#endif  /* OPTIMIZE_SINGLE */
+                   if (reln == (Relation) NULL)
+                   {
+                       status = smgrblindwrt(DEFAULT_SMGR,
+                                             bufHdr->blind.dbname,
+                                             bufHdr->blind.relname,
+                                             bufdb, bufrel,
+                                             bufHdr->tag.blockNum,
+                                             (char *) MAKE_PTR(bufHdr->data));
+                   }
+                   else
+                   {
+                       status = smgrwrite(DEFAULT_SMGR, reln,
+                                          bufHdr->tag.blockNum,
+                                          (char *) MAKE_PTR(bufHdr->data));
+                   }
+#ifndef OPTIMIZE_SINGLE
+                   SpinAcquire(BufMgrLock);
 #endif  /* OPTIMIZE_SINGLE */
 
-               UnpinBuffer(bufHdr);
-               if (status == SM_FAIL)
-               {
-                   bufHdr->flags |= BM_IO_ERROR;
-                   elog(ERROR, "BufferSync: cannot write %u for %s",
-                        bufHdr->tag.blockNum, bufHdr->sb_relname);
+                   UnpinBuffer(bufHdr);
+                   if (status == SM_FAIL)
+                   {
+                       bufHdr->flags |= BM_IO_ERROR;
+                       elog(ERROR, "BufferSync: cannot write %u for %s",
+                            bufHdr->tag.blockNum, bufHdr->blind.relname);
+                   }
+                   bufHdr->flags &= ~BM_IO_IN_PROGRESS; /* mark IO finished */
+                   TerminateBufferIO(bufHdr);  /* Sync IO finished */
+                   BufferFlushCount++;
+                   didwrite = true;
+
+                   /*
+                    * If this buffer was marked by someone as DIRTY while we
+                    * were flushing it out we must not clear DIRTY flag -
+                    * vadim 01/17/97
+                    *
+                    * but it is OK to clear BufferDirtiedByMe - tgl 3/31/00
+                    */
+                   if (!(bufHdr->flags & BM_JUST_DIRTIED))
+                       bufHdr->flags &= ~BM_DIRTY;
                }
-               bufHdr->flags &= ~BM_IO_IN_PROGRESS;    /* mark IO finished */
-               TerminateBufferIO(bufHdr);  /* Sync IO finished */
-               BufferFlushCount++;
 
-               /*
-                * If this buffer was marked by someone as DIRTY while we
-                * were flushing it out we must not clear DIRTY flag -
-                * vadim 01/17/97
-                */
-               if (!(bufHdr->flags & BM_JUST_DIRTIED))
-                   bufHdr->flags &= ~BM_DIRTY;
-               /* drop refcnt from RelationIdCacheGetRelation */
+               /* drop refcnt obtained by RelationIdCacheGetRelation */
                if (reln != (Relation) NULL)
                    RelationDecrementReferenceCount(reln);
            }
        }
+
+       /*
+        * If we did not write the buffer (because someone else did),
+        * we must still fsync the file containing it, to ensure that the
+        * write is down to disk before we commit.
+        */
+       if (! didwrite)
+       {
+#ifndef OPTIMIZE_SINGLE
+           SpinRelease(BufMgrLock);
+#endif  /* OPTIMIZE_SINGLE */
+
+           reln = RelationIdCacheGetRelation(BufferTagLastDirtied[i].relId.relId);
+           if (reln == (Relation) NULL)
+           {
+               status = smgrblindmarkdirty(DEFAULT_SMGR,
+                                           BufferBlindLastDirtied[i].dbname,
+                                           BufferBlindLastDirtied[i].relname,
+                                           BufferTagLastDirtied[i].relId.dbId,
+                                           BufferTagLastDirtied[i].relId.relId,
+                                           BufferTagLastDirtied[i].blockNum);
+           }
+           else
+           {
+               status = smgrmarkdirty(DEFAULT_SMGR, reln,
+                                      BufferTagLastDirtied[i].blockNum);
+               /* drop relcache refcnt incremented by RelationIdCacheGetRelation */
+               RelationDecrementReferenceCount(reln);
+
+           }
+#ifndef OPTIMIZE_SINGLE
+           SpinAcquire(BufMgrLock);
+#endif  /* OPTIMIZE_SINGLE */
+       }
+
+       BufferDirtiedByMe[i] = false;
+
+       SpinRelease(BufMgrLock);
    }
-   SpinRelease(BufMgrLock);
 
    LocalBufferSync();
 }
@@ -1166,13 +1334,19 @@ ResetBufferUsage()
 /* ----------------------------------------------
  *     ResetBufferPool
  *
- *     this routine is supposed to be called when a transaction aborts.
+ *     This routine is supposed to be called when a transaction aborts.
  *     it will release all the buffer pins held by the transaction.
+ *     Currently, we also call it during commit if BufferPoolCheckLeak
+ *     detected a problem --- in that case, isCommit is TRUE, and we
+ *     only clean up buffer pin counts.
+ *
+ * During abort, we also forget any pending fsync requests.  Dirtied buffers
+ * will still get written, eventually, but there will be no fsync for them.
  *
  * ----------------------------------------------
  */
 void
-ResetBufferPool()
+ResetBufferPool(bool isCommit)
 {
    int         i;
 
@@ -1193,10 +1367,15 @@ ResetBufferPool()
            SpinRelease(BufMgrLock);
        }
        PrivateRefCount[i] = 0;
-       CommitInfoNeedsSave[i] = 0;
+
+       if (! isCommit)
+           BufferDirtiedByMe[i] = false;
    }
 
    ResetLocalBufferPool();
+
+   if (! isCommit)
+       smgrabort();
 }
 
 /* -----------------------------------------------
@@ -1222,7 +1401,7 @@ BufferPoolCheckLeak()
                 "Buffer Leak: [%03d] (freeNext=%ld, freePrev=%ld, \
 relname=%s, blockNum=%d, flags=0x%x, refcount=%d %ld)",
                 i - 1, buf->freeNext, buf->freePrev,
-                buf->sb_relname, buf->tag.blockNum, buf->flags,
+                buf->blind.relname, buf->tag.blockNum, buf->flags,
                 buf->refcount, PrivateRefCount[i - 1]);
            result = 1;
        }
@@ -1306,25 +1485,25 @@ BufferGetRelation(Buffer buffer)
 /*
  * BufferReplace
  *
- * Flush the buffer corresponding to 'bufHdr'
+ * Write out the buffer corresponding to 'bufHdr'
  *
+ * This routine used to flush the data to disk (ie, force immediate fsync)
+ * but that's no longer necessary because BufferSync is smarter than before.
+ *
+ * BufMgrLock must be held at entry, and the buffer must be pinned.
  */
 static int
-BufferReplace(BufferDesc *bufHdr, bool bufferLockHeld)
+BufferReplace(BufferDesc *bufHdr)
 {
    Relation    reln;
    Oid         bufdb,
                bufrel;
    int         status;
 
-   if (!bufferLockHeld)
-       SpinAcquire(BufMgrLock);
-
    /*
     * first try to find the reldesc in the cache, if no luck, don't
     * bother to build the reldesc from scratch, just do a blind write.
     */
-
    bufdb = bufHdr->tag.relId.dbId;
    bufrel = bufHdr->tag.relId.relId;
 
@@ -1336,22 +1515,27 @@ BufferReplace(BufferDesc *bufHdr, bool bufferLockHeld)
    /* To check if block content changed while flushing. - vadim 01/17/97 */
    bufHdr->flags &= ~BM_JUST_DIRTIED;
 
+#ifndef OPTIMIZE_SINGLE
    SpinRelease(BufMgrLock);
+#endif  /* OPTIMIZE_SINGLE */
 
    if (reln != (Relation) NULL)
    {
-       status = smgrflush(DEFAULT_SMGR, reln, bufHdr->tag.blockNum,
+       status = smgrwrite(DEFAULT_SMGR, reln, bufHdr->tag.blockNum,
                           (char *) MAKE_PTR(bufHdr->data));
    }
    else
    {
-       /* blind write always flushes */
-       status = smgrblindwrt(DEFAULT_SMGR, bufHdr->sb_dbname,
-                             bufHdr->sb_relname, bufdb, bufrel,
+       status = smgrblindwrt(DEFAULT_SMGR, bufHdr->blind.dbname,
+                             bufHdr->blind.relname, bufdb, bufrel,
                              bufHdr->tag.blockNum,
                              (char *) MAKE_PTR(bufHdr->data));
    }
 
+#ifndef OPTIMIZE_SINGLE
+   SpinAcquire(BufMgrLock);
+#endif  /* OPTIMIZE_SINGLE */
+
    /* drop relcache refcnt incremented by RelationIdCacheGetRelation */
    if (reln != (Relation) NULL)
        RelationDecrementReferenceCount(reln);
@@ -1359,6 +1543,11 @@ BufferReplace(BufferDesc *bufHdr, bool bufferLockHeld)
    if (status == SM_FAIL)
        return FALSE;
 
+   /* If we had marked this buffer as needing to be fsync'd, we can forget
+    * about that, because it's now the storage manager's responsibility.
+    */
+   ClearBufferDirtiedByMe(BufferDescriptorGetBuffer(bufHdr), bufHdr);
+
    BufferFlushCount++;
 
    return TRUE;
@@ -1440,7 +1629,7 @@ ReleaseRelationBuffers(Relation rel)
            }
            /* Now we can do what we came for */
            buf->flags &= ~ ( BM_DIRTY | BM_JUST_DIRTIED);
-           CommitInfoNeedsSave[i - 1] = 0;
+           ClearBufferDirtiedByMe(i, buf);
            /*
             * Release any refcount we may have.
             *
@@ -1502,6 +1691,7 @@ DropBuffers(Oid dbid)
            }
            /* Now we can do what we came for */
            buf->flags &= ~ ( BM_DIRTY | BM_JUST_DIRTIED);
+           ClearBufferDirtiedByMe(i, buf);
            /*
             * The thing should be free, if caller has checked that
             * no backends are running in that database.
@@ -1533,7 +1723,7 @@ PrintBufferDescs()
            elog(DEBUG, "[%02d] (freeNext=%ld, freePrev=%ld, relname=%s, \
 blockNum=%d, flags=0x%x, refcount=%d %ld)",
                 i, buf->freeNext, buf->freePrev,
-                buf->sb_relname, buf->tag.blockNum, buf->flags,
+                buf->blind.relname, buf->tag.blockNum, buf->flags,
                 buf->refcount, PrivateRefCount[i]);
        }
        SpinRelease(BufMgrLock);
@@ -1544,7 +1734,7 @@ blockNum=%d, flags=0x%x, refcount=%d %ld)",
        for (i = 0; i < NBuffers; ++i, ++buf)
        {
            printf("[%-2d] (%s, %d) flags=0x%x, refcnt=%d %ld)\n",
-                  i, buf->sb_relname, buf->tag.blockNum,
+                  i, buf->blind.relname, buf->tag.blockNum,
                   buf->flags, buf->refcount, PrivateRefCount[i]);
        }
    }
@@ -1562,7 +1752,7 @@ PrintPinnedBufs()
        if (PrivateRefCount[i] > 0)
            elog(NOTICE, "[%02d] (freeNext=%ld, freePrev=%ld, relname=%s, \
 blockNum=%d, flags=0x%x, refcount=%d %ld)\n",
-                i, buf->freeNext, buf->freePrev, buf->sb_relname,
+                i, buf->freeNext, buf->freePrev, buf->blind.relname,
                 buf->tag.blockNum, buf->flags,
                 buf->refcount, PrivateRefCount[i]);
    }
@@ -1601,33 +1791,42 @@ BufferPoolBlowaway()
  *     FlushRelationBuffers
  *
  *     This function removes from the buffer pool all pages of a relation
- *     that have blocknumber >= specified block.  If doFlush is true,
- *     dirty buffers are written out --- otherwise it's an error for any
- *     of the buffers to be dirty.
+ *     that have blocknumber >= specified block.  Pages that are dirty are
+ *     written out first.  If expectDirty is false, a notice is emitted
+ *     warning of dirty buffers, but we proceed anyway.  An error code is
+ *     returned if we fail to dump a dirty buffer or if we find one of
+ *     the target pages is pinned into the cache.
  *
  *     This is used by VACUUM before truncating the relation to the given
- *     number of blocks.  For VACUUM, we pass doFlush = false since it would
- *     mean a bug in VACUUM if any of the unwanted pages were still dirty.
- *     (TRUNCATE TABLE also uses it in the same way.)
+ *     number of blocks.  For VACUUM, we pass expectDirty = false since it
+ *     could mean a bug in VACUUM if any of the unwanted pages were still
+ *     dirty.  (TRUNCATE TABLE also uses it in the same way.)
  *
- *     This is also used by RENAME TABLE (with block = 0 and doFlush = true)
+ *     This is also used by RENAME TABLE (with block=0 and expectDirty=true)
  *     to clear out the buffer cache before renaming the physical files of
  *     a relation.  Without that, some other backend might try to do a
- *     blind write of a buffer page (relying on the sb_relname of the buffer)
+ *     blind write of a buffer page (relying on the BlindId of the buffer)
  *     and fail because it's not got the right filename anymore.
  *
  *     In both cases, the caller should be holding AccessExclusiveLock on
  *     the target relation to ensure that no other backend is busy reading
- *     more blocks of the relation...
+ *     more blocks of the relation.
+ *
+ *     Formerly, we considered it an error condition if we found unexpectedly
+ *     dirty buffers.  However, since BufferSync no longer forces out all
+ *     dirty buffers at every xact commit, it's possible for dirty buffers
+ *     to still be present in the cache due to failure of an earlier
+ *     transaction.  So, downgrade the error to a mere notice.  Maybe we
+ *     shouldn't even emit a notice...
  *
- *     Returns: 0 - Ok, -1 - DIRTY, -2 - PINNED
+ *     Returns: 0 - Ok, -1 - FAILED TO WRITE DIRTY BUFFER, -2 - PINNED
  *
  *     XXX currently it sequentially searches the buffer pool, should be
  *     changed to more clever ways of searching.
  * --------------------------------------------------------------------
  */
 int
-FlushRelationBuffers(Relation rel, BlockNumber block, bool doFlush)
+FlushRelationBuffers(Relation rel, BlockNumber block, bool expectDirty)
 {
    int         i;
    BufferDesc *buf;
@@ -1642,21 +1841,15 @@ FlushRelationBuffers(Relation rel, BlockNumber block, bool doFlush)
            {
                if (buf->flags & BM_DIRTY)
                {
-                   if (doFlush)
-                   {
-                       if (FlushBuffer(-i-1, false) != STATUS_OK)
-                       {
-                           elog(NOTICE, "FlushRelationBuffers(%s (local), %u): block %u is dirty, could not flush it",
-                                RelationGetRelationName(rel),
-                                block, buf->tag.blockNum);
-                           return -1;
-                       }
-                   }
-                   else
-                   {
+                   if (! expectDirty)
                        elog(NOTICE, "FlushRelationBuffers(%s (local), %u): block %u is dirty",
                             RelationGetRelationName(rel),
                             block, buf->tag.blockNum);
+                   if (FlushBuffer(-i-1, false) != STATUS_OK)
+                   {
+                       elog(NOTICE, "FlushRelationBuffers(%s (local), %u): block %u is dirty, could not flush it",
+                            RelationGetRelationName(rel),
+                            block, buf->tag.blockNum);
                        return -1;
                    }
                }
@@ -1676,39 +1869,42 @@ FlushRelationBuffers(Relation rel, BlockNumber block, bool doFlush)
    SpinAcquire(BufMgrLock);
    for (i = 0; i < NBuffers; i++)
    {
+   recheck:
        buf = &BufferDescriptors[i];
-       if (buf->tag.relId.dbId == MyDatabaseId &&
-           buf->tag.relId.relId == RelationGetRelid(rel) &&
+       if (buf->tag.relId.relId == RelationGetRelid(rel) &&
+           (buf->tag.relId.dbId == MyDatabaseId ||
+            buf->tag.relId.dbId == (Oid) NULL) &&
            buf->tag.blockNum >= block)
        {
            if (buf->flags & BM_DIRTY)
            {
-               if (doFlush)
-               {
-                   SpinRelease(BufMgrLock);
-                   if (FlushBuffer(i+1, false) != STATUS_OK)
-                   {
-                       elog(NOTICE, "FlushRelationBuffers(%s, %u): block %u is dirty (private %ld, global %d), could not flush it",
-                            buf->sb_relname, block, buf->tag.blockNum,
-                            PrivateRefCount[i], buf->refcount);
-                       return -1;
-                   }
-                   SpinAcquire(BufMgrLock);
-               }
-               else
-               {
-                   SpinRelease(BufMgrLock);
+               PinBuffer(buf);
+               SpinRelease(BufMgrLock);
+               if (! expectDirty)
                    elog(NOTICE, "FlushRelationBuffers(%s, %u): block %u is dirty (private %ld, global %d)",
-                        buf->sb_relname, block, buf->tag.blockNum,
+                        RelationGetRelationName(rel), block,
+                        buf->tag.blockNum,
+                        PrivateRefCount[i], buf->refcount);
+               if (FlushBuffer(i+1, true) != STATUS_OK)
+               {
+                   elog(NOTICE, "FlushRelationBuffers(%s, %u): block %u is dirty (private %ld, global %d), could not flush it",
+                        RelationGetRelationName(rel), block,
+                        buf->tag.blockNum,
                         PrivateRefCount[i], buf->refcount);
                    return -1;
                }
+               SpinAcquire(BufMgrLock);
+               /* Buffer could already be reassigned, so must recheck
+                * whether it still belongs to rel before freeing it!
+                */
+               goto recheck;
            }
            if (!(buf->flags & BM_FREE))
            {
                SpinRelease(BufMgrLock);
                elog(NOTICE, "FlushRelationBuffers(%s, %u): block %u is referenced (private %ld, global %d)",
-                    buf->sb_relname, block, buf->tag.blockNum,
+                    RelationGetRelationName(rel), block,
+                    buf->tag.blockNum,
                     PrivateRefCount[i], buf->refcount);
                return -2;
            }
@@ -1755,11 +1951,6 @@ ReleaseBuffer(Buffer buffer)
            AddBufferToFreelist(bufHdr);
            bufHdr->flags |= BM_FREE;
        }
-       if (CommitInfoNeedsSave[buffer - 1])
-       {
-           bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
-           CommitInfoNeedsSave[buffer - 1] = 0;
-       }
        SpinRelease(BufMgrLock);
    }
 
@@ -1777,7 +1968,7 @@ IncrBufferRefCount_Debug(char *file, int line, Buffer buffer)
 
        fprintf(stderr, "PIN(Incr) %ld relname = %s, blockNum = %d, \
 refcount = %ld, file: %s, line: %d\n",
-               buffer, buf->sb_relname, buf->tag.blockNum,
+               buffer, buf->blind.relname, buf->tag.blockNum,
                PrivateRefCount[buffer - 1], file, line);
    }
 }
@@ -1795,7 +1986,7 @@ ReleaseBuffer_Debug(char *file, int line, Buffer buffer)
 
        fprintf(stderr, "UNPIN(Rel) %ld relname = %s, blockNum = %d, \
 refcount = %ld, file: %s, line: %d\n",
-               buffer, buf->sb_relname, buf->tag.blockNum,
+               buffer, buf->blind.relname, buf->tag.blockNum,
                PrivateRefCount[buffer - 1], file, line);
    }
 }
@@ -1822,7 +2013,7 @@ ReleaseAndReadBuffer_Debug(char *file,
 
        fprintf(stderr, "UNPIN(Rel&Rd) %ld relname = %s, blockNum = %d, \
 refcount = %ld, file: %s, line: %d\n",
-               buffer, buf->sb_relname, buf->tag.blockNum,
+               buffer, buf->blind.relname, buf->tag.blockNum,
                PrivateRefCount[buffer - 1], file, line);
    }
    if (ShowPinTrace && BufferIsLocal(buffer) && is_userbuffer(buffer))
@@ -1831,7 +2022,7 @@ refcount = %ld, file: %s, line: %d\n",
 
        fprintf(stderr, "PIN(Rel&Rd) %ld relname = %s, blockNum = %d, \
 refcount = %ld, file: %s, line: %d\n",
-               b, buf->sb_relname, buf->tag.blockNum,
+               b, buf->blind.relname, buf->tag.blockNum,
                PrivateRefCount[b - 1], file, line);
    }
    return b;
@@ -1983,11 +2174,43 @@ _bm_die(Oid dbId, Oid relId, int blkNo, int bufNo,
 
 #endif  /* BMTRACE */
 
+/*
+ * SetBufferCommitInfoNeedsSave
+ *
+ * Mark a buffer dirty when we have updated tuple commit-status bits in it.
+ *
+ * This is similar to WriteNoReleaseBuffer, except that we do not set
+ * SharedBufferChanged or BufferDirtiedByMe, because we have not made a
+ * critical change that has to be flushed to disk before xact commit --- the
+ * status-bit update could be redone by someone else just as easily.  The
+ * buffer will be marked dirty, but it will not be written to disk until
+ * there is another reason to write it.
+ *
+ * This routine might get called many times on the same page, if we are making
+ * the first scan after commit of an xact that added/deleted many tuples.
+ * So, be as quick as we can if the buffer is already dirty.
+ */
 void
 SetBufferCommitInfoNeedsSave(Buffer buffer)
 {
-   if (!BufferIsLocal(buffer))
-       CommitInfoNeedsSave[buffer - 1]++;
+   BufferDesc *bufHdr;
+
+   if (BufferIsLocal(buffer))
+       return;
+
+   if (BAD_BUFFER_ID(buffer))
+       return;
+
+   bufHdr = &BufferDescriptors[buffer - 1];
+
+   if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
+       (BM_DIRTY | BM_JUST_DIRTIED))
+   {
+       SpinAcquire(BufMgrLock);
+       Assert(bufHdr->refcount > 0);
+       bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+       SpinRelease(BufMgrLock);
+   }
 }
 
 void
@@ -2175,7 +2398,16 @@ static void StartBufferIO(BufferDesc *buf, bool forInput)
    Assert(!(buf->flags & BM_IO_IN_PROGRESS));
    buf->flags |= BM_IO_IN_PROGRESS;
 #ifdef HAS_TEST_AND_SET
-   Assert(S_LOCK_FREE(&(buf->io_in_progress_lock)))
+   /*
+    * There used to be
+    *
+    * Assert(S_LOCK_FREE(&(buf->io_in_progress_lock)));
+    *
+    * here, but that's wrong because of the way WaitIO works: someone else
+    * waiting for the I/O to complete will succeed in grabbing the lock for
+    * a few instructions, and if we context-swap back to here the Assert
+    * could fail.  Tiny window for failure, but I've seen it happen -- tgl
+    */
    S_LOCK(&(buf->io_in_progress_lock));
 #endif /* HAS_TEST_AND_SET */
    InProgressBuf = buf;
@@ -2217,7 +2449,7 @@ static void ContinueBufferIO(BufferDesc *buf, bool forInput)
    IsForInput = forInput;
 }
 
-extern void    InitBufferIO(void)
+void InitBufferIO(void)
 {
    InProgressBuf = (BufferDesc *)0;
 }
@@ -2229,7 +2461,7 @@ extern void   InitBufferIO(void)
  * set in case of output,this routine would kill all 
  * backends and reset postmaster.
  */
-extern void    AbortBufferIO(void)
+void AbortBufferIO(void)
 {
    BufferDesc *buf = InProgressBuf;
    if (buf)
@@ -2252,8 +2484,8 @@ extern void   AbortBufferIO(void)
            buf->flags |= BM_DIRTY;
        }
        buf->flags |= BM_IO_ERROR;
-       TerminateBufferIO(buf);
        buf->flags &= ~BM_IO_IN_PROGRESS;
+       TerminateBufferIO(buf);
        SpinRelease(BufMgrLock);
    }
 }
index 7b927886f55fc27bfa7bf69876d96be9335871fe..a77a16c2a0a53c4ce813630575c72eada37aa741 100644 (file)
@@ -9,7 +9,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/buffer/freelist.c,v 1.20 2000/01/26 05:56:52 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/buffer/freelist.c,v 1.21 2000/04/09 04:43:19 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -122,7 +122,7 @@ PinBuffer_Debug(char *file, int line, BufferDesc *buf)
 
        fprintf(stderr, "PIN(Pin) %ld relname = %s, blockNum = %d, \
 refcount = %ld, file: %s, line: %d\n",
-               buffer, buf->sb_relname, buf->tag.blockNum,
+               buffer, buf->blind.relname, buf->tag.blockNum,
                PrivateRefCount[buffer - 1], file, line);
    }
 }
@@ -168,7 +168,7 @@ UnpinBuffer_Debug(char *file, int line, BufferDesc *buf)
 
        fprintf(stderr, "UNPIN(Unpin) %ld relname = %s, blockNum = %d, \
 refcount = %ld, file: %s, line: %d\n",
-               buffer, buf->sb_relname, buf->tag.blockNum,
+               buffer, buf->blind.relname, buf->tag.blockNum,
                PrivateRefCount[buffer - 1], file, line);
    }
 }
@@ -304,7 +304,7 @@ PrintBufferFreeList()
        int         i = (buf - BufferDescriptors);
 
        printf("[%-2d] (%s, %d) flags=0x%x, refcnt=%d %ld, nxt=%ld prv=%ld)\n",
-              i, buf->sb_relname, buf->tag.blockNum,
+              i, buf->blind.relname, buf->tag.blockNum,
               buf->flags, buf->refcount, PrivateRefCount[i],
               buf->freeNext, buf->freePrev);
 
index b11e1d999cce29fe5d84a2f952a81cefdf15fe81..1fb753dbf3a393ff699aded42e0588058849355e 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/file/fd.c,v 1.54 2000/03/17 02:36:19 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/file/fd.c,v 1.55 2000/04/09 04:43:19 tgl Exp $
  *
  * NOTES:
  *
@@ -293,7 +293,7 @@ LruDelete(File file)
    vfdP->seekPos = (long) lseek(vfdP->fd, 0L, SEEK_CUR);
    Assert(vfdP->seekPos != -1);
 
-   /* if we have written to the file, sync it */
+   /* if we have written to the file, sync it before closing */
    if (vfdP->fdstate & FD_DIRTY)
    {
        returnValue = pg_fsync(vfdP->fd);
@@ -381,9 +381,6 @@ tryAgain:
            returnValue = lseek(vfdP->fd, vfdP->seekPos, SEEK_SET);
            Assert(returnValue != -1);
        }
-
-       /* Update state as appropriate for re-open (needed?) */
-       vfdP->fdstate &= ~FD_DIRTY;
    }
 
    /*
@@ -804,7 +801,7 @@ FileWrite(File file, char *buffer, int amount)
    if (returnCode > 0)
        VfdCache[file].seekPos += returnCode;
 
-   /* record the write */
+   /* mark the file as needing fsync */
    VfdCache[file].fdstate |= FD_DIRTY;
 
    return returnCode;
@@ -873,6 +870,35 @@ FileTruncate(File file, long offset)
    return returnCode;
 }
 
+/*
+ * FileSync --- if a file is marked as dirty, fsync it.
+ *
+ * The FD_DIRTY bit is slightly misnamed: it doesn't mean that we need to
+ * write the file, but that we *have* written it and need to execute an
+ * fsync() to ensure the changes are down on disk before we mark the current
+ * transaction committed.
+ *
+ * FD_DIRTY is set by FileWrite or by an explicit FileMarkDirty() call.
+ * It is cleared after successfully fsync'ing the file.  FileClose() will
+ * fsync a dirty File that is about to be closed, since there will be no
+ * other place to remember the need to fsync after the VFD is gone.
+ *
+ * Note that the DIRTY bit is logically associated with the actual disk file,
+ * not with any particular kernel FD we might have open for it.  We assume
+ * that fsync will force out any dirty buffers for that file, whether or not
+ * they were written through the FD being used for the fsync call --- they
+ * might even have been written by some other backend!
+ *
+ * Note also that LruDelete currently fsyncs a dirty file that it is about
+ * to close the kernel file descriptor for.  The idea there is to avoid
+ * having to re-open the kernel descriptor later.  But it's not real clear
+ * that this is a performance win; we could end up fsyncing the same file
+ * multiple times in a transaction, which would probably cost more time
+ * than is saved by avoiding an open() call.  This should be studied.
+ *
+ * This routine used to think it could skip the fsync if the file is
+ * physically closed, but that is now WRONG; see comments for FileMarkDirty.
+ */
 int
 FileSync(File file)
 {
@@ -880,23 +906,66 @@ FileSync(File file)
 
    Assert(FileIsValid(file));
 
-   /*
-    * If the file isn't open, then we don't need to sync it; we always
-    * sync files when we close them.  Also, if we haven't done any writes
-    * that we haven't already synced, we can ignore the request.
-    */
-
-   if (VfdCache[file].fd < 0 || !(VfdCache[file].fdstate & FD_DIRTY))
+   if (!(VfdCache[file].fdstate & FD_DIRTY))
+   {
+       /* Need not sync if file is not dirty. */
        returnCode = 0;
-   else
+   }
+   else if (disableFsync)
    {
-       returnCode = pg_fsync(VfdCache[file].fd);
+       /* Don't force the file open if pg_fsync isn't gonna sync it. */
+       returnCode = 0;
        VfdCache[file].fdstate &= ~FD_DIRTY;
    }
+   else 
+   {
+       /* We don't use FileAccess() because we don't want to force the
+        * file to the front of the LRU ring; we aren't expecting to
+        * access it again soon.
+        */
+       if (FileIsNotOpen(file))
+       {
+           returnCode = LruInsert(file);
+           if (returnCode != 0)
+               return returnCode;
+       }
+       returnCode = pg_fsync(VfdCache[file].fd);
+       if (returnCode == 0)
+           VfdCache[file].fdstate &= ~FD_DIRTY;
+   }
 
    return returnCode;
 }
 
+/*
+ * FileMarkDirty --- mark a file as needing fsync at transaction commit.
+ *
+ * Since FileWrite marks the file dirty, this routine is not needed in
+ * normal use.  It is called when the buffer manager detects that some other
+ * backend has written out a shared buffer that this backend dirtied (but
+ * didn't write) in the current xact.  In that scenario, we need to fsync
+ * the file before we can commit.  We cannot assume that the other backend
+ * has fsync'd the file yet; we need to do our own fsync to ensure that
+ * (a) the disk page is written and (b) this backend's commit is delayed
+ * until the write is complete.
+ *
+ * Note we are assuming that an fsync issued by this backend will write
+ * kernel disk buffers that were dirtied by another backend.  Furthermore,
+ * it doesn't matter whether we currently have the file physically open;
+ * we must fsync even if we have to re-open the file to do it.
+ */
+void
+FileMarkDirty(File file)
+{
+   Assert(FileIsValid(file));
+
+   DO_DB(elog(DEBUG, "FileMarkDirty: %d (%s)",
+              file, VfdCache[file].fileName));
+
+   VfdCache[file].fdstate |= FD_DIRTY;
+}
+
+
 /*
  * Routines that want to use stdio (ie, FILE*) should use AllocateFile
  * rather than plain fopen().  This lets fd.c deal with freeing FDs if
@@ -992,6 +1061,12 @@ closeAllVfds()
  * exit (it doesn't particularly care which).  All still-open temporary-file
  * VFDs are closed, which also causes the underlying files to be deleted.
  * Furthermore, all "allocated" stdio files are closed.
+ *
+ * This routine is not involved in fsync'ing non-temporary files at xact
+ * commit; that is done by FileSync under control of the buffer manager.
+ * During a commit, that is done *before* control gets here.  If we still
+ * have any needs-fsync bits set when we get here, we assume this is abort
+ * and clear them.
  */
 void
 AtEOXact_Files(void)
@@ -1006,6 +1081,8 @@ AtEOXact_Files(void)
            if ((VfdCache[i].fdstate & FD_TEMPORARY) &&
                VfdCache[i].fileName != NULL)
                FileClose(i);
+           else
+               VfdCache[i].fdstate &= ~FD_DIRTY;
        }
    }
 
index 7382b1867d1601b1bbb5808d4cf97ecce281632a..233bbb0ac25ef9a11630bdd0f5f820c9c2079d64 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/smgr/md.c,v 1.64 2000/02/07 02:38:18 inoue Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/smgr/md.c,v 1.65 2000/04/09 04:43:20 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 typedef struct _MdfdVec
 {
    int         mdfd_vfd;       /* fd number in vfd pool */
-   uint16      mdfd_flags;     /* clean, dirty, free */
+   int         mdfd_flags;     /* free, temporary */
+
+/* these are the assigned bits in mdfd_flags: */
+#define MDFD_FREE      (1 << 0)/* unused entry */
+#define MDFD_TEMP      (1 << 1)/* close this entry at transaction end */
+
    int         mdfd_lstbcnt;   /* most recent block count */
    int         mdfd_nextFree;  /* next free vector */
 #ifndef LET_OS_MANAGE_FILESIZE
@@ -62,13 +67,13 @@ static int  Md_Free = -1;       /* head of freelist of unused fdvec entries */
 static int CurFd = 0;          /* first never-used fdvec index */
 static MemoryContext MdCxt;        /* context for all my allocations */
 
-#define MDFD_DIRTY     (uint16) 0x01
-#define MDFD_FREE      (uint16) 0x02
-
 /* routines declared here */
+static void mdclose_fd(int fd);
 static int _mdfd_getrelnfd(Relation reln);
 static MdfdVec *_mdfd_openseg(Relation reln, int segno, int oflags);
 static MdfdVec *_mdfd_getseg(Relation reln, int blkno);
+static MdfdVec *_mdfd_blind_getseg(char *dbname, char *relname,
+                                  Oid dbid, Oid relid, int blkno);
 static int _fdvec_alloc(void);
 static void _fdvec_free(int);
 static BlockNumber _mdnblocks(File file, Size blcksz);
@@ -186,6 +191,8 @@ mdcreate(Relation reln)
 #endif
    Md_fdvec[vfd].mdfd_lstbcnt = 0;
 
+   pfree(path);
+
    return vfd;
 }
 
@@ -290,9 +297,6 @@ mdextend(Relation reln, char *buffer)
        return SM_FAIL;
    }
 
-   /* remember that we did a write, so we can sync at xact commit */
-   v->mdfd_flags |= MDFD_DIRTY;
-
    /* try to keep the last block count current, though it's just a hint */
 #ifndef LET_OS_MANAGE_FILESIZE
    if ((v->mdfd_lstbcnt = (++nblocks % RELSEG_SIZE)) == 0)
@@ -367,6 +371,8 @@ mdopen(Relation reln)
 #endif
 #endif
 
+   pfree(path);
+
    return vfd;
 }
 
@@ -382,13 +388,24 @@ int
 mdclose(Relation reln)
 {
    int         fd;
-   MdfdVec    *v;
-   MemoryContext oldcxt;
 
    fd = RelationGetFile(reln);
    if (fd < 0)
        return SM_SUCCESS;      /* already closed, so no work */
 
+   mdclose_fd(fd);
+
+   reln->rd_fd = -1;
+
+   return SM_SUCCESS;
+}
+
+static void
+mdclose_fd(int fd)
+{
+   MdfdVec    *v;
+   MemoryContext oldcxt;
+
    oldcxt = MemoryContextSwitchTo(MdCxt);
 #ifndef LET_OS_MANAGE_FILESIZE
    for (v = &Md_fdvec[fd]; v != (MdfdVec *) NULL;)
@@ -398,17 +415,14 @@ mdclose(Relation reln)
        /* if not closed already */
        if (v->mdfd_vfd >= 0)
        {
-
            /*
             * We sync the file descriptor so that we don't need to reopen
-            * it at transaction commit to force changes to disk.
+            * it at transaction commit to force changes to disk.  (This
+            * is not really optional, because we are about to forget that
+            * the file even exists...)
             */
-
            FileSync(v->mdfd_vfd);
            FileClose(v->mdfd_vfd);
-
-           /* mark this file descriptor as clean in our private table */
-           v->mdfd_flags &= ~MDFD_DIRTY;
        }
        /* Now free vector */
        v = v->mdfd_chain;
@@ -423,28 +437,20 @@ mdclose(Relation reln)
    {
        if (v->mdfd_vfd >= 0)
        {
-
            /*
             * We sync the file descriptor so that we don't need to reopen
-            * it at transaction commit to force changes to disk.
+            * it at transaction commit to force changes to disk.  (This
+            * is not really optional, because we are about to forget that
+            * the file even exists...)
             */
-
            FileSync(v->mdfd_vfd);
            FileClose(v->mdfd_vfd);
-
-           /* mark this file descriptor as clean in our private table */
-           v->mdfd_flags &= ~MDFD_DIRTY;
        }
    }
 #endif
    MemoryContextSwitchTo(oldcxt);
 
    _fdvec_free(fd);
-
-   /* be sure to mark relation closed */
-   reln->rd_fd = -1;
-
-   return SM_SUCCESS;
 }
 
 /*
@@ -521,8 +527,6 @@ mdwrite(Relation reln, BlockNumber blocknum, char *buffer)
    if (FileWrite(v->mdfd_vfd, buffer, BLCKSZ) != BLCKSZ)
        status = SM_FAIL;
 
-   v->mdfd_flags |= MDFD_DIRTY;
-
    return status;
 }
 
@@ -560,14 +564,6 @@ mdflush(Relation reln, BlockNumber blocknum, char *buffer)
        || FileSync(v->mdfd_vfd) < 0)
        status = SM_FAIL;
 
-   /*
-    * By here, the block is written and changes have been forced to
-    * stable storage.  Mark the descriptor as clean until the next write,
-    * so we don't sync it again unnecessarily at transaction commit.
-    */
-
-   v->mdfd_flags &= ~MDFD_DIRTY;
-
    return status;
 }
 
@@ -575,139 +571,87 @@ mdflush(Relation reln, BlockNumber blocknum, char *buffer)
  * mdblindwrt() -- Write a block to disk blind.
  *
  *     We have to be able to do this using only the name and OID of
- *     the database and relation in which the block belongs.  This
- *     is a synchronous write.
+ *     the database and relation in which the block belongs.  Otherwise
+ *     this is just like mdwrite().
  */
 int
-mdblindwrt(char *dbstr,
-          char *relstr,
+mdblindwrt(char *dbname,
+          char *relname,
           Oid dbid,
           Oid relid,
           BlockNumber blkno,
           char *buffer)
 {
-   int         fd;
-   int         segno;
-   long        seekpos;
    int         status;
-   char       *path;
-
-#ifndef LET_OS_MANAGE_FILESIZE
-   int         nchars;
-
-   /* be sure we have enough space for the '.segno', if any */
-   segno = blkno / RELSEG_SIZE;
-   if (segno > 0)
-       nchars = 10;
-   else
-       nchars = 0;
+   long        seekpos;
+   MdfdVec    *v;
 
-   /* construct the path to the file and open it */
-   /* system table? then put in system area... */
-   if (dbid == (Oid) 0)
-   {
-       path = (char *) palloc(strlen(DataDir) + sizeof(NameData) + 2 + nchars);
-       if (segno == 0)
-           sprintf(path, "%s/%s", DataDir, relstr);
-       else
-           sprintf(path, "%s/%s.%d", DataDir, relstr, segno);
-   }
-   /* user table? then put in user database area... */
-   else if (dbid == MyDatabaseId)
-   {
-       path = (char *) palloc(strlen(DatabasePath) + 2 * sizeof(NameData) + 2 + nchars);
-       if (segno == 0)
-           sprintf(path, "%s%c%s", DatabasePath, SEP_CHAR, relstr);
-       else
-           sprintf(path, "%s%c%s.%d", DatabasePath, SEP_CHAR, relstr, segno);
-   }
-   else
-/* this is work arround only !!! */
-   {
-       char        dbpath[MAXPGPATH];
-       Oid         id;
-       char       *tmpPath;
-
-       GetRawDatabaseInfo(dbstr, &id, dbpath);
-
-       if (id != dbid)
-           elog(FATAL, "mdblindwrt: oid of db %s is not %u", dbstr, dbid);
-       tmpPath = ExpandDatabasePath(dbpath);
-       if (tmpPath == NULL)
-           elog(FATAL, "mdblindwrt: can't expand path for db %s", dbstr);
-       path = (char *) palloc(strlen(tmpPath) + 2 * sizeof(NameData) + 2 + nchars);
-       if (segno == 0)
-           sprintf(path, "%s%c%s", tmpPath, SEP_CHAR, relstr);
-       else
-           sprintf(path, "%s%c%s.%d", tmpPath, SEP_CHAR, relstr, segno);
-       pfree(tmpPath);
-   }
-#else
-   /* construct the path to the file and open it */
-   /* system table? then put in system area... */
-   if (dbid == (Oid) 0)
-   {
-       path = (char *) palloc(strlen(DataDir) + sizeof(NameData) + 2);
-       sprintf(path, "%s/%s", DataDir, relstr);
-   }
-   /* user table? then put in user database area... */
-   else if (dbid == MyDatabaseId)
-   {
-       path = (char *) palloc(strlen(DatabasePath) + 2 * sizeof(NameData) + 2);
-       sprintf(path, "%s%c%s", DatabasePath, SEP_CHAR, relstr);
-   }
-   else
-/* this is work arround only !!! */
-   {
-       char        dbpath[MAXPGPATH];
-       Oid         id;
-       char       *tmpPath;
-
-       GetRawDatabaseInfo(dbstr, &id, dbpath);
-
-       if (id != dbid)
-           elog(FATAL, "mdblindwrt: oid of db %s is not %u", dbstr, dbid);
-       tmpPath = ExpandDatabasePath(dbpath);
-       if (tmpPath == NULL)
-           elog(FATAL, "mdblindwrt: can't expand path for db %s", dbstr);
-       path = (char *) palloc(strlen(tmpPath) + 2 * sizeof(NameData) + 2);
-       sprintf(path, "%s%c%s", tmpPath, SEP_CHAR, relstr);
-       pfree(tmpPath);
-   }
-#endif
+   v = _mdfd_blind_getseg(dbname, relname, dbid, relid, blkno);
 
-#ifndef __CYGWIN32__
-   if ((fd = open(path, O_RDWR, 0600)) < 0)
-#else
-   if ((fd = open(path, O_RDWR | O_BINARY, 0600)) < 0)
-#endif
+   if (v == NULL)
        return SM_FAIL;
 
-   /* seek to the right spot */
 #ifndef LET_OS_MANAGE_FILESIZE
    seekpos = (long) (BLCKSZ * (blkno % RELSEG_SIZE));
+#ifdef DIAGNOSTIC
+   if (seekpos >= BLCKSZ * RELSEG_SIZE)
+       elog(FATAL, "seekpos too big!");
+#endif
 #else
    seekpos = (long) (BLCKSZ * (blkno));
 #endif
 
-   if (lseek(fd, seekpos, SEEK_SET) != seekpos)
-   {
-       close(fd);
+   if (FileSeek(v->mdfd_vfd, seekpos, SEEK_SET) != seekpos)
        return SM_FAIL;
-   }
 
    status = SM_SUCCESS;
-
-   /* write and sync the block */
-   if (write(fd, buffer, BLCKSZ) != BLCKSZ || (pg_fsync(fd) < 0))
+   if (FileWrite(v->mdfd_vfd, buffer, BLCKSZ) != BLCKSZ)
        status = SM_FAIL;
 
-   if (close(fd) < 0)
-       status = SM_FAIL;
+   return status;
+}
 
-   pfree(path);
+/*
+ * mdmarkdirty() -- Mark the specified block "dirty" (ie, needs fsync).
+ *
+ *     Returns SM_SUCCESS or SM_FAIL.
+ */
+int
+mdmarkdirty(Relation reln, BlockNumber blkno)
+{
+   MdfdVec    *v;
 
-   return status;
+   v = _mdfd_getseg(reln, blkno);
+
+   FileMarkDirty(v->mdfd_vfd);
+
+   return SM_SUCCESS;
+}
+
+/*
+ * mdblindmarkdirty() -- Mark the specified block "dirty" (ie, needs fsync).
+ *
+ *     We have to be able to do this using only the name and OID of
+ *     the database and relation in which the block belongs.  Otherwise
+ *     this is just like mdmarkdirty().
+ */
+int
+mdblindmarkdirty(char *dbname,
+                char *relname,
+                Oid dbid,
+                Oid relid,
+                BlockNumber blkno)
+{
+   MdfdVec    *v;
+
+   v = _mdfd_blind_getseg(dbname, relname, dbid, relid, blkno);
+
+   if (v == NULL)
+       return SM_FAIL;
+
+   FileMarkDirty(v->mdfd_vfd);
+
+   return SM_SUCCESS;
 }
 
 /*
@@ -873,19 +817,26 @@ mdcommit()
 
    for (i = 0; i < CurFd; i++)
    {
+       v = &Md_fdvec[i];
+       if (v->mdfd_flags & MDFD_FREE)
+           continue;
+       if (v->mdfd_flags & MDFD_TEMP)
+       {
+           /* Sync and close the file */
+           mdclose_fd(i);
+       }
+       else
+       {
+           /* Sync, but keep the file entry */
+
 #ifndef LET_OS_MANAGE_FILESIZE
-       for (v = &Md_fdvec[i]; v != (MdfdVec *) NULL; v = v->mdfd_chain)
+           for ( ; v != (MdfdVec *) NULL; v = v->mdfd_chain)
 #else
-       v = &Md_fdvec[i];
-       if (v != (MdfdVec *) NULL)
+           if (v != (MdfdVec *) NULL)
 #endif
-       {
-           if (v->mdfd_flags & MDFD_DIRTY)
            {
                if (FileSync(v->mdfd_vfd) < 0)
                    return SM_FAIL;
-
-               v->mdfd_flags &= ~MDFD_DIRTY;
            }
        }
    }
@@ -908,13 +859,14 @@ mdabort()
 
    for (i = 0; i < CurFd; i++)
    {
-#ifndef LET_OS_MANAGE_FILESIZE
-       for (v = &Md_fdvec[i]; v != (MdfdVec *) NULL; v = v->mdfd_chain)
-           v->mdfd_flags &= ~MDFD_DIRTY;
-#else
        v = &Md_fdvec[i];
-       v->mdfd_flags &= ~MDFD_DIRTY;
-#endif
+       if (v->mdfd_flags & MDFD_FREE)
+           continue;
+       if (v->mdfd_flags & MDFD_TEMP)
+       {
+           /* Close the file */
+           mdclose_fd(i);
+       }
    }
 
    return SM_SUCCESS;
@@ -995,7 +947,6 @@ _fdvec_free(int fdvec)
    Md_fdvec[fdvec].mdfd_nextFree = Md_Free;
    Md_fdvec[fdvec].mdfd_flags = MDFD_FREE;
    Md_Free = fdvec;
-
 }
 
 static MdfdVec *
@@ -1004,19 +955,17 @@ _mdfd_openseg(Relation reln, int segno, int oflags)
    MemoryContext oldcxt;
    MdfdVec    *v;
    int         fd;
-   bool        dofree;
    char       *path,
               *fullpath;
 
    /* be sure we have enough space for the '.segno', if any */
    path = relpath(RelationGetPhysicalRelationName(reln));
 
-   dofree = false;
    if (segno > 0)
    {
-       dofree = true;
        fullpath = (char *) palloc(strlen(path) + 12);
        sprintf(fullpath, "%s.%d", path, segno);
+       pfree(path);
    }
    else
        fullpath = path;
@@ -1028,8 +977,7 @@ _mdfd_openseg(Relation reln, int segno, int oflags)
    fd = FileNameOpenFile(fullpath, O_RDWR | O_BINARY | oflags, 0600);
 #endif
 
-   if (dofree)
-       pfree(fullpath);
+   pfree(fullpath);
 
    if (fd < 0)
        return (MdfdVec *) NULL;
@@ -1109,6 +1057,104 @@ _mdfd_getseg(Relation reln, int blkno)
    return v;
 }
 
+/* Find the segment of the relation holding the specified block.
+ * This is the same as _mdfd_getseg() except that we must work
+ * "blind" with no Relation struct.
+ *
+ * NOTE: we have no easy way to tell whether a FD already exists for the
+ * target relation, so we always make a new one.  This should probably
+ * be improved somehow, but I doubt it's a significant performance issue
+ * under normal circumstances.  The FD is marked to be closed at end of xact
+ * so that we don't accumulate a lot of dead FDs.
+ */
+
+static MdfdVec *
+_mdfd_blind_getseg(char *dbname, char *relname, Oid dbid, Oid relid,
+                  int blkno)
+{
+   MdfdVec    *v;
+   char       *path;
+   int         fd;
+   int         vfd;
+#ifndef LET_OS_MANAGE_FILESIZE
+   int         segno;
+   int         targsegno;
+#endif
+
+   /* construct the path to the file and open it */
+   path = relpath_blind(dbname, relname, dbid, relid);
+
+#ifndef __CYGWIN32__
+   fd = FileNameOpenFile(path, O_RDWR, 0600);
+#else
+   fd = FileNameOpenFile(path, O_RDWR | O_BINARY, 0600);
+#endif
+
+   if (fd < 0)
+       return NULL;
+
+   vfd = _fdvec_alloc();
+   if (vfd < 0)
+       return NULL;
+
+   Md_fdvec[vfd].mdfd_vfd = fd;
+   Md_fdvec[vfd].mdfd_flags = MDFD_TEMP;
+   Md_fdvec[vfd].mdfd_lstbcnt = _mdnblocks(fd, BLCKSZ);
+#ifndef LET_OS_MANAGE_FILESIZE
+   Md_fdvec[vfd].mdfd_chain = (MdfdVec *) NULL;
+
+#ifdef DIAGNOSTIC
+   if (Md_fdvec[vfd].mdfd_lstbcnt > RELSEG_SIZE)
+       elog(FATAL, "segment too big on relopen!");
+#endif
+
+   targsegno = blkno / RELSEG_SIZE;
+   for (v = &Md_fdvec[vfd], segno = 1; segno <= targsegno; segno++)
+   {
+       char       *segpath;
+       MdfdVec    *newv;
+       MemoryContext oldcxt;
+
+       segpath = (char *) palloc(strlen(path) + 12);
+       sprintf(segpath, "%s.%d", path, segno);
+
+#ifndef __CYGWIN32__
+       fd = FileNameOpenFile(segpath, O_RDWR | O_CREAT, 0600);
+#else
+       fd = FileNameOpenFile(segpath, O_RDWR | O_BINARY | O_CREAT, 0600);
+#endif
+
+       pfree(segpath);
+
+       if (fd < 0)
+           return (MdfdVec *) NULL;
+
+       /* allocate an mdfdvec entry for it */
+       oldcxt = MemoryContextSwitchTo(MdCxt);
+       newv = (MdfdVec *) palloc(sizeof(MdfdVec));
+       MemoryContextSwitchTo(oldcxt);
+
+       /* fill the entry */
+       newv->mdfd_vfd = fd;
+       newv->mdfd_flags = MDFD_TEMP;
+       newv->mdfd_lstbcnt = _mdnblocks(fd, BLCKSZ);
+       newv->mdfd_chain = (MdfdVec *) NULL;
+#ifdef DIAGNOSTIC
+       if (newv->mdfd_lstbcnt > RELSEG_SIZE)
+           elog(FATAL, "segment too big on open!");
+#endif
+       v->mdfd_chain = newv;
+       v = newv;
+   }
+#else
+   v = &Md_fdvec[vfd];
+#endif
+
+   pfree(path);
+
+   return v;
+}
+
 static BlockNumber
 _mdnblocks(File file, Size blcksz)
 {
index f0c20f8219b2fa4c6b8e5dc6a38a80d5119f438d..839636b118b930a413e8d407c7a4eaa887627ff5 100644 (file)
@@ -11,7 +11,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.32 2000/01/26 05:57:05 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.33 2000/04/09 04:43:20 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -23,21 +23,30 @@ static void smgrshutdown(int dummy);
 
 typedef struct f_smgr
 {
-   int         (*smgr_init) ();/* may be NULL */
-   int         (*smgr_shutdown) ();    /* may be NULL */
-   int         (*smgr_create) ();
-   int         (*smgr_unlink) ();
-   int         (*smgr_extend) ();
-   int         (*smgr_open) ();
-   int         (*smgr_close) ();
-   int         (*smgr_read) ();
-   int         (*smgr_write) ();
-   int         (*smgr_flush) ();
-   int         (*smgr_blindwrt) ();
-   int         (*smgr_nblocks) ();
-   int         (*smgr_truncate) ();
-   int         (*smgr_commit) ();      /* may be NULL */
-   int         (*smgr_abort) ();       /* may be NULL */
+   int         (*smgr_init) (void); /* may be NULL */
+   int         (*smgr_shutdown) (void); /* may be NULL */
+   int         (*smgr_create) (Relation reln);
+   int         (*smgr_unlink) (Relation reln);
+   int         (*smgr_extend) (Relation reln, char *buffer);
+   int         (*smgr_open) (Relation reln);
+   int         (*smgr_close) (Relation reln);
+   int         (*smgr_read) (Relation reln, BlockNumber blocknum,
+                             char *buffer);
+   int         (*smgr_write) (Relation reln, BlockNumber blocknum,
+                              char *buffer);
+   int         (*smgr_flush) (Relation reln, BlockNumber blocknum,
+                              char *buffer);
+   int         (*smgr_blindwrt) (char *dbname, char *relname,
+                                 Oid dbid, Oid relid,
+                                 BlockNumber blkno, char *buffer);
+   int         (*smgr_markdirty) (Relation reln, BlockNumber blkno);
+   int         (*smgr_blindmarkdirty) (char *dbname, char *relname,
+                                       Oid dbid, Oid relid,
+                                       BlockNumber blkno);
+   int         (*smgr_nblocks) (Relation reln);
+   int         (*smgr_truncate) (Relation reln, int nblocks);
+   int         (*smgr_commit) (void); /* may be NULL */
+   int         (*smgr_abort) (void); /* may be NULL */
 } f_smgr;
 
 /*
@@ -49,14 +58,14 @@ static f_smgr smgrsw[] = {
 
    /* magnetic disk */
    {mdinit, NULL, mdcreate, mdunlink, mdextend, mdopen, mdclose,
-       mdread, mdwrite, mdflush, mdblindwrt, mdnblocks, mdtruncate,
-   mdcommit, mdabort},
+    mdread, mdwrite, mdflush, mdblindwrt, mdmarkdirty, mdblindmarkdirty,
+    mdnblocks, mdtruncate, mdcommit, mdabort},
 
 #ifdef STABLE_MEMORY_STORAGE
    /* main memory */
    {mminit, mmshutdown, mmcreate, mmunlink, mmextend, mmopen, mmclose,
-       mmread, mmwrite, mmflush, mmblindwrt, mmnblocks, NULL,
-   mmcommit, mmabort},
+    mmread, mmwrite, mmflush, mmblindwrt, mmmarkdirty, mmblindmarkdirty,
+    mmnblocks, NULL, mmcommit, mmabort},
 
 #endif
 };
@@ -299,6 +308,7 @@ smgrblindwrt(int16 which,
    char       *relstr;
    int         status;
 
+   /* strdup here is probably redundant */
    dbstr = pstrdup(dbname);
    relstr = pstrdup(relname);
 
@@ -315,6 +325,67 @@ smgrblindwrt(int16 which,
    return status;
 }
 
+/*
+ * smgrmarkdirty() -- Mark a page dirty (needs fsync).
+ *
+ *     Mark the specified page as needing to be fsync'd before commit.
+ *     Ordinarily, the storage manager will do this implicitly during
+ *     smgrwrite().  However, the buffer manager may discover that some
+ *     other backend has written a buffer that we dirtied in the current
+ *     transaction.  In that case, we still need to fsync the file to be
+ *     sure the page is down to disk before we commit.
+ */
+int
+smgrmarkdirty(int16 which,
+             Relation reln,
+             BlockNumber blkno)
+{
+   int         status;
+
+   status = (*(smgrsw[which].smgr_markdirty)) (reln, blkno);
+
+   if (status == SM_FAIL)
+       elog(ERROR, "cannot mark block %d of %s",
+            blkno, RelationGetRelationName(reln));
+
+   return status;
+}
+
+/*
+ * smgrblindmarkdirty() -- Mark a page dirty, "blind".
+ *
+ *     Just like smgrmarkdirty, except we don't have a reldesc.
+ */
+int
+smgrblindmarkdirty(int16 which,
+                  char *dbname,
+                  char *relname,
+                  Oid dbid,
+                  Oid relid,
+                  BlockNumber blkno)
+{
+   char       *dbstr;
+   char       *relstr;
+   int         status;
+
+   /* strdup here is probably redundant */
+   dbstr = pstrdup(dbname);
+   relstr = pstrdup(relname);
+
+   status = (*(smgrsw[which].smgr_blindmarkdirty)) (dbstr, relstr,
+                                                    dbid, relid,
+                                                    blkno);
+
+   if (status == SM_FAIL)
+       elog(ERROR, "cannot mark block %d of %s [%s] blind",
+            blkno, relstr, dbstr);
+
+   pfree(dbstr);
+   pfree(relstr);
+
+   return status;
+}
+
 /*
  * smgrnblocks() -- Calculate the number of POSTGRES blocks in the
  *                  supplied relation.
@@ -378,7 +449,6 @@ smgrcommit()
    return SM_SUCCESS;
 }
 
-#ifdef NOT_USED
 int
 smgrabort()
 {
@@ -396,8 +466,6 @@ smgrabort()
    return SM_SUCCESS;
 }
 
-#endif
-
 #ifdef NOT_USED
 bool
 smgriswo(int16 smgrno)
index 1d99fe567c89b7e70101286766913e2ddbb40716..8eaa1a9c8be83bfa8d92b36b2f420eccce0d2e37 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: catalog.h,v 1.10 2000/01/26 05:57:56 momjian Exp $
+ * $Id: catalog.h,v 1.11 2000/04/09 04:43:14 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -17,6 +17,8 @@
 #include "access/tupdesc.h"
 
 extern char *relpath(const char *relname);
+extern char *relpath_blind(const char *dbname, const char *relname,
+                          Oid dbid, Oid relid);
 extern bool IsSystemRelationName(const char *relname);
 extern bool IsSharedSystemRelationName(const char *relname);
 extern Oid newoid(void);
index f18322f169d06cad8fd6fc15a5cfeaabe43f3428..0c5b6d64c0c68da248609090ab8e65bee21d3b96 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: buf_internals.h,v 1.35 2000/01/26 05:58:32 momjian Exp $
+ * $Id: buf_internals.h,v 1.36 2000/04/09 04:43:18 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -61,6 +61,16 @@ typedef struct buftag
    (a)->relId = (xx_reln)->rd_lockInfo.lockRelId \
 )
 
+/* If we have to write a buffer "blind" (without a relcache entry),
+ * the BufferTag is not enough information.  BufferBlindId carries the
+ * additional information needed.
+ */
+typedef struct bufblindid
+{
+   char        dbname[NAMEDATALEN]; /* name of db in which buf belongs */
+   char        relname[NAMEDATALEN]; /* name of reln */
+} BufferBlindId;
+
 #define BAD_BUFFER_ID(bid) ((bid) < 1 || (bid) > NBuffers)
 #define INVALID_DESCRIPTOR (-3)
 
@@ -98,8 +108,7 @@ typedef struct sbufdesc
    bool        ri_lock;        /* read-intent lock */
    bool        w_lock;         /* context exclusively locked */
 
-   char        sb_dbname[NAMEDATALEN]; /* name of db in which buf belongs */
-   char        sb_relname[NAMEDATALEN];        /* name of reln */
+   BufferBlindId blind;        /* extra info to support blind write */
 } BufferDesc;
 
 /*
@@ -164,7 +173,9 @@ extern BufferDesc *BufferDescriptors;
 extern BufferBlock BufferBlocks;
 extern long *PrivateRefCount;
 extern bits8 *BufferLocks;
-extern long *CommitInfoNeedsSave;
+extern BufferTag *BufferTagLastDirtied;
+extern BufferBlindId *BufferBlindLastDirtied;
+extern bool *BufferDirtiedByMe;
 extern SPINLOCK BufMgrLock;
 
 /* localbuf.c */
index 9c83e04e2ed7ed850354240643511d44006b01c8..e852dfd0520f3b866744b0ded27c36e942e2dfc5 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: bufmgr.h,v 1.35 2000/03/31 02:43:30 tgl Exp $
+ * $Id: bufmgr.h,v 1.36 2000/04/09 04:43:18 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -164,7 +164,7 @@ extern int  FlushBuffer(Buffer buffer, bool release);
 extern void InitBufferPool(IPCKey key);
 extern void PrintBufferUsage(FILE *statfp);
 extern void ResetBufferUsage(void);
-extern void ResetBufferPool(void);
+extern void ResetBufferPool(bool isCommit);
 extern int BufferPoolCheckLeak(void);
 extern void FlushBufferPool(void);
 extern BlockNumber BufferGetBlockNumber(Buffer buffer);
index 74fdb94fa036035900efb005136b992469b0bddf..f82fc14cae91072bbf3a1825fce269585c6e73a9 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: fd.h,v 1.19 2000/01/26 05:58:32 momjian Exp $
+ * $Id: fd.h,v 1.20 2000/04/09 04:43:18 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -15,7 +15,7 @@
 /*
  * calls:
  *
- * File {Close, Read, Write, Seek, Tell, Sync}
+ * File {Close, Read, Write, Seek, Tell, MarkDirty, Sync}
  * {File Name Open, Allocate, Free} File
  *
  * These are NOT JUST RENAMINGS OF THE UNIX ROUTINES.
@@ -58,6 +58,7 @@ extern int    FileWrite(File file, char *buffer, int amount);
 extern long FileSeek(File file, long offset, int whence);
 extern int FileTruncate(File file, long offset);
 extern int FileSync(File file);
+extern void FileMarkDirty(File file);
 
 /* Operations that allow use of regular stdio --- USE WITH CAUTION */
 extern FILE *AllocateFile(char *name, char *mode);
index 2ef2467a098fbc35d85841b917df0cc92aa011d7..053a63196e5b23ce36e1ba4f09808294586d4331 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: smgr.h,v 1.17 2000/01/26 05:58:33 momjian Exp $
+ * $Id: smgr.h,v 1.18 2000/04/09 04:43:18 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -29,17 +29,23 @@ extern int  smgrunlink(int16 which, Relation reln);
 extern int smgrextend(int16 which, Relation reln, char *buffer);
 extern int smgropen(int16 which, Relation reln);
 extern int smgrclose(int16 which, Relation reln);
-extern int smgrread(int16 which, Relation reln, BlockNumber blocknum,
-        char *buffer);
-extern int smgrwrite(int16 which, Relation reln, BlockNumber blocknum,
-         char *buffer);
-extern int smgrflush(int16 which, Relation reln, BlockNumber blocknum,
-         char *buffer);
-extern int smgrblindwrt(int16 which, char *dbname, char *relname, Oid dbid,
-            Oid relid, BlockNumber blkno, char *buffer);
+extern int smgrread(int16 which, Relation reln, BlockNumber blocknum,
+                    char *buffer);
+extern int smgrwrite(int16 which, Relation reln, BlockNumber blocknum,
+                     char *buffer);
+extern int smgrflush(int16 which, Relation reln, BlockNumber blocknum,
+                     char *buffer);
+extern int smgrblindwrt(int16 which, char *dbname, char *relname,
+                        Oid dbid, Oid relid,
+                        BlockNumber blkno, char *buffer);
+extern int smgrmarkdirty(int16 which, Relation reln, BlockNumber blkno);
+extern int smgrblindmarkdirty(int16 which, char *dbname, char *relname,
+                              Oid dbid, Oid relid,
+                              BlockNumber blkno);
 extern int smgrnblocks(int16 which, Relation reln);
 extern int smgrtruncate(int16 which, Relation reln, int nblocks);
 extern int smgrcommit(void);
+extern int smgrabort(void);
 
 
 
@@ -55,8 +61,11 @@ extern int   mdclose(Relation reln);
 extern int mdread(Relation reln, BlockNumber blocknum, char *buffer);
 extern int mdwrite(Relation reln, BlockNumber blocknum, char *buffer);
 extern int mdflush(Relation reln, BlockNumber blocknum, char *buffer);
-extern int mdblindwrt(char *dbstr, char *relstr, Oid dbid, Oid relid,
-          BlockNumber blkno, char *buffer);
+extern int mdblindwrt(char *dbname, char *relname, Oid dbid, Oid relid,
+                      BlockNumber blkno, char *buffer);
+extern int mdmarkdirty(Relation reln, BlockNumber blkno);
+extern int mdblindmarkdirty(char *dbname, char *relname, Oid dbid, Oid relid,
+                            BlockNumber blkno);
 extern int mdnblocks(Relation reln);
 extern int mdtruncate(Relation reln, int nblocks);
 extern int mdcommit(void);
@@ -66,7 +75,6 @@ extern int    mdabort(void);
 extern SPINLOCK MMCacheLock;
 
 extern int mminit(void);
-extern int mmshutdown(void);
 extern int mmcreate(Relation reln);
 extern int mmunlink(Relation reln);
 extern int mmextend(Relation reln, char *buffer);
@@ -75,11 +83,17 @@ extern int  mmclose(Relation reln);
 extern int mmread(Relation reln, BlockNumber blocknum, char *buffer);
 extern int mmwrite(Relation reln, BlockNumber blocknum, char *buffer);
 extern int mmflush(Relation reln, BlockNumber blocknum, char *buffer);
-extern int mmblindwrt(char *dbstr, char *relstr, Oid dbid, Oid relid,
-          BlockNumber blkno, char *buffer);
+extern int mmblindwrt(char *dbname, char *relname, Oid dbid, Oid relid,
+                      BlockNumber blkno, char *buffer);
+extern int mmmarkdirty(Relation reln, BlockNumber blkno);
+extern int mmblindmarkdirty(char *dbname, char *relname, Oid dbid, Oid relid,
+                            BlockNumber blkno);
 extern int mmnblocks(Relation reln);
+extern int mmtruncate(Relation reln, int nblocks);
 extern int mmcommit(void);
 extern int mmabort(void);
+
+extern int mmshutdown(void);
 extern int MMShmemSize(void);
 
 /* smgrtype.c */