Skip to content

Commit 6349171

Browse files
Fix mergeDelayError Handling of Error in Parent Observable
Fixes #313
1 parent 018bd22 commit 6349171

File tree

2 files changed

+22
-2
lines changed

2 files changed

+22
-2
lines changed

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,11 @@ public Boolean call(InnerSubscriber<T> s) {
397397

398398
@Override
399399
public void onError(Throwable e) {
400+
completed = true;
401+
innerError(e);
402+
}
403+
404+
private void innerError(Throwable e) {
400405
if (delayErrors) {
401406
synchronized (this) {
402407
if (exceptions == null) {
@@ -540,7 +545,7 @@ public void onNext(T t) {
540545
public void onError(Throwable e) {
541546
// it doesn't go through queues, it immediately onErrors and tears everything down
542547
if (ONCE_TERMINATED.compareAndSet(this, 0, 1)) {
543-
parentSubscriber.onError(e);
548+
parentSubscriber.innerError(e);
544549
}
545550
}
546551

@@ -753,4 +758,4 @@ private int drainQueue() {
753758
}
754759
}
755760
}
756-
}
761+
}

src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import rx.observers.TestSubscriber;
3131

3232
import java.util.ArrayList;
33+
import java.util.Arrays;
3334
import java.util.List;
3435
import java.util.concurrent.CountDownLatch;
3536
import java.util.concurrent.TimeUnit;
@@ -479,6 +480,20 @@ public void onCompleted() {
479480
verify(o, never()).onCompleted();
480481
}
481482

483+
@Test
484+
public void testErrorInParentObservable() {
485+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
486+
Observable.mergeDelayError(
487+
Observable.just(Observable.just(1), Observable.just(2))
488+
.startWith(Observable.<Integer> error(new RuntimeException()))
489+
).subscribe(ts);
490+
ts.awaitTerminalEvent();
491+
ts.assertTerminalEvent();
492+
ts.assertReceivedOnNext(Arrays.asList(1, 2));
493+
assertEquals(1, ts.getOnErrorEvents().size());
494+
495+
}
496+
482497
@Test
483498
public void testErrorInParentObservableDelayed() throws Exception {
484499
final TestASynchronous1sDelayedObservable o1 = new TestASynchronous1sDelayedObservable();

0 commit comments

Comments
 (0)