Skip to content

Commit 0f19eb2

Browse files
OperationAll to OperatorAll
1 parent 940c26e commit 0f19eb2

File tree

4 files changed

+79
-205
lines changed

4 files changed

+79
-205
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -49,88 +49,7 @@
4949
import rx.observables.ConnectableObservable;
5050
import rx.observables.GroupedObservable;
5151
import rx.observers.SafeSubscriber;
52-
import rx.operators.OnSubscribeFromIterable;
53-
import rx.operators.OnSubscribeRange;
54-
import rx.operators.OperationAll;
55-
import rx.operators.OperationAmb;
56-
import rx.operators.OperationAny;
57-
import rx.operators.OperationAsObservable;
58-
import rx.operators.OperationAverage;
59-
import rx.operators.OperationBuffer;
60-
import rx.operators.OperationCache;
61-
import rx.operators.OperationCombineLatest;
62-
import rx.operators.OperationConcat;
63-
import rx.operators.OperationDebounce;
64-
import rx.operators.OperationDefaultIfEmpty;
65-
import rx.operators.OperationDefer;
66-
import rx.operators.OperationDelay;
67-
import rx.operators.OperationDematerialize;
68-
import rx.operators.OperationDistinct;
69-
import rx.operators.OperationDistinctUntilChanged;
70-
import rx.operators.OperationElementAt;
71-
import rx.operators.OperationFinally;
72-
import rx.operators.OperationFlatMap;
73-
import rx.operators.OperationGroupByUntil;
74-
import rx.operators.OperationGroupJoin;
75-
import rx.operators.OperationInterval;
76-
import rx.operators.OperationJoin;
77-
import rx.operators.OperationJoinPatterns;
78-
import rx.operators.OperationMaterialize;
79-
import rx.operators.OperationMergeDelayError;
80-
import rx.operators.OperationMergeMaxConcurrent;
81-
import rx.operators.OperationMinMax;
82-
import rx.operators.OperationMulticast;
83-
import rx.operators.OperationOnErrorResumeNextViaObservable;
84-
import rx.operators.OperationOnErrorReturn;
85-
import rx.operators.OperationOnExceptionResumeNextViaObservable;
86-
import rx.operators.OperationParallelMerge;
87-
import rx.operators.OperationReplay;
88-
import rx.operators.OperationSample;
89-
import rx.operators.OperationSequenceEqual;
90-
import rx.operators.OperationSingle;
91-
import rx.operators.OperationSkip;
92-
import rx.operators.OperationSkipLast;
93-
import rx.operators.OperationSkipUntil;
94-
import rx.operators.OperationSkipWhile;
95-
import rx.operators.OperationSum;
96-
import rx.operators.OperationSwitch;
97-
import rx.operators.OperationSynchronize;
98-
import rx.operators.OperationTakeLast;
99-
import rx.operators.OperationTakeTimed;
100-
import rx.operators.OperationTakeUntil;
101-
import rx.operators.OperationTakeWhile;
102-
import rx.operators.OperationThrottleFirst;
103-
import rx.operators.OperationTimeInterval;
104-
import rx.operators.OperationTimer;
105-
import rx.operators.OperationToMap;
106-
import rx.operators.OperationToMultimap;
107-
import rx.operators.OperationToObservableFuture;
108-
import rx.operators.OperationUsing;
109-
import rx.operators.OperationWindow;
110-
import rx.operators.OperatorCast;
111-
import rx.operators.OperatorDoOnEach;
112-
import rx.operators.OperatorFilter;
113-
import rx.operators.OperatorGroupBy;
114-
import rx.operators.OperatorMap;
115-
import rx.operators.OperatorMerge;
116-
import rx.operators.OperatorObserveOn;
117-
import rx.operators.OperatorOnErrorFlatMap;
118-
import rx.operators.OperatorOnErrorResumeNextViaFunction;
119-
import rx.operators.OperatorParallel;
120-
import rx.operators.OperatorRepeat;
121-
import rx.operators.OperatorRetry;
122-
import rx.operators.OperatorScan;
123-
import rx.operators.OperatorSkip;
124-
import rx.operators.OperatorSubscribeOn;
125-
import rx.operators.OperatorTake;
126-
import rx.operators.OperatorTimeout;
127-
import rx.operators.OperatorTimeoutWithSelector;
128-
import rx.operators.OperatorTimestamp;
129-
import rx.operators.OperatorToObservableList;
130-
import rx.operators.OperatorToObservableSortedList;
131-
import rx.operators.OperatorUnsubscribeOn;
132-
import rx.operators.OperatorZip;
133-
import rx.operators.OperatorZipIterable;
52+
import rx.operators.*;
13453
import rx.plugins.RxJavaObservableExecutionHook;
13554
import rx.plugins.RxJavaPlugins;
13655
import rx.schedulers.Schedulers;
@@ -3461,7 +3380,7 @@ public final <R> Observable<R> aggregate(R initialValue, Func2<R, ? super T, R>
34613380
* @see <a href="https://github.com/Netflix/RxJava/wiki/Conditional-and-Boolean-Operators#wiki-all">RxJava Wiki: all()</a>
34623381
*/
34633382
public final Observable<Boolean> all(Func1<? super T, Boolean> predicate) {
3464-
return create(OperationAll.all(this, predicate));
3383+
return lift(new OperatorAll<T>(predicate));
34653384
}
34663385

34673386
/**

rxjava-core/src/main/java/rx/operators/OperationAll.java

Lines changed: 0 additions & 91 deletions
This file was deleted.
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package rx.operators;
2+
3+
import rx.Observable;
4+
import rx.Subscriber;
5+
import rx.exceptions.OnErrorThrowable;
6+
import rx.functions.Func1;
7+
8+
import java.util.concurrent.atomic.AtomicBoolean;
9+
10+
import static rx.Observable.Operator;
11+
12+
/**
13+
* Returns an Observable that emits a Boolean that indicates whether all items emitted by an
14+
* Observable satisfy a condition.
15+
* <p>
16+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/all.png">
17+
*/
18+
public class OperatorAll<T> implements Operator<Boolean,T>{
19+
20+
private final Func1<? super T, Boolean> predicate;
21+
22+
public OperatorAll(Func1<? super T, Boolean> predicate) {
23+
this.predicate = predicate;
24+
}
25+
26+
@Override
27+
public Subscriber<? super T> call(final Subscriber<? super Boolean> child) {
28+
return new Subscriber<T>() {
29+
private AtomicBoolean status = new AtomicBoolean(true);
30+
31+
@Override
32+
public void onCompleted() {
33+
child.onNext(status.get());
34+
child.onCompleted();
35+
}
36+
37+
@Override
38+
public void onError(Throwable e) {
39+
child.onError(e);
40+
}
41+
42+
@Override
43+
public void onNext(T t) {
44+
try {
45+
final Boolean result = predicate.call(t);
46+
boolean changed = status.compareAndSet(true, result);
47+
48+
if (changed && !result) {
49+
child.onNext(false);
50+
child.onCompleted();
51+
}
52+
} catch (Throwable e) {
53+
child.onError(OnErrorThrowable.addValueAsLastCause(e,t));
54+
}
55+
}
56+
};
57+
}
58+
}

rxjava-core/src/test/java/rx/operators/OperationAllTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorAllTest.java

Lines changed: 19 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,31 @@
1616
package rx.operators;
1717

1818
import static org.mockito.Mockito.*;
19-
import static rx.operators.OperationAll.*;
20-
2119
import org.junit.Test;
2220

2321
import rx.Observable;
2422
import rx.Observer;
2523
import rx.functions.Func1;
24+
import rx.observers.TestSubscriber;
25+
26+
import java.util.Arrays;
27+
28+
public class OperatorAllTest {
2629

27-
public class OperationAllTest {
30+
final Func1<String, Boolean> hasLength3 = new Func1<String, Boolean>() {
31+
@Override
32+
public Boolean call(String s) {
33+
return s.length() == 3;
34+
}
35+
};
2836

2937
@Test
3038
@SuppressWarnings("unchecked")
3139
public void testAll() {
32-
Observable<String> obs = Observable.from("one", "two", "six");
40+
Observable<Boolean> obs = Observable.from(Arrays.asList("one", "two", "six")).all(hasLength3);
3341

3442
Observer<Boolean> observer = mock(Observer.class);
35-
Observable.create(all(obs, new Func1<String, Boolean>() {
36-
@Override
37-
public Boolean call(String s) {
38-
return s.length() == 3;
39-
}
40-
})).subscribe(observer);
43+
obs.subscribe(new TestSubscriber<Boolean>(observer));
4144

4245
verify(observer).onNext(true);
4346
verify(observer).onCompleted();
@@ -47,15 +50,10 @@ public Boolean call(String s) {
4750
@Test
4851
@SuppressWarnings("unchecked")
4952
public void testNotAll() {
50-
Observable<String> obs = Observable.from("one", "two", "three", "six");
53+
Observable<Boolean> obs = Observable.from(Arrays.asList("one", "two", "three", "six")).all(hasLength3);
5154

5255
Observer<Boolean> observer = mock(Observer.class);
53-
Observable.create(all(obs, new Func1<String, Boolean>() {
54-
@Override
55-
public Boolean call(String s) {
56-
return s.length() == 3;
57-
}
58-
})).subscribe(observer);
56+
obs.subscribe(new TestSubscriber<Boolean>(observer));
5957

6058
verify(observer).onNext(false);
6159
verify(observer).onCompleted();
@@ -65,15 +63,10 @@ public Boolean call(String s) {
6563
@Test
6664
@SuppressWarnings("unchecked")
6765
public void testEmpty() {
68-
Observable<String> obs = Observable.empty();
66+
Observable<Boolean> obs = Observable.<String>empty().all(hasLength3);
6967

7068
Observer<Boolean> observer = mock(Observer.class);
71-
Observable.create(all(obs, new Func1<String, Boolean>() {
72-
@Override
73-
public Boolean call(String s) {
74-
return s.length() == 3;
75-
}
76-
})).subscribe(observer);
69+
obs.subscribe(new TestSubscriber<Boolean>(observer));
7770

7871
verify(observer).onNext(true);
7972
verify(observer).onCompleted();
@@ -84,15 +77,10 @@ public Boolean call(String s) {
8477
@SuppressWarnings("unchecked")
8578
public void testError() {
8679
Throwable error = new Throwable();
87-
Observable<String> obs = Observable.error(error);
80+
Observable<Boolean> obs = Observable.<String>error(error).all(hasLength3);
8881

8982
Observer<Boolean> observer = mock(Observer.class);
90-
Observable.create(all(obs, new Func1<String, Boolean>() {
91-
@Override
92-
public Boolean call(String s) {
93-
return s.length() == 3;
94-
}
95-
})).subscribe(observer);
83+
obs.subscribe(new TestSubscriber<Boolean>(observer));
9684

9785
verify(observer).onError(error);
9886
verifyNoMoreInteractions(observer);

0 commit comments

Comments
 (0)