Implement partial-key searching of syscaches, per recent suggestion
authorTom Lane
Sat, 6 Apr 2002 06:59:25 +0000 (06:59 +0000)
committerTom Lane
Sat, 6 Apr 2002 06:59:25 +0000 (06:59 +0000)
to pghackers.  Use this to do searching for ambiguous functions ---
it will get more uses soon.

src/backend/catalog/namespace.c
src/backend/parser/parse_func.c
src/backend/utils/cache/catcache.c
src/backend/utils/cache/syscache.c
src/include/catalog/namespace.h
src/include/utils/catcache.h
src/include/utils/syscache.h

index fab1912b1b537f370378871441159836779bed60..a7d73bbf272240050e271d9d37508f2a44ab1541 100644 (file)
@@ -13,7 +13,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/catalog/namespace.c,v 1.5 2002/04/01 03:34:25 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/catalog/namespace.c,v 1.6 2002/04/06 06:59:21 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -26,6 +26,7 @@
 #include "catalog/namespace.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
+#include "catalog/pg_proc.h"
 #include "catalog/pg_shadow.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
@@ -33,6 +34,7 @@
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
+#include "utils/catcache.h"
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
 
@@ -301,6 +303,174 @@ TypenameGetTypid(const char *typname)
    return InvalidOid;
 }
 
+/*
+ * FuncnameGetCandidates
+ *     Given a possibly-qualified function name and argument count,
+ *     retrieve a list of the possible matches.
+ *
+ * We search a single namespace if the function name is qualified, else
+ * all namespaces in the search path.  The return list will never contain
+ * multiple entries with identical argument types --- in the multiple-
+ * namespace case, we arrange for entries in earlier namespaces to mask
+ * identical entries in later namespaces.
+ */
+FuncCandidateList
+FuncnameGetCandidates(List *names, int nargs)
+{
+   FuncCandidateList resultList = NULL;
+   char       *catalogname;
+   char       *schemaname = NULL;
+   char       *funcname = NULL;
+   Oid         namespaceId;
+   CatCList   *catlist;
+   int         i;
+
+   /* deconstruct the name list */
+   switch (length(names))
+   {
+       case 1:
+           funcname = strVal(lfirst(names));
+           break;
+       case 2:
+           schemaname = strVal(lfirst(names));
+           funcname = strVal(lsecond(names));
+           break;
+       case 3:
+           catalogname = strVal(lfirst(names));
+           schemaname = strVal(lsecond(names));
+           funcname = strVal(lfirst(lnext(lnext(names))));
+           /*
+            * We check the catalog name and then ignore it.
+            */
+           if (strcmp(catalogname, DatabaseName) != 0)
+               elog(ERROR, "Cross-database references are not implemented");
+           break;
+       default:
+           elog(ERROR, "Improper qualified name (too many dotted names)");
+           break;
+   }
+
+   if (schemaname)
+   {
+       /* use exact schema given */
+       namespaceId = GetSysCacheOid(NAMESPACENAME,
+                                    CStringGetDatum(schemaname),
+                                    0, 0, 0);
+       if (!OidIsValid(namespaceId))
+           elog(ERROR, "Namespace \"%s\" does not exist",
+                schemaname);
+   }
+   else
+   {
+       /* flag to indicate we need namespace search */
+       namespaceId = InvalidOid;
+   }
+
+   /* Search syscache by name and nargs only */
+   catlist = SearchSysCacheList(PROCNAME, 2,
+                                CStringGetDatum(funcname),
+                                Int16GetDatum(nargs),
+                                0, 0);
+
+   for (i = 0; i < catlist->n_members; i++)
+   {
+       HeapTuple   proctup = &catlist->members[i]->tuple;
+       Form_pg_proc procform = (Form_pg_proc) GETSTRUCT(proctup);
+       int         pathpos = 0;
+       FuncCandidateList newResult;
+
+       if (OidIsValid(namespaceId))
+       {
+           /* Consider only procs in specified namespace */
+           if (procform->pronamespace != namespaceId)
+               continue;
+           /* No need to check args, they must all be different */
+       }
+       else
+       {
+           /* Consider only procs that are in the search path */
+           if (pathContainsSystemNamespace ||
+               procform->pronamespace != PG_CATALOG_NAMESPACE)
+           {
+               List       *nsp;
+
+               foreach(nsp, namespaceSearchPath)
+               {
+                   pathpos++;
+                   if (procform->pronamespace == (Oid) lfirsti(nsp))
+                       break;
+               }
+               if (nsp == NIL)
+                   continue;   /* proc is not in search path */
+           }
+
+           /*
+            * Okay, it's in the search path, but does it have the same
+            * arguments as something we already accepted?  If so, keep
+            * only the one that appears earlier in the search path.
+            *
+            * If we have an ordered list from SearchSysCacheList (the
+            * normal case), then any conflicting proc must immediately
+            * adjoin this one in the list, so we only need to look at
+            * the newest result item.  If we have an unordered list,
+            * we have to scan the whole result list.
+            */
+           if (resultList)
+           {
+               FuncCandidateList   prevResult;
+
+               if (catlist->ordered)
+               {
+                   if (memcmp(procform->proargtypes, resultList->args,
+                              nargs * sizeof(Oid)) == 0)
+                       prevResult = resultList;
+                   else
+                       prevResult = NULL;
+               }
+               else
+               {
+                   for (prevResult = resultList;
+                        prevResult;
+                        prevResult = prevResult->next)
+                   {
+                       if (memcmp(procform->proargtypes, prevResult->args,
+                                  nargs * sizeof(Oid)) == 0)
+                           break;
+                   }
+               }
+               if (prevResult)
+               {
+                   /* We have a match with a previous result */
+                   Assert(pathpos != prevResult->pathpos);
+                   if (pathpos > prevResult->pathpos)
+                       continue; /* keep previous result */
+                   /* replace previous result */
+                   prevResult->pathpos = pathpos;
+                   prevResult->oid = proctup->t_data->t_oid;
+                   continue;   /* args are same, of course */
+               }
+           }
+       }
+
+       /*
+        * Okay to add it to result list
+        */
+       newResult = (FuncCandidateList)
+           palloc(sizeof(struct _FuncCandidateList) - sizeof(Oid)
+                  + nargs * sizeof(Oid));
+       newResult->pathpos = pathpos;
+       newResult->oid = proctup->t_data->t_oid;
+       memcpy(newResult->args, procform->proargtypes, nargs * sizeof(Oid));
+
+       newResult->next = resultList;
+       resultList = newResult;
+   }
+
+   ReleaseSysCacheList(catlist);
+
+   return resultList;
+}
+
 /*
  * QualifiedNameGetCreationNamespace
  *     Given a possibly-qualified name for an object (in List-of-Values
index f3c8712abaed640ecb92f6e315cbd2050833aa7c..578402fd255293543a98abaa3275fa5444498b2c 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/parser/parse_func.c,v 1.123 2002/04/05 00:31:27 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/parser/parse_func.c,v 1.124 2002/04/06 06:59:22 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -18,6 +18,7 @@
 #include "access/heapam.h"
 #include "catalog/catname.h"
 #include "catalog/indexing.h"
+#include "catalog/namespace.h"
 #include "catalog/pg_aggregate.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
@@ -40,7 +41,6 @@ static Node *ParseComplexProjection(ParseState *pstate,
 static Oid **argtype_inherit(int nargs, Oid *argtypes);
 
 static int find_inheritors(Oid relid, Oid **supervec);
-static CandidateList func_get_candidates(char *funcname, int nargs);
 static Oid **gen_cross_product(InhPaths *arginh, int nargs);
 static void make_arguments(ParseState *pstate,
               int nargs,
@@ -48,14 +48,15 @@ static void make_arguments(ParseState *pstate,
               Oid *input_typeids,
               Oid *function_typeids);
 static int match_argtypes(int nargs,
-              Oid *input_typeids,
-              CandidateList function_typeids,
-              CandidateList *candidates);
+                         Oid *input_typeids,
+                         FuncCandidateList function_typeids,
+                         FuncCandidateList *candidates);
 static FieldSelect *setup_field_select(Node *input, char *attname, Oid relid);
-static Oid *func_select_candidate(int nargs, Oid *input_typeids,
-                     CandidateList candidates);
-static int agg_get_candidates(char *aggname, Oid typeId, CandidateList *candidates);
-static Oid agg_select_candidate(Oid typeid, CandidateList candidates);
+static FuncCandidateList func_select_candidate(int nargs, Oid *input_typeids,
+                                 FuncCandidateList candidates);
+static int agg_get_candidates(char *aggname, Oid typeId,
+                              FuncCandidateList *candidates);
+static Oid agg_select_candidate(Oid typeid, FuncCandidateList candidates);
 
 
 /*
@@ -170,7 +171,7 @@ ParseFuncOrColumn(ParseState *pstate, char *funcname, List *fargs,
    {
        Oid         basetype = exprType(lfirst(fargs));
        int         ncandidates;
-       CandidateList candidates;
+       FuncCandidateList candidates;
 
        /* try for exact match first... */
        if (SearchSysCacheExists(AGGNAME,
@@ -374,7 +375,7 @@ ParseFuncOrColumn(ParseState *pstate, char *funcname, List *fargs,
 static int
 agg_get_candidates(char *aggname,
                   Oid typeId,
-                  CandidateList *candidates)
+                  FuncCandidateList *candidates)
 {
    Relation    pg_aggregate_desc;
    SysScanDesc pg_aggregate_scan;
@@ -398,11 +399,10 @@ agg_get_candidates(char *aggname,
    while (HeapTupleIsValid(tup = systable_getnext(pg_aggregate_scan)))
    {
        Form_pg_aggregate agg = (Form_pg_aggregate) GETSTRUCT(tup);
-       CandidateList current_candidate;
-
-       current_candidate = (CandidateList) palloc(sizeof(struct _CandidateList));
-       current_candidate->args = (Oid *) palloc(sizeof(Oid));
+       FuncCandidateList current_candidate;
 
+       current_candidate = (FuncCandidateList)
+           palloc(sizeof(struct _FuncCandidateList));
        current_candidate->args[0] = agg->aggbasetype;
        current_candidate->next = *candidates;
        *candidates = current_candidate;
@@ -422,10 +422,10 @@ agg_get_candidates(char *aggname,
  * if successful, else InvalidOid.
  */
 static Oid
-agg_select_candidate(Oid typeid, CandidateList candidates)
+agg_select_candidate(Oid typeid, FuncCandidateList candidates)
 {
-   CandidateList current_candidate;
-   CandidateList last_candidate;
+   FuncCandidateList current_candidate;
+   FuncCandidateList last_candidate;
    Oid         current_typeid;
    int         ncandidates;
    CATEGORY    category,
@@ -498,91 +498,37 @@ agg_select_candidate(Oid typeid, CandidateList candidates)
 }  /* agg_select_candidate() */
 
 
-/* func_get_candidates()
- * get a list of all argument type vectors for which a function named
- * funcname taking nargs arguments exists
- */
-static CandidateList
-func_get_candidates(char *funcname, int nargs)
-{
-   Relation    heapRelation;
-   ScanKeyData skey[2];
-   HeapTuple   tuple;
-   SysScanDesc funcscan;
-   CandidateList candidates = NULL;
-   int         i;
-
-   heapRelation = heap_openr(ProcedureRelationName, AccessShareLock);
-
-   ScanKeyEntryInitialize(&skey[0],
-                          (bits16) 0x0,
-                          (AttrNumber) Anum_pg_proc_proname,
-                          (RegProcedure) F_NAMEEQ,
-                          PointerGetDatum(funcname));
-   ScanKeyEntryInitialize(&skey[1],
-                          (bits16) 0x0,
-                          (AttrNumber) Anum_pg_proc_pronargs,
-                          (RegProcedure) F_INT2EQ,
-                          Int16GetDatum(nargs));
-
-   funcscan = systable_beginscan(heapRelation, ProcedureNameNspIndex, true,
-                                 SnapshotNow, 2, skey);
-
-   while (HeapTupleIsValid(tuple = systable_getnext(funcscan)))
-   {
-       Form_pg_proc pgProcP = (Form_pg_proc) GETSTRUCT(tuple);
-       CandidateList current_candidate;
-
-       current_candidate = (CandidateList)
-           palloc(sizeof(struct _CandidateList));
-       current_candidate->args = (Oid *)
-           palloc(FUNC_MAX_ARGS * sizeof(Oid));
-       MemSet(current_candidate->args, 0, FUNC_MAX_ARGS * sizeof(Oid));
-       for (i = 0; i < nargs; i++)
-           current_candidate->args[i] = pgProcP->proargtypes[i];
-
-       current_candidate->next = candidates;
-       candidates = current_candidate;
-   }
-
-   systable_endscan(funcscan);
-   heap_close(heapRelation, AccessShareLock);
-
-   return candidates;
-}
-
-
 /* match_argtypes()
+ *
  * Given a list of possible typeid arrays to a function and an array of
  * input typeids, produce a shortlist of those function typeid arrays
  * that match the input typeids (either exactly or by coercion), and
- * return the number of such arrays
+ * return the number of such arrays.
+ *
+ * NB: okay to modify input list structure, as long as we find at least
+ * one match.
  */
 static int
 match_argtypes(int nargs,
               Oid *input_typeids,
-              CandidateList function_typeids,
-              CandidateList *candidates)       /* return value */
+              FuncCandidateList function_typeids,
+              FuncCandidateList *candidates) /* return value */
 {
-   CandidateList current_candidate;
-   CandidateList matching_candidate;
-   Oid        *current_typeids;
+   FuncCandidateList current_candidate;
+   FuncCandidateList next_candidate;
    int         ncandidates = 0;
 
    *candidates = NULL;
 
    for (current_candidate = function_typeids;
         current_candidate != NULL;
-        current_candidate = current_candidate->next)
+        current_candidate = next_candidate)
    {
-       current_typeids = current_candidate->args;
-       if (can_coerce_type(nargs, input_typeids, current_typeids))
+       next_candidate = current_candidate->next;
+       if (can_coerce_type(nargs, input_typeids, current_candidate->args))
        {
-           matching_candidate = (CandidateList)
-               palloc(sizeof(struct _CandidateList));
-           matching_candidate->args = current_typeids;
-           matching_candidate->next = *candidates;
-           *candidates = matching_candidate;
+           current_candidate->next = *candidates;
+           *candidates = current_candidate;
            ncandidates++;
        }
    }
@@ -593,8 +539,8 @@ match_argtypes(int nargs,
 
 /* func_select_candidate()
  * Given the input argtype array and more than one candidate
- * for the function argtype array, attempt to resolve the conflict.
- * Returns the selected argtype array if the conflict can be resolved,
+ * for the function, attempt to resolve the conflict.
+ * Returns the selected candidate if the conflict can be resolved,
  * otherwise returns NULL.
  *
  * By design, this is pretty similar to oper_select_candidate in parse_oper.c.
@@ -602,13 +548,13 @@ match_argtypes(int nargs,
  * already pruned away "candidates" that aren't actually coercion-compatible
  * with the input types, whereas oper_select_candidate must do that itself.
  */
-static Oid *
+static FuncCandidateList
 func_select_candidate(int nargs,
                      Oid *input_typeids,
-                     CandidateList candidates)
+                     FuncCandidateList candidates)
 {
-   CandidateList current_candidate;
-   CandidateList last_candidate;
+   FuncCandidateList current_candidate;
+   FuncCandidateList last_candidate;
    Oid        *current_typeids;
    Oid         current_type;
    int         i;
@@ -662,7 +608,7 @@ func_select_candidate(int nargs,
        last_candidate->next = NULL;
 
    if (ncandidates == 1)
-       return candidates->args;
+       return candidates;
 
    /*
     * Still too many candidates? Run through all candidates and keep
@@ -709,7 +655,7 @@ func_select_candidate(int nargs,
        last_candidate->next = NULL;
 
    if (ncandidates == 1)
-       return candidates->args;
+       return candidates;
 
    /*
     * Still too many candidates? Now look for candidates which are
@@ -755,7 +701,7 @@ func_select_candidate(int nargs,
        last_candidate->next = NULL;
 
    if (ncandidates == 1)
-       return candidates->args;
+       return candidates;
 
    /*
     * Still too many candidates? Try assigning types for the unknown
@@ -888,7 +834,7 @@ func_select_candidate(int nargs,
    }
 
    if (ncandidates == 1)
-       return candidates->args;
+       return candidates;
 
    return NULL;                /* failed to determine a unique candidate */
 }  /* func_select_candidate() */
@@ -925,22 +871,24 @@ func_get_detail(char *funcname,
                bool *retset,   /* return value */
                Oid **true_typeids)     /* return value */
 {
-   HeapTuple   ftup;
-   CandidateList function_typeids;
+   FuncCandidateList function_typeids;
+   FuncCandidateList best_candidate;
 
-   /* attempt to find with arguments exactly as specified... */
-   ftup = SearchSysCache(PROCNAME,
-                         PointerGetDatum(funcname),
-                         Int32GetDatum(nargs),
-                         PointerGetDatum(argtypes),
-                         0);
+   /* Get list of possible candidates from namespace search */
+   function_typeids = FuncnameGetCandidates(makeList1(makeString(funcname)), nargs);
 
-   if (HeapTupleIsValid(ftup))
+   /*
+    * See if there is an exact match
+    */
+   for (best_candidate = function_typeids;
+        best_candidate != NULL;
+        best_candidate = best_candidate->next)
    {
-       /* given argument types are the right ones */
-       *true_typeids = argtypes;
+       if (memcmp(argtypes, best_candidate->args, nargs * sizeof(Oid)) == 0)
+           break;
    }
-   else
+
+   if (best_candidate == NULL)
    {
        /*
         * If we didn't find an exact match, next consider the possibility
@@ -1001,10 +949,6 @@ func_get_detail(char *funcname,
         * didn't find an exact match, so now try to match up
         * candidates...
         */
-
-       function_typeids = func_get_candidates(funcname, nargs);
-
-       /* found something, so let's look through them... */
        if (function_typeids != NULL)
        {
            Oid       **input_typeid_vector = NULL;
@@ -1019,7 +963,7 @@ func_get_detail(char *funcname,
 
            do
            {
-               CandidateList current_function_typeids;
+               FuncCandidateList current_function_typeids;
                int         ncandidates;
 
                ncandidates = match_argtypes(nargs, current_input_typeids,
@@ -1029,13 +973,7 @@ func_get_detail(char *funcname,
                /* one match only? then run with it... */
                if (ncandidates == 1)
                {
-                   *true_typeids = current_function_typeids->args;
-                   ftup = SearchSysCache(PROCNAME,
-                                         PointerGetDatum(funcname),
-                                         Int32GetDatum(nargs),
-                                         PointerGetDatum(*true_typeids),
-                                         0);
-                   Assert(HeapTupleIsValid(ftup));
+                   best_candidate = current_function_typeids;
                    break;
                }
 
@@ -1045,25 +983,15 @@ func_get_detail(char *funcname,
                 */
                if (ncandidates > 1)
                {
-                   *true_typeids = func_select_candidate(nargs,
+                   best_candidate = func_select_candidate(nargs,
                                                   current_input_typeids,
                                               current_function_typeids);
 
-                   if (*true_typeids != NULL)
-                   {
-                       /* was able to choose a best candidate */
-                       ftup = SearchSysCache(PROCNAME,
-                                             PointerGetDatum(funcname),
-                                             Int32GetDatum(nargs),
-                                         PointerGetDatum(*true_typeids),
-                                             0);
-                       Assert(HeapTupleIsValid(ftup));
-                       break;
-                   }
-
                    /*
-                    * otherwise, ambiguous function call, so fail by
-                    * exiting loop with ftup still NULL.
+                    * If we were able to choose a best candidate, we're
+                    * done.  Otherwise, ambiguous function call, so fail
+                    * by exiting loop with best_candidate still NULL.
+                    * Either way, we're outta here.
                     */
                    break;
                }
@@ -1082,11 +1010,20 @@ func_get_detail(char *funcname,
        }
    }
 
-   if (HeapTupleIsValid(ftup))
+   if (best_candidate)
    {
-       Form_pg_proc pform = (Form_pg_proc) GETSTRUCT(ftup);
-
-       *funcid = ftup->t_data->t_oid;
+       HeapTuple   ftup;
+       Form_pg_proc pform;
+
+       *funcid = best_candidate->oid;
+       *true_typeids = best_candidate->args;
+
+       ftup = SearchSysCache(PROCOID,
+                             ObjectIdGetDatum(best_candidate->oid),
+                             0, 0, 0);
+       if (!HeapTupleIsValid(ftup)) /* should not happen */
+           elog(ERROR, "function %u not found", best_candidate->oid);
+       pform = (Form_pg_proc) GETSTRUCT(ftup);
        *rettype = pform->prorettype;
        *retset = pform->proretset;
        ReleaseSysCache(ftup);
index 202f447076e2a0bceeeeb243bb0f7a44a0cb8494..efcb65dbfdf13a003ee251555ba1522e5bc1a064 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/utils/cache/catcache.c,v 1.93 2002/03/26 19:16:08 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/utils/cache/catcache.c,v 1.94 2002/04/06 06:59:22 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -34,7 +34,7 @@
 #include "utils/syscache.h"
 
 
- /* #define CACHEDEBUG */  /* turns DEBUG elogs on */
+/* #define CACHEDEBUG */   /* turns DEBUG elogs on */
 
 /*
  * Constants related to size of the catcache.
@@ -98,7 +98,7 @@ static const Oid eqproc[] = {
 #define EQPROC(SYSTEMTYPEOID)  eqproc[(SYSTEMTYPEOID)-BOOLOID]
 
 
-static uint32 CatalogCacheComputeHashValue(CatCache *cache,
+static uint32 CatalogCacheComputeHashValue(CatCache *cache, int nkeys,
                             ScanKey cur_skey);
 static uint32 CatalogCacheComputeTupleHashValue(CatCache *cache,
                                  HeapTuple tuple);
@@ -106,7 +106,12 @@ static uint32 CatalogCacheComputeTupleHashValue(CatCache *cache,
 static void CatCachePrintStats(void);
 #endif
 static void CatCacheRemoveCTup(CatCache *cache, CatCTup *ct);
+static void CatCacheRemoveCList(CatCache *cache, CatCList *cl);
 static void CatalogCacheInitializeCache(CatCache *cache);
+static CatCTup *CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp,
+                                       uint32 hashValue, Index hashIndex,
+                                       bool negative);
+static HeapTuple build_dummy_tuple(CatCache *cache, int nkeys, ScanKey skeys);
 
 
 /*
@@ -149,16 +154,16 @@ GetCCHashFunc(Oid keytype)
  * Compute the hash value associated with a given set of lookup keys
  */
 static uint32
-CatalogCacheComputeHashValue(CatCache *cache, ScanKey cur_skey)
+CatalogCacheComputeHashValue(CatCache *cache, int nkeys, ScanKey cur_skey)
 {
    uint32      hashValue = 0;
 
    CACHE4_elog(DEBUG1, "CatalogCacheComputeHashValue %s %d %p",
                cache->cc_relname,
-               cache->cc_nkeys,
+               nkeys,
                cache);
 
-   switch (cache->cc_nkeys)
+   switch (nkeys)
    {
        case 4:
            hashValue ^=
@@ -181,7 +186,7 @@ CatalogCacheComputeHashValue(CatCache *cache, ScanKey cur_skey)
                                               cur_skey[0].sk_argument));
            break;
        default:
-           elog(FATAL, "CCComputeHashValue: %d cc_nkeys", cache->cc_nkeys);
+           elog(FATAL, "CCComputeHashValue: %d nkeys", nkeys);
            break;
    }
 
@@ -251,7 +256,7 @@ CatalogCacheComputeTupleHashValue(CatCache *cache, HeapTuple tuple)
            break;
    }
 
-   return CatalogCacheComputeHashValue(cache, cur_skey);
+   return CatalogCacheComputeHashValue(cache, cache->cc_nkeys, cur_skey);
 }
 
 
@@ -267,6 +272,8 @@ CatCachePrintStats(void)
    long        cc_newloads = 0;
    long        cc_invals = 0;
    long        cc_discards = 0;
+   long        cc_lsearches = 0;
+   long        cc_lhits = 0;
 
    elog(DEBUG1, "Catcache stats dump: %d/%d tuples in catcaches",
         CacheHdr->ch_ntup, CacheHdr->ch_maxtup);
@@ -275,7 +282,7 @@ CatCachePrintStats(void)
    {
        if (cache->cc_ntup == 0 && cache->cc_searches == 0)
            continue;           /* don't print unused caches */
-       elog(DEBUG1, "Catcache %s/%s: %d tup, %ld srch, %ld+%ld=%ld hits, %ld+%ld=%ld loads, %ld invals, %ld discards",
+       elog(DEBUG1, "Catcache %s/%s: %d tup, %ld srch, %ld+%ld=%ld hits, %ld+%ld=%ld loads, %ld invals, %ld discards, %ld lsrch, %ld lhits",
             cache->cc_relname,
             cache->cc_indname,
             cache->cc_ntup,
@@ -287,15 +294,19 @@ CatCachePrintStats(void)
             cache->cc_searches - cache->cc_hits - cache->cc_neg_hits - cache->cc_newloads,
             cache->cc_searches - cache->cc_hits - cache->cc_neg_hits,
             cache->cc_invals,
-            cache->cc_discards);
+            cache->cc_discards,
+            cache->cc_lsearches,
+            cache->cc_lhits);
        cc_searches += cache->cc_searches;
        cc_hits += cache->cc_hits;
        cc_neg_hits += cache->cc_neg_hits;
        cc_newloads += cache->cc_newloads;
        cc_invals += cache->cc_invals;
        cc_discards += cache->cc_discards;
+       cc_lsearches += cache->cc_lsearches;
+       cc_lhits += cache->cc_lhits;
    }
-   elog(DEBUG1, "Catcache totals: %d tup, %ld srch, %ld+%ld=%ld hits, %ld+%ld=%ld loads, %ld invals, %ld discards",
+   elog(DEBUG1, "Catcache totals: %d tup, %ld srch, %ld+%ld=%ld hits, %ld+%ld=%ld loads, %ld invals, %ld discards, %ld lsrch, %ld lhits",
         CacheHdr->ch_ntup,
         cc_searches,
         cc_hits,
@@ -305,7 +316,9 @@ CatCachePrintStats(void)
         cc_searches - cc_hits - cc_neg_hits - cc_newloads,
         cc_searches - cc_hits - cc_neg_hits,
         cc_invals,
-        cc_discards);
+        cc_discards,
+        cc_lsearches,
+        cc_lhits);
 }
 
 #endif /* CATCACHE_STATS */
@@ -315,6 +328,8 @@ CatCachePrintStats(void)
  *     CatCacheRemoveCTup
  *
  * Unlink and delete the given cache entry
+ *
+ * NB: if it is a member of a CatCList, the CatCList is deleted too.
  */
 static void
 CatCacheRemoveCTup(CatCache *cache, CatCTup *ct)
@@ -322,6 +337,9 @@ CatCacheRemoveCTup(CatCache *cache, CatCTup *ct)
    Assert(ct->refcount == 0);
    Assert(ct->my_cache == cache);
 
+   if (ct->c_list)
+       CatCacheRemoveCList(cache, ct->c_list);
+
    /* delink from linked lists */
    DLRemove(&ct->lrulist_elem);
    DLRemove(&ct->cache_elem);
@@ -335,6 +353,38 @@ CatCacheRemoveCTup(CatCache *cache, CatCTup *ct)
    --CacheHdr->ch_ntup;
 }
 
+/*
+ *     CatCacheRemoveCList
+ *
+ * Unlink and delete the given cache list entry
+ */
+static void
+CatCacheRemoveCList(CatCache *cache, CatCList *cl)
+{
+   int         i;
+
+   Assert(cl->refcount == 0);
+   Assert(cl->my_cache == cache);
+
+   /* delink from member tuples */
+   for (i = cl->n_members; --i >= 0; )
+   {
+       CatCTup    *ct = cl->members[i];
+
+       Assert(ct->c_list == cl);
+       ct->c_list = NULL;
+   }
+
+   /* delink from linked list */
+   DLRemove(&cl->cache_elem);
+
+   /* free associated tuple data */
+   if (cl->tuple.t_data != NULL)
+       pfree(cl->tuple.t_data);
+   pfree(cl);
+}
+
+
 /*
  * CatalogCacheIdInvalidate
  *
@@ -385,7 +435,23 @@ CatalogCacheIdInvalidate(int cacheId,
         */
 
        /*
-        * inspect the proper hash bucket for matches
+        * Invalidate *all* CatCLists in this cache; it's too hard to tell
+        * which searches might still be correct, so just zap 'em all.
+        */
+       for (elt = DLGetHead(&ccp->cc_lists); elt; elt = nextelt)
+       {
+           CatCList   *cl = (CatCList *) DLE_VAL(elt);
+
+           nextelt = DLGetSucc(elt);
+
+           if (cl->refcount > 0)
+               cl->dead = true;
+           else
+               CatCacheRemoveCList(ccp, cl);
+       }
+
+       /*
+        * inspect the proper hash bucket for tuple matches
         */
        hashIndex = HASH_INDEX(hashValue, ccp->cc_nbuckets);
 
@@ -458,9 +524,38 @@ CreateCacheMemoryContext(void)
 void
 AtEOXact_CatCache(bool isCommit)
 {
+   CatCache   *ccp;
    Dlelem     *elt,
               *nextelt;
 
+   /*
+    * First clean up CatCLists
+    */
+   for (ccp = CacheHdr->ch_caches; ccp; ccp = ccp->cc_next)
+   {
+       for (elt = DLGetHead(&ccp->cc_lists); elt; elt = nextelt)
+       {
+           CatCList   *cl = (CatCList *) DLE_VAL(elt);
+
+           nextelt = DLGetSucc(elt);
+
+           if (cl->refcount != 0)
+           {
+               if (isCommit)
+                   elog(WARNING, "Cache reference leak: cache %s (%d), list %p has count %d",
+                        ccp->cc_relname, ccp->id, cl, cl->refcount);
+               cl->refcount = 0;
+           }
+
+           /* Clean up any now-deletable dead entries */
+           if (cl->dead)
+               CatCacheRemoveCList(ccp, cl);
+       }
+   }
+
+   /*
+    * Now clean up tuples; we can scan them all using the global LRU list
+    */
    for (elt = DLGetHead(&CacheHdr->ch_lrulist); elt; elt = nextelt)
    {
        CatCTup    *ct = (CatCTup *) DLE_VAL(elt);
@@ -494,14 +589,26 @@ AtEOXact_CatCache(bool isCommit)
 static void
 ResetCatalogCache(CatCache *cache)
 {
+   Dlelem     *elt,
+              *nextelt;
    int         i;
 
+   /* Remove each list in this cache, or at least mark it dead */
+   for (elt = DLGetHead(&cache->cc_lists); elt; elt = nextelt)
+   {
+       CatCList   *cl = (CatCList *) DLE_VAL(elt);
+
+       nextelt = DLGetSucc(elt);
+
+       if (cl->refcount > 0)
+           cl->dead = true;
+       else
+           CatCacheRemoveCList(cache, cl);
+   }
+
    /* Remove each tuple in this cache, or at least mark it dead */
    for (i = 0; i < cache->cc_nbuckets; i++)
    {
-       Dlelem     *elt,
-                  *nextelt;
-
        for (elt = DLGetHead(&cache->cc_bucket[i]); elt; elt = nextelt)
        {
            CatCTup    *ct = (CatCTup *) DLE_VAL(elt);
@@ -694,7 +801,7 @@ InitCatCache(int id,
    /*
     * allocate a new cache structure
     *
-    * Note: we assume zeroing initializes the bucket headers correctly
+    * Note: we assume zeroing initializes the Dllist headers correctly
     */
    cp = (CatCache *) palloc(sizeof(CatCache) + NCCBUCKETS * sizeof(Dllist));
    MemSet((char *) cp, 0, sizeof(CatCache) + NCCBUCKETS * sizeof(Dllist));
@@ -965,9 +1072,8 @@ SearchCatCache(CatCache *cache,
    Dlelem     *elt;
    CatCTup    *ct;
    Relation    relation;
+   SysScanDesc scandesc;
    HeapTuple   ntp;
-   int         i;
-   MemoryContext oldcxt;
 
    /*
     * one-time startup overhead for each cache
@@ -991,7 +1097,7 @@ SearchCatCache(CatCache *cache,
    /*
     * find the hash bucket in which to look for the tuple
     */
-   hashValue = CatalogCacheComputeHashValue(cache, cur_skey);
+   hashValue = CatalogCacheComputeHashValue(cache, cache->cc_nkeys, cur_skey);
    hashIndex = HASH_INDEX(hashValue, cache->cc_nbuckets);
 
    /*
@@ -1040,10 +1146,8 @@ SearchCatCache(CatCache *cache,
        {
            ct->refcount++;
 
-#ifdef CACHEDEBUG
            CACHE3_elog(DEBUG1, "SearchCatCache(%s): found in bucket %d",
                        cache->cc_relname, hashIndex);
-#endif   /* CACHEDEBUG */
 
 #ifdef CATCACHE_STATS
            cache->cc_hits++;
@@ -1053,10 +1157,8 @@ SearchCatCache(CatCache *cache,
        }
        else
        {
-#ifdef CACHEDEBUG
            CACHE3_elog(DEBUG1, "SearchCatCache(%s): found neg entry in bucket %d",
                        cache->cc_relname, hashIndex);
-#endif   /* CACHEDEBUG */
 
 #ifdef CATCACHE_STATS
            cache->cc_neg_hits++;
@@ -1081,187 +1183,422 @@ SearchCatCache(CatCache *cache,
     * cache, so there's no functional problem.  This case is rare enough
     * that it's not worth expending extra cycles to detect.
     */
+   relation = heap_open(cache->cc_reloid, AccessShareLock);
+
+   scandesc = systable_beginscan(relation,
+                                 cache->cc_indname,
+                                 IndexScanOK(cache, cur_skey),
+                                 SnapshotNow,
+                                 cache->cc_nkeys,
+                                 cur_skey);
+
+   ct = NULL;
+
+   while (HeapTupleIsValid(ntp = systable_getnext(scandesc)))
+   {
+       ct = CatalogCacheCreateEntry(cache, ntp,
+                                    hashValue, hashIndex,
+                                    false);
+       break;                  /* assume only one match */
+   }
+
+   systable_endscan(scandesc);
+
+   heap_close(relation, AccessShareLock);
 
    /*
-    * open the relation associated with the cache
+    * If tuple was not found, we need to build a negative cache entry
+    * containing a fake tuple.  The fake tuple has the correct key columns,
+    * but nulls everywhere else.
     */
-   relation = heap_open(cache->cc_reloid, AccessShareLock);
+   if (ct == NULL)
+   {
+       ntp = build_dummy_tuple(cache, cache->cc_nkeys, cur_skey);
+       ct = CatalogCacheCreateEntry(cache, ntp,
+                                    hashValue, hashIndex,
+                                    true);
+       heap_freetuple(ntp);
+
+       CACHE4_elog(DEBUG1, "SearchCatCache(%s): Contains %d/%d tuples",
+                   cache->cc_relname, cache->cc_ntup, CacheHdr->ch_ntup);
+       CACHE3_elog(DEBUG1, "SearchCatCache(%s): put neg entry in bucket %d",
+                   cache->cc_relname, hashIndex);
+
+       /*
+        * We are not returning the new entry to the caller, so reset its
+        * refcount.
+        */
+       ct->refcount = 0;       /* negative entries never have refs */
+
+       return NULL;
+   }
+
+   CACHE4_elog(DEBUG1, "SearchCatCache(%s): Contains %d/%d tuples",
+               cache->cc_relname, cache->cc_ntup, CacheHdr->ch_ntup);
+   CACHE3_elog(DEBUG1, "SearchCatCache(%s): put in bucket %d",
+               cache->cc_relname, hashIndex);
+
+#ifdef CATCACHE_STATS
+   cache->cc_newloads++;
+#endif
+
+   return &ct->tuple;
+}
+
+/*
+ * ReleaseCatCache
+ *
+ * Decrement the reference count of a catcache entry (releasing the
+ * hold grabbed by a successful SearchCatCache).
+ *
+ * NOTE: if compiled with -DCATCACHE_FORCE_RELEASE then catcache entries
+ * will be freed as soon as their refcount goes to zero.  In combination
+ * with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
+ * to catch references to already-released catcache entries.
+ */
+void
+ReleaseCatCache(HeapTuple tuple)
+{
+   CatCTup    *ct = (CatCTup *) (((char *) tuple) -
+                                 offsetof(CatCTup, tuple));
+
+   /* Safety checks to ensure we were handed a cache entry */
+   Assert(ct->ct_magic == CT_MAGIC);
+   Assert(ct->refcount > 0);
+
+   ct->refcount--;
+
+   if (ct->refcount == 0
+#ifndef CATCACHE_FORCE_RELEASE
+       && ct->dead
+#endif
+       )
+       CatCacheRemoveCTup(ct->my_cache, ct);
+}
+
+
+/*
+ * SearchCatCacheList
+ *
+ *     Generate a list of all tuples matching a partial key (that is,
+ *     a key specifying just the first K of the cache's N key columns).
+ *
+ *     The caller must not modify the list object or the pointed-to tuples,
+ *     and must call ReleaseCatCacheList() when done with the list.
+ */
+CatCList *
+SearchCatCacheList(CatCache *cache,
+                  int nkeys,
+                  Datum v1,
+                  Datum v2,
+                  Datum v3,
+                  Datum v4)
+{
+   ScanKeyData cur_skey[4];
+   uint32      lHashValue;
+   Dlelem     *elt;
+   CatCList   *cl;
+   CatCTup    *ct;
+   List       *ctlist;
+   int         nmembers;
+   Relation    relation;
+   SysScanDesc scandesc;
+   bool        ordered;
+   HeapTuple   ntp;
+   MemoryContext oldcxt;
+   int         i;
+
+   /*
+    * one-time startup overhead for each cache
+    */
+   if (cache->cc_tupdesc == NULL)
+       CatalogCacheInitializeCache(cache);
+
+   Assert(nkeys > 0 && nkeys < cache->cc_nkeys);
+
+#ifdef CATCACHE_STATS
+   cache->cc_lsearches++;
+#endif
+
+   /*
+    * initialize the search key information
+    */
+   memcpy(cur_skey, cache->cc_skey, sizeof(cur_skey));
+   cur_skey[0].sk_argument = v1;
+   cur_skey[1].sk_argument = v2;
+   cur_skey[2].sk_argument = v3;
+   cur_skey[3].sk_argument = v4;
 
    /*
-    * Pre-create cache entry header, and mark no tuple found.
+    * compute a hash value of the given keys for faster search.  We don't
+    * presently divide the CatCList items into buckets, but this still lets
+    * us skip non-matching items quickly most of the time.
     */
-   ct = (CatCTup *) MemoryContextAlloc(CacheMemoryContext, sizeof(CatCTup));
-   ct->negative = true;
+   lHashValue = CatalogCacheComputeHashValue(cache, nkeys, cur_skey);
 
    /*
-    * Scan the relation to find the tuple.  If there's an index, and if
-    * it's safe to do so, use the index.  Else do a heap scan.
+    * scan the items until we find a match or exhaust our list
     */
-   if ((RelationGetForm(relation))->relhasindex &&
-       !IsIgnoringSystemIndexes() &&
-       IndexScanOK(cache, cur_skey))
+   for (elt = DLGetHead(&cache->cc_lists);
+        elt;
+        elt = DLGetSucc(elt))
    {
-       Relation    idesc;
-       IndexScanDesc isd;
-       RetrieveIndexResult indexRes;
-       HeapTupleData tuple;
-       Buffer      buffer;
+       bool        res;
 
-       CACHE2_elog(DEBUG1, "SearchCatCache(%s): performing index scan",
-                   cache->cc_relname);
+       cl = (CatCList *) DLE_VAL(elt);
+
+       if (cl->dead)
+           continue;           /* ignore dead entries */
+
+       if (cl->hash_value != lHashValue)
+           continue;           /* quickly skip entry if wrong hash val */
 
        /*
-        * For an index scan, sk_attno has to be set to the index
-        * attribute number(s), not the heap attribute numbers.  We assume
-        * that the index corresponds exactly to the cache keys (or its
-        * first N keys do, anyway).
+        * see if the cached list matches our key.
         */
-       for (i = 0; i < cache->cc_nkeys; ++i)
-           cur_skey[i].sk_attno = i + 1;
+       if (cl->nkeys != nkeys)
+           continue;
+       HeapKeyTest(&cl->tuple,
+                   cache->cc_tupdesc,
+                   nkeys,
+                   cur_skey,
+                   res);
+       if (!res)
+           continue;
 
-       idesc = index_openr(cache->cc_indname);
-       isd = index_beginscan(idesc, false, cache->cc_nkeys, cur_skey);
-       tuple.t_datamcxt = CurrentMemoryContext;
-       tuple.t_data = NULL;
-       while ((indexRes = index_getnext(isd, ForwardScanDirection)))
+       /*
+        * we found a matching list: move each of its members to the front
+        * of the global LRU list.  Also move the list itself to the front
+        * of the cache's list-of-lists, to speed subsequent searches.
+        * (We do not move the members to the fronts of their hashbucket
+        * lists, however, since there's no point in that unless they are
+        * searched for individually.)  Also bump the members' refcounts.
+        */
+       for (i = 0; i < cl->n_members; i++)
        {
-           tuple.t_self = indexRes->heap_iptr;
-           heap_fetch(relation, SnapshotNow, &tuple, &buffer, isd);
-           pfree(indexRes);
-           if (tuple.t_data != NULL)
-           {
-               /* Copy tuple into our context */
-               oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-               heap_copytuple_with_tuple(&tuple, &ct->tuple);
-               ct->negative = false;
-               MemoryContextSwitchTo(oldcxt);
-               ReleaseBuffer(buffer);
-               break;
-           }
+           cl->members[i]->refcount++;
+           DLMoveToFront(&cl->members[i]->lrulist_elem);
        }
-       index_endscan(isd);
-       index_close(idesc);
+       DLMoveToFront(&cl->cache_elem);
+
+       /* Bump the list's refcount and return it */
+       cl->refcount++;
+
+       CACHE2_elog(DEBUG1, "SearchCatCacheList(%s): found list",
+                   cache->cc_relname);
+
+#ifdef CATCACHE_STATS
+       cache->cc_lhits++;
+#endif
+
+       return cl;
    }
-   else
+
+   /*
+    * List was not found in cache, so we have to build it by reading
+    * the relation.  For each matching tuple found in the relation,
+    * use an existing cache entry if possible, else build a new one.
+    */
+   relation = heap_open(cache->cc_reloid, AccessShareLock);
+
+   scandesc = systable_beginscan(relation,
+                                 cache->cc_indname,
+                                 true,
+                                 SnapshotNow,
+                                 nkeys,
+                                 cur_skey);
+
+   /* The list will be ordered iff we are doing an index scan */
+   ordered = (scandesc->irel != NULL);
+
+   ctlist = NIL;
+   nmembers = 0;
+
+   while (HeapTupleIsValid(ntp = systable_getnext(scandesc)))
    {
-       HeapScanDesc sd;
+       uint32      hashValue;
+       Index       hashIndex;
 
-       CACHE2_elog(DEBUG1, "SearchCatCache(%s): performing heap scan",
-                   cache->cc_relname);
+       /*
+        * See if there's an entry for this tuple already.
+        */
+       ct = NULL;
+       hashValue = CatalogCacheComputeTupleHashValue(cache, ntp);
+       hashIndex = HASH_INDEX(hashValue, cache->cc_nbuckets);
+
+       for (elt = DLGetHead(&cache->cc_bucket[hashIndex]);
+            elt;
+            elt = DLGetSucc(elt))
+       {
+           ct = (CatCTup *) DLE_VAL(elt);
+
+           if (ct->dead || ct->negative)
+               continue;           /* ignore dead and negative entries */
+
+           if (ct->hash_value != hashValue)
+               continue;           /* quickly skip entry if wrong hash val */
 
-       sd = heap_beginscan(relation, 0, SnapshotNow,
-                           cache->cc_nkeys, cur_skey);
+           if (!ItemPointerEquals(&(ct->tuple.t_self), &(ntp->t_self)))
+               continue;           /* not same tuple */
 
-       ntp = heap_getnext(sd, 0);
+           /*
+            * Found a match, but can't use it if it belongs to another list
+            * already
+            */
+           if (ct->c_list)
+               continue;
 
-       if (HeapTupleIsValid(ntp))
+           /* Found a match, so bump its refcount and move to front */
+           ct->refcount++;
+
+           DLMoveToFront(&ct->lrulist_elem);
+
+           break;
+       }
+
+       if (elt == NULL)
        {
-           /* Copy tuple into our context */
-           oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-           heap_copytuple_with_tuple(ntp, &ct->tuple);
-           ct->negative = false;
-           MemoryContextSwitchTo(oldcxt);
-           /* We should not free the result of heap_getnext... */
+           /* We didn't find a usable entry, so make a new one */
+           ct = CatalogCacheCreateEntry(cache, ntp,
+                                        hashValue, hashIndex,
+                                        false);
        }
 
-       heap_endscan(sd);
+       ctlist = lcons(ct, ctlist);
+       nmembers++;
    }
 
-   /*
-    * close the relation
-    */
+   systable_endscan(scandesc);
+
    heap_close(relation, AccessShareLock);
 
    /*
-    * scan is complete.  If tuple was not found, we need to build
-    * a fake tuple for the negative cache entry.  The fake tuple has
-    * the correct key columns, but nulls everywhere else.
+    * Now we can build the CatCList entry.  First we need a dummy tuple
+    * containing the key values...
     */
-   if (ct->negative)
+   ntp = build_dummy_tuple(cache, nkeys, cur_skey);
+   oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+   cl = (CatCList *) palloc(sizeof(CatCList) + nmembers * sizeof(CatCTup *));
+   heap_copytuple_with_tuple(ntp, &cl->tuple);
+   MemoryContextSwitchTo(oldcxt);
+   heap_freetuple(ntp);
+
+   cl->cl_magic = CL_MAGIC;
+   cl->my_cache = cache;
+   DLInitElem(&cl->cache_elem, (void *) cl);
+   cl->refcount = 1;           /* count this first reference */
+   cl->dead = false;
+   cl->ordered = ordered;
+   cl->nkeys = nkeys;
+   cl->hash_value = lHashValue;
+   cl->n_members = nmembers;
+   /* The list is backwards because we built it with lcons */
+   for (i = nmembers; --i >= 0; )
    {
-       TupleDesc   tupDesc = cache->cc_tupdesc;
-       Datum      *values;
-       char       *nulls;
-       Oid         negOid = InvalidOid;
+       cl->members[i] = ct = (CatCTup *) lfirst(ctlist);
+       Assert(ct->c_list == NULL);
+       ct->c_list = cl;
+       /* mark list dead if any members already dead */
+       if (ct->dead)
+           cl->dead = true;
+       ctlist = lnext(ctlist);
+   }
 
-       values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
-       nulls = (char *) palloc(tupDesc->natts * sizeof(char));
+   DLAddHead(&cache->cc_lists, &cl->cache_elem);
 
-       memset(values, 0, tupDesc->natts * sizeof(Datum));
-       memset(nulls, 'n', tupDesc->natts * sizeof(char));
+   CACHE3_elog(DEBUG1, "SearchCatCacheList(%s): made list of %d members",
+               cache->cc_relname, nmembers);
 
-       for (i = 0; i < cache->cc_nkeys; i++)
-       {
-           int     attindex = cache->cc_key[i];
-           Datum   keyval = cur_skey[i].sk_argument;
+   return cl;
+}
 
-           if (attindex > 0)
-           {
-               /*
-                * Here we must be careful in case the caller passed a
-                * C string where a NAME is wanted: convert the given
-                * argument to a correctly padded NAME.  Otherwise the
-                * memcpy() done in heap_formtuple could fall off the
-                * end of memory.
-                */
-               if (cache->cc_isname[i])
-               {
-                   Name    newval = (Name) palloc(NAMEDATALEN);
+/*
+ * ReleaseCatCacheList
+ *
+ * Decrement the reference counts of a catcache list.
+ */
+void
+ReleaseCatCacheList(CatCList *list)
+{
+   int         i;
 
-                   namestrcpy(newval, DatumGetCString(keyval));
-                   keyval = NameGetDatum(newval);
-               }
-               values[attindex-1] = keyval;
-               nulls[attindex-1] = ' ';
-           }
-           else
-           {
-               Assert(attindex == ObjectIdAttributeNumber);
-               negOid = DatumGetObjectId(keyval);
-           }
-       }
+   /* Safety checks to ensure we were handed a cache entry */
+   Assert(list->cl_magic == CL_MAGIC);
+   Assert(list->refcount > 0);
 
-       ntp = heap_formtuple(tupDesc, values, nulls);
+   for (i = list->n_members; --i >= 0; )
+   {
+       CatCTup    *ct = list->members[i];
 
-       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-       heap_copytuple_with_tuple(ntp, &ct->tuple);
-       ct->tuple.t_data->t_oid = negOid;
-       MemoryContextSwitchTo(oldcxt);
+       Assert(ct->refcount > 0);
 
-       heap_freetuple(ntp);
-       for (i = 0; i < cache->cc_nkeys; i++)
-       {
-           if (cache->cc_isname[i])
-               pfree(DatumGetName(values[cache->cc_key[i]-1]));
-       }
-       pfree(values);
-       pfree(nulls);
+       ct->refcount--;
+
+       if (ct->dead)
+           list->dead = true;
+       /* can't remove tuple before list is removed */
    }
 
+   list->refcount--;
+
+   if (list->refcount == 0
+#ifndef CATCACHE_FORCE_RELEASE
+       && list->dead
+#endif
+       )
+       CatCacheRemoveCList(list->my_cache, list);
+}
+
+
+/*
+ * CatalogCacheCreateEntry
+ *     Create a new CatCTup entry, copying the given HeapTuple and other
+ *     supplied data into it.  The new entry is given refcount 1.
+ */
+static CatCTup *
+CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp,
+                       uint32 hashValue, Index hashIndex, bool negative)
+{
+   CatCTup    *ct;
+   MemoryContext oldcxt;
+
+   /*
+    * Allocate CatCTup header in cache memory, and copy the tuple there too.
+    */
+   oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+   ct = (CatCTup *) palloc(sizeof(CatCTup));
+   heap_copytuple_with_tuple(ntp, &ct->tuple);
+   MemoryContextSwitchTo(oldcxt);
+
    /*
-    * Finish initializing the CatCTup header, and add it to the linked
-    * lists.
+    * Finish initializing the CatCTup header, and add it to the cache's
+    * linked lists and counts.
     */
    ct->ct_magic = CT_MAGIC;
    ct->my_cache = cache;
    DLInitElem(&ct->lrulist_elem, (void *) ct);
    DLInitElem(&ct->cache_elem, (void *) ct);
+   ct->c_list = NULL;
    ct->refcount = 1;           /* count this first reference */
    ct->dead = false;
+   ct->negative = negative;
    ct->hash_value = hashValue;
 
    DLAddHead(&CacheHdr->ch_lrulist, &ct->lrulist_elem);
    DLAddHead(&cache->cc_bucket[hashIndex], &ct->cache_elem);
 
+   cache->cc_ntup++;
+   CacheHdr->ch_ntup++;
+
    /*
     * If we've exceeded the desired size of the caches, try to throw away
     * the least recently used entry.  NB: the newly-built entry cannot
     * get thrown away here, because it has positive refcount.
     */
-   ++cache->cc_ntup;
-   if (++CacheHdr->ch_ntup > CacheHdr->ch_maxtup)
+   if (CacheHdr->ch_ntup > CacheHdr->ch_maxtup)
    {
-       Dlelem     *prevelt;
+       Dlelem     *elt,
+                  *prevelt;
 
        for (elt = DLGetTail(&CacheHdr->ch_lrulist); elt; elt = prevelt)
        {
@@ -1271,7 +1608,7 @@ SearchCatCache(CatCache *cache,
 
            if (oldct->refcount == 0)
            {
-               CACHE2_elog(DEBUG1, "SearchCatCache(%s): Overflow, LRU removal",
+               CACHE2_elog(DEBUG1, "CatCacheCreateEntry(%s): Overflow, LRU removal",
                            cache->cc_relname);
 #ifdef CATCACHE_STATS
                oldct->my_cache->cc_discards++;
@@ -1283,65 +1620,75 @@ SearchCatCache(CatCache *cache,
        }
    }
 
-   CACHE4_elog(DEBUG1, "SearchCatCache(%s): Contains %d/%d tuples",
-               cache->cc_relname, cache->cc_ntup, CacheHdr->ch_ntup);
-
-   if (ct->negative)
-   {
-       CACHE3_elog(DEBUG1, "SearchCatCache(%s): put neg entry in bucket %d",
-                   cache->cc_relname, hashIndex);
-
-       /*
-        * We are not returning the new entry to the caller, so reset its
-        * refcount.  Note it would be uncool to set the refcount to 0
-        * before doing the extra-entry removal step above.
-        */
-       ct->refcount = 0;       /* negative entries never have refs */
-
-       return NULL;
-   }
-
-   CACHE3_elog(DEBUG1, "SearchCatCache(%s): put in bucket %d",
-               cache->cc_relname, hashIndex);
-
-#ifdef CATCACHE_STATS
-   cache->cc_newloads++;
-#endif
-
-   return &ct->tuple;
+   return ct;
 }
 
 /*
- * ReleaseCatCache()
+ * build_dummy_tuple
+ *     Generate a palloc'd HeapTuple that contains the specified key
+ *     columns, and NULLs for other columns.
  *
- * Decrement the reference count of a catcache entry (releasing the
- * hold grabbed by a successful SearchCatCache).
- *
- * NOTE: if compiled with -DCATCACHE_FORCE_RELEASE then catcache entries
- * will be freed as soon as their refcount goes to zero.  In combination
- * with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
- * to catch references to already-released catcache entries.
+ * This is used to store the keys for negative cache entries and CatCList
+ * entries, which don't have real tuples associated with them.
  */
-void
-ReleaseCatCache(HeapTuple tuple)
+static HeapTuple
+build_dummy_tuple(CatCache *cache, int nkeys, ScanKey skeys)
 {
-   CatCTup    *ct = (CatCTup *) (((char *) tuple) -
-                                 offsetof(CatCTup, tuple));
+   HeapTuple   ntp;
+   TupleDesc   tupDesc = cache->cc_tupdesc;
+   Datum      *values;
+   char       *nulls;
+   Oid         tupOid = InvalidOid;
+   NameData    tempNames[4];
+   int         i;
 
-   /* Safety checks to ensure we were handed a cache entry */
-   Assert(ct->ct_magic == CT_MAGIC);
-   Assert(ct->refcount > 0);
+   values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
+   nulls = (char *) palloc(tupDesc->natts * sizeof(char));
 
-   ct->refcount--;
+   memset(values, 0, tupDesc->natts * sizeof(Datum));
+   memset(nulls, 'n', tupDesc->natts * sizeof(char));
 
-   if (ct->refcount == 0
-#ifndef CATCACHE_FORCE_RELEASE
-       && ct->dead
-#endif
-       )
-       CatCacheRemoveCTup(ct->my_cache, ct);
+   for (i = 0; i < nkeys; i++)
+   {
+       int     attindex = cache->cc_key[i];
+       Datum   keyval = skeys[i].sk_argument;
+
+       if (attindex > 0)
+       {
+           /*
+            * Here we must be careful in case the caller passed a
+            * C string where a NAME is wanted: convert the given
+            * argument to a correctly padded NAME.  Otherwise the
+            * memcpy() done in heap_formtuple could fall off the
+            * end of memory.
+            */
+           if (cache->cc_isname[i])
+           {
+               Name    newval = &tempNames[i];
+
+               namestrcpy(newval, DatumGetCString(keyval));
+               keyval = NameGetDatum(newval);
+           }
+           values[attindex-1] = keyval;
+           nulls[attindex-1] = ' ';
+       }
+       else
+       {
+           Assert(attindex == ObjectIdAttributeNumber);
+           tupOid = DatumGetObjectId(keyval);
+       }
+   }
+
+   ntp = heap_formtuple(tupDesc, values, nulls);
+   ntp->t_data->t_oid = tupOid;
+
+   pfree(values);
+   pfree(nulls);
+
+   return ntp;
 }
 
+
 /*
  * PrepareToInvalidateCacheTuple()
  *
index 84cea0bb63242bfb6f863615fdfc2071a6908f3a..7e6a95a324e464a932a7855edec6c0e037057f3d 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/utils/cache/syscache.c,v 1.73 2002/04/05 00:31:31 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/utils/cache/syscache.c,v 1.74 2002/04/06 06:59:23 tgl Exp $
  *
  * NOTES
  *   These routines allow the parser/planner/executor to perform
@@ -626,3 +626,18 @@ SysCacheGetAttr(int cacheId, HeapTuple tup,
                        SysCache[cacheId]->cc_tupdesc,
                        isNull);
 }
+
+/*
+ * List-search interface
+ */
+struct catclist *
+SearchSysCacheList(int cacheId, int nkeys,
+                  Datum key1, Datum key2, Datum key3, Datum key4)
+{
+   if (cacheId < 0 || cacheId >= SysCacheSize ||
+       ! PointerIsValid(SysCache[cacheId]))
+       elog(ERROR, "SearchSysCacheList: Bad cache id %d", cacheId);
+
+   return SearchCatCacheList(SysCache[cacheId], nkeys,
+                             key1, key2, key3, key4);
+}
index a8a64bd7db1151f4ce6365e200cdb58422b0f271..4c81e78f3ffbe3699bf638798a7ec84679a46046 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: namespace.h,v 1.5 2002/04/01 03:34:27 tgl Exp $
+ * $Id: namespace.h,v 1.6 2002/04/06 06:59:24 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "nodes/primnodes.h"
 
 
+/*
+ * This structure holds a list of possible functions or operators
+ * found by namespace lookup.  Each function/operator is identified
+ * by OID and by argument types; the list must be pruned by type
+ * resolution rules that are embodied in the parser, not here.
+ * The number of arguments is assumed to be known a priori.
+ */
+typedef struct _FuncCandidateList
+{
+   struct _FuncCandidateList *next;
+   int         pathpos;        /* for internal use of namespace lookup */
+   Oid         oid;            /* the function or operator's OID */
+   Oid         args[1];        /* arg types --- VARIABLE LENGTH ARRAY */
+} *FuncCandidateList;          /* VARIABLE LENGTH STRUCT */
+
+
 extern Oid RangeVarGetRelid(const RangeVar *relation, bool failOK);
 
 extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation);
@@ -25,6 +41,8 @@ extern Oid    RelnameGetRelid(const char *relname);
 
 extern Oid TypenameGetTypid(const char *typname);
 
+extern FuncCandidateList FuncnameGetCandidates(List *names, int nargs);
+
 extern Oid QualifiedNameGetCreationNamespace(List *names, char **objname_p);
 
 extern RangeVar *makeRangeVarFromNameList(List *names);
index 8e98f41c019f2062c1bbaa87e35e0ec148deaf84..2a1a83f13b961f5b87ebe20e72fa7b553962c95d 100644 (file)
@@ -13,7 +13,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: catcache.h,v 1.41 2002/03/26 19:16:56 tgl Exp $
+ * $Id: catcache.h,v 1.42 2002/04/06 06:59:24 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -25,6 +25,7 @@
 
 /*
  *     struct catctup:         individual tuple in the cache.
+ *     struct catclist:        list of tuples matching a partial key.
  *     struct catcache:        information for managing a cache.
  *     struct catcacheheader:  information for managing all the caches.
  */
@@ -36,7 +37,7 @@ typedef struct catcache
    const char *cc_relname;     /* name of relation the tuples come from */
    const char *cc_indname;     /* name of index matching cache keys */
    Oid         cc_reloid;      /* OID of relation the tuples come from */
-   bool        cc_relisshared; /* is relation shared? */
+   bool        cc_relisshared; /* is relation shared across databases? */
    TupleDesc   cc_tupdesc;     /* tuple descriptor (copied from reldesc) */
    int         cc_reloidattr;  /* AttrNumber of relation OID attr, or 0 */
    int         cc_ntup;        /* # of tuples currently in this cache */
@@ -46,6 +47,7 @@ typedef struct catcache
    PGFunction  cc_hashfunc[4]; /* hash function to use for each key */
    ScanKeyData cc_skey[4];     /* precomputed key info for heap scans */
    bool        cc_isname[4];   /* flag key columns that are NAMEs */
+   Dllist      cc_lists;       /* list of CatCList structs */
 #ifdef CATCACHE_STATS
    long        cc_searches;    /* total # searches against this cache */
    long        cc_hits;        /* # of matches against existing entry */
@@ -57,6 +59,8 @@ typedef struct catcache
     */
    long        cc_invals;      /* # of entries invalidated from cache */
    long        cc_discards;    /* # of entries discarded due to overflow */
+   long        cc_lsearches;   /* total # list-searches */
+   long        cc_lhits;       /* # of matches against existing lists */
 #endif
    Dllist      cc_bucket[1];   /* hash buckets --- VARIABLE LENGTH ARRAY */
 } CatCache;                        /* VARIABLE LENGTH STRUCT */
@@ -64,15 +68,25 @@ typedef struct catcache
 
 typedef struct catctup
 {
-   int         ct_magic;       /* for Assert checks */
+   int         ct_magic;       /* for identifying CatCTup entries */
 #define CT_MAGIC   0x57261502
    CatCache   *my_cache;       /* link to owning catcache */
 
    /*
-    * Each tuple in a cache is a member of two lists: one lists all the
+    * Each tuple in a cache is a member of two Dllists: one lists all the
     * elements in all the caches in LRU order, and the other lists just
     * the elements in one hashbucket of one cache, also in LRU order.
     *
+    * The tuple may also be a member of at most one CatCList.  (If a single
+    * catcache is list-searched with varying numbers of keys, we may have
+    * to make multiple entries for the same tuple because of this
+    * restriction.  Currently, that's not expected to be common, so we
+    * accept the potential inefficiency.)
+    */
+   Dlelem      lrulist_elem;   /* list member of global LRU list */
+   Dlelem      cache_elem;     /* list member of per-bucket list */
+   struct catclist *c_list;    /* containing catclist, or NULL if none */
+   /*
     * A tuple marked "dead" must not be returned by subsequent searches.
     * However, it won't be physically deleted from the cache until its
     * refcount goes to zero.
@@ -82,8 +96,6 @@ typedef struct catctup
     * so far as avoiding catalog searches is concerned.  Management of
     * positive and negative entries is identical.
     */
-   Dlelem      lrulist_elem;   /* list member of global LRU list */
-   Dlelem      cache_elem;     /* list member of per-bucket list */
    int         refcount;       /* number of active references */
    bool        dead;           /* dead but not yet removed? */
    bool        negative;       /* negative cache entry? */
@@ -92,6 +104,47 @@ typedef struct catctup
 } CatCTup;
 
 
+typedef struct catclist
+{
+   int         cl_magic;       /* for identifying CatCList entries */
+#define CL_MAGIC   0x52765103
+   CatCache   *my_cache;       /* link to owning catcache */
+
+   /*
+    * A CatCList describes the result of a partial search, ie, a search
+    * using only the first K key columns of an N-key cache.  We form the
+    * keys used into a tuple (with other attributes NULL) to represent
+    * the stored key set.  The CatCList object contains links to cache
+    * entries for all the table rows satisfying the partial key.  (Note:
+    * none of these will be negative cache entries.)
+    *
+    * A CatCList is only a member of a per-cache list; we do not do
+    * separate LRU management for CatCLists.  Instead, a CatCList is
+    * dropped from the cache as soon as any one of its member tuples
+    * ages out due to tuple-level LRU management.
+    *
+    * A list marked "dead" must not be returned by subsequent searches.
+    * However, it won't be physically deleted from the cache until its
+    * refcount goes to zero.  (Its member tuples must have refcounts at
+    * least as large, so they won't go away either.)
+    *
+    * If "ordered" is true then the member tuples appear in the order of
+    * the cache's underlying index.  This will be true in normal operation,
+    * but might not be true during bootstrap or recovery operations.
+    * (namespace.c is able to save some cycles when it is true.)
+    */
+   Dlelem      cache_elem;     /* list member of per-catcache list */
+   int         refcount;       /* number of active references */
+   bool        dead;           /* dead but not yet removed? */
+   bool        ordered;        /* members listed in index order? */
+   short       nkeys;          /* number of lookup keys specified */
+   uint32      hash_value;     /* hash value for lookup keys */
+   HeapTupleData tuple;        /* header for tuple holding keys */
+   int         n_members;      /* number of member tuples */
+   CatCTup    *members[1];     /* members --- VARIABLE LENGTH ARRAY */
+} CatCList;                        /* VARIABLE LENGTH STRUCT */
+
+
 typedef struct catcacheheader
 {
    CatCache   *ch_caches;      /* head of list of CatCache structs */
@@ -117,6 +170,11 @@ extern HeapTuple SearchCatCache(CatCache *cache,
               Datum v3, Datum v4);
 extern void ReleaseCatCache(HeapTuple tuple);
 
+extern CatCList *SearchCatCacheList(CatCache *cache, int nkeys,
+              Datum v1, Datum v2,
+              Datum v3, Datum v4);
+extern void ReleaseCatCacheList(CatCList *list);
+
 extern void ResetCatalogCaches(void);
 extern void CatalogCacheFlushRelation(Oid relId);
 extern void CatalogCacheIdInvalidate(int cacheId, uint32 hashValue,
index d08dfadda49157bc2f320aa1f8b184cad28c3ac2..1d9ddd9acfd8f37173802ac48e0889947c4f0d08 100644 (file)
@@ -9,7 +9,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: syscache.h,v 1.41 2002/03/29 19:06:26 tgl Exp $
+ * $Id: syscache.h,v 1.42 2002/04/06 06:59:25 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -79,4 +79,9 @@ extern Oid GetSysCacheOid(int cacheId,
 extern Datum SysCacheGetAttr(int cacheId, HeapTuple tup,
                AttrNumber attributeNumber, bool *isNull);
 
+/* list-search interface.  Users of this must import catcache.h too */
+extern struct catclist *SearchSysCacheList(int cacheId, int nkeys,
+              Datum key1, Datum key2, Datum key3, Datum key4);
+#define ReleaseSysCacheList(x)  ReleaseCatCacheList(x)
+
 #endif   /* SYSCACHE_H */