[shepherd] 08/12: service: Associate a control fiber with each service.
From: Ludovic Courtès
Subject: [shepherd] 08/12: service: Associate a control fiber with each service.
Date: Sun, 19 Feb 2023 16:58:37 -0500 (EST)
civodul pushed a commit to branch wip-service-monitor
in repository shepherd.
commit 6ad9c92a682f06b8486fe11ff6d43afd1d0c4229
Author: Ludovic Courtès <ludo@gnu.org>
AuthorDate: Sun Feb 19 16:14:01 2023 +0100
service: Associate a control fiber with each service.
This moves service state handling from the service monitor to each
"service controller".
* modules/shepherd/service.scm (<service>)[control]: New slot.
(service-control, spawn-service-controller, service-controller): New
procedures.
(start, stop, handle-service-termination, root-service): Send 'start',
'stop', and 'notify-termination' messages to the control channel of the
service rather than to the service monitor.
(service-monitor)[*service-started*, started-message?]
[*service-stopped*, stopped-message?]: Remove.
[stopped?]: New procedure.
Remove 'running', 'starting', and 'stopping' from the 'loop' variables.
Use 'replace-if-running' message when registering a service. Use
'stopped?' when unregistering services. Remove handlers for 'running',
'start', 'stop', 'notify-termination', 'stopped-message?', and
'started-message?', now moved to 'service-controller'.
(remove): New procedure.
---
modules/shepherd/service.scm | 334 +++++++++++++++++++++++--------------------
1 file changed, 177 insertions(+), 157 deletions(-)
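
The request/reply pattern described in the log above (a per-service fiber that owns the state and answers messages on its control channel) can be illustrated with a minimal, self-contained sketch. It assumes Guile with the Fibers library installed. The names `spawn-toy-controller`, `query-status`, and the `set-status` message are hypothetical and exist only for this illustration; the actual controller in the patch handles `start`, `stop`, `running`, `status`, `notify-termination`, and `replace-if-running`.

```scheme
(use-modules (fibers)
             (fibers channels)
             (ice-9 match))

;; Hypothetical, simplified controller: it only tracks a status symbol.
;; The state lives in the loop variable of the fiber, so no other code
;; can touch it except by sending a message on the returned channel.
(define (spawn-toy-controller)
  (let ((channel (make-channel)))
    (spawn-fiber
     (lambda ()
       (let loop ((status 'stopped))
         (match (get-message channel)
           (('status reply)
            ;; Request with a reply channel: answer, keep the state.
            (put-message reply status)
            (loop status))
           (('set-status new)             ;no reply (illustrative message)
            (loop new))))))
    channel))

;; Client side: create a fresh reply channel for each request, as the
;; 'stopped?' helper in the patch does.
(define (query-status control)
  (let ((reply (make-channel)))
    (put-message control `(status ,reply))
    (get-message reply)))

(define result
  (run-fibers
   (lambda ()
     (let ((control (spawn-toy-controller)))
       (put-message control '(set-status running))
       (query-status control)))))
```

Because `put-message` only returns once the controller has received the message, the status query in this sketch is handled after the state change. Serializing every request through one fiber per service is what lets the patch drop the `running`, `starting`, and `stopping` tables from the monitor loop.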
diff --git a/modules/shepherd/service.scm b/modules/shepherd/service.scm
index f88781c..ed0766a 100644
--- a/modules/shepherd/service.scm
+++ b/modules/shepherd/service.scm
@@ -275,7 +275,145 @@ Log abnormal termination reported by @var{status}."
(last-respawns #:init-form '())
;; A replacement for when this service is stopped.
(replacement #:init-keyword #:replacement
- #:init-value #f))
+ #:init-value #f)
+
+ ;; Control channel that encapsulates the current state of the service; send
+ ;; requests such as 'start' and 'stop' on this channel.
+ (control #:init-value #f))
+
+(define (service-control service)
+ "Return the controlling channel of @var{service}."
+ ;; Spawn the controlling fiber lazily, hopefully once Fibers has actually
+ ;; been initialized.
+ (or (slot-ref service 'control)
+ (begin
+ (slot-set! service 'control
+ (spawn-service-controller service))
+ (slot-ref service 'control))))
+
+(define (spawn-service-controller service)
+ "Return a channel over which @var{service} may be controlled."
+ (let ((channel (make-channel)))
+ (spawn-fiber
+ (lambda ()
+ (service-controller service channel)))
+ channel))
+
+(define (service-controller service channel)
+ "Encapsulate @var{service} state and serve requests arriving on
+@var{channel}."
+ (define *service-started* (list 'service 'started!))
+ (define (started-message? obj) (eq? *service-started* obj))
+ (define *service-stopped* (list 'service 'stopped!))
+ (define (stopped-message? obj) (eq? *service-stopped* obj))
+
+ (let loop ((status 'stopped)
+ (value #f)
+ (condition #f))
+ (match (get-message channel)
+ (('running reply)
+ (put-message reply value)
+ (loop status value condition))
+ (('status reply)
+ (put-message reply status)
+ (loop status value condition))
+
+ (('start reply)
+ ;; Attempt to start SERVICE, blocking if it is already being started.
+ ;; Send #f on REPLY if SERVICE was already running or being started;
+ ;; otherwise send a channel on which to send SERVICE's value once it
+ ;; has been started.
+ (cond ((eq? 'running status)
+ ;; SERVICE is already running: send #f on REPLY.
+ (put-message reply #f)
+ (loop status value condition))
+ ((eq? 'starting status)
+ ;; SERVICE is being started: wait until it has started and
+ ;; then send #f on REPLY.
+ (spawn-fiber
+ (lambda ()
+ (wait condition)
+ (put-message reply #f)))
+ (loop status value condition))
+ (else
+ ;; Become the one that starts SERVICE.
+ (let ((condition (make-condition))
+ (notification (make-channel)))
+ (spawn-fiber
+ (lambda ()
+ (let ((running (get-message notification)))
+ (local-output (l10n "Service ~a started.")
+ (canonical-name service))
+ (put-message channel
+ (list *service-started* running)))))
+ (local-output (l10n "Starting service ~a...")
+ (canonical-name service))
+ (put-message reply notification)
+ (loop 'starting value condition)))))
+ (((? started-message?) value) ;no reply
+ (local-output (l10n "Service ~a running with value ~s.")
+ (canonical-name service) value)
+ (signal-condition! condition)
+ (loop (if (and value (not (one-shot? service)))
+ 'running
+ 'stopped)
+ (and (not (one-shot? service)) value)
+ #f))
+
+ (('stop reply)
+ ;; Attempt to stop SERVICE, blocking if it is already being stopped.
+ ;; Send #f on REPLY if SERVICE was not running or was already being stopped;
+ ;; otherwise send a channel on which to send a notification once it
+ ;; has been stopped.
+ (cond ((eq? status 'stopping)
+ ;; SERVICE is being stopped: wait until it is stopped and
+ ;; then send #f on REPLY.
+ (spawn-fiber
+ (lambda ()
+ (wait condition)
+ (put-message reply #f)))
+ (loop status value condition))
+ ((not (eq? status 'running))
+ ;; SERVICE is not running: send #f on REPLY.
+ (put-message reply #f)
+ (loop status value condition))
+ (else
+ ;; Become the one that stops SERVICE.
+ (let ((condition (make-condition))
+ (notification (make-channel)))
+ (spawn-fiber
+ (lambda ()
+ (let ((stopped? (get-message notification)))
+ (if stopped?
+ (local-output (l10n "Service ~a stopped.")
+ (canonical-name service))
+ (local-output (l10n "Failed to stop ~a.")
+ (canonical-name service)))
+ (put-message channel *service-stopped*))))
+ (local-output (l10n "Stopping service ~a...")
+ (canonical-name service))
+ (put-message reply notification)
+ (loop 'stopping value condition)))))
+ ((? stopped-message?) ;no reply
+ (local-output (l10n "Service ~a is now stopped.")
+ (canonical-name service))
+ (signal-condition! condition)
+ (loop 'stopped #f #f))
+
+ ('notify-termination ;no reply
+ (loop 'stopped #f condition))
+
+ (('replace-if-running replacement reply)
+ (if (eq? status 'running)
+ (begin
+ (local-output (l10n "Recording replacement for ~a.")
+ (canonical-name service))
+ (slot-set! service 'replacement replacement)
+ (put-message reply #t)
+ (loop status value condition))
+ (begin
+ (put-message reply #f)
+ (loop status value condition)))))))
(define (service? obj)
"Return true if OBJ is a service."
@@ -427,7 +565,7 @@ that could not be started."
problems)
;; Start the service itself.
(let ((reply (make-channel)))
- (put-message (current-monitor-channel) `(start ,obj ,reply))
+ (put-message (service-control obj) `(start ,reply))
(match (get-message reply)
(#f
;; We lost the race: OBJ is already running.
@@ -445,8 +583,7 @@ that could not be started."
;; Status message.
(when (one-shot? obj)
- (put-message (current-monitor-channel)
- `(notify-termination ,obj)))
+ (put-message (service-control obj) 'notify-termination))
(local-output (if running
(l10n "Service ~a has been started.")
@@ -499,8 +636,7 @@ is not already running, and will return SERVICE's canonical name in a list."
;; Stop the service itself.
(let ((reply (make-channel)))
- (put-message (current-monitor-channel)
- `(stop ,service ,reply))
+ (put-message (service-control service) `(stop ,reply))
(match (get-message reply)
(#f
#f)
@@ -521,8 +657,8 @@ is not already running, and will return SERVICE's canonical name in a list."
(caught-error key args))))))
;; SERVICE is no longer running.
- (put-message (current-monitor-channel)
- `(notify-termination ,service))
+ (put-message (service-control service)
+ 'notify-termination)
;; Restore termination handler.
(slot-set! service 'handle-termination handle-termination)
@@ -709,15 +845,12 @@ clients."
(define (service-monitor channel)
"Encapsulate shepherd state (registered and running services) and serve
requests arriving on @var{channel}."
- (define *service-started* (list 'service 'started!))
- (define (started-message? obj) (eq? *service-started* obj))
- (define *service-stopped* (list 'service 'stopped!))
- (define (stopped-message? obj) (eq? *service-stopped* obj))
+ (define (stopped? service)
+ (let ((reply (make-channel)))
+ (put-message (service-control service) `(status ,reply))
+ (eq? 'stopped (get-message reply))))
- (let loop ((registered vlist-null)
- (running vlist-null)
- (starting vlist-null)
- (stopping vlist-null))
+ (let loop ((registered vlist-null))
(define (unregister services)
;; Return REGISTERED minus SERVICE.
(vhash-fold (lambda (name service result)
@@ -740,37 +873,35 @@ requests arriving on @var{channel}."
(let ((name (canonical-name service)))
(match (vhash-assq name registered)
(#f
- (loop (register service) running starting stopping))
+ (loop (register service)))
((_ . old)
- (if (vhash-assq old running)
- (begin
- (slot-set! old 'replacement service)
- (loop registered running starting stopping))
- (loop (register service (unregister (list old)))
- running starting stopping))))))
+ (let ((reply (make-channel)))
+ (put-message (service-control old)
+ `(replace-if-running ,service ,reply))
+ (match (get-message reply)
+ (#t (loop registered))
+ (#f (loop (register service
+ (unregister (list old)))))))))))
(('unregister services) ;no reply
- (match (filter (cut vhash-assq <> running) services)
+ (match (remove stopped? services)
(()
- (loop (unregister services) running starting stopping))
+ (loop (unregister services)))
(lst ;
(local-output
(l10n "Cannot unregister service ~a, which is still running"
"Cannot unregister services~{ ~a,~} which are still running"
(length lst))
(map canonical-name lst))
- (loop registered running starting stopping))))
+ (loop registered))))
(('unregister-all) ;no reply
(let ((root (cdr (vhash-assq 'root registered))))
(loop (fold (cut vhash-consq <> root <>)
vlist-null
- (provided-by root))
- (vhash-consq root #t running)
- starting
- stopping)))
+ (provided-by root)))))
(('lookup name reply)
(put-message reply
(vhash-foldq* cons '() name registered))
- (loop registered running starting stopping))
+ (loop registered))
(('service-list reply)
(let ((names (delete-duplicates
(vhash-fold (lambda (key _ result)
@@ -786,129 +917,7 @@ requests arriving on @var{channel}."
result))
'()
names))
- (loop registered running starting stopping)))
- (('running service reply)
- (put-message reply
- (match (vhash-assq service running)
- (#f #f)
- ((_ . value) value)))
- (loop registered running starting stopping))
-
- (('start service reply)
- ;; Attempt to start SERVICE, blocking if it is already being started.
- ;; Send #f on REPLY if SERVICE was already running or being started;
- ;; otherwise send a channel on which to send SERVICE's value one it has
- ;; been started.
- (cond ((vhash-assq service running)
- =>
- ;; SERVICE is already running: send #f on REPLY.
- (lambda (pair)
- (match pair
- ((_ . value)
- (put-message reply #f)
- (loop registered running starting stopping)))))
- ((vhash-assq service starting)
- =>
- ;; SERVICE is being started: wait until it has started and then
- ;; send #f on REPLY.
- (lambda (pair)
- (match pair
- ((_ . condition)
- (spawn-fiber
- (lambda ()
- (wait condition)
- (put-message reply #f)))
- (loop registered running starting stopping)))))
- (else
- ;; Become the one who starts SERVICE.
- (let ((condition (make-condition))
- (notification (make-channel)))
- (spawn-fiber
- (lambda ()
- (let ((running (get-message notification)))
- (local-output (l10n "Service ~a started.")
- (canonical-name service))
- (put-message channel
- (list *service-started* service running)))))
- (local-output (l10n "Starting service ~a...")
- (canonical-name service))
- (put-message reply notification)
- (loop registered running
- (vhash-consq service condition starting)
- stopping)))))
- (((? started-message?) service value) ;no reply
- (local-output (l10n "Service ~a running with value ~s.")
- (canonical-name service) value)
- (match (vhash-assq service starting)
- ((_ . condition)
- (signal-condition! condition)
- (loop registered
- (if (or (one-shot? service) (not value))
- running
- (vhash-consq service value running))
- (vhash-delq service starting)
- stopping))))
-
- (('stop service reply)
- ;; Attempt to stop SERVICE, blocking if it is already being stopped.
- ;; Send #f on REPLY if SERVICE was already running or being started;
- ;; otherwise send a channel on which to send a notification once it has
- ;; been stopped.
- (cond ((vhash-assq service stopping)
- =>
- ;; SERVICE is being stopped: wait until it is stopped and then
- ;; send #f on REPLY.
- (lambda (pair)
- (match pair
- ((_ . condition)
- (spawn-fiber
- (lambda ()
- (wait condition)
- (put-message reply #f)))
- (loop registered running starting stopping)))))
- ((not (vhash-assq service running))
- =>
- ;; SERVICE is not running: send #f on REPLY.
- (lambda (pair)
- (match pair
- ((_ . value)
- (put-message reply #f)
- (loop registered running starting stopping)))))
- (else
- ;; Become the one that stops SERVICE.
- (let ((condition (make-condition))
- (notification (make-channel)))
- (spawn-fiber
- (lambda ()
- (let ((stopped? (get-message notification)))
- (if stopped?
- (local-output (l10n "Service ~a stopped.")
- (canonical-name service))
- (local-output (l10n "Failed to stop ~a.")
- (canonical-name service)))
- (put-message channel
- (list *service-stopped* service)))))
- (local-output (l10n "Stopping service ~a...")
- (canonical-name service))
- (put-message reply notification)
- (loop registered running starting
- (vhash-consq service condition stopping))))))
- (((? stopped-message?) service) ;no reply
- (local-output (l10n "Service ~a is now stopped.")
- (canonical-name service))
- (match (vhash-assq service stopping)
- ((_ . condition)
- (signal-condition! condition)
- (loop registered
- (vhash-delq service running)
- starting
- (vhash-delq service stopping)))))
-
- (('notify-termination service) ;no reply
- (loop registered
- (vhash-delq service running) ;XXX: complexity
- starting
- stopping)))))
+ (loop registered))))))
(define (spawn-service-monitor)
"Spawn a new service monitor fiber and return a channel to send it requests."
@@ -937,6 +946,17 @@ service state and to send requests to the service monitor."
+(define (remove pred lst)
+ ;; In Guile <= 3.0.9, 'remove' is written in C and thus introduces a
+ ;; continuation barrier. Provide a Scheme implementation to address that.
+ (let loop ((lst lst)
+ (result '()))
+ (match lst
+ (()
+ (reverse result))
+ ((head . tail)
+ (loop tail (if (pred head) result (cons head result)))))))
+
(cond-expand
(guile-3.0 #t)
(else
@@ -2228,7 +2248,7 @@ PID is in its @code{running} slot; @var{status} is the process's exit status
as returned by @code{waitpid}. This procedure is called right after the
process has terminated."
(let ((running (service-running-value service)))
- (put-message (current-monitor-channel) `(notify-termination ,service))
+ (put-message (service-control service) 'notify-termination)
((slot-ref service 'handle-termination) service running status)))
(define (respawn-service serv)
@@ -2385,8 +2405,8 @@ where prctl/PR_SET_CHILD_SUBREAPER is unsupported."
(local-output (l10n "Exiting shepherd..."))
;; Prevent that we try to stop ourself again.
- (put-message (current-monitor-channel)
- `(notify-termination ,root-service))
+ (put-message (service-control root-service)
+ 'notify-termination)
(shutdown-services)
(quit))