Skip to content

Commit 80ba1b3

Browse files
committed
Replace chain with operators. General core cleanup
1 parent 1b13e56 commit 80ba1b3

File tree

2 files changed

+181
-111
lines changed
  • language-adaptors/rxjava-clojure/src

2 files changed

+181
-111
lines changed

language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/core.clj

Lines changed: 134 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
(ns rx.lang.clojure.core
22
(:refer-clojure :exclude [concat cons do drop drop-while empty
3-
filter future
3+
filter first future
44
interpose into keep keep-indexed
55
map mapcat map-indexed
66
merge next partition reduce reductions
@@ -10,14 +10,14 @@
1010
[rx.lang.clojure.base :as base]
1111
[rx.lang.clojure.graph :as graph]
1212
[rx.lang.clojure.realized :as realized])
13-
(:import [rx Observable Observer Subscription]
13+
(:import [rx Observable Observer Subscriber Subscription Observable$Operator Observable$OnSubscribe]
1414
[rx.observables BlockingObservable]
1515
[rx.subscriptions Subscriptions]
1616
[rx.util.functions Action0 Action1 Func0 Func1 Func2]))
1717

1818
(set! *warn-on-reflection* true)
1919

20-
(declare map map-indexed reduce take take-while)
20+
(declare concat map map-indexed reduce take take-while)
2121

2222
(defn ^Func1 fn->predicate
2323
"Turn f into a predicate that returns true/false like Rx predicates should"
@@ -62,23 +62,75 @@
6262
([^Observable o on-next-action on-error-action on-completed-action]
6363
(.subscribe o ^Action1 (iop/action* on-next-action) ^Action1 (iop/action* on-error-action) ^Action0 (iop/action* on-completed-action))))
6464

65-
(defn chain
66-
"Like subscribe, but any omitted handlers pass-through to the next observable."
67-
([from to]
68-
(chain from to #(on-next to %)))
69-
([from to on-next-action]
70-
(chain from to on-next-action #(on-error to %)))
71-
([from to on-next-action on-error-action]
72-
(chain from to on-next-action on-error-action #(on-completed to)))
73-
([from to on-next-action on-error-action on-completed-action]
74-
(subscribe from on-next-action on-error-action on-completed-action)))
65+
(defn ^Subscriber ->subscriber
66+
""
67+
([o on-next-action] (->subscriber o on-next-action nil nil))
68+
([o on-next-action on-error-action] (->subscriber o on-next-action on-error-action))
69+
([^Subscriber o on-next-action on-error-action on-completed-action]
70+
(proxy [Subscriber] [o]
71+
(onCompleted []
72+
(if on-completed-action
73+
(on-completed-action o)
74+
(on-completed o)))
75+
(onError [e]
76+
(if on-error-action
77+
(on-error-action o e)
78+
(on-error o e)))
79+
(onNext [t]
80+
(if on-next-action
81+
(on-next-action o t)
82+
(on-next o t))))))
83+
84+
(defn ^Observable$Operator ->operator
85+
"Create a basic Operator with the given handler fns. If a handler is omitted or nil
86+
it's treated as a pass-through.
87+
88+
on-next-action Passed Subscriber and value
89+
on-error-action Passed Throwable
90+
on-completed-action No-args
91+
92+
See:
93+
lift
94+
rx.Observable$Operator
95+
"
96+
[input]
97+
{:pre [(fn? input)]}
98+
(reify Observable$Operator
99+
(call [this o]
100+
(input o))))
101+
102+
(defn lift
103+
"Lift the Operator op over the given Observable xs
104+
105+
Example:
106+
107+
(->> my-observable
108+
(rx/lift (rx/->operator ...))
109+
...)
110+
111+
See:
112+
rx.Observable/lift
113+
->operator
114+
"
115+
[^Observable$Operator op ^Observable xs]
116+
(.lift xs op))
75117

76118
(defn unsubscribe
77119
"Unsubscribe from Subscription s and return it."
78120
[^Subscription s]
79121
(.unsubscribe s)
80122
s)
81123

124+
(defn unsubscribed?
125+
"Returns true if the given Subscription (or Subscriber) is unsubscribed.
126+
127+
See:
128+
rx.Observable/create
129+
fn->o
130+
"
131+
[^Subscription s]
132+
(.isUnsubscribed s))
133+
82134
(defn ^Subscription fn->subscription
83135
"Create a new subscription that calls the given no-arg handler function when
84136
unsubscribe is called
@@ -89,6 +141,19 @@
89141
[handler]
90142
(Subscriptions/create ^Action0 (iop/action* handler)))
91143

144+
(defn ^Observable fn->o
145+
"Create an Observable from the given function.
146+
147+
When subscribed to, (f subscriber) is called at which point, f can start emitting values, etc.
148+
The passed subscriber is of type rx.Subscriber.
149+
150+
See:
151+
rx.Subscriber
152+
rx.Observable/create
153+
"
154+
[f]
155+
(Observable/create ^Observable$OnSubscribe (iop/action* f)))
156+
92157
;################################################################################
93158

94159
(defn ^Observable never [] (Observable/never))
@@ -103,12 +168,6 @@
103168
[value]
104169
(Observable/just value))
105170

106-
(defn ^Observable fn->o
107-
"Create an observable from the given handler. When subscribed to, (f observer)
108-
is called at which point, f can start emitting values, etc."
109-
[f]
110-
(Observable/create (iop/fn* f)))
111-
112171
(defn ^Observable seq->o
113172
"Make an observable out of some seq-able thing. The rx equivalent of clojure.core/seq."
114173
[xs]
@@ -119,16 +178,17 @@
119178
;################################################################################
120179

121180
(defn cache
122-
"caches the observable value so that multiple subscribers don't re-evaluate it"
181+
"caches the observable value so that multiple subscribers don't re-evaluate it.
182+
183+
See:
184+
rx.Observable/cache"
123185
[^Observable xs]
124186
(.cache xs))
125187

126188
(defn cons
127189
"cons x to the beginning of xs"
128190
[x xs]
129-
(fn->o (fn [target]
130-
(on-next target x)
131-
(chain xs target))))
191+
(concat (return x) xs))
132192

133193
(defn ^Observable concat
134194
"Concatenate the given Observables one after the another.
@@ -162,64 +222,46 @@
162222
163223
Example:
164224
165-
(->> (rx/seq->o [1 2 3])
166-
(rx/do println)
167-
...)
225+
(->> (rx/seq->o [1 2 3])
226+
(rx/do println)
227+
...)
168228
169-
Will print 1, 2, 3.
229+
Will print 1, 2, 3.
170230
"
171-
([do-fn xs]
172-
(fn->o (fn [target]
173-
(let [state (atom {:sub nil
174-
:error nil })
175-
on-next-fn (fn [v]
176-
; since we may not be able to unsubscribe, drop
177-
; anything after an error
178-
(let [{:keys [sub error]} @state]
179-
(if-not error
180-
(try
181-
(do-fn v)
182-
(on-next target v)
183-
(catch Throwable e
184-
(reset! state {:error e :sub nil})
185-
(if sub
186-
(unsubscribe sub))
187-
(on-error target e))))))]
188-
(let [sub (chain xs target on-next-fn)]
189-
; dependening on xs, this may not be reached until after the sequence
190-
; is complete.
191-
(swap! state update-in [:sub] (constantly sub))
192-
sub))))))
231+
[do-fn xs]
232+
(map #(do (do-fn %) %) xs))
193233

194234
(defn ^Observable drop
195235
[n ^Observable xs]
196236
(.skip xs n))
197237

198238
(defn ^Observable drop-while
199-
[p xs]
200-
(fn->o (fn [target]
201-
(let [dropping (atom true)]
202-
(chain xs
203-
target
204-
(fn [v]
205-
(when (or (not @dropping)
206-
(not (reset! dropping (p v))))
207-
(on-next target v))))))))
239+
[p ^Observable xs]
240+
(.skipWhile xs (fn->predicate p)))
208241

209242
(defn ^Observable filter
210243
[p ^Observable xs]
211244
(.filter xs (fn->predicate p)))
212245

246+
(defn ^Observable first
247+
"Returns an Observable that emits the first item emitted by xs, or an
248+
empty Observable if xs is empty.
249+
250+
See:
251+
rx.Observable/takeFirst
252+
"
253+
[^Observable xs]
254+
(.takeFirst xs))
255+
213256
(defn interpose
214257
[sep xs]
215-
(fn->o (fn [target]
216-
(let [first? (atom true)]
217-
(chain xs
218-
target
219-
(fn [v]
220-
(if-not (compare-and-set! first? true false)
221-
(on-next target sep))
222-
(on-next target v)))))))
258+
(let [op (->operator (fn [o]
259+
(let [first? (atom true)]
260+
(->subscriber o (fn [o v]
261+
(if-not (compare-and-set! first? true false)
262+
(on-next o sep))
263+
(on-next o v))))))]
264+
(lift op xs)))
223265

224266
(defn into
225267
"Returns an observable that emits a single value which is all of the
@@ -234,10 +276,6 @@
234276
.toList
235277
(map (partial clojure.core/into to))))
236278

237-
(defn keep
238-
[f xs]
239-
(filter (complement nil?) (map xs f)))
240-
241279
(defn keep
242280
[f xs]
243281
(filter (complement nil?) (map f xs)))
@@ -273,12 +311,13 @@
273311
clojure.core/map-indexed
274312
"
275313
[f xs]
276-
(fn->o (fn [target]
277-
(let [n (atom -1)]
278-
(chain xs
279-
target
280-
(fn [v] (on-next target (f (swap! n inc) v))))))))
314+
(let [op (->operator (fn [o]
315+
(let [n (atom -1)]
316+
(->subscriber o
317+
(fn [o v] (on-next o (f (swap! n inc) v)))))))]
318+
(lift op xs)))
281319

320+
; TODO which merge goes here?
282321
(defn merge
283322
"
284323
Returns an observable that emits a single map that consists of the rest of the
@@ -296,7 +335,7 @@
296335
(reduce clojure.core/merge {} maps))
297336

298337
(def next
299-
"Returns an observable that emits all of the first element of the input observable.
338+
"Returns an observable that emits all but the first element of the input observable.
300339
301340
See:
302341
clojure.core/next
@@ -325,13 +364,10 @@
325364
clojure.core/some
326365
"
327366
[p ^Observable xs]
328-
(fn->o (fn [target]
329-
(chain xs
330-
target
331-
(fn [v]
332-
(when-let [result (p v)]
333-
(on-next target result)
334-
(on-completed target)))))))
367+
(->> xs
368+
(map p)
369+
(filter identity)
370+
first))
335371

336372
(defn sort
337373
"Returns an observable that emits a single value which is a sorted sequence
@@ -387,24 +423,25 @@
387423
(.take xs n))
388424

389425
(defn take-while
390-
[p xs]
391-
(fn->o (fn [target]
392-
(chain xs
393-
target
394-
(fn [v]
395-
(if (p v)
396-
(on-next target v)
397-
(on-completed target)))))))
426+
"Returns an Observable that emits xs until the first x such that
427+
(p x) is falsey.
428+
429+
See:
430+
clojure.core/take-while
431+
rx.Observable/takeWhile
432+
"
433+
[p ^Observable xs]
434+
(.takeWhile xs (fn->predicate p)))
398435

399436
;################################################################################;
400437

401438
(defn throw
402439
"Returns an Observable the simply emits the given exception with on-error
403440
404441
See:
405-
http://netflix.github.io/RxJava/javadoc/rx/Observable.html#error(java.lang.Exception)
442+
rx.Observable/error
406443
"
407-
[^Exception e]
444+
[^Throwable e]
408445
(Observable/error e))
409446

410447
(defn catch*
@@ -444,6 +481,9 @@
444481
445482
The body of the catch is wrapped in an implicit (do). It must evaluate to an Observable.
446483
484+
Note that the source observable is the first argument so this won't mix well with ->>
485+
threading.
486+
447487
Example:
448488
449489
(-> my-observable
@@ -504,9 +544,7 @@
504544
(generator* on-next 99)
505545
"
506546
[f & args]
507-
(fn->o (-> (fn [observer]
508-
(apply f observer args)
509-
(Subscriptions/empty))
547+
(fn->o (-> #(apply f % args)
510548
base/wrap-on-completed
511549
base/wrap-on-error)))
512550

0 commit comments

Comments
 (0)