Skip to content

Commit 892e27a

Browse files
committed
Added event-merger overload
1 parent dd7c4ce commit 892e27a

File tree

3 files changed

+329
-0
lines changed

3 files changed

+329
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3901,6 +3901,23 @@ public <U, R> Observable<R> mergeMapIterable(Func1<? super T, ? extends Iterable
39013901
return mergeMap(OperationFlatMap.flatMapIterableFunc(collectionSelector), resultSelector);
39023902
}
39033903

3904+
/**
3905+
* Create an Observable that projects the notification of an observable sequence to an observable
3906+
* sequence and merges the results into one.
3907+
* @param <R> the result type
3908+
* @param onNext function returning a collection to merge for each onNext event of the source
3909+
* @param onError function returning a collection to merge for an onError event
3910+
* @param onCompleted function returning a collection to merge for an onCompleted event
3911+
* @return an Observable that projects the notification of an observable sequence to an observable
3912+
* sequence and merges the results into one.
3913+
*/
3914+
public <R> Observable<R> mergeMap(
3915+
Func1<? super T, ? extends Observable<? extends R>> onNext,
3916+
Func1<? super Throwable, ? extends Observable<? extends R>> onError,
3917+
Func0<? extends Observable<? extends R>> onCompleted) {
3918+
return create(OperationFlatMap.flatMap(this, onNext, onError, onCompleted));
3919+
}
3920+
39043921
/**
39053922
* Creates a new Observable by applying a function that you supply to each
39063923
* item emitted by the source Observable, where that function returns an

rxjava-core/src/main/java/rx/operators/OperationFlatMap.java

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import rx.Subscription;
2323
import rx.subscriptions.CompositeSubscription;
2424
import rx.subscriptions.SerialSubscription;
25+
import rx.util.functions.Func0;
2526
import rx.util.functions.Func1;
2627
import rx.util.functions.Func2;
2728

@@ -205,4 +206,165 @@ public void onCompleted() {
205206
}
206207
};
207208
}
209+
210+
/**
211+
* Projects the notification of an observable sequence to an observable
212+
* sequence and merges the results into one.
213+
*/
214+
public static <T, R> OnSubscribeFunc<R> flatMap(Observable<? extends T> source,
215+
Func1<? super T, ? extends Observable<? extends R>> onNext,
216+
Func1<? super Throwable, ? extends Observable<? extends R>> onError,
217+
Func0<? extends Observable<? extends R>> onCompleted) {
218+
return new FlatMapTransform<T, R>(source, onNext, onError, onCompleted);
219+
}
220+
221+
/**
222+
* Projects the notification of an observable sequence to an observable
223+
* sequence and merges the results into one.
224+
* @param <T> the source value type
225+
* @param <R> the result value type
226+
*/
227+
private static final class FlatMapTransform<T, R> implements OnSubscribeFunc<R> {
228+
final Observable<? extends T> source;
229+
final Func1<? super T, ? extends Observable<? extends R>> onNext;
230+
final Func1<? super Throwable, ? extends Observable<? extends R>> onError;
231+
final Func0<? extends Observable<? extends R>> onCompleted;
232+
233+
public FlatMapTransform(Observable<? extends T> source, Func1<? super T, ? extends Observable<? extends R>> onNext, Func1<? super Throwable, ? extends Observable<? extends R>> onError, Func0<? extends Observable<? extends R>> onCompleted) {
234+
this.source = source;
235+
this.onNext = onNext;
236+
this.onError = onError;
237+
this.onCompleted = onCompleted;
238+
}
239+
240+
@Override
241+
public Subscription onSubscribe(Observer<? super R> t1) {
242+
CompositeSubscription csub = new CompositeSubscription();
243+
244+
csub.add(source.subscribe(new SourceObserver<T, R>(t1, onNext, onError, onCompleted, csub)));
245+
246+
return csub;
247+
}
248+
/**
249+
* Observe the source and merge the values.
250+
* @param <T> the source value type
251+
* @param <R> the result value type
252+
*/
253+
private static final class SourceObserver<T, R> implements Observer<T> {
254+
final Observer<? super R> observer;
255+
final Func1<? super T, ? extends Observable<? extends R>> onNext;
256+
final Func1<? super Throwable, ? extends Observable<? extends R>> onError;
257+
final Func0<? extends Observable<? extends R>> onCompleted;
258+
final CompositeSubscription csub;
259+
final AtomicInteger wip;
260+
volatile boolean done;
261+
final Object guard;
262+
263+
public SourceObserver(Observer<? super R> observer, Func1<? super T, ? extends Observable<? extends R>> onNext, Func1<? super Throwable, ? extends Observable<? extends R>> onError, Func0<? extends Observable<? extends R>> onCompleted, CompositeSubscription csub) {
264+
this.observer = observer;
265+
this.onNext = onNext;
266+
this.onError = onError;
267+
this.onCompleted = onCompleted;
268+
this.csub = csub;
269+
this.guard = new Object();
270+
this.wip = new AtomicInteger(1);
271+
}
272+
273+
@Override
274+
public void onNext(T args) {
275+
Observable<? extends R> o;
276+
try {
277+
o = onNext.call(args);
278+
} catch (Throwable t) {
279+
synchronized (guard) {
280+
observer.onError(t);
281+
}
282+
csub.unsubscribe();
283+
return;
284+
}
285+
subscribeInner(o);
286+
}
287+
288+
@Override
289+
public void onError(Throwable e) {
290+
Observable<? extends R> o;
291+
try {
292+
o = onError.call(e);
293+
} catch (Throwable t) {
294+
synchronized (guard) {
295+
observer.onError(t);
296+
}
297+
csub.unsubscribe();
298+
return;
299+
}
300+
subscribeInner(o);
301+
done = true;
302+
finish();
303+
}
304+
305+
@Override
306+
public void onCompleted() {
307+
Observable<? extends R> o;
308+
try {
309+
o = onCompleted.call();
310+
} catch (Throwable t) {
311+
synchronized (guard) {
312+
observer.onError(t);
313+
}
314+
csub.unsubscribe();
315+
return;
316+
}
317+
subscribeInner(o);
318+
done = true;
319+
finish();
320+
}
321+
322+
void subscribeInner(Observable<? extends R> o) {
323+
SerialSubscription ssub = new SerialSubscription();
324+
wip.incrementAndGet();
325+
csub.add(ssub);
326+
327+
ssub.set(o.subscribe(new CollectionObserver<T, R>(this, ssub)));
328+
}
329+
void finish() {
330+
if (wip.decrementAndGet() == 0) {
331+
synchronized (guard) {
332+
observer.onCompleted();
333+
}
334+
csub.unsubscribe();
335+
}
336+
}
337+
}
338+
/** Observes the collections. */
339+
private static final class CollectionObserver<T, R> implements Observer<R> {
340+
final SourceObserver<T, R> parent;
341+
final Subscription cancel;
342+
343+
public CollectionObserver(SourceObserver<T, R> parent, Subscription cancel) {
344+
this.parent = parent;
345+
this.cancel = cancel;
346+
}
347+
348+
@Override
349+
public void onNext(R args) {
350+
synchronized (parent.guard) {
351+
parent.observer.onNext(args);
352+
}
353+
}
354+
355+
@Override
356+
public void onError(Throwable e) {
357+
synchronized (parent.guard) {
358+
parent.observer.onError(e);
359+
}
360+
parent.csub.unsubscribe();
361+
}
362+
363+
@Override
364+
public void onCompleted() {
365+
parent.csub.remove(cancel);
366+
parent.finish();
367+
}
368+
}
369+
}
208370
}

rxjava-core/src/test/java/rx/operators/OperationFlatMapTest.java

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.mockito.Mockito.*;
2222
import rx.Observable;
2323
import rx.Observer;
24+
import rx.util.functions.Func0;
2425
import rx.util.functions.Func1;
2526
import rx.util.functions.Func2;
2627

@@ -142,4 +143,153 @@ public Integer call(Integer t1, Integer t2) {
142143
verify(o, never()).onNext(any());
143144
verify(o).onError(any(OperationReduceTest.CustomException.class));
144145
}
146+
<T, R> Func1<T, R> just(final R value) {
147+
return new Func1<T, R>() {
148+
149+
@Override
150+
public R call(T t1) {
151+
return value;
152+
}
153+
};
154+
}
155+
<R> Func0<R> just0(final R value) {
156+
return new Func0<R>() {
157+
158+
@Override
159+
public R call() {
160+
return value;
161+
}
162+
};
163+
}
164+
@Test
165+
public void testFlatMapTransformsNormal() {
166+
Observable<Integer> onNext = Observable.from(Arrays.asList(1, 2, 3));
167+
Observable<Integer> onCompleted = Observable.from(Arrays.asList(4));
168+
Observable<Integer> onError = Observable.from(Arrays.asList(5));
169+
170+
Observable<Integer> source = Observable.from(Arrays.asList(10, 20, 30));
171+
172+
@SuppressWarnings("unchecked")
173+
Observer<Object> o = mock(Observer.class);
174+
175+
source.mergeMap(just(onNext), just(onError), just0(onCompleted)).subscribe(o);
176+
177+
verify(o, times(3)).onNext(1);
178+
verify(o, times(3)).onNext(2);
179+
verify(o, times(3)).onNext(3);
180+
verify(o).onNext(4);
181+
verify(o).onCompleted();
182+
183+
verify(o, never()).onNext(5);
184+
verify(o, never()).onError(any(Throwable.class));
185+
}
186+
@Test
187+
public void testFlatMapTransformsException() {
188+
Observable<Integer> onNext = Observable.from(Arrays.asList(1, 2, 3));
189+
Observable<Integer> onCompleted = Observable.from(Arrays.asList(4));
190+
Observable<Integer> onError = Observable.from(Arrays.asList(5));
191+
192+
Observable<Integer> source = Observable.concat(
193+
Observable.from(Arrays.asList(10, 20, 30))
194+
, Observable.<Integer>error(new RuntimeException("Forced failure!"))
195+
);
196+
197+
@SuppressWarnings("unchecked")
198+
Observer<Object> o = mock(Observer.class);
199+
200+
source.mergeMap(just(onNext), just(onError), just0(onCompleted)).subscribe(o);
201+
202+
verify(o, times(3)).onNext(1);
203+
verify(o, times(3)).onNext(2);
204+
verify(o, times(3)).onNext(3);
205+
verify(o).onNext(5);
206+
verify(o).onCompleted();
207+
verify(o, never()).onNext(4);
208+
209+
verify(o, never()).onError(any(Throwable.class));
210+
}
211+
<R> Func0<R> funcThrow0(R r) {
212+
return new Func0<R>() {
213+
@Override
214+
public R call() {
215+
throw new OperationReduceTest.CustomException();
216+
}
217+
};
218+
}
219+
<T, R> Func1<T, R> funcThrow(T t, R r) {
220+
return new Func1<T, R>() {
221+
@Override
222+
public R call(T t) {
223+
throw new OperationReduceTest.CustomException();
224+
}
225+
};
226+
}
227+
@Test
228+
public void testFlatMapTransformsOnNextFuncThrows() {
229+
Observable<Integer> onCompleted = Observable.from(Arrays.asList(4));
230+
Observable<Integer> onError = Observable.from(Arrays.asList(5));
231+
232+
Observable<Integer> source = Observable.from(Arrays.asList(10, 20, 30));
233+
234+
@SuppressWarnings("unchecked")
235+
Observer<Object> o = mock(Observer.class);
236+
237+
source.mergeMap(funcThrow(1, onError), just(onError), just0(onCompleted)).subscribe(o);
238+
239+
verify(o).onError(any(OperationReduceTest.CustomException.class));
240+
verify(o, never()).onNext(any());
241+
verify(o, never()).onCompleted();
242+
}
243+
@Test
244+
public void testFlatMapTransformsOnErrorFuncThrows() {
245+
Observable<Integer> onNext = Observable.from(Arrays.asList(1, 2, 3));
246+
Observable<Integer> onCompleted = Observable.from(Arrays.asList(4));
247+
Observable<Integer> onError = Observable.from(Arrays.asList(5));
248+
249+
Observable<Integer> source = Observable.error(new OperationReduceTest.CustomException());
250+
251+
@SuppressWarnings("unchecked")
252+
Observer<Object> o = mock(Observer.class);
253+
254+
source.mergeMap(just(onNext), funcThrow((Throwable)null, onError), just0(onCompleted)).subscribe(o);
255+
256+
verify(o).onError(any(OperationReduceTest.CustomException.class));
257+
verify(o, never()).onNext(any());
258+
verify(o, never()).onCompleted();
259+
}
260+
261+
@Test
262+
public void testFlatMapTransformsOnCompletedFuncThrows() {
263+
Observable<Integer> onNext = Observable.from(Arrays.asList(1, 2, 3));
264+
Observable<Integer> onCompleted = Observable.from(Arrays.asList(4));
265+
Observable<Integer> onError = Observable.from(Arrays.asList(5));
266+
267+
Observable<Integer> source = Observable.from(Arrays.<Integer>asList());
268+
269+
@SuppressWarnings("unchecked")
270+
Observer<Object> o = mock(Observer.class);
271+
272+
source.mergeMap(just(onNext), just(onError), funcThrow0(onCompleted)).subscribe(o);
273+
274+
verify(o).onError(any(OperationReduceTest.CustomException.class));
275+
verify(o, never()).onNext(any());
276+
verify(o, never()).onCompleted();
277+
}
278+
@Test
279+
public void testFlatMapTransformsMergeException() {
280+
Observable<Integer> onNext = Observable.error(new OperationReduceTest.CustomException());
281+
Observable<Integer> onCompleted = Observable.from(Arrays.asList(4));
282+
Observable<Integer> onError = Observable.from(Arrays.asList(5));
283+
284+
Observable<Integer> source = Observable.from(Arrays.asList(10, 20, 30));
285+
286+
@SuppressWarnings("unchecked")
287+
Observer<Object> o = mock(Observer.class);
288+
289+
source.mergeMap(just(onNext), just(onError), funcThrow0(onCompleted)).subscribe(o);
290+
291+
verify(o).onError(any(OperationReduceTest.CustomException.class));
292+
verify(o, never()).onNext(any());
293+
verify(o, never()).onCompleted();
294+
}
145295
}

0 commit comments

Comments
 (0)