Skip to content

Commit 2ac7ec4

Browse files
Re-implemented Take Operator with Bind
- also simplified implementation to not worry about thread-safety as per Rx contract - performance improvement from 4,033,468 ops/sec -> 6,731,287 ops/sec
1 parent 1d1066b commit 2ac7ec4

File tree

6 files changed

+236
-116
lines changed

6 files changed

+236
-116
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@
8888
import rx.operators.OperationSum;
8989
import rx.operators.OperationSwitch;
9090
import rx.operators.OperationSynchronize;
91-
import rx.operators.OperationTake;
91+
import rx.operators.OperatorTake;
9292
import rx.operators.OperationTakeLast;
9393
import rx.operators.OperationTakeUntil;
9494
import rx.operators.OperationTakeWhile;
@@ -106,6 +106,7 @@
106106
import rx.operators.OperationUsing;
107107
import rx.operators.OperationWindow;
108108
import rx.operators.OperationZip;
109+
import rx.operators.OperatorTakeTimed;
109110
import rx.operators.SafeObservableSubscription;
110111
import rx.operators.SafeObserver;
111112
import rx.plugins.RxJavaObservableExecutionHook;
@@ -7384,7 +7385,7 @@ public final Observable<T> synchronize(Object lock) {
73847385
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#take">RxJava Wiki: take()</a>
73857386
*/
73867387
public final Observable<T> take(final int num) {
7387-
return create(OperationTake.take(this, num));
7388+
return bind(new OperatorTake<T>(num));
73887389
}
73897390

73907391
/**
@@ -7422,7 +7423,7 @@ public final Observable<T> take(long time, TimeUnit unit) {
74227423
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#take">RxJava Wiki: take()</a>
74237424
*/
74247425
public final Observable<T> take(long time, TimeUnit unit, Scheduler scheduler) {
7425-
return create(new OperationTake.TakeTimed<T>(this, time, unit, scheduler));
7426+
return create(new OperatorTakeTimed.TakeTimed<T>(this, time, unit, scheduler));
74267427
}
74277428

74287429
/**
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable.OperatorSubscription;
19+
import rx.Observer;
20+
import rx.util.functions.Func2;
21+
22+
/**
23+
* Returns an Observable that emits the first <code>num</code> items emitted by the source
24+
* Observable.
25+
* <p>
26+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/take.png">
27+
* <p>
28+
* You can choose to pay attention only to the first <code>num</code> items emitted by an
29+
* Observable by using the take operation. This operation returns an Observable that will invoke a
30+
* subscribing Observer's <code>onNext</code> function a maximum of <code>num</code> times before
31+
* invoking <code>onCompleted</code>.
32+
*/
33+
public final class OperatorTake<T> implements Func2<Observer<? super T>, OperatorSubscription, Observer<? super T>> {
34+
35+
final int limit;
36+
37+
public OperatorTake(int limit) {
38+
this.limit = limit;
39+
}
40+
41+
@Override
42+
public Observer<T> call(final Observer<? super T> o, final OperatorSubscription s) {
43+
if (limit == 0) {
44+
o.onCompleted();
45+
s.unsubscribe();
46+
}
47+
return new Observer<T>() {
48+
49+
int count = 0;
50+
51+
@Override
52+
public void onCompleted() {
53+
o.onCompleted();
54+
}
55+
56+
@Override
57+
public void onError(Throwable e) {
58+
o.onError(e);
59+
}
60+
61+
@Override
62+
public void onNext(T i) {
63+
if (!s.isUnsubscribed()) {
64+
o.onNext(i);
65+
if (++count >= limit) {
66+
o.onCompleted();
67+
s.unsubscribe();
68+
}
69+
}
70+
}
71+
72+
};
73+
}
74+
75+
}

rxjava-core/src/main/java/rx/operators/OperationTake.java renamed to rxjava-core/src/main/java/rx/operators/OperatorTakeTimed.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
* subscribing Observer's <code>onNext</code> function a maximum of <code>num</code> times before
3939
* invoking <code>onCompleted</code>.
4040
*/
41-
public final class OperationTake {
41+
public final class OperatorTakeTimed {
4242

4343
/**
4444
* Returns a specified number of contiguous values from the start of an observable sequence.

rxjava-core/src/perf/java/rx/operators/OperatorTakePerformance.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ public void call() {
2727
/**
2828
* Observable.range(0, 10).take(5);
2929
*
30-
* Run: 10 - 3,951,557 ops/sec
31-
* Run: 11 - 3,981,329 ops/sec
32-
* Run: 12 - 3,988,949 ops/sec
33-
* Run: 13 - 3,925,971 ops/sec
34-
* Run: 14 - 4,033,468 ops/sec
30+
* Run: 10 - 6,660,042 ops/sec
31+
* Run: 11 - 6,721,423 ops/sec
32+
* Run: 12 - 6,556,035 ops/sec
33+
* Run: 13 - 6,692,284 ops/sec
34+
* Run: 14 - 6,731,287 ops/sec
3535
*/
3636
public long timeTake5() {
3737

rxjava-core/src/test/java/rx/operators/OperationTakeTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java

Lines changed: 13 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
import static org.junit.Assert.*;
1919
import static org.mockito.Matchers.*;
2020
import static org.mockito.Mockito.*;
21-
import static rx.operators.OperationTake.*;
21+
import static rx.operators.OperatorTake.*;
2222

23+
import java.util.Arrays;
2324
import java.util.concurrent.TimeUnit;
2425
import java.util.concurrent.atomic.AtomicBoolean;
2526

@@ -35,12 +36,12 @@
3536
import rx.subscriptions.Subscriptions;
3637
import rx.util.functions.Func1;
3738

38-
public class OperationTakeTest {
39+
public class OperatorTakeTest {
3940

4041
@Test
4142
public void testTake1() {
42-
Observable<String> w = Observable.from("one", "two", "three");
43-
Observable<String> take = Observable.create(take(w, 2));
43+
Observable<String> w = Observable.from(Arrays.asList("one", "two", "three"));
44+
Observable<String> take = w.bind(new OperatorTake<String>(2));
4445

4546
@SuppressWarnings("unchecked")
4647
Observer<String> aObserver = mock(Observer.class);
@@ -54,8 +55,8 @@ public void testTake1() {
5455

5556
@Test
5657
public void testTake2() {
57-
Observable<String> w = Observable.from("one", "two", "three");
58-
Observable<String> take = Observable.create(take(w, 1));
58+
Observable<String> w = Observable.from(Arrays.asList("one", "two", "three"));
59+
Observable<String> take = w.bind(new OperatorTake<String>(1));
5960

6061
@SuppressWarnings("unchecked")
6162
Observer<String> aObserver = mock(Observer.class);
@@ -69,7 +70,7 @@ public void testTake2() {
6970

7071
@Test(expected = IllegalArgumentException.class)
7172
public void testTakeWithError() {
72-
Observable.from(1, 2, 3).take(1).map(new Func1<Integer, Integer>() {
73+
Observable.from(Arrays.asList(1, 2, 3)).take(1).map(new Func1<Integer, Integer>() {
7374
public Integer call(Integer t1) {
7475
throw new IllegalArgumentException("some error");
7576
}
@@ -78,7 +79,7 @@ public Integer call(Integer t1) {
7879

7980
@Test
8081
public void testTakeWithErrorHappeningInOnNext() {
81-
Observable<Integer> w = Observable.from(1, 2, 3).take(2).map(new Func1<Integer, Integer>() {
82+
Observable<Integer> w = Observable.from(Arrays.asList(1, 2, 3)).take(2).map(new Func1<Integer, Integer>() {
8283
public Integer call(Integer t1) {
8384
throw new IllegalArgumentException("some error");
8485
}
@@ -94,7 +95,7 @@ public Integer call(Integer t1) {
9495

9596
@Test
9697
public void testTakeWithErrorHappeningInTheLastOnNext() {
97-
Observable<Integer> w = Observable.from(1, 2, 3).take(1).map(new Func1<Integer, Integer>() {
98+
Observable<Integer> w = Observable.from(Arrays.asList(1, 2, 3)).take(1).map(new Func1<Integer, Integer>() {
9899
public Integer call(Integer t1) {
99100
throw new IllegalArgumentException("some error");
100101
}
@@ -122,7 +123,7 @@ public Subscription onSubscribe(Observer<? super String> observer) {
122123
@SuppressWarnings("unchecked")
123124
Observer<String> aObserver = mock(Observer.class);
124125

125-
Observable.create(take(source, 1)).subscribe(aObserver);
126+
source.bind(new OperatorTake<String>(1)).subscribe(aObserver);
126127

127128
verify(aObserver, times(1)).onNext("one");
128129
// even though onError is called we take(1) so shouldn't see it
@@ -152,7 +153,7 @@ public void unsubscribe() {
152153
@SuppressWarnings("unchecked")
153154
Observer<String> aObserver = mock(Observer.class);
154155

155-
Observable.create(take(source, 0)).subscribe(aObserver);
156+
source.bind(new OperatorTake<String>(0)).subscribe(aObserver);
156157
assertTrue("source subscribed", subscribed.get());
157158
assertTrue("source unsubscribed", unSubscribed.get());
158159

@@ -171,7 +172,7 @@ public void testUnsubscribeAfterTake() {
171172

172173
@SuppressWarnings("unchecked")
173174
Observer<String> aObserver = mock(Observer.class);
174-
Observable<String> take = Observable.create(take(w, 1));
175+
Observable<String> take = w.bind(new OperatorTake<String>(1));
175176
take.subscribe(aObserver);
176177

177178
// wait for the Observable to complete
@@ -228,99 +229,4 @@ public void run() {
228229
return s;
229230
}
230231
}
231-
232-
@Test
233-
public void testTakeTimed() {
234-
TestScheduler scheduler = new TestScheduler();
235-
236-
PublishSubject<Integer> source = PublishSubject.create();
237-
238-
Observable<Integer> result = source.take(1, TimeUnit.SECONDS, scheduler);
239-
240-
Observer<Object> o = mock(Observer.class);
241-
242-
result.subscribe(o);
243-
244-
source.onNext(1);
245-
source.onNext(2);
246-
source.onNext(3);
247-
248-
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
249-
250-
source.onNext(4);
251-
252-
InOrder inOrder = inOrder(o);
253-
inOrder.verify(o).onNext(1);
254-
inOrder.verify(o).onNext(2);
255-
inOrder.verify(o).onNext(3);
256-
inOrder.verify(o).onCompleted();
257-
inOrder.verifyNoMoreInteractions();
258-
259-
verify(o, never()).onNext(4);
260-
verify(o, never()).onError(any(Throwable.class));
261-
}
262-
263-
@Test
264-
public void testTakeTimedErrorBeforeTime() {
265-
TestScheduler scheduler = new TestScheduler();
266-
267-
PublishSubject<Integer> source = PublishSubject.create();
268-
269-
Observable<Integer> result = source.take(1, TimeUnit.SECONDS, scheduler);
270-
271-
Observer<Object> o = mock(Observer.class);
272-
273-
result.subscribe(o);
274-
275-
source.onNext(1);
276-
source.onNext(2);
277-
source.onNext(3);
278-
source.onError(new CustomException());
279-
280-
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
281-
282-
source.onNext(4);
283-
284-
InOrder inOrder = inOrder(o);
285-
inOrder.verify(o).onNext(1);
286-
inOrder.verify(o).onNext(2);
287-
inOrder.verify(o).onNext(3);
288-
inOrder.verify(o).onError(any(CustomException.class));
289-
inOrder.verifyNoMoreInteractions();
290-
291-
verify(o, never()).onCompleted();
292-
verify(o, never()).onNext(4);
293-
}
294-
295-
@Test
296-
public void testTakeTimedErrorAfterTime() {
297-
TestScheduler scheduler = new TestScheduler();
298-
299-
PublishSubject<Integer> source = PublishSubject.create();
300-
301-
Observable<Integer> result = source.take(1, TimeUnit.SECONDS, scheduler);
302-
303-
Observer<Object> o = mock(Observer.class);
304-
305-
result.subscribe(o);
306-
307-
source.onNext(1);
308-
source.onNext(2);
309-
source.onNext(3);
310-
311-
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
312-
313-
source.onNext(4);
314-
source.onError(new CustomException());
315-
316-
InOrder inOrder = inOrder(o);
317-
inOrder.verify(o).onNext(1);
318-
inOrder.verify(o).onNext(2);
319-
inOrder.verify(o).onNext(3);
320-
inOrder.verify(o).onCompleted();
321-
inOrder.verifyNoMoreInteractions();
322-
323-
verify(o, never()).onNext(4);
324-
verify(o, never()).onError(any(CustomException.class));
325-
}
326232
}

0 commit comments

Comments
 (0)