Skip to content

Commit 3714db9

Browse files
committed
Implement interleave and interleave*
1 parent 963cfd5 commit 3714db9

File tree

2 files changed

+58
-4
lines changed
  • language-adaptors/rxjava-clojure/src

2 files changed

+58
-4
lines changed

language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/core.clj

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
empty every?
55
filter first future
66
group-by
7-
interpose into
7+
interleave interpose into
88
keep keep-indexed
99
map mapcat map-indexed
1010
merge next nth partition reduce reductions
@@ -25,7 +25,7 @@
2525

2626
(set! *warn-on-reflection* true)
2727

28-
(declare concat map map-indexed reduce take take-while)
28+
(declare concat* concat map* map map-indexed reduce take take-while)
2929

3030
(defn ^Func1 fn->predicate
3131
"Turn f into a predicate that returns true/false like Rx predicates should"
@@ -484,6 +484,33 @@
484484
(map (fn [^GroupedObservable go]
485485
(clojure.lang.MapEntry. (.getKey go) go))))))
486486

487+
(defn interleave*
488+
"Returns an Observable of the first item in each Observable emitted by observables, then
489+
the second etc.
490+
491+
observables is an Observable of Observables
492+
493+
See:
494+
interleave
495+
clojure.core/interleave
496+
"
497+
[observables]
498+
(->> (map* #(seq->o %&) observables)
499+
(concat*)))
500+
501+
(defn interleave
502+
"Returns an Observable of the first item in each Observable, then the second etc.
503+
504+
Each argument is an individual Observable
505+
506+
See:
507+
observable*
508+
clojure.core/interleave
509+
"
510+
[o1 & observables]
511+
(->> (apply map #(seq->o %&) o1 observables)
512+
(concat*)))
513+
487514
(defn interpose
488515
[sep xs]
489516
(let [op (operator* (fn [o]

language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/core_test.clj

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,8 +284,35 @@
284284
(rx/mapcat (fn [[k vo :as me]]
285285
(is (instance? clojure.lang.MapEntry me))
286286
(rx/map #(vector k %) vo)))
287-
(b/into [])))))
288-
))
287+
(b/into [])))))))
288+
289+
(deftest test-interleave
290+
(are [inputs] (= (apply interleave inputs)
291+
(->> (apply rx/interleave (map rx/seq->o inputs))
292+
(b/into [])))
293+
[[] []]
294+
[[] [1]]
295+
[(range 5) (range 10) (range 10) (range 3)]
296+
[(range 50) (range 10)]
297+
[(range 5) (range 10 60) (range 10) (range 50)])
298+
299+
; one-arg case, not supported by clojure.core/interleave
300+
(is (= (range 10)
301+
(->> (rx/interleave (rx/seq->o (range 10)))
302+
(b/into [])))))
303+
304+
(deftest test-interleave*
305+
(are [inputs] (= (apply interleave inputs)
306+
(->> (rx/interleave* (->> inputs
307+
(map rx/seq->o)
308+
(rx/seq->o)))
309+
(b/into [])))
310+
[[] []]
311+
[[] [1]]
312+
[(range 5) (range 10) (range 10) (range 3)]
313+
[(range 50) (range 10)]
314+
[(range 5) (range 10 60) (range 10) (range 50)]))
315+
289316
(deftest test-interpose
290317
(is (= (interpose \, [1 2 3])
291318
(b/into [] (rx/interpose \, (rx/seq->o [1 2 3]))))))

0 commit comments

Comments
 (0)