Skip to content

Commit a39fa89

Browse files
davidmotenakarnokd
authored andcommitted
FlowableScan - prevent multiple terminal emissions (#4901)
1 parent b7f81d2 commit a39fa89

File tree

3 files changed

+134
-37
lines changed

3 files changed

+134
-37
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableScan.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.reactivex.exceptions.Exceptions;
2020
import io.reactivex.functions.BiFunction;
2121
import io.reactivex.internal.subscriptions.SubscriptionHelper;
22+
import io.reactivex.plugins.RxJavaPlugins;
2223

2324
public final class FlowableScan<T> extends AbstractFlowableWithUpstream<T, T> {
2425
final BiFunction<T, T, T> accumulator;
@@ -39,6 +40,8 @@ static final class ScanSubscriber<T> implements Subscriber<T>, Subscription {
3940
Subscription s;
4041

4142
T value;
43+
44+
boolean done;
4245

4346
ScanSubscriber(Subscriber<? super T> actual, BiFunction<T, T, T> accumulator) {
4447
this.actual = actual;
@@ -55,6 +58,9 @@ public void onSubscribe(Subscription s) {
5558

5659
@Override
5760
public void onNext(T t) {
61+
if (done) {
62+
return;
63+
}
5864
final Subscriber<? super T> a = actual;
5965
T v = value;
6066
if (v == null) {
@@ -68,7 +74,7 @@ public void onNext(T t) {
6874
} catch (Throwable e) {
6975
Exceptions.throwIfFatal(e);
7076
s.cancel();
71-
a.onError(e);
77+
onError(e);
7278
return;
7379
}
7480

@@ -79,11 +85,20 @@ public void onNext(T t) {
7985

8086
@Override
8187
public void onError(Throwable t) {
88+
if (done) {
89+
RxJavaPlugins.onError(t);
90+
return;
91+
}
92+
done = true;
8293
actual.onError(t);
8394
}
8495

8596
@Override
8697
public void onComplete() {
98+
if (done) {
99+
return;
100+
}
101+
done = true;
87102
actual.onComplete();
88103
}
89104

src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.reactivex.internal.functions.ObjectHelper;
2222
import io.reactivex.internal.subscribers.SinglePostCompleteSubscriber;
2323
import io.reactivex.internal.subscriptions.EmptySubscription;
24+
import io.reactivex.plugins.RxJavaPlugins;
2425

2526
public final class FlowableScanSeed<T, R> extends AbstractFlowableWithUpstream<T, R> {
2627
final BiFunction<R, ? super T, R> accumulator;
@@ -87,6 +88,7 @@ public void onNext(T t) {
8788
@Override
8889
public void onError(Throwable t) {
8990
if (done) {
91+
RxJavaPlugins.onError(t);
9092
return;
9193
}
9294
done = true;

src/test/java/io/reactivex/flowable/FlowableScanTests.java

Lines changed: 116 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,25 @@
1515

1616
import static org.junit.Assert.assertEquals;
1717

18+
import java.util.Arrays;
1819
import java.util.HashMap;
20+
import java.util.List;
1921
import java.util.concurrent.Callable;
22+
import java.util.concurrent.CopyOnWriteArrayList;
2023
import java.util.concurrent.atomic.AtomicInteger;
24+
import java.util.concurrent.atomic.AtomicReference;
2125

2226
import org.junit.Assert;
2327
import org.junit.Test;
2428

2529
import io.reactivex.Flowable;
2630
import io.reactivex.flowable.FlowableEventStream.Event;
2731
import io.reactivex.functions.*;
32+
import io.reactivex.plugins.RxJavaPlugins;
2833

2934
public class FlowableScanTests {
3035

36+
3137
@Test
3238
public void testUnsubscribeScan() {
3339

@@ -49,81 +55,155 @@ public void accept(HashMap<String, String> v) {
4955
}
5056

5157
@Test
52-
public void testFlowableScanSeedDoesNotEmitErrorTwiceIfScanFunctionThrows() {
58+
public void testScanWithSeedDoesNotEmitErrorTwiceIfScanFunctionThrows() {
59+
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
60+
Consumer<Throwable> errorConsumer = new Consumer<Throwable>() {
61+
@Override
62+
public void accept(Throwable t) throws Exception {
63+
list.add(t);
64+
}};
65+
try {
66+
RxJavaPlugins.setErrorHandler(errorConsumer);
67+
final RuntimeException e = new RuntimeException();
68+
final RuntimeException e2 = new RuntimeException();
69+
Burst.items(1).error(e2)
70+
.scan(0, throwingBiFunction(e))
71+
.test()
72+
.assertNoValues()
73+
.assertError(e);
74+
assertEquals(Arrays.asList(e2), list);
75+
} finally {
76+
RxJavaPlugins.reset();
77+
}
78+
}
79+
80+
@Test
81+
public void testScanWithSeedDoesNotEmitTerminalEventTwiceIfScanFunctionThrows() {
5382
final RuntimeException e = new RuntimeException();
54-
Burst.item(1).error(e).scan(0, new BiFunction<Integer, Integer, Integer>() {
83+
Burst.item(1).create()
84+
.scan(0, throwingBiFunction(e))
85+
.test()
86+
.assertNoValues()
87+
.assertError(e);
88+
}
89+
90+
@Test
91+
public void testScanWithSeedDoesNotProcessOnNextAfterTerminalEventIfScanFunctionThrows() {
92+
final RuntimeException e = new RuntimeException();
93+
final AtomicInteger count = new AtomicInteger();
94+
Burst.items(1, 2).create().scan(0, new BiFunction<Integer, Integer, Integer>() {
5595

5696
@Override
5797
public Integer apply(Integer n1, Integer n2) throws Exception {
98+
count.incrementAndGet();
5899
throw e;
59100
}})
60101
.test()
61102
.assertNoValues()
62103
.assertError(e);
104+
assertEquals(1, count.get());
105+
}
106+
107+
@Test
108+
public void testScanWithSeedCompletesNormally() {
109+
Flowable.just(1,2,3).scan(0, SUM)
110+
.test()
111+
.assertValues(0, 1, 3, 6)
112+
.assertComplete();
63113
}
64114

65115
@Test
66-
public void testFlowableScanSeedDoesNotEmitTerminalEventTwiceIfScanFunctionThrows() {
116+
public void testScanWithSeedWhenScanSeedProviderThrows() {
67117
final RuntimeException e = new RuntimeException();
68-
Burst.item(1).create().scan(0, new BiFunction<Integer, Integer, Integer>() {
118+
Flowable.just(1,2,3).scanWith(throwingCallable(e),
119+
SUM)
120+
.test()
121+
.assertError(e)
122+
.assertNoValues();
123+
}
69124

125+
@Test
126+
public void testScanNoSeed() {
127+
Flowable.just(1, 2, 3)
128+
.scan(SUM)
129+
.test()
130+
.assertValues(1, 3, 6)
131+
.assertComplete();
132+
}
133+
134+
@Test
135+
public void testScanNoSeedDoesNotEmitErrorTwiceIfScanFunctionThrows() {
136+
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
137+
Consumer<Throwable> errorConsumer = new Consumer<Throwable>() {
70138
@Override
71-
public Integer apply(Integer n1, Integer n2) throws Exception {
72-
throw e;
73-
}})
139+
public void accept(Throwable t) throws Exception {
140+
list.add(t);
141+
}};
142+
try {
143+
RxJavaPlugins.setErrorHandler(errorConsumer);
144+
final RuntimeException e = new RuntimeException();
145+
final RuntimeException e2 = new RuntimeException();
146+
Burst.items(1, 2).error(e2)
147+
.scan(throwingBiFunction(e))
148+
.test()
149+
.assertValue(1)
150+
.assertError(e);
151+
assertEquals(Arrays.asList(e2), list);
152+
} finally {
153+
RxJavaPlugins.reset();
154+
}
155+
}
156+
157+
@Test
158+
public void testScanNoSeedDoesNotEmitTerminalEventTwiceIfScanFunctionThrows() {
159+
final RuntimeException e = new RuntimeException();
160+
Burst.items(1, 2).create()
161+
.scan(throwingBiFunction(e))
74162
.test()
75-
.assertNoValues()
163+
.assertValue(1)
76164
.assertError(e);
77165
}
78166

79167
@Test
80-
public void testFlowableScanSeedDoesNotProcessOnNextAfterTerminalEventIfScanFunctionThrows() {
168+
public void testScanNoSeedDoesNotProcessOnNextAfterTerminalEventIfScanFunctionThrows() {
81169
final RuntimeException e = new RuntimeException();
82170
final AtomicInteger count = new AtomicInteger();
83-
Burst.items(1, 2).create().scan(0, new BiFunction<Integer, Integer, Integer>() {
171+
Burst.items(1, 2, 3).create().scan(new BiFunction<Integer, Integer, Integer>() {
84172

85173
@Override
86174
public Integer apply(Integer n1, Integer n2) throws Exception {
87175
count.incrementAndGet();
88176
throw e;
89177
}})
90178
.test()
91-
.assertNoValues()
179+
.assertValue(1)
92180
.assertError(e);
93181
assertEquals(1, count.get());
94182
}
95183

96-
@Test
97-
public void testFlowableScanSeedCompletesNormally() {
98-
Flowable.just(1,2,3).scan(0, new BiFunction<Integer, Integer, Integer>() {
99-
184+
private static BiFunction<Integer,Integer, Integer> throwingBiFunction(final RuntimeException e) {
185+
return new BiFunction<Integer, Integer, Integer>() {
100186
@Override
101-
public Integer apply(Integer t1, Integer t2) throws Exception {
102-
return t1 + t2;
103-
}})
104-
.test()
105-
.assertValues(0, 1, 3, 6)
106-
.assertComplete();
187+
public Integer apply(Integer n1, Integer n2) throws Exception {
188+
throw e;
189+
}
190+
};
107191
}
192+
193+
private static final BiFunction<Integer, Integer, Integer> SUM = new BiFunction<Integer, Integer, Integer>() {
194+
195+
@Override
196+
public Integer apply(Integer t1, Integer t2) throws Exception {
197+
return t1 + t2;
198+
}
199+
};
108200

109-
@Test
110-
public void testFlowableScanSeedWhenScanSeedProviderThrows() {
111-
final RuntimeException e = new RuntimeException();
112-
Flowable.just(1,2,3).scanWith(new Callable<Integer>() {
201+
private static Callable<Integer> throwingCallable(final RuntimeException e) {
202+
return new Callable<Integer>() {
113203
@Override
114204
public Integer call() throws Exception {
115205
throw e;
116206
}
117-
},
118-
new BiFunction<Integer, Integer, Integer>() {
119-
120-
@Override
121-
public Integer apply(Integer t1, Integer t2) throws Exception {
122-
return t1 + t2;
123-
}
124-
})
125-
.test()
126-
.assertError(e)
127-
.assertNoValues();
207+
};
128208
}
129209
}

0 commit comments

Comments
 (0)