Skip to content

Commit dd7c4ce

Browse files
committed
MergeMap with Iterable and resultSelector overloads
1 parent fe438ca commit dd7c4ce

File tree

3 files changed

+398
-0
lines changed

3 files changed

+398
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import rx.operators.OperationElementAt;
5454
import rx.operators.OperationFilter;
5555
import rx.operators.OperationFinally;
56+
import rx.operators.OperationFlatMap;
5657
import rx.operators.OperationGroupBy;
5758
import rx.operators.OperationGroupByUntil;
5859
import rx.operators.OperationGroupJoin;
@@ -3856,6 +3857,50 @@ public <R> Observable<R> mergeMap(Func1<? super T, ? extends Observable<? extend
38563857
return merge(map(func));
38573858
}
38583859

3860+
/**
3861+
* Create an Observable that applies a function to the pair of values from the source
3862+
* Observable and the collection Observable.
3863+
* @param <U> the element type of the collection Observable
3864+
* @param <R> the result type
3865+
* @param collectionSelector function that returns an Observable sequence for each value in the source Observable
3866+
* @param resultSelector function that combines the values of the source and collection Observable
3867+
* @return an Observable that applies a function to the pair of values from the source
3868+
* Observable and the collection Observable.
3869+
*/
3870+
public <U, R> Observable<R> mergeMap(Func1<? super T, ? extends Observable<? extends U>> collectionSelector,
3871+
Func2<? super T, ? super U, ? extends R> resultSelector) {
3872+
return create(OperationFlatMap.flatMap(this, collectionSelector, resultSelector));
3873+
}
3874+
3875+
/**
3876+
* Create an Observable that merges the values of the iterables returned by the
3877+
* collectionSelector for each source value.
3878+
* @param <R> the result value type
3879+
* @param collectionSelector function that returns an Iterable sequence of values for
3880+
* each source value.
3881+
* @return an Observable that merges the values of the iterables returned by the
3882+
* collectionSelector for each source value.
3883+
*/
3884+
public <R> Observable<R> mergeMapIterable(Func1<? super T, ? extends Iterable<? extends R>> collectionSelector) {
3885+
return merge(map(OperationFlatMap.flatMapIterableFunc(collectionSelector)));
3886+
}
3887+
3888+
/**
3889+
* Create an Observable that applies a function to the pair of values from the source
3890+
* Observable and the collection Iterable sequence.
3891+
* @param <U> the collection element type
3892+
* @param <R> the result type
3893+
* @param collectionSelector function that returns an Iterable sequence of values for
3894+
* each source value.
3895+
* @param resultSelector function that combines the values of the source and collection Iterable
3896+
* @return n Observable that applies a function to the pair of values from the source
3897+
* Observable and the collection Iterable sequence.
3898+
*/
3899+
public <U, R> Observable<R> mergeMapIterable(Func1<? super T, ? extends Iterable<? extends U>> collectionSelector,
3900+
Func2<? super T, ? super U, ? extends R> resultSelector) {
3901+
return mergeMap(OperationFlatMap.flatMapIterableFunc(collectionSelector), resultSelector);
3902+
}
3903+
38593904
/**
38603905
* Creates a new Observable by applying a function that you supply to each
38613906
* item emitted by the source Observable, where that function returns an
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
import rx.Observable;
20+
import rx.Observable.OnSubscribeFunc;
21+
import rx.Observer;
22+
import rx.Subscription;
23+
import rx.subscriptions.CompositeSubscription;
24+
import rx.subscriptions.SerialSubscription;
25+
import rx.util.functions.Func1;
26+
import rx.util.functions.Func2;
27+
28+
/**
29+
* Additional flatMap operators.
30+
*/
31+
public final class OperationFlatMap {
32+
/** Utility class. */
33+
private OperationFlatMap() { throw new IllegalStateException("No instances!"); }
34+
35+
/**
36+
* Observable that pairs up the source values and all the derived collection
37+
* values and projects them via the selector.
38+
*/
39+
public static <T, U, R> OnSubscribeFunc<R> flatMap(Observable<? extends T> source,
40+
Func1<? super T, ? extends Observable<? extends U>> collectionSelector,
41+
Func2<? super T, ? super U, ? extends R> resultSelector
42+
) {
43+
return new FlatMapPairSelector<T, U, R>(source, collectionSelector, resultSelector);
44+
}
45+
/**
46+
* Converts the result Iterable of a function into an Observable.
47+
*/
48+
public static <T, U> Func1<T, Observable<U>> flatMapIterableFunc(
49+
Func1<? super T, ? extends Iterable<? extends U>> collectionSelector) {
50+
return new IterableToObservableFunc<T, U>(collectionSelector);
51+
}
52+
/**
53+
* Converts the result Iterable of a function into an Observable.
54+
* @param <T> the parameter type
55+
* @param <R> the result type
56+
*/
57+
private static final class IterableToObservableFunc<T, R> implements Func1<T, Observable<R>> {
58+
final Func1<? super T, ? extends Iterable<? extends R>> func;
59+
60+
public IterableToObservableFunc(Func1<? super T, ? extends Iterable<? extends R>> func) {
61+
this.func = func;
62+
}
63+
64+
@Override
65+
public Observable<R> call(T t1) {
66+
return Observable.from(func.call(t1));
67+
}
68+
}
69+
/**
70+
* Pairs up the source value with each of the associated observable values
71+
* and uses a selector function to calculate the result sequence.
72+
* @param <T> the source value type
73+
* @param <U> the collection value type
74+
* @param <R> the result type
75+
*/
76+
private static final class FlatMapPairSelector<T, U, R> implements OnSubscribeFunc<R> {
77+
final Observable<? extends T> source;
78+
final Func1<? super T, ? extends Observable<? extends U>> collectionSelector;
79+
final Func2<? super T, ? super U, ? extends R> resultSelector;
80+
81+
public FlatMapPairSelector(Observable<? extends T> source, Func1<? super T, ? extends Observable<? extends U>> collectionSelector, Func2<? super T, ? super U, ? extends R> resultSelector) {
82+
this.source = source;
83+
this.collectionSelector = collectionSelector;
84+
this.resultSelector = resultSelector;
85+
}
86+
87+
@Override
88+
public Subscription onSubscribe(Observer<? super R> t1) {
89+
CompositeSubscription csub = new CompositeSubscription();
90+
91+
csub.add(source.subscribe(new SourceObserver<T, U, R>(t1, collectionSelector, resultSelector, csub)));
92+
93+
return csub;
94+
}
95+
96+
/** Observes the source, starts the collections and projects the result. */
97+
private static final class SourceObserver<T, U, R> implements Observer<T> {
98+
final Observer<? super R> observer;
99+
final Func1<? super T, ? extends Observable<? extends U>> collectionSelector;
100+
final Func2<? super T, ? super U, ? extends R> resultSelector;
101+
final CompositeSubscription csub;
102+
final AtomicInteger wip;
103+
/** Don't let various events run at the same time. */
104+
final Object guard;
105+
boolean done;
106+
107+
public SourceObserver(Observer<? super R> observer, Func1<? super T, ? extends Observable<? extends U>> collectionSelector, Func2<? super T, ? super U, ? extends R> resultSelector, CompositeSubscription csub) {
108+
this.observer = observer;
109+
this.collectionSelector = collectionSelector;
110+
this.resultSelector = resultSelector;
111+
this.csub = csub;
112+
this.wip = new AtomicInteger(1);
113+
this.guard = new Object();
114+
}
115+
116+
@Override
117+
public void onNext(T args) {
118+
Observable<? extends U> coll;
119+
try {
120+
coll = collectionSelector.call(args);
121+
} catch (Throwable e) {
122+
onError(e);
123+
return;
124+
}
125+
126+
SerialSubscription ssub = new SerialSubscription();
127+
csub.add(ssub);
128+
wip.incrementAndGet();
129+
130+
ssub.set(coll.subscribe(new CollectionObserver<T, U, R>(this, args, ssub)));
131+
}
132+
133+
@Override
134+
public void onError(Throwable e) {
135+
synchronized (guard) {
136+
if (done) {
137+
return;
138+
}
139+
done = true;
140+
observer.onError(e);
141+
}
142+
csub.unsubscribe();
143+
}
144+
145+
@Override
146+
public void onCompleted() {
147+
if (wip.decrementAndGet() == 0) {
148+
synchronized (guard) {
149+
if (done) {
150+
return;
151+
}
152+
done = true;
153+
observer.onCompleted();
154+
}
155+
csub.unsubscribe();
156+
}
157+
}
158+
159+
void complete(Subscription s) {
160+
csub.remove(s);
161+
onCompleted();
162+
}
163+
164+
void emit(T t, U u) {
165+
R r;
166+
try {
167+
r = resultSelector.call(t, u);
168+
} catch (Throwable e) {
169+
onError(e);
170+
return;
171+
}
172+
synchronized (guard) {
173+
if (done) {
174+
return;
175+
}
176+
observer.onNext(r);
177+
}
178+
}
179+
}
180+
/** Observe a collection and call emit with the pair of the key and the value. */
181+
private static final class CollectionObserver<T, U, R> implements Observer<U> {
182+
final SourceObserver<T, U, R> so;
183+
final Subscription cancel;
184+
final T value;
185+
186+
public CollectionObserver(SourceObserver<T, U, R> so, T value, Subscription cancel) {
187+
this.so = so;
188+
this.value = value;
189+
this.cancel = cancel;
190+
}
191+
192+
@Override
193+
public void onNext(U args) {
194+
so.emit(value, args);
195+
}
196+
197+
@Override
198+
public void onError(Throwable e) {
199+
so.onError(e);
200+
}
201+
202+
@Override
203+
public void onCompleted() {
204+
so.complete(cancel);
205+
}
206+
};
207+
}
208+
}

0 commit comments

Comments
 (0)