Skip to content

Commit d4c09e1

Browse files
committed
feat: introduce tapestry.queue
Introduce `tapestry.queue`. Note that this requires `--enable-preview` for structured concurrency.
1 parent d524099 commit d4c09e1

File tree

5 files changed

+178
-1
lines changed

5 files changed

+178
-1
lines changed

.dir-locals.el

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
;;; Directory Local Variables
1+
;;; Directory Local Variables -*- no-byte-compile: t -*-
22
;;; For more information see (info "(emacs) Directory Variables")
33

44
((clojure-mode . ((cider-clojure-cli-aliases . "test"))))

deps.edn

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
:aliases
1111
{:test
1212
{:extra-paths ["test"]
13+
:jvm-opts ["--enable-preview"]
1314
:main-opts ["-m" "kaocha.runner"]
1415
:extra-deps
1516
{org.clojure/test.check {:mvn/version "1.1.1"}

src/tapestry/experimental.clj

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
(ns tapestry.experimental
2+
(:import [java.util.concurrent StructuredTaskScope$ShutdownOnSuccess]))
3+
4+
5+
(defmacro alts
6+
"Execute the given `exprs` in parallel and return the result of the first one to return,
7+
shutting down all others with an interrupted exception."
8+
[& exprs]
9+
(let [scope (gensym "scope")]
10+
`(with-open [~scope (StructuredTaskScope$ShutdownOnSuccess.)]
11+
~@(for [expr exprs]
12+
`(.fork ~scope (fn [] ~expr)))
13+
(.join ~scope)
14+
(.result ~scope #(throw %)))))

src/tapestry/queue.clj

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
(ns tapestry.queue
2+
(:require [tapestry.experimental :refer [alts]])
3+
(:import [java.util.concurrent
4+
CompletableFuture
5+
SynchronousQueue ArrayBlockingQueue BlockingQueue LinkedBlockingQueue
6+
TimeUnit]))
7+
8+
(deftype Queue [^BlockingQueue q ^CompletableFuture closed?*])
9+
10+
(defmethod print-method Queue [^Queue q ^java.io.Writer w]
11+
(let [bq ^BlockingQueue (.-q q)]
12+
(.write w
13+
(str "#queue"
14+
{:capacity (+ (.remainingCapacity bq)
15+
(.size bq))
16+
:items (into [] bq)
17+
:closed? (.isDone ^CompletableFuture (.-closed?* q))}))))
18+
19+
(defn queue
20+
"Create a new queue with an optional `capacity`.
21+
22+
If no capacity is specified it's a synchrounous queue.
23+
24+
Passing `:unbounded` will create an unbounded queue holding at most Integer/MAX_VALUE items.."
25+
([] (queue nil))
26+
([capacity]
27+
(let [q (case capacity
28+
nil (SynchronousQueue. true)
29+
:unbounded (LinkedBlockingQueue.)
30+
(ArrayBlockingQueue. capacity true))]
31+
(Queue. q (CompletableFuture.)))))
32+
33+
(defn queue?
34+
"Return whether the provided `obj` is a queue"
35+
[obj]
36+
(instance? Queue obj))
37+
38+
(defn closed?
39+
"Return whether the provided queue is closed"
40+
[^Queue q]
41+
(.isDone (.-closed?* q)))
42+
43+
(defn await-close
44+
"Block until `q` closes"
45+
[^Queue q]
46+
@(.-closed?* q))
47+
48+
(defn put!
49+
"Place an `item` in the `q`, potentially blocking until space is available.
50+
51+
Return `true` if the item is queues, or `false` if it is closed"
52+
([^Queue q item]
53+
(if (closed? q)
54+
false
55+
(alts
56+
(do (await-close q)
57+
false)
58+
(do (.put ^BlockingQueue (.-q q) item)
59+
true)))))
60+
61+
(defn try-put!
62+
"Place an `item` in the `q`, waiting at most `timeout-ms`.
63+
64+
Return `true` if the queue successfully takes the `item`, or `false`` if it is closed / closes
65+
before the item is accepted.
66+
67+
Takes an optional `timeout-val` to return in the case of timeout"
68+
([^Queue q item]
69+
(try-put! q item 0 false))
70+
([^Queue q item timeout-ms]
71+
(try-put! q item timeout-ms false))
72+
([^Queue q item timeout-ms timeout-val]
73+
(if (closed? q)
74+
false
75+
(alts
76+
(do (await-close q)
77+
false)
78+
(or (.offer ^BlockingQueue (.-q q) item timeout-ms TimeUnit/MILLISECONDS)
79+
timeout-val)))))
80+
81+
(defn take!
82+
"Take from the `q`, blocking until an item is available. Returns `nil` if the queue is closed."
83+
[^Queue q]
84+
(if-some [item (.poll ^BlockingQueue (.-q q) 0 TimeUnit/NANOSECONDS)]
85+
item
86+
(alts
87+
(do (await-close q)
88+
nil)
89+
(.take ^BlockingQueue (.-q q)))))
90+
91+
(defn try-take!
92+
"Try to take from the `q`, waiting at most `timeout-ms` milliseconds before returning
93+
`timeout-val`.
94+
95+
Returns `nil` if the `q` is closed."
96+
([q] (try-take! q 0 nil))
97+
([q timeout-ms] (try-take! q timeout-ms nil))
98+
([^Queue q timeout-ms timeout-val]
99+
(if-some [x (.poll ^BlockingQueue (.-q q) 0 TimeUnit/MILLISECONDS)]
100+
x
101+
(if (closed? q)
102+
nil
103+
(alts
104+
(do (await-close q)
105+
nil)
106+
(if-some [x (.poll ^BlockingQueue (.-q q) timeout-ms TimeUnit/MILLISECONDS)]
107+
x
108+
timeout-val))))))
109+
110+
(defn close!
111+
"Close the `q`.
112+
113+
Takes will return items until the `q` has been drained."
114+
[^Queue q]
115+
(.complete ^CompletableFuture (.-closed?* q) true)
116+
true)

test/tapestry/queue_test.clj

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
(ns tapestry.queue-test
2+
(:require [tapestry.queue :as sut]
3+
[tapestry.core :refer [fiber alive?]]
4+
[clojure.test :refer [deftest testing is]]))
5+
6+
(deftest queue--sync-queue-test
7+
(let [q (sut/queue)
8+
put* (fiber (sut/put! q 1))]
9+
(is (sut/queue? q))
10+
(Thread/sleep 10)
11+
(is (alive? put*))
12+
(is (= 1 (sut/take! q)))
13+
(is @put*)))
14+
15+
(deftest queue--array-queue-test
16+
(let [q (sut/queue 2)]
17+
(is (sut/try-put! q 1))
18+
(is (sut/try-put! q 2))
19+
(is (not (sut/try-put! q 3)))
20+
(is (= 1 (sut/try-take! q)))
21+
(is (= 2 (sut/try-take! q)))))
22+
23+
(deftest queue--linked-queue-test
24+
(let [q (sut/queue :unbounded)]
25+
(is (sut/try-put! q 1))
26+
(is (sut/try-put! q 2))
27+
(is (sut/try-put! q 3))
28+
(is (= 1 (sut/try-take! q)))
29+
(is (= 2 (sut/try-take! q)))
30+
(is (= 3 (sut/try-take! q)))))
31+
32+
33+
(deftest queue--try-ops
34+
(testing "try-take!"
35+
(let [q (sut/queue)]
36+
(is (nil? (sut/try-take! q)))
37+
(is (false? (sut/try-take! q 1 false)))
38+
(sut/close! q)
39+
(is (nil? (sut/try-take! q 0 false)))))
40+
(testing "try-put!"
41+
(let [q (sut/queue)]
42+
(is (false? (sut/try-put! q :val)))
43+
(is (false? (sut/try-put! q :val 0)))
44+
(is (= :timeout (sut/try-put! q :val 0 :timeout)))
45+
(sut/close! q)
46+
(is (false? (sut/try-put! q :val 0 :timeout))))))

0 commit comments

Comments
 (0)