;; -*- lexical-binding: t -*-

;; Thread helpers, bounded channels and futures built on Emacs threads,
;; mutexes and condition variables.

(require 'threads)
(require 'eieio)
(require 'cl-lib)
(require 'ring)

;;; Debug output

(defun thread-utils--debug (fmt &rest args)
  "Format FMT with ARGS and print the result followed by a newline.
Output is sent through `external-debugging-output'.

This helper is deliberately not named `debug': a global function of
that name would replace the entry point of the Emacs debugger."
  (princ (apply #'format fmt args) #'external-debugging-output)
  (terpri #'external-debugging-output))

;;; Thread utilities

(define-error 'thread-utils-thread-interrupted
  "Thread was interrupted" 'error)

(defun thread-utils-main-thread-p (&optional object)
  "Return non-nil if OBJECT is the first thread listed by `all-threads'.
OBJECT defaults to the current thread.  The first element of
`all-threads' is taken to be the main thread."
  (let ((object (or object (current-thread))))
    (and (threadp object)
         (eq object (car (all-threads))))))

(defun thread-utils-quitable-apply (fn &rest args)
  "Apply FN to ARGS while a helper thread watches for `quit'.
The helper thread loops over `sleep-for'; when it receives a `quit'
it forwards it to the calling thread with `thread-signal'.  Once FN
returns or exits non-locally, the helper is stopped by signaling
`thread-utils-thread-interrupted' to it.  Return the value of FN."
  (let* ((this-thread (current-thread))
         (quit-thread
          (make-thread
           (lambda ()
             (condition-case nil
                 (cl-loop (sleep-for 3))
               ;; Relay the quit to the thread that is running FN.
               (quit (thread-signal this-thread 'quit nil))
               ;; Normal shutdown request from the `unwind-protect' below.
               (thread-utils-thread-interrupted nil))))))
    (unwind-protect
        (apply fn args)
      (thread-signal quit-thread 'thread-utils-thread-interrupted nil))))

(defun thread-utils-condition-quitable-wait (condition)
  "Wait on condition variable CONDITION via `thread-utils-quitable-apply'.
Signal an error if CONDITION is not a condition variable."
  (cl-check-type condition condition-variable)
  (thread-utils-quitable-apply #'condition-wait condition))

(defun thread-utils-condition-wait (condition)
  "Wait on condition variable CONDITION.
On the main thread (see `thread-utils-main-thread-p') the wait goes
through `thread-utils-condition-quitable-wait'; on any other thread
it is a plain `condition-wait'."
  (if (thread-utils-main-thread-p)
      (thread-utils-condition-quitable-wait condition)
    (condition-wait condition)))

;;; Channels

(defconst channel-default-capacity 16
  "Capacity used by `make-channel' when none is given.")

(defclass channel-terminal ()
  ((mutex :initarg :mutex :type mutex)
   (condition :initarg :condition :type condition-variable)
   (msg-queue :initarg :msg-queue :type ring)
   (closed-p :initform nil)
   (other-terminal :type (or null channel-terminal)))
  "One end of a channel.
Both ends of a channel created by `make-channel' share the same
mutex, condition variable and message ring; OTHER-TERMINAL points
at the opposite end.")

(defclass channel-source (channel-terminal) ()
  "The sending end of a channel.")

(defclass channel-sink (channel-terminal) ()
  "The receiving end of a channel.")

(define-error 'channel-closed "Trying to send/recv from a closed channel")

(defun make-channel (&optional capacity)
  "Create a channel holding at most CAPACITY messages.
CAPACITY defaults to `channel-default-capacity' and must be an
integer >= 1.  Return a cons cell (SOURCE . SINK) whose car is a
`channel-source' and whose cdr is a `channel-sink'."
  (unless capacity
    (setq capacity channel-default-capacity))
  (cl-check-type capacity (integer 1 *))
  (let* ((mutex (make-mutex "channel"))
         (condition (make-condition-variable mutex "channel"))
         (msg-queue (make-ring capacity))
         (source (channel-source :mutex mutex
                                 :condition condition
                                 :msg-queue msg-queue))
         (sink (channel-sink :mutex mutex
                             :condition condition
                             :msg-queue msg-queue)))
    (oset source other-terminal sink)
    (oset sink other-terminal source)
    (cons source sink)))

(cl-defgeneric channel-send (source message)
  "Send MESSAGE through the channel terminal SOURCE.")

(cl-defmethod channel-send ((source channel-source) message)
  "Insert MESSAGE into the queue of SOURCE and return nil.
Wait while the queue is full and the channel is not closed.
Signal `channel-closed' if SOURCE is closed."
  (with-mutex (oref source mutex)
    (with-slots (condition msg-queue) source
      (while (and (not (channel-closed-p source))
                  (= (ring-size msg-queue) (ring-length msg-queue)))
        (thread-utils-condition-wait condition))
      (when (channel-closed-p source)
        (signal 'channel-closed (list source)))
      (let ((inhibit-quit t))
        (ring-insert msg-queue message)
        ;; The queue just went from empty to non-empty: wake all waiters.
        (when (= 1 (ring-length msg-queue))
          (condition-notify condition t)))
      nil)))

(cl-defgeneric channel-recv (sink)
  "Receive a message from the channel terminal SINK.")

(cl-defmethod channel-recv ((sink channel-terminal))
  "Remove a message from the queue of SINK with `ring-remove' and return it.
Wait while the queue is empty and the channel is not closed.
Signal `channel-closed' if SINK is closed."
  (with-mutex (oref sink mutex)
    (with-slots (condition msg-queue) sink
      (while (and (not (channel-closed-p sink))
                  (ring-empty-p msg-queue))
        (thread-utils-condition-wait condition))
      (when (channel-closed-p sink)
        (signal 'channel-closed (list sink)))
      (let ((inhibit-quit t))
        (prog1 (ring-remove msg-queue)
          ;; The queue just went from full to one free slot: wake all
          ;; waiters.
          (when (= 1 (- (ring-size msg-queue) (ring-length msg-queue)))
            (condition-notify condition t)))))))

(cl-defgeneric channel-peek (sink)
  "Return the next message of the channel terminal SINK without removing it.")

(cl-defmethod channel-peek ((sink channel-terminal))
  "Return element -1 of the queue of SINK, leaving the queue unchanged.
Wait while the queue is empty and the channel is not closed.
Signal `channel-closed' if SINK is closed."
  (with-mutex (oref sink mutex)
    (with-slots (condition msg-queue) sink
      (while (and (not (channel-closed-p sink))
                  (ring-empty-p msg-queue))
        (thread-utils-condition-wait condition))
      (when (channel-closed-p sink)
        (signal 'channel-closed (list sink)))
      (ring-ref msg-queue -1))))

(cl-defgeneric channel-close (terminal)
  "Close the channel terminal TERMINAL.")

(cl-defmethod channel-close ((terminal channel-terminal))
  "Mark TERMINAL as closed, wake every waiter, and return nil."
  (with-mutex (oref terminal mutex)
    (with-slots (closed-p condition) terminal
      (setq closed-p t)
      (condition-notify condition t))
    nil))

(cl-defgeneric channel-closed-p (terminal)
  "Return non-nil if the channel terminal TERMINAL counts as closed.")

(cl-defmethod channel-closed-p ((source channel-source))
  "Return non-nil if SOURCE or its opposite terminal has been closed."
  (with-mutex (oref source mutex)
    (with-slots (closed-p other-terminal) source
      (or closed-p
          (oref other-terminal closed-p)))))

(cl-defmethod channel-closed-p ((sink channel-sink))
  "Return non-nil if SINK is closed.
SINK counts as closed when it was closed itself, or when its
opposite terminal was closed and the message queue is empty."
  (with-mutex (oref sink mutex)
    (with-slots (closed-p other-terminal msg-queue) sink
      (or closed-p
          (and (oref other-terminal closed-p)
               (ring-empty-p msg-queue))))))

;;; Futures

(defclass future ()
  ((channel :initform (make-channel 1)))
  "A value delivered through a channel of capacity 1.
The value is stored with `future-set' and read with `future-get'.")

(defun make-future ()
  "Create and return a new `future'."
  (make-instance 'future))

(cl-defgeneric future-set (future value)
  "Store VALUE in FUTURE.")

(cl-defmethod future-set ((future future) value)
  "Send VALUE on the channel of FUTURE, then close the sending end.
Signal `error' with FUTURE as data if the channel is already closed."
  (with-slots (channel) future
    (let ((inhibit-quit t))
      (condition-case nil
          (progn
            (thread-utils--debug "Sending future")
            (channel-send (car channel) value)
            (thread-utils--debug "Future send"))
        (channel-closed (signal 'error (list future))))
      (thread-utils--debug "Closing future")
      (channel-close (car channel))
      (thread-utils--debug "Future closed"))))

(cl-defgeneric future-get (future)
  "Return the value of FUTURE.")

(cl-defmethod future-get ((future future))
  "Return the value held by FUTURE, waiting until one is available.
The value is read with `channel-peek', so it stays in the channel
and can be read again."
  (with-slots (channel) future
    (thread-utils--debug "Getting future")
    ;; `prog1' makes the peeked value the return value; without it the
    ;; function would return the result of the trailing debug print.
    (prog1 (channel-peek (cdr channel))
      (thread-utils--debug "Future got"))))

(defclass future-deferred (future)
  ((producer :initarg :producer :type function)
   (value-produced-p :initform nil)
   (mutex :initform (make-mutex "future-deferred")))
  "A future whose value is computed by calling PRODUCER.
The producer thread is started by the first `future-get'.")

(defun make-deferred-future (producer)
  "Create a `future-deferred' whose value comes from calling PRODUCER."
  (make-instance 'future-deferred :producer producer))

(cl-defmethod future-get :before ((future future-deferred))
  "Start the producer thread of FUTURE unless that was already done.
The thread calls the producer and stores its result with `future-set'."
  (with-slots (mutex value-produced-p producer) future
    (with-mutex mutex
      (unless value-produced-p
        (unwind-protect
            (make-thread
             (lambda ()
               (thread-utils--debug "Setting Future")
               (future-set future (funcall producer))
               (thread-utils--debug "Future set"))
             "XEmacs")
          ;; Set the flag even if `make-thread' fails, so that a second
          ;; producer thread is never started.
          (setq value-produced-p t))))))

;;; Demo, evaluated at load time

(let ((future (make-deferred-future (lambda () 42))))
  (future-get future))