Skip to content

Commit b763ffa

Browse files
authored
2.x: Fix concatMapDelayError not continuing on fused inner source crash (#6522)
1 parent 31e8d48 commit b763ffa

File tree

2 files changed

+46
-5
lines changed

2 files changed

+46
-5
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -520,10 +520,13 @@ void drain() {
520520
vr = supplier.call();
521521
} catch (Throwable e) {
522522
Exceptions.throwIfFatal(e);
523-
upstream.cancel();
524523
errors.addThrowable(e);
525-
downstream.onError(errors.terminate());
526-
return;
524+
if (!veryEnd) {
525+
upstream.cancel();
526+
downstream.onError(errors.terminate());
527+
return;
528+
}
529+
vr = null;
527530
}
528531

529532
if (vr == null) {

src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapTest.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@
1515

1616
import static org.junit.Assert.assertEquals;
1717

18-
import java.util.concurrent.TimeUnit;
18+
import java.util.concurrent.*;
1919
import java.util.concurrent.atomic.AtomicInteger;
2020

2121
import org.junit.Test;
2222
import org.reactivestreams.Publisher;
2323

2424
import io.reactivex.*;
25-
import io.reactivex.exceptions.TestException;
25+
import io.reactivex.exceptions.*;
2626
import io.reactivex.functions.*;
2727
import io.reactivex.internal.operators.flowable.FlowableConcatMap.WeakScalarSubscription;
2828
import io.reactivex.schedulers.Schedulers;
@@ -168,4 +168,42 @@ public void run() throws Exception {
168168

169169
assertEquals(0, counter.get());
170170
}
171+
172+
@Test
173+
public void delayErrorCallableTillTheEnd() {
174+
Flowable.just(1, 2, 3, 101, 102, 23, 890, 120, 32)
175+
.concatMapDelayError(new Function<Integer, Flowable<Integer>>() {
176+
@Override public Flowable<Integer> apply(final Integer integer) throws Exception {
177+
return Flowable.fromCallable(new Callable<Integer>() {
178+
@Override public Integer call() throws Exception {
179+
if (integer >= 100) {
180+
throw new NullPointerException("test null exp");
181+
}
182+
return integer;
183+
}
184+
});
185+
}
186+
})
187+
.test()
188+
.assertFailure(CompositeException.class, 1, 2, 3, 23, 32);
189+
}
190+
191+
@Test
192+
public void delayErrorCallableEager() {
193+
Flowable.just(1, 2, 3, 101, 102, 23, 890, 120, 32)
194+
.concatMapDelayError(new Function<Integer, Flowable<Integer>>() {
195+
@Override public Flowable<Integer> apply(final Integer integer) throws Exception {
196+
return Flowable.fromCallable(new Callable<Integer>() {
197+
@Override public Integer call() throws Exception {
198+
if (integer >= 100) {
199+
throw new NullPointerException("test null exp");
200+
}
201+
return integer;
202+
}
203+
});
204+
}
205+
}, 2, false)
206+
.test()
207+
.assertFailure(NullPointerException.class, 1, 2, 3);
208+
}
171209
}

0 commit comments

Comments
 (0)