Skip to content

Commit 4276368

Browse files
committed
Update blocking stuff
1 parent bfc03d6 commit 4276368

File tree

2 files changed

+146
-19
lines changed

2 files changed

+146
-19
lines changed
Lines changed: 96 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,90 @@
11
(ns rx.lang.clojure.blocking
22
"Blocking operators and functions. These should never be used in
33
production code except at the end of an async chain to convert from
4-
rx land back to sync land. For example, to produce a servlet response."
5-
(:refer-clojure :exclude [first into])
6-
(:require [rx.lang.clojure.core :as rx])
4+
rx land back to sync land. For example, to produce a servlet response.
5+
6+
If you use these, you're a bad person.
7+
"
8+
(:refer-clojure :exclude [first into doseq last])
9+
(:require [rx.lang.clojure.interop :as iop] [rx.lang.clojure.core :as rx])
710
(:import [rx Observable]
811
[rx.observables BlockingObservable]))
912

1013
(def ^:private -ns- *ns*)
1114
(set! *warn-on-reflection* true)
1215

16+
(defmacro ^:private with-ex-unwrap
17+
[& body]
18+
`(try
19+
~@body
20+
(catch RuntimeException e#
21+
(throw (or
22+
(and (identical? RuntimeException (class e#))
23+
(.getCause e#))
24+
e#)))))
25+
1326
(defn ^BlockingObservable ->blocking
14-
"Convert an Observable to a BlockingObservable"
15-
[^Observable o]
16-
(.toBlockingObservable o))
27+
"Convert an Observable to a BlockingObservable.
28+
29+
If o is already a BlockingObservable it's returned unchanged.
30+
"
31+
[o]
32+
(if (instance? BlockingObservable o)
33+
o
34+
(.toBlockingObservable ^Observable o)))
35+
36+
(defn o->seq
37+
"Returns a lazy sequence of the items emitted by o
38+
39+
See:
40+
rx.observables.BlockingObservable/getIterator
41+
rx.lang.clojure.core/seq->o
42+
"
43+
[o]
44+
(-> (->blocking o)
45+
(.getIterator)
46+
(iterator-seq)))
1747

1848
(defn first
1949
"*Blocks* and waits for the first value emitted by the given observable.
2050
51+
If the Observable is empty, returns nil
52+
2153
If an error is produced it is thrown.
2254
2355
See:
2456
clojure.core/first
2557
rx/first
58+
rx.observables.BlockingObservable/first
59+
"
60+
[observable]
61+
(with-ex-unwrap
62+
(.firstOrDefault (->blocking observable) nil)))
63+
64+
(defn last
65+
"*Blocks* and waits for the last value emitted by the given observable.
66+
67+
If the Observable is empty, returns nil
68+
69+
If an error is produced it is thrown.
70+
71+
See:
72+
clojure.core/last
73+
rx/last
74+
rx.observable.BlockingObservable/last
2675
"
2776
[observable]
28-
(let [result (clojure.core/promise)]
29-
(rx/subscribe (->> observable (rx/take 1))
30-
#(clojure.core/deliver result [:value %])
31-
#(clojure.core/deliver result [:error %])
32-
#(clojure.core/deliver result nil))
33-
(if-let [[type v] @result]
34-
(case type
35-
:value v
36-
:error (throw v)))))
77+
(with-ex-unwrap
78+
(.lastOrDefault (->blocking observable) nil)))
3779

3880
(defn single
3981
"*Blocks* and waits for the first value emitted by the given observable.
4082
4183
An error is thrown if more then one value is produced.
4284
"
4385
[observable]
44-
(.single (->blocking observable)))
86+
(with-ex-unwrap
87+
(.single (->blocking observable))))
4588

4689
(defn into
4790
"*Blocks* and pours the elements emitted by the given observables into
@@ -55,3 +98,40 @@
5598
"
5699
[to from-observable]
57100
(first (rx/into to from-observable)))
101+
102+
(defn doseq*
103+
"*Blocks* and executes (f x) for each x emitted by xs
104+
105+
Returns nil.
106+
107+
See:
108+
doseq
109+
clojure.core/doseq
110+
"
111+
[xs f]
112+
(with-ex-unwrap
113+
(-> (->blocking xs)
114+
(.forEach (rx.lang.clojure.interop/action* f)))))
115+
116+
(defmacro doseq
117+
"Like clojure.core/doseq except iterates over an observable in a blocking manner.
118+
119+
Unlike clojure.core/doseq, only supports a single binding
120+
121+
Returns nil.
122+
123+
Example:
124+
125+
(rx-blocking/doseq [{:keys [name]} users-observable]
126+
(println \"User:\" name))
127+
128+
See:
129+
doseq*
130+
clojure.core/doseq
131+
"
132+
[bindings & body]
133+
(when (not= (count bindings) 2)
134+
(throw (IllegalArgumentException. (str "sorry, rx/doseq only supports one binding"))))
135+
(let [[k v] bindings]
136+
`(doseq* ~v (fn [~k] ~@body))))
137+

language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/blocking_test.clj

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,63 @@
33
[rx.lang.clojure.core :as rx]
44
[clojure.test :refer [deftest testing is]]))
55

6+
(deftest test-->blocking
7+
(testing "returns a BlockingObservable from an Observable"
8+
(is (instance? rx.observables.BlockingObservable (b/->blocking (rx/return 0)))))
9+
10+
(testing "is idempotent"
11+
(is (instance? rx.observables.BlockingObservable (b/->blocking (b/->blocking (rx/return 0)))))))
12+
13+
14+
(deftest test-o->seq
15+
(is (= [1 2 3] (b/o->seq (rx/seq->o [1 2 3])))))
16+
617
(deftest test-first
718
(testing "returns first element of observable"
8-
(is (= 1 (b/first (rx/return 1)))))
19+
(is (= 1 (b/first (rx/seq->o [1 2 3])))))
20+
(testing "returns nil for empty observable"
21+
(is (nil? (b/first (rx/empty)))))
22+
(testing "rethrows errors"
23+
(is (thrown? java.io.FileNotFoundException
24+
(b/first (rx/throw (java.io.FileNotFoundException. "boo")))))))
25+
26+
(deftest test-last
27+
(testing "returns last element of observable"
28+
(is (= 3 (b/last (rx/seq->o [1 2 3])))))
929
(testing "returns nil for empty observable"
10-
(is (nil? (b/first (rx/empty))))))
30+
(is (nil? (b/last (rx/empty)))))
31+
(testing "rethrows errors"
32+
(is (thrown? java.io.FileNotFoundException
33+
(b/last (rx/throw (java.io.FileNotFoundException. "boo")))))))
1134

1235
(deftest test-single
1336
(testing "returns one element"
1437
(is (= 1 (b/single (rx/return 1)))))
1538
(testing "throw if empty"
1639
(is (thrown? java.lang.IllegalArgumentException (b/single (rx/empty)))))
1740
(testing "throw if many"
18-
(is (thrown? java.lang.IllegalArgumentException (b/single (rx/seq->o [1 2]))))))
41+
(is (thrown? java.lang.IllegalArgumentException (b/single (rx/seq->o [1 2])))))
42+
(testing "rethrows errors"
43+
(is (thrown? java.io.FileNotFoundException
44+
(b/single (rx/throw (java.io.FileNotFoundException. "boo")))))))
45+
46+
(deftest test-into
47+
(is (= [1 2 3]
48+
(b/into [1] (rx/seq->o [2 3]))))
49+
(testing "rethrows errors"
50+
(is (thrown? java.io.FileNotFoundException
51+
(b/into #{} (rx/throw (java.io.FileNotFoundException. "boo")))))))
52+
53+
(deftest test-doseq
54+
(is (= (range 3)
55+
(let [capture (atom [])]
56+
(b/doseq [{:keys [value]} (rx/seq->o (map #(hash-map :value %) (range 3)))]
57+
(println value)
58+
(swap! capture conj value))
59+
@capture)))
60+
61+
(testing "rethrows errors"
62+
(is (thrown? java.io.FileNotFoundException
63+
(b/doseq [i (rx/seq->o (range 3))]
64+
(throw (java.io.FileNotFoundException. "boo")))))))
65+

0 commit comments

Comments
 (0)