Skip to content

Commit 9269d4e

Browse files
committed
Eliminated macro version of future stuff based on feedback.
1 parent ee45b44 commit 9269d4e

File tree

5 files changed

+113
-133
lines changed

5 files changed

+113
-133
lines changed

language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/future.clj

Lines changed: 32 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -2,44 +2,45 @@
22
"Functions and macros for making rx-ified futures. That is, run some code in some
33
other thread and return an Observable of its result.
44
"
5-
(:refer-clojure :exclude [future])
65
(:require [rx.lang.clojure.interop :as iop]
76
[rx.lang.clojure.core :as rx]))
87

98
(def ^:private -ns- *ns*)
109
(set! *warn-on-reflection* true)
1110

12-
(defn default-runner
13-
"Default runner creator function. Creates futures on Clojure's default future thread pool."
14-
[f]
15-
(future-call f))
11+
(defn future*
12+
"Exerimental/Possibly a bad idea
1613
17-
(defn future-generator*
18-
"Same as rx/generator* except f is invoked in a separate thread.
14+
Execute (f & args) in a separate thread and pass the result to onNext.
15+
If an exception is thrown, onError is called with the exception.
1916
2017
runner is a function that takes a no-arg function argument and returns a future
2118
representing the execution of that function.
2219
2320
Returns an Observable. If the subscriber unsubscribes, the future will be canceled
2421
with clojure.core/future-cancel
2522
26-
See:
27-
rx.lang.clojure.core/generator*
28-
rx.lang.clojure.future/future-generator
23+
Examples:
24+
25+
(subscribe (rx/future future-call
26+
#(slurp \"input.txt\"))
27+
(fn [v] (println \"Got: \" v)))
28+
; eventually outputs content of input.txt
2929
"
3030
[runner f & args]
3131
{:pre [(ifn? runner) (ifn? f)]}
3232
(rx/observable* (fn [^rx.Subscriber observer]
33-
(let [wrapped (-> (fn [o]
34-
(apply f o args))
35-
rx/wrap-on-completed
36-
rx/wrap-on-error)
37-
fu (runner #(wrapped observer))]
38-
(.add observer
39-
(rx/subscription #(future-cancel fu)))))))
33+
(let [wrapped (-> #(rx/on-next % (apply f args))
34+
rx/wrap-on-completed
35+
rx/wrap-on-error)
36+
fu (runner #(wrapped observer))]
37+
(.add observer
38+
(rx/subscription #(future-cancel fu)))))))
4039

41-
(defmacro future-generator
42-
"Same as rx/generator macro except body is invoked in a separate thread.
40+
(defn future-generator*
41+
"Exerimental/Possibly a bad idea
42+
43+
Same as rx/generator* except f is invoked in a separate thread.
4344
4445
runner is a function that takes a no-arg function argument and returns a future
4546
representing the execution of that function.
@@ -49,56 +50,22 @@
4950
5051
Example:
5152
52-
(future-generator default-runner
53-
[o]
54-
(rx/on-next o 1)
55-
(Thread/sleep 1000)
56-
(rx/on-next o 2))
53+
(future-generator* future-call
54+
(fn [o]
55+
(rx/on-next o 1)
56+
(Thread/sleep 1000)
57+
(rx/on-next o 2)))
5758
5859
See:
5960
rx.lang.clojure.core/generator*
60-
rx.lang.clojure.future/future-generator
61-
"
62-
[runner bindings & body]
63-
`(future-generator* ~runner (fn ~bindings ~@body)))
64-
65-
(defn future*
66-
"Execute (f & args) in a separate thread and pass the result to onNext.
67-
If an exception is thrown, onError is called with the exception.
68-
69-
runner is a function that takes a no-arg function argument and returns a future
70-
representing the execution of that function.
71-
72-
Returns an Observable. If the subscriber unsubscribes, the future will be canceled
73-
with clojure.core/future-cancel
7461
"
7562
[runner f & args]
7663
{:pre [(ifn? runner) (ifn? f)]}
7764
(rx/observable* (fn [^rx.Subscriber observer]
78-
(let [wrapped (-> #(rx/on-next % (apply f args))
79-
rx/wrap-on-completed
80-
rx/wrap-on-error)
81-
fu (runner #(wrapped observer))]
82-
(.add observer
83-
(rx/subscription #(future-cancel fu)))))))
84-
85-
(defmacro future
86-
"Executes body in a separate thread and passes the single result to onNext.
87-
If an exception occurs, onError is called.
88-
89-
Returns an Observable. If the subscriber unsubscribes, the future will be canceled
90-
with clojure.core/future-cancel
91-
92-
runner is a function that takes a no-arg function argument and returns a future
93-
representing the execution of that function.
94-
95-
Examples:
96-
97-
(subscribe (rx/future rx/default-runner
98-
(slurp \"input.txt\"))
99-
(fn [v] (println \"Got: \" v)))
100-
; eventually outputs content of input.txt
101-
"
102-
[runner & body]
103-
`(future* ~runner (fn [] ~@body)))
104-
65+
(let [wrapped (-> (fn [o]
66+
(apply f o args))
67+
rx/wrap-on-completed
68+
rx/wrap-on-error)
69+
fu (runner #(wrapped observer))]
70+
(.add observer
71+
(rx/subscription #(future-cancel fu)))))))

language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/chunk_test.clj

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,16 @@
99
(deftest test-chunk
1010
(let [n 20
1111
chunk-size 10
12-
factory (rx-future/future-generator rx-future/default-runner [o]
13-
(doseq [i (range n)]
14-
(Thread/sleep (rand-int 50))
15-
(rx/on-next o (rx-future/future rx-future/default-runner
16-
(let [t (rand-int 500)]
17-
(Thread/sleep t))
18-
i))))]
12+
factory (rx-future/future-generator*
13+
future-call
14+
(fn[o]
15+
(doseq [i (range n)]
16+
(Thread/sleep (rand-int 50))
17+
(rx/on-next o (rx-future/future*
18+
future-call
19+
#(let [t (rand-int 500)]
20+
(Thread/sleep t)
21+
i))))))]
1922
(is (= (range n)
2023
(sort (rx-blocking/into []
2124
(rx-chunk/chunk chunk-size {:debug true} factory)))))))
@@ -24,29 +27,35 @@
2427
(testing "error from source is propagated"
2528
(let [n 20
2629
chunk-size 4
27-
factory (rx-future/future-generator rx-future/default-runner [o]
28-
(doseq [i (range n)]
29-
(Thread/sleep (rand-int 50))
30-
(rx/on-next o (rx-future/future rx-future/default-runner
31-
(let [t (rand-int 1000)]
32-
(Thread/sleep t))
33-
i)))
34-
(throw (IllegalArgumentException. "hi")))]
30+
factory (rx-future/future-generator*
31+
future-call
32+
(fn [o]
33+
(doseq [i (range n)]
34+
(Thread/sleep (rand-int 50))
35+
(rx/on-next o (rx-future/future*
36+
future-call
37+
#(let [t (rand-int 1000)]
38+
(Thread/sleep t)
39+
i))))
40+
(throw (IllegalArgumentException. "hi"))))]
3541
(is (thrown-with-msg? IllegalArgumentException #"hi"
3642
(rx-blocking/into []
3743
(rx-chunk/chunk chunk-size {:debug true} factory))))))
3844

3945
(testing "error from single observable is propagated"
4046
(let [n 20
4147
chunk-size 4
42-
factory (rx-future/future-generator rx-future/default-runner [o]
43-
(doseq [i (range n)]
44-
(Thread/sleep (rand-int 50))
45-
(rx/on-next o (rx-future/future rx-future/default-runner
46-
(let [t (rand-int 1000)]
47-
(throw (IllegalArgumentException. "byebye"))
48-
(Thread/sleep t))
49-
i))))]
48+
factory (rx-future/future-generator*
49+
future-call
50+
(fn [o]
51+
(doseq [i (range n)]
52+
(Thread/sleep (rand-int 50))
53+
(rx/on-next o (rx-future/future*
54+
future-call
55+
#(let [t (rand-int 1000)]
56+
(throw (IllegalArgumentException. "byebye"))
57+
(Thread/sleep t)
58+
i))))))]
5059
(is (thrown? rx.exceptions.CompositeException
5160
(rx-blocking/into []
5261
(rx-chunk/chunk chunk-size

language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/core_test.clj

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,12 @@
112112
(b/into []))))))
113113

114114
(let [expected-result [[1 3 5] [2 4 6]]
115-
sleepy-o #(f/future-generator f/default-runner [o]
116-
(doseq [x %]
117-
(Thread/sleep 10)
118-
(rx/on-next o x)))
115+
sleepy-o #(f/future-generator*
116+
future-call
117+
(fn [o]
118+
(doseq [x %]
119+
(Thread/sleep 10)
120+
(rx/on-next o x))))
119121
make-inputs (fn [] (mapv sleepy-o expected-result))
120122
make-output (fn [r] [(keep #{1 3 5} r)
121123
(keep #{2 4 6} r)])]

language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/future_test.clj

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,27 +7,25 @@
77
(deftest test-future-generator
88
(is (not= [(.getId (Thread/currentThread))]
99
(b/into []
10-
(f/future-generator f/default-runner
11-
[observer]
12-
(rx/on-next observer (.getId (Thread/currentThread))))))))
10+
(f/future-generator* future-call
11+
#(rx/on-next % (.getId (Thread/currentThread))))))))
1312

1413
(deftest test-future
15-
(is (= [15] (b/into [] (f/future* f/default-runner + 1 2 3 4 5))))
16-
(is (= [15] (b/into [] (f/future f/default-runner (println "HI") (+ 1 2 3 4 5))))) )
14+
(is (= [15] (b/into [] (f/future* future-call + 1 2 3 4 5)))))
1715

1816
(deftest test-future-exception
1917
(is (= "Caught: boo"
20-
(->> (f/future f/default-runner (throw (java.io.FileNotFoundException. "boo")))
18+
(->> (f/future* future-call #(throw (java.io.FileNotFoundException. "boo")))
2119
(rx/catch java.io.FileNotFoundException e
2220
(rx/return (str "Caught: " (.getMessage e))))
2321
(b/single)))))
2422

2523
(deftest test-future-cancel
2624
(let [exited? (atom nil)
27-
o (f/future f/default-runner
28-
(Thread/sleep 1000)
29-
(reset! exited? true)
30-
"WAT")
25+
o (f/future* future-call
26+
(fn [] (Thread/sleep 1000)
27+
(reset! exited? true)
28+
"WAT"))
3129
result (->> o
3230
(rx/take 0)
3331
(b/into []))]
@@ -37,11 +35,11 @@
3735

3836
(deftest test-future-generator-cancel
3937
(let [exited? (atom nil)
40-
o (f/future-generator f/default-runner
41-
[o]
42-
(rx/on-next o "FIRST")
43-
(Thread/sleep 1000)
44-
(reset! exited? true))
38+
o (f/future-generator* future-call
39+
(fn [o]
40+
(rx/on-next o "FIRST")
41+
(Thread/sleep 1000)
42+
(reset! exited? true)))
4543
result (->> o
4644
(rx/take 1)
4745
(b/into []))]
@@ -52,12 +50,12 @@
5250
(deftest test-future-generator-exception
5351
(let [e (java.io.FileNotFoundException. "snake")]
5452
(is (= [1 2 e]
55-
(->> (f/future-generator
56-
f/default-runner
57-
[o]
58-
(rx/on-next o 1)
59-
(rx/on-next o 2)
60-
(throw e))
53+
(->> (f/future-generator*
54+
future-call
55+
(fn [o]
56+
(rx/on-next o 1)
57+
(rx/on-next o 2)
58+
(throw e)))
6159
(rx/catch java.io.FileNotFoundException e
6260
(rx/return e))
6361
(b/into []))))))

language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/graph_test.clj

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -43,42 +43,46 @@
4343
(rx-blocking/single
4444
(-> (let [z (rx/return "hi")] ; an observable from "somewhere else"
4545
(graph/let-o
46-
[?a (rx-future/future rx-future/default-runner (Thread/sleep 50) 99)
47-
?b (rx-future/future rx-future/default-runner (Thread/sleep 500) 100)
46+
[?a (rx-future/future* future-call #(do (Thread/sleep 50) 99))
47+
?b (rx-future/future* future-call #(do (Thread/sleep 500) 100))
4848
?c (rx/map #(hash-map :a %1 :b %2 :z %3) ?a ?b ?z)
4949
?z z]
5050
(rx/reduce merge {} ?c)))))))))
5151

5252
(deftest test-complicated-graph
5353
; These funcs model network requests for various stuff. They all return observable.
5454
(let [request-vhs (fn []
55-
(rx-future/future-generator rx-future/default-runner
56-
[o]
57-
(Thread/sleep 50)
58-
(doseq [i (range 3)]
59-
(rx/on-next o {:id i}))))
55+
(rx-future/future-generator*
56+
future-call
57+
(fn [o]
58+
(Thread/sleep 50)
59+
(doseq [i (range 3)]
60+
(rx/on-next o {:id i})))))
6061
request-user (fn [id]
61-
(rx-future/future rx-future/default-runner
62-
(Thread/sleep (rand-int 250))
63-
{:id id
64-
:name (str "friend" id) }))
62+
(rx-future/future*
63+
future-call
64+
#(do (Thread/sleep (rand-int 250))
65+
{:id id
66+
:name (str "friend" id) })))
6567
request-ab (fn [u]
66-
(rx-future/future rx-future/default-runner
67-
(Thread/sleep (rand-int 250))
68-
{:user-id (:id u)
69-
:cell (* 2 (:id u))}))
68+
(rx-future/future*
69+
future-call
70+
#(do (Thread/sleep (rand-int 250))
71+
{:user-id (:id u)
72+
:cell (* 2 (:id u))})))
7073

7174
request-video-md (fn [v]
7275
(rx/return {:video v
7376
:title (str "title" (:id v)) }))
7477

7578
; Now we can stitch all these requests together into an rx graph to
7679
; produce a response.
77-
o (graph/let-o [?user-info (rx-future/future rx-future/default-runner
78-
(Thread/sleep 20)
79-
{:name "Bob"
80-
:id 12345
81-
:friend-ids [1 2 3] })
80+
o (graph/let-o [?user-info (rx-future/future*
81+
future-call
82+
#(do (Thread/sleep 20)
83+
{:name "Bob"
84+
:id 12345
85+
:friend-ids [1 2 3] }))
8286

8387
?friends (->> ?user-info
8488
(rx/mapcat (fn [ui]

0 commit comments

Comments
 (0)