(defn pipe
  "Returns a pair: a seq (the read end) and a function (the write end).
  The function takes either no arguments to close the pipe
  or one argument which is appended to the seq. Read is blocking."
  []
  (let [promises (atom (repeatedly promise))
        p (second @promises)]
    [(lazy-seq @p)
     (fn
       ([] ;close the pipe
         (let [[a] (swap! promises #(vector (second %)))]
           (if a
             (deliver a nil)
             (throw (Exception. "Pipe already closed")))))
       ([x] ;enqueue x
         (let [[a b] (swap! promises next)]
           (if (and a b)
             (do
               (deliver a (cons x (lazy-seq @b)))
               x)
             (throw (Exception. "Pipe already closed"))))))]))
;; Beware: do not print the seq (e.g. at the REPL) while the pipe is still open, it will block!
(let [[q f] (pipe)]
  (future (doseq [x q] (println x)) (println "that's all folks!"))
  (doseq [x (range 10)]
    (f x))
  (f)) ;close the pipe
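The blocking read comes from wrapping a promise dereference in lazy-seq: nothing is dereferenced until a consumer walks the seq, and then the consumer waits until the writer delivers the next cons cell. Here is a minimal sketch of a single link of that chain, using only clojure.core. The names blocking-read-demo, cell, s and reader are illustrative and not part of pipe.

```clojure
(defn blocking-read-demo
  "Illustrative helper: shows that realizing a lazy seq built over a
  promise blocks until the promise is delivered."
  []
  (let [cell     (promise)                 ; stands in for the not-yet-written tail
        s        (lazy-seq @cell)          ; creating the seq does not block
        reader   (future (first s))        ; realizing it does, so read on another thread
        waiting? (not (realized? reader))] ; reader cannot finish before the delivery below
    (deliver cell (cons :hello nil))       ; the "write": supply the next cons cell
    [waiting? (deref reader 1000 :timed-out)]))

(println (blocking-read-demo))
```

This is also why printing the seq at the REPL while the pipe is open hangs: printing realizes the seq, and the last cell is only delivered when the pipe is closed.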
Thanks to "Are pipe dreams made of promises?" for the pipe function and to "Clojure - From Callbacks to Sequences" for pointing it out to me.
EDIT: While this is still a good idea, futures and promises are not supported in ClojureScript.
Another EDIT: See core.async for the right way to do this.
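For comparison, here is a rough sketch of the same produce/consume/close flow on top of a core.async channel. It assumes org.clojure/core.async is on the classpath and runs on the JVM. The function name channel-demo is made up for this example. One difference from pipe: nil cannot be put on a channel, because a nil take signals that the channel is closed.

```clojure
(require '[clojure.core.async :as async])

(defn channel-demo
  "Illustrative helper: sends 0..9 through a channel, closes it, and
  returns the vector of values the reader collected."
  []
  (let [ch     (async/chan)                    ; unbuffered channel plays the role of the pipe
        reader (async/thread                   ; a real thread, so blocking takes are fine
                 (loop [acc []]
                   (if-some [x (async/<!! ch)] ; <!! yields nil once ch is closed and drained
                     (recur (conj acc x))
                     acc)))]
    (doseq [x (range 10)]
      (async/>!! ch x))                        ; blocking put: the write end
    (async/close! ch)                          ; close the pipe
    (async/<!! reader)))                       ; thread returns a channel carrying the body's result

(println (channel-demo))
```

The blocking operations <!!, >!! and thread exist only on the JVM. In ClojureScript the same shape would be written with go blocks and the parking operations <! and >!.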