Skip to content

Commit 17307e4

Browse files
Merge pull request #736 from akarnokd/FlatMapOverloads
MergeMap with Iterable and resultSelector overloads
2 parents 9650eb1 + 892e27a commit 17307e4

File tree

3 files changed

+727
-0
lines changed

3 files changed

+727
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import rx.operators.OperationElementAt;
5454
import rx.operators.OperationFilter;
5555
import rx.operators.OperationFinally;
56+
import rx.operators.OperationFlatMap;
5657
import rx.operators.OperationGroupBy;
5758
import rx.operators.OperationGroupByUntil;
5859
import rx.operators.OperationGroupJoin;
@@ -4059,6 +4060,67 @@ public <R> Observable<R> mergeMap(Func1<? super T, ? extends Observable<? extend
40594060
return merge(map(func));
40604061
}
40614062

4063+
/**
4064+
* Create an Observable that applies a function to the pair of values from the source
4065+
* Observable and the collection Observable.
4066+
* @param <U> the element type of the collection Observable
4067+
* @param <R> the result type
4068+
* @param collectionSelector function that returns an Observable sequence for each value in the source Observable
4069+
* @param resultSelector function that combines the values of the source and collection Observable
4070+
* @return an Observable that applies a function to the pair of values from the source
4071+
* Observable and the collection Observable.
4072+
*/
4073+
public <U, R> Observable<R> mergeMap(Func1<? super T, ? extends Observable<? extends U>> collectionSelector,
4074+
Func2<? super T, ? super U, ? extends R> resultSelector) {
4075+
return create(OperationFlatMap.flatMap(this, collectionSelector, resultSelector));
4076+
}
4077+
4078+
/**
4079+
* Create an Observable that merges the values of the iterables returned by the
4080+
* collectionSelector for each source value.
4081+
* @param <R> the result value type
4082+
* @param collectionSelector function that returns an Iterable sequence of values for
4083+
* each source value.
4084+
* @return an Observable that merges the values of the iterables returned by the
4085+
* collectionSelector for each source value.
4086+
*/
4087+
public <R> Observable<R> mergeMapIterable(Func1<? super T, ? extends Iterable<? extends R>> collectionSelector) {
4088+
return merge(map(OperationFlatMap.flatMapIterableFunc(collectionSelector)));
4089+
}
4090+
4091+
/**
4092+
* Create an Observable that applies a function to the pair of values from the source
4093+
* Observable and the collection Iterable sequence.
4094+
* @param <U> the collection element type
4095+
* @param <R> the result type
4096+
* @param collectionSelector function that returns an Iterable sequence of values for
4097+
* each source value.
4098+
* @param resultSelector function that combines the values of the source and collection Iterable
4099+
* @return n Observable that applies a function to the pair of values from the source
4100+
* Observable and the collection Iterable sequence.
4101+
*/
4102+
public <U, R> Observable<R> mergeMapIterable(Func1<? super T, ? extends Iterable<? extends U>> collectionSelector,
4103+
Func2<? super T, ? super U, ? extends R> resultSelector) {
4104+
return mergeMap(OperationFlatMap.flatMapIterableFunc(collectionSelector), resultSelector);
4105+
}
4106+
4107+
/**
4108+
* Create an Observable that projects the notification of an observable sequence to an observable
4109+
* sequence and merges the results into one.
4110+
* @param <R> the result type
4111+
* @param onNext function returning a collection to merge for each onNext event of the source
4112+
* @param onError function returning a collection to merge for an onError event
4113+
* @param onCompleted function returning a collection to merge for an onCompleted event
4114+
* @return an Observable that projects the notification of an observable sequence to an observable
4115+
* sequence and merges the results into one.
4116+
*/
4117+
public <R> Observable<R> mergeMap(
4118+
Func1<? super T, ? extends Observable<? extends R>> onNext,
4119+
Func1<? super Throwable, ? extends Observable<? extends R>> onError,
4120+
Func0<? extends Observable<? extends R>> onCompleted) {
4121+
return create(OperationFlatMap.flatMap(this, onNext, onError, onCompleted));
4122+
}
4123+
40624124
/**
40634125
* Creates a new Observable by applying a function that you supply to each
40644126
* item emitted by the source Observable, where that function returns an

0 commit comments

Comments
 (0)