Skip to content

Commit 236cbdb

Browse files
committed
Clean up future stuff and docs
1 parent da88202 commit 236cbdb

File tree

4 files changed

+131
-66
lines changed

4 files changed

+131
-66
lines changed

language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/core.clj

Lines changed: 53 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@
99
(:require [rx.lang.clojure.interop :as iop]
1010
[rx.lang.clojure.graph :as graph]
1111
[rx.lang.clojure.realized :as realized])
12-
(:import [rx Observable Observer Subscriber Subscription Observable$Operator Observable$OnSubscribe]
12+
(:import [rx
13+
Observable
14+
Observer Observable$Operator Observable$OnSubscribe
15+
Subscriber Subscription]
1316
[rx.observables BlockingObservable]
1417
[rx.subscriptions Subscriptions]
1518
[rx.util.functions Action0 Action1 Func0 Func1 Func2]))
@@ -47,24 +50,29 @@
4750
[^Observer o e]
4851
(.onError o e))
4952

50-
(defn on-error-return
51-
[^Observable o f]
52-
(.onErrorReturn o f))
53-
5453
;################################################################################
5554

5655
(defn ^Subscription subscribe
56+
5757
([^Observable o on-next-action]
58-
(.subscribe o ^Action1 (iop/action* on-next-action)))
58+
(.subscribe o
59+
^Action1 (iop/action* on-next-action)))
60+
5961
([^Observable o on-next-action on-error-action]
60-
(.subscribe o ^Action1 (iop/action* on-next-action) ^Action1 (iop/action* on-error-action)))
62+
(.subscribe o
63+
^Action1 (iop/action* on-next-action)
64+
^Action1 (iop/action* on-error-action)))
65+
6166
([^Observable o on-next-action on-error-action on-completed-action]
62-
(.subscribe o ^Action1 (iop/action* on-next-action) ^Action1 (iop/action* on-error-action) ^Action0 (iop/action* on-completed-action))))
67+
(.subscribe o
68+
^Action1 (iop/action* on-next-action)
69+
^Action1 (iop/action* on-error-action)
70+
^Action0 (iop/action* on-completed-action))))
6371

6472
(defn ^Subscriber ->subscriber
6573
""
6674
([o on-next-action] (->subscriber o on-next-action nil nil))
67-
([o on-next-action on-error-action] (->subscriber o on-next-action on-error-action))
75+
([o on-next-action on-error-action] (->subscriber o on-next-action on-error-action nil))
6876
([^Subscriber o on-next-action on-error-action on-completed-action]
6977
(proxy [Subscriber] [o]
7078
(onCompleted []
@@ -80,8 +88,8 @@
8088
(on-next-action o t)
8189
(on-next o t))))))
8290

83-
(defn ^Observable$Operator ->operator
84-
"Create a basic Operator with the given handler fns. If a handler is omitted or nil
91+
(defn ^Observable$Operator fn->operator
92+
"Create a basic Operator with f. If a handler is omitted or nil
8593
it's treated as a pass-through.
8694
8795
on-next-action Passed Subscriber and value
@@ -92,24 +100,24 @@
92100
lift
93101
rx.Observable$Operator
94102
"
95-
[input]
96-
{:pre [(fn? input)]}
103+
[f]
104+
{:pre [(fn? f)]}
97105
(reify Observable$Operator
98106
(call [this o]
99-
(input o))))
107+
(f o))))
100108

101109
(defn lift
102110
"Lift the Operator op over the given Observable xs
103111
104112
Example:
105113
106114
(->> my-observable
107-
(rx/lift (rx/->operator ...))
115+
(rx/lift (rx/fn->operator ...))
108116
...)
109117
110118
See:
111119
rx.Observable/lift
112-
->operator
120+
fn->operator
113121
"
114122
[^Observable$Operator op ^Observable xs]
115123
(.lift xs op))
@@ -160,7 +168,8 @@
160168
[handler]
161169
(fn [^Observer observer]
162170
(handler observer)
163-
(.onCompleted observer)))
171+
(when-not (unsubscribed? observer)
172+
(.onCompleted observer))))
164173

165174
(defn wrap-on-error
166175
"Wrap handler with code that automaticaly calls (on-error) if an exception is thrown"
@@ -169,7 +178,8 @@
169178
(try
170179
(handler observer)
171180
(catch Throwable e
172-
(.onError observer e)))))
181+
(when-not (unsubscribed? observer)
182+
(.onError observer e))))))
173183

174184
(defn ^Observable merge
175185
"Observable.merge, renamed because merge means something else in Clojure
@@ -194,7 +204,7 @@
194204
(throw (IllegalArgumentException. (str "Don't know how to merge " (type os))))))
195205

196206
(defn ^Observable merge-delay-error
197-
"Observable.mergeDelayError, renamed because merge means something else in Clojure"
207+
"Observable.mergeDelayError"
198208
[os]
199209
(cond
200210
(instance? java.util.List os)
@@ -205,7 +215,7 @@
205215
(throw (IllegalArgumentException. (str "Don't know how to merge " (type os))))))
206216

207217
(defn ^Observable zip
208-
"Observable.zip. You want map."
218+
"rx.Observable.zip. You want map."
209219
([f ^Observable a ^Observable b] (Observable/zip a b (iop/fn* f)))
210220
([f ^Observable a ^Observable b ^Observable c] (Observable/zip a b c (iop/fn* f)))
211221
([f ^Observable a ^Observable b ^Observable c ^Observable d] (Observable/zip a b c d (iop/fn* f)))
@@ -224,20 +234,32 @@
224234
names (clojure.core/mapv clojure.core/first pairs)
225235
values (clojure.core/map second pairs)]
226236
`(zip (fn ~names ~@body) ~@values)))
227-
;################################################################################
228237

238+
;################################################################################
229239

240+
(defn ^Observable never
241+
"Returns an Observable that never emits any values and never completes.
230242
243+
See:
244+
rx.Observable/never
245+
"
246+
[]
247+
(Observable/never))
231248

249+
(defn ^Observable empty
250+
"Returns an Observable that completes immediately without emitting any values.
232251
233-
(defn ^Observable never [] (Observable/never))
234-
(defn ^Observable empty [] (Observable/empty))
252+
See:
253+
rx.Observable/empty
254+
"
255+
[]
256+
(Observable/empty))
235257

236258
(defn ^Observable return
237259
"Returns an observable that emits a single value.
238260
239261
See:
240-
Observable/just
262+
rx.Observable/just
241263
"
242264
[value]
243265
(Observable/just value))
@@ -329,7 +351,7 @@
329351

330352
(defn interpose
331353
[sep xs]
332-
(let [op (->operator (fn [o]
354+
(let [op (fn->operator (fn [o]
333355
(let [first? (atom true)]
334356
(->subscriber o (fn [o v]
335357
(if-not (compare-and-set! first? true false)
@@ -385,7 +407,7 @@
385407
clojure.core/map-indexed
386408
"
387409
[f xs]
388-
(let [op (->operator (fn [o]
410+
(let [op (fn->operator (fn [o]
389411
(let [n (atom -1)]
390412
(->subscriber o
391413
(fn [o v] (on-next o (f (swap! n inc) v)))))))]
@@ -589,16 +611,17 @@
589611
;################################################################################;
590612

591613
(defn generator*
592-
"Creates an observable that calls (f observable & args) which should emit a sequence.
614+
"Creates an observable that calls (f observable & args) which should emit values
615+
with (rx/on-next observable value).
593616
594617
Automatically calls on-completed on return, or on-error if any exception is thrown.
595618
596-
Subscribers will block.
619+
f should exit early if (rx/unsubscribed? observable) returns true
597620
598621
Examples:
599622
600623
; An observable that emits just 99
601-
(generator* on-next 99)
624+
(rx/generator* on-next 99)
602625
"
603626
[f & args]
604627
(fn->o (-> #(apply f % args)
@@ -611,7 +634,7 @@
611634
612635
Automatically calls on-completed on return, or on-error if any exception is thrown.
613636
614-
Subscribe will block.
637+
The body should exit early if (rx/unsubscribed? observable) returns true
615638
616639
Examples:
617640

language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/future.clj

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
(ns rx.lang.clojure.future
2+
"Functions and macros for making rx-ified futures. That is, run some code in some
3+
other thread and return an Observable of its result.
4+
"
25
(:refer-clojure :exclude [future])
36
(:require [rx.lang.clojure.interop :as iop]
47
[rx.lang.clojure.core :as rx]))
@@ -17,29 +20,40 @@
1720
runner is a function that takes a no-arg function argument and returns a future
1821
representing the execution of that function.
1922
20-
subscribe will not block.
23+
Returns an Observable. If the subscriber unsubscribes, the future will be canceled
24+
with clojure.core/future-cancel
2125
2226
See:
2327
rx.lang.clojure.core/generator*
2428
rx.lang.clojure.future/future-generator
2529
"
2630
[runner f & args]
2731
{:pre [(ifn? runner) (ifn? f)]}
28-
(rx/fn->o (fn [observer]
29-
(let [wrapped (-> (fn [o]
30-
(apply f o args))
31-
rx/wrap-on-completed
32-
rx/wrap-on-error)
33-
fu (runner #(wrapped observer))]
34-
(rx/fn->subscription #(future-cancel fu))))))
32+
(rx/fn->o (fn [^rx.Subscriber observer]
33+
(let [wrapped (-> (fn [o]
34+
(apply f o args))
35+
rx/wrap-on-completed
36+
rx/wrap-on-error)
37+
fu (runner #(wrapped observer))]
38+
(.add observer
39+
(rx/fn->subscription #(future-cancel fu)))))))
3540

3641
(defmacro future-generator
3742
"Same as rx/generator macro except body is invoked in a separate thread.
3843
3944
runner is a function that takes a no-arg function argument and returns a future
4045
representing the execution of that function.
4146
42-
subscribe will not block.
47+
Returns an Observable. If the subscriber unsubscribes, the future will be canceled
48+
with clojure.core/future-cancel
49+
50+
Example:
51+
52+
(future-generator default-runner
53+
[o]
54+
(rx/on-next o 1)
55+
(Thread/sleep 1000)
56+
(rx/on-next o 2))
4357
4458
See:
4559
rx.lang.clojure.core/generator*
@@ -55,22 +69,25 @@
5569
runner is a function that takes a no-arg function argument and returns a future
5670
representing the execution of that function.
5771
58-
Returns an Observable.
72+
Returns an Observable. If the subscriber unsubscribes, the future will be canceled
73+
with clojure.core/future-cancel
5974
"
6075
[runner f & args]
6176
{:pre [(ifn? runner) (ifn? f)]}
62-
(rx/fn->o (fn [observer]
77+
(rx/fn->o (fn [^rx.Subscriber observer]
6378
(let [wrapped (-> #(rx/on-next % (apply f args))
6479
rx/wrap-on-completed
6580
rx/wrap-on-error)
6681
fu (runner #(wrapped observer))]
67-
(rx/fn->subscription #(future-cancel fu))))))
82+
(.add observer
83+
(rx/fn->subscription #(future-cancel fu)))))))
6884

6985
(defmacro future
7086
"Executes body in a separate thread and passes the single result to onNext.
7187
If an exception occurs, onError is called.
7288
73-
Returns an Observable
89+
Returns an Observable. If the subscriber unsubscribes, the future will be canceled
90+
with clojure.core/future-cancel
7491
7592
runner is a function that takes a no-arg function argument and returns a future
7693
representing the execution of that function.

language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/core_test.clj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@
4444
(rx/on-completed s)))]
4545
(is (= [0 1 2] (b/into [] o)))))
4646

47-
(deftest test-->operator
48-
(let [o (rx/->operator #(rx/->subscriber %
47+
(deftest test-fn->operator
48+
(let [o (rx/fn->operator #(rx/->subscriber %
4949
(fn [o v]
5050
(if (even? v)
5151
(rx/on-next o v)))))
Lines changed: 46 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,62 @@
11
(ns rx.lang.clojure.future-test
22
(:require [rx.lang.clojure.core :as rx]
3-
[rx.lang.clojure.blocking :as blocking]
3+
[rx.lang.clojure.blocking :as b]
44
[rx.lang.clojure.future :as f])
55
(:require [clojure.test :refer [deftest testing is]]))
66

77
(deftest test-future-generator
88
(is (not= [(.getId (Thread/currentThread))]
9-
(blocking/into []
9+
(b/into []
1010
(f/future-generator f/default-runner
1111
[observer]
1212
(rx/on-next observer (.getId (Thread/currentThread))))))))
1313

1414
(deftest test-future
15-
(is (= [15] (blocking/into [] (f/future* f/default-runner + 1 2 3 4 5))))
16-
(is (= [15] (blocking/into [] (f/future f/default-runner (println "HI") (+ 1 2 3 4 5))))) )
15+
(is (= [15] (b/into [] (f/future* f/default-runner + 1 2 3 4 5))))
16+
(is (= [15] (b/into [] (f/future f/default-runner (println "HI") (+ 1 2 3 4 5))))) )
1717

18+
(deftest test-future-exception
19+
(is (= "Caught: boo"
20+
(-> (f/future f/default-runner (throw (java.io.FileNotFoundException. "boo")))
21+
(rx/catch java.io.FileNotFoundException e
22+
(rx/return (str "Caught: " (.getMessage e))))
23+
(b/single)))))
1824

19-
(comment (rx/subscribe (f/future* f/default-runner + 1 2 3 4 5)
20-
(fn [v] (println "RESULT: " v))
21-
(fn [e] (println "ERROR: " e))
22-
#(println "COMPLETED")))
25+
(deftest test-future-cancel
26+
(let [exited? (atom nil)
27+
o (f/future f/default-runner
28+
(Thread/sleep 1000)
29+
(reset! exited? true)
30+
"WAT")
31+
result (->> o
32+
(rx/take 0)
33+
(b/into []))]
34+
(Thread/sleep 2000)
35+
(is (= [nil []]
36+
[@exited? result]))))
2337

24-
(comment (rx/subscribe (f/future f/default-runner
25-
(Thread/sleep 5000)
26-
(+ 100 200))
27-
(fn [v] (println "RESULT: " v))
28-
(fn [e] (println "ERROR: " e))
29-
#(println "COMPLETED")))
30-
31-
(comment (rx/subscribe (f/future f/default-runner
32-
(Thread/sleep 2000)
33-
(throw (Exception. "Failed future")))
34-
(fn [v] (println "RESULT: " v))
35-
(fn [e] (println "ERROR: " e))
36-
#(println "COMPLETED")))
38+
(deftest test-future-generator-cancel
39+
(let [exited? (atom nil)
40+
o (f/future-generator f/default-runner
41+
[o]
42+
(rx/on-next o "FIRST")
43+
(Thread/sleep 1000)
44+
(reset! exited? true))
45+
result (->> o
46+
(rx/take 1)
47+
(b/into []))]
48+
(Thread/sleep 2000)
49+
(is (= [nil ["FIRST"]]
50+
[@exited? result]))))
3751

52+
(deftest test-future-generator-exception
53+
(let [e (java.io.FileNotFoundException. "snake")]
54+
(is (= [1 2 e]
55+
(b/into [] (-> (f/future-generator
56+
f/default-runner
57+
[o]
58+
(rx/on-next o 1)
59+
(rx/on-next o 2)
60+
(throw e))
61+
(rx/catch java.io.FileNotFoundException e
62+
(rx/return e))))))))

0 commit comments

Comments
 (0)