Skip to content

Commit 30911bd

Browse files
Merge branch 'throttle-first' of git://github.com/zsxwing/RxJava into merge-prs
Conflicts: rxjava-core/src/main/java/rx/Observable.java
2 parents 4495f14 + ed57dc5 commit 30911bd

File tree

5 files changed

+108
-166
lines changed

5 files changed

+108
-166
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
import rx.operators.OperationTakeTimed;
6060
import rx.operators.OperationTakeUntil;
6161
import rx.operators.OperationTakeWhile;
62-
import rx.operators.OperationThrottleFirst;
6362
import rx.operators.OperationWindow;
6463
import rx.operators.OperatorAll;
6564
import rx.operators.OperatorAmb;
@@ -118,6 +117,7 @@
118117
import rx.operators.OperatorSkipWhile;
119118
import rx.operators.OperatorSubscribeOn;
120119
import rx.operators.OperatorTake;
120+
import rx.operators.OperatorThrottleFirst;
121121
import rx.operators.OperatorTimeInterval;
122122
import rx.operators.OperatorTimeout;
123123
import rx.operators.OperatorTimeoutWithSelector;
@@ -6725,7 +6725,7 @@ public final Observable<T> takeWhileWithIndex(final Func2<? super T, ? super Int
67256725
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-throttlefirst">RxJava Wiki: throttleFirst()</a>
67266726
*/
67276727
public final Observable<T> throttleFirst(long windowDuration, TimeUnit unit) {
6728-
return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit));
6728+
return lift(new OperatorThrottleFirst<T>(windowDuration, unit, Schedulers.computation()));
67296729
}
67306730

67316731
/**
@@ -6747,7 +6747,7 @@ public final Observable<T> throttleFirst(long windowDuration, TimeUnit unit) {
67476747
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-throttlefirst">RxJava Wiki: throttleFirst()</a>
67486748
*/
67496749
public final Observable<T> throttleFirst(long skipDuration, TimeUnit unit, Scheduler scheduler) {
6750-
return create(OperationThrottleFirst.throttleFirst(this, skipDuration, unit, scheduler));
6750+
return lift(new OperatorThrottleFirst<T>(skipDuration, unit, scheduler));
67516751
}
67526752

67536753
/**

rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java

Lines changed: 0 additions & 88 deletions
This file was deleted.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.concurrent.TimeUnit;
19+
20+
import rx.*;
21+
import rx.Observable.Operator;
22+
23+
/**
24+
* Throttle by windowing a stream and returning the first value in each window.
25+
*/
26+
public final class OperatorThrottleFirst<T> implements Operator<T, T> {
27+
28+
private final long timeInMilliseconds;
29+
private final Scheduler scheduler;
30+
31+
public OperatorThrottleFirst(long windowDuration, TimeUnit unit, Scheduler scheduler) {
32+
this.timeInMilliseconds = unit.toMillis(windowDuration);
33+
this.scheduler = scheduler;
34+
}
35+
36+
@Override
37+
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
38+
return new Subscriber<T>(subscriber) {
39+
40+
private long lastOnNext = 0;
41+
42+
@Override
43+
public void onNext(T v) {
44+
long now = scheduler.now();
45+
if (lastOnNext == 0 || now - lastOnNext >= timeInMilliseconds) {
46+
lastOnNext = now;
47+
subscriber.onNext(v);
48+
}
49+
}
50+
51+
@Override
52+
public void onCompleted() {
53+
subscriber.onCompleted();
54+
}
55+
56+
@Override
57+
public void onError(Throwable e) {
58+
subscriber.onError(e);
59+
}
60+
61+
};
62+
}
63+
}

rxjava-core/src/test/java/rx/ThrottleFirstTests.java

Lines changed: 0 additions & 62 deletions
This file was deleted.

rxjava-core/src/test/java/rx/operators/OperationThrottleFirstTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorThrottleFirstTest.java

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,15 @@
2727
import org.mockito.InOrder;
2828

2929
import rx.Observable;
30+
import rx.Observable.OnSubscribe;
3031
import rx.Observer;
3132
import rx.Scheduler;
32-
import rx.Subscription;
33+
import rx.Subscriber;
3334
import rx.functions.Action0;
3435
import rx.schedulers.TestScheduler;
35-
import rx.subscriptions.Subscriptions;
36+
import rx.subjects.PublishSubject;
3637

37-
public class OperationThrottleFirstTest {
38+
public class OperatorThrottleFirstTest {
3839

3940
private TestScheduler scheduler;
4041
private Scheduler.Worker innerScheduler;
@@ -50,20 +51,18 @@ public void before() {
5051

5152
@Test
5253
public void testThrottlingWithCompleted() {
53-
Observable<String> source = Observable.create(new Observable.OnSubscribeFunc<String>() {
54+
Observable<String> source = Observable.create(new OnSubscribe<String>() {
5455
@Override
55-
public Subscription onSubscribe(Observer<? super String> observer) {
56+
public void call(Subscriber<? super String> observer) {
5657
publishNext(observer, 100, "one"); // publish as it's first
5758
publishNext(observer, 300, "two"); // skip as it's last within the first 400
5859
publishNext(observer, 900, "three"); // publish
5960
publishNext(observer, 905, "four"); // skip
6061
publishCompleted(observer, 1000); // Should be published as soon as the timeout expires.
61-
62-
return Subscriptions.empty();
6362
}
6463
});
6564

66-
Observable<String> sampled = Observable.create(OperationThrottleFirst.throttleFirst(source, 400, TimeUnit.MILLISECONDS, scheduler));
65+
Observable<String> sampled = source.throttleFirst(400, TimeUnit.MILLISECONDS, scheduler);
6766
sampled.subscribe(observer);
6867

6968
InOrder inOrder = inOrder(observer);
@@ -79,19 +78,17 @@ public Subscription onSubscribe(Observer<? super String> observer) {
7978

8079
@Test
8180
public void testThrottlingWithError() {
82-
Observable<String> source = Observable.create(new Observable.OnSubscribeFunc<String>() {
81+
Observable<String> source = Observable.create(new OnSubscribe<String>() {
8382
@Override
84-
public Subscription onSubscribe(Observer<? super String> observer) {
83+
public void call(Subscriber<? super String> observer) {
8584
Exception error = new TestException();
8685
publishNext(observer, 100, "one"); // Should be published since it is first
8786
publishNext(observer, 200, "two"); // Should be skipped since onError will arrive before the timeout expires
8887
publishError(observer, 300, error); // Should be published as soon as the timeout expires.
89-
90-
return Subscriptions.empty();
9188
}
9289
});
9390

94-
Observable<String> sampled = Observable.create(OperationThrottleFirst.throttleFirst(source, 400, TimeUnit.MILLISECONDS, scheduler));
91+
Observable<String> sampled = source.throttleFirst(400, TimeUnit.MILLISECONDS, scheduler);
9592
sampled.subscribe(observer);
9693

9794
InOrder inOrder = inOrder(observer);
@@ -132,4 +129,36 @@ public void call() {
132129
@SuppressWarnings("serial")
133130
private class TestException extends Exception {
134131
}
132+
133+
@Test
134+
public void testThrottle() {
135+
@SuppressWarnings("unchecked")
136+
Observer<Integer> observer = mock(Observer.class);
137+
TestScheduler s = new TestScheduler();
138+
PublishSubject<Integer> o = PublishSubject.create();
139+
o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(observer);
140+
141+
// send events with simulated time increments
142+
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
143+
o.onNext(1); // deliver
144+
o.onNext(2); // skip
145+
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
146+
o.onNext(3); // deliver
147+
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
148+
o.onNext(4); // skip
149+
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
150+
o.onNext(5); // skip
151+
o.onNext(6); // skip
152+
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
153+
o.onNext(7); // deliver
154+
s.advanceTimeTo(1501, TimeUnit.MILLISECONDS);
155+
o.onCompleted();
156+
157+
InOrder inOrder = inOrder(observer);
158+
inOrder.verify(observer).onNext(1);
159+
inOrder.verify(observer).onNext(3);
160+
inOrder.verify(observer).onNext(7);
161+
inOrder.verify(observer).onCompleted();
162+
inOrder.verifyNoMoreInteractions();
163+
}
135164
}

0 commit comments

Comments
 (0)