Skip to content

Commit df5503f

Browse files
committed
Cleaned up naming of observable operator definition fns
1 parent cf5a20c commit df5503f

File tree

4 files changed

+139
-138
lines changed

4 files changed

+139
-138
lines changed

language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/chunk.clj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@
9292
(fn []
9393
(swap! state-atom assoc :complete :complete)
9494
(advance! state-atom))))
95-
observable (rx/fn->o (fn [observer]
96-
(subscribe (new-state-atom observer)))) ]
95+
observable (rx/observable* (fn [observer]
96+
(subscribe (new-state-atom observer)))) ]
9797
(if (:delay-error? options)
9898
(rx.Observable/mergeDelayError observable)
9999
(rx.Observable/merge observable)))))

language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/core.clj

Lines changed: 122 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -54,28 +54,14 @@
5454
(.onError o e))
5555

5656
;################################################################################
57+
; Tools for creating new operators and observables
5758

58-
(defn ^Subscription subscribe
59-
60-
([^Observable o on-next-action]
61-
(.subscribe o
62-
^Action1 (iop/action* on-next-action)))
59+
(declare unsubscribed?)
6360

64-
([^Observable o on-next-action on-error-action]
65-
(.subscribe o
66-
^Action1 (iop/action* on-next-action)
67-
^Action1 (iop/action* on-error-action)))
68-
69-
([^Observable o on-next-action on-error-action on-completed-action]
70-
(.subscribe o
71-
^Action1 (iop/action* on-next-action)
72-
^Action1 (iop/action* on-error-action)
73-
^Action0 (iop/action* on-completed-action))))
74-
75-
(defn ^Subscriber ->subscriber
61+
(defn ^Subscriber subscriber
7662
""
77-
([o on-next-action] (->subscriber o on-next-action nil nil))
78-
([o on-next-action on-error-action] (->subscriber o on-next-action on-error-action nil))
63+
([o on-next-action] (subscriber o on-next-action nil nil))
64+
([o on-next-action on-error-action] (subscriber o on-next-action on-error-action nil))
7965
([^Subscriber o on-next-action on-error-action on-completed-action]
8066
(proxy [Subscriber] [o]
8167
(onCompleted []
@@ -91,40 +77,96 @@
9177
(on-next-action o t)
9278
(on-next o t))))))
9379

94-
(defn ^Observable$Operator fn->operator
95-
"Create a basic Operator with f. If a handler is omitted or nil
96-
it's treated as a pass-through.
80+
(defn ^Subscription subscription
81+
"Create a new subscription that calls the given no-arg handler function when
82+
unsubscribe is called
83+
84+
See:
85+
rx.subscriptions.Subscriptions/create
86+
"
87+
[handler]
88+
(Subscriptions/create ^Action0 (iop/action* handler)))
9789

98-
on-next-action Passed Subscriber and value
99-
on-error-action Passed Throwable
100-
on-completed-action No-args
90+
(defn ^Observable$Operator operator*
91+
"Returns a new implementation of rx.Observable$Operator that calls the given
92+
function with a rx.Subscriber. The function should return a rx.Subscriber.
10193
10294
See:
103-
lift
104-
rx.Observable$Operator
95+
lift
96+
rx.Observable$Operator
10597
"
10698
[f]
10799
{:pre [(fn? f)]}
108100
(reify Observable$Operator
109101
(call [this o]
110102
(f o))))
111103

104+
(defn ^Observable observable*
105+
"Create an Observable from the given function.
106+
107+
When subscribed to, (f subscriber) is called at which point, f can start emitting values, etc.
108+
The passed subscriber is of type rx.Subscriber.
109+
110+
See:
111+
rx.Subscriber
112+
rx.Observable/create
113+
"
114+
[f]
115+
(Observable/create ^Observable$OnSubscribe (iop/action* f)))
116+
117+
(defn wrap-on-completed
118+
"Wrap handler with code that automaticaly calls rx.Observable.onCompleted."
119+
[handler]
120+
(fn [^Observer observer]
121+
(handler observer)
122+
(when-not (unsubscribed? observer)
123+
(.onCompleted observer))))
124+
125+
(defn wrap-on-error
126+
"Wrap handler with code that automaticaly calls (on-error) if an exception is thrown"
127+
[handler]
128+
(fn [^Observer observer]
129+
(try
130+
(handler observer)
131+
(catch Throwable e
132+
(when-not (unsubscribed? observer)
133+
(.onError observer e))))))
134+
112135
(defn lift
113136
"Lift the Operator op over the given Observable xs
114137
115138
Example:
116139
117140
(->> my-observable
118-
(rx/lift (rx/fn->operator ...))
141+
(rx/lift (rx/operator ...))
119142
...)
120143
121144
See:
122145
rx.Observable/lift
123-
fn->operator
146+
operator
124147
"
125148
[^Observable$Operator op ^Observable xs]
126149
(.lift xs op))
127150

151+
;################################################################################
152+
153+
(defn ^Subscription subscribe
154+
155+
([^Observable o on-next-action]
156+
(.subscribe o
157+
^Action1 (iop/action* on-next-action)))
158+
159+
([^Observable o on-next-action on-error-action]
160+
(.subscribe o
161+
^Action1 (iop/action* on-next-action)
162+
^Action1 (iop/action* on-error-action)))
163+
164+
([^Observable o on-next-action on-error-action on-completed-action]
165+
(.subscribe o
166+
^Action1 (iop/action* on-next-action)
167+
^Action1 (iop/action* on-error-action)
168+
^Action0 (iop/action* on-completed-action))))
169+
128170
(defn unsubscribe
129171
"Unsubscribe from Subscription s and return it."
130172
[^Subscription s]
@@ -158,53 +200,50 @@
158200
159201
See:
160202
rx.Observable/create
161-
fn->o
203+
observable*
162204
"
163205
[^Subscription s]
164206
(.isUnsubscribed s))
165207

166-
(defn ^Subscription fn->subscription
167-
"Create a new subscription that calls the given no-arg handler function when
168-
unsubscribe is called
208+
;################################################################################
209+
; Functions for creating Observables
210+
211+
(defn ^Observable never
212+
"Returns an Observable that never emits any values and never completes.
169213
170214
See:
171-
rx.subscriptions.Subscriptions/create
215+
rx.Observable/never
172216
"
173-
[handler]
174-
(Subscriptions/create ^Action0 (iop/action* handler)))
175-
176-
(defn ^Observable fn->o
177-
"Create an Observable from the given function.
217+
[]
218+
(Observable/never))
178219

179-
When subscribed to, (f subscriber) is called at which point, f can start emitting values, etc.
180-
The passed subscriber is of type rx.Subscriber.
220+
(defn ^Observable empty
221+
"Returns an Observable that completes immediately without emitting any values.
181222
182223
See:
183-
rx.Subscriber
184-
rx.Observable/create
224+
rx.Observable/empty
185225
"
186-
[f]
187-
(Observable/create ^Observable$OnSubscribe (iop/action* f)))
226+
[]
227+
(Observable/empty))
188228

189-
(defn wrap-on-completed
190-
"Wrap handler with code that automaticaly calls rx.Observable.onCompleted."
191-
[handler]
192-
(fn [^Observer observer]
193-
(handler observer)
194-
(when-not (unsubscribed? observer)
195-
(.onCompleted observer))))
229+
(defn ^Observable return
230+
"Returns an observable that emits a single value.
196231
197-
(defn wrap-on-error
198-
"Wrap handler with code that automaticaly calls (on-error) if an exception is thrown"
199-
[handler]
200-
(fn [^Observer observer]
201-
(try
202-
(handler observer)
203-
(catch Throwable e
204-
(when-not (unsubscribed? observer)
205-
(.onError observer e))))))
232+
See:
233+
rx.Observable/just
234+
"
235+
[value]
236+
(Observable/just value))
237+
238+
(defn ^Observable seq->o
239+
"Make an observable out of some seq-able thing. The rx equivalent of clojure.core/seq."
240+
[xs]
241+
(if xs
242+
(Observable/from ^Iterable xs)
243+
(empty)))
206244

207245
;################################################################################
246+
; Operators
208247

209248
(defn synchronize
210249
([^Observable xs]
@@ -246,44 +285,6 @@
246285
(throw (IllegalArgumentException. (str "Don't know how to merge " (type os))))))
247286

248287

249-
;################################################################################
250-
251-
(defn ^Observable never
252-
"Returns an Observable that never emits any values and never completes.
253-
254-
See:
255-
rx.Observable/never
256-
"
257-
[]
258-
(Observable/never))
259-
260-
(defn ^Observable empty
261-
"Returns an Observable that completes immediately without emitting any values.
262-
263-
See:
264-
rx.Observable/empty
265-
"
266-
[]
267-
(Observable/empty))
268-
269-
(defn ^Observable return
270-
"Returns an observable that emits a single value.
271-
272-
See:
273-
rx.Observable/just
274-
"
275-
[value]
276-
(Observable/just value))
277-
278-
(defn ^Observable seq->o
279-
"Make an observable out of some seq-able thing. The rx equivalent of clojure.core/seq."
280-
[xs]
281-
(if xs
282-
(Observable/from ^Iterable xs)
283-
(empty)))
284-
285-
;################################################################################
286-
287288
(defn cache
288289
"caches the observable value so that multiple subscribers don't re-evaluate it.
289290
@@ -354,14 +355,14 @@
354355
"
355356
([xs] (distinct identity xs))
356357
([key-fn ^Observable xs]
357-
(let [op (fn->operator (fn [o]
358-
(let [seen (atom #{})]
359-
(->subscriber o
360-
(fn [o v]
361-
(let [key (key-fn v)]
362-
(when-not (contains? @seen key)
363-
(swap! seen conj key)
364-
(on-next o v))))))))]
358+
(let [op (operator* (fn [o]
359+
(let [seen (atom #{})]
360+
(subscriber o
361+
(fn [o v]
362+
(let [key (key-fn v)]
363+
(when-not (contains? @seen key)
364+
(swap! seen conj key)
365+
(on-next o v))))))))]
365366
(lift op xs))))
366367

367368
(defn ^Observable do
@@ -422,12 +423,12 @@
422423

423424
(defn interpose
424425
[sep xs]
425-
(let [op (fn->operator (fn [o]
426-
(let [first? (atom true)]
427-
(->subscriber o (fn [o v]
428-
(if-not (compare-and-set! first? true false)
429-
(on-next o sep))
430-
(on-next o v))))))]
426+
(let [op (operator* (fn [o]
427+
(let [first? (atom true)]
428+
(subscriber o (fn [o v]
429+
(if-not (compare-and-set! first? true false)
430+
(on-next o sep))
431+
(on-next o v))))))]
431432
(lift op xs)))
432433

433434
(defn into
@@ -499,10 +500,11 @@
499500
clojure.core/map-indexed
500501
"
501502
[f xs]
502-
(let [op (fn->operator (fn [o]
503-
(let [n (atom -1)]
504-
(->subscriber o
505-
(fn [o v] (on-next o (f (swap! n inc) v)))))))]
503+
(let [op (operator* (fn [o]
504+
(let [n (atom -1)]
505+
(subscriber o
506+
(fn [o v]
507+
(on-next o (f (swap! n inc) v)))))))]
506508
(lift op xs)))
507509

508510
(def next
@@ -768,9 +770,9 @@
768770
(rx/generator* on-next 99)
769771
"
770772
[f & args]
771-
(fn->o (-> #(apply f % args)
772-
wrap-on-completed
773-
wrap-on-error)))
773+
(observable* (-> #(apply f % args)
774+
wrap-on-completed
775+
wrap-on-error)))
774776

775777
(defmacro generator
776778
"Create an observable that executes body which should emit a sequence. bindings

language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/future.clj

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@
2929
"
3030
[runner f & args]
3131
{:pre [(ifn? runner) (ifn? f)]}
32-
(rx/fn->o (fn [^rx.Subscriber observer]
32+
(rx/observable* (fn [^rx.Subscriber observer]
3333
(let [wrapped (-> (fn [o]
3434
(apply f o args))
3535
rx/wrap-on-completed
3636
rx/wrap-on-error)
3737
fu (runner #(wrapped observer))]
3838
(.add observer
39-
(rx/fn->subscription #(future-cancel fu)))))))
39+
(rx/subscription #(future-cancel fu)))))))
4040

4141
(defmacro future-generator
4242
"Same as rx/generator macro except body is invoked in a separate thread.
@@ -74,13 +74,13 @@
7474
"
7575
[runner f & args]
7676
{:pre [(ifn? runner) (ifn? f)]}
77-
(rx/fn->o (fn [^rx.Subscriber observer]
77+
(rx/observable* (fn [^rx.Subscriber observer]
7878
(let [wrapped (-> #(rx/on-next % (apply f args))
7979
rx/wrap-on-completed
8080
rx/wrap-on-error)
8181
fu (runner #(wrapped observer))]
8282
(.add observer
83-
(rx/fn->subscription #(future-cancel fu)))))))
83+
(rx/subscription #(future-cancel fu)))))))
8484

8585
(defmacro future
8686
"Executes body in a separate thread and passes the single result to onNext.

0 commit comments

Comments
 (0)