Substantial rewrite of async.c to avoid problems with non-reentrant stdio
authorTom Lane
Tue, 6 Oct 1998 02:40:09 +0000 (02:40 +0000)
committerTom Lane
Tue, 6 Oct 1998 02:40:09 +0000 (02:40 +0000)
and possibly other problems.  Minor changes in xact.c and postgres.c's
main loop to support new handling of async NOTIFY.

src/backend/access/transam/xact.c
src/backend/commands/async.c
src/backend/tcop/postgres.c
src/backend/utils/misc/trace.c
src/include/access/xact.h
src/include/commands/async.h
src/include/utils/trace.h

index d082c805cd7a9fb1a59f1d8d815783bebfc161db..24db5befb9e791e3d60a05ea44d24850c7844e4a 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.23 1998/09/01 04:27:19 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.24 1998/10/06 02:39:58 tgl Exp $
  *
  * NOTES
  *     Transaction aborts can now occur two ways:
@@ -901,6 +901,9 @@ CommitTransaction()
    /* handle commit for large objects [ PA, 7/17/98 ] */
    _lo_commit();
 
+   /* NOTIFY commit must also come before lower-level cleanup */
+   AtCommit_Notify();
+
    CloseSequences();
    DestroyTempRels();
    AtEOXact_portals();
@@ -916,10 +919,6 @@ CommitTransaction()
     * ----------------
     */
    s->state = TRANS_DEFAULT;
-   {                           /* want this after commit */
-       if (IsNormalProcessingMode())
-           Async_NotifyAtCommit();
-   }
 
    /*
     * Let others to know about no transaction in progress - vadim
@@ -967,6 +966,7 @@ AbortTransaction()
     *  do abort processing
     * ----------------
     */
+   AtAbort_Notify();
    CloseSequences();
    AtEOXact_portals();
    RecordTransactionAbort();
@@ -982,17 +982,6 @@ AbortTransaction()
     * ----------------
     */
    s->state = TRANS_DEFAULT;
-   {
-
-       /*
-        * We need to do this in case another process notified us while we
-        * are in the middle of an aborted transaction.  We need to notify
-        * our frontend after we finish the current transaction. -- jw,
-        * 1/3/94
-        */
-       if (IsNormalProcessingMode())
-           Async_NotifyAtAbort();
-   }
 }
 
 /* --------------------------------
@@ -1455,6 +1444,30 @@ UserAbortTransactionBlock()
    s->blockState = TBLOCK_ENDABORT;
 }
 
+/* --------------------------------
+ *     AbortOutOfAnyTransaction
+ *
+ * This routine is provided for error recovery purposes.  It aborts any
+ * active transaction or transaction block, leaving the system in a known
+ * idle state.
+ * --------------------------------
+ */
+void
+AbortOutOfAnyTransaction()
+{
+   TransactionState s = CurrentTransactionState;
+
+   /*
+    * Get out of any low-level transaction
+    */
+   if (s->state != TRANS_DEFAULT)
+       AbortTransaction();
+   /*
+    * Now reset the high-level state
+    */
+   s->blockState = TBLOCK_DEFAULT;
+}
+
 bool
 IsTransactionBlock()
 {
index 12121977177e61f973fe37dd8847403cb6c4d145..a8c447cb077b9cbfac6c8aa233f32a2585235fe3 100644 (file)
@@ -1,38 +1,79 @@
 /*-------------------------------------------------------------------------
  *
  * async.c--
- *   Asynchronous notification
+ *   Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.40 1998/09/01 04:27:42 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.41 1998/10/06 02:39:59 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
-/* New Async Notification Model:
+
+/*-------------------------------------------------------------------------
+ * New Async Notification Model:
  * 1. Multiple backends on same machine.  Multiple backends listening on
- *   one relation.
- *
- * 2. One of the backend does a 'notify '.  For all backends that
- *   are listening to this relation (all notifications take place at the
- *   end of commit),
- *   2.a  If the process is the same as the backend process that issued
- *        notification (we are notifying something that we are listening),
- *        signal the corresponding frontend over the comm channel.
- *   2.b  For all other listening processes, we send kill(SIGUSR2) to wake up
- *        the listening backend.
- * 3. Upon receiving a kill(SIGUSR2) signal from another backend process
- *   notifying that one of the relation that we are listening is being
- *   notified, we can be in either of two following states:
- *   3.a  We are sleeping, wake up and signal our frontend.
- *   3.b  We are in middle of another transaction, wait until the end of
- *        of the current transaction and signal our frontend.
- * 4. Each frontend receives this notification and processes accordingly.
- *
- * -- jw, 12/28/93
- *
+ *   one relation.  (Note: "listening on a relation" is not really the
+ *   right way to think about it, since the notify names need not have
+ *   anything to do with the names of relations actually in the database.
+ *   But this terminology is all over the code and docs, and I don't feel
+ *   like trying to replace it.)
+ *
+ * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
+ *   ie, each relname/listenerPID pair.  The "notification" field of the
+ *   tuple is zero when no NOTIFY is pending for that listener, or the PID
+ *   of the originating backend when a cross-backend NOTIFY is pending.
+ *   (We skip writing to pg_listener when doing a self-NOTIFY, so the
+ *   notification field should never be equal to the listenerPID field.)
+ *
+ * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
+ *   relname to a list of outstanding NOTIFY requests.  Actual processing
+ *   happens if and only if we reach transaction commit.  At that time (in
+ *   routine AtCommit_Notify) we scan pg_listener for matching relnames.
+ *    If the listenerPID in a matching tuple is ours, we just send a notify
+ *   message to our own front end.  If it is not ours, and "notification"
+ *   is not already nonzero, we set notification to our own PID and send a
+ *   SIGUSR2 signal to the receiving process (indicated by listenerPID).
+ *   BTW: if the signal operation fails, we presume that the listener backend
+ *    crashed without removing this tuple, and remove the tuple for it.
+ *
+ * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
+ *   notify processing immediately if this backend is idle (ie, it is
+ *   waiting for a frontend command and is not within a transaction block).
+ *    Otherwise the handler may only set a flag, which will cause the
+ *   processing to occur just before we next go idle.
+ *
+ * 5. Inbound-notify processing consists of scanning pg_listener for tuples
+ *   matching our own listenerPID and having nonzero notification fields.
+ *   For each such tuple, we send a message to our frontend and clear the
+ *   notification field.  BTW: this routine has to start/commit its own
+ *   transaction, since by assumption it is only called from outside any
+ *   transaction.
+ *
+ * Note that the system's use of pg_listener is confined to very short
+ * intervals at the end of a transaction that contains NOTIFY statements,
+ * or during the transaction caused by an inbound SIGUSR2.  So the fact that
+ * pg_listener is a global resource shouldn't cause too much performance
+ * problem.  But application authors ought to be discouraged from doing
+ * LISTEN or UNLISTEN near the start of a long transaction --- that would
+ * result in holding the pg_listener write lock for a long time, possibly
+ * blocking unrelated activity.  It could even lead to deadlock against another
+ * transaction that touches the same user tables and then tries to NOTIFY.
+ * Probably best to do LISTEN or UNLISTEN outside of transaction blocks.
+ *
+ * An application that listens on the same relname it notifies will get
+ * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
+ * by comparing be_pid in the NOTIFY message to the application's own backend's
+ * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
+ * frontend during startup.)  The above design guarantees that notifies from
+ * other backends will never be missed by ignoring self-notifies.  Note,
+ * however, that we do *not* guarantee that a separate frontend message will
+ * be sent for every outside NOTIFY.  Since there is only room for one
+ * originating PID in pg_listener, outside notifies occurring at about the
+ * same time may be collapsed into a single message bearing the PID of the
+ * first outside backend to perform the NOTIFY.
+ *-------------------------------------------------------------------------
  */
 
 #include 
 
 #include "postgres.h"
 
+#include "commands/async.h"
 #include "access/heapam.h"
 #include "access/relscan.h"
 #include "access/xact.h"
 #include "catalog/catname.h"
 #include "catalog/pg_listener.h"
-#include "commands/async.h"
 #include "fmgr.h"
 #include "lib/dllist.h"
 #include "libpq/libpq.h"
 #include "miscadmin.h"
-#include "nodes/memnodes.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "tcop/dest.h"
-#include "utils/mcxt.h"
 #include "utils/syscache.h"
 #include 
 #include 
 
-#define NotifyUnlock pg_options[OPT_NOTIFYUNLOCK]
-#define NotifyHack  pg_options[OPT_NOTIFYHACK]
-
+/* stuff that we really ought not be touching directly :-( */
 extern TransactionState CurrentTransactionState;
 extern CommandDest whereToSendOutput;
 
-GlobalMemory notifyContext = NULL;
-
-static int notifyFrontEndPending = 0;
-static int notifyIssued = 0;
+/*
+ * State for outbound notifies consists of a list of all relnames NOTIFYed
+ * in the current transaction.  We do not actually perform a NOTIFY until
+ * and unless the transaction commits.  pendingNotifies is NULL if no
+ * NOTIFYs have been done in the current transaction.
+ */
 static Dllist *pendingNotifies = NULL;
 
-static int AsyncExistsPendingNotify(char *);
-static void ClearPendingNotify(void);
-static void Async_NotifyFrontEnd(void);
-static void Async_NotifyFrontEnd_Aux(void);
-void       Async_Unlisten(char *relname, int pid);
-static void Async_UnlistenOnExit(int code, char *relname);
-static void Async_UnlistenAll(void);
-
 /*
- *--------------------------------------------------------------
- * Async_NotifyHandler --
- *
- *     This is the signal handler for SIGUSR2.  When the backend
- *     is signaled, the backend can be in two states.
- *     1. If the backend is in the middle of another transaction,
- *        we set the flag, notifyFrontEndPending, and wait until
- *        the end of the transaction to notify the front end.
- *     2. If the backend is not in the middle of another transaction,
- *        we notify the front end immediately.
- *
- *     -- jw, 12/28/93
- * Results:
- *     none
- *
- * Side effects:
- *     none
+ * State for inbound notifies consists of two flags: one saying whether
+ * the signal handler is currently allowed to call ProcessIncomingNotify
+ * directly, and one saying whether the signal has occurred but the handler
+ * was not allowed to call ProcessIncomingNotify at the time.
+ *
+ * NB: the "volatile" on these declarations is critical!  If your compiler
+ * does not grok "volatile", you'd be best advised to compile this file
+ * with all optimization turned off.
  */
-void
-Async_NotifyHandler(SIGNAL_ARGS)
-{
-   TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler");
+static volatile int    notifyInterruptEnabled = 0;
+static volatile int    notifyInterruptOccurred = 0;
 
-   if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
-       (CurrentTransactionState->blockState == TRANS_DEFAULT))
-   {
-       TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: "
-               "waking up sleeping backend process");
-       PS_SET_STATUS("async_notify");
-       Async_NotifyFrontEnd();
-       PS_SET_STATUS("idle");
-   }
-   else
-   {
-       TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: "
-            "process in middle of transaction, state=%d, blockstate=%d",
-               CurrentTransactionState->state,
-               CurrentTransactionState->blockState);
-       notifyFrontEndPending = 1;
-       TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: notify frontend pending");
-   }
+/* True if we've registered an on_shmem_exit cleanup (or at least tried to). */
+static int unlistenExitRegistered = 0;
+
+
+static void Async_UnlistenAll(void);
+static void Async_UnlistenOnExit(void);
+static void ProcessIncomingNotify(void);
+static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
+static int AsyncExistsPendingNotify(char *relname);
+static void ClearPendingNotifies(void);
 
-   TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done");
-}
 
 /*
  *--------------------------------------------------------------
@@ -136,253 +146,40 @@ Async_NotifyHandler(SIGNAL_ARGS)
  *     This is executed by the SQL notify command.
  *
  *     Adds the relation to the list of pending notifies.
- *     All notification happens at end of commit.
- *     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- *
- *     All notification of backend processes happens here,
- *     then each backend notifies its corresponding front end at
- *     the end of commit.
- *
- *     -- jw, 12/28/93
+ *     Actual notification happens during transaction commit.
+ *     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  *
  * Results:
  *     XXX
  *
- * Side effects:
- *     All tuples for relname in pg_listener are updated.
- *
  *--------------------------------------------------------------
  */
 void
 Async_Notify(char *relname)
 {
-
-   HeapTuple   lTuple,
-               rTuple;
-   Relation    lRel;
-   HeapScanDesc sRel;
-   TupleDesc   tdesc;
-   ScanKeyData key;
-   Datum       d,
-               value[3];
-   bool        isnull;
-   char        repl[3],
-               nulls[3];
-
    char       *notifyName;
 
    TPRINTF(TRACE_NOTIFY, "Async_Notify: %s", relname);
 
-   if (!pendingNotifies)
-       pendingNotifies = DLNewList();
-
    /*
-    * Allocate memory from the global malloc pool because it needs to be
-    * referenced also when the transaction is finished.  DZ - 26-08-1996
+    * We allocate list memory from the global malloc pool to ensure that
+    * it will live until we want to use it.  This is probably not necessary
+    * any longer, since we will use it before the end of the transaction.
+    * DLList only knows how to use malloc() anyway, but we could probably
+    * palloc() the strings...
     */
+   if (!pendingNotifies)
+       pendingNotifies = DLNewList();
    notifyName = strdup(relname);
    DLAddHead(pendingNotifies, DLNewElem(notifyName));
-
-   ScanKeyEntryInitialize(&key, 0,
-                          Anum_pg_listener_relname,
-                          F_NAMEEQ,
-                          PointerGetDatum(notifyName));
-
-   lRel = heap_openr(ListenerRelationName);
-   tdesc = RelationGetDescr(lRel);
-   RelationSetLockForWrite(lRel);
-   sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, &key);
-
-   nulls[0] = nulls[1] = nulls[2] = ' ';
-   repl[0] = repl[1] = repl[2] = ' ';
-   repl[Anum_pg_listener_notify - 1] = 'r';
-   value[0] = value[1] = value[2] = (Datum) 0;
-   value[Anum_pg_listener_notify - 1] = Int32GetDatum(1);
-
-   while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
-   {
-       d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull);
-       if (!DatumGetInt32(d))
-       {
-           rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
-           heap_replace(lRel, &lTuple->t_ctid, rTuple);
-           /* notify is really issued only if a tuple has been changed */
-           notifyIssued = 1;
-       }
-   }
-   heap_endscan(sRel);
-
    /*
-    * Note: if the write lock is unset we can get multiple tuples with
-    * same oid if other backends notify the same relation. Use this
-    * option at your own risk.
+    * NOTE: we could check to see if pendingNotifies already has an entry
+    * for relname, and thus avoid making duplicate entries.  However, most
+    * apps probably don't notify the same name multiple times per transaction,
+    * so we'd likely just be wasting cycles to make such a check.
+    * AsyncExistsPendingNotify() doesn't really care whether the list
+    * contains duplicates...
     */
-   if (NotifyUnlock)
-       RelationUnsetLockForWrite(lRel);
-
-   heap_close(lRel);
-
-   TPRINTF(TRACE_NOTIFY, "Async_Notify: done %s", relname);
-}
-
-/*
- *--------------------------------------------------------------
- * Async_NotifyAtCommit --
- *
- *     This is called at transaction commit.
- *
- *     Signal our corresponding frontend process on relations that
- *     were notified.  Signal all other backend process that
- *     are listening also.
- *
- *     -- jw, 12/28/93
- *
- * Results:
- *     XXX
- *
- * Side effects:
- *     Tuples in pg_listener that has our listenerpid are updated so
- *     that the notification is 0.  We do not want to notify frontend
- *     more than once.
- *
- *     -- jw, 12/28/93
- *
- *--------------------------------------------------------------
- */
-void
-Async_NotifyAtCommit()
-{
-   HeapTuple   lTuple;
-   Relation    lRel;
-   HeapScanDesc sRel;
-   TupleDesc   tdesc;
-   ScanKeyData key;
-   Datum       d;
-   bool        isnull;
-   extern TransactionState CurrentTransactionState;
-
-   if (!pendingNotifies)
-       pendingNotifies = DLNewList();
-
-   if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
-       (CurrentTransactionState->blockState == TRANS_DEFAULT))
-   {
-       if (notifyIssued)
-       {
-           /* 'notify ' issued by us */
-           notifyIssued = 0;
-           StartTransactionCommand();
-           TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit");
-           ScanKeyEntryInitialize(&key, 0,
-                                  Anum_pg_listener_notify,
-                                  F_INT4EQ,
-                                  Int32GetDatum(1));
-           lRel = heap_openr(ListenerRelationName);
-           RelationSetLockForWrite(lRel);
-           sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, &key);
-           tdesc = RelationGetDescr(lRel);
-
-           while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
-           {
-               d = heap_getattr(lTuple, Anum_pg_listener_relname,
-                                tdesc, &isnull);
-
-               if (AsyncExistsPendingNotify((char *) DatumGetPointer(d)))
-               {
-                   d = heap_getattr(lTuple, Anum_pg_listener_pid,
-                                    tdesc, &isnull);
-
-                   if (MyProcPid == DatumGetInt32(d))
-                   {
-                       notifyFrontEndPending = 1;
-                       TPRINTF(TRACE_NOTIFY,
-                               "Async_NotifyAtCommit: notifying self");
-                   }
-                   else
-                   {
-                       TPRINTF(TRACE_NOTIFY,
-                               "Async_NotifyAtCommit: notifying pid %d",
-                               DatumGetInt32(d));
-#ifdef HAVE_KILL
-                       if (kill(DatumGetInt32(d), SIGUSR2) < 0)
-                       {
-                           if (errno == ESRCH)
-                               heap_delete(lRel, &lTuple->t_ctid);
-                       }
-#endif
-                   }
-               }
-           }
-           heap_endscan(sRel);
-           heap_close(lRel);
-
-           /*
-            * Notify the frontend inside the current transaction while we
-            * still have a valid write lock on pg_listeners. This avoid
-            * waiting until all other backends have finished with
-            * pg_listener.
-            */
-           if (notifyFrontEndPending)
-           {
-               /* The aux version is called inside transaction */
-               Async_NotifyFrontEnd_Aux();
-           }
-
-           TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit: done");
-           CommitTransactionCommand();
-       }
-       else
-       {
-
-           /*
-            * No notifies issued by us. If notifyFrontEndPending has been
-            * set by Async_NotifyHandler notify the frontend of pending
-            * notifies from other backends.
-            */
-           if (notifyFrontEndPending)
-               Async_NotifyFrontEnd();
-       }
-
-       ClearPendingNotify();
-   }
-}
-
-/*
- *--------------------------------------------------------------
- * Async_NotifyAtAbort --
- *
- *     This is called at transaction commit.
- *
- *     Gets rid of pending notifies.  List elements are automatically
- *     freed through memory context.
- *
- *
- * Results:
- *     XXX
- *
- * Side effects:
- *     XXX
- *
- *--------------------------------------------------------------
- */
-void
-Async_NotifyAtAbort()
-{
-   if (pendingNotifies)
-   {
-       ClearPendingNotify();
-       DLFreeList(pendingNotifies);
-   }
-   pendingNotifies = DLNewList();
-   notifyIssued = 0;
-
-   if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
-       (CurrentTransactionState->blockState == TRANS_DEFAULT))
-   {
-       /* don't forget to notify front end */
-       if (notifyFrontEndPending)
-           Async_NotifyFrontEnd();
-   }
 }
 
 /*
@@ -394,108 +191,94 @@ Async_NotifyAtAbort()
  *     Register a backend (identified by its Unix PID) as listening
  *     on the specified relation.
  *
- *     One listener per relation, pg_listener relation is keyed
- *     on (relname,pid) to provide multiple listeners in future.
- *
  * Results:
- *     pg_listeners is updated.
+ *     XXX
  *
  * Side effects:
- *     XXX
+ *     pg_listener is updated.
  *
  *--------------------------------------------------------------
  */
 void
 Async_Listen(char *relname, int pid)
 {
-   Datum       values[Natts_pg_listener];
-   char        nulls[Natts_pg_listener];
+   Relation    lRel;
    TupleDesc   tdesc;
    HeapScanDesc scan;
    HeapTuple   tuple,
                newtup;
-   Relation    lDesc;
+   Datum       values[Natts_pg_listener];
+   char        nulls[Natts_pg_listener];
    Datum       d;
    int         i;
    bool        isnull;
    int         alreadyListener = 0;
-   char       *relnamei;
    TupleDesc   tupDesc;
 
-   if (whereToSendOutput != Remote)
-   {
-       elog(NOTICE, "Async_Listen: "
-            "listen not available on interactive sessions");
-       return;
-   }
-
    TPRINTF(TRACE_NOTIFY, "Async_Listen: %s", relname);
-   for (i = 0; i < Natts_pg_listener; i++)
-   {
-       nulls[i] = ' ';
-       values[i] = PointerGetDatum(NULL);
-   }
 
-   i = 0;
-   values[i++] = (Datum) relname;
-   values[i++] = (Datum) pid;
-   values[i++] = (Datum) 0;    /* no notifies pending */
-
-   lDesc = heap_openr(ListenerRelationName);
-   RelationSetLockForWrite(lDesc);
+   lRel = heap_openr(ListenerRelationName);
+   RelationSetLockForWrite(lRel);
+   tdesc = RelationGetDescr(lRel);
 
-   /* is someone already listening.  One listener per relation */
-   tdesc = RelationGetDescr(lDesc);
-   scan = heap_beginscan(lDesc, 0, SnapshotNow, 0, (ScanKey) NULL);
+   /* Detect whether we are already listening on this relname */
+   scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
    while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
    {
-       d = heap_getattr(tuple, Anum_pg_listener_relname, tdesc,
-                        &isnull);
-       relnamei = DatumGetPointer(d);
-       if (!strncmp(relnamei, relname, NAMEDATALEN))
+       d = heap_getattr(tuple, Anum_pg_listener_relname, tdesc, &isnull);
+       if (!strncmp((char *) DatumGetPointer(d), relname, NAMEDATALEN))
        {
            d = heap_getattr(tuple, Anum_pg_listener_pid, tdesc, &isnull);
-           pid = DatumGetInt32(d);
-           if (pid == MyProcPid)
+           if (DatumGetInt32(d) == pid)
+           {
                alreadyListener = 1;
-       }
-       if (alreadyListener)
-       {
-           /* No need to scan the rest of the table */
-           break;
+               /* No need to scan the rest of the table */
+               break;
+           }
        }
    }
    heap_endscan(scan);
 
    if (alreadyListener)
    {
-       elog(NOTICE, "Async_Listen: We are already listening on %s",
-            relname);
-       RelationUnsetLockForWrite(lDesc);
-       heap_close(lDesc);
+       elog(NOTICE, "Async_Listen: We are already listening on %s", relname);
+       RelationUnsetLockForWrite(lRel);
+       heap_close(lRel);
        return;
    }
 
-   tupDesc = lDesc->rd_att;
-   newtup = heap_formtuple(tupDesc, values, nulls);
-   heap_insert(lDesc, newtup);
-   pfree(newtup);
-
    /*
-    * if (alreadyListener) { elog(NOTICE,"Async_Listen: already one
-    * listener on %s (possibly dead)",relname); }
+    * OK to insert a new tuple
     */
 
-   RelationUnsetLockForWrite(lDesc);
-   heap_close(lDesc);
+   for (i = 0; i < Natts_pg_listener; i++)
+   {
+       nulls[i] = ' ';
+       values[i] = PointerGetDatum(NULL);
+   }
+
+   i = 0;
+   values[i++] = (Datum) relname;
+   values[i++] = (Datum) pid;
+   values[i++] = (Datum) 0;    /* no notifies pending */
+
+   tupDesc = lRel->rd_att;
+   newtup = heap_formtuple(tupDesc, values, nulls);
+   heap_insert(lRel, newtup);
+   pfree(newtup);
+
+   RelationUnsetLockForWrite(lRel);
+   heap_close(lRel);
 
    /*
-    * now that we are listening, we should make a note to ourselves to
-    * unlisten prior to dying.
+    * now that we are listening, make sure we will unlisten before dying.
     */
-   relnamei = malloc(NAMEDATALEN);     /* persists to process exit */
-   StrNCpy(relnamei, relname, NAMEDATALEN);
-   on_shmem_exit(Async_UnlistenOnExit, (caddr_t) relnamei);
+   if (! unlistenExitRegistered)
+   {
+       if (on_shmem_exit(Async_UnlistenOnExit, (caddr_t) NULL) < 0)
+           elog(NOTICE, "Async_Listen: out of shmem_exit slots");
+       unlistenExitRegistered = 1;
+   }
 }
 
 /*
@@ -508,17 +291,17 @@ Async_Listen(char *relname, int pid)
  *     for the specified relation.
  *
  * Results:
- *     pg_listeners is updated.
+ *     XXX
  *
  * Side effects:
- *     XXX
+ *     pg_listener is updated.
  *
  *--------------------------------------------------------------
  */
 void
 Async_Unlisten(char *relname, int pid)
 {
-   Relation    lDesc;
+   Relation    lRel;
    HeapTuple   lTuple;
 
    /* Handle specially the `unlisten "*"' command */
@@ -530,17 +313,21 @@ Async_Unlisten(char *relname, int pid)
 
    TPRINTF(TRACE_NOTIFY, "Async_Unlisten %s", relname);
 
+   /* Note we assume there can be only one matching tuple. */
    lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
                                 Int32GetDatum(pid),
                                 0, 0);
    if (lTuple != NULL)
    {
-       lDesc = heap_openr(ListenerRelationName);
-       RelationSetLockForWrite(lDesc);
-       heap_delete(lDesc, &lTuple->t_ctid);
-       RelationUnsetLockForWrite(lDesc);
-       heap_close(lDesc);
+       lRel = heap_openr(ListenerRelationName);
+       RelationSetLockForWrite(lRel);
+       heap_delete(lRel, &lTuple->t_ctid);
+       RelationUnsetLockForWrite(lRel);
+       heap_close(lRel);
    }
+   /* We do not complain about unlistening something not being listened;
+    * should we?
+    */
 }
 
 /*
@@ -549,187 +336,487 @@ Async_Unlisten(char *relname, int pid)
  *
  *     Unlisten all relations for this backend.
  *
+ *     This is invoked by UNLISTEN "*" command, and also at backend exit.
+ *
  * Results:
- *     pg_listeners is updated.
+ *     XXX
  *
  * Side effects:
- *     XXX
+ *     pg_listener is updated.
  *
  *--------------------------------------------------------------
  */
 static void
 Async_UnlistenAll()
 {
-   HeapTuple   lTuple;
    Relation    lRel;
-   HeapScanDesc sRel;
    TupleDesc   tdesc;
+   HeapScanDesc sRel;
+   HeapTuple   lTuple;
    ScanKeyData key[1];
 
    TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll");
+
+   lRel = heap_openr(ListenerRelationName);
+   RelationSetLockForWrite(lRel);
+   tdesc = RelationGetDescr(lRel);
+
+   /* Find and delete all entries with my listenerPID */
    ScanKeyEntryInitialize(&key[0], 0,
                           Anum_pg_listener_pid,
                           F_INT4EQ,
                           Int32GetDatum(MyProcPid));
-   lRel = heap_openr(ListenerRelationName);
-   RelationSetLockForWrite(lRel);
-   tdesc = RelationGetDescr(lRel);
    sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
 
    while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
        heap_delete(lRel, &lTuple->t_ctid);
+
    heap_endscan(sRel);
    RelationUnsetLockForWrite(lRel);
    heap_close(lRel);
-   TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll: done");
 }
 
 /*
- * --------------------------------------------------------------
+ *--------------------------------------------------------------
  * Async_UnlistenOnExit --
  *
- *     This is called at backend exit for each registered listen.
+ *     Clean up the pg_listener table at backend exit.
+ *
+ *     This is executed if we have done any LISTENs in this backend.
+ *     It might not be necessary anymore, if the user UNLISTENed everything,
+ *     but we don't try to detect that case.
  *
  * Results:
  *     XXX
  *
- * --------------------------------------------------------------
- */
-static void
-Async_UnlistenOnExit(int code, /* from exitpg */
-                    char *relname)
-{
-   Async_Unlisten((char *) relname, MyProcPid);
-}
-
-/*
- * --------------------------------------------------------------
- * Async_NotifyFrontEnd --
- *
- *     This is called outside transactions. The real work is done
- *     by Async_NotifyFrontEnd_Aux().
+ * Side effects:
+ *     pg_listener is updated if necessary.
  *
- * --------------------------------------------------------------
+ *--------------------------------------------------------------
  */
 static void
-Async_NotifyFrontEnd()
+Async_UnlistenOnExit()
 {
+   /*
+    * We need to start/commit a transaction for the unlisten,
+    * but if there is already an active transaction we had better
+    * abort that one first.  Otherwise we'd end up committing changes
+    * that probably ought to be discarded.
+    */
+   AbortOutOfAnyTransaction();
+   /* Now we can do the unlisten */
    StartTransactionCommand();
-   Async_NotifyFrontEnd_Aux();
+   Async_UnlistenAll();
    CommitTransactionCommand();
 }
 
 /*
- * --------------------------------------------------------------
- * Async_NotifyFrontEnd_Aux --
+ *--------------------------------------------------------------
+ * AtCommit_Notify --
  *
- *     This must be called inside a transaction block.
+ *     This is called at transaction commit.
  *
- *     Perform an asynchronous notification to front end over
- *     portal comm channel.  The name of the relation which contains the
- *     data is sent to the front end.
+ *     If there are outbound notify requests in the pendingNotifies list,
+ *     scan pg_listener for matching tuples, and either signal the other
+ *     backend or send a message to our own frontend.
  *
- *     We remove the notification flag from the pg_listener tuple
- *     associated with our process.
+ *     NOTE: we are still inside the current transaction, therefore can
+ *     piggyback on its committing of changes.
  *
  * Results:
  *     XXX
  *
- * --------------------------------------------------------------
+ * Side effects:
+ *     Tuples in pg_listener that have matching relnames and other peoples'
+ *     listenerPIDs are updated with a nonzero notification field.
+ *
+ *--------------------------------------------------------------
  */
-static void
-Async_NotifyFrontEnd_Aux()
+void
+AtCommit_Notify()
 {
-   HeapTuple   lTuple,
-               rTuple;
    Relation    lRel;
-   HeapScanDesc sRel;
    TupleDesc   tdesc;
-   ScanKeyData key[2];
+   HeapScanDesc sRel;
+   HeapTuple   lTuple,
+               rTuple;
    Datum       d,
-               value[3];
-   char        repl[3],
-               nulls[3];
+               value[Natts_pg_listener];
+   char        repl[Natts_pg_listener],
+               nulls[Natts_pg_listener];
    bool        isnull;
+   char       *relname;
+   int32       listenerPID;
 
-#define MAX_DONE 64
+   if (!pendingNotifies)
+       return;                 /* no NOTIFY statements in this transaction */
 
-   char       *done[MAX_DONE];
-   int         ndone = 0;
-   int         i;
+   /* NOTIFY is disabled if not normal processing mode.
+    * This test used to be in xact.c, but it seems cleaner to do it here.
+    */
+   if (! IsNormalProcessingMode())
+   {
+       ClearPendingNotifies();
+       return;
+   }
 
-   notifyFrontEndPending = 0;
+   TPRINTF(TRACE_NOTIFY, "AtCommit_Notify");
 
-   TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd");
-   StartTransactionCommand();
-   ScanKeyEntryInitialize(&key[0], 0,
-                          Anum_pg_listener_notify,
-                          F_INT4EQ,
-                          Int32GetDatum(1));
-   ScanKeyEntryInitialize(&key[1], 0,
-                          Anum_pg_listener_pid,
-                          F_INT4EQ,
-                          Int32GetDatum(MyProcPid));
    lRel = heap_openr(ListenerRelationName);
    RelationSetLockForWrite(lRel);
    tdesc = RelationGetDescr(lRel);
-   sRel = heap_beginscan(lRel, 0, SnapshotNow, 2, key);
+   sRel = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
 
+   /* preset data to update notify column to MyProcPid */
    nulls[0] = nulls[1] = nulls[2] = ' ';
    repl[0] = repl[1] = repl[2] = ' ';
    repl[Anum_pg_listener_notify - 1] = 'r';
    value[0] = value[1] = value[2] = (Datum) 0;
-   value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
+   value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
 
    while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
    {
-       d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc,
-                        &isnull);
+       d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
+       relname = (char *) DatumGetPointer(d);
 
-       /*
-        * This hack deletes duplicate tuples which can be left in the
-        * table if the NotifyUnlock option is set. I'm further
-        * investigating this.  -- dz
-        */
-       if (NotifyHack)
+       if (AsyncExistsPendingNotify(relname))
        {
-           for (i = 0; i < ndone; i++)
+           d = heap_getattr(lTuple, Anum_pg_listener_pid, tdesc, &isnull);
+           listenerPID = DatumGetInt32(d);
+
+           if (listenerPID == MyProcPid)
+           {
+               /* Self-notify: no need to bother with table update.
+                * Indeed, we *must not* clear the notification field in
+                * this path, or we could lose an outside notify, which'd be
+                * bad for applications that ignore self-notify messages.
+                */
+               TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying self");
+               NotifyMyFrontEnd(relname, listenerPID);
+           }
+           else
            {
-               if (strcmp(DatumGetName(d)->data, done[i]) == 0)
+               TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying pid %d",
+                       listenerPID);
+               /*
+                * If someone has already notified this listener,
+                * we don't bother modifying the table, but we do still send
+                * a SIGUSR2 signal, just in case that backend missed the
+                * earlier signal for some reason.  It's OK to send the signal
+                * first, because the other guy can't read pg_listener until
+                * we unlock it.
+                */
+#ifdef HAVE_KILL
+               if (kill(listenerPID, SIGUSR2) < 0)
                {
-                   TPRINTF(TRACE_NOTIFY,
-                           "Async_NotifyFrontEnd: duplicate %s",
-                           DatumGetName(d)->data);
+                   /* Get rid of pg_listener entry if it refers to a PID
+                    * that no longer exists.  Presumably, that backend
+                    * crashed without deleting its pg_listener entries.
+                    * This code used to only delete the entry if errno==ESRCH,
+                    * but as far as I can see we should just do it for any
+                    * failure (certainly at least for EPERM too...)
+                    */
                    heap_delete(lRel, &lTuple->t_ctid);
-                   continue;
                }
+               else
+#endif
+               {
+                   d = heap_getattr(lTuple, Anum_pg_listener_notify,
+                                    tdesc, &isnull);
+                   if (DatumGetInt32(d) == 0)
+                   {
+                       rTuple = heap_modifytuple(lTuple, lRel,
+                                                 value, nulls, repl);
+                       heap_replace(lRel, &lTuple->t_ctid, rTuple);
+                   }
+               }
+           }
+       }
+   }
+
+   heap_endscan(sRel);
+   /*
+    * We do not do RelationUnsetLockForWrite(lRel) here, because the
+    * transaction is about to be committed anyway.
+    */
+   heap_close(lRel);
+
+   ClearPendingNotifies();
+
+   TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: done");
+}
+
+/*
+ *--------------------------------------------------------------
+ * AtAbort_Notify --
+ *
+ *     This is called at transaction abort.
+ *
+ *     Gets rid of pending outbound notifies that we would have executed
+ *     if the transaction got committed.
+ *
+ * Results:
+ *     XXX
+ *
+ *--------------------------------------------------------------
+ */
+void
+AtAbort_Notify()
+{
+   ClearPendingNotifies();
+}
+
+/*
+ *--------------------------------------------------------------
+ * Async_NotifyHandler --
+ *
+ *     This is the signal handler for SIGUSR2.
+ *
+ *     If we are idle (notifyInterruptEnabled is set), we can safely invoke
+ *     ProcessIncomingNotify directly.  Otherwise, just set a flag
+ *     to do it later.
+ *
+ * Results:
+ *     none
+ *
+ * Side effects:
+ *     per above
+ *--------------------------------------------------------------
+ */
+
+void
+Async_NotifyHandler(SIGNAL_ARGS)
+{
+   /*
+    * Note: this is a SIGNAL HANDLER.  You must be very wary what you do here.
+    * Some helpful soul had this routine sprinkled with TPRINTFs, which would
+    * likely lead to corruption of stdio buffers if they were ever turned on.
+    */
+
+   if (notifyInterruptEnabled)
+   {
+       /* I'm not sure whether some flavors of Unix might allow another
+        * SIGUSR2 occurrence to recursively interrupt this routine.
+        * To cope with the possibility, we do the same sort of dance that
+        * EnableNotifyInterrupt must do --- see that routine for comments.
+        */
+       notifyInterruptEnabled = 0;     /* disable any recursive signal */
+       notifyInterruptOccurred = 1;    /* do at least one iteration */
+       for (;;)
+       {
+           notifyInterruptEnabled = 1;
+           if (! notifyInterruptOccurred)
+               break;
+           notifyInterruptEnabled = 0;
+           if (notifyInterruptOccurred)
+           {
+               /* Here, it is finally safe to do stuff. */
+               TPRINTF(TRACE_NOTIFY,
+                       "Async_NotifyHandler: perform async notify");
+               ProcessIncomingNotify();
+               TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done");
            }
-           if (ndone < MAX_DONE)
-               done[ndone++] = pstrdup(DatumGetName(d)->data);
        }
+   }
+   else
+   {
+       /* In this path it is NOT SAFE to do much of anything, except this: */
+       notifyInterruptOccurred = 1;
+   }
+}
+
+/*
+ * --------------------------------------------------------------
+ * EnableNotifyInterrupt --
+ *
+ *     This is called by the PostgresMain main loop just before waiting
+ *     for a frontend command.  If we are truly idle (ie, *not* inside
+ *     a transaction block), then process any pending inbound notifies,
+ *     and enable the signal handler to process future notifies directly.
+ *
+ *     NOTE: the signal handler starts out disabled, and stays so until
+ *     PostgresMain calls this the first time.
+ * --------------------------------------------------------------
+ */
+
+void
+EnableNotifyInterrupt(void)
+{
+   if (CurrentTransactionState->blockState != TRANS_DEFAULT)
+       return;                 /* not really idle */
+
+   /*
+    * This code is tricky because we are communicating with a signal
+    * handler that could interrupt us at any point.  If we just checked
+    * notifyInterruptOccurred and then set notifyInterruptEnabled, we
+    * could fail to respond promptly to a signal that happens in between
+    * those two steps.  (A very small time window, perhaps, but Murphy's
+    * Law says you can hit it...)  Instead, we first set the enable flag,
+    * then test the occurred flag.  If we see an unserviced interrupt
+    * has occurred, we re-clear the enable flag before going off to do
+    * the service work.  (That prevents re-entrant invocation of
+    * ProcessIncomingNotify() if another interrupt occurs.)
+    * If an interrupt comes in between the setting and clearing of
+    * notifyInterruptEnabled, then it will have done the service
+    * work and left notifyInterruptOccurred zero, so we have to check
+    * again after clearing enable.  The whole thing has to be in a loop
+    * in case another interrupt occurs while we're servicing the first.
+    * Once we get out of the loop, enable is set and we know there is no
+    * unserviced interrupt.
+    *
+    * NB: an overenthusiastic optimizing compiler could easily break this
+    * code.  Hopefully, they all understand what "volatile" means these days.
+    */
+   for (;;)
+   {
+       notifyInterruptEnabled = 1;
+       if (! notifyInterruptOccurred)
+           break;
+       notifyInterruptEnabled = 0;
+       if (notifyInterruptOccurred)
+       {
+           TPRINTF(TRACE_NOTIFY,
+                   "EnableNotifyInterrupt: perform async notify");
+           ProcessIncomingNotify();
+           TPRINTF(TRACE_NOTIFY, "EnableNotifyInterrupt: done");
+       }
+   }
+}
+
+/*
+ * --------------------------------------------------------------
+ * DisableNotifyInterrupt --
+ *
+ *     This is called by the PostgresMain main loop just after receiving
+ *     a frontend command.  Signal handler execution of inbound notifies
+ *     is disabled until the next EnableNotifyInterrupt call.
+ * --------------------------------------------------------------
+ */
+
+void
+DisableNotifyInterrupt(void)
+{
+   notifyInterruptEnabled = 0;
+}
+
+/*
+ * --------------------------------------------------------------
+ * ProcessIncomingNotify --
+ *
+ *     Deal with arriving NOTIFYs from other backends.
+ *     This is called either directly from the SIGUSR2 signal handler,
+ *     or the next time control reaches the outer idle loop.
+ *     Scan pg_listener for arriving notifies, report them to my front end,
+ *     and clear the notification field in pg_listener until next time.
+ *
+ *     NOTE: since we are outside any transaction, we must create our own.
+ *
+ * Results:
+ *     XXX
+ *
+ * --------------------------------------------------------------
+ */
+static void
+ProcessIncomingNotify(void)
+{
+   Relation    lRel;
+   TupleDesc   tdesc;
+   ScanKeyData key[1];
+   HeapScanDesc sRel;
+   HeapTuple   lTuple,
+               rTuple;
+   Datum       d,
+               value[Natts_pg_listener];
+   char        repl[Natts_pg_listener],
+               nulls[Natts_pg_listener];
+   bool        isnull;
+   char       *relname;
+   int32       sourcePID;
+
+   TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify");
+   PS_SET_STATUS("async_notify");
+
+   notifyInterruptOccurred = 0;
+
+   StartTransactionCommand();
 
-       rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
-       heap_replace(lRel, &lTuple->t_ctid, rTuple);
+   lRel = heap_openr(ListenerRelationName);
+   RelationSetLockForWrite(lRel);
+   tdesc = RelationGetDescr(lRel);
+
+   /* Scan only entries with my listenerPID */
+   ScanKeyEntryInitialize(&key[0], 0,
+                          Anum_pg_listener_pid,
+                          F_INT4EQ,
+                          Int32GetDatum(MyProcPid));
+   sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
 
-       /* notifying the front end */
-       TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd: notifying %s",
-               DatumGetName(d)->data);
+   /* Prepare data for rewriting 0 into notification field */
+   nulls[0] = nulls[1] = nulls[2] = ' ';
+   repl[0] = repl[1] = repl[2] = ' ';
+   repl[Anum_pg_listener_notify - 1] = 'r';
+   value[0] = value[1] = value[2] = (Datum) 0;
+   value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
 
-       if (whereToSendOutput == Remote)
+   while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
+   {
+       d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull);
+       sourcePID = DatumGetInt32(d);
+       if (sourcePID != 0)
        {
-           pq_putnchar("A", 1);
-           pq_putint((int32) MyProcPid, sizeof(int32));
-           pq_putstr(DatumGetName(d)->data);
-           pq_flush();
+           d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
+           relname = (char *) DatumGetPointer(d);
+           /* Notify the frontend */
+           TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: received %s from %d",
+                   relname, (int) sourcePID);
+           NotifyMyFrontEnd(relname, sourcePID);
+           /* Rewrite the tuple with 0 in notification column */
+           rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
+           heap_replace(lRel, &lTuple->t_ctid, rTuple);
        }
    }
    heap_endscan(sRel);
-   RelationUnsetLockForWrite(lRel);
+   /*
+    * We do not do RelationUnsetLockForWrite(lRel) here, because the
+    * transaction is about to be committed anyway.
+    */
    heap_close(lRel);
 
-   TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd: done");
+   CommitTransactionCommand();
+
+   /* Must flush the notify messages to ensure frontend gets them promptly. */
+   pq_flush();
+
+   PS_SET_STATUS("idle");
+   TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: done");
 }
 
+/* Send NOTIFY message to my front end. */
+
+static void
+NotifyMyFrontEnd(char *relname, int32 listenerPID)
+{
+   if (whereToSendOutput == Remote)
+   {
+       pq_putnchar("A", 1);
+       pq_putint(listenerPID, sizeof(int32));
+       pq_putstr(relname);
+       /* NOTE: we do not do pq_flush() here.  For a self-notify, it will
+        * happen at the end of the transaction, and for incoming notifies
+        * ProcessIncomingNotify will do it after finding all the notifies.
+        */
+   }
+   else
+   {
+       elog(NOTICE, "NOTIFY for %s", relname);
+   }
+}
+
+/* Does pendingNotifies include the given relname?
+ *
+ * NB: not called unless pendingNotifies != NULL.
+ */
+
 static int
 AsyncExistsPendingNotify(char *relname)
 {
@@ -747,11 +834,26 @@ AsyncExistsPendingNotify(char *relname)
    return 0;
 }
 
+/* Clear the pendingNotifies list. */
+
 static void
-ClearPendingNotify()
+ClearPendingNotifies()
 {
    Dlelem     *p;
 
-   while ((p = DLRemHead(pendingNotifies)) != NULL)
-       free(DLE_VAL(p));
+   if (pendingNotifies)
+   {
+       /* Since the referenced strings are malloc'd, we have to scan the
+        * list and delete them individually.  If we used palloc for the
+        * strings then we could just do DLFreeList to get rid of both
+        * the list nodes and the list base...
+        */
+       while ((p = DLRemHead(pendingNotifies)) != NULL)
+       {
+           free(DLE_VAL(p));
+           DLFreeElem(p);
+       }
+       DLFreeList(pendingNotifies);
+       pendingNotifies = NULL;
+   }
 }
index 566b15c5b0d11b076ab7d0b62920c825655c5557..8394fcdea77f12f0cab145c6b3313770a3a4b93f 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/tcop/postgres.c,v 1.90 1998/10/02 01:14:14 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/tcop/postgres.c,v 1.91 1998/10/06 02:40:01 tgl Exp $
  *
  * NOTES
  *   this is the "main" module of the postgres backend and
@@ -1511,7 +1511,7 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[])
    if (!IsUnderPostmaster)
    {
        puts("\nPOSTGRES backend interactive interface ");
-       puts("$Revision: 1.90 $ $Date: 1998/10/02 01:14:14 $\n");
+       puts("$Revision: 1.91 $ $Date: 1998/10/06 02:40:01 $\n");
    }
 
    /* ----------------
@@ -1559,7 +1559,16 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[])
        ReadyForQuery(whereToSendOutput);
 
        /* ----------------
-        *   (2) read a command.
+        *   (2) deal with pending asynchronous NOTIFY from other backends,
+        *   and enable async.c's signal handler to execute NOTIFY directly.
+        * ----------------
+        */
+       QueryCancel = false;    /* forget any earlier CANCEL signal */
+
+       EnableNotifyInterrupt();
+
+       /* ----------------
+        *   (3) read a command.
         * ----------------
         */
        MemSet(parser_input, 0, MAX_PARSE_BUFFER);
@@ -1569,7 +1578,13 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[])
        QueryCancel = false;    /* forget any earlier CANCEL signal */
 
        /* ----------------
-        *   (3) process the command.
+        *   (4) disable async.c's signal handler.
+        * ----------------
+        */
+       DisableNotifyInterrupt();
+
+       /* ----------------
+        *   (5) process the command.
         * ----------------
         */
        switch (firstchar)
@@ -1640,7 +1655,7 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[])
        }
 
        /* ----------------
-        *   (4) commit the current transaction
+        *   (6) commit the current transaction
         *
         *   Note: if we had an empty input buffer, then we didn't
         *   call pg_exec_query, so we don't bother to commit this transaction.
index fb3289d7bc181fc9076ec3fa71209288722b60a6..8adbb075be1254122e3d245ffe5658e3c38804b4 100644 (file)
@@ -70,10 +70,6 @@ static char *opt_names[] = {
    "syslog",                   /* use syslog for error messages */
    "hostlookup",               /* enable hostname lookup in ps_status */
    "showportnumber",           /* show port number in ps_status */
-   "notifyunlock",             /* enable unlock of pg_listener after
-                                * notify */
-   "notifyhack"                /* enable notify hack to remove duplicate
-                                * tuples */
 };
 
 /*
index a612910047761e8c06381de1e9536b0ec27350f5..d006264ae12e0b03ec111312aab701190b9da39f 100644 (file)
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: xact.h,v 1.15 1998/09/01 04:34:35 momjian Exp $
+ * $Id: xact.h,v 1.16 1998/10/06 02:40:06 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -107,6 +107,7 @@ extern void BeginTransactionBlock(void);
 extern void EndTransactionBlock(void);
 extern bool IsTransactionBlock(void);
 extern void UserAbortTransactionBlock(void);
+extern void AbortOutOfAnyTransaction(void);
 
 extern TransactionId DisabledTransactionId;
 
index 2c9d0a348a5384b86d17dc1a76ba7402a6b0ba2a..5494b0f6c7e9a0a9e59c3a1060a2c649f0dbf5cf 100644 (file)
@@ -1,27 +1,38 @@
 /*-------------------------------------------------------------------------
  *
  * async.h--
- *
- *
+ *   Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: async.h,v 1.9 1998/09/01 04:35:22 momjian Exp $
+ * $Id: async.h,v 1.10 1998/10/06 02:40:08 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #ifndef ASYNC_H
 #define ASYNC_H
 
-#include <nodes/memnodes.h>
+#include <postgres.h>
 
-extern void Async_NotifyHandler(SIGNAL_ARGS);
+/* notify-related SQL statements */
 extern void Async_Notify(char *relname);
-extern void Async_NotifyAtCommit(void);
-extern void Async_NotifyAtAbort(void);
 extern void Async_Listen(char *relname, int pid);
 extern void Async_Unlisten(char *relname, int pid);
 
-extern GlobalMemory notifyContext;
+/* perform (or cancel) outbound notify processing at transaction commit */
+extern void AtCommit_Notify(void);
+extern void AtAbort_Notify(void);
+
+/* signal handler for inbound notifies (SIGUSR2) */
+extern void Async_NotifyHandler(SIGNAL_ARGS);
+
+/*
+ * enable/disable processing of inbound notifies directly from signal handler.
+ * The enable routine first performs processing of any inbound notifies that
+ * have occurred since the last disable.  These are meant to be called ONLY
+ * from the appropriate places in PostgresMain().
+ */
+extern void EnableNotifyInterrupt(void);
+extern void DisableNotifyInterrupt(void);
 
 #endif  /* ASYNC_H */
index 8f716393c67028aeb9f906901e35046098f570f5..d978f16ab4443f9c07c5b89b97651dd2679b8ce9 100644 (file)
@@ -66,10 +66,6 @@ enum pg_option_enum
    OPT_SYSLOG,                 /* use syslog for error messages */
    OPT_HOSTLOOKUP,             /* enable hostname lookup in ps_status */
    OPT_SHOWPORTNUMBER,         /* show port number in ps_status */
-   OPT_NOTIFYUNLOCK,           /* enable unlock of pg_listener after
-                                * notify */
-   OPT_NOTIFYHACK,             /* enable notify hack to remove duplicate
-                                * tuples */
 
    NUM_PG_OPTIONS              /* must be the last item of enum */
 };