Skip to content

Commit c9588bd

Browse files
committed
Add the selector variants of timeout in RxScala
1 parent 2cedb25 commit c9588bd

File tree

3 files changed

+121
-4
lines changed

3 files changed

+121
-4
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,4 +474,25 @@ class RxScalaDemo extends JUnitSuite {
474474
obs.toBlockingObservable.toIterable.last
475475
}
476476

477+
@Test def timeoutExample(): Unit = {
478+
val other = List(100L, 200L, 300L).toObservable
479+
val result = Observable.interval(100 millis).timeout(50 millis, other).toBlockingObservable.toList
480+
println(result)
481+
}
482+
483+
@Test def timeoutExample2(): Unit = {
484+
val firstTimeoutSelector = () => {
485+
Observable.timer(10 seconds, 10 seconds, ComputationScheduler()).take(1)
486+
}
487+
val timeoutSelector = (t: Long) => {
488+
Observable.timer(
489+
(500 - t * 100) max 1 millis,
490+
(500 - t * 100) max 1 millis,
491+
ComputationScheduler()).take(1)
492+
}
493+
val other = List(100L, 200L, 300L).toObservable
494+
val result = Observable.interval(100 millis).timeout(firstTimeoutSelector, timeoutSelector, other).toBlockingObservable.toList
495+
println(result)
496+
}
497+
477498
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1756,6 +1756,102 @@ trait Observable[+T]
17561756
toScalaObservable[U](thisJava.timeout(timeout.length, timeout.unit, otherJava, scheduler.asJavaScheduler))
17571757
}
17581758

1759+
/**
1760+
* Returns an Observable that mirrors the source Observable, but emits a TimeoutException if an item emitted by
1761+
* the source Observable doesn't arrive within a window of time after the emission of the
1762+
* previous item, where that period of time is measured by an Observable that is a function
1763+
* of the previous item.
1764+
* <p>
1765+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout3.png">
1766+
* </p>
1767+
* Note: The arrival of the first source item is never timed out.
1768+
*
1769+
* @param timeoutSelector
1770+
* a function that returns an observable for each item emitted by the source
1771+
* Observable and that determines the timeout window for the subsequent item
1772+
* @return an Observable that mirrors the source Observable, but emits a TimeoutException if a item emitted by
1773+
* the source Observable takes longer to arrive than the time window defined by the
1774+
* selector for the previously emitted item
1775+
*/
1776+
def timeout[V](timeoutSelector: T => Observable[V]): Observable[T] = {
1777+
toScalaObservable[T](asJavaObservable.timeout({ t: T => timeoutSelector(t).asJavaObservable.asInstanceOf[rx.Observable[V]] }))
1778+
}
1779+
1780+
/**
1781+
* Returns an Observable that mirrors the source Observable, but that switches to a fallback
1782+
* Observable if an item emitted by the source Observable doesn't arrive within a window of time
1783+
* after the emission of the previous item, where that period of time is measured by an
1784+
* Observable that is a function of the previous item.
1785+
* <p>
1786+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout4.png">
1787+
* </p>
1788+
* Note: The arrival of the first source item is never timed out.
1789+
*
1790+
* @param timeoutSelector
1791+
* a function that returns an observable for each item emitted by the source
1792+
* Observable and that determines the timeout window for the subsequent item
1793+
* @param other
1794+
* the fallback Observable to switch to if the source Observable times out
1795+
* @return an Observable that mirrors the source Observable, but switches to mirroring a
1796+
* fallback Observable if a item emitted by the source Observable takes longer to arrive
1797+
* than the time window defined by the selector for the previously emitted item
1798+
*/
1799+
def timeout[V, O >: T](timeoutSelector: T => Observable[V], other: Observable[O]): Observable[O] = {
1800+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[O]]
1801+
toScalaObservable[O](thisJava.timeout(
1802+
{ t: O => timeoutSelector(t.asInstanceOf[T]).asJavaObservable.asInstanceOf[rx.Observable[V]] },
1803+
other.asJavaObservable))
1804+
}
1805+
1806+
/**
1807+
* Returns an Observable that mirrors the source Observable, but emits a TimeoutException
1808+
* if either the first item emitted by the source Observable or any subsequent item
1809+
* don't arrive within time windows defined by other Observables.
1810+
* <p>
1811+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout5.png">
1812+
* </p>
1813+
* @param firstTimeoutSelector
1814+
* a function that returns an Observable that determines the timeout window for the
1815+
* first source item
1816+
* @param timeoutSelector
1817+
* a function that returns an Observable for each item emitted by the source
1818+
* Observable and that determines the timeout window in which the subsequent source
1819+
* item must arrive in order to continue the sequence
1820+
* @return an Observable that mirrors the source Observable, but emits a TimeoutException if either the first item or any subsequent item doesn't
1821+
* arrive within the time windows specified by the timeout selectors
1822+
*/
1823+
def timeout[U, V](firstTimeoutSelector: () => Observable[U], timeoutSelector: T => Observable[V]): Observable[T] = {
1824+
toScalaObservable[T](asJavaObservable.timeout(
1825+
{ firstTimeoutSelector().asJavaObservable.asInstanceOf[rx.Observable[U]] },
1826+
{ t: T => timeoutSelector(t).asJavaObservable.asInstanceOf[rx.Observable[V]] }))
1827+
}
1828+
1829+
/**
1830+
* Returns an Observable that mirrors the source Observable, but switches to a fallback
1831+
* Observable if either the first item emitted by the source Observable or any subsequent item
1832+
* don't arrive within time windows defined by other Observables.
1833+
* <p>
1834+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout6.png">
1835+
* </p>
1836+
* @param firstTimeoutSelector
1837+
* a function that returns an Observable which determines the timeout window for the
1838+
* first source item
1839+
* @param timeoutSelector
1840+
* a function that returns an Observable for each item emitted by the source
1841+
* Observable and that determines the timeout window in which the subsequent source
1842+
* item must arrive in order to continue the sequence
1843+
* @param other
1844+
* the fallback Observable to switch to if the source Observable times out
1845+
* @return an Observable that mirrors the source Observable, but switches to the {@code other} Observable if either the first item emitted by the source Observable or any
1846+
* subsequent item don't arrive within time windows defined by the timeout selectors
1847+
*/
1848+
def timeout[U, V, O >: T](firstTimeoutSelector: () => Observable[U], timeoutSelector: T => Observable[V], other: Observable[O]): Observable[O] = {
1849+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[O]]
1850+
toScalaObservable[O](thisJava.timeout(
1851+
{ firstTimeoutSelector().asJavaObservable.asInstanceOf[rx.Observable[U]] },
1852+
{ t: O => timeoutSelector(t.asInstanceOf[T]).asJavaObservable.asInstanceOf[rx.Observable[V]] },
1853+
other.asJavaObservable))
1854+
}
17591855

17601856
/**
17611857
* Returns an Observable that sums up the elements of this Observable.

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7753,8 +7753,8 @@ public final Observable<TimeInterval<T>> timeInterval(Scheduler scheduler) {
77537753
* a function that returns an Observable for each item emitted by the source
77547754
* Observable and that determines the timeout window in which the subsequent source
77557755
* item must arrive in order to continue the sequence
7756-
* @return an Observable that completes if either the first item or any subsequent item doesn't
7757-
* arrive within the time windows specified by the timeout selectors
7756+
* @return an Observable that mirrors the source Observable, but emits a TimeoutException if
7757+
* either the first item or any subsequent item doesn't arrive within the time windows specified by the timeout selectors
77587758
*/
77597759
public final <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTimeoutSelector, Func1<? super T, ? extends Observable<V>> timeoutSelector) {
77607760
return timeout(firstTimeoutSelector, timeoutSelector, null);
@@ -7805,7 +7805,7 @@ public final <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTi
78057805
* @param timeoutSelector
78067806
* a function that returns an observable for each item emitted by the source
78077807
* Observable and that determines the timeout window for the subsequent item
7808-
* @return an Observable that mirrors the source Observable, but completes if a item emitted by
7808+
* @return an Observable that mirrors the source Observable, but emits a TimeoutException if a item emitted by
78097809
* the source Observable takes longer to arrive than the time window defined by the
78107810
* selector for the previously emitted item
78117811
*/
@@ -7821,7 +7821,7 @@ public final <V> Observable<T> timeout(Func1<? super T, ? extends Observable<V>>
78217821
* <p>
78227822
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout4.png">
78237823
* <p>
7824-
* The arrival of the first source item is never timed out.
7824+
* Note: The arrival of the first source item is never timed out.
78257825
*
78267826
* @param <V>
78277827
* the timeout value type (ignored)

0 commit comments

Comments
 (0)