Skip to content

Commit a041883

Browse files
committed
Fixed issue #737
1 parent e335e61 commit a041883

File tree

2 files changed

+51
-5
lines changed

2 files changed

+51
-5
lines changed

rxjava-core/src/main/java/rx/operators/OperationSwitch.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,12 @@ public void onCompleted() {
126126
sub.unsubscribe();
127127
if (latest == id) {
128128
SwitchObserver.this.hasLatest = false;
129-
}
130129

131-
if (stopped) {
132-
SwitchObserver.this.observer.onCompleted();
133-
SwitchObserver.this.parent.unsubscribe();
130+
if (stopped) {
131+
SwitchObserver.this.observer.onCompleted();
132+
SwitchObserver.this.parent.unsubscribe();
133+
}
134134
}
135-
136135
}
137136
}
138137

rxjava-core/src/test/java/rx/operators/OperationSwitchTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,4 +380,51 @@ public void call() {
380380
@SuppressWarnings("serial")
381381
private class TestException extends Throwable {
382382
}
383+
384+
@Test
385+
public void testSwitchIssue737() {
386+
// https://github.com/Netflix/RxJava/issues/737
387+
Observable<Observable<String>> source = Observable.create(new Observable.OnSubscribeFunc<Observable<String>>() {
388+
@Override
389+
public Subscription onSubscribe(Observer<? super Observable<String>> observer) {
390+
publishNext(observer, 0, Observable.create(new Observable.OnSubscribeFunc<String>() {
391+
@Override
392+
public Subscription onSubscribe(Observer<? super String> observer) {
393+
publishNext(observer, 10, "1-one");
394+
publishNext(observer, 20, "1-two");
395+
// The following events will be ignored
396+
publishNext(observer, 30, "1-three");
397+
publishCompleted(observer, 40);
398+
return Subscriptions.empty();
399+
}
400+
}));
401+
publishNext(observer, 25, Observable.create(new Observable.OnSubscribeFunc<String>() {
402+
@Override
403+
public Subscription onSubscribe(Observer<? super String> observer) {
404+
publishNext(observer, 10, "2-one");
405+
publishNext(observer, 20, "2-two");
406+
publishNext(observer, 30, "2-three");
407+
publishCompleted(observer, 40);
408+
return Subscriptions.empty();
409+
}
410+
}));
411+
publishCompleted(observer, 30);
412+
return Subscriptions.empty();
413+
}
414+
});
415+
416+
Observable<String> sampled = Observable.create(OperationSwitch.switchDo(source));
417+
sampled.subscribe(observer);
418+
419+
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
420+
421+
InOrder inOrder = inOrder(observer);
422+
inOrder.verify(observer, times(1)).onNext("1-one");
423+
inOrder.verify(observer, times(1)).onNext("1-two");
424+
inOrder.verify(observer, times(1)).onNext("2-one");
425+
inOrder.verify(observer, times(1)).onNext("2-two");
426+
inOrder.verify(observer, times(1)).onNext("2-three");
427+
inOrder.verify(observer, times(1)).onCompleted();
428+
inOrder.verifyNoMoreInteractions();
429+
}
383430
}

0 commit comments

Comments
 (0)