Skip to content

Commit 0b3b300

Browse files
authored
2.x: Fix Flowable.flatMap not canceling the inner sources on outer error (#6827)
1 parent ea2c796 commit 0b3b300

File tree

3 files changed

+73
-0
lines changed

3 files changed

+73
-0
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,11 @@ public void onError(Throwable t) {
322322
}
323323
if (errs.addThrowable(t)) {
324324
done = true;
325+
if (!delayErrors) {
326+
for (InnerSubscriber<?, ?> a : subscribers.getAndSet(CANCELLED)) {
327+
a.dispose();
328+
}
329+
}
325330
drain();
326331
} else {
327332
RxJavaPlugins.onError(t);

src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1123,4 +1123,38 @@ public void accept(Integer v) throws Exception {
11231123
assertFalse(pp3.hasSubscribers());
11241124
assertFalse(pp4.hasSubscribers());
11251125
}
1126+
1127+
@Test
1128+
public void mainErrorsInnerCancelled() {
1129+
PublishProcessor<Integer> pp1 = PublishProcessor.create();
1130+
PublishProcessor<Integer> pp2 = PublishProcessor.create();
1131+
1132+
pp1
1133+
.flatMap(Functions.justFunction(pp2))
1134+
.test();
1135+
1136+
pp1.onNext(1);
1137+
assertTrue("No subscribers?", pp2.hasSubscribers());
1138+
1139+
pp1.onError(new TestException());
1140+
1141+
assertFalse("Has subscribers?", pp2.hasSubscribers());
1142+
}
1143+
1144+
@Test
1145+
public void innerErrorsMainCancelled() {
1146+
PublishProcessor<Integer> pp1 = PublishProcessor.create();
1147+
PublishProcessor<Integer> pp2 = PublishProcessor.create();
1148+
1149+
pp1
1150+
.flatMap(Functions.justFunction(pp2))
1151+
.test();
1152+
1153+
pp1.onNext(1);
1154+
assertTrue("No subscribers?", pp2.hasSubscribers());
1155+
1156+
pp2.onError(new TestException());
1157+
1158+
assertFalse("Has subscribers?", pp1.hasSubscribers());
1159+
}
11261160
}

src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1084,4 +1084,38 @@ public void accept(Integer v) throws Exception {
10841084
assertFalse(ps3.hasObservers());
10851085
assertFalse(ps4.hasObservers());
10861086
}
1087+
1088+
@Test
1089+
public void mainErrorsInnerCancelled() {
1090+
PublishSubject<Integer> ps1 = PublishSubject.create();
1091+
PublishSubject<Integer> ps2 = PublishSubject.create();
1092+
1093+
ps1
1094+
.flatMap(Functions.justFunction(ps2))
1095+
.test();
1096+
1097+
ps1.onNext(1);
1098+
assertTrue("No subscribers?", ps2.hasObservers());
1099+
1100+
ps1.onError(new TestException());
1101+
1102+
assertFalse("Has subscribers?", ps2.hasObservers());
1103+
}
1104+
1105+
@Test
1106+
public void innerErrorsMainCancelled() {
1107+
PublishSubject<Integer> ps1 = PublishSubject.create();
1108+
PublishSubject<Integer> ps2 = PublishSubject.create();
1109+
1110+
ps1
1111+
.flatMap(Functions.justFunction(ps2))
1112+
.test();
1113+
1114+
ps1.onNext(1);
1115+
assertTrue("No subscribers?", ps2.hasObservers());
1116+
1117+
ps2.onError(new TestException());
1118+
1119+
assertFalse("Has subscribers?", ps1.hasObservers());
1120+
}
10871121
}

0 commit comments

Comments
 (0)