Fix tuple chain moving bug found by "Hiroshi Inoue" .
authorVadim B. Mikheev
Sun, 23 May 1999 09:10:24 +0000 (09:10 +0000)
committerVadim B. Mikheev
Sun, 23 May 1999 09:10:24 +0000 (09:10 +0000)
src/backend/commands/vacuum.c

index d362ed2b10ffe85cbc2207b8095484d43c048027..82ba86f84e982d6dbb872f2aca1ee5dd48d6793a 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.102 1999/05/10 00:44:59 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.103 1999/05/23 09:10:24 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -87,7 +87,7 @@ static void vc_scanheap(VRelStats *vacrelstats, Relation onerel, VPageList vacuu
 static void vc_rpfheap(VRelStats *vacrelstats, Relation onerel, VPageList vacuum_pages, VPageList fraged_pages, int nindices, Relation *Irel);
 static void vc_vacheap(VRelStats *vacrelstats, Relation onerel, VPageList vpl);
 static void vc_vacpage(Page page, VPageDescr vpd);
-static void vc_vaconeind(VPageList vpl, Relation indrel, int num_tuples);
+static void vc_vaconeind(VPageList vpl, Relation indrel, int num_tuples, int keep_tuples);
 static void vc_scanoneind(Relation indrel, int num_tuples);
 static void vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple tuple);
 static void vc_bucketcpy(Form_pg_attribute attr, Datum value, Datum *bucket, int16 *bucket_len);
@@ -541,7 +541,7 @@ vc_vacone(Oid relid, bool analyze, List *va_cols)
        if (vacuum_pages.vpl_num_pages > 0)
        {
            for (i = 0; i < nindices; i++)
-               vc_vaconeind(&vacuum_pages, Irel[i], vacrelstats->num_tuples);
+               vc_vaconeind(&vacuum_pages, Irel[i], vacrelstats->num_tuples, 0);
        }
        else
 /* just scan indices to update statistic */
@@ -1042,9 +1042,11 @@ vc_rpfheap(VRelStats *vacrelstats, Relation onerel,
                num_fraged_pages,
                vacuumed_pages;
    int         checked_moved,
-               num_tuples;
+               num_tuples,
+               keep_tuples = 0;
    bool        isempty,
-               dowrite;
+               dowrite,
+               chain_tuple_moved;
    struct rusage ru0,
                ru1;
 
@@ -1126,6 +1128,7 @@ vc_rpfheap(VRelStats *vacrelstats, Relation onerel,
        else
            Assert(!isempty);
 
+       chain_tuple_moved = false;  /* no one chain-tuple was moved off this page, yet */
        vpc->vpd_blkno = blkno;
        maxoff = PageGetMaxOffsetNumber(page);
        for (offnum = FirstOffsetNumber;
@@ -1145,11 +1148,39 @@ vc_rpfheap(VRelStats *vacrelstats, Relation onerel,
            {
                if ((TransactionId)tuple.t_data->t_cmin != myXID)
                    elog(ERROR, "Invalid XID in t_cmin");
-               if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
-                   continue;   /* already removed by me */
                if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
-                   break;
-               elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
+                   elog(ERROR, "HEAP_MOVED_IN was not expected");
+               /* 
+                * If this (chain) tuple is moved by me already then
+                * I have to check is it in vpc or not - i.e. is it 
+                * moved while cleaning this page or some previous one.
+                */
+               if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
+               {
+                   if (keep_tuples == 0)
+                       continue;
+                   if (chain_tuple_moved)  /* some chains was moved while */
+                   {                       /* cleaning this page */
+                       Assert(vpc->vpd_offsets_free > 0);
+                       for (i = 0; i < vpc->vpd_offsets_free; i++)
+                       {
+                           if (vpc->vpd_offsets[i] == offnum)
+                               break;
+                       }
+                       if (i >= vpc->vpd_offsets_free) /* not found */
+                       {
+                           vpc->vpd_offsets[vpc->vpd_offsets_free++] = offnum;
+                           keep_tuples--;
+                       }
+                   }
+                   else
+                   {
+                       vpc->vpd_offsets[vpc->vpd_offsets_free++] = offnum;
+                       keep_tuples--;
+                   }
+                   continue;
+               }
+               elog(ERROR, "HEAP_MOVED_OFF was expected");
            }
 
            /*
@@ -1386,9 +1417,15 @@ moving chain: failed to add item with len = %u to page %u",
                    tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
 
                    num_moved++;
+                   /*
+                    * Remember that we moved tuple from the current page
+                    * (corresponding index tuple will be cleaned).
+                    */
                    if (Cbuf == buf)
                        vpc->vpd_offsets[vpc->vpd_offsets_free++] = 
                                ItemPointerGetOffsetNumber(&(tuple.t_self));
+                   else
+                       keep_tuples++;
 
                    if (Irel != (Relation *) NULL)
                    {
@@ -1418,6 +1455,7 @@ moving chain: failed to add item with len = %u to page %u",
                }
                cur_buffer = InvalidBuffer;
                pfree(vtmove);
+               chain_tuple_moved = true;
                continue;
            }
 
@@ -1532,10 +1570,58 @@ failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
 
        }                       /* walk along page */
 
+       if (offnum < maxoff && keep_tuples > 0)
+       {
+           OffsetNumber    off;
+
+           for (off = OffsetNumberNext(offnum);
+                   off <= maxoff;
+                   off = OffsetNumberNext(off))
+           {
+               itemid = PageGetItemId(page, off);
+               if (!ItemIdIsUsed(itemid))
+                   continue;
+               tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
+               if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
+                   continue;
+               if ((TransactionId)tuple.t_data->t_cmin != myXID)
+                   elog(ERROR, "Invalid XID in t_cmin (4)");
+               if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
+                   elog(ERROR, "HEAP_MOVED_IN was not expected (2)");
+               if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
+               {
+                   if (chain_tuple_moved)  /* some chains was moved while */
+                   {                       /* cleaning this page */
+                       Assert(vpc->vpd_offsets_free > 0);
+                       for (i = 0; i < vpc->vpd_offsets_free; i++)
+                       {
+                           if (vpc->vpd_offsets[i] == off)
+                               break;
+                       }
+                       if (i >= vpc->vpd_offsets_free) /* not found */
+                       {
+                           vpc->vpd_offsets[vpc->vpd_offsets_free++] = off;
+                           Assert(keep_tuples > 0);
+                           keep_tuples--;
+                       }
+                   }
+                   else
+                   {
+                       vpc->vpd_offsets[vpc->vpd_offsets_free++] = off;
+                       Assert(keep_tuples > 0);
+                       keep_tuples--;
+                   }
+               }
+           }
+       }
+
        if (vpc->vpd_offsets_free > 0)  /* some tuples were moved */
        {
-           qsort((char *) (vpc->vpd_offsets), vpc->vpd_offsets_free, 
+           if (chain_tuple_moved)      /* else - they are ordered */
+           {
+               qsort((char *) (vpc->vpd_offsets), vpc->vpd_offsets_free, 
                                    sizeof(OffsetNumber), vc_cmp_offno);
+           }
            vc_reappage(&Nvpl, vpc);
            WriteBuffer(buf);
        }
@@ -1559,7 +1645,6 @@ failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
 
    if (num_moved > 0)
    {
-
        /*
         * We have to commit our tuple' movings before we'll truncate
         * relation, but we shouldn't lose our locks. And so - quick hack:
@@ -1610,7 +1695,7 @@ failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
                    else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
                        tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
                    else
-                       elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected (2)");
+                       elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
                }
            }
            Assert((*vpp)->vpd_offsets_used == num_tuples);
@@ -1647,8 +1732,10 @@ Elapsed %u/%u sec.",
                *vpleft = *vpright;
                *vpright = vpsave;
            }
+           Assert(keep_tuples >= 0);
            for (i = 0; i < nindices; i++)
-               vc_vaconeind(&Nvpl, Irel[i], vacrelstats->num_tuples);
+               vc_vaconeind(&Nvpl, Irel[i], 
+                           vacrelstats->num_tuples, keep_tuples);
        }
 
        /*
@@ -1678,7 +1765,7 @@ Elapsed %u/%u sec.",
                        num_tuples++;
                    }
                    else
-                       elog(ERROR, "HEAP_MOVED_OFF was expected");
+                       elog(ERROR, "HEAP_MOVED_OFF was expected (2)");
                }
 
            }
@@ -1854,7 +1941,7 @@ vc_scanoneind(Relation indrel, int num_tuples)
  *     pg_class.
  */
 static void
-vc_vaconeind(VPageList vpl, Relation indrel, int num_tuples)
+vc_vaconeind(VPageList vpl, Relation indrel, int num_tuples, int keep_tuples)
 {
    RetrieveIndexResult res;
    IndexScanDesc iscan;
@@ -1911,11 +1998,12 @@ vc_vaconeind(VPageList vpl, Relation indrel, int num_tuples)
    getrusage(RUSAGE_SELF, &ru1);
 
    elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.",
-        indrel->rd_rel->relname.data, num_pages, num_index_tuples, tups_vacuumed,
+        indrel->rd_rel->relname.data, num_pages, 
+        num_index_tuples - keep_tuples, tups_vacuumed,
         ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
         ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
 
-   if (num_index_tuples != num_tuples)
+   if (num_index_tuples != num_tuples + keep_tuples)
        elog(NOTICE, "Index %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
             indrel->rd_rel->relname.data, num_index_tuples, num_tuples);