Skip to content

Commit b378593

Browse files
MergeMap, ConcatMap, SwitchMap
- flattening map implementations that better fit mental models and use cases - keeping flatMap as alias to mergeMap since flatMap is common … (though flatMap still confuses people) - deprecate mapMany as alias to flatMap and mergeMap
1 parent 1b3feeb commit b378593

File tree

3 files changed

+65
-27
lines changed

3 files changed

+65
-27
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3653,7 +3653,7 @@ public Observable<Observable<T>> window(long timespan, long timeshift, TimeUnit
36533653
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#zip">RxJava Wiki: zip()</a>
36543654
*/
36553655
public static <R> Observable<R> zip(Observable<? extends Observable<?>> ws, final FuncN<? extends R> zipFunction) {
3656-
return ws.toList().mapMany(new Func1<List<? extends Observable<?>>, Observable<? extends R>>() {
3656+
return ws.toList().mergeMap(new Func1<List<? extends Observable<?>>, Observable<? extends R>>() {
36573657
@Override
36583658
public Observable<R> call(List<? extends Observable<?>> wsList) {
36593659
return create(OperationZip.zip(wsList, zipFunction));
@@ -3891,7 +3891,64 @@ public Observable<T> finallyDo(Action0 action) {
38913891
* @see #mapMany(Func1)
38923892
*/
38933893
public <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
3894-
return mapMany(func);
3894+
return mergeMap(func);
3895+
}
3896+
3897+
/**
3898+
* Creates a new Observable by applying a function that you supply to each
3899+
* item emitted by the source Observable, where that function returns an
3900+
* Observable, and then merging those resulting Observables and emitting the
3901+
* results of this merger.
3902+
* <p>
3903+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/flatMap.png">
3904+
* <p>
3905+
* Note: {@code mapMany} and {@code flatMap} are equivalent.
3906+
*
3907+
* @param func a function that, when applied to an item emitted by the
3908+
* source Observable, returns an Observable
3909+
* @return an Observable that emits the result of applying the
3910+
* transformation function to each item emitted by the source
3911+
* Observable and merging the results of the Observables obtained
3912+
* from this transformation.
3913+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mapmany-or-flatmap-and-mapmanydelayerror">RxJava Wiki: flatMap()</a>
3914+
* @see #flatMap(Func1)
3915+
*/
3916+
public <R> Observable<R> mergeMap(Func1<? super T, ? extends Observable<? extends R>> func) {
3917+
return merge(map(func));
3918+
}
3919+
3920+
/**
3921+
* Creates a new Observable by applying a function that you supply to each
3922+
* item emitted by the source Observable, where that function returns an
3923+
* Observable, and then concatting those resulting Observables and emitting the
3924+
* results of this concat.
3925+
* <p>
3926+
*
3927+
* @param func a function that, when applied to an item emitted by the
3928+
* source Observable, returns an Observable
3929+
* @return an Observable that emits the result of applying the
3930+
* transformation function to each item emitted by the source
3931+
* Observable and concatting the results of the Observables obtained
3932+
* from this transformation.
3933+
*/
3934+
public <R> Observable<R> concatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
3935+
return concat(map(func));
3936+
}
3937+
3938+
/**
3939+
* Creates a new Observable by applying a function that you supply to each
3940+
* item emitted by the source Observable resulting in an Observable of Observables.
3941+
* <p>
3942+
* Then a {@link #switchLatest(Observable)} / {@link #switchOnNext(Observable)} is applied.
3943+
*
3944+
* @param func a function that, when applied to an item emitted by the
3945+
* source Observable, returns an Observable
3946+
* @return an Observable that emits the result of applying the
3947+
* transformation function to each item emitted by the source
3948+
* Observable and then switch
3949+
*/
3950+
public <R> Observable<R> switchMap(Func1<? super T, ? extends Observable<? extends R>> func) {
3951+
return switchOnNext(map(func));
38953952
}
38963953

38973954
/**
@@ -3965,9 +4022,10 @@ public <R> Observable<R> mapWithIndex(Func2<? super T, Integer, ? extends R> fun
39654022
* from this transformation.
39664023
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#mapmany-or-flatmap-and-mapmanydelayerror">RxJava Wiki: mapMany()</a>
39674024
* @see #flatMap(Func1)
4025+
* @deprecated
39684026
*/
39694027
public <R> Observable<R> mapMany(Func1<? super T, ? extends Observable<? extends R>> func) {
3970-
return create(OperationMap.mapMany(this, func));
4028+
return mergeMap(func);
39714029
}
39724030

39734031
/**

rxjava-core/src/main/java/rx/operators/OperationMap.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -77,26 +77,6 @@ public Subscription onSubscribe(Observer<? super R> observer) {
7777
};
7878
}
7979

80-
/**
81-
* Accepts a sequence of observable sequences and a transformation function. Returns a flattened sequence that is the result of
82-
* applying the transformation function to each item in the sequence of each observable sequence.
83-
* <p>
84-
* The closure should return an Observable which will then be merged.
85-
*
86-
* @param sequence
87-
* the input sequence.
88-
* @param func
89-
* a function to apply to each item in the sequence.
90-
* @param <T>
91-
* the type of the input sequence.
92-
* @param <R>
93-
* the type of the output sequence.
94-
* @return a sequence that is the result of applying the transformation function to each item in the input sequence.
95-
*/
96-
public static <T, R> OnSubscribeFunc<R> mapMany(Observable<? extends T> sequence, Func1<? super T, ? extends Observable<? extends R>> func) {
97-
return OperationMerge.merge(Observable.create(map(sequence, func)));
98-
}
99-
10080
/**
10181
* An observable sequence that is the result of applying a transformation to each item in an input sequence.
10282
*

rxjava-core/src/test/java/rx/operators/OperationMapTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public void testMapMany() {
118118
Observable<Integer> ids = Observable.from(1, 2);
119119

120120
/* now simulate the behavior to take those IDs and perform nested async calls based on them */
121-
Observable<String> m = Observable.create(mapMany(ids, new Func1<Integer, Observable<String>>() {
121+
Observable<String> m = ids.flatMap(new Func1<Integer, Observable<String>>() {
122122

123123
@Override
124124
public Observable<String> call(Integer id) {
@@ -143,7 +143,7 @@ public String call(Map<String, String> map) {
143143
}));
144144
}
145145

146-
}));
146+
});
147147
m.subscribe(stringObserver);
148148

149149
verify(stringObserver, never()).onError(any(Throwable.class));
@@ -166,7 +166,7 @@ public void testMapMany2() {
166166

167167
Observable<Observable<Map<String, String>>> observable = Observable.from(observable1, observable2);
168168

169-
Observable<String> m = Observable.create(mapMany(observable, new Func1<Observable<Map<String, String>>, Observable<String>>() {
169+
Observable<String> m = observable.flatMap(new Func1<Observable<Map<String, String>>, Observable<String>>() {
170170

171171
@Override
172172
public Observable<String> call(Observable<Map<String, String>> o) {
@@ -179,7 +179,7 @@ public String call(Map<String, String> map) {
179179
}));
180180
}
181181

182-
}));
182+
});
183183
m.subscribe(stringObserver);
184184

185185
verify(stringObserver, never()).onError(any(Throwable.class));

0 commit comments

Comments
 (0)