Skip to content

Commit bfc03d6

Browse files
committed
Some more core operators
1 parent 236cbdb commit bfc03d6

File tree

2 files changed

+283
-81
lines changed
  • language-adaptors/rxjava-clojure/src

2 files changed

+283
-81
lines changed

language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/core.clj

Lines changed: 194 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
(ns rx.lang.clojure.core
2-
(:refer-clojure :exclude [concat cons do drop drop-while empty
2+
(:refer-clojure :exclude [concat cons count cycle
3+
distinct do drop drop-while
4+
empty every?
35
filter first future
4-
interpose into keep keep-indexed
6+
interpose into
7+
keep keep-indexed
58
map mapcat map-indexed
6-
merge next partition reduce reductions
9+
merge next nth partition reduce reductions
710
rest seq some sort sort-by split-with
811
take take-while throw])
912
(:require [rx.lang.clojure.interop :as iop]
@@ -128,6 +131,28 @@
128131
(.unsubscribe s)
129132
s)
130133

134+
(defn subscribe-on
135+
"Cause subscriptions to the given observable to happen on the given scheduler.
136+
137+
Returns a new Observable.
138+
139+
See:
140+
rx.Observable/subscribeOn
141+
"
142+
[^rx.Scheduler s ^Observable xs]
143+
(.subscribeOn xs s))
144+
145+
(defn unsubscribe-on
146+
"Cause unsubscriptions from the given observable to happen on the given scheduler.
147+
148+
Returns a new Observable.
149+
150+
See:
151+
rx.Observable/unsubscribeOn
152+
"
153+
[^rx.Scheduler s ^Observable xs]
154+
(.unsubscribeOn xs s))
155+
131156
(defn unsubscribed?
132157
"Returns true if the given Subscription (or Subscriber) is unsubscribed.
133158
@@ -161,8 +186,6 @@
161186
[f]
162187
(Observable/create ^Observable$OnSubscribe (iop/action* f)))
163188

164-
;################################################################################
165-
166189
(defn wrap-on-completed
167190
"Wrap handler with code that automaticaly calls rx.Observable.onCompleted."
168191
[handler]
@@ -181,6 +204,14 @@
181204
(when-not (unsubscribed? observer)
182205
(.onError observer e))))))
183206

207+
;################################################################################
208+
209+
(defn synchronize
210+
([^Observable xs]
211+
(.synchronize xs))
212+
([lock ^Observable xs]
213+
(.synchronize xs lock)))
214+
184215
(defn ^Observable merge
185216
"Observable.merge, renamed because merge means something else in Clojure
186217
@@ -214,26 +245,6 @@
214245
:else
215246
(throw (IllegalArgumentException. (str "Don't know how to merge " (type os))))))
216247

217-
(defn ^Observable zip
218-
"rx.Observable.zip. You want map."
219-
([f ^Observable a ^Observable b] (Observable/zip a b (iop/fn* f)))
220-
([f ^Observable a ^Observable b ^Observable c] (Observable/zip a b c (iop/fn* f)))
221-
([f ^Observable a ^Observable b ^Observable c ^Observable d] (Observable/zip a b c d (iop/fn* f)))
222-
([f a b c d & more]
223-
; recurse on more and then pull everything together with 4 parameter version
224-
(zip (fn [a b c more-value]
225-
(apply f a b c more-value))
226-
a
227-
b
228-
c
229-
(apply zip vector d more))))
230-
231-
(defmacro zip-let
232-
[bindings & body]
233-
(let [pairs (clojure.core/partition 2 bindings)
234-
names (clojure.core/mapv clojure.core/first pairs)
235-
values (clojure.core/map second pairs)]
236-
`(zip (fn ~names ~@body) ~@values)))
237248

238249
;################################################################################
239250

@@ -309,6 +320,50 @@
309320
[^Observable os]
310321
(Observable/concat os))
311322

323+
(defn count
324+
"Returns an Observable that emits the number of items is xs as a long.
325+
326+
See:
327+
rx.Observable/longCount
328+
"
329+
[^Observable xs]
330+
(.longCount xs))
331+
332+
(defn cycle
333+
"Returns an Observable that emits the items of xs repeatedly, forever.
334+
335+
TODO: Other sigs.
336+
337+
See:
338+
rx.Observable/repeat
339+
clojure.core/cycle
340+
"
341+
[^Observable xs]
342+
(.repeat xs))
343+
344+
(defn distinct
345+
"Returns an Observable of the elements of Observable xs with duplicates
346+
removed. key-fn, if provided, is a one arg function that determines the
347+
key used to determined duplicates. key-fn defaults to identity.
348+
349+
This implementation doesn't use rx.Observable/distinct because it doesn't
350+
honor Clojure's equality semantics.
351+
352+
See:
353+
clojure.core/distinct
354+
"
355+
([xs] (distinct identity xs))
356+
([key-fn ^Observable xs]
357+
(let [op (fn->operator (fn [o]
358+
(let [seen (atom #{})]
359+
(->subscriber o
360+
(fn [o v]
361+
(let [key (key-fn v)]
362+
(when-not (contains? @seen key)
363+
(swap! seen conj key)
364+
(on-next o v))))))))]
365+
(lift op xs))))
366+
312367
(defn ^Observable do
313368
"Returns a new Observable that, for each x in Observable xs, executes (do-fn x),
314369
presumably for its side effects, and then passes x along unchanged.
@@ -318,14 +373,17 @@
318373
319374
Example:
320375
321-
(->> (rx/seq->o [1 2 3])
322-
(rx/do println)
323-
...)
376+
(->> (rx/seq->o [1 2 3])
377+
(rx/do println)
378+
...)
324379
325380
Will print 1, 2, 3.
381+
382+
See:
383+
rx.Observable/doOnNext
326384
"
327-
[do-fn xs]
328-
(map #(do (do-fn %) %) xs))
385+
[do-fn ^Observable xs]
386+
(.doOnNext xs (iop/action* do-fn)))
329387

330388
(defn ^Observable drop
331389
[n ^Observable xs]
@@ -335,6 +393,17 @@
335393
[p ^Observable xs]
336394
(.skipWhile xs (fn->predicate p)))
337395

396+
(defn ^Observable every?
397+
"Returns an Observable that emits a single true value if (p x) is true for
398+
all x in xs. Otherwise emits false.
399+
400+
See:
401+
clojure.core/every?
402+
rx.Observable/all
403+
"
404+
[p ^Observable xs]
405+
(.all xs (fn->predicate p)))
406+
338407
(defn ^Observable filter
339408
[p ^Observable xs]
340409
(.filter xs (fn->predicate p)))
@@ -349,6 +418,8 @@
349418
[^Observable xs]
350419
(.takeFirst xs))
351420

421+
; TODO group-by
422+
352423
(defn interpose
353424
[sep xs]
354425
(let [op (fn->operator (fn [o]
@@ -380,11 +451,34 @@
380451
[f xs]
381452
(filter (complement nil?) (map-indexed f xs)))
382453

454+
(defn ^Observable map*
455+
"Map a function over an Observable of Observables.
456+
457+
Each item from the first emitted Observable is the first arg, each
458+
item from the second emitted Observable is the second arg, and so on.
459+
460+
See:
461+
map
462+
clojure.core/map
463+
rx.Observable/zip
464+
"
465+
[f ^Observable observable]
466+
(Observable/zip observable
467+
^rx.functions.FuncN (iop/fnN* f)))
468+
383469
(defn ^Observable map
384-
"Map a function over an observable sequence. Unlike clojure.core/map, only supports up
385-
to 4 simultaneous source sequences at the moment."
386-
([f ^Observable xs] (.map xs (iop/fn* f)))
387-
([f xs & observables] (apply zip f xs observables)))
470+
"Map a function over one or more observable sequences.
471+
472+
Each item from the first Observable is the first arg, each item
473+
from the second Observable is the second arg, and so on.
474+
475+
See:
476+
clojure.core/map
477+
rx.Observable/zip
478+
"
479+
[f & observables]
480+
(Observable/zip ^Iterable observables
481+
^rx.functions.FuncN (iop/fnN* f)))
388482

389483
(defn ^Observable mapcat
390484
"Returns an observable which, for each value x in xs, calls (f x), which must
@@ -393,11 +487,9 @@
393487
394488
See:
395489
clojure.core/mapcat
396-
rx.Observable/mapMany
490+
rx.Observable/flatMap
397491
"
398-
([f ^Observable xs] (.mapMany xs (iop/fn* f)))
399-
; TODO multi-arg version
400-
)
492+
([f ^Observable xs] (.flatMap xs (iop/fn* f))))
401493

402494
(defn map-indexed
403495
"Returns an observable that invokes (f index value) for each value of the input
@@ -408,9 +500,9 @@
408500
"
409501
[f xs]
410502
(let [op (fn->operator (fn [o]
411-
(let [n (atom -1)]
412-
(->subscriber o
413-
(fn [o v] (on-next o (f (swap! n inc) v)))))))]
503+
(let [n (atom -1)]
504+
(->subscriber o
505+
(fn [o v] (on-next o (f (swap! n inc) v)))))))]
414506
(lift op xs)))
415507

416508
(def next
@@ -421,7 +513,19 @@
421513
"
422514
(partial drop 1))
423515

424-
; TODO partition. Use Buffer whenever it's implemented.
516+
(defn nth
517+
"Returns an Observable that emits the value at the index in the given
518+
Observable. nth throws an IndexOutOfBoundsException unless not-found
519+
is supplied.
520+
521+
Note that the Observable is the *first* arg!
522+
"
523+
([^Observable xs index]
524+
(.elementAt xs index))
525+
([^Observable xs index not-found]
526+
(.elementAtOrDefault xs index not-found)))
527+
528+
; TODO partition. Use window
425529

426530
(defn ^Observable reduce
427531
([f ^Observable xs] (.reduce xs (iop/fn* f)))
@@ -448,36 +552,76 @@
448552
(filter identity)
449553
first))
450554

451-
(defn sort
452-
"Returns an observable that emits a single value which is a sorted sequence
555+
(defn sorted-list
556+
"Returns an observable that emits a *single value* which is a sorted List
453557
of the items in coll, where the sort order is determined by comparing
454558
items. If no comparator is supplied, uses compare. comparator must
455559
implement java.util.Comparator.
456560
561+
Use sort if you don't want the sequence squashed down to a List.
562+
457563
See:
458-
clojure.core/sort
564+
rx.Observable/toSortedList
565+
sort
459566
"
460-
([coll] (sort clojure.core/compare coll))
567+
([coll] (sorted-list clojure.core/compare coll))
461568
([comp ^Observable coll]
462569
(.toSortedList coll (iop/fn [a b]
463570
; force to int so rxjava doesn't have a fit
464571
(int (comp a b))))))
465572

466-
(defn sort-by
467-
"Returns an observable that emits a single value which is a sorted sequence
573+
(defn sorted-list-by
574+
"Returns an observable that emits a *single value* which is a sorted List
468575
of the items in coll, where the sort order is determined by comparing
469576
(keyfn item). If no comparator is supplied, uses compare. comparator must
470577
implement java.util.Comparator.
471578
579+
Use sort-by if you don't want the sequence squashed down to a List.
580+
472581
See:
473-
clojure.core/sort-by
582+
rx.Observable/toSortedList
583+
sort-by
474584
"
475-
([keyfn coll] (sort-by keyfn clojure.core/compare coll))
585+
([keyfn coll] (sorted-list-by keyfn clojure.core/compare coll))
476586
([keyfn comp ^Observable coll]
477587
(.toSortedList coll (iop/fn [a b]
478588
; force to int so rxjava doesn't have a fit
479589
(int (comp (keyfn a) (keyfn b)))))))
480590

591+
(defn sort
592+
"Returns an observable that emits the items in xs, where the sort order is
593+
determined by comparing items. If no comparator is supplied, uses compare.
594+
comparator must implement java.util.Comparator.
595+
596+
See:
597+
sorted-list
598+
clojure.core/sort
599+
"
600+
([xs]
601+
(->> xs
602+
(sorted-list)
603+
(mapcat seq->o)))
604+
([comp xs]
605+
(->> xs
606+
(sorted-list comp)
607+
(mapcat seq->o))))
608+
609+
(defn sort-by
610+
"Returns an observable that emits the items in xs, where the sort order is
611+
determined by comparing (keyfn item). If no comparator is supplied, uses
612+
compare. comparator must implement java.util.Comparator.
613+
614+
See:
615+
clojure.core/sort-by
616+
"
617+
([keyfn xs]
618+
(->> (sorted-list-by keyfn xs)
619+
(mapcat seq->o)))
620+
([keyfn comp ^Observable xs]
621+
(->> xs
622+
(sorted-list-by keyfn comp)
623+
(mapcat seq->o))))
624+
481625
(defn split-with
482626
"Returns an observable that emits a pair of observables
483627

0 commit comments

Comments
 (0)