Skip to content

Commit 10da2ae

Browse files
committed
Improved efficiency of SerialSubscription and unit tested Timeout
1 parent 22eaa5e commit 10da2ae

File tree

2 files changed

+128
-3
lines changed

2 files changed

+128
-3
lines changed

rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,19 @@ public class SerialSubscription implements Subscription {
3030

3131
@Override
3232
public void unsubscribe() {
33+
Subscription toUnsubscribe = null;
3334
synchronized (gate) {
3435
if (!unsubscribed) {
3536
if (subscription != null) {
36-
subscription.unsubscribe();
37+
toUnsubscribe = subscription;
3738
subscription = null;
3839
}
3940
unsubscribed = true;
4041
}
4142
}
43+
if (toUnsubscribe != null) {
44+
toUnsubscribe.unsubscribe();
45+
}
4246
}
4347

4448
public Subscription getSubscription() {
@@ -48,15 +52,19 @@ public Subscription getSubscription() {
4852
}
4953

5054
public void setSubscription(Subscription subscription) {
55+
Subscription toUnsubscribe = null;
5156
synchronized (gate) {
5257
if (!unsubscribed) {
5358
if (this.subscription != null) {
54-
this.subscription.unsubscribe();
59+
toUnsubscribe = this.subscription;
5560
}
5661
this.subscription = subscription;
5762
} else {
58-
subscription.unsubscribe();
63+
toUnsubscribe = subscription;
5964
}
6065
}
66+
if (toUnsubscribe != null) {
67+
toUnsubscribe.unsubscribe();
68+
}
6169
}
6270
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx;
17+
18+
import org.junit.Before;
19+
import org.junit.Test;
20+
import org.mockito.MockitoAnnotations;
21+
import rx.concurrency.TestScheduler;
22+
import rx.subjects.PublishSubject;
23+
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.TimeoutException;
26+
27+
import static org.mockito.Matchers.any;
28+
import static org.mockito.Mockito.mock;
29+
import static org.mockito.Mockito.never;
30+
import static org.mockito.Mockito.verify;
31+
32+
public class TimeoutTests {
33+
private PublishSubject<String> underlyingSubject;
34+
private TestScheduler testScheduler;
35+
private Observable<String> withTimeout;
36+
private static final long TIMEOUT = 3;
37+
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
38+
39+
@Before
40+
public void setUp() {
41+
MockitoAnnotations.initMocks(this);
42+
43+
underlyingSubject = PublishSubject.create();
44+
testScheduler = new TestScheduler();
45+
withTimeout = underlyingSubject.timeout(TIMEOUT, TIME_UNIT, testScheduler);
46+
}
47+
48+
@Test
49+
public void shouldNotTimeoutIfOnNextWithinTimeout() {
50+
Observer<String> observer = mock(Observer.class);
51+
Subscription subscription = withTimeout.subscribe(observer);
52+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
53+
underlyingSubject.onNext("One");
54+
verify(observer).onNext("One");
55+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
56+
verify(observer, never()).onError(any(Throwable.class));
57+
subscription.unsubscribe();
58+
}
59+
60+
@Test
61+
public void shouldNotTimeoutIfSecondOnNextWithinTimeout() {
62+
Observer<String> observer = mock(Observer.class);
63+
Subscription subscription = withTimeout.subscribe(observer);
64+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
65+
underlyingSubject.onNext("One");
66+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
67+
underlyingSubject.onNext("Two");
68+
verify(observer).onNext("Two");
69+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
70+
verify(observer, never()).onError(any(Throwable.class));
71+
subscription.unsubscribe();
72+
}
73+
74+
@Test
75+
public void shouldTimeoutIfOnNextNotWithinTimeout() {
76+
Observer<String> observer = mock(Observer.class);
77+
Subscription subscription = withTimeout.subscribe(observer);
78+
testScheduler.advanceTimeBy(TIMEOUT + 1, TimeUnit.SECONDS);
79+
verify(observer).onError(any(TimeoutException.class));
80+
subscription.unsubscribe();
81+
}
82+
83+
@Test
84+
public void shouldTimeoutIfSecondOnNextNotWithinTimeout() {
85+
Observer<String> observer = mock(Observer.class);
86+
Subscription subscription = withTimeout.subscribe(observer);
87+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
88+
underlyingSubject.onNext("One");
89+
verify(observer).onNext("One");
90+
testScheduler.advanceTimeBy(TIMEOUT + 1, TimeUnit.SECONDS);
91+
verify(observer).onError(any(TimeoutException.class));
92+
subscription.unsubscribe();
93+
}
94+
95+
@Test
96+
public void shouldCompleteIfUnderlyingComletes() {
97+
Observer<String> observer = mock(Observer.class);
98+
Subscription subscription = withTimeout.subscribe(observer);
99+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
100+
underlyingSubject.onCompleted();
101+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
102+
verify(observer).onCompleted();
103+
verify(observer, never()).onError(any(Throwable.class));
104+
subscription.unsubscribe();
105+
}
106+
107+
@Test
108+
public void shouldErrorIfUnderlyingErrors() {
109+
Observer<String> observer = mock(Observer.class);
110+
Subscription subscription = withTimeout.subscribe(observer);
111+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
112+
underlyingSubject.onError(new UnsupportedOperationException());
113+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
114+
verify(observer).onError(any(UnsupportedOperationException.class));
115+
subscription.unsubscribe();
116+
}
117+
}

0 commit comments

Comments
 (0)