Skip to content

Commit ea5fb51

Browse files
committed
Merge branch '2.x' of https://github.com/ReactiveX/RxJava.git into 2.x
2 parents cff0ae3 + 57ac0ff commit ea5fb51

File tree

2 files changed

+125
-36
lines changed

2 files changed

+125
-36
lines changed

src/main/java/io/reactivex/internal/operators/single/SingleFlatMap.java

Lines changed: 50 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
import io.reactivex.disposables.Disposable;
1818
import io.reactivex.exceptions.Exceptions;
1919
import io.reactivex.functions.Function;
20-
import io.reactivex.internal.disposables.SequentialDisposable;
20+
import io.reactivex.internal.disposables.DisposableHelper;
21+
import io.reactivex.internal.functions.ObjectHelper;
22+
import java.util.concurrent.atomic.AtomicReference;
2123

2224
public final class SingleFlatMap<T, R> extends Single<R> {
2325
final SingleSource<? extends T> source;
@@ -30,72 +32,84 @@ public SingleFlatMap(SingleSource<? extends T> source, Function<? super T, ? ext
3032
}
3133

3234
@Override
33-
protected void subscribeActual(SingleObserver<? super R> subscriber) {
34-
SingleFlatMapCallback<T, R> parent = new SingleFlatMapCallback<T, R>(subscriber, mapper);
35-
subscriber.onSubscribe(parent.sd);
36-
source.subscribe(parent);
35+
protected void subscribeActual(SingleObserver<? super R> actual) {
36+
source.subscribe(new SingleFlatMapCallback<T, R>(actual, mapper));
3737
}
3838

39-
static final class SingleFlatMapCallback<T, R> implements SingleObserver<T> {
39+
static final class SingleFlatMapCallback<T, R>
40+
extends AtomicReference<Disposable>
41+
implements SingleObserver<T>, Disposable {
4042
final SingleObserver<? super R> actual;
4143
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
4244

43-
final SequentialDisposable sd;
44-
4545
SingleFlatMapCallback(SingleObserver<? super R> actual,
4646
Function<? super T, ? extends SingleSource<? extends R>> mapper) {
4747
this.actual = actual;
4848
this.mapper = mapper;
49-
this.sd = new SequentialDisposable();
49+
}
50+
51+
@Override
52+
public void dispose() {
53+
DisposableHelper.dispose(this);
54+
}
55+
56+
@Override
57+
public boolean isDisposed() {
58+
return DisposableHelper.isDisposed(get());
5059
}
5160

5261
@Override
5362
public void onSubscribe(Disposable d) {
54-
sd.replace(d);
63+
if (DisposableHelper.setOnce(this, d)) {
64+
actual.onSubscribe(this);
65+
}
5566
}
5667

5768
@Override
5869
public void onSuccess(T value) {
5970
SingleSource<? extends R> o;
6071

6172
try {
62-
o = mapper.apply(value);
73+
o = ObjectHelper.requireNonNull(mapper.apply(value), "The single returned by the mapper is null");
6374
} catch (Throwable e) {
6475
Exceptions.throwIfFatal(e);
6576
actual.onError(e);
6677
return;
6778
}
6879

69-
if (o == null) {
70-
actual.onError(new NullPointerException("The single returned by the mapper is null"));
71-
return;
72-
}
73-
74-
if (sd.isDisposed()) {
75-
return;
76-
}
77-
78-
o.subscribe(new SingleObserver<R>() {
79-
@Override
80-
public void onSubscribe(Disposable d) {
81-
sd.replace(d);
82-
}
83-
84-
@Override
85-
public void onSuccess(R value) {
86-
actual.onSuccess(value);
87-
}
88-
89-
@Override
90-
public void onError(Throwable e) {
91-
actual.onError(e);
92-
}
93-
});
80+
o.subscribe(new FlatMapSingleObserver<R>(this, actual));
9481
}
9582

9683
@Override
9784
public void onError(Throwable e) {
9885
actual.onError(e);
9986
}
87+
88+
static final class FlatMapSingleObserver<R> implements SingleObserver<R> {
89+
90+
final AtomicReference<Disposable> parent;
91+
92+
final SingleObserver<? super R> actual;
93+
94+
FlatMapSingleObserver(AtomicReference<Disposable> parent, SingleObserver<? super R> actual) {
95+
this.parent = parent;
96+
this.actual = actual;
97+
}
98+
99+
@Override
100+
public void onSubscribe(final Disposable d) {
101+
DisposableHelper.replace(parent, d);
102+
}
103+
104+
@Override
105+
public void onSuccess(final R value) {
106+
actual.onSuccess(value);
107+
}
108+
109+
@Override
110+
public void onError(final Throwable e) {
111+
actual.onError(e);
112+
}
113+
}
100114
}
101115
}

src/test/java/io/reactivex/internal/operators/single/SingleFlatMapTest.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,79 @@ public Publisher<Integer> apply(Integer v) throws Exception {
126126
.test()
127127
.assertResult(1, 2, 3, 4, 5);
128128
}
129+
130+
@Test(expected = NullPointerException.class)
131+
public void flatMapNull() {
132+
Single.just(1)
133+
.flatMap(null);
134+
}
135+
136+
@Test
137+
public void flatMapValue() {
138+
Single.just(1).flatMap(new Function<Integer, SingleSource<Integer>>() {
139+
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
140+
if (integer == 1) {
141+
return Single.just(2);
142+
}
143+
144+
return Single.just(1);
145+
}
146+
})
147+
.test()
148+
.assertResult(2);
149+
}
150+
151+
@Test
152+
public void flatMapValueDifferentType() {
153+
Single.just(1).flatMap(new Function<Integer, SingleSource<String>>() {
154+
@Override public SingleSource<String> apply(final Integer integer) throws Exception {
155+
if (integer == 1) {
156+
return Single.just("2");
157+
}
158+
159+
return Single.just("1");
160+
}
161+
})
162+
.test()
163+
.assertResult("2");
164+
}
165+
166+
@Test
167+
public void flatMapValueNull() {
168+
Single.just(1).flatMap(new Function<Integer, SingleSource<Integer>>() {
169+
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
170+
return null;
171+
}
172+
})
173+
.test()
174+
.assertNoValues()
175+
.assertError(NullPointerException.class)
176+
.assertErrorMessage("The single returned by the mapper is null");
177+
}
178+
179+
@Test
180+
public void flatMapValueErrorThrown() {
181+
Single.just(1).flatMap(new Function<Integer, SingleSource<Integer>>() {
182+
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
183+
throw new RuntimeException("something went terribly wrong!");
184+
}
185+
})
186+
.test()
187+
.assertNoValues()
188+
.assertError(RuntimeException.class)
189+
.assertErrorMessage("something went terribly wrong!");
190+
}
191+
192+
@Test
193+
public void flatMapError() {
194+
RuntimeException exception = new RuntimeException("test");
195+
196+
Single.error(exception).flatMap(new Function<Object, SingleSource<Object>>() {
197+
@Override public SingleSource<Object> apply(final Object integer) throws Exception {
198+
return Single.just(new Object());
199+
}
200+
})
201+
.test()
202+
.assertError(exception);
203+
}
129204
}

0 commit comments

Comments
 (0)