Shrinking and other things.
authorVadim B. Mikheev
Wed, 27 Nov 1996 07:27:20 +0000 (07:27 +0000)
committerVadim B. Mikheev
Wed, 27 Nov 1996 07:27:20 +0000 (07:27 +0000)
src/backend/commands/vacuum.c

index 7fe8648e1ec974fb7f37171e54f973dd9ee39473..06140f7c277e9f3dfe902c5445d39853812696e3 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.8 1996/11/06 08:21:40 scrappy Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.9 1996/11/27 07:27:20 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include 
 #include 
 #include 
+#include 
 #include 
 #include 
 #include 
 #include 
 #include 
 #include 
+#include 
 #include 
 #include 
+#include "storage/shmem.h"
 #ifdef NEED_RUSAGE
 # include 
 #else /* NEED_RUSAGE */
 
 bool VacuumRunning =   false;
 
+#ifdef VACUUM_QUIET
+static int MESSLEV = DEBUG;
+#else
+static int MESSLEV = NOTICE;
+#endif
+
+typedef struct {
+    FuncIndexInfo  finfo;
+    FuncIndexInfo  *finfoP;
+    IndexTupleForm tform;
+    int            natts;
+} IndDesc;
+
 /* non-export function prototypes */
 static void _vc_init(void);
 static void _vc_shutdown(void);
 static void _vc_vacuum(NameData *VacRelP);
 static VRelList _vc_getrels(Portal p, NameData *VacRelP);
-static void _vc_vacone(Portal p, VRelList curvrl);
-static void _vc_vacheap(Portal p, VRelList curvrl, Relation onerel);
-static void _vc_vacindices(VRelList curvrl, Relation onerel);
-static void _vc_vaconeind(VRelList curvrl, Relation indrel);
+static void _vc_vacone (VRelList curvrl);
+static void _vc_scanheap (VRelList curvrl, Relation onerel, VPageList Vvpl, VPageList Fvpl);
+static void _vc_rpfheap (VRelList curvrl, Relation onerel, VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel);
+static void _vc_vacheap (VRelList curvrl, Relation onerel, VPageList vpl);
+static void _vc_vacpage (Page page, VPageDescr vpd, Relation archrel);
+static void _vc_vaconeind (VPageList vpl, Relation indrel, int nhtups);
 static void _vc_updstats(Oid relid, int npages, int ntuples, bool hasindex);
 static void _vc_setpagelock(Relation rel, BlockNumber blkno);
-static bool _vc_tidreapped(ItemPointer itemptr, VRelList curvrl, Relation indrel);
-static void _vc_reappage(Portal p, VRelList curvrl, VPageDescr vpc);
+static VPageDescr _vc_tidreapped (ItemPointer itemptr, VPageList curvrl);
+static void _vc_reappage (VPageList vpl, VPageDescr vpc);
+static void _vc_vpinsert (VPageList vpl, VPageDescr vpnew);
 static void _vc_free(Portal p, VRelList vrl);
+static void _vc_getindices (Oid relid, int *nindices, Relation **Irel);
+static void _vc_clsindices (int nindices, Relation *Irel);
 static Relation _vc_getarchrel(Relation heaprel);
 static void _vc_archive(Relation archrel, HeapTuple htup);
 static bool _vc_isarchrel(char *rname);
+static void _vc_mkindesc (Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc);
 static char * _vc_find_eq (char *bot, int nelem, int size, char *elm, int (*compar)(char *, char *));
 static int _vc_cmp_blk (char *left, char *right);
 static int _vc_cmp_offno (char *left, char *right);
+static bool _vc_enough_space (VPageDescr vpd, Size len);
+
 
 void
 vacuum(char *vacrel)
@@ -183,7 +207,7 @@ _vc_vacuum(NameData *VacRelP)
 
     /* vacuum each heap relation */
     for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
-   _vc_vacone(p, cur);
+   _vc_vacone (cur);
 
     _vc_free(p, vrl);
 
@@ -200,7 +224,7 @@ _vc_getrels(Portal p, NameData *VacRelP)
     Buffer buf;
     PortalVariableMemory portalmem;
     MemoryContext old;
-    VRelList vrl, cur = NULL;
+    VRelList vrl, cur;
     Datum d;
     char *rname;
     char rkind;
@@ -249,6 +273,17 @@ _vc_getrels(Portal p, NameData *VacRelP)
        continue;
    }
 
+   /* don't vacuum large objects for now - something breaks when we do */
+   if ( (strlen(rname) > 4) && rname[0] == 'X' &&
+       rname[1] == 'i' && rname[2] == 'n' &&
+       (rname[3] == 'v' || rname[3] == 'x'))
+   {
+       elog (NOTICE, "Rel %.*s: can't vacuum LargeObjects now", 
+           NAMEDATALEN, rname);
+       ReleaseBuffer(buf);
+       continue;
+   }
+
    d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relsmgr,
                 pgcdesc, &n);
    smgrno = DatumGetInt16(d);
@@ -283,8 +318,6 @@ _vc_getrels(Portal p, NameData *VacRelP)
 
    cur->vrl_relid = pgctup->t_oid;
    cur->vrl_attlist = (VAttList) NULL;
-   cur->vrl_pgdsc = (VPageDescr*) NULL;
-   cur->vrl_nrepg = 0;
    cur->vrl_npages = cur->vrl_ntups = 0;
    cur->vrl_hasindex = false;
    cur->vrl_next = (VRelList) NULL;
@@ -317,7 +350,7 @@ _vc_getrels(Portal p, NameData *VacRelP)
  * us to lock the entire database during one pass of the vacuum cleaner.
  */
 static void
-_vc_vacone(Portal p, VRelList curvrl)
+_vc_vacone (VRelList curvrl)
 {
     Relation pgclass;
     TupleDesc pgcdesc;
@@ -326,6 +359,12 @@ _vc_vacone(Portal p, VRelList curvrl)
     HeapScanDesc pgcscan;
     Relation onerel;
     ScanKeyData pgckey;
+    VPageListData Vvpl;    /* List of pages to vacuum and/or clean indices */
+    VPageListData Fvpl;    /* List of pages with space enough for re-using */
+    VPageDescr *vpp;
+    Relation *Irel;
+    int nindices;
+    int i;
 
     StartTransactionCommand();
 
@@ -355,14 +394,46 @@ _vc_vacone(Portal p, VRelList curvrl)
     /* we require the relation to be locked until the indices are cleaned */
     RelationSetLockForWrite(onerel);
 
-    /* vacuum it */
-    _vc_vacheap(p, curvrl, onerel);
+    /* scan it */
+    Vvpl.vpl_npages = Fvpl.vpl_npages = 0;
+    _vc_scanheap(curvrl, onerel, &Vvpl, &Fvpl);
 
-    /* if we vacuumed any heap tuples, vacuum the indices too */
-    if (curvrl->vrl_nrepg > 0)
-   _vc_vacindices(curvrl, onerel);
+    /* Now open/count indices */
+    Irel = (Relation *) NULL;
+    if ( Vvpl.vpl_npages > 0 )
+           /* Open all indices of this relation */
+   _vc_getindices(curvrl->vrl_relid, &nindices, &Irel);
     else
-   curvrl->vrl_hasindex = onerel->rd_rel->relhasindex;
+           /* Count indices only */
+   _vc_getindices(curvrl->vrl_relid, &nindices, NULL);
+    
+    if ( nindices > 0 )
+   curvrl->vrl_hasindex = true;
+    else
+   curvrl->vrl_hasindex = false;
+
+    /* Clean index' relation(s) */
+    if ( Irel != (Relation*) NULL )
+    {
+   for (i = 0; i < nindices; i++)
+       _vc_vaconeind (&Vvpl, Irel[i], curvrl->vrl_ntups);
+    }
+
+    if ( Fvpl.vpl_npages > 0 )     /* Try to shrink heap */
+       _vc_rpfheap (curvrl, onerel, &Vvpl, &Fvpl, nindices, Irel);
+    else if ( Vvpl.vpl_npages > 0 )    /* Clean pages from Vvpl list */
+   _vc_vacheap (curvrl, onerel, &Vvpl);
+
+    /* ok - free Vvpl list of reapped pages */
+    if ( Vvpl.vpl_npages > 0 )
+    {
+       vpp = Vvpl.vpl_pgdesc;
+       for (i = 0; i < Vvpl.vpl_npages; i++, vpp++)
+       pfree(*vpp);
+       pfree (Vvpl.vpl_pgdesc);
+       if ( Fvpl.vpl_npages > 0 )
+       pfree (Fvpl.vpl_pgdesc);
+    }
 
     /* all done with this class */
     heap_close(onerel);
@@ -376,69 +447,46 @@ _vc_vacone(Portal p, VRelList curvrl)
     CommitTransactionCommand();
 }
 
-# define   ADJUST_FREE_SPACE(S)\
-   ((DOUBLEALIGN(S) == (S)) ? (S) : (DOUBLEALIGN(S) - sizeof(double)))
-
 /*
- *  _vc_vacheap() -- vacuum an open heap relation
+ *  _vc_scanheap() -- scan an open heap relation
  *
- * This routine sets commit times, vacuums dead tuples, cleans up
- * wasted space on the page, and maintains statistics on the number
- * of live tuples in a heap.  In addition, it records the tids of
- * all tuples removed from the heap for any reason.  These tids are
- * used in a scan of indices on the relation to get rid of dead
- * index tuples.
+ * This routine sets commit times, constructs Vvpl list of 
+ * empty/uninitialized pages and pages with dead tuples and
+ * ~LP_USED line pointers, constructs Fvpl list of pages
+ * appropriate for purposes of shrinking and maintains statistics 
+ * on the number of live tuples in a heap.
  */
 static void
-_vc_vacheap(Portal p, VRelList curvrl, Relation onerel)
+_vc_scanheap (VRelList curvrl, Relation onerel, 
+           VPageList Vvpl, VPageList Fvpl)
 {
     int nblocks, blkno;
     ItemId itemid;
     HeapTuple htup;
     Buffer buf;
-    Page page;
+    Page page, tempPage = NULL;
     OffsetNumber offnum, maxoff;
-    Relation archrel = NULL;
-    bool isarchived;
-    int nvac;
-    int ntups;
-    bool pgchanged, tupgone;
+    bool pgchanged, tupgone, dobufrel, notup;
     AbsoluteTime purgetime, expiretime;
     RelativeTime preservetime;
     char *relname;
-    VPageDescr vpc;
-    uint32 nunused, nempg, nnepg, nchpg;
-    Size frsize;
+    VPageDescr vpc, vp;
+    uint32 nvac, ntups, nunused, ncrash, nempg, nnepg, nchpg, nemend;
+    Size frsize, frsusf;
+    Size min_tlen = MAXTUPLEN;
+    Size max_tlen = 0;
+    int i;
     struct rusage ru0, ru1;
 
     getrusage(RUSAGE_SELF, &ru0);
 
+    nvac = ntups = nunused = ncrash = nempg = nnepg = nchpg = nemend = 0;
+    frsize = frsusf = 0;
+    
     relname = (RelationGetRelationName(onerel))->data;
 
-    nvac = 0;
-    ntups = 0;
-    nunused = nempg = nnepg = nchpg = 0;
-    frsize = 0;
-    curvrl->vrl_nrepg = 0;
-    
     nblocks = RelationGetNumberOfBlocks(onerel);
 
-    /* if the relation has an archive, open it */
-    if (onerel->rd_rel->relarch != 'n') {
-   isarchived = true;
-   archrel = _vc_getarchrel(onerel);
-    } else
-   isarchived = false;
-
-    /* don't vacuum large objects for now.
-       something breaks when we do*/
-    if ( (strlen(relname) > 4) && 
-   relname[0] == 'X' &&
-   relname[1] == 'i' &&
-   relname[2] == 'n' &&
-   (relname[3] == 'v' || relname[3] == 'x'))
-       return;
-
     /* calculate the purge time: tuples that expired before this time
        will be archived or deleted */
     purgetime = GetCurrentTransactionStartTime();
@@ -456,46 +504,53 @@ _vc_vacheap(Portal p, VRelList curvrl, Relation onerel)
    purgetime = expiretime;
 
     vpc = (VPageDescr) palloc (sizeof(VPageDescrData) + MaxOffsetNumber*sizeof(OffsetNumber));
+    vpc->vpd_nusd = 0;
        
     for (blkno = 0; blkno < nblocks; blkno++) {
    buf = ReadBuffer(onerel, blkno);
    page = BufferGetPage(buf);
    vpc->vpd_blkno = blkno;
    vpc->vpd_noff = 0;
+   vpc->vpd_noff = 0;
 
    if (PageIsNew(page)) {
        elog (NOTICE, "Rel %.*s: Uninitialized page %u - fixing",
        NAMEDATALEN, relname, blkno);
        PageInit (page, BufferGetPageSize (buf), 0);
-       vpc->vpd_free = PageGetFreeSpace (page);
-       vpc->vpd_free = ADJUST_FREE_SPACE (vpc->vpd_free);
-       frsize += vpc->vpd_free;
+       vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
+       frsize += (vpc->vpd_free - sizeof (ItemIdData));
        nnepg++;
-       _vc_reappage(p, curvrl, vpc);   /* No one index should point here */
+       nemend++;
+       _vc_reappage (Vvpl, vpc);
        WriteBuffer(buf);
        continue;
    }
 
    if (PageIsEmpty(page)) {
-       vpc->vpd_free = PageGetFreeSpace (page);
-       vpc->vpd_free = ADJUST_FREE_SPACE (vpc->vpd_free);
-       frsize += vpc->vpd_free;
+       vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
+       frsize += (vpc->vpd_free - sizeof (ItemIdData));
        nempg++;
-       _vc_reappage(p, curvrl, vpc);   /* No one index should point here */
+       nemend++;
+       _vc_reappage (Vvpl, vpc);
        ReleaseBuffer(buf);
        continue;
    }
 
    pgchanged = false;
+   notup = true;
    maxoff = PageGetMaxOffsetNumber(page);
    for (offnum = FirstOffsetNumber;
         offnum <= maxoff;
         offnum = OffsetNumberNext(offnum)) {
        itemid = PageGetItemId(page, offnum);
 
+       /*
+        * Collect un-used items too - it's possible to have
+        * indices pointing here after crash.
+        */
        if (!ItemIdIsUsed(itemid)) {
            vpc->vpd_voff[vpc->vpd_noff++] = offnum;
-       nunused++;
+           nunused++;
        continue;
        }
 
@@ -506,11 +561,21 @@ _vc_vacheap(Portal p, VRelList curvrl, Relation onerel)
        TransactionIdIsValid((TransactionId)htup->t_xmin)) {
 
        if (TransactionIdDidAbort(htup->t_xmin)) {
-           pgchanged = true;
            tupgone = true;
        } else if (TransactionIdDidCommit(htup->t_xmin)) {
            htup->t_tmin = TransactionIdGetCommitTime(htup->t_xmin);
            pgchanged = true;
+       } else if ( !TransactionIdIsInProgress (htup->t_xmin) ) {
+           /* 
+            * Not Aborted, Not Committed, Not in Progress -
+            * so it from crashed process. - vadim 11/26/96
+            */
+           ncrash++;
+           tupgone = true;
+       }
+       else {
+           elog (MESSLEV, "Rel %.*s: InsertTransactionInProgress %u for TID %u/%u",
+           NAMEDATALEN, relname, htup->t_xmin, blkno, offnum);
        }
        }
 
@@ -532,140 +597,618 @@ _vc_vacheap(Portal p, VRelList curvrl, Relation onerel)
 
            if (htup->t_tmax < purgetime) {
            tupgone = true;
-           pgchanged = true;
            }
        }
        }
 
+       /*
+        * Is it possible at all ? - vadim 11/26/96
+        */
+       if ( !TransactionIdIsValid((TransactionId)htup->t_xmin) )
+       {
+       elog (NOTICE, "TID %u/%u: INSERT_TRANSACTION_ID IS INVALID. \
+DELETE_TRANSACTION_ID_VALID %d, TUPGONE %d.", 
+           TransactionIdIsValid((TransactionId)htup->t_xmax),
+           tupgone);
+       }
+       
        if (tupgone) {
-       ItemId lpp = &(((PageHeader) page)->pd_linp[offnum - 1]);
-
-       /* write the tuple to the archive, if necessary */
-       if (isarchived)
-           _vc_archive(archrel, htup);
+       ItemId lpp;
+                                                    
+       if ( tempPage == (Page) NULL )
+       {
+           Size pageSize;
+           
+           pageSize = PageGetPageSize(page);
+           tempPage = (Page) palloc(pageSize);
+           memmove (tempPage, page, pageSize);
+       }
+       
+       lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
 
        /* mark it unused */
        lpp->lp_flags &= ~LP_USED;
 
            vpc->vpd_voff[vpc->vpd_noff++] = offnum;
+           nvac++;
 
-       ++nvac;
        } else {
        ntups++;
+       notup = false;
+       if ( htup->t_len < min_tlen )
+           min_tlen = htup->t_len;
+       if ( htup->t_len > max_tlen )
+           max_tlen = htup->t_len;
        }
    }
 
    if (pgchanged) {
-       PageRepairFragmentation(page);
-       if ( vpc->vpd_noff > 0 ) {  /* there are some new unused tids here */
-       vpc->vpd_free = PageGetFreeSpace (page);
-       vpc->vpd_free = ADJUST_FREE_SPACE (vpc->vpd_free);
-       frsize += vpc->vpd_free;
-       _vc_reappage(p, curvrl, vpc);
-       }
        WriteBuffer(buf);
+       dobufrel = false;
        nchpg++;
-   } else {
-       if ( vpc->vpd_noff > 0 ) {  /* there are only old unused tids here */
-       vpc->vpd_free = PageGetFreeSpace (page);
-       vpc->vpd_free = ADJUST_FREE_SPACE (vpc->vpd_free);
-       frsize += vpc->vpd_free;
-       _vc_reappage(p, curvrl, vpc);
-       }
-       ReleaseBuffer(buf);
    }
+   else
+       dobufrel = true;
+   if ( tempPage != (Page) NULL )
+   { /* Some tuples are gone */
+       PageRepairFragmentation(tempPage);
+       vpc->vpd_free = ((PageHeader)tempPage)->pd_upper - ((PageHeader)tempPage)->pd_lower;
+       frsize += vpc->vpd_free;
+       _vc_reappage (Vvpl, vpc);
+       pfree (tempPage);
+       tempPage = (Page) NULL;
+   }
+   else if ( vpc->vpd_noff > 0 )
+   { /* there are only ~LP_USED line pointers */
+       vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
+       frsize += vpc->vpd_free;
+       _vc_reappage (Vvpl, vpc);
+   }
+   if ( dobufrel )
+       ReleaseBuffer(buf);
+   if ( notup )
+       nemend++;
+   else
+       nemend = 0;
     }
 
-    if (isarchived)
-   heap_close(archrel);
+    pfree (vpc);
 
     /* save stats in the rel list for use later */
     curvrl->vrl_ntups = ntups;
     curvrl->vrl_npages = nblocks;
+    
+    Vvpl->vpl_nemend = nemend;
+    Fvpl->vpl_nemend = nemend;
+
+    /* 
+     * Try to make Fvpl keeping in mind that we can't use free space 
+     * of "empty" end-pages and last page if it reapped.
+     */
+    if ( Vvpl->vpl_npages - nemend > 0 )
+    {
+   int nusf;       /* blocks usefull for re-using */
+   
+   nusf = Vvpl->vpl_npages - nemend;
+   if ( (Vvpl->vpl_pgdesc[nusf-1])->vpd_blkno == nblocks - nemend - 1 )
+       nusf--;
+    
+   for (i = 0; i < nusf; i++)
+       {
+       vp = Vvpl->vpl_pgdesc[i];
+       if ( _vc_enough_space (vp, min_tlen) )
+       {
+       _vc_vpinsert (Fvpl, vp);
+       frsusf += vp->vpd_free;
+       }
+   }
+    }
 
     getrusage(RUSAGE_SELF, &ru1);
     
-    elog (NOTICE, "Rel %.*s: Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \
-Tuples %u: Vac %u, UnUsed %u; FreeSpace %u. Elapsed %u/%u sec.",
+    elog (MESSLEV, "Rel %.*s: Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \
+Tup %u: Vac %u, Crash %u, UnUsed %u, MinLen %u, MaxLen %u; Re-using: Free/Avail. Space %u/%u; EndEmpty/Avail. Pages %u/%u. Elapsed %u/%u sec.",
    NAMEDATALEN, relname, 
-   nblocks, nchpg, curvrl->vrl_nrepg, nempg, nnepg, 
-   ntups, nvac, nunused, frsize, 
+   nblocks, nchpg, Vvpl->vpl_npages, nempg, nnepg,
+   ntups, nvac, ncrash, nunused, min_tlen, max_tlen, 
+   frsize, frsusf, nemend, Fvpl->vpl_npages,
    ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
    ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
 
-    pfree (vpc);
-}
+} /* _vc_scanheap */
+
 
 /*
- *  _vc_vacindices() -- vacuum all the indices for a particular heap relation.
- *
- * On entry, curvrl points at the relation currently being vacuumed.
- * We already have a write lock on the relation, so we don't need to
- * worry about anyone building an index on it while we're doing the
- * vacuuming.  The tid list for curvrl is sorted in reverse tid order:
- * that is, tids on higher page numbers are before those on lower page
- * numbers, and tids high on the page are before those low on the page.
- * We use this ordering to cut down the search cost when we look at an
- * index entry.
+ *  _vc_rpfheap() -- try to repaire relation' fragmentation
  *
- * We're executing inside the transaction that vacuumed the heap.
+ * This routine marks dead tuples as unused and tries re-use dead space
+ * by moving tuples (and inserting indices if needed). It constructs 
+ * Nvpl list of free-ed pages (moved tuples) and clean indices
+ * for them after committing (in hack-manner - without losing locks
+ * and freeing memory!) current transaction. It truncates relation
+ * if some end-blocks are gone away.
  */
 static void
-_vc_vacindices(VRelList curvrl, Relation onerel)
+_vc_rpfheap (VRelList curvrl, Relation onerel, 
+       VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel)
 {
-    Relation pgindex;
-    TupleDesc pgidesc;
-    HeapTuple pgitup;
-    HeapScanDesc pgiscan;
-    Buffer buf;
-    Relation indrel;
-    Oid indoid;
-    Datum d;
-    bool n;
-    int nindices;
-    ScanKeyData pgikey;
+    TransactionId myXID;
+    CommandId myCID;
+    AbsoluteTime myCTM;
+    Buffer buf, ToBuf;
+    int nblocks, blkno;
+    Page page, ToPage;
+    OffsetNumber offnum, maxoff, newoff, moff;
+    ItemId itemid, newitemid;
+    HeapTuple htup, newtup;
+    TupleDesc tupdesc;
+    Datum *idatum;
+    char *inulls;
+    InsertIndexResult iresult;
+    VPageListData Nvpl;
+    VPageDescr ToVpd, Fvplast, Vvplast, vpc, *vpp;
+    IndDesc *Idesc, *idcur;
+    int Fblklast, Vblklast, i;
+    Size tlen;
+    int nmoved, Fnpages, Vnpages;
+    int nchkmvd, ntups;
+    bool isempty, dowrite;
+    Relation archrel;
+    struct rusage ru0, ru1;
 
-    /* see if we can dodge doing any work at all */
-    if (!(onerel->rd_rel->relhasindex))
-   return;
+    getrusage(RUSAGE_SELF, &ru0);
 
-    nindices = 0;
+    myXID = GetCurrentTransactionId();
+    myCID = GetCurrentCommandId();
+   
+    if ( Irel != (Relation*) NULL )    /* preparation for index' inserts */
+    {
+   _vc_mkindesc (onerel, nindices, Irel, &Idesc);
+   tupdesc = RelationGetTupleDescriptor(onerel);
+   idatum = (Datum *) palloc(INDEX_MAX_KEYS * sizeof (*idatum));
+   inulls = (char *) palloc(INDEX_MAX_KEYS * sizeof (*inulls));
+    }
 
-    /* prepare a heap scan on the pg_index relation */
-    pgindex = heap_openr(IndexRelationName);
-    pgidesc = RelationGetTupleDescriptor(pgindex);
+    /* if the relation has an archive, open it */
+    if (onerel->rd_rel->relarch != 'n')
+    {
+   archrel = _vc_getarchrel(onerel);
+   /* Archive tuples from "empty" end-pages */
+   for ( vpp = Vvpl->vpl_pgdesc + Vvpl->vpl_npages - 1, 
+               i = Vvpl->vpl_nemend; i > 0; i--, vpp-- )
+   {
+       if ( (*vpp)->vpd_noff > 0 )
+       {
+           buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
+           page = BufferGetPage(buf);
+           Assert ( !PageIsEmpty(page) );
+       _vc_vacpage (page, *vpp, archrel);
+       WriteBuffer (buf);
+       }
+   }
+    }
+    else
+   archrel = (Relation) NULL;
+
+    Nvpl.vpl_npages = 0;
+    Fnpages = Fvpl->vpl_npages;
+    Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
+    Fblklast = Fvplast->vpd_blkno;
+    Assert ( Vvpl->vpl_npages > Vvpl->vpl_nemend );
+    Vnpages = Vvpl->vpl_npages - Vvpl->vpl_nemend;
+    Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
+    Vblklast = Vvplast->vpd_blkno;
+    Assert ( Vblklast >= Fblklast );
+    ToBuf = InvalidBuffer;
+    nmoved = 0;
 
-    ScanKeyEntryInitialize(&pgikey, 0x0, Anum_pg_index_indrelid,
-              ObjectIdEqualRegProcedure,
-              ObjectIdGetDatum(curvrl->vrl_relid));
+    vpc = (VPageDescr) palloc (sizeof(VPageDescrData) + MaxOffsetNumber*sizeof(OffsetNumber));
+    vpc->vpd_nusd = 0;
+   
+    nblocks = curvrl->vrl_npages;
+    for (blkno = nblocks - Vvpl->vpl_nemend - 1; ; blkno--)
+    {
+   /* if it's reapped page and it was used by me - quit */
+   if ( blkno == Fblklast && Fvplast->vpd_nusd > 0 )
+       break;
 
-    pgiscan = heap_beginscan(pgindex, false, NowTimeQual, 1, &pgikey);
+   buf = ReadBuffer(onerel, blkno);
+   page = BufferGetPage(buf);
 
-    /* vacuum all the indices */
-    while (HeapTupleIsValid(pgitup = heap_getnext(pgiscan, 0, &buf))) {
-   d = (Datum) heap_getattr(pgitup, buf, Anum_pg_index_indexrelid,
-                pgidesc, &n);
-   indoid = DatumGetObjectId(d);
-   indrel = index_open(indoid);
-   _vc_vaconeind(curvrl, indrel);
-   heap_close(indrel);
-   nindices++;
+   vpc->vpd_noff = 0;
+
+   isempty = PageIsEmpty(page);
+
+   if ( blkno == Vblklast )        /* it's reapped page */
+   {
+       if ( Vvplast->vpd_noff > 0 )    /* there are dead tuples */
+       {                   /* on this page - clean */
+       Assert ( ! isempty );
+       _vc_vacpage (page, Vvplast, archrel);
+       dowrite = true;
+       }
+       else
+       Assert ( isempty );
+       Assert ( --Vnpages > 0 );
+       /* get prev reapped page from Vvpl */
+       Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
+       Vblklast = Vvplast->vpd_blkno;
+       if ( blkno == Fblklast )    /* this page in Fvpl too */
+       {
+       Assert ( --Fnpages > 0 );
+       Assert ( Fvplast->vpd_nusd == 0 );
+       /* get prev reapped page from Fvpl */
+       Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
+       Fblklast = Fvplast->vpd_blkno;
+       }
+       Assert ( Fblklast <= Vblklast );
+       if ( isempty )
+       {
+       ReleaseBuffer(buf);
+       continue;
+       }
+   }
+   else
+   {
+       Assert ( ! isempty );
+       dowrite = false;
+   }
+
+   vpc->vpd_blkno = blkno;
+   maxoff = PageGetMaxOffsetNumber(page);
+   for (offnum = FirstOffsetNumber;
+       offnum <= maxoff;
+       offnum = OffsetNumberNext(offnum))
+   {
+       itemid = PageGetItemId(page, offnum);
+
+       if (!ItemIdIsUsed(itemid))
+       continue;
+
+       htup = (HeapTuple) PageGetItem(page, itemid);
+       tlen = htup->t_len;
+       
+       /* try to find new page for this tuple */
+       if ( ToBuf == InvalidBuffer ||
+       ! _vc_enough_space (ToVpd, tlen) )
+       {
+       if ( ToBuf != InvalidBuffer )
+           WriteBuffer(ToBuf);
+       ToBuf = InvalidBuffer;
+       for (i=0; i < Fnpages; i++)
+       {
+           if ( _vc_enough_space (Fvpl->vpl_pgdesc[i], tlen) )
+           break;
+       }
+       if ( i == Fnpages )
+           break;          /* can't move item anywhere */
+       ToVpd = Fvpl->vpl_pgdesc[i];
+       ToBuf = ReadBuffer(onerel, ToVpd->vpd_blkno);
+       ToPage = BufferGetPage(ToBuf);
+       /* if this page was not used before - clean it */
+       if ( ! PageIsEmpty(ToPage) && ToVpd->vpd_nusd == 0 )
+           _vc_vacpage (ToPage, ToVpd, archrel);
+       }
+       
+       /* copy tuple */
+       newtup = (HeapTuple) palloc (tlen);
+       memmove((char *) newtup, (char *) htup, tlen);
+
+       /* store transaction information */
+       TransactionIdStore(myXID, &(newtup->t_xmin));
+       newtup->t_cmin = myCID;
+       StoreInvalidTransactionId(&(newtup->t_xmax));
+       newtup->t_tmin = INVALID_ABSTIME;
+       newtup->t_tmax = CURRENT_ABSTIME;
+       ItemPointerSetInvalid(&newtup->t_chain);
+
+       /* add tuple to the page */
+       newoff = PageAddItem (ToPage, (Item)newtup, tlen, 
+               InvalidOffsetNumber, LP_USED);
+       if ( newoff == InvalidOffsetNumber )
+       {
+       elog (WARN, "\
+failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
+       tlen, ToVpd->vpd_blkno, ToVpd->vpd_free, 
+       ToVpd->vpd_nusd, ToVpd->vpd_noff);
+       }
+       newitemid = PageGetItemId(ToPage, newoff);
+       pfree (newtup);
+       newtup = (HeapTuple) PageGetItem(ToPage, newitemid);
+       ItemPointerSet(&(newtup->t_ctid), ToVpd->vpd_blkno, newoff);
+
+       /* now logically delete end-tuple */
+       TransactionIdStore(myXID, &(htup->t_xmax));
+       htup->t_cmax = myCID;
+       memmove ((char*)&(htup->t_chain), (char*)&(newtup->t_ctid), sizeof (newtup->t_ctid));
+
+       ToVpd->vpd_nusd++;
+       nmoved++;
+       ToVpd->vpd_free = ((PageHeader)ToPage)->pd_upper - ((PageHeader)ToPage)->pd_lower;
+       vpc->vpd_voff[vpc->vpd_noff++] = offnum;
+       
+       /* insert index' tuples if needed */
+       if ( Irel != (Relation*) NULL )
+       {
+       for (i = 0, idcur = Idesc; i < nindices; i++, idcur++)
+       {
+           FormIndexDatum (
+                   idcur->natts,
+                   (AttrNumber *)&(idcur->tform->indkey[0]),
+               newtup, 
+               tupdesc,
+               InvalidBuffer,
+               idatum,
+               inulls,
+               idcur->finfoP);
+           iresult = index_insert (
+               Irel[i],
+               idatum,
+               inulls,
+               &(newtup->t_ctid),
+               true);
+           if (iresult) pfree(iresult);
+       }
+       }
+       
+   } /* walk along page */
+
+   if ( vpc->vpd_noff > 0 )        /* some tuples were moved */
+   {
+       _vc_reappage (&Nvpl, vpc);
+       WriteBuffer(buf);
+   }
+   else if ( dowrite )
+       WriteBuffer(buf);
+   else
+       ReleaseBuffer(buf);
+       
+   if ( offnum <= maxoff )
+       break;              /* some item(s) left */
+       
+    } /* walk along relation */
+   
+    blkno++;               /* new number of blocks */
+
+    if ( ToBuf != InvalidBuffer )
+    {
+   Assert (nmoved > 0);
+   WriteBuffer(ToBuf);
     }
 
-    heap_endscan(pgiscan);
-    heap_close(pgindex);
+    if ( nmoved > 0 )
+    {
+   /* 
+    * We have to commit our tuple' movings before we'll truncate 
+    * relation, but we shouldn't lose our locks. And so - quick hack: 
+    * flush buffers and record status of current transaction
+    * as committed, and continue. - vadim 11/13/96
+    */
+   FlushBufferPool(!TransactionFlushEnabled());
+   TransactionIdCommit(myXID);
+   FlushBufferPool(!TransactionFlushEnabled());
+   myCTM = TransactionIdGetCommitTime(myXID);
+    }
+   
+    /* 
+     * Clean uncleaned reapped pages from Vvpl list 
+     * and set commit' times  for inserted tuples
+     */
+    nchkmvd = 0;
+    for (i = 0, vpp = Vvpl->vpl_pgdesc; i < Vnpages; i++, vpp++)
+    {
+   Assert ( (*vpp)->vpd_blkno < blkno );
+   buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
+   page = BufferGetPage(buf);
+   if ( (*vpp)->vpd_nusd == 0 )    /* this page was not used */
+   {
+       /* noff == 0 in empty pages only - such pages should be re-used */
+       Assert ( (*vpp)->vpd_noff > 0 );
+       _vc_vacpage (page, *vpp, archrel);
+   }
+   else                /* this page was used */
+   {
+       ntups = 0;
+       moff = PageGetMaxOffsetNumber(page);
+       for (newoff = FirstOffsetNumber;
+           newoff <= moff;
+           newoff = OffsetNumberNext(newoff))
+       {
+           itemid = PageGetItemId(page, newoff);
+           if (!ItemIdIsUsed(itemid))
+           continue;
+           htup = (HeapTuple) PageGetItem(page, itemid);
+           if ( TransactionIdEquals((TransactionId)htup->t_xmin, myXID) )
+           {
+               htup->t_tmin = myCTM;
+               ntups++;
+           }
+       }
+       Assert ( (*vpp)->vpd_nusd == ntups );
+       nchkmvd += ntups;
+   }
+       WriteBuffer (buf);
+    }
+    Assert ( nmoved == nchkmvd );
 
-    if (nindices > 0)
-   curvrl->vrl_hasindex = true;
+    getrusage(RUSAGE_SELF, &ru1);
+    
+    elog (MESSLEV, "Rel %.*s: Pages: %u --> %u; Tuple(s) moved: %u. \
+Elapsed %u/%u sec.",
+       NAMEDATALEN, (RelationGetRelationName(onerel))->data, 
+       nblocks, blkno, nmoved,
+       ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
+       ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
+
+    if ( Nvpl.vpl_npages > 0 )
+    {
+   /* vacuum indices again if needed */
+   if ( Irel != (Relation*) NULL )
+   {
+       VPageDescr *vpleft, *vpright, vpsave;
+       
+       /* re-sort Nvpl.vpl_pgdesc */
+       for (vpleft = Nvpl.vpl_pgdesc, 
+       vpright = Nvpl.vpl_pgdesc + Nvpl.vpl_npages - 1;
+       vpleft < vpright; vpleft++, vpright--)
+       {
+       vpsave = *vpleft; *vpleft = *vpright; *vpright = vpsave;
+       }
+       for (i = 0; i < nindices; i++)
+       _vc_vaconeind (&Nvpl, Irel[i], curvrl->vrl_ntups);
+   }
+
+   /* 
+    * clean moved tuples from last page in Nvpl list
+    * if some tuples left there
+    */
+   if ( vpc->vpd_noff > 0 && offnum <= maxoff )
+   {
+       Assert (vpc->vpd_blkno == blkno - 1);
+       buf = ReadBuffer(onerel, vpc->vpd_blkno);
+       page = BufferGetPage (buf);
+       ntups = 0;
+       maxoff = offnum;
+       for (offnum = FirstOffsetNumber;
+           offnum < maxoff;
+           offnum = OffsetNumberNext(offnum))
+       {
+           itemid = PageGetItemId(page, offnum);
+           if (!ItemIdIsUsed(itemid))
+           continue;
+           htup = (HeapTuple) PageGetItem(page, itemid);
+           Assert ( TransactionIdEquals((TransactionId)htup->t_xmax, myXID) );
+       itemid->lp_flags &= ~LP_USED;
+       ntups++;
+       }
+       Assert ( vpc->vpd_noff == ntups );
+       PageRepairFragmentation(page);
+       WriteBuffer (buf);
+   }
+
+   /* now - free new list of reapped pages */
+   vpp = Nvpl.vpl_pgdesc;
+   for (i = 0; i < Nvpl.vpl_npages; i++, vpp++)
+       pfree(*vpp);
+   pfree (Nvpl.vpl_pgdesc);
+    }
+   
+    /* truncate relation */
+    if ( blkno < nblocks )
+    {
+       blkno = smgrtruncate (onerel->rd_rel->relsmgr, onerel, blkno);
+   Assert ( blkno >= 0 );
+   curvrl->vrl_npages = blkno; /* set new number of blocks */
+    }
+
+    if ( archrel != (Relation) NULL )
+   heap_close(archrel);
+
+    if ( Irel != (Relation*) NULL )    /* pfree index' allocations */
+    {
+   pfree (Idesc);
+   pfree (idatum);
+   pfree (inulls);
+   _vc_clsindices (nindices, Irel);
+    }
+
+    pfree (vpc);
+
+} /* _vc_rpfheap */
+
+/*
+ *  _vc_vacheap() -- free dead tuples
+ *
+ * This routine marks dead tuples as unused and truncates relation
+ * if there are "empty" end-blocks.
+ */
+static void
+_vc_vacheap (VRelList curvrl, Relation onerel, VPageList Vvpl)
+{
+    Buffer buf;
+    Page page;
+    VPageDescr *vpp;
+    Relation archrel;
+    int nblocks;
+    int i;
+
+    nblocks = Vvpl->vpl_npages;
+    /* if the relation has an archive, open it */
+    if (onerel->rd_rel->relarch != 'n')
+   archrel = _vc_getarchrel(onerel);
     else
-   curvrl->vrl_hasindex = false;
-}
+    {
+   archrel = (Relation) NULL;
+   nblocks -= Vvpl->vpl_nemend;    /* nothing to do with them */
+    }
+   
+    for (i = 0, vpp = Vvpl->vpl_pgdesc; i < nblocks; i++, vpp++)
+    {
+   if ( (*vpp)->vpd_noff > 0 )
+   {
+       buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
+       page = BufferGetPage (buf);
+       _vc_vacpage (page, *vpp, archrel);
+       WriteBuffer (buf);
+   }
+    }
+
+    /* truncate relation if there are some empty end-pages */
+    if ( Vvpl->vpl_nemend > 0 )
+    {
+   Assert ( curvrl->vrl_npages >= Vvpl->vpl_nemend );
+   nblocks = curvrl->vrl_npages - Vvpl->vpl_nemend;
+   elog (MESSLEV, "Rel %.*s: Pages: %u --> %u.",
+       NAMEDATALEN, (RelationGetRelationName(onerel))->data, 
+       curvrl->vrl_npages, nblocks);
+
+   /* 
+    * we have to flush "empty" end-pages (if changed, but who knows it)
+    * before truncation 
+    */
+   FlushBufferPool(!TransactionFlushEnabled());
+
+       nblocks = smgrtruncate (onerel->rd_rel->relsmgr, onerel, nblocks);
+   Assert ( nblocks >= 0 );
+   curvrl->vrl_npages = nblocks;   /* set new number of blocks */
+    }
+
+    if ( archrel != (Relation) NULL )
+   heap_close(archrel);
+
+} /* _vc_vacheap */
+
+/*
+ *  _vc_vacpage() -- free (and archive if needed) dead tuples on a page
+ *          and repaire its fragmentation.
+ */
+static void
+_vc_vacpage (Page page, VPageDescr vpd, Relation archrel)
+{
+    ItemId itemid;
+    HeapTuple htup;
+    int i;
+    
+    Assert ( vpd->vpd_nusd == 0 );
+    for (i=0; i < vpd->vpd_noff; i++)
+    {
+   itemid = &(((PageHeader) page)->pd_linp[vpd->vpd_voff[i] - 1]);
+   if ( archrel != (Relation) NULL && ItemIdIsUsed(itemid) )
+   {
+       htup = (HeapTuple) PageGetItem (page, itemid);
+       _vc_archive (archrel, htup);
+   }
+   itemid->lp_flags &= ~LP_USED;
+    }
+    PageRepairFragmentation(page);
+
+} /* _vc_vacpage */
 
 /*
  *  _vc_vaconeind() -- vacuum one index relation.
  *
- * Curvrl is the VRelList entry for the heap we're currently vacuuming.
- * It's locked. Onerel is an index relation on the vacuumed heap. 
+ * Vpl is the VPageList of the heap we're currently vacuuming.
+ * It's locked. Indrel is an index relation on the vacuumed heap. 
  * We don't set locks on the index relation here, since the indexed 
  * access methods support locking at different granularities. 
  * We let them handle it.
@@ -674,7 +1217,7 @@ _vc_vacindices(VRelList curvrl, Relation onerel)
  * pg_class.
  */
 static void
-_vc_vaconeind(VRelList curvrl, Relation indrel)
+_vc_vaconeind(VPageList vpl, Relation indrel, int nhtups)
 {
     RetrieveIndexResult res;
     IndexScanDesc iscan;
@@ -682,6 +1225,7 @@ _vc_vaconeind(VRelList curvrl, Relation indrel)
     int nvac;
     int nitups;
     int nipages;
+    VPageDescr vp;
     struct rusage ru0, ru1;
 
     getrusage(RUSAGE_SELF, &ru0);
@@ -695,7 +1239,8 @@ _vc_vaconeind(VRelList curvrl, Relation indrel)
       != (RetrieveIndexResult) NULL) {
    heapptr = &res->heap_iptr;
 
-   if (_vc_tidreapped(heapptr, curvrl, indrel)) {
+   if ( (vp = _vc_tidreapped (heapptr, vpl)) != (VPageDescr) NULL)
+   {
 #if 0
        elog(DEBUG, "<%x,%x> -> <%x,%x>",
         ItemPointerGetBlockNumber(&(res->index_iptr)),
@@ -703,6 +1248,12 @@ _vc_vaconeind(VRelList curvrl, Relation indrel)
         ItemPointerGetBlockNumber(&(res->heap_iptr)),
         ItemPointerGetOffsetNumber(&(res->heap_iptr)));
 #endif
+       if ( vp->vpd_noff == 0 ) 
+       {               /* this is EmptyPage !!! */
+           elog (NOTICE, "Ind %.*s: pointer to EmptyPage (blk %u off %u) - fixing",
+           NAMEDATALEN, indrel->rd_rel->relname.data,
+               vp->vpd_blkno, ItemPointerGetOffsetNumber(heapptr));
+       }
        ++nvac;
        index_delete(indrel, &res->index_iptr);
    } else {
@@ -721,12 +1272,58 @@ _vc_vaconeind(VRelList curvrl, Relation indrel)
 
     getrusage(RUSAGE_SELF, &ru1);
 
-    elog (NOTICE, "Ind %.*s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.",
+    elog (MESSLEV, "Ind %.*s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.",
    NAMEDATALEN, indrel->rd_rel->relname.data, nipages, nitups, nvac,
    ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
    ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
 
-}
+    if ( nitups != nhtups )
+       elog (NOTICE, "NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
+           nitups, nhtups);
+
+} /* _vc_vaconeind */
+
+/*
+ *  _vc_tidreapped() -- is a particular tid reapped?
+ *
+ * vpl->VPageDescr_array is sorted in right order.
+ */
+static VPageDescr
+_vc_tidreapped(ItemPointer itemptr, VPageList vpl)
+{
+    OffsetNumber ioffno;
+    OffsetNumber *voff;
+    VPageDescr vp, *vpp;
+    VPageDescrData vpd;
+
+    vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr);
+    ioffno = ItemPointerGetOffsetNumber(itemptr);
+   
+    vp = &vpd;
+    vpp = (VPageDescr*) _vc_find_eq ((char*)(vpl->vpl_pgdesc), 
+       vpl->vpl_npages, sizeof (VPageDescr), (char*)&vp, 
+       _vc_cmp_blk);
+
+    if ( vpp == (VPageDescr*) NULL )
+   return ((VPageDescr)NULL);
+    vp = *vpp;
+
+    /* ok - we are on true page */
+
+    if ( vp->vpd_noff == 0 ) {     /* this is EmptyPage !!! */
+   return (vp);
+    }
+    
+    voff = (OffsetNumber*) _vc_find_eq ((char*)(vp->vpd_voff), 
+       vp->vpd_noff, sizeof (OffsetNumber), (char*)&ioffno, 
+       _vc_cmp_offno);
+
+    if ( voff == (OffsetNumber*) NULL )
+   return ((VPageDescr)NULL);
+
+    return (vp);
+
+} /* _vc_tidreapped */
 
 /*
  *  _vc_updstats() -- update pg_class statistics for one relation
@@ -771,7 +1368,7 @@ _vc_updstats(Oid relid, int npages, int ntuples, bool hasindex)
     pgcform->relhasindex = hasindex;
  
     /* XXX -- after write, should invalidate relcache in other backends */
-    WriteNoReleaseBuffer(buf);
+    WriteNoReleaseBuffer(buf); /* heap_endscan release scan' buffers ? */
 
     /* that's all, folks */
     heap_endscan(sdesc);
@@ -788,88 +1385,47 @@ static void _vc_setpagelock(Relation rel, BlockNumber blkno)
     RelationSetLockForWritePage(rel, &itm);
 }
 
-/*
- *  _vc_tidreapped() -- is a particular tid reapped?
- *
- * VPageDescr array is sorted in right order.
- */
-static bool
-_vc_tidreapped(ItemPointer itemptr, VRelList curvrl, Relation indrel)
-{
-    OffsetNumber ioffno;
-    OffsetNumber *voff;
-    VPageDescr vp, *vpp;
-    VPageDescrData vpd;
-
-    vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr);
-    ioffno = ItemPointerGetOffsetNumber(itemptr);
-   
-    vp = &vpd;
-    vpp = (VPageDescr*) _vc_find_eq ((char*)(curvrl->vrl_pgdsc), 
-       curvrl->vrl_nrepg, sizeof (VPageDescr), (char*)&vp, 
-       _vc_cmp_blk);
-
-    if ( vpp == (VPageDescr*) NULL )
-   return (false);
-    vp = *vpp;
-
-    /* ok - we are on true page */
-
-    if ( vp->vpd_noff == 0 ) {     /* this is EmptyPage !!! */
-   elog (NOTICE, "Ind %.*s: pointer to EmptyPage (blk %u off %u) - fixing",
-       NAMEDATALEN, indrel->rd_rel->relname.data,
-           vpd.vpd_blkno, ioffno);
-   return (true);
-    }
-    
-    voff = (OffsetNumber*) _vc_find_eq ((char*)(vp->vpd_voff), 
-       vp->vpd_noff, sizeof (OffsetNumber), (char*)&ioffno, 
-       _vc_cmp_offno);
-
-    if ( voff == (OffsetNumber*) NULL )
-   return (false);
-
-    return (true);
-
-} /* _vc_tidreapped */
-
 
 /*
- *  _vc_reappage() -- save a page on the array of reaped pages for the current
- *          entry on the vacuum relation list.
+ *  _vc_reappage() -- save a page on the array of reapped pages.
  *
  * As a side effect of the way that the vacuuming loop for a given
  * relation works, higher pages come after lower pages in the array
  * (and highest tid on a page is last).
  */
-static void
-_vc_reappage(Portal p,
-       VRelList curvrl,
-       VPageDescr vpc)
+static void 
+_vc_reappage(VPageList vpl, VPageDescr vpc)
 {
-    PortalVariableMemory pmem;
-    MemoryContext old;
     VPageDescr newvpd;
 
-    /* allocate a VPageDescrData entry in the portal memory context */
-    pmem = PortalGetVariableMemory(p);
-    old = MemoryContextSwitchTo((MemoryContext) pmem);
-    if ( curvrl->vrl_nrepg == 0 )
-       curvrl->vrl_pgdsc = (VPageDescr*) palloc(100*sizeof(VPageDescr));
-    else if ( curvrl->vrl_nrepg % 100 == 0 )
-       curvrl->vrl_pgdsc = (VPageDescr*) repalloc(curvrl->vrl_pgdsc, (curvrl->vrl_nrepg+100)*sizeof(VPageDescr));
+    /* allocate a VPageDescrData entry */
     newvpd = (VPageDescr) palloc(sizeof(VPageDescrData) + vpc->vpd_noff*sizeof(OffsetNumber));
-    MemoryContextSwitchTo(old);
 
     /* fill it in */
     if ( vpc->vpd_noff > 0 )
        memmove (newvpd->vpd_voff, vpc->vpd_voff, vpc->vpd_noff*sizeof(OffsetNumber));
     newvpd->vpd_blkno = vpc->vpd_blkno;
     newvpd->vpd_free = vpc->vpd_free;
+    newvpd->vpd_nusd = vpc->vpd_nusd;
     newvpd->vpd_noff = vpc->vpd_noff;
 
-    curvrl->vrl_pgdsc[curvrl->vrl_nrepg] = newvpd;
-    (curvrl->vrl_nrepg)++;
+    /* insert this page into vpl list */
+    _vc_vpinsert (vpl, newvpd);
+    
+} /* _vc_reappage */
+
+static void
+_vc_vpinsert (VPageList vpl, VPageDescr vpnew)
+{
+
+    /* allocate a VPageDescr entry if needed */
+    if ( vpl->vpl_npages == 0 )
+       vpl->vpl_pgdesc = (VPageDescr*) palloc(100*sizeof(VPageDescr));
+    else if ( vpl->vpl_npages % 100 == 0 )
+       vpl->vpl_pgdesc = (VPageDescr*) repalloc(vpl->vpl_pgdesc, (vpl->vpl_npages+100)*sizeof(VPageDescr));
+    vpl->vpl_pgdesc[vpl->vpl_npages] = vpnew;
+    (vpl->vpl_npages)++;
+    
 }
 
 static void
@@ -877,8 +1433,6 @@ _vc_free(Portal p, VRelList vrl)
 {
     VRelList p_vrl;
     VAttList p_val, val;
-    VPageDescr *vpd;
-    int i;
     MemoryContext old;
     PortalVariableMemory pmem;
 
@@ -895,15 +1449,6 @@ _vc_free(Portal p, VRelList vrl)
        pfree(p_val);
    }
 
-   /* free page' descriptions */
-   if ( vrl->vrl_nrepg > 0 ) {
-       vpd = vrl->vrl_pgdsc;
-       for (i = 0; i < vrl->vrl_nrepg; i++) {
-       pfree(vpd[i]);
-       }
-       pfree (vpd);
-   }
-
    /* free rel list entry */
    p_vrl = vrl;
    vrl = vrl->vrl_next;
@@ -1038,3 +1583,149 @@ _vc_cmp_offno (char *left, char *right)
     return (1);
 
 } /* _vc_cmp_offno */
+
+
+static void
+_vc_getindices (Oid relid, int *nindices, Relation **Irel)
+{
+    Relation pgindex;
+    Relation irel;
+    TupleDesc pgidesc;
+    HeapTuple pgitup;
+    HeapScanDesc pgiscan;
+    Datum d;
+    int i, k;
+    bool n;
+    ScanKeyData pgikey;
+    Oid *ioid;
+
+    *nindices = i = 0;
+    
+    ioid = (Oid *) palloc(10*sizeof(Oid));
+
+    /* prepare a heap scan on the pg_index relation */
+    pgindex = heap_openr(IndexRelationName);
+    pgidesc = RelationGetTupleDescriptor(pgindex);
+
+    ScanKeyEntryInitialize(&pgikey, 0x0, Anum_pg_index_indrelid,
+              ObjectIdEqualRegProcedure,
+              ObjectIdGetDatum(relid));
+
+    pgiscan = heap_beginscan(pgindex, false, NowTimeQual, 1, &pgikey);
+
+    while (HeapTupleIsValid(pgitup = heap_getnext(pgiscan, 0, NULL))) {
+   d = (Datum) heap_getattr(pgitup, InvalidBuffer, Anum_pg_index_indexrelid,
+                pgidesc, &n);
+   i++;
+   if ( i % 10 == 0 )
+       ioid = (Oid *) repalloc(ioid, (i+10)*sizeof(Oid));
+   ioid[i-1] = DatumGetObjectId(d);
+    }
+
+    heap_endscan(pgiscan);
+    heap_close(pgindex);
+
+    if ( i == 0 ) {    /* No one index found */
+   pfree(ioid);
+   return;
+    }
+
+    if ( Irel != (Relation **) NULL )
+   *Irel = (Relation *) palloc(i * sizeof(Relation));
+    
+    for (k = 0; i > 0; )
+    {
+   irel = index_open(ioid[--i]);
+   if ( irel != (Relation) NULL )
+   {
+       if ( Irel != (Relation **) NULL )
+       (*Irel)[k] = irel;
+       else
+       index_close (irel);
+       k++;
+   }
+   else
+       elog (NOTICE, "CAN't OPEN INDEX %u - SKIP IT", ioid[i]);
+    }
+    *nindices = k;
+    pfree(ioid);
+
+    if ( Irel != (Relation **) NULL && *nindices == 0 )
+    {
+   pfree (*Irel);
+   *Irel = (Relation *) NULL;
+    }
+
+} /* _vc_getindices */
+
+
+static void
+_vc_clsindices (int nindices, Relation *Irel)
+{
+
+    if ( Irel == (Relation*) NULL )
+       return;
+
+    while (nindices--) {
+   index_close (Irel[nindices]);
+    }
+    pfree (Irel);
+
+} /* _vc_clsindices */
+
+
+static void
+_vc_mkindesc (Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc)
+{
+    IndDesc *idcur;
+    HeapTuple pgIndexTup;
+    AttrNumber *attnumP;
+    int natts;
+    int i;
+
+    *Idesc = (IndDesc *) palloc (nindices * sizeof (IndDesc));
+    
+    for (i = 0, idcur = *Idesc; i < nindices; i++, idcur++) {
+   pgIndexTup =
+       SearchSysCacheTuple(INDEXRELID,
+               ObjectIdGetDatum(Irel[i]->rd_id),
+               0,0,0);
+   Assert(pgIndexTup);
+   idcur->tform = (IndexTupleForm)GETSTRUCT(pgIndexTup);
+   for (attnumP = &(idcur->tform->indkey[0]), natts = 0;
+       *attnumP != InvalidAttrNumber && natts != INDEX_MAX_KEYS;
+       attnumP++, natts++);
+   if (idcur->tform->indproc != InvalidOid) {
+       idcur->finfoP = &(idcur->finfo);
+       FIgetnArgs(idcur->finfoP) = natts;
+       natts = 1;
+       FIgetProcOid(idcur->finfoP) = idcur->tform->indproc;
+       *(FIgetname(idcur->finfoP)) = '\0';
+   } else
+       idcur->finfoP = (FuncIndexInfo *) NULL;
+   
+   idcur->natts = natts;
+    }
+    
+} /* _vc_mkindesc */
+
+
+static bool
+_vc_enough_space (VPageDescr vpd, Size len)
+{
+
+    len = DOUBLEALIGN(len);
+
+    if ( len > vpd->vpd_free )
+       return (false);
+    
+    if ( vpd->vpd_nusd < vpd->vpd_noff )   /* there are free itemid(s) */
+       return (true);              /* and len <= free_space */
+    
+    /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
+    if ( len <= vpd->vpd_free - sizeof (ItemIdData) )
+       return (true);
+    
+    return (false);
+    
+} /* _vc_enough_space */