Re: stis engine

From: Stefan Israelsson Tampe
Subject: Re: stis engine
Date: Wed, 25 Aug 2021 00:31:25 +0200

;; Oops, I managed to send the message by accident without finishing it.
;; The server part is similar, so I will drop the actual pipeline and highlight the difference:
;; we get a message, stream it into a Scheme object, call server-lambda (below) with it,
;; and finally build the reply to the client from that lambda's return value. We create a loop
;; in which the server waits for questions.

(define* (make-server server-lambda address ip-server? #:key (context #f))

  (define schemer (fpipe-schemer ch1 ch2))
   (lambda ()
     (let lp ()
       (schemer %fpipe-eof%)

Finally the idea is to use it as

(define context    (make-zmq-context))
(define address    "") ;; ZeroMQ address
(define ip-server? #t) ;; if we will use bind or connect

(make-server (lambda (x) (cons '() x)) address ip-server? #:context context)

(define client (make-client address ip-server? #:context context))
  ;; send a message that is not compressed
  > (client "abc")  

  > (length (client (iota 1000000) #:compress? #t))

All of this runs inside a run-fibers context.
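
The calling convention in the usage above can be tried without fpipe, fibers, or ZeroMQ. The following is a minimal sketch, assuming that server-lambda receives the decoded payload and returns a pair of the form ((list-of-features) . reply). The name make-loopback-client is hypothetical and not part of stis-engine; it replaces the whole network pipeline with a direct procedure call.

```scheme
;; Hypothetical loopback stand-in for make-client: no network, no fibers.
;; ASSUMPTION: server-lambda takes the payload and returns (features . reply).
(define (make-loopback-client server-lambda)
  ;; `action` plays the role of the procedure built by fpipe-schemer:
  ;; it takes a tagged message ((features) . payload) and returns the reply.
  (define (action tagged-message)
    (let ((reply (server-lambda (cdr tagged-message))))
      ;; Drop the feature list of the reply and hand back the payload.
      (cdr reply)))
  ;; Same keyword interface as the client in this message.
  (lambda* (message #:key (compress? #f))
    (action (cons (if compress? '(compress) '()) message))))

;; An echo server: reply with the question, with no features set.
(define client (make-loopback-client (lambda (x) (cons '() x))))

(client "abc")
(length (client (iota 1000) #:compress? #t))
```

In the real pipeline the feature list decides whether the byte stream is compressed; here it is only carried along to show the shape of the messages.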

On Wed, Aug 25, 2021 at 12:25 AM Stefan Israelsson Tampe wrote:
I have now made a fibers-enabled stream library called fpipe in my stis-engine repository, see

The idea is to focus on a high-performance byte-streaming library on top of wingo's fibers library and to make heavy use of bytevector buffers. We will also go between a stream of Scheme
values and these byte streams, so that the two integrate seamlessly and the data pipeline stays easy to overview.

The following code uses a C-based serializer and deserializer of Scheme data structures,
allows for optional streamed compression and decompression, and transports the result over ZeroMQ
networking, which allows data to move between threads, processes, and computers. The end result is a way to create servers and clients.

It is instructive to show how the client and server pipelines are constructed, to show off the fpipe library. This is not the final design, but most components are done.

Here is the client

(define* (make-client address ip-server? #:key (context #f))
  ;; First we set up the zmq networking
  (define ctx    (if context context (make-zmq-context)))
  ;; use ctx here, since context may be #f
  (define socket (zmq-socket ctx ZMQ_REQ))
  (if ip-server?
      (zmq-bind      socket address)
      (zmq-connect socket address))

  ;; we will define two fiber channels: channel in = ch1 and channel out = ch2
  (define-values (ch1 ch2)  

     ;; fpipe-construct is the general pipelining macro

      ;; this is a Scheme cond that checks a message bound to `it`
      (#:scm it)
       ;; the format of the matcher is (predicate . translator): if the predicate is true we
       ;; push the message to the branching pipeline. This assumes a message of the form
       ;; ((list-of-features) . message)
      (((memq 'compress (car it)) #:tr (cdr it))      
       ;;  the C-based streamed serializer that integrates nicely with fibers and streams;
       ;;  the message transport is of the form scm->bytestream


       ;; the zlib compressor node will transport as bytestream->bytestream

       ;; a bytestream->bytestream that will prepend a message with 1 to indicate that the stream
       ;; has been compressed
       #:prepend #u8(1))
      ;; if we do not have the compress feature then we simply generate the stream and
      ;; prepend a zero, i.e. we do not do any compression
       #:prepend #u8(0)))
     ;; transport the message bytestream over the zmq socket. This returns a Scheme
      ;; stream where eof will survive, as all control messages do, and will initiate the next
      ;; read from the socket (when the request message has been fully sent).
     (fpipe->zmq socket)
      ;; so here we get the return message
     (zmq->fpipe socket)
     ;; This is a bytestream cond and has no `it` part.
       ;; We try to match the beginning of the bytestream message, and if it starts with 1
       ;; then we know that the reply message has been compressed
      ((#:match #u8(1) #:skip 1)
      ;; else no compression.
      ((else #:skip 1)

     ;; the final step is to take the bytestream, make a Scheme object of it, and put that
     ;; on the Scheme stream; then the pipe is finished
   ;; fpipe-schemer takes a pipeline from scm to scm and creates a function from it.
   ;; Each time the function is called with a Scheme object we send it to the server,
   ;; and from the return message create a Scheme object that is returned from the function
  (define action (fpipe-schemer ch1 ch2))
  ;; A little nicer interface and we are finished
  (lambda* (message #:key (compress? #f))
     (action (cons (if compress? '(compress) '()) message))))  
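
The flag-byte framing that the client pipeline applies can be sketched in plain Guile, without fpipe or ZeroMQ. Everything below is an assumption made for illustration: serialize and deserialize are text-based stand-ins for the C-based serializer, compress and decompress are identity placeholders for the zlib nodes, and encode-message and decode-message are hypothetical names that are not part of stis-engine. Only the protocol follows the code above: a message has the form ((list-of-features) . payload), and the byte stream starts with 1 if it was compressed and with 0 otherwise.

```scheme
(use-modules (rnrs bytevectors))

;; Stand-in serializer: write the datum as text and encode it as UTF-8.
(define (serialize obj)
  (string->utf8 (call-with-output-string (lambda (port) (write obj port)))))

;; Stand-in deserializer: decode UTF-8 and read one datum back.
(define (deserialize bv)
  (call-with-input-string (utf8->string bv) read))

;; Identity placeholders where the real pipeline has zlib nodes.
(define (compress bv) bv)
(define (decompress bv) bv)

;; Return a fresh bytevector: one flag byte followed by the bytes of bv.
(define (prepend-flag flag bv)
  (let* ((n   (bytevector-length bv))
         (out (make-bytevector (+ n 1))))
    (bytevector-u8-set! out 0 flag)
    (bytevector-copy! bv 0 out 1 n)   ; R6RS order: source, start, target, start, count
    out))

;; Return a fresh bytevector without the leading flag byte.
(define (skip-flag bv)
  (let* ((n   (- (bytevector-length bv) 1))
         (out (make-bytevector n)))
    (bytevector-copy! bv 1 out 0 n)
    out))

;; message = ((list-of-features) . payload)
(define (encode-message message)
  (let ((bytes (serialize (cdr message))))
    (if (memq 'compress (car message))
        (prepend-flag 1 (compress bytes))
        (prepend-flag 0 bytes))))

;; Inspect the flag byte, strip it, and rebuild the Scheme object.
(define (decode-message bv)
  (let ((body (skip-flag bv)))
    (if (= (bytevector-u8-ref bv 0) 1)
        (deserialize (decompress body))
        (deserialize body))))
```

A round trip such as (decode-message (encode-message '((compress) . (1 2 3)))) gives back the payload; the real pipeline does the same work in streaming fashion over fiber channels instead of on whole bytevectors.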

(define* (make-server server-lambda address ip-server? #:key (context #f))

  (define schemer (fpipe-schemer ch1 ch2))
   (lambda ()
     (let lp ()
       (schemer %fpipe-eof%)
