Skip to content

Commit a2a467f

Browse files
Merge branch 'master' of github.com:MarkVanDerVoort/RxJava into operator-all
Conflicts: rxjava-core/src/main/java/rx/Observable.java rxjava-core/src/main/java/rx/operators/OperationAll.java rxjava-core/src/test/java/rx/operators/OperatorAllTest.java
2 parents 4e0ce47 + 0f19eb2 commit a2a467f

File tree

3 files changed

+78
-31
lines changed

3 files changed

+78
-31
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2941,7 +2941,7 @@ public final static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> zip(Ob
29412941
* @see <a href="https://github.com/Netflix/RxJava/wiki/Conditional-and-Boolean-Operators#wiki-all">RxJava Wiki: all()</a>
29422942
*/
29432943
public final Observable<Boolean> all(Func1<? super T, Boolean> predicate) {
2944-
return create(OperationAll.all(this, predicate));
2944+
return lift(new OperatorAll<T>(predicate));
29452945
}
29462946

29472947
/**
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package rx.operators;
2+
3+
import rx.Observable;
4+
import rx.Subscriber;
5+
import rx.exceptions.OnErrorThrowable;
6+
import rx.functions.Func1;
7+
8+
import java.util.concurrent.atomic.AtomicBoolean;
9+
10+
import static rx.Observable.Operator;
11+
12+
/**
13+
* Returns an Observable that emits a Boolean that indicates whether all items emitted by an
14+
* Observable satisfy a condition.
15+
* <p>
16+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/all.png">
17+
*/
18+
public class OperatorAll<T> implements Operator<Boolean,T>{
19+
20+
private final Func1<? super T, Boolean> predicate;
21+
22+
public OperatorAll(Func1<? super T, Boolean> predicate) {
23+
this.predicate = predicate;
24+
}
25+
26+
@Override
27+
public Subscriber<? super T> call(final Subscriber<? super Boolean> child) {
28+
return new Subscriber<T>() {
29+
private AtomicBoolean status = new AtomicBoolean(true);
30+
31+
@Override
32+
public void onCompleted() {
33+
child.onNext(status.get());
34+
child.onCompleted();
35+
}
36+
37+
@Override
38+
public void onError(Throwable e) {
39+
child.onError(e);
40+
}
41+
42+
@Override
43+
public void onNext(T t) {
44+
try {
45+
final Boolean result = predicate.call(t);
46+
boolean changed = status.compareAndSet(true, result);
47+
48+
if (changed && !result) {
49+
child.onNext(false);
50+
child.onCompleted();
51+
}
52+
} catch (Throwable e) {
53+
child.onError(OnErrorThrowable.addValueAsLastCause(e,t));
54+
}
55+
}
56+
};
57+
}
58+
}

rxjava-core/src/test/java/rx/operators/OperationAllTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorAllTest.java

Lines changed: 19 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,32 @@
1818
import static org.mockito.Mockito.mock;
1919
import static org.mockito.Mockito.verify;
2020
import static org.mockito.Mockito.verifyNoMoreInteractions;
21-
import static rx.operators.OperationAll.all;
21+
22+
import java.util.Arrays;
2223

2324
import org.junit.Test;
2425

2526
import rx.Observable;
2627
import rx.Observer;
2728
import rx.functions.Func1;
29+
import rx.observers.TestSubscriber;
30+
31+
public class OperatorAllTest {
2832

29-
public class OperationAllTest {
33+
final Func1<String, Boolean> hasLength3 = new Func1<String, Boolean>() {
34+
@Override
35+
public Boolean call(String s) {
36+
return s.length() == 3;
37+
}
38+
};
3039

3140
@Test
3241
@SuppressWarnings("unchecked")
3342
public void testAll() {
34-
Observable<String> obs = Observable.from("one", "two", "six");
43+
Observable<Boolean> obs = Observable.from(Arrays.asList("one", "two", "six")).all(hasLength3);
3544

3645
Observer<Boolean> observer = mock(Observer.class);
37-
Observable.create(all(obs, new Func1<String, Boolean>() {
38-
@Override
39-
public Boolean call(String s) {
40-
return s.length() == 3;
41-
}
42-
})).subscribe(observer);
46+
obs.subscribe(new TestSubscriber<Boolean>(observer));
4347

4448
verify(observer).onNext(true);
4549
verify(observer).onCompleted();
@@ -49,15 +53,10 @@ public Boolean call(String s) {
4953
@Test
5054
@SuppressWarnings("unchecked")
5155
public void testNotAll() {
52-
Observable<String> obs = Observable.from("one", "two", "three", "six");
56+
Observable<Boolean> obs = Observable.from(Arrays.asList("one", "two", "three", "six")).all(hasLength3);
5357

5458
Observer<Boolean> observer = mock(Observer.class);
55-
Observable.create(all(obs, new Func1<String, Boolean>() {
56-
@Override
57-
public Boolean call(String s) {
58-
return s.length() == 3;
59-
}
60-
})).subscribe(observer);
59+
obs.subscribe(new TestSubscriber<Boolean>(observer));
6160

6261
verify(observer).onNext(false);
6362
verify(observer).onCompleted();
@@ -67,15 +66,10 @@ public Boolean call(String s) {
6766
@Test
6867
@SuppressWarnings("unchecked")
6968
public void testEmpty() {
70-
Observable<String> obs = Observable.empty();
69+
Observable<Boolean> obs = Observable.<String>empty().all(hasLength3);
7170

7271
Observer<Boolean> observer = mock(Observer.class);
73-
Observable.create(all(obs, new Func1<String, Boolean>() {
74-
@Override
75-
public Boolean call(String s) {
76-
return s.length() == 3;
77-
}
78-
})).subscribe(observer);
72+
obs.subscribe(new TestSubscriber<Boolean>(observer));
7973

8074
verify(observer).onNext(true);
8175
verify(observer).onCompleted();
@@ -86,15 +80,10 @@ public Boolean call(String s) {
8680
@SuppressWarnings("unchecked")
8781
public void testError() {
8882
Throwable error = new Throwable();
89-
Observable<String> obs = Observable.error(error);
83+
Observable<Boolean> obs = Observable.<String>error(error).all(hasLength3);
9084

9185
Observer<Boolean> observer = mock(Observer.class);
92-
Observable.create(all(obs, new Func1<String, Boolean>() {
93-
@Override
94-
public Boolean call(String s) {
95-
return s.length() == 3;
96-
}
97-
})).subscribe(observer);
86+
obs.subscribe(new TestSubscriber<Boolean>(observer));
9887

9988
verify(observer).onError(error);
10089
verifyNoMoreInteractions(observer);

0 commit comments

Comments
 (0)