Skip to content

Commit 52c6ca0

Browse files
committed
Merge pull request #1473 from mattrjacobs/handle-backpressure-in-any-operator
OperatorAny needs to handle backpressure
2 parents 9f45065 + 70d0308 commit 52c6ca0

File tree

2 files changed

+20
-1
lines changed

2 files changed

+20
-1
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorAny.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public Subscriber<? super T> call(final Subscriber<? super Boolean> child) {
3939
return new Subscriber<T>(child) {
4040
boolean hasElements;
4141
boolean done;
42+
4243
@Override
4344
public void onNext(T t) {
4445
hasElements = true;
@@ -48,6 +49,9 @@ public void onNext(T t) {
4849
child.onNext(!returnOnEmpty);
4950
child.onCompleted();
5051
unsubscribe();
52+
} else {
53+
// if we drop values we must replace them upstream as downstream won't receive and request more
54+
request(1);
5155
}
5256
}
5357

@@ -68,7 +72,7 @@ public void onCompleted() {
6872
child.onCompleted();
6973
}
7074
}
71-
75+
7276
};
7377
}
7478
}

rxjava-core/src/test/java/rx/internal/operators/OperatorAnyTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertTrue;
1819
import static org.mockito.Mockito.mock;
1920
import static org.mockito.Mockito.never;
2021
import static org.mockito.Mockito.times;
@@ -27,6 +28,8 @@
2728
import rx.functions.Func1;
2829
import rx.functions.Functions;
2930

31+
import java.util.Arrays;
32+
3033
public class OperatorAnyTest {
3134

3235
@Test
@@ -197,4 +200,16 @@ public Boolean call(Integer t1) {
197200
verify(observer, never()).onError(org.mockito.Matchers.any(Throwable.class));
198201
verify(observer, times(1)).onCompleted();
199202
}
203+
204+
@Test
205+
public void testWithFollowingFirst() {
206+
Observable<Integer> o = Observable.from(Arrays.asList(1, 3, 5, 6));
207+
Observable<Boolean> anyEven = o.exists(new Func1<Integer, Boolean>() {
208+
@Override
209+
public Boolean call(Integer i) {
210+
return i % 2 == 0;
211+
}
212+
});
213+
assertTrue(anyEven.toBlocking().first());
214+
}
200215
}

0 commit comments

Comments
 (0)