Skip to content

Commit 4919472

Browse files
committed
Split Take and TakeWhile
1 parent fb22a73 commit 4919472

File tree

3 files changed

+349
-137
lines changed

3 files changed

+349
-137
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import rx.operators.OperationDefer;
4242
import rx.operators.OperationDematerialize;
4343
import rx.operators.OperationFilter;
44+
import rx.operators.OperationTake;
45+
import rx.operators.OperationTakeWhile;
4446
import rx.operators.OperationWhere;
4547
import rx.operators.OperationMap;
4648
import rx.operators.OperationMaterialize;
@@ -54,7 +56,6 @@
5456
import rx.operators.OperationScan;
5557
import rx.operators.OperationSkip;
5658
import rx.operators.OperationSynchronize;
57-
import rx.operators.OperationTake;
5859
import rx.operators.OperationTakeLast;
5960
import rx.operators.OperationToObservableFuture;
6061
import rx.operators.OperationToObservableIterable;
@@ -1779,7 +1780,7 @@ public static <T> Observable<T> takeLast(final Observable<T> items, final int co
17791780
* @return
17801781
*/
17811782
public static <T> Observable<T> takeWhile(final Observable<T> items, Func1<T, Boolean> predicate) {
1782-
return create(OperationTake.takeWhile(items, predicate));
1783+
return create(OperationTakeWhile.takeWhile(items, predicate));
17831784
}
17841785

17851786
/**
@@ -1811,16 +1812,18 @@ public Boolean call(T t) {
18111812
* @return
18121813
*/
18131814
public static <T> Observable<T> takeWhileWithIndex(final Observable<T> items, Func2<T, Integer, Boolean> predicate) {
1814-
return create(OperationTake.takeWhileWithIndex(items, predicate));
1815+
return create(OperationTakeWhile.takeWhileWithIndex(items, predicate));
18151816
}
18161817

18171818
public static <T> Observable<T> takeWhileWithIndex(final Observable<T> items, Object predicate) {
18181819
@SuppressWarnings("rawtypes")
18191820
final FuncN _f = Functions.from(predicate);
18201821

1821-
return create(OperationTake.takeWhileWithIndex(items, new Func2<T, Integer, Boolean>() {
1822+
return create(OperationTakeWhile.takeWhileWithIndex(items, new Func2<T, Integer, Boolean>()
1823+
{
18221824
@Override
1823-
public Boolean call(T t, Integer integer) {
1825+
public Boolean call(T t, Integer integer)
1826+
{
18241827
return (Boolean) _f.call(t, integer);
18251828
}
18261829
}));

rxjava-core/src/main/java/rx/operators/OperationTake.java

Lines changed: 28 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,23 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.junit.Assert.*;
19-
import static org.mockito.Matchers.*;
20-
import static org.mockito.Mockito.*;
21-
22-
import java.util.concurrent.atomic.AtomicInteger;
23-
2418
import org.junit.Test;
25-
2619
import rx.Observable;
2720
import rx.Observer;
2821
import rx.Subscription;
22+
import rx.subscriptions.Subscriptions;
2923
import rx.util.AtomicObservableSubscription;
3024
import rx.util.functions.Func1;
31-
import rx.util.functions.Func2;
32-
import rx.subjects.Subject;
25+
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
28+
import static org.junit.Assert.fail;
29+
import static org.mockito.Matchers.any;
30+
import static org.mockito.Mockito.mock;
31+
import static org.mockito.Mockito.never;
32+
import static org.mockito.Mockito.times;
33+
import static org.mockito.Mockito.verify;
34+
3335
/**
3436
* Returns a specified number of contiguous values from the start of an observable sequence.
3537
*/
@@ -43,61 +45,17 @@ public final class OperationTake {
4345
* @return
4446
*/
4547
public static <T> Func1<Observer<T>, Subscription> take(final Observable<T> items, final int num) {
46-
return takeWhileWithIndex(items, OperationTake.<T> numPredicate(num));
47-
}
48-
49-
/**
50-
* Returns a specified number of contiguous values from the start of an observable sequence.
51-
*
52-
* @param items
53-
* @param predicate
54-
* a function to test each source element for a condition
55-
* @return
56-
*/
57-
public static <T> Func1<Observer<T>, Subscription> takeWhile(final Observable<T> items, final Func1<T, Boolean> predicate) {
58-
return takeWhileWithIndex(items, OperationTake.<T> skipIndex(predicate));
59-
}
60-
61-
/**
62-
* Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values.
63-
*
64-
* @param items
65-
* @param predicate
66-
* a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false.
67-
* @return
68-
*/
69-
public static <T> Func1<Observer<T>, Subscription> takeWhileWithIndex(final Observable<T> items, final Func2<T, Integer, Boolean> predicate) {
7048
// wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take<T> rather than 1 handing both, which is not thread-safe.
7149
return new Func1<Observer<T>, Subscription>() {
7250

7351
@Override
7452
public Subscription call(Observer<T> observer) {
75-
return new TakeWhile<T>(items, predicate).call(observer);
76-
}
77-
78-
};
79-
}
80-
81-
private static <T> Func2<T, Integer, Boolean> numPredicate(final int num) {
82-
return new Func2<T, Integer, Boolean>() {
83-
84-
@Override
85-
public Boolean call(T input, Integer index) {
86-
return index < num;
53+
return new Take<T>(items, num).call(observer);
8754
}
8855

8956
};
9057
}
9158

92-
private static <T> Func2<T, Integer, Boolean> skipIndex(final Func1<T, Boolean> underlying) {
93-
return new Func2<T, Integer, Boolean>() {
94-
@Override
95-
public Boolean call(T input, Integer index) {
96-
return underlying.call(input);
97-
}
98-
};
99-
}
100-
10159
/**
10260
* This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads.
10361
* <p>
@@ -109,19 +67,24 @@ public Boolean call(T input, Integer index) {
10967
*
11068
* @param <T>
11169
*/
112-
private static class TakeWhile<T> implements Func1<Observer<T>, Subscription> {
70+
private static class Take<T> implements Func1<Observer<T>, Subscription> {
11371
private final AtomicInteger counter = new AtomicInteger();
11472
private final Observable<T> items;
115-
private final Func2<T, Integer, Boolean> predicate;
73+
private final int num;
11674
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
11775

118-
private TakeWhile(Observable<T> items, Func2<T, Integer, Boolean> predicate) {
76+
private Take(Observable<T> items, int num) {
11977
this.items = items;
120-
this.predicate = predicate;
78+
this.num = num;
12179
}
12280

12381
@Override
12482
public Subscription call(Observer<T> observer) {
83+
if (num < 1) {
84+
observer.onCompleted();
85+
return Subscriptions.empty();
86+
}
87+
12588
return subscription.wrap(items.subscribe(new ItemObserver(observer)));
12689
}
12790

@@ -144,10 +107,14 @@ public void onError(Exception e) {
144107

145108
@Override
146109
public void onNext(T args) {
147-
if (predicate.call(args, counter.getAndIncrement())) {
110+
final int count = counter.incrementAndGet();
111+
if (count <= num) {
148112
observer.onNext(args);
149-
} else {
150-
observer.onCompleted();
113+
if (count == num) {
114+
observer.onCompleted();
115+
}
116+
}
117+
if (count >= num) {
151118
// this will work if the sequence is asynchronous, it will have no effect on a synchronous observable
152119
subscription.unsubscribe();
153120
}
@@ -159,77 +126,6 @@ public void onNext(T args) {
159126

160127
public static class UnitTest {
161128

162-
@Test
163-
public void testTakeWhile1() {
164-
Observable<Integer> w = Observable.toObservable(1, 2, 3);
165-
Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>() {
166-
@Override
167-
public Boolean call(Integer input) {
168-
return input < 3;
169-
}
170-
}));
171-
172-
@SuppressWarnings("unchecked")
173-
Observer<Integer> aObserver = mock(Observer.class);
174-
take.subscribe(aObserver);
175-
verify(aObserver, times(1)).onNext(1);
176-
verify(aObserver, times(1)).onNext(2);
177-
verify(aObserver, never()).onNext(3);
178-
verify(aObserver, never()).onError(any(Exception.class));
179-
verify(aObserver, times(1)).onCompleted();
180-
}
181-
182-
@Test
183-
public void testTakeWhileOnSubject1() {
184-
Subject<Integer> s = Subject.create();
185-
Observable<Integer> w = (Observable<Integer>)s;
186-
Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>() {
187-
@Override
188-
public Boolean call(Integer input) {
189-
return input < 3;
190-
}
191-
}));
192-
193-
@SuppressWarnings("unchecked")
194-
Observer<Integer> aObserver = mock(Observer.class);
195-
take.subscribe(aObserver);
196-
197-
s.onNext(1);
198-
s.onNext(2);
199-
s.onNext(3);
200-
s.onNext(4);
201-
s.onNext(5);
202-
s.onCompleted();
203-
204-
verify(aObserver, times(1)).onNext(1);
205-
verify(aObserver, times(1)).onNext(2);
206-
verify(aObserver, never()).onNext(3);
207-
verify(aObserver, never()).onNext(4);
208-
verify(aObserver, never()).onNext(5);
209-
verify(aObserver, never()).onError(any(Exception.class));
210-
verify(aObserver, times(1)).onCompleted();
211-
}
212-
213-
@Test
214-
public void testTakeWhile2() {
215-
Observable<String> w = Observable.toObservable("one", "two", "three");
216-
Observable<String> take = Observable.create(takeWhileWithIndex(w, new Func2<String, Integer, Boolean>() {
217-
@Override
218-
public Boolean call(String input, Integer index) {
219-
return index < 2;
220-
}
221-
}));
222-
223-
@SuppressWarnings("unchecked")
224-
Observer<String> aObserver = mock(Observer.class);
225-
take.subscribe(aObserver);
226-
verify(aObserver, times(1)).onNext("one");
227-
verify(aObserver, times(1)).onNext("two");
228-
verify(aObserver, never()).onNext("three");
229-
verify(aObserver, never()).onError(any(Exception.class));
230-
verify(aObserver, times(1)).onCompleted();
231-
}
232-
233129
@Test
234130
public void testTake1() {
235131
Observable<String> w = Observable.toObservable("one", "two", "three");

0 commit comments

Comments
 (0)