Skip to content

Commit b12f3eb

Browse files
Merge pull request #1560 from benjchristensen/flatMap-mergeMap
flatMap overloads
2 parents c9585d1 + 001b696 commit b12f3eb

File tree

3 files changed

+220
-155
lines changed

3 files changed

+220
-155
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 125 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4738,7 +4738,118 @@ public final Observable<T> firstOrDefault(T defaultValue, Func1<? super T, Boole
47384738
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.selectmany.aspx">MSDN: Observable.SelectMany</a>
47394739
*/
47404740
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
4741-
return mergeMap(func);
4741+
return merge(map(func));
4742+
}
4743+
4744+
/**
4745+
* Returns an Observable that applies a function to each item emitted or notification raised by the source
4746+
* Observable and then flattens the Observables returned from these functions and emits the resulting items.
4747+
* <p>
4748+
* <img width="640" height="410" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/mergeMap.nce.png" alt="">
4749+
* <dl>
4750+
* <dt><b>Scheduler:</b></dt>
4751+
* <dd>{@code mergeMap} does not operate by default on a particular {@link Scheduler}.</dd>
4752+
* </dl>
4753+
*
4754+
* @param <R>
4755+
* the result type
4756+
* @param onNext
4757+
* a function that returns an Observable to merge for each item emitted by the source Observable
4758+
* @param onError
4759+
* a function that returns an Observable to merge for an onError notification from the source
4760+
* Observable
4761+
* @param onCompleted
4762+
* a function that returns an Observable to merge for an onCompleted notification from the source
4763+
* Observable
4764+
* @return an Observable that emits the results of merging the Observables returned from applying the
4765+
* specified functions to the emissions and notifications of the source Observable
4766+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mergemap-and-mergemapiterable">RxJava wiki: mergeMap</a>
4767+
*/
4768+
public final <R> Observable<R> flatMap(
4769+
Func1<? super T, ? extends Observable<? extends R>> onNext,
4770+
Func1<? super Throwable, ? extends Observable<? extends R>> onError,
4771+
Func0<? extends Observable<? extends R>> onCompleted) {
4772+
return merge(mapNotification(onNext, onError, onCompleted));
4773+
}
4774+
4775+
/**
4776+
* Returns an Observable that emits the results of a specified function to the pair of values emitted by the
4777+
* source Observable and a specified collection Observable.
4778+
* <p>
4779+
* <img width="640" height="390" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/mergeMap.r.png" alt="">
4780+
* <dl>
4781+
* <dt><b>Scheduler:</b></dt>
4782+
* <dd>{@code mergeMap} does not operate by default on a particular {@link Scheduler}.</dd>
4783+
* </dl>
4784+
*
4785+
* @param <U>
4786+
* the type of items emitted by the collection Observable
4787+
* @param <R>
4788+
* the type of items emitted by the resulting Observable
4789+
* @param collectionSelector
4790+
* a function that returns an Observable for each item emitted by the source Observable
4791+
* @param resultSelector
4792+
* a function that combines one item emitted by each of the source and collection Observables and
4793+
* returns an item to be emitted by the resulting Observable
4794+
* @return an Observable that emits the results of applying a function to a pair of values emitted by the
4795+
* source Observable and the collection Observable
4796+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mergemap-and-mergemapiterable">RxJava wiki: mergeMap</a>
4797+
*/
4798+
public final <U, R> Observable<R> flatMap(final Func1<? super T, ? extends Observable<? extends U>> collectionSelector,
4799+
final Func2<? super T, ? super U, ? extends R> resultSelector) {
4800+
return merge(lift(new OperatorMapPair<T, U, R>(collectionSelector, resultSelector)));
4801+
}
4802+
4803+
/**
4804+
* Returns an Observable that merges each item emitted by the source Observable with the values in an
4805+
* Iterable corresponding to that item that is generated by a selector.
4806+
* <p>
4807+
* <img width="640" height="310" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/mergeMapIterable.png" alt="">
4808+
* <dl>
4809+
* <dt><b>Scheduler:</b></dt>
4810+
* <dd>{@code mergeMapIterable} does not operate by default on a particular {@link Scheduler}.</dd>
4811+
* </dl>
4812+
*
4813+
* @param <R>
4814+
* the type of item emitted by the resulting Observable
4815+
* @param collectionSelector
4816+
* a function that returns an Iterable sequence of values for when given an item emitted by the
4817+
* source Observable
4818+
* @return an Observable that emits the results of merging the items emitted by the source Observable with
4819+
* the values in the Iterables corresponding to those items, as generated by {@code collectionSelector}
4820+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mergemap-and-mergemapiterable">RxJava wiki: mergeMapIterable</a>
4821+
*/
4822+
public final <R> Observable<R> flatMapIterable(Func1<? super T, ? extends Iterable<? extends R>> collectionSelector) {
4823+
return merge(map(OperatorMapPair.convertSelector(collectionSelector)));
4824+
}
4825+
4826+
/**
4827+
* Returns an Observable that emits the results of applying a function to the pair of values from the source
4828+
* Observable and an Iterable corresponding to that item that is generated by a selector.
4829+
* <p>
4830+
* <img width="640" height="390" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/mergeMapIterable.r.png" alt="">
4831+
* <dl>
4832+
* <dt><b>Scheduler:</b></dt>
4833+
* <dd>{@code mergeMapIterable} does not operate by default on a particular {@link Scheduler}.</dd>
4834+
* </dl>
4835+
*
4836+
* @param <U>
4837+
* the collection element type
4838+
* @param <R>
4839+
* the type of item emited by the resulting Observable
4840+
* @param collectionSelector
4841+
* a function that returns an Iterable sequence of values for each item emitted by the source
4842+
* Observable
4843+
* @param resultSelector
4844+
* a function that returns an item based on the item emitted by the source Observable and the
4845+
* Iterable returned for that item by the {@code collectionSelector}
4846+
* @return an Observable that emits the items returned by {@code resultSelector} for each item in the source
4847+
* Observable
4848+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mergemap-and-mergemapiterable">RxJava wiki: mergeMapIterable</a>
4849+
*/
4850+
public final <U, R> Observable<R> flatMapIterable(Func1<? super T, ? extends Iterable<? extends U>> collectionSelector,
4851+
Func2<? super T, ? super U, ? extends R> resultSelector) {
4852+
return flatMap(OperatorMapPair.convertSelector(collectionSelector), resultSelector);
47424853
}
47434854

47444855
/**
@@ -5225,7 +5336,9 @@ public final Observable<Notification<T>> materialize() {
52255336
* transformations
52265337
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mergemap-and-mergemapiterable">RxJava wiki: mergeMap</a>
52275338
* @see #flatMap(Func1)
5339+
* @deprecated use flatMap
52285340
*/
5341+
@Deprecated
52295342
public final <R> Observable<R> mergeMap(Func1<? super T, ? extends Observable<? extends R>> func) {
52305343
return merge(map(func));
52315344
}
@@ -5253,7 +5366,9 @@ public final <R> Observable<R> mergeMap(Func1<? super T, ? extends Observable<?
52535366
* @return an Observable that emits the results of merging the Observables returned from applying the
52545367
* specified functions to the emissions and notifications of the source Observable
52555368
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mergemap-and-mergemapiterable">RxJava wiki: mergeMap</a>
5369+
* @deprecated use flatMap
52565370
*/
5371+
@Deprecated
52575372
public final <R> Observable<R> mergeMap(
52585373
Func1<? super T, ? extends Observable<? extends R>> onNext,
52595374
Func1<? super Throwable, ? extends Observable<? extends R>> onError,
@@ -5283,10 +5398,12 @@ public final <R> Observable<R> mergeMap(
52835398
* @return an Observable that emits the results of applying a function to a pair of values emitted by the
52845399
* source Observable and the collection Observable
52855400
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mergemap-and-mergemapiterable">RxJava wiki: mergeMap</a>
5401+
* @deprecated use flatMap
52865402
*/
5403+
@Deprecated
52875404
public final <U, R> Observable<R> mergeMap(Func1<? super T, ? extends Observable<? extends U>> collectionSelector,
52885405
Func2<? super T, ? super U, ? extends R> resultSelector) {
5289-
return lift(new OperatorMergeMapPair<T, U, R>(collectionSelector, resultSelector));
5406+
return flatMap(collectionSelector, resultSelector);
52905407
}
52915408

52925409
/**
@@ -5307,9 +5424,11 @@ public final <U, R> Observable<R> mergeMap(Func1<? super T, ? extends Observable
53075424
* @return an Observable that emits the results of merging the items emitted by the source Observable with
53085425
* the values in the Iterables corresponding to those items, as generated by {@code collectionSelector}
53095426
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mergemap-and-mergemapiterable">RxJava wiki: mergeMapIterable</a>
5427+
* @deprecated use flatMapIterable
53105428
*/
5429+
@Deprecated
53115430
public final <R> Observable<R> mergeMapIterable(Func1<? super T, ? extends Iterable<? extends R>> collectionSelector) {
5312-
return merge(map(OperatorMergeMapPair.convertSelector(collectionSelector)));
5431+
return merge(map(OperatorMapPair.convertSelector(collectionSelector)));
53135432
}
53145433

53155434
/**
@@ -5335,10 +5454,12 @@ public final <R> Observable<R> mergeMapIterable(Func1<? super T, ? extends Itera
53355454
* @return an Observable that emits the items returned by {@code resultSelector} for each item in the source
53365455
* Observable
53375456
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mergemap-and-mergemapiterable">RxJava wiki: mergeMapIterable</a>
5457+
* @deprecated use flatMapIterable
53385458
*/
5459+
@Deprecated
53395460
public final <U, R> Observable<R> mergeMapIterable(Func1<? super T, ? extends Iterable<? extends U>> collectionSelector,
53405461
Func2<? super T, ? super U, ? extends R> resultSelector) {
5341-
return mergeMap(OperatorMergeMapPair.convertSelector(collectionSelector), resultSelector);
5462+
return mergeMap(OperatorMapPair.convertSelector(collectionSelector), resultSelector);
53425463
}
53435464

53445465
/**
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import rx.Observable;
19+
import rx.Observable.Operator;
20+
import rx.Subscriber;
21+
import rx.exceptions.OnErrorThrowable;
22+
import rx.functions.Func1;
23+
import rx.functions.Func2;
24+
25+
/**
26+
* An {@link Operator} that pairs up items emitted by a source {@link Observable} with the sequence of items
27+
* emitted by the {@code Observable} that is derived from each item by means of a selector, and emits the
28+
* results of this pairing.
29+
*
30+
* @param <T>
31+
* the type of items emitted by the source {@code Observable}
32+
* @param <U>
33+
* the type of items emitted by the derived {@code Observable}s
34+
* @param <R>
35+
* the type of items to be emitted by this {@code Operator}
36+
*/
37+
public final class OperatorMapPair<T, U, R> implements Operator<Observable<? extends R>, T> {
38+
39+
/**
40+
* Creates the function that generates a {@code Observable} based on an item emitted by another {@code Observable}.
41+
*
42+
* @param selector
43+
* a function that accepts an item and returns an {@code Iterable} of corresponding items
44+
* @return a function that converts an item emitted by the source {@code Observable} into an {@code Observable} that emits the items generated by {@code selector} operating on that item
45+
*/
46+
public static <T, U> Func1<T, Observable<U>> convertSelector(final Func1<? super T, ? extends Iterable<? extends U>> selector) {
47+
return new Func1<T, Observable<U>>() {
48+
@Override
49+
public Observable<U> call(T t1) {
50+
return Observable.from(selector.call(t1));
51+
}
52+
};
53+
}
54+
55+
final Func1<? super T, ? extends Observable<? extends U>> collectionSelector;
56+
final Func2<? super T, ? super U, ? extends R> resultSelector;
57+
58+
public OperatorMapPair(final Func1<? super T, ? extends Observable<? extends U>> collectionSelector, final Func2<? super T, ? super U, ? extends R> resultSelector) {
59+
this.collectionSelector = collectionSelector;
60+
this.resultSelector = resultSelector;
61+
}
62+
63+
@Override
64+
public Subscriber<? super T> call(final Subscriber<? super Observable<? extends R>> o) {
65+
return new Subscriber<T>(o) {
66+
67+
@Override
68+
public void onCompleted() {
69+
o.onCompleted();
70+
}
71+
72+
@Override
73+
public void onError(Throwable e) {
74+
o.onError(e);
75+
}
76+
77+
@Override
78+
public void onNext(final T outer) {
79+
try {
80+
o.onNext(collectionSelector.call(outer).map(new Func1<U, R>() {
81+
82+
@Override
83+
public R call(U inner) {
84+
return resultSelector.call(outer, inner);
85+
}
86+
}));
87+
} catch (Throwable e) {
88+
o.onError(OnErrorThrowable.addValueAsLastCause(e, outer));
89+
}
90+
}
91+
92+
};
93+
}
94+
95+
}

0 commit comments

Comments
 (0)