Skip to content

Commit 3d3894f

Browse files
Merge pull request #851 from zsxwing/timeout
Reimplement the timeout operator and fix timeout bugs
2 parents bf049a7 + 8cf79d7 commit 3d3894f

File tree

7 files changed

+572
-351
lines changed

7 files changed

+572
-351
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@
8787
import rx.operators.OperationTakeWhile;
8888
import rx.operators.OperationThrottleFirst;
8989
import rx.operators.OperationTimeInterval;
90-
import rx.operators.OperationTimeout;
9190
import rx.operators.OperationTimer;
9291
import rx.operators.OperationToMap;
9392
import rx.operators.OperationToMultimap;
@@ -106,6 +105,8 @@
106105
import rx.operators.OperatorRepeat;
107106
import rx.operators.OperatorSubscribeOn;
108107
import rx.operators.OperatorTake;
108+
import rx.operators.OperatorTimeout;
109+
import rx.operators.OperatorTimeoutWithSelector;
109110
import rx.operators.OperatorTimestamp;
110111
import rx.operators.OperatorToObservableList;
111112
import rx.operators.OperatorToObservableSortedList;
@@ -7755,11 +7756,8 @@ public final Observable<TimeInterval<T>> timeInterval(Scheduler scheduler) {
77557756
* @return an Observable that completes if either the first item or any subsequent item doesn't
77567757
* arrive within the time windows specified by the timeout selectors
77577758
*/
7758-
public final <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTimeoutSelector, Func1<? super T, ? extends Observable<U>> timeoutSelector) {
7759-
if (firstTimeoutSelector == null) {
7760-
throw new NullPointerException("firstTimeoutSelector");
7761-
}
7762-
return timeout(firstTimeoutSelector, timeoutSelector, Observable.<T> empty());
7759+
public final <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTimeoutSelector, Func1<? super T, ? extends Observable<V>> timeoutSelector) {
7760+
return timeout(firstTimeoutSelector, timeoutSelector, null);
77637761
}
77647762

77657763
/**
@@ -7785,14 +7783,11 @@ public final <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTi
77857783
* @return an Observable that mirrors the source Observable, but switches to the {@code other} Observable if either the first item emitted by the source Observable or any
77867784
* subsequent item don't arrive within time windows defined by the timeout selectors
77877785
*/
7788-
public final <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTimeoutSelector, Func1<? super T, ? extends Observable<U>> timeoutSelector, Observable<? extends T> other) {
7789-
if (firstTimeoutSelector == null) {
7790-
throw new NullPointerException("firstTimeoutSelector");
7791-
}
7792-
if (other == null) {
7793-
throw new NullPointerException("other");
7786+
public final <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTimeoutSelector, Func1<? super T, ? extends Observable<V>> timeoutSelector, Observable<? extends T> other) {
7787+
if(timeoutSelector == null) {
7788+
throw new NullPointerException("timeoutSelector is null");
77947789
}
7795-
return create(OperationTimeout.timeoutSelector(this, firstTimeoutSelector, timeoutSelector, other));
7790+
return lift(new OperatorTimeoutWithSelector<T, U, V>(firstTimeoutSelector, timeoutSelector, other));
77967791
}
77977792

77987793
/**
@@ -7814,8 +7809,8 @@ public final <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTi
78147809
* the source Observable takes longer to arrive than the time window defined by the
78157810
* selector for the previously emitted item
78167811
*/
7817-
public final <U> Observable<T> timeout(Func1<? super T, ? extends Observable<U>> timeoutSelector) {
7818-
return timeout(timeoutSelector, Observable.<T> empty());
7812+
public final <V> Observable<T> timeout(Func1<? super T, ? extends Observable<V>> timeoutSelector) {
7813+
return timeout(null, timeoutSelector, null);
78197814
}
78207815

78217816
/**
@@ -7839,11 +7834,8 @@ public final <U> Observable<T> timeout(Func1<? super T, ? extends Observable<U>>
78397834
* fallback Observable if a item emitted by the source Observable takes longer to arrive
78407835
* than the time window defined by the selector for the previously emitted item
78417836
*/
7842-
public final <U> Observable<T> timeout(Func1<? super T, ? extends Observable<U>> timeoutSelector, Observable<? extends T> other) {
7843-
if (other == null) {
7844-
throw new NullPointerException("other");
7845-
}
7846-
return create(OperationTimeout.timeoutSelector(this, null, timeoutSelector, other));
7837+
public final <V> Observable<T> timeout(Func1<? super T, ? extends Observable<V>> timeoutSelector, Observable<? extends T> other) {
7838+
return timeout(null, timeoutSelector, other);
78477839
}
78487840

78497841
/**
@@ -7864,7 +7856,7 @@ public final <U> Observable<T> timeout(Func1<? super T, ? extends Observable<U>>
78647856
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244283.aspx">MSDN: Observable.Timeout</a>
78657857
*/
78667858
public final Observable<T> timeout(long timeout, TimeUnit timeUnit) {
7867-
return create(OperationTimeout.timeout(this, timeout, timeUnit));
7859+
return timeout(timeout, timeUnit, null, Schedulers.computation());
78687860
}
78697861

78707862
/**
@@ -7887,7 +7879,7 @@ public final Observable<T> timeout(long timeout, TimeUnit timeUnit) {
78877879
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229512.aspx">MSDN: Observable.Timeout</a>
78887880
*/
78897881
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<? extends T> other) {
7890-
return create(OperationTimeout.timeout(this, timeout, timeUnit, other));
7882+
return timeout(timeout, timeUnit, other, Schedulers.computation());
78917883
}
78927884

78937885
/**
@@ -7912,7 +7904,7 @@ public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<?
79127904
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211676.aspx">MSDN: Observable.Timeout</a>
79137905
*/
79147906
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<? extends T> other, Scheduler scheduler) {
7915-
return create(OperationTimeout.timeout(this, timeout, timeUnit, other, scheduler));
7907+
return lift(new OperatorTimeout<T>(timeout, timeUnit, other, scheduler));
79167908
}
79177909

79187910
/**
@@ -7935,7 +7927,7 @@ public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<?
79357927
* @see <a href="http://msdn.microsoft.com/en-us/library/hh228946.aspx">MSDN: Observable.Timeout</a>
79367928
*/
79377929
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler) {
7938-
return create(OperationTimeout.timeout(this, timeout, timeUnit, scheduler));
7930+
return timeout(timeout, timeUnit, null, scheduler);
79397931
}
79407932

79417933
/**

0 commit comments

Comments
 (0)