Remove AtSubStart_Notify.
authorRobert Haas
Fri, 4 Oct 2019 12:19:25 +0000 (08:19 -0400)
committerRobert Haas
Fri, 4 Oct 2019 12:19:25 +0000 (08:19 -0400)
Allocate notify-related state lazily instead. This makes trivial
subtransactions noticeably faster.

Patch by me, reviewed and tested by Dilip Kumar, Kyotaro Horiguchi,
and Jeevan Ladhe.

Discussion: https://postgr.es/m/CA+TgmobE1J22S1eC-6N-je9LgrcwZypkwp+zH6JXo9mc=4Nk3A@mail.gmail.com

src/backend/access/transam/xact.c
src/backend/commands/async.c
src/include/commands/async.h

index 9162286c98a658771aa3bef8bfb53f33160ca9e8..fc55fa6d53c623cf8edddf1dce971db08c005b7b 100644 (file)
@@ -4743,7 +4743,6 @@ StartSubTransaction(void)
     */
    AtSubStart_Memory();
    AtSubStart_ResourceOwner();
-   AtSubStart_Notify();
    AfterTriggerBeginSubXact();
 
    s->state = TRANS_INPROGRESS;
index ee01df589f2d473fe0f411965d3e5fe7741e3c24..d0649d2e3ef17a8552e67f4a6ee75c288819d11f 100644 (file)
@@ -344,9 +344,14 @@ typedef struct
    char        channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
 } ListenAction;
 
-static List *pendingActions = NIL; /* list of ListenAction */
+typedef struct ActionList
+{
+   int         nestingLevel;   /* current transaction nesting depth */
+   List       *actions;            /* list of ListenAction structs */
+   struct ActionList *upper;   /* details for upper transaction levels */
+} ActionList;
 
-static List *upperPendingActions = NIL; /* list of upper-xact lists */
+static ActionList *pendingActions = NULL;
 
 /*
  * State for outbound notifies consists of a list of all channels+payloads
@@ -385,8 +390,10 @@ typedef struct Notification
 
 typedef struct NotificationList
 {
+   int         nestingLevel;   /* current transaction nesting depth */
    List       *events;         /* list of Notification structs */
    HTAB       *hashtab;        /* hash of NotificationHash structs, or NULL */
+   struct NotificationList *upper; /* details for upper transaction levels */
 } NotificationList;
 
 #define MIN_HASHABLE_NOTIFIES 16   /* threshold to build hashtab */
@@ -396,9 +403,7 @@ typedef struct NotificationHash
    Notification *event;        /* => the actual Notification struct */
 } NotificationHash;
 
-static NotificationList *pendingNotifies = NULL;   /* current list, if any */
-
-static List *upperPendingNotifies = NIL;   /* list of upper-xact lists */
+static NotificationList *pendingNotifies = NULL;
 
 /*
  * Inbound notifications are initially processed by HandleNotifyInterrupt(),
@@ -609,6 +614,7 @@ pg_notify(PG_FUNCTION_ARGS)
 void
 Async_Notify(const char *channel, const char *payload)
 {
+   int         my_level = GetCurrentTransactionNestLevel();
    size_t      channel_len;
    size_t      payload_len;
    Notification *n;
@@ -659,25 +665,36 @@ Async_Notify(const char *channel, const char *payload)
    else
        n->data[channel_len + 1] = '\0';
 
-   /* Now check for duplicates */
-   if (AsyncExistsPendingNotify(n))
+   if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
    {
-       /* It's a dup, so forget it */
-       pfree(n);
-       MemoryContextSwitchTo(oldcontext);
-       return;
-   }
+       NotificationList *notifies;
 
-   if (pendingNotifies == NULL)
-   {
-       /* First notify event in current (sub)xact */
-       pendingNotifies = (NotificationList *) palloc(sizeof(NotificationList));
-       pendingNotifies->events = list_make1(n);
+       /*
+        * First notify event in current (sub)xact. Note that we allocate the
+        * NotificationList in TopTransactionContext; the nestingLevel might
+        * get changed later by AtSubCommit_Notify.
+        */
+       notifies = (NotificationList *)
+           MemoryContextAlloc(TopTransactionContext,
+                              sizeof(NotificationList));
+       notifies->nestingLevel = my_level;
+       notifies->events = list_make1(n);
        /* We certainly don't need a hashtable yet */
-       pendingNotifies->hashtab = NULL;
+       notifies->hashtab = NULL;
+       notifies->upper = pendingNotifies;
+       pendingNotifies = notifies;
    }
    else
    {
+       /* Now check for duplicates */
+       if (AsyncExistsPendingNotify(n))
+       {
+           /* It's a dup, so forget it */
+           pfree(n);
+           MemoryContextSwitchTo(oldcontext);
+           return;
+       }
+
        /* Append more events to existing list */
        AddEventToPendingNotifies(n);
    }
@@ -698,6 +715,7 @@ queue_listen(ListenActionKind action, const char *channel)
 {
    MemoryContext oldcontext;
    ListenAction *actrec;
+   int         my_level = GetCurrentTransactionNestLevel();
 
    /*
     * Unlike Async_Notify, we don't try to collapse out duplicates. It would
@@ -713,7 +731,24 @@ queue_listen(ListenActionKind action, const char *channel)
    actrec->action = action;
    strcpy(actrec->channel, channel);
 
-   pendingActions = lappend(pendingActions, actrec);
+   if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
+   {
+       ActionList *actions;
+
+       /*
+        * First action in current sub(xact). Note that we allocate the
+        * ActionList in TopTransactionContext; the nestingLevel might get
+        * changed later by AtSubCommit_Notify.
+        */
+       actions = (ActionList *)
+           MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
+       actions->nestingLevel = my_level;
+       actions->actions = list_make1(actrec);
+       actions->upper = pendingActions;
+       pendingActions = actions;
+   }
+   else
+       pendingActions->actions = lappend(pendingActions->actions, actrec);
 
    MemoryContextSwitchTo(oldcontext);
 }
@@ -744,7 +779,7 @@ Async_Unlisten(const char *channel)
        elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
 
    /* If we couldn't possibly be listening, no need to queue anything */
-   if (pendingActions == NIL && !unlistenExitRegistered)
+   if (pendingActions == NULL && !unlistenExitRegistered)
        return;
 
    queue_listen(LISTEN_UNLISTEN, channel);
@@ -762,7 +797,7 @@ Async_UnlistenAll(void)
        elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
 
    /* If we couldn't possibly be listening, no need to queue anything */
-   if (pendingActions == NIL && !unlistenExitRegistered)
+   if (pendingActions == NULL && !unlistenExitRegistered)
        return;
 
    queue_listen(LISTEN_UNLISTEN_ALL, "");
@@ -858,21 +893,24 @@ PreCommit_Notify(void)
        elog(DEBUG1, "PreCommit_Notify");
 
    /* Preflight for any pending listen/unlisten actions */
-   foreach(p, pendingActions)
+   if (pendingActions != NULL)
    {
-       ListenAction *actrec = (ListenAction *) lfirst(p);
-
-       switch (actrec->action)
+       foreach(p, pendingActions->actions)
        {
-           case LISTEN_LISTEN:
-               Exec_ListenPreCommit();
-               break;
-           case LISTEN_UNLISTEN:
-               /* there is no Exec_UnlistenPreCommit() */
-               break;
-           case LISTEN_UNLISTEN_ALL:
-               /* there is no Exec_UnlistenAllPreCommit() */
-               break;
+           ListenAction *actrec = (ListenAction *) lfirst(p);
+
+           switch (actrec->action)
+           {
+               case LISTEN_LISTEN:
+                   Exec_ListenPreCommit();
+                   break;
+               case LISTEN_UNLISTEN:
+                   /* there is no Exec_UnlistenPreCommit() */
+                   break;
+               case LISTEN_UNLISTEN_ALL:
+                   /* there is no Exec_UnlistenAllPreCommit() */
+                   break;
+           }
        }
    }
 
@@ -961,21 +999,24 @@ AtCommit_Notify(void)
        elog(DEBUG1, "AtCommit_Notify");
 
    /* Perform any pending listen/unlisten actions */
-   foreach(p, pendingActions)
+   if (pendingActions != NULL)
    {
-       ListenAction *actrec = (ListenAction *) lfirst(p);
-
-       switch (actrec->action)
+       foreach(p, pendingActions->actions)
        {
-           case LISTEN_LISTEN:
-               Exec_ListenCommit(actrec->channel);
-               break;
-           case LISTEN_UNLISTEN:
-               Exec_UnlistenCommit(actrec->channel);
-               break;
-           case LISTEN_UNLISTEN_ALL:
-               Exec_UnlistenAllCommit();
-               break;
+           ListenAction *actrec = (ListenAction *) lfirst(p);
+
+           switch (actrec->action)
+           {
+               case LISTEN_LISTEN:
+                   Exec_ListenCommit(actrec->channel);
+                   break;
+               case LISTEN_UNLISTEN:
+                   Exec_UnlistenCommit(actrec->channel);
+                   break;
+               case LISTEN_UNLISTEN_ALL:
+                   Exec_UnlistenAllCommit();
+                   break;
+           }
        }
    }
 
@@ -1705,36 +1746,6 @@ AtAbort_Notify(void)
    ClearPendingActionsAndNotifies();
 }
 
-/*
- * AtSubStart_Notify() --- Take care of subtransaction start.
- *
- * Push empty state for the new subtransaction.
- */
-void
-AtSubStart_Notify(void)
-{
-   MemoryContext old_cxt;
-
-   /* Keep the list-of-lists in TopTransactionContext for simplicity */
-   old_cxt = MemoryContextSwitchTo(TopTransactionContext);
-
-   upperPendingActions = lcons(pendingActions, upperPendingActions);
-
-   Assert(list_length(upperPendingActions) ==
-          GetCurrentTransactionNestLevel() - 1);
-
-   pendingActions = NIL;
-
-   upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies);
-
-   Assert(list_length(upperPendingNotifies) ==
-          GetCurrentTransactionNestLevel() - 1);
-
-   pendingNotifies = NULL;
-
-   MemoryContextSwitchTo(old_cxt);
-}
-
 /*
  * AtSubCommit_Notify() --- Take care of subtransaction commit.
  *
@@ -1743,53 +1754,66 @@ AtSubStart_Notify(void)
 void
 AtSubCommit_Notify(void)
 {
-   List       *parentPendingActions;
-   NotificationList *parentPendingNotifies;
-
-   parentPendingActions = linitial_node(List, upperPendingActions);
-   upperPendingActions = list_delete_first(upperPendingActions);
-
-   Assert(list_length(upperPendingActions) ==
-          GetCurrentTransactionNestLevel() - 2);
-
-   /*
-    * Mustn't try to eliminate duplicates here --- see queue_listen()
-    */
-   pendingActions = list_concat(parentPendingActions, pendingActions);
+   int         my_level = GetCurrentTransactionNestLevel();
 
-   parentPendingNotifies = (NotificationList *) linitial(upperPendingNotifies);
-   upperPendingNotifies = list_delete_first(upperPendingNotifies);
+   /* If there are actions at our nesting level, we must reparent them. */
+   if (pendingActions != NULL &&
+       pendingActions->nestingLevel >= my_level)
+   {
+       if (pendingActions->upper == NULL ||
+           pendingActions->upper->nestingLevel < my_level - 1)
+       {
+           /* nothing to merge; give the whole thing to the parent */
+           --pendingActions->nestingLevel;
+       }
+       else
+       {
+           ActionList *childPendingActions = pendingActions;
 
-   Assert(list_length(upperPendingNotifies) ==
-          GetCurrentTransactionNestLevel() - 2);
+           pendingActions = pendingActions->upper;
 
-   if (pendingNotifies == NULL)
-   {
-       /* easy, no notify events happened in current subxact */
-       pendingNotifies = parentPendingNotifies;
-   }
-   else if (parentPendingNotifies == NULL)
-   {
-       /* easy, subxact's list becomes parent's */
+           /*
+            * Mustn't try to eliminate duplicates here --- see queue_listen()
+            */
+           pendingActions->actions =
+               list_concat(pendingActions->actions,
+                           childPendingActions->actions);
+           pfree(childPendingActions);
+       }
    }
-   else
+
+   /* If there are notifies at our nesting level, we must reparent them. */
+   if (pendingNotifies != NULL &&
+       pendingNotifies->nestingLevel >= my_level)
    {
-       /*
-        * Formerly, we didn't bother to eliminate duplicates here, but now we
-        * must, else we fall foul of "Assert(!found)", either here or during
-        * a later attempt to build the parent-level hashtable.
-        */
-       NotificationList *childPendingNotifies = pendingNotifies;
-       ListCell   *l;
+       Assert(pendingNotifies->nestingLevel == my_level);
 
-       pendingNotifies = parentPendingNotifies;
-       /* Insert all the subxact's events into parent, except for dups */
-       foreach(l, childPendingNotifies->events)
+       if (pendingNotifies->upper == NULL ||
+           pendingNotifies->upper->nestingLevel < my_level - 1)
        {
-           Notification *childn = (Notification *) lfirst(l);
+           /* nothing to merge; give the whole thing to the parent */
+           --pendingNotifies->nestingLevel;
+       }
+       else
+       {
+           /*
+            * Formerly, we didn't bother to eliminate duplicates here, but
+            * now we must, else we fall foul of "Assert(!found)", either here
+            * or during a later attempt to build the parent-level hashtable.
+            */
+           NotificationList *childPendingNotifies = pendingNotifies;
+           ListCell   *l;
+
+           pendingNotifies = pendingNotifies->upper;
+           /* Insert all the subxact's events into parent, except for dups */
+           foreach(l, childPendingNotifies->events)
+           {
+               Notification *childn = (Notification *) lfirst(l);
 
-           if (!AsyncExistsPendingNotify(childn))
-               AddEventToPendingNotifies(childn);
+               if (!AsyncExistsPendingNotify(childn))
+                   AddEventToPendingNotifies(childn);
+           }
+           pfree(childPendingNotifies);
        }
    }
 }
@@ -1805,23 +1829,31 @@ AtSubAbort_Notify(void)
    /*
     * All we have to do is pop the stack --- the actions/notifies made in
     * this subxact are no longer interesting, and the space will be freed
-    * when CurTransactionContext is recycled.
+    * when CurTransactionContext is recycled. We still have to free the
+    * ActionList and NotificationList objects themselves, though, because
+    * those are allocated in TopTransactionContext.
     *
-    * This routine could be called more than once at a given nesting level if
-    * there is trouble during subxact abort.  Avoid dumping core by using
-    * GetCurrentTransactionNestLevel as the indicator of how far we need to
-    * prune the list.
+    * Note that there might be no entries at all, or no entries for the
+    * current subtransaction level, either because none were ever created,
+    * or because we reentered this routine due to trouble during subxact
+    * abort.
     */
-   while (list_length(upperPendingActions) > my_level - 2)
+   while (pendingActions != NULL &&
+          pendingActions->nestingLevel >= my_level)
    {
-       pendingActions = linitial_node(List, upperPendingActions);
-       upperPendingActions = list_delete_first(upperPendingActions);
+       ActionList *childPendingActions = pendingActions;
+
+       pendingActions = pendingActions->upper;
+       pfree(childPendingActions);
    }
 
-   while (list_length(upperPendingNotifies) > my_level - 2)
+   while (pendingNotifies != NULL &&
+          pendingNotifies->nestingLevel >= my_level)
    {
-       pendingNotifies = (NotificationList *) linitial(upperPendingNotifies);
-       upperPendingNotifies = list_delete_first(upperPendingNotifies);
+       NotificationList *childPendingNotifies = pendingNotifies;
+
+       pendingNotifies = pendingNotifies->upper;
+       pfree(childPendingNotifies);
    }
 }
 
@@ -2374,12 +2406,11 @@ static void
 ClearPendingActionsAndNotifies(void)
 {
    /*
-    * We used to have to explicitly deallocate the list members and nodes,
-    * because they were malloc'd.  Now, since we know they are palloc'd in
-    * CurTransactionContext, we need not do that --- they'll go away
-    * automatically at transaction exit.  We need only reset the list head
-    * pointers.
+    * Everything's allocated in either TopTransactionContext or the context
+    * for the subtransaction to which it corresponds. So, there's nothing
+    * to do here except rest the pointers; the space will be reclaimed when
+    * the contexts are deleted.
     */
-   pendingActions = NIL;
+   pendingActions = NULL;
    pendingNotifies = NULL;
 }
index c295dc67c64fd206008cef094eb1fefbbe3d70bc..90b682f64d747908bdb954c96597cc2f57f3c65d 100644 (file)
@@ -40,7 +40,6 @@ extern void Async_UnlistenAll(void);
 extern void PreCommit_Notify(void);
 extern void AtCommit_Notify(void);
 extern void AtAbort_Notify(void);
-extern void AtSubStart_Notify(void);
 extern void AtSubCommit_Notify(void);
 extern void AtSubAbort_Notify(void);
 extern void AtPrepare_Notify(void);