Skip to content

Commit 9bc5a85

Browse files
committed
Implemented multi-sequence mapcat
1 parent 9269d4e commit 9bc5a85

File tree

2 files changed

+41
-2
lines changed
  • language-adaptors/rxjava-clojure/src

2 files changed

+41
-2
lines changed

language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/core.clj

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -597,16 +597,35 @@
597597
(Observable/zip ^Iterable observables
598598
^rx.functions.FuncN (iop/fnN* f)))
599599

600+
(defn ^Observable mapcat*
601+
"Same as multi-arg mapcat, but input is an Observable of Observables.
602+
603+
See:
604+
mapcat
605+
clojure.core/mapcat
606+
"
607+
[f ^Observable xs]
608+
(->> xs
609+
(map* f)
610+
(concat*)))
611+
600612
(defn ^Observable mapcat
601613
"Returns an observable which, for each value x in xs, calls (f x), which must
602614
return an Observable. The resulting observables are concatentated together
603615
into one observable.
604616
617+
If multiple Observables are given, the arguments to f are the first item from
618+
each observable, then the second item, etc.
619+
605620
See:
606621
clojure.core/mapcat
607622
rx.Observable/flatMap
608623
"
609-
([f ^Observable xs] (.flatMap xs (iop/fn* f))))
624+
[f & xs]
625+
(if (clojure.core/next xs)
626+
(mapcat* f (seq->o xs))
627+
; use built-in flatMap for single-arg case
628+
(.flatMap ^Observable (clojure.core/first xs) (iop/fn* f))))
610629

611630
(defn map-indexed
612631
"Returns an observable that invokes (f index value) for each value of the input

language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/core_test.clj

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,11 +423,31 @@
423423
(catch java.io.FileNotFoundException e
424424
(is (= :b (-> e .getCause .getValue)))))))
425425

426+
(deftest test-mapcat*
427+
(let [f (fn [a b c d e]
428+
[(+ a b) (+ c d) e])]
429+
(is (= (->> (range 5)
430+
(map (fn [_] (range 5)))
431+
(apply mapcat f))
432+
(->> (range 5)
433+
(map (fn [_] (rx/seq->o (range 5))))
434+
(rx/seq->o)
435+
(rx/mapcat* (fn [& args] (rx/seq->o (apply f args))))
436+
(b/into []))))))
437+
426438
(deftest test-mapcat
427439
(let [f (fn [v] [v (* v v)])
428440
xs (range 10)]
429441
(is (= (mapcat f xs)
430-
(b/into [] (rx/mapcat (comp rx/seq->o f) (rx/seq->o xs)))))))
442+
(b/into [] (rx/mapcat (comp rx/seq->o f) (rx/seq->o xs))))))
443+
444+
(let [f (fn [a b] [a b (* a b)])
445+
as (range 10)
446+
bs (range 15)]
447+
(is (= (mapcat f as bs)
448+
(b/into [] (rx/mapcat (comp rx/seq->o f)
449+
(rx/seq->o as)
450+
(rx/seq->o bs)))))))
431451

432452
(deftest test-next
433453
(let [in [:q :r :s :t :u]]

0 commit comments

Comments
 (0)