11package rx .operators ;
22
3- import static org .mockito .Mockito .mock ;
4- import static org .mockito .Mockito .never ;
5- import static org .mockito .Mockito .times ;
6- import static org .mockito .Mockito .verify ;
7- import static rx .util .functions .Functions .alwaysTrue ;
3+ import static org .mockito .Mockito .*;
4+ import static rx .util .functions .Functions .*;
85
96import java .util .concurrent .atomic .AtomicBoolean ;
107
2320public final class OperationAny {
2421
2522 /**
26- * Returns an {@link Observable} that emits <code>true</code> if the source
27- * {@link Observable} is not empty, otherwise <code>false</code>.
23+ * Returns an {@link Observable} that emits <code>true</code> if the source {@link Observable} is not empty, otherwise <code>false</code>.
2824 *
2925 * @param source
3026 * The source {@link Observable} to check if not empty.
3127 * @return A subscription function for creating the target Observable.
3228 */
33- public static <T > OnSubscribeFunc <Boolean > any (
34- Observable <? extends T > source ) {
35- return new Any <T >(source , alwaysTrue ());
29+ public static <T > OnSubscribeFunc <Boolean > any (Observable <? extends T > source ) {
30+ return new Any <T >(source , alwaysTrue (), false );
31+ }
32+
33+ public static <T > OnSubscribeFunc <Boolean > isEmpty (Observable <? extends T > source ) {
34+ return new Any <T >(source , alwaysTrue (), true );
3635 }
3736
3837 /**
3938 * Returns an {@link Observable} that emits <code>true</code> if any element
4039 * of the source {@link Observable} satisfies the given condition, otherwise
41- * <code>false</code>. Note: always emit <code>false</code> if the source
42- * {@link Observable} is empty.
40+ * <code>false</code>. Note: always emit <code>false</code> if the source {@link Observable} is empty.
4341 *
4442 * @param source
4543 * The source {@link Observable} to check if any element
@@ -48,42 +46,43 @@ public static <T> OnSubscribeFunc<Boolean> any(
4846 * The condition to test every element.
4947 * @return A subscription function for creating the target Observable.
5048 */
51- public static <T > OnSubscribeFunc <Boolean > any (
52- Observable <? extends T > source , Func1 <? super T , Boolean > predicate ) {
53- return new Any <T >(source , predicate );
49+ public static <T > OnSubscribeFunc <Boolean > any (Observable <? extends T > source , Func1 <? super T , Boolean > predicate ) {
50+ return new Any <T >(source , predicate , false );
51+ }
52+
53+ public static <T > OnSubscribeFunc <Boolean > exists (Observable <? extends T > source , Func1 <? super T , Boolean > predicate ) {
54+ return any (source , predicate );
5455 }
5556
5657 private static class Any <T > implements OnSubscribeFunc <Boolean > {
5758
5859 private final Observable <? extends T > source ;
5960 private final Func1 <? super T , Boolean > predicate ;
61+ private final boolean returnOnEmpty ;
6062
61- private Any (Observable <? extends T > source ,
62- Func1 <? super T , Boolean > predicate ) {
63+ private Any (Observable <? extends T > source , Func1 <? super T , Boolean > predicate , boolean returnOnEmpty ) {
6364 this .source = source ;
6465 this .predicate = predicate ;
66+ this .returnOnEmpty = returnOnEmpty ;
6567 }
6668
6769 @ Override
6870 public Subscription onSubscribe (final Observer <? super Boolean > observer ) {
6971 final SafeObservableSubscription subscription = new SafeObservableSubscription ();
7072 return subscription .wrap (source .subscribe (new Observer <T >() {
7173
72- private final AtomicBoolean hasEmitted = new AtomicBoolean (
73- false );
74+ private final AtomicBoolean hasEmitted = new AtomicBoolean (false );
7475
7576 @ Override
7677 public void onNext (T value ) {
7778 try {
7879 if (hasEmitted .get () == false ) {
7980 if (predicate .call (value ) == true
8081 && hasEmitted .getAndSet (true ) == false ) {
81- observer .onNext (true );
82+ observer .onNext (! returnOnEmpty );
8283 observer .onCompleted ();
83- // this will work if the sequence is
84- // asynchronous, it
85- // will have no effect on a synchronous
86- // observable
84+ // this will work if the sequence is asynchronous, it
85+ // will have no effect on a synchronous observable
8786 subscription .unsubscribe ();
8887 }
8988 }
@@ -104,7 +103,7 @@ public void onError(Throwable ex) {
104103 @ Override
105104 public void onCompleted () {
106105 if (!hasEmitted .get ()) {
107- observer .onNext (false );
106+ observer .onNext (returnOnEmpty );
108107 observer .onCompleted ();
109108 }
110109 }
@@ -125,8 +124,21 @@ public void testAnyWithTwoItems() {
125124 observable .subscribe (aObserver );
126125 verify (aObserver , never ()).onNext (false );
127126 verify (aObserver , times (1 )).onNext (true );
128- verify (aObserver , never ()).onError (
129- org .mockito .Matchers .any (Throwable .class ));
127+ verify (aObserver , never ()).onError (org .mockito .Matchers .any (Throwable .class ));
128+ verify (aObserver , times (1 )).onCompleted ();
129+ }
130+
131+ @ Test
132+ public void testIsEmptyWithTwoItems () {
133+ Observable <Integer > w = Observable .from (1 , 2 );
134+ Observable <Boolean > observable = Observable .create (isEmpty (w ));
135+
136+ @ SuppressWarnings ("unchecked" )
137+ Observer <Boolean > aObserver = mock (Observer .class );
138+ observable .subscribe (aObserver );
139+ verify (aObserver , never ()).onNext (true );
140+ verify (aObserver , times (1 )).onNext (false );
141+ verify (aObserver , never ()).onError (org .mockito .Matchers .any (Throwable .class ));
130142 verify (aObserver , times (1 )).onCompleted ();
131143 }
132144
@@ -140,8 +152,21 @@ public void testAnyWithOneItem() {
140152 observable .subscribe (aObserver );
141153 verify (aObserver , never ()).onNext (false );
142154 verify (aObserver , times (1 )).onNext (true );
143- verify (aObserver , never ()).onError (
144- org .mockito .Matchers .any (Throwable .class ));
155+ verify (aObserver , never ()).onError (org .mockito .Matchers .any (Throwable .class ));
156+ verify (aObserver , times (1 )).onCompleted ();
157+ }
158+
159+ @ Test
160+ public void testIsEmptyWithOneItem () {
161+ Observable <Integer > w = Observable .from (1 );
162+ Observable <Boolean > observable = Observable .create (isEmpty (w ));
163+
164+ @ SuppressWarnings ("unchecked" )
165+ Observer <Boolean > aObserver = mock (Observer .class );
166+ observable .subscribe (aObserver );
167+ verify (aObserver , never ()).onNext (true );
168+ verify (aObserver , times (1 )).onNext (false );
169+ verify (aObserver , never ()).onError (org .mockito .Matchers .any (Throwable .class ));
145170 verify (aObserver , times (1 )).onCompleted ();
146171 }
147172
@@ -155,8 +180,21 @@ public void testAnyWithEmpty() {
155180 observable .subscribe (aObserver );
156181 verify (aObserver , times (1 )).onNext (false );
157182 verify (aObserver , never ()).onNext (true );
158- verify (aObserver , never ()).onError (
159- org .mockito .Matchers .any (Throwable .class ));
183+ verify (aObserver , never ()).onError (org .mockito .Matchers .any (Throwable .class ));
184+ verify (aObserver , times (1 )).onCompleted ();
185+ }
186+
187+ @ Test
188+ public void testIsEmptyWithEmpty () {
189+ Observable <Integer > w = Observable .empty ();
190+ Observable <Boolean > observable = Observable .create (isEmpty (w ));
191+
192+ @ SuppressWarnings ("unchecked" )
193+ Observer <Boolean > aObserver = mock (Observer .class );
194+ observable .subscribe (aObserver );
195+ verify (aObserver , times (1 )).onNext (true );
196+ verify (aObserver , never ()).onNext (false );
197+ verify (aObserver , never ()).onError (org .mockito .Matchers .any (Throwable .class ));
160198 verify (aObserver , times (1 )).onCompleted ();
161199 }
162200
@@ -177,11 +215,31 @@ public Boolean call(Integer t1) {
177215 observable .subscribe (aObserver );
178216 verify (aObserver , never ()).onNext (false );
179217 verify (aObserver , times (1 )).onNext (true );
180- verify (aObserver , never ()).onError (
181- org .mockito .Matchers .any (Throwable .class ));
218+ verify (aObserver , never ()).onError (org .mockito .Matchers .any (Throwable .class ));
182219 verify (aObserver , times (1 )).onCompleted ();
183220 }
184221
222+ @ Test
223+ public void testExists1 () {
224+ Observable <Integer > w = Observable .from (1 , 2 , 3 );
225+ Observable <Boolean > observable = Observable .create (exists (w ,
226+ new Func1 <Integer , Boolean >() {
227+
228+ @ Override
229+ public Boolean call (Integer t1 ) {
230+ return t1 < 2 ;
231+ }
232+ }));
233+
234+ @ SuppressWarnings ("unchecked" )
235+ Observer <Boolean > aObserver = mock (Observer .class );
236+ observable .subscribe (aObserver );
237+ verify (aObserver , never ()).onNext (false );
238+ verify (aObserver , times (1 )).onNext (true );
239+ verify (aObserver , never ()).onError (org .mockito .Matchers .any (Throwable .class ));
240+ verify (aObserver , times (1 )).onCompleted ();
241+ }
242+
185243 @ Test
186244 public void testAnyWithPredicate2 () {
187245 Observable <Integer > w = Observable .from (1 , 2 , 3 );
@@ -199,8 +257,7 @@ public Boolean call(Integer t1) {
199257 observable .subscribe (aObserver );
200258 verify (aObserver , times (1 )).onNext (false );
201259 verify (aObserver , never ()).onNext (true );
202- verify (aObserver , never ()).onError (
203- org .mockito .Matchers .any (Throwable .class ));
260+ verify (aObserver , never ()).onError (org .mockito .Matchers .any (Throwable .class ));
204261 verify (aObserver , times (1 )).onCompleted ();
205262 }
206263
@@ -222,8 +279,7 @@ public Boolean call(Integer t1) {
222279 observable .subscribe (aObserver );
223280 verify (aObserver , times (1 )).onNext (false );
224281 verify (aObserver , never ()).onNext (true );
225- verify (aObserver , never ()).onError (
226- org .mockito .Matchers .any (Throwable .class ));
282+ verify (aObserver , never ()).onError (org .mockito .Matchers .any (Throwable .class ));
227283 verify (aObserver , times (1 )).onCompleted ();
228284 }
229285 }
0 commit comments