Skip to content

Commit f8787aa

Browse files
Merge branch 'master' of github.com:Netflix/RxJava into idiomaticscala
2 parents c9f60ee + 4bda940 commit f8787aa

13 files changed

+921
-39
lines changed

CHANGES.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,20 @@
11
# RxJava Releases #
22

3+
### Version 0.13.0 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.13.0%22)) ###
4+
5+
This release has some minor changes related to varargs that could break backwards compatibility
6+
if directly passing arrays but for most this release should not be breaking.
7+
8+
* [Pull 354](https://github.com/Netflix/RxJava/pull/354) Operators: Count, Sum, Average
9+
* [Pull 355](https://github.com/Netflix/RxJava/pull/355) Operators: skipWhile and skipWhileWithIndex
10+
* [Pull 356](https://github.com/Netflix/RxJava/pull/356) Operator: Interval
11+
* [Pull 357](https://github.com/Netflix/RxJava/pull/357) Operators: first and firstOrDefault
12+
* [Pull 368](https://github.com/Netflix/RxJava/pull/368) Operators: Throttle and Debounce
13+
* [Pull 371](https://github.com/Netflix/RxJava/pull/371) Operator: Retry
14+
* [Pull 370](https://github.com/Netflix/RxJava/pull/370) Change zip method signature from Collection to Iterable
15+
* [Pull 369](https://github.com/Netflix/RxJava/pull/369) Generics Improvements: co/contra-variance
16+
* [Pull 361](https://github.com/Netflix/RxJava/pull/361) Remove use of varargs from API
17+
318
### Version 0.12.2 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.12.2%22)) ###
419

520
* [Pull 352](https://github.com/Netflix/RxJava/pull/352) Groovy Language Adaptor: Add Func5-9 and N to the wrapper

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.12.3-SNAPSHOT
1+
version=0.13.1-SNAPSHOT

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 185 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@
1515
*/
1616
package rx;
1717

18-
import static rx.util.functions.Functions.not;
18+
import static rx.util.functions.Functions.*;
1919

2020
import java.util.ArrayList;
2121
import java.util.Arrays;
2222
import java.util.Collection;
23-
import java.util.Collections;
2423
import java.util.List;
2524
import java.util.concurrent.Future;
2625
import java.util.concurrent.TimeUnit;
@@ -65,6 +64,8 @@
6564
import rx.operators.OperationTakeLast;
6665
import rx.operators.OperationTakeUntil;
6766
import rx.operators.OperationTakeWhile;
67+
import rx.operators.OperationThrottleFirst;
68+
import rx.operators.OperationDebounce;
6869
import rx.operators.OperationTimestamp;
6970
import rx.operators.OperationToObservableFuture;
7071
import rx.operators.OperationToObservableIterable;
@@ -1810,6 +1811,182 @@ public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler
18101811
return create(OperationInterval.interval(interval, unit, scheduler));
18111812
}
18121813

1814+
/**
1815+
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
1816+
* <p>
1817+
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
1818+
* <p>
1819+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/debounce.png">
1820+
* <p>
1821+
* Information on debounce vs throttle:
1822+
* <p>
1823+
* <ul>
1824+
* <li>http://drupalmotion.com/article/debounce-and-throttle-visual-explanation</li>
1825+
* <li>http://unscriptable.com/2009/03/20/debouncing-javascript-methods/</li>
1826+
* <li>http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/</li>
1827+
* </ul>
1828+
*
1829+
* @param timeout
1830+
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
1831+
* @param unit
1832+
* The {@link TimeUnit} for the timeout.
1833+
*
1834+
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
1835+
* @see {@link #throttleWithTimeout};
1836+
*/
1837+
public Observable<T> debounce(long timeout, TimeUnit unit) {
1838+
return create(OperationDebounce.debounce(this, timeout, unit));
1839+
}
1840+
1841+
/**
1842+
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
1843+
* <p>
1844+
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
1845+
* <p>
1846+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/debounce.png">
1847+
* <p>
1848+
* Information on debounce vs throttle:
1849+
* <p>
1850+
* <ul>
1851+
* <li>http://drupalmotion.com/article/debounce-and-throttle-visual-explanation</li>
1852+
* <li>http://unscriptable.com/2009/03/20/debouncing-javascript-methods/</li>
1853+
* <li>http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/</li>
1854+
* </ul>
1855+
*
1856+
* @param timeout
1857+
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
1858+
* @param unit
1859+
* The unit of time for the specified timeout.
1860+
* @param scheduler
1861+
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
1862+
* @return Observable which performs the throttle operation.
1863+
* @see {@link #throttleWithTimeout};
1864+
*/
1865+
public Observable<T> debounce(long timeout, TimeUnit unit, Scheduler scheduler) {
1866+
return create(OperationDebounce.debounce(this, timeout, unit));
1867+
}
1868+
1869+
/**
1870+
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
1871+
* <p>
1872+
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
1873+
* <p>
1874+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleWithTimeout.png">
1875+
* <p>
1876+
* Information on debounce vs throttle:
1877+
* <p>
1878+
* <ul>
1879+
* <li>http://drupalmotion.com/article/debounce-and-throttle-visual-explanation</li>
1880+
* <li>http://unscriptable.com/2009/03/20/debouncing-javascript-methods/</li>
1881+
* <li>http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/</li>
1882+
* </ul>
1883+
*
1884+
* @param timeout
1885+
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
1886+
* @param unit
1887+
* The {@link TimeUnit} for the timeout.
1888+
*
1889+
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
1890+
* @see {@link #debounce}
1891+
*/
1892+
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
1893+
return create(OperationDebounce.debounce(this, timeout, unit));
1894+
}
1895+
1896+
/**
1897+
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
1898+
* <p>
1899+
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
1900+
* <p>
1901+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleWithTimeout.png">
1902+
*
1903+
* @param timeout
1904+
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
1905+
* @param unit
1906+
* The unit of time for the specified timeout.
1907+
* @param scheduler
1908+
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
1909+
* @return Observable which performs the throttle operation.
1910+
* @see {@link #debounce}
1911+
*/
1912+
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) {
1913+
return create(OperationDebounce.debounce(this, timeout, unit, scheduler));
1914+
}
1915+
1916+
/**
1917+
* Throttles by skipping value until `skipDuration` passes and then emits the next received value.
1918+
* <p>
1919+
* This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals.
1920+
* <p>
1921+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleFirst.png">
1922+
*
1923+
* @param skipDuration
1924+
* Time to wait before sending another value after emitting last value.
1925+
* @param unit
1926+
* The unit of time for the specified timeout.
1927+
* @param scheduler
1928+
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
1929+
* @return Observable which performs the throttle operation.
1930+
*/
1931+
public Observable<T> throttleFirst(long windowDuration, TimeUnit unit) {
1932+
return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit));
1933+
}
1934+
1935+
/**
1936+
* Throttles by skipping value until `skipDuration` passes and then emits the next received value.
1937+
* <p>
1938+
* This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals.
1939+
* <p>
1940+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleFirst.png">
1941+
*
1942+
* @param skipDuration
1943+
* Time to wait before sending another value after emitting last value.
1944+
* @param unit
1945+
* The unit of time for the specified timeout.
1946+
* @param scheduler
1947+
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
1948+
* @return Observable which performs the throttle operation.
1949+
*/
1950+
public Observable<T> throttleFirst(long skipDuration, TimeUnit unit, Scheduler scheduler) {
1951+
return create(OperationThrottleFirst.throttleFirst(this, skipDuration, unit, scheduler));
1952+
}
1953+
1954+
/**
1955+
* Throttles by returning the last value of each interval defined by 'intervalDuration'.
1956+
* <p>
1957+
* This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time.
1958+
* <p>
1959+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleLast.png">
1960+
*
1961+
* @param intervalDuration
1962+
* Duration of windows within with the last value will be chosen.
1963+
* @param unit
1964+
* The unit of time for the specified interval.
1965+
* @return Observable which performs the throttle operation.
1966+
* @see {@link #sample(long, TimeUnit)}
1967+
*/
1968+
public Observable<T> throttleLast(long intervalDuration, TimeUnit unit) {
1969+
return sample(intervalDuration, unit);
1970+
}
1971+
1972+
/**
1973+
* Throttles by returning the last value of each interval defined by 'intervalDuration'.
1974+
* <p>
1975+
* This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time.
1976+
* <p>
1977+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleLast.png">
1978+
*
1979+
* @param intervalDuration
1980+
* Duration of windows within with the last value will be chosen.
1981+
* @param unit
1982+
* The unit of time for the specified interval.
1983+
* @return Observable which performs the throttle operation.
1984+
* @see {@link #sample(long, TimeUnit, Scheduler)}
1985+
*/
1986+
public Observable<T> throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler) {
1987+
return sample(intervalDuration, unit, scheduler);
1988+
}
1989+
18131990
/**
18141991
* Wraps each item emitted by a source Observable in a {@link Timestamped} object.
18151992
* <p>
@@ -3025,7 +3202,7 @@ public Observable<T> onErrorReturn(Func1<Throwable, ? extends T> resumeFunction)
30253202
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229154(v%3Dvs.103).aspx">MSDN: Observable.Aggregate</a>
30263203
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
30273204
*/
3028-
public Observable<T> reduce(Func2<? super T, ? super T, ? extends T> accumulator) {
3205+
public Observable<T> reduce(Func2<T, T, T> accumulator) {
30293206
return create(OperationScan.scan(this, accumulator)).takeLast(1);
30303207
}
30313208

@@ -3206,7 +3383,7 @@ public ConnectableObservable<T> publish() {
32063383
*
32073384
* @see #reduce(Func2)
32083385
*/
3209-
public Observable<T> aggregate(Func2<? super T, ? super T, ? extends T> accumulator) {
3386+
public Observable<T> aggregate(Func2<T, T, T> accumulator) {
32103387
return reduce(accumulator);
32113388
}
32123389

@@ -3233,7 +3410,7 @@ public Observable<T> aggregate(Func2<? super T, ? super T, ? extends T> accumula
32333410
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229154(v%3Dvs.103).aspx">MSDN: Observable.Aggregate</a>
32343411
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
32353412
*/
3236-
public <R> Observable<R> reduce(R initialValue, Func2<? super R, ? super T, ? extends R> accumulator) {
3413+
public <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulator) {
32373414
return create(OperationScan.scan(this, initialValue, accumulator)).takeLast(1);
32383415
}
32393416

@@ -3244,7 +3421,7 @@ public <R> Observable<R> reduce(R initialValue, Func2<? super R, ? super T, ? ex
32443421
*
32453422
* @see #reduce(Object, Func2)
32463423
*/
3247-
public <R> Observable<R> aggregate(R initialValue, Func2<? super R, ? super T, ? extends R> accumulator) {
3424+
public <R> Observable<R> aggregate(R initialValue, Func2<R, ? super T, R> accumulator) {
32483425
return reduce(initialValue, accumulator);
32493426
}
32503427

@@ -3267,7 +3444,7 @@ public <R> Observable<R> aggregate(R initialValue, Func2<? super R, ? super T, ?
32673444
* @return an Observable that emits the results of each call to the accumulator function
32683445
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
32693446
*/
3270-
public Observable<T> scan(Func2<? super T, ? super T, ? extends T> accumulator) {
3447+
public Observable<T> scan(Func2<T, T, T> accumulator) {
32713448
return create(OperationScan.scan(this, accumulator));
32723449
}
32733450

@@ -3328,7 +3505,7 @@ public Observable<T> sample(long period, TimeUnit unit, Scheduler scheduler) {
33283505
* @return an Observable that emits the results of each call to the accumulator function
33293506
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
33303507
*/
3331-
public <R> Observable<R> scan(R initialValue, Func2<? super R, ? super T, ? extends R> accumulator) {
3508+
public <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accumulator) {
33323509
return create(OperationScan.scan(this, initialValue, accumulator));
33333510
}
33343511

rxjava-core/src/main/java/rx/concurrency/TestScheduler.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,22 @@
1919
import java.util.PriorityQueue;
2020
import java.util.Queue;
2121
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2223

2324
import rx.Scheduler;
2425
import rx.Subscription;
25-
import rx.subscriptions.Subscriptions;
2626
import rx.util.functions.Func2;
2727

2828
public class TestScheduler extends Scheduler {
2929
private final Queue<TimedAction<?>> queue = new PriorityQueue<TimedAction<?>>(11, new CompareActionsByTime());
3030

3131
private static class TimedAction<T> {
32+
3233
private final long time;
3334
private final Func2<? super Scheduler, ? super T, ? extends Subscription> action;
3435
private final T state;
3536
private final TestScheduler scheduler;
37+
private final AtomicBoolean isCancelled = new AtomicBoolean(false);
3638

3739
private TimedAction(TestScheduler scheduler, long time, Func2<? super Scheduler, ? super T, ? extends Subscription> action, T state) {
3840
this.time = time;
@@ -41,6 +43,10 @@ private TimedAction(TestScheduler scheduler, long time, Func2<? super Scheduler,
4143
this.scheduler = scheduler;
4244
}
4345

46+
public void cancel() {
47+
isCancelled.set(true);
48+
}
49+
4450
@Override
4551
public String toString() {
4652
return String.format("TimedAction(time = %d, action = %s)", time, action.toString());
@@ -84,8 +90,12 @@ private void triggerActions(long targetTimeInNanos) {
8490
}
8591
time = current.time;
8692
queue.remove();
87-
// because the queue can have wildcards we have to ignore the type T for the state
88-
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);
93+
94+
// Only execute if the TimedAction has not yet been cancelled
95+
if (!current.isCancelled.get()) {
96+
// because the queue can have wildcards we have to ignore the type T for the state
97+
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);
98+
}
8999
}
90100
time = targetTimeInNanos;
91101
}
@@ -97,7 +107,14 @@ public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ?
97107

98108
@Override
99109
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
100-
queue.add(new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state));
101-
return Subscriptions.empty();
110+
final TimedAction<T> timedAction = new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state);
111+
queue.add(timedAction);
112+
113+
return new Subscription() {
114+
@Override
115+
public void unsubscribe() {
116+
timedAction.cancel();
117+
}
118+
};
102119
}
103120
}

0 commit comments

Comments
 (0)