Skip to content

Commit 37bde8c

Browse files
authored
Merge pull request #4858 from akarnokd/FlatMapSingleElement
2.x: add Maybe.flatMapSingleElement returning Maybe
2 parents 557aca3 + a94a307 commit 37bde8c

File tree

3 files changed

+307
-0
lines changed

3 files changed

+307
-0
lines changed

src/main/java/io/reactivex/Maybe.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2701,6 +2701,32 @@ public final <R> Single<R> flatMapSingle(final Function<? super T, ? extends Sin
27012701
return RxJavaPlugins.onAssembly(new MaybeFlatMapSingle<T, R>(this, mapper));
27022702
}
27032703

2704+
/**
2705+
* Returns a {@link Maybe} based on applying a specified function to the item emitted by the
2706+
* source {@link Maybe}, where that function returns a {@link Single}.
2707+
* When this Maybe just completes the resulting {@code Maybe} completes as well.
2708+
* <p>
2709+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.flatMapSingle.png" alt="">
2710+
* <dl>
2711+
* <dt><b>Scheduler:</b></dt>
2712+
* <dd>{@code flatMapSingleElement} does not operate by default on a particular {@link Scheduler}.</dd>
2713+
* </dl>
2714+
*
2715+
* @param <R> the result value type
2716+
* @param mapper
2717+
* a function that, when applied to the item emitted by the source Maybe, returns a
2718+
* Single
2719+
* @return the new Maybe instance
2720+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
2721+
* @since 2.0.2 - experimental
2722+
*/
2723+
@SchedulerSupport(SchedulerSupport.NONE)
2724+
@Experimental
2725+
public final <R> Maybe<R> flatMapSingleElement(final Function<? super T, ? extends SingleSource<? extends R>> mapper) {
2726+
ObjectHelper.requireNonNull(mapper, "mapper is null");
2727+
return RxJavaPlugins.onAssembly(new MaybeFlatMapSingleElement<T, R>(this, mapper));
2728+
}
2729+
27042730
/**
27052731
* Returns a {@link Completable} that completes based on applying a specified function to the item emitted by the
27062732
* source {@link Maybe}, where that function returns a {@link Completable}.
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.maybe;
15+
16+
import java.util.concurrent.atomic.AtomicReference;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.annotations.Experimental;
20+
import io.reactivex.disposables.Disposable;
21+
import io.reactivex.exceptions.Exceptions;
22+
import io.reactivex.functions.Function;
23+
import io.reactivex.internal.disposables.DisposableHelper;
24+
import io.reactivex.internal.functions.ObjectHelper;
25+
26+
/**
27+
* Maps the success value of the source MaybeSource into a Single.
28+
* @param <T> the input value type
29+
* @param <R> the result value type
30+
*
31+
* @since 2.0.2 - experimental
32+
*/
33+
@Experimental
34+
public final class MaybeFlatMapSingleElement<T, R> extends Maybe<R> {
35+
36+
final MaybeSource<T> source;
37+
38+
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
39+
40+
public MaybeFlatMapSingleElement(MaybeSource<T> source, Function<? super T, ? extends SingleSource<? extends R>> mapper) {
41+
this.source = source;
42+
this.mapper = mapper;
43+
}
44+
45+
@Override
46+
protected void subscribeActual(MaybeObserver<? super R> actual) {
47+
source.subscribe(new FlatMapMaybeObserver<T, R>(actual, mapper));
48+
}
49+
50+
static final class FlatMapMaybeObserver<T, R>
51+
extends AtomicReference<Disposable>
52+
implements MaybeObserver<T>, Disposable {
53+
54+
private static final long serialVersionUID = 4827726964688405508L;
55+
56+
final MaybeObserver<? super R> actual;
57+
58+
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
59+
60+
FlatMapMaybeObserver(MaybeObserver<? super R> actual, Function<? super T, ? extends SingleSource<? extends R>> mapper) {
61+
this.actual = actual;
62+
this.mapper = mapper;
63+
}
64+
65+
@Override
66+
public void dispose() {
67+
DisposableHelper.dispose(this);
68+
}
69+
70+
@Override
71+
public boolean isDisposed() {
72+
return DisposableHelper.isDisposed(get());
73+
}
74+
75+
@Override
76+
public void onSubscribe(Disposable d) {
77+
if (DisposableHelper.setOnce(this, d)) {
78+
actual.onSubscribe(this);
79+
}
80+
}
81+
82+
@Override
83+
public void onSuccess(T value) {
84+
SingleSource<? extends R> ss;
85+
86+
try {
87+
ss = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper returned a null SingleSource");
88+
} catch (Throwable ex) {
89+
Exceptions.throwIfFatal(ex);
90+
onError(ex);
91+
return;
92+
}
93+
94+
ss.subscribe(new FlatMapSingleObserver<R>(this, actual));
95+
}
96+
97+
@Override
98+
public void onError(Throwable e) {
99+
actual.onError(e);
100+
}
101+
102+
@Override
103+
public void onComplete() {
104+
actual.onComplete();
105+
}
106+
}
107+
108+
static final class FlatMapSingleObserver<R> implements SingleObserver<R> {
109+
110+
final AtomicReference<Disposable> parent;
111+
112+
final MaybeObserver<? super R> actual;
113+
114+
FlatMapSingleObserver(AtomicReference<Disposable> parent, MaybeObserver<? super R> actual) {
115+
this.parent = parent;
116+
this.actual = actual;
117+
}
118+
119+
@Override
120+
public void onSubscribe(final Disposable d) {
121+
DisposableHelper.replace(parent, d);
122+
}
123+
124+
@Override
125+
public void onSuccess(final R value) {
126+
actual.onSuccess(value);
127+
}
128+
129+
@Override
130+
public void onError(final Throwable e) {
131+
actual.onError(e);
132+
}
133+
}
134+
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.maybe;
15+
16+
import org.junit.Test;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.exceptions.TestException;
20+
import io.reactivex.functions.Function;
21+
22+
public class MaybeFlatMapSingleElementTest {
23+
@Test(expected = NullPointerException.class)
24+
public void flatMapSingleElementNull() {
25+
Maybe.just(1)
26+
.flatMapSingleElement(null);
27+
}
28+
29+
@Test
30+
public void flatMapSingleElementValue() {
31+
Maybe.just(1).flatMapSingleElement(new Function<Integer, SingleSource<Integer>>() {
32+
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
33+
if (integer == 1) {
34+
return Single.just(2);
35+
}
36+
37+
return Single.just(1);
38+
}
39+
})
40+
.test()
41+
.assertResult(2);
42+
}
43+
44+
@Test
45+
public void flatMapSingleElementValueDifferentType() {
46+
Maybe.just(1).flatMapSingleElement(new Function<Integer, SingleSource<String>>() {
47+
@Override public SingleSource<String> apply(final Integer integer) throws Exception {
48+
if (integer == 1) {
49+
return Single.just("2");
50+
}
51+
52+
return Single.just("1");
53+
}
54+
})
55+
.test()
56+
.assertResult("2");
57+
}
58+
59+
@Test
60+
public void flatMapSingleElementValueNull() {
61+
Maybe.just(1).flatMapSingleElement(new Function<Integer, SingleSource<Integer>>() {
62+
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
63+
return null;
64+
}
65+
})
66+
.test()
67+
.assertNoValues()
68+
.assertError(NullPointerException.class)
69+
.assertErrorMessage("The mapper returned a null SingleSource");
70+
}
71+
72+
@Test
73+
public void flatMapSingleElementValueErrorThrown() {
74+
Maybe.just(1).flatMapSingleElement(new Function<Integer, SingleSource<Integer>>() {
75+
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
76+
throw new RuntimeException("something went terribly wrong!");
77+
}
78+
})
79+
.test()
80+
.assertNoValues()
81+
.assertError(RuntimeException.class)
82+
.assertErrorMessage("something went terribly wrong!");
83+
}
84+
85+
@Test
86+
public void flatMapSingleElementError() {
87+
RuntimeException exception = new RuntimeException("test");
88+
89+
Maybe.error(exception).flatMapSingleElement(new Function<Object, SingleSource<Object>>() {
90+
@Override public SingleSource<Object> apply(final Object integer) throws Exception {
91+
return Single.just(new Object());
92+
}
93+
})
94+
.test()
95+
.assertError(exception);
96+
}
97+
98+
@Test
99+
public void flatMapSingleElementEmpty() {
100+
Maybe.<Integer>empty().flatMapSingleElement(new Function<Integer, SingleSource<Integer>>() {
101+
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
102+
return Single.just(2);
103+
}
104+
})
105+
.test()
106+
.assertNoValues()
107+
.assertResult();
108+
}
109+
110+
@Test
111+
public void dispose() {
112+
TestHelper.checkDisposed(Maybe.just(1).flatMapSingleElement(new Function<Integer, SingleSource<Integer>>() {
113+
@Override
114+
public SingleSource<Integer> apply(final Integer integer) throws Exception {
115+
return Single.just(2);
116+
}
117+
}));
118+
}
119+
120+
@Test
121+
public void doubleOnSubscribe() {
122+
TestHelper.checkDoubleOnSubscribeMaybe(new Function<Maybe<Integer>, Maybe<Integer>>() {
123+
@Override
124+
public Maybe<Integer> apply(Maybe<Integer> m) throws Exception {
125+
return m.flatMapSingleElement(new Function<Integer, SingleSource<Integer>>() {
126+
@Override
127+
public SingleSource<Integer> apply(final Integer integer) throws Exception {
128+
return Single.just(2);
129+
}
130+
});
131+
}
132+
});
133+
}
134+
135+
@Test
136+
public void singleErrors() {
137+
Maybe.just(1)
138+
.flatMapSingleElement(new Function<Integer, SingleSource<Integer>>() {
139+
@Override
140+
public SingleSource<Integer> apply(final Integer integer) throws Exception {
141+
return Single.error(new TestException());
142+
}
143+
})
144+
.test()
145+
.assertFailure(TestException.class);
146+
}
147+
}

0 commit comments

Comments
 (0)