pgstat_report_wait_end();
pgstat_progress_end_command();
- /* Clean up buffer I/O and buffer context locks, too */
- AbortBufferIO();
+ /* Clean up buffer context locks, too */
UnlockBuffers();
/* Reset WAL record construction state */
pgstat_report_wait_end();
pgstat_progress_end_command();
- AbortBufferIO();
UnlockBuffers();
/* Reset WAL record construction state */
*/
LWLockReleaseAll();
pgstat_report_wait_end();
- AbortBufferIO();
UnlockBuffers();
/* this is probably dead code, but let's be safe: */
if (AuxProcessResourceOwner)
*/
LWLockReleaseAll();
ConditionVariableCancelSleep();
- AbortBufferIO();
UnlockBuffers();
ReleaseAuxProcessResources(false);
AtEOXact_Buffers(false);
LWLockReleaseAll();
ConditionVariableCancelSleep();
pgstat_report_wait_end();
- AbortBufferIO();
UnlockBuffers();
ReleaseAuxProcessResources(false);
AtEOXact_Buffers(false);
LWLockReleaseAll();
ConditionVariableCancelSleep();
pgstat_report_wait_end();
- AbortBufferIO();
UnlockBuffers();
ReleaseAuxProcessResources(false);
AtEOXact_Buffers(false);
int bgwriter_flush_after = DEFAULT_BGWRITER_FLUSH_AFTER;
int backend_flush_after = DEFAULT_BACKEND_FLUSH_AFTER;
-/* local state for StartBufferIO and related functions */
-static BufferDesc *InProgressBuf = NULL;
-static bool IsForInput;
-
/* local state for LockBufferForCleanup */
static BufferDesc *PinCountWaitBuf = NULL;
static void
AtProcExit_Buffers(int code, Datum arg)
{
- AbortBufferIO();
UnlockBuffers();
CheckForBufferLeaks();
{
uint32 buf_state;
- Assert(!InProgressBuf);
+ ResourceOwnerEnlargeBufferIOs(CurrentResourceOwner);
for (;;)
{
buf_state |= BM_IO_IN_PROGRESS;
UnlockBufHdr(buf, buf_state);
- InProgressBuf = buf;
- IsForInput = forInput;
+ ResourceOwnerRememberBufferIO(CurrentResourceOwner,
+ BufferDescriptorGetBuffer(buf));
return true;
}
{
uint32 buf_state;
- Assert(buf == InProgressBuf);
-
buf_state = LockBufHdr(buf);
Assert(buf_state & BM_IO_IN_PROGRESS);
buf_state |= set_flag_bits;
UnlockBufHdr(buf, buf_state);
- InProgressBuf = NULL;
+ ResourceOwnerForgetBufferIO(CurrentResourceOwner,
+ BufferDescriptorGetBuffer(buf));
ConditionVariableBroadcast(BufferDescriptorGetIOCV(buf));
}
/*
- * AbortBufferIO: Clean up any active buffer I/O after an error.
+ * AbortBufferIO: Clean up active buffer I/O after an error.
*
* All LWLocks we might have held have been released,
* but we haven't yet released buffer pins, so the buffer is still pinned.
* possible the error condition wasn't related to the I/O.
*/
void
-AbortBufferIO(void)
+AbortBufferIO(Buffer buf)
{
- BufferDesc *buf = InProgressBuf;
+ BufferDesc *buf_hdr = GetBufferDescriptor(buf - 1);
+ uint32 buf_state;
- if (buf)
- {
- uint32 buf_state;
+ buf_state = LockBufHdr(buf_hdr);
+ Assert(buf_state & (BM_IO_IN_PROGRESS | BM_TAG_VALID));
- buf_state = LockBufHdr(buf);
- Assert(buf_state & BM_IO_IN_PROGRESS);
- if (IsForInput)
- {
- Assert(!(buf_state & BM_DIRTY));
+ if (!(buf_state & BM_VALID))
+ {
+ Assert(!(buf_state & BM_DIRTY));
+ UnlockBufHdr(buf_hdr, buf_state);
+ }
+ else
+ {
+ Assert(!(buf_state & BM_DIRTY));
+ UnlockBufHdr(buf_hdr, buf_state);
- /* We'd better not think buffer is valid yet */
- Assert(!(buf_state & BM_VALID));
- UnlockBufHdr(buf, buf_state);
- }
- else
+ /* Issue notice if this is not the first failure... */
+ if (buf_state & BM_IO_ERROR)
{
- Assert(buf_state & BM_DIRTY);
- UnlockBufHdr(buf, buf_state);
- /* Issue notice if this is not the first failure... */
- if (buf_state & BM_IO_ERROR)
- {
- /* Buffer is pinned, so we can read tag without spinlock */
- char *path;
-
- path = relpathperm(BufTagGetRelFileLocator(&buf->tag),
- BufTagGetForkNum(&buf->tag));
- ereport(WARNING,
- (errcode(ERRCODE_IO_ERROR),
- errmsg("could not write block %u of %s",
- buf->tag.blockNum, path),
- errdetail("Multiple failures --- write error might be permanent.")));
- pfree(path);
- }
+ /* Buffer is pinned, so we can read tag without spinlock */
+ char *path;
+
+ path = relpathperm(BufTagGetRelFileLocator(&buf_hdr->tag),
+ BufTagGetForkNum(&buf_hdr->tag));
+ ereport(WARNING,
+ (errcode(ERRCODE_IO_ERROR),
+ errmsg("could not write block %u of %s",
+ buf_hdr->tag.blockNum, path),
+ errdetail("Multiple failures --- write error might be permanent.")));
+ pfree(path);
}
- TerminateBufferIO(buf, false, BM_IO_ERROR);
}
+
+ TerminateBufferIO(buf_hdr, false, BM_IO_ERROR);
}
/*
/* We have built-in support for remembering: */
ResourceArray bufferarr; /* owned buffers */
+ ResourceArray bufferioarr; /* in-progress buffer IO */
ResourceArray catrefarr; /* catcache references */
ResourceArray catlistrefarr; /* catcache-list pins */
ResourceArray relrefarr; /* relcache references */
}
ResourceArrayInit(&(owner->bufferarr), BufferGetDatum(InvalidBuffer));
+ ResourceArrayInit(&(owner->bufferioarr), BufferGetDatum(InvalidBuffer));
ResourceArrayInit(&(owner->catrefarr), PointerGetDatum(NULL));
ResourceArrayInit(&(owner->catlistrefarr), PointerGetDatum(NULL));
ResourceArrayInit(&(owner->relrefarr), PointerGetDatum(NULL));
if (phase == RESOURCE_RELEASE_BEFORE_LOCKS)
{
+ /*
+ * Abort failed buffer IO. AbortBufferIO()->TerminateBufferIO() calls
+ * ResourceOwnerForgetBufferIOs(), so we just have to iterate till
+ * there are none.
+ *
+ * Needs to be before we release buffer pins.
+ *
+ * During a commit, there shouldn't be any in-progress IO.
+ */
+ while (ResourceArrayGetAny(&(owner->bufferioarr), &foundres))
+ {
+ Buffer res = DatumGetBuffer(foundres);
+
+ if (isCommit)
+ elog(PANIC, "lost track of buffer IO on buffer %u", res);
+ AbortBufferIO(res);
+ }
+
/*
* Release buffer pins. Note that ReleaseBuffer will remove the
* buffer entry from our array, so we just have to iterate till there
/* And it better not own any resources, either */
Assert(owner->bufferarr.nitems == 0);
+ Assert(owner->bufferioarr.nitems == 0);
Assert(owner->catrefarr.nitems == 0);
Assert(owner->catlistrefarr.nitems == 0);
Assert(owner->relrefarr.nitems == 0);
/* And free the object. */
ResourceArrayFree(&(owner->bufferarr));
+ ResourceArrayFree(&(owner->bufferioarr));
ResourceArrayFree(&(owner->catrefarr));
ResourceArrayFree(&(owner->catlistrefarr));
ResourceArrayFree(&(owner->relrefarr));
buffer, owner->name);
}
+
+/*
+ * Make sure there is room for at least one more entry in a ResourceOwner's
+ * buffer array.
+ *
+ * This is separate from actually inserting an entry because if we run out
+ * of memory, it's critical to do so *before* acquiring the resource.
+ */
+void
+ResourceOwnerEnlargeBufferIOs(ResourceOwner owner)
+{
+ /* We used to allow pinning buffers without a resowner, but no more */
+ Assert(owner != NULL);
+ ResourceArrayEnlarge(&(owner->bufferioarr));
+}
+
+/*
+ * Remember that a buffer IO is owned by a ResourceOwner
+ *
+ * Caller must have previously done ResourceOwnerEnlargeBufferIOs()
+ */
+void
+ResourceOwnerRememberBufferIO(ResourceOwner owner, Buffer buffer)
+{
+ ResourceArrayAdd(&(owner->bufferioarr), BufferGetDatum(buffer));
+}
+
+/*
+ * Forget that a buffer IO is owned by a ResourceOwner
+ */
+void
+ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer)
+{
+ if (!ResourceArrayRemove(&(owner->bufferioarr), BufferGetDatum(buffer)))
+ elog(PANIC, "buffer IO %d is not owned by resource owner %s",
+ buffer, owner->name);
+}
+
/*
* Remember that a Local Lock is owned by a ResourceOwner
*
extern bool IsBufferCleanupOK(Buffer buffer);
extern bool HoldingBufferPinThatDelaysRecovery(void);
-extern void AbortBufferIO(void);
+extern void AbortBufferIO(Buffer buffer);
extern bool BgBufferSync(struct WritebackContext *wb_context);
extern void ResourceOwnerRememberBuffer(ResourceOwner owner, Buffer buffer);
extern void ResourceOwnerForgetBuffer(ResourceOwner owner, Buffer buffer);
+/* support for IO-in-progress management */
+extern void ResourceOwnerEnlargeBufferIOs(ResourceOwner owner);
+extern void ResourceOwnerRememberBufferIO(ResourceOwner owner, Buffer buffer);
+extern void ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer);
+
/* support for local lock management */
extern void ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCK *locallock);
extern void ResourceOwnerForgetLock(ResourceOwner owner, LOCALLOCK *locallock);