Skip to content

Commit 829f016

Browse files
committed
Operators: switchCase (Case), ifThen (If), doWhile (DoWhile), WhileDo (While)
1 parent a252dca commit 829f016

File tree

3 files changed

+818
-0
lines changed

3 files changed

+818
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import rx.operators.OperationCast;
4242
import rx.operators.OperationCombineLatest;
4343
import rx.operators.OperationConcat;
44+
import rx.operators.OperationConditionals;
4445
import rx.operators.OperationDebounce;
4546
import rx.operators.OperationDefaultIfEmpty;
4647
import rx.operators.OperationDefer;
@@ -1923,6 +1924,128 @@ public static <T> Observable<T> switchOnNext(Observable<? extends Observable<? e
19231924
return create(OperationSwitch.switchDo(sequenceOfSequences));
19241925
}
19251926

1927+
/**
1928+
* Return an Observable that subscribes to an observable sequence
1929+
* chosen from a map of observables via a selector function or to an
1930+
* empty observable.
1931+
* @param <K> the case key type
1932+
* @param <R> the result value type
1933+
* @param caseSelector the function that produces a case key when an Observer subscribes
1934+
* @param mapOfCases a map that maps a case key to an observable sequence
1935+
* @return an Observable that subscribes to an observable sequence
1936+
* chosen from a map of observables via a selector function or to an
1937+
* empty observable
1938+
*/
1939+
public static <K, R> Observable<R> switchCase(Func0<? extends K> caseSelector,
1940+
Map<? super K, ? extends Observable<? extends R>> mapOfCases) {
1941+
return switchCase(caseSelector, mapOfCases, Observable.<R>empty());
1942+
}
1943+
1944+
/**
1945+
* Return an Observable that subscribes to an observable sequence
1946+
* chosen from a map of observables via a selector function or to an
1947+
* empty observable which runs on the given scheduler.
1948+
* @param <K> the case key type
1949+
* @param <R> the result value type
1950+
* @param caseSelector the function that produces a case key when an Observer subscribes
1951+
* @param mapOfCases a map that maps a case key to an observable sequence
1952+
* @param scheduler the scheduler where the empty observable is observed
1953+
* @return an Observable that subscribes to an observable sequence
1954+
* chosen from a map of observables via a selector function or to an
1955+
* empty observable which runs on the given scheduler
1956+
*/
1957+
public static <K, R> Observable<R> switchCase(Func0<? extends K> caseSelector,
1958+
Map<? super K, ? extends Observable<? extends R>> mapOfCases, Scheduler scheduler) {
1959+
return switchCase(caseSelector, mapOfCases, Observable.<R>empty(scheduler));
1960+
}
1961+
/**
1962+
* Return an Observable that subscribes to an observable sequence
1963+
* chosen from a map of observables via a selector function or to the
1964+
* default observable.
1965+
* @param <K> the case key type
1966+
* @param <R> the result value type
1967+
* @param caseSelector the function that produces a case key when an Observer subscribes
1968+
* @param mapOfCases a map that maps a case key to an observable sequence
1969+
* @param defaultCase the default observable if the {@code mapOfCases} doesn't contain a value for
1970+
* the key returned by the {@case caseSelector}
1971+
* @return an Observable that subscribes to an observable sequence
1972+
* chosen from a map of observables via a selector function or to an
1973+
* empty observable
1974+
*/
1975+
public static <K, R> Observable<R> switchCase(Func0<? extends K> caseSelector,
1976+
Map<? super K, ? extends Observable<? extends R>> mapOfCases,
1977+
Observable<? extends R> defaultCase) {
1978+
return create(OperationConditionals.switchCase(caseSelector, mapOfCases, defaultCase));
1979+
}
1980+
1981+
/**
1982+
* Return an Observable that subscribes to the this Observable,
1983+
* then resubscribes only if the postCondition evaluates to true.
1984+
* @param postCondition the post condition after the source completes
1985+
* @return an Observable that subscribes to the source Observable,
1986+
* then resubscribes only if the postCondition evaluates to true.
1987+
*/
1988+
public Observable<T> doWhile(Func0<Boolean> postCondition) {
1989+
return create(OperationConditionals.doWhile(this, postCondition));
1990+
}
1991+
1992+
/**
1993+
* Return an Observable that subscribes and resubscribes to this
1994+
* Observable if the preCondition evaluates to true.
1995+
* @param preCondition the condition to evaluate before subscribing to this,
1996+
* and subscribe to source if it returns {@code true}
1997+
* @return an Observable that subscribes and resubscribes to the source
1998+
* Observable if the preCondition evaluates to true.
1999+
*/
2000+
public Observable<T> whileDo(Func0<Boolean> preCondition) {
2001+
return create(OperationConditionals.whileDo(this, preCondition));
2002+
}
2003+
2004+
/**
2005+
* Return an Observable that subscribes to the
2006+
* then Observables if the condition function evaluates to true, or to an empty
2007+
* Observable if false.
2008+
* @param <R> the result value type
2009+
* @param condition the condition to decide which Observables to subscribe to
2010+
* @param then the Observable sequence to subscribe to if {@code condition} is {@code true}
2011+
* @return an Observable that subscribes to the
2012+
* then Observables if the condition function evaluates to true, or to an empty
2013+
* Observable running on the given scheduler if false
2014+
*/
2015+
public static <R> Observable<R> ifThen(Func0<Boolean> condition, Observable<? extends R> then) {
2016+
return ifThen(condition, then, Observable.<R>empty());
2017+
}
2018+
2019+
/**
2020+
* Return an Observable that subscribes to the
2021+
* then Observables if the condition function evaluates to true, or to an empty
2022+
* Observable running on the given scheduler if false.
2023+
* @param <R> the result value type
2024+
* @param condition the condition to decide which Observables to subscribe to
2025+
* @param then the Observable sequence to subscribe to if {@code condition} is {@code true}
2026+
* @param scheduler the scheduler where the empty Observable is observed in case the condition returns false
2027+
* @return an Observable that subscribes to the
2028+
* then Observables if the condition function evaluates to true, or to an empty
2029+
* Observable running on the given scheduler if false
2030+
*/
2031+
public static <R> Observable<R> ifThen(Func0<Boolean> condition, Observable<? extends R> then, Scheduler scheduler) {
2032+
return ifThen(condition, then, Observable.<R>empty(scheduler));
2033+
}
2034+
/**
2035+
* Return an Observable that subscribes to either the
2036+
* then or orElse Observables depending on a condition function.
2037+
* @param <R> the result value type
2038+
* @param condition the condition to decide which Observables to subscribe to
2039+
* @param then the Observable sequence to subscribe to if {@code condition} is {@code true}
2040+
* @param orElse the Observable sequence to subscribe to if {@code condition} is {@code false}
2041+
* @return an Observable that subscribes to either the
2042+
* then or orElse Observables depending on a condition function
2043+
*/
2044+
public static <R> Observable<R> ifThen(Func0<Boolean> condition, Observable<? extends R> then,
2045+
Observable<? extends R> orElse) {
2046+
return create(OperationConditionals.ifThen(condition, then, orElse));
2047+
}
2048+
19262049
/**
19272050
* Accepts an Observable and wraps it in another Observable that ensures
19282051
* that the resulting Observable is chronologically well-behaved.
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.Map;
19+
import rx.Observable;
20+
import rx.Observable.OnSubscribeFunc;
21+
import rx.Observer;
22+
import rx.Subscription;
23+
import rx.subscriptions.SerialSubscription;
24+
import rx.subscriptions.Subscriptions;
25+
import rx.util.functions.Func0;
26+
27+
/**
28+
* Implementation of conditional-based operations such as Case, If, DoWhile and While.
29+
*/
30+
public final class OperationConditionals {
31+
/** Utility class. */
32+
private OperationConditionals() { throw new IllegalStateException("No instances!"); }
33+
/**
34+
* Return a subscription function that subscribes to an observable sequence
35+
* chosen from a map of observables via a selector function or to the
36+
* default observable.
37+
* @param <K> the case key type
38+
* @param <R> the result value type
39+
* @param caseSelector the function that produces a case key when an Observer subscribes
40+
* @param mapOfCases a map that maps a case key to an observable sequence
41+
* @param defaultCase the default observable if the {@code mapOfCases} doesn't contain a value for
42+
* the key returned by the {@case caseSelector}
43+
* @return a subscription function
44+
*/
45+
public static <K, R> OnSubscribeFunc<R> switchCase(
46+
Func0<? extends K> caseSelector,
47+
Map<? super K, ? extends Observable<? extends R>> mapOfCases,
48+
Observable<? extends R> defaultCase) {
49+
return new SwitchCase<K, R>(caseSelector, mapOfCases, defaultCase);
50+
}
51+
/**
52+
* Return a subscription function that subscribes to either the
53+
* then or orElse Observables depending on a condition function.
54+
* @param <R> the result value type
55+
* @param condition the condition to decide which Observables to subscribe to
56+
* @param then the Observable sequence to subscribe to if {@code condition} is {@code true}
57+
* @param orElse the Observable sequence to subscribe to if {@code condition} is {@code false}
58+
* @return a subscription function
59+
*/
60+
public static <R> OnSubscribeFunc<R> ifThen(
61+
Func0<Boolean> condition,
62+
Observable<? extends R> then,
63+
Observable<? extends R> orElse) {
64+
return new IfThen<R>(condition, then, orElse);
65+
}
66+
/**
67+
* Return a subscription function that subscribes to the source Observable,
68+
* then resubscribes only if the postCondition evaluates to true.
69+
* @param <T> the result value type
70+
* @param source the source Observable
71+
* @param postCondition the post condition after the source completes
72+
* @return a subscription function.
73+
*/
74+
public static <T> OnSubscribeFunc<T> doWhile(Observable<? extends T> source, Func0<Boolean> postCondition) {
75+
return new WhileDoWhile<T>(source, TRUE, postCondition);
76+
}
77+
/**
78+
* Return a subscription function that subscribes and resubscribes to the source
79+
* Observable if the preCondition evaluates to true.
80+
* @param <T> the result value type
81+
* @param source the source Observable
82+
* @param preCondition the condition to evaluate before subscribing to source,
83+
* and subscribe to source if it returns {@code true}
84+
* @return a subscription function.
85+
*/
86+
public static <T> OnSubscribeFunc<T> whileDo(Observable<? extends T> source, Func0<Boolean> preCondition) {
87+
return new WhileDoWhile<T>(source, preCondition, preCondition);
88+
}
89+
/**
90+
* Select an observable from a map based on a case key returned by a selector
91+
* function when an observer subscribes.
92+
* @param <K> the case key type
93+
* @param <R> the result value type
94+
*/
95+
private static final class SwitchCase<K, R> implements OnSubscribeFunc<R> {
96+
final Func0<? extends K> caseSelector;
97+
final Map<? super K, ? extends Observable<? extends R>> mapOfCases;
98+
final Observable<? extends R> defaultCase;
99+
public SwitchCase(Func0<? extends K> caseSelector,
100+
Map<? super K, ? extends Observable<? extends R>> mapOfCases,
101+
Observable<? extends R> defaultCase) {
102+
this.caseSelector = caseSelector;
103+
this.mapOfCases = mapOfCases;
104+
this.defaultCase = defaultCase;
105+
}
106+
107+
@Override
108+
public Subscription onSubscribe(Observer<? super R> t1) {
109+
Observable<? extends R> target;
110+
try {
111+
K caseKey = caseSelector.call();
112+
if (mapOfCases.containsKey(caseKey)) {
113+
target = mapOfCases.get(caseKey);
114+
} else {
115+
target = defaultCase;
116+
}
117+
} catch (Throwable t) {
118+
t1.onError(t);
119+
return Subscriptions.empty();
120+
}
121+
return target.subscribe(t1);
122+
}
123+
}
124+
/** Returns always true. */
125+
private static final class Func0True implements Func0<Boolean> {
126+
@Override
127+
public Boolean call() {
128+
return true;
129+
}
130+
}
131+
/** Returns always true function. */
132+
private static final Func0True TRUE = new Func0True();
133+
/**
134+
* Given a condition, subscribe to one of the observables when an Observer
135+
* subscribes.
136+
* @param <R> the result value type
137+
*/
138+
private static final class IfThen<R> implements OnSubscribeFunc<R> {
139+
final Func0<Boolean> condition;
140+
final Observable<? extends R> then;
141+
final Observable<? extends R> orElse;
142+
public IfThen(Func0<Boolean> condition, Observable<? extends R> then, Observable<? extends R> orElse) {
143+
this.condition = condition;
144+
this.then = then;
145+
this.orElse = orElse;
146+
}
147+
@Override
148+
public Subscription onSubscribe(Observer<? super R> t1) {
149+
Observable<? extends R> target;
150+
try {
151+
if (condition.call()) {
152+
target = then;
153+
} else {
154+
target = orElse;
155+
}
156+
} catch (Throwable t) {
157+
t1.onError(t);
158+
return Subscriptions.empty();
159+
}
160+
return target.subscribe(t1);
161+
}
162+
}
163+
/**
164+
* Repeatedly subscribes to the source observable if the pre- or
165+
* postcondition is true.
166+
* <p>
167+
* This combines the While and DoWhile into a single operation through
168+
* the conditions.
169+
* @param <T> the result value type
170+
*/
171+
private static final class WhileDoWhile<T> implements OnSubscribeFunc<T> {
172+
final Func0<Boolean> preCondition;
173+
final Func0<Boolean> postCondition;
174+
final Observable<? extends T> source;
175+
public WhileDoWhile(Observable<? extends T> source,
176+
Func0<Boolean> preCondition, Func0<Boolean> postCondition
177+
) {
178+
this.source = source;
179+
this.preCondition = preCondition;
180+
this.postCondition = postCondition;
181+
}
182+
183+
@Override
184+
public Subscription onSubscribe(Observer<? super T> t1) {
185+
boolean first;
186+
try {
187+
first = preCondition.call();
188+
} catch (Throwable t) {
189+
t1.onError(t);
190+
return Subscriptions.empty();
191+
}
192+
if (first) {
193+
SerialSubscription ssub = new SerialSubscription();
194+
195+
ssub.setSubscription(source.subscribe(new SourceObserver(t1, ssub)));
196+
197+
return ssub;
198+
} else {
199+
t1.onCompleted();
200+
}
201+
return Subscriptions.empty();
202+
}
203+
/** Observe the source. */
204+
final class SourceObserver implements Observer<T> {
205+
final SerialSubscription cancel;
206+
final Observer<? super T> observer;
207+
public SourceObserver(Observer<? super T> observer, SerialSubscription cancel) {
208+
this.observer = observer;
209+
this.cancel = cancel;
210+
}
211+
212+
@Override
213+
public void onNext(T args) {
214+
observer.onNext(args);
215+
}
216+
217+
@Override
218+
public void onError(Throwable e) {
219+
observer.onError(e);
220+
cancel.unsubscribe();
221+
}
222+
223+
@Override
224+
public void onCompleted() {
225+
boolean next;
226+
try {
227+
next = postCondition.call();
228+
} catch (Throwable t) {
229+
observer.onError(t);
230+
return;
231+
}
232+
if (next) {
233+
cancel.setSubscription(source.subscribe(this));
234+
} else {
235+
observer.onCompleted();
236+
cancel.unsubscribe();
237+
}
238+
}
239+
240+
}
241+
}
242+
}

0 commit comments

Comments
 (0)