guix-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[shepherd] 08/12: service: Associate a control fiber with each service.


From: Ludovic Courtès
Subject: [shepherd] 08/12: service: Associate a control fiber with each service.
Date: Sun, 19 Feb 2023 16:58:37 -0500 (EST)

civodul pushed a commit to branch wip-service-monitor
in repository shepherd.

commit 6ad9c92a682f06b8486fe11ff6d43afd1d0c4229
Author: Ludovic Courtès <ludo@gnu.org>
AuthorDate: Sun Feb 19 16:14:01 2023 +0100

    service: Associate a control fiber with each service.
    
    This moves service state handling from the service monitor to each
    "service controller".
    
    * modules/shepherd/service.scm (<service>)[control]: New slot.
    (service-control, spawn-service-controller, service-controller): New
    procedures.
    (start, stop, handle-service-termination, root-service): Send 'start',
    'stop', and 'notify-termination' messages to the control channel of the
    service rather than to the service monitor.
    (service-monitor)[*service-started*, started-message?]
    [*service-stopped*, stopped-message?]: Remove.
    [stopped?]: New procedure.
    Remove 'running', 'starting', and 'stopping' from the 'loop' variables.
    Use 'replace-if-running' message when registering a service.  Use
    'stopped?' when unregistering services.  Remove handlers for 'running',
    'start', 'stop', 'notify-termination', 'stopped-message?', and
    'started-message?', now moved to 'service-controller'.
    (remove): New procedure.
---
 modules/shepherd/service.scm | 334 +++++++++++++++++++++++--------------------
 1 file changed, 177 insertions(+), 157 deletions(-)

diff --git a/modules/shepherd/service.scm b/modules/shepherd/service.scm
index f88781c..ed0766a 100644
--- a/modules/shepherd/service.scm
+++ b/modules/shepherd/service.scm
@@ -275,7 +275,145 @@ Log abnormal termination reported by @var{status}."
   (last-respawns #:init-form '())
   ;; A replacement for when this service is stopped.
   (replacement #:init-keyword #:replacement
-               #:init-value #f))
+               #:init-value #f)
+
+  ;; Control channel that encapsulates the current state of the service; send
+  ;; requests such as 'start' and 'stop' on this channels.
+  (control #:init-value #f))
+
+(define (service-control service)
+  "Return the control channel of @var{service}, spawning it if needed."
+  ;; Spawn the controller fiber lazily, on first use, hopefully once Fibers
+  ;; has actually been initialized; later calls reuse the cached channel.
+  (or (slot-ref service 'control)
+      (begin
+        (slot-set! service 'control
+                   (spawn-service-controller service))
+        (slot-ref service 'control))))
+
+(define (spawn-service-controller service)
+  "Spawn a fiber that controls @var{service} and return its request channel."
+  (let ((channel (make-channel)))
+    (spawn-fiber
+     (lambda ()
+       (service-controller service channel)))
+    channel))
+
+(define (service-controller service channel)
+  "Encapsulate @var{service} state and serve requests arriving on
+@var{channel}."
+  (define *service-started* (list 'service 'started!))
+  (define (started-message? obj) (eq? *service-started* obj))
+  (define *service-stopped* (list 'service 'stopped!))
+  (define (stopped-message? obj) (eq? *service-stopped* obj))
+  ;; STATUS is one of 'stopped, 'starting, 'running, or 'stopping.
+  (let loop ((status 'stopped)
+             (value #f)
+             (condition #f))
+    (match (get-message channel)
+      (('running reply)
+       (put-message reply value)
+       (loop status value condition))
+      (('status reply)
+       (put-message reply status)
+       (loop status value condition))
+
+      (('start reply)
+       ;; Attempt to start SERVICE, blocking if it is already being started.
+       ;; Send #f on REPLY if SERVICE was already running or being started;
+       ;; otherwise send a channel on which to send SERVICE's value once it
+       ;; has been started.
+       (cond ((eq? 'running status)
+              ;; SERVICE is already running: send #f on REPLY.
+              (put-message reply #f)
+              (loop status value condition))
+             ((eq? 'starting status)
+              ;; SERVICE is being started: wait until it has started and
+              ;; then send #f on REPLY.
+              (spawn-fiber
+               (lambda ()
+                 (wait condition)
+                 (put-message reply #f)))
+              (loop status value condition))
+             (else
+              ;; Become the one that starts SERVICE.
+              (let ((condition (make-condition))
+                    (notification (make-channel)))
+                (spawn-fiber
+                 (lambda ()
+                   (let ((running (get-message notification)))
+                     (local-output (l10n "Service ~a started.")
+                                   (canonical-name service))
+                     (put-message channel
+                                  (list *service-started* running)))))
+                (local-output (l10n "Starting service ~a...")
+                              (canonical-name service))
+                (put-message reply notification)
+                (loop 'starting value condition)))))
+      (((? started-message?) value)               ;no reply
+       (local-output (l10n "Service ~a running with value ~s.")
+                     (canonical-name service) value)
+       (signal-condition! condition)
+       (loop (if (and value (not (one-shot? service)))
+                 'running
+                 'stopped)
+             (and (not (one-shot? service)) value)
+             #f))
+
+      (('stop reply)
+       ;; Attempt to stop SERVICE, blocking if it is already being stopped.
+       ;; Send #f on REPLY if SERVICE was not running or already being stopped;
+       ;; otherwise send a channel on which to send a notification once it
+       ;; has been stopped.
+       (cond ((eq? status 'stopping)
+              ;; SERVICE is being stopped: wait until it is stopped and
+              ;; then send #f on REPLY.
+              (spawn-fiber
+               (lambda ()
+                 (wait condition)
+                 (put-message reply #f)))
+              (loop status value condition))
+             ((not (eq? status 'running))
+              ;; SERVICE is not running: send #f on REPLY.
+              (put-message reply #f)
+              (loop status value condition))
+             (else
+              ;; Become the one that stops SERVICE.
+              (let ((condition (make-condition))
+                    (notification (make-channel)))
+                (spawn-fiber
+                 (lambda ()
+                   (let ((stopped? (get-message notification)))
+                     (if stopped?
+                         (local-output (l10n "Service ~a stopped.")
+                                       (canonical-name service))
+                         (local-output (l10n "Failed to stop ~a.")
+                                       (canonical-name service)))
+                     (put-message channel *service-stopped*))))
+                (local-output (l10n "Stopping service ~a...")
+                              (canonical-name service))
+                (put-message reply notification)
+                (loop 'stopping value condition)))))
+      ((? stopped-message?)                       ;no reply
+       (local-output (l10n "Service ~a is now stopped.")
+                     (canonical-name service))
+       (signal-condition! condition)
+       (loop 'stopped #f #f))
+
+      ('notify-termination                        ;no reply
+       (loop 'stopped #f condition))
+
+      (('replace-if-running replacement reply)
+       (if (eq? status 'running)
+           (begin
+             (local-output (l10n "Recording replacement for ~a.")
+                           (canonical-name service))
+             (slot-set! service 'replacement replacement)
+             (put-message reply #t)
+             (loop status value condition))
+           (begin
+             (put-message reply #f)
+             (loop status value condition)))))))
 
 (define (service? obj)
   "Return true if OBJ is a service."
@@ -427,7 +565,7 @@ that could not be started."
                            problems)
                  ;; Start the service itself.
                  (let ((reply (make-channel)))
-                   (put-message (current-monitor-channel) `(start ,obj ,reply))
+                   (put-message (service-control obj) `(start ,reply))
                    (match (get-message reply)
                      (#f
                       ;; We lost the race: OBJ is already running.
@@ -445,8 +583,7 @@ that could not be started."
 
           ;; Status message.
            (when (one-shot? obj)
-             (put-message (current-monitor-channel)
-                          `(notify-termination ,obj)))
+             (put-message (service-control obj) 'notify-termination))
 
            (local-output (if running
                             (l10n "Service ~a has been started.")
@@ -499,8 +636,7 @@ is not already running, and will return SERVICE's canonical name in a list."
 
         ;; Stop the service itself.
         (let ((reply (make-channel)))
-          (put-message (current-monitor-channel)
-                       `(stop ,service ,reply))
+          (put-message (service-control service) `(stop ,reply))
           (match (get-message reply)
             (#f
              #f)
@@ -521,8 +657,8 @@ is not already running, and will return SERVICE's canonical name in a list."
                  (caught-error key args))))))
 
         ;; SERVICE is no longer running.
-        (put-message (current-monitor-channel)
-                     `(notify-termination ,service))
+        (put-message (service-control service)
+                     'notify-termination)
 
         ;; Restore termination handler.
         (slot-set! service 'handle-termination handle-termination)
@@ -709,15 +845,12 @@ clients."
 (define (service-monitor channel)
   "Encapsulate shepherd state (registered and running services) and serve
 requests arriving on @var{channel}."
-  (define *service-started* (list 'service 'started!))
-  (define (started-message? obj) (eq? *service-started* obj))
-  (define *service-stopped* (list 'service 'stopped!))
-  (define (stopped-message? obj) (eq? *service-stopped* obj))
+  (define (stopped? service)
+    (let ((reply (make-channel)))
+      (put-message (service-control service) `(status ,reply))
+      (eq? 'stopped (get-message reply))))
 
-  (let loop ((registered vlist-null)
-             (running vlist-null)
-             (starting vlist-null)
-             (stopping vlist-null))
+  (let loop ((registered vlist-null))
     (define (unregister services)
       ;; Return REGISTERED minus SERVICE.
       (vhash-fold (lambda (name service result)
@@ -740,37 +873,35 @@ requests arriving on @var{channel}."
        (let ((name (canonical-name service)))
          (match (vhash-assq name registered)
            (#f
-            (loop (register service) running starting stopping))
+            (loop (register service)))
            ((_ . old)
-            (if (vhash-assq old running)
-                (begin
-                  (slot-set! old 'replacement service)
-                  (loop registered running starting stopping))
-                (loop (register service (unregister (list old)))
-                      running starting stopping))))))
+            (let ((reply (make-channel)))
+              (put-message (service-control old)
+                           `(replace-if-running ,service ,reply))
+              (match (get-message reply)
+                (#t (loop registered))
+                (#f (loop (register service
+                                    (unregister (list old)))))))))))
       (('unregister services)                     ;no reply
-       (match (filter (cut vhash-assq <> running) services)
+       (match (remove stopped? services)
          (()
-          (loop (unregister services) running starting stopping))
+          (loop (unregister services)))
          (lst                                     ;
           (local-output
            (l10n "Cannot unregister service ~a, which is still running"
                  "Cannot unregister services~{ ~a,~} which are still running"
                  (length lst))
            (map canonical-name lst))
-          (loop registered running starting stopping))))
+          (loop registered))))
       (('unregister-all)                          ;no reply
        (let ((root (cdr (vhash-assq 'root registered))))
          (loop (fold (cut vhash-consq <> root <>)
                      vlist-null
-                     (provided-by root))
-               (vhash-consq root #t running)
-               starting
-               stopping)))
+                     (provided-by root)))))
       (('lookup name reply)
        (put-message reply
                     (vhash-foldq* cons '() name registered))
-       (loop registered running starting stopping))
+       (loop registered))
       (('service-list reply)
        (let ((names (delete-duplicates
                      (vhash-fold (lambda (key _ result)
@@ -786,129 +917,7 @@ requests arriving on @var{channel}."
                                           result))
                             '()
                             names))
-         (loop registered running starting stopping)))
-      (('running service reply)
-       (put-message reply
-                    (match (vhash-assq service running)
-                      (#f #f)
-                      ((_ . value) value)))
-       (loop registered running starting stopping))
-
-      (('start service reply)
-       ;; Attempt to start SERVICE, blocking if it is already being started.
-       ;; Send #f on REPLY if SERVICE was already running or being started;
-       ;; otherwise send a channel on which to send SERVICE's value one it has
-       ;; been started.
-       (cond ((vhash-assq service running)
-              =>
-              ;; SERVICE is already running: send #f on REPLY.
-              (lambda (pair)
-                (match pair
-                  ((_ . value)
-                   (put-message reply #f)
-                   (loop registered running starting stopping)))))
-             ((vhash-assq service starting)
-              =>
-              ;; SERVICE is being started: wait until it has started and then
-              ;; send #f on REPLY.
-              (lambda (pair)
-                (match pair
-                  ((_ . condition)
-                   (spawn-fiber
-                    (lambda ()
-                      (wait condition)
-                      (put-message reply #f)))
-                   (loop registered running starting stopping)))))
-             (else
-              ;; Become the one who starts SERVICE.
-              (let ((condition (make-condition))
-                    (notification (make-channel)))
-                (spawn-fiber
-                 (lambda ()
-                   (let ((running (get-message notification)))
-                     (local-output (l10n "Service ~a started.")
-                                   (canonical-name service))
-                     (put-message channel
-                                  (list *service-started* service running)))))
-                (local-output (l10n "Starting service ~a...")
-                              (canonical-name service))
-                (put-message reply notification)
-                (loop registered running
-                      (vhash-consq service condition starting)
-                      stopping)))))
-      (((? started-message?) service value)       ;no reply
-       (local-output (l10n "Service ~a running with value ~s.")
-                     (canonical-name service) value)
-       (match (vhash-assq service starting)
-         ((_ . condition)
-          (signal-condition! condition)
-          (loop registered
-                (if (or (one-shot? service) (not value))
-                    running
-                    (vhash-consq service value running))
-                (vhash-delq service starting)
-                stopping))))
-
-      (('stop service reply)
-       ;; Attempt to stop SERVICE, blocking if it is already being stopped.
-       ;; Send #f on REPLY if SERVICE was already running or being started;
-       ;; otherwise send a channel on which to send a notification once it has
-       ;; been stopped.
-       (cond ((vhash-assq service stopping)
-              =>
-              ;; SERVICE is being stopped: wait until it is stopped and then
-              ;; send #f on REPLY.
-              (lambda (pair)
-                (match pair
-                  ((_ . condition)
-                   (spawn-fiber
-                    (lambda ()
-                      (wait condition)
-                      (put-message reply #f)))
-                   (loop registered running starting stopping)))))
-             ((not (vhash-assq service running))
-              =>
-              ;; SERVICE is not running: send #f on REPLY.
-              (lambda (pair)
-                (match pair
-                  ((_ . value)
-                   (put-message reply #f)
-                   (loop registered running starting stopping)))))
-             (else
-              ;; Become the one that stops SERVICE.
-              (let ((condition (make-condition))
-                    (notification (make-channel)))
-                (spawn-fiber
-                 (lambda ()
-                   (let ((stopped? (get-message notification)))
-                     (if stopped?
-                         (local-output (l10n "Service ~a stopped.")
-                                       (canonical-name service))
-                         (local-output (l10n "Failed to stop ~a.")
-                                       (canonical-name service)))
-                     (put-message channel
-                                  (list *service-stopped* service)))))
-                (local-output (l10n "Stopping service ~a...")
-                              (canonical-name service))
-                (put-message reply notification)
-                (loop registered running starting
-                      (vhash-consq service condition stopping))))))
-      (((? stopped-message?) service)             ;no reply
-       (local-output (l10n "Service ~a is now stopped.")
-                     (canonical-name service))
-       (match (vhash-assq service stopping)
-         ((_ . condition)
-          (signal-condition! condition)
-          (loop registered
-                (vhash-delq service running)
-                starting
-                (vhash-delq service stopping)))))
-
-      (('notify-termination service)              ;no reply
-       (loop registered
-             (vhash-delq service running)         ;XXX: complexity
-             starting
-             stopping)))))
+         (loop registered))))))
 
 (define (spawn-service-monitor)
   "Spawn a new service monitor fiber and return a channel to send it requests."
@@ -937,6 +946,17 @@ service state and to send requests to the service monitor."
 
 
 
+(define (remove pred lst)
+  ;; In Guile <= 3.0.9, 'remove' is written in C and thus introduces a
+  ;; continuation barrier; this Scheme version lets PRED use channels.
+  (let loop ((lst lst)
+             (result '()))
+    (match lst
+      (()
+       (reverse result))
+      ((head . tail)
+       (loop tail (if (pred head) result (cons head result)))))))
+
 (cond-expand
  (guile-3.0 #t)
  (else
@@ -2228,7 +2248,7 @@ PID is in its @code{running} slot; @var{status} is the process's exit status
 as returned by @code{waitpid}.  This procedure is called right after the
 process has terminated."
   (let ((running (service-running-value service)))
-    (put-message (current-monitor-channel) `(notify-termination ,service))
+    (put-message (service-control service) 'notify-termination)
     ((slot-ref service 'handle-termination) service running status)))
 
 (define (respawn-service serv)
@@ -2385,8 +2405,8 @@ where prctl/PR_SET_CHILD_SUBREAPER is unsupported."
             (local-output (l10n "Exiting shepherd..."))
 
             ;; Prevent that we try to stop ourself again.
-            (put-message (current-monitor-channel)
-                          `(notify-termination ,root-service))
+            (put-message (service-control root-service)
+                          'notify-termination)
 
              (shutdown-services)
             (quit))



reply via email to

[Prev in Thread] Current Thread [Next in Thread]