guile-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Guile-commits] 01/01: Support for non-blocking I/O


From: Andy Wingo
Subject: [Guile-commits] 01/01: Support for non-blocking I/O
Date: Fri, 20 May 2016 13:40:47 +0000 (UTC)

wingo pushed a commit to branch master
in repository guile.

commit 534139e45852f5b59ef4a75c99d757c7456ce19c
Author: Andy Wingo <address@hidden>
Date:   Fri May 20 14:51:51 2016 +0200

    Support for non-blocking I/O
    
    * doc/ref/api-io.texi (I/O Extensions): Document read_wait_fd /
      write_wait_fd members.
      (Non-Blocking I/O): New section.
    * libguile/fports.c (fport_read, fport_write): Return -1 if the
      operation would block.
      (fport_wait_fd, scm_make_fptob): Add read/write wait-fd
      implementation.
    * libguile/ports-internal.h (scm_t_port_type): Add read_wait_fd /
      write_wait_fd.
    * libguile/ports.c (default_read_wait_fd, default_write_wait_fd): New
      functions.
      (scm_make_port_type): Initialize default read/write wait fd impls.
      (trampoline_to_c_read, trampoline_to_scm_read)
      (trampoline_to_c_write, trampoline_to_scm_write): To Scheme, a return
      of #f indicates EWOULDBLOCk.
      (scm_set_port_read_wait_fd, scm_set_port_write_wait_fd): New
      functions.
      (port_read_wait_fd, port_write_wait_fd, scm_port_read_wait_fd)
      (scm_port_write_wait_fd, port_poll, scm_port_poll): New functions.
      (scm_i_read_bytes, scm_i_write_bytes): Poll if the read or write would
      block.
    * libguile/ports.h (scm_set_port_read_wait_fd)
      (scm_set_port_write_wait_fd): Add declarations.
    * module/ice-9/ports.scm: Shunt port-poll and port-{read,write}-wait-fd
      to the internals module.
    * module/ice-9/sports.scm (current-write-waiter):
      (current-read-waiter): Implement.
    * test-suite/tests/ports.test: Adapt non-blocking test to new behavior.
    * NEWS: Add entry.
---
 NEWS                        |    4 +
 doc/ref/api-io.texi         |   87 +++++++++++++++++++++
 libguile/fports.c           |   46 +++++++++--
 libguile/ports-internal.h   |    3 +
 libguile/ports.c            |  179 +++++++++++++++++++++++++++++++++++++++----
 libguile/ports.h            |    4 +
 module/ice-9/ports.scm      |   10 ++-
 module/ice-9/sports.scm     |   26 +++++--
 test-suite/tests/ports.test |   27 ++++---
 9 files changed, 344 insertions(+), 42 deletions(-)

diff --git a/NEWS b/NEWS
index 3e64129..e887ec4 100644
--- a/NEWS
+++ b/NEWS
@@ -38,6 +38,10 @@ is equivalent to an unbuffered port.  Ports may set their 
default buffer
 sizes, and some ports (for example soft ports) are unbuffered by default
 for historical reasons.
 
+** Support for non-blocking I/O
+
+See "Non-Blocking I/O" in the manual, for more.
+
 ** Removal of port locks
 
 As part of the 2.2 series, we introduced recursive locks on each port,
diff --git a/doc/ref/api-io.texi b/doc/ref/api-io.texi
index 5b20097..3132045 100644
--- a/doc/ref/api-io.texi
+++ b/doc/ref/api-io.texi
@@ -20,6 +20,7 @@
 * Port Types::                  Types of port and how to make them.
 * R6RS I/O Ports::              The R6RS port API.
 * I/O Extensions::              Implementing new port types in C.
+* Non-Blocking I/O::            How Guile deals with EWOULDBLOCK.
 * BOM Handling::                Handling of Unicode byte order marks.
 @end menu
 
@@ -2302,6 +2303,24 @@ It should write out bytes from the supplied bytevector 
@code{src},
 starting at offset @code{start} and continuing for @code{count} bytes,
 and return the number of bytes that were written.
 
address@hidden read_wait_fd
address@hidden write_wait_fd
+If a port's @code{read} or @code{write} function returns @code{(size_t)
+-1}, that indicates that reading or writing would block.  In that case
+to preserve the illusion of a blocking read or write operation, Guile's
+C port run-time will @code{poll} on the file descriptor returned by
+either the port's @code{read_wait_fd} or @code{write_wait_fd} function.
+Set using
+
address@hidden void scm_set_port_read_wait_fd (scm_t_port_type *type, int 
(*wait_fd) (SCM port))
address@hidden void scm_set_port_write_wait_fd (scm_t_port_type *type, int 
(*wait_fd) (SCM port))
address@hidden deftypefun
+
+Only a port type which implements the @code{read_wait_fd} or
address@hidden port methods can usefully return @code{(size_t) -1}
+from a read or write function.  @xref{Non-Blocking I/O}, for more on
+non-blocking I/O in Guile.
+
 @item print
 Called when @code{write} is called on the port, to print a port
 description.  For example, for a file port it may produce something
@@ -2384,6 +2403,74 @@ operating system inform Guile about the appropriate 
buffer sizes for the
 particular file opened by the port.
 @end table
 
address@hidden Non-Blocking I/O
address@hidden Non-Blocking I/O
+
+Most ports in Guile are @dfn{blocking}: when you try to read a character
+from a port, Guile will block on the read until a character is ready, or
+end-of-stream is detected.  Likewise whenever Guile goes to write
+(possibly buffered) data to an output port, Guile will block until all
+the data is written.
+
+Interacting with ports in blocking mode is very convenient: you can
+write straightforward, sequential algorithms whose code flow reflects
+the flow of data.  However, blocking I/O has two main limitations.
+
+The first is that it's easy to get into a situation where code is
+waiting on data.  Time spent waiting on data when code could be doing
+something else is wasteful and prevents your program from reaching its
+peak throughput.  If you implement a web server that sequentially
+handles requests from clients, it's very easy for the server to end up
+waiting on a client to finish its HTTP request, or waiting on it to
+consume the response.  The end result is that you are able to serve
+fewer requests per second than you'd like to serve.
+
+The second limitation is related: a blocking parser over user-controlled
+input is a denial-of-service vulnerability.  Indeed the so-called ``slow
+loris'' attack of the early 2010s was just that: an attack on common web
+servers that drip-fed HTTP requests, one character at a time.  All it
+took was a handful of slow loris connections to occupy an entire web
+server.
+
+In Guile we would like to preserve the ability to write straightforward
+blocking networking processes of all kinds, but under the hood to allow
+those processes to suspend their requests if they would block.
+
+To do this, the first piece is to allow Guile ports to declare
+themselves as being nonblocking.  This is currently supported only for
+file ports, which also includes sockets, terminals, or any other port
+that is backed by a file descriptor.  To do that, we use an arcane UNIX
+incantation:
+
address@hidden
+(let ((flags (fcntl socket F_GETFL)))
+  (fcntl socket F_SETFL (logior O_NONBLOCK flags)))
address@hidden example
+
+Now the file descriptor is open in non-blocking mode.  If Guile tries to
+read or write from this file descriptor in C, it will block by polling
+on the socket's @code{read_wait_fd}, to preserve the illusion of a
+blocking read or write.  @xref{I/O Extensions} for more on that internal
+interface.
+
+However if a user uses the new and experimental Scheme implementation of
+ports in @code{(ice-9 sports)}, Guile instead calls the value of the
address@hidden or @code{current-write-waiter} parameters on
+the port before re-trying the read or write.  The default value of these
+parameters does the same thing as the C port runtime: it blocks.
+However it's possible to dynamically bind these parameters to handlers
+that can suspend the current coroutine to a scheduler, to be later
+re-animated once the port becomes readable or writable in the future.
+In the mean-time the scheduler can run other code, for example servicing
+other web requests.
+
+Guile does not currently include such a scheduler.  Currently we want to
+make sure that we're providing the right primitives that can be used to
+build schedulers and other user-space concurrency patterns.  In the
+meantime, have a look at 8sync (@url{https://gnu.org/software/8sync})
+for a prototype of an asynchronous I/O and concurrency facility.
+
+
 @node BOM Handling
 @subsection Handling of Unicode byte order marks.
 @cindex BOM
diff --git a/libguile/fports.c b/libguile/fports.c
index 046a844..271f3a0 100644
--- a/libguile/fports.c
+++ b/libguile/fports.c
@@ -573,14 +573,24 @@ fport_print (SCM exp, SCM port, scm_print_state *pstate 
SCM_UNUSED)
 static size_t
 fport_read (SCM port, SCM dst, size_t start, size_t count)
 {
-  long res;
   scm_t_fport *fp = SCM_FSTREAM (port);
   signed char *ptr = SCM_BYTEVECTOR_CONTENTS (dst) + start;
+  ssize_t ret;
 
-  SCM_SYSCALL (res = read (fp->fdes, ptr, count));
-  if (res == -1)
-    scm_syserror ("fport_read");
-  return res;
+ retry:
+  ret = read (fp->fdes, ptr, count);
+  if (ret < 0)
+    {
+      if (errno == EINTR)
+        {
+          SCM_ASYNC_TICK;
+          goto retry;
+        }
+      if (errno == EWOULDBLOCK || errno == EAGAIN)
+        return -1;
+      scm_syserror ("fport_read");
+    }
+  return ret;
 }
 
 static size_t
@@ -588,11 +598,23 @@ fport_write (SCM port, SCM src, size_t start, size_t 
count)
 {
   int fd = SCM_FPORT_FDES (port);
   signed char *ptr = SCM_BYTEVECTOR_CONTENTS (src) + start;
+  ssize_t ret;
 
-  if (full_write (fd, ptr, count) < count)
-    scm_syserror ("fport_write");
+ retry:
+  ret = write (fd, ptr, count);
+  if (ret < 0)
+    {
+      if (errno == EINTR)
+        {
+          SCM_ASYNC_TICK;
+          goto retry;
+        }
+      if (errno == EWOULDBLOCK || errno == EAGAIN)
+        return -1;
+      scm_syserror ("fport_write");
+    }
 
-  return count;
+  return ret;
 }
 
 static scm_t_off
@@ -637,6 +659,12 @@ fport_random_access_p (SCM port)
   return SCM_FDES_RANDOM_P (SCM_FSTREAM (port)->fdes);
 }
 
+static int
+fport_wait_fd (SCM port)
+{
+  return SCM_FSTREAM (port)->fdes;
+}
+
 /* Query the OS to get the natural buffering for FPORT, if available.  */
 static void
 fport_get_natural_buffer_sizes (SCM port, size_t *read_size, size_t 
*write_size)
@@ -660,6 +688,8 @@ scm_make_fptob ()
   scm_set_port_close                    (ptob, fport_close);
   scm_set_port_seek                     (ptob, fport_seek);
   scm_set_port_truncate                 (ptob, fport_truncate);
+  scm_set_port_read_wait_fd             (ptob, fport_wait_fd);
+  scm_set_port_write_wait_fd            (ptob, fport_wait_fd);
   scm_set_port_input_waiting            (ptob, fport_input_waiting);
   scm_set_port_random_access_p          (ptob, fport_random_access_p);
   scm_set_port_get_natural_buffer_sizes (ptob, fport_get_natural_buffer_sizes);
diff --git a/libguile/ports-internal.h b/libguile/ports-internal.h
index 54ce3e4..38da49e 100644
--- a/libguile/ports-internal.h
+++ b/libguile/ports-internal.h
@@ -44,6 +44,9 @@ struct scm_t_port_type
   SCM scm_read;
   SCM scm_write;
 
+  int (*read_wait_fd) (SCM port);
+  int (*write_wait_fd) (SCM port);
+
   scm_t_off (*seek) (SCM port, scm_t_off OFFSET, int WHENCE);
   void (*close) (SCM port);
 
diff --git a/libguile/ports.c b/libguile/ports.c
index c67bdf5..ba37555 100644
--- a/libguile/ports.c
+++ b/libguile/ports.c
@@ -33,6 +33,7 @@
 #include <fcntl.h>  /* for chsize on mingw */
 #include <assert.h>
 #include <iconv.h>
+#include <poll.h>
 #include <uniconv.h>
 #include <unistr.h>
 #include <striconveh.h>
@@ -126,6 +127,18 @@ default_random_access_p (SCM port)
   return SCM_PORT_TYPE (port)->seek != NULL;
 }
 
+static int
+default_read_wait_fd (SCM port)
+{
+  scm_misc_error ("read_wait_fd", "unimplemented", SCM_EOL);
+}
+
+static int
+default_write_wait_fd (SCM port)
+{
+  scm_misc_error ("write_wait_fd", "unimplemented", SCM_EOL);
+}
+
 scm_t_port_type *
 scm_make_port_type (char *name,
                     size_t (*read) (SCM port, SCM dst, size_t start,
@@ -144,6 +157,8 @@ scm_make_port_type (char *name,
   desc->c_write = write;
   desc->scm_read = read ? trampoline_to_c_read_subr : SCM_BOOL_F;
   desc->scm_write = write ? trampoline_to_c_write_subr : SCM_BOOL_F;
+  desc->read_wait_fd = default_read_wait_fd;
+  desc->write_wait_fd = default_write_wait_fd;
   desc->random_access_p = default_random_access_p;
   scm_make_port_classes (desc);
 
@@ -154,7 +169,7 @@ static SCM
 trampoline_to_c_read (SCM port, SCM dst, SCM start, SCM count)
 #define FUNC_NAME "port-read"
 {
-  size_t c_start, c_count;
+  size_t c_start, c_count, ret;
 
   SCM_VALIDATE_OPPORT (1, port);
   c_start = scm_to_size_t (start);
@@ -162,24 +177,25 @@ trampoline_to_c_read (SCM port, SCM dst, SCM start, SCM 
count)
   SCM_ASSERT_RANGE (2, start, c_start <= c_count);
   SCM_ASSERT_RANGE (3, count, c_start+c_count <= scm_c_bytevector_length 
(dst));
 
-  return scm_from_size_t
-    (SCM_PORT_TYPE (port)->c_read (port, dst, c_start, c_count));
+  ret = SCM_PORT_TYPE (port)->c_read (port, dst, c_start, c_count);
+
+  return ret == (size_t) -1 ? SCM_BOOL_F : scm_from_size_t (ret);
 }
 #undef FUNC_NAME
 
 static size_t
 trampoline_to_scm_read (SCM port, SCM dst, size_t start, size_t count)
 {
-  return scm_to_size_t
-    (scm_call_4 (SCM_PORT_TYPE (port)->scm_read, port, dst,
-                 scm_from_size_t (start), scm_from_size_t (count)));
+  SCM ret = scm_call_4 (SCM_PORT_TYPE (port)->scm_read, port, dst,
+                        scm_from_size_t (start), scm_from_size_t (count));
+  return scm_is_true (ret) ? scm_to_size_t (ret) : (size_t) -1;
 }
 
 static SCM
 trampoline_to_c_write (SCM port, SCM src, SCM start, SCM count)
 #define FUNC_NAME "port-write"
 {
-  size_t c_start, c_count;
+  size_t c_start, c_count, ret;
 
   SCM_VALIDATE_OPPORT (1, port);
   c_start = scm_to_size_t (start);
@@ -187,17 +203,18 @@ trampoline_to_c_write (SCM port, SCM src, SCM start, SCM 
count)
   SCM_ASSERT_RANGE (2, start, c_start <= c_count);
   SCM_ASSERT_RANGE (3, count, c_start+c_count <= scm_c_bytevector_length 
(src));
 
-  return scm_from_size_t
-    (SCM_PORT_TYPE (port)->c_write (port, src, c_start, c_count));
+  ret = SCM_PORT_TYPE (port)->c_write (port, src, c_start, c_count);
+
+  return ret == (size_t) -1 ? SCM_BOOL_F : scm_from_size_t (ret);
 }
 #undef FUNC_NAME
 
 static size_t
 trampoline_to_scm_write (SCM port, SCM src, size_t start, size_t count)
 {
-  return scm_to_size_t
-    (scm_call_4 (SCM_PORT_TYPE (port)->scm_write, port, src,
-                 scm_from_size_t (start), scm_from_size_t (count)));
+  SCM ret = scm_call_4 (SCM_PORT_TYPE (port)->scm_write, port, src,
+                        scm_from_size_t (start), scm_from_size_t (count));
+  return scm_is_true (ret) ? scm_to_size_t (ret) : (size_t) -1;
 }
 
 void
@@ -215,6 +232,18 @@ scm_set_port_scm_write (scm_t_port_type *ptob, SCM write)
 }
 
 void
+scm_set_port_read_wait_fd (scm_t_port_type *ptob, int (*get_fd) (SCM))
+{
+  ptob->read_wait_fd = get_fd;
+}
+
+void
+scm_set_port_write_wait_fd (scm_t_port_type *ptob, int (*get_fd) (SCM))
+{
+  ptob->write_wait_fd = get_fd;
+}
+
+void
 scm_set_port_print (scm_t_port_type *ptob,
                     int (*print) (SCM exp, SCM port, scm_print_state *pstate))
 {
@@ -1232,6 +1261,116 @@ SCM_DEFINE (scm_set_port_conversion_strategy_x, 
"set-port-conversion-strategy!",
 
 
 
+/* Non-blocking I/O.  */
+
+static int
+port_read_wait_fd (SCM port)
+{
+  scm_t_port_type *ptob = SCM_PORT_TYPE (port);
+  return ptob->read_wait_fd (port);
+}
+
+static int
+port_write_wait_fd (SCM port)
+{
+  scm_t_port_type *ptob = SCM_PORT_TYPE (port);
+  return ptob->write_wait_fd (port);
+}
+
+SCM_INTERNAL SCM scm_port_read_wait_fd (SCM);
+SCM_DEFINE (scm_port_read_wait_fd, "port-read-wait-fd", 1, 0, 0,
+            (SCM port), "")
+#define FUNC_NAME s_scm_port_read_wait_fd
+{
+  int fd;
+
+  port = SCM_COERCE_OUTPORT (port);
+  SCM_VALIDATE_OPINPORT (1, port);
+
+  fd = port_read_wait_fd (port);
+  return fd < 0 ? SCM_BOOL_F : scm_from_int (fd);
+}
+#undef FUNC_NAME
+
+SCM_INTERNAL SCM scm_port_write_wait_fd (SCM);
+SCM_DEFINE (scm_port_write_wait_fd, "port-write-wait-fd", 1, 0, 0,
+            (SCM port), "")
+#define FUNC_NAME s_scm_port_write_wait_fd
+{
+  int fd;
+
+  port = SCM_COERCE_OUTPORT (port);
+  SCM_VALIDATE_OPOUTPORT (1, port);
+
+  fd = port_write_wait_fd (port);
+  return fd < 0 ? SCM_BOOL_F : scm_from_int (fd);
+}
+#undef FUNC_NAME
+
+static int
+port_poll (SCM port, short events, int timeout)
+#define FUNC_NAME "port-poll"
+{
+  struct pollfd pollfd[2];
+  int nfds = 0, rv = 0;
+
+  if (events & POLLIN)
+    {
+      pollfd[nfds].fd = port_read_wait_fd (port);
+      pollfd[nfds].events = events & (POLLIN | POLLPRI);
+      pollfd[nfds].revents = 0;
+      nfds++;
+    }
+  if (events & POLLOUT)
+    {
+      pollfd[nfds].fd = port_write_wait_fd (port);
+      pollfd[nfds].events = events & (POLLOUT | POLLPRI);
+      pollfd[nfds].revents = 0;
+      nfds++;
+    }
+
+  if (nfds == 2 && pollfd[0].fd == pollfd[1].fd)
+    {
+      pollfd[0].events |= pollfd[1].events;
+      nfds--;
+    }
+
+  SCM_SYSCALL (rv = poll (pollfd, nfds, timeout));
+  if (rv < 0)
+    SCM_SYSERROR;
+
+  return rv;
+}
+#undef FUNC_NAME
+
+SCM_INTERNAL SCM scm_port_poll (SCM, SCM, SCM);
+SCM_DEFINE (scm_port_poll, "port-poll", 2, 1, 0,
+            (SCM port, SCM events, SCM timeout),
+            "")
+#define FUNC_NAME s_scm_port_poll
+{
+  short c_events = 0;
+  int c_timeout;
+
+  port = SCM_COERCE_OUTPORT (port);
+  SCM_VALIDATE_PORT (1, port);
+  SCM_VALIDATE_STRING (2, events);
+  c_timeout = SCM_UNBNDP (timeout) ? -1 : SCM_NUM2INT (3, timeout);
+
+  if (scm_i_string_contains_char (events, 'r'))
+    c_events |= POLLIN;
+  if (scm_i_string_contains_char (events, '!'))
+    c_events |= POLLPRI;
+  if (scm_i_string_contains_char (events, 'w'))
+    c_events |= POLLIN;
+
+  return scm_from_int (port_poll (port, c_events, c_timeout));
+}
+#undef FUNC_NAME
+
+
+
+
 /* Input.  */
 
 static int
@@ -1330,8 +1469,15 @@ scm_i_read_bytes (SCM port, SCM dst, size_t start, 
size_t count)
   assert (count <= SCM_BYTEVECTOR_LENGTH (dst));
   assert (start + count <= SCM_BYTEVECTOR_LENGTH (dst));
 
+ retry:
   filled = ptob->c_read (port, dst, start, count);
 
+  if (filled == (size_t) -1)
+    {
+      port_poll (port, POLLIN, -1);
+      goto retry;
+    }
+
   assert (filled <= count);
 
   return filled;
@@ -2508,7 +2654,14 @@ scm_i_write_bytes (SCM port, SCM src, size_t start, 
size_t count)
   assert (start + count <= SCM_BYTEVECTOR_LENGTH (src));
 
   do
-    written += ptob->c_write (port, src, start + written, count - written);
+    {
+      size_t ret = ptob->c_write (port, src, start + written, count - written);
+
+      if (ret == (size_t) -1)
+        port_poll (port, POLLOUT, -1);
+      else
+        written += ret;
+    }
   while (written < count);
 
   assert (written == count);
diff --git a/libguile/ports.h b/libguile/ports.h
index 43cd745..2905f68 100644
--- a/libguile/ports.h
+++ b/libguile/ports.h
@@ -90,6 +90,10 @@ SCM_API scm_t_port_type *scm_make_port_type
          size_t (*write) (SCM port, SCM src, size_t start, size_t count));
 SCM_API void scm_set_port_scm_read (scm_t_port_type *ptob, SCM read);
 SCM_API void scm_set_port_scm_write (scm_t_port_type *ptob, SCM write);
+SCM_API void scm_set_port_read_wait_fd (scm_t_port_type *ptob,
+                                        int (*wait_fd) (SCM port));
+SCM_API void scm_set_port_write_wait_fd (scm_t_port_type *ptob,
+                                         int (*wait_fd) (SCM port));
 SCM_API void scm_set_port_print (scm_t_port_type *ptob,
                                 int (*print) (SCM exp,
                                               SCM port,
diff --git a/module/ice-9/ports.scm b/module/ice-9/ports.scm
index a7f2373..4330ebe 100644
--- a/module/ice-9/ports.scm
+++ b/module/ice-9/ports.scm
@@ -179,7 +179,10 @@ interpret its input and output."
             specialize-port-encoding!
             port-random-access?
             port-decode-char
-            port-read-buffering))
+            port-read-buffering
+            port-poll
+            port-read-wait-fd
+            port-write-wait-fd))
 
 (define-syntax-rule (port-buffer-bytevector buf) (vector-ref buf 0))
 (define-syntax-rule (port-buffer-cur buf) (vector-ref buf 1))
@@ -209,7 +212,10 @@ interpret its input and output."
                        specialize-port-encoding!
                        port-decode-char
                        port-random-access?
-                       port-read-buffering)
+                       port-read-buffering
+                       port-poll
+                       port-read-wait-fd
+                       port-write-wait-fd)
 
 ;; And we're back.
 (define-module (ice-9 ports))
diff --git a/module/ice-9/sports.scm b/module/ice-9/sports.scm
index 6fd7ddd..c178b73 100644
--- a/module/ice-9/sports.scm
+++ b/module/ice-9/sports.scm
@@ -54,7 +54,9 @@
   #:replace (peek-char
              read-char)
   #:export (lookahead-u8
-            get-u8))
+            get-u8
+            current-read-waiter
+            current-write-waiter))
 
 (define (write-bytes port src start count)
   (let ((written ((port-write port) port src start count)))
@@ -77,11 +79,25 @@
       (set-port-buffer-end! buf 0)
       (write-bytes port (port-buffer-bytevector buf) cur (- end cur)))))
 
+(define (default-read-waiter port) (port-poll port "r"))
+(define (default-write-waiter port) (port-poll port "w"))
+
+(define current-read-waiter  (make-parameter default-read-waiter))
+(define current-write-waiter (make-parameter default-write-waiter))
+
+(define (wait-for-readable port) ((current-read-waiter) port))
+(define (wait-for-writable port) ((current-write-waiter) port))
+
 (define (read-bytes port dst start count)
-  (let ((read ((port-read port) port dst start count)))
-    (unless (<= 0 read count)
-      (error "bad return from port read function" read))
-    read))
+  (cond
+   (((port-read port) port dst start count)
+    => (lambda (read)
+         (unless (<= 0 read count)
+           (error "bad return from port read function" read))
+         read))
+   (else
+    (wait-for-readable port)
+    (read-bytes port dst start count))))
 
 (define utf8-bom #vu8(#xEF #xBB #xBF))
 (define utf16be-bom #vu8(#xFE #xFF))
diff --git a/test-suite/tests/ports.test b/test-suite/tests/ports.test
index 029dd2d..dfa430e 100644
--- a/test-suite/tests/ports.test
+++ b/test-suite/tests/ports.test
@@ -23,6 +23,7 @@
   #:use-module (test-suite guile-test)
   #:use-module (ice-9 popen)
   #:use-module (ice-9 rdelim)
+  #:use-module (ice-9 threads)
   #:use-module (rnrs bytevectors)
   #:use-module ((ice-9 binary-ports) #:select (open-bytevector-input-port
                                                open-bytevector-output-port
@@ -601,20 +602,18 @@
                           (pass-if "unread residue"
                                    (string=? (read-line) "moon"))))
 
-;;; non-blocking mode on a port.  create a pipe and set O_NONBLOCK on
-;;; the reading end.  try to read a byte: should get EAGAIN or
-;;; EWOULDBLOCK error.
-(let* ((p (pipe))
-       (r (car p)))
-  (fcntl r F_SETFL (logior (fcntl r F_GETFL) O_NONBLOCK))
-  (pass-if "non-blocking-I/O"
-           (catch 'system-error
-                  (lambda () (read-char r) #f)
-                  (lambda (key . args)
-                    (and (eq? key 'system-error)
-                         (let ((errno (car (list-ref args 3))))
-                           (or (= errno EAGAIN)
-                               (= errno EWOULDBLOCK))))))))
+(when (provided? 'threads)
+  (let* ((p (pipe))
+         (r (car p))
+         (w (cdr p)))
+    (fcntl r F_SETFL (logior (fcntl r F_GETFL) O_NONBLOCK))
+    (let ((thread (call-with-new-thread
+                   (lambda ()
+                     (usleep (* 250 1000))
+                     (write-char #\a w)
+                     (force-output w)))))
+      (pass-if-equal "non-blocking-I/O" #\a (read-char r))
+      (join-thread thread))))
 
 
 ;;;; Pipe (popen) ports.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]