Skip to content

Commit d8256ef

Browse files
Merge branch 'take-while' of git://github.com/zsxwing/RxJava into merge-prs
Conflicts: rxjava-core/src/main/java/rx/Observable.java
2 parents 30911bd + b7af5c9 commit d8256ef

File tree

4 files changed

+103
-168
lines changed

4 files changed

+103
-168
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@
5858
import rx.operators.OperationTakeLast;
5959
import rx.operators.OperationTakeTimed;
6060
import rx.operators.OperationTakeUntil;
61-
import rx.operators.OperationTakeWhile;
6261
import rx.operators.OperationWindow;
6362
import rx.operators.OperatorAll;
6463
import rx.operators.OperatorAmb;
@@ -117,6 +116,7 @@
117116
import rx.operators.OperatorSkipWhile;
118117
import rx.operators.OperatorSubscribeOn;
119118
import rx.operators.OperatorTake;
119+
import rx.operators.OperatorTakeWhile;
120120
import rx.operators.OperatorThrottleFirst;
121121
import rx.operators.OperatorTimeInterval;
122122
import rx.operators.OperatorTimeout;
@@ -6687,7 +6687,7 @@ public final <E> Observable<T> takeUntil(Observable<? extends E> other) {
66876687
* @see <a href="https://github.com/Netflix/RxJava/wiki/Conditional-and-Boolean-Operators#wiki-takewhile-and-takewhilewithindex">RxJava Wiki: takeWhile()</a>
66886688
*/
66896689
public final Observable<T> takeWhile(final Func1<? super T, Boolean> predicate) {
6690-
return create(OperationTakeWhile.takeWhile(this, predicate));
6690+
return lift(new OperatorTakeWhile(predicate));
66916691
}
66926692

66936693
/**
@@ -6706,7 +6706,7 @@ public final Observable<T> takeWhile(final Func1<? super T, Boolean> predicate)
67066706
* @see <a href="https://github.com/Netflix/RxJava/wiki/Conditional-and-Boolean-Operators#wiki-takewhile-and-takewhilewithindex">RxJava Wiki: takeWhileWithIndex()</a>
67076707
*/
67086708
public final Observable<T> takeWhileWithIndex(final Func2<? super T, ? super Integer, Boolean> predicate) {
6709-
return create(OperationTakeWhile.takeWhileWithIndex(this, predicate));
6709+
return lift(new OperatorTakeWhile(predicate));
67106710
}
67116711

67126712
/**

rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java

Lines changed: 0 additions & 146 deletions
This file was deleted.
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable.Operator;
19+
import rx.Subscriber;
20+
import rx.functions.Func1;
21+
import rx.functions.Func2;
22+
23+
/**
24+
* Returns an Observable that emits items emitted by the source Observable as long as a specified
25+
* condition is true.
26+
* <p>
27+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/takeWhile.png">
28+
*/
29+
public final class OperatorTakeWhile<T> implements Operator<T, T> {
30+
31+
private final Func2<? super T, ? super Integer, Boolean> predicate;
32+
33+
public OperatorTakeWhile(final Func1<? super T, Boolean> underlying) {
34+
this(new Func2<T, Integer, Boolean>() {
35+
@Override
36+
public Boolean call(T input, Integer index) {
37+
return underlying.call(input);
38+
}
39+
});
40+
}
41+
42+
public OperatorTakeWhile(Func2<? super T, ? super Integer, Boolean> predicate) {
43+
this.predicate = predicate;
44+
}
45+
46+
@Override
47+
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
48+
return new Subscriber<T>(subscriber) {
49+
50+
private int counter = 0;
51+
52+
@Override
53+
public void onNext(T args) {
54+
boolean isSelected;
55+
try {
56+
isSelected = predicate.call(args, counter++);
57+
} catch (Throwable e) {
58+
subscriber.onError(e);
59+
unsubscribe();
60+
return;
61+
}
62+
if (isSelected) {
63+
subscriber.onNext(args);
64+
} else {
65+
subscriber.onCompleted();
66+
unsubscribe();
67+
}
68+
}
69+
70+
@Override
71+
public void onCompleted() {
72+
subscriber.onCompleted();
73+
}
74+
75+
@Override
76+
public void onError(Throwable e) {
77+
subscriber.onError(e);
78+
}
79+
80+
};
81+
}
82+
83+
}

rxjava-core/src/test/java/rx/operators/OperationTakeWhileTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorTakeWhileTest.java

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,31 +21,30 @@
2121
import static org.mockito.Mockito.never;
2222
import static org.mockito.Mockito.times;
2323
import static org.mockito.Mockito.verify;
24-
import static rx.operators.OperationTakeWhile.takeWhile;
25-
import static rx.operators.OperationTakeWhile.takeWhileWithIndex;
2624

2725
import org.junit.Test;
2826

2927
import rx.Observable;
28+
import rx.Observable.OnSubscribe;
3029
import rx.Observer;
30+
import rx.Subscriber;
3131
import rx.Subscription;
3232
import rx.functions.Func1;
3333
import rx.functions.Func2;
3434
import rx.subjects.PublishSubject;
3535
import rx.subjects.Subject;
36-
import rx.subscriptions.Subscriptions;
3736

38-
public class OperationTakeWhileTest {
37+
public class OperatorTakeWhileTest {
3938

4039
@Test
4140
public void testTakeWhile1() {
4241
Observable<Integer> w = Observable.from(1, 2, 3);
43-
Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>() {
42+
Observable<Integer> take = w.takeWhile(new Func1<Integer, Boolean>() {
4443
@Override
4544
public Boolean call(Integer input) {
4645
return input < 3;
4746
}
48-
}));
47+
});
4948

5049
@SuppressWarnings("unchecked")
5150
Observer<Integer> observer = mock(Observer.class);
@@ -60,12 +59,12 @@ public Boolean call(Integer input) {
6059
@Test
6160
public void testTakeWhileOnSubject1() {
6261
Subject<Integer, Integer> s = PublishSubject.create();
63-
Observable<Integer> take = Observable.create(takeWhile(s, new Func1<Integer, Boolean>() {
62+
Observable<Integer> take = s.takeWhile(new Func1<Integer, Boolean>() {
6463
@Override
6564
public Boolean call(Integer input) {
6665
return input < 3;
6766
}
68-
}));
67+
});
6968

7069
@SuppressWarnings("unchecked")
7170
Observer<Integer> observer = mock(Observer.class);
@@ -90,12 +89,12 @@ public Boolean call(Integer input) {
9089
@Test
9190
public void testTakeWhile2() {
9291
Observable<String> w = Observable.from("one", "two", "three");
93-
Observable<String> take = Observable.create(takeWhileWithIndex(w, new Func2<String, Integer, Boolean>() {
92+
Observable<String> take = w.takeWhileWithIndex(new Func2<String, Integer, Boolean>() {
9493
@Override
9594
public Boolean call(String input, Integer index) {
9695
return index < 2;
9796
}
98-
}));
97+
});
9998

10099
@SuppressWarnings("unchecked")
101100
Observer<String> observer = mock(Observer.class);
@@ -109,21 +108,20 @@ public Boolean call(String input, Integer index) {
109108

110109
@Test
111110
public void testTakeWhileDoesntLeakErrors() {
112-
Observable<String> source = Observable.create(new Observable.OnSubscribeFunc<String>() {
111+
Observable<String> source = Observable.create(new OnSubscribe<String>() {
113112
@Override
114-
public Subscription onSubscribe(Observer<? super String> observer) {
113+
public void call(Subscriber<? super String> observer) {
115114
observer.onNext("one");
116115
observer.onError(new Throwable("test failed"));
117-
return Subscriptions.empty();
118116
}
119117
});
120118

121-
Observable.create(takeWhile(source, new Func1<String, Boolean>() {
119+
source.takeWhile(new Func1<String, Boolean>() {
122120
@Override
123121
public Boolean call(String s) {
124122
return false;
125123
}
126-
})).toBlockingObservable().lastOrDefault("");
124+
}).toBlockingObservable().lastOrDefault("");
127125
}
128126

129127
@Test
@@ -133,12 +131,12 @@ public void testTakeWhileProtectsPredicateCall() {
133131

134132
@SuppressWarnings("unchecked")
135133
Observer<String> observer = mock(Observer.class);
136-
Observable<String> take = Observable.create(takeWhile(Observable.create(source), new Func1<String, Boolean>() {
134+
Observable<String> take = Observable.create(source).takeWhile(new Func1<String, Boolean>() {
137135
@Override
138136
public Boolean call(String s) {
139137
throw testException;
140138
}
141-
}));
139+
});
142140
take.subscribe(observer);
143141

144142
// wait for the Observable to complete
@@ -160,12 +158,12 @@ public void testUnsubscribeAfterTake() {
160158

161159
@SuppressWarnings("unchecked")
162160
Observer<String> observer = mock(Observer.class);
163-
Observable<String> take = Observable.create(takeWhileWithIndex(Observable.create(w), new Func2<String, Integer, Boolean>() {
161+
Observable<String> take = Observable.create(w).takeWhileWithIndex(new Func2<String, Integer, Boolean>() {
164162
@Override
165163
public Boolean call(String s, Integer index) {
166164
return index < 1;
167165
}
168-
}));
166+
});
169167
take.subscribe(observer);
170168

171169
// wait for the Observable to complete

0 commit comments

Comments
 (0)