Skip to content

Commit 061cdb6

Browse files
committed
Fixed incorrect error merging.
1 parent 4439cc1 commit 061cdb6

File tree

2 files changed

+35
-25
lines changed

2 files changed

+35
-25
lines changed

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -397,11 +397,13 @@ public Boolean call(InnerSubscriber<T> s) {
397397

398398
@Override
399399
public void onError(Throwable e) {
400-
completed = true;
401-
innerError(e);
400+
if (!completed) {
401+
completed = true;
402+
innerError(e, true);
403+
}
402404
}
403405

404-
private void innerError(Throwable e) {
406+
private void innerError(Throwable e, boolean parent) {
405407
if (delayErrors) {
406408
synchronized (this) {
407409
if (exceptions == null) {
@@ -411,7 +413,9 @@ private void innerError(Throwable e) {
411413
exceptions.add(e);
412414
boolean sendOnComplete = false;
413415
synchronized (this) {
414-
wip--;
416+
if (!parent) {
417+
wip--;
418+
}
415419
if ((wip == 0 && completed) || (wip < 0)) {
416420
sendOnComplete = true;
417421
}
@@ -545,7 +549,7 @@ public void onNext(T t) {
545549
public void onError(Throwable e) {
546550
// it doesn't go through queues, it immediately onErrors and tears everything down
547551
if (ONCE_TERMINATED.compareAndSet(this, 0, 1)) {
548-
parentSubscriber.innerError(e);
552+
parentSubscriber.innerError(e, false);
549553
}
550554
}
551555

src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -496,26 +496,32 @@ public void testErrorInParentObservable() {
496496

497497
@Test
498498
public void testErrorInParentObservableDelayed() throws Exception {
499-
final TestASynchronous1sDelayedObservable o1 = new TestASynchronous1sDelayedObservable();
500-
final TestASynchronous1sDelayedObservable o2 = new TestASynchronous1sDelayedObservable();
501-
Observable<Observable<String>> parentObservable = Observable.create(new Observable.OnSubscribe<Observable<String>>() {
502-
@Override
503-
public void call(Subscriber<? super Observable<String>> op) {
504-
op.onNext(Observable.create(o1));
505-
op.onNext(Observable.create(o2));
506-
op.onError(new NullPointerException("throwing exception in parent"));
507-
}
508-
});
509-
510-
TestSubscriber<String> ts = new TestSubscriber<String>(stringObserver);
511-
Observable<String> m = Observable.mergeDelayError(parentObservable);
512-
m.subscribe(ts);
513-
ts.awaitTerminalEvent(2000, TimeUnit.MILLISECONDS);
514-
ts.assertTerminalEvent();
515-
516-
verify(stringObserver, times(2)).onNext("hello");
517-
verify(stringObserver, times(1)).onError(any(NullPointerException.class));
518-
verify(stringObserver, never()).onCompleted();
499+
for (int i = 0; i < 50; i++) {
500+
final TestASynchronous1sDelayedObservable o1 = new TestASynchronous1sDelayedObservable();
501+
final TestASynchronous1sDelayedObservable o2 = new TestASynchronous1sDelayedObservable();
502+
Observable<Observable<String>> parentObservable = Observable.create(new Observable.OnSubscribe<Observable<String>>() {
503+
@Override
504+
public void call(Subscriber<? super Observable<String>> op) {
505+
op.onNext(Observable.create(o1));
506+
op.onNext(Observable.create(o2));
507+
op.onError(new NullPointerException("throwing exception in parent"));
508+
}
509+
});
510+
511+
@SuppressWarnings("unchecked")
512+
Observer<String> stringObserver = mock(Observer.class);
513+
514+
TestSubscriber<String> ts = new TestSubscriber<String>(stringObserver);
515+
Observable<String> m = Observable.mergeDelayError(parentObservable);
516+
m.subscribe(ts);
517+
System.out.println("testErrorInParentObservableDelayed | " + i);
518+
ts.awaitTerminalEvent(2000, TimeUnit.MILLISECONDS);
519+
ts.assertTerminalEvent();
520+
521+
verify(stringObserver, times(2)).onNext("hello");
522+
verify(stringObserver, times(1)).onError(any(NullPointerException.class));
523+
verify(stringObserver, never()).onCompleted();
524+
}
519525
}
520526

521527
private static class TestASynchronous1sDelayedObservable implements Observable.OnSubscribe<String> {

0 commit comments

Comments
 (0)