Skip to content

Commit fb0e8b0

Browse files
committed
Merge branch 'upstream' into issue43
2 parents 232612c + f1c54b5 commit fb0e8b0

File tree

10 files changed

+412
-119
lines changed

10 files changed

+412
-119
lines changed

language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,12 @@ def class ObservableTests {
275275

276276
}
277277

278+
@Test
279+
public void testAll() {
280+
Observable.toObservable(1, 2, 3).all({ x -> x > 0 }).subscribe({ result -> a.received(result) });
281+
verify(a, times(1)).received(true);
282+
}
283+
278284
def class AsyncObservable implements Func1<Observer<Integer>, Subscription> {
279285

280286
public Subscription call(final Observer<Integer> observer) {

rxjava-core/build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ targetCompatibility = JavaVersion.VERSION_1_6
88

99
dependencies {
1010
compile 'org.slf4j:slf4j-api:1.7.0'
11-
compile 'com.google.code.findbugs:jsr305:2.0.0'
1211
provided 'junit:junit:4.10'
1312
provided 'org.mockito:mockito-core:1.8.5'
1413
}

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.mockito.MockitoAnnotations;
3737

3838
import rx.observables.GroupedObservable;
39+
import rx.operators.OperationAll;
3940
import rx.operators.OperationConcat;
4041
import rx.operators.OperationDefer;
4142
import rx.operators.OperationDematerialize;
@@ -1678,6 +1679,35 @@ public T call(T t1, T t2) {
16781679
});
16791680
}
16801681

1682+
/**
1683+
* Determines whether all elements of an observable sequence satisfies a condition.
1684+
* @param sequence an observable sequence whose elements to apply the predicate to.
1685+
* @param predicate a function to test each element for a condition.
1686+
* @param <T> the type of observable.
1687+
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
1688+
*/
1689+
public static <T> Observable<Boolean> all(final Observable<T> sequence, final Func1<T, Boolean> predicate) {
1690+
return _create(OperationAll.all(sequence, predicate));
1691+
}
1692+
1693+
/**
1694+
* Determines whether all elements of an observable sequence satisfies a condition.
1695+
* @param sequence an observable sequence whose elements to apply the predicate to.
1696+
* @param predicate a function to test each element for a condition.
1697+
* @param <T> the type of observable.
1698+
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
1699+
*/
1700+
public static <T> Observable<Boolean> all(final Observable<T> sequence, Object predicate) {
1701+
final FuncN _f = Functions.from(predicate);
1702+
1703+
return all(sequence, new Func1<T, Boolean>() {
1704+
@Override
1705+
public Boolean call(T t) {
1706+
return (Boolean) _f.call(t);
1707+
}
1708+
});
1709+
}
1710+
16811711
/**
16821712
* Returns an Observable that skips the first <code>num</code> items emitted by the source
16831713
* Observable. You can ignore the first <code>num</code> items emitted by an Observable and attend
@@ -2997,6 +3027,24 @@ public Observable<T> scan(final T initialValue, final Object accumulator) {
29973027
return scan(this, initialValue, accumulator);
29983028
}
29993029

3030+
/**
3031+
* Determines whether all elements of an observable sequence satisfies a condition.
3032+
* @param predicate a function to test each element for a condition.
3033+
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
3034+
*/
3035+
public Observable<Boolean> all(Func1<T, Boolean> predicate) {
3036+
return all(this, predicate);
3037+
}
3038+
3039+
/**
3040+
* Determines whether all elements of an observable sequence satisfies a condition.
3041+
* @param predicate a function to test each element for a condition.
3042+
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
3043+
*/
3044+
public Observable<Boolean> all(Object predicate) {
3045+
return all(this, predicate);
3046+
}
3047+
30003048
/**
30013049
* Returns an Observable that skips the first <code>num</code> items emitted by the source
30023050
* Observable.
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package rx.operators;
2+
3+
import org.junit.Test;
4+
import rx.Observable;
5+
import rx.Observer;
6+
import rx.Subscription;
7+
import rx.util.AtomicObservableSubscription;
8+
import rx.util.functions.Func1;
9+
10+
import java.util.concurrent.atomic.AtomicBoolean;
11+
12+
import static org.mockito.Mockito.mock;
13+
import static org.mockito.Mockito.verify;
14+
import static org.mockito.Mockito.verifyNoMoreInteractions;
15+
16+
public class OperationAll {
17+
18+
public static <T> Func1<Observer<Boolean>, Subscription> all(Observable<T> sequence, Func1<T, Boolean> predicate) {
19+
return new AllObservable<T>(sequence, predicate);
20+
}
21+
22+
private static class AllObservable<T> implements Func1<Observer<Boolean>, Subscription> {
23+
private final Observable<T> sequence;
24+
private final Func1<T, Boolean> predicate;
25+
26+
private final AtomicBoolean status = new AtomicBoolean(true);
27+
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
28+
29+
30+
private AllObservable(Observable<T> sequence, Func1<T, Boolean> predicate) {
31+
this.sequence = sequence;
32+
this.predicate = predicate;
33+
}
34+
35+
36+
@Override
37+
public Subscription call(final Observer<Boolean> observer) {
38+
return subscription.wrap(sequence.subscribe(new Observer<T>() {
39+
@Override
40+
public void onCompleted() {
41+
if (status.get()) {
42+
observer.onNext(true);
43+
observer.onCompleted();
44+
}
45+
}
46+
47+
@Override
48+
public void onError(Exception e) {
49+
observer.onError(e);
50+
}
51+
52+
@Override
53+
public void onNext(T args) {
54+
boolean result = predicate.call(args);
55+
boolean changed = status.compareAndSet(true, result);
56+
57+
if (changed && !result) {
58+
observer.onNext(false);
59+
observer.onCompleted();
60+
subscription.unsubscribe();
61+
}
62+
}
63+
}));
64+
}
65+
}
66+
67+
public static class UnitTest {
68+
69+
@Test
70+
@SuppressWarnings("unchecked")
71+
public void testAll() {
72+
Observable<String> obs = Observable.from("one", "two", "six");
73+
74+
Observer<Boolean> observer = mock(Observer.class);
75+
Observable.create(all(obs, new Func1<String, Boolean>() {
76+
@Override
77+
public Boolean call(String s) {
78+
return s.length() == 3;
79+
}
80+
})).subscribe(observer);
81+
82+
verify(observer).onNext(true);
83+
verify(observer).onCompleted();
84+
verifyNoMoreInteractions(observer);
85+
}
86+
87+
@Test
88+
@SuppressWarnings("unchecked")
89+
public void testNotAll() {
90+
Observable<String> obs = Observable.from("one", "two", "three", "six");
91+
92+
Observer<Boolean> observer = mock(Observer.class);
93+
Observable.create(all(obs, new Func1<String, Boolean>() {
94+
@Override
95+
public Boolean call(String s) {
96+
return s.length() == 3;
97+
}
98+
})).subscribe(observer);
99+
100+
verify(observer).onNext(false);
101+
verify(observer).onCompleted();
102+
verifyNoMoreInteractions(observer);
103+
}
104+
105+
@Test
106+
@SuppressWarnings("unchecked")
107+
public void testEmpty() {
108+
Observable<String> obs = Observable.empty();
109+
110+
Observer<Boolean> observer = mock(Observer.class);
111+
Observable.create(all(obs, new Func1<String, Boolean>() {
112+
@Override
113+
public Boolean call(String s) {
114+
return s.length() == 3;
115+
}
116+
})).subscribe(observer);
117+
118+
verify(observer).onNext(true);
119+
verify(observer).onCompleted();
120+
verifyNoMoreInteractions(observer);
121+
}
122+
123+
@Test
124+
@SuppressWarnings("unchecked")
125+
public void testError() {
126+
Exception error = new Exception();
127+
Observable<String> obs = Observable.error(error);
128+
129+
Observer<Boolean> observer = mock(Observer.class);
130+
Observable.create(all(obs, new Func1<String, Boolean>() {
131+
@Override
132+
public Boolean call(String s) {
133+
return s.length() == 3;
134+
}
135+
})).subscribe(observer);
136+
137+
verify(observer).onError(error);
138+
verifyNoMoreInteractions(observer);
139+
}
140+
}
141+
}

0 commit comments

Comments
 (0)