guile-devel

Announcement stis-engine 0.1


From: Stefan Israelsson Tampe
Subject: Announcement stis-engine 0.1
Date: Tue, 31 Aug 2021 02:08:54 +0200

Hi guilers!

I am happy to announce stis-engine 0.1, the first release. With this tool you can manage data streams with quite good throughput.

Example:
Consider streaming objects over the network, where a prefix of the stream controls
which serializer is used and whether streamed compression is applied.
Below is the server and client implementation from module/fibers/stis-parser/q.scm. One 400MB
bytevector travels from the client to the server and back in about one second. I did the transport over a zmq thread communication link. We reach quite high throughput because we only apply logic to the header of the stream, so when we can we copy the buffers themselves instead
of copying the bytes.

The buffers are quite large, to make sure that the fiber overhead does not become too large.
The main constraint comes from copying bytes: using just the serializer and deserializer to copy an object would be 10x the speed. But this holds for simple data structures like bytevectors. Already with a list of numbers instead of a bytevector, serialization and deserialization start to dominate, as bytestream operations are essentially memmove and memcpy, and those are insanely optimized.
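
To make the memmove point concrete, here is a minimal sketch in plain Guile. It is not taken from stis-engine. It contrasts a bulk copy through bytevector-copy! from (rnrs bytevectors), which as far as I know ends up in a C-level memmove, with a byte-by-byte loop in Scheme. The names copy-bulk and copy-bytewise are made up for this illustration.

```scheme
(use-modules (rnrs bytevectors))

;; Bulk copy: a single call that moves the whole buffer at once.
;; (Assumption: Guile implements bytevector-copy! on top of memmove.)
(define (copy-bulk src)
  (let* ((n   (bytevector-length src))
         (dst (make-bytevector n)))
    (bytevector-copy! src 0 dst 0 n)
    dst))

;; Byte-by-byte copy: same result, but every byte goes through
;; a Scheme-level ref/set! pair.
(define (copy-bytewise src)
  (let* ((n   (bytevector-length src))
         (dst (make-bytevector n)))
    (let lp ((i 0))
      (when (< i n)
        (bytevector-u8-set! dst i (bytevector-u8-ref src i))
        (lp (+ i 1))))
    dst))
```

Both procedures return an equal bytevector; timing them on a large buffer (for instance with ,time at the Guile REPL) should show the gap that makes header-only logic attractive.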

Some other examples are audio and image streams. You can glue stream operations together
in Guile and get close to C speed for large amounts of data.

Here is the code:
;; We can make an abstraction as such:
(define-fpipe-construct #:scm (zmq->atom socket)
    (zmq->fpipe socket)                      ; read from the network to a byte stream
    (let ((it)                                           ; pick up the first byte (it1 ...) means the bytes prefix 
            (opt?  (= 1 (logand it 1)))
            (text? (= 2 (logand it 2))))

        (fpipe-skip 1)                              ; skip the prefix

        (when opt?
            (uncompress-from-fpipe-to-fpipe))   ; if the stream was compressed, decompress it

       (if text?                                                  ; if text, the transport is in cleartext, e.g. scheme
            (fpipe->scm)
            (fpipe->c-atom))))


(define-fpipe-construct #:scm (atom->zmq socket)
     (let ( it                                                     ;; This picks a scheme object (header . payload)
           (header (car it))
           (opt?   (memq 'compress header))
           (text?  (memq 'text     header))
           (tag     (logior (if opt? 1 0) (if text? 2 0))))

       (if text?
           (scm->fpipe)
           (c-atom->fpipe))
   
       (when opt?
           (compress-from-fpipe-to-fpipe #:level 3))
 
       (fpipe-prepend tag)                       ; prepend the stream with the tag ...

       (fpipe->zmq socket)))


;; Now we can make a server and a client like so:

(define* (make-client address ip-server? #:key (context #f))
  (define ctx    (if context context (zmq-init)))
  (define socket (zmq-socket ctx ZMQ_REQ))
 
  (if ip-server?
      (zmq-bind    socket address)
      (zmq-connect socket address))
 
  ;; here we construct the scheme pipeline using the abstraction  
  (define-values (ch1 ch2)  
    (fpipe-construct
     (atom->zmq socket)
     (zmq->atom socket)))
     
  (define action (fpipe-schemer ch1 ch2))
  (lambda* (message #:key (compress? #f))
    (cdr (action (cons (if compress? '(compress) '()) message)))))

;; the server:
(define* (run-server server-lambda address ip-server? #:key (context #f))
  (define ctx    (if context context (zmq-init)))
  (define socket (zmq-socket ctx ZMQ_REP))
  (define (lam x)
    (call-with-values server-lambda
      (lambda* (message #:key (compress? #f))
         (cons (if compress? '(compress) '()) message))))

  (if ip-server?
      (zmq-bind    socket address)
      (zmq-connect socket address))
 
  ;; the pipeline using the abstractions  
  (define-values (ch1 ch2)  
      (fpipe-construct
          (zmq->atom socket)
          (fpipe-map server-lambda)
          (atom->zmq socket)))

  (define schemer (fpipe-schemer ch1 ch2))
 
  (spawn-fiber
   (lambda ()
       (let lp ()
           (schemer %fpipe-eof%)
           (lp)))
      #:parallel? #f))
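
The one-byte prefix that atom->zmq writes and zmq->atom reads can be tried out on its own. Below is a small sketch in plain Guile of that bit layout (bit 0 for compression, bit 1 for text), using the same logior/logand/memq logic as the constructs above. header->tag and tag->header are hypothetical helper names and not part of stis-engine.

```scheme
;; Pack the header flags into one tag byte:
;; bit 0 (value 1) = compressed, bit 1 (value 2) = cleartext.
(define (header->tag header)
  (logior (if (memq 'compress header) 1 0)
          (if (memq 'text     header) 2 0)))

;; Recover the flag list from a tag byte.
(define (tag->header tag)
  (append (if (= 1 (logand tag 1)) '(compress) '())
          (if (= 2 (logand tag 2)) '(text)     '())))

;; (header->tag '(compress text)) ; => 3
;; (tag->header 2)                ; => (text)
```

Since each flag owns one bit, further options could be added with the values 4, 8 and so on without disturbing the existing ones.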

