Skip to content

Commit da88202

Browse files
committed
Got rid of base. It was weird.
1 parent 80ba1b3 commit da88202

File tree

8 files changed

+133
-159
lines changed

8 files changed

+133
-159
lines changed

language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/base.clj

Lines changed: 0 additions & 75 deletions
This file was deleted.

language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/chunk.clj

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
(ns rx.lang.clojure.chunk
22
(:refer-clojure :exclude [chunk])
3-
(:require [rx.lang.clojure.core :as rx]
4-
[rx.lang.clojure.base :as rx-base]))
3+
(:require [rx.lang.clojure.core :as rx]))
54

65
(def ^:private -ns- *ns*)
76
(set! *warn-on-reflection* true)

language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/core.clj

Lines changed: 78 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
rest seq some sort sort-by split-with
88
take take-while throw])
99
(:require [rx.lang.clojure.interop :as iop]
10-
[rx.lang.clojure.base :as base]
1110
[rx.lang.clojure.graph :as graph]
1211
[rx.lang.clojure.realized :as realized])
1312
(:import [rx Observable Observer Subscriber Subscription Observable$Operator Observable$OnSubscribe]
@@ -156,6 +155,81 @@
156155

157156
;################################################################################
158157

158+
(defn wrap-on-completed
159+
"Wrap handler with code that automaticaly calls rx.Observable.onCompleted."
160+
[handler]
161+
(fn [^Observer observer]
162+
(handler observer)
163+
(.onCompleted observer)))
164+
165+
(defn wrap-on-error
166+
"Wrap handler with code that automaticaly calls (on-error) if an exception is thrown"
167+
[handler]
168+
(fn [^Observer observer]
169+
(try
170+
(handler observer)
171+
(catch Throwable e
172+
(.onError observer e)))))
173+
174+
(defn ^Observable merge
175+
"Observable.merge, renamed because merge means something else in Clojure
176+
177+
os is one of:
178+
179+
* An Iterable of Observables to merge
180+
* An Observable<Observable<T>> to merge
181+
182+
If you want clojure.core/merge, it's just this:
183+
184+
(rx/reduce clojure.core/merge {} maps)
185+
186+
"
187+
[os]
188+
(cond
189+
(instance? Iterable os)
190+
(Observable/merge (Observable/from ^Iterable os))
191+
(instance? Observable os)
192+
(Observable/merge ^Observable os)
193+
:else
194+
(throw (IllegalArgumentException. (str "Don't know how to merge " (type os))))))
195+
196+
(defn ^Observable merge-delay-error
197+
"Observable.mergeDelayError, renamed because merge means something else in Clojure"
198+
[os]
199+
(cond
200+
(instance? java.util.List os)
201+
(Observable/mergeDelayError ^java.util.List os)
202+
(instance? Observable os)
203+
(Observable/mergeDelayError ^Observable os)
204+
:else
205+
(throw (IllegalArgumentException. (str "Don't know how to merge " (type os))))))
206+
207+
(defn ^Observable zip
208+
"Observable.zip. You want map."
209+
([f ^Observable a ^Observable b] (Observable/zip a b (iop/fn* f)))
210+
([f ^Observable a ^Observable b ^Observable c] (Observable/zip a b c (iop/fn* f)))
211+
([f ^Observable a ^Observable b ^Observable c ^Observable d] (Observable/zip a b c d (iop/fn* f)))
212+
([f a b c d & more]
213+
; recurse on more and then pull everything together with 4 parameter version
214+
(zip (fn [a b c more-value]
215+
(apply f a b c more-value))
216+
a
217+
b
218+
c
219+
(apply zip vector d more))))
220+
221+
(defmacro zip-let
222+
[bindings & body]
223+
(let [pairs (clojure.core/partition 2 bindings)
224+
names (clojure.core/mapv clojure.core/first pairs)
225+
values (clojure.core/map second pairs)]
226+
`(zip (fn ~names ~@body) ~@values)))
227+
;################################################################################
228+
229+
230+
231+
232+
159233
(defn ^Observable never [] (Observable/never))
160234
(defn ^Observable empty [] (Observable/empty))
161235

@@ -288,7 +362,7 @@
288362
"Map a function over an observable sequence. Unlike clojure.core/map, only supports up
289363
to 4 simultaneous source sequences at the moment."
290364
([f ^Observable xs] (.map xs (iop/fn* f)))
291-
([f xs & observables] (apply base/zip f xs observables)))
365+
([f xs & observables] (apply zip f xs observables)))
292366

293367
(defn ^Observable mapcat
294368
"Returns an observable which, for each value x in xs, calls (f x), which must
@@ -317,23 +391,6 @@
317391
(fn [o v] (on-next o (f (swap! n inc) v)))))))]
318392
(lift op xs)))
319393

320-
; TODO which merge goes here?
321-
(defn merge
322-
"
323-
Returns an observable that emits a single map that consists of the rest of the
324-
maps emitted by the input observable conj-ed onto the first. If a key occurs
325-
in more than one map, the mapping from the latter (left-to-right) will be the
326-
mapping in the result.
327-
328-
NOTE: This is very different from rx.Observable/merge. See rx.base/merge for that
329-
one.
330-
331-
See:
332-
clojure.core/merge
333-
"
334-
[maps]
335-
(reduce clojure.core/merge {} maps))
336-
337394
(def next
338395
"Returns an observable that emits all but the first element of the input observable.
339396
@@ -545,8 +602,8 @@
545602
"
546603
[f & args]
547604
(fn->o (-> #(apply f % args)
548-
base/wrap-on-completed
549-
base/wrap-on-error)))
605+
wrap-on-completed
606+
wrap-on-error)))
550607

551608
(defmacro generator
552609
"Create an observable that executes body which should emit a sequence. bindings

language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/future.clj

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
(ns rx.lang.clojure.future
22
(:refer-clojure :exclude [future])
33
(:require [rx.lang.clojure.interop :as iop]
4-
[rx.lang.clojure.core :as rx :refer [fn->o fn->subscription]]
5-
[rx.lang.clojure.base :as base]))
4+
[rx.lang.clojure.core :as rx]))
65

76
(def ^:private -ns- *ns*)
87
(set! *warn-on-reflection* true)
@@ -26,13 +25,13 @@
2625
"
2726
[runner f & args]
2827
{:pre [(ifn? runner) (ifn? f)]}
29-
(fn->o (fn [observer]
28+
(rx/fn->o (fn [observer]
3029
(let [wrapped (-> (fn [o]
3130
(apply f o args))
32-
base/wrap-on-completed
33-
base/wrap-on-error)
31+
rx/wrap-on-completed
32+
rx/wrap-on-error)
3433
fu (runner #(wrapped observer))]
35-
(fn->subscription #(future-cancel fu))))))
34+
(rx/fn->subscription #(future-cancel fu))))))
3635

3736
(defmacro future-generator
3837
"Same as rx/generator macro except body is invoked in a separate thread.
@@ -60,12 +59,12 @@
6059
"
6160
[runner f & args]
6261
{:pre [(ifn? runner) (ifn? f)]}
63-
(fn->o (fn [observer]
62+
(rx/fn->o (fn [observer]
6463
(let [wrapped (-> #(rx/on-next % (apply f args))
65-
base/wrap-on-completed
66-
base/wrap-on-error)
64+
rx/wrap-on-completed
65+
rx/wrap-on-error)
6766
fu (runner #(wrapped observer))]
68-
(fn->subscription #(future-cancel fu))))))
67+
(rx/fn->subscription #(future-cancel fu))))))
6968

7069
(defmacro future
7170
"Executes body in a separate thread and passes the single result to onNext.

language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/realized.clj

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
(ns rx.lang.clojure.realized
2-
(:require [rx.lang.clojure.interop :as iop]
3-
[rx.lang.clojure.base :as rx-base]))
2+
(:require [rx.lang.clojure.interop :as iop]))
43

54
(def ^:private -ns- *ns*)
65
(set! *warn-on-reflection* true)
@@ -62,10 +61,12 @@
6261
single key maps, merging and then folding all the separate maps together. So code
6362
like this:
6463
65-
(rx/merge (rx-base/merge (->> (user-info-o user-id)
66-
(rx/map (fn [u] {:user u})))
67-
(->> (user-likes-o user-id)
68-
(rx/map (fn [u] {:likes u})))))
64+
; TODO update
65+
(->> (rx/merge (->> (user-info-o user-id)
66+
(rx/map (fn [u] {:user u})))
67+
(->> (user-likes-o user-id)
68+
(rx/map (fn [u] {:likes u}))))
69+
(rx/reduce merge {}))
6970
7071
becomes:
7172
@@ -86,8 +87,8 @@
8687
.toList
8788
(.map (iop/fn [list] {k (f list)})))))))]
8889

89-
(-> o
90-
rx-base/merge ; funnel all the observables into a single sequence
90+
(-> ^Iterable o
91+
(rx.Observable/merge) ; funnel all the observables into a single sequence
9192
(.reduce {} (iop/fn* merge))))) ; do the map merge dance
9293

9394
(defn ^rx.Observable realized-map*

language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/base_test.clj

Lines changed: 0 additions & 38 deletions
This file was deleted.

0 commit comments

Comments
 (0)