Add explicit buffering in backend libpq, to compensate for
authorTom Lane
Sat, 23 Jan 1999 22:27:29 +0000 (22:27 +0000)
committerTom Lane
Sat, 23 Jan 1999 22:27:29 +0000 (22:27 +0000)
buffering lost by not going through stdio anymore for client I/O.

src/backend/commands/copy.c
src/backend/libpq/pqcomm.c
src/backend/libpq/pqcomprim.c
src/backend/utils/error/elog.c
src/include/libpq/libpq.h

index 5a8b3fc96504c5de086f30a8ae8bd435cdf86aca..3126e82dfc37f1cbb0897855014e772a08674118 100644 (file)
@@ -6,7 +6,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.67 1999/01/17 06:18:15 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.68 1999/01/23 22:27:26 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -303,9 +303,7 @@ DoCopy(char *relname, bool binary, bool oids, bool from, bool pipe,
        }
        else if (!from && !binary)
        {
-               CopySendData("\\.\n",3,fp);
-           if (IsUnderPostmaster)
-                   pq_flush();
+           CopySendData("\\.\n",3,fp);
        }
    }
 }
index 8f9c14fee9fcca8d56f13912182289dd2e3ac842..386643fe95c3fdfb52b87cd64a2c893e81d19df7 100644 (file)
@@ -5,39 +5,40 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- *  $Id: pqcomm.c,v 1.63 1999/01/17 06:18:26 momjian Exp $
+ *  $Id: pqcomm.c,v 1.64 1999/01/23 22:27:28 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 /*
  * INTERFACE ROUTINES
- *              pq_init                 - initialize libpq
+ *     pq_init         - initialize libpq
  *     pq_getport      - return the PGPORT setting
  *     pq_close        - close input / output connections
  *     pq_flush        - flush pending output
+ *     pq_recvbuf      - load some bytes into the input buffer
  *     pq_getstr       - get a null terminated string from connection
- *              pq_getchar              - get 1 character from connection
- *              pq_peekchar             - peek at first character in connection
+ *     pq_getchar      - get 1 character from connection
+ *     pq_peekchar     - peek at next character from connection
  *     pq_getnchar     - get n characters from connection, and null-terminate
  *     pq_getint       - get an integer from connection
- *              pq_putchar              - send 1 character to connection
+ *     pq_putchar      - send 1 character to connection
  *     pq_putstr       - send a null terminated string to connection
  *     pq_putnchar     - send n characters to connection
  *     pq_putint       - send an integer to connection
- *     pq_putncharlen      - send n characters to connection
+ *     pq_putncharlen  - send n characters to connection
  *                   (also send an int header indicating
  *                    the length)
  *     pq_getinaddr    - initialize address from host and port number
  *     pq_getinserv    - initialize address from host and service name
  *
- *              StreamDoUnlink          - Shutdown UNIX socket connectioin
- *              StreamServerPort        - Open sock stream
- *              StreamConnection        - Create new connection with client
- *              StreamClose             - Close a client/backend connection
+ *     StreamDoUnlink      - Shutdown UNIX socket connection
+ *     StreamServerPort    - Open socket stream
+ *     StreamConnection    - Create new connection with client
+ *     StreamClose         - Close a client/backend connection
  * 
  * NOTES
- *              Frontend is now completey in interfaces/libpq, and no 
- *              functions from this file is used.
+ *              Frontend is now completely in interfaces/libpq, and no 
+ *              functions from this file are used there.
  *
  */
 #include "postgres.h"
 
 extern FILE * debug_port; /* in util.c */
 
+/*
+ * Buffers 
+ */
+char PqSendBuffer[PQ_BUFFER_SIZE];
+char PqRecvBuffer[PQ_BUFFER_SIZE];
+int PqSendPointer,PqRecvPointer,PqRecvLength;
+
+
 /* --------------------------------
  *     pq_init - open portal file descriptors
  * --------------------------------
@@ -86,6 +95,7 @@ extern FILE * debug_port; /* in util.c */
 void
 pq_init(int fd)
 {
+   PqSendPointer = PqRecvPointer = PqRecvLength = 0;
    PQnotifies_init();
    if (getenv("LIBPQ_DEBUG"))
      debug_port = stderr;
@@ -94,40 +104,40 @@ pq_init(int fd)
 /* -------------------------
  *  pq_getchar()
  *
- *  get a character from the input file,
- *
+ *  get a character from the input file, or EOF if trouble
+ * --------------------------------
  */
 
 int
 pq_getchar(void)
 {
-   char c;
-
-   while (recv(MyProcPort->sock, &c, 1, 0) != 1) {
-       if (errno != EINTR)
-           return EOF; /* Not interrupted, so something went wrong */
+   while (PqRecvPointer >= PqRecvLength)
+   {
+       if (pq_recvbuf())       /* If nothing in buffer, then recv some */
+           return EOF;         /* Failed to recv data */
    }
-     
-   return c;
+   return PqRecvBuffer[PqRecvPointer++];
 }
 
-/*
+/* -------------------------
+ *  pq_peekchar()
+ *
+ *  get a character from the connection, but leave it in the buffer
+ *  to be read again
  * --------------------------------
- *              pq_peekchar - get 1 character from connection, but leave it in the stream
  */
-int
-pq_peekchar(void) {
-   char c;
 
-   while (recv(MyProcPort->sock, &c, 1, MSG_PEEK) != 1) {
-       if (errno != EINTR)
-           return EOF; /* Not interrupted, so something went wrong */
+int
+pq_peekchar(void)
+{
+   while (PqRecvPointer >= PqRecvLength)
+   {
+       if (pq_recvbuf())       /* If nothing in buffer, then recv some */
+           return EOF;         /* Failed to recv data */
    }
-
-   return c;
+   /* Note we don't bump the pointer... */
+   return PqRecvBuffer[PqRecvPointer];
 }
-  
-
 
 /* --------------------------------
  *     pq_getport - return the PGPORT setting
@@ -150,18 +160,91 @@ pq_getport()
 void
 pq_close()
 {
-        close(MyProcPort->sock);
+   close(MyProcPort->sock);
    PQnotifies_init();
 }
 
 /* --------------------------------
  *     pq_flush - flush pending output
+ *
+ *     returns 0 if OK, EOF if trouble
  * --------------------------------
  */
-void
+int
 pq_flush()
 {
-  /* Not supported/required? */
+   char *bufptr = PqSendBuffer;
+   char *bufend = PqSendBuffer + PqSendPointer;
+
+   while (bufptr < bufend)
+   {
+       int r = send(MyProcPort->sock, bufptr, bufend - bufptr, 0);
+       if (r <= 0)
+       {
+           if (errno == EINTR)
+               continue;       /* Ok if we were interrupted */
+           /* We would like to use elog() here, but cannot because elog
+            * tries to write to the client, which would cause a recursive
+            * flush attempt!  So just write it out to the postmaster log.
+            */
+           fprintf(stderr, "pq_flush: send() failed, errno %d\n", errno);
+           /* We drop the buffered data anyway so that processing
+            * can continue, even though we'll probably quit soon.
+            */
+           PqSendPointer = 0;
+           return EOF;
+       }
+       bufptr += r;
+   }
+   PqSendPointer = 0;
+   return 0;
+}
+
+/* --------------------------------
+ *     pq_recvbuf - load some bytes into the input buffer
+ *
+ *     returns 0 if OK, EOF if trouble
+ * --------------------------------
+ */
+
+int
+pq_recvbuf()
+{
+   if (PqRecvPointer > 0)
+   {
+       if (PqRecvLength > PqRecvPointer)
+       {
+           /* still some unread data, left-justify it in the buffer */
+           memmove(PqRecvBuffer, PqRecvBuffer+PqRecvPointer,
+                   PqRecvLength-PqRecvPointer);
+           PqRecvLength -= PqRecvPointer;
+           PqRecvPointer = 0;
+       }
+       else
+           PqRecvLength = PqRecvPointer = 0;
+   }
+
+   /* Can fill buffer from PqRecvLength and upwards */
+   for (;;)
+   {
+       int r = recv(MyProcPort->sock, PqRecvBuffer + PqRecvLength,
+                    PQ_BUFFER_SIZE - PqRecvLength, 0);
+       if (r <= 0)
+       {
+           if (errno == EINTR)
+               continue;       /* Ok if interrupted */
+           /* We would like to use elog() here, but dare not because elog
+            * tries to write to the client, which will cause problems
+            * if we have a hard communications failure ...
+            * So just write the message to the postmaster log.
+            */
+           fprintf(stderr, "pq_recvbuf: recv() failed, errno %d\n", errno);
+           return EOF;
+       }
+       /* r contains number of bytes read, so just incr length */
+       PqRecvLength += r;
+       return 0;
+   }
 }
 
 /* --------------------------------
@@ -194,7 +277,7 @@ pq_getstr(char *s, int maxlen)
 int
 pq_getnchar(char *s, int off, int maxlen)
 {
-        int r = pqGetNBytes(s + off, maxlen);
+   int r = pqGetNBytes(s + off, maxlen);
    s[off+maxlen] = '\0';
    return r;
 }
@@ -602,7 +685,7 @@ StreamConnection(int server_fd, Port *port)
        if (setsockopt(port->sock, pe->p_proto, TCP_NODELAY,
                       &on, sizeof(on)) < 0)
        {
-           elog(ERROR, "postmaster: setsockopt failed");
+           elog(ERROR, "postmaster: setsockopt failed: %m");
            return STATUS_ERROR;
        }
    }
@@ -644,18 +727,9 @@ pq_putncharlen(char *s, int n)
  */
 int pq_putchar(char c) 
 {
-  char isDone = 0;
-
-  do {
-    if (send(MyProcPort->sock, &c, 1, 0) != 1) {
-      if (errno != EINTR) 
-   return EOF; /* Anything other than interrupt is error! */
-    }
-    else
-      isDone = 1; /* Done if we sent one char */
-  } while (!isDone);
-  return c;
+   if (PqSendPointer >= PQ_BUFFER_SIZE)
+       if (pq_flush())         /* If buffer is full, then flush it out */
+           return EOF;
+   PqSendBuffer[PqSendPointer++] = c; /* Put in buffer */
+   return c;
 }
-
-
-
index 23ecfd4e19fe7a13ef0fbb0552ae6091348c096d..e48a1c16888600ee82c27fbb3143abf3cb5bb56b 100644 (file)
@@ -98,7 +98,7 @@ pqPutLong(int integer)
    n = ((PG_PROTOCOL_MAJOR(FrontendProtocol) == 0) ? hton_l(integer) : htonl((uint32) integer));
 #endif
 
-        return pqPutNBytes((char *)&n,4);
+   return pqPutNBytes((char *)&n, 4);
 }
 
 /* --------------------------------------------------------------------- */
@@ -107,7 +107,7 @@ pqGetShort(int *result)
 {
    uint16      n;
 
-   if (pqGetNBytes((char *)&n,2) != 0)
+   if (pqGetNBytes((char *)&n, 2) != 0)
      return EOF;
 
 #ifdef FRONTEND
@@ -138,28 +138,29 @@ pqGetLong(int *result)
 }
 
 /* --------------------------------------------------------------------- */
-/* pqGetNBytes: Read a chunk of exactly len bytes in buffer s (which must be 1
-       byte longer) and terminate it with a '\0'.
-       Return 0 if ok.
-*/
+/* pqGetNBytes: Read a chunk of exactly len bytes into buffer s.
+ * Return 0 if ok, EOF if trouble.
+ */
 int
 pqGetNBytes(char *s, size_t len)
 {
-   int bytesDone = 0;
-
-   do {
-     int r = recv(MyProcPort->sock, s+bytesDone, len-bytesDone, 0);
-     if (r == 0 || r == -1) {
-       if (errno != EINTR)
-         return EOF; /* All other than signal-interrupted is error */
-       continue; /* Otherwise, try again */
-     }
-     
-     /* r contains number of bytes received */
-     bytesDone += r;
-
-   } while (bytesDone < len);
-   /* Zero-termination now in pq_getnchar() instead */
+   size_t amount;
+
+   while (len > 0)
+   {
+       while (PqRecvPointer >= PqRecvLength)
+       {
+           if (pq_recvbuf())   /* If nothing in buffer, then recv some */
+               return EOF;     /* Failed to recv data */
+       }
+       amount = PqRecvLength - PqRecvPointer;
+       if (amount > len)
+           amount = len;
+       memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
+       PqRecvPointer += amount;
+       s += amount;
+       len -= amount;
+   }
    return 0;
 }
 
@@ -167,20 +168,21 @@ pqGetNBytes(char *s, size_t len)
 int
 pqPutNBytes(const char *s, size_t len)
 {
-        int bytesDone = 0;
-
-   do {
-     int r = send(MyProcPort->sock, s+bytesDone, len-bytesDone, 0);
-     if (r == 0 || r == -1) {
-       if (errno != EINTR)
-         return EOF; /* Only signal interruption allowed */
-       continue; /* If interruped and read nothing, just try again */
-     }
-     
-     /* r contains number of bytes sent so far */
-     bytesDone += r;
-   } while (bytesDone < len);
-
+   size_t amount;
+
+   while (len > 0)
+   {
+       if (PqSendPointer >= PQ_BUFFER_SIZE)
+           if (pq_flush())     /* If buffer is full, then flush it out */
+               return EOF;
+       amount = PQ_BUFFER_SIZE - PqSendPointer;
+       if (amount > len)
+           amount = len;
+       memcpy(PqSendBuffer + PqSendPointer, s, amount);
+       PqSendPointer += amount;
+       s += amount;
+       len -= amount;
+   }
    return 0;
 }
 
@@ -191,8 +193,8 @@ pqGetString(char *s, size_t len)
    int         c;
 
    /*
-    * Keep on reading until we get the terminating '\0' and discard those
-    * bytes we don't have room for.
+    * Keep on reading until we get the terminating '\0',
+    * discarding any bytes we don't have room for.
     */
 
    while ((c = pq_getchar()) != EOF && c != '\0')
@@ -216,4 +218,3 @@ pqPutString(const char *s)
 {
   return pqPutNBytes(s,strlen(s)+1);
 }
-
index 4e68c1e24a8177e2438f9e3c97bccdad7f45a345..473fc06c3e11cfb667024c9646b9a7bfb8312532 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/utils/error/elog.c,v 1.37 1999/01/11 03:56:07 scrappy Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/utils/error/elog.c,v 1.38 1999/01/23 22:27:29 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -192,8 +192,15 @@ elog(int lev, const char *fmt,...)
            pq_putnchar("N", 1);
        else
            pq_putnchar("E", 1);
-       /* pq_putint(-101, 4); *//* should be query id */
        pq_putstr(line + TIMESTAMP_SIZE);       /* don't show timestamps */
+       /*
+        * This flush is normally not necessary, since postgres.c will
+        * flush out waiting data when control returns to the main loop.
+        * But it seems best to leave it here, so that the client has some
+        * clue what happened if the backend dies before getting back to the
+        * main loop ... error/notice messages should not be a performance-
+        * critical path anyway, so an extra flush won't hurt much ...
+        */
        pq_flush();
    }
    if (!IsUnderPostmaster)
index a315521eb3594ce5d901208a77b4c0e04bb50db7..c1cdd8ac5d97cc79e13a1b4ca0dfa16f4d16e4d4 100644 (file)
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: libpq.h,v 1.23 1999/01/12 12:49:52 scrappy Exp $
+ * $Id: libpq.h,v 1.24 1999/01/23 22:27:25 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -254,12 +254,13 @@ extern void pq_init(int fd);
 extern void pq_gettty(char *tp);
 extern int pq_getport(void);
 extern void pq_close(void);
-extern void pq_flush(void);
+extern int pq_flush(void);
+extern int pq_recvbuf(void);
 extern int pq_getstr(char *s, int maxlen);
 extern int PQgetline(char *s, int maxlen);
 extern int PQputline(char *s);
-extern int      pq_getchar(void);
-extern int      pq_peekchar(void);
+extern int pq_getchar(void);
+extern int pq_peekchar(void);
 extern int pq_getnchar(char *s, int off, int maxlen);
 extern int pq_getint(int b);
 extern int  pq_putchar(char c);
@@ -282,4 +283,18 @@ extern int StreamServerPort(char *hostName, short portName, int *fdP);
 extern int StreamConnection(int server_fd, Port *port);
 extern void StreamClose(int sock);
 
+/*
+ * Internal send/receive buffers in libpq.
+ * These probably shouldn't be global at all, but unless we merge
+ * pqcomm.c and pqcomprim.c they have to be...
+ */
+
+#define PQ_BUFFER_SIZE 8192
+
+extern char PqSendBuffer[PQ_BUFFER_SIZE];
+extern int PqSendPointer;  /* Next index to store a byte in PqSendBuffer */
+extern char PqRecvBuffer[PQ_BUFFER_SIZE];
+extern int PqRecvPointer;  /* Next index to read a byte from PqRecvBuffer */
+extern int PqRecvLength;   /* End of data available in PqRecvBuffer */
+
 #endif  /* LIBPQ_H */