Skip to content

Commit 87e766d

Browse files
Merge branch 'operation-throttle' of git://github.com/michaeldejong/RxJava into throttle-merge
Conflicts: rxjava-core/src/main/java/rx/Observable.java rxjava-core/src/main/java/rx/concurrency/TestScheduler.java
2 parents a2f04b0 + 2519ef8 commit 87e766d

File tree

4 files changed

+287
-5
lines changed

4 files changed

+287
-5
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import rx.operators.OperationTakeLast;
6565
import rx.operators.OperationTakeUntil;
6666
import rx.operators.OperationTakeWhile;
67+
import rx.operators.OperationThrottleLast;
6768
import rx.operators.OperationTimestamp;
6869
import rx.operators.OperationToObservableFuture;
6970
import rx.operators.OperationToObservableIterable;
@@ -1809,6 +1810,36 @@ public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler
18091810
return create(OperationInterval.interval(interval, unit, scheduler));
18101811
}
18111812

1813+
/**
1814+
* Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired.
1815+
*
1816+
* @param timeout
1817+
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
1818+
*
1819+
* @param unit
1820+
* The {@link TimeUnit} for the timeout.
1821+
*
1822+
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
1823+
*/
1824+
public Observable<T> throttleLast(long timeout, TimeUnit unit) {
1825+
return create(OperationThrottleLast.throttleLast(this, timeout, unit));
1826+
}
1827+
1828+
/**
1829+
* Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired.
1830+
*
1831+
* @param timeout
1832+
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
1833+
* @param unit
1834+
* The {@link TimeUnit} for the timeout.
1835+
* @param scheduler
1836+
* The {@link Scheduler} to use when timing incoming values.
1837+
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
1838+
*/
1839+
public Observable<T> throttleLast(long timeout, TimeUnit unit, Scheduler scheduler) {
1840+
return create(OperationThrottleLast.throttleLast(this, timeout, unit, scheduler));
1841+
}
1842+
18121843
/**
18131844
* Wraps each item emitted by a source Observable in a {@link Timestamped} object.
18141845
* <p>

rxjava-core/src/main/java/rx/concurrency/TestScheduler.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,22 @@
1919
import java.util.PriorityQueue;
2020
import java.util.Queue;
2121
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2223

2324
import rx.Scheduler;
2425
import rx.Subscription;
25-
import rx.subscriptions.Subscriptions;
2626
import rx.util.functions.Func2;
2727

2828
public class TestScheduler extends Scheduler {
2929
private final Queue<TimedAction<?>> queue = new PriorityQueue<TimedAction<?>>(11, new CompareActionsByTime());
3030

3131
private static class TimedAction<T> {
32+
3233
private final long time;
3334
private final Func2<? super Scheduler, ? super T, ? extends Subscription> action;
3435
private final T state;
3536
private final TestScheduler scheduler;
37+
private final AtomicBoolean isCancelled = new AtomicBoolean(false);
3638

3739
private TimedAction(TestScheduler scheduler, long time, Func2<? super Scheduler, ? super T, ? extends Subscription> action, T state) {
3840
this.time = time;
@@ -41,6 +43,10 @@ private TimedAction(TestScheduler scheduler, long time, Func2<? super Scheduler,
4143
this.scheduler = scheduler;
4244
}
4345

46+
public void cancel() {
47+
isCancelled.set(true);
48+
}
49+
4450
@Override
4551
public String toString() {
4652
return String.format("TimedAction(time = %d, action = %s)", time, action.toString());
@@ -84,8 +90,12 @@ private void triggerActions(long targetTimeInNanos) {
8490
}
8591
time = current.time;
8692
queue.remove();
87-
// because the queue can have wildcards we have to ignore the type T for the state
88-
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);
93+
94+
// Only execute if the TimedAction has not yet been cancelled
95+
if (!current.isCancelled.get()) {
96+
// because the queue can have wildcards we have to ignore the type T for the state
97+
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);
98+
}
8999
}
90100
time = targetTimeInNanos;
91101
}
@@ -97,7 +107,14 @@ public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ?
97107

98108
@Override
99109
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
100-
queue.add(new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state));
101-
return Subscriptions.empty();
110+
final TimedAction<T> timedAction = new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state);
111+
queue.add(timedAction);
112+
113+
return new Subscription() {
114+
@Override
115+
public void unsubscribe() {
116+
timedAction.cancel();
117+
}
118+
};
102119
}
103120
}
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Matchers.*;
19+
import static org.mockito.Mockito.*;
20+
21+
import java.util.concurrent.TimeUnit;
22+
23+
import org.junit.Before;
24+
import org.junit.Test;
25+
import org.mockito.InOrder;
26+
27+
import rx.Observable;
28+
import rx.Observable.OnSubscribeFunc;
29+
import rx.Observer;
30+
import rx.Scheduler;
31+
import rx.Subscription;
32+
import rx.concurrency.Schedulers;
33+
import rx.concurrency.TestScheduler;
34+
import rx.subscriptions.Subscriptions;
35+
import rx.util.functions.Action0;
36+
import rx.util.functions.Func1;
37+
38+
/**
39+
* This operation is used to filter out bursts of events. This is done by ignoring the events from an observable which are too
40+
* quickly followed up with other values. Values which are not followed up by other values within the specified timeout are published
41+
* as soon as the timeout expires.
42+
*/
43+
public final class OperationThrottleLast {
44+
45+
/**
46+
* This operation filters out events which are published too quickly in succession. This is done by dropping events which are
47+
* followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet)
48+
* the last received event is published.
49+
*
50+
* @param items
51+
* The {@link Observable} which is publishing events.
52+
* @param timeout
53+
* How long each event has to be the 'last event' before it gets published.
54+
* @param unit
55+
* The unit of time for the specified timeout.
56+
* @return A {@link Func1} which performs the throttle operation.
57+
*/
58+
public static <T> OnSubscribeFunc<T> throttleLast(Observable<T> items, long timeout, TimeUnit unit) {
59+
return throttleLast(items, timeout, unit, Schedulers.threadPoolForComputation());
60+
}
61+
62+
/**
63+
* This operation filters out events which are published too quickly in succession. This is done by dropping events which are
64+
* followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet)
65+
* the last received event is published.
66+
*
67+
* @param items
68+
* The {@link Observable} which is publishing events.
69+
* @param timeout
70+
* How long each event has to be the 'last event' before it gets published.
71+
* @param unit
72+
* The unit of time for the specified timeout.
73+
* @param scheduler
74+
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
75+
* @return A {@link Func1} which performs the throttle operation.
76+
*/
77+
public static <T> OnSubscribeFunc<T> throttleLast(final Observable<T> items, final long timeout, final TimeUnit unit, final Scheduler scheduler) {
78+
return new OnSubscribeFunc<T>() {
79+
@Override
80+
public Subscription onSubscribe(Observer<? super T> observer) {
81+
return items.window(timeout, unit, scheduler).flatMap(new Func1<Observable<T>, Observable<T>>() {
82+
83+
@Override
84+
public Observable<T> call(Observable<T> o) {
85+
return o.takeLast(1);
86+
}
87+
}).subscribe(observer);
88+
}
89+
};
90+
}
91+
92+
public static class UnitTest {
93+
94+
private TestScheduler scheduler;
95+
private Observer<String> observer;
96+
97+
@Before
98+
@SuppressWarnings("unchecked")
99+
public void before() {
100+
scheduler = new TestScheduler();
101+
observer = mock(Observer.class);
102+
}
103+
104+
@Test
105+
public void testThrottlingWithCompleted() {
106+
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
107+
@Override
108+
public Subscription onSubscribe(Observer<? super String> observer) {
109+
publishNext(observer, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires.
110+
publishNext(observer, 400, "two"); // Should be published since "three" will arrive after the timeout expires.
111+
publishNext(observer, 900, "three"); // Should be skipped since onCompleted will arrive before the timeout expires.
112+
publishCompleted(observer, 1000); // Should be published as soon as the timeout expires.
113+
114+
return Subscriptions.empty();
115+
}
116+
});
117+
118+
Observable<String> sampled = Observable.create(OperationThrottleLast.throttleLast(source, 400, TimeUnit.MILLISECONDS, scheduler));
119+
sampled.subscribe(observer);
120+
121+
InOrder inOrder = inOrder(observer);
122+
123+
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
124+
inOrder.verify(observer, times(1)).onNext("two");
125+
inOrder.verify(observer, times(1)).onCompleted();
126+
inOrder.verifyNoMoreInteractions();
127+
}
128+
129+
@Test
130+
public void testThrottlingWithError() {
131+
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
132+
@Override
133+
public Subscription onSubscribe(Observer<? super String> observer) {
134+
Exception error = new TestException();
135+
publishNext(observer, 100, "one"); // Should be published since "two" will arrive after the timeout expires.
136+
publishNext(observer, 600, "two"); // Should be skipped since onError will arrive before the timeout expires.
137+
publishError(observer, 700, error); // Should be published as soon as the timeout expires.
138+
139+
return Subscriptions.empty();
140+
}
141+
});
142+
143+
Observable<String> sampled = Observable.create(OperationThrottleLast.throttleLast(source, 400, TimeUnit.MILLISECONDS, scheduler));
144+
sampled.subscribe(observer);
145+
146+
InOrder inOrder = inOrder(observer);
147+
148+
scheduler.advanceTimeTo(400, TimeUnit.MILLISECONDS);
149+
inOrder.verify(observer).onNext("one");
150+
scheduler.advanceTimeTo(701, TimeUnit.MILLISECONDS);
151+
inOrder.verify(observer).onError(any(TestException.class));
152+
inOrder.verifyNoMoreInteractions();
153+
}
154+
155+
private <T> void publishCompleted(final Observer<T> observer, long delay) {
156+
scheduler.schedule(new Action0() {
157+
@Override
158+
public void call() {
159+
observer.onCompleted();
160+
}
161+
}, delay, TimeUnit.MILLISECONDS);
162+
}
163+
164+
private <T> void publishError(final Observer<T> observer, long delay, final Exception error) {
165+
scheduler.schedule(new Action0() {
166+
@Override
167+
public void call() {
168+
observer.onError(error);
169+
}
170+
}, delay, TimeUnit.MILLISECONDS);
171+
}
172+
173+
private <T> void publishNext(final Observer<T> observer, long delay, final T value) {
174+
scheduler.schedule(new Action0() {
175+
@Override
176+
public void call() {
177+
observer.onNext(value);
178+
}
179+
}, delay, TimeUnit.MILLISECONDS);
180+
}
181+
182+
@SuppressWarnings("serial")
183+
private class TestException extends Exception {
184+
}
185+
186+
}
187+
188+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package rx;
2+
3+
import static org.mockito.Mockito.*;
4+
5+
import java.util.concurrent.TimeUnit;
6+
7+
import org.junit.Test;
8+
import org.mockito.InOrder;
9+
10+
import rx.concurrency.TestScheduler;
11+
import rx.subjects.PublishSubject;
12+
13+
public class ThrottleLastTests {
14+
15+
@Test
16+
public void testThrottle() {
17+
@SuppressWarnings("unchecked")
18+
Observer<Integer> observer = mock(Observer.class);
19+
TestScheduler s = new TestScheduler();
20+
PublishSubject<Integer> o = PublishSubject.create();
21+
o.throttleLast(500, TimeUnit.MILLISECONDS, s).subscribe(observer);
22+
23+
// send events with simulated time increments
24+
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
25+
o.onNext(1); // skip
26+
o.onNext(2); // deliver
27+
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
28+
o.onNext(3); // skip
29+
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
30+
o.onNext(4); // skip
31+
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
32+
o.onNext(5); // skip
33+
o.onNext(6); // deliver
34+
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
35+
o.onNext(7); // deliver
36+
s.advanceTimeTo(1501, TimeUnit.MILLISECONDS);
37+
o.onCompleted();
38+
39+
InOrder inOrder = inOrder(observer);
40+
inOrder.verify(observer).onNext(2);
41+
inOrder.verify(observer).onNext(6);
42+
inOrder.verify(observer).onNext(7);
43+
inOrder.verify(observer).onCompleted();
44+
inOrder.verifyNoMoreInteractions();
45+
}
46+
}

0 commit comments

Comments
 (0)