Skip to content

Commit f1ff778

Browse files
committed
Implement catch-error-value
Automating "rx.exceptions.OnErrorThrowable/addValueAsLastCause" idiom when user code is invoked in onNext. Other minor cleanup.
1 parent df5503f commit f1ff778

File tree

2 files changed

+95
-8
lines changed
  • language-adaptors/rxjava-clojure/src

2 files changed

+95
-8
lines changed

language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/core.clj

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,19 +39,53 @@
3939
;################################################################################
4040

4141
(defn on-next
42-
"Call onNext on the given observer."
42+
"Call onNext on the given observer and return o."
4343
[^Observer o value]
44-
(.onNext o value))
44+
(.onNext o value)
45+
o)
4546

4647
(defn on-completed
47-
"Call onCompleted on the given observer."
48+
"Call onCompleted on the given observer and return o."
4849
[^Observer o]
49-
(.onCompleted o))
50+
(.onCompleted o)
51+
o)
5052

5153
(defn on-error
52-
"Call onError on the given observer."
54+
"Call onError on the given observer and return o."
5355
[^Observer o e]
54-
(.onError o e))
56+
(.onError o e)
57+
o)
58+
59+
(defmacro catch-error-value
60+
"Experimental
61+
62+
TODO: Better name, better abstraction.
63+
64+
Evaluate body and return its value. If an exception e is thrown, inject the
65+
given value into the exception's cause and call (on-error error-observer e),
66+
returning e.
67+
68+
This is meant to facilitate implementing Observers that call user-supplied code
69+
safely. The general pattern is something like:
70+
71+
(fn [o v]
72+
(rx/catch-error-value o v
73+
(rx/on-next o (some-func v))))
74+
75+
If (some-func v) throws an exception, it is caught, v is injected into the
76+
exception's cause (with OnErrorThrowable/addValueAsLastCause) and
77+
(rx/on-error o e) is invoked.
78+
79+
See:
80+
rx.exceptions.OnErrorThrowable/addValueAsLastCause
81+
"
82+
[error-observer value & body]
83+
`(try
84+
~@body
85+
(catch Throwable e#
86+
(on-error ~error-observer
87+
(rx.exceptions.OnErrorThrowable/addValueAsLastCause e# ~value))
88+
e#)))
5589

5690
;################################################################################
5791
; Tools for creating new operators and observables
@@ -504,7 +538,8 @@
504538
(let [n (atom -1)]
505539
(subscriber o
506540
(fn [o v]
507-
(on-next o (f (swap! n inc) v)))))))]
541+
(catch-error-value o v
542+
(on-next o (f (swap! n inc) v))))))))]
508543
(lift op xs)))
509544

510545
(def next

language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/core_test.clj

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,48 @@
88
(is (rx/observable? (rx/return 99)))
99
(is (not (rx/observable? "I'm not an observable"))))
1010

11+
(deftest test-on-next
12+
(testing "calls onNext"
13+
(let [called (atom [])
14+
o (reify rx.Observer (onNext [this value] (swap! called conj value)))]
15+
(is (identical? o (rx/on-next o 1)))
16+
(is (= [1] @called)))))
17+
18+
(deftest test-on-completed
19+
(testing "calls onCompleted"
20+
(let [called (atom 0)
21+
o (reify rx.Observer (onCompleted [this] (swap! called inc)))]
22+
(is (identical? o (rx/on-completed o)))
23+
(is (= 1 @called)))))
24+
25+
(deftest test-on-error
26+
(testing "calls onError"
27+
(let [called (atom [])
28+
e (java.io.FileNotFoundException. "yum")
29+
o (reify rx.Observer (onError [this e] (swap! called conj e)))]
30+
(is (identical? o (rx/on-error o e)))
31+
(is (= [e] @called)))))
32+
33+
(deftest test-catch-error-value
34+
(testing "if no exception, returns body"
35+
(let [o (reify rx.Observer)]
36+
(is (= 3 (rx/catch-error-value o 99
37+
(+ 1 2))))))
38+
39+
(testing "exceptions call onError on observable and inject value in exception"
40+
(let [called (atom [])
41+
e (java.io.FileNotFoundException. "boo")
42+
o (reify rx.Observer
43+
(onError [this e]
44+
(swap! called conj e)))
45+
result (rx/catch-error-value o 100
46+
(throw e))
47+
cause (.getCause e)]
48+
(is (identical? e result))
49+
(is (= [e] @called))
50+
(when (is (instance? rx.exceptions.OnErrorThrowable$OnNextValue cause))
51+
(is (= 100 (.getValue cause)))))))
52+
1153
(deftest test-subscribe
1254
(testing "subscribe overload with only onNext"
1355
(let [o (rx/return 1)
@@ -285,7 +327,17 @@
285327
(rx/seq->o [8])]))))))
286328
(deftest test-map-indexed
287329
(is (= (map-indexed vector [:a :b :c])
288-
(b/into [] (rx/map-indexed vector (rx/seq->o [:a :b :c]))))))
330+
(b/into [] (rx/map-indexed vector (rx/seq->o [:a :b :c])))))
331+
(testing "exceptions from fn have error value injected"
332+
(try
333+
(->> (rx/seq->o [:a :b :c])
334+
(rx/map-indexed (fn [i v]
335+
(if (= 1 i)
336+
(throw (java.io.FileNotFoundException. "blah")))
337+
v))
338+
(b/into []))
339+
(catch java.io.FileNotFoundException e
340+
(is (= :b (-> e .getCause .getValue)))))))
289341

290342
(deftest test-mapcat
291343
(let [f (fn [v] [v (* v v)])

0 commit comments

Comments
 (0)