Skip to content

Commit 0757a40

Browse files
authored
Merge pull request #200 from tanzoniteblack/tsasvla
Add `go-off` macro as alternate way to work with deferreds
2 parents f32f4e9 + e66d41a commit 0757a40

File tree

5 files changed

+335
-5
lines changed

5 files changed

+335
-5
lines changed

docs/deferred.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,27 @@ In this example, `c` is declared within a normal `let` binding, and as such we c
166166

167167
It can be helpful to think of `let-flow` as similar to Prismatic's [Graph](https://github.com/prismatic/plumbing#graph-the-functional-swiss-army-knife) library, except that the dependencies between values are inferred from the code, rather than explicitly specified. Comparisons to core.async's goroutines are less accurate, since `let-flow` allows for concurrent execution of independent paths within the bindings, whereas operations within a goroutine are inherently sequential.
168168

169+
### manifold.go-off
170+
171+
An alternate way to write code using deferreds is the macro `manifold.go-off/go-off`. This macro is an almost exact mirror of the `go` macro from [clojure/core.async](https://github.com/clojure/core.async), to the point where it actually utilizes the state machine functionality from core.async. In order to use this macro, `core.async` must be a dependency provided by the user. The main difference between `go` and `go-off`, besides go-off working with deferrables instead of core.async channels, is the `take` function being `<!?` instead of `<!`. The difference in function names is used to indicate exceptions behave the same as a non-async clojure block (i.e. are thrown) instead of silently swallowed & returning `nil`.
172+
173+
The benefit of this macro over `let-flow` is that it gives complete control of when deferreds should be realized to the user of the macro, removing any potential surprises (especially around timeouts).
174+
175+
```clj
176+
@(go-off (+ (<!? (d/future 10))
177+
(<!? (d/future 20)))) ;; ==> 30
178+
```
179+
180+
```clj
181+
(<!! (core.async/go (try (<! (go (/ 5 0)))
182+
(catch Exception e
183+
"ERROR")))) ; ==> nil
184+
185+
@(go-off (try (<!? (d/future (/ 5 0)))
186+
(catch Exception e
187+
"ERROR"))) ; ==> "ERROR"
188+
```
189+
169190
### `manifold.deferred/loop`
170191

171192
Manifold also provides a `loop` macro, which allows for asynchronous loops to be defined. Consider `manifold.stream/consume`, which allows a function to be invoked with each new message from a stream. We can implement similar behavior like so:

project.clj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
:dependencies [[org.clojure/clojure "1.10.3" :scope "provided"]
88
[org.clojure/tools.logging "1.1.0" :exclusions [org.clojure/clojure]]
99
[io.aleph/dirigiste "1.0.0"]
10-
[riddley "0.1.15"]]
11-
:profiles {:dev {:dependencies [[criterium "0.4.6"]
12-
[org.clojure/core.async "1.3.618"]]}}
10+
[riddley "0.1.15"]
11+
[org.clojure/core.async "1.4.627" :scope "provided"]]
12+
:profiles {:dev {:dependencies [[criterium "0.4.6"]]}}
1313
:test-selectors {:default #(not
1414
(some #{:benchmark :stress}
1515
(cons (:tag %) (keys %))))

src/manifold/go_off.clj

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
(ns ^{:author "Ryan Smith"
2+
:doc "Provide a variant of `core.async/go` that works with manifold's deferreds and executors. Utilizes core.async's state-machine generator, so core.async must be provided as a dependency."}
3+
manifold.go-off
4+
(:require [manifold
5+
[executor :as ex]
6+
[deferred :as d]]
7+
[clojure.core.async.impl
8+
[ioc-macros :as ioc]]
9+
[manifold.stream :as s])
10+
(:import (java.util.concurrent Executor)
11+
(manifold.stream.core IEventSource)))
12+
13+
(defn return-deferred [state value]
14+
(let [d (ioc/aget-object state ioc/USER-START-IDX)]
15+
(d/success! d value)
16+
d))
17+
18+
(defn <!
19+
"Takes value from a deferred/stream. Must be called inside a (go ...) block. Will
20+
return nil if a stream is closed. Will park if nothing is available. If an error
21+
is thrown inside the body, that error will be placed as the return value.
22+
23+
N.B. To make `go-off` usage idiomatic with the rest of manifold, use `<!?`
24+
instead."
25+
[port]
26+
(assert nil "<! used not in (go-off ...) block"))
27+
28+
(defmacro <!?
29+
"Takes a val from a deferred/stream. Must be called inside a (go-off ...) block.
30+
Will park if nothing is available. If value that is returned is a Throwable,
31+
it will re-throw."
32+
[port]
33+
`(let [r# (<! ~port)]
34+
(if (instance? Throwable r#)
35+
;; this is a re-throw of the original throwable. the expectation is that
36+
;; it still will maintain the original stack trace
37+
(throw r#)
38+
r#)))
39+
40+
(defn run-state-machine-wrapped [state]
41+
(try (ioc/run-state-machine state)
42+
(catch Throwable ex
43+
(d/error! (ioc/aget-object state ioc/USER-START-IDX) ex)
44+
(throw ex))))
45+
46+
(defn take! [state blk d]
47+
(let [handler (fn [x]
48+
(ioc/aset-all! state ioc/VALUE-IDX x ioc/STATE-IDX blk)
49+
(run-state-machine-wrapped state))
50+
;; if `d` is a stream, use `take` to get a deferrable that we can wait on
51+
d (if (instance? IEventSource d) (s/take! d) d)
52+
d-is-deferrable? (d/deferrable? d)]
53+
(if
54+
;; if d is not deferrable immediately resume processing state machine
55+
(not d-is-deferrable?)
56+
(do (ioc/aset-all! state ioc/VALUE-IDX d ioc/STATE-IDX blk)
57+
:recur)
58+
(let [d (d/->deferred d)]
59+
(if
60+
;; if already realized, deref value and immediately resume processing state machine
61+
(d/realized? d)
62+
(do (ioc/aset-all! state ioc/VALUE-IDX @d ioc/STATE-IDX blk)
63+
:recur)
64+
65+
;; resume processing state machine once d has been realized
66+
(do (-> d
67+
(d/chain handler)
68+
(d/catch handler))
69+
nil))))))
70+
71+
(def async-custom-terminators
72+
{'manifold.go-off/<! `manifold.go-off/take!
73+
:Return `return-deferred})
74+
75+
(defmacro go-off-executor
76+
"Implementation of go-off that allows specifying executor. See docstring of go-off for usage."
77+
[executor & body]
78+
(let [executor (vary-meta executor assoc :tag 'Executor)
79+
crossing-env (zipmap (keys &env) (repeatedly gensym))]
80+
`(let [d# (d/deferred)
81+
captured-bindings# (clojure.lang.Var/getThreadBindingFrame)]
82+
(.execute ~executor ^Runnable
83+
(^:once fn* []
84+
(let [~@(mapcat (fn [[l sym]] [sym `(^:once fn* [] ~(vary-meta l dissoc :tag))]) crossing-env)
85+
f# ~(ioc/state-machine `(do ~@body) 1 [crossing-env &env] async-custom-terminators)
86+
state# (-> (f#)
87+
(ioc/aset-all! ioc/USER-START-IDX d#
88+
ioc/BINDINGS-IDX captured-bindings#))]
89+
(run-state-machine-wrapped state#))))
90+
;; chain is8 being used to apply unwrap chain
91+
(d/chain d#)))
92+
)
93+
94+
(defmacro go-off
95+
"Asynchronously executes the body on manifold's default executor, returning
96+
immediately to the calling thread. Additionally, any visible calls to <!?
97+
and <! deferred operations within the body will block (if necessary)
98+
by 'parking' the calling thread rather than tying up an OS thread.
99+
Upon completion of the operation, the body will be resumed.
100+
101+
Returns a deferred which will receive the result of the body when
102+
completed. If the body returns a deferred, the result will be unwrapped
103+
until a non-deferrable value is available to be placed onto the return deferred.
104+
105+
This method is intended to be similar to `core.async/go`, and even utilizes the
106+
underlying state machine-related functions from `core.async`. It's been designed
107+
to address the following major points from core.async & vanilla manifold deferreds:
108+
109+
- `core.async/go` assumes that all of your code is able to be purely async
110+
and will never block the handling threads. go-off removes the concept of handling
111+
threads, which means blocking is not an issue, but if you spawn too many of these you
112+
can create too many threads for the OS to handle.
113+
- `core.async/go` has no built-in way of handling exceptions and assumes all async
114+
code will be either written defensively, or have custom error propagation, which
115+
differs from how clojure code blocks typically work outside of the async world.
116+
- `deferred/let-flow` presumes that every deferrable needs to be resolved. This prevents
117+
more complex handling of parallelism or being able to pass deferreds into other functions
118+
from within the `let-flow` block.
119+
- `deferred/chain` only works with single deferreds, which means having to write code in
120+
unnatural ways to handle multiple deferreds."
121+
[& body]
122+
`(go-off-executor (ex/execute-pool) ~@body))
123+
124+
125+
(comment
126+
(go-off "cat")
127+
128+
@(go-off (+ (<!? (d/future 10))
129+
(<!? (d/future 20)))) ;; ==> 30
130+
131+
)

src/manifold/utils.clj

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,11 @@
109109

110110
;;;
111111

112-
(defmacro when-core-async [& body]
112+
(defmacro when-core-async
113+
"Suitable for altering behavior (like extending protocols), but not defs"
114+
[& body]
113115
(when (try
114-
(require '[clojure.core.async :as a])
116+
(require '[clojure.core.async])
115117
true
116118
(catch Exception _
117119
false))

test/manifold/go_off_test.clj

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
(ns manifold.go-off-test
2+
(:require [clojure.test :refer :all]
3+
[manifold.go-off :refer [go-off <!? go-off-executor]]
4+
[manifold.deferred :as d]
5+
[manifold.test-utils :refer :all]
6+
[manifold.executor :as ex]
7+
[clojure.string :as str]
8+
[manifold.stream :as s])
9+
(:import (java.util.concurrent TimeoutException Executor)))
10+
11+
(deftest async-test
12+
(testing "values are returned correctly"
13+
(is (= 10
14+
@(go-off (<!? (d/success-deferred 10))))))
15+
16+
(testing "case with go-off"
17+
(is (= :1
18+
@(go-off (case (name :1)
19+
"0" :0
20+
"1" :1
21+
:3)))))
22+
23+
(testing "nil result of go-off"
24+
(is (= nil
25+
@(go-off nil))))
26+
27+
(testing "take inside binding of loop"
28+
(is (= 42
29+
@(go-off (loop [x (<!? (d/success-deferred 42))]
30+
x)))))
31+
32+
(testing "can get from a catch"
33+
(let [c (d/success-deferred 42)]
34+
(is (= 42
35+
@(go-off (try
36+
(assert false)
37+
(catch Throwable ex (<!? c)))))))))
38+
39+
(deftest enqueued-chan-ops
40+
(testing "enqueued channel takes re-enter async properly"
41+
(is (= :foo
42+
(let [d (d/deferred)
43+
async-chan (go-off (<!? d))]
44+
(d/success! d :foo)
45+
@async-chan)))
46+
47+
(is (= 3
48+
(let [d1 (d/deferred)
49+
d2 (d/deferred)
50+
d3 (d/deferred)
51+
async-chan (go-off (+ (<!? d1) (<!? d2) (<!? d3)))]
52+
(d/success! d3 1)
53+
(d/success! d2 1)
54+
(d/success! d1 1)
55+
@async-chan)))))
56+
57+
(deftest go-off-nests
58+
(testing "return deferred will always result in a a realizable value, not another deferred"
59+
(is (= [23 42] @(go-off (let [let* 1 a 23] (go-off (let* [b 42] [a b]))))))
60+
(is (= 5 @(go-off (go-off (go-off (go-off (go-off (go-off (go-off 5))))))))))
61+
(testing "Parking unwraps nested deferreds"
62+
(is (= 5 @(go-off (<!? (go-off (go-off (go-off 5)))))))))
63+
64+
(deftest error-propagation
65+
(is (= "chained catch"
66+
@(d/catch (go-off (/ 5 0))
67+
(constantly "chained catch"))))
68+
69+
(is (= "try/catch in block"
70+
@(go-off (try (/ 5 0)
71+
(catch Throwable _ "try/catch in block")))))
72+
73+
(testing "Try/catch around parking will continue block"
74+
(is (= "try/catch parking"
75+
@(go-off (try (<!? (d/future (/ 5 0)))
76+
(catch Throwable _ "try/catch parking")))))
77+
(is (= 5
78+
@(go-off (try (<!? (d/future (/ 5 0)))
79+
(catch Throwable _))
80+
5))))
81+
82+
(testing "Normal deferred handling still works"
83+
(is (= 5
84+
@(go-off (<!? (d/catch (d/future (/ 5 0)) (constantly 5))))))))
85+
86+
(deftest non-deferred-takes
87+
(testing "Can take from non-deferreds"
88+
(is (= 5 @(go-off (<!? 5))))
89+
(is (= "test" @(go-off (<!? "test"))))))
90+
91+
(deftest already-realized-values
92+
(testing "When taking from already realized values, the threads should not change."
93+
(let [original-thread (atom nil)]
94+
(is (= @(go-off (reset! original-thread (Thread/currentThread))
95+
(<!? "cat")
96+
(Thread/currentThread))
97+
@original-thread)))
98+
99+
(let [original-thread (atom nil)]
100+
(is (= @(go-off (reset! original-thread (Thread/currentThread))
101+
(<!? (d/success-deferred "cat"))
102+
(Thread/currentThread))
103+
@original-thread)))
104+
105+
(testing "Taking from already realized value doesn't cause remaining body to run twice"
106+
(let [blow-up-counter (atom 0)
107+
blow-up-fn (fn [& _] (is (= 1 (swap! blow-up-counter inc))))]
108+
@(go-off (<!? "cat")
109+
(blow-up-fn))))
110+
;; Sleep is here to make sure that the secondary invocation of `blow-up-fn` that was happening has
111+
;; had time to report it's failure before the test finishes
112+
(Thread/sleep 500)))
113+
114+
(deftest deferred-interactions
115+
(testing "timeouts"
116+
(is (= ::timeout @(go-off (<!? (d/timeout! (d/deferred) 10 ::timeout)))))
117+
(is (= ::timeout @(d/timeout! (go-off (<!? (d/deferred))) 10 ::timeout)))
118+
(is (thrown? TimeoutException @(go-off (<!? (d/timeout! (d/deferred) 10)))))
119+
(is (thrown? TimeoutException @(d/timeout! (go-off (<!? (d/deferred))) 10))))
120+
121+
(testing "alt"
122+
(is (= ::timeout @(go-off (<!? (d/alt (d/deferred) (d/timeout! (d/deferred) 10 ::timeout))))))
123+
(is (= ::timeout @(d/alt (go-off (<!? (d/deferred))) (d/timeout! (d/deferred) 10 ::timeout))))
124+
(is (= 1 @(go-off (<!? (d/alt (d/deferred) (d/success-deferred 1))))))
125+
(is (= 1 @(d/alt (go-off (<!? (d/deferred))) (d/success-deferred 1))))))
126+
127+
(deftest go-off-specify-executor-pool
128+
(let [prefix "go-off-custom-executor"
129+
cnt (atom 0)
130+
custom-executor (ex/utilization-executor 0.95 Integer/MAX_VALUE
131+
{:thread-factory (ex/thread-factory
132+
#(str prefix (swap! cnt inc))
133+
(deliver (promise) nil))
134+
:stats-callback (constantly nil)})]
135+
(try (is (str/starts-with? @(go-off-executor custom-executor (.getName (Thread/currentThread))) prefix)
136+
"Running on custom executor, thread naming should be respected.")
137+
@(go-off-executor custom-executor (.getName (Thread/currentThread)))
138+
(finally (.shutdown custom-executor)))))
139+
140+
(deftest go-off-streams
141+
(let [test-stream (s/stream)
142+
test-d (go-off [(<!? test-stream)
143+
(<!? test-stream)
144+
(<!? test-stream)
145+
(<!? test-stream)
146+
(<!? test-stream)])]
147+
(dotimes [n 3]
148+
(s/put! test-stream n))
149+
(s/close! test-stream)
150+
(is (= @test-d [0 1 2 nil nil]))))
151+
152+
(deftest ^:benchmark benchmark-go-off
153+
(bench "invoke comp x1"
154+
((comp inc) 0))
155+
(bench "go-off x1"
156+
@(go-off (inc (<!? 0))))
157+
(bench "go-off deferred x1"
158+
@(go-off (inc (<!? (d/success-deferred 0)))))
159+
(bench "go-off future 200 x1"
160+
@(go-off (inc (<!? (d/future (Thread/sleep 200) 0)))))
161+
(bench "invoke comp x2"
162+
((comp inc inc) 0))
163+
(bench "go-off x2"
164+
@(go-off (inc (<!? (inc (<!? 0))))))
165+
(bench "go-off deferred x2"
166+
@(go-off (inc (<!? (inc (<!? (d/success-deferred 0)))))))
167+
(bench "go-off future 200 x2"
168+
@(go-off (inc (<!? (inc (<!? (d/future (Thread/sleep 200) 0)))))))
169+
(bench "invoke comp x5"
170+
((comp inc inc inc inc inc) 0))
171+
(bench "go-off x5"
172+
@(go-off (inc (<!? (inc (<!? (inc (<!? (inc (<!? (inc (<!? 0))))))))))))
173+
(bench "go-off deferred x5"
174+
@(go-off (inc (<!? (inc (<!? (inc (<!? (inc (<!? (inc (<!? (d/success-deferred 0))))))))))))
175+
(bench "go-off future 200 x5"
176+
@(go-off (inc (<!? (inc (<!? (inc (<!? (inc (<!? (inc (<!? (d/future (Thread/sleep 200) 0)))))))))))))))

0 commit comments

Comments
 (0)