author      Tom Lane        Tue, 24 Oct 2000 01:38:44 +0000 (01:38 +0000)
committer   Tom Lane        Tue, 24 Oct 2000 01:38:44 +0000 (01:38 +0000)

Major overhaul of large-object implementation, by Denis Perchine with
kibitzing from Tom Lane.  Large objects are now all stored in a single
system relation "pg_largeobject" --- no more xinv or xinx files, no more
relkind 'l'.  This should offer substantial performance improvement for
large numbers of LOs, since there won't be directory bloat anymore.
It'll also fix problems like running out of locktable space when you
access thousands of LOs in one transaction.
Also clean up cruft in read/write routines.  LOs with "holes" in them
(never-written byte ranges) now work just like Unix files with holes do:
a hole reads as zeroes but doesn't occupy storage space.
INITDB forced!
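
As an illustration of the new "holes" behavior described above, here is a
minimal client-side sketch using the standard libpq large-object calls
(the connection string "dbname=test" and the omission of error checking
are assumptions for brevity, not part of this commit).  Writing one byte
far past the current end of a large object stores only that byte; the
never-written range before it reads back as zeroes, like a sparse Unix file.

    #include <stdio.h>
    #include "libpq-fe.h"
    #include "libpq/libpq-fs.h"     /* INV_READ / INV_WRITE flags */

    int
    main(void)
    {
        PGconn *conn = PQconnectdb("dbname=test");  /* assumed database */
        char    onebyte = 'x';
        char    buf[16];
        Oid     loid;
        int     fd;

        /* large-object operations must run inside a transaction */
        PQclear(PQexec(conn, "BEGIN"));

        loid = lo_creat(conn, INV_READ | INV_WRITE);
        fd = lo_open(conn, loid, INV_READ | INV_WRITE);

        lo_lseek(conn, fd, 100000, SEEK_SET);   /* seek far past EOF */
        lo_write(conn, fd, &onebyte, 1);        /* only this byte is stored */

        lo_lseek(conn, fd, 0, SEEK_SET);
        lo_read(conn, fd, buf, sizeof(buf));    /* the hole reads as zeroes */
        printf("first byte = %d\n", buf[0]);    /* prints 0 */

        lo_close(conn, fd);
        PQclear(PQexec(conn, "COMMIT"));
        PQfinish(conn);
        return 0;
    }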

21 files changed:
contrib/pg_dumplo/lo_export.c
contrib/vacuumlo/vacuumlo.c
doc/src/sgml/ref/psql-ref.sgml
src/backend/catalog/Makefile
src/backend/catalog/indexing.c
src/backend/catalog/pg_largeobject.c [new file with mode: 0644]
src/backend/libpq/be-fsstubs.c
src/backend/storage/large_object/inv_api.c
src/bin/pg_dump/pg_dump.c
src/bin/pgtclsh/updateStats.tcl
src/bin/psql/describe.c
src/bin/psql/large_obj.c
src/include/catalog/catname.h
src/include/catalog/catversion.h
src/include/catalog/indexing.h
src/include/catalog/pg_class.h
src/include/catalog/pg_largeobject.h [new file with mode: 0644]
src/include/storage/large_object.h
src/interfaces/odbc/info.c
src/test/regress/expected/opr_sanity.out
src/test/regress/expected/sanity_check.out

index e18c3ef651ea807e0d1f2ac561ab817ab5d45e71..248cf831f5c2dc1bba93bf6e624af9f52b41f040 100644 (file)
@@ -94,7 +94,7 @@ pglo_export(LODumpMaster *pgLO)
         * Query
         * ----------
         */
-       sprintf(Qbuff, "SELECT x.%s FROM %s x, pg_class c WHERE x.%s = c.oid and c.relkind = 'l'", 
+       sprintf(Qbuff, "SELECT DISTINCT x.\"%s\" FROM \"%s\" x, pg_largeobject l WHERE x.\"%s\" = l.loid",
            ll->lo_attr, ll->lo_table, ll->lo_attr);
        
        /* puts(Qbuff); */
@@ -104,7 +104,8 @@ pglo_export(LODumpMaster *pgLO)
        if ((tuples = PQntuples(pgLO->res)) == 0) {
        
            if (!pgLO->quiet && pgLO->action == ACTION_EXPORT_ATTR)
-               printf("%s: no large objets in '%s'\n", progname, ll->lo_table);    
+               printf("%s: no large objects in '%s'\n",
+                      progname, ll->lo_table); 
            continue;
        
        } else if (check_res(pgLO)) {
index 3f2c592c091284630083c8d78ad03dff11aa8b62..6e46caf8dd6d9bfbc1dcad374a44c050c8d1b980 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/contrib/vacuumlo/vacuumlo.c,v 1.5 2000/06/19 13:54:50 momjian Exp $
+ *   $Header: /cvsroot/pgsql/contrib/vacuumlo/vacuumlo.c,v 1.6 2000/10/24 01:38:20 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -59,10 +59,9 @@ vacuumlo(char *database, int verbose)
     * First we create and populate the lo temp table
     */
    buf[0] = '\0';
-   strcat(buf, "SELECT oid AS lo ");
+   strcat(buf, "SELECT DISTINCT loid AS lo ");
    strcat(buf, "INTO TEMP TABLE vacuum_l ");
-   strcat(buf, "FROM pg_class ");
-   strcat(buf, "WHERE relkind='l'");
+   strcat(buf, "FROM pg_largeobject ");
    if (!(res = PQexec(conn, buf)))
    {
        fprintf(stderr, "Failed to create temp table.\n");
index c8daa1f7a40c14ecbc2fde4f7f8f21237bf70b37..446449d95e251b42c911887465501357d93ebf4b 100644 (file)
@@ -1,5 +1,5 @@
 
 
@@ -706,7 +706,8 @@ lo_import 152801
    
    
    Shows a list of all Postgres large
-   objects currently stored in the database along with their owners.
+   objects currently stored in the database, along with any
+   comments provided for them.
    
    
       
index 6a5beee94d7ef3e4aa31c8996eb5568cd076c785..e17a37388c61bfb408590781dab61a9e43a997ca 100644 (file)
@@ -2,7 +2,7 @@
 #
 # Makefile for catalog
 #
-# $Header: /cvsroot/pgsql/src/backend/catalog/Makefile,v 1.30 2000/10/22 05:27:10 momjian Exp $
+# $Header: /cvsroot/pgsql/src/backend/catalog/Makefile,v 1.31 2000/10/24 01:38:23 tgl Exp $
 #
 #-------------------------------------------------------------------------
 
@@ -11,7 +11,8 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = catalog.o heap.o index.o indexing.o aclchk.o \
-       pg_aggregate.o pg_operator.o pg_proc.o pg_type.o
+       pg_aggregate.o pg_largeobject.o pg_operator.o pg_proc.o \
+       pg_type.o
 
 BKIFILES = global.bki template1.bki global.description template1.description
 
@@ -29,7 +30,7 @@ TEMPLATE1_BKI_SRCS := $(addprefix $(top_srcdir)/src/include/catalog/,\
    pg_proc.h pg_type.h pg_attribute.h pg_class.h \
    pg_inherits.h pg_index.h pg_statistic.h \
    pg_operator.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
-   pg_language.h \
+   pg_language.h pg_largeobject.h \
    pg_aggregate.h pg_ipl.h pg_inheritproc.h \
    pg_rewrite.h pg_listener.h pg_description.h indexing.h \
     )
index 342896a93b233beff8cb6f0b908bb23d3d2e1e66..1a96c3f5ea538b33263ab468f0e6d56ae5432ba4 100644 (file)
@@ -9,7 +9,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/catalog/indexing.c,v 1.71 2000/10/22 05:27:10 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/catalog/indexing.c,v 1.72 2000/10/24 01:38:22 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -51,6 +51,8 @@ char     *Name_pg_inherits_indices[Num_pg_inherits_indices] =
 {InheritsRelidSeqnoIndex};
 char      *Name_pg_language_indices[Num_pg_language_indices] =
 {LanguageOidIndex, LanguageNameIndex};
+char      *Name_pg_largeobject_indices[Num_pg_largeobject_indices] =
+{LargeObjectLOidPNIndex};
 char      *Name_pg_listener_indices[Num_pg_listener_indices] =
 {ListenerPidRelnameIndex};
 char      *Name_pg_opclass_indices[Num_pg_opclass_indices] =
diff --git a/src/backend/catalog/pg_largeobject.c b/src/backend/catalog/pg_largeobject.c
new file mode 100644 (file)
index 0000000..c471a9a
--- /dev/null
@@ -0,0 +1,184 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_largeobject.c
+ *   routines to support manipulation of the pg_largeobject relation
+ *
+ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *   $Header: /cvsroot/pgsql/src/backend/catalog/pg_largeobject.c,v 1.5 2000/10/24 01:38:23 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "catalog/catname.h"
+#include "catalog/indexing.h"
+#include "catalog/pg_largeobject.h"
+#include "miscadmin.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+
+
+/*
+ * Create a large object having the given LO identifier.
+ *
+ * We do this by inserting an empty first page, so that the object will
+ * appear to exist with size 0.  Note that the unique index will reject
+ * an attempt to create a duplicate page.
+ *
+ * Return value is OID assigned to the page tuple (any use in it?)
+ */
+Oid
+LargeObjectCreate(Oid loid)
+{
+   Oid         retval;
+   Relation    pg_largeobject;
+   HeapTuple   ntup;
+   Relation    idescs[Num_pg_largeobject_indices];
+   Datum       values[Natts_pg_largeobject];
+   char        nulls[Natts_pg_largeobject];
+   int         i;
+
+   pg_largeobject = heap_openr(LargeObjectRelationName, RowExclusiveLock);
+
+   /*
+    * Form new tuple
+    */
+   for (i = 0; i < Natts_pg_largeobject; i++)
+   {
+       values[i] = (Datum)NULL;
+       nulls[i] = ' ';
+   }
+
+   i = 0;
+   values[i++] = ObjectIdGetDatum(loid);
+   values[i++] = Int32GetDatum(0);
+   values[i++] = DirectFunctionCall1(byteain,
+                                     CStringGetDatum(""));
+   
+   ntup = heap_formtuple(pg_largeobject->rd_att, values, nulls);
+
+   /*
+    * Insert it
+    */
+   retval = heap_insert(pg_largeobject, ntup);
+
+   /*
+    * Update indices
+    */
+   if (!IsIgnoringSystemIndexes())
+   {
+       CatalogOpenIndices(Num_pg_largeobject_indices, Name_pg_largeobject_indices, idescs);
+       CatalogIndexInsert(idescs, Num_pg_largeobject_indices, pg_largeobject, ntup);
+       CatalogCloseIndices(Num_pg_largeobject_indices, idescs);
+   }
+   
+   heap_close(pg_largeobject, RowExclusiveLock);
+
+   heap_freetuple(ntup);
+
+   return retval;
+}
+
+void
+LargeObjectDrop(Oid loid)
+{
+   bool        found = false;
+   Relation    pg_largeobject;
+   Relation    pg_lo_idx;
+   ScanKeyData skey[1];
+   IndexScanDesc sd;
+   RetrieveIndexResult indexRes;
+   HeapTupleData tuple;
+   Buffer      buffer;
+
+   ScanKeyEntryInitialize(&skey[0],
+                          (bits16) 0x0,
+                          (AttrNumber) 1,
+                          (RegProcedure) F_OIDEQ,
+                          ObjectIdGetDatum(loid));
+
+   pg_largeobject = heap_openr(LargeObjectRelationName, RowShareLock);
+   pg_lo_idx = index_openr(LargeObjectLOidPNIndex);
+
+   sd = index_beginscan(pg_lo_idx, false, 1, skey);
+
+   tuple.t_datamcxt = CurrentMemoryContext;
+   tuple.t_data = NULL;
+
+   while ((indexRes = index_getnext(sd, ForwardScanDirection)))
+   {
+       tuple.t_self = indexRes->heap_iptr;
+       heap_fetch(pg_largeobject, SnapshotNow, &tuple, &buffer);
+       pfree(indexRes);
+       if (tuple.t_data != NULL)
+       {
+           heap_delete(pg_largeobject, &tuple.t_self, NULL);
+           ReleaseBuffer(buffer);
+           found = true;
+       }
+   }
+
+   index_endscan(sd);
+
+   index_close(pg_lo_idx);
+   heap_close(pg_largeobject, RowShareLock);
+
+   if (!found)
+       elog(ERROR, "LargeObjectDrop: large object %u not found", loid);
+}
+
+bool
+LargeObjectExists(Oid loid)
+{
+   bool        retval = false;
+   Relation    pg_largeobject;
+   Relation    pg_lo_idx;
+   ScanKeyData skey[1];
+   IndexScanDesc sd;
+   RetrieveIndexResult indexRes;
+   HeapTupleData tuple;
+   Buffer      buffer;
+
+   /*
+    * See if we can find any tuples belonging to the specified LO
+    */
+   ScanKeyEntryInitialize(&skey[0],
+                          (bits16) 0x0,
+                          (AttrNumber) 1,
+                          (RegProcedure) F_OIDEQ,
+                          ObjectIdGetDatum(loid));
+
+   pg_largeobject = heap_openr(LargeObjectRelationName, RowShareLock);
+   pg_lo_idx = index_openr(LargeObjectLOidPNIndex);
+
+   sd = index_beginscan(pg_lo_idx, false, 1, skey);
+
+   tuple.t_datamcxt = CurrentMemoryContext;
+   tuple.t_data = NULL;
+
+   while ((indexRes = index_getnext(sd, ForwardScanDirection)))
+   {
+       tuple.t_self = indexRes->heap_iptr;
+       heap_fetch(pg_largeobject, SnapshotNow, &tuple, &buffer);
+       pfree(indexRes);
+       if (tuple.t_data != NULL)
+       {
+           retval = true;
+           ReleaseBuffer(buffer);
+           break;
+       }
+   }
+
+   index_endscan(sd);
+
+   index_close(pg_lo_idx);
+   heap_close(pg_largeobject, RowShareLock);
+
+   return retval;
+}
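
For reference, the row layout manipulated by the new code above (attribute 1
matched with F_OIDEQ, attribute 2 with F_INT4GE, and the Anum_pg_largeobject_*
constants) corresponds to a catalog declaration along the following lines.
This is only a sketch inferred from this file; the actual declaration lives in
the new src/include/catalog/pg_largeobject.h, whose contents are not shown in
this excerpt, and it relies on the backend typedefs Oid, int32, and bytea.

    /* Sketch, not copied from the new header file. */
    typedef struct FormData_pg_largeobject
    {
        Oid     loid;       /* OID identifying the large object */
        int32   pageno;     /* page number of this chunk within the LO */
        bytea   data;       /* up to LOBLKSIZE bytes of the LO's contents */
    } FormData_pg_largeobject;

    typedef FormData_pg_largeobject *Form_pg_largeobject;

The unique index LargeObjectLOidPNIndex on (loid, pageno) is what rejects an
attempt to create a duplicate page, as noted in LargeObjectCreate's comment.
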
index bb5c7f6e5564faed571289fee759d5a0f30468dc..7eff84e5d3322af2feff7969c6dc144b6aeffe08 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/libpq/be-fsstubs.c,v 1.54 2000/10/22 05:27:12 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/libpq/be-fsstubs.c,v 1.55 2000/10/24 01:38:26 tgl Exp $
  *
  * NOTES
  *   This should be moved to a more appropriate place.  It is here
  *-------------------------------------------------------------------------
  */
 
+#include "postgres.h"
+
 #include 
 #include 
 #include 
 #include 
 
-#include "postgres.h"
-
 #include "catalog/pg_shadow.h"
 #include "libpq/be-fsstubs.h"
 #include "libpq/libpq-fs.h"
@@ -50,8 +50,7 @@
 
 /*#define FSDB 1*/
 #define MAX_LOBJ_FDS   256
-#define BUFSIZE            1024
-#define FNAME_BUFSIZE  8192
+#define BUFSIZE            8192
 
 /*
  * LO "FD"s are indexes into this array.
@@ -141,10 +140,10 @@ lo_close(PG_FUNCTION_ARGS)
 
    inv_close(cookies[fd]);
 
-   MemoryContextSwitchTo(currentContext);
-
    deleteLOfd(fd);
 
+   MemoryContextSwitchTo(currentContext);
+
    PG_RETURN_INT32(0);
 }
 
@@ -267,7 +266,7 @@ lo_creat(PG_FUNCTION_ARGS)
        PG_RETURN_OID(InvalidOid);
    }
 
-   lobjId = RelationGetRelid(lobjDesc->heap_r);
+   lobjId = lobjDesc->id;
 
    inv_close(lobjDesc);
 
@@ -310,8 +309,8 @@ lo_unlink(PG_FUNCTION_ARGS)
     * any LO-specific data structures at all.  (Again, that's probably
     * more than this module ought to be assuming.)
     *
-    * XXX there ought to be some code to clean up any open LOs that
-    * reference the specified relation... as is, they remain "open".
+    * XXX there ought to be some code to clean up any open LO FDs that
+    * reference the specified LO... as is, they remain "open".
     */
    PG_RETURN_INT32(inv_drop(lobjId));
 }
@@ -367,7 +366,7 @@ lo_import(PG_FUNCTION_ARGS)
    int         nbytes,
                tmp;
    char        buf[BUFSIZE];
-   char        fnamebuf[FNAME_BUFSIZE];
+   char        fnamebuf[MAXPGPATH];
    LargeObjectDesc *lobj;
    Oid         lobjOid;
 
@@ -382,8 +381,8 @@ lo_import(PG_FUNCTION_ARGS)
     * open the file to be read in
     */
    nbytes = VARSIZE(filename) - VARHDRSZ;
-   if (nbytes >= FNAME_BUFSIZE)
-       nbytes = FNAME_BUFSIZE-1;
+   if (nbytes >= MAXPGPATH)
+       nbytes = MAXPGPATH-1;
    memcpy(fnamebuf, VARDATA(filename), nbytes);
    fnamebuf[nbytes] = '\0';
    fd = PathNameOpenFile(fnamebuf, O_RDONLY | PG_BINARY, 0666);
@@ -398,12 +397,7 @@ lo_import(PG_FUNCTION_ARGS)
    if (lobj == NULL)
        elog(ERROR, "lo_import: can't create inv object for \"%s\"",
             fnamebuf);
-
-   /*
-    * the oid for the large object is just the oid of the relation
-    * XInv??? which contains the data.
-    */
-   lobjOid = RelationGetRelid(lobj->heap_r);
+   lobjOid = lobj->id;
 
    /*
     * read in from the Unix file and write to the inversion file
@@ -411,7 +405,7 @@ lo_import(PG_FUNCTION_ARGS)
    while ((nbytes = FileRead(fd, buf, BUFSIZE)) > 0)
    {
        tmp = inv_write(lobj, buf, nbytes);
-       if (tmp < nbytes)
+       if (tmp != nbytes)
            elog(ERROR, "lo_import: error while reading \"%s\"",
                 fnamebuf);
    }
@@ -435,7 +429,7 @@ lo_export(PG_FUNCTION_ARGS)
    int         nbytes,
                tmp;
    char        buf[BUFSIZE];
-   char        fnamebuf[FNAME_BUFSIZE];
+   char        fnamebuf[MAXPGPATH];
    LargeObjectDesc *lobj;
    mode_t      oumask;
 
@@ -461,8 +455,8 @@ lo_export(PG_FUNCTION_ARGS)
     * world-writable export files doesn't seem wise.
     */
    nbytes = VARSIZE(filename) - VARHDRSZ;
-   if (nbytes >= FNAME_BUFSIZE)
-       nbytes = FNAME_BUFSIZE-1;
+   if (nbytes >= MAXPGPATH)
+       nbytes = MAXPGPATH-1;
    memcpy(fnamebuf, VARDATA(filename), nbytes);
    fnamebuf[nbytes] = '\0';
    oumask = umask((mode_t) 0022);
@@ -473,12 +467,12 @@ lo_export(PG_FUNCTION_ARGS)
             fnamebuf);
 
    /*
-    * read in from the Unix file and write to the inversion file
+    * read in from the inversion file and write to the Unix file
     */
    while ((nbytes = inv_read(lobj, buf, BUFSIZE)) > 0)
    {
        tmp = FileWrite(fd, buf, nbytes);
-       if (tmp < nbytes)
+       if (tmp != nbytes)
            elog(ERROR, "lo_export: error while writing \"%s\"",
                 fnamebuf);
    }
@@ -513,7 +507,7 @@ lo_commit(bool isCommit)
        if (cookies[i] != NULL)
        {
            if (isCommit)
-               inv_cleanindex(cookies[i]);
+               inv_close(cookies[i]);
            cookies[i] = NULL;
        }
    }
index 5b7df0562ade9ac2c55a5556754689ba9ab1b3ce..607c4861dc58dba43c52e6a9dde1a490b12b0973 100644 (file)
@@ -9,77 +9,51 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.78 2000/10/22 05:27:15 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.79 2000/10/24 01:38:29 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
+#include "postgres.h"
+
+#include 
 #include 
 #include 
 #include 
 
-#include "postgres.h"
-
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/nbtree.h"
+#include "access/htup.h"
 #include "catalog/catalog.h"
+#include "catalog/catname.h"
 #include "catalog/heap.h"
 #include "catalog/index.h"
+#include "catalog/indexing.h"
 #include "catalog/pg_opclass.h"
+#include "catalog/pg_largeobject.h"
 #include "catalog/pg_type.h"
 #include "libpq/libpq-fs.h"
 #include "miscadmin.h"
 #include "storage/large_object.h"
 #include "storage/smgr.h"
 #include "utils/fmgroids.h"
-#include "utils/relcache.h"
-
-/*
- * Warning, Will Robinson...  In order to pack data into an inversion
- * file as densely as possible, we violate the class abstraction here.
- * When we're appending a new tuple to the end of the table, we check
- * the last page to see how much data we can put on it.  If it's more
- * than IMINBLK, we write enough to fill the page.  This limits external
- * fragmentation.  In no case can we write more than IMAXBLK, since
- * the 8K postgres page size less overhead leaves only this much space
- * for data.
- */
+#include "utils/builtins.h"
 
-/*
- *     In order to prevent buffer leak on transaction commit, large object
- *     scan index handling has been modified. Indexes are persistant inside
- *     a transaction but may be closed between two calls to this API (when
- *     transaction is committed while object is opened, or when no
- *     transaction is active). Scan indexes are thus now reinitialized using
- *     the object current offset. [PA]
- *
- *     Some cleanup has been also done for non freed memory.
- *
- *     For subsequent notes, [PA] is Pascal André 
- */
 
-#define IFREESPC(p)        (PageGetFreeSpace(p) - \
-                MAXALIGN(offsetof(HeapTupleHeaderData,t_bits)) - \
-                MAXALIGN(sizeof(struct varlena) + sizeof(int32)) - \
-                sizeof(double))
-#define IMAXBLK            8092
-#define IMINBLK            512
-
-/* non-export function prototypes */
-static HeapTuple inv_newtuple(LargeObjectDesc *obj_desc, Buffer buffer,
-            Page page, char *dbuf, int nwrite);
-static void inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer);
-static int inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes);
-static int inv_wrold(LargeObjectDesc *obj_desc, char *dbuf, int nbytes,
-         HeapTuple tuple, Buffer buffer);
-static void inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple);
-static int _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln);
+static int32
+getbytealen(bytea *data)
+{
+   Assert(! VARATT_IS_EXTENDED(data));
+   if (VARSIZE(data) < VARHDRSZ)
+       elog(ERROR, "getbytealen: VARSIZE(data) < VARHDRSZ. This is internal error.");
+   return (VARSIZE(data) - VARHDRSZ);
+}
 
 /*
  * inv_create -- create a new large object.
  *
  *     Arguments:
- *       flags -- was archive, smgr
+ *       flags
  *
  *     Returns:
  *       large object descriptor, appropriately filled in.
@@ -87,168 +61,80 @@ static int _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln);
 LargeObjectDesc *
 inv_create(int flags)
 {
-   LargeObjectDesc *retval;
    Oid         file_oid;
-   Relation    r;
-   Relation    indr;
-   TupleDesc   tupdesc;
-   IndexInfo  *indexInfo;
-   Oid         classObjectId[1];
-   char        objname[NAMEDATALEN];
-   char        indname[NAMEDATALEN];
-
-   /*
-    * add one here since the pg_class tuple created will have the next
-    * oid and we want to have the relation name to correspond to the
-    * tuple OID
-    */
-   file_oid = newoid() + 1;
-
-   /* come up with some table names */
-   sprintf(objname, "xinv%u", file_oid);
-   sprintf(indname, "xinx%u", file_oid);
-
-   if (RelnameFindRelid(objname) != InvalidOid)
-       elog(ERROR,
-         "internal error: %s already exists -- cannot create large obj",
-            objname);
-   if (RelnameFindRelid(indname) != InvalidOid)
-       elog(ERROR,
-         "internal error: %s already exists -- cannot create large obj",
-            indname);
-
-   /* this is pretty painful...  want a tuple descriptor */
-   tupdesc = CreateTemplateTupleDesc(2);
-   TupleDescInitEntry(tupdesc, (AttrNumber) 1,
-                      "olastbye",
-                      INT4OID,
-                      -1, 0, false);
-   TupleDescInitEntry(tupdesc, (AttrNumber) 2,
-                      "odata",
-                      BYTEAOID,
-                      -1, 0, false);
+   LargeObjectDesc *retval;
 
    /*
-    * First create the table to hold the inversion large object.  It will
-    * be located on whatever storage manager the user requested.
+    * Allocate an OID to be the LO's identifier.
     */
+   file_oid = newoid();
 
-   heap_create_with_catalog(objname, tupdesc, RELKIND_LOBJECT,
-                            false, false);
+   /* Check for duplicate (shouldn't happen) */
+   if (LargeObjectExists(file_oid))
+       elog(ERROR, "inv_create: large object %u already exists. This is internal error.", file_oid);
 
-   /* make the relation visible in this transaction */
-   CommandCounterIncrement();
-
-   /*--------------------
-    * We hold AccessShareLock on any large object we have open
-    * by inv_create or inv_open; it is released by inv_close.
-    * Note this will not conflict with ExclusiveLock or ShareLock
-    * that we acquire when actually reading/writing; it just prevents
-    * deletion of the large object while we have it open.
-    *--------------------
+   /*
+    * Create the LO by writing an empty first page for it in pg_largeobject
     */
-   r = heap_openr(objname, AccessShareLock);
+   (void) LargeObjectCreate(file_oid);
 
    /*
-    * Now create a btree index on the relation's olastbyte attribute to
-    * make seeks go faster.
+    * Advance command counter so that new tuple will be seen by later
+    * large-object operations in this transaction.
     */
-   indexInfo = makeNode(IndexInfo);
-   indexInfo->ii_NumIndexAttrs = 1;
-   indexInfo->ii_NumKeyAttrs = 1;
-   indexInfo->ii_KeyAttrNumbers[0] = 1;
-   indexInfo->ii_Predicate = NULL;
-   indexInfo->ii_FuncOid = InvalidOid;
-   indexInfo->ii_Unique = false;
-
-   classObjectId[0] = INT4_OPS_OID;
-
-   index_create(objname, indname, indexInfo,
-                BTREE_AM_OID, classObjectId,
-                false, false, false);
-
-   /* make the index visible in this transaction */
    CommandCounterIncrement();
 
-   indr = index_openr(indname);
-
-   if (!RelationIsValid(indr))
-   {
-       elog(ERROR, "cannot create index for large obj on %s under inversion",
-            DatumGetCString(DirectFunctionCall1(smgrout,
-                            Int16GetDatum(DEFAULT_SMGR))));
-   }
-
+   /*
+    * Prepare LargeObjectDesc data structure for accessing LO
+    */
    retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
 
-   retval->heap_r = r;
-   retval->index_r = indr;
-   retval->iscan = (IndexScanDesc) NULL;
-   retval->hdesc = RelationGetDescr(r);
-   retval->idesc = RelationGetDescr(indr);
-   retval->offset = retval->lowbyte = retval->highbyte = 0;
-   ItemPointerSetInvalid(&(retval->htid));
-   retval->flags = 0;
+   retval->id = file_oid;
+   retval->offset = 0;
 
-   if (flags & INV_WRITE)
-   {
-       LockRelation(r, ExclusiveLock);
+   if (flags & INV_WRITE) {
        retval->flags = IFS_WRLOCK | IFS_RDLOCK;
-   }
-   else if (flags & INV_READ)
-   {
-       LockRelation(r, ShareLock);
+       retval->heap_r = heap_openr(LargeObjectRelationName, RowExclusiveLock);
+   } else if (flags & INV_READ) {
        retval->flags = IFS_RDLOCK;
-   }
-   retval->flags |= IFS_ATEOF; /* since we know the object is empty */
+       retval->heap_r = heap_openr(LargeObjectRelationName, AccessShareLock);
+   } else
+       elog(ERROR, "inv_create: invalid flags: %d", flags);
+
+   retval->index_r = index_openr(LargeObjectLOidPNIndex);
 
    return retval;
 }
 
+/*
+ * inv_open -- access an existing large object.
+ *
+ *     Returns:
+ *       large object descriptor, appropriately filled in.
+ */
 LargeObjectDesc *
 inv_open(Oid lobjId, int flags)
 {
    LargeObjectDesc *retval;
-   Relation    r;
-   char       *indname;
-   Relation    indrel;
-
-   r = heap_open(lobjId, AccessShareLock);
-
-   indname = pstrdup(RelationGetRelationName(r));
-
-   /*
-    * hack hack hack...  we know that the fourth character of the
-    * relation name is a 'v', and that the fourth character of the index
-    * name is an 'x', and that they're otherwise identical.
-    */
-   indname[3] = 'x';
-   indrel = index_openr(indname);
-
-   if (!RelationIsValid(indrel))
-       return (LargeObjectDesc *) NULL;
 
+   if (! LargeObjectExists(lobjId))
+       elog(ERROR, "inv_open: large object %u not found", lobjId);
+   
    retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
 
-   retval->heap_r = r;
-   retval->index_r = indrel;
-   retval->iscan = (IndexScanDesc) NULL;
-   retval->hdesc = RelationGetDescr(r);
-   retval->idesc = RelationGetDescr(indrel);
-   retval->offset = retval->lowbyte = retval->highbyte = 0;
-   ItemPointerSetInvalid(&(retval->htid));
-   retval->flags = 0;
+   retval->id = lobjId;
+   retval->offset = 0;
 
-   if (flags & INV_WRITE)
-   {
-       LockRelation(r, ExclusiveLock);
+   if (flags & INV_WRITE) {
        retval->flags = IFS_WRLOCK | IFS_RDLOCK;
-   }
-   else if (flags & INV_READ)
-   {
-       LockRelation(r, ShareLock);
+       retval->heap_r = heap_openr(LargeObjectRelationName, RowExclusiveLock);
+   } else if (flags & INV_READ) {
        retval->flags = IFS_RDLOCK;
-   }
+       retval->heap_r = heap_openr(LargeObjectRelationName, AccessShareLock);
+   } else
+       elog(ERROR, "inv_open: invalid flags: %d", flags);
+
+   retval->index_r = index_openr(LargeObjectLOidPNIndex);
 
    return retval;
 }
@@ -261,174 +147,129 @@ inv_close(LargeObjectDesc *obj_desc)
 {
    Assert(PointerIsValid(obj_desc));
 
-   if (obj_desc->iscan != (IndexScanDesc) NULL)
-   {
-       index_endscan(obj_desc->iscan);
-       obj_desc->iscan = NULL;
-   }
-
+   if (obj_desc->flags & IFS_WRLOCK)
+       heap_close(obj_desc->heap_r, RowExclusiveLock);
+   else if (obj_desc->flags & IFS_RDLOCK)
+       heap_close(obj_desc->heap_r, AccessShareLock);
    index_close(obj_desc->index_r);
-   heap_close(obj_desc->heap_r, AccessShareLock);
 
    pfree(obj_desc);
 }
 
 /*
- * Destroys an existing large object, and frees its associated pointers.
+ * Destroys an existing large object (not to be confused with a descriptor!)
  *
  * returns -1 if failed
  */
 int
 inv_drop(Oid lobjId)
 {
-   Relation    r;
-
-   r = RelationIdGetRelation(lobjId);
-   if (!RelationIsValid(r))
-       return -1;
-
-   if (r->rd_rel->relkind != RELKIND_LOBJECT)
-   {
-       /* drop relcache refcount from RelationIdGetRelation */
-       RelationDecrementReferenceCount(r);
-       return -1;
-   }
+   LargeObjectDrop(lobjId);
 
    /*
-    * Since heap_drop_with_catalog will destroy the relcache entry,
-    * there's no need to drop the refcount in this path.
+    * Advance command counter so that tuple removal will be seen by later
+    * large-object operations in this transaction.
     */
-   heap_drop_with_catalog(RelationGetRelationName(r), false);
+   CommandCounterIncrement();
+
    return 1;
 }
 
 /*
- * inv_stat() -- do a stat on an inversion file.
+ * Determine size of a large object
  *
- *     For the time being, this is an insanely expensive operation.  In
- *     order to find the size of the file, we seek to the last block in
- *     it and compute the size from that.  We scan pg_class to determine
- *     the file's owner and create time.  We don't maintain mod time or
- *     access time, yet.
- *
- *     These fields aren't stored in a table anywhere because they're
- *     updated so frequently, and postgres only appends tuples at the
- *     end of relations.  Once clustering works, we should fix this.
+ * NOTE: LOs can contain gaps, just like Unix files.  We actually return
+ * the offset of the last byte + 1.
  */
-#ifdef NOT_USED
-
-struct pgstat
-{                              /* just the fields we need from stat
-                                * structure */
-   int         st_ino;
-   int         st_mode;
-   unsigned int st_size;
-   unsigned int st_sizehigh;   /* high order bits */
-/* 2^64 == 1.8 x 10^20 bytes */
-   int         st_uid;
-   int         st_atime_s;     /* just the seconds */
-   int         st_mtime_s;     /* since SysV and the new BSD both have */
-   int         st_ctime_s;     /* usec fields.. */
-};
-
-int
-inv_stat(LargeObjectDesc *obj_desc, struct pgstat * stbuf)
+static uint32
+inv_getsize(LargeObjectDesc *obj_desc)
 {
+   bool            found = false;
+   uint32          lastbyte = 0;
+   uint32          thislastbyte;
+   ScanKeyData     skey[1];
+   IndexScanDesc   sd;
+   RetrieveIndexResult indexRes;
+   HeapTupleData   tuple;
+   Buffer          buffer;
+   Form_pg_largeobject data;
+   bytea          *datafield;
+   bool            pfreeit;
+
    Assert(PointerIsValid(obj_desc));
-   Assert(stbuf != NULL);
 
-   /* need read lock for stat */
-   if (!(obj_desc->flags & IFS_RDLOCK))
-   {
-       LockRelation(obj_desc->heap_r, ShareLock);
-       obj_desc->flags |= IFS_RDLOCK;
-   }
+   ScanKeyEntryInitialize(&skey[0],
+                          (bits16) 0x0,
+                          (AttrNumber) 1,
+                          (RegProcedure) F_OIDEQ,
+                          ObjectIdGetDatum(obj_desc->id));
 
-   stbuf->st_ino = RelationGetRelid(obj_desc->heap_r);
-#if 1
-   stbuf->st_mode = (S_IFREG | 0666);  /* IFREG|rw-rw-rw- */
-#else
-   stbuf->st_mode = 100666;    /* IFREG|rw-rw-rw- */
-#endif
-   stbuf->st_size = _inv_getsize(obj_desc->heap_r,
-                                 obj_desc->hdesc,
-                                 obj_desc->index_r);
+   sd = index_beginscan(obj_desc->index_r, true, 1, skey);
 
-   stbuf->st_uid = obj_desc->heap_r->rd_rel->relowner;
+   tuple.t_datamcxt = CurrentMemoryContext;
+   tuple.t_data = NULL;
 
-   /* we have no good way of computing access times right now */
-   stbuf->st_atime_s = stbuf->st_mtime_s = stbuf->st_ctime_s = 0;
+   while ((indexRes = index_getnext(sd, ForwardScanDirection)))
+   {
+       tuple.t_self = indexRes->heap_iptr;
+       heap_fetch(obj_desc->heap_r, SnapshotNow, &tuple, &buffer);
+       pfree(indexRes);
+       if (tuple.t_data == NULL)
+           continue;
+       found = true;
+       data = (Form_pg_largeobject) GETSTRUCT(&tuple);
+       datafield = &(data->data);
+       pfreeit = false;
+       if (VARATT_IS_EXTENDED(datafield))
+       {
+           datafield = (bytea *)
+               heap_tuple_untoast_attr((varattrib *) datafield);
+           pfreeit = true;
+       }
+       thislastbyte = data->pageno * LOBLKSIZE + getbytealen(datafield);
+       if (thislastbyte > lastbyte)
+           lastbyte = thislastbyte;
+       if (pfreeit)
+           pfree(datafield);
+       ReleaseBuffer(buffer);
+   }
+   
+   index_endscan(sd);
 
-   return 0;
+   if (!found)
+       elog(ERROR, "inv_getsize: large object %u not found", obj_desc->id);
+   return lastbyte;
 }
 
-#endif
-
 int
 inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
 {
-   int         oldOffset;
-   Datum       d;
-   ScanKeyData skey;
-
    Assert(PointerIsValid(obj_desc));
 
-   if (whence == SEEK_CUR)
-   {
-       offset += obj_desc->offset;     /* calculate absolute position */
-   }
-   else if (whence == SEEK_END)
+   switch (whence)
    {
-       /* need read lock for getsize */
-       if (!(obj_desc->flags & IFS_RDLOCK))
-       {
-           LockRelation(obj_desc->heap_r, ShareLock);
-           obj_desc->flags |= IFS_RDLOCK;
-       }
-       offset += _inv_getsize(obj_desc->heap_r,
-                              obj_desc->hdesc,
-                              obj_desc->index_r);
-   }
-   /* now we can assume that the operation is SEEK_SET */
-
-   /*
-    * Whenever we do a seek, we turn off the EOF flag bit to force
-    * ourselves to check for real on the next read.
-    */
-
-   obj_desc->flags &= ~IFS_ATEOF;
-   oldOffset = obj_desc->offset;
-   obj_desc->offset = offset;
-
-   /* try to avoid doing any work, if we can manage it */
-   if (offset >= obj_desc->lowbyte
-       && offset <= obj_desc->highbyte
-       && oldOffset <= obj_desc->highbyte
-       && obj_desc->iscan != (IndexScanDesc) NULL)
-       return offset;
-
-   /*
-    * To do a seek on an inversion file, we start an index scan that will
-    * bring us to the right place.  Each tuple in an inversion file
-    * stores the offset of the last byte that appears on it, and we have
-    * an index on this.
-    */
-   if (obj_desc->iscan != (IndexScanDesc) NULL)
-   {
-       d = Int32GetDatum(offset);
-       btmovescan(obj_desc->iscan, d);
-   }
-   else
-   {
-       ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE,
-                              Int32GetDatum(offset));
-
-       obj_desc->iscan = index_beginscan(obj_desc->index_r,
-                                         (bool) 0, (uint16) 1,
-                                         &skey);
+       case SEEK_SET:
+           if (offset < 0)
+               elog(ERROR, "inv_seek: invalid offset: %d", offset);
+           obj_desc->offset = offset;
+           break;
+       case SEEK_CUR:
+           if ((obj_desc->offset + offset) < 0)
+               elog(ERROR, "inv_seek: invalid offset: %d", offset);
+           obj_desc->offset += offset;
+           break;
+       case SEEK_END:
+           {
+               uint32 size = inv_getsize(obj_desc);
+               if (offset < 0 || ((uint32) offset) > size)
+                   elog(ERROR, "inv_seek: invalid offset");
+               obj_desc->offset = size - offset;
+           }
+           break;
+       default:
+           elog(ERROR, "inv_seek: invalid whence: %d", whence);
    }
-
-   return offset;
+   return obj_desc->offset;
 }
 
 int
@@ -442,862 +283,306 @@ inv_tell(LargeObjectDesc *obj_desc)
 int
 inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
 {
-   HeapTupleData tuple;
-   int         nread;
-   int         off;
-   int         ncopy;
-   Datum       d;
-   struct varlena *fsblock;
-   bool        isNull;
+   int             nread = 0;
+   int             n;
+   int             off;
+   int             len;
+   int32           pageno = (int32) (obj_desc->offset / LOBLKSIZE);
+   uint32          pageoff;
+   ScanKeyData     skey[2];
+   IndexScanDesc   sd;
+   RetrieveIndexResult indexRes;
+   HeapTupleData   tuple;
+   Buffer          buffer;
+   Form_pg_largeobject data;
+   bytea          *datafield;
+   bool            pfreeit;
 
    Assert(PointerIsValid(obj_desc));
    Assert(buf != NULL);
 
-   /* if we're already at EOF, we don't need to do any work here */
-   if (obj_desc->flags & IFS_ATEOF)
+   if (nbytes <= 0)
        return 0;
 
-   /* make sure we obey two-phase locking */
-   if (!(obj_desc->flags & IFS_RDLOCK))
-   {
-       LockRelation(obj_desc->heap_r, ShareLock);
-       obj_desc->flags |= IFS_RDLOCK;
-   }
+   ScanKeyEntryInitialize(&skey[0],
+                          (bits16) 0x0,
+                          (AttrNumber) 1,
+                          (RegProcedure) F_OIDEQ,
+                          ObjectIdGetDatum(obj_desc->id));
 
-   nread = 0;
+   ScanKeyEntryInitialize(&skey[1],
+                          (bits16) 0x0,
+                          (AttrNumber) 2,
+                          (RegProcedure) F_INT4GE,
+                          Int32GetDatum(pageno));
 
-   /* fetch a block at a time */
-   while (nread < nbytes)
-   {
-       Buffer      buffer;
+   sd = index_beginscan(obj_desc->index_r, false, 2, skey);
 
-       /* fetch an inversion file system block */
-       inv_fetchtup(obj_desc, &tuple, &buffer);
+   tuple.t_datamcxt = CurrentMemoryContext;
+   tuple.t_data = NULL;
 
-       if (tuple.t_data == NULL)
-       {
-           obj_desc->flags |= IFS_ATEOF;
-           break;
-       }
+   while ((indexRes = index_getnext(sd, ForwardScanDirection)))
+   {
+       tuple.t_self = indexRes->heap_iptr;
+       heap_fetch(obj_desc->heap_r, SnapshotNow, &tuple, &buffer);
+       pfree(indexRes);
 
-       /* copy the data from this block into the buffer */
-       d = heap_getattr(&tuple, 2, obj_desc->hdesc, &isNull);
-       fsblock = (struct varlena *) DatumGetPointer(d);
-       ReleaseBuffer(buffer);
+       if (tuple.t_data == NULL)
+           continue;
+       
+       data = (Form_pg_largeobject) GETSTRUCT(&tuple);
 
        /*
-        * If block starts beyond current seek point, then we are looking
-        * at a "hole" (unwritten area) in the object.  Return zeroes for
-        * the "hole".
+        * We assume the indexscan will deliver pages in order.  However,
+        * there may be missing pages if the LO contains unwritten "holes".
+        * We want missing sections to read out as zeroes.
         */
-       if (obj_desc->offset < obj_desc->lowbyte)
+       pageoff = ((uint32) data->pageno) * LOBLKSIZE;
+       if (pageoff > obj_desc->offset)
        {
-           int     nzeroes = obj_desc->lowbyte - obj_desc->offset;
-
-           if (nzeroes > (nbytes - nread))
-               nzeroes = (nbytes - nread);
-           MemSet(buf, 0, nzeroes);
-           buf += nzeroes;
-           nread += nzeroes;
-           obj_desc->offset += nzeroes;
-           if (nread >= nbytes)
-               break;
+           n = pageoff - obj_desc->offset;
+           n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
+           MemSet(buf + nread, 0, n);
+           nread += n;
+           obj_desc->offset += n;
        }
 
-       off = obj_desc->offset - obj_desc->lowbyte;
-       ncopy = obj_desc->highbyte - obj_desc->offset + 1;
-       if (ncopy > (nbytes - nread))
-           ncopy = (nbytes - nread);
-       memmove(buf, &(fsblock->vl_dat[off]), ncopy);
+       if (nread < nbytes)
+       {
+           Assert(obj_desc->offset >= pageoff);
+           off = (int) (obj_desc->offset - pageoff);
+           Assert(off >= 0 && off < LOBLKSIZE);
+
+           datafield = &(data->data);
+           pfreeit = false;
+           if (VARATT_IS_EXTENDED(datafield))
+           {
+               datafield = (bytea *)
+                   heap_tuple_untoast_attr((varattrib *) datafield);
+               pfreeit = true;
+           }
+           len = getbytealen(datafield);
+           if (len > off)
+           {
+               n = len - off;
+               n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
+               memcpy(buf + nread, VARDATA(datafield) + off, n);
+               nread += n;
+               obj_desc->offset += n;
+           }
+           if (pfreeit)
+               pfree(datafield);
+       }
 
-       /* move pointers past the amount we just read */
-       buf += ncopy;
-       nread += ncopy;
-       obj_desc->offset += ncopy;
+       ReleaseBuffer(buffer);
+       if (nread >= nbytes)
+           break;
    }
 
+   index_endscan(sd);
+
    return nread;
 }
 
 int
 inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes)
 {
-   HeapTupleData tuple;
-   int         nwritten;
-   int         tuplen;
+   int             nwritten = 0;
+   int             n;
+   int             off;
+   int             len;
+   int32           pageno = (int32) (obj_desc->offset / LOBLKSIZE);
+   ScanKeyData     skey[2];
+   IndexScanDesc   sd;
+   RetrieveIndexResult indexRes;
+   HeapTupleData   oldtuple;
+   Buffer          buffer;
+   Form_pg_largeobject olddata;
+   bool            neednextpage;
+   bytea          *datafield;
+   bool            pfreeit;
+   char            workbuf[LOBLKSIZE + VARHDRSZ];
+   char           *workb = VARATT_DATA(workbuf);
+   HeapTuple       newtup;
+   Datum           values[Natts_pg_largeobject];
+   char            nulls[Natts_pg_largeobject];
+   char            replace[Natts_pg_largeobject];
+   bool            write_indices;
+   Relation        idescs[Num_pg_largeobject_indices];
 
    Assert(PointerIsValid(obj_desc));
    Assert(buf != NULL);
 
-   /*
-    * Make sure we obey two-phase locking.  A write lock entitles you to
-    * read the relation, as well.
-    */
+   if (nbytes <= 0)
+       return 0;
 
-   if (!(obj_desc->flags & IFS_WRLOCK))
-   {
-       LockRelation(obj_desc->heap_r, ExclusiveLock);
-       obj_desc->flags |= (IFS_WRLOCK | IFS_RDLOCK);
-   }
+   write_indices = ! IsIgnoringSystemIndexes();
+   if (write_indices)
+       CatalogOpenIndices(Num_pg_largeobject_indices,
+                          Name_pg_largeobject_indices,
+                          idescs);
+
+   ScanKeyEntryInitialize(&skey[0],
+                          (bits16) 0x0,
+                          (AttrNumber) 1,
+                          (RegProcedure) F_OIDEQ,
+                          ObjectIdGetDatum(obj_desc->id));
+
+   ScanKeyEntryInitialize(&skey[1],
+                          (bits16) 0x0,
+                          (AttrNumber) 2,
+                          (RegProcedure) F_INT4GE,
+                          Int32GetDatum(pageno));
 
-   nwritten = 0;
+   sd = index_beginscan(obj_desc->index_r, false, 2, skey);
+
+   oldtuple.t_datamcxt = CurrentMemoryContext;
+   oldtuple.t_data = NULL;
+   olddata = NULL;
+   buffer = InvalidBuffer;
+   neednextpage = true;
 
-   /* write a block at a time */
    while (nwritten < nbytes)
    {
-       Buffer      buffer;
-
        /*
-        * Fetch the current inversion file system block.  We can skip
-        * the work if we already know we are at EOF.
+        * If possible, get next pre-existing page of the LO.  We assume
+        * the indexscan will deliver these in order --- but there may be
+        * holes.
         */
-
-       if (obj_desc->flags & IFS_ATEOF)
-           tuple.t_data = NULL;
-       else
-           inv_fetchtup(obj_desc, &tuple, &buffer);
-
-       /* either append or replace a block, as required */
-       if (tuple.t_data == NULL)
-           tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
-       else
+       if (neednextpage)
        {
-           if (obj_desc->offset > obj_desc->highbyte)
+           while ((indexRes = index_getnext(sd, ForwardScanDirection)))
            {
-               tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
-               ReleaseBuffer(buffer);
+               oldtuple.t_self = indexRes->heap_iptr;
+               heap_fetch(obj_desc->heap_r, SnapshotNow, &oldtuple, &buffer);
+               pfree(indexRes);
+               if (oldtuple.t_data != NULL)
+               {
+                   olddata = (Form_pg_largeobject) GETSTRUCT(&oldtuple);
+                   Assert(olddata->pageno >= pageno);
+                   break;
+               }
            }
-           else
-               tuplen = inv_wrold(obj_desc, buf, nbytes - nwritten, &tuple, buffer);
-
-           /*
-            * inv_wrold() has already issued WriteBuffer() which has
-            * decremented local reference counter (LocalRefCount). So we
-            * should not call ReleaseBuffer() here. -- Tatsuo 99/2/4
-            */
+           neednextpage = false;
        }
-
-       /* move pointers past the amount we just wrote */
-       buf += tuplen;
-       nwritten += tuplen;
-       obj_desc->offset += tuplen;
-   }
-
-   /* that's it */
-   return nwritten;
-}
-
-/*
- * inv_cleanindex
- *      Clean opened indexes for large objects, and clears current result.
- *      This is necessary on transaction commit in order to prevent buffer
- *      leak.
- *      This function must be called for each opened large object.
- *      [ PA, 7/17/98 ]
- */
-void
-inv_cleanindex(LargeObjectDesc *obj_desc)
-{
-   Assert(PointerIsValid(obj_desc));
-
-   if (obj_desc->iscan == (IndexScanDesc) NULL)
-       return;
-
-   index_endscan(obj_desc->iscan);
-   obj_desc->iscan = (IndexScanDesc) NULL;
-
-   ItemPointerSetInvalid(&(obj_desc->htid));
-}
-
-/*
- * inv_fetchtup -- Fetch an inversion file system block.
- *
- *     This routine finds the file system block containing the offset
- *     recorded in the obj_desc structure.  Later, we need to think about
- *     the effects of non-functional updates (can you rewrite the same
- *     block twice in a single transaction?), but for now, we won't bother.
- *
- *     Parameters:
- *             obj_desc -- the object descriptor.
- *             bufP -- pointer to a buffer in the buffer cache; caller
- *                     must free this.
- *
- *     Returns:
- *             A heap tuple containing the desired block, or NULL if no
- *             such tuple exists.
- */
-static void
-inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer)
-{
-   RetrieveIndexResult res;
-   Datum       d;
-   int         firstbyte,
-               lastbyte;
-   struct varlena *fsblock;
-   bool        isNull;
-
-   /*
-    * If we've exhausted the current block, we need to get the next one.
-    * When we support time travel and non-functional updates, we will
-    * need to loop over the blocks, rather than just have an 'if', in
-    * order to find the one we're really interested in.
-    */
-
-   if (obj_desc->offset > obj_desc->highbyte
-       || obj_desc->offset < obj_desc->lowbyte
-       || !ItemPointerIsValid(&(obj_desc->htid)))
-   {
-       ScanKeyData skey;
-
-       ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE,
-                              Int32GetDatum(obj_desc->offset));
-
-       /* initialize scan key if not done */
-       if (obj_desc->iscan == (IndexScanDesc) NULL)
+       /*
+        * If we have a pre-existing page, see if it is the page we want
+        * to write, or a later one.
+        */
+       if (olddata != NULL && olddata->pageno == pageno)
        {
-
            /*
-            * As scan index may be prematurely closed (on commit), we
-            * must use object current offset (was 0) to reinitialize the
-            * entry [ PA ].
+            * Update an existing page with fresh data.
+            *
+            * First, load old data into workbuf
             */
-           obj_desc->iscan = index_beginscan(obj_desc->index_r,
-                                             (bool) 0, (uint16) 1,
-                                             &skey);
-       }
-       else
-           index_rescan(obj_desc->iscan, false, &skey);
-
-       do
-       {
-           res = index_getnext(obj_desc->iscan, ForwardScanDirection);
-
-           if (res == (RetrieveIndexResult) NULL)
+           datafield = &(olddata->data);
+           pfreeit = false;
+           if (VARATT_IS_EXTENDED(datafield))
            {
-               ItemPointerSetInvalid(&(obj_desc->htid));
-               tuple->t_datamcxt = NULL;
-               tuple->t_data = NULL;
-               return;
+               datafield = (bytea *)
+                   heap_tuple_untoast_attr((varattrib *) datafield);
+               pfreeit = true;
            }
-
+           len = getbytealen(datafield);
+           Assert(len <= LOBLKSIZE);
+           memcpy(workb, VARDATA(datafield), len);
+           if (pfreeit)
+               pfree(datafield);
            /*
-            * For time travel, we need to use the actual time qual here,
-            * rather that NowTimeQual.  We currently have no way to pass
-            * a time qual in.
-            *
-            * This is now valid for snapshot !!! And should be fixed in some
-            * way...   - vadim 07/28/98
-            *
+            * Fill any hole
+            */
+           off = (int) (obj_desc->offset % LOBLKSIZE);
+           if (off > len)
+               MemSet(workb + len, 0, off - len);
+           /*
+            * Insert appropriate portion of new data
+            */
+           n = LOBLKSIZE - off;
+           n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
+           memcpy(workb + off, buf + nwritten, n);
+           nwritten += n;
+           obj_desc->offset += n;
+           off += n;
+           /* compute valid length of new page */
+           len = (len >= off) ? len : off;
+           VARATT_SIZEP(workbuf) = len + VARHDRSZ;
+           /*
+            * Form and insert updated tuple
+            */
+           memset(values, 0, sizeof(values));
+           memset(nulls, ' ', sizeof(nulls));
+           memset(replace, ' ', sizeof(replace));
+           values[Anum_pg_largeobject_data - 1] = PointerGetDatum(workbuf);
+           replace[Anum_pg_largeobject_data - 1] = 'r';
+           newtup = heap_modifytuple(&oldtuple, obj_desc->heap_r,
+                                     values, nulls, replace);
+           heap_update(obj_desc->heap_r, &newtup->t_self, newtup, NULL);
+           if (write_indices)
+               CatalogIndexInsert(idescs, Num_pg_largeobject_indices,
+                                  obj_desc->heap_r, newtup);
+           heap_freetuple(newtup);
+           /*
+            * We're done with this old page.
             */
-           tuple->t_self = res->heap_iptr;
-           heap_fetch(obj_desc->heap_r, SnapshotNow, tuple, buffer);
-           pfree(res);
-       } while (tuple->t_data == NULL);
-
-       /* remember this tid -- we may need it for later reads/writes */
-       ItemPointerCopy(&(tuple->t_self), &obj_desc->htid);
-   }
-   else
-   {
-       tuple->t_self = obj_desc->htid;
-       heap_fetch(obj_desc->heap_r, SnapshotNow, tuple, buffer);
-       if (tuple->t_data == NULL)
-           elog(ERROR, "inv_fetchtup: heap_fetch failed");
-   }
-
-   /*
-    * By here, we have the heap tuple we're interested in.  We cache the
-    * upper and lower bounds for this block in the object descriptor and
-    * return the tuple.
-    */
-
-   d = heap_getattr(tuple, 1, obj_desc->hdesc, &isNull);
-   lastbyte = (int32) DatumGetInt32(d);
-   d = heap_getattr(tuple, 2, obj_desc->hdesc, &isNull);
-   fsblock = (struct varlena *) DatumGetPointer(d);
-
-   /*
-    * order of + and - is important -- these are unsigned quantites near
-    * 0
-    */
-   firstbyte = (lastbyte + 1 + sizeof(fsblock->vl_len)) - fsblock->vl_len;
-
-   obj_desc->lowbyte = firstbyte;
-   obj_desc->highbyte = lastbyte;
-
-   return;
-}
-
-/*
- * inv_wrnew() -- append a new filesystem block tuple to the inversion
- *                 file.
- *
- *     In response to an inv_write, we append one or more file system
- *     blocks to the class containing the large object.  We violate the
- *     class abstraction here in order to pack things as densely as we
- *     are able.  We examine the last page in the relation, and write
- *     just enough to fill it, assuming that it has above a certain
- *     threshold of space available.  If the space available is less than
- *     the threshold, we allocate a new page by writing a big tuple.
- *
- *     By the time we get here, we know all the parameters passed in
- *     are valid, and that we hold the appropriate lock on the heap
- *     relation.
- *
- *     Parameters:
- *             obj_desc: large object descriptor for which to append block.
- *             buf: buffer containing data to write.
- *             nbytes: amount to write
- *
- *     Returns:
- *             number of bytes actually written to the new tuple.
- */
-static int
-inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes)
-{
-   Relation    hr;
-   HeapTuple   ntup;
-   Buffer      buffer;
-   Page        page;
-   int         nblocks;
-   int         nwritten;
-
-   hr = obj_desc->heap_r;
-
-   /*
-    * Get the last block in the relation.  If there's no data in the
-    * relation at all, then we just get a new block.  Otherwise, we check
-    * the last block to see whether it has room to accept some or all of
-    * the data that the user wants to write.  If it doesn't, then we
-    * allocate a new block.
-    */
-
-   nblocks = RelationGetNumberOfBlocks(hr);
-
-   if (nblocks > 0)
-   {
-       buffer = ReadBuffer(hr, nblocks - 1);
-       page = BufferGetPage(buffer);
-   }
-   else
-   {
-       buffer = ReadBuffer(hr, P_NEW);
-       page = BufferGetPage(buffer);
-       PageInit(page, BufferGetPageSize(buffer), 0);
-   }
-
-   /*
-    * If the last page is too small to hold all the data, and it's too
-    * small to hold IMINBLK, then we allocate a new page.  If it will
-    * hold at least IMINBLK, but less than all the data requested, then
-    * we write IMINBLK here.  The caller is responsible for noticing that
-    * less than the requested number of bytes were written, and calling
-    * this routine again.
-    */
-
-   nwritten = IFREESPC(page);
-   if (nwritten < nbytes)
-   {
-       if (nwritten < IMINBLK)
-       {
            ReleaseBuffer(buffer);
-           buffer = ReadBuffer(hr, P_NEW);
-           page = BufferGetPage(buffer);
-           PageInit(page, BufferGetPageSize(buffer), 0);
-           if (nbytes > IMAXBLK)
-               nwritten = IMAXBLK;
-           else
-               nwritten = nbytes;
-       }
-   }
-   else
-       nwritten = nbytes;
-
-   /*
-    * Insert a new file system block tuple, index it, and write it out.
-    */
-
-   ntup = inv_newtuple(obj_desc, buffer, page, buf, nwritten);
-   inv_indextup(obj_desc, ntup);
-   heap_freetuple(ntup);
-
-   /* new tuple is inserted */
-   WriteBuffer(buffer);
-
-   return nwritten;
-}
-
-static int
-inv_wrold(LargeObjectDesc *obj_desc,
-         char *dbuf,
-         int nbytes,
-         HeapTuple tuple,
-         Buffer buffer)
-{
-   Relation    hr;
-   HeapTuple   ntup;
-   Buffer      newbuf;
-   Page        page;
-   Page        newpage;
-   int         tupbytes;
-   Datum       d;
-   struct varlena *fsblock;
-   int         nwritten,
-               nblocks,
-               freespc;
-   bool        isNull;
-   int         keep_offset;
-   RetrieveIndexResult res;
-
-   /*
-    * Since we're using a no-overwrite storage manager, the way we
-    * overwrite blocks is to mark the old block invalid and append a new
-    * block.  First mark the old block invalid.  This violates the tuple
-    * abstraction.
-    */
-
-   TransactionIdStore(GetCurrentTransactionId(), &(tuple->t_data->t_xmax));
-   tuple->t_data->t_cmax = GetCurrentCommandId();
-   tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID);
-
-   /*
-    * If we're overwriting the entire block, we're lucky.  All we need to
-    * do is to insert a new block.
-    */
-
-   if (obj_desc->offset == obj_desc->lowbyte
-       && obj_desc->lowbyte + nbytes >= obj_desc->highbyte)
-   {
-       WriteBuffer(buffer);
-       return inv_wrnew(obj_desc, dbuf, nbytes);
-   }
-
-   /*
-    * By here, we need to overwrite part of the data in the current
-    * tuple.  In order to reduce the degree to which we fragment blocks,
-    * we guarantee that no block will be broken up due to an overwrite.
-    * This means that we need to allocate a tuple on a new page, if
-    * there's not room for the replacement on this one.
-    */
-
-   newbuf = buffer;
-   page = BufferGetPage(buffer);
-   newpage = BufferGetPage(newbuf);
-   hr = obj_desc->heap_r;
-   freespc = IFREESPC(page);
-   d = heap_getattr(tuple, 2, obj_desc->hdesc, &isNull);
-   fsblock = (struct varlena *) DatumGetPointer(d);
-   tupbytes = fsblock->vl_len - sizeof(fsblock->vl_len);
-
-   if (freespc < tupbytes)
-   {
-
-       /*
-        * First see if there's enough space on the last page of the table
-        * to put this tuple.
-        */
-
-       nblocks = RelationGetNumberOfBlocks(hr);
-
-       if (nblocks > 0)
-       {
-           newbuf = ReadBuffer(hr, nblocks - 1);
-           newpage = BufferGetPage(newbuf);
+           oldtuple.t_datamcxt = CurrentMemoryContext;
+           oldtuple.t_data = NULL;
+           olddata = NULL;
+           neednextpage = true;
        }
        else
        {
-           newbuf = ReadBuffer(hr, P_NEW);
-           newpage = BufferGetPage(newbuf);
-           PageInit(newpage, BufferGetPageSize(newbuf), 0);
-       }
-
-       freespc = IFREESPC(newpage);
-
-       /*
-        * If there's no room on the last page, allocate a new last page
-        * for the table, and put it there.
-        */
-
-       if (freespc < tupbytes)
-       {
-           ReleaseBuffer(newbuf);
-           newbuf = ReadBuffer(hr, P_NEW);
-           newpage = BufferGetPage(newbuf);
-           PageInit(newpage, BufferGetPageSize(newbuf), 0);
+           /*
+            * Write a brand new page.
+            *
+            * First, fill any hole
+            */
+           off = (int) (obj_desc->offset % LOBLKSIZE);
+           if (off > 0)
+               MemSet(workb, 0, off);
+           /*
+            * Insert appropriate portion of new data
+            */
+           n = LOBLKSIZE - off;
+           n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
+           memcpy(workb + off, buf + nwritten, n);
+           nwritten += n;
+           obj_desc->offset += n;
+           /* compute valid length of new page */
+           len = off + n;
+           VARATT_SIZEP(workbuf) = len + VARHDRSZ;
+           /*
+            * Form and insert updated tuple
+            */
+           memset(values, 0, sizeof(values));
+           memset(nulls, ' ', sizeof(nulls));
+           values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
+           values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
+           values[Anum_pg_largeobject_data - 1] = PointerGetDatum(workbuf);
+           newtup = heap_formtuple(obj_desc->heap_r->rd_att, values, nulls);
+           heap_insert(obj_desc->heap_r, newtup);
+           if (write_indices)
+               CatalogIndexInsert(idescs, Num_pg_largeobject_indices,
+                                  obj_desc->heap_r, newtup);
+           heap_freetuple(newtup);
        }
+       pageno++;
    }
 
-   nwritten = nbytes;
-   if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
-       nwritten = obj_desc->highbyte - obj_desc->offset + 1;
-   memmove(VARDATA(fsblock) + (obj_desc->offset - obj_desc->lowbyte),
-           dbuf, nwritten);
-
-   /*
-    * we are rewriting the entire old block, therefore we reset offset to
-    * the lowbyte of the original block before jumping into
-    * inv_newtuple()
-    */
-   keep_offset = obj_desc->offset;
-   obj_desc->offset = obj_desc->lowbyte;
-   ntup = inv_newtuple(obj_desc, newbuf, newpage, VARDATA(fsblock),
-                       tupbytes);
-   /* after we are done, we restore to the true offset */
-   obj_desc->offset = keep_offset;
-
-   /*
-    * By here, we have a page (newpage) that's guaranteed to have enough
-    * space on it to put the new tuple.  Call inv_newtuple to do the
-    * work.  Passing NULL as a buffer to inv_newtuple() keeps it from
-    * copying any data into the new tuple.  When it returns, the tuple is
-    * ready to receive data from the old tuple and the user's data
-    * buffer.
-    */
-/*
-   ntup = inv_newtuple(obj_desc, newbuf, newpage, (char *) NULL, tupbytes);
-   dptr = ((char *) ntup) + ntup->t_hoff -
-               (sizeof(HeapTupleData) - offsetof(HeapTupleData, t_bits)) +
-               sizeof(int4)
-               + sizeof(fsblock->vl_len);
-
-   if (obj_desc->offset > obj_desc->lowbyte) {
-       memmove(dptr,
-               &(fsblock->vl_dat[0]),
-               obj_desc->offset - obj_desc->lowbyte);
-       dptr += obj_desc->offset - obj_desc->lowbyte;
-   }
-
-
-   nwritten = nbytes;
-   if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
-       nwritten = obj_desc->highbyte - obj_desc->offset + 1;
-
-   memmove(dptr, dbuf, nwritten);
-   dptr += nwritten;
-
-   if (obj_desc->offset + nwritten < obj_desc->highbyte + 1) {
-*/
-/*
-       loc = (obj_desc->highbyte - obj_desc->offset)
-               + nwritten;
-       sz = obj_desc->highbyte - (obj_desc->lowbyte + loc);
-
-       what's going on here?? - jolly
-*/
-/*
-       sz = (obj_desc->highbyte + 1) - (obj_desc->offset + nwritten);
-       memmove(&(fsblock->vl_dat[0]), dptr, sz);
-   }
-*/
-
-
-   /* index the new tuple */
-   inv_indextup(obj_desc, ntup);
-   heap_freetuple(ntup);
+   if (olddata != NULL)
+       ReleaseBuffer(buffer);
 
-   /*
-    * move the scandesc forward so we don't reread the newly inserted
-    * tuple on the next index scan
-    */
-   res = NULL;
-   if (obj_desc->iscan)
-       res = index_getnext(obj_desc->iscan, ForwardScanDirection);
+   index_endscan(sd);
 
-   if (res)
-       pfree(res);
+   if (write_indices)
+       CatalogCloseIndices(Num_pg_largeobject_indices, idescs);
 
    /*
-    * Okay, by here, a tuple for the new block is correctly placed,
-    * indexed, and filled.  Write the changed pages out.
+    * Advance command counter so that my tuple updates will be seen by later
+    * large-object operations in this transaction.
     */
+   CommandCounterIncrement();
 
-   WriteBuffer(buffer);
-   if (newbuf != buffer)
-       WriteBuffer(newbuf);
-
-   /* Tuple id is no longer valid */
-   ItemPointerSetInvalid(&(obj_desc->htid));
-
-   /* done */
    return nwritten;
 }
-
-static HeapTuple
-inv_newtuple(LargeObjectDesc *obj_desc,
-            Buffer buffer,
-            Page page,
-            char *dbuf,
-            int nwrite)
-{
-   HeapTuple   ntup = (HeapTuple) palloc(sizeof(HeapTupleData));
-   PageHeader  ph;
-   int         tupsize;
-   int         hoff;
-   Offset      lower;
-   Offset      upper;
-   ItemId      itemId;
-   OffsetNumber off;
-   OffsetNumber limit;
-   char       *attptr;
-
-   /* compute tuple size -- no nulls */
-   hoff = offsetof(HeapTupleHeaderData, t_bits);
-   hoff = MAXALIGN(hoff);
-
-   /* add in olastbyte, varlena.vl_len, varlena.vl_dat */
-   tupsize = hoff + (2 * sizeof(int32)) + nwrite;
-   tupsize = MAXALIGN(tupsize);
-
-   /*
-    * Allocate the tuple on the page, violating the page abstraction.
-    * This code was swiped from PageAddItem().
-    */
-
-   ph = (PageHeader) page;
-   limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));
-
-   /* look for "recyclable" (unused & deallocated) ItemId */
-   for (off = FirstOffsetNumber; off < limit; off = OffsetNumberNext(off))
-   {
-       itemId = &ph->pd_linp[off - 1];
-       if ((((*itemId).lp_flags & LP_USED) == 0) &&
-           ((*itemId).lp_len == 0))
-           break;
-   }
-
-   if (off > limit)
-       lower = (Offset) (((char *) (&ph->pd_linp[off])) - ((char *) page));
-   else if (off == limit)
-       lower = ph->pd_lower + sizeof(ItemIdData);
-   else
-       lower = ph->pd_lower;
-
-   upper = ph->pd_upper - tupsize;
-
-   itemId = &ph->pd_linp[off - 1];
-   (*itemId).lp_off = upper;
-   (*itemId).lp_len = tupsize;
-   (*itemId).lp_flags = LP_USED;
-   ph->pd_lower = lower;
-   ph->pd_upper = upper;
-
-   ntup->t_datamcxt = NULL;
-   ntup->t_data = (HeapTupleHeader) ((char *) page + upper);
-
-   /*
-    * Tuple is now allocated on the page.  Next, fill in the tuple
-    * header.  This block of code violates the tuple abstraction.
-    */
-
-   ntup->t_len = tupsize;
-   ItemPointerSet(&ntup->t_self, BufferGetBlockNumber(buffer), off);
-   ntup->t_data->t_oid = newoid();
-   TransactionIdStore(GetCurrentTransactionId(), &(ntup->t_data->t_xmin));
-   ntup->t_data->t_cmin = GetCurrentCommandId();
-   StoreInvalidTransactionId(&(ntup->t_data->t_xmax));
-   ntup->t_data->t_cmax = 0;
-   ntup->t_data->t_infomask = HEAP_XMAX_INVALID;
-   ntup->t_data->t_natts = 2;
-   ntup->t_data->t_hoff = hoff;
-
-   /* if a NULL is passed in, avoid the calculations below */
-   if (dbuf == NULL)
-       return ntup;
-
-   /*
-    * Finally, copy the user's data buffer into the tuple.  This violates
-    * the tuple and class abstractions.
-    */
-
-   attptr = ((char *) ntup->t_data) + hoff;
-   *((int32 *) attptr) = obj_desc->offset + nwrite - 1;
-   attptr += sizeof(int32);
-
-   /*
-    * *  mer fixed disk layout of varlenas to get rid of the need for
-    * this. *
-    *
-    * ((int32 *) attptr) = nwrite + sizeof(int32); *  attptr +=
-    * sizeof(int32);
-    */
-
-   *((int32 *) attptr) = nwrite + sizeof(int32);
-   attptr += sizeof(int32);
-
-   /*
-    * If a data buffer was passed in, then copy the data from the buffer
-    * to the tuple.  Some callers (eg, inv_wrold()) may not pass in a
-    * buffer, since they have to copy part of the old tuple data and part
-    * of the user's new data into the new tuple.
-    */
-
-   if (dbuf != (char *) NULL)
-       memmove(attptr, dbuf, nwrite);
-
-   /* keep track of boundary of current tuple */
-   obj_desc->lowbyte = obj_desc->offset;
-   obj_desc->highbyte = obj_desc->offset + nwrite - 1;
-
-   /* new tuple is filled -- return it */
-   return ntup;
-}
-
-static void
-inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple)
-{
-   InsertIndexResult res;
-   Datum       v[1];
-   char        n[1];
-
-   n[0] = ' ';
-   v[0] = Int32GetDatum(obj_desc->highbyte);
-   res = index_insert(obj_desc->index_r, &v[0], &n[0],
-                      &(tuple->t_self), obj_desc->heap_r);
-
-   if (res)
-       pfree(res);
-}
-
-#ifdef NOT_USED
-
-static void
-DumpPage(Page page, int blkno)
-{
-       ItemId          lp;
-       HeapTuple       tup;
-       int             flags, i, nline;
-       ItemPointerData pointerData;
-
-       printf("\t[subblock=%d]:lower=%d:upper=%d:special=%d\n", 0,
-               ((PageHeader)page)->pd_lower, ((PageHeader)page)->pd_upper,
-               ((PageHeader)page)->pd_special);
-
-       printf("\t:MaxOffsetNumber=%d\n",
-              (int16) PageGetMaxOffsetNumber(page));
-
-       nline = (int16) PageGetMaxOffsetNumber(page);
-
-{
-       int     i;
-       char    *cp;
-
-       i = PageGetSpecialSize(page);
-       cp = PageGetSpecialPointer(page);
-
-       printf("\t:SpecialData=");
-
-       while (i > 0) {
-               printf(" 0x%02x", *cp);
-               cp += 1;
-               i -= 1;
-       }
-       printf("\n");
-}
-       for (i = 0; i < nline; i++) {
-               lp = ((PageHeader)page)->pd_linp + i;
-               flags = (*lp).lp_flags;
-               ItemPointerSet(&pointerData, blkno, 1 + i);
-               printf("%s:off=%d:flags=0x%x:len=%d",
-                       ItemPointerFormExternal(&pointerData), (*lp).lp_off,
-                       flags, (*lp).lp_len);
-
-               if (flags & LP_USED) {
-                       HeapTupleData   htdata;
-
-                       printf(":USED");
-
-                       memmove((char *) &htdata,
-                               (char *) &((char *)page)[(*lp).lp_off],
-                               sizeof(htdata));
-
-                       tup = &htdata;
-
-                       printf("\n\t:ctid=%s:oid=%d",
-                               ItemPointerFormExternal(&tup->t_ctid),
-                               tup->t_oid);
-                       printf(":natts=%d:thoff=%d:",
-                               tup->t_natts,
-                               tup->t_hoff);
-
-                       printf("\n\t:cmin=%u:",
-                               tup->t_cmin);
-
-                       printf("xmin=%u:", tup->t_xmin);
-
-                       printf("\n\t:cmax=%u:",
-                               tup->t_cmax);
-
-                       printf("xmax=%u:\n", tup->t_xmax);
-
-               } else
-                       putchar('\n');
-       }
-}
-
-static char*
-ItemPointerFormExternal(ItemPointer pointer)
-{
-       static char     itemPointerString[32];
-
-       if (!ItemPointerIsValid(pointer)) {
-           memmove(itemPointerString, "<-,-,->", sizeof "<-,-,->");
-       } else {
-           sprintf(itemPointerString, "<%u,%u>",
-                   ItemPointerGetBlockNumber(pointer),
-                   ItemPointerGetOffsetNumber(pointer));
-       }
-
-       return itemPointerString;
-}
-
-#endif
-
-static int
-_inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln)
-{
-   IndexScanDesc iscan;
-   RetrieveIndexResult res;
-   HeapTupleData tuple;
-   Datum       d;
-   long        size;
-   bool        isNull;
-   Buffer      buffer;
-
-   /* scan backwards from end */
-   iscan = index_beginscan(ireln, (bool) 1, 0, (ScanKey) NULL);
-
-   do
-   {
-       res = index_getnext(iscan, BackwardScanDirection);
-
-       /*
-        * If there are no more index tuples, then the relation is empty,
-        * so the file's size is zero.
-        */
-
-       if (res == (RetrieveIndexResult) NULL)
-       {
-           index_endscan(iscan);
-           return 0;
-       }
-
-       /*
-        * For time travel, we need to use the actual time qual here,
-        * rather that NowTimeQual.  We currently have no way to pass a
-        * time qual in.
-        */
-       tuple.t_self = res->heap_iptr;
-       heap_fetch(hreln, SnapshotNow, &tuple, &buffer);
-       pfree(res);
-   } while (tuple.t_data == NULL);
-
-   /* don't need the index scan anymore */
-   index_endscan(iscan);
-
-   /* get olastbyte attribute */
-   d = heap_getattr(&tuple, 1, hdesc, &isNull);
-   size = DatumGetInt32(d) + 1;
-   ReleaseBuffer(buffer);
-
-   return size;
-}
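The rewritten inv_write above stores each large object as bytea pages in pg_largeobject and zero-fills only the leading gap of the page actually being written, so never-written ranges simply have no rows and read back as zeroes. A minimal client-side sketch of that behaviour through the ordinary libpq large-object calls (the database name, seek offsets, and lack of error checking are illustrative only):

#include <stdio.h>
#include "libpq-fe.h"
#include "libpq/libpq-fs.h"     /* INV_READ, INV_WRITE */

int
main(void)
{
    PGconn *conn = PQconnectdb("dbname=test");   /* hypothetical database */
    Oid     loid;
    int     fd;
    char    onebyte = 'x';
    char    probe[4];

    PQclear(PQexec(conn, "BEGIN"));
    loid = lo_creat(conn, INV_READ | INV_WRITE);
    fd = lo_open(conn, loid, INV_READ | INV_WRITE);

    /* seek well past EOF and write a single byte: the skipped range
     * becomes a hole, with no pg_largeobject pages stored for it */
    lo_lseek(conn, fd, 100000, SEEK_SET);
    lo_write(conn, fd, &onebyte, 1);

    /* reading inside the hole returns zeroes, like a sparse Unix file */
    lo_lseek(conn, fd, 50000, SEEK_SET);
    lo_read(conn, fd, probe, sizeof(probe));
    printf("byte inside hole = %d\n", probe[0]);  /* prints 0 */

    lo_close(conn, fd);
    PQclear(PQexec(conn, "END"));
    PQfinish(conn);
    return 0;
}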
index a3a914f8af386adfba52fa6ee18083390cb4cbd9..0d2c16128054afb79575a96a249d8fb79b22c70f 100644 (file)
@@ -22,7 +22,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v 1.174 2000/10/22 23:16:55 pjw Exp $
+ *   $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v 1.175 2000/10/24 01:38:32 tgl Exp $
  *
  * Modifications - 6/10/96 - [email protected] - version 1.13.dhb
  *
@@ -1104,7 +1104,7 @@ dumpBlobs(Archive *AH, char* junkOid, void *junkVal)
        fprintf(stderr, "%s saving BLOBs\n", g_comment_start);
 
    /* Cursor to get all BLOB tables */
-    appendPQExpBuffer(oidQry, "Declare blobOid Cursor for SELECT oid from pg_class where relkind = '%c'", RELKIND_LOBJECT);
+    appendPQExpBuffer(oidQry, "Declare blobOid Cursor for SELECT DISTINCT loid FROM pg_largeobject");
 
    res = PQexec(g_conn, oidQry->data);
    if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -1874,8 +1874,7 @@ getTables(int *numTables, FuncInfo *finfo, int numFuncs)
     * tables before the child tables when traversing the tblinfo*
     *
     * we ignore tables that are not type 'r' (ordinary relation) or 'S'
-    * (sequence) or 'v' (view) --- in particular, Large Object 
-     * relations (type 'l') are ignored.
+    * (sequence) or 'v' (view).
     */
 
    appendPQExpBuffer(query,
@@ -1886,7 +1885,6 @@ getTables(int *numTables, FuncInfo *finfo, int numFuncs)
                      "where relname !~ '^pg_' "
                      "and relkind in ('%c', '%c', '%c') "
                      "order by oid",
-               RELKIND_VIEW,
                RELKIND_RELATION, RELKIND_SEQUENCE, RELKIND_VIEW);
 
    res = PQexec(g_conn, query->data);
@@ -2585,7 +2583,7 @@ getIndices(int *numIndices)
     * find all the user-defined indices. We do not handle partial
     * indices.
     *
-    * Notice we skip indices on inversion objects (relkind 'l')
+    * Notice we skip indices on system classes
     *
     * this is a 4-way join !!
     */
@@ -2597,8 +2595,8 @@ getIndices(int *numIndices)
                    "from pg_index i, pg_class t1, pg_class t2, pg_am a "
                   "WHERE t1.oid = i.indexrelid and t2.oid = i.indrelid "
                      "and t1.relam = a.oid and i.indexrelid > '%u'::oid "
-                     "and t2.relname !~ '^pg_' and t2.relkind != '%c' and not i.indisprimary",
-                     g_last_builtin_oid, RELKIND_LOBJECT);
+                     "and t2.relname !~ '^pg_' and not i.indisprimary",
+                     g_last_builtin_oid);
 
    res = PQexec(g_conn, query->data);
    if (!res ||
index d97c8a7b67024e7e6adb9a08ddc7433f70bff84b..9cb8384dc29dc2b1ca3acda777fef0971ec3e238 100644 (file)
@@ -59,7 +59,7 @@ proc update_attnvals {conn rel} {
 proc updateStats { dbName } {
     # datnames is the list to be result
     set conn [pg_connect $dbName]
-    set res [pg_exec $conn "SELECT relname FROM pg_class WHERE relkind = 'r' and relname !~ '^pg_' and relname !~ '^xinv'"]
+    set res [pg_exec $conn "SELECT relname FROM pg_class WHERE relkind = 'r' and relname !~ '^pg_'"]
     set ntups [pg_result $res -numTuples]
     for {set i 0} {$i < $ntups} {incr i} {
    set rel [pg_result $res -getTuple $i]
index 26c54b366a1c59a147fc3ebe06f8e0d5dec7fc87..3db2eb95a66f13043687bff8972f316f4d01b63a 100644 (file)
@@ -3,7 +3,7 @@
  *
  * Copyright 2000 by PostgreSQL Global Development Group
  *
- * $Header: /cvsroot/pgsql/src/bin/psql/describe.c,v 1.24 2000/09/07 04:55:27 ishii Exp $
+ * $Header: /cvsroot/pgsql/src/bin/psql/describe.c,v 1.25 2000/10/24 01:38:38 tgl Exp $
  */
 #include "postgres.h"
 #include "describe.h"
@@ -1020,10 +1020,6 @@ listTables(const char *infotype, const char *name, bool desc)
            strcat(buf, "'S'");
        strcat(buf, ")\n");
 
-       /* ignore large-obj indices */
-       if (showIndices)
-           strcat(buf, "  AND (c.relkind != 'i' OR c.relname !~ '^xinx')\n");
-
        strcat(buf, showSystem ? "  AND c.relname ~ '^pg_'\n" : "  AND c.relname !~ '^pg_'\n");
        if (name)
        {
@@ -1050,10 +1046,6 @@ listTables(const char *infotype, const char *name, bool desc)
            strcat(buf, "'S'");
        strcat(buf, ")\n");
 
-       /* ignore large-obj indices */
-       if (showIndices)
-           strcat(buf, "  AND (c.relkind != 'i' OR c.relname !~ '^xinx')\n");
-
        strcat(buf, showSystem ? "  AND c.relname ~ '^pg_'\n" : "  AND c.relname !~ '^pg_'\n");
        if (name)
        {
index 020b0173eb4017c1427b0dcb9434373510cbd77b..5cfd18c328fbdb80b2af61037448566c3abd397c 100644 (file)
@@ -3,7 +3,7 @@
  *
  * Copyright 2000 by PostgreSQL Global Development Group
  *
- * $Header: /cvsroot/pgsql/src/bin/psql/large_obj.c,v 1.10 2000/04/12 17:16:22 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/bin/psql/large_obj.c,v 1.11 2000/10/24 01:38:39 tgl Exp $
  */
 #include "postgres.h"
 #include "large_obj.h"
@@ -193,7 +193,7 @@ do_lo_import(const char *filename_arg, const char *comment_arg)
    /* insert description if given */
    if (comment_arg)
    {
-       sprintf(buf, "INSERT INTO pg_description VALUES (%d, '", loid);
+       sprintf(buf, "INSERT INTO pg_description VALUES (%u, '", loid);
        for (i = 0; i < strlen(comment_arg); i++)
            if (comment_arg[i] == '\'')
                strcat(buf, "\\'");
@@ -284,7 +284,7 @@ do_lo_unlink(const char *loid_arg)
    }
 
    /* remove the comment as well */
-   sprintf(buf, "DELETE FROM pg_description WHERE objoid = %d", loid);
+   sprintf(buf, "DELETE FROM pg_description WHERE objoid = %u", loid);
    if (!(res = PSQLexec(buf)))
    {
        if (own_transaction)
@@ -328,15 +328,9 @@ do_lo_list(void)
    printQueryOpt myopt = pset.popt;
 
    strcpy(buf,
-   "SELECT usename as \"Owner\", substring(relname from 5) as \"ID\",\n"
-          "  obj_description(pg_class.oid) as \"Description\"\n"
-          "FROM pg_class, pg_user\n"
-          "WHERE usesysid = relowner AND relkind = 'l'\n"
-          "UNION\n"
-      "SELECT NULL as \"Owner\", substring(relname from 5) as \"ID\",\n"
-          "  obj_description(pg_class.oid) as \"Description\"\n"
-          "FROM pg_class\n"
-          "WHERE not exists (select 1 from pg_user where usesysid = relowner) AND relkind = 'l'\n"
+   "SELECT DISTINCT loid as \"ID\",\n"
+          "  obj_description(loid) as \"Description\"\n"
+          "FROM pg_largeobject\n"
           "ORDER BY \"ID\"");
 
    res = PSQLexec(buf);
index b82977d806c4bf8de69d3bb1d23fe786e350cffe..54b964e215295685fffacbe972141dc28d7bdcba 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: catname.h,v 1.16 2000/10/22 05:27:20 momjian Exp $
+ * $Id: catname.h,v 1.17 2000/10/24 01:38:41 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -29,6 +29,7 @@
 #define  InheritsRelationName "pg_inherits"
 #define  InheritancePrecidenceListRelationName "pg_ipl"
 #define  LanguageRelationName "pg_language"
+#define  LargeObjectRelationName "pg_largeobject"
 #define  ListenerRelationName "pg_listener"
 #define  LogRelationName "pg_log"
 #define  OperatorClassRelationName "pg_opclass"
index c16c6ae83ecc6c3f3082543e386d4816d5508a08..f6fd284f34d7ab99d959a08755266dbfdda728f4 100644 (file)
@@ -37,7 +37,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: catversion.h,v 1.51 2000/10/22 17:55:49 pjw Exp $
+ * $Id: catversion.h,v 1.52 2000/10/24 01:38:41 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 200010231
+#define CATALOG_VERSION_NO 200010232
 
 #endif
index 6cc98bdc322ec437deb550e20bf2f3da65975143..7150a43d2df729c5e702feef75db8621906f614c 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: indexing.h,v 1.44 2000/10/22 05:27:20 momjian Exp $
+ * $Id: indexing.h,v 1.45 2000/10/24 01:38:41 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -31,6 +31,7 @@
 #define Num_pg_index_indices       2
 #define Num_pg_inherits_indices        1
 #define Num_pg_language_indices        2
+#define Num_pg_largeobject_indices 1
 #define Num_pg_listener_indices        1
 #define Num_pg_opclass_indices     2
 #define Num_pg_operator_indices        2
@@ -62,6 +63,7 @@
 #define InheritsRelidSeqnoIndex        "pg_inherits_relid_seqno_index"
 #define LanguageNameIndex          "pg_language_name_index"
 #define LanguageOidIndex           "pg_language_oid_index"
+#define LargeObjectLOidPNIndex     "pg_largeobject_loid_pn_index"
 #define ListenerPidRelnameIndex        "pg_listener_pid_relname_index"
 #define OpclassDeftypeIndex            "pg_opclass_deftype_index"
 #define OpclassNameIndex           "pg_opclass_name_index"
@@ -92,6 +94,7 @@ extern char *Name_pg_group_indices[];
 extern char *Name_pg_index_indices[];
 extern char *Name_pg_inherits_indices[];
 extern char *Name_pg_language_indices[];
+extern char *Name_pg_largeobject_indices[];
 extern char *Name_pg_listener_indices[];
 extern char *Name_pg_opclass_indices[];
 extern char *Name_pg_operator_indices[];
@@ -191,6 +194,7 @@ DECLARE_UNIQUE_INDEX(pg_index_indexrelid_index on pg_index using btree(indexreli
 DECLARE_UNIQUE_INDEX(pg_inherits_relid_seqno_index on pg_inherits using btree(inhrelid oid_ops, inhseqno int4_ops));
 DECLARE_UNIQUE_INDEX(pg_language_name_index on pg_language using btree(lanname name_ops));
 DECLARE_UNIQUE_INDEX(pg_language_oid_index on pg_language using btree(oid oid_ops));
+DECLARE_UNIQUE_INDEX(pg_largeobject_loid_pn_index on pg_largeobject using btree(loid oid_ops, pageno int4_ops));
 DECLARE_UNIQUE_INDEX(pg_listener_pid_relname_index on pg_listener using btree(listenerpid int4_ops, relname name_ops));
 /* This column needs to allow multiple zero entries, but is in the cache */
 DECLARE_INDEX(pg_opclass_deftype_index on pg_opclass using btree(opcdeftype oid_ops));
index a9592e7ddb852ebc35ac2fe39640d1c26201f2ce..68db583fe3a02a7d4e0e73602a85df12872bcec4 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: pg_class.h,v 1.43 2000/10/22 17:55:49 pjw Exp $
+ * $Id: pg_class.h,v 1.44 2000/10/24 01:38:41 tgl Exp $
  *
  * NOTES
  *   the genbki.sh script reads this file and generates .bki
@@ -174,7 +174,6 @@ DESCR("");
 #define XactLockTableId            376
 
 #define          RELKIND_INDEX           'i'       /* secondary index */
-#define          RELKIND_LOBJECT         'l'       /* large objects */
 #define          RELKIND_RELATION        'r'       /* ordinary cataloged heap */
 #define          RELKIND_SPECIAL         's'       /* special (non-heap) */
 #define          RELKIND_SEQUENCE        'S'       /* SEQUENCE relation */
diff --git a/src/include/catalog/pg_largeobject.h b/src/include/catalog/pg_largeobject.h
new file mode 100644 (file)
index 0000000..7777604
--- /dev/null
@@ -0,0 +1,63 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_largeobject.h
+ *   definition of the system "largeobject" relation (pg_largeobject)
+ *   along with the relation's initial contents.
+ *
+ *
+ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $Id: pg_largeobject.h,v 1.5 2000/10/24 01:38:41 tgl Exp $
+ *
+ * NOTES
+ *   the genbki.sh script reads this file and generates .bki
+ *   information from the DATA() statements.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LARGEOBJECT_H
+#define PG_LARGEOBJECT_H
+
+/* ----------------
+ *     postgres.h contains the system type definitions and the
+ *     CATALOG(), BOOTSTRAP and DATA() sugar words so this file
+ *     can be read by both genbki.sh and the C compiler.
+ * ----------------
+ */
+
+/* ----------------
+ *     pg_largeobject definition.  cpp turns this into
+ *     typedef struct FormData_pg_largeobject. Large object id
+ *     is stored in loid;
+ * ----------------
+ */
+
+CATALOG(pg_largeobject)
+{
+   Oid         loid;           /* Identifier of large object */
+   int4        pageno;         /* Page number (starting from 0) */
+   bytea       data;           /* Data for page (may be zero-length) */
+} FormData_pg_largeobject;
+
+/* ----------------
+ *     Form_pg_largeobject corresponds to a pointer to a tuple with
+ *     the format of pg_largeobject relation.
+ * ----------------
+ */
+typedef FormData_pg_largeobject *Form_pg_largeobject;
+
+/* ----------------
+ *     compiler constants for pg_largeobject
+ * ----------------
+ */
+#define Natts_pg_largeobject           3
+#define Anum_pg_largeobject_loid       1
+#define Anum_pg_largeobject_pageno     2
+#define Anum_pg_largeobject_data       3
+
+extern Oid LargeObjectCreate(Oid loid);
+extern void LargeObjectDrop(Oid loid);
+extern bool LargeObjectExists(Oid loid);
+
+#endif  /* PG_LARGEOBJECT_H */
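A hedged sketch of how backend code might combine the entry points declared above; the wrapper name and error message are invented for illustration and are not taken from the patch:

#include "postgres.h"
#include "catalog/pg_largeobject.h"

/*
 * Illustrative only: create a large object with a caller-chosen OID,
 * refusing to overwrite an existing one.
 */
static Oid
create_lo_if_absent(Oid desired_loid)
{
    if (LargeObjectExists(desired_loid))
        elog(ERROR, "large object %u already exists", desired_loid);
    return LargeObjectCreate(desired_loid);
}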
index c480f5b7874059e94ea3fb3e95a440edfde816ff..6bb0c4fcf2e6612167641db0e7616b68609f9227 100644 (file)
@@ -8,39 +8,54 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: large_object.h,v 1.17 2000/10/22 05:27:23 momjian Exp $
+ * $Id: large_object.h,v 1.18 2000/10/24 01:38:43 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #ifndef LARGE_OBJECT_H
 #define LARGE_OBJECT_H
 
-#include 
+#include "utils/rel.h"
 
-#include "access/relscan.h"
 
-/*
- * This structure will eventually have lots more stuff associated with it.
+/*----------
+ * Data about a currently-open large object.
+ *
+ * id is the logical OID of the large object
+ * offset is the current seek offset within the LO
+ * heap_r holds an open-relation reference to pg_largeobject
+ * index_r holds an open-relation reference to pg_largeobject_loid_pn_index
+ *
+ * NOTE: before 7.1, heap_r and index_r held references to the separate
+ * table and index of a specific large object.  Now they all live in one rel.
+ *----------
  */
-typedef struct LargeObjectDesc
-{
-   Relation    heap_r;         /* heap relation */
-   Relation    index_r;        /* index relation on seqno attribute */
-   IndexScanDesc iscan;        /* index scan we're using */
-   TupleDesc   hdesc;          /* heap relation tuple desc */
-   TupleDesc   idesc;          /* index relation tuple desc */
-   uint32      lowbyte;        /* low byte on the current page */
-   uint32      highbyte;       /* high byte on the current page */
+typedef struct LargeObjectDesc {
+   Oid         id;
    uint32      offset;         /* current seek pointer */
-   ItemPointerData htid;       /* tid of current heap tuple */
+   int         flags;          /* locking info, etc */
 
+/* flag bits: */
 #define IFS_RDLOCK     (1 << 0)
 #define IFS_WRLOCK     (1 << 1)
-#define IFS_ATEOF      (1 << 2)
 
-   u_long      flags;          /* locking info, etc */
+   Relation    heap_r;
+   Relation    index_r;
 } LargeObjectDesc;
 
+
+/*
+ * Each "page" (tuple) of a large object can hold this much data
+ *
+ * Calculation is max tuple size less tuple header, loid field (Oid),
+ * pageno field (int32), and varlena header of data (int32).  Note we
+ * assume none of the fields will be NULL, hence no need for null bitmap.
+ */
+#define    LOBLKSIZE       (MaxTupleSize \
+                        - MAXALIGN(offsetof(HeapTupleHeaderData, t_bits)) \
+                        - sizeof(Oid) - sizeof(int32) * 2)
+
+
 /*
  * Function definitions...
  */
@@ -55,7 +70,4 @@ extern int    inv_tell(LargeObjectDesc *obj_desc);
 extern int inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes);
 extern int inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes);
 
-/* added for buffer leak prevention [ PA ] */
-extern void inv_cleanindex(LargeObjectDesc *obj_desc);
-
 #endif  /* LARGE_OBJECT_H */
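Since LOBLKSIZE fixes how many data bytes one pg_largeobject tuple holds, a byte offset within a large object maps to a (pageno, offset-within-page) pair by simple division, which is the arithmetic the new inv_read/inv_write code performs. A minimal sketch, with an invented helper name:

#include "postgres.h"
#include "access/htup.h"         /* HeapTupleHeaderData, used by LOBLKSIZE */
#include "storage/bufpage.h"     /* MaxTupleSize, used by LOBLKSIZE */
#include "storage/large_object.h"

/* Map a large-object byte offset to its pg_largeobject page number and
 * the position within that page's data field. */
static void
lo_offset_to_page(uint32 offset, int32 *pageno, int32 *off_in_page)
{
    *pageno = (int32) (offset / LOBLKSIZE);
    *off_in_page = (int32) (offset % LOBLKSIZE);
}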
index 9c99a120adf1ed73c4f98a7476e27df4b355a087..9d4e75a9e0a6742b474dfb3e43fd756357df84b9 100644 (file)
@@ -1007,8 +1007,7 @@ mylog("%s: entering...stmt=%u\n", func, stmt);
    }
 
 
-   /*  filter out large objects unconditionally (they are not system tables) and match users */
-   strcat(tables_query, " and relname !~ '^xinv[0-9]+'");
+   /* match users */
    strcat(tables_query, " and usesysid = relowner");
    strcat(tables_query, " order by relname");
 
index f5d2427cfa1b7d048589d67ecb52d0a1c0a1ec11..9fd96b22803b6434ec91dd10e2e6f5eb54f91ea2 100644 (file)
@@ -482,8 +482,8 @@ WHERE p1.aggtransfn = p2.oid AND
           (p2.pronargs = 1 AND p1.aggbasetype = 0)));
   oid  | aggname | oid |   proname   
 -------+---------+-----+-------------
- 16984 | max     | 768 | int4larger
- 16998 | min     | 769 | int4smaller
+ 16996 | max     | 768 | int4larger
+ 17010 | min     | 769 | int4smaller
 (2 rows)
 
 -- Cross-check finalfn (if present) against its entry in pg_proc.
index 823d9e142db0b9eb1999c48762678fe5f2353a38..f2412386d176bfb5b99971396772a9ccf6d64bbc 100644 (file)
@@ -40,6 +40,7 @@ SELECT relname, relhasindex
  pg_index            | t
  pg_inherits         | t
  pg_language         | t
+ pg_largeobject      | t
  pg_listener         | t
  pg_opclass          | t
  pg_operator         | t
@@ -54,5 +55,5 @@ SELECT relname, relhasindex
  shighway            | t
  tenk1               | t
  tenk2               | t
-(44 rows)
+(45 rows)