guile-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Guile-commits] 02/03: join-thread in Scheme


From: Andy Wingo
Subject: [Guile-commits] 02/03: join-thread in Scheme
Date: Mon, 14 Nov 2016 21:02:39 +0000 (UTC)

wingo pushed a commit to branch master
in repository guile.

commit a52144002911f217e03155336ce0980ac8b5b2af
Author: Andy Wingo <address@hidden>
Date:   Mon Nov 14 21:35:44 2016 +0100

    join-thread in Scheme
    
    * module/ice-9/threads.scm (join-thread): Implement in Scheme.
      (call-with-new-thread): Arrange to record values in a weak table and
      signal the join cond.
      (with-mutex): Move up definition; call-with-new-thread needs it.  (How
      was this working before?)
    * libguile/threads.c (guilify_self_1, guilify_self_2, do_thread_exit):
      Remove join queue management.
    * libguile/threads.c (scm_join_thread, scm_join_thread_timed): Call out
      to Scheme.
      (scm_init_ice_9_threads): Capture join-thread var.
---
 libguile/threads.c       |   87 ++++++++--------------------------------------
 libguile/threads.h       |    2 --
 module/ice-9/threads.scm |   70 +++++++++++++++++++++++++++++--------
 3 files changed, 70 insertions(+), 89 deletions(-)

diff --git a/libguile/threads.c b/libguile/threads.c
index 9732057..2798be7 100644
--- a/libguile/threads.c
+++ b/libguile/threads.c
@@ -408,7 +408,6 @@ guilify_self_1 (struct GC_stack_base *base)
   t.pthread = scm_i_pthread_self ();
   t.handle = SCM_BOOL_F;
   t.result = SCM_BOOL_F;
-  t.join_queue = SCM_EOL;
   t.freelists = NULL;
   t.pointerless_freelists = NULL;
   t.dynamic_state = SCM_BOOL_F;
@@ -491,7 +490,6 @@ guilify_self_2 (SCM parent)
   t->dynstack.limit = t->dynstack.base + 16;
   t->dynstack.top = t->dynstack.base + SCM_DYNSTACK_HEADER_LEN;
 
-  t->join_queue = make_queue ();
   t->block_asyncs = 0;
 
   /* See note in finalizers.c:queue_finalizer_async().  */
@@ -509,13 +507,9 @@ do_thread_exit (void *v)
   scm_i_thread *t = (scm_i_thread *) v;
 
   scm_i_scm_pthread_mutex_lock (&t->admin_mutex);
-
   t->exited = 1;
   close (t->sleep_pipe[0]);
   close (t->sleep_pipe[1]);
-  while (scm_is_true (unblock_from_queue (t->join_queue)))
-    ;
-
   scm_i_pthread_mutex_unlock (&t->admin_mutex);
 
   return NULL;
@@ -867,9 +861,6 @@ SCM_DEFINE (scm_yield, "yield", 0, 0, 0,
 }
 #undef FUNC_NAME
 
-/* Some systems, notably Android, lack 'pthread_cancel'.  Don't provide
-   'cancel-thread' on these systems.  */
-
 static SCM cancel_thread_var;
 
 SCM
@@ -879,79 +870,26 @@ scm_cancel_thread (SCM thread)
   return SCM_UNSPECIFIED;
 }
 
+static SCM join_thread_var;
+
 SCM
 scm_join_thread (SCM thread)
 {
-  return scm_join_thread_timed (thread, SCM_UNDEFINED, SCM_UNDEFINED);
+  return scm_call_1 (scm_variable_ref (join_thread_var), thread);
 }
 
-SCM_DEFINE (scm_join_thread_timed, "join-thread", 1, 2, 0,
-           (SCM thread, SCM timeout, SCM timeoutval),
-"Suspend execution of the calling thread until the target @var{thread} "
-"terminates, unless the target @var{thread} has already terminated. ")
-#define FUNC_NAME s_scm_join_thread_timed
+SCM
+scm_join_thread_timed (SCM thread, SCM timeout, SCM timeoutval)
 {
-  scm_i_thread *t;
-  scm_t_timespec ctimeout, *timeout_ptr = NULL;
-  SCM res = SCM_BOOL_F;
-
-  if (! (SCM_UNBNDP (timeoutval)))
-    res = timeoutval;
-
-  SCM_VALIDATE_THREAD (1, thread);
-  if (scm_is_eq (scm_current_thread (), thread))
-    SCM_MISC_ERROR ("cannot join the current thread", SCM_EOL);
-
-  t = SCM_I_THREAD_DATA (thread);
-  scm_i_scm_pthread_mutex_lock (&t->admin_mutex);
+  SCM join_thread = scm_variable_ref (join_thread_var);
 
-  if (! SCM_UNBNDP (timeout))
-    {
-      to_timespec (timeout, &ctimeout);
-      timeout_ptr = &ctimeout;
-    }
-
-  if (t->exited)
-    res = t->result;
+  if (SCM_UNBNDP (timeout))
+    return scm_call_1 (join_thread, thread);
+  else if (SCM_UNBNDP (timeoutval))
+    return scm_call_2 (join_thread, thread, timeout);
   else
-    {
-      while (1)
-       {
-         int err = block_self (t->join_queue, &t->admin_mutex,
-                               timeout_ptr);
-          scm_remember_upto_here_1 (thread);
-         if (err == 0)
-           {
-             if (t->exited)
-               {
-                 res = t->result;
-                 break;
-               }
-           }
-         else if (err == ETIMEDOUT)
-           break;
-
-         scm_i_pthread_mutex_unlock (&t->admin_mutex);
-         SCM_TICK;
-         scm_i_scm_pthread_mutex_lock (&t->admin_mutex);
-
-         /* Check for exit again, since we just released and
-            reacquired the admin mutex, before the next block_self
-            call (which would block forever if t has already
-            exited). */
-         if (t->exited)
-           {
-             res = t->result;
-             break;
-           }
-       }
-    }
-
-  scm_i_pthread_mutex_unlock (&t->admin_mutex);
-
-  return res;
+    return scm_call_3 (join_thread, thread, timeout, timeoutval);
 }
-#undef FUNC_NAME
 
 SCM_DEFINE (scm_thread_p, "thread?", 1, 0, 0,
            (SCM obj),
@@ -1875,6 +1813,9 @@ scm_init_ice_9_threads (void *unused)
   cancel_thread_var =
     scm_module_variable (scm_current_module (),
                          scm_from_latin1_symbol ("cancel-thread"));
+  join_thread_var =
+    scm_module_variable (scm_current_module (),
+                         scm_from_latin1_symbol ("join-thread"));
   call_with_new_thread_var =
     scm_module_variable (scm_current_module (),
                          scm_from_latin1_symbol ("call-with-new-thread"));
diff --git a/libguile/threads.h b/libguile/threads.h
index db52f16..986049c 100644
--- a/libguile/threads.h
+++ b/libguile/threads.h
@@ -55,8 +55,6 @@ typedef struct scm_i_thread {
   SCM handle;
   scm_i_pthread_t pthread;
 
-  SCM join_queue;
-
   scm_i_pthread_mutex_t admin_mutex;
 
   SCM result;
diff --git a/module/ice-9/threads.scm b/module/ice-9/threads.scm
index 119334b..ae6a97d 100644
--- a/module/ice-9/threads.scm
+++ b/module/ice-9/threads.scm
@@ -85,6 +85,13 @@
 
 
 
+(define-syntax-rule (with-mutex m e0 e1 ...)
+  (let ((x m))
+    (dynamic-wind
+      (lambda () (lock-mutex x))
+      (lambda () (begin e0 e1 ...))
+      (lambda () (unlock-mutex x)))))
+
 (define cancel-tag (make-prompt-tag "cancel"))
 (define (cancel-thread thread . values)
   "Asynchronously interrupt the target @var{thread} and ask it to
@@ -101,6 +108,9 @@ no-op."
          (error "thread cancellation failed, throwing error instead???"))))
    thread))
 
+(define thread-join-data (make-object-property))
+(define %thread-results (make-object-property))
+
 (define* (call-with-new-thread thunk #:optional handler)
   "Call @code{thunk} in a new thread and with a new dynamic state,
 returning a new thread object representing the thread.  The procedure
@@ -121,21 +131,60 @@ Once @var{thunk} or @var{handler} returns, the return 
value is made the
     (with-mutex mutex
       (%call-with-new-thread
        (lambda ()
-         (call-with-prompt cancel-tag
-           (lambda ()
+         (call-with-values
+             (lambda ()
+               (with-continuation-barrier
+                (lambda ()
+                  (call-with-prompt cancel-tag
+                    (lambda ()
+                      (lock-mutex mutex)
+                      (set! thread (current-thread))
+                      (set! (thread-join-data thread) (cons cv mutex))
+                      (signal-condition-variable cv)
+                      (unlock-mutex mutex)
+                      (thunk))
+                    (lambda (k . args)
+                      (apply values args))))))
+           (lambda vals
              (lock-mutex mutex)
-             (set! thread (current-thread))
-             (signal-condition-variable cv)
+             ;; Probably now you're wondering why we are going to use
+             ;; the cond variable as the key into the thread results
+             ;; object property.  It's because there is a possibility
+             ;; that the thread object itself ends up as part of the
+             ;; result, and if that happens we create a cycle whereby
+             ;; the strong reference to a thread in the value of the
+             ;; weak-key hash table used by the object property prevents
+             ;; the thread from ever being collected.  So instead we use
+             ;; the cv as the key.  Weak-key hash tables, amirite?
+             (set! (%thread-results cv) vals)
+             (broadcast-condition-variable cv)
              (unlock-mutex mutex)
-             (thunk))
-           (lambda (k . args)
-             (apply values args)))))
+             (apply values vals)))))
       (let lp ()
         (unless thread
           (wait-condition-variable cv mutex)
           (lp))))
     thread))
 
+(define* (join-thread thread #:optional timeout timeoutval)
+  "Suspend execution of the calling thread until the target @var{thread}
+terminates, unless the target @var{thread} has already terminated."
+  (match (thread-join-data thread)
+    (#f (error "foreign thread cannot be joined" thread))
+    ((cv . mutex)
+     (lock-mutex mutex)
+     (let lp ()
+       (cond
+        ((%thread-results cv)
+         => (lambda (results)
+              (unlock-mutex mutex)
+              (apply values results)))
+        ((if timeout
+             (wait-condition-variable cv mutex timeout)
+             (wait-condition-variable cv mutex))
+         (lp))
+        (else timeoutval))))))
+
 (define* (try-mutex mutex)
   "Try to lock @var{mutex}.  If the mutex is already locked, return
 @code{#f}.  Otherwise lock the mutex and return @code{#t}."
@@ -155,13 +204,6 @@ Once @var{thunk} or @var{handler} returns, the return 
value is made the
    (lambda () (proc arg ...))
    %thread-handler))
 
-(define-syntax-rule (with-mutex m e0 e1 ...)
-  (let ((x m))
-    (dynamic-wind
-      (lambda () (lock-mutex x))
-      (lambda () (begin e0 e1 ...))
-      (lambda () (unlock-mutex x)))))
-
 (define monitor-mutex-table (make-hash-table))
 
 (define monitor-mutex-table-mutex (make-mutex))



reply via email to

[Prev in Thread] Current Thread [Next in Thread]