Skip to content

Commit 669c7b6

Browse files
committed
SerialSubscription and Timeout operator
1 parent ae073dd commit 669c7b6

File tree

4 files changed

+292
-55
lines changed

4 files changed

+292
-55
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 27 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -28,61 +28,7 @@
2828
import rx.observables.BlockingObservable;
2929
import rx.observables.ConnectableObservable;
3030
import rx.observables.GroupedObservable;
31-
import rx.operators.OperationAll;
32-
import rx.operators.OperationAny;
33-
import rx.operators.OperationAverage;
34-
import rx.operators.OperationBuffer;
35-
import rx.operators.OperationCache;
36-
import rx.operators.OperationCast;
37-
import rx.operators.OperationCombineLatest;
38-
import rx.operators.OperationConcat;
39-
import rx.operators.OperationDebounce;
40-
import rx.operators.OperationDefaultIfEmpty;
41-
import rx.operators.OperationDefer;
42-
import rx.operators.OperationDematerialize;
43-
import rx.operators.OperationDistinct;
44-
import rx.operators.OperationDistinctUntilChanged;
45-
import rx.operators.OperationElementAt;
46-
import rx.operators.OperationFilter;
47-
import rx.operators.OperationFinally;
48-
import rx.operators.OperationFirstOrDefault;
49-
import rx.operators.OperationGroupBy;
50-
import rx.operators.OperationInterval;
51-
import rx.operators.OperationMap;
52-
import rx.operators.OperationMaterialize;
53-
import rx.operators.OperationMerge;
54-
import rx.operators.OperationMergeDelayError;
55-
import rx.operators.OperationMulticast;
56-
import rx.operators.OperationObserveOn;
57-
import rx.operators.OperationOnErrorResumeNextViaFunction;
58-
import rx.operators.OperationOnErrorResumeNextViaObservable;
59-
import rx.operators.OperationOnErrorReturn;
60-
import rx.operators.OperationOnExceptionResumeNextViaObservable;
61-
import rx.operators.OperationParallel;
62-
import rx.operators.OperationRetry;
63-
import rx.operators.OperationSample;
64-
import rx.operators.OperationScan;
65-
import rx.operators.OperationSkip;
66-
import rx.operators.OperationSkipLast;
67-
import rx.operators.OperationSkipWhile;
68-
import rx.operators.OperationSubscribeOn;
69-
import rx.operators.OperationSum;
70-
import rx.operators.OperationSwitch;
71-
import rx.operators.OperationSynchronize;
72-
import rx.operators.OperationTake;
73-
import rx.operators.OperationTakeLast;
74-
import rx.operators.OperationTakeUntil;
75-
import rx.operators.OperationTakeWhile;
76-
import rx.operators.OperationThrottleFirst;
77-
import rx.operators.OperationTimestamp;
78-
import rx.operators.OperationToObservableFuture;
79-
import rx.operators.OperationToObservableIterable;
80-
import rx.operators.OperationToObservableList;
81-
import rx.operators.OperationToObservableSortedList;
82-
import rx.operators.OperationWindow;
83-
import rx.operators.OperationZip;
84-
import rx.operators.SafeObservableSubscription;
85-
import rx.operators.SafeObserver;
31+
import rx.operators.*;
8632
import rx.plugins.RxJavaErrorHandler;
8733
import rx.plugins.RxJavaObservableExecutionHook;
8834
import rx.plugins.RxJavaPlugins;
@@ -4507,6 +4453,32 @@ public Observable<T> ignoreElements() {
45074453
return filter(alwaysFalse());
45084454
}
45094455

4456+
/**
4457+
* Returns either the observable sequence or an TimeoutException if timeout elapses.
4458+
* @param timeout
4459+
* The timeout duration
4460+
* @param timeUnit
4461+
* The time unit of the timeout
4462+
* @param scheduler
4463+
* The scheduler to run the timeout timers on.
4464+
* @return The source sequence with a TimeoutException in case of a timeout.
4465+
*/
4466+
public Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler) {
4467+
return create(OperationTimeout.timeout(this, timeout, timeUnit, scheduler));
4468+
}
4469+
4470+
/**
4471+
* Returns either the observable sequence or an TimeoutException if timeout elapses.
4472+
* @param timeout
4473+
* The timeout duration
4474+
* @param timeUnit
4475+
* The time unit of the timeout
4476+
* @return The source sequence with a TimeoutException in case of a timeout.
4477+
*/
4478+
public Observable<T> timeout(long timeout, TimeUnit timeUnit) {
4479+
return create(OperationTimeout.timeout(this, timeout, timeUnit, Schedulers.threadPoolForComputation()));
4480+
}
4481+
45104482
/**
45114483
* Whether a given {@link Function} is an internal implementation inside rx.* packages or not.
45124484
* <p>
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable;
19+
import rx.Observer;
20+
import rx.Scheduler;
21+
import rx.Subscription;
22+
import rx.subscriptions.CompositeSubscription;
23+
import rx.subscriptions.SerialSubscription;
24+
import rx.util.functions.Action0;
25+
import rx.util.functions.Func0;
26+
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.TimeoutException;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.concurrent.atomic.AtomicLong;
31+
32+
public final class OperationTimeout {
33+
public static <T> Observable.OnSubscribeFunc<T> timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Scheduler scheduler) {
34+
return new Timeout<T>(source, timeout, timeUnit, scheduler);
35+
}
36+
37+
private static class Timeout<T> implements Observable.OnSubscribeFunc<T> {
38+
private final Observable<? extends T> source;
39+
private final long timeout;
40+
private final TimeUnit timeUnit;
41+
private final Scheduler scheduler;
42+
43+
@Override
44+
public Subscription onSubscribe(final Observer<? super T> observer) {
45+
final AtomicBoolean terminated = new AtomicBoolean(false);
46+
final AtomicLong actual = new AtomicLong(0L); // Required to handle race between onNext and timeout
47+
final SerialSubscription serial = new SerialSubscription();
48+
final Object gate = new Object();
49+
CompositeSubscription composite = new CompositeSubscription();
50+
final Func0<Subscription> schedule = new Func0<Subscription>() {
51+
@Override
52+
public Subscription call() {
53+
final long expected = actual.get();
54+
return scheduler.schedule(new Action0() {
55+
@Override
56+
public void call() {
57+
boolean timeoutWins = false;
58+
synchronized (gate) {
59+
if (expected == actual.get() && !terminated.getAndSet(true)) {
60+
timeoutWins = true;
61+
}
62+
}
63+
if (timeoutWins) {
64+
observer.onError(new TimeoutException());
65+
}
66+
67+
}
68+
}, timeout, timeUnit);
69+
}
70+
};
71+
SafeObservableSubscription subscription = new SafeObservableSubscription();
72+
composite.add(subscription.wrap(source.subscribe(new Observer<T>() {
73+
@Override
74+
public void onNext(T value) {
75+
boolean onNextWins = false;
76+
synchronized (gate) {
77+
if (!terminated.get()) {
78+
actual.incrementAndGet();
79+
onNextWins = true;
80+
}
81+
}
82+
if (onNextWins) {
83+
serial.setSubscription(schedule.call());
84+
observer.onNext(value);
85+
}
86+
}
87+
88+
@Override
89+
public void onError(Throwable error) {
90+
boolean onErrorWins = false;
91+
synchronized (gate) {
92+
if (!terminated.getAndSet(true)) {
93+
onErrorWins = true;
94+
}
95+
}
96+
if (onErrorWins) {
97+
serial.unsubscribe();
98+
observer.onError(error);
99+
}
100+
}
101+
102+
@Override
103+
public void onCompleted() {
104+
boolean onCompletedWins = false;
105+
synchronized (gate) {
106+
if (!terminated.getAndSet(true)) {
107+
onCompletedWins = true;
108+
}
109+
}
110+
if (onCompletedWins) {
111+
serial.unsubscribe();
112+
observer.onCompleted();
113+
}
114+
}
115+
})));
116+
composite.add(serial);
117+
serial.setSubscription(schedule.call());
118+
return composite;
119+
}
120+
121+
private Timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Scheduler scheduler) {
122+
this.source = source;
123+
this.timeout = timeout;
124+
this.timeUnit = timeUnit;
125+
this.scheduler = scheduler;
126+
}
127+
}
128+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.subscriptions;
17+
18+
import rx.Subscription;
19+
20+
/**
21+
* Represents a subscription whose underlying subscription can be swapped for another subscription
22+
* which causes the previous underlying subscription to be unsubscribed.
23+
*
24+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.serialdisposable(v=vs.103).aspx">Rx.Net equivalent SerialDisposable</a>
25+
*/
26+
public class SerialSubscription implements Subscription {
27+
private boolean unsubscribed;
28+
private Subscription subscription;
29+
private final Object gate = new Object();
30+
31+
@Override
32+
public void unsubscribe() {
33+
synchronized (gate) {
34+
if (!unsubscribed) {
35+
if (subscription != null) {
36+
subscription.unsubscribe();
37+
subscription = null;
38+
}
39+
unsubscribed = true;
40+
}
41+
}
42+
}
43+
44+
public Subscription getSubscription() {
45+
synchronized (gate) {
46+
return subscription;
47+
}
48+
}
49+
50+
public void setSubscription(Subscription subscription) {
51+
synchronized (gate) {
52+
if (!unsubscribed) {
53+
if (this.subscription != null) {
54+
this.subscription.unsubscribe();
55+
}
56+
this.subscription = subscription;
57+
} else {
58+
subscription.unsubscribe();
59+
}
60+
}
61+
}
62+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.subscriptions;
17+
18+
import org.junit.Before;
19+
import org.junit.Test;
20+
import org.mockito.MockitoAnnotations;
21+
import rx.Subscription;
22+
23+
import static org.mockito.Mockito.*;
24+
25+
public class SerialSubscriptionTests {
26+
private SerialSubscription serialSubscription;
27+
28+
@Before
29+
public void setUp() {
30+
MockitoAnnotations.initMocks(this);
31+
32+
serialSubscription = new SerialSubscription();
33+
}
34+
35+
@Test
36+
public void unsubscribingWithoutUnderlyingDoesNothing() {
37+
serialSubscription.unsubscribe();
38+
}
39+
40+
@Test
41+
public void unsubscribingWithSingleUnderlyingUnsubscribes() {
42+
Subscription underlying = mock(Subscription.class);
43+
serialSubscription.setSubscription(underlying);
44+
underlying.unsubscribe();
45+
verify(underlying).unsubscribe();
46+
}
47+
48+
@Test
49+
public void replacingFirstUnderlyingCausesUnsubscription() {
50+
Subscription first = mock(Subscription.class);
51+
serialSubscription.setSubscription(first);
52+
Subscription second = mock(Subscription.class);
53+
serialSubscription.setSubscription(second);
54+
verify(first).unsubscribe();
55+
}
56+
57+
@Test
58+
public void whenUnsubscribingSecondUnderlyingUnsubscribed() {
59+
Subscription first = mock(Subscription.class);
60+
serialSubscription.setSubscription(first);
61+
Subscription second = mock(Subscription.class);
62+
serialSubscription.setSubscription(second);
63+
serialSubscription.unsubscribe();
64+
verify(second).unsubscribe();
65+
}
66+
67+
@Test
68+
public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscription()
69+
{
70+
serialSubscription.unsubscribe();
71+
Subscription underlying = mock(Subscription.class);
72+
serialSubscription.setSubscription(underlying);
73+
verify(underlying).unsubscribe();
74+
}
75+
}

0 commit comments

Comments
 (0)