Skip to content

Commit e917c77

Browse files
committed
Add takeUntil support in Single
1 parent ee9956a commit e917c77

File tree

2 files changed

+519
-8
lines changed

2 files changed

+519
-8
lines changed

src/main/java/rx/Single.java

Lines changed: 154 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
*/
1313
package rx;
1414

15+
import java.util.Collection;
16+
import java.util.concurrent.*;
17+
1518
import rx.Observable.Operator;
1619
import rx.annotations.Beta;
1720
import rx.annotations.Experimental;
@@ -23,15 +26,13 @@
2326
import rx.internal.util.ScalarSynchronousSingle;
2427
import rx.internal.util.UtilityFunctions;
2528
import rx.observers.SafeSubscriber;
29+
import rx.observers.SerializedSubscriber;
2630
import rx.plugins.RxJavaObservableExecutionHook;
2731
import rx.plugins.RxJavaPlugins;
2832
import rx.schedulers.Schedulers;
2933
import rx.singles.BlockingSingle;
3034
import rx.subscriptions.Subscriptions;
3135

32-
import java.util.Collection;
33-
import java.util.concurrent.*;
34-
3536
/**
3637
* The Single class implements the Reactive Pattern for a single value response. See {@link Observable} for the
3738
* implementation of the Reactive Pattern for a stream or vector of values.
@@ -1800,6 +1801,156 @@ public void onError(Throwable error) {
18001801
}
18011802
});
18021803
}
1804+
1805+
/**
1806+
* Returns a Single that emits the item emitted by the source Single until an Observable emits an item. Upon
1807+
* emission of an item from {@code other}, this will emit a {@link CancellationException} rather than go to
1808+
* {@link SingleSubscriber#onSuccess(Object)}.
1809+
* <p>
1810+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeUntil.png" alt="">
1811+
* <dl>
1812+
* <dt><b>Scheduler:</b></dt>
1813+
* <dd>{@code takeUntil} does not operate by default on a particular {@link Scheduler}.</dd>
1814+
* </dl>
1815+
*
1816+
* @param other
1817+
* the Observable whose first emitted item will cause {@code takeUntil} to emit the item from the source
1818+
* Single
1819+
* @param <E>
1820+
* the type of items emitted by {@code other}
1821+
* @return a Single that emits the item emitted by the source Single until such time as {@code other} emits
1822+
* its first item
1823+
* @see <a href="http://reactivex.io/documentation/operators/takeuntil.html">ReactiveX operators documentation: TakeUntil</a>
1824+
*/
1825+
public final <E> Single<T> takeUntil(final Observable<? extends E> other) {
1826+
return lift(new Operator<T, T>() {
1827+
@Override
1828+
public Subscriber<? super T> call(Subscriber<? super T> child) {
1829+
final Subscriber<T> serial = new SerializedSubscriber<T>(child, false);
1830+
1831+
final Subscriber<T> main = new Subscriber<T>(serial, false) {
1832+
@Override
1833+
public void onNext(T t) {
1834+
serial.onNext(t);
1835+
}
1836+
@Override
1837+
public void onError(Throwable e) {
1838+
try {
1839+
serial.onError(e);
1840+
} finally {
1841+
serial.unsubscribe();
1842+
}
1843+
}
1844+
@Override
1845+
public void onCompleted() {
1846+
try {
1847+
serial.onCompleted();
1848+
} finally {
1849+
serial.unsubscribe();
1850+
}
1851+
}
1852+
};
1853+
1854+
final Subscriber<E> so = new Subscriber<E>() {
1855+
1856+
@Override
1857+
public void onCompleted() {
1858+
onError(new CancellationException("Stream was canceled before emitting a terminal event."));
1859+
}
1860+
1861+
@Override
1862+
public void onError(Throwable e) {
1863+
main.onError(e);
1864+
}
1865+
1866+
@Override
1867+
public void onNext(E e) {
1868+
onError(new CancellationException("Stream was canceled before emitting a terminal event."));
1869+
}
1870+
};
1871+
1872+
serial.add(main);
1873+
serial.add(so);
1874+
1875+
child.add(serial);
1876+
1877+
other.unsafeSubscribe(so);
1878+
1879+
return main;
1880+
}
1881+
});
1882+
}
1883+
1884+
/**
1885+
* Returns a Single that emits the item emitted by the source Single until a second Single emits an item. Upon
1886+
* emission of an item from {@code other}, this will emit a {@link CancellationException} rather than go to
1887+
* {@link SingleSubscriber#onSuccess(Object)}.
1888+
* <p>
1889+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeUntil.png" alt="">
1890+
* <dl>
1891+
* <dt><b>Scheduler:</b></dt>
1892+
* <dd>{@code takeUntil} does not operate by default on a particular {@link Scheduler}.</dd>
1893+
* </dl>
1894+
*
1895+
* @param other
1896+
* the Single whose emitted item will cause {@code takeUntil} to emit the item from the source Single
1897+
* @param <E>
1898+
* the type of item emitted by {@code other}
1899+
* @return a Single that emits the item emitted by the source Single until such time as {@code other} emits its item
1900+
* @see <a href="http://reactivex.io/documentation/operators/takeuntil.html">ReactiveX operators documentation: TakeUntil</a>
1901+
*/
1902+
public final <E> Single<T> takeUntil(final Single<? extends E> other) {
1903+
return lift(new Operator<T, T>() {
1904+
@Override
1905+
public Subscriber<? super T> call(Subscriber<? super T> child) {
1906+
final Subscriber<T> serial = new SerializedSubscriber<T>(child, false);
1907+
1908+
final Subscriber<T> main = new Subscriber<T>(serial, false) {
1909+
@Override
1910+
public void onNext(T t) {
1911+
serial.onNext(t);
1912+
}
1913+
@Override
1914+
public void onError(Throwable e) {
1915+
try {
1916+
serial.onError(e);
1917+
} finally {
1918+
serial.unsubscribe();
1919+
}
1920+
}
1921+
@Override
1922+
public void onCompleted() {
1923+
try {
1924+
serial.onCompleted();
1925+
} finally {
1926+
serial.unsubscribe();
1927+
}
1928+
}
1929+
};
1930+
1931+
final SingleSubscriber<E> so = new SingleSubscriber<E>() {
1932+
@Override
1933+
public void onSuccess(E value) {
1934+
onError(new CancellationException("Stream was canceled before emitting a terminal event."));
1935+
}
1936+
1937+
@Override
1938+
public void onError(Throwable e) {
1939+
main.onError(e);
1940+
}
1941+
};
1942+
1943+
serial.add(main);
1944+
serial.add(so);
1945+
1946+
child.add(serial);
1947+
1948+
other.subscribe(so);
1949+
1950+
return main;
1951+
}
1952+
});
1953+
}
18031954

18041955
/**
18051956
* Converts this Single into an {@link Observable}.

0 commit comments

Comments
 (0)