Add large object access control.
authorItagaki Takahiro
Fri, 11 Dec 2009 03:34:57 +0000 (03:34 +0000)
committerItagaki Takahiro
Fri, 11 Dec 2009 03:34:57 +0000 (03:34 +0000)
A new system catalog pg_largeobject_metadata manages
ownership and access privileges of large objects.

KaiGai Kohei, reviewed by Jaime Casanova.

39 files changed:
doc/src/sgml/catalogs.sgml
doc/src/sgml/config.sgml
doc/src/sgml/lobj.sgml
doc/src/sgml/ref/allfiles.sgml
doc/src/sgml/ref/alter_large_object.sgml [new file with mode: 0755]
doc/src/sgml/ref/grant.sgml
doc/src/sgml/ref/revoke.sgml
doc/src/sgml/reference.sgml
src/backend/catalog/Makefile
src/backend/catalog/aclchk.c
src/backend/catalog/dependency.c
src/backend/catalog/pg_largeobject.c
src/backend/catalog/pg_shdepend.c
src/backend/commands/alter.c
src/backend/commands/comment.c
src/backend/commands/tablecmds.c
src/backend/libpq/be-fsstubs.c
src/backend/parser/gram.y
src/backend/storage/large_object/inv_api.c
src/backend/tcop/utility.c
src/backend/utils/adt/acl.c
src/backend/utils/misc/guc.c
src/backend/utils/misc/postgresql.conf.sample
src/bin/initdb/initdb.c
src/bin/pg_dump/dumputils.c
src/bin/pg_dump/pg_dump.c
src/bin/psql/large_obj.c
src/bin/psql/tab-complete.c
src/include/catalog/catversion.h
src/include/catalog/dependency.h
src/include/catalog/indexing.h
src/include/catalog/pg_largeobject.h
src/include/catalog/pg_largeobject_metadata.h [new file with mode: 0755]
src/include/libpq/be-fsstubs.h
src/include/nodes/parsenodes.h
src/include/utils/acl.h
src/test/regress/expected/privileges.out
src/test/regress/expected/sanity_check.out
src/test/regress/sql/privileges.sql

index be5b037aa0b7e03aa64496c389b7e3a8e570e4c2..9d7f7346d95d925915adee65a6f836287bd600f9 100644 (file)
@@ -1,4 +1,4 @@
-
+
 
 
      
       pg_largeobject
-      large objects
+      data pages for large objects
+     
+
+     
+      pg_largeobject_metadata
+      metadata for large objects
      
 
      
 
   
    The catalog pg_largeobject holds the data making up
-   large objects.  A large object is identified by an
-   OID assigned when it is created.  Each large object is broken into
+   large objects.  A large object is identified by an OID of
+   pg_largeobject_metadata
+   catalog, assigned when it is created.  Each large object is broken into
    segments or pages small enough to be conveniently stored as rows
    in pg_largeobject.
    The amount of data per page is defined to be LOBLKSIZE (which is currently
    BLCKSZ/4, or typically 2 kB).
   
 
+  
+   pg_largeobject should not be readable by the
+   public, since the catalog contains data in large objects of all users.
+   pg_largeobject_metadata is a publicly readable catalog
+   that only contains identifiers of large objects.
+  
+
   
    <structname>pg_largeobject</> Columns
 
-   3">
+   4">
     
      
       Name
       Type
+      References
       Description
      
     
      
       loid
       oid
+      pg_largeobject_metadata.oid
       Identifier of the large object that includes this page
      
 
      
       pageno
       int4
+      
       Page number of this page within its large object
       (counting from zero)
      
      
       data
       bytea
+      
       
        Actual data stored in the large object.
        This will never be more than LOBLKSIZE bytes and might be less
 
  
 
+  <structname>pg_largeobject_metadata</structname>
+
+  
+   pg_largeobject_metadata
+  
+
+  
+   The purpose of pg_largeobject_metadata is to
+   hold metadata of large objects, such as OID of its owner,
+   access permissions and OID of the large object itself.
+  
+
+  
+   <structname>pg_largeobject_metadata</> Columns
+
+   
+    
+     
+      Name
+      Type
+      References
+      Description
+     
+    
+
+    
+     
+      lomowner
+      oid
+      pg_authid.oid
+      Owner of the largeobejct
+     
+
+     
+      lomacl
+      aclitem[]
+      
+       Access privileges; see
+        and
+       
+       for details
+      
+     
+
+    
+   
+  
 
  
   <structname>pg_listener</structname>
index 1fb32c8de398d6c700881239df026d232ab680d9..8045f5c95beed1d16231117447ca5d9df85a89de 100644 (file)
@@ -1,4 +1,4 @@
-
+
 
 
   Server Configuration
@@ -4816,6 +4816,35 @@ dynamic_library_path = 'C:\tools\postgresql;H:\my_project\lib;$libdir'
       
      
 
+     
+      lo_compat_privileges (boolean)
+      
+       
+        lo_compat_privileges configuration parameter
+       
+      
+      
+       
+        This allows us to tuen on/off database privilege checks on large
+        objects. In the 8.4.x series and earlier release do not have
+        privilege checks on large object in most cases.
+
+        So, turning the lo_compat_privileges off means
+        the large object feature performs in compatible mode.
+       
+       
+        Please note that it is not equivalent to disable all the security
+        checks corresponding to large objects.
+        For example, the lo_import() and
+        lo_export() need superuser privileges independent
+        from this setting as prior versions were doing.
+       
+       
+        It is off by default.
+       
+      
+     
+
      
       sql_inheritance (boolean)
       
index 750b9c5c4ee053c8d61f3cd978b7c0a3bf20d8f4..1cec73e4c025723b193f2193e2a945605e105c4d 100644 (file)
@@ -1,4 +1,4 @@
-
+
 
  
   Large Objects
@@ -441,6 +441,57 @@ SELECT lo_export(image.raster, '/tmp/motd') FROM image
     The client-side functions can be used by any
     PostgreSQL user.
   
+
+  
+   Large object and privileges
+   
+    Note that access control feature was not supported in the 8.4.x series
+    and earlier release.
+    Also see the  compatibility
+    option.
+   
+   
+    Now it supports access controls on large objects, and allows the owner
+    of large objects to set up access rights using
+     and
+     statement.
+   
+   
+    Two permissions are defined on the large object class.
+    These are checked only when 
+    option is disabled.
+   
+   
+    The first is SELECT.
+    It is required on loread() function.
+    Note that when we open large object with read-only mode, we can see
+    a static image even if other concurrent transaction modified the
+    same large object.
+    This principle is also applied on the access rights of large objects.
+    Even if a transaction modified access rights and commit it, it is
+    not invisible from other transaction which already opened the large
+    object.
+   
+   
+    The second is UPDATE.
+    It is required on lowrite() function and
+    lo_truncate() function.
+   
+   
+    In addition, lo_unlink() function,
+    COMMENT ON and ALTER LARGE OBJECT
+    statements needs ownership of the large object to be accessed.
+   
+   
+    You may wonder why SELECT is not checked on the
+    lo_export() function or UPDATE
+    is not checked on the lo_import function.
+
+    These functions originally require database superuser privilege,
+    and it allows to bypass the default database privilege checks,
+    so we don't need to check an obvious test twice.
+   
+  
 
 
 
index c15579c5164916a5134426f1292c16c6db4b5edc..1754aae58b8fcc6708619a71b8713a1ae90794b6 100644 (file)
@@ -1,5 +1,5 @@
 
@@ -16,6 +16,7 @@ Complete list of usable sgml source files in this directory.
 
 
 
+
 
 
 
diff --git a/doc/src/sgml/ref/alter_large_object.sgml b/doc/src/sgml/ref/alter_large_object.sgml
new file mode 100755 (executable)
index 0000000..3436ae8
--- /dev/null
@@ -0,0 +1,75 @@
+
+  ALTER LARGE OBJECT
+  7
+  SQL - Language Statements
+
+  ALTER LARGE OBJECT
+  change the definition of a large object
+
+  ALTER LARGE OBJECT
+
+
+ALTER LARGE OBJECT large_object_oid OWNER TO new_owner
+
+
+  Description
+
+  
+   ALTER LARGE OBJECT changes the definition of a
+   large object. The only functionality is to assign a new owner.
+   You must be superuser or owner of the large object to use
+   ALTER LARGE OBJECT.
+  
+
+  Parameters
+
+  
+   
+    large_object_oid
+    
+     
+      OID of the large object to be altered
+     
+    
+   
+
+   
+    new_owner
+    
+     
+      The new owner of the large object
+     
+    
+   
+  
+
+  Compatibility
+    
+  
+   There is no ALTER LARGE OBJECT statement in the SQL
+   standard.
+  
+
+  See Also
+
+  
+   
+  
+
+
index 2e8f2050f113289e5bdbe7e52b7fec788804dbaf..86879acedbcb1077f65d9a3b364c51da555f1849 100644 (file)
@@ -1,5 +1,5 @@
 
 
@@ -59,6 +59,10 @@ GRANT { USAGE | ALL [ PRIVILEGES ] }
     ON LANGUAGE lang_name [, ...]
     TO { [ GROUP ] role_name | PUBLIC } [, ...] [ WITH GRANT OPTION ]
 
+GRANT { { SELECT | UPDATE } [,...] | ALL [ PRIVILEGES ] }
+    ON LARGE OBJECT loid [, ...]
+    TO { [ GROUP ] rolename | PUBLIC } [, ...] [ WITH GRANT OPTION ]
+
 GRANT { { CREATE | USAGE } [,...] | ALL [ PRIVILEGES ] }
     ON SCHEMA schema_name [, ...]
     TO { [ GROUP ] role_name | PUBLIC } [, ...] [ WITH GRANT OPTION ]
@@ -170,6 +174,8 @@ GRANT role_name [, ...] TO 
        .
        For sequences, this privilege also allows the use of the
        currval function.
+       For large objects, this privilege also allows to read from
+       the target large object.
       
      
     
@@ -203,6 +209,8 @@ GRANT role_name [, ...] TO 
        SELECT privilege.  For sequences, this
        privilege allows the use of the nextval and
        setval functions.
+       For large objects, this privilege also allows to write or truncate
+       on the target large object.
       
      
     
index 0b8aea534c4e954bc8d123685b794a0e8faf8f1f..e31549fa3c3d20278717aee4950a2f68ed441799 100644 (file)
@@ -1,5 +1,5 @@
 
 
@@ -75,6 +75,12 @@ REVOKE [ GRANT OPTION FOR ]
     FROM { [ GROUP ] role_name | PUBLIC } [, ...]
     [ CASCADE | RESTRICT ]
 
+REVOKE [ GRANT OPTION FOR ]
+    { { SELECT | UPDATE } [,...] | ALL [ PRIVILEGES ] }
+    ON LARGE OBJECT loid [, ...]
+    FROM { [ GROUP ] rolename | PUBLIC } [, ...]
+    [ CASCADE | RESTRICT ]
+
 REVOKE [ GRANT OPTION FOR ]
     { { CREATE | USAGE } [,...] | ALL [ PRIVILEGES ] }
     ON SCHEMA schema_name [, ...]
index 0e72fc5475bd5851652e0bb8f06e7ab911c27c36..f97bf651ce55fbca51731dd1dc06d63a857e8362 100644 (file)
@@ -1,4 +1,4 @@
-
+
 
 
  Reference
@@ -44,6 +44,7 @@
    &alterGroup;
    &alterIndex;
    &alterLanguage;
+   &alterLargeObject;
    &alterOperator;
    &alterOperatorClass;
    &alterOperatorFamily;
index ec548990b1003b850eb5883d2d87e4c25bf65fb1..02a2b01b815b936475de6088a51947ae1e99cd99 100644 (file)
@@ -2,7 +2,7 @@
 #
 # Makefile for backend/catalog
 #
-# $PostgreSQL: pgsql/src/backend/catalog/Makefile,v 1.73 2009/10/07 22:14:16 alvherre Exp $
+# $PostgreSQL: pgsql/src/backend/catalog/Makefile,v 1.74 2009/12/11 03:34:55 itagaki Exp $
 #
 #-------------------------------------------------------------------------
 
@@ -29,9 +29,9 @@ POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\
    pg_proc.h pg_type.h pg_attribute.h pg_class.h \
    pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \
    pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
-   pg_language.h pg_largeobject.h pg_aggregate.h pg_statistic.h \
-   pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h pg_cast.h \
-   pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \
+   pg_language.h pg_largeobject_metadata.h pg_largeobject.h pg_aggregate.h \
+   pg_statistic.h pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h \
+   pg_cast.h pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \
    pg_database.h pg_db_role_setting.h pg_tablespace.h pg_pltemplate.h \
    pg_authid.h pg_auth_members.h pg_shdepend.h pg_shdescription.h \
    pg_ts_config.h pg_ts_config_map.h pg_ts_dict.h \
index 8b2599a99f53b06455142f24cfd112efba1017c7..3c2fdb0cf14556c9fb98195b68815f1e558c16ca 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/catalog/aclchk.c,v 1.156 2009/10/12 20:39:39 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/catalog/aclchk.c,v 1.157 2009/12/11 03:34:55 itagaki Exp $
  *
  * NOTES
  *   See acl.h.
@@ -31,6 +31,8 @@
 #include "catalog/pg_foreign_data_wrapper.h"
 #include "catalog/pg_foreign_server.h"
 #include "catalog/pg_language.h"
+#include "catalog/pg_largeobject.h"
+#include "catalog/pg_largeobject_metadata.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_opclass.h"
 #include "catalog/pg_operator.h"
@@ -103,6 +105,7 @@ static void ExecGrant_Fdw(InternalGrant *grantStmt);
 static void ExecGrant_ForeignServer(InternalGrant *grantStmt);
 static void ExecGrant_Function(InternalGrant *grantStmt);
 static void ExecGrant_Language(InternalGrant *grantStmt);
+static void ExecGrant_Largeobject(InternalGrant *grantStmt);
 static void ExecGrant_Namespace(InternalGrant *grantStmt);
 static void ExecGrant_Tablespace(InternalGrant *grantStmt);
 
@@ -251,6 +254,9 @@ restrict_and_check_grant(bool is_grant, AclMode avail_goptions, bool all_privs,
        case ACL_KIND_LANGUAGE:
            whole_mask = ACL_ALL_RIGHTS_LANGUAGE;
            break;
+       case ACL_KIND_LARGEOBJECT:
+           whole_mask = ACL_ALL_RIGHTS_LARGEOBJECT;
+           break;
        case ACL_KIND_NAMESPACE:
            whole_mask = ACL_ALL_RIGHTS_NAMESPACE;
            break;
@@ -410,6 +416,10 @@ ExecuteGrantStmt(GrantStmt *stmt)
            all_privileges = ACL_ALL_RIGHTS_LANGUAGE;
            errormsg = gettext_noop("invalid privilege type %s for language");
            break;
+       case ACL_OBJECT_LARGEOBJECT:
+           all_privileges = ACL_ALL_RIGHTS_LARGEOBJECT;
+           errormsg = gettext_noop("invalid privilege type %s for large object");
+           break;
        case ACL_OBJECT_NAMESPACE:
            all_privileges = ACL_ALL_RIGHTS_NAMESPACE;
            errormsg = gettext_noop("invalid privilege type %s for schema");
@@ -513,6 +523,9 @@ ExecGrantStmt_oids(InternalGrant *istmt)
        case ACL_OBJECT_LANGUAGE:
            ExecGrant_Language(istmt);
            break;
+       case ACL_OBJECT_LARGEOBJECT:
+           ExecGrant_Largeobject(istmt);
+           break;
        case ACL_OBJECT_NAMESPACE:
            ExecGrant_Namespace(istmt);
            break;
@@ -597,6 +610,20 @@ objectNamesToOids(GrantObjectType objtype, List *objnames)
                ReleaseSysCache(tuple);
            }
            break;
+       case ACL_OBJECT_LARGEOBJECT:
+           foreach(cell, objnames)
+           {
+               Oid     lobjOid = intVal(lfirst(cell));
+
+               if (!LargeObjectExists(lobjOid))
+                   ereport(ERROR,
+                           (errcode(ERRCODE_UNDEFINED_OBJECT),
+                            errmsg("large object %u does not exist",
+                                   lobjOid)));
+
+               objects = lappend_oid(objects, lobjOid);
+           }
+           break;
        case ACL_OBJECT_NAMESPACE:
            foreach(cell, objnames)
            {
@@ -1279,6 +1306,9 @@ RemoveRoleFromObjectACL(Oid roleid, Oid classid, Oid objid)
            case LanguageRelationId:
                istmt.objtype = ACL_OBJECT_LANGUAGE;
                break;
+           case LargeObjectRelationId:
+               istmt.objtype = ACL_OBJECT_LARGEOBJECT;
+               break;
            case NamespaceRelationId:
                istmt.objtype = ACL_OBJECT_NAMESPACE;
                break;
@@ -2472,6 +2502,138 @@ ExecGrant_Language(InternalGrant *istmt)
    heap_close(relation, RowExclusiveLock);
 }
 
+static void
+ExecGrant_Largeobject(InternalGrant *istmt)
+{
+   Relation    relation;
+   ListCell   *cell;
+
+   if (istmt->all_privs && istmt->privileges == ACL_NO_RIGHTS)
+       istmt->privileges = ACL_ALL_RIGHTS_LARGEOBJECT;
+
+   relation = heap_open(LargeObjectMetadataRelationId,
+                        RowExclusiveLock);
+
+   foreach(cell, istmt->objects)
+   {
+       Oid         loid = lfirst_oid(cell);
+       Form_pg_largeobject_metadata    form_lo_meta;
+       char        loname[NAMEDATALEN];
+       Datum       aclDatum;
+       bool        isNull;
+       AclMode     avail_goptions;
+       AclMode     this_privileges;
+       Acl        *old_acl;
+       Acl        *new_acl;
+       Oid         grantorId;
+       Oid         ownerId;
+       HeapTuple   newtuple;
+       Datum       values[Natts_pg_largeobject_metadata];
+       bool        nulls[Natts_pg_largeobject_metadata];
+       bool        replaces[Natts_pg_largeobject_metadata];
+       int         noldmembers;
+       int         nnewmembers;
+       Oid        *oldmembers;
+       Oid        *newmembers;
+       ScanKeyData entry[1];
+       SysScanDesc scan;
+       HeapTuple   tuple;
+
+       /* There's no syscache for pg_largeobject_metadata */
+       ScanKeyInit(&entry[0],
+                   ObjectIdAttributeNumber,
+                   BTEqualStrategyNumber, F_OIDEQ,
+                   ObjectIdGetDatum(loid));
+
+       scan = systable_beginscan(relation,
+                                 LargeObjectMetadataOidIndexId, true,
+                                 SnapshotNow, 1, entry);
+
+       tuple = systable_getnext(scan);
+       if (!HeapTupleIsValid(tuple))
+           elog(ERROR, "cache lookup failed for large object %u", loid);
+
+       form_lo_meta = (Form_pg_largeobject_metadata) GETSTRUCT(tuple);
+
+       /*
+        * Get owner ID and working copy of existing ACL. If there's no ACL,
+        * substitute the proper default.
+        */
+       ownerId = form_lo_meta->lomowner;
+       aclDatum = heap_getattr(tuple,
+                               Anum_pg_largeobject_metadata_lomacl,
+                               RelationGetDescr(relation), &isNull);
+       if (isNull)
+           old_acl = acldefault(ACL_OBJECT_LARGEOBJECT, ownerId);
+       else
+           old_acl = DatumGetAclPCopy(aclDatum);
+
+       /* Determine ID to do the grant as, and available grant options */
+       select_best_grantor(GetUserId(), istmt->privileges,
+                           old_acl, ownerId,
+                           &grantorId, &avail_goptions);
+
+       /*
+        * Restrict the privileges to what we can actually grant, and emit the
+        * standards-mandated warning and error messages.
+        */
+       snprintf(loname, sizeof(loname), "large object %u", loid);
+       this_privileges =
+           restrict_and_check_grant(istmt->is_grant, avail_goptions,
+                                    istmt->all_privs, istmt->privileges,
+                                    loid, grantorId, ACL_KIND_LARGEOBJECT,
+                                    loname, 0, NULL);
+
+       /*
+        * Generate new ACL.
+        *
+        * We need the members of both old and new ACLs so we can correct the
+        * shared dependency information.
+        */
+       noldmembers = aclmembers(old_acl, &oldmembers);
+
+       new_acl = merge_acl_with_grant(old_acl, istmt->is_grant,
+                                      istmt->grant_option, istmt->behavior,
+                                      istmt->grantees, this_privileges,
+                                      grantorId, ownerId);
+
+       nnewmembers = aclmembers(new_acl, &newmembers);
+
+       /* finished building new ACL value, now insert it */
+       MemSet(values, 0, sizeof(values));
+       MemSet(nulls, false, sizeof(nulls));
+       MemSet(replaces, false, sizeof(replaces));
+
+       replaces[Anum_pg_largeobject_metadata_lomacl - 1] = true;
+       values[Anum_pg_largeobject_metadata_lomacl - 1]
+           = PointerGetDatum(new_acl);
+
+       newtuple = heap_modify_tuple(tuple, RelationGetDescr(relation),
+                                    values, nulls, replaces);
+
+       simple_heap_update(relation, &newtuple->t_self, newtuple);
+
+       /* keep the catalog indexes up to date */
+       CatalogUpdateIndexes(relation, newtuple);
+
+       /* Update the shared dependency ACL info */
+       updateAclDependencies(LargeObjectRelationId,
+                             HeapTupleGetOid(tuple), 0,
+                             ownerId, istmt->is_grant,
+                             noldmembers, oldmembers,
+                             nnewmembers, newmembers);
+
+       systable_endscan(scan);
+
+       pfree(new_acl);
+
+       /* prevent error when processing duplicate objects */
+       CommandCounterIncrement();
+   }
+
+   heap_close(relation, RowExclusiveLock);
+}
+
 static void
 ExecGrant_Namespace(InternalGrant *istmt)
 {
@@ -2812,6 +2974,8 @@ static const char *const no_priv_msg[MAX_ACL_KIND] =
    gettext_noop("permission denied for type %s"),
    /* ACL_KIND_LANGUAGE */
    gettext_noop("permission denied for language %s"),
+   /* ACL_KIND_LARGEOBJECT */
+   gettext_noop("permission denied for large object %s"),
    /* ACL_KIND_NAMESPACE */
    gettext_noop("permission denied for schema %s"),
    /* ACL_KIND_OPCLASS */
@@ -2850,6 +3014,8 @@ static const char *const not_owner_msg[MAX_ACL_KIND] =
    gettext_noop("must be owner of type %s"),
    /* ACL_KIND_LANGUAGE */
    gettext_noop("must be owner of language %s"),
+   /* ACL_KIND_LARGEOBJECT */
+   gettext_noop("must be owner of large object %s"),
    /* ACL_KIND_NAMESPACE */
    gettext_noop("must be owner of schema %s"),
    /* ACL_KIND_OPCLASS */
@@ -2969,6 +3135,9 @@ pg_aclmask(AclObjectKind objkind, Oid table_oid, AttrNumber attnum, Oid roleid,
            return pg_proc_aclmask(table_oid, roleid, mask, how);
        case ACL_KIND_LANGUAGE:
            return pg_language_aclmask(table_oid, roleid, mask, how);
+       case ACL_KIND_LARGEOBJECT:
+           return pg_largeobject_aclmask_snapshot(table_oid, roleid,
+                                                  mask, how, SnapshotNow);
        case ACL_KIND_NAMESPACE:
            return pg_namespace_aclmask(table_oid, roleid, mask, how);
        case ACL_KIND_TABLESPACE:
@@ -3351,6 +3520,90 @@ pg_language_aclmask(Oid lang_oid, Oid roleid,
    return result;
 }
 
+/*
+ * Exported routine for examining a user's privileges for a largeobject
+ *
+ * The reason why this interface has an argument of snapshot is that
+ * we apply a snapshot available on lo_open(), not SnapshotNow, when
+ * it is opened as read-only mode.
+ * If we could see the metadata and data from inconsistent viewpoint,
+ * it will give us much confusion. So, we need to provide an interface
+ * which takes an argument of snapshot.
+ *
+ * If the caller refers a large object with a certain snapshot except
+ * for SnapshotNow, its permission checks should be also applied in
+ * the same snapshot.
+ */
+AclMode
+pg_largeobject_aclmask_snapshot(Oid lobj_oid, Oid roleid,
+                               AclMode mask, AclMaskHow how,
+                               Snapshot snapshot)
+{
+   AclMode     result;
+   Relation    pg_lo_meta;
+   ScanKeyData entry[1];
+   SysScanDesc scan;
+   HeapTuple   tuple;
+   Datum       aclDatum;
+   bool        isNull;
+   Acl        *acl;
+   Oid         ownerId;
+
+   /* Superusers bypass all permission checking. */
+   if (superuser_arg(roleid))
+       return mask;
+
+   /*
+    * Get the largeobject's ACL from pg_language_metadata
+    */
+   pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
+                          AccessShareLock);
+
+   ScanKeyInit(&entry[0],
+               ObjectIdAttributeNumber,
+               BTEqualStrategyNumber, F_OIDEQ,
+               ObjectIdGetDatum(lobj_oid));
+
+   scan = systable_beginscan(pg_lo_meta,
+                             LargeObjectMetadataOidIndexId, true,
+                             snapshot, 1, entry);
+
+   tuple = systable_getnext(scan);
+   if (!HeapTupleIsValid(tuple))
+       ereport(ERROR,
+               (errcode(ERRCODE_UNDEFINED_OBJECT),
+                errmsg("large object %u does not exist", lobj_oid)));
+
+   ownerId = ((Form_pg_largeobject_metadata) GETSTRUCT(tuple))->lomowner;
+
+   aclDatum = heap_getattr(tuple, Anum_pg_largeobject_metadata_lomacl,
+                           RelationGetDescr(pg_lo_meta), &isNull);
+
+   if (isNull)
+   {
+       /* No ACL, so build default ACL */
+       acl = acldefault(ACL_OBJECT_LARGEOBJECT, ownerId);
+       aclDatum = (Datum) 0;
+   }
+   else
+   {
+       /* detoast ACL if necessary */
+       acl = DatumGetAclP(aclDatum);
+   }
+
+   result = aclmask(acl, roleid, ownerId, mask, how);
+
+   /* if we have a detoasted copy, free it */
+   if (acl && (Pointer) acl != DatumGetPointer(aclDatum))
+       pfree(acl);
+
+   systable_endscan(scan);
+
+   heap_close(pg_lo_meta, AccessShareLock);
+
+   return result;
+}
+
 /*
  * Exported routine for examining a user's privileges for a namespace
  */
@@ -3801,6 +4054,20 @@ pg_language_aclcheck(Oid lang_oid, Oid roleid, AclMode mode)
        return ACLCHECK_NO_PRIV;
 }
 
+/*
+ * Exported routine for checking a user's access privileges to a largeobject
+ */
+AclResult
+pg_largeobject_aclcheck_snapshot(Oid lobj_oid, Oid roleid, AclMode mode,
+                                Snapshot snapshot)
+{
+   if (pg_largeobject_aclmask_snapshot(lobj_oid, roleid, mode,
+                                       ACLMASK_ANY, snapshot) != 0)
+       return ACLCHECK_OK;
+   else
+       return ACLCHECK_NO_PRIV;
+}
+
 /*
  * Exported routine for checking a user's access privileges to a namespace
  */
@@ -3991,6 +4258,53 @@ pg_language_ownercheck(Oid lan_oid, Oid roleid)
    return has_privs_of_role(roleid, ownerId);
 }
 
+/*
+ * Ownership check for a largeobject (specified by OID)
+ *
+ * Note that we have no candidate to call this routine with a certain
+ * snapshot except for SnapshotNow, so we don't provide an interface
+ * with _snapshot() version now.
+ */
+bool
+pg_largeobject_ownercheck(Oid lobj_oid, Oid roleid)
+{
+   Relation    pg_lo_meta;
+   ScanKeyData entry[1];
+   SysScanDesc scan;
+   HeapTuple   tuple;
+   Oid         ownerId;
+
+   /* Superusers bypass all permission checking. */
+   if (superuser_arg(roleid))
+       return true;
+
+   /* There's no syscache for pg_largeobject_metadata */
+   pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
+                          AccessShareLock);
+
+   ScanKeyInit(&entry[0],
+               ObjectIdAttributeNumber,
+               BTEqualStrategyNumber, F_OIDEQ,
+               ObjectIdGetDatum(lobj_oid));
+
+   scan = systable_beginscan(pg_lo_meta,
+                             LargeObjectMetadataOidIndexId, true,
+                             SnapshotNow, 1, entry);
+
+   tuple = systable_getnext(scan);
+   if (!HeapTupleIsValid(tuple))
+       ereport(ERROR,
+               (errcode(ERRCODE_UNDEFINED_OBJECT),
+                errmsg("large object %u does not exist", lobj_oid)));
+
+   ownerId = ((Form_pg_largeobject_metadata) GETSTRUCT(tuple))->lomowner;
+
+   systable_endscan(scan);
+   heap_close(pg_lo_meta, AccessShareLock);
+
+   return has_privs_of_role(roleid, ownerId);
+}
+
 /*
  * Ownership check for a namespace (specified by OID).
  */
index 8a07c69c7e85681e47c5aeafc10b1f8b26192d00..4ef3bb3c56fbbd51bdfaabc07ff8aa41ff42f741 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/catalog/dependency.c,v 1.92 2009/10/05 19:24:35 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/catalog/dependency.c,v 1.93 2009/12/11 03:34:55 itagaki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -37,6 +37,7 @@
 #include "catalog/pg_foreign_data_wrapper.h"
 #include "catalog/pg_foreign_server.h"
 #include "catalog/pg_language.h"
+#include "catalog/pg_largeobject.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_opclass.h"
 #include "catalog/pg_operator.h"
@@ -131,6 +132,7 @@ static const Oid object_classes[MAX_OCLASS] = {
    ConversionRelationId,       /* OCLASS_CONVERSION */
    AttrDefaultRelationId,      /* OCLASS_DEFAULT */
    LanguageRelationId,         /* OCLASS_LANGUAGE */
+   LargeObjectRelationId,      /* OCLASS_LARGEOBJECT */
    OperatorRelationId,         /* OCLASS_OPERATOR */
    OperatorClassRelationId,    /* OCLASS_OPCLASS */
    OperatorFamilyRelationId,   /* OCLASS_OPFAMILY */
@@ -1074,6 +1076,10 @@ doDeletion(const ObjectAddress *object)
            DropProceduralLanguageById(object->objectId);
            break;
 
+       case OCLASS_LARGEOBJECT:
+           LargeObjectDrop(object->objectId);
+           break;
+
        case OCLASS_OPERATOR:
            RemoveOperatorById(object->objectId);
            break;
@@ -1991,6 +1997,10 @@ getObjectClass(const ObjectAddress *object)
            Assert(object->objectSubId == 0);
            return OCLASS_LANGUAGE;
 
+       case LargeObjectRelationId:
+           Assert(object->objectSubId == 0);
+           return OCLASS_LARGEOBJECT;
+
        case OperatorRelationId:
            Assert(object->objectSubId == 0);
            return OCLASS_OPERATOR;
@@ -2243,6 +2253,10 @@ getObjectDescription(const ObjectAddress *object)
                ReleaseSysCache(langTup);
                break;
            }
+       case OCLASS_LARGEOBJECT:
+           appendStringInfo(&buffer, _("large object %u"),
+                            object->objectId);
+           break;
 
        case OCLASS_OPERATOR:
            appendStringInfo(&buffer, _("operator %s"),
index 313ccdd3f07b4e8f1d978dcd9f2adc75c2ca6425..517a0f3932ece4e5b0d2d9d7b6666eb290291b81 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/catalog/pg_largeobject.c,v 1.33 2009/08/04 16:08:36 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/catalog/pg_largeobject.c,v 1.34 2009/12/11 03:34:55 itagaki Exp $
  *
  *-------------------------------------------------------------------------
  */
 
 #include "access/genam.h"
 #include "access/heapam.h"
+#include "access/sysattr.h"
+#include "catalog/catalog.h"
+#include "catalog/dependency.h"
 #include "catalog/indexing.h"
+#include "catalog/pg_authid.h"
 #include "catalog/pg_largeobject.h"
+#include "catalog/pg_largeobject_metadata.h"
+#include "catalog/toasting.h"
+#include "miscadmin.h"
+#include "utils/acl.h"
 #include "utils/bytea.h"
 #include "utils/fmgroids.h"
 #include "utils/rel.h"
 /*
  * Create a large object having the given LO identifier.
  *
- * We do this by inserting an empty first page, so that the object will
- * appear to exist with size 0.  Note that the unique index will reject
- * an attempt to create a duplicate page.
+ * We create a new large object by inserting an entry into
+ * pg_largeobject_metadata without any data pages, so that the object
+ * will appear to exist with size 0.
  */
-void
+Oid
 LargeObjectCreate(Oid loid)
 {
-   Relation    pg_largeobject;
+   Relation    pg_lo_meta;
    HeapTuple   ntup;
-   Datum       values[Natts_pg_largeobject];
-   bool        nulls[Natts_pg_largeobject];
-   int         i;
+   Oid         loid_new;
+   Datum       values[Natts_pg_largeobject_metadata];
+   bool        nulls[Natts_pg_largeobject_metadata];
 
-   pg_largeobject = heap_open(LargeObjectRelationId, RowExclusiveLock);
+   pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
+                          RowExclusiveLock);
 
    /*
-    * Form new tuple
+    * Insert metadata of the largeobject
     */
-   for (i = 0; i < Natts_pg_largeobject; i++)
-   {
-       values[i] = (Datum) NULL;
-       nulls[i] = false;
-   }
+   memset(values, 0, sizeof(values));
+   memset(nulls, false, sizeof(nulls));
 
-   i = 0;
-   values[i++] = ObjectIdGetDatum(loid);
-   values[i++] = Int32GetDatum(0);
-   values[i++] = DirectFunctionCall1(byteain,
-                                     CStringGetDatum(""));
+   values[Anum_pg_largeobject_metadata_lomowner - 1]
+       = ObjectIdGetDatum(GetUserId());
+   nulls[Anum_pg_largeobject_metadata_lomacl - 1] = true;
 
-   ntup = heap_form_tuple(pg_largeobject->rd_att, values, nulls);
+   ntup = heap_form_tuple(RelationGetDescr(pg_lo_meta),
+                          values, nulls);
+   if (OidIsValid(loid))
+       HeapTupleSetOid(ntup, loid);
 
-   /*
-    * Insert it
-    */
-   simple_heap_insert(pg_largeobject, ntup);
-
-   /* Update indexes */
-   CatalogUpdateIndexes(pg_largeobject, ntup);
+   loid_new = simple_heap_insert(pg_lo_meta, ntup);
+   Assert(!OidIsValid(loid) || loid == loid_new);
 
-   heap_close(pg_largeobject, RowExclusiveLock);
+   CatalogUpdateIndexes(pg_lo_meta, ntup);
 
    heap_freetuple(ntup);
+
+   heap_close(pg_lo_meta, RowExclusiveLock);
+
+   return loid_new;
 }
 
+/*
+ * Drop a large object having the given LO identifier.
+ *
+ * When we drop a large object, it is necessary to drop both of metadata
+ * and data pages in same time.
+ */
 void
 LargeObjectDrop(Oid loid)
 {
-   bool        found = false;
+   Relation    pg_lo_meta;
    Relation    pg_largeobject;
    ScanKeyData skey[1];
-   SysScanDesc sd;
+   SysScanDesc scan;
    HeapTuple   tuple;
 
+   pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
+                          RowExclusiveLock);
+
+   pg_largeobject = heap_open(LargeObjectRelationId,
+                              RowExclusiveLock);
+
+   /*
+    * Delete an entry from pg_largeobject_metadata
+    */
    ScanKeyInit(&skey[0],
-               Anum_pg_largeobject_loid,
+               ObjectIdAttributeNumber,
                BTEqualStrategyNumber, F_OIDEQ,
-               ObjectIdGetDatum(loid));
+               ObjectIdGetDatum(loid));    
 
-   pg_largeobject = heap_open(LargeObjectRelationId, RowExclusiveLock);
+   scan = systable_beginscan(pg_lo_meta,
+                             LargeObjectMetadataOidIndexId, true,
+                             SnapshotNow, 1, skey);
 
-   sd = systable_beginscan(pg_largeobject, LargeObjectLOidPNIndexId, true,
-                           SnapshotNow, 1, skey);
+   tuple = systable_getnext(scan);
+   if (!HeapTupleIsValid(tuple))
+       ereport(ERROR,
+               (errcode(ERRCODE_UNDEFINED_OBJECT),
+                errmsg("large object %u does not exist", loid)));
+
+   simple_heap_delete(pg_lo_meta, &tuple->t_self);
+
+   systable_endscan(scan);
+
+   /*
+    * Delete all the associated entries from pg_largeobject
+    */
+   ScanKeyInit(&skey[0],
+               Anum_pg_largeobject_loid,
+               BTEqualStrategyNumber, F_OIDEQ,
+               ObjectIdGetDatum(loid));
 
-   while ((tuple = systable_getnext(sd)) != NULL)
+   scan = systable_beginscan(pg_largeobject,
+                             LargeObjectLOidPNIndexId, true,
+                             SnapshotNow, 1, skey);
+   while (HeapTupleIsValid(tuple = systable_getnext(scan)))
    {
        simple_heap_delete(pg_largeobject, &tuple->t_self);
-       found = true;
    }
 
-   systable_endscan(sd);
+   systable_endscan(scan);
 
    heap_close(pg_largeobject, RowExclusiveLock);
 
-   if (!found)
+   heap_close(pg_lo_meta, RowExclusiveLock);
+}
+
+/*
+ * LargeObjectAlterOwner
+ *
+ * Implementation of ALTER LARGE OBJECT statement
+ */
+void
+LargeObjectAlterOwner(Oid loid, Oid newOwnerId)
+{
+   Form_pg_largeobject_metadata    form_lo_meta;
+   Relation    pg_lo_meta;
+   ScanKeyData skey[1];
+   SysScanDesc scan;
+   HeapTuple   oldtup;
+   HeapTuple   newtup;
+
+   pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
+                          RowExclusiveLock);
+
+   ScanKeyInit(&skey[0],
+               ObjectIdAttributeNumber,
+               BTEqualStrategyNumber, F_OIDEQ,
+               ObjectIdGetDatum(loid));
+
+   scan = systable_beginscan(pg_lo_meta,
+                             LargeObjectMetadataOidIndexId, true,
+                             SnapshotNow, 1, skey);
+
+   oldtup = systable_getnext(scan);
+   if (!HeapTupleIsValid(oldtup))
        ereport(ERROR,
                (errcode(ERRCODE_UNDEFINED_OBJECT),
                 errmsg("large object %u does not exist", loid)));
+
+   form_lo_meta = (Form_pg_largeobject_metadata) GETSTRUCT(oldtup);
+   if (form_lo_meta->lomowner != newOwnerId)
+   {
+       Datum       values[Natts_pg_largeobject_metadata];
+       bool        nulls[Natts_pg_largeobject_metadata];
+       bool        replaces[Natts_pg_largeobject_metadata];
+       Acl        *newAcl;
+       Datum       aclDatum;
+       bool        isnull;
+
+       /* Superusers can always do it */
+       if (!superuser())
+       {
+           /*
+            * The 'lo_compat_privileges' is not checked here, because we
+            * don't have any access control features in the 8.4.x series
+            * or earlier release.
+            * So, it is not a place we can define a compatible behavior.
+            */
+
+           /* Otherwise, must be owner of the existing object */
+           if (!pg_largeobject_ownercheck(loid, GetUserId()))
+               ereport(ERROR,
+                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                        errmsg("must be owner of large object %u", loid)));
+
+           /* Must be able to become new owner */
+           check_is_member_of_role(GetUserId(), newOwnerId);
+       }
+
+       memset(values, 0, sizeof(values));
+       memset(nulls, false, sizeof(nulls));
+       memset(replaces, false, sizeof(nulls));
+
+       values[Anum_pg_largeobject_metadata_lomowner - 1]
+           = ObjectIdGetDatum(newOwnerId);
+       replaces[Anum_pg_largeobject_metadata_lomowner - 1] = true;
+
+       /*
+        * Determine the modified ACL for the new owner.
+        * This is only necessary when the ACL is non-null.
+        */
+       aclDatum = heap_getattr(oldtup,
+                               Anum_pg_largeobject_metadata_lomacl,
+                               RelationGetDescr(pg_lo_meta), &isnull);
+       if (!isnull)
+       {
+           newAcl = aclnewowner(DatumGetAclP(aclDatum),
+                                form_lo_meta->lomowner, newOwnerId);
+           values[Anum_pg_largeobject_metadata_lomacl - 1]
+               = PointerGetDatum(newAcl);
+           replaces[Anum_pg_largeobject_metadata_lomacl - 1] = true;
+       }
+
+       newtup = heap_modify_tuple(oldtup, RelationGetDescr(pg_lo_meta),
+                                  values, nulls, replaces);
+
+       simple_heap_update(pg_lo_meta, &newtup->t_self, newtup);
+       CatalogUpdateIndexes(pg_lo_meta, newtup);
+
+       heap_freetuple(newtup);
+
+       /* Update owner dependency reference */
+       changeDependencyOnOwner(LargeObjectRelationId,
+                               loid, newOwnerId);
+   }
+   systable_endscan(scan);
+
+   heap_close(pg_lo_meta, RowExclusiveLock);
 }
 
+/*
+ * LargeObjectExists
+ *
+ * Currently, we don't use system cache to contain metadata of
+ * large objects, because massive number of large objects can
+ * consume not a small amount of process local memory.
+ *
+ * Note that LargeObjectExists always scans the system catalog
+ * with SnapshotNow, so it is unavailable to use to check
+ * existence in read-only accesses.
+ */
 bool
 LargeObjectExists(Oid loid)
 {
+   Relation    pg_lo_meta;
+   ScanKeyData skey[1];
+   SysScanDesc sd;
+   HeapTuple   tuple;
    bool        retval = false;
-   Relation    pg_largeobject;
-   ScanKeyData skey[1];
-   SysScanDesc sd;
 
-   /*
-    * See if we can find any tuples belonging to the specified LO
-    */
    ScanKeyInit(&skey[0],
-               Anum_pg_largeobject_loid,
+               ObjectIdAttributeNumber,
                BTEqualStrategyNumber, F_OIDEQ,
                ObjectIdGetDatum(loid));
 
-   pg_largeobject = heap_open(LargeObjectRelationId, AccessShareLock);
+   pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
+                          AccessShareLock);
 
-   sd = systable_beginscan(pg_largeobject, LargeObjectLOidPNIndexId, true,
+   sd = systable_beginscan(pg_lo_meta,
+                           LargeObjectMetadataOidIndexId, true,
                            SnapshotNow, 1, skey);
 
-   if (systable_getnext(sd) != NULL)
+   tuple = systable_getnext(sd);
+   if (HeapTupleIsValid(tuple))
        retval = true;
 
    systable_endscan(sd);
 
-   heap_close(pg_largeobject, AccessShareLock);
+   heap_close(pg_lo_meta, AccessShareLock);
 
    return retval;
 }
index be70143ea27e4ba554c9ffd13043b8d7912bc70a..7130444448e9502da857fdbcf412f943cc7ef2a6 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/catalog/pg_shdepend.c,v 1.36 2009/10/07 22:14:18 alvherre Exp $
+ *   $PostgreSQL: pgsql/src/backend/catalog/pg_shdepend.c,v 1.37 2009/12/11 03:34:55 itagaki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -25,6 +25,7 @@
 #include "catalog/pg_database.h"
 #include "catalog/pg_default_acl.h"
 #include "catalog/pg_language.h"
+#include "catalog/pg_largeobject.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_operator.h"
 #include "catalog/pg_proc.h"
@@ -1347,6 +1348,10 @@ shdepReassignOwned(List *roleids, Oid newrole)
                    AlterLanguageOwner_oid(sdepForm->objid, newrole);
                    break;
 
+               case LargeObjectRelationId:
+                   LargeObjectAlterOwner(sdepForm->objid, newrole);
+                   break;
+
                case DefaultAclRelationId:
                    /*
                     * Ignore default ACLs; they should be handled by
index b91f2205d1ceed111ce97b43e0cd1729dee53113..8ba630a83db4055b9214ed7080fdabd7fd81bcd1 100644 (file)
@@ -8,13 +8,14 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/commands/alter.c,v 1.31 2009/01/01 17:23:37 momjian Exp $
+ *   $PostgreSQL: pgsql/src/backend/commands/alter.c,v 1.32 2009/12/11 03:34:55 itagaki Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
 #include "catalog/namespace.h"
+#include "catalog/pg_largeobject.h"
 #include "commands/alter.h"
 #include "commands/conversioncmds.h"
 #include "commands/dbcommands.h"
@@ -233,6 +234,10 @@ ExecAlterOwnerStmt(AlterOwnerStmt *stmt)
            AlterLanguageOwner(strVal(linitial(stmt->object)), newowner);
            break;
 
+       case OBJECT_LARGEOBJECT:
+           LargeObjectAlterOwner(intVal(linitial(stmt->object)), newowner);
+           break;
+
        case OBJECT_OPERATOR:
            Assert(list_length(stmt->objarg) == 2);
            AlterOperatorOwner(stmt->object,
index 610816db6d02cc1b9d8d45195f58e5da3464dedc..d57ea25d9ca0892eb51a57ffd54e9275ef82331d 100644 (file)
@@ -7,7 +7,7 @@
  * Copyright (c) 1996-2009, PostgreSQL Global Development Group
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/commands/comment.c,v 1.108 2009/10/12 19:49:24 adunstan Exp $
+ *   $PostgreSQL: pgsql/src/backend/commands/comment.c,v 1.109 2009/12/11 03:34:55 itagaki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -25,6 +25,7 @@
 #include "catalog/pg_description.h"
 #include "catalog/pg_language.h"
 #include "catalog/pg_largeobject.h"
+#include "catalog/pg_largeobject_metadata.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_opclass.h"
 #include "catalog/pg_operator.h"
@@ -42,6 +43,7 @@
 #include "commands/comment.h"
 #include "commands/dbcommands.h"
 #include "commands/tablespace.h"
+#include "libpq/be-fsstubs.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "parser/parse_func.h"
@@ -1435,7 +1437,20 @@ CommentLargeObject(List *qualname, char *comment)
                (errcode(ERRCODE_UNDEFINED_OBJECT),
                 errmsg("large object %u does not exist", loid)));
 
-   /* Call CreateComments() to create/drop the comments */
+   /* Permission checks */
+   if (!lo_compat_privileges &&
+       !pg_largeobject_ownercheck(loid, GetUserId()))
+       ereport(ERROR,
+               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                errmsg("must be owner of large object %u", loid)));
+
+   /*
+    * Call CreateComments() to create/drop the comments
+    *
+    * See the comment in the inv_create() which describes
+    * the reason why LargeObjectRelationId is used instead
+    * of the LargeObjectMetadataRelationId.
+    */
    CreateComments(loid, LargeObjectRelationId, 0, comment);
 }
 
index c9188b2d7c6088f1cd128cef077a20bb140553ed..2344b79547b9abb0ac0b8b589483aa1c0ab7d4e6 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.308 2009/12/09 21:57:50 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.309 2009/12/11 03:34:55 itagaki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -6186,6 +6186,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
            case OCLASS_CAST:
            case OCLASS_CONVERSION:
            case OCLASS_LANGUAGE:
+           case OCLASS_LARGEOBJECT:
            case OCLASS_OPERATOR:
            case OCLASS_OPCLASS:
            case OCLASS_OPFAMILY:
index 24605b5490df3dcfb3e276f4e219227dd95492b6..0b816b6ff02d73c81409485491387705b7038ac3 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/libpq/be-fsstubs.c,v 1.91 2009/06/11 14:48:58 momjian Exp $
+ *   $PostgreSQL: pgsql/src/backend/libpq/be-fsstubs.c,v 1.92 2009/12/11 03:34:55 itagaki Exp $
  *
  * NOTES
  *   This should be moved to a more appropriate place.  It is here
 #include 
 #include 
 
+#include "catalog/pg_largeobject_metadata.h"
 #include "libpq/be-fsstubs.h"
 #include "libpq/libpq-fs.h"
 #include "miscadmin.h"
 #include "storage/fd.h"
 #include "storage/large_object.h"
+#include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/memutils.h"
 
+/*
+ * compatibility flag for permission checks
+ */
+bool lo_compat_privileges;
 
 /*#define FSDB 1*/
 #define BUFSIZE            8192
@@ -156,6 +162,17 @@ lo_read(int fd, char *buf, int len)
                (errcode(ERRCODE_UNDEFINED_OBJECT),
                 errmsg("invalid large-object descriptor: %d", fd)));
 
+   /* Permission checks */
+   if (!lo_compat_privileges &&
+       pg_largeobject_aclcheck_snapshot(cookies[fd]->id,
+                                        GetUserId(),
+                                        ACL_SELECT,
+                                        cookies[fd]->snapshot) != ACLCHECK_OK)
+       ereport(ERROR,
+               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                errmsg("permission denied for large object %u",
+                       cookies[fd]->id)));
+
    status = inv_read(cookies[fd], buf, len);
 
    return status;
@@ -177,6 +194,17 @@ lo_write(int fd, const char *buf, int len)
              errmsg("large object descriptor %d was not opened for writing",
                     fd)));
 
+   /* Permission checks */
+   if (!lo_compat_privileges &&
+       pg_largeobject_aclcheck_snapshot(cookies[fd]->id,
+                                        GetUserId(),
+                                        ACL_UPDATE,
+                                        cookies[fd]->snapshot) != ACLCHECK_OK)
+       ereport(ERROR,
+                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                errmsg("permission denied for large object %u",
+                       cookies[fd]->id)));
+
    status = inv_write(cookies[fd], buf, len);
 
    return status;
@@ -251,6 +279,13 @@ lo_unlink(PG_FUNCTION_ARGS)
 {
    Oid         lobjId = PG_GETARG_OID(0);
 
+   /* Must be owner of the largeobject */
+   if (!lo_compat_privileges &&
+       !pg_largeobject_ownercheck(lobjId, GetUserId()))
+       ereport(ERROR,
+               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                errmsg("must be owner of large object %u", lobjId)));
+
    /*
     * If there are any open LO FDs referencing that ID, close 'em.
     */
@@ -482,6 +517,17 @@ lo_truncate(PG_FUNCTION_ARGS)
                (errcode(ERRCODE_UNDEFINED_OBJECT),
                 errmsg("invalid large-object descriptor: %d", fd)));
 
+   /* Permission checks */
+   if (!lo_compat_privileges &&
+       pg_largeobject_aclcheck_snapshot(cookies[fd]->id,
+                                        GetUserId(),
+                                        ACL_UPDATE,
+                                        cookies[fd]->snapshot) != ACLCHECK_OK)
+       ereport(ERROR,
+               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                errmsg("permission denied for large object %u",
+                       cookies[fd]->id)));
+
    inv_truncate(cookies[fd], len);
 
    PG_RETURN_INT32(0);
index 8dca257360575558d73f985d72767c6bdd418fbb..3257ccbbf839dbb437d521a5cab9763473461612 100644 (file)
@@ -11,7 +11,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/parser/gram.y,v 2.695 2009/12/07 05:22:22 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/parser/gram.y,v 2.696 2009/12/11 03:34:55 itagaki Exp $
  *
  * HISTORY
  *   AUTHOR            DATE            MAJOR EVENT
@@ -397,6 +397,7 @@ static TypeName *TableFuncTypeName(List *columns);
 %type  opt_varying opt_timezone
 
 %type    Iconst SignedIconst
+%type    Iconst_list
 %type         Sconst comment_text
 %type         RoleId opt_granted_by opt_boolean ColId_or_Sconst
 %type    var_list
@@ -4576,6 +4577,14 @@ privilege_target:
                    n->objs = $2;
                    $$ = n;
                }
+           | LARGE_P OBJECT_P Iconst_list
+               {
+                   PrivTarget *n = (PrivTarget *) palloc(sizeof(PrivTarget));
+                   n->targtype = ACL_TARGET_OBJECT;
+                   n->objtype = ACL_OBJECT_LARGEOBJECT;
+                   n->objs = $3;
+                   $$ = n;
+               }
            | SCHEMA name_list
                {
                    PrivTarget *n = (PrivTarget *) palloc(sizeof(PrivTarget));
@@ -5851,6 +5860,14 @@ AlterOwnerStmt: ALTER AGGREGATE func_name aggr_args OWNER TO RoleId
                    n->newowner = $7;
                    $$ = (Node *)n;
                }
+           | ALTER LARGE_P OBJECT_P Iconst OWNER TO RoleId
+               {
+                   AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
+                   n->objectType = OBJECT_LARGEOBJECT;
+                   n->object = list_make1(makeInteger($4));
+                   n->newowner = $7;
+                   $$ = (Node *)n;
+               }
            | ALTER OPERATOR any_operator oper_argtypes OWNER TO RoleId
                {
                    AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
@@ -10542,6 +10559,10 @@ SignedIconst: Iconst                               { $$ = $1; }
            | '-' Iconst                            { $$ = - $2; }
        ;
 
+Iconst_list:   Iconst                      { $$ = list_make1(makeInteger($1)); }
+               | Iconst_list ',' Iconst    { $$ = lappend($1, makeInteger($3)); }
+       ;
+
 /*
  * Name classification hierarchy.
  *
index 0def1decd0da4d6a284a5a77b04d9d585fcc7bf0..224971c03eb1df0b19a4ca331d7e55bde2e50711 100644 (file)
@@ -24,7 +24,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/storage/large_object/inv_api.c,v 1.138 2009/06/11 14:49:02 momjian Exp $
+ *   $PostgreSQL: pgsql/src/backend/storage/large_object/inv_api.c,v 1.139 2009/12/11 03:34:55 itagaki Exp $
  *
  *-------------------------------------------------------------------------
  */
 
 #include "access/genam.h"
 #include "access/heapam.h"
+#include "access/sysattr.h"
 #include "access/tuptoaster.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
+#include "catalog/dependency.h"
 #include "catalog/indexing.h"
 #include "catalog/pg_largeobject.h"
+#include "catalog/pg_largeobject_metadata.h"
 #include "commands/comment.h"
 #include "libpq/libpq-fs.h"
+#include "miscadmin.h"
 #include "storage/large_object.h"
 #include "utils/fmgroids.h"
 #include "utils/rel.h"
 #include "utils/resowner.h"
 #include "utils/snapmgr.h"
+#include "utils/syscache.h"
 #include "utils/tqual.h"
 
 
@@ -139,30 +144,31 @@ close_lo_relation(bool isCommit)
 static bool
 myLargeObjectExists(Oid loid, Snapshot snapshot)
 {
+   Relation    pg_lo_meta;
+   ScanKeyData skey[1];
+   SysScanDesc sd;
+   HeapTuple   tuple;
    bool        retval = false;
-   Relation    pg_largeobject;
-   ScanKeyData skey[1];
-   SysScanDesc sd;
 
-   /*
-    * See if we can find any tuples belonging to the specified LO
-    */
    ScanKeyInit(&skey[0],
-               Anum_pg_largeobject_loid,
+               ObjectIdAttributeNumber,
                BTEqualStrategyNumber, F_OIDEQ,
                ObjectIdGetDatum(loid));
 
-   pg_largeobject = heap_open(LargeObjectRelationId, AccessShareLock);
+   pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
+                          AccessShareLock);
 
-   sd = systable_beginscan(pg_largeobject, LargeObjectLOidPNIndexId, true,
+   sd = systable_beginscan(pg_lo_meta,
+                           LargeObjectMetadataOidIndexId, true,
                            snapshot, 1, skey);
 
-   if (systable_getnext(sd) != NULL)
+   tuple = systable_getnext(sd);
+   if (HeapTupleIsValid(tuple))
        retval = true;
 
    systable_endscan(sd);
 
-   heap_close(pg_largeobject, AccessShareLock);
+   heap_close(pg_lo_meta, AccessShareLock);
 
    return retval;
 }
@@ -193,31 +199,31 @@ getbytealen(bytea *data)
 Oid
 inv_create(Oid lobjId)
 {
+   Oid         lobjId_new;
+
    /*
-    * Allocate an OID to be the LO's identifier, unless we were told what to
-    * use.  We can use the index on pg_largeobject for checking OID
-    * uniqueness, even though it has additional columns besides OID.
+    * Create a new largeobject with empty data pages
     */
-   if (!OidIsValid(lobjId))
-   {
-       open_lo_relation();
-
-       lobjId = GetNewOidWithIndex(lo_heap_r, LargeObjectLOidPNIndexId,
-                                   Anum_pg_largeobject_loid);
-   }
+   lobjId_new = LargeObjectCreate(lobjId);
 
    /*
-    * Create the LO by writing an empty first page for it in pg_largeobject
-    * (will fail if duplicate)
+    * dependency on the owner of largeobject
+    *
+    * The reason why we use LargeObjectRelationId instead of
+    * LargeObjectMetadataRelationId here is to provide backward
+    * compatibility to the applications which utilize a knowledge
+    * about internal layout of system catalogs.
+    * OID of pg_largeobject_metadata and loid of pg_largeobject
+    * are same value, so there are no actual differences here.
     */
-   LargeObjectCreate(lobjId);
-
+   recordDependencyOnOwner(LargeObjectRelationId,
+                           lobjId_new, GetUserId());
    /*
     * Advance command counter to make new tuple visible to later operations.
     */
    CommandCounterIncrement();
 
-   return lobjId;
+   return lobjId_new;
 }
 
 /*
@@ -292,10 +298,15 @@ inv_close(LargeObjectDesc *obj_desc)
 int
 inv_drop(Oid lobjId)
 {
-   LargeObjectDrop(lobjId);
+   ObjectAddress   object;
 
-   /* Delete any comments on the large object */
-   DeleteComments(lobjId, LargeObjectRelationId, 0);
+   /*
+    * Delete any comments and dependencies on the large object
+    */
+   object.classId = LargeObjectRelationId;
+   object.objectId = lobjId;
+   object.objectSubId = 0;
+   performDeletion(&object, DROP_CASCADE);
 
    /*
     * Advance command counter so that tuple removal will be seen by later
@@ -315,7 +326,6 @@ inv_drop(Oid lobjId)
 static uint32
 inv_getsize(LargeObjectDesc *obj_desc)
 {
-   bool        found = false;
    uint32      lastbyte = 0;
    ScanKeyData skey[1];
    SysScanDesc sd;
@@ -339,13 +349,13 @@ inv_getsize(LargeObjectDesc *obj_desc)
     * large object in reverse pageno order.  So, it's sufficient to examine
     * the first valid tuple (== last valid page).
     */
-   while ((tuple = systable_getnext_ordered(sd, BackwardScanDirection)) != NULL)
+   tuple = systable_getnext_ordered(sd, BackwardScanDirection);
+   if (HeapTupleIsValid(tuple))
    {
        Form_pg_largeobject data;
        bytea      *datafield;
        bool        pfreeit;
 
-       found = true;
        if (HeapTupleHasNulls(tuple))   /* paranoia */
            elog(ERROR, "null field found in pg_largeobject");
        data = (Form_pg_largeobject) GETSTRUCT(tuple);
@@ -360,15 +370,10 @@ inv_getsize(LargeObjectDesc *obj_desc)
        lastbyte = data->pageno * LOBLKSIZE + getbytealen(datafield);
        if (pfreeit)
            pfree(datafield);
-       break;
    }
 
    systable_endscan_ordered(sd);
 
-   if (!found)
-       ereport(ERROR,
-               (errcode(ERRCODE_UNDEFINED_OBJECT),
-                errmsg("large object %u does not exist", obj_desc->id)));
    return lastbyte;
 }
 
@@ -545,6 +550,12 @@ inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
                 errmsg("large object %u was not opened for writing",
                        obj_desc->id)));
 
+   /* check existence of the target largeobject */
+   if (!LargeObjectExists(obj_desc->id))
+       ereport(ERROR,
+               (errcode(ERRCODE_UNDEFINED_OBJECT),
+                errmsg("large object %u was already dropped", obj_desc->id)));
+
    if (nbytes <= 0)
        return 0;
 
@@ -736,6 +747,12 @@ inv_truncate(LargeObjectDesc *obj_desc, int len)
                 errmsg("large object %u was not opened for writing",
                        obj_desc->id)));
 
+   /* check existence of the target largeobject */
+   if (!LargeObjectExists(obj_desc->id))
+       ereport(ERROR,
+               (errcode(ERRCODE_UNDEFINED_OBJECT),
+                errmsg("large object %u was already dropped", obj_desc->id)));
+
    open_lo_relation();
 
    indstate = CatalogOpenIndexes(lo_heap_r);
index 59576a25d8bb6e11cae2257e9cf2c6e9eb0baae9..2fd4b9923f5434d6713e213f02ecf7cbfd5e0e1b 100644 (file)
@@ -10,7 +10,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/tcop/utility.c,v 1.322 2009/12/09 21:57:51 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/tcop/utility.c,v 1.323 2009/12/11 03:34:55 itagaki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -1638,6 +1638,9 @@ CreateCommandTag(Node *parsetree)
                case OBJECT_LANGUAGE:
                    tag = "ALTER LANGUAGE";
                    break;
+               case OBJECT_LARGEOBJECT:
+                   tag = "ALTER LARGEOBJECT";
+                   break;
                case OBJECT_OPERATOR:
                    tag = "ALTER OPERATOR";
                    break;
index 142e06cf45fc93702269a7d2cfbcc02a6f3c1c38..31cbbe7c7609078bfd2b0f14edcb305db47bff2b 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/utils/adt/acl.c,v 1.151 2009/12/05 21:43:35 petere Exp $
+ *   $PostgreSQL: pgsql/src/backend/utils/adt/acl.c,v 1.152 2009/12/11 03:34:55 itagaki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -764,6 +764,11 @@ acldefault(GrantObjectType objtype, Oid ownerId)
            world_default = ACL_USAGE;
            owner_default = ACL_ALL_RIGHTS_LANGUAGE;
            break;
+       case ACL_OBJECT_LARGEOBJECT:
+           /* Grant SELECT,UPDATE by default, for now */
+           world_default = ACL_NO_RIGHTS;
+           owner_default = ACL_ALL_RIGHTS_LARGEOBJECT;
+           break;
        case ACL_OBJECT_NAMESPACE:
            world_default = ACL_NO_RIGHTS;
            owner_default = ACL_ALL_RIGHTS_NAMESPACE;
index edc5544f7f3ef74779e4c6585fa04f07adee5b4c..900c3662786356e70cfb353c089eacaf6414a205 100644 (file)
@@ -10,7 +10,7 @@
  * Written by Peter Eisentraut .
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/utils/misc/guc.c,v 1.526 2009/12/09 21:57:51 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/utils/misc/guc.c,v 1.527 2009/12/11 03:34:56 itagaki Exp $
  *
  *--------------------------------------------------------------------
  */
@@ -38,6 +38,7 @@
 #include "commands/trigger.h"
 #include "funcapi.h"
 #include "libpq/auth.h"
+#include "libpq/be-fsstubs.h"
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "optimizer/cost.h"
@@ -1226,6 +1227,16 @@ static struct config_bool ConfigureNamesBool[] =
        false, NULL, NULL
    },
 
+   {
+       {"lo_compat_privileges", PGC_SUSET, COMPAT_OPTIONS_PREVIOUS,
+           gettext_noop("Enables backward compatibility in privilege checks on large objects"),
+           gettext_noop("When turned on, privilege checks on large objects perform "
+                        "with backward compatibility as 8.4.x or earlier releases.")
+       },
+       &lo_compat_privileges,
+       false, NULL, NULL
+   },
+
    /* End-of-list marker */
    {
        {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL
index f2accd263e6d423e7ca0d77e1e589eb5c29d9e3b..d2da9b9c3d2f665b19e4267049ab827c7de2d506 100644 (file)
 #backslash_quote = safe_encoding   # on, off, or safe_encoding
 #default_with_oids = off
 #escape_string_warning = on
+#lo_compat_privileges = off
 #sql_inheritance = on
 #standard_conforming_strings = off
 #synchronize_seqscans = on
index de0aba36a865bfaff3120481a27edb36012bce8d..6f1f211c141a81bf55d69b312cf9837da653c657 100644 (file)
@@ -42,7 +42,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  * Portions taken from FreeBSD.
  *
- * $PostgreSQL: pgsql/src/bin/initdb/initdb.c,v 1.177 2009/11/14 15:39:36 mha Exp $
+ * $PostgreSQL: pgsql/src/bin/initdb/initdb.c,v 1.178 2009/12/11 03:34:56 itagaki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -1783,6 +1783,7 @@ setup_privileges(void)
        "  WHERE relkind IN ('r', 'v', 'S') AND relacl IS NULL;\n",
        "GRANT USAGE ON SCHEMA pg_catalog TO PUBLIC;\n",
        "GRANT CREATE, USAGE ON SCHEMA public TO PUBLIC;\n",
+       "REVOKE ALL ON pg_largeobject FROM PUBLIC;\n",
        NULL
    };
 
index c7cfa2a6212a1d7fd3df8183cd9627a6ff89554d..45559665aee58534064888927fa4480e9b6e2b2e 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/bin/pg_dump/dumputils.c,v 1.51 2009/10/12 23:41:43 tgl Exp $
+ * $PostgreSQL: pgsql/src/bin/pg_dump/dumputils.c,v 1.52 2009/12/11 03:34:56 itagaki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -862,6 +862,11 @@ do { \
        CONVERT_PRIV('U', "USAGE");
    else if (strcmp(type, "SERVER") == 0)
        CONVERT_PRIV('U', "USAGE");
+   else if (strcmp(type, "LARGE OBJECT") == 0)
+   {
+       CONVERT_PRIV('r', "SELECT");
+       CONVERT_PRIV('w', "UPDATE");
+   }
    else
        abort();
 
index a753976139c81aa5625cbada7fc5a38b683d33f9..7af461bd5a65dc544c7b8d7c517238026f9b687f 100644 (file)
@@ -12,7 +12,7 @@
  * by PostgreSQL
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.c,v 1.554 2009/12/07 05:22:22 tgl Exp $
+ *   $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.c,v 1.555 2009/12/11 03:34:56 itagaki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -2045,7 +2045,9 @@ dumpBlobs(Archive *AH, void *arg)
 
 /*
  * dumpBlobComments
- * dump all blob comments
+ * dump all blob properties.
+ *  It has "BLOB COMMENTS" tag due to the historical reason, but note
+ *  that it is the routine to dump all the properties of blobs.
  *
  * Since we don't provide any way to be selective about dumping blobs,
  * there's no need to be selective about their comments either.  We put
@@ -2056,30 +2058,35 @@ dumpBlobComments(Archive *AH, void *arg)
 {
    const char *blobQry;
    const char *blobFetchQry;
-   PQExpBuffer commentcmd = createPQExpBuffer();
+   PQExpBuffer cmdQry = createPQExpBuffer();
    PGresult   *res;
    int         i;
 
    if (g_verbose)
-       write_msg(NULL, "saving large object comments\n");
+       write_msg(NULL, "saving large object properties\n");
 
    /* Make sure we are in proper schema */
    selectSourceSchema("pg_catalog");
 
    /* Cursor to get all BLOB comments */
-   if (AH->remoteVersion >= 70300)
+   if (AH->remoteVersion >= 80500)
+       blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, "
+           "obj_description(oid, 'pg_largeobject'), "
+           "pg_get_userbyid(lomowner), lomacl "
+           "FROM pg_largeobject_metadata";
+   else if (AH->remoteVersion >= 70300)
        blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
-           "obj_description(loid, 'pg_largeobject') "
+           "obj_description(loid, 'pg_largeobject'), NULL, NULL "
            "FROM (SELECT DISTINCT loid FROM "
            "pg_description d JOIN pg_largeobject l ON (objoid = loid) "
            "WHERE classoid = 'pg_largeobject'::regclass) ss";
    else if (AH->remoteVersion >= 70200)
        blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
-           "obj_description(loid, 'pg_largeobject') "
+           "obj_description(loid, 'pg_largeobject'), NULL, NULL "
            "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss";
    else if (AH->remoteVersion >= 70100)
        blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
-           "obj_description(loid) "
+           "obj_description(loid), NULL, NULL "
            "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss";
    else
        blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, "
@@ -2087,7 +2094,7 @@ dumpBlobComments(Archive *AH, void *arg)
            "       SELECT description "
            "       FROM pg_description pd "
            "       WHERE pd.objoid=pc.oid "
-           "   ) "
+           "   ), NULL, NULL "
            "FROM pg_class pc WHERE relkind = 'l'";
 
    res = PQexec(g_conn, blobQry);
@@ -2107,22 +2114,51 @@ dumpBlobComments(Archive *AH, void *arg)
        /* Process the tuples, if any */
        for (i = 0; i < PQntuples(res); i++)
        {
-           Oid         blobOid;
-           char       *comment;
+           Oid         blobOid = atooid(PQgetvalue(res, i, 0));
+           char       *lo_comment = PQgetvalue(res, i, 1);
+           char       *lo_owner = PQgetvalue(res, i, 2);
+           char       *lo_acl = PQgetvalue(res, i, 3);
+           char        lo_name[32];
 
-           /* ignore blobs without comments */
-           if (PQgetisnull(res, i, 1))
-               continue;
+           resetPQExpBuffer(cmdQry);
 
-           blobOid = atooid(PQgetvalue(res, i, 0));
-           comment = PQgetvalue(res, i, 1);
+           /* comment on the blob */
+           if (!PQgetisnull(res, i, 1))
+           {
+               appendPQExpBuffer(cmdQry,
+                                 "COMMENT ON LARGE OBJECT %u IS ", blobOid);
+               appendStringLiteralAH(cmdQry, lo_comment, AH);
+               appendPQExpBuffer(cmdQry, ";\n");
+           }
+
+           /* dump blob ownership, if necessary */
+           if (!PQgetisnull(res, i, 2))
+           {
+               appendPQExpBuffer(cmdQry,
+                                 "ALTER LARGE OBJECT %u OWNER TO %s;\n",
+                                 blobOid, lo_owner);
+           }
 
-           printfPQExpBuffer(commentcmd, "COMMENT ON LARGE OBJECT %u IS ",
-                             blobOid);
-           appendStringLiteralAH(commentcmd, comment, AH);
-           appendPQExpBuffer(commentcmd, ";\n");
+           /* dump blob privileges, if necessary */
+           if (!PQgetisnull(res, i, 3) &&
+               !dataOnly && !aclsSkip)
+           {
+               snprintf(lo_name, sizeof(lo_name), "%u", blobOid);
+               if (!buildACLCommands(lo_name, NULL, "LARGE OBJECT",
+                                     lo_acl, lo_owner, "",
+                                     AH->remoteVersion, cmdQry))
+               {
+                   write_msg(NULL, "could not parse ACL (%s) for "
+                             "large object %u", lo_acl, blobOid);
+                   exit_nicely();
+               }
+           }
 
-           archputs(commentcmd->data, AH);
+           if (cmdQry->len > 0)
+           {
+               appendPQExpBuffer(cmdQry, "\n");
+               archputs(cmdQry->data, AH);
+           }
        }
    } while (PQntuples(res) > 0);
 
@@ -2130,7 +2166,7 @@ dumpBlobComments(Archive *AH, void *arg)
 
    archputs("\n", AH);
 
-   destroyPQExpBuffer(commentcmd);
+   destroyPQExpBuffer(cmdQry);
 
    return 1;
 }
index e4da1d3e9620378c0a7989e73a76f41209471da4..c77077f9870e6386795468a56758a15c9bb39ebc 100644 (file)
@@ -3,7 +3,7 @@
  *
  * Copyright (c) 2000-2009, PostgreSQL Global Development Group
  *
- * $PostgreSQL: pgsql/src/bin/psql/large_obj.c,v 1.52 2009/01/01 17:23:55 momjian Exp $
+ * $PostgreSQL: pgsql/src/bin/psql/large_obj.c,v 1.53 2009/12/11 03:34:56 itagaki Exp $
  */
 #include "postgres_fe.h"
 #include "large_obj.h"
@@ -278,13 +278,28 @@ do_lo_list(void)
    char        buf[1024];
    printQueryOpt myopt = pset.popt;
 
-   snprintf(buf, sizeof(buf),
-            "SELECT loid as \"%s\",\n"
-          "  pg_catalog.obj_description(loid, 'pg_largeobject') as \"%s\"\n"
-            "FROM (SELECT DISTINCT loid FROM pg_catalog.pg_largeobject) x\n"
-            "ORDER BY 1",
-            gettext_noop("ID"),
-            gettext_noop("Description"));
+   if (pset.sversion >= 80500)
+   {
+       snprintf(buf, sizeof(buf),
+                "SELECT oid as \"%s\",\n"
+                "  pg_catalog.pg_get_userbyid(lomowner) as \"%s\",\n"
+                "  pg_catalog.obj_description(oid, 'pg_largeobject') as \"%s\"\n"
+                "  FROM pg_catalog.pg_largeobject_metadata "
+                "  ORDER BY oid",
+                gettext_noop("ID"),
+                gettext_noop("Owner"),
+                gettext_noop("Description"));
+   }
+   else
+   {
+       snprintf(buf, sizeof(buf),
+                "SELECT loid as \"%s\",\n"
+                "  pg_catalog.obj_description(loid, 'pg_largeobject') as \"%s\"\n"
+                "FROM (SELECT DISTINCT loid FROM pg_catalog.pg_largeobject) x\n"
+                "ORDER BY 1",
+                gettext_noop("ID"),
+                gettext_noop("Description"));
+   }
 
    res = PSQLexec(buf, false);
    if (!res)
index 7cd07574151f6841d0ce60c37698dee1f54b462e..10734fe00c43230ce074679be4f0518e6a4570d4 100644 (file)
@@ -3,7 +3,7 @@
  *
  * Copyright (c) 2000-2009, PostgreSQL Global Development Group
  *
- * $PostgreSQL: pgsql/src/bin/psql/tab-complete.c,v 1.187 2009/10/13 21:04:01 tgl Exp $
+ * $PostgreSQL: pgsql/src/bin/psql/tab-complete.c,v 1.188 2009/12/11 03:34:56 itagaki Exp $
  */
 
 /*----------------------------------------------------------------------
@@ -691,7 +691,7 @@ psql_completion(char *text, int start, int end)
    {
        static const char *const list_ALTER[] =
        {"AGGREGATE", "CONVERSION", "DATABASE", "DOMAIN", "FOREIGN DATA WRAPPER", "FUNCTION",
-           "GROUP", "INDEX", "LANGUAGE", "OPERATOR", "ROLE", "SCHEMA", "SERVER", "SEQUENCE", "TABLE",
+           "GROUP", "INDEX", "LANGUAGE", "LARGE OBJECT", "OPERATOR", "ROLE", "SCHEMA", "SERVER", "SEQUENCE", "TABLE",
        "TABLESPACE", "TEXT SEARCH", "TRIGGER", "TYPE", "USER", "USER MAPPING FOR", "VIEW", NULL};
 
        COMPLETE_WITH_LIST(list_ALTER);
@@ -760,6 +760,17 @@ psql_completion(char *text, int start, int end)
        COMPLETE_WITH_LIST(list_ALTERLANGUAGE);
    }
 
+   /* ALTER LARGE OBJECT  */
+   else if (pg_strcasecmp(prev4_wd, "ALTER") == 0 &&
+            pg_strcasecmp(prev3_wd, "LARGE") == 0 &&
+            pg_strcasecmp(prev2_wd, "OBJECT") == 0)
+   {
+       static const char *const list_ALTERLARGEOBJECT[] =
+       {"OWNER TO", NULL};
+
+       COMPLETE_WITH_LIST(list_ALTERLARGEOBJECT);
+   }
+
    /* ALTER USER,ROLE  */
    else if (pg_strcasecmp(prev3_wd, "ALTER") == 0 &&
             !(pg_strcasecmp(prev2_wd, "USER") == 0 && pg_strcasecmp(prev_wd, "MAPPING") == 0) &&
@@ -1732,6 +1743,7 @@ psql_completion(char *text, int start, int end)
                                   " UNION SELECT 'FOREIGN SERVER'"
                                   " UNION SELECT 'FUNCTION'"
                                   " UNION SELECT 'LANGUAGE'"
+                                  " UNION SELECT 'LARGE OBJECT'"
                                   " UNION SELECT 'SCHEMA'"
                                   " UNION SELECT 'TABLESPACE'");
 
index 9b935720972924562d848d431be9c9e955234627..3fe284addbac271134734afdebe921b6836b93b9 100644 (file)
@@ -37,7 +37,7 @@
  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.556 2009/12/07 05:22:23 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.557 2009/12/11 03:34:56 itagaki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 200912071
+#define CATALOG_VERSION_NO 200912111
 
 #endif
index 5134479c3d03988dadbc7235f23eda0048988cf2..ec65f9cd2104941d868095f9451bcb528c65d6fd 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/catalog/dependency.h,v 1.42 2009/10/07 22:14:24 alvherre Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/dependency.h,v 1.43 2009/12/11 03:34:56 itagaki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -128,6 +128,7 @@ typedef enum ObjectClass
    OCLASS_CONVERSION,          /* pg_conversion */
    OCLASS_DEFAULT,             /* pg_attrdef */
    OCLASS_LANGUAGE,            /* pg_language */
+   OCLASS_LARGEOBJECT,         /* pg_largeobject */
    OCLASS_OPERATOR,            /* pg_operator */
    OCLASS_OPCLASS,             /* pg_opclass */
    OCLASS_OPFAMILY,            /* pg_opfamily */
index 4f9f9e9c2acfe36941f8d32cbf223c99f5156d32..3bf606c0985111bdadcee84cb94102e6201bc538 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/catalog/indexing.h,v 1.110 2009/10/07 22:14:25 alvherre Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/indexing.h,v 1.111 2009/12/11 03:34:56 itagaki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -165,6 +165,9 @@ DECLARE_UNIQUE_INDEX(pg_language_oid_index, 2682, on pg_language using btree(oid
 DECLARE_UNIQUE_INDEX(pg_largeobject_loid_pn_index, 2683, on pg_largeobject using btree(loid oid_ops, pageno int4_ops));
 #define LargeObjectLOidPNIndexId  2683
 
+DECLARE_UNIQUE_INDEX(pg_largeobject_metadata_oid_index, 2996, on pg_largeobject_metadata using btree(oid oid_ops));
+#define LargeObjectMetadataOidIndexId  2996
+
 DECLARE_UNIQUE_INDEX(pg_namespace_nspname_index, 2684, on pg_namespace using btree(nspname name_ops));
 #define NamespaceNameIndexId  2684
 DECLARE_UNIQUE_INDEX(pg_namespace_oid_index, 2685, on pg_namespace using btree(oid oid_ops));
index e907cc64192605eca8c81df3cbeb52860201e862..3c117225c118b164ac446e5060d1f8fa2d736d5e 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/catalog/pg_largeobject.h,v 1.24 2009/01/01 17:23:57 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/pg_largeobject.h,v 1.25 2009/12/11 03:34:56 itagaki Exp $
  *
  * NOTES
  *   the genbki.sh script reads this file and generates .bki
@@ -51,8 +51,9 @@ typedef FormData_pg_largeobject *Form_pg_largeobject;
 #define Anum_pg_largeobject_pageno     2
 #define Anum_pg_largeobject_data       3
 
-extern void LargeObjectCreate(Oid loid);
+extern Oid  LargeObjectCreate(Oid loid);
 extern void LargeObjectDrop(Oid loid);
+extern void LargeObjectAlterOwner(Oid loid, Oid newOwnerId);
 extern bool LargeObjectExists(Oid loid);
 
 #endif   /* PG_LARGEOBJECT_H */
diff --git a/src/include/catalog/pg_largeobject_metadata.h b/src/include/catalog/pg_largeobject_metadata.h
new file mode 100755 (executable)
index 0000000..8f4ac67
--- /dev/null
@@ -0,0 +1,52 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_largeobject_metadata.h
+ *   definition of the system "largeobject_metadata" relation (pg_largeobject_metadata)
+ *   along with the relation's initial contents.
+ *
+ *
+ * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $PostgreSQL: pgsql/src/include/catalog/pg_largeobject_metadata.h,v 1.1 2009/12/11 03:34:56 itagaki Exp $
+ *
+ * NOTES
+ *   the genbki.sh script reads this file and generates .bki
+ *   information from the DATA() statements.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LARGEOBJECT_METADATA_H
+#define PG_LARGEOBJECT_METADATA_H
+
+#include "catalog/genbki.h"
+
+/* ----------------
+ *     pg_largeobject_metadata definition. cpp turns this into
+ *     typedef struct FormData_pg_largeobject_metadata
+ * ----------------
+ */
+#define LargeObjectMetadataRelationId  2995
+
+CATALOG(pg_largeobject_metadata,2995)
+{
+   Oid         lomowner;       /* OID of the largeobject owner */
+   aclitem     lomacl[1];      /* access permissions */
+} FormData_pg_largeobject_metadata;
+
+/* ----------------
+ *     Form_pg_largeobject_metadata corresponds to a pointer to a tuple
+ *     with the format of pg_largeobject_metadata relation.
+ * ----------------
+ */
+typedef FormData_pg_largeobject_metadata *Form_pg_largeobject_metadata;
+
+/* ----------------
+ *     compiler constants for pg_largeobject_metadata
+ * ----------------
+ */
+#define Natts_pg_largeobject_metadata          2
+#define Anum_pg_largeobject_metadata_lomowner  1
+#define Anum_pg_largeobject_metadata_lomacl        2
+
+#endif   /* PG_LARGEOBJECT_METADATA_H */
index ff5e555824a08e408a12b196e079427dfff518e6..ebdcc0e863990b63e989c8d7d36d1e0943c9330e 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/libpq/be-fsstubs.h,v 1.32 2009/01/01 17:23:59 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/libpq/be-fsstubs.h,v 1.33 2009/12/11 03:34:56 itagaki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -37,6 +37,11 @@ extern Datum lo_tell(PG_FUNCTION_ARGS);
 extern Datum lo_unlink(PG_FUNCTION_ARGS);
 extern Datum lo_truncate(PG_FUNCTION_ARGS);
 
+/*
+ * compatibility option for access control
+ */
+extern bool    lo_compat_privileges;
+
 /*
  * These are not fmgr-callable, but are available to C code.
  * Probably these should have had the underscore-free names,
index a791223e6fb48e892a57340dde46ac904708859b..351b1c971826a9212efe645a344fe2152ea3d669 100644 (file)
@@ -13,7 +13,7 @@
  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/nodes/parsenodes.h,v 1.417 2009/12/07 05:22:23 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/nodes/parsenodes.h,v 1.418 2009/12/11 03:34:56 itagaki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -1195,6 +1195,7 @@ typedef enum GrantObjectType
    ACL_OBJECT_FOREIGN_SERVER,  /* foreign server */
    ACL_OBJECT_FUNCTION,        /* function */
    ACL_OBJECT_LANGUAGE,        /* procedural language */
+   ACL_OBJECT_LARGEOBJECT,     /* largeobject */
    ACL_OBJECT_NAMESPACE,       /* namespace */
    ACL_OBJECT_TABLESPACE       /* tablespace */
 } GrantObjectType;
index 039d27b59c391cefab2448b2d60c8ad0f4959eee..aa6999676d37bf710a95e58fa6f023ee0140ec3d 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/utils/acl.h,v 1.110 2009/12/05 21:43:36 petere Exp $
+ * $PostgreSQL: pgsql/src/include/utils/acl.h,v 1.111 2009/12/11 03:34:56 itagaki Exp $
  *
  * NOTES
  *   An ACL array is simply an array of AclItems, representing the union
@@ -26,6 +26,7 @@
 
 #include "nodes/parsenodes.h"
 #include "utils/array.h"
+#include "utils/snapshot.h"
 
 
 /*
@@ -151,6 +152,7 @@ typedef ArrayType Acl;
 #define ACL_ALL_RIGHTS_FOREIGN_SERVER (ACL_USAGE)
 #define ACL_ALL_RIGHTS_FUNCTION        (ACL_EXECUTE)
 #define ACL_ALL_RIGHTS_LANGUAGE        (ACL_USAGE)
+#define ACL_ALL_RIGHTS_LARGEOBJECT (ACL_SELECT|ACL_UPDATE)
 #define ACL_ALL_RIGHTS_NAMESPACE   (ACL_USAGE|ACL_CREATE)
 #define ACL_ALL_RIGHTS_TABLESPACE  (ACL_CREATE)
 
@@ -181,6 +183,7 @@ typedef enum AclObjectKind
    ACL_KIND_OPER,              /* pg_operator */
    ACL_KIND_TYPE,              /* pg_type */
    ACL_KIND_LANGUAGE,          /* pg_language */
+   ACL_KIND_LARGEOBJECT,       /* pg_largeobject */
    ACL_KIND_NAMESPACE,         /* pg_namespace */
    ACL_KIND_OPCLASS,           /* pg_opclass */
    ACL_KIND_OPFAMILY,          /* pg_opfamily */
@@ -259,6 +262,8 @@ extern AclMode pg_proc_aclmask(Oid proc_oid, Oid roleid,
                AclMode mask, AclMaskHow how);
 extern AclMode pg_language_aclmask(Oid lang_oid, Oid roleid,
                    AclMode mask, AclMaskHow how);
+extern AclMode pg_largeobject_aclmask_snapshot(Oid lobj_oid, Oid roleid,
+                   AclMode mask, AclMaskHow how, Snapshot snapshot);
 extern AclMode pg_namespace_aclmask(Oid nsp_oid, Oid roleid,
                     AclMode mask, AclMaskHow how);
 extern AclMode pg_tablespace_aclmask(Oid spc_oid, Oid roleid,
@@ -276,6 +281,8 @@ extern AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode);
 extern AclResult pg_database_aclcheck(Oid db_oid, Oid roleid, AclMode mode);
 extern AclResult pg_proc_aclcheck(Oid proc_oid, Oid roleid, AclMode mode);
 extern AclResult pg_language_aclcheck(Oid lang_oid, Oid roleid, AclMode mode);
+extern AclResult pg_largeobject_aclcheck_snapshot(Oid lang_oid, Oid roleid,
+                                                 AclMode mode, Snapshot snapshot);
 extern AclResult pg_namespace_aclcheck(Oid nsp_oid, Oid roleid, AclMode mode);
 extern AclResult pg_tablespace_aclcheck(Oid spc_oid, Oid roleid, AclMode mode);
 extern AclResult pg_foreign_data_wrapper_aclcheck(Oid fdw_oid, Oid roleid, AclMode mode);
@@ -293,6 +300,7 @@ extern bool pg_type_ownercheck(Oid type_oid, Oid roleid);
 extern bool pg_oper_ownercheck(Oid oper_oid, Oid roleid);
 extern bool pg_proc_ownercheck(Oid proc_oid, Oid roleid);
 extern bool pg_language_ownercheck(Oid lan_oid, Oid roleid);
+extern bool pg_largeobject_ownercheck(Oid lobj_oid, Oid roleid);
 extern bool pg_namespace_ownercheck(Oid nsp_oid, Oid roleid);
 extern bool pg_tablespace_ownercheck(Oid spc_oid, Oid roleid);
 extern bool pg_opclass_ownercheck(Oid opc_oid, Oid roleid);
index 4decb2b07af0c8fa76c7d440e5a43888632892cb..4160cba47dafba97cc4eafd4401f10f1e3a066dc 100644 (file)
@@ -11,6 +11,12 @@ DROP ROLE IF EXISTS regressuser2;
 DROP ROLE IF EXISTS regressuser3;
 DROP ROLE IF EXISTS regressuser4;
 DROP ROLE IF EXISTS regressuser5;
+DROP ROLE IF EXISTS regressuser6;
+SELECT lo_unlink(oid) FROM pg_largeobject_metadata;
+ lo_unlink 
+-----------
+(0 rows)
+
 RESET client_min_messages;
 -- test proper begins here
 CREATE USER regressuser1;
@@ -847,6 +853,194 @@ SELECT has_sequence_privilege('x_seq', 'USAGE');
  t
 (1 row)
 
+-- largeobject privilege tests
+\c -
+SET SESSION AUTHORIZATION regressuser1;
+SELECT lo_create(1001);
+ lo_create 
+-----------
+      1001
+(1 row)
+
+SELECT lo_create(1002);
+ lo_create 
+-----------
+      1002
+(1 row)
+
+SELECT lo_create(1003);
+ lo_create 
+-----------
+      1003
+(1 row)
+
+SELECT lo_create(1004);
+ lo_create 
+-----------
+      1004
+(1 row)
+
+SELECT lo_create(1005);
+ lo_create 
+-----------
+      1005
+(1 row)
+
+GRANT ALL ON LARGE OBJECT 1001 TO PUBLIC;
+GRANT SELECT ON LARGE OBJECT 1003 TO regressuser2;
+GRANT SELECT,UPDATE ON LARGE OBJECT 1004 TO regressuser2;
+GRANT ALL ON LARGE OBJECT 1005 TO regressuser2;
+GRANT SELECT ON LARGE OBJECT 1005 TO regressuser2 WITH GRANT OPTION;
+GRANT SELECT, INSERT ON LARGE OBJECT 1001 TO PUBLIC;   -- to be failed
+ERROR:  invalid privilege type INSERT for large object
+GRANT SELECT, UPDATE ON LARGE OBJECT 1001 TO nosuchuser;   -- to be failed
+ERROR:  role "nosuchuser" does not exist
+GRANT SELECT, UPDATE ON LARGE OBJECT  999 TO PUBLIC;   -- to be failed
+ERROR:  large object 999 does not exist
+\c -
+SET SESSION AUTHORIZATION regressuser2;
+SELECT lo_create(2001);
+ lo_create 
+-----------
+      2001
+(1 row)
+
+SELECT lo_create(2002);
+ lo_create 
+-----------
+      2002
+(1 row)
+
+SELECT loread(lo_open(1001, x'40000'::int), 32);
+ loread 
+--------
+ \x
+(1 row)
+
+SELECT loread(lo_open(1002, x'40000'::int), 32);   -- to be denied
+ERROR:  permission denied for large object 1002
+SELECT loread(lo_open(1003, x'40000'::int), 32);
+ loread 
+--------
+ \x
+(1 row)
+
+SELECT loread(lo_open(1004, x'40000'::int), 32);
+ loread 
+--------
+ \x
+(1 row)
+
+SELECT lowrite(lo_open(1001, x'20000'::int), 'abcd');
+ lowrite 
+---------
+       4
+(1 row)
+
+SELECT lowrite(lo_open(1002, x'20000'::int), 'abcd');  -- to be denied
+ERROR:  permission denied for large object 1002
+SELECT lowrite(lo_open(1003, x'20000'::int), 'abcd');  -- to be denied
+ERROR:  permission denied for large object 1003
+SELECT lowrite(lo_open(1004, x'20000'::int), 'abcd');
+ lowrite 
+---------
+       4
+(1 row)
+
+GRANT SELECT ON LARGE OBJECT 1005 TO regressuser3;
+GRANT UPDATE ON LARGE OBJECT 1006 TO regressuser3; -- to be denied
+ERROR:  large object 1006 does not exist
+REVOKE ALL ON LARGE OBJECT 2001, 2002 FROM PUBLIC;
+GRANT ALL ON LARGE OBJECT 2001 TO regressuser3;
+SELECT lo_unlink(1001);        -- to be denied
+ERROR:  must be owner of large object 1001
+SELECT lo_unlink(2002);
+ lo_unlink 
+-----------
+         1
+(1 row)
+
+\c -
+-- confirm ACL setting
+SELECT oid, pg_get_userbyid(lomowner) ownername, lomacl FROM pg_largeobject_metadata;
+ oid  |  ownername   |                                          lomacl                                          
+------+--------------+------------------------------------------------------------------------------------------
+ 1002 | regressuser1 | 
+ 1001 | regressuser1 | {regressuser1=rw/regressuser1,=rw/regressuser1}
+ 1003 | regressuser1 | {regressuser1=rw/regressuser1,regressuser2=r/regressuser1}
+ 1004 | regressuser1 | {regressuser1=rw/regressuser1,regressuser2=rw/regressuser1}
+ 1005 | regressuser1 | {regressuser1=rw/regressuser1,regressuser2=r*w/regressuser1,regressuser3=r/regressuser2}
+ 2001 | regressuser2 | {regressuser2=rw/regressuser2,regressuser3=rw/regressuser2}
+(6 rows)
+
+SET SESSION AUTHORIZATION regressuser3;
+SELECT loread(lo_open(1001, x'40000'::int), 32);
+   loread   
+------------
+ \x61626364
+(1 row)
+
+SELECT loread(lo_open(1003, x'40000'::int), 32);   -- to be denied
+ERROR:  permission denied for large object 1003
+SELECT loread(lo_open(1005, x'40000'::int), 32);
+ loread 
+--------
+ \x
+(1 row)
+
+SELECT lo_truncate(lo_open(1005, x'20000'::int), 10);  -- to be denied
+ERROR:  permission denied for large object 1005
+SELECT lo_truncate(lo_open(2001, x'20000'::int), 10);
+ lo_truncate 
+-------------
+           0
+(1 row)
+
+-- compatibility mode in largeobject permission
+\c -
+SET lo_compat_privileges = false;  -- default setting
+SET SESSION AUTHORIZATION regressuser4;
+SELECT loread(lo_open(1002, x'40000'::int), 32);   -- to be denied
+ERROR:  permission denied for large object 1002
+SELECT lowrite(lo_open(1002, x'20000'::int), 'abcd');  -- to be denied
+ERROR:  permission denied for large object 1002
+SELECT lo_truncate(lo_open(1002, x'20000'::int), 10);  -- to be denied
+ERROR:  permission denied for large object 1002
+SELECT lo_unlink(1002);                    -- to be denied
+ERROR:  must be owner of large object 1002
+SELECT lo_export(1001, '/dev/null');           -- to be denied
+ERROR:  must be superuser to use server-side lo_export()
+HINT:  Anyone can use the client-side lo_export() provided by libpq.
+\c -
+SET lo_compat_privileges = true;   -- compatibility mode
+SET SESSION AUTHORIZATION regressuser4;
+SELECT loread(lo_open(1002, x'40000'::int), 32);
+ loread 
+--------
+ \x
+(1 row)
+
+SELECT lowrite(lo_open(1002, x'20000'::int), 'abcd');
+ lowrite 
+---------
+       4
+(1 row)
+
+SELECT lo_truncate(lo_open(1002, x'20000'::int), 10);
+ lo_truncate 
+-------------
+           0
+(1 row)
+
+SELECT lo_unlink(1002);
+ lo_unlink 
+-----------
+         1
+(1 row)
+
+SELECT lo_export(1001, '/dev/null');           -- to be denied
+ERROR:  must be superuser to use server-side lo_export()
+HINT:  Anyone can use the client-side lo_export() provided by libpq.
 -- test default ACLs
 \c -
 CREATE SCHEMA testns;
@@ -1034,6 +1228,16 @@ DROP TABLE atest6;
 DROP TABLE atestc;
 DROP TABLE atestp1;
 DROP TABLE atestp2;
+SELECT lo_unlink(oid) FROM pg_largeobject_metadata;
+ lo_unlink 
+-----------
+         1
+         1
+         1
+         1
+         1
+(5 rows)
+
 DROP GROUP regressgroup1;
 DROP GROUP regressgroup2;
 -- these are needed to clean up permissions
@@ -1044,3 +1248,5 @@ DROP USER regressuser2;
 DROP USER regressuser3;
 DROP USER regressuser4;
 DROP USER regressuser5;
+DROP USER regressuser6;
+ERROR:  role "regressuser6" does not exist
index fe0d93670f8f7df5a133abf766770dffacae3826..2a4dc4755d74fbf70ce76d1fe482c219cae0e50a 100644 (file)
@@ -106,6 +106,7 @@ SELECT relname, relhasindex
  pg_inherits             | t
  pg_language             | t
  pg_largeobject          | t
+ pg_largeobject_metadata | t
  pg_listener             | f
  pg_namespace            | t
  pg_opclass              | t
@@ -153,7 +154,7 @@ SELECT relname, relhasindex
  timetz_tbl              | f
  tinterval_tbl           | f
  varchar_tbl             | f
-(142 rows)
+(143 rows)
 
 --
 -- another sanity check: every system catalog that has OIDs should have
index d4f728205f7980390dddf5f62f8d58b8df3845bc..8e8ff70608f6085a26d654a8a714a93c95ea3d0f 100644 (file)
@@ -15,6 +15,9 @@ DROP ROLE IF EXISTS regressuser2;
 DROP ROLE IF EXISTS regressuser3;
 DROP ROLE IF EXISTS regressuser4;
 DROP ROLE IF EXISTS regressuser5;
+DROP ROLE IF EXISTS regressuser6;
+
+SELECT lo_unlink(oid) FROM pg_largeobject_metadata;
 
 RESET client_min_messages;
 
@@ -36,7 +39,6 @@ ALTER GROUP regressgroup2 ADD USER regressuser2;  -- duplicate
 ALTER GROUP regressgroup2 DROP USER regressuser2;
 ALTER GROUP regressgroup2 ADD USER regressuser4;
 
-
 -- test owner privileges
 
 SET SESSION AUTHORIZATION regressuser1;
@@ -485,6 +487,83 @@ SET SESSION AUTHORIZATION regressuser2;
 
 SELECT has_sequence_privilege('x_seq', 'USAGE');
 
+-- largeobject privilege tests
+\c -
+SET SESSION AUTHORIZATION regressuser1;
+
+SELECT lo_create(1001);
+SELECT lo_create(1002);
+SELECT lo_create(1003);
+SELECT lo_create(1004);
+SELECT lo_create(1005);
+
+GRANT ALL ON LARGE OBJECT 1001 TO PUBLIC;
+GRANT SELECT ON LARGE OBJECT 1003 TO regressuser2;
+GRANT SELECT,UPDATE ON LARGE OBJECT 1004 TO regressuser2;
+GRANT ALL ON LARGE OBJECT 1005 TO regressuser2;
+GRANT SELECT ON LARGE OBJECT 1005 TO regressuser2 WITH GRANT OPTION;
+
+GRANT SELECT, INSERT ON LARGE OBJECT 1001 TO PUBLIC;   -- to be failed
+GRANT SELECT, UPDATE ON LARGE OBJECT 1001 TO nosuchuser;   -- to be failed
+GRANT SELECT, UPDATE ON LARGE OBJECT  999 TO PUBLIC;   -- to be failed
+
+\c -
+SET SESSION AUTHORIZATION regressuser2;
+
+SELECT lo_create(2001);
+SELECT lo_create(2002);
+
+SELECT loread(lo_open(1001, x'40000'::int), 32);
+SELECT loread(lo_open(1002, x'40000'::int), 32);   -- to be denied
+SELECT loread(lo_open(1003, x'40000'::int), 32);
+SELECT loread(lo_open(1004, x'40000'::int), 32);
+
+SELECT lowrite(lo_open(1001, x'20000'::int), 'abcd');
+SELECT lowrite(lo_open(1002, x'20000'::int), 'abcd');  -- to be denied
+SELECT lowrite(lo_open(1003, x'20000'::int), 'abcd');  -- to be denied
+SELECT lowrite(lo_open(1004, x'20000'::int), 'abcd');
+
+GRANT SELECT ON LARGE OBJECT 1005 TO regressuser3;
+GRANT UPDATE ON LARGE OBJECT 1006 TO regressuser3; -- to be denied
+REVOKE ALL ON LARGE OBJECT 2001, 2002 FROM PUBLIC;
+GRANT ALL ON LARGE OBJECT 2001 TO regressuser3;
+
+SELECT lo_unlink(1001);        -- to be denied
+SELECT lo_unlink(2002);
+
+\c -
+-- confirm ACL setting
+SELECT oid, pg_get_userbyid(lomowner) ownername, lomacl FROM pg_largeobject_metadata;
+
+SET SESSION AUTHORIZATION regressuser3;
+
+SELECT loread(lo_open(1001, x'40000'::int), 32);
+SELECT loread(lo_open(1003, x'40000'::int), 32);   -- to be denied
+SELECT loread(lo_open(1005, x'40000'::int), 32);
+
+SELECT lo_truncate(lo_open(1005, x'20000'::int), 10);  -- to be denied
+SELECT lo_truncate(lo_open(2001, x'20000'::int), 10);
+
+-- compatibility mode in largeobject permission
+\c -
+SET lo_compat_privileges = false;  -- default setting
+SET SESSION AUTHORIZATION regressuser4;
+
+SELECT loread(lo_open(1002, x'40000'::int), 32);   -- to be denied
+SELECT lowrite(lo_open(1002, x'20000'::int), 'abcd');  -- to be denied
+SELECT lo_truncate(lo_open(1002, x'20000'::int), 10);  -- to be denied
+SELECT lo_unlink(1002);                    -- to be denied
+SELECT lo_export(1001, '/dev/null');           -- to be denied
+
+\c -
+SET lo_compat_privileges = true;   -- compatibility mode
+SET SESSION AUTHORIZATION regressuser4;
+
+SELECT loread(lo_open(1002, x'40000'::int), 32);
+SELECT lowrite(lo_open(1002, x'20000'::int), 'abcd');
+SELECT lo_truncate(lo_open(1002, x'20000'::int), 10);
+SELECT lo_unlink(1002);
+SELECT lo_export(1001, '/dev/null');           -- to be denied
 
 -- test default ACLs
 \c -
@@ -611,6 +690,8 @@ DROP TABLE atestc;
 DROP TABLE atestp1;
 DROP TABLE atestp2;
 
+SELECT lo_unlink(oid) FROM pg_largeobject_metadata;
+
 DROP GROUP regressgroup1;
 DROP GROUP regressgroup2;
 
@@ -623,3 +704,4 @@ DROP USER regressuser2;
 DROP USER regressuser3;
 DROP USER regressuser4;
 DROP USER regressuser5;
+DROP USER regressuser6;