Skip to content

Commit b5ec461

Browse files
Merge pull request #434 from johnhmarks/Timeout
Implemented SerialSubscription and Timeout operator
2 parents 132f925 + b58bc45 commit b5ec461

File tree

5 files changed

+418
-5
lines changed

5 files changed

+418
-5
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import rx.operators.OperationTakeUntil;
7575
import rx.operators.OperationTakeWhile;
7676
import rx.operators.OperationThrottleFirst;
77+
import rx.operators.OperationTimeout;
7778
import rx.operators.OperationTimestamp;
7879
import rx.operators.OperationToObservableFuture;
7980
import rx.operators.OperationToObservableIterable;
@@ -1855,8 +1856,6 @@ public static <T> Observable<T> switchOnNext(Observable<? extends Observable<? e
18551856
* its {@link Observer}s; it invokes {@code onCompleted} or {@code onError} only once; and it never invokes {@code onNext} after invoking either {@code onCompleted} or {@code onError}.
18561857
* {@code synchronize} enforces this, and the Observable it returns invokes {@code onNext} and {@code onCompleted} or {@code onError} synchronously.
18571858
*
1858-
* @param <T>
1859-
* the type of item emitted by the source Observable
18601859
* @return an Observable that is a chronologically well-behaved version of the source
18611860
* Observable, and that synchronously notifies its {@link Observer}s
18621861
*/
@@ -1876,8 +1875,6 @@ public Observable<T> synchronize() {
18761875
*
18771876
* @param lock
18781877
* The lock object to synchronize each observer call on
1879-
* @param <T>
1880-
* the type of item emitted by the source Observable
18811878
* @return an Observable that is a chronologically well-behaved version of the source
18821879
* Observable, and that synchronously notifies its {@link Observer}s
18831880
*/
@@ -3194,7 +3191,7 @@ public Observable<Boolean> exists(Func1<? super T, Boolean> predicate) {
31943191
/**
31953192
* Determines whether an observable sequence contains a specified element.
31963193
*
3197-
* @param value
3194+
* @param element
31983195
* The element to search in the sequence.
31993196
* @return an Observable that emits if the element is in the source sequence.
32003197
* @see <a href="http://msdn.microsoft.com/en-us/library/hh228965(v=vs.103).aspx">MSDN: Observable.Contains</a>
@@ -4507,6 +4504,32 @@ public Observable<T> ignoreElements() {
45074504
return filter(alwaysFalse());
45084505
}
45094506

4507+
/**
4508+
* Returns either the observable sequence or an TimeoutException if timeout elapses.
4509+
* @param timeout
4510+
* The timeout duration
4511+
* @param timeUnit
4512+
* The time unit of the timeout
4513+
* @param scheduler
4514+
* The scheduler to run the timeout timers on.
4515+
* @return The source sequence with a TimeoutException in case of a timeout.
4516+
*/
4517+
public Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler) {
4518+
return create(OperationTimeout.timeout(this, timeout, timeUnit, scheduler));
4519+
}
4520+
4521+
/**
4522+
* Returns either the observable sequence or an TimeoutException if timeout elapses.
4523+
* @param timeout
4524+
* The timeout duration
4525+
* @param timeUnit
4526+
* The time unit of the timeout
4527+
* @return The source sequence with a TimeoutException in case of a timeout.
4528+
*/
4529+
public Observable<T> timeout(long timeout, TimeUnit timeUnit) {
4530+
return create(OperationTimeout.timeout(this, timeout, timeUnit, Schedulers.threadPoolForComputation()));
4531+
}
4532+
45104533
/**
45114534
* Whether a given {@link Function} is an internal implementation inside rx.* packages or not.
45124535
* <p>
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable;
19+
import rx.Observer;
20+
import rx.Scheduler;
21+
import rx.Subscription;
22+
import rx.subscriptions.CompositeSubscription;
23+
import rx.subscriptions.SerialSubscription;
24+
import rx.util.functions.Action0;
25+
import rx.util.functions.Func0;
26+
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.TimeoutException;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.concurrent.atomic.AtomicLong;
31+
32+
public final class OperationTimeout {
33+
public static <T> Observable.OnSubscribeFunc<T> timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Scheduler scheduler) {
34+
return new Timeout<T>(source, timeout, timeUnit, scheduler);
35+
}
36+
37+
private static class Timeout<T> implements Observable.OnSubscribeFunc<T> {
38+
private final Observable<? extends T> source;
39+
private final long timeout;
40+
private final TimeUnit timeUnit;
41+
private final Scheduler scheduler;
42+
43+
private Timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Scheduler scheduler) {
44+
this.source = source;
45+
this.timeout = timeout;
46+
this.timeUnit = timeUnit;
47+
this.scheduler = scheduler;
48+
}
49+
50+
@Override
51+
public Subscription onSubscribe(final Observer<? super T> observer) {
52+
final AtomicBoolean terminated = new AtomicBoolean(false);
53+
final AtomicLong actual = new AtomicLong(0L); // Required to handle race between onNext and timeout
54+
final SerialSubscription serial = new SerialSubscription();
55+
final Object gate = new Object();
56+
CompositeSubscription composite = new CompositeSubscription();
57+
final Func0<Subscription> schedule = new Func0<Subscription>() {
58+
@Override
59+
public Subscription call() {
60+
final long expected = actual.get();
61+
return scheduler.schedule(new Action0() {
62+
@Override
63+
public void call() {
64+
boolean timeoutWins = false;
65+
synchronized (gate) {
66+
if (expected == actual.get() && !terminated.getAndSet(true)) {
67+
timeoutWins = true;
68+
}
69+
}
70+
if (timeoutWins) {
71+
observer.onError(new TimeoutException());
72+
}
73+
74+
}
75+
}, timeout, timeUnit);
76+
}
77+
};
78+
SafeObservableSubscription subscription = new SafeObservableSubscription();
79+
composite.add(subscription.wrap(source.subscribe(new Observer<T>() {
80+
@Override
81+
public void onNext(T value) {
82+
boolean onNextWins = false;
83+
synchronized (gate) {
84+
if (!terminated.get()) {
85+
actual.incrementAndGet();
86+
onNextWins = true;
87+
}
88+
}
89+
if (onNextWins) {
90+
serial.setSubscription(schedule.call());
91+
observer.onNext(value);
92+
}
93+
}
94+
95+
@Override
96+
public void onError(Throwable error) {
97+
boolean onErrorWins = false;
98+
synchronized (gate) {
99+
if (!terminated.getAndSet(true)) {
100+
onErrorWins = true;
101+
}
102+
}
103+
if (onErrorWins) {
104+
serial.unsubscribe();
105+
observer.onError(error);
106+
}
107+
}
108+
109+
@Override
110+
public void onCompleted() {
111+
boolean onCompletedWins = false;
112+
synchronized (gate) {
113+
if (!terminated.getAndSet(true)) {
114+
onCompletedWins = true;
115+
}
116+
}
117+
if (onCompletedWins) {
118+
serial.unsubscribe();
119+
observer.onCompleted();
120+
}
121+
}
122+
})));
123+
composite.add(serial);
124+
serial.setSubscription(schedule.call());
125+
return composite;
126+
}
127+
}
128+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.subscriptions;
17+
18+
import rx.Subscription;
19+
20+
/**
21+
* Represents a subscription whose underlying subscription can be swapped for another subscription
22+
* which causes the previous underlying subscription to be unsubscribed.
23+
*
24+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.serialdisposable(v=vs.103).aspx">Rx.Net equivalent SerialDisposable</a>
25+
*/
26+
public class SerialSubscription implements Subscription {
27+
private boolean unsubscribed;
28+
private Subscription subscription;
29+
private final Object gate = new Object();
30+
31+
@Override
32+
public void unsubscribe() {
33+
Subscription toUnsubscribe = null;
34+
synchronized (gate) {
35+
if (!unsubscribed) {
36+
if (subscription != null) {
37+
toUnsubscribe = subscription;
38+
subscription = null;
39+
}
40+
unsubscribed = true;
41+
}
42+
}
43+
if (toUnsubscribe != null) {
44+
toUnsubscribe.unsubscribe();
45+
}
46+
}
47+
48+
public Subscription getSubscription() {
49+
synchronized (gate) {
50+
return subscription;
51+
}
52+
}
53+
54+
public void setSubscription(Subscription subscription) {
55+
Subscription toUnsubscribe = null;
56+
synchronized (gate) {
57+
if (!unsubscribed) {
58+
if (this.subscription != null) {
59+
toUnsubscribe = this.subscription;
60+
}
61+
this.subscription = subscription;
62+
} else {
63+
toUnsubscribe = subscription;
64+
}
65+
}
66+
if (toUnsubscribe != null) {
67+
toUnsubscribe.unsubscribe();
68+
}
69+
}
70+
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx;
17+
18+
import org.junit.Before;
19+
import org.junit.Test;
20+
import org.mockito.MockitoAnnotations;
21+
import rx.concurrency.TestScheduler;
22+
import rx.subjects.PublishSubject;
23+
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.TimeoutException;
26+
27+
import static org.mockito.Matchers.any;
28+
import static org.mockito.Mockito.mock;
29+
import static org.mockito.Mockito.never;
30+
import static org.mockito.Mockito.verify;
31+
32+
public class TimeoutTests {
33+
private PublishSubject<String> underlyingSubject;
34+
private TestScheduler testScheduler;
35+
private Observable<String> withTimeout;
36+
private static final long TIMEOUT = 3;
37+
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
38+
39+
@Before
40+
public void setUp() {
41+
MockitoAnnotations.initMocks(this);
42+
43+
underlyingSubject = PublishSubject.create();
44+
testScheduler = new TestScheduler();
45+
withTimeout = underlyingSubject.timeout(TIMEOUT, TIME_UNIT, testScheduler);
46+
}
47+
48+
@Test
49+
public void shouldNotTimeoutIfOnNextWithinTimeout() {
50+
Observer<String> observer = mock(Observer.class);
51+
Subscription subscription = withTimeout.subscribe(observer);
52+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
53+
underlyingSubject.onNext("One");
54+
verify(observer).onNext("One");
55+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
56+
verify(observer, never()).onError(any(Throwable.class));
57+
subscription.unsubscribe();
58+
}
59+
60+
@Test
61+
public void shouldNotTimeoutIfSecondOnNextWithinTimeout() {
62+
Observer<String> observer = mock(Observer.class);
63+
Subscription subscription = withTimeout.subscribe(observer);
64+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
65+
underlyingSubject.onNext("One");
66+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
67+
underlyingSubject.onNext("Two");
68+
verify(observer).onNext("Two");
69+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
70+
verify(observer, never()).onError(any(Throwable.class));
71+
subscription.unsubscribe();
72+
}
73+
74+
@Test
75+
public void shouldTimeoutIfOnNextNotWithinTimeout() {
76+
Observer<String> observer = mock(Observer.class);
77+
Subscription subscription = withTimeout.subscribe(observer);
78+
testScheduler.advanceTimeBy(TIMEOUT + 1, TimeUnit.SECONDS);
79+
verify(observer).onError(any(TimeoutException.class));
80+
subscription.unsubscribe();
81+
}
82+
83+
@Test
84+
public void shouldTimeoutIfSecondOnNextNotWithinTimeout() {
85+
Observer<String> observer = mock(Observer.class);
86+
Subscription subscription = withTimeout.subscribe(observer);
87+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
88+
underlyingSubject.onNext("One");
89+
verify(observer).onNext("One");
90+
testScheduler.advanceTimeBy(TIMEOUT + 1, TimeUnit.SECONDS);
91+
verify(observer).onError(any(TimeoutException.class));
92+
subscription.unsubscribe();
93+
}
94+
95+
@Test
96+
public void shouldCompleteIfUnderlyingComletes() {
97+
Observer<String> observer = mock(Observer.class);
98+
Subscription subscription = withTimeout.subscribe(observer);
99+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
100+
underlyingSubject.onCompleted();
101+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
102+
verify(observer).onCompleted();
103+
verify(observer, never()).onError(any(Throwable.class));
104+
subscription.unsubscribe();
105+
}
106+
107+
@Test
108+
public void shouldErrorIfUnderlyingErrors() {
109+
Observer<String> observer = mock(Observer.class);
110+
Subscription subscription = withTimeout.subscribe(observer);
111+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
112+
underlyingSubject.onError(new UnsupportedOperationException());
113+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
114+
verify(observer).onError(any(UnsupportedOperationException.class));
115+
subscription.unsubscribe();
116+
}
117+
}

0 commit comments

Comments
 (0)