Skip to content

Commit 72d47ea

Browse files
Merge branch 'any' of git://github.com/zsxwing/RxJava into merge-385-any
2 parents 75d8182 + c8f1199 commit 72d47ea

File tree

6 files changed

+272
-13
lines changed

6 files changed

+272
-13
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import rx.operators.OperationZip;
7979
import rx.operators.SafeObservableSubscription;
8080
import rx.operators.SafeObserver;
81+
import rx.operators.OperationAny;
8182
import rx.plugins.RxJavaErrorHandler;
8283
import rx.plugins.RxJavaObservableExecutionHook;
8384
import rx.plugins.RxJavaPlugins;
@@ -4352,4 +4353,35 @@ private boolean isInternalImplementation(Object o) {
43524353
return p != null && p.getName().startsWith("rx.operators");
43534354
}
43544355

4356+
/**
4357+
* Returns an {@link Observable} that emits <code>true</code> if the source
4358+
* {@link Observable} is not empty, otherwise <code>false</code>.
4359+
*
4360+
* @return A subscription function for creating the target Observable.
4361+
* @see <a href=
4362+
* "http://msdn.microsoft.com/en-us/library/hh229905(v=vs.103).aspx"
4363+
* >MSDN: Observable.Any</a>
4364+
*/
4365+
public Observable<Boolean> any() {
4366+
return create(OperationAny.any(this));
4367+
}
4368+
4369+
/**
4370+
* Returns an {@link Observable} that emits <code>true</code> if any element
4371+
* of the source {@link Observable} satisfies the given condition, otherwise
4372+
* <code>false</code>. Note: always emit <code>false</code> if the source
4373+
* {@link Observable} is empty.
4374+
*
4375+
* @param predicate
4376+
* The condition to test every element.
4377+
* @return A subscription function for creating the target Observable.
4378+
* @see <a href=
4379+
* "http://msdn.microsoft.com/en-us/library/hh211993(v=vs.103).aspx"
4380+
* >MSDN: Observable.Any</a> Note: the description in this page is
4381+
* wrong.
4382+
*/
4383+
public Observable<Boolean> any(Func1<? super T, Boolean> predicate) {
4384+
return create(OperationAny.any(this, predicate));
4385+
}
4386+
43554387
}
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
package rx.operators;
2+
3+
import static org.mockito.Mockito.mock;
4+
import static org.mockito.Mockito.never;
5+
import static org.mockito.Mockito.times;
6+
import static org.mockito.Mockito.verify;
7+
import static rx.util.functions.Functions.alwaysTrue;
8+
9+
import java.util.concurrent.atomic.AtomicBoolean;
10+
11+
import org.junit.Test;
12+
13+
import rx.Observable;
14+
import rx.Observable.OnSubscribeFunc;
15+
import rx.Observer;
16+
import rx.Subscription;
17+
import rx.util.functions.Func1;
18+
19+
/**
20+
* Returns an {@link Observable} that emits <code>true</code> if any element of
21+
* an observable sequence satisfies a condition, otherwise <code>false</code>.
22+
*/
23+
public final class OperationAny {
24+
25+
/**
26+
* Returns an {@link Observable} that emits <code>true</code> if the source
27+
* {@link Observable} is not empty, otherwise <code>false</code>.
28+
*
29+
* @param source
30+
* The source {@link Observable} to check if not empty.
31+
* @return A subscription function for creating the target Observable.
32+
*/
33+
public static <T> OnSubscribeFunc<Boolean> any(
34+
Observable<? extends T> source) {
35+
return new Any<T>(source, alwaysTrue());
36+
}
37+
38+
/**
39+
* Returns an {@link Observable} that emits <code>true</code> if any element
40+
* of the source {@link Observable} satisfies the given condition, otherwise
41+
* <code>false</code>. Note: always emit <code>false</code> if the source
42+
* {@link Observable} is empty.
43+
*
44+
* @param source
45+
* The source {@link Observable} to check if any element
46+
* satisfies the given condition.
47+
* @param predicate
48+
* The condition to test every element.
49+
* @return A subscription function for creating the target Observable.
50+
*/
51+
public static <T> OnSubscribeFunc<Boolean> any(
52+
Observable<? extends T> source, Func1<? super T, Boolean> predicate) {
53+
return new Any<T>(source, predicate);
54+
}
55+
56+
private static class Any<T> implements OnSubscribeFunc<Boolean> {
57+
58+
private final Observable<? extends T> source;
59+
private final Func1<? super T, Boolean> predicate;
60+
61+
private Any(Observable<? extends T> source,
62+
Func1<? super T, Boolean> predicate) {
63+
this.source = source;
64+
this.predicate = predicate;
65+
}
66+
67+
@Override
68+
public Subscription onSubscribe(final Observer<? super Boolean> observer) {
69+
final SafeObservableSubscription subscription = new SafeObservableSubscription();
70+
return subscription.wrap(source.subscribe(new Observer<T>() {
71+
72+
private final AtomicBoolean hasEmitted = new AtomicBoolean(
73+
false);
74+
75+
@Override
76+
public void onNext(T value) {
77+
try {
78+
if (hasEmitted.get() == false) {
79+
if (predicate.call(value) == true
80+
&& hasEmitted.getAndSet(true) == false) {
81+
observer.onNext(true);
82+
observer.onCompleted();
83+
// this will work if the sequence is
84+
// asynchronous, it
85+
// will have no effect on a synchronous
86+
// observable
87+
subscription.unsubscribe();
88+
}
89+
}
90+
} catch (Throwable ex) {
91+
observer.onError(ex);
92+
// this will work if the sequence is asynchronous, it
93+
// will have no effect on a synchronous observable
94+
subscription.unsubscribe();
95+
}
96+
97+
}
98+
99+
@Override
100+
public void onError(Throwable ex) {
101+
observer.onError(ex);
102+
}
103+
104+
@Override
105+
public void onCompleted() {
106+
if (!hasEmitted.get()) {
107+
observer.onNext(false);
108+
observer.onCompleted();
109+
}
110+
}
111+
}));
112+
}
113+
114+
}
115+
116+
public static class UnitTest {
117+
118+
@Test
119+
public void testAnyWithTwoItems() {
120+
Observable<Integer> w = Observable.from(1, 2);
121+
Observable<Boolean> observable = Observable.create(any(w));
122+
123+
@SuppressWarnings("unchecked")
124+
Observer<Boolean> aObserver = mock(Observer.class);
125+
observable.subscribe(aObserver);
126+
verify(aObserver, never()).onNext(false);
127+
verify(aObserver, times(1)).onNext(true);
128+
verify(aObserver, never()).onError(
129+
org.mockito.Matchers.any(Throwable.class));
130+
verify(aObserver, times(1)).onCompleted();
131+
}
132+
133+
@Test
134+
public void testAnyWithOneItem() {
135+
Observable<Integer> w = Observable.from(1);
136+
Observable<Boolean> observable = Observable.create(any(w));
137+
138+
@SuppressWarnings("unchecked")
139+
Observer<Boolean> aObserver = mock(Observer.class);
140+
observable.subscribe(aObserver);
141+
verify(aObserver, never()).onNext(false);
142+
verify(aObserver, times(1)).onNext(true);
143+
verify(aObserver, never()).onError(
144+
org.mockito.Matchers.any(Throwable.class));
145+
verify(aObserver, times(1)).onCompleted();
146+
}
147+
148+
@Test
149+
public void testAnyWithEmpty() {
150+
Observable<Integer> w = Observable.empty();
151+
Observable<Boolean> observable = Observable.create(any(w));
152+
153+
@SuppressWarnings("unchecked")
154+
Observer<Boolean> aObserver = mock(Observer.class);
155+
observable.subscribe(aObserver);
156+
verify(aObserver, times(1)).onNext(false);
157+
verify(aObserver, never()).onNext(true);
158+
verify(aObserver, never()).onError(
159+
org.mockito.Matchers.any(Throwable.class));
160+
verify(aObserver, times(1)).onCompleted();
161+
}
162+
163+
@Test
164+
public void testAnyWithPredicate1() {
165+
Observable<Integer> w = Observable.from(1, 2, 3);
166+
Observable<Boolean> observable = Observable.create(any(w,
167+
new Func1<Integer, Boolean>() {
168+
169+
@Override
170+
public Boolean call(Integer t1) {
171+
return t1 < 2;
172+
}
173+
}));
174+
175+
@SuppressWarnings("unchecked")
176+
Observer<Boolean> aObserver = mock(Observer.class);
177+
observable.subscribe(aObserver);
178+
verify(aObserver, never()).onNext(false);
179+
verify(aObserver, times(1)).onNext(true);
180+
verify(aObserver, never()).onError(
181+
org.mockito.Matchers.any(Throwable.class));
182+
verify(aObserver, times(1)).onCompleted();
183+
}
184+
185+
@Test
186+
public void testAnyWithPredicate2() {
187+
Observable<Integer> w = Observable.from(1, 2, 3);
188+
Observable<Boolean> observable = Observable.create(any(w,
189+
new Func1<Integer, Boolean>() {
190+
191+
@Override
192+
public Boolean call(Integer t1) {
193+
return t1 < 1;
194+
}
195+
}));
196+
197+
@SuppressWarnings("unchecked")
198+
Observer<Boolean> aObserver = mock(Observer.class);
199+
observable.subscribe(aObserver);
200+
verify(aObserver, times(1)).onNext(false);
201+
verify(aObserver, never()).onNext(true);
202+
verify(aObserver, never()).onError(
203+
org.mockito.Matchers.any(Throwable.class));
204+
verify(aObserver, times(1)).onCompleted();
205+
}
206+
207+
@Test
208+
public void testAnyWithEmptyAndPredicate() {
209+
// If the source is empty, always output false.
210+
Observable<Integer> w = Observable.empty();
211+
Observable<Boolean> observable = Observable.create(any(w,
212+
new Func1<Integer, Boolean>() {
213+
214+
@Override
215+
public Boolean call(Integer t1) {
216+
return true;
217+
}
218+
}));
219+
220+
@SuppressWarnings("unchecked")
221+
Observer<Boolean> aObserver = mock(Observer.class);
222+
observable.subscribe(aObserver);
223+
verify(aObserver, times(1)).onNext(false);
224+
verify(aObserver, never()).onNext(true);
225+
verify(aObserver, never()).onError(
226+
org.mockito.Matchers.any(Throwable.class));
227+
verify(aObserver, times(1)).onCompleted();
228+
}
229+
}
230+
}

rxjava-core/src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ public void testCompleted() {
169169
private void assertCompletedObserver(Observer<String> aObserver)
170170
{
171171
verify(aObserver, times(1)).onNext("three");
172-
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
172+
verify(aObserver, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class));
173173
verify(aObserver, times(1)).onCompleted();
174174
}
175175

@@ -222,7 +222,7 @@ public void testUnsubscribeBeforeCompleted() {
222222
private void assertNoOnNextEventsReceived(Observer<String> aObserver)
223223
{
224224
verify(aObserver, Mockito.never()).onNext(anyString());
225-
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
225+
verify(aObserver, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class));
226226
verify(aObserver, Mockito.never()).onCompleted();
227227
}
228228

rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package rx.subjects;
1717

18-
import static org.mockito.Matchers.*;
1918
import static org.mockito.Mockito.*;
2019

2120
import java.util.concurrent.ConcurrentHashMap;
@@ -200,7 +199,7 @@ private void assertCompletedObserver(Observer<String> aObserver)
200199
{
201200
verify(aObserver, times(1)).onNext("default");
202201
verify(aObserver, times(1)).onNext("one");
203-
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
202+
verify(aObserver, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class));
204203
verify(aObserver, times(1)).onCompleted();
205204
}
206205

rxjava-core/src/main/java/rx/subjects/PublishSubject.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package rx.subjects;
1717

1818
import static org.junit.Assert.*;
19-
import static org.mockito.Matchers.*;
2019
import static org.mockito.Mockito.*;
2120

2221
import java.util.ArrayList;
@@ -274,7 +273,7 @@ private void assertCompletedObserver(Observer<String> aObserver)
274273
verify(aObserver, times(1)).onNext("one");
275274
verify(aObserver, times(1)).onNext("two");
276275
verify(aObserver, times(1)).onNext("three");
277-
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
276+
verify(aObserver, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class));
278277
verify(aObserver, times(1)).onCompleted();
279278
}
280279

@@ -341,7 +340,7 @@ private void assertCompletedStartingWithThreeObserver(Observer<String> aObserver
341340
verify(aObserver, Mockito.never()).onNext("one");
342341
verify(aObserver, Mockito.never()).onNext("two");
343342
verify(aObserver, times(1)).onNext("three");
344-
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
343+
verify(aObserver, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class));
345344
verify(aObserver, times(1)).onCompleted();
346345
}
347346

@@ -375,7 +374,7 @@ private void assertObservedUntilTwo(Observer<String> aObserver)
375374
verify(aObserver, times(1)).onNext("one");
376375
verify(aObserver, times(1)).onNext("two");
377376
verify(aObserver, Mockito.never()).onNext("three");
378-
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
377+
verify(aObserver, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class));
379378
verify(aObserver, Mockito.never()).onCompleted();
380379
}
381380

@@ -405,7 +404,7 @@ public void testUnsubscribeAfterOnCompleted() {
405404
inOrder.verify(anObserver, times(1)).onNext("one");
406405
inOrder.verify(anObserver, times(1)).onNext("two");
407406
inOrder.verify(anObserver, times(1)).onCompleted();
408-
inOrder.verify(anObserver, Mockito.never()).onError(any(Throwable.class));
407+
inOrder.verify(anObserver, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class));
409408

410409
@SuppressWarnings("unchecked")
411410
Observer<String> anotherObserver = mock(Observer.class);
@@ -415,7 +414,7 @@ public void testUnsubscribeAfterOnCompleted() {
415414
inOrder.verify(anotherObserver, Mockito.never()).onNext("one");
416415
inOrder.verify(anotherObserver, Mockito.never()).onNext("two");
417416
inOrder.verify(anotherObserver, times(1)).onCompleted();
418-
inOrder.verify(anotherObserver, Mockito.never()).onError(any(Throwable.class));
417+
inOrder.verify(anotherObserver, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class));
419418
}
420419

421420
@Test

rxjava-core/src/main/java/rx/subjects/ReplaySubject.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package rx.subjects;
1717

18-
import static org.mockito.Matchers.*;
1918
import static org.mockito.Mockito.*;
2019

2120
import java.util.ArrayList;
@@ -216,7 +215,7 @@ private void assertCompletedObserver(Observer<String> aObserver)
216215
inOrder.verify(aObserver, times(1)).onNext("one");
217216
inOrder.verify(aObserver, times(1)).onNext("two");
218217
inOrder.verify(aObserver, times(1)).onNext("three");
219-
inOrder.verify(aObserver, Mockito.never()).onError(any(Throwable.class));
218+
inOrder.verify(aObserver, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class));
220219
inOrder.verify(aObserver, times(1)).onCompleted();
221220
inOrder.verifyNoMoreInteractions();
222221
}
@@ -308,7 +307,7 @@ private void assertObservedUntilTwo(Observer<String> aObserver)
308307
verify(aObserver, times(1)).onNext("one");
309308
verify(aObserver, times(1)).onNext("two");
310309
verify(aObserver, Mockito.never()).onNext("three");
311-
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
310+
verify(aObserver, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class));
312311
verify(aObserver, Mockito.never()).onCompleted();
313312
}
314313

0 commit comments

Comments
 (0)