ice-9 async-queue

From: Andy Wingo
Subject: ice-9 async-queue
Date: Mon, 06 Feb 2012 18:00:53 +0100
User-agent: Gnus/5.13 (Gnus v5.13) Emacs/23.3 (gnu/linux)

Hi all,

I was thinking of adding the following to Guile, to eventually help make
the web server a little less terrible.  What do you think?  I haven't
tested it properly yet.  


;;; Asynchronous queues

;; Copyright (C)  2012 Free Software Foundation, Inc.

;; This library is free software; you can redistribute it and/or
;; modify it under the terms of the GNU Lesser General Public
;; License as published by the Free Software Foundation; either
;; version 3 of the License, or (at your option) any later version.
;; This library is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; Lesser General Public License for more details.
;; You should have received a copy of the GNU Lesser General Public
;; License along with this library; if not, write to the Free Software
;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
;; 02110-1301 USA

;;; Commentary:
;;; An implementation of thread-safe asynchronous queues, with both
;;; blocking and nonblocking interfaces.
;;; Code:

(define-module (ice-9 async-queue)
  #:use-module (srfi srfi-9)
  #:use-module (srfi srfi-9 gnu)
  #:use-module (ice-9 format)
  #:use-module (ice-9 threads)
  #:export (make-async-queue
            async-queue-length async-queue-capacity
            async-queue-pop! async-queue-try-pop!))

;; One thing that we should be careful about is to avoid exposing
;; information about the way this queue is implemented.
;; Currently we use an array, but it's easy to imagine a functional
;; implementation facilitated by compare-and-swap operations, with
;; perhaps the option to disable the blocking interfaces (and thereby
;; remove the need for the mutex and cond var).

(define-record-type <async-queue>
  (make-aq mutex condvar buf length capacity read-idx)
  (mutex aq-mutex)
  (condvar aq-condvar)
  (buf aq-buf)
  (capacity aq-capacity)
  (length aq-length set-aq-length!)
  (read-idx aq-read-idx set-aq-read-idx!))

 (lambda (aq port)
   (format port "<async-queue ~x ~a/~a>" (object-address aq)
           (aq-length aq) (aq-capacity aq))))

(define (aq-inc! aq)
  (set-aq-length! aq (1+ (aq-length aq)))
  (signal-condition-variable (aq-condvar aq)))

(define (aq-dec! aq)
  (set-aq-length! aq (1- (aq-length aq)))
  (signal-condition-variable (aq-condvar aq)))

(define (aq-idx aq idx)
  (modulo idx (aq-capacity aq)))

(define (aq-wait aq time)
  (if time
      (wait-condition-variable (aq-condvar aq) (aq-mutex aq) time)
      (wait-condition-variable (aq-condvar aq) (aq-mutex aq))))

(define* (make-async-queue #:key (capacity 10))
  (make-aq (make-mutex)
           (make-vector capacity #f)

(define (async-queue-length aq)
  (with-mutex (aq-mutex aq)
    (aq-length aq)))

(define (async-queue-capacity aq)
  (aq-capacity aq))

(define* (async-queue-push! aq item #:optional time)
  (with-mutex (aq-mutex aq)
    (let lp ()
      (if (< (aq-length aq) (aq-capacity aq))
          (let ((idx (aq-idx aq (+ (aq-read-idx aq) (aq-length aq)))))
            (vector-set! (aq-buf aq) idx item)
            (aq-inc! aq)
          (and (aq-wait aq time) (lp))))))

(define* (async-queue-pop! aq #:optional time)
  (with-mutex (aq-mutex aq)
    (let lp ()
      (if (zero? (aq-length aq))
          (if (aq-wait aq time)
              (values #f #f))
          (let* ((idx (aq-read-idx aq))
                 (item (vector-ref (aq-buf aq) idx)))
            (vector-set! (aq-buf aq) idx #f)
            (set-aq-read-idx! aq (aq-idx aq (1+ idx)))
            (aq-dec! aq)
            (values item #t))))))

(define* (async-queue-try-pop! aq)
  (with-mutex (aq-mutex aq)
    (if (zero? (aq-length aq))
        (values #f #f)
        (let* ((idx (aq-read-idx aq))
               (item (vector-ref (aq-buf aq) idx)))
          (vector-set! (aq-buf aq) idx #f)
          (set-aq-read-idx! aq (aq-idx aq (1+ idx)))
          (aq-dec! aq)
          (values item #t)))))


