Skip to content

Commit c3b5353

Browse files
Merge branch 'OperatorAll' of github.com:akarnokd/RxJava into operator-all
Conflicts: rxjava-core/src/main/java/rx/operators/OperatorAll.java rxjava-core/src/test/java/rx/operators/OperatorAllTest.java
2 parents a2a467f + f4d468f commit c3b5353

File tree

4 files changed

+68
-133
lines changed

4 files changed

+68
-133
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
import rx.observers.SafeSubscriber;
4949
import rx.operators.OnSubscribeFromIterable;
5050
import rx.operators.OnSubscribeRange;
51-
import rx.operators.OperationAll;
51+
import rx.operators.OperatorAll;
5252
import rx.operators.OperationAny;
5353
import rx.operators.OperationAsObservable;
5454
import rx.operators.OperationBuffer;

rxjava-core/src/main/java/rx/operators/OperationAll.java

Lines changed: 0 additions & 90 deletions
This file was deleted.
Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,37 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package rx.operators;
217

18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
320
import rx.Observable;
21+
import rx.Observable.OnSubscribeFunc;
22+
import rx.Observable.Operator;
23+
import rx.Observer;
424
import rx.Subscriber;
5-
import rx.exceptions.OnErrorThrowable;
25+
import rx.Subscription;
626
import rx.functions.Func1;
727

8-
import java.util.concurrent.atomic.AtomicBoolean;
9-
10-
import static rx.Observable.Operator;
11-
1228
/**
1329
* Returns an Observable that emits a Boolean that indicates whether all items emitted by an
1430
* Observable satisfy a condition.
1531
* <p>
1632
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/all.png">
1733
*/
18-
public class OperatorAll<T> implements Operator<Boolean,T>{
19-
34+
public final class OperatorAll<T> implements Operator<Boolean, T> {
2035
private final Func1<? super T, Boolean> predicate;
2136

2237
public OperatorAll(Func1<? super T, Boolean> predicate) {
@@ -25,13 +40,17 @@ public OperatorAll(Func1<? super T, Boolean> predicate) {
2540

2641
@Override
2742
public Subscriber<? super T> call(final Subscriber<? super Boolean> child) {
28-
return new Subscriber<T>() {
29-
private AtomicBoolean status = new AtomicBoolean(true);
30-
43+
Subscriber s = new Subscriber<T>() {
44+
boolean done;
3145
@Override
32-
public void onCompleted() {
33-
child.onNext(status.get());
34-
child.onCompleted();
46+
public void onNext(T t) {
47+
boolean result = predicate.call(t);
48+
if (!result && !done) {
49+
done = true;
50+
child.onNext(false);
51+
child.onCompleted();
52+
unsubscribe();
53+
}
3554
}
3655

3756
@Override
@@ -40,19 +59,15 @@ public void onError(Throwable e) {
4059
}
4160

4261
@Override
43-
public void onNext(T t) {
44-
try {
45-
final Boolean result = predicate.call(t);
46-
boolean changed = status.compareAndSet(true, result);
47-
48-
if (changed && !result) {
49-
child.onNext(false);
50-
child.onCompleted();
51-
}
52-
} catch (Throwable e) {
53-
child.onError(OnErrorThrowable.addValueAsLastCause(e,t));
62+
public void onCompleted() {
63+
if (!done) {
64+
done = true;
65+
child.onNext(true);
66+
child.onCompleted();
5467
}
5568
}
5669
};
70+
child.add(s);
71+
return s;
5772
}
5873
}

rxjava-core/src/test/java/rx/operators/OperatorAllTest.java

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,31 +19,26 @@
1919
import static org.mockito.Mockito.verify;
2020
import static org.mockito.Mockito.verifyNoMoreInteractions;
2121

22-
import java.util.Arrays;
23-
2422
import org.junit.Test;
2523

2624
import rx.Observable;
2725
import rx.Observer;
2826
import rx.functions.Func1;
29-
import rx.observers.TestSubscriber;
3027

3128
public class OperatorAllTest {
3229

33-
final Func1<String, Boolean> hasLength3 = new Func1<String, Boolean>() {
34-
@Override
35-
public Boolean call(String s) {
36-
return s.length() == 3;
37-
}
38-
};
39-
4030
@Test
4131
@SuppressWarnings("unchecked")
4232
public void testAll() {
43-
Observable<Boolean> obs = Observable.from(Arrays.asList("one", "two", "six")).all(hasLength3);
33+
Observable<String> obs = Observable.from("one", "two", "six");
4434

4535
Observer<Boolean> observer = mock(Observer.class);
46-
obs.subscribe(new TestSubscriber<Boolean>(observer));
36+
obs.all(new Func1<String, Boolean>() {
37+
@Override
38+
public Boolean call(String s) {
39+
return s.length() == 3;
40+
}
41+
}).subscribe(observer);
4742

4843
verify(observer).onNext(true);
4944
verify(observer).onCompleted();
@@ -53,10 +48,15 @@ public void testAll() {
5348
@Test
5449
@SuppressWarnings("unchecked")
5550
public void testNotAll() {
56-
Observable<Boolean> obs = Observable.from(Arrays.asList("one", "two", "three", "six")).all(hasLength3);
51+
Observable<String> obs = Observable.from("one", "two", "three", "six");
5752

5853
Observer<Boolean> observer = mock(Observer.class);
59-
obs.subscribe(new TestSubscriber<Boolean>(observer));
54+
obs.all(new Func1<String, Boolean>() {
55+
@Override
56+
public Boolean call(String s) {
57+
return s.length() == 3;
58+
}
59+
}).subscribe(observer);
6060

6161
verify(observer).onNext(false);
6262
verify(observer).onCompleted();
@@ -66,10 +66,15 @@ public void testNotAll() {
6666
@Test
6767
@SuppressWarnings("unchecked")
6868
public void testEmpty() {
69-
Observable<Boolean> obs = Observable.<String>empty().all(hasLength3);
69+
Observable<String> obs = Observable.empty();
7070

7171
Observer<Boolean> observer = mock(Observer.class);
72-
obs.subscribe(new TestSubscriber<Boolean>(observer));
72+
obs.all(new Func1<String, Boolean>() {
73+
@Override
74+
public Boolean call(String s) {
75+
return s.length() == 3;
76+
}
77+
}).subscribe(observer);
7378

7479
verify(observer).onNext(true);
7580
verify(observer).onCompleted();
@@ -80,10 +85,15 @@ public void testEmpty() {
8085
@SuppressWarnings("unchecked")
8186
public void testError() {
8287
Throwable error = new Throwable();
83-
Observable<Boolean> obs = Observable.<String>error(error).all(hasLength3);
88+
Observable<String> obs = Observable.error(error);
8489

8590
Observer<Boolean> observer = mock(Observer.class);
86-
obs.subscribe(new TestSubscriber<Boolean>(observer));
91+
obs.all(new Func1<String, Boolean>() {
92+
@Override
93+
public Boolean call(String s) {
94+
return s.length() == 3;
95+
}
96+
}).subscribe(observer);
8797

8898
verify(observer).onError(error);
8999
verifyNoMoreInteractions(observer);

0 commit comments

Comments
 (0)