Skip to content

Commit 2e625a6

Browse files
Operator: throttleFirst
Another take on `throttle` … this delivers the first value in each window.
1 parent a2f04b0 commit 2e625a6

File tree

4 files changed

+282
-5
lines changed

4 files changed

+282
-5
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import rx.operators.OperationTakeLast;
6565
import rx.operators.OperationTakeUntil;
6666
import rx.operators.OperationTakeWhile;
67+
import rx.operators.OperationThrottleFirst;
6768
import rx.operators.OperationTimestamp;
6869
import rx.operators.OperationToObservableFuture;
6970
import rx.operators.OperationToObservableIterable;
@@ -1809,6 +1810,34 @@ public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler
18091810
return create(OperationInterval.interval(interval, unit, scheduler));
18101811
}
18111812

1813+
/**
1814+
* Throttles to first value in each window.
1815+
*
1816+
* @param windowDuration
1817+
* Duration of windows within with the first value will be chosen.
1818+
* @param unit
1819+
* The unit of time for the specified timeout.
1820+
* @return Observable which performs the throttle operation.
1821+
*/
1822+
public Observable<T> throttleFirst(long windowDuration, TimeUnit unit) {
1823+
return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit));
1824+
}
1825+
1826+
/**
1827+
* Throttles to first value in each window.
1828+
*
1829+
* @param windowDuration
1830+
* Duration of windows within with the first value will be chosen.
1831+
* @param unit
1832+
* The unit of time for the specified timeout.
1833+
* @param scheduler
1834+
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
1835+
* @return Observable which performs the throttle operation.
1836+
*/
1837+
public Observable<T> throttleFirst(long windowDuration, TimeUnit unit, Scheduler scheduler) {
1838+
return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit, scheduler));
1839+
}
1840+
18121841
/**
18131842
* Wraps each item emitted by a source Observable in a {@link Timestamped} object.
18141843
* <p>

rxjava-core/src/main/java/rx/concurrency/TestScheduler.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,22 @@
1919
import java.util.PriorityQueue;
2020
import java.util.Queue;
2121
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2223

2324
import rx.Scheduler;
2425
import rx.Subscription;
25-
import rx.subscriptions.Subscriptions;
2626
import rx.util.functions.Func2;
2727

2828
public class TestScheduler extends Scheduler {
2929
private final Queue<TimedAction<?>> queue = new PriorityQueue<TimedAction<?>>(11, new CompareActionsByTime());
3030

3131
private static class TimedAction<T> {
32+
3233
private final long time;
3334
private final Func2<? super Scheduler, ? super T, ? extends Subscription> action;
3435
private final T state;
3536
private final TestScheduler scheduler;
37+
private final AtomicBoolean isCancelled = new AtomicBoolean(false);
3638

3739
private TimedAction(TestScheduler scheduler, long time, Func2<? super Scheduler, ? super T, ? extends Subscription> action, T state) {
3840
this.time = time;
@@ -41,6 +43,10 @@ private TimedAction(TestScheduler scheduler, long time, Func2<? super Scheduler,
4143
this.scheduler = scheduler;
4244
}
4345

46+
public void cancel() {
47+
isCancelled.set(true);
48+
}
49+
4450
@Override
4551
public String toString() {
4652
return String.format("TimedAction(time = %d, action = %s)", time, action.toString());
@@ -84,8 +90,12 @@ private void triggerActions(long targetTimeInNanos) {
8490
}
8591
time = current.time;
8692
queue.remove();
87-
// because the queue can have wildcards we have to ignore the type T for the state
88-
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);
93+
94+
// Only execute if the TimedAction has not yet been cancelled
95+
if (!current.isCancelled.get()) {
96+
// because the queue can have wildcards we have to ignore the type T for the state
97+
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);
98+
}
8999
}
90100
time = targetTimeInNanos;
91101
}
@@ -97,7 +107,14 @@ public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ?
97107

98108
@Override
99109
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
100-
queue.add(new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state));
101-
return Subscriptions.empty();
110+
final TimedAction<T> timedAction = new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state);
111+
queue.add(timedAction);
112+
113+
return new Subscription() {
114+
@Override
115+
public void unsubscribe() {
116+
timedAction.cancel();
117+
}
118+
};
102119
}
103120
}
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Matchers.*;
19+
import static org.mockito.Mockito.*;
20+
21+
import java.util.concurrent.TimeUnit;
22+
23+
import org.junit.Before;
24+
import org.junit.Test;
25+
import org.mockito.InOrder;
26+
27+
import rx.Observable;
28+
import rx.Observable.OnSubscribeFunc;
29+
import rx.Observer;
30+
import rx.Scheduler;
31+
import rx.Subscription;
32+
import rx.concurrency.Schedulers;
33+
import rx.concurrency.TestScheduler;
34+
import rx.subscriptions.Subscriptions;
35+
import rx.util.functions.Action0;
36+
import rx.util.functions.Func1;
37+
38+
/**
39+
* Throttle by windowing a stream and returning the first value in each window.
40+
*/
41+
public final class OperationThrottleFirst {
42+
43+
/**
44+
* Throttles to first value in each window.
45+
*
46+
* @param items
47+
* The {@link Observable} which is publishing events.
48+
* @param windowDuration
49+
* Duration of windows within with the first value will be chosen.
50+
* @param unit
51+
* The unit of time for the specified timeout.
52+
* @return A {@link Func1} which performs the throttle operation.
53+
*/
54+
public static <T> OnSubscribeFunc<T> throttleFirst(Observable<T> items, long windowDuration, TimeUnit unit) {
55+
return throttleFirst(items, windowDuration, unit, Schedulers.threadPoolForComputation());
56+
}
57+
58+
/**
59+
* Throttles to first value in each window.
60+
*
61+
* @param items
62+
* The {@link Observable} which is publishing events.
63+
* @param windowDuration
64+
* Duration of windows within with the first value will be chosen.
65+
* @param unit
66+
* The unit of time for the specified timeout.
67+
* @param scheduler
68+
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
69+
* @return A {@link Func1} which performs the throttle operation.
70+
*/
71+
public static <T> OnSubscribeFunc<T> throttleFirst(final Observable<T> items, final long windowDuration, final TimeUnit unit, final Scheduler scheduler) {
72+
return new OnSubscribeFunc<T>() {
73+
@Override
74+
public Subscription onSubscribe(Observer<? super T> observer) {
75+
return items.window(windowDuration, unit, scheduler).flatMap(new Func1<Observable<T>, Observable<T>>() {
76+
77+
@Override
78+
public Observable<T> call(Observable<T> o) {
79+
return o.takeFirst();
80+
}
81+
}).subscribe(observer);
82+
}
83+
};
84+
}
85+
86+
public static class UnitTest {
87+
88+
private TestScheduler scheduler;
89+
private Observer<String> observer;
90+
91+
@Before
92+
@SuppressWarnings("unchecked")
93+
public void before() {
94+
scheduler = new TestScheduler();
95+
observer = mock(Observer.class);
96+
}
97+
98+
@Test
99+
public void testThrottlingWithCompleted() {
100+
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
101+
@Override
102+
public Subscription onSubscribe(Observer<? super String> observer) {
103+
publishNext(observer, 100, "one"); // publish as it's first
104+
publishNext(observer, 300, "two"); // skip as it's last within the first 400
105+
publishNext(observer, 900, "three"); // publish
106+
publishNext(observer, 905, "four"); // skip
107+
publishCompleted(observer, 1000); // Should be published as soon as the timeout expires.
108+
109+
return Subscriptions.empty();
110+
}
111+
});
112+
113+
Observable<String> sampled = Observable.create(OperationThrottleFirst.throttleFirst(source, 400, TimeUnit.MILLISECONDS, scheduler));
114+
sampled.subscribe(observer);
115+
116+
InOrder inOrder = inOrder(observer);
117+
118+
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
119+
inOrder.verify(observer, times(1)).onNext("one");
120+
inOrder.verify(observer, times(0)).onNext("two");
121+
inOrder.verify(observer, times(1)).onNext("three");
122+
inOrder.verify(observer, times(0)).onNext("four");
123+
inOrder.verify(observer, times(1)).onCompleted();
124+
inOrder.verifyNoMoreInteractions();
125+
}
126+
127+
@Test
128+
public void testThrottlingWithError() {
129+
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
130+
@Override
131+
public Subscription onSubscribe(Observer<? super String> observer) {
132+
Exception error = new TestException();
133+
publishNext(observer, 100, "one"); // Should be published since it is first
134+
publishNext(observer, 200, "two"); // Should be skipped since onError will arrive before the timeout expires
135+
publishError(observer, 300, error); // Should be published as soon as the timeout expires.
136+
137+
return Subscriptions.empty();
138+
}
139+
});
140+
141+
Observable<String> sampled = Observable.create(OperationThrottleFirst.throttleFirst(source, 400, TimeUnit.MILLISECONDS, scheduler));
142+
sampled.subscribe(observer);
143+
144+
InOrder inOrder = inOrder(observer);
145+
146+
scheduler.advanceTimeTo(400, TimeUnit.MILLISECONDS);
147+
inOrder.verify(observer).onNext("one");
148+
inOrder.verify(observer).onError(any(TestException.class));
149+
inOrder.verifyNoMoreInteractions();
150+
}
151+
152+
private <T> void publishCompleted(final Observer<T> observer, long delay) {
153+
scheduler.schedule(new Action0() {
154+
@Override
155+
public void call() {
156+
observer.onCompleted();
157+
}
158+
}, delay, TimeUnit.MILLISECONDS);
159+
}
160+
161+
private <T> void publishError(final Observer<T> observer, long delay, final Exception error) {
162+
scheduler.schedule(new Action0() {
163+
@Override
164+
public void call() {
165+
observer.onError(error);
166+
}
167+
}, delay, TimeUnit.MILLISECONDS);
168+
}
169+
170+
private <T> void publishNext(final Observer<T> observer, long delay, final T value) {
171+
scheduler.schedule(new Action0() {
172+
@Override
173+
public void call() {
174+
observer.onNext(value);
175+
}
176+
}, delay, TimeUnit.MILLISECONDS);
177+
}
178+
179+
@SuppressWarnings("serial")
180+
private class TestException extends Exception {
181+
}
182+
183+
}
184+
185+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package rx;
2+
3+
import static org.mockito.Mockito.*;
4+
5+
import java.util.concurrent.TimeUnit;
6+
7+
import org.junit.Test;
8+
import org.mockito.InOrder;
9+
10+
import rx.concurrency.TestScheduler;
11+
import rx.subjects.PublishSubject;
12+
13+
public class ThrottleFirstTests {
14+
15+
@Test
16+
public void testThrottle() {
17+
@SuppressWarnings("unchecked")
18+
Observer<Integer> observer = mock(Observer.class);
19+
TestScheduler s = new TestScheduler();
20+
PublishSubject<Integer> o = PublishSubject.create();
21+
o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(observer);
22+
23+
// send events with simulated time increments
24+
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
25+
o.onNext(1); // deliver
26+
o.onNext(2); // skip
27+
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
28+
o.onNext(3); // deliver
29+
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
30+
o.onNext(4); // skip
31+
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
32+
o.onNext(5); // skip
33+
o.onNext(6); // skip
34+
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
35+
o.onNext(7); // deliver
36+
s.advanceTimeTo(1501, TimeUnit.MILLISECONDS);
37+
o.onCompleted();
38+
39+
InOrder inOrder = inOrder(observer);
40+
inOrder.verify(observer).onNext(1);
41+
inOrder.verify(observer).onNext(3);
42+
inOrder.verify(observer).onNext(7);
43+
inOrder.verify(observer).onCompleted();
44+
inOrder.verifyNoMoreInteractions();
45+
}
46+
}

0 commit comments

Comments
 (0)