Skip to content

Commit 963cfd5

Browse files
committed
Implement group-by
Note that since 02ccc4d, the val-fn variant of groupBy is unimplemented so for now an exception is thrown if it's used.
1 parent feb16d0 commit 963cfd5

File tree

3 files changed

+62
-3
lines changed

3 files changed

+62
-3
lines changed

language-adaptors/rxjava-clojure/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ Blocking operators, which are useful for testing, but should otherwise be avoide
7272
## Open Issues
7373

7474
* The missing stuff mentioned below
75-
* `group-by`
75+
* `group-by` val-fn variant isn't implemented in RxJava
7676
* There are some functions for defining customer Observables and Operators (`subscriber`, `operator*`, `observable*`). I don't think these are really enough for serious operator implementation, but I'm hesitant to guess at an abstraction at this point. These will probably change dramatically.
7777

7878
## What's Missing

language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/core.clj

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
distinct do drop drop-while
44
empty every?
55
filter first future
6+
group-by
67
interpose into
78
keep keep-indexed
89
map mapcat map-indexed
@@ -16,7 +17,9 @@
1617
Observable
1718
Observer Observable$Operator Observable$OnSubscribe
1819
Subscriber Subscription]
19-
[rx.observables BlockingObservable]
20+
[rx.observables
21+
BlockingObservable
22+
GroupedObservable]
2023
[rx.subscriptions Subscriptions]
2124
[rx.util.functions Action0 Action1 Func0 Func1 Func2]))
2225

@@ -453,7 +456,33 @@
453456
[^Observable xs]
454457
(.takeFirst xs))
455458

456-
; TODO group-by
459+
(defn ^Observable group-by
460+
"Returns an Observable of clojure.lang.MapEntry where the key is the result of
461+
(key-fn x) and the val is an Observable of (val-fn x) for each key. If val-fn is
462+
omitted, it defaults to identity.
463+
464+
This returns a clojure.lang.MapEntry rather than rx.observables.GroupedObservable
465+
for some vague consistency with clojure.core/group-by and so that clojure.core/key,
466+
clojure.core/val and destructuring will work as expected.
467+
468+
See:
469+
clojure.core/group-by
470+
rx.Observable/groupBy
471+
rx.observables.GroupedObservable
472+
"
473+
([key-fn ^Observable xs]
474+
(->> (.groupBy xs (iop/fn* key-fn))
475+
(map (fn [^GroupedObservable go]
476+
(clojure.lang.MapEntry. (.getKey go) go)))))
477+
([key-fn val-fn ^Observable xs]
478+
; TODO reinstate once this is implemented
479+
; see https://github.com/Netflix/RxJava/commit/02ccc4d727a9297f14219549208757c6e0efce2a
480+
(throw (UnsupportedOperationException. "groupBy with val-fn is currently unimplemented in RxJava"))
481+
(->> (.groupBy xs
482+
(iop/fn* key-fn)
483+
(iop/fn* val-fn))
484+
(map (fn [^GroupedObservable go]
485+
(clojure.lang.MapEntry. (.getKey go) go))))))
457486

458487
(defn interpose
459488
[sep xs]

language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/core_test.clj

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,36 @@
256256
(is (= []
257257
(b/into [] (rx/first (rx/empty))))))
258258

259+
(deftest test-group-by
260+
(let [xs [{:k :a :v 1} {:k :b :v 2} {:k :a :v 3} {:k :c :v 4}]]
261+
(testing "with just a key-fn"
262+
(is (= [[:a {:k :a :v 1}]
263+
[:b {:k :b :v 2}]
264+
[:a {:k :a :v 3}]
265+
[:c {:k :c :v 4}]]
266+
(->> xs
267+
(rx/seq->o)
268+
(rx/group-by :k)
269+
(rx/mapcat (fn [[k vo :as me]]
270+
(is (instance? clojure.lang.MapEntry me))
271+
(rx/map #(vector k %) vo)))
272+
(b/into [])))))
273+
274+
; TODO reinstate once this is implemented
275+
; see https://github.com/Netflix/RxJava/commit/02ccc4d727a9297f14219549208757c6e0efce2a
276+
#_(testing "with a val-fn"
277+
(is (= [[:a 1]
278+
[:b 2]
279+
[:a 3]
280+
[:c 4]]
281+
(->> xs
282+
(rx/seq->o)
283+
(rx/group-by :k :v)
284+
(rx/mapcat (fn [[k vo :as me]]
285+
(is (instance? clojure.lang.MapEntry me))
286+
(rx/map #(vector k %) vo)))
287+
(b/into [])))))
288+
))
259289
(deftest test-interpose
260290
(is (= (interpose \, [1 2 3])
261291
(b/into [] (rx/interpose \, (rx/seq->o [1 2 3]))))))

0 commit comments

Comments
 (0)