Skip to content

Commit 75e9bfa

Browse files
davidmotenakarnokd
authored andcommitted
FlowableScanSeed - prevent post-terminal events (#4899)
1 parent 846afd3 commit 75e9bfa

File tree

2 files changed

+99
-0
lines changed

2 files changed

+99
-0
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ static final class ScanSeedSubscriber<T, R> extends SinglePostCompleteSubscriber
5151
private static final long serialVersionUID = -1776795561228106469L;
5252

5353
final BiFunction<R, ? super T, R> accumulator;
54+
55+
boolean done;
5456

5557
ScanSeedSubscriber(Subscriber<? super R> actual, BiFunction<R, ? super T, R> accumulator, R value) {
5658
super(actual);
@@ -60,6 +62,10 @@ static final class ScanSeedSubscriber<T, R> extends SinglePostCompleteSubscriber
6062

6163
@Override
6264
public void onNext(T t) {
65+
if (done) {
66+
return;
67+
}
68+
6369
R v = value;
6470

6571
R u;
@@ -80,12 +86,20 @@ public void onNext(T t) {
8086

8187
@Override
8288
public void onError(Throwable t) {
89+
if (done) {
90+
return;
91+
}
92+
done = true;
8393
value = null;
8494
actual.onError(t);
8595
}
8696

8797
@Override
8898
public void onComplete() {
99+
if (done) {
100+
return;
101+
}
102+
done = true;
89103
complete(value);
90104
}
91105
}

src/test/java/io/reactivex/flowable/FlowableScanTests.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,16 @@
1313

1414
package io.reactivex.flowable;
1515

16+
import static org.junit.Assert.assertEquals;
17+
1618
import java.util.HashMap;
19+
import java.util.concurrent.Callable;
20+
import java.util.concurrent.atomic.AtomicInteger;
1721

22+
import org.junit.Assert;
1823
import org.junit.Test;
1924

25+
import io.reactivex.Flowable;
2026
import io.reactivex.flowable.FlowableEventStream.Event;
2127
import io.reactivex.functions.*;
2228

@@ -41,4 +47,83 @@ public void accept(HashMap<String, String> v) {
4147
}
4248
});
4349
}
50+
51+
@Test
52+
public void testFlowableScanSeedDoesNotEmitErrorTwiceIfScanFunctionThrows() {
53+
final RuntimeException e = new RuntimeException();
54+
Burst.item(1).error(e).scan(0, new BiFunction<Integer, Integer, Integer>() {
55+
56+
@Override
57+
public Integer apply(Integer n1, Integer n2) throws Exception {
58+
throw e;
59+
}})
60+
.test()
61+
.assertNoValues()
62+
.assertError(e);
63+
}
64+
65+
@Test
66+
public void testFlowableScanSeedDoesNotEmitTerminalEventTwiceIfScanFunctionThrows() {
67+
final RuntimeException e = new RuntimeException();
68+
Burst.item(1).create().scan(0, new BiFunction<Integer, Integer, Integer>() {
69+
70+
@Override
71+
public Integer apply(Integer n1, Integer n2) throws Exception {
72+
throw e;
73+
}})
74+
.test()
75+
.assertNoValues()
76+
.assertError(e);
77+
}
78+
79+
@Test
80+
public void testFlowableScanSeedDoesNotProcessOnNextAfterTerminalEventIfScanFunctionThrows() {
81+
final RuntimeException e = new RuntimeException();
82+
final AtomicInteger count = new AtomicInteger();
83+
Burst.items(1, 2).create().scan(0, new BiFunction<Integer, Integer, Integer>() {
84+
85+
@Override
86+
public Integer apply(Integer n1, Integer n2) throws Exception {
87+
count.incrementAndGet();
88+
throw e;
89+
}})
90+
.test()
91+
.assertNoValues()
92+
.assertError(e);
93+
assertEquals(1, count.get());
94+
}
95+
96+
@Test
97+
public void testFlowableScanSeedCompletesNormally() {
98+
Flowable.just(1,2,3).scan(0, new BiFunction<Integer, Integer, Integer>() {
99+
100+
@Override
101+
public Integer apply(Integer t1, Integer t2) throws Exception {
102+
return t1 + t2;
103+
}})
104+
.test()
105+
.assertValues(0, 1, 3, 6)
106+
.assertComplete();
107+
}
108+
109+
@Test
110+
public void testFlowableScanSeedWhenScanSeedProviderThrows() {
111+
final RuntimeException e = new RuntimeException();
112+
Flowable.just(1,2,3).scanWith(new Callable<Integer>() {
113+
@Override
114+
public Integer call() throws Exception {
115+
throw e;
116+
}
117+
},
118+
new BiFunction<Integer, Integer, Integer>() {
119+
120+
@Override
121+
public Integer apply(Integer t1, Integer t2) throws Exception {
122+
return t1 + t2;
123+
}
124+
})
125+
.test()
126+
.assertError(e)
127+
.assertNoValues();
128+
}
44129
}

0 commit comments

Comments
 (0)