Skip to content

Commit d91ee5a

Browse files
authored
2.x: Fix flatMap inner fused poll crash not cancelling the upstream (#5792)
* 2.x: Fix flatMap inner fused poll crash not cancelling the upstream * Verify Observable.flatMapIterable
1 parent 4fd16ee commit d91ee5a

File tree

5 files changed

+192
-11
lines changed

5 files changed

+192
-11
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ static final class MergeSubscriber<T, U> extends AtomicInteger implements Flowab
8585

8686
final AtomicLong requested = new AtomicLong();
8787

88-
Subscription s;
88+
Subscription upstream;
8989

9090
long uniqueId;
9191
long lastId;
@@ -107,8 +107,8 @@ static final class MergeSubscriber<T, U> extends AtomicInteger implements Flowab
107107

108108
@Override
109109
public void onSubscribe(Subscription s) {
110-
if (SubscriptionHelper.validate(this.s, s)) {
111-
this.s = s;
110+
if (SubscriptionHelper.validate(this.upstream, s)) {
111+
this.upstream = s;
112112
actual.onSubscribe(this);
113113
if (!cancelled) {
114114
if (maxConcurrency == Integer.MAX_VALUE) {
@@ -132,7 +132,7 @@ public void onNext(T t) {
132132
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null Publisher");
133133
} catch (Throwable e) {
134134
Exceptions.throwIfFatal(e);
135-
s.cancel();
135+
upstream.cancel();
136136
onError(e);
137137
return;
138138
}
@@ -154,7 +154,7 @@ public void onNext(T t) {
154154
if (maxConcurrency != Integer.MAX_VALUE && !cancelled
155155
&& ++scalarEmitted == scalarLimit) {
156156
scalarEmitted = 0;
157-
s.request(scalarLimit);
157+
upstream.request(scalarLimit);
158158
}
159159
}
160160
} else {
@@ -238,7 +238,7 @@ void tryEmitScalar(U value) {
238238
if (maxConcurrency != Integer.MAX_VALUE && !cancelled
239239
&& ++scalarEmitted == scalarLimit) {
240240
scalarEmitted = 0;
241-
s.request(scalarLimit);
241+
upstream.request(scalarLimit);
242242
}
243243
} else {
244244
if (q == null) {
@@ -350,7 +350,7 @@ public void request(long n) {
350350
public void cancel() {
351351
if (!cancelled) {
352352
cancelled = true;
353-
s.cancel();
353+
upstream.cancel();
354354
disposeAll();
355355
if (getAndIncrement() == 0) {
356356
SimpleQueue<U> q = queue;
@@ -482,6 +482,9 @@ void drainLoop() {
482482
Exceptions.throwIfFatal(ex);
483483
is.dispose();
484484
errs.addThrowable(ex);
485+
if (!delayErrors) {
486+
upstream.cancel();
487+
}
485488
if (checkTerminate()) {
486489
return;
487490
}
@@ -539,7 +542,7 @@ void drainLoop() {
539542
}
540543

541544
if (replenishMain != 0L && !cancelled) {
542-
s.request(replenishMain);
545+
upstream.request(replenishMain);
543546
}
544547
if (innerCompleted) {
545548
continue;
@@ -594,7 +597,7 @@ void innerError(InnerSubscriber<T, U> inner, Throwable t) {
594597
if (errs.addThrowable(t)) {
595598
inner.done = true;
596599
if (!delayErrors) {
597-
s.cancel();
600+
upstream.cancel();
598601
for (InnerSubscriber<?, ?> a : subscribers.getAndSet(CANCELLED)) {
599602
a.dispose();
600603
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,4 +1033,47 @@ public Object apply(Integer v, Object w) throws Exception {
10331033
.test()
10341034
.assertFailureAndMessage(NullPointerException.class, "The mapper returned a null Publisher");
10351035
}
1036+
1037+
@Test
1038+
public void failingFusedInnerCancelsSource() {
1039+
final AtomicInteger counter = new AtomicInteger();
1040+
Flowable.range(1, 5)
1041+
.doOnNext(new Consumer<Integer>() {
1042+
@Override
1043+
public void accept(Integer v) throws Exception {
1044+
counter.getAndIncrement();
1045+
}
1046+
})
1047+
.flatMap(new Function<Integer, Publisher<Integer>>() {
1048+
@Override
1049+
public Publisher<Integer> apply(Integer v)
1050+
throws Exception {
1051+
return Flowable.<Integer>fromIterable(new Iterable<Integer>() {
1052+
@Override
1053+
public Iterator<Integer> iterator() {
1054+
return new Iterator<Integer>() {
1055+
@Override
1056+
public boolean hasNext() {
1057+
return true;
1058+
}
1059+
1060+
@Override
1061+
public Integer next() {
1062+
throw new TestException();
1063+
}
1064+
1065+
@Override
1066+
public void remove() {
1067+
throw new UnsupportedOperationException();
1068+
}
1069+
};
1070+
}
1071+
});
1072+
}
1073+
})
1074+
.test()
1075+
.assertFailure(TestException.class);
1076+
1077+
assertEquals(1, counter.get());
1078+
}
10361079
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -914,4 +914,47 @@ public void multiShareHidden() {
914914
.assertResult(600L);
915915
}
916916
}
917+
918+
@Test
919+
public void failingInnerCancelsSource() {
920+
final AtomicInteger counter = new AtomicInteger();
921+
Flowable.range(1, 5)
922+
.doOnNext(new Consumer<Integer>() {
923+
@Override
924+
public void accept(Integer v) throws Exception {
925+
counter.getAndIncrement();
926+
}
927+
})
928+
.flatMapIterable(new Function<Integer, Iterable<Integer>>() {
929+
@Override
930+
public Iterable<Integer> apply(Integer v)
931+
throws Exception {
932+
return new Iterable<Integer>() {
933+
@Override
934+
public Iterator<Integer> iterator() {
935+
return new Iterator<Integer>() {
936+
@Override
937+
public boolean hasNext() {
938+
return true;
939+
}
940+
941+
@Override
942+
public Integer next() {
943+
throw new TestException();
944+
}
945+
946+
@Override
947+
public void remove() {
948+
throw new UnsupportedOperationException();
949+
}
950+
};
951+
}
952+
};
953+
}
954+
})
955+
.test()
956+
.assertFailure(TestException.class);
957+
958+
assertEquals(1, counter.get());
959+
}
917960
}

src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -894,4 +894,48 @@ public Object apply(Integer v, Object w) throws Exception {
894894
.test()
895895
.assertFailureAndMessage(NullPointerException.class, "The mapper returned a null ObservableSource");
896896
}
897+
898+
899+
@Test
900+
public void failingFusedInnerCancelsSource() {
901+
final AtomicInteger counter = new AtomicInteger();
902+
Observable.range(1, 5)
903+
.doOnNext(new Consumer<Integer>() {
904+
@Override
905+
public void accept(Integer v) throws Exception {
906+
counter.getAndIncrement();
907+
}
908+
})
909+
.flatMap(new Function<Integer, Observable<Integer>>() {
910+
@Override
911+
public Observable<Integer> apply(Integer v)
912+
throws Exception {
913+
return Observable.<Integer>fromIterable(new Iterable<Integer>() {
914+
@Override
915+
public Iterator<Integer> iterator() {
916+
return new Iterator<Integer>() {
917+
@Override
918+
public boolean hasNext() {
919+
return true;
920+
}
921+
922+
@Override
923+
public Integer next() {
924+
throw new TestException();
925+
}
926+
927+
@Override
928+
public void remove() {
929+
throw new UnsupportedOperationException();
930+
}
931+
};
932+
}
933+
});
934+
}
935+
})
936+
.test()
937+
.assertFailure(TestException.class);
938+
939+
assertEquals(1, counter.get());
940+
}
897941
}

src/test/java/io/reactivex/internal/operators/observable/ObservableFlattenIterableTest.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,17 @@
1313

1414
package io.reactivex.internal.operators.observable;
1515

16-
import java.util.Arrays;
16+
import static org.junit.Assert.assertEquals;
17+
18+
import java.util.*;
19+
import java.util.concurrent.atomic.AtomicInteger;
1720

1821
import org.junit.Test;
1922

2023
import io.reactivex.*;
21-
import io.reactivex.functions.Function;
24+
import io.reactivex.Observable;
25+
import io.reactivex.exceptions.TestException;
26+
import io.reactivex.functions.*;
2227
import io.reactivex.subjects.PublishSubject;
2328

2429
public class ObservableFlattenIterableTest {
@@ -47,4 +52,47 @@ public Iterable<Integer> apply(Object v) throws Exception {
4752
}
4853
}, false, 1, 1, 10, 20);
4954
}
55+
56+
@Test
57+
public void failingInnerCancelsSource() {
58+
final AtomicInteger counter = new AtomicInteger();
59+
Observable.range(1, 5)
60+
.doOnNext(new Consumer<Integer>() {
61+
@Override
62+
public void accept(Integer v) throws Exception {
63+
counter.getAndIncrement();
64+
}
65+
})
66+
.flatMapIterable(new Function<Integer, Iterable<Integer>>() {
67+
@Override
68+
public Iterable<Integer> apply(Integer v)
69+
throws Exception {
70+
return new Iterable<Integer>() {
71+
@Override
72+
public Iterator<Integer> iterator() {
73+
return new Iterator<Integer>() {
74+
@Override
75+
public boolean hasNext() {
76+
return true;
77+
}
78+
79+
@Override
80+
public Integer next() {
81+
throw new TestException();
82+
}
83+
84+
@Override
85+
public void remove() {
86+
throw new UnsupportedOperationException();
87+
}
88+
};
89+
}
90+
};
91+
}
92+
})
93+
.test()
94+
.assertFailure(TestException.class);
95+
96+
assertEquals(1, counter.get());
97+
}
5098
}

0 commit comments

Comments
 (0)