Skip to content

Commit 34c5638

Browse files
committed
Timeout with selector overloads
1 parent fe438ca commit 34c5638

File tree

3 files changed

+474
-0
lines changed

3 files changed

+474
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6691,6 +6691,86 @@ public Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<? exten
66916691
return create(OperationTimeout.timeout(this, timeout, timeUnit, other, scheduler));
66926692
}
66936693

6694+
/**
6695+
* Create an observable which completes if a source item doesn't arrive after the
6696+
* previous one in the time window specified by the per-item observable.
6697+
* <p>
6698+
* The arrival of the first source item is not timed out.
6699+
* @param <U> the timeout value type (ignored)
6700+
* @param timeoutSelector function that returns an observable for each source item
6701+
* which determines the timeout window for the subsequent source item
6702+
* @return an observable which completes if a source item doesn't arrive after the
6703+
* previous one in the time window specified by the per-item observable.
6704+
*/
6705+
public <U> Observable<T> timeout(Func1<? super T, ? extends Observable<U>> timeoutSelector) {
6706+
return timeout(timeoutSelector, Observable.<T>empty());
6707+
}
6708+
6709+
/**
6710+
* Create an observable which switches to the other Observable if a source
6711+
* item doesn't arrive after the
6712+
* previous one in the time window specified by the per-item observable.
6713+
* <p>
6714+
* The arrival of the first source item is not timed out.
6715+
* @param <U> the timeout value type (ignored)
6716+
* @param timeoutSelector function that returns an observable for each source item
6717+
* which determines the timeout window for the subsequent source item
6718+
* @param other the other observable to switch to if the source times out
6719+
* @return an observable which switches to the other Observable if a source
6720+
* item doesn't arrive after the
6721+
* previous one in the time window specified by the per-item observable
6722+
*/
6723+
public <U> Observable<T> timeout(Func1<? super T, ? extends Observable<U>> timeoutSelector, Observable<? extends T> other) {
6724+
if (other == null) {
6725+
throw new NullPointerException("other");
6726+
}
6727+
return create(OperationTimeout.timeoutSelector(this, null, timeoutSelector, other));
6728+
}
6729+
6730+
/**
6731+
* Create an Observable which completes if either the first item or any subsequent item
6732+
* doesn't arrive within the time window specified by the timeout selectors' Observable.
6733+
* @param <U> the first timeout value type (ignored)
6734+
* @param <V> the subsequent timeout value type (ignored)
6735+
* @param firstTimeoutSelector function that returns an observable which determines
6736+
* the timeout window for the first source item
6737+
* @param timeoutSelector function that returns an observable for each source item
6738+
* which determines the timeout window for the subsequent source item
6739+
* @return an Observable which completes if either the first item or any subsequent item
6740+
* doesn't arrive within the time window specified by the timeout selectors' Observable.
6741+
*/
6742+
public <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTimeoutSelector, Func1<? super T, ? extends Observable<U>> timeoutSelector) {
6743+
if (firstTimeoutSelector == null) {
6744+
throw new NullPointerException("firstTimeoutSelector");
6745+
}
6746+
return timeout(firstTimeoutSelector, timeoutSelector, Observable.<T>empty());
6747+
}
6748+
6749+
/**
6750+
* Create an Observable which switches to another Observable
6751+
* if either the first item or any subsequent item
6752+
* doesn't arrive within the time window specified by the timeout selectors' Observable.
6753+
* @param <U> the first timeout value type (ignored)
6754+
* @param <V> the subsequent timeout value type (ignored)
6755+
* @param firstTimeoutSelector function that returns an observable which determines
6756+
* the timeout window for the first source item
6757+
* @param timeoutSelector function that returns an observable for each source item
6758+
* which determines the timeout window for the subsequent source item
6759+
* @param other the other observable to switch to if the source times out
6760+
* @return an Observable which switches to another Observable
6761+
* if either the first item or any subsequent item
6762+
* doesn't arrive within the time window specified by the timeout selectors' Observable
6763+
*/
6764+
public <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTimeoutSelector, Func1<? super T, ? extends Observable<U>> timeoutSelector, Observable<? extends T> other) {
6765+
if (firstTimeoutSelector == null) {
6766+
throw new NullPointerException("firstTimeoutSelector");
6767+
}
6768+
if (other == null) {
6769+
throw new NullPointerException("other");
6770+
}
6771+
return create(OperationTimeout.timeoutSelector(this, firstTimeoutSelector, timeoutSelector, other));
6772+
}
6773+
66946774
/**
66956775
* Records the time interval between consecutive items emitted by an
66966776
* Observable.

rxjava-core/src/main/java/rx/operators/OperationTimeout.java

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@
2828
import rx.schedulers.Schedulers;
2929
import rx.subscriptions.CompositeSubscription;
3030
import rx.subscriptions.SerialSubscription;
31+
import rx.subscriptions.Subscriptions;
3132
import rx.util.functions.Action0;
3233
import rx.util.functions.Func0;
34+
import rx.util.functions.Func1;
3335

3436
/**
3537
* Applies a timeout policy for each element in the observable sequence, using
@@ -154,4 +156,160 @@ public void onCompleted() {
154156
return composite;
155157
}
156158
}
159+
160+
/** Timeout using a per-item observable sequence. */
161+
public static <T, U, V> OnSubscribeFunc<T> timeoutSelector(Observable<? extends T> source, Func0<? extends Observable<U>> firstValueTimeout, Func1<? super T, ? extends Observable<V>> valueTimeout, Observable<? extends T> other) {
162+
return new TimeoutSelector<T, U, V>(source, firstValueTimeout, valueTimeout, other);
163+
}
164+
165+
/** Timeout using a per-item observable sequence. */
166+
private static final class TimeoutSelector<T, U, V> implements OnSubscribeFunc<T> {
167+
final Observable<? extends T> source;
168+
final Func0<? extends Observable<U>> firstValueTimeout;
169+
final Func1<? super T, ? extends Observable<V>> valueTimeout;
170+
final Observable<? extends T> other;
171+
172+
public TimeoutSelector(Observable<? extends T> source, Func0<? extends Observable<U>> firstValueTimeout, Func1<? super T, ? extends Observable<V>> valueTimeout, Observable<? extends T> other) {
173+
this.source = source;
174+
this.firstValueTimeout = firstValueTimeout;
175+
this.valueTimeout = valueTimeout;
176+
this.other = other;
177+
}
178+
179+
@Override
180+
public Subscription onSubscribe(Observer<? super T> t1) {
181+
CompositeSubscription csub = new CompositeSubscription();
182+
183+
SourceObserver<T, V> so = new SourceObserver<T, V>(t1, valueTimeout, other, csub);
184+
if (firstValueTimeout != null) {
185+
Observable<U> o;
186+
try {
187+
o = firstValueTimeout.call();
188+
} catch (Throwable t) {
189+
t1.onError(t);
190+
return Subscriptions.empty();
191+
}
192+
193+
csub.add(o.subscribe(new TimeoutObserver<U>(so)));
194+
}
195+
csub.add(source.subscribe(so));
196+
return csub;
197+
}
198+
199+
/** Observe the source. */
200+
private static final class SourceObserver<T, V> implements Observer<T>, TimeoutCallback {
201+
final Observer<? super T> observer;
202+
final Func1<? super T, ? extends Observable<V>> valueTimeout;
203+
final Observable<? extends T> other;
204+
final CompositeSubscription cancel;
205+
final Object guard;
206+
boolean done;
207+
final SerialSubscription tsub;
208+
final TimeoutObserver<V> to;
209+
210+
public SourceObserver(Observer<? super T> observer, Func1<? super T, ? extends Observable<V>> valueTimeout, Observable<? extends T> other, CompositeSubscription cancel) {
211+
this.observer = observer;
212+
this.valueTimeout = valueTimeout;
213+
this.other = other;
214+
this.cancel = cancel;
215+
this.guard = new Object();
216+
this.tsub = new SerialSubscription();
217+
this.cancel.add(tsub);
218+
this.to = new TimeoutObserver<V>(this);
219+
}
220+
221+
@Override
222+
public void onNext(T args) {
223+
tsub.set(Subscriptions.empty());
224+
225+
synchronized (guard) {
226+
if (done) {
227+
return;
228+
}
229+
observer.onNext(args);
230+
}
231+
232+
Observable<V> o;
233+
try {
234+
o = valueTimeout.call(args);
235+
} catch (Throwable t) {
236+
onError(t);
237+
return;
238+
}
239+
240+
SerialSubscription osub = new SerialSubscription();
241+
tsub.set(osub);
242+
243+
osub.set(o.subscribe(to));
244+
}
245+
@Override
246+
public void onError(Throwable e) {
247+
synchronized (guard) {
248+
if (done) {
249+
return;
250+
}
251+
done = true;
252+
observer.onError(e);
253+
}
254+
cancel.unsubscribe();
255+
}
256+
257+
@Override
258+
public void onCompleted() {
259+
synchronized (guard) {
260+
if (done) {
261+
return;
262+
}
263+
done = true;
264+
observer.onCompleted();
265+
}
266+
cancel.unsubscribe();
267+
}
268+
@Override
269+
public void timeout() {
270+
if (other != null) {
271+
synchronized (guard) {
272+
if (done) {
273+
return;
274+
}
275+
done = true;
276+
}
277+
cancel.clear();
278+
cancel.add(other.subscribe(observer));
279+
} else {
280+
onCompleted();
281+
}
282+
}
283+
}
284+
285+
/** The timeout callback. */
286+
private interface TimeoutCallback {
287+
void timeout();
288+
void onError(Throwable t);
289+
}
290+
291+
/** Observe the timeout. */
292+
private static final class TimeoutObserver<V> implements Observer<V> {
293+
final TimeoutCallback parent;
294+
295+
public TimeoutObserver(TimeoutCallback parent) {
296+
this.parent = parent;
297+
}
298+
299+
@Override
300+
public void onNext(V args) {
301+
parent.timeout();
302+
}
303+
304+
@Override
305+
public void onError(Throwable e) {
306+
parent.onError(e);
307+
}
308+
309+
@Override
310+
public void onCompleted() {
311+
parent.timeout();
312+
}
313+
}
314+
}
157315
}

0 commit comments

Comments
 (0)