Skip to content

Commit f2a59c6

Browse files
author
Alex Wenckus
committed
Fix for back pressure on the alternate subscription.
1 parent 6e6c771 commit f2a59c6

File tree

2 files changed

+45
-1
lines changed

2 files changed

+45
-1
lines changed

src/main/java/rx/internal/operators/OperatorSwitchIfEmpty.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,17 @@ public void onCompleted() {
7676

7777
private void subscribeToAlternate() {
7878
child.add(alternate.unsafeSubscribe(new Subscriber<T>() {
79+
80+
@Override
81+
public void setProducer(final Producer producer) {
82+
child.setProducer(new Producer() {
83+
@Override
84+
public void request(long n) {
85+
producer.request(n);
86+
}
87+
});
88+
}
89+
7990
@Override
8091
public void onStart() {
8192
final long capacity = consumerCapacity.get();

src/test/java/rx/internal/operators/OperatorSwitchIfEmptyTest.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import rx.functions.Action0;
2424
import rx.subscriptions.Subscriptions;
2525

26+
import java.util.ArrayList;
2627
import java.util.Arrays;
28+
import java.util.List;
2729
import java.util.concurrent.atomic.AtomicBoolean;
2830

2931
import static org.junit.Assert.assertEquals;
@@ -56,13 +58,15 @@ public void testSwitchWhenEmpty() throws Exception {
5658

5759
@Test
5860
public void testSwitchWithProducer() throws Exception {
61+
final AtomicBoolean emitted = new AtomicBoolean(false);
5962
Observable<Long> withProducer = Observable.create(new Observable.OnSubscribe<Long>() {
6063
@Override
6164
public void call(final Subscriber<? super Long> subscriber) {
6265
subscriber.setProducer(new Producer() {
6366
@Override
6467
public void request(long n) {
65-
if (n > 0) {
68+
if (n > 0 && !emitted.get()) {
69+
emitted.set(true);
6670
subscriber.onNext(42L);
6771
subscriber.onCompleted();
6872
}
@@ -127,4 +131,33 @@ public void call(final Subscriber<? super Long> subscriber) {
127131
}).switchIfEmpty(Observable.<Long>never()).subscribe();
128132
assertTrue(s.isUnsubscribed());
129133
}
134+
135+
@Test
136+
public void testSwitchRequestAlternativeObservableWithBackpressure() {
137+
final List<Integer> items = new ArrayList<Integer>();
138+
139+
Observable.<Integer>empty().switchIfEmpty(Observable.just(1, 2, 3)).subscribe(new Subscriber<Integer>() {
140+
141+
@Override
142+
public void onStart() {
143+
request(1);
144+
}
145+
146+
@Override
147+
public void onCompleted() {
148+
149+
}
150+
151+
@Override
152+
public void onError(Throwable e) {
153+
154+
}
155+
156+
@Override
157+
public void onNext(Integer integer) {
158+
items.add(integer);
159+
}
160+
});
161+
assertEquals(Arrays.asList(1), items);
162+
}
130163
}

0 commit comments

Comments
 (0)