vacuum.c refactoring
author      Bruce Momjian
            Tue, 8 Jun 2004 13:59:36 +0000 (13:59 +0000)
committer   Bruce Momjian
            Tue, 8 Jun 2004 13:59:36 +0000 (13:59 +0000)
   . rename variables
     . cur_buffer -> dst_buffer
     . ToPage -> dst_page
     . cur_page -> dst_vacpage
   . move variable declarations into block where variable is used
   . various Asserts instead of elog(ERROR, ...)
   . extract functionality from repair_frag() into new routines
     . move_chain_tuple()
     . move_plain_tuple()
     . update_hint_bits()
   . create type ExecContext
   . add comments

Manfred Koizar
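
For orientation, here is a condensed usage sketch of the new ExecContext
type, assembled from the diff below (not additional commit content): the
context is initialized once in repair_frag(), passed to every
move_xxx_tuple() call, and cleaned up at the end.

    ExecContextData ec;

    ExecContext_Init(&ec, onerel);      /* EState, ResultRelInfo, slot */

    /* for each tuple that is part of an update chain ... */
    move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
                     dst_buffer, dst_page, destvacpage,
                     &ec, &Ctid, vtmove[ti].cleanVpd);

    /* ... or, for a tuple that stands alone ... */
    move_plain_tuple(onerel, buf, page, &tuple,
                     dst_buffer, dst_page, dst_vacpage, &ec);

    ExecContext_Finish(&ec);            /* drop tuple table, close indexes */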

src/backend/commands/vacuum.c

index 68164a9a80c0ceb2bc6de30be9698604692784a4..80a021487f9d108e157f61a786b2cee272de5312 100644 (file)
@@ -13,7 +13,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.280 2004/06/05 19:48:07 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.281 2004/06/08 13:59:36 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -99,6 +99,64 @@ typedef struct VRelStats
    VTupleLink  vtlinks;
 } VRelStats;
 
+/*----------------------------------------------------------------------
+ * ExecContext:
+ *
+ * As these variables always appear together, we put them into one struct
+ * and pull initialization and cleanup into separate routines.
+ * ExecContext is used by repair_frag() and move_xxx_tuple().  More
+ * accurately:  it is *used* only in move_xxx_tuple(), but because
+ * these routines are called many times, we initialize the struct just
+ * once in repair_frag() and pass it on to move_xxx_tuple().
+ */
+typedef struct ExecContextData
+{
+   ResultRelInfo *resultRelInfo;
+   EState     *estate;
+   TupleTable  tupleTable;
+   TupleTableSlot *slot;
+} ExecContextData;
+typedef ExecContextData *ExecContext;
+
+static void
+ExecContext_Init(ExecContext ec, Relation rel)
+{
+   TupleDesc   tupdesc = RelationGetDescr(rel);
+
+   /*
+    * We need a ResultRelInfo and an EState so we can use the regular
+    * executor's index-entry-making machinery.
+    */
+   ec->estate = CreateExecutorState();
+
+   ec->resultRelInfo = makeNode(ResultRelInfo);
+   ec->resultRelInfo->ri_RangeTableIndex = 1;      /* dummy */
+   ec->resultRelInfo->ri_RelationDesc = rel;
+   ec->resultRelInfo->ri_TrigDesc = NULL;  /* we don't fire triggers */
+
+   ExecOpenIndices(ec->resultRelInfo);
+
+   ec->estate->es_result_relations = ec->resultRelInfo;
+   ec->estate->es_num_result_relations = 1;
+   ec->estate->es_result_relation_info = ec->resultRelInfo;
+
+   /* Set up a dummy tuple table too */
+   ec->tupleTable = ExecCreateTupleTable(1);
+   ec->slot = ExecAllocTableSlot(ec->tupleTable);
+   ExecSetSlotDescriptor(ec->slot, tupdesc, false);
+}
+
+static void
+ExecContext_Finish(ExecContext ec)
+{
+   ExecDropTupleTable(ec->tupleTable, true);
+   ExecCloseIndices(ec->resultRelInfo);
+   FreeExecutorState(ec->estate);
+}
+/*
+ * End of ExecContext Implementation
+ *----------------------------------------------------------------------
+ */
 
 static MemoryContext vac_context = NULL;
 
@@ -122,6 +180,17 @@ static void scan_heap(VRelStats *vacrelstats, Relation onerel,
 static void repair_frag(VRelStats *vacrelstats, Relation onerel,
            VacPageList vacuum_pages, VacPageList fraged_pages,
            int nindexes, Relation *Irel);
+static void move_chain_tuple(Relation rel,
+                    Buffer old_buf, Page old_page, HeapTuple old_tup,
+                    Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
+                    ExecContext ec, ItemPointer ctid, bool cleanVpd);
+static void move_plain_tuple(Relation rel,
+                    Buffer old_buf, Page old_page, HeapTuple old_tup,
+                    Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
+                    ExecContext ec);
+static void update_hint_bits(Relation rel, VacPageList fraged_pages,
+                   int num_fraged_pages, BlockNumber last_move_dest_block,
+                   int num_moved);
 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
            VacPageList vacpagelist);
 static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
@@ -675,7 +744,7 @@ vac_update_dbstats(Oid dbid,
 static void
 vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
 {
-   TransactionId myXID;
+   TransactionId myXID = GetCurrentTransactionId();
    Relation    relation;
    HeapScanDesc scan;
    HeapTuple   tuple;
@@ -683,7 +752,6 @@ vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
    bool        vacuumAlreadyWrapped = false;
    bool        frozenAlreadyWrapped = false;
 
-   myXID = GetCurrentTransactionId();
 
    relation = heap_openr(DatabaseRelationName, AccessShareLock);
 
@@ -1059,17 +1127,9 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 {
    BlockNumber nblocks,
                blkno;
-   ItemId      itemid;
-   Buffer      buf;
    HeapTupleData tuple;
-   OffsetNumber offnum,
-               maxoff;
-   bool        pgchanged,
-               tupgone,
-               notup;
    char       *relname;
-   VacPage     vacpage,
-               vacpagecopy;
+   VacPage     vacpage;
    BlockNumber empty_pages,
                empty_end_pages;
    double      num_tuples,
@@ -1080,7 +1140,6 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                usable_free_space;
    Size        min_tlen = MaxTupleSize;
    Size        max_tlen = 0;
-   int         i;
    bool        do_shrinking = true;
    VTupleLink  vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
    int         num_vtlinks = 0;
@@ -1113,6 +1172,11 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                    tempPage = NULL;
        bool        do_reap,
                    do_frag;
+       Buffer      buf;
+       OffsetNumber offnum,
+                   maxoff;
+       bool        pgchanged,
+                   notup;
 
        vacuum_delay_point();
 
@@ -1125,6 +1189,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 
        if (PageIsNew(page))
        {
+           VacPage vacpagecopy;
+
            ereport(WARNING,
            (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
                    relname, blkno)));
@@ -1142,6 +1208,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 
        if (PageIsEmpty(page))
        {
+           VacPage vacpagecopy;
+
            vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
            free_space += vacpage->free;
            empty_pages++;
@@ -1161,8 +1229,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
             offnum = OffsetNumberNext(offnum))
        {
            uint16      sv_infomask;
-
-           itemid = PageGetItemId(page, offnum);
+           ItemId      itemid = PageGetItemId(page, offnum);
+           bool        tupgone = false;
 
            /*
             * Collect un-used items too - it's possible to have indexes
@@ -1180,7 +1248,6 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
            tuple.t_len = ItemIdGetLength(itemid);
            ItemPointerSet(&(tuple.t_self), blkno, offnum);
 
-           tupgone = false;
            sv_infomask = tuple.t_data->t_infomask;
 
            switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
@@ -1269,7 +1336,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                    do_shrinking = false;
                    break;
                default:
-                   elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
+                   /* unexpected HeapTupleSatisfiesVacuum result */
+                   Assert(false);
                    break;
            }
 
@@ -1344,7 +1412,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 
        if (do_reap || do_frag)
        {
-           vacpagecopy = copy_vac_page(vacpage);
+           VacPage vacpagecopy = copy_vac_page(vacpage);
            if (do_reap)
                vpage_insert(vacuum_pages, vacpagecopy);
            if (do_frag)
@@ -1390,6 +1458,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
     */
    if (do_shrinking)
    {
+       int         i;
+
        Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
        fraged_pages->num_pages -= empty_end_pages;
        usable_free_space = 0;
@@ -1453,76 +1523,29 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
            VacPageList vacuum_pages, VacPageList fraged_pages,
            int nindexes, Relation *Irel)
 {
-   TransactionId myXID;
-   CommandId   myCID;
-   Buffer      buf,
-               cur_buffer;
+   TransactionId myXID = GetCurrentTransactionId();
+   Buffer      dst_buffer = InvalidBuffer;
    BlockNumber nblocks,
                blkno;
    BlockNumber last_move_dest_block = 0,
                last_vacuum_block;
-   Page        page,
-               ToPage = NULL;
-   OffsetNumber offnum,
-               maxoff,
-               newoff,
-               max_offset;
-   ItemId      itemid,
-               newitemid;
-   HeapTupleData tuple,
-               newtup;
-   TupleDesc   tupdesc;
-   ResultRelInfo *resultRelInfo;
-   EState     *estate;
-   TupleTable  tupleTable;
-   TupleTableSlot *slot;
+   Page        dst_page = NULL;
+   ExecContextData ec;
    VacPageListData Nvacpagelist;
-   VacPage     cur_page = NULL,
+   VacPage     dst_vacpage = NULL,
                last_vacuum_page,
                vacpage,
               *curpage;
-   int         cur_item = 0;
    int         i;
-   Size        tuple_len;
-   int         num_moved,
+   int         num_moved = 0,
                num_fraged_pages,
                vacuumed_pages;
-   int         checked_moved,
-               num_tuples,
-               keep_tuples = 0;
-   bool        isempty,
-               dowrite,
-               chain_tuple_moved;
+   int         keep_tuples = 0;
    VacRUsage   ru0;
 
    vac_init_rusage(&ru0);
 
-   myXID = GetCurrentTransactionId();
-   myCID = GetCurrentCommandId();
-
-   tupdesc = RelationGetDescr(onerel);
-
-   /*
-    * We need a ResultRelInfo and an EState so we can use the regular
-    * executor's index-entry-making machinery.
-    */
-   estate = CreateExecutorState();
-
-   resultRelInfo = makeNode(ResultRelInfo);
-   resultRelInfo->ri_RangeTableIndex = 1;      /* dummy */
-   resultRelInfo->ri_RelationDesc = onerel;
-   resultRelInfo->ri_TrigDesc = NULL;  /* we don't fire triggers */
-
-   ExecOpenIndices(resultRelInfo);
-
-   estate->es_result_relations = resultRelInfo;
-   estate->es_num_result_relations = 1;
-   estate->es_result_relation_info = resultRelInfo;
-
-   /* Set up a dummy tuple table too */
-   tupleTable = ExecCreateTupleTable(1);
-   slot = ExecAllocTableSlot(tupleTable);
-   ExecSetSlotDescriptor(slot, tupdesc, false);
+   ExecContext_Init(&ec, onerel);
 
    Nvacpagelist.num_pages = 0;
    num_fraged_pages = fraged_pages->num_pages;
@@ -1539,8 +1562,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
        last_vacuum_page = NULL;
        last_vacuum_block = InvalidBlockNumber;
    }
-   cur_buffer = InvalidBuffer;
-   num_moved = 0;
 
    vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
    vacpage->offsets_used = vacpage->offsets_free = 0;
@@ -1560,6 +1581,14 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
         blkno > last_move_dest_block;
         blkno--)
    {
+       Buffer          buf;
+       Page            page;
+       OffsetNumber    offnum,
+                       maxoff;
+       bool            isempty,
+                       dowrite,
+                       chain_tuple_moved;
+
        vacuum_delay_point();
 
        /*
@@ -1635,7 +1664,9 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
             offnum <= maxoff;
             offnum = OffsetNumberNext(offnum))
        {
-           itemid = PageGetItemId(page, offnum);
+           Size            tuple_len;
+           HeapTupleData   tuple;
+           ItemId          itemid = PageGetItemId(page, offnum);
 
            if (!ItemIdIsUsed(itemid))
                continue;
@@ -1645,45 +1676,71 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
            tuple_len = tuple.t_len = ItemIdGetLength(itemid);
            ItemPointerSet(&(tuple.t_self), blkno, offnum);
 
+           /*
+            * VACUUM FULL has an exclusive lock on the relation.  So
+            * normally no other transaction can have pending INSERTs or
+            * DELETEs in this relation.  A tuple is either
+            *   (a) a tuple in a system catalog, inserted or deleted by
+            *       a not yet committed transaction or
+            *   (b) dead (XMIN_INVALID or XMAX_COMMITTED) or
+            *   (c) inserted by a committed xact (XMIN_COMMITTED) or
+            *   (d) moved by the currently running VACUUM.
+            * In case (a) we wouldn't be in repair_frag() at all.
+            * In case (b) we cannot be here, because scan_heap() has
+            * already marked the item as unused, see continue above.
+            * Case (c) is what normally is to be expected.
+            * Case (d) is only possible if a whole tuple chain has been
+            * moved while processing this or a higher numbered block.
+            */
            if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
            {
-               if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
-                   elog(ERROR, "HEAP_MOVED_IN was not expected");
+               /*
+                * There cannot be another concurrently running VACUUM.  If
+                * the tuple had been moved in by a previous VACUUM, the
+                * visibility check would have set XMIN_COMMITTED.  If the
+                * tuple had been moved in by the currently running VACUUM,
+                * the loop would have been terminated.  We had
+                * elog(ERROR, ...) here, but as we are testing for a
+                * can't-happen condition, Assert() seems more appropriate.
+                */
+               Assert(!(tuple.t_data->t_infomask & HEAP_MOVED_IN));
 
                /*
                 * If this (chain) tuple is moved by me already then I
                 * have to check is it in vacpage or not - i.e. is it
                 * moved while cleaning this page or some previous one.
                 */
-               if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
-               {
-                   if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
-                       elog(ERROR, "invalid XVAC in tuple header");
-                   if (keep_tuples == 0)
-                       continue;
-                   if (chain_tuple_moved)      /* some chains was moved
-                                                * while */
-                   {           /* cleaning this page */
-                       Assert(vacpage->offsets_free > 0);
-                       for (i = 0; i < vacpage->offsets_free; i++)
-                       {
-                           if (vacpage->offsets[i] == offnum)
-                               break;
-                       }
-                       if (i >= vacpage->offsets_free) /* not found */
-                       {
-                           vacpage->offsets[vacpage->offsets_free++] = offnum;
-                           keep_tuples--;
-                       }
+               Assert(tuple.t_data->t_infomask & HEAP_MOVED_OFF);
+               /*
+                * MOVED_OFF by another VACUUM would have caused the
+                * visibility check to set XMIN_COMMITTED or XMIN_INVALID.
+                */
+               Assert(HeapTupleHeaderGetXvac(tuple.t_data) == myXID);
+
+               /* Can't we Assert(keep_tuples > 0) here? */
+               if (keep_tuples == 0)
+                   continue;
+               if (chain_tuple_moved)
+               {
+                   /* some chains were moved while cleaning this page */
+                   Assert(vacpage->offsets_free > 0);
+                   for (i = 0; i < vacpage->offsets_free; i++)
+                   {
+                       if (vacpage->offsets[i] == offnum)
+                           break;
                    }
-                   else
+                   if (i >= vacpage->offsets_free) /* not found */
                    {
                        vacpage->offsets[vacpage->offsets_free++] = offnum;
                        keep_tuples--;
                    }
-                   continue;
                }
-               elog(ERROR, "HEAP_MOVED_OFF was expected");
+               else
+               {
+                   vacpage->offsets[vacpage->offsets_free++] = offnum;
+                   keep_tuples--;
+               }
+               continue;
            }
 
            /*
@@ -1716,8 +1773,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                Buffer      Cbuf = buf;
                bool        freeCbuf = false;
                bool        chain_move_failed = false;
-               Page        Cpage;
-               ItemId      Citemid;
                ItemPointerData Ctid;
                HeapTupleData tp = tuple;
                Size        tlen = tuple_len;
@@ -1728,10 +1783,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                int         to_item = 0;
                int         ti;
 
-               if (cur_buffer != InvalidBuffer)
+               if (dst_buffer != InvalidBuffer)
                {
-                   WriteBuffer(cur_buffer);
-                   cur_buffer = InvalidBuffer;
+                   WriteBuffer(dst_buffer);
+                   dst_buffer = InvalidBuffer;
                }
 
                /* Quick exit if we have no vtlinks to search in */
@@ -1754,6 +1809,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                       !(ItemPointerEquals(&(tp.t_self),
                                           &(tp.t_data->t_ctid))))
                {
+                   Page        Cpage;
+                   ItemId      Citemid;
+                   ItemPointerData Ctid;
+
                    Ctid = tp.t_data->t_ctid;
                    if (freeCbuf)
                        ReleaseBuffer(Cbuf);
@@ -1929,12 +1988,14 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                }
 
                /*
-                * Okay, move the whle tuple chain
+                * Okay, move the whole tuple chain
                 */
                ItemPointerSetInvalid(&Ctid);
                for (ti = 0; ti < num_vtmove; ti++)
                {
                    VacPage     destvacpage = vtmove[ti].vacpage;
+                   Page        Cpage;
+                   ItemId      Citemid;
 
                    /* Get page to move from */
                    tuple.t_self = vtmove[ti].tid;
@@ -1942,13 +2003,13 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                             ItemPointerGetBlockNumber(&(tuple.t_self)));
 
                    /* Get page to move to */
-                   cur_buffer = ReadBuffer(onerel, destvacpage->blkno);
+                   dst_buffer = ReadBuffer(onerel, destvacpage->blkno);
 
-                   LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
-                   if (cur_buffer != Cbuf)
+                   LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
+                   if (dst_buffer != Cbuf)
                        LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
 
-                   ToPage = BufferGetPage(cur_buffer);
+                   dst_page = BufferGetPage(dst_buffer);
                    Cpage = BufferGetPage(Cbuf);
 
                    Citemid = PageGetItemId(Cpage,
@@ -1961,120 +2022,14 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                     * make a copy of the source tuple, and then mark the
                     * source tuple MOVED_OFF.
                     */
-                   heap_copytuple_with_tuple(&tuple, &newtup);
-
-                   /*
-                    * register invalidation of source tuple in catcaches.
-                    */
-                   CacheInvalidateHeapTuple(onerel, &tuple);
-
-                   /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
-                   START_CRIT_SECTION();
-
-                   tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
-                                                 HEAP_XMIN_INVALID |
-                                                 HEAP_MOVED_IN);
-                   tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
-                   HeapTupleHeaderSetXvac(tuple.t_data, myXID);
-
-                   /*
-                    * If this page was not used before - clean it.
-                    *
-                    * NOTE: a nasty bug used to lurk here.  It is possible
-                    * for the source and destination pages to be the same
-                    * (since this tuple-chain member can be on a page
-                    * lower than the one we're currently processing in
-                    * the outer loop).  If that's true, then after
-                    * vacuum_page() the source tuple will have been
-                    * moved, and tuple.t_data will be pointing at
-                    * garbage.  Therefore we must do everything that uses
-                    * tuple.t_data BEFORE this step!!
-                    *
-                    * This path is different from the other callers of
-                    * vacuum_page, because we have already incremented
-                    * the vacpage's offsets_used field to account for the
-                    * tuple(s) we expect to move onto the page. Therefore
-                    * vacuum_page's check for offsets_used == 0 is wrong.
-                    * But since that's a good debugging check for all
-                    * other callers, we work around it here rather than
-                    * remove it.
-                    */
-                   if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
-                   {
-                       int         sv_offsets_used = destvacpage->offsets_used;
-
-                       destvacpage->offsets_used = 0;
-                       vacuum_page(onerel, cur_buffer, destvacpage);
-                       destvacpage->offsets_used = sv_offsets_used;
-                   }
-
-                   /*
-                    * Update the state of the copied tuple, and store it
-                    * on the destination page.
-                    */
-                   newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
-                                                  HEAP_XMIN_INVALID |
-                                                  HEAP_MOVED_OFF);
-                   newtup.t_data->t_infomask |= HEAP_MOVED_IN;
-                   HeapTupleHeaderSetXvac(newtup.t_data, myXID);
-                   newoff = PageAddItem(ToPage,
-                                        (Item) newtup.t_data,
-                                        tuple_len,
-                                        InvalidOffsetNumber,
-                                        LP_USED);
-                   if (newoff == InvalidOffsetNumber)
-                   {
-                       elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
-                         (unsigned long) tuple_len, destvacpage->blkno);
-                   }
-                   newitemid = PageGetItemId(ToPage, newoff);
-                   pfree(newtup.t_data);
-                   newtup.t_datamcxt = NULL;
-                   newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
-                   ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
-
-                   /* XLOG stuff */
-                   if (!onerel->rd_istemp)
-                   {
-                       XLogRecPtr  recptr =
-                       log_heap_move(onerel, Cbuf, tuple.t_self,
-                                     cur_buffer, &newtup);
-
-                       if (Cbuf != cur_buffer)
-                       {
-                           PageSetLSN(Cpage, recptr);
-                           PageSetSUI(Cpage, ThisStartUpID);
-                       }
-                       PageSetLSN(ToPage, recptr);
-                       PageSetSUI(ToPage, ThisStartUpID);
-                   }
-                   else
-                   {
-                       /*
-                        * No XLOG record, but still need to flag that XID
-                        * exists on disk
-                        */
-                       MyXactMadeTempRelUpdate = true;
-                   }
-
-                   END_CRIT_SECTION();
+                   move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
+                                    dst_buffer, dst_page, destvacpage,
+                                    &ec, &Ctid, vtmove[ti].cleanVpd);
 
+                   num_moved++;
                    if (destvacpage->blkno > last_move_dest_block)
                        last_move_dest_block = destvacpage->blkno;
 
-                   /*
-                    * Set new tuple's t_ctid pointing to itself for last
-                    * tuple in chain, and to next tuple in chain
-                    * otherwise.
-                    */
-                   if (!ItemPointerIsValid(&Ctid))
-                       newtup.t_data->t_ctid = newtup.t_self;
-                   else
-                       newtup.t_data->t_ctid = Ctid;
-                   Ctid = newtup.t_self;
-
-                   num_moved++;
-
                    /*
                     * Remember that we moved tuple from the current page
                     * (corresponding index tuple will be cleaned).
@@ -2085,23 +2040,11 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                    else
                        keep_tuples++;
 
-                   LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
-                   if (cur_buffer != Cbuf)
-                       LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
-
-                   /* Create index entries for the moved tuple */
-                   if (resultRelInfo->ri_NumIndices > 0)
-                   {
-                       ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
-                       ExecInsertIndexTuples(slot, &(newtup.t_self),
-                                             estate, true);
-                   }
-
-                   WriteBuffer(cur_buffer);
+                   WriteBuffer(dst_buffer);
                    WriteBuffer(Cbuf);
                }               /* end of move-the-tuple-chain loop */
 
-               cur_buffer = InvalidBuffer;
+               dst_buffer = InvalidBuffer;
                pfree(vtmove);
                chain_tuple_moved = true;
 
@@ -2110,13 +2053,13 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
            }                   /* end of is-tuple-in-chain test */
 
            /* try to find new page for this tuple */
-           if (cur_buffer == InvalidBuffer ||
-               !enough_space(cur_page, tuple_len))
+           if (dst_buffer == InvalidBuffer ||
+               !enough_space(dst_vacpage, tuple_len))
            {
-               if (cur_buffer != InvalidBuffer)
+               if (dst_buffer != InvalidBuffer)
                {
-                   WriteBuffer(cur_buffer);
-                   cur_buffer = InvalidBuffer;
+                   WriteBuffer(dst_buffer);
+                   dst_buffer = InvalidBuffer;
                }
                for (i = 0; i < num_fraged_pages; i++)
                {
@@ -2125,110 +2068,32 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                }
                if (i == num_fraged_pages)
                    break;      /* can't move item anywhere */
-               cur_item = i;
-               cur_page = fraged_pages->pagedesc[cur_item];
-               cur_buffer = ReadBuffer(onerel, cur_page->blkno);
-               LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
-               ToPage = BufferGetPage(cur_buffer);
+               dst_vacpage = fraged_pages->pagedesc[i];
+               dst_buffer = ReadBuffer(onerel, dst_vacpage->blkno);
+               LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
+               dst_page = BufferGetPage(dst_buffer);
                /* if this page was not used before - clean it */
-               if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
-                   vacuum_page(onerel, cur_buffer, cur_page);
+               if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
+                   vacuum_page(onerel, dst_buffer, dst_vacpage);
            }
            else
-               LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
+               LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
 
            LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 
-           /* copy tuple */
-           heap_copytuple_with_tuple(&tuple, &newtup);
+           move_plain_tuple(onerel, buf, page, &tuple,
+                            dst_buffer, dst_page, dst_vacpage, &ec);
 
-           /*
-            * register invalidation of source tuple in catcaches.
-            *
-            * (Note: we do not need to register the copied tuple, because we
-            * are not changing the tuple contents and so there cannot be
-            * any need to flush negative catcache entries.)
-            */
-           CacheInvalidateHeapTuple(onerel, &tuple);
-
-           /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
-           START_CRIT_SECTION();
 
-           /*
-            * Mark new tuple as MOVED_IN by me.
-            */
-           newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
-                                          HEAP_XMIN_INVALID |
-                                          HEAP_MOVED_OFF);
-           newtup.t_data->t_infomask |= HEAP_MOVED_IN;
-           HeapTupleHeaderSetXvac(newtup.t_data, myXID);
-
-           /* add tuple to the page */
-           newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
-                                InvalidOffsetNumber, LP_USED);
-           if (newoff == InvalidOffsetNumber)
-           {
-               elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
-                    (unsigned long) tuple_len,
-                    cur_page->blkno, (unsigned long) cur_page->free,
-                    cur_page->offsets_used, cur_page->offsets_free);
-           }
-           newitemid = PageGetItemId(ToPage, newoff);
-           pfree(newtup.t_data);
-           newtup.t_datamcxt = NULL;
-           newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
-           ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
-           newtup.t_self = newtup.t_data->t_ctid;
+           num_moved++;
+           if (dst_vacpage->blkno > last_move_dest_block)
+               last_move_dest_block = dst_vacpage->blkno;
 
            /*
-            * Mark old tuple as MOVED_OFF by me.
+            * Remember that we moved tuple from the current page
+            * (corresponding index tuple will be cleaned).
             */
-           tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
-                                         HEAP_XMIN_INVALID |
-                                         HEAP_MOVED_IN);
-           tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
-           HeapTupleHeaderSetXvac(tuple.t_data, myXID);
-
-           /* XLOG stuff */
-           if (!onerel->rd_istemp)
-           {
-               XLogRecPtr  recptr =
-               log_heap_move(onerel, buf, tuple.t_self,
-                             cur_buffer, &newtup);
-
-               PageSetLSN(page, recptr);
-               PageSetSUI(page, ThisStartUpID);
-               PageSetLSN(ToPage, recptr);
-               PageSetSUI(ToPage, ThisStartUpID);
-           }
-           else
-           {
-               /*
-                * No XLOG record, but still need to flag that XID exists
-                * on disk
-                */
-               MyXactMadeTempRelUpdate = true;
-           }
-
-           END_CRIT_SECTION();
-
-           cur_page->offsets_used++;
-           num_moved++;
-           cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
-           if (cur_page->blkno > last_move_dest_block)
-               last_move_dest_block = cur_page->blkno;
-
            vacpage->offsets[vacpage->offsets_free++] = offnum;
-
-           LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
-           LockBuffer(buf, BUFFER_LOCK_UNLOCK);
-
-           /* insert index' tuples if needed */
-           if (resultRelInfo->ri_NumIndices > 0)
-           {
-               ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
-               ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
-           }
        }                       /* walk along page */
 
        /*
@@ -2249,36 +2114,31 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                 off <= maxoff;
                 off = OffsetNumberNext(off))
            {
-               itemid = PageGetItemId(page, off);
+               ItemId  itemid = PageGetItemId(page, off);
+               HeapTupleHeader htup;
+
                if (!ItemIdIsUsed(itemid))
                    continue;
-               tuple.t_datamcxt = NULL;
-               tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
-               if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
+               htup = (HeapTupleHeader) PageGetItem(page, itemid);
+               if (htup->t_infomask & HEAP_XMIN_COMMITTED)
                    continue;
-               if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
-                   elog(ERROR, "HEAP_MOVED_IN was not expected");
-               if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
+               /*
+                * See comments in the walk-along-page loop above for why
+                * we have Asserts here instead of if (...) elog(ERROR).
+                */
+               Assert(!(htup->t_infomask & HEAP_MOVED_IN));
+               Assert(htup->t_infomask & HEAP_MOVED_OFF);
+               Assert(HeapTupleHeaderGetXvac(htup) == myXID);
+               if (chain_tuple_moved)
                {
-                   if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
-                       elog(ERROR, "invalid XVAC in tuple header");
-                   /* some chains was moved while */
-                   if (chain_tuple_moved)
-                   {           /* cleaning this page */
-                       Assert(vacpage->offsets_free > 0);
-                       for (i = 0; i < vacpage->offsets_free; i++)
-                       {
-                           if (vacpage->offsets[i] == off)
-                               break;
-                       }
-                       if (i >= vacpage->offsets_free) /* not found */
-                       {
-                           vacpage->offsets[vacpage->offsets_free++] = off;
-                           Assert(keep_tuples > 0);
-                           keep_tuples--;
-                       }
+                   /* some chains were moved while cleaning this page */
+                   Assert(vacpage->offsets_free > 0);
+                   for (i = 0; i < vacpage->offsets_free; i++)
+                   {
+                       if (vacpage->offsets[i] == off)
+                           break;
                    }
-                   else
+                   if (i >= vacpage->offsets_free) /* not found */
                    {
                        vacpage->offsets[vacpage->offsets_free++] = off;
                        Assert(keep_tuples > 0);
@@ -2286,7 +2146,11 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                    }
                }
                else
-                   elog(ERROR, "HEAP_MOVED_OFF was expected");
+               {
+                   vacpage->offsets[vacpage->offsets_free++] = off;
+                   Assert(keep_tuples > 0);
+                   keep_tuples--;
+               }
            }
        }
 
@@ -2312,10 +2176,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
 
    blkno++;                    /* new number of blocks */
 
-   if (cur_buffer != InvalidBuffer)
+   if (dst_buffer != InvalidBuffer)
    {
        Assert(num_moved > 0);
-       WriteBuffer(cur_buffer);
+       WriteBuffer(dst_buffer);
    }
 
    if (num_moved > 0)
@@ -2348,6 +2212,9 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
        Assert((*curpage)->blkno < blkno);
        if ((*curpage)->offsets_used == 0)
        {
+           Buffer      buf;
+           Page        page;
+
            /* this page was not used as a move target, so must clean it */
            buf = ReadBuffer(onerel, (*curpage)->blkno);
            LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
@@ -2363,62 +2230,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
     * Now scan all the pages that we moved tuples onto and update tuple
     * status bits.  This is not really necessary, but will save time for
     * future transactions examining these tuples.
-    *
-    * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
-    * pages that were move source pages but not move dest pages.  One
-    * also wonders whether it wouldn't be better to skip this step and
-    * let the tuple status updates happen someplace that's not holding an
-    * exclusive lock on the relation.
     */
-   checked_moved = 0;
-   for (i = 0, curpage = fraged_pages->pagedesc;
-        i < num_fraged_pages;
-        i++, curpage++)
-   {
-       vacuum_delay_point();
-
-       Assert((*curpage)->blkno < blkno);
-       if ((*curpage)->blkno > last_move_dest_block)
-           break;              /* no need to scan any further */
-       if ((*curpage)->offsets_used == 0)
-           continue;           /* this page was never used as a move dest */
-       buf = ReadBuffer(onerel, (*curpage)->blkno);
-       LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
-       page = BufferGetPage(buf);
-       num_tuples = 0;
-       max_offset = PageGetMaxOffsetNumber(page);
-       for (newoff = FirstOffsetNumber;
-            newoff <= max_offset;
-            newoff = OffsetNumberNext(newoff))
-       {
-           itemid = PageGetItemId(page, newoff);
-           if (!ItemIdIsUsed(itemid))
-               continue;
-           tuple.t_datamcxt = NULL;
-           tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
-           if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
-           {
-               if (!(tuple.t_data->t_infomask & HEAP_MOVED))
-                   elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
-               if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
-                   elog(ERROR, "invalid XVAC in tuple header");
-               if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
-               {
-                   tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
-                   tuple.t_data->t_infomask &= ~HEAP_MOVED;
-                   num_tuples++;
-               }
-               else
-                   tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
-           }
-       }
-       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
-       WriteBuffer(buf);
-       Assert((*curpage)->offsets_used == num_tuples);
-       checked_moved += num_tuples;
-   }
-   Assert(num_moved == checked_moved);
-
+   update_hint_bits(onerel, fraged_pages, num_fraged_pages,
+                    last_move_dest_block, num_moved);
+
    /*
     * It'd be cleaner to make this report at the bottom of this routine,
     * but then the rusage would double-count the second pass of index
@@ -2455,6 +2270,13 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                *vpleft = *vpright;
                *vpright = vpsave;
            }
+           /*
+            * keep_tuples is the number of tuples that have been moved
+            * off a page during chain moves but have not been scanned over
+            * subsequently.  The tuple ids of these tuples are not
+            * recorded as free offsets for any VacPage, so they will not
+            * be cleared from the indexes.
+            */
            Assert(keep_tuples >= 0);
            for (i = 0; i < nindexes; i++)
                vacuum_index(&Nvacpagelist, Irel[i],
@@ -2465,36 +2287,41 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
        if (vacpage->blkno == (blkno - 1) &&
            vacpage->offsets_free > 0)
        {
-           OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
-           int         uncnt;
+           Buffer          buf;
+           Page            page;
+           OffsetNumber    unused[BLCKSZ / sizeof(OffsetNumber)];
+           OffsetNumber    offnum,
+                           maxoff;
+           int             uncnt;
+           int             num_tuples = 0;
 
            buf = ReadBuffer(onerel, vacpage->blkno);
            LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
            page = BufferGetPage(buf);
-           num_tuples = 0;
            maxoff = PageGetMaxOffsetNumber(page);
            for (offnum = FirstOffsetNumber;
                 offnum <= maxoff;
                 offnum = OffsetNumberNext(offnum))
            {
-               itemid = PageGetItemId(page, offnum);
+               ItemId  itemid = PageGetItemId(page, offnum);
+               HeapTupleHeader htup;
+
                if (!ItemIdIsUsed(itemid))
                    continue;
-               tuple.t_datamcxt = NULL;
-               tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
+               htup = (HeapTupleHeader) PageGetItem(page, itemid);
+               if (htup->t_infomask & HEAP_XMIN_COMMITTED)
+                   continue;
 
-               if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
-               {
-                   if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
-                   {
-                       if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
-                           elog(ERROR, "invalid XVAC in tuple header");
-                       itemid->lp_flags &= ~LP_USED;
-                       num_tuples++;
-                   }
-                   else
-                       elog(ERROR, "HEAP_MOVED_OFF was expected");
-               }
+               /*
+                * See comments in the walk-along-page loop above for why
+                * we have Asserts here instead of if (...) elog(ERROR).
+                */
+               Assert(!(htup->t_infomask & HEAP_MOVED_IN));
+               Assert(htup->t_infomask & HEAP_MOVED_OFF);
+               Assert(HeapTupleHeaderGetXvac(htup) == myXID);
+
+               itemid->lp_flags &= ~LP_USED;
+               num_tuples++;
 
            }
            Assert(vacpage->offsets_free == num_tuples);
@@ -2554,11 +2381,337 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
    if (vacrelstats->vtlinks != NULL)
        pfree(vacrelstats->vtlinks);
 
-   ExecDropTupleTable(tupleTable, true);
+   ExecContext_Finish(&ec);
+}
+
+/*
+ * move_chain_tuple() -- move one tuple that is part of a tuple chain
+ *
+ *     This routine moves old_tup from old_page to dst_page.
+ *     old_page and dst_page might be the same page.
+ *     On entry old_buf and dst_buf are locked exclusively, both locks (or
+ *     the single lock, if this is an intra-page move) are released before
+ *     exit.
+ *
+ *     Yes, a routine with ten parameters is ugly, but it's still better
+ *     than having these 120 lines of code in repair_frag(), which is
+ *     already too long and almost unreadable.
+ */
+static void
+move_chain_tuple(Relation rel,
+                Buffer old_buf, Page old_page, HeapTuple old_tup,
+                Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
+                ExecContext ec, ItemPointer ctid, bool cleanVpd)
+{
+   TransactionId myXID = GetCurrentTransactionId();
+   HeapTupleData   newtup;
+   OffsetNumber    newoff;
+   ItemId          newitemid;
+   Size            tuple_len = old_tup->t_len;
+
+   heap_copytuple_with_tuple(old_tup, &newtup);
+
+   /*
+    * register invalidation of source tuple in catcaches.
+    */
+   CacheInvalidateHeapTuple(rel, old_tup);
+
+   /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
+   START_CRIT_SECTION();
+
+   old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+                                 HEAP_XMIN_INVALID |
+                                 HEAP_MOVED_IN);
+   old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
+   HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
+
+   /*
+    * If this page was not used before - clean it.
+    *
+    * NOTE: a nasty bug used to lurk here.  It is possible
+    * for the source and destination pages to be the same
+    * (since this tuple-chain member can be on a page
+    * lower than the one we're currently processing in
+    * the outer loop).  If that's true, then after
+    * vacuum_page() the source tuple will have been
+    * moved, and tuple.t_data will be pointing at
+    * garbage.  Therefore we must do everything that uses
+    * old_tup->t_data BEFORE this step!!
+    *
+    * This path is different from the other callers of
+    * vacuum_page, because we have already incremented
+    * the vacpage's offsets_used field to account for the
+    * tuple(s) we expect to move onto the page. Therefore
+    * vacuum_page's check for offsets_used == 0 is wrong.
+    * But since that's a good debugging check for all
+    * other callers, we work around it here rather than
+    * remove it.
+    */
+   if (!PageIsEmpty(dst_page) && cleanVpd)
+   {
+       int     sv_offsets_used = dst_vacpage->offsets_used;
+
+       dst_vacpage->offsets_used = 0;
+       vacuum_page(rel, dst_buf, dst_vacpage);
+       dst_vacpage->offsets_used = sv_offsets_used;
+   }
+
+   /*
+    * Update the state of the copied tuple, and store it
+    * on the destination page.
+    */
+   newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+                                  HEAP_XMIN_INVALID |
+                                  HEAP_MOVED_OFF);
+   newtup.t_data->t_infomask |= HEAP_MOVED_IN;
+   HeapTupleHeaderSetXvac(newtup.t_data, myXID);
+   newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
+                        InvalidOffsetNumber, LP_USED);
+   if (newoff == InvalidOffsetNumber)
+   {
+       elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
+         (unsigned long) tuple_len, dst_vacpage->blkno);
+   }
+   newitemid = PageGetItemId(dst_page, newoff);
+   pfree(newtup.t_data);
+   newtup.t_datamcxt = NULL;
+   newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
+   ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
+
+   /* XLOG stuff */
+   if (!rel->rd_istemp)
+   {
+       XLogRecPtr  recptr = log_heap_move(rel, old_buf, old_tup->t_self,
+                                          dst_buf, &newtup);
 
-   ExecCloseIndices(resultRelInfo);
+       if (old_buf != dst_buf)
+       {
+           PageSetLSN(old_page, recptr);
+           PageSetSUI(old_page, ThisStartUpID);
+       }
+       PageSetLSN(dst_page, recptr);
+       PageSetSUI(dst_page, ThisStartUpID);
+   }
+   else
+   {
+       /*
+        * No XLOG record, but still need to flag that XID
+        * exists on disk
+        */
+       MyXactMadeTempRelUpdate = true;
+   }
+
+   END_CRIT_SECTION();
+
+   /*
+    * Set new tuple's t_ctid pointing to itself for last
+    * tuple in chain, and to next tuple in chain
+    * otherwise.
+    */
+   /* Is this ok after log_heap_move() and END_CRIT_SECTION()? */
+   if (!ItemPointerIsValid(ctid))
+       newtup.t_data->t_ctid = newtup.t_self;
+   else
+       newtup.t_data->t_ctid = *ctid;
+   *ctid = newtup.t_self;
 
-   FreeExecutorState(estate);
+   LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
+   if (dst_buf != old_buf)
+       LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
+
+   /* Create index entries for the moved tuple */
+   if (ec->resultRelInfo->ri_NumIndices > 0)
+   {
+       ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
+       ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
+   }
+}
+
+/*
+ * move_plain_tuple() -- move one tuple that is not part of a chain
+ *
+ *     This routine moves old_tup from old_page to dst_page.
+ *     On entry old_buf and dst_buf are locked exclusively, both locks are
+ *     released before exit.
+ *
+ *     Yes, a routine with eight parameters is ugly, but it's still better
+ *     than having these 90 lines of code in repair_frag(), which is already
+ *     too long and almost unreadable.
+ */
+static void
+move_plain_tuple(Relation rel,
+                Buffer old_buf, Page old_page, HeapTuple old_tup,
+                Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
+                ExecContext ec)
+{
+   TransactionId myXID = GetCurrentTransactionId();
+   HeapTupleData   newtup;
+   OffsetNumber    newoff;
+   ItemId          newitemid;
+   Size            tuple_len = old_tup->t_len;
+
+   /* copy tuple */
+   heap_copytuple_with_tuple(old_tup, &newtup);
+
+   /*
+    * register invalidation of source tuple in catcaches.
+    *
+    * (Note: we do not need to register the copied tuple, because we
+    * are not changing the tuple contents and so there cannot be
+    * any need to flush negative catcache entries.)
+    */
+   CacheInvalidateHeapTuple(rel, old_tup);
+
+   /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
+   START_CRIT_SECTION();
+
+   /*
+    * Mark new tuple as MOVED_IN by me.
+    */
+   newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+                                  HEAP_XMIN_INVALID |
+                                  HEAP_MOVED_OFF);
+   newtup.t_data->t_infomask |= HEAP_MOVED_IN;
+   HeapTupleHeaderSetXvac(newtup.t_data, myXID);
+
+   /* add tuple to the page */
+   newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
+                        InvalidOffsetNumber, LP_USED);
+   if (newoff == InvalidOffsetNumber)
+   {
+       elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
+            (unsigned long) tuple_len,
+            dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
+            dst_vacpage->offsets_used, dst_vacpage->offsets_free);
+   }
+   newitemid = PageGetItemId(dst_page, newoff);
+   pfree(newtup.t_data);
+   newtup.t_datamcxt = NULL;
+   newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
+   ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
+   newtup.t_self = newtup.t_data->t_ctid;
+
+   /*
+    * Mark old tuple as MOVED_OFF by me.
+    */
+   old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+                                 HEAP_XMIN_INVALID |
+                                 HEAP_MOVED_IN);
+   old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
+   HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
+
+   /* XLOG stuff */
+   if (!rel->rd_istemp)
+   {
+       XLogRecPtr  recptr = log_heap_move(rel, old_buf, old_tup->t_self,
+                                          dst_buf, &newtup);
+
+       PageSetLSN(old_page, recptr);
+       PageSetSUI(old_page, ThisStartUpID);
+       PageSetLSN(dst_page, recptr);
+       PageSetSUI(dst_page, ThisStartUpID);
+   }
+   else
+   {
+       /*
+        * No XLOG record, but still need to flag that XID exists
+        * on disk
+        */
+       MyXactMadeTempRelUpdate = true;
+   }
+
+   END_CRIT_SECTION();
+
+   dst_vacpage->free = ((PageHeader) dst_page)->pd_upper -
+                       ((PageHeader) dst_page)->pd_lower;
+   LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
+   LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
+
+   dst_vacpage->offsets_used++;
+
+   /* insert index tuples if needed */
+   if (ec->resultRelInfo->ri_NumIndices > 0)
+   {
+       ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
+       ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
+   }
+}
+
+/*
+ * update_hint_bits() -- update hint bits in destination pages
+ *
+ * Scan all the pages that we moved tuples onto and update tuple
+ * status bits.  This is not really necessary, but will save time for
+ * future transactions examining these tuples.
+ *
+ * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
+ * pages that were move source pages but not move dest pages.  One
+ * also wonders whether it wouldn't be better to skip this step and
+ * let the tuple status updates happen someplace that's not holding an
+ * exclusive lock on the relation.
+ */
+static void
+update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
+                BlockNumber last_move_dest_block, int num_moved)
+{
+   int         checked_moved = 0;
+   int         i;
+   VacPage    *curpage;
+
+   for (i = 0, curpage = fraged_pages->pagedesc;
+        i < num_fraged_pages;
+        i++, curpage++)
+   {
+       Buffer          buf;
+       Page            page;
+       OffsetNumber    max_offset;
+       OffsetNumber    off;
+       int             num_tuples = 0;
+
+       vacuum_delay_point();
+
+       if ((*curpage)->blkno > last_move_dest_block)
+           break;              /* no need to scan any further */
+       if ((*curpage)->offsets_used == 0)
+           continue;           /* this page was never used as a move dest */
+       buf = ReadBuffer(rel, (*curpage)->blkno);
+       LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+       page = BufferGetPage(buf);
+       max_offset = PageGetMaxOffsetNumber(page);
+       for (off = FirstOffsetNumber;
+            off <= max_offset;
+            off = OffsetNumberNext(off))
+       {
+           ItemId  itemid = PageGetItemId(page, off);
+           HeapTupleHeader htup;
+
+           if (!ItemIdIsUsed(itemid))
+               continue;
+           htup = (HeapTupleHeader) PageGetItem(page, itemid);
+           if (htup->t_infomask & HEAP_XMIN_COMMITTED)
+               continue;
+           /*
+            * See comments in the walk-along-page loop above for why we
+            * have Asserts here instead of if (...) elog(ERROR).  The
+            * difference here is that we may see MOVED_IN.
+            */
+           Assert(htup->t_infomask & HEAP_MOVED);
+           Assert(HeapTupleHeaderGetXvac(htup) == GetCurrentTransactionId());
+           if (htup->t_infomask & HEAP_MOVED_IN)
+           {
+               htup->t_infomask |= HEAP_XMIN_COMMITTED;
+               htup->t_infomask &= ~HEAP_MOVED;
+               num_tuples++;
+           }
+           else
+               htup->t_infomask |= HEAP_XMIN_INVALID;
+       }
+       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+       WriteBuffer(buf);
+       Assert((*curpage)->offsets_used == num_tuples);
+       checked_moved += num_tuples;
+   }
+   Assert(num_moved == checked_moved);
 }
 
 /*