Skip to content

Commit 34b4b6b

Browse files
davidmotenakarnokd
authored andcommitted
Observable.scan no seed fix post-terminal behaviour (#4904)
1 parent d52cce9 commit 34b4b6b

File tree

2 files changed

+101
-1
lines changed

2 files changed

+101
-1
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableScan.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.reactivex.functions.BiFunction;
2020
import io.reactivex.internal.disposables.DisposableHelper;
2121
import io.reactivex.internal.functions.ObjectHelper;
22+
import io.reactivex.plugins.RxJavaPlugins;
2223

2324
public final class ObservableScan<T> extends AbstractObservableWithUpstream<T, T> {
2425
final BiFunction<T, T, T> accumulator;
@@ -39,6 +40,8 @@ static final class ScanObserver<T> implements Observer<T>, Disposable {
3940
Disposable s;
4041

4142
T value;
43+
44+
boolean done;
4245

4346
ScanObserver(Observer<? super T> actual, BiFunction<T, T, T> accumulator) {
4447
this.actual = actual;
@@ -67,6 +70,9 @@ public boolean isDisposed() {
6770

6871
@Override
6972
public void onNext(T t) {
73+
if (done) {
74+
return;
75+
}
7076
final Observer<? super T> a = actual;
7177
T v = value;
7278
if (v == null) {
@@ -80,7 +86,7 @@ public void onNext(T t) {
8086
} catch (Throwable e) {
8187
Exceptions.throwIfFatal(e);
8288
s.dispose();
83-
a.onError(e);
89+
onError(e);
8490
return;
8591
}
8692

@@ -91,11 +97,20 @@ public void onNext(T t) {
9197

9298
@Override
9399
public void onError(Throwable t) {
100+
if (done) {
101+
RxJavaPlugins.onError(t);
102+
return;
103+
}
104+
done = true;
94105
actual.onError(t);
95106
}
96107

97108
@Override
98109
public void onComplete() {
110+
if (done) {
111+
return;
112+
}
113+
done = true;
99114
actual.onComplete();
100115
}
101116
}

src/test/java/io/reactivex/internal/operators/observable/ObservableScanTest.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,20 @@
1919

2020
import java.util.*;
2121
import java.util.concurrent.Callable;
22+
import java.util.concurrent.CopyOnWriteArrayList;
2223
import java.util.concurrent.atomic.AtomicInteger;
2324

2425
import org.junit.Test;
2526

2627
import io.reactivex.*;
2728
import io.reactivex.Observable;
2829
import io.reactivex.Observer;
30+
import io.reactivex.disposables.Disposable;
31+
import io.reactivex.disposables.Disposables;
2932
import io.reactivex.exceptions.TestException;
3033
import io.reactivex.functions.*;
3134
import io.reactivex.observers.*;
35+
import io.reactivex.plugins.RxJavaPlugins;
3236
import io.reactivex.subjects.PublishSubject;
3337

3438
public class ObservableScanTest {
@@ -300,4 +304,85 @@ public Object apply(Object a, Object b) throws Exception {
300304
}
301305
}, false, 1, 1, 0, 0);
302306
}
307+
308+
@Test
309+
public void testScanFunctionThrowsAndUpstreamErrorsDoesNotResultInTwoTerminalEvents() {
310+
final RuntimeException err = new RuntimeException();
311+
final RuntimeException err2 = new RuntimeException();
312+
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
313+
final Consumer<Throwable> errorConsumer = new Consumer<Throwable>() {
314+
@Override
315+
public void accept(Throwable t) throws Exception {
316+
list.add(t);
317+
}};
318+
try {
319+
RxJavaPlugins.setErrorHandler(errorConsumer);
320+
Observable.unsafeCreate(new ObservableSource<Integer>() {
321+
@Override
322+
public void subscribe(Observer<? super Integer> o) {
323+
Disposable d = Disposables.empty();
324+
o.onSubscribe(d);
325+
o.onNext(1);
326+
o.onNext(2);
327+
o.onError(err2);
328+
}})
329+
.scan(new BiFunction<Integer,Integer,Integer>() {
330+
@Override
331+
public Integer apply(Integer t1, Integer t2) throws Exception {
332+
throw err;
333+
}})
334+
.test()
335+
.assertError(err)
336+
.assertValue(1);
337+
} finally {
338+
RxJavaPlugins.reset();
339+
}
340+
}
341+
342+
@Test
343+
public void testScanFunctionThrowsAndUpstreamCompletesDoesNotResultInTwoTerminalEvents() {
344+
final RuntimeException err = new RuntimeException();
345+
Observable.unsafeCreate(new ObservableSource<Integer>() {
346+
@Override
347+
public void subscribe(Observer<? super Integer> o) {
348+
Disposable d = Disposables.empty();
349+
o.onSubscribe(d);
350+
o.onNext(1);
351+
o.onNext(2);
352+
o.onComplete();
353+
}})
354+
.scan(new BiFunction<Integer,Integer,Integer>() {
355+
@Override
356+
public Integer apply(Integer t1, Integer t2) throws Exception {
357+
throw err;
358+
}})
359+
.test()
360+
.assertError(err)
361+
.assertValue(1);
362+
}
363+
364+
@Test
365+
public void testScanFunctionThrowsAndUpstreamEmitsOnNextResultsInScanFunctionBeingCalledOnlyOnce() {
366+
final RuntimeException err = new RuntimeException();
367+
final AtomicInteger count = new AtomicInteger();
368+
Observable.unsafeCreate(new ObservableSource<Integer>() {
369+
@Override
370+
public void subscribe(Observer<? super Integer> o) {
371+
Disposable d = Disposables.empty();
372+
o.onSubscribe(d);
373+
o.onNext(1);
374+
o.onNext(2);
375+
o.onNext(3);
376+
}})
377+
.scan(new BiFunction<Integer,Integer,Integer>() {
378+
@Override
379+
public Integer apply(Integer t1, Integer t2) throws Exception {
380+
count.incrementAndGet();
381+
throw err;
382+
}})
383+
.test()
384+
.assertError(err)
385+
.assertValue(1);
386+
assertEquals(1, count.get());
387+
}
303388
}

0 commit comments

Comments
 (0)