Try to instill some sanity in plperl's function result processing.
authorTom Lane
Mon, 22 Nov 2004 20:31:53 +0000 (20:31 +0000)
committerTom Lane
Mon, 22 Nov 2004 20:31:53 +0000 (20:31 +0000)
Get rid of static variables for SETOF result, don't crash when called
from non-FROM context, eliminate dead code, etc.

src/pl/plperl/plperl.c

index b2f4bf74a468c20b8bfa4b3a2a0f207875cd2506..9aa5102e192436591678c1a5aa5267324c863219 100644 (file)
@@ -33,7 +33,7 @@
  *   ENHANCEMENTS, OR MODIFICATIONS.
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/pl/plperl/plperl.c,v 1.61 2004/11/21 22:13:37 tgl Exp $
+ *   $PostgreSQL: pgsql/src/pl/plperl/plperl.c,v 1.62 2004/11/22 20:31:53 tgl Exp $
  *
  **********************************************************************/
 
@@ -83,8 +83,8 @@ typedef struct plperl_proc_desc
    bool        lanpltrusted;
    bool        fn_retistuple;  /* true, if function returns tuple */
    bool        fn_retisset;    /* true, if function returns set */
-   Oid         ret_oid;        /* Oid of returning type */
-   FmgrInfo    result_in_func;
+   Oid         result_oid;     /* Oid of result type */
+   FmgrInfo    result_in_func; /* I/O function and arg for result type */
    Oid         result_typioparam;
    int         nargs;
    FmgrInfo    arg_out_func[FUNC_MAX_ARGS];
@@ -101,9 +101,6 @@ static int  plperl_firstcall = 1;
 static bool plperl_safe_init_done = false;
 static PerlInterpreter *plperl_interp = NULL;
 static HV  *plperl_proc_hash = NULL;
-static AV  *g_column_keys = NULL;
-static SV  *srf_perlret = NULL; /* keep returned value */
-static int g_attr_num = 0;
 
 /* this is saved and restored by plperl_call_handler */
 static plperl_proc_desc *plperl_current_prodesc = NULL;
@@ -163,27 +160,7 @@ plperl_init(void)
        return;
 
    /************************************************************
-    * Free the proc hash table
-    ************************************************************/
-   if (plperl_proc_hash != NULL)
-   {
-       hv_undef(plperl_proc_hash);
-       SvREFCNT_dec((SV *) plperl_proc_hash);
-       plperl_proc_hash = NULL;
-   }
-
-   /************************************************************
-    * Destroy the existing Perl interpreter
-    ************************************************************/
-   if (plperl_interp != NULL)
-   {
-       perl_destruct(plperl_interp);
-       perl_free(plperl_interp);
-       plperl_interp = NULL;
-   }
-
-   /************************************************************
-    * Now recreate a new Perl interpreter
+    * Create the Perl interpreter
     ************************************************************/
    plperl_init_interp();
 
@@ -217,8 +194,7 @@ plperl_init_all(void)
 static void
 plperl_init_interp(void)
 {
-
-   char       *embedding[3] = {
+   static char    *embedding[3] = {
        "", "-e",
 
        /*
@@ -238,7 +214,7 @@ plperl_init_interp(void)
    perl_run(plperl_interp);
 
    /************************************************************
-    * Initialize the proc and query hash tables
+    * Initialize the procedure hash table
     ************************************************************/
    plperl_proc_hash = newHV();
 }
@@ -269,7 +245,6 @@ plperl_safe_init(void)
               ;
 
    SV         *res;
-
    float       safe_version;
 
    res = eval_pv(safe_module, FALSE);  /* TRUE = croak if failure */
@@ -415,54 +390,6 @@ plperl_trigger_build_args(FunctionCallInfo fcinfo)
 }
 
 
-/**********************************************************************
- * check return value from plperl function
- **********************************************************************/
-static int
-plperl_is_set(SV *sv)
-{
-   int         i = 0;
-   int         len = 0;
-   int         set = 0;
-   int         other = 0;
-   AV         *input_av;
-   SV        **val;
-
-   if (SvTYPE(sv) != SVt_RV)
-       return 0;
-
-   if (SvTYPE(SvRV(sv)) == SVt_PVHV)
-       return 0;
-
-   if (SvTYPE(SvRV(sv)) == SVt_PVAV)
-   {
-       input_av = (AV *) SvRV(sv);
-       len = av_len(input_av) + 1;
-
-       for (i = 0; i < len; i++)
-       {
-           val = av_fetch(input_av, i, FALSE);
-           if (SvTYPE(*val) == SVt_RV)
-               set = 1;
-           else
-               other = 1;
-       }
-   }
-
-   if (len == 0)
-       return 1;
-   if (set && !other)
-       return 1;
-   if (!set && other)
-       return 0;
-   if (set && other)
-       elog(ERROR, "plperl: check your return value structure");
-   if (!set && !other)
-       elog(ERROR, "plperl: check your return value structure");
-
-   return 0;                   /* for compiler */
-}
-
 /**********************************************************************
  * extract a list of keys from a hash
  **********************************************************************/
@@ -505,7 +432,6 @@ plperl_get_key(AV *keys, int index)
  * extract a value for a given key from a hash
  *
  * return NULL on error or if we got an undef
- *
  **********************************************************************/
 static char *
 plperl_get_elem(HV *hash, char *key)
@@ -516,6 +442,28 @@ plperl_get_elem(HV *hash, char *key)
    return SvTYPE(*svp) == SVt_NULL ? NULL : SvPV(*svp, PL_na);
 }
 
+/*
+ * Obtain tuple descriptor for a function returning tuple
+ *
+ * NB: copy the result if needed for any great length of time
+ */
+static TupleDesc
+get_function_tupdesc(Oid result_type, ReturnSetInfo *rsinfo)
+{
+   if (result_type == RECORDOID)
+   {
+       /* We must get the information from call context */
+       if (!rsinfo || !IsA(rsinfo, ReturnSetInfo) ||
+           rsinfo->expectedDesc == NULL)
+           ereport(ERROR,
+                   (errcode(ERRCODE_DATATYPE_MISMATCH),
+                    errmsg("could not determine row description for function returning record")));
+       return rsinfo->expectedDesc;
+   }
+   else                /* ordinary composite type */
+       return lookup_rowtype_tupdesc(result_type, -1);
+}
+
 /**********************************************************************
  * set up the new tuple returned from a trigger
  **********************************************************************/
@@ -630,16 +578,10 @@ plperl_call_handler(PG_FUNCTION_ARGS)
 
    PG_TRY();
    {
-       /************************************************************
-        * Connect to SPI manager
-        ************************************************************/
-       if (SPI_connect() != SPI_OK_CONNECT)
-           elog(ERROR, "could not connect to SPI manager");
-
-       /************************************************************
+       /*
         * Determine if called as function or trigger and
         * call appropriate subhandler
-        ************************************************************/
+        */
        if (CALLED_AS_TRIGGER(fcinfo))
            retval = PointerGetDatum(plperl_trigger_handler(fcinfo));
        else
@@ -910,6 +852,10 @@ plperl_func_handler(PG_FUNCTION_ARGS)
    SV         *perlret;
    Datum       retval;
 
+   /* Connect to SPI manager */
+   if (SPI_connect() != SPI_OK_CONNECT)
+       elog(ERROR, "could not connect to SPI manager");
+
    /* Find or compile the function */
    prodesc = compile_plperl_function(fcinfo->flinfo->fn_oid, false);
 
@@ -920,19 +866,14 @@ plperl_func_handler(PG_FUNCTION_ARGS)
     ************************************************************/
    if (!prodesc->fn_retisset)
        perlret = plperl_call_perl_func(prodesc, fcinfo);
+   else if (SRF_IS_FIRSTCALL())
+       perlret = plperl_call_perl_func(prodesc, fcinfo);
    else
    {
-       if (SRF_IS_FIRSTCALL()) /* call function only once */
-           srf_perlret = plperl_call_perl_func(prodesc, fcinfo);
-       perlret = srf_perlret;
-   }
+       /* Get back the SV stashed on initial call */
+       FuncCallContext *funcctx = (FuncCallContext *) fcinfo->flinfo->fn_extra;
 
-   if (prodesc->fn_retisset && SRF_IS_FIRSTCALL())
-   {
-       if (prodesc->fn_retistuple)
-           g_column_keys = newAV();
-       if (SvTYPE(perlret) != SVt_RV)
-           elog(ERROR, "plperl: set-returning function must return reference");
+       perlret = (SV *) funcctx->user_fctx;
    }
 
    /************************************************************
@@ -947,147 +888,78 @@ plperl_func_handler(PG_FUNCTION_ARGS)
    if (!(perlret && SvOK(perlret) && SvTYPE(perlret) != SVt_NULL))
    {
        /* return NULL if Perl code returned undef */
-       fcinfo->isnull = true;
+       ReturnSetInfo *rsi = (ReturnSetInfo *) fcinfo->resultinfo;
+
+       if (perlret)
+           SvREFCNT_dec(perlret);
+       if (rsi && IsA(rsi, ReturnSetInfo))
+           rsi->isDone = ExprEndResult;
+       PG_RETURN_NULL();
    }
 
-   if (prodesc->fn_retisset && !(perlret && SvTYPE(SvRV(perlret)) == SVt_PVAV))
+   if (prodesc->fn_retisset &&
+       (SvTYPE(perlret) != SVt_RV || SvTYPE(SvRV(perlret)) != SVt_PVAV))
        elog(ERROR, "plperl: set-returning function must return reference to array");
 
-   if (prodesc->fn_retistuple && perlret && SvTYPE(perlret) != SVt_RV)
+   if (prodesc->fn_retistuple && SvTYPE(perlret) != SVt_RV)
        elog(ERROR, "plperl: composite-returning function must return a reference");
 
-   if (prodesc->fn_retisset && !fcinfo->resultinfo)
-       ereport(ERROR,
-               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                errmsg("set-valued function called in context that cannot accept a set")));
-
-   if (prodesc->fn_retistuple && fcinfo->resultinfo)   /* set of tuples */
+   if (prodesc->fn_retisset && prodesc->fn_retistuple)
    {
-       /*
-        *  This branch will be taken when the function call
-        *  appears in a context that can return a set of tuples,
-        *  even if it only actually returns a single tuple
-        *  (e.g. select a from foo() where foo returns a singleton
-        *  of some composite type with member a). In this case, the
-        *  return value will be a hashref. If a rowset is returned
-        *  it will be an arrayref whose members will be hashrefs.
-        *
-        *  Care is taken in the code only to refer to the appropriate
-        *  one of ret_hv and ret_av, only one of which is therefore
-        *  valid for any given call.
-        *
-        *  XXX This code is in dire need of cleanup.
-        */
-   
-       /* SRF support */
-       HV         *ret_hv = NULL;
-       AV         *ret_av = NULL;
+       /* set of tuples */
+       AV         *ret_av = (AV *) SvRV(perlret);
        FuncCallContext *funcctx;
-       int         call_cntr;
-       int         max_calls;
        TupleDesc   tupdesc;
        AttInMetadata *attinmeta;
-       bool        isset;
-       char      **values = NULL;
-       ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
-
-       isset = plperl_is_set(perlret);
-
-       if (SvTYPE(SvRV(perlret)) == SVt_PVHV)
-           ret_hv = (HV *) SvRV(perlret);
-       else
-           ret_av = (AV *) SvRV(perlret);
 
        if (SRF_IS_FIRSTCALL())
        {
            MemoryContext oldcontext;
-           int         i;
 
            funcctx = SRF_FIRSTCALL_INIT();
 
-           oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
-
-           if (SvTYPE(SvRV(perlret)) == SVt_PVHV)
-           {
-               if (isset)
-                   funcctx->max_calls = hv_iterinit(ret_hv);
-               else
-                   funcctx->max_calls = 1;
-           }
-           else
-           {
-               if (isset)
-                   funcctx->max_calls = av_len(ret_av) + 1;
-               else
-                   funcctx->max_calls = 1;
-           }
-
-           tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
-
-           g_attr_num = tupdesc->natts;
+           funcctx->user_fctx = (void *) perlret;
 
-           for (i = 0; i < tupdesc->natts; i++)
-               av_store(g_column_keys, i + 1,
-                        newSVpv(SPI_fname(tupdesc, i+1), 0));
+           funcctx->max_calls = av_len(ret_av) + 1;
 
-           attinmeta = TupleDescGetAttInMetadata(tupdesc);
-           funcctx->attinmeta = attinmeta;
+           /* Cache a copy of the result's tupdesc and attinmeta */
+           oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+           tupdesc = get_function_tupdesc(prodesc->result_oid,
+                                       (ReturnSetInfo *) fcinfo->resultinfo);
+           tupdesc = CreateTupleDescCopy(tupdesc);
+           funcctx->attinmeta = TupleDescGetAttInMetadata(tupdesc);
            MemoryContextSwitchTo(oldcontext);
        }
 
        funcctx = SRF_PERCALL_SETUP();
-       call_cntr = funcctx->call_cntr;
-       max_calls = funcctx->max_calls;
        attinmeta = funcctx->attinmeta;
        tupdesc = attinmeta->tupdesc;
 
-       if (call_cntr < max_calls)
+       if (funcctx->call_cntr < funcctx->max_calls)
        {
+           SV        **svp;
+           HV         *row_hv;
+           char      **values;
            HeapTuple   tuple;
-           Datum       result;
            int         i;
-           char       *column_key;
-           char       *elem;
-
-           if (isset)
-           {
-               HV         *row_hv;
-               SV        **svp;
-
-               svp = av_fetch(ret_av, call_cntr, FALSE);
 
-               row_hv = (HV *) SvRV(*svp);
+           svp = av_fetch(ret_av, funcctx->call_cntr, FALSE);
 
-               values = (char **) palloc(g_attr_num * sizeof(char *));
+           if (SvTYPE(*svp) != SVt_RV)
+               elog(ERROR, "plperl: check your return value structure");
+           row_hv = (HV *) SvRV(*svp);
 
-               for (i = 0; i < g_attr_num; i++)
-               {
-                   column_key = plperl_get_key(g_column_keys, i + 1);
-                   elem = plperl_get_elem(row_hv, column_key);
-                   if (elem)
-                       values[i] = elem;
-                   else
-                       values[i] = NULL;
-               }
-           }
-           else
+           values = (char **) palloc(tupdesc->natts * sizeof(char *));
+           for (i = 0; i < tupdesc->natts; i++)
            {
-               int         i;
+               char       *column_key;
 
-               values = (char **) palloc(g_attr_num * sizeof(char *));
-               for (i = 0; i < g_attr_num; i++)
-               {
-                   column_key = SPI_fname(tupdesc, i + 1);
-                   elem = plperl_get_elem(ret_hv, column_key);
-                   if (elem)
-                       values[i] = elem;
-                   else
-                       values[i] = NULL;
-               }
+               column_key = SPI_fname(tupdesc, i + 1);
+               values[i] = plperl_get_elem(row_hv, column_key);
            }
            tuple = BuildTupleFromCStrings(attinmeta, values);
-           result = HeapTupleGetDatum(tuple);
-           SRF_RETURN_NEXT(funcctx, result);
+           retval = HeapTupleGetDatum(tuple);
+           SRF_RETURN_NEXT(funcctx, retval);
        }
        else
        {
@@ -1095,95 +967,91 @@ plperl_func_handler(PG_FUNCTION_ARGS)
            SRF_RETURN_DONE(funcctx);
        }
    }
-   else if (prodesc->fn_retisset)      /* set of non-tuples */
+   else if (prodesc->fn_retisset)
    {
+       /* set of non-tuples */
+       AV         *ret_av = (AV *) SvRV(perlret);
        FuncCallContext *funcctx;
 
        if (SRF_IS_FIRSTCALL())
        {
-           MemoryContext oldcontext;
-
            funcctx = SRF_FIRSTCALL_INIT();
-           oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
 
-           funcctx->max_calls = av_len((AV *) SvRV(perlret)) + 1;
+           funcctx->user_fctx = (void *) perlret;
+
+           funcctx->max_calls = av_len(ret_av) + 1;
        }
 
        funcctx = SRF_PERCALL_SETUP();
 
        if (funcctx->call_cntr < funcctx->max_calls)
        {
-           Datum       result;
-           AV         *array;
            SV        **svp;
 
-           array = (AV *) SvRV(perlret);
-           svp = av_fetch(array, funcctx->call_cntr, FALSE);
+           svp = av_fetch(ret_av, funcctx->call_cntr, FALSE);
 
            if (SvTYPE(*svp) != SVt_NULL)
            {
+               char       *val = SvPV(*svp, PL_na);
+
                fcinfo->isnull = false;
-               result = FunctionCall3(&prodesc->result_in_func,
-                                      PointerGetDatum(SvPV(*svp, PL_na)),
+               retval = FunctionCall3(&prodesc->result_in_func,
+                                      PointerGetDatum(val),
                            ObjectIdGetDatum(prodesc->result_typioparam),
                                       Int32GetDatum(-1));
            }
            else
            {
                fcinfo->isnull = true;
-               result = (Datum) 0;
+               retval = (Datum) 0;
            }
-           SRF_RETURN_NEXT(funcctx, result);
+           SRF_RETURN_NEXT(funcctx, retval);
        }
        else
        {
-           if (perlret)
-               SvREFCNT_dec(perlret);
+           SvREFCNT_dec(perlret);
            SRF_RETURN_DONE(funcctx);
        }
    }
-   else if (!fcinfo->isnull)   /* non-null singleton */
+   else if (prodesc->fn_retistuple)
    {
-       if (prodesc->fn_retistuple)     /* singleton perl hash to Datum */
+       /* singleton perl hash to Datum */
+       HV         *perlhash = (HV *) SvRV(perlret);
+       TupleDesc   td;
+       int         i;
+       char      **values;
+       AttInMetadata *attinmeta;
+       HeapTuple   tup;
+
+       /*
+        * XXX should cache the attinmetadata instead of recomputing
+        */
+       td = get_function_tupdesc(prodesc->result_oid,
+                                 (ReturnSetInfo *) fcinfo->resultinfo);
+       /* td = CreateTupleDescCopy(td); */
+       attinmeta = TupleDescGetAttInMetadata(td);
+
+       values = (char **) palloc(td->natts * sizeof(char *));
+       for (i = 0; i < td->natts; i++)
        {
-           TupleDesc   td = lookup_rowtype_tupdesc(prodesc->ret_oid, (int32) -1);
-           HV         *perlhash = (HV *) SvRV(perlret);
-           int         i;
-           char      **values;
-           char       *key,
-                      *val;
-           AttInMetadata *attinmeta;
-           HeapTuple   tup;
-
-           if (!td)
-               ereport(ERROR,
-                       (errcode(ERRCODE_SYNTAX_ERROR),
-                        errmsg("no TupleDesc info available")));
-
-           values = (char **) palloc(td->natts * sizeof(char *));
-           for (i = 0; i < td->natts; i++)
-           {
+           char       *key;
 
-               key = SPI_fname(td, i + 1);
-               val = plperl_get_elem(perlhash, key);
-               if (val)
-                   values[i] = val;
-               else
-                   values[i] = NULL;
-           }
-           attinmeta = TupleDescGetAttInMetadata(td);
-           tup = BuildTupleFromCStrings(attinmeta, values);
-           retval = HeapTupleGetDatum(tup);
+           key = SPI_fname(td, i + 1);
+           values[i] = plperl_get_elem(perlhash, key);
        }
-       else
-           /* perl string to Datum */
-           retval = FunctionCall3(&prodesc->result_in_func,
-                                  PointerGetDatum(SvPV(perlret, PL_na)),
-                           ObjectIdGetDatum(prodesc->result_typioparam),
-                                  Int32GetDatum(-1));
+       tup = BuildTupleFromCStrings(attinmeta, values);
+       retval = HeapTupleGetDatum(tup);
+   }
+   else
+   {
+       /* perl string to Datum */
+       char       *val = SvPV(perlret, PL_na);
+
+       retval = FunctionCall3(&prodesc->result_in_func,
+                              CStringGetDatum(val),
+                              ObjectIdGetDatum(prodesc->result_typioparam),
+                              Int32GetDatum(-1));
    }
-   else        /* null singleton */
-       retval = (Datum) 0;
 
    SvREFCNT_dec(perlret);
    return retval;
@@ -1202,6 +1070,10 @@ plperl_trigger_handler(PG_FUNCTION_ARGS)
    SV         *svTD;
    HV         *hvTD;
 
+   /* Connect to SPI manager */
+   if (SPI_connect() != SPI_OK_CONNECT)
+       elog(ERROR, "could not connect to SPI manager");
+
    /* Find or compile the function */
    prodesc = compile_plperl_function(fcinfo->flinfo->fn_oid, true);
 
@@ -1248,7 +1120,6 @@ plperl_trigger_handler(PG_FUNCTION_ARGS)
    {
        if (!fcinfo->isnull)
        {
-
            HeapTuple   trv;
 
            if (strcasecmp(tmp, "SKIP") == 0)
@@ -1441,17 +1312,10 @@ compile_plperl_function(Oid fn_oid, bool is_trigger)
                }
            }
 
-           prodesc->fn_retisset = procStruct->proretset;       /* true, if function
-                                                                * returns set */
-
-           if (typeStruct->typtype == 'c' || procStruct->prorettype == RECORDOID)
-           {
-               prodesc->fn_retistuple = true;
-               prodesc->ret_oid =
-                   procStruct->prorettype == RECORDOID ?
-                   typeStruct->typrelid :
-                   procStruct->prorettype;
-           }
+           prodesc->result_oid = procStruct->prorettype;
+           prodesc->fn_retisset = procStruct->proretset;
+           prodesc->fn_retistuple = (typeStruct->typtype == 'c' ||
+                                     procStruct->prorettype == RECORDOID);
 
            perm_fmgr_info(typeStruct->typinput, &(prodesc->result_in_func));
            prodesc->result_typioparam = getTypeIOParam(typeTup);
@@ -1509,7 +1373,6 @@ compile_plperl_function(Oid fn_oid, bool is_trigger)
         * create the text of the anonymous subroutine.
         * we do not use a named subroutine so that we can call directly
         * through the reference.
-        *
         ************************************************************/
        prosrcdatum = SysCacheGetAttr(PROCOID, procTup,
                                      Anum_pg_proc_prosrc, &isnull);