Make view/rule permission checking behave properly with
authorTom Lane
Sun, 9 Jul 2000 04:56:32 +0000 (04:56 +0000)
committerTom Lane
Sun, 9 Jul 2000 04:56:32 +0000 (04:56 +0000)
subqueries in the rule.

src/backend/rewrite/locks.c

index 57cce96d901fad10d30e114433d1924fc0c77424..5054b215438ebd79ef28fedaa4c891cf5f20e685 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/rewrite/Attic/locks.c,v 1.29 2000/05/30 00:49:51 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/rewrite/Attic/locks.c,v 1.30 2000/07/09 04:56:32 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -17,9 +17,9 @@
 #include "catalog/pg_shadow.h"
 #include "optimizer/clauses.h"
 #include "rewrite/locks.h"
+#include "parser/parsetree.h"
 #include "utils/acl.h"
 #include "utils/syscache.h"
-#include "utils/syscache.h"
 
 
 /*
@@ -152,39 +152,148 @@ matchLocks(CmdType event,
 }
 
 
+/*
+ * Check the access permissions of tables that are referred to by a rule.
+ * We want to check the access permissions using the userid of the rule's
+ * owner, *not* of the current user (the one accessing the rule).  So, we
+ * do the permission check here and set skipAcl = TRUE in each of the rule's
+ * RTEs, to prevent the executor from running another check with the current
+ * user's ID.
+ *
+ * XXX This routine is called before the rule's query tree has been copied
+ * out of the relcache entry where it is kept.  Therefore, when we set
+ * skipAcl = TRUE, we are destructively modifying the relcache entry for
+ * the event relation!  This seems fairly harmless because the relcache
+ * querytree is only used as a source for the rewriter, but it's a tad
+ * unclean anyway.
+ *
+ * Note that we must check permissions every time, even if skipAcl was
+ * already set TRUE by a prior call.  This ensures that we enforce the
+ * current permission settings for each referenced table, even if they
+ * have changed since the relcache entry was loaded.
+ */
+
+typedef struct
+{
+   char       *evowner;
+} checkLockPerms_context;
+
+static bool
+checkLockPerms_walker(Node *node,
+                     checkLockPerms_context *context)
+{
+   if (node == NULL)
+       return false;
+   if (IsA(node, SubLink))
+   {
+       /*
+        * Standard expression_tree_walker will not recurse into
+        * subselect, but here we must do so.
+        */
+       SubLink    *sub = (SubLink *) node;
+
+       if (checkLockPerms_walker((Node *) (sub->lefthand), context))
+           return true;
+       if (checkLockPerms_walker((Node *) (sub->subselect), context))
+           return true;
+       return false;
+   }
+   if (IsA(node, Query))
+   {
+       /* Reach here after recursing down into subselect above... */
+       Query      *qry = (Query *) node;
+       int         rtablength = length(qry->rtable);
+       int         i;
+
+       /* Check all the RTEs in this query node, except OLD and NEW */
+       for (i = 1; i <= rtablength; i++)
+       {
+           RangeTblEntry *rte = rt_fetch(i, qry->rtable);
+           int32       reqperm;
+           int32       aclcheck_res;
+
+           if (rte->ref != NULL)
+           {
+               if (strcmp(rte->ref->relname, "*NEW*") == 0)
+                   continue;
+               if (strcmp(rte->ref->relname, "*OLD*") == 0)
+                   continue;
+           }
+
+           if (i == qry->resultRelation)
+               switch (qry->commandType)
+               {
+                   case CMD_INSERT:
+                       reqperm = ACL_AP;
+                       break;
+                   default:
+                       reqperm = ACL_WR;
+                       break;
+               }
+           else
+               reqperm = ACL_RD;
+
+           aclcheck_res = pg_aclcheck(rte->relname,
+                                      context->evowner,
+                                      reqperm);
+           if (aclcheck_res != ACLCHECK_OK)
+               elog(ERROR, "%s: %s",
+                    rte->relname,
+                    aclcheck_error_strings[aclcheck_res]);
+
+           /*
+            * Mark RTE to prevent executor from checking again with the
+            * current user's ID...
+            */
+           rte->skipAcl = true;
+       }
+
+       /* If there are sublinks, search for them and check their RTEs */
+       if (qry->hasSubLinks)
+       {
+           if (checkLockPerms_walker((Node *) (qry->targetList), context))
+               return true;
+           if (checkLockPerms_walker((Node *) (qry->qual), context))
+               return true;
+           if (checkLockPerms_walker((Node *) (qry->havingQual), context))
+               return true;
+       }
+       return false;
+   }
+   return expression_tree_walker(node, checkLockPerms_walker,
+                                 (void *) context);
+}
+
 void
 checkLockPerms(List *locks, Query *parsetree, int rt_index)
 {
+   RangeTblEntry *rte;
    Relation    ev_rel;
    HeapTuple   usertup;
-   char       *evowner;
-   RangeTblEntry *rte;
-   int32       reqperm;
-   int32       aclcheck_res;
-   int         i;
+   Form_pg_shadow userform;
+   checkLockPerms_context context;
    List       *l;
 
    if (locks == NIL)
-       return;
+       return;                 /* nothing to check */
 
    /*
-    * Get the usename of the rules event relation owner
+    * Get the usename of the rule's event relation owner
     */
-   rte = (RangeTblEntry *) nth(rt_index - 1, parsetree->rtable);
+   rte = rt_fetch(rt_index, parsetree->rtable);
    ev_rel = heap_openr(rte->relname, AccessShareLock);
    usertup = SearchSysCacheTuple(SHADOWSYSID,
                              ObjectIdGetDatum(ev_rel->rd_rel->relowner),
                                  0, 0, 0);
    if (!HeapTupleIsValid(usertup))
-   {
        elog(ERROR, "cache lookup for userid %d failed",
             ev_rel->rd_rel->relowner);
-   }
+   userform = (Form_pg_shadow) GETSTRUCT(usertup);
+   context.evowner = pstrdup(NameStr(userform->usename));
    heap_close(ev_rel, AccessShareLock);
-   evowner = pstrdup(NameStr(((Form_pg_shadow) GETSTRUCT(usertup))->usename));
 
    /*
-    * Check all the locks, that should get fired on this query
+    * Check all the locks that should get fired on this query
     */
    foreach(l, locks)
    {
@@ -192,53 +301,14 @@ checkLockPerms(List *locks, Query *parsetree, int rt_index)
        List       *action;
 
        /*
-        * In each lock check every action
+        * In each lock check every action.  We must scan the action
+        * recursively in case there are any sub-queries within it.
         */
        foreach(action, onelock->actions)
        {
            Query      *query = (Query *) lfirst(action);
 
-           /*
-            * In each action check every rangetable entry for read/write
-            * permission of the event relations owner depending on if
-            * it's the result relation (write) or not (read)
-            */
-           for (i = 2; i < length(query->rtable); i++)
-           {
-               if (i + 1 == query->resultRelation)
-                   switch (query->resultRelation)
-                   {
-                       case CMD_INSERT:
-                           reqperm = ACL_AP;
-                           break;
-                       default:
-                           reqperm = ACL_WR;
-                           break;
-                   }
-               else
-                   reqperm = ACL_RD;
-
-               rte = (RangeTblEntry *) nth(i, query->rtable);
-               aclcheck_res = pg_aclcheck(rte->relname,
-                                          evowner, reqperm);
-               if (aclcheck_res != ACLCHECK_OK)
-               {
-                   elog(ERROR, "%s: %s",
-                        rte->relname,
-                        aclcheck_error_strings[aclcheck_res]);
-               }
-
-               /*
-                * So this is allowed due to the permissions of the rules
-                * event relation owner. But let's see if the next one too
-                */
-               rte->skipAcl = TRUE;
-           }
+           checkLockPerms_walker((Node *) query, &context);
        }
    }
-
-   /*
-    * Phew, that was close
-    */
-   return;
 }