17
17
import rx .util .functions .Func1 ;
18
18
19
19
/**
20
- * Returns an {@link Observable} that emits <code>true</code> if all items of an
21
- * observable sequence satisfy a condition, otherwise <code>false</code>.
20
+ * Returns an {@link Observable} that emits <code>true</code> if any element of
21
+ * an observable sequence satisfies a condition, otherwise <code>false</code>.
22
22
*/
23
23
public final class OperationAny {
24
24
@@ -36,15 +36,16 @@ public static <T> OnSubscribeFunc<Boolean> any(
36
36
}
37
37
38
38
/**
39
- * Returns an {@link Observable} that emits <code>true</code> if all items
40
- * of the source {@link Observable} satisfy the given condition, otherwise
41
- * <code>false</code>.
39
+ * Returns an {@link Observable} that emits <code>true</code> if any element
40
+ * of the source {@link Observable} satisfies the given condition, otherwise
41
+ * <code>false</code>. Note: always emit <code>false</code> if the source
42
+ * {@link Observable} is empty.
42
43
*
43
44
* @param source
44
- * The source {@link Observable} to check if all items in it
45
- * satisfy the given condition
45
+ * The source {@link Observable} to check if any element
46
+ * satisfies the given condition.
46
47
* @param predicate
47
- * The condition all items have to satisfy .
48
+ * The condition to test every element .
48
49
* @return A subscription function for creating the target Observable.
49
50
*/
50
51
public static <T > OnSubscribeFunc <Boolean > any (
@@ -71,16 +72,13 @@ public Subscription onSubscribe(final Observer<? super Boolean> observer) {
71
72
private final AtomicBoolean hasEmitted = new AtomicBoolean (
72
73
false );
73
74
74
- private volatile boolean isNotEmpty = false ;
75
-
76
75
@ Override
77
76
public void onNext (T value ) {
78
- isNotEmpty = true ;
79
77
try {
80
78
if (hasEmitted .get () == false ) {
81
- if (predicate .call (value ) == false
79
+ if (predicate .call (value ) == true
82
80
&& hasEmitted .getAndSet (true ) == false ) {
83
- observer .onNext (false );
81
+ observer .onNext (true );
84
82
observer .onCompleted ();
85
83
// this will work if the sequence is
86
84
// asynchronous, it
@@ -106,7 +104,7 @@ public void onError(Throwable ex) {
106
104
@ Override
107
105
public void onCompleted () {
108
106
if (!hasEmitted .get ()) {
109
- observer .onNext (isNotEmpty );
107
+ observer .onNext (false );
110
108
observer .onCompleted ();
111
109
}
112
110
}
@@ -164,13 +162,13 @@ public void testAnyWithEmpty() {
164
162
165
163
@ Test
166
164
public void testAnyWithPredicate1 () {
167
- Observable <Integer > w = Observable .from (1 , 2 );
165
+ Observable <Integer > w = Observable .from (1 , 2 , 3 );
168
166
Observable <Boolean > observable = Observable .create (any (w ,
169
167
new Func1 <Integer , Boolean >() {
170
168
171
169
@ Override
172
170
public Boolean call (Integer t1 ) {
173
- return t1 < 3 ;
171
+ return t1 < 2 ;
174
172
}
175
173
}));
176
174
@@ -192,7 +190,7 @@ public void testAnyWithPredicate2() {
192
190
193
191
@ Override
194
192
public Boolean call (Integer t1 ) {
195
- return t1 < 3 ;
193
+ return t1 < 1 ;
196
194
}
197
195
}));
198
196
0 commit comments