Skip to content

Commit 57376d0

Browse files
Merge pull request #740 from akarnokd/TimeoutWithSelector
Timeout with selector overloads
2 parents 4bce409 + 34c5638 commit 57376d0

File tree

3 files changed

+474
-0
lines changed

3 files changed

+474
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7047,6 +7047,86 @@ public Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<? exten
70477047
return create(OperationTimeout.timeout(this, timeout, timeUnit, other, scheduler));
70487048
}
70497049

7050+
/**
7051+
* Create an observable which completes if a source item doesn't arrive after the
7052+
* previous one in the time window specified by the per-item observable.
7053+
* <p>
7054+
* The arrival of the first source item is not timed out.
7055+
* @param <U> the timeout value type (ignored)
7056+
* @param timeoutSelector function that returns an observable for each source item
7057+
* which determines the timeout window for the subsequent source item
7058+
* @return an observable which completes if a source item doesn't arrive after the
7059+
* previous one in the time window specified by the per-item observable.
7060+
*/
7061+
public <U> Observable<T> timeout(Func1<? super T, ? extends Observable<U>> timeoutSelector) {
7062+
return timeout(timeoutSelector, Observable.<T>empty());
7063+
}
7064+
7065+
/**
7066+
* Create an observable which switches to the other Observable if a source
7067+
* item doesn't arrive after the
7068+
* previous one in the time window specified by the per-item observable.
7069+
* <p>
7070+
* The arrival of the first source item is not timed out.
7071+
* @param <U> the timeout value type (ignored)
7072+
* @param timeoutSelector function that returns an observable for each source item
7073+
* which determines the timeout window for the subsequent source item
7074+
* @param other the other observable to switch to if the source times out
7075+
* @return an observable which switches to the other Observable if a source
7076+
* item doesn't arrive after the
7077+
* previous one in the time window specified by the per-item observable
7078+
*/
7079+
public <U> Observable<T> timeout(Func1<? super T, ? extends Observable<U>> timeoutSelector, Observable<? extends T> other) {
7080+
if (other == null) {
7081+
throw new NullPointerException("other");
7082+
}
7083+
return create(OperationTimeout.timeoutSelector(this, null, timeoutSelector, other));
7084+
}
7085+
7086+
/**
7087+
* Create an Observable which completes if either the first item or any subsequent item
7088+
* doesn't arrive within the time window specified by the timeout selectors' Observable.
7089+
* @param <U> the first timeout value type (ignored)
7090+
* @param <V> the subsequent timeout value type (ignored)
7091+
* @param firstTimeoutSelector function that returns an observable which determines
7092+
* the timeout window for the first source item
7093+
* @param timeoutSelector function that returns an observable for each source item
7094+
* which determines the timeout window for the subsequent source item
7095+
* @return an Observable which completes if either the first item or any subsequent item
7096+
* doesn't arrive within the time window specified by the timeout selectors' Observable.
7097+
*/
7098+
public <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTimeoutSelector, Func1<? super T, ? extends Observable<U>> timeoutSelector) {
7099+
if (firstTimeoutSelector == null) {
7100+
throw new NullPointerException("firstTimeoutSelector");
7101+
}
7102+
return timeout(firstTimeoutSelector, timeoutSelector, Observable.<T>empty());
7103+
}
7104+
7105+
/**
7106+
* Create an Observable which switches to another Observable
7107+
* if either the first item or any subsequent item
7108+
* doesn't arrive within the time window specified by the timeout selectors' Observable.
7109+
* @param <U> the first timeout value type (ignored)
7110+
* @param <V> the subsequent timeout value type (ignored)
7111+
* @param firstTimeoutSelector function that returns an observable which determines
7112+
* the timeout window for the first source item
7113+
* @param timeoutSelector function that returns an observable for each source item
7114+
* which determines the timeout window for the subsequent source item
7115+
* @param other the other observable to switch to if the source times out
7116+
* @return an Observable which switches to another Observable
7117+
* if either the first item or any subsequent item
7118+
* doesn't arrive within the time window specified by the timeout selectors' Observable
7119+
*/
7120+
public <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTimeoutSelector, Func1<? super T, ? extends Observable<U>> timeoutSelector, Observable<? extends T> other) {
7121+
if (firstTimeoutSelector == null) {
7122+
throw new NullPointerException("firstTimeoutSelector");
7123+
}
7124+
if (other == null) {
7125+
throw new NullPointerException("other");
7126+
}
7127+
return create(OperationTimeout.timeoutSelector(this, firstTimeoutSelector, timeoutSelector, other));
7128+
}
7129+
70507130
/**
70517131
* Records the time interval between consecutive items emitted by an
70527132
* Observable.

rxjava-core/src/main/java/rx/operators/OperationTimeout.java

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@
2828
import rx.schedulers.Schedulers;
2929
import rx.subscriptions.CompositeSubscription;
3030
import rx.subscriptions.SerialSubscription;
31+
import rx.subscriptions.Subscriptions;
3132
import rx.util.functions.Action0;
3233
import rx.util.functions.Func0;
34+
import rx.util.functions.Func1;
3335

3436
/**
3537
* Applies a timeout policy for each element in the observable sequence, using
@@ -154,4 +156,160 @@ public void onCompleted() {
154156
return composite;
155157
}
156158
}
159+
160+
/** Timeout using a per-item observable sequence. */
161+
public static <T, U, V> OnSubscribeFunc<T> timeoutSelector(Observable<? extends T> source, Func0<? extends Observable<U>> firstValueTimeout, Func1<? super T, ? extends Observable<V>> valueTimeout, Observable<? extends T> other) {
162+
return new TimeoutSelector<T, U, V>(source, firstValueTimeout, valueTimeout, other);
163+
}
164+
165+
/** Timeout using a per-item observable sequence. */
166+
private static final class TimeoutSelector<T, U, V> implements OnSubscribeFunc<T> {
167+
final Observable<? extends T> source;
168+
final Func0<? extends Observable<U>> firstValueTimeout;
169+
final Func1<? super T, ? extends Observable<V>> valueTimeout;
170+
final Observable<? extends T> other;
171+
172+
public TimeoutSelector(Observable<? extends T> source, Func0<? extends Observable<U>> firstValueTimeout, Func1<? super T, ? extends Observable<V>> valueTimeout, Observable<? extends T> other) {
173+
this.source = source;
174+
this.firstValueTimeout = firstValueTimeout;
175+
this.valueTimeout = valueTimeout;
176+
this.other = other;
177+
}
178+
179+
@Override
180+
public Subscription onSubscribe(Observer<? super T> t1) {
181+
CompositeSubscription csub = new CompositeSubscription();
182+
183+
SourceObserver<T, V> so = new SourceObserver<T, V>(t1, valueTimeout, other, csub);
184+
if (firstValueTimeout != null) {
185+
Observable<U> o;
186+
try {
187+
o = firstValueTimeout.call();
188+
} catch (Throwable t) {
189+
t1.onError(t);
190+
return Subscriptions.empty();
191+
}
192+
193+
csub.add(o.subscribe(new TimeoutObserver<U>(so)));
194+
}
195+
csub.add(source.subscribe(so));
196+
return csub;
197+
}
198+
199+
/** Observe the source. */
200+
private static final class SourceObserver<T, V> implements Observer<T>, TimeoutCallback {
201+
final Observer<? super T> observer;
202+
final Func1<? super T, ? extends Observable<V>> valueTimeout;
203+
final Observable<? extends T> other;
204+
final CompositeSubscription cancel;
205+
final Object guard;
206+
boolean done;
207+
final SerialSubscription tsub;
208+
final TimeoutObserver<V> to;
209+
210+
public SourceObserver(Observer<? super T> observer, Func1<? super T, ? extends Observable<V>> valueTimeout, Observable<? extends T> other, CompositeSubscription cancel) {
211+
this.observer = observer;
212+
this.valueTimeout = valueTimeout;
213+
this.other = other;
214+
this.cancel = cancel;
215+
this.guard = new Object();
216+
this.tsub = new SerialSubscription();
217+
this.cancel.add(tsub);
218+
this.to = new TimeoutObserver<V>(this);
219+
}
220+
221+
@Override
222+
public void onNext(T args) {
223+
tsub.set(Subscriptions.empty());
224+
225+
synchronized (guard) {
226+
if (done) {
227+
return;
228+
}
229+
observer.onNext(args);
230+
}
231+
232+
Observable<V> o;
233+
try {
234+
o = valueTimeout.call(args);
235+
} catch (Throwable t) {
236+
onError(t);
237+
return;
238+
}
239+
240+
SerialSubscription osub = new SerialSubscription();
241+
tsub.set(osub);
242+
243+
osub.set(o.subscribe(to));
244+
}
245+
@Override
246+
public void onError(Throwable e) {
247+
synchronized (guard) {
248+
if (done) {
249+
return;
250+
}
251+
done = true;
252+
observer.onError(e);
253+
}
254+
cancel.unsubscribe();
255+
}
256+
257+
@Override
258+
public void onCompleted() {
259+
synchronized (guard) {
260+
if (done) {
261+
return;
262+
}
263+
done = true;
264+
observer.onCompleted();
265+
}
266+
cancel.unsubscribe();
267+
}
268+
@Override
269+
public void timeout() {
270+
if (other != null) {
271+
synchronized (guard) {
272+
if (done) {
273+
return;
274+
}
275+
done = true;
276+
}
277+
cancel.clear();
278+
cancel.add(other.subscribe(observer));
279+
} else {
280+
onCompleted();
281+
}
282+
}
283+
}
284+
285+
/** The timeout callback. */
286+
private interface TimeoutCallback {
287+
void timeout();
288+
void onError(Throwable t);
289+
}
290+
291+
/** Observe the timeout. */
292+
private static final class TimeoutObserver<V> implements Observer<V> {
293+
final TimeoutCallback parent;
294+
295+
public TimeoutObserver(TimeoutCallback parent) {
296+
this.parent = parent;
297+
}
298+
299+
@Override
300+
public void onNext(V args) {
301+
parent.timeout();
302+
}
303+
304+
@Override
305+
public void onError(Throwable e) {
306+
parent.onError(e);
307+
}
308+
309+
@Override
310+
public void onCompleted() {
311+
parent.timeout();
312+
}
313+
}
314+
}
157315
}

0 commit comments

Comments
 (0)