Skip to content

Commit e257ad3

Browse files
committed
Implemented the 'any' operator
1 parent 00d7c3b commit e257ad3

File tree

2 files changed

+265
-0
lines changed

2 files changed

+265
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import rx.operators.OperationZip;
7878
import rx.operators.SafeObservableSubscription;
7979
import rx.operators.SafeObserver;
80+
import rx.operators.OperationAny;
8081
import rx.plugins.RxJavaErrorHandler;
8182
import rx.plugins.RxJavaObservableExecutionHook;
8283
import rx.plugins.RxJavaPlugins;
@@ -4187,4 +4188,36 @@ private boolean isInternalImplementation(Object o) {
41874188
return p != null && p.getName().startsWith("rx.operators");
41884189
}
41894190

4191+
// /**
4192+
// * Returns an {@link Observable} that emits <code>true</code> if the source
4193+
// * {@link Observable} is not empty, otherwise <code>false</code>.
4194+
// *
4195+
// * @param source
4196+
// * The source {@link Observable} to check if not empty.
4197+
// * @return A subscription function for creating the target Observable.
4198+
// * @see <a href=
4199+
// * "http://msdn.microsoft.com/en-us/library/hh229905(v=vs.103).aspx"
4200+
// * >MSDN: Observable.Any</a>
4201+
// */
4202+
// public Observable<Boolean> any() {
4203+
// return create(OperationAny.any(this));
4204+
// }
4205+
//
4206+
// /**
4207+
// * Returns an {@link Observable} that emits <code>true</code> if all items
4208+
// * of the source {@link Observable} satisfy the given condition, otherwise
4209+
// * <code>false</code>.
4210+
// *
4211+
// * @param predicate
4212+
// * The condition all items have to satisfy.
4213+
// * @return A subscription function for creating the target Observable.
4214+
// *
4215+
// * @see <a href=
4216+
// * "http://msdn.microsoft.com/en-us/library/hh211993(v=vs.103).aspx"
4217+
// * >MSDN: Observable.Any</a>
4218+
// */
4219+
// public Observable<Boolean> any(Func1<? super T, Boolean> predicate) {
4220+
// return create(OperationAny.any(this, predicate));
4221+
// }
4222+
41904223
}
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
package rx.operators;
2+
3+
import static org.mockito.Mockito.mock;
4+
import static org.mockito.Mockito.never;
5+
import static org.mockito.Mockito.times;
6+
import static org.mockito.Mockito.verify;
7+
import static rx.util.functions.Functions.alwaysTrue;
8+
9+
import java.util.concurrent.atomic.AtomicBoolean;
10+
11+
import org.junit.Test;
12+
13+
import rx.Observable;
14+
import rx.Observable.OnSubscribeFunc;
15+
import rx.Observer;
16+
import rx.Subscription;
17+
import rx.util.functions.Func1;
18+
19+
/**
20+
* Returns an {@link Observable} that emits <code>true</code> if all items of an
21+
* observable sequence satisfy a condition, otherwise <code>false</code>.
22+
*/
23+
public final class OperationAny {
24+
25+
/**
26+
* Returns an {@link Observable} that emits <code>true</code> if the source
27+
* {@link Observable} is not empty, otherwise <code>false</code>.
28+
*
29+
* @param source
30+
* The source {@link Observable} to check if not empty.
31+
* @return A subscription function for creating the target Observable.
32+
*/
33+
public static <T> OnSubscribeFunc<Boolean> any(
34+
Observable<? extends T> source) {
35+
return new Any<T>(source, alwaysTrue());
36+
}
37+
38+
/**
39+
* Returns an {@link Observable} that emits <code>true</code> if all items
40+
* of the source {@link Observable} satisfy the given condition, otherwise
41+
* <code>false</code>.
42+
*
43+
* @param source
44+
* The source {@link Observable} to check if all items in it
45+
* satisfy the given condition
46+
* @param predicate
47+
* The condition all items have to satisfy.
48+
* @return A subscription function for creating the target Observable.
49+
*/
50+
public static <T> OnSubscribeFunc<Boolean> any(
51+
Observable<? extends T> source, Func1<? super T, Boolean> predicate) {
52+
return new Any<T>(source, predicate);
53+
}
54+
55+
private static class Any<T> implements OnSubscribeFunc<Boolean> {
56+
57+
private final Observable<? extends T> source;
58+
private final Func1<? super T, Boolean> predicate;
59+
60+
private Any(Observable<? extends T> source,
61+
Func1<? super T, Boolean> predicate) {
62+
this.source = source;
63+
this.predicate = predicate;
64+
}
65+
66+
@Override
67+
public Subscription onSubscribe(final Observer<? super Boolean> observer) {
68+
final SafeObservableSubscription subscription = new SafeObservableSubscription();
69+
return subscription.wrap(source.subscribe(new Observer<T>() {
70+
71+
private final AtomicBoolean hasEmitted = new AtomicBoolean(
72+
false);
73+
74+
private volatile boolean isNotEmpty = false;
75+
76+
@Override
77+
public void onNext(T value) {
78+
isNotEmpty = true;
79+
try {
80+
if (hasEmitted.get() == false) {
81+
if (predicate.call(value) == false
82+
&& hasEmitted.getAndSet(true) == false) {
83+
observer.onNext(false);
84+
observer.onCompleted();
85+
// this will work if the sequence is
86+
// asynchronous, it
87+
// will have no effect on a synchronous
88+
// observable
89+
subscription.unsubscribe();
90+
}
91+
}
92+
} catch (Throwable ex) {
93+
observer.onError(ex);
94+
// this will work if the sequence is asynchronous, it
95+
// will have no effect on a synchronous observable
96+
subscription.unsubscribe();
97+
}
98+
99+
}
100+
101+
@Override
102+
public void onError(Throwable ex) {
103+
observer.onError(ex);
104+
}
105+
106+
@Override
107+
public void onCompleted() {
108+
if (!hasEmitted.get()) {
109+
observer.onNext(isNotEmpty);
110+
observer.onCompleted();
111+
}
112+
}
113+
}));
114+
}
115+
116+
}
117+
118+
public static class UnitTest {
119+
120+
@Test
121+
public void testAnyWithTwoItems() {
122+
Observable<Integer> w = Observable.from(1, 2);
123+
Observable<Boolean> observable = Observable.create(any(w));
124+
125+
@SuppressWarnings("unchecked")
126+
Observer<Boolean> aObserver = mock(Observer.class);
127+
observable.subscribe(aObserver);
128+
verify(aObserver, never()).onNext(false);
129+
verify(aObserver, times(1)).onNext(true);
130+
verify(aObserver, never()).onError(
131+
org.mockito.Matchers.any(Throwable.class));
132+
verify(aObserver, times(1)).onCompleted();
133+
}
134+
135+
@Test
136+
public void testAnyWithOneItem() {
137+
Observable<Integer> w = Observable.from(1);
138+
Observable<Boolean> observable = Observable.create(any(w));
139+
140+
@SuppressWarnings("unchecked")
141+
Observer<Boolean> aObserver = mock(Observer.class);
142+
observable.subscribe(aObserver);
143+
verify(aObserver, never()).onNext(false);
144+
verify(aObserver, times(1)).onNext(true);
145+
verify(aObserver, never()).onError(
146+
org.mockito.Matchers.any(Throwable.class));
147+
verify(aObserver, times(1)).onCompleted();
148+
}
149+
150+
@Test
151+
public void testAnyWithEmpty() {
152+
Observable<Integer> w = Observable.empty();
153+
Observable<Boolean> observable = Observable.create(any(w));
154+
155+
@SuppressWarnings("unchecked")
156+
Observer<Boolean> aObserver = mock(Observer.class);
157+
observable.subscribe(aObserver);
158+
verify(aObserver, times(1)).onNext(false);
159+
verify(aObserver, never()).onNext(true);
160+
verify(aObserver, never()).onError(
161+
org.mockito.Matchers.any(Throwable.class));
162+
verify(aObserver, times(1)).onCompleted();
163+
}
164+
165+
@Test
166+
public void testAnyWithPredicate1() {
167+
Observable<Integer> w = Observable.from(1, 2);
168+
Observable<Boolean> observable = Observable.create(any(w,
169+
new Func1<Integer, Boolean>() {
170+
171+
@Override
172+
public Boolean call(Integer t1) {
173+
return t1 < 3;
174+
}
175+
}));
176+
177+
@SuppressWarnings("unchecked")
178+
Observer<Boolean> aObserver = mock(Observer.class);
179+
observable.subscribe(aObserver);
180+
verify(aObserver, never()).onNext(false);
181+
verify(aObserver, times(1)).onNext(true);
182+
verify(aObserver, never()).onError(
183+
org.mockito.Matchers.any(Throwable.class));
184+
verify(aObserver, times(1)).onCompleted();
185+
}
186+
187+
@Test
188+
public void testAnyWithPredicate2() {
189+
Observable<Integer> w = Observable.from(1, 2, 3);
190+
Observable<Boolean> observable = Observable.create(any(w,
191+
new Func1<Integer, Boolean>() {
192+
193+
@Override
194+
public Boolean call(Integer t1) {
195+
return t1 < 3;
196+
}
197+
}));
198+
199+
@SuppressWarnings("unchecked")
200+
Observer<Boolean> aObserver = mock(Observer.class);
201+
observable.subscribe(aObserver);
202+
verify(aObserver, times(1)).onNext(false);
203+
verify(aObserver, never()).onNext(true);
204+
verify(aObserver, never()).onError(
205+
org.mockito.Matchers.any(Throwable.class));
206+
verify(aObserver, times(1)).onCompleted();
207+
}
208+
209+
@Test
210+
public void testAnyWithEmptyAndPredicate() {
211+
// If the source is empty, always output false.
212+
Observable<Integer> w = Observable.empty();
213+
Observable<Boolean> observable = Observable.create(any(w,
214+
new Func1<Integer, Boolean>() {
215+
216+
@Override
217+
public Boolean call(Integer t1) {
218+
return true;
219+
}
220+
}));
221+
222+
@SuppressWarnings("unchecked")
223+
Observer<Boolean> aObserver = mock(Observer.class);
224+
observable.subscribe(aObserver);
225+
verify(aObserver, times(1)).onNext(false);
226+
verify(aObserver, never()).onNext(true);
227+
verify(aObserver, never()).onError(
228+
org.mockito.Matchers.any(Throwable.class));
229+
verify(aObserver, times(1)).onCompleted();
230+
}
231+
}
232+
}

0 commit comments

Comments
 (0)