Skip to content

Commit 3d45dac

Browse files
committed
Chain Subscription in TimeoutSubscriber and SerializedSubscriber
1 parent bcded86 commit 3d45dac

File tree

3 files changed

+88
-0
lines changed

3 files changed

+88
-0
lines changed

rxjava-core/src/main/java/rx/observers/SerializedSubscriber.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public class SerializedSubscriber<T> extends Subscriber<T> {
2020
private final Observer<T> s;
2121

2222
public SerializedSubscriber(Subscriber<? super T> s) {
23+
super(s);
2324
this.s = new SerializedObserver<T>(s);
2425
}
2526

rxjava-core/src/main/java/rx/operators/OperatorTimeoutBase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ private TimeoutSubscriber(
9393
SerializedSubscriber<T> serializedSubscriber,
9494
TimeoutStub<T> timeoutStub, SerialSubscription serial,
9595
Observable<? extends T> other) {
96+
super(serializedSubscriber);
9697
this.serializedSubscriber = serializedSubscriber;
9798
this.timeoutStub = timeoutStub;
9899
this.serial = serial;

rxjava-core/src/test/java/rx/operators/OperatorTimeoutTests.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static org.mockito.Matchers.*;
1919
import static org.mockito.Mockito.*;
2020

21+
import java.io.IOException;
2122
import java.util.concurrent.CountDownLatch;
2223
import java.util.concurrent.TimeUnit;
2324
import java.util.concurrent.TimeoutException;
@@ -266,4 +267,89 @@ public void call(Subscriber<? super String> subscriber) {
266267

267268
exit.countDown(); // exit the thread
268269
}
270+
271+
@Test
272+
public void shouldUnsubscribeFromUnderlyingSubscriptionOnTimeout() throws InterruptedException {
273+
// From https://github.com/Netflix/RxJava/pull/951
274+
final Subscription s = mock(Subscription.class);
275+
276+
Observable<String> never = Observable.create(new OnSubscribe<String>() {
277+
public void call(Subscriber<? super String> subscriber) {
278+
subscriber.add(s);
279+
}
280+
});
281+
282+
TestScheduler testScheduler = new TestScheduler();
283+
Observable<String> observableWithTimeout = never.timeout(1000, TimeUnit.MILLISECONDS, testScheduler);
284+
285+
@SuppressWarnings("unchecked")
286+
Observer<String> observer = mock(Observer.class);
287+
observableWithTimeout.subscribe(observer);
288+
289+
testScheduler.advanceTimeBy(2000, TimeUnit.MILLISECONDS);
290+
291+
InOrder inOrder = inOrder(observer);
292+
inOrder.verify(observer).onError(isA(TimeoutException.class));
293+
inOrder.verifyNoMoreInteractions();
294+
295+
verify(s, times(1)).unsubscribe();
296+
}
297+
298+
@Test
299+
public void shouldUnsubscribeFromUnderlyingSubscriptionOnImmediatelyComplete() {
300+
// From https://github.com/Netflix/RxJava/pull/951
301+
final Subscription s = mock(Subscription.class);
302+
303+
Observable<String> immediatelyComplete = Observable.create(new OnSubscribe<String>() {
304+
public void call(Subscriber<? super String> subscriber) {
305+
subscriber.add(s);
306+
subscriber.onCompleted();
307+
}
308+
});
309+
310+
TestScheduler testScheduler = new TestScheduler();
311+
Observable<String> observableWithTimeout = immediatelyComplete.timeout(1000, TimeUnit.MILLISECONDS,
312+
testScheduler);
313+
314+
@SuppressWarnings("unchecked")
315+
Observer<String> observer = mock(Observer.class);
316+
observableWithTimeout.subscribe(observer);
317+
318+
testScheduler.advanceTimeBy(2000, TimeUnit.MILLISECONDS);
319+
320+
InOrder inOrder = inOrder(observer);
321+
inOrder.verify(observer).onCompleted();
322+
inOrder.verifyNoMoreInteractions();
323+
324+
verify(s, times(1)).unsubscribe();
325+
}
326+
327+
@Test
328+
public void shouldUnsubscribeFromUnderlyingSubscriptionOnImmediatelyErrored() throws InterruptedException {
329+
// From https://github.com/Netflix/RxJava/pull/951
330+
final Subscription s = mock(Subscription.class);
331+
332+
Observable<String> immediatelyError = Observable.create(new OnSubscribe<String>() {
333+
public void call(Subscriber<? super String> subscriber) {
334+
subscriber.add(s);
335+
subscriber.onError(new IOException("Error"));
336+
}
337+
});
338+
339+
TestScheduler testScheduler = new TestScheduler();
340+
Observable<String> observableWithTimeout = immediatelyError.timeout(1000, TimeUnit.MILLISECONDS,
341+
testScheduler);
342+
343+
@SuppressWarnings("unchecked")
344+
Observer<String> observer = mock(Observer.class);
345+
observableWithTimeout.subscribe(observer);
346+
347+
testScheduler.advanceTimeBy(2000, TimeUnit.MILLISECONDS);
348+
349+
InOrder inOrder = inOrder(observer);
350+
inOrder.verify(observer).onError(isA(IOException.class));
351+
inOrder.verifyNoMoreInteractions();
352+
353+
verify(s, times(1)).unsubscribe();
354+
}
269355
}

0 commit comments

Comments
 (0)