@@ -1756,6 +1756,102 @@ trait Observable[+T]
17561756 toScalaObservable[U ](thisJava.timeout(timeout.length, timeout.unit, otherJava, scheduler.asJavaScheduler))
17571757 }
17581758
1759+ /**
1760+ * Returns an Observable that mirrors the source Observable, but emits a TimeoutException if an item emitted by
1761+ * the source Observable doesn't arrive within a window of time after the emission of the
1762+ * previous item, where that period of time is measured by an Observable that is a function
1763+ * of the previous item.
1764+ * <p>
1765+ * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout3.png">
1766+ * </p>
1767+ * Note: The arrival of the first source item is never timed out.
1768+ *
1769+ * @param timeoutSelector
1770+ * a function that returns an observable for each item emitted by the source
1771+ * Observable and that determines the timeout window for the subsequent item
1772+ * @return an Observable that mirrors the source Observable, but emits a TimeoutException if a item emitted by
1773+ * the source Observable takes longer to arrive than the time window defined by the
1774+ * selector for the previously emitted item
1775+ */
1776+ def timeout [V ](timeoutSelector : T => Observable [V ]): Observable [T ] = {
1777+ toScalaObservable[T ](asJavaObservable.timeout({ t : T => timeoutSelector(t).asJavaObservable.asInstanceOf [rx.Observable [V ]] }))
1778+ }
1779+
1780+ /**
1781+ * Returns an Observable that mirrors the source Observable, but that switches to a fallback
1782+ * Observable if an item emitted by the source Observable doesn't arrive within a window of time
1783+ * after the emission of the previous item, where that period of time is measured by an
1784+ * Observable that is a function of the previous item.
1785+ * <p>
1786+ * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout4.png">
1787+ * </p>
1788+ * Note: The arrival of the first source item is never timed out.
1789+ *
1790+ * @param timeoutSelector
1791+ * a function that returns an observable for each item emitted by the source
1792+ * Observable and that determines the timeout window for the subsequent item
1793+ * @param other
1794+ * the fallback Observable to switch to if the source Observable times out
1795+ * @return an Observable that mirrors the source Observable, but switches to mirroring a
1796+ * fallback Observable if a item emitted by the source Observable takes longer to arrive
1797+ * than the time window defined by the selector for the previously emitted item
1798+ */
1799+ def timeout [V , O >: T ](timeoutSelector : T => Observable [V ], other : Observable [O ]): Observable [O ] = {
1800+ val thisJava = this .asJavaObservable.asInstanceOf [rx.Observable [O ]]
1801+ toScalaObservable[O ](thisJava.timeout(
1802+ { t : O => timeoutSelector(t.asInstanceOf [T ]).asJavaObservable.asInstanceOf [rx.Observable [V ]] },
1803+ other.asJavaObservable))
1804+ }
1805+
1806+ /**
1807+ * Returns an Observable that mirrors the source Observable, but emits a TimeoutException
1808+ * if either the first item emitted by the source Observable or any subsequent item
1809+ * don't arrive within time windows defined by other Observables.
1810+ * <p>
1811+ * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout5.png">
1812+ * </p>
1813+ * @param firstTimeoutSelector
1814+ * a function that returns an Observable that determines the timeout window for the
1815+ * first source item
1816+ * @param timeoutSelector
1817+ * a function that returns an Observable for each item emitted by the source
1818+ * Observable and that determines the timeout window in which the subsequent source
1819+ * item must arrive in order to continue the sequence
1820+ * @return an Observable that mirrors the source Observable, but emits a TimeoutException if either the first item or any subsequent item doesn't
1821+ * arrive within the time windows specified by the timeout selectors
1822+ */
1823+ def timeout [U , V ](firstTimeoutSelector : () => Observable [U ], timeoutSelector : T => Observable [V ]): Observable [T ] = {
1824+ toScalaObservable[T ](asJavaObservable.timeout(
1825+ { firstTimeoutSelector().asJavaObservable.asInstanceOf [rx.Observable [U ]] },
1826+ { t : T => timeoutSelector(t).asJavaObservable.asInstanceOf [rx.Observable [V ]] }))
1827+ }
1828+
1829+ /**
1830+ * Returns an Observable that mirrors the source Observable, but switches to a fallback
1831+ * Observable if either the first item emitted by the source Observable or any subsequent item
1832+ * don't arrive within time windows defined by other Observables.
1833+ * <p>
1834+ * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout6.png">
1835+ * </p>
1836+ * @param firstTimeoutSelector
1837+ * a function that returns an Observable which determines the timeout window for the
1838+ * first source item
1839+ * @param timeoutSelector
1840+ * a function that returns an Observable for each item emitted by the source
1841+ * Observable and that determines the timeout window in which the subsequent source
1842+ * item must arrive in order to continue the sequence
1843+ * @param other
1844+ * the fallback Observable to switch to if the source Observable times out
1845+ * @return an Observable that mirrors the source Observable, but switches to the {@code other} Observable if either the first item emitted by the source Observable or any
1846+ * subsequent item don't arrive within time windows defined by the timeout selectors
1847+ */
1848+ def timeout [U , V , O >: T ](firstTimeoutSelector : () => Observable [U ], timeoutSelector : T => Observable [V ], other : Observable [O ]): Observable [O ] = {
1849+ val thisJava = this .asJavaObservable.asInstanceOf [rx.Observable [O ]]
1850+ toScalaObservable[O ](thisJava.timeout(
1851+ { firstTimeoutSelector().asJavaObservable.asInstanceOf [rx.Observable [U ]] },
1852+ { t : O => timeoutSelector(t.asInstanceOf [T ]).asJavaObservable.asInstanceOf [rx.Observable [V ]] },
1853+ other.asJavaObservable))
1854+ }
17591855
17601856 /**
17611857 * Returns an Observable that sums up the elements of this Observable.
0 commit comments