Skip to content

Commit c95b810

Browse files
Merge branch 'throttleWithTimeout' into throttle
Conflicts: rxjava-core/src/main/java/rx/Observable.java
2 parents 1c47b0c + 2a3ade2 commit c95b810

File tree

3 files changed

+330
-0
lines changed

3 files changed

+330
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import rx.operators.OperationTakeUntil;
6666
import rx.operators.OperationTakeWhile;
6767
import rx.operators.OperationThrottle;
68+
import rx.operators.OperationThrottleWithTimeout;
6869
import rx.operators.OperationTimestamp;
6970
import rx.operators.OperationToObservableFuture;
7071
import rx.operators.OperationToObservableIterable;
@@ -1810,6 +1811,39 @@ public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler
18101811
return create(OperationInterval.interval(interval, unit, scheduler));
18111812
}
18121813

1814+
/**
1815+
* Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer reset on each `onNext` call.
1816+
* <p>
1817+
* NOTE: If the timeout is set higher than the rate of traffic then this will drop all data.
1818+
*
1819+
* @param timeout
1820+
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
1821+
* @param unit
1822+
* The {@link TimeUnit} for the timeout.
1823+
*
1824+
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
1825+
*/
1826+
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
1827+
return create(OperationThrottleWithTimeout.throttleWithTimeout(this, timeout, unit));
1828+
}
1829+
1830+
/**
1831+
* Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer reset on each `onNext` call.
1832+
* <p>
1833+
* NOTE: If the timeout is set higher than the rate of traffic then this will drop all data.
1834+
*
1835+
* @param timeout
1836+
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
1837+
* @param unit
1838+
* The unit of time for the specified timeout.
1839+
* @param scheduler
1840+
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
1841+
* @return Observable which performs the throttle operation.
1842+
*/
1843+
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) {
1844+
return create(OperationThrottleWithTimeout.throttleWithTimeout(this, timeout, unit, scheduler));
1845+
}
1846+
18131847
/**
18141848
* Wraps each item emitted by a source Observable in a {@link Timestamped} object.
18151849
* <p>
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Matchers.*;
19+
import static org.mockito.Mockito.*;
20+
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicReference;
23+
24+
import org.junit.Before;
25+
import org.junit.Test;
26+
import org.mockito.InOrder;
27+
28+
import rx.Observable;
29+
import rx.Observable.OnSubscribeFunc;
30+
import rx.Observer;
31+
import rx.Scheduler;
32+
import rx.Subscription;
33+
import rx.concurrency.Schedulers;
34+
import rx.concurrency.TestScheduler;
35+
import rx.subscriptions.Subscriptions;
36+
import rx.util.functions.Action0;
37+
import rx.util.functions.Func1;
38+
39+
/**
40+
* This operation is used to filter out bursts of events. This is done by ignoring the events from an observable which are too
41+
* quickly followed up with other values. Values which are not followed up by other values within the specified timeout are published
42+
* as soon as the timeout expires.
43+
*/
44+
public final class OperationThrottleWithTimeout {
45+
46+
/**
47+
* This operation filters out events which are published too quickly in succession. This is done by dropping events which are
48+
* followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet)
49+
* the last received event is published.
50+
*
51+
* @param items
52+
* The {@link Observable} which is publishing events.
53+
* @param timeout
54+
* How long each event has to be the 'last event' before it gets published.
55+
* @param unit
56+
* The unit of time for the specified timeout.
57+
* @return A {@link Func1} which performs the throttle operation.
58+
*/
59+
public static <T> OnSubscribeFunc<T> throttleWithTimeout(Observable<T> items, long timeout, TimeUnit unit) {
60+
return throttleWithTimeout(items, timeout, unit, Schedulers.threadPoolForComputation());
61+
}
62+
63+
/**
64+
* This operation filters out events which are published too quickly in succession. This is done by dropping events which are
65+
* followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet)
66+
* the last received event is published.
67+
*
68+
* @param items
69+
* The {@link Observable} which is publishing events.
70+
* @param timeout
71+
* How long each event has to be the 'last event' before it gets published.
72+
* @param unit
73+
* The unit of time for the specified timeout.
74+
* @param scheduler
75+
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
76+
* @return A {@link Func1} which performs the throttle operation.
77+
*/
78+
public static <T> OnSubscribeFunc<T> throttleWithTimeout(final Observable<T> items, final long timeout, final TimeUnit unit, final Scheduler scheduler) {
79+
return new OnSubscribeFunc<T>() {
80+
@Override
81+
public Subscription onSubscribe(Observer<? super T> observer) {
82+
return new Throttle<T>(items, timeout, unit, scheduler).onSubscribe(observer);
83+
}
84+
};
85+
}
86+
87+
private static class Throttle<T> implements OnSubscribeFunc<T> {
88+
89+
private final Observable<T> items;
90+
private final long timeout;
91+
private final TimeUnit unit;
92+
private final Scheduler scheduler;
93+
94+
public Throttle(Observable<T> items, long timeout, TimeUnit unit, Scheduler scheduler) {
95+
this.items = items;
96+
this.timeout = timeout;
97+
this.unit = unit;
98+
this.scheduler = scheduler;
99+
}
100+
101+
@Override
102+
public Subscription onSubscribe(Observer<? super T> observer) {
103+
return items.subscribe(new ThrottledObserver<T>(observer, timeout, unit, scheduler));
104+
}
105+
}
106+
107+
private static class ThrottledObserver<T> implements Observer<T> {
108+
109+
private final Observer<? super T> observer;
110+
private final long timeout;
111+
private final TimeUnit unit;
112+
private final Scheduler scheduler;
113+
114+
private final AtomicReference<Subscription> lastScheduledNotification = new AtomicReference<Subscription>();
115+
116+
public ThrottledObserver(Observer<? super T> observer, long timeout, TimeUnit unit, Scheduler scheduler) {
117+
this.observer = observer;
118+
this.timeout = timeout;
119+
this.unit = unit;
120+
this.scheduler = scheduler;
121+
}
122+
123+
@Override
124+
public void onCompleted() {
125+
observer.onCompleted();
126+
}
127+
128+
@Override
129+
public void onError(Throwable e) {
130+
lastScheduledNotification.get().unsubscribe();
131+
observer.onError(e);
132+
}
133+
134+
@Override
135+
public void onNext(final T v) {
136+
Subscription previousSubscription = lastScheduledNotification.getAndSet(scheduler.schedule(new Action0() {
137+
138+
@Override
139+
public void call() {
140+
observer.onNext(v);
141+
}
142+
143+
}, timeout, unit));
144+
// cancel previous if not already executed
145+
if (previousSubscription != null) {
146+
previousSubscription.unsubscribe();
147+
}
148+
}
149+
}
150+
151+
public static class UnitTest {
152+
153+
private TestScheduler scheduler;
154+
private Observer<String> observer;
155+
156+
@Before
157+
@SuppressWarnings("unchecked")
158+
public void before() {
159+
scheduler = new TestScheduler();
160+
observer = mock(Observer.class);
161+
}
162+
163+
@Test
164+
public void testThrottlingWithCompleted() {
165+
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
166+
@Override
167+
public Subscription onSubscribe(Observer<? super String> observer) {
168+
publishNext(observer, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires.
169+
publishNext(observer, 400, "two"); // Should be published since "three" will arrive after the timeout expires.
170+
publishNext(observer, 900, "three"); // Should be skipped since onCompleted will arrive before the timeout expires.
171+
publishCompleted(observer, 1000); // Should be published as soon as the timeout expires.
172+
173+
return Subscriptions.empty();
174+
}
175+
});
176+
177+
Observable<String> sampled = Observable.create(OperationThrottleWithTimeout.throttleWithTimeout(source, 400, TimeUnit.MILLISECONDS, scheduler));
178+
sampled.subscribe(observer);
179+
180+
scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS);
181+
InOrder inOrder = inOrder(observer);
182+
// must go to 800 since it must be 400 after when two is sent, which is at 400
183+
scheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS);
184+
inOrder.verify(observer, times(1)).onNext("two");
185+
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
186+
inOrder.verify(observer, times(1)).onCompleted();
187+
inOrder.verifyNoMoreInteractions();
188+
}
189+
190+
@Test
191+
public void testThrottlingWithError() {
192+
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
193+
@Override
194+
public Subscription onSubscribe(Observer<? super String> observer) {
195+
Exception error = new TestException();
196+
publishNext(observer, 100, "one"); // Should be published since "two" will arrive after the timeout expires.
197+
publishNext(observer, 600, "two"); // Should be skipped since onError will arrive before the timeout expires.
198+
publishError(observer, 700, error); // Should be published as soon as the timeout expires.
199+
200+
return Subscriptions.empty();
201+
}
202+
});
203+
204+
Observable<String> sampled = Observable.create(OperationThrottleWithTimeout.throttleWithTimeout(source, 400, TimeUnit.MILLISECONDS, scheduler));
205+
sampled.subscribe(observer);
206+
207+
scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS);
208+
InOrder inOrder = inOrder(observer);
209+
// 100 + 400 means it triggers at 500
210+
scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS);
211+
inOrder.verify(observer).onNext("one");
212+
scheduler.advanceTimeTo(701, TimeUnit.MILLISECONDS);
213+
inOrder.verify(observer).onError(any(TestException.class));
214+
inOrder.verifyNoMoreInteractions();
215+
}
216+
217+
private <T> void publishCompleted(final Observer<T> observer, long delay) {
218+
scheduler.schedule(new Action0() {
219+
@Override
220+
public void call() {
221+
observer.onCompleted();
222+
}
223+
}, delay, TimeUnit.MILLISECONDS);
224+
}
225+
226+
private <T> void publishError(final Observer<T> observer, long delay, final Exception error) {
227+
scheduler.schedule(new Action0() {
228+
@Override
229+
public void call() {
230+
observer.onError(error);
231+
}
232+
}, delay, TimeUnit.MILLISECONDS);
233+
}
234+
235+
private <T> void publishNext(final Observer<T> observer, final long delay, final T value) {
236+
scheduler.schedule(new Action0() {
237+
@Override
238+
public void call() {
239+
observer.onNext(value);
240+
}
241+
}, delay, TimeUnit.MILLISECONDS);
242+
}
243+
244+
@SuppressWarnings("serial")
245+
private class TestException extends Exception {
246+
}
247+
248+
}
249+
250+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package rx;
2+
3+
import static org.mockito.Mockito.*;
4+
5+
import java.util.concurrent.TimeUnit;
6+
7+
import org.junit.Test;
8+
import org.mockito.InOrder;
9+
10+
import rx.concurrency.TestScheduler;
11+
import rx.subjects.PublishSubject;
12+
13+
public class ThrottleWithTimeoutTests {
14+
15+
@Test
16+
public void testThrottle() {
17+
@SuppressWarnings("unchecked")
18+
Observer<Integer> observer = mock(Observer.class);
19+
TestScheduler s = new TestScheduler();
20+
PublishSubject<Integer> o = PublishSubject.create();
21+
o.throttleWithTimeout(500, TimeUnit.MILLISECONDS, s).subscribe(observer);
22+
23+
// send events with simulated time increments
24+
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
25+
o.onNext(1); // skip
26+
o.onNext(2); // deliver
27+
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
28+
o.onNext(3); // skip
29+
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
30+
o.onNext(4); // skip
31+
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
32+
o.onNext(5); // skip
33+
o.onNext(6); // deliver at 1300 after 500ms has passed since onNext(5)
34+
s.advanceTimeTo(1300, TimeUnit.MILLISECONDS);
35+
o.onNext(7); // deliver
36+
s.advanceTimeTo(1800, TimeUnit.MILLISECONDS);
37+
o.onCompleted();
38+
39+
InOrder inOrder = inOrder(observer);
40+
inOrder.verify(observer).onNext(2);
41+
inOrder.verify(observer).onNext(6);
42+
inOrder.verify(observer).onNext(7);
43+
inOrder.verify(observer).onCompleted();
44+
inOrder.verifyNoMoreInteractions();
45+
}
46+
}

0 commit comments

Comments
 (0)