Skip to content

Commit 6c4b573

Browse files
Merge pull request #398 from benjchristensen/merge-385-any
Merge 'any' Pull Request
2 parents 75d8182 + d5dd873 commit 6c4b573

File tree

5 files changed

+316
-4
lines changed

5 files changed

+316
-4
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import rx.operators.OperationZip;
7979
import rx.operators.SafeObservableSubscription;
8080
import rx.operators.SafeObserver;
81+
import rx.operators.OperationAny;
8182
import rx.plugins.RxJavaErrorHandler;
8283
import rx.plugins.RxJavaObservableExecutionHook;
8384
import rx.plugins.RxJavaPlugins;
@@ -3138,6 +3139,22 @@ public <U> Observable<T> distinct(Func1<? super T, ? extends U> keySelector, Com
31383139
return create(OperationDistinct.distinct(this, keySelector, equalityComparator));
31393140
}
31403141

3142+
/**
3143+
* Returns an {@link Observable} that emits <code>true</code> if any element of the source {@link Observable} satisfies
3144+
* the given condition, otherwise <code>false</code>. Note: always emit <code>false</code> if the source {@link Observable} is empty.
3145+
* <p>
3146+
* In Rx.Net this is the <code>any</code> operator but renamed in RxJava to better match Java naming idioms.
3147+
*
3148+
* @param predicate
3149+
* The condition to test every element.
3150+
* @return A subscription function for creating the target Observable.
3151+
* @see <a href= "http://msdn.microsoft.com/en-us/library/hh211993(v=vs.103).aspx" >MSDN: Observable.Any</a> Note: the description in this page is
3152+
* wrong.
3153+
*/
3154+
public Observable<Boolean> exists(Func1<? super T, Boolean> predicate) {
3155+
return create(OperationAny.exists(this, predicate));
3156+
}
3157+
31413158
/**
31423159
* Registers an {@link Action0} to be called when this Observable invokes {@link Observer#onCompleted onCompleted} or {@link Observer#onError onError}.
31433160
* <p>
@@ -4319,7 +4336,19 @@ public <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T,
43194336
public <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super T, ? extends K> keySelector) {
43204337
return create(OperationGroupBy.groupBy(this, keySelector));
43214338
}
4322-
4339+
4340+
/**
4341+
* Returns an {@link Observable} that emits <code>true</code> if the source {@link Observable} is empty, otherwise <code>false</code>.
4342+
* <p>
4343+
* In Rx.Net this is negated as the <code>any</code> operator but renamed in RxJava to better match Java naming idioms.
4344+
*
4345+
* @return A subscription function for creating the target Observable.
4346+
* @see <a href= "http://msdn.microsoft.com/en-us/library/hh229905(v=vs.103).aspx" >MSDN: Observable.Any</a>
4347+
*/
4348+
public Observable<Boolean> isEmpty() {
4349+
return create(OperationAny.isEmpty(this));
4350+
}
4351+
43234352
/**
43244353
* Converts an Observable into a {@link BlockingObservable} (an Observable with blocking
43254354
* operators).
Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
package rx.operators;
2+
3+
import static org.mockito.Mockito.*;
4+
import static rx.util.functions.Functions.*;
5+
6+
import java.util.concurrent.atomic.AtomicBoolean;
7+
8+
import org.junit.Test;
9+
10+
import rx.Observable;
11+
import rx.Observable.OnSubscribeFunc;
12+
import rx.Observer;
13+
import rx.Subscription;
14+
import rx.util.functions.Func1;
15+
16+
/**
17+
* Returns an {@link Observable} that emits <code>true</code> if any element of
18+
* an observable sequence satisfies a condition, otherwise <code>false</code>.
19+
*/
20+
public final class OperationAny {
21+
22+
/**
23+
* Returns an {@link Observable} that emits <code>true</code> if the source {@link Observable} is not empty, otherwise <code>false</code>.
24+
*
25+
* @param source
26+
* The source {@link Observable} to check if not empty.
27+
* @return A subscription function for creating the target Observable.
28+
*/
29+
public static <T> OnSubscribeFunc<Boolean> any(Observable<? extends T> source) {
30+
return new Any<T>(source, alwaysTrue(), false);
31+
}
32+
33+
public static <T> OnSubscribeFunc<Boolean> isEmpty(Observable<? extends T> source) {
34+
return new Any<T>(source, alwaysTrue(), true);
35+
}
36+
37+
/**
38+
* Returns an {@link Observable} that emits <code>true</code> if any element
39+
* of the source {@link Observable} satisfies the given condition, otherwise
40+
* <code>false</code>. Note: always emit <code>false</code> if the source {@link Observable} is empty.
41+
*
42+
* @param source
43+
* The source {@link Observable} to check if any element
44+
* satisfies the given condition.
45+
* @param predicate
46+
* The condition to test every element.
47+
* @return A subscription function for creating the target Observable.
48+
*/
49+
public static <T> OnSubscribeFunc<Boolean> any(Observable<? extends T> source, Func1<? super T, Boolean> predicate) {
50+
return new Any<T>(source, predicate, false);
51+
}
52+
53+
public static <T> OnSubscribeFunc<Boolean> exists(Observable<? extends T> source, Func1<? super T, Boolean> predicate) {
54+
return any(source, predicate);
55+
}
56+
57+
private static class Any<T> implements OnSubscribeFunc<Boolean> {
58+
59+
private final Observable<? extends T> source;
60+
private final Func1<? super T, Boolean> predicate;
61+
private final boolean returnOnEmpty;
62+
63+
private Any(Observable<? extends T> source, Func1<? super T, Boolean> predicate, boolean returnOnEmpty) {
64+
this.source = source;
65+
this.predicate = predicate;
66+
this.returnOnEmpty = returnOnEmpty;
67+
}
68+
69+
@Override
70+
public Subscription onSubscribe(final Observer<? super Boolean> observer) {
71+
final SafeObservableSubscription subscription = new SafeObservableSubscription();
72+
return subscription.wrap(source.subscribe(new Observer<T>() {
73+
74+
private final AtomicBoolean hasEmitted = new AtomicBoolean(false);
75+
76+
@Override
77+
public void onNext(T value) {
78+
try {
79+
if (hasEmitted.get() == false) {
80+
if (predicate.call(value) == true
81+
&& hasEmitted.getAndSet(true) == false) {
82+
observer.onNext(!returnOnEmpty);
83+
observer.onCompleted();
84+
// this will work if the sequence is asynchronous, it
85+
// will have no effect on a synchronous observable
86+
subscription.unsubscribe();
87+
}
88+
}
89+
} catch (Throwable ex) {
90+
observer.onError(ex);
91+
// this will work if the sequence is asynchronous, it
92+
// will have no effect on a synchronous observable
93+
subscription.unsubscribe();
94+
}
95+
96+
}
97+
98+
@Override
99+
public void onError(Throwable ex) {
100+
observer.onError(ex);
101+
}
102+
103+
@Override
104+
public void onCompleted() {
105+
if (!hasEmitted.get()) {
106+
observer.onNext(returnOnEmpty);
107+
observer.onCompleted();
108+
}
109+
}
110+
}));
111+
}
112+
113+
}
114+
115+
public static class UnitTest {
116+
117+
@Test
118+
public void testAnyWithTwoItems() {
119+
Observable<Integer> w = Observable.from(1, 2);
120+
Observable<Boolean> observable = Observable.create(any(w));
121+
122+
@SuppressWarnings("unchecked")
123+
Observer<Boolean> aObserver = mock(Observer.class);
124+
observable.subscribe(aObserver);
125+
verify(aObserver, never()).onNext(false);
126+
verify(aObserver, times(1)).onNext(true);
127+
verify(aObserver, never()).onError(org.mockito.Matchers.any(Throwable.class));
128+
verify(aObserver, times(1)).onCompleted();
129+
}
130+
131+
@Test
132+
public void testIsEmptyWithTwoItems() {
133+
Observable<Integer> w = Observable.from(1, 2);
134+
Observable<Boolean> observable = Observable.create(isEmpty(w));
135+
136+
@SuppressWarnings("unchecked")
137+
Observer<Boolean> aObserver = mock(Observer.class);
138+
observable.subscribe(aObserver);
139+
verify(aObserver, never()).onNext(true);
140+
verify(aObserver, times(1)).onNext(false);
141+
verify(aObserver, never()).onError(org.mockito.Matchers.any(Throwable.class));
142+
verify(aObserver, times(1)).onCompleted();
143+
}
144+
145+
@Test
146+
public void testAnyWithOneItem() {
147+
Observable<Integer> w = Observable.from(1);
148+
Observable<Boolean> observable = Observable.create(any(w));
149+
150+
@SuppressWarnings("unchecked")
151+
Observer<Boolean> aObserver = mock(Observer.class);
152+
observable.subscribe(aObserver);
153+
verify(aObserver, never()).onNext(false);
154+
verify(aObserver, times(1)).onNext(true);
155+
verify(aObserver, never()).onError(org.mockito.Matchers.any(Throwable.class));
156+
verify(aObserver, times(1)).onCompleted();
157+
}
158+
159+
@Test
160+
public void testIsEmptyWithOneItem() {
161+
Observable<Integer> w = Observable.from(1);
162+
Observable<Boolean> observable = Observable.create(isEmpty(w));
163+
164+
@SuppressWarnings("unchecked")
165+
Observer<Boolean> aObserver = mock(Observer.class);
166+
observable.subscribe(aObserver);
167+
verify(aObserver, never()).onNext(true);
168+
verify(aObserver, times(1)).onNext(false);
169+
verify(aObserver, never()).onError(org.mockito.Matchers.any(Throwable.class));
170+
verify(aObserver, times(1)).onCompleted();
171+
}
172+
173+
@Test
174+
public void testAnyWithEmpty() {
175+
Observable<Integer> w = Observable.empty();
176+
Observable<Boolean> observable = Observable.create(any(w));
177+
178+
@SuppressWarnings("unchecked")
179+
Observer<Boolean> aObserver = mock(Observer.class);
180+
observable.subscribe(aObserver);
181+
verify(aObserver, times(1)).onNext(false);
182+
verify(aObserver, never()).onNext(true);
183+
verify(aObserver, never()).onError(org.mockito.Matchers.any(Throwable.class));
184+
verify(aObserver, times(1)).onCompleted();
185+
}
186+
187+
@Test
188+
public void testIsEmptyWithEmpty() {
189+
Observable<Integer> w = Observable.empty();
190+
Observable<Boolean> observable = Observable.create(isEmpty(w));
191+
192+
@SuppressWarnings("unchecked")
193+
Observer<Boolean> aObserver = mock(Observer.class);
194+
observable.subscribe(aObserver);
195+
verify(aObserver, times(1)).onNext(true);
196+
verify(aObserver, never()).onNext(false);
197+
verify(aObserver, never()).onError(org.mockito.Matchers.any(Throwable.class));
198+
verify(aObserver, times(1)).onCompleted();
199+
}
200+
201+
@Test
202+
public void testAnyWithPredicate1() {
203+
Observable<Integer> w = Observable.from(1, 2, 3);
204+
Observable<Boolean> observable = Observable.create(any(w,
205+
new Func1<Integer, Boolean>() {
206+
207+
@Override
208+
public Boolean call(Integer t1) {
209+
return t1 < 2;
210+
}
211+
}));
212+
213+
@SuppressWarnings("unchecked")
214+
Observer<Boolean> aObserver = mock(Observer.class);
215+
observable.subscribe(aObserver);
216+
verify(aObserver, never()).onNext(false);
217+
verify(aObserver, times(1)).onNext(true);
218+
verify(aObserver, never()).onError(org.mockito.Matchers.any(Throwable.class));
219+
verify(aObserver, times(1)).onCompleted();
220+
}
221+
222+
@Test
223+
public void testExists1() {
224+
Observable<Integer> w = Observable.from(1, 2, 3);
225+
Observable<Boolean> observable = Observable.create(exists(w,
226+
new Func1<Integer, Boolean>() {
227+
228+
@Override
229+
public Boolean call(Integer t1) {
230+
return t1 < 2;
231+
}
232+
}));
233+
234+
@SuppressWarnings("unchecked")
235+
Observer<Boolean> aObserver = mock(Observer.class);
236+
observable.subscribe(aObserver);
237+
verify(aObserver, never()).onNext(false);
238+
verify(aObserver, times(1)).onNext(true);
239+
verify(aObserver, never()).onError(org.mockito.Matchers.any(Throwable.class));
240+
verify(aObserver, times(1)).onCompleted();
241+
}
242+
243+
@Test
244+
public void testAnyWithPredicate2() {
245+
Observable<Integer> w = Observable.from(1, 2, 3);
246+
Observable<Boolean> observable = Observable.create(any(w,
247+
new Func1<Integer, Boolean>() {
248+
249+
@Override
250+
public Boolean call(Integer t1) {
251+
return t1 < 1;
252+
}
253+
}));
254+
255+
@SuppressWarnings("unchecked")
256+
Observer<Boolean> aObserver = mock(Observer.class);
257+
observable.subscribe(aObserver);
258+
verify(aObserver, times(1)).onNext(false);
259+
verify(aObserver, never()).onNext(true);
260+
verify(aObserver, never()).onError(org.mockito.Matchers.any(Throwable.class));
261+
verify(aObserver, times(1)).onCompleted();
262+
}
263+
264+
@Test
265+
public void testAnyWithEmptyAndPredicate() {
266+
// If the source is empty, always output false.
267+
Observable<Integer> w = Observable.empty();
268+
Observable<Boolean> observable = Observable.create(any(w,
269+
new Func1<Integer, Boolean>() {
270+
271+
@Override
272+
public Boolean call(Integer t1) {
273+
return true;
274+
}
275+
}));
276+
277+
@SuppressWarnings("unchecked")
278+
Observer<Boolean> aObserver = mock(Observer.class);
279+
observable.subscribe(aObserver);
280+
verify(aObserver, times(1)).onNext(false);
281+
verify(aObserver, never()).onNext(true);
282+
verify(aObserver, never()).onError(org.mockito.Matchers.any(Throwable.class));
283+
verify(aObserver, times(1)).onCompleted();
284+
}
285+
}
286+
}

rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package rx.subjects;
1717

18-
import static org.mockito.Matchers.*;
1918
import static org.mockito.Mockito.*;
2019

2120
import java.util.concurrent.ConcurrentHashMap;

rxjava-core/src/main/java/rx/subjects/PublishSubject.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package rx.subjects;
1717

1818
import static org.junit.Assert.*;
19-
import static org.mockito.Matchers.*;
2019
import static org.mockito.Mockito.*;
2120

2221
import java.util.ArrayList;

rxjava-core/src/main/java/rx/subjects/ReplaySubject.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package rx.subjects;
1717

18-
import static org.mockito.Matchers.*;
1918
import static org.mockito.Mockito.*;
2019

2120
import java.util.ArrayList;

0 commit comments

Comments
 (0)