[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet-scheme] 18/42: cadet/client: Rewrite with run-loop.
From: |
gnunet |
Subject: |
[gnunet-scheme] 18/42: cadet/client: Rewrite with run-loop. |
Date: |
Sat, 10 Sep 2022 19:08:11 +0200 |
This is an automated email from the git hooks/post-receive script.
maxime-devos pushed a commit to branch master
in repository gnunet-scheme.
commit e7f9505a7f08a7458bf792ab75ad4065ad35711d
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Fri Sep 9 16:18:15 2022 +0200
cadet/client: Rewrite with run-loop.
Reduces duplication, should increase test coverage.
* gnu/gnunet/cadet/client.scm
(connect)[loop]: Set remaining arguments and use 'run-loop'.
(reconnect): Split into ...
(make-message-handlers,control-message-handlers): ... these new
procedures, adjusting for 'run-loop'.
---
gnu/gnunet/cadet/client.scm | 400 ++++++++++++++++++++++----------------------
1 file changed, 196 insertions(+), 204 deletions(-)
diff --git a/gnu/gnunet/cadet/client.scm b/gnu/gnunet/cadet/client.scm
index 04ceb35..f599637 100644
--- a/gnu/gnunet/cadet/client.scm
+++ b/gnu/gnunet/cadet/client.scm
@@ -165,12 +165,16 @@
(define server (%make-server))
(define loop
(apply make-loop
+ #:make-message-handlers make-message-handlers
+ #:make-error-handler* make-error-handler*/loop
+ #:control-message-handler control-message-handler
+ #:service-name "cadet"
#:configuration config
#:connected connected
#:disconnected disconnected
#:spawn spawn
(server->loop-arguments server)))
- (spawn (lambda () (reconnect loop empty-bbtree)))
+ (spawn (lambda () (run-loop loop empty-bbtree
%minimum-local-channel-id)))
server)
;; channel-number->channel-map:
@@ -180,57 +184,48 @@
;; has been closed.
;;
;; TODO: GC problems, split in external and internal parts
- (define (reconnect loop channel-number->channel-map)
- (define loop-operation
- (choice-operation
- (get-operation (loop:control-channel loop))
- (wrap-operation (collect-lost-and-found-operation (loop:lost-and-found
loop))
- (lambda (lost) (cons 'lost lost)))))
- (define handlers
- (message-handlers
- (message-handler
- (type (symbol-value message-type msg:cadet:local:data))
- ((interpose exp) exp)
- ((well-formed? slice) #true)
- ((handle! slice)
- (let^ ((! cadet-data-length (sizeof /:msg:cadet:local:data '()))
- (! header (slice-slice slice 0 cadet-data-length))
- (! tail (slice-slice slice cadet-data-length))
- (! channel-number
- (read% /:msg:cadet:local:data '(channel-number) header))
- (! channel
- (maybe-ask* (loop:terminal-condition loop)
- (loop:control-channel loop) 'channel
- channel-number))
- (? (not channel)
- ???))
- ;; TODO: while the message is being processed, other messages
- ;; cannot be accepted -- document this limitation.
- (inject-message! (channel-message-queue channel) tail))))
- (message-handler
- (type (symbol-value message-type msg:cadet:local:acknowledgement))
- ((interpose exp) exp)
- ((well-formed? slice)
- (= (slice-length slice) (sizeof /:msg:cadet:local:acknowledgement
'())))
- ((handle! slice)
- ;; The slice needs to be read here (and not in 'control'), as it
might
- ;; later be reused for something different.
- (let ((channel-number (analyse-local-acknowledgement slice)))
- (maybe-send-control-message!*
- (loop:terminal-condition loop)
- (loop:control-channel loop)
- 'acknowledgement
- channel-number))))))
- (define error-handler (make-error-handler*/loop loop))
- (define mq (connect/fibers
- (loop:configuration loop) "cadet" handlers error-handler
- #:spawn (loop:spawner loop)))
+
+ (define (make-message-handlers loop . _)
+ (message-handlers
+ (message-handler
+ (type (symbol-value message-type msg:cadet:local:data))
+ ((interpose exp) exp)
+ ((well-formed? slice) #true)
+ ((handle! slice)
+ (let^ ((! cadet-data-length (sizeof /:msg:cadet:local:data '()))
+ (! header (slice-slice slice 0 cadet-data-length))
+ (! tail (slice-slice slice cadet-data-length))
+ (! channel-number
+ (read% /:msg:cadet:local:data '(channel-number) header))
+ (! channel
+ (maybe-ask* (loop:terminal-condition loop)
+ (loop:control-channel loop) 'channel
+ channel-number))
+ (? (not channel)
+ ???))
+ ;; TODO: while the message is being processed, other messages
+ ;; cannot be accepted -- document this limitation.
+ (inject-message! (channel-message-queue channel) tail))))
+ (message-handler
+ (type (symbol-value message-type msg:cadet:local:acknowledgement))
+ ((interpose exp) exp)
+ ((well-formed? slice)
+ (= (slice-length slice)
+ (sizeof /:msg:cadet:local:acknowledgement '())))
+ ((handle! slice)
+ ;; The slice needs to be read here (and not in 'control'), as it might
+ ;; later be reused for something different.
+ (let ((channel-number (analyse-local-acknowledgement slice)))
+ (maybe-send-control-message!*
+ (loop:terminal-condition loop) (loop:control-channel loop)
+ 'acknowledgement channel-number))))))
+
+ (define (control-message-handler message control control* message-queue
loop
+ channel-number->channel-map
+ next-free-channel-number)
+ "The main event loop"
(define (k/reconnect! channel-number->channel-map)
- (reconnect loop channel-number->channel-map))
- (define (control loop channel-number->channel-map
next-free-channel-number)
- "The main event loop."
- (control* loop channel-number->channel-map next-free-channel-number
- (perform-operation loop-operation)))
+ (run-loop loop channel-number->channel-map next-free-channel-number))
(define (close-if-possible! channel)
;; Pre-conditions:
;; * the channel is open
@@ -238,166 +233,163 @@
;;
;; TODO: untested.
(when (= (message-queue-length (channel-message-queue channel)) 0)
- (send-message! mq
+ (send-message! message-queue
(construct-local-channel-destroy
(channel-channel-number channel)))
;; We don't need the envelope.
(values)))
- (define (control* loop channel-number->channel-map
next-free-channel-number
- message)
- (define (continue)
- (control loop channel-number->channel-map next-free-channel-number))
- (define (continue* message)
- (control* loop channel-number->channel-map next-free-channel-number
- message))
- ;; TODO: what about closed channels?
- (define (send-channel-stuff! channel)
- ;; Send messages one-by-one, keeping in mind that we might not be able
- ;; to send all messages to the service at once, only
'channel-allow-send'
- ;; messages can be sent and this decreases by sending messages.
- ;;
- ;; TODO: use priority information, somehow when cancelling a message
- ;; cancel the corresponding message to be sent to the CADET service
when
- ;; there is still time, zero-copy networking.
- (let/ec
- stop
- (define (stop-if-exhausted)
- ;; The mutation 'replace > by >=' is caught by
- ;; "data is not sent before an acknowledgement"
- ;; in form of a hang.
- (if (> (channel-allow-send channel) 0)
- ;; (unless ...) and (when ...) can return *unspecified*,
- ;; but (gnu gnunet mq) expects no return values. Detected
- ;; by the "data is properly sent in response to
acknowledgements, in-order"
- ;; test.
- (values)
- (stop)))
- ;; Tested by ‘data is properly sent in response to acknowledgements,
in-order’
- ;; -- it catches the mutation 'replace 1 by zero' (as a hang)
- (define (decrement!)
- (set-channel-allow-send! channel
- (- (channel-allow-send channel) 1)))
- ;; It is important to check that a message can be sent before
- ;; send! is called, otherwise the message will be removed from
- ;; the message queue and be forgotten without being ever sent.
- ;;
- ;; Tested by ‘data is not sent before an acknowledgement’ -- it
catches
- ;; the mutation 'remove this line' (as a hang).
- (stop-if-exhausted)
- (define (send! envelope)
- (attempt-irrevocable-sent!
- envelope
- ((go message priority)
- ;; The mutation ‘don't call send-message!’ is caught by
- ;; ‘data is properly sent in response to acknowledgements,
in-order’
- ;; as a hang and an exception.
- ;;
- ;; The mutation 'swap send-message!' and 'decrement!' is
uncaught,
- ;; but theoretically harmless.
- (send-message! mq ; TODO: maybe get rid of the message queue
limit in (gnu gnunet mq)
- (construct-local-data
- (channel-channel-number channel) ; TODO:
multiple channels is untested
- 0 ;; TODO: relation between priority and
priority-preference?
- message)) ; TODO: sending the _right_ message is
untested
- ;; The mutation ‘don't call decrement!' is caught by
- ;; ‘data is properly sent in response to acknowledgements,
in-order’,
- ;; as a hang with an exception.
- (decrement!))
- ((cancelled) (values)) ; TODO: untested
- ((already-sent) (error "tried to send an envelope twice
(CADET)")))
- ;; Exit once nothing can be sent anymore (TODO check if
- ;; make-one-by-one-sender allows non-local exits).
- ;;
- ;; The mutation 'don't call it' is caught by
+ (define (continue)
+ (control loop channel-number->channel-map next-free-channel-number))
+ (define (continue* message)
+ (control* message loop channel-number->channel-map
+ next-free-channel-number))
+ ;; TODO: what about closed channels?
+ (define (send-channel-stuff! channel)
+ ;; Send messages one-by-one, keeping in mind that we might not be able
+ ;; to send all messages to the service at once, only
'channel-allow-send'
+ ;; messages can be sent and this decreases by sending messages.
+ ;;
+ ;; TODO: use priority information, somehow when cancelling a message
+ ;; cancel the corresponding message to be sent to the CADET service when
+ ;; there is still time, zero-copy networking.
+ (let/ec
+ stop
+ (define (stop-if-exhausted)
+ ;; The mutation 'replace > by >=' is caught by
+ ;; "data is not sent before an acknowledgement"
+ ;; in form of a hang.
+ (if (> (channel-allow-send channel) 0)
+ ;; (unless ...) and (when ...) can return *unspecified*,
+ ;; but (gnu gnunet mq) expects no return values. Detected
+ ;; by the "data is properly sent in response to
acknowledgements, in-order"
+ ;; test.
+ (values)
+ (stop)))
+ ;; Tested by ‘data is properly sent in response to acknowledgements,
in-order’
+ ;; -- it catches the mutation 'replace 1 by zero' (as a hang)
+ (define (decrement!)
+ (set-channel-allow-send! channel
+ (- (channel-allow-send channel) 1)))
+ ;; It is important to check that a message can be sent before
+ ;; send! is called, otherwise the message will be removed from
+ ;; the message queue and be forgotten without being ever sent.
+ ;;
+ ;; Tested by ‘data is not sent before an acknowledgement’ -- it catches
+ ;; the mutation 'remove this line' (as a hang).
+ (stop-if-exhausted)
+ (define (send! envelope)
+ (attempt-irrevocable-sent!
+ envelope
+ ((go message priority)
+ ;; The mutation ‘don't call send-message!’ is caught by
;; ‘data is properly sent in response to acknowledgements,
in-order’
- ;; as a hang and an exception?
+ ;; as a hang and an exception.
;;
- ;; The mutation 'duplicate it' is uncaught, but theoretically
harmless
- ;; albeit inefficient.
- (stop-if-exhausted))
- ((make-one-by-one-sender send!) (channel-message-queue channel)))
- (when (channel-desire-close? channel)
- (close-if-possible! channel)))
- (match message
- (('open-channel! channel)
- (let^ ((! channel-number next-free-channel-number)
- ;; TODO: handle overflow, and respect bounds
- (! next-free-channel-number (+ 1 next-free-channel-number))
- (_ (set-channel-channel-number! channel channel-number))
- ;; Keep track of the new <channel> object; it will be required
- ;; later by 'acknowledgement'.
- (! channel-number->channel-map
- (bbtree-set channel-number->channel-map channel-number
- channel)))
- (send-local-channel-create! mq channel)
- (control loop channel-number->channel-map
next-free-channel-number)))
- (('close-channel! channel)
- ;; 'close-channel!' can only be sent after the <channel> object
- ;; was returned by the procedure 'open-channel!', because only
- ;; then the channel becomes available. This procedure
(synchronuously)
- ;; sends a 'open-channel!' message and messages are processed by
- ;; the control loop in-order, so the channel has already been opened.
+ ;; The mutation 'swap send-message!' and 'decrement!' is uncaught,
+ ;; but theoretically harmless.
+ ;; TODO: maybe get rid of the message queue limit in (gnu gnunet
mq)
+ (send-message! message-queue
+ (construct-local-data
+ (channel-channel-number channel) ; TODO: multiple
channels is untested
+ 0 ;; TODO: relation between priority and
priority-preference?
+ message)) ; TODO: sending the _right_ message is
untested
+ ;; The mutation ‘don't call decrement!' is caught by
+ ;; ‘data is properly sent in response to acknowledgements,
in-order’,
+ ;; as a hang with an exception.
+ (decrement!))
+ ((cancelled) (values)) ; TODO: untested
+ ((already-sent) (error "tried to send an envelope twice (CADET)")))
+ ;; Exit once nothing can be sent anymore (TODO check if
+ ;; make-one-by-one-sender allows non-local exits).
;;
- ;; The only remaining states are: the channel is open / the channel
- ;; is closed.
- (let^ ((! channel-number (channel-channel-number channel))
- (? (channel-desire-close? channel)
- ;; It has already been requested to close to channel
- ;; (maybe it even has already been closed). This is fine,
- ;; as 'close-channel!' is idempotent. Nothing to do!
- ;; TODO: untested.
- (continue)))
- (set-channel-desire-close? channel #true)
- ;; This procedure will take care of actually closing the
channel
- ;; (if currently possible). If it's not currently possible
- ;; due to a lack of acknowledgements, then a future
'send-channel-stuff!'
- ;; (in response to an 'acknowledgement' message) will take
care of things.
- ;;
- ;; TODO: untested. TODO: untested in case of reconnects.
- (close-if-possible! channel)
- (continue)))
- (('resend-old-operations!)
- ;; TODO: no operations and no channels are implemented yet,
- ;; so for now nothing can be done.
- (continue))
- (('acknowledgement channel-number)
- ;; TODO: failure case
- (let^ ((! channel
- (bbtree-ref channel-number->channel-map channel-number)))
- ;; The service is allowing us to send another message;
- ;; update the number of allowed messages.
- (set-channel-allow-send!
- channel (+ 1 (channel-allow-send channel)))
- ;; Actually send some message, if there are any to send.
- (send-channel-stuff! channel)
- (continue)))
- (('send-channel-stuff! message-queue channel)
- ;; Tell the service to send the messages over CADET.
- (send-channel-stuff! channel)
- (continue))
- ;; Respond to a query of the msg:cadet:local:data message handler.
- (('channel answer-box channel-number)
- (answer answer-box
- (bbtree-ref channel-number->channel-map
- channel-number (lambda () #false)))
- (continue))
- (('lost . lost)
- (let loop ((lost lost))
- (match lost
- (() (continue))
- ((object . rest)
- (match object
- ((? channel? lost)
- TODO
- (loop rest))
- ((? server:cadet? lost)
- (continue* '(disconnect!))))))))
- (rest
- (handle-control-message!
- rest mq (loop:terminal-condition loop)
- (cut k/reconnect! channel-number->channel-map)))))
- ;; Start the main event loop.
- (control loop channel-number->channel-map %minimum-local-channel-id))
+ ;; The mutation 'don't call it' is caught by
+ ;; ‘data is properly sent in response to acknowledgements, in-order’
+ ;; as a hang and an exception?
+ ;;
+ ;; The mutation 'duplicate it' is uncaught, but theoretically
harmless
+ ;; albeit inefficient.
+ (stop-if-exhausted))
+ ((make-one-by-one-sender send!) (channel-message-queue channel)))
+ (when (channel-desire-close? channel)
+ (close-if-possible! channel)))
+ (match message
+ (('open-channel! channel)
+ (let^ ((! channel-number next-free-channel-number)
+ ;; TODO: handle overflow, and respect bounds
+ (! next-free-channel-number (+ 1 next-free-channel-number))
+ (_ (set-channel-channel-number! channel channel-number))
+ ;; Keep track of the new <channel> object; it will be required
+ ;; later by 'acknowledgement'.
+ (! channel-number->channel-map
+ (bbtree-set channel-number->channel-map channel-number
+ channel)))
+ (send-local-channel-create! message-queue channel)
+ (control loop channel-number->channel-map
next-free-channel-number)))
+ (('close-channel! channel)
+ ;; 'close-channel!' can only be sent after the <channel> object
+ ;; was returned by the procedure 'open-channel!', because only
+ ;; then the channel becomes available. This procedure (synchronuously)
+ ;; sends a 'open-channel!' message and messages are processed by
+ ;; the control loop in-order, so the channel has already been opened.
+ ;;
+ ;; The only remaining states are: the channel is open / the channel
+ ;; is closed.
+ (let^ ((! channel-number (channel-channel-number channel))
+ (? (channel-desire-close? channel)
+ ;; It has already been requested to close to channel
+ ;; (maybe it even has already been closed). This is fine,
+ ;; as 'close-channel!' is idempotent. Nothing to do!
+ ;; TODO: untested.
+ (continue)))
+ (set-channel-desire-close? channel #true)
+ ;; This procedure will take care of actually closing the channel
+ ;; (if currently possible). If it's not currently possible
+ ;; due to a lack of acknowledgements, then a future
'send-channel-stuff!'
+ ;; (in response to an 'acknowledgement' message) will take care
of things.
+ ;;
+ ;; TODO: untested. TODO: untested in case of reconnects.
+ (close-if-possible! channel)
+ (continue)))
+ (('resend-old-operations!)
+ ;; TODO: no operations and no channels are implemented yet,
+ ;; so for now nothing can be done.
+ (continue))
+ (('acknowledgement channel-number)
+ ;; TODO: failure case
+ (let^ ((! channel
+ (bbtree-ref channel-number->channel-map channel-number)))
+ ;; The service is allowing us to send another message;
+ ;; update the number of allowed messages.
+ (set-channel-allow-send!
+ channel (+ 1 (channel-allow-send channel)))
+ ;; Actually send some message, if there are any to send.
+ (send-channel-stuff! channel)
+ (continue)))
+ (('send-channel-stuff! message-queue channel)
+ ;; Tell the service to send the messages over CADET.
+ (send-channel-stuff! channel)
+ (continue))
+ ;; Respond to a query of the msg:cadet:local:data message handler.
+ (('channel answer-box channel-number)
+ (answer answer-box
+ (bbtree-ref channel-number->channel-map
+ channel-number (lambda () #false)))
+ (continue))
+ (('lost . lost)
+ (let loop ((lost lost))
+ (match lost
+ (() (continue))
+ ((object . rest)
+ (match object
+ ((? channel? lost)
+ TODO
+ (loop rest))
+ ((? server:cadet? lost)
+ (continue* '(disconnect!))))))))
+ (rest
+ (handle-control-message!
+ rest message-queue (loop:terminal-condition loop)
+ (cut k/reconnect! channel-number->channel-map)))))
(define-record-type (<cadet-address> make-cadet-address cadet-address?)
(fields (immutable peer cadet-address-peer)
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
- [gnunet-scheme] 28/42: server: Inline primitive-disconnect!., (continued)
- [gnunet-scheme] 28/42: server: Inline primitive-disconnect!., gnunet, 2022/09/10
- [gnunet-scheme] 15/42: server: New procedure for making the arguments to make-loop., gnunet, 2022/09/10
- [gnunet-scheme] 17/42: cadet/client: Use <loop> for various objects where possible., gnunet, 2022/09/10
- [gnunet-scheme] 27/42: cadet/client: Simplify more., gnunet, 2022/09/10
- [gnunet-scheme] 23/42: dht/client: Re-indent., gnunet, 2022/09/10
- [gnunet-scheme] 22/42: nse/indent: Re-indent., gnunet, 2022/09/10
- [gnunet-scheme] 26/42: server: Inline single-use server->loop-arguments., gnunet, 2022/09/10
- [gnunet-scheme] 24/42: cadet/client: Re-indent., gnunet, 2022/09/10
- [gnunet-scheme] 33/42: server: Document 'make-loop'., gnunet, 2022/09/10
- [gnunet-scheme] 30/42: doc/service-communication: Document the control loop., gnunet, 2022/09/10
- [gnunet-scheme] 18/42: cadet/client: Rewrite with run-loop.,
gnunet <=
- [gnunet-scheme] 32/42: server: Add type checking to make-loop., gnunet, 2022/09/10
- [gnunet-scheme] 36/42: doc/concurrency: Add missing label for lost-and-found., gnunet, 2022/09/10
- [gnunet-scheme] 39/42: server: Add type checking., gnunet, 2022/09/10
- [gnunet-scheme] 35/42: doc/service-communication: Document #:control-message-handler., gnunet, 2022/09/10
- [gnunet-scheme] 37/42: doc/service-communication: Document run-loop., gnunet, 2022/09/10
- [gnunet-scheme] 34/42: server: Rename control -> continue., gnunet, 2022/09/10
- [gnunet-scheme] 38/42: doc/service-communication: Add procedures to the index., gnunet, 2022/09/10
- [gnunet-scheme] 40/42: doc/service-communication: Add missing argument of make-disconnect!., gnunet, 2022/09/10
- [gnunet-scheme] 41/42: NEWS: Update., gnunet, 2022/09/10
- [gnunet-scheme] 42/42: Merge branch 'server-unification', gnunet, 2022/09/10