Skip to content

Commit 7f4f07e

Browse files
committed
Update merge/merge-delay-error impls.
Split merge into merge/merge* for consistency with other functions that can take one or more observables or observable of observables.
1 parent 1ffe5fb commit 7f4f07e

File tree

2 files changed

+63
-37
lines changed
  • language-adaptors/rxjava-clojure/src

2 files changed

+63
-37
lines changed

language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/core.clj

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -288,39 +288,45 @@
288288
([lock ^Observable xs]
289289
(.synchronize xs lock)))
290290

291-
(defn ^Observable merge
292-
"Observable.merge, renamed because merge means something else in Clojure
291+
(defn merge*
292+
"Merge an Observable of Observables into a single Observable
293+
294+
If you want clojure.core/merge, it's just this:
293295
294-
os is one of:
296+
(rx/reduce clojure.core/merge {} maps)
295297
296-
* An Iterable of Observables to merge
297-
* An Observable<Observable<T>> to merge
298+
See:
299+
merge
300+
merge-delay-error*
301+
rx.Observable/merge
302+
"
303+
[^Observable xs]
304+
(Observable/merge xs))
305+
306+
(defn ^Observable merge
307+
"Merge one or more Observables into a single observable.
298308
299309
If you want clojure.core/merge, it's just this:
300310
301311
(rx/reduce clojure.core/merge {} maps)
302312
313+
See:
314+
merge*
315+
merge-delay-error
316+
rx.Observable/merge
303317
"
304-
[os]
305-
(cond
306-
(instance? Iterable os)
307-
(Observable/merge (Observable/from ^Iterable os))
308-
(instance? Observable os)
309-
(Observable/merge ^Observable os)
310-
:else
311-
(throw (IllegalArgumentException. (str "Don't know how to merge " (type os))))))
318+
[& os]
319+
(merge* (seq->o os)))
312320

313-
(defn ^Observable merge-delay-error
314-
"Observable.mergeDelayError"
315-
[os]
316-
(cond
317-
(instance? java.util.List os)
318-
(Observable/mergeDelayError ^java.util.List os)
319-
(instance? Observable os)
320-
(Observable/mergeDelayError ^Observable os)
321-
:else
322-
(throw (IllegalArgumentException. (str "Don't know how to merge " (type os))))))
321+
(defn ^Observable merge-delay-error*
322+
"Same as merge*, but all values are emitted before errors are propagated"
323+
[^Observable xs]
324+
(Observable/mergeDelayError xs))
323325

326+
(defn ^Observable merge-delay-error
327+
"Same as merge, but all values are emitted before errors are propagated"
328+
[& os]
329+
(merge-delay-error* (seq->o os)))
324330

325331
(defn cache
326332
"caches the observable value so that multiple subscribers don't re-evaluate it.

language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/core_test.clj

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -111,20 +111,40 @@
111111
(rx/synchronize lock)
112112
(b/into []))))))
113113

114-
(deftest test-merge
115-
(is (= [[1 3 5] [2 4 6]]
116-
(let [r (b/into []
117-
(rx/merge [(f/future-generator f/default-runner [o]
118-
(doseq [x [1 3 5]]
119-
(Thread/sleep 10)
120-
(rx/on-next o x)))
121-
(f/future-generator f/default-runner [o]
122-
(doseq [x [2 4 6]]
123-
(Thread/sleep 10)
124-
(rx/on-next o x)))]))]
125-
; make sure each sequence maintained original order
126-
[(keep #{1 3 5} r)
127-
(keep #{2 4 6} r) ]))))
114+
(let [expected-result [[1 3 5] [2 4 6]]
115+
sleepy-o #(f/future-generator f/default-runner [o]
116+
(doseq [x %]
117+
(Thread/sleep 10)
118+
(rx/on-next o x)))
119+
make-inputs (fn [] (mapv sleepy-o expected-result))
120+
make-output (fn [r] [(keep #{1 3 5} r)
121+
(keep #{2 4 6} r)])]
122+
(deftest test-merge*
123+
(is (= expected-result
124+
(->> (make-inputs)
125+
(rx/seq->o)
126+
(rx/merge*)
127+
(b/into [])
128+
(make-output)))))
129+
(deftest test-merge
130+
(is (= expected-result
131+
(->> (make-inputs)
132+
(apply rx/merge)
133+
(b/into [])
134+
(make-output)))))
135+
(deftest test-merge-delay-error*
136+
(is (= expected-result
137+
(->> (make-inputs)
138+
(rx/seq->o)
139+
(rx/merge-delay-error*)
140+
(b/into [])
141+
(make-output)))))
142+
(deftest test-merge-delay-error
143+
(is (= expected-result
144+
(->> (make-inputs)
145+
(apply rx/merge-delay-error)
146+
(b/into [])
147+
(make-output))))))
128148

129149
(deftest test-generator
130150
(testing "calls on-completed automatically"

0 commit comments

Comments
 (0)