*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.203 2005/11/22 18:17:06 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.204 2005/11/26 03:03:07 tgl Exp $
*
*
* INTERFACE ROUTINES
*/
scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
+ scan->rs_inited = false;
scan->rs_ctup.t_data = NULL;
+ ItemPointerSetInvalid(&scan->rs_ctup.t_self);
scan->rs_cbuf = InvalidBuffer;
+ scan->rs_cblock = InvalidBlockNumber;
/* we don't have a marked position... */
ItemPointerSetInvalid(&(scan->rs_mctid));
+ /* page-at-a-time fields are always invalid when not rs_inited */
+
/*
* copy the scan key, if appropriate
*/
pgstat_count_heap_scan(&scan->rs_pgstat_info);
}
-/* ----------------
- * heapgettup - fetch next heap tuple
- *
- * routine used by heap_getnext() which does most of the
- * real work in scanning tuples.
+/*
+ * heapgetpage - subroutine for heapgettup()
*
- * The passed-in *buffer must be either InvalidBuffer or the pinned
- * current page of the scan. If we have to move to another page,
- * we will unpin this buffer (if valid). On return, *buffer is either
- * InvalidBuffer or the ID of a pinned buffer.
- * ----------------
+ * This routine reads and pins the specified page of the relation.
+ * In page-at-a-time mode it performs additional work, namely determining
+ * which tuples on the page are visible.
*/
static void
-heapgettup(Relation relation,
- int dir,
- HeapTuple tuple,
- Buffer *buffer,
- Snapshot snapshot,
- int nkeys,
- ScanKey key,
- BlockNumber pages)
+heapgetpage(HeapScanDesc scan, BlockNumber page)
{
- ItemId lpp;
+ Buffer buffer;
+ Snapshot snapshot;
Page dp;
- BlockNumber page;
int lines;
+ int ntup;
OffsetNumber lineoff;
- int linesleft;
- ItemPointer tid;
+ ItemId lpp;
+
+ Assert(page < scan->rs_nblocks);
- tid = (tuple->t_data == NULL) ? NULL : &(tuple->t_self);
+ scan->rs_cbuf = ReleaseAndReadBuffer(scan->rs_cbuf,
+ scan->rs_rd,
+ page);
+ scan->rs_cblock = page;
+
+ if (!scan->rs_pageatatime)
+ return;
+
+ buffer = scan->rs_cbuf;
+ snapshot = scan->rs_snapshot;
/*
- * debugging stuff
- *
- * check validity of arguments, here and for other functions too
- *
- * Note: no locking manipulations needed--this is a local function
+ * We must hold share lock on the buffer content while examining
+ * tuple visibility. Afterwards, however, the tuples we have found
+ * to be visible are guaranteed good as long as we hold the buffer pin.
*/
-#ifdef HEAPDEBUGALL
- if (ItemPointerIsValid(tid))
- elog(DEBUG2, "heapgettup(%s, tid=0x%x[%d,%d], dir=%d, ...)",
- RelationGetRelationName(relation), tid, tid->ip_blkid,
- tid->ip_posid, dir);
- else
- elog(DEBUG2, "heapgettup(%s, tid=0x%x, dir=%d, ...)",
- RelationGetRelationName(relation), tid, dir);
-
- elog(DEBUG2, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x", buffer, nkeys, key);
+ LockBuffer(buffer, BUFFER_LOCK_SHARE);
- elog(DEBUG2, "heapgettup: relation(%c)=`%s', %p",
- relation->rd_rel->relkind, RelationGetRelationName(relation),
- snapshot);
-#endif /* HEAPDEBUGALL */
+ dp = (Page) BufferGetPage(buffer);
+ lines = PageGetMaxOffsetNumber(dp);
+ ntup = 0;
- if (!ItemPointerIsValid(tid))
+ for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
+ lineoff <= lines;
+ lineoff++, lpp++)
{
- Assert(!PointerIsValid(tid));
- tid = NULL;
+ if (ItemIdIsUsed(lpp))
+ {
+ HeapTupleData loctup;
+ bool valid;
+
+ loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
+ loctup.t_len = ItemIdGetLength(lpp);
+ ItemPointerSet(&(loctup.t_self), page, lineoff);
+
+ valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
+ if (valid)
+ scan->rs_vistuples[ntup++] = lineoff;
+ }
}
- tuple->t_tableOid = RelationGetRelid(relation);
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- /*
- * return null immediately if relation is empty
- */
- if (pages == 0)
+ Assert(ntup <= MaxHeapTuplesPerPage);
+ scan->rs_ntuples = ntup;
+}
+
+/* ----------------
+ * heapgettup - fetch next heap tuple
+ *
+ * Initialize the scan if not already done; then advance to the next
+ * tuple as indicated by "dir"; return the next tuple in scan->rs_ctup,
+ * or set scan->rs_ctup.t_data = NULL if no more tuples.
+ *
+ * dir == 0 means "re-fetch the tuple indicated by scan->rs_ctup".
+ *
+ * Note: the reason nkeys/key are passed separately, even though they are
+ * kept in the scan descriptor, is that the caller may not want us to check
+ * the scankeys.
+ *
+ * Note: when we fall off the end of the scan in either direction, we
+ * reset rs_inited. This means that a further request with the same
+ * scan direction will restart the scan, which is a bit odd, but a
+ * request with the opposite scan direction will start a fresh scan
+ * in the proper direction. The latter is required behavior for cursors,
+ * while the former case is generally undefined behavior in Postgres
+ * so we don't care too much.
+ * ----------------
+ */
+static void
+heapgettup(HeapScanDesc scan,
+ int dir,
+ int nkeys,
+ ScanKey key)
+{
+ HeapTuple tuple = &(scan->rs_ctup);
+ ItemPointer tid = &(tuple->t_self);
+ Snapshot snapshot = scan->rs_snapshot;
+ BlockNumber pages = scan->rs_nblocks;
+ BlockNumber page;
+ Page dp;
+ int lines;
+ OffsetNumber lineoff;
+ int linesleft;
+ ItemId lpp;
+
+ if (!scan->rs_inited)
{
- if (BufferIsValid(*buffer))
- ReleaseBuffer(*buffer);
- *buffer = InvalidBuffer;
- tuple->t_data = NULL;
- return;
+ /*
+ * return null immediately if relation is empty
+ */
+ if (pages == 0)
+ {
+ Assert(!BufferIsValid(scan->rs_cbuf));
+ tuple->t_data = NULL;
+ return;
+ }
+ }
+ else
+ {
+ /* resuming scan from tuple indicated by scan->rs_ctup.t_self */
+ Assert(ItemPointerIsValid(tid));
}
/*
if (dir == 0)
{
/*
- * ``no movement'' scan direction: refetch same tuple
+ * ``no movement'' scan direction: refetch prior tuple
*/
- if (tid == NULL)
+ if (!scan->rs_inited)
{
- if (BufferIsValid(*buffer))
- ReleaseBuffer(*buffer);
- *buffer = InvalidBuffer;
+ Assert(!BufferIsValid(scan->rs_cbuf));
tuple->t_data = NULL;
return;
}
- *buffer = ReleaseAndReadBuffer(*buffer,
- relation,
- ItemPointerGetBlockNumber(tid));
-
- LockBuffer(*buffer, BUFFER_LOCK_SHARE);
+ page = ItemPointerGetBlockNumber(tid);
+ if (page != scan->rs_cblock)
+ heapgetpage(scan, page);
- dp = (Page) BufferGetPage(*buffer);
+ /* Since the tuple was previously fetched, needn't lock page here */
+ dp = (Page) BufferGetPage(scan->rs_cbuf);
lineoff = ItemPointerGetOffsetNumber(tid);
lpp = PageGetItemId(dp, lineoff);
tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
tuple->t_len = ItemIdGetLength(lpp);
- LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
return;
}
/*
* reverse scan direction
*/
- if (tid == NULL)
- {
+ if (!scan->rs_inited)
page = pages - 1; /* final page */
- }
else
- {
page = ItemPointerGetBlockNumber(tid); /* current page */
- }
-
- Assert(page < pages);
- *buffer = ReleaseAndReadBuffer(*buffer,
- relation,
- page);
+ if (page != scan->rs_cblock)
+ heapgetpage(scan, page);
- LockBuffer(*buffer, BUFFER_LOCK_SHARE);
+ LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
- dp = (Page) BufferGetPage(*buffer);
+ dp = (Page) BufferGetPage(scan->rs_cbuf);
lines = PageGetMaxOffsetNumber(dp);
- if (tid == NULL)
+
+ if (!scan->rs_inited)
{
lineoff = lines; /* final offnum */
+ scan->rs_inited = true;
}
else
{
/*
* forward scan direction
*/
- if (tid == NULL)
+ if (!scan->rs_inited)
{
page = 0; /* first page */
lineoff = FirstOffsetNumber; /* first offnum */
+ scan->rs_inited = true;
}
else
{
OffsetNumberNext(ItemPointerGetOffsetNumber(tid));
}
- Assert(page < pages);
-
- *buffer = ReleaseAndReadBuffer(*buffer,
- relation,
- page);
+ if (page != scan->rs_cblock)
+ heapgetpage(scan, page);
- LockBuffer(*buffer, BUFFER_LOCK_SHARE);
+ LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
- dp = (Page) BufferGetPage(*buffer);
+ dp = (Page) BufferGetPage(scan->rs_cbuf);
lines = PageGetMaxOffsetNumber(dp);
/* page and lineoff now reference the physically next tid */
}
/* 'dir' is now non-zero */
/*
- * calculate line pointer and number of remaining items to check on this
- * page.
+ * calculate number of remaining items to check on this page
*/
- lpp = PageGetItemId(dp, lineoff);
if (dir < 0)
- linesleft = lineoff - 1;
+ linesleft = lineoff;
else
- linesleft = lines - lineoff;
+ linesleft = lines - lineoff + 1;
/*
* advance the scan until we find a qualifying tuple or run out of stuff
* to scan
*/
+ lpp = PageGetItemId(dp, lineoff);
for (;;)
{
- while (linesleft >= 0)
+ while (linesleft > 0)
{
if (ItemIdIsUsed(lpp))
{
/*
* if current tuple qualifies, return it.
*/
- HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp,
- snapshot, nkeys, key, valid);
+ valid = HeapTupleSatisfiesVisibility(tuple,
+ snapshot,
+ scan->rs_cbuf);
+
+ if (valid && key != NULL)
+ HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
+ nkeys, key, valid);
+
if (valid)
{
- LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+ LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
return;
}
}
* if we get here, it means we've exhausted the items on this page and
* it's time to move to the next.
*/
- LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+ LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
/*
* return NULL if we've exhausted all the pages
*/
if ((dir < 0) ? (page == 0) : (page + 1 >= pages))
{
- if (BufferIsValid(*buffer))
- ReleaseBuffer(*buffer);
- *buffer = InvalidBuffer;
+ if (BufferIsValid(scan->rs_cbuf))
+ ReleaseBuffer(scan->rs_cbuf);
+ scan->rs_cbuf = InvalidBuffer;
+ scan->rs_cblock = InvalidBlockNumber;
tuple->t_data = NULL;
+ scan->rs_inited = false;
return;
}
page = (dir < 0) ? (page - 1) : (page + 1);
- Assert(page < pages);
+ heapgetpage(scan, page);
- *buffer = ReleaseAndReadBuffer(*buffer,
- relation,
- page);
+ LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
- LockBuffer(*buffer, BUFFER_LOCK_SHARE);
- dp = (Page) BufferGetPage(*buffer);
+ dp = (Page) BufferGetPage(scan->rs_cbuf);
lines = PageGetMaxOffsetNumber((Page) dp);
- linesleft = lines - 1;
+ linesleft = lines;
if (dir < 0)
{
lineoff = lines;
}
}
+/* ----------------
+ * heapgettup_pagemode - fetch next heap tuple in page-at-a-time mode
+ *
+ * Same API as heapgettup, but used in page-at-a-time mode
+ *
+ * The internal logic is much the same as heapgettup's too, but there are some
+ * differences: we do not take the buffer content lock (that only needs to
+ * happen inside heapgetpage), and we iterate through just the tuples listed
+ * in rs_vistuples[] rather than all tuples on the page. Notice that
+ * lineindex is 0-based, where the corresponding loop variable lineoff in
+ * heapgettup is 1-based.
+ * ----------------
+ */
+static void
+heapgettup_pagemode(HeapScanDesc scan,
+ int dir,
+ int nkeys,
+ ScanKey key)
+{
+ HeapTuple tuple = &(scan->rs_ctup);
+ ItemPointer tid = &(tuple->t_self);
+ BlockNumber pages = scan->rs_nblocks;
+ BlockNumber page;
+ Page dp;
+ int lines;
+ int lineindex;
+ OffsetNumber lineoff;
+ int linesleft;
+ ItemId lpp;
+
+ if (!scan->rs_inited)
+ {
+ /*
+ * return null immediately if relation is empty
+ */
+ if (pages == 0)
+ {
+ Assert(!BufferIsValid(scan->rs_cbuf));
+ tuple->t_data = NULL;
+ return;
+ }
+ }
+ else
+ {
+ /* resuming scan from tuple indicated by scan->rs_ctup.t_self */
+ Assert(ItemPointerIsValid(tid));
+ }
+
+ /*
+ * calculate next starting lineindex, given scan direction
+ */
+ if (dir == 0)
+ {
+ /*
+ * ``no movement'' scan direction: refetch prior tuple
+ */
+ if (!scan->rs_inited)
+ {
+ Assert(!BufferIsValid(scan->rs_cbuf));
+ tuple->t_data = NULL;
+ return;
+ }
+
+ page = ItemPointerGetBlockNumber(tid);
+ if (page != scan->rs_cblock)
+ heapgetpage(scan, page);
+
+ /* Since the tuple was previously fetched, needn't lock page here */
+ dp = (Page) BufferGetPage(scan->rs_cbuf);
+ lineoff = ItemPointerGetOffsetNumber(tid);
+ lpp = PageGetItemId(dp, lineoff);
+
+ tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
+ tuple->t_len = ItemIdGetLength(lpp);
+
+ /* check that rs_cindex is in sync */
+ Assert(scan->rs_cindex < scan->rs_ntuples);
+ Assert(lineoff == scan->rs_vistuples[scan->rs_cindex]);
+
+ return;
+ }
+ else if (dir < 0)
+ {
+ /*
+ * reverse scan direction
+ */
+ if (!scan->rs_inited)
+ page = pages - 1; /* final page */
+ else
+ page = ItemPointerGetBlockNumber(tid); /* current page */
+
+ if (page != scan->rs_cblock)
+ heapgetpage(scan, page);
+
+ dp = (Page) BufferGetPage(scan->rs_cbuf);
+ lines = scan->rs_ntuples;
+
+ if (!scan->rs_inited)
+ {
+ lineindex = lines - 1;
+ scan->rs_inited = true;
+ }
+ else
+ {
+ lineindex = scan->rs_cindex - 1;
+ }
+ /* page and lineindex now reference the previous visible tid */
+ }
+ else
+ {
+ /*
+ * forward scan direction
+ */
+ if (!scan->rs_inited)
+ {
+ page = 0; /* first page */
+ lineindex = 0;
+ scan->rs_inited = true;
+ }
+ else
+ {
+ page = ItemPointerGetBlockNumber(tid); /* current page */
+ lineindex = scan->rs_cindex + 1;
+ }
+
+ if (page != scan->rs_cblock)
+ heapgetpage(scan, page);
+
+ dp = (Page) BufferGetPage(scan->rs_cbuf);
+ lines = scan->rs_ntuples;
+ /* page and lineindex now reference the next visible tid */
+ }
+
+ /* 'dir' is now non-zero */
+
+ /*
+ * calculate number of remaining items to check on this page
+ */
+ if (dir < 0)
+ linesleft = lineindex + 1;
+ else
+ linesleft = lines - lineindex;
+
+ /*
+ * advance the scan until we find a qualifying tuple or run out of stuff
+ * to scan
+ */
+ for (;;)
+ {
+ while (linesleft > 0)
+ {
+ lineoff = scan->rs_vistuples[lineindex];
+ lpp = PageGetItemId(dp, lineoff);
+ Assert(ItemIdIsUsed(lpp));
+
+ tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
+ tuple->t_len = ItemIdGetLength(lpp);
+ ItemPointerSet(&(tuple->t_self), page, lineoff);
+
+ /*
+ * if current tuple qualifies, return it.
+ */
+ if (key != NULL)
+ {
+ bool valid;
+
+ HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
+ nkeys, key, valid);
+ if (valid)
+ {
+ scan->rs_cindex = lineindex;
+ return;
+ }
+ }
+ else
+ {
+ scan->rs_cindex = lineindex;
+ return;
+ }
+
+ /*
+ * otherwise move to the next item on the page
+ */
+ --linesleft;
+ if (dir < 0)
+ {
+ --lineindex;
+ }
+ else
+ {
+ ++lineindex;
+ }
+ }
+
+ /*
+ * if we get here, it means we've exhausted the items on this page and
+ * it's time to move to the next.
+ */
+
+ /*
+ * return NULL if we've exhausted all the pages
+ */
+ if ((dir < 0) ? (page == 0) : (page + 1 >= pages))
+ {
+ if (BufferIsValid(scan->rs_cbuf))
+ ReleaseBuffer(scan->rs_cbuf);
+ scan->rs_cbuf = InvalidBuffer;
+ scan->rs_cblock = InvalidBlockNumber;
+ tuple->t_data = NULL;
+ scan->rs_inited = false;
+ return;
+ }
+
+ page = (dir < 0) ? (page - 1) : (page + 1);
+
+ heapgetpage(scan, page);
+
+ dp = (Page) BufferGetPage(scan->rs_cbuf);
+ lines = scan->rs_ntuples;
+ linesleft = lines;
+ if (dir < 0)
+ lineindex = lines - 1;
+ else
+ lineindex = 0;
+ }
+}
+
#if defined(DISABLE_COMPLEX_MACRO)
/*
scan->rs_snapshot = snapshot;
scan->rs_nkeys = nkeys;
+ /*
+ * we can use page-at-a-time mode if it's an MVCC-safe snapshot
+ */
+ scan->rs_pageatatime = IsMVCCSnapshot(snapshot);
+
+ /* we only need to set this up once */
+ scan->rs_ctup.t_tableOid = RelationGetRelid(relation);
+
/*
* we do this here instead of in initscan() because heap_rescan also calls
* initscan() and we don't want to allocate memory again
/*
* Note: we depend here on the -1/0/1 encoding of ScanDirection.
*/
- heapgettup(scan->rs_rd,
- (int) direction,
- &(scan->rs_ctup),
- &(scan->rs_cbuf),
- scan->rs_snapshot,
- scan->rs_nkeys,
- scan->rs_key,
- scan->rs_nblocks);
+ if (scan->rs_pageatatime)
+ heapgettup_pagemode(scan, (int) direction,
+ scan->rs_nkeys, scan->rs_key);
+ else
+ heapgettup(scan, (int) direction,
+ scan->rs_nkeys, scan->rs_key);
- if (scan->rs_ctup.t_data == NULL && !BufferIsValid(scan->rs_cbuf))
+ if (scan->rs_ctup.t_data == NULL)
{
HEAPDEBUG_2; /* heap_getnext returning EOS */
return NULL;
* if we get here it means we have a new current scan tuple, so point to
* the proper return buffer and return the tuple.
*/
-
HEAPDEBUG_3; /* heap_getnext returning tuple */
- if (scan->rs_ctup.t_data != NULL)
- pgstat_count_heap_getnext(&scan->rs_pgstat_info);
+ pgstat_count_heap_getnext(&scan->rs_pgstat_info);
- return ((scan->rs_ctup.t_data == NULL) ? NULL : &(scan->rs_ctup));
+ return &(scan->rs_ctup);
}
/*
/*
* check time qualification of tuple, then release lock
*/
- HeapTupleSatisfies(tuple, relation, buffer, dp,
- snapshot, 0, NULL, valid);
+ valid = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
* Check time qualification of tuple; if visible, set it as the new
* result candidate.
*/
- HeapTupleSatisfies(&tp, relation, buffer, dp,
- snapshot, 0, NULL, valid);
+ valid = HeapTupleSatisfiesVisibility(&tp, snapshot, buffer);
if (valid)
*tid = ctid;
/* Note: no locking manipulations needed */
if (scan->rs_ctup.t_data != NULL)
+ {
scan->rs_mctid = scan->rs_ctup.t_self;
+ if (scan->rs_pageatatime)
+ scan->rs_mindex = scan->rs_cindex;
+ }
else
ItemPointerSetInvalid(&scan->rs_mctid);
}
{
/* XXX no amrestrpos checking that ammarkpos called */
- /* Note: no locking manipulations needed */
-
- /*
- * unpin scan buffers
- */
- if (BufferIsValid(scan->rs_cbuf))
- ReleaseBuffer(scan->rs_cbuf);
- scan->rs_cbuf = InvalidBuffer;
-
if (!ItemPointerIsValid(&scan->rs_mctid))
{
scan->rs_ctup.t_data = NULL;
+ /*
+ * unpin scan buffers
+ */
+ if (BufferIsValid(scan->rs_cbuf))
+ ReleaseBuffer(scan->rs_cbuf);
+ scan->rs_cbuf = InvalidBuffer;
+ scan->rs_cblock = InvalidBlockNumber;
}
else
{
+ /*
+ * If we reached end of scan, rs_inited will now be false. We must
+ * reset it to true to keep heapgettup from doing the wrong thing.
+ */
+ scan->rs_inited = true;
scan->rs_ctup.t_self = scan->rs_mctid;
- scan->rs_ctup.t_data = (HeapTupleHeader) 0x1; /* for heapgettup */
- heapgettup(scan->rs_rd,
- 0,
- &(scan->rs_ctup),
- &(scan->rs_cbuf),
- scan->rs_snapshot,
- 0,
- NULL,
- scan->rs_nblocks);
+ if (scan->rs_pageatatime)
+ {
+ scan->rs_cindex = scan->rs_mindex;
+ heapgettup_pagemode(scan,
+ 0, /* "no movement" */
+ 0, /* needn't recheck scan keys */
+ NULL);
+ }
+ else
+ heapgettup(scan,
+ 0, /* "no movement" */
+ 0, /* needn't recheck scan keys */
+ NULL);
}
}
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/executor/nodeBitmapHeapscan.c,v 1.5 2005/11/25 04:24:48 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/executor/nodeBitmapHeapscan.c,v 1.6 2005/11/26 03:03:07 tgl Exp $
*
*-------------------------------------------------------------------------
*/
static TupleTableSlot *BitmapHeapNext(BitmapHeapScanState *node);
+static void bitgetpage(HeapScanDesc scan, TBMIterateResult *tbmres);
/* ----------------------------------------------------------------
{
EState *estate;
ExprContext *econtext;
- HeapScanDesc scandesc;
+ HeapScanDesc scan;
Index scanrelid;
TIDBitmap *tbm;
TBMIterateResult *tbmres;
estate = node->ss.ps.state;
econtext = node->ss.ps.ps_ExprContext;
slot = node->ss.ss_ScanTupleSlot;
- scandesc = node->ss.ss_currentScanDesc;
+ scan = node->ss.ss_currentScanDesc;
scanrelid = ((BitmapHeapScan *) node->ss.ps.plan)->scan.scanrelid;
tbm = node->tbm;
tbmres = node->tbmres;
for (;;)
{
+ Page dp;
+ ItemId lp;
+
/*
* Get next page of results if needed
*/
* AccessShareLock before performing any of the indexscans, but
* let's be safe.)
*/
- if (tbmres->blockno >= scandesc->rs_nblocks)
+ if (tbmres->blockno >= scan->rs_nblocks)
{
node->tbmres = tbmres = NULL;
continue;
}
/*
- * Acquire pin on the current heap page. We'll hold the pin until
- * done looking at the page. We trade in any pin we held before.
+ * Fetch the current heap page and identify candidate tuples.
*/
- scandesc->rs_cbuf = ReleaseAndReadBuffer(scandesc->rs_cbuf,
- scandesc->rs_rd,
- tbmres->blockno);
-
- /*
- * Determine how many entries we need to look at on this page. If
- * the bitmap is lossy then we need to look at each physical item
- * pointer; otherwise we just look through the offsets listed in
- * tbmres.
- */
- if (tbmres->ntuples >= 0)
- {
- /* non-lossy case */
- node->minslot = 0;
- node->maxslot = tbmres->ntuples - 1;
- }
- else
- {
- /* lossy case */
- Page dp;
-
- LockBuffer(scandesc->rs_cbuf, BUFFER_LOCK_SHARE);
- dp = (Page) BufferGetPage(scandesc->rs_cbuf);
-
- node->minslot = FirstOffsetNumber;
- node->maxslot = PageGetMaxOffsetNumber(dp);
-
- LockBuffer(scandesc->rs_cbuf, BUFFER_LOCK_UNLOCK);
- }
+ bitgetpage(scan, tbmres);
/*
- * Set curslot to first slot to examine
+ * Set rs_cindex to first slot to examine
*/
- node->curslot = node->minslot;
+ scan->rs_cindex = 0;
}
else
{
/*
- * Continuing in previously obtained page; advance curslot
+ * Continuing in previously obtained page; advance rs_cindex
*/
- node->curslot++;
+ scan->rs_cindex++;
}
/*
* Out of range? If so, nothing more to look at on this page
*/
- if (node->curslot < node->minslot || node->curslot > node->maxslot)
+ if (scan->rs_cindex < 0 || scan->rs_cindex >= scan->rs_ntuples)
{
node->tbmres = tbmres = NULL;
continue;
}
/*
- * Okay to try to fetch the tuple
+ * Okay to fetch the tuple
+ */
+ targoffset = scan->rs_vistuples[scan->rs_cindex];
+ dp = (Page) BufferGetPage(scan->rs_cbuf);
+ lp = PageGetItemId(dp, targoffset);
+ Assert(ItemIdIsUsed(lp));
+
+ scan->rs_ctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
+ scan->rs_ctup.t_len = ItemIdGetLength(lp);
+ ItemPointerSet(&scan->rs_ctup.t_self, tbmres->blockno, targoffset);
+
+ pgstat_count_heap_fetch(&scan->rs_pgstat_info);
+
+ /*
+ * Set up the result slot to point to this tuple. Note that the
+ * slot acquires a pin on the buffer.
*/
+ ExecStoreTuple(&scan->rs_ctup,
+ slot,
+ scan->rs_cbuf,
+ false);
+
+ /*
+ * If we are using lossy info, we have to recheck the qual
+ * conditions at every tuple.
+ */
+ if (tbmres->ntuples < 0)
+ {
+ econtext->ecxt_scantuple = slot;
+ ResetExprContext(econtext);
+
+ if (!ExecQual(node->bitmapqualorig, econtext, false))
+ {
+ /* Fails recheck, so drop it and loop back for another */
+ ExecClearTuple(slot);
+ continue;
+ }
+ }
+
+ /* OK to return this tuple */
+ return slot;
+ }
+
+ /*
+ * if we get here it means we are at the end of the scan..
+ */
+ return ExecClearTuple(slot);
+}
+
+/*
+ * bitgetpage - subroutine for BitmapHeapNext()
+ *
+ * This routine reads and pins the specified page of the relation, then
+ * builds an array indicating which tuples on the page are both potentially
+ * interesting according to the bitmap, and visible according to the snapshot.
+ */
+static void
+bitgetpage(HeapScanDesc scan, TBMIterateResult *tbmres)
+{
+ BlockNumber page = tbmres->blockno;
+ Buffer buffer;
+ Snapshot snapshot;
+ Page dp;
+ int ntup;
+ int curslot;
+ int minslot;
+ int maxslot;
+ int maxoff;
+
+ /*
+ * Acquire pin on the target heap page, trading in any pin we held before.
+ */
+ Assert(page < scan->rs_nblocks);
+
+ scan->rs_cbuf = ReleaseAndReadBuffer(scan->rs_cbuf,
+ scan->rs_rd,
+ page);
+ buffer = scan->rs_cbuf;
+ snapshot = scan->rs_snapshot;
+
+ /*
+ * We must hold share lock on the buffer content while examining
+ * tuple visibility. Afterwards, however, the tuples we have found
+ * to be visible are guaranteed good as long as we hold the buffer pin.
+ */
+ LockBuffer(buffer, BUFFER_LOCK_SHARE);
+
+ dp = (Page) BufferGetPage(buffer);
+ maxoff = PageGetMaxOffsetNumber(dp);
+
+ /*
+ * Determine how many entries we need to look at on this page. If
+ * the bitmap is lossy then we need to look at each physical item
+ * pointer; otherwise we just look through the offsets listed in
+ * tbmres.
+ */
+ if (tbmres->ntuples >= 0)
+ {
+ /* non-lossy case */
+ minslot = 0;
+ maxslot = tbmres->ntuples - 1;
+ }
+ else
+ {
+ /* lossy case */
+ minslot = FirstOffsetNumber;
+ maxslot = maxoff;
+ }
+
+ ntup = 0;
+ for (curslot = minslot; curslot <= maxslot; curslot++)
+ {
+ OffsetNumber targoffset;
+ ItemId lp;
+ HeapTupleData loctup;
+ bool valid;
+
if (tbmres->ntuples >= 0)
{
/* non-lossy case */
- targoffset = tbmres->offsets[node->curslot];
+ targoffset = tbmres->offsets[curslot];
}
else
{
/* lossy case */
- targoffset = (OffsetNumber) node->curslot;
+ targoffset = (OffsetNumber) curslot;
}
- ItemPointerSet(&scandesc->rs_ctup.t_self, tbmres->blockno, targoffset);
-
/*
- * Fetch the heap tuple and see if it matches the snapshot. We use
- * heap_release_fetch to avoid useless bufmgr traffic.
+ * We'd better check for out-of-range offnum in case of VACUUM since
+ * the TID was obtained.
*/
- if (heap_release_fetch(scandesc->rs_rd,
- scandesc->rs_snapshot,
- &scandesc->rs_ctup,
- &scandesc->rs_cbuf,
- true,
- &scandesc->rs_pgstat_info))
- {
- /*
- * Set up the result slot to point to this tuple. Note that the
- * slot acquires a pin on the buffer.
- */
- ExecStoreTuple(&scandesc->rs_ctup,
- slot,
- scandesc->rs_cbuf,
- false);
+ if (targoffset < FirstOffsetNumber || targoffset > maxoff)
+ continue;
- /*
- * If we are using lossy info, we have to recheck the qual
- * conditions at every tuple.
- */
- if (tbmres->ntuples < 0)
- {
- econtext->ecxt_scantuple = slot;
- ResetExprContext(econtext);
-
- if (!ExecQual(node->bitmapqualorig, econtext, false))
- {
- /* Fails recheck, so drop it and loop back for another */
- ExecClearTuple(slot);
- continue;
- }
- }
+ lp = PageGetItemId(dp, targoffset);
- /* OK to return this tuple */
- return slot;
- }
+ /*
+ * Must check for deleted tuple.
+ */
+ if (!ItemIdIsUsed(lp))
+ continue;
/*
- * Failed the snap, so loop back and try again.
+ * check time qualification of tuple, remember it if valid
*/
+ loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
+ loctup.t_len = ItemIdGetLength(lp);
+ ItemPointerSet(&(loctup.t_self), page, targoffset);
+
+ valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
+ if (valid)
+ scan->rs_vistuples[ntup++] = targoffset;
}
- /*
- * if we get here it means we are at the end of the scan..
- */
- return ExecClearTuple(slot);
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+
+ Assert(ntup <= MaxHeapTuplesPerPage);
+ scan->rs_ntuples = ntup;
}
/* ----------------------------------------------------------------
Oid reloid;
Relation currentRelation;
+ /*
+ * Assert caller didn't ask for an unsafe snapshot --- see comments
+ * at head of file.
+ */
+ Assert(IsMVCCSnapshot(estate->es_snapshot));
+
/*
* create state structure
*/