Skip to content

Commit 1b13e56

Browse files
committed
Initial import of indigena-rx
Imported code, fixed namespaces and adusted one test for changes since 0.14.1.
1 parent c1bd534 commit 1b13e56

File tree

14 files changed

+1873
-0
lines changed

14 files changed

+1873
-0
lines changed
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
(ns rx.lang.clojure.base
2+
"Generic, low-level rx helpers."
3+
(:refer-clojure :exclude [merge])
4+
(:require [rx.lang.clojure.interop :as iop])
5+
(:import [rx Observable Observer Subscription]
6+
[rx.observables BlockingObservable]
7+
[rx.subscriptions Subscriptions]))
8+
9+
(def ^:private -ns- *ns*)
10+
(set! *warn-on-reflection* true)
11+
12+
(defn wrap-on-completed
13+
"Wrap handler with code that automaticaly calls rx.Observable.onCompleted."
14+
[handler]
15+
(fn [^Observer observer]
16+
(handler observer)
17+
(.onCompleted observer)))
18+
19+
(defn wrap-on-error
20+
"Wrap handler with code that automaticaly calls (on-error) if an exception is thrown"
21+
[handler]
22+
(fn [^Observer observer]
23+
(try
24+
(handler observer)
25+
(catch Exception e
26+
(.onError observer e)))))
27+
28+
(defn ^Observable merge
29+
"Observable.merge, renamed because merge means something else in Clojure
30+
31+
os is one of:
32+
33+
* An Iterable of Observables to merge
34+
* An Observable<Observable<T>> to merge
35+
"
36+
[os]
37+
(cond
38+
(instance? Iterable os)
39+
(Observable/merge (Observable/from ^Iterable os))
40+
(instance? Observable os)
41+
(Observable/merge ^Observable os)
42+
:else
43+
(throw (IllegalArgumentException. (str "Don't know how to merge " (type os))))))
44+
45+
(defn ^Observable merge-delay-error
46+
"Observable.mergeDelayError, renamed because merge means something else in Clojure"
47+
[os]
48+
(cond
49+
(instance? java.util.List os)
50+
(Observable/mergeDelayError ^java.util.List os)
51+
(instance? Observable os)
52+
(Observable/mergeDelayError ^Observable os)
53+
:else
54+
(throw (IllegalArgumentException. (str "Don't know how to merge " (type os))))))
55+
56+
(defn ^Observable zip
57+
"Observable.zip. You want map."
58+
([f ^Observable a ^Observable b] (Observable/zip a b (iop/fn* f)))
59+
([f ^Observable a ^Observable b ^Observable c] (Observable/zip a b c (iop/fn* f)))
60+
([f ^Observable a ^Observable b ^Observable c ^Observable d] (Observable/zip a b c d (iop/fn* f)))
61+
([f a b c d & more]
62+
; recurse on more and then pull everything together with 4 parameter version
63+
(zip (fn [a b c more-value]
64+
(apply f a b c more-value))
65+
a
66+
b
67+
c
68+
(apply zip vector d more))))
69+
70+
(defmacro zip-let
71+
[bindings & body]
72+
(let [pairs (clojure.core/partition 2 bindings)
73+
names (clojure.core/mapv clojure.core/first pairs)
74+
values (clojure.core/map second pairs)]
75+
`(zip (fn ~names ~@body) ~@values)))
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
(ns rx.lang.clojure.blocking
2+
"Blocking operators and functions. These should never be used in
3+
production code except at the end of an async chain to convert from
4+
rx land back to sync land. For example, to produce a servlet response."
5+
(:refer-clojure :exclude [first into])
6+
(:require [rx.lang.clojure.core :as rx])
7+
(:import [rx Observable]
8+
[rx.observables BlockingObservable]))
9+
10+
(def ^:private -ns- *ns*)
11+
(set! *warn-on-reflection* true)
12+
13+
(defn ^BlockingObservable ->blocking
14+
"Convert an Observable to a BlockingObservable"
15+
[^Observable o]
16+
(.toBlockingObservable o))
17+
18+
(defn first
19+
"*Blocks* and waits for the first value emitted by the given observable.
20+
21+
If an error is produced it is thrown.
22+
23+
See:
24+
clojure.core/first
25+
rx/first
26+
"
27+
[observable]
28+
(let [result (clojure.core/promise)]
29+
(rx/subscribe (->> observable (rx/take 1))
30+
#(clojure.core/deliver result [:value %])
31+
#(clojure.core/deliver result [:error %])
32+
#(clojure.core/deliver result nil))
33+
(if-let [[type v] @result]
34+
(case type
35+
:value v
36+
:error (throw v)))))
37+
38+
(defn single
39+
"*Blocks* and waits for the first value emitted by the given observable.
40+
41+
An error is thrown if more then one value is produced.
42+
"
43+
[observable]
44+
(.single (->blocking observable)))
45+
46+
(defn into
47+
"*Blocks* and pours the elements emitted by the given observables into
48+
to.
49+
50+
If an error is produced it is thrown.
51+
52+
See:
53+
clojure.core/into
54+
rx/into
55+
"
56+
[to from-observable]
57+
(first (rx/into to from-observable)))
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
(ns rx.lang.clojure.chunk
2+
(:refer-clojure :exclude [chunk])
3+
(:require [rx.lang.clojure.core :as rx]
4+
[rx.lang.clojure.base :as rx-base]))
5+
6+
(def ^:private -ns- *ns*)
7+
(set! *warn-on-reflection* true)
8+
9+
(defn chunk
10+
"Same as rx.Observable.merge(Observable<Observable<T>>) but the input Observables
11+
are \"chunked\" so that at most chunk-size of them are \"in flight\" at any given
12+
time.
13+
14+
The order of the input Observables is not preserved.
15+
16+
The main purpose here is to allow a large number of Hystrix observables to
17+
be processed in a controlled way so that the Hystrix execution queues aren't
18+
overwhelmed.
19+
20+
Example:
21+
22+
(->> users
23+
(map #(-> (GetUserCommand. %) .toObservable))
24+
(chunk 10))
25+
26+
See:
27+
http://netflix.github.io/RxJava/javadoc/rx/Observable.html#merge(rx.Observable)
28+
http://netflix.github.io/RxJava/javadoc/rx/Observable.html#mergeDelayError(rx.Observable)
29+
"
30+
([chunk-size observable-source] (chunk chunk-size {} observable-source))
31+
([chunk-size options observable-source]
32+
(let [new-state-atom #(atom {:in-flight #{} ; observables currently in-flight
33+
:buffered [] ; observables waiting to be emitted
34+
:complete false ; true if observable-source is complete
35+
:observer % }) ; the observer
36+
ps #(do (printf "%s/%d/%d%n"
37+
(:complete %)
38+
(-> % :buffered count)
39+
(-> % :in-flight count))
40+
(flush))
41+
42+
; Given the current state, returns [action new-state]. action is the
43+
; next Observable or Throwable to emit, or :complete if we're done.
44+
next-state (fn [{:keys [complete buffered in-flight] :as old}]
45+
(cond
46+
(empty? buffered) [complete old]
47+
48+
(< (count in-flight) chunk-size) (let [next-o (first buffered)]
49+
[next-o
50+
(-> old
51+
(update-in [:buffered] next)
52+
(update-in [:in-flight] conj next-o))])
53+
54+
:else [nil old]))
55+
56+
; Advance the state, performing side-effects as necessary
57+
advance! (fn advance! [state-atom]
58+
(let [old-state @state-atom
59+
[action new-state] (next-state old-state)]
60+
(if (compare-and-set! state-atom old-state new-state)
61+
(let [observer (:observer new-state)]
62+
(if (:debug options) (ps new-state))
63+
(cond
64+
(= :complete action)
65+
(rx/on-completed observer)
66+
67+
(instance? Throwable action)
68+
(rx/on-error observer action)
69+
70+
(instance? rx.Observable action)
71+
(rx/on-next observer
72+
(.finallyDo ^rx.Observable action
73+
(reify rx.util.functions.Action0
74+
(call [this]
75+
(swap! state-atom update-in [:in-flight] disj action)
76+
(advance! state-atom)))))))
77+
(recur state-atom))))
78+
79+
subscribe (fn [state-atom]
80+
(rx/subscribe observable-source
81+
(fn [o]
82+
(swap! state-atom update-in [:buffered] conj o)
83+
(advance! state-atom))
84+
85+
(fn [e]
86+
(swap! state-atom assoc :complete e)
87+
(advance! state-atom))
88+
89+
(fn []
90+
(swap! state-atom assoc :complete :complete)
91+
(advance! state-atom))))
92+
observable (rx/fn->o (fn [observer]
93+
(subscribe (new-state-atom observer)))) ]
94+
(if (:delay-error? options)
95+
(rx.Observable/mergeDelayError observable)
96+
(rx.Observable/merge observable)))))
97+

0 commit comments

Comments
 (0)