Skip to content

Commit 0ffb50f

Browse files
davidmotenakarnokd
authored andcommitted
2.x: add full implementation for Single.flatMapPublisher so doesn't batch requests (#6015) (#6021)
1 parent d24bfbd commit 0ffb50f

File tree

3 files changed

+228
-1
lines changed

3 files changed

+228
-1
lines changed

src/main/java/io/reactivex/Single.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2460,7 +2460,8 @@ public final <R> Maybe<R> flatMapMaybe(final Function<? super T, ? extends Maybe
24602460
@CheckReturnValue
24612461
@SchedulerSupport(SchedulerSupport.NONE)
24622462
public final <R> Flowable<R> flatMapPublisher(Function<? super T, ? extends Publisher<? extends R>> mapper) {
2463-
return toFlowable().flatMap(mapper);
2463+
ObjectHelper.requireNonNull(mapper, "mapper is null");
2464+
return RxJavaPlugins.onAssembly(new SingleFlatMapPublisher<T, R>(this, mapper));
24642465
}
24652466

24662467
/**
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.single;
15+
16+
import java.util.concurrent.atomic.AtomicLong;
17+
import java.util.concurrent.atomic.AtomicReference;
18+
19+
import org.reactivestreams.Publisher;
20+
import org.reactivestreams.Subscriber;
21+
import org.reactivestreams.Subscription;
22+
23+
import io.reactivex.Flowable;
24+
import io.reactivex.FlowableSubscriber;
25+
import io.reactivex.Scheduler;
26+
import io.reactivex.SingleObserver;
27+
import io.reactivex.SingleSource;
28+
import io.reactivex.disposables.Disposable;
29+
import io.reactivex.exceptions.Exceptions;
30+
import io.reactivex.functions.Function;
31+
import io.reactivex.internal.functions.ObjectHelper;
32+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
33+
34+
/**
35+
* A Flowable that emits items based on applying a specified function to the item emitted by the
36+
* source Single, where that function returns a Publisher.
37+
* <p>
38+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMapPublisher.png" alt="">
39+
* <dl>
40+
* <dt><b>Backpressure:</b></dt>
41+
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer
42+
* and the {@code Publisher} returned by the mapper function is expected to honor it as well.</dd>
43+
* <dt><b>Scheduler:</b></dt>
44+
* <dd>{@code flatMapPublisher} does not operate by default on a particular {@link Scheduler}.</dd>
45+
* </dl>
46+
*
47+
* @param <T> the source value type
48+
* @param <R> the result value type
49+
*
50+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
51+
* @since 2.1.15
52+
*/
53+
public final class SingleFlatMapPublisher<T, R> extends Flowable<R> {
54+
55+
final SingleSource<T> source;
56+
final Function<? super T, ? extends Publisher<? extends R>> mapper;
57+
58+
public SingleFlatMapPublisher(SingleSource<T> source,
59+
Function<? super T, ? extends Publisher<? extends R>> mapper) {
60+
this.source = source;
61+
this.mapper = mapper;
62+
}
63+
64+
@Override
65+
protected void subscribeActual(Subscriber<? super R> actual) {
66+
source.subscribe(new SingleFlatMapPublisherObserver<T, R>(actual, mapper));
67+
}
68+
69+
static final class SingleFlatMapPublisherObserver<S, T> extends AtomicLong
70+
implements SingleObserver<S>, FlowableSubscriber<T>, Subscription {
71+
72+
private static final long serialVersionUID = 7759721921468635667L;
73+
74+
final Subscriber<? super T> actual;
75+
final Function<? super S, ? extends Publisher<? extends T>> mapper;
76+
final AtomicReference<Subscription> parent;
77+
Disposable disposable;
78+
79+
SingleFlatMapPublisherObserver(Subscriber<? super T> actual,
80+
Function<? super S, ? extends Publisher<? extends T>> mapper) {
81+
this.actual = actual;
82+
this.mapper = mapper;
83+
this.parent = new AtomicReference<Subscription>();
84+
}
85+
86+
@Override
87+
public void onSubscribe(Disposable d) {
88+
this.disposable = d;
89+
actual.onSubscribe(this);
90+
}
91+
92+
@Override
93+
public void onSuccess(S value) {
94+
Publisher<? extends T> f;
95+
try {
96+
f = ObjectHelper.requireNonNull(mapper.apply(value), "the mapper returned a null Publisher");
97+
} catch (Throwable e) {
98+
Exceptions.throwIfFatal(e);
99+
actual.onError(e);
100+
return;
101+
}
102+
f.subscribe(this);
103+
}
104+
105+
@Override
106+
public void onSubscribe(Subscription s) {
107+
SubscriptionHelper.deferredSetOnce(parent, this, s);
108+
}
109+
110+
@Override
111+
public void onNext(T t) {
112+
actual.onNext(t);
113+
}
114+
115+
@Override
116+
public void onComplete() {
117+
actual.onComplete();
118+
}
119+
120+
@Override
121+
public void onError(Throwable e) {
122+
actual.onError(e);
123+
}
124+
125+
@Override
126+
public void request(long n) {
127+
SubscriptionHelper.deferredRequest(parent, this, n);
128+
}
129+
130+
@Override
131+
public void cancel() {
132+
disposable.dispose();
133+
SubscriptionHelper.cancel(parent);
134+
}
135+
}
136+
137+
}

src/test/java/io/reactivex/internal/operators/single/SingleFlatMapTest.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@
1515

1616
import static org.junit.Assert.*;
1717

18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
1820
import org.junit.Test;
1921
import org.reactivestreams.Publisher;
2022

2123
import io.reactivex.*;
2224
import io.reactivex.exceptions.TestException;
2325
import io.reactivex.functions.*;
26+
import io.reactivex.subscribers.TestSubscriber;
2427

2528
public class SingleFlatMapTest {
2629

@@ -126,6 +129,92 @@ public Publisher<Integer> apply(Integer v) throws Exception {
126129
.test()
127130
.assertResult(1, 2, 3, 4, 5);
128131
}
132+
133+
@Test(expected = NullPointerException.class)
134+
public void flatMapPublisherMapperNull() {
135+
Single.just(1).flatMapPublisher(null);
136+
}
137+
138+
@Test
139+
public void flatMapPublisherMapperThrows() {
140+
final TestException ex = new TestException();
141+
Single.just(1)
142+
.flatMapPublisher(new Function<Integer, Publisher<Integer>>() {
143+
@Override
144+
public Publisher<Integer> apply(Integer v) throws Exception {
145+
throw ex;
146+
}
147+
})
148+
.test()
149+
.assertNoValues()
150+
.assertError(ex);
151+
}
152+
153+
@Test
154+
public void flatMapPublisherSingleError() {
155+
final TestException ex = new TestException();
156+
Single.<Integer>error(ex)
157+
.flatMapPublisher(new Function<Integer, Publisher<Integer>>() {
158+
@Override
159+
public Publisher<Integer> apply(Integer v) throws Exception {
160+
return Flowable.just(1);
161+
}
162+
})
163+
.test()
164+
.assertNoValues()
165+
.assertError(ex);
166+
}
167+
168+
@Test
169+
public void flatMapPublisherCancelDuringSingle() {
170+
final AtomicBoolean disposed = new AtomicBoolean();
171+
TestSubscriber<Integer> ts = Single.<Integer>never()
172+
.doOnDispose(new Action() {
173+
@Override
174+
public void run() throws Exception {
175+
disposed.set(true);
176+
}
177+
})
178+
.flatMapPublisher(new Function<Integer, Publisher<Integer>>() {
179+
@Override
180+
public Publisher<Integer> apply(Integer v) throws Exception {
181+
return Flowable.range(v, 5);
182+
}
183+
})
184+
.test()
185+
.assertNoValues()
186+
.assertNotTerminated();
187+
assertFalse(disposed.get());
188+
ts.cancel();
189+
assertTrue(disposed.get());
190+
ts.assertNotTerminated();
191+
}
192+
193+
@Test
194+
public void flatMapPublisherCancelDuringFlowable() {
195+
final AtomicBoolean disposed = new AtomicBoolean();
196+
TestSubscriber<Integer> ts =
197+
Single.just(1)
198+
.flatMapPublisher(new Function<Integer, Publisher<Integer>>() {
199+
@Override
200+
public Publisher<Integer> apply(Integer v) throws Exception {
201+
return Flowable.<Integer>never()
202+
.doOnCancel(new Action() {
203+
@Override
204+
public void run() throws Exception {
205+
disposed.set(true);
206+
}
207+
});
208+
}
209+
})
210+
.test()
211+
.assertNoValues()
212+
.assertNotTerminated();
213+
assertFalse(disposed.get());
214+
ts.cancel();
215+
assertTrue(disposed.get());
216+
ts.assertNotTerminated();
217+
}
129218

130219
@Test(expected = NullPointerException.class)
131220
public void flatMapNull() {

0 commit comments

Comments
 (0)