Skip to content

Commit 3991695

Browse files
committed
Fixed some concerns with the operator.
1 parent f2a59c6 commit 3991695

File tree

2 files changed

+36
-37
lines changed

2 files changed

+36
-37
lines changed

src/main/java/rx/internal/operators/OperatorSwitchIfEmpty.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import rx.Observable;
19-
import rx.Producer;
20-
import rx.Subscriber;
21-
2218
import java.util.concurrent.atomic.AtomicLong;
2319

20+
import rx.*;
21+
import rx.subscriptions.SerialSubscription;
22+
2423
/**
2524
* If the Observable completes without emitting any items, subscribe to an alternate Observable. Allows for similar
2625
* functionality to {@link rx.internal.operators.OperatorDefaultIfEmpty} except instead of one item being emitted when
@@ -35,8 +34,10 @@ public OperatorSwitchIfEmpty(Observable<T> alternate) {
3534

3635
@Override
3736
public Subscriber<? super T> call(Subscriber<? super T> child) {
38-
final SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber(child);
39-
child.add(parent);
37+
final SerialSubscription ssub = new SerialSubscription();
38+
final SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber(child, ssub);
39+
ssub.set(parent);
40+
child.add(ssub);
4041
return parent;
4142
}
4243

@@ -46,9 +47,11 @@ private class SwitchIfEmptySubscriber extends Subscriber<T> {
4647
final AtomicLong consumerCapacity = new AtomicLong(0l);
4748

4849
private final Subscriber<? super T> child;
50+
final SerialSubscription ssub;
4951

50-
public SwitchIfEmptySubscriber(Subscriber<? super T> child) {
52+
public SwitchIfEmptySubscriber(Subscriber<? super T> child, final SerialSubscription ssub) {
5153
this.child = child;
54+
this.ssub = ssub;
5255
}
5356

5457
@Override
@@ -69,13 +72,12 @@ public void onCompleted() {
6972
if (!empty) {
7073
child.onCompleted();
7174
} else if (!child.isUnsubscribed()) {
72-
unsubscribe();
7375
subscribeToAlternate();
7476
}
7577
}
7678

7779
private void subscribeToAlternate() {
78-
child.add(alternate.unsafeSubscribe(new Subscriber<T>() {
80+
ssub.set(alternate.unsafeSubscribe(new Subscriber<T>() {
7981

8082
@Override
8183
public void setProducer(final Producer producer) {

src/test/java/rx/internal/operators/OperatorSwitchIfEmptyTest.java

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,19 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.*;
19+
20+
import java.util.*;
21+
import java.util.concurrent.atomic.AtomicBoolean;
22+
1823
import org.junit.Test;
24+
25+
import rx.*;
1926
import rx.Observable;
20-
import rx.Producer;
21-
import rx.Subscriber;
22-
import rx.Subscription;
2327
import rx.functions.Action0;
28+
import rx.observers.TestSubscriber;
2429
import rx.subscriptions.Subscriptions;
2530

26-
import java.util.ArrayList;
27-
import java.util.Arrays;
28-
import java.util.List;
29-
import java.util.concurrent.atomic.AtomicBoolean;
30-
31-
import static org.junit.Assert.assertEquals;
32-
import static org.junit.Assert.assertFalse;
33-
import static org.junit.Assert.assertTrue;
34-
3531
public class OperatorSwitchIfEmptyTest {
3632

3733
@Test
@@ -134,30 +130,31 @@ public void call(final Subscriber<? super Long> subscriber) {
134130

135131
@Test
136132
public void testSwitchRequestAlternativeObservableWithBackpressure() {
137-
final List<Integer> items = new ArrayList<Integer>();
138133

139-
Observable.<Integer>empty().switchIfEmpty(Observable.just(1, 2, 3)).subscribe(new Subscriber<Integer>() {
134+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
140135

141136
@Override
142137
public void onStart() {
143138
request(1);
144139
}
140+
};
141+
Observable.<Integer>empty().switchIfEmpty(Observable.just(1, 2, 3)).subscribe(ts);
142+
143+
assertEquals(Arrays.asList(1), ts.getOnNextEvents());
144+
ts.assertNoErrors();
145+
}
146+
@Test
147+
public void testBackpressureNoRequest() {
148+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
145149

146150
@Override
147-
public void onCompleted() {
148-
149-
}
150-
151-
@Override
152-
public void onError(Throwable e) {
153-
154-
}
155-
156-
@Override
157-
public void onNext(Integer integer) {
158-
items.add(integer);
151+
public void onStart() {
152+
request(0);
159153
}
160-
});
161-
assertEquals(Arrays.asList(1), items);
154+
};
155+
Observable.<Integer>empty().switchIfEmpty(Observable.just(1, 2, 3)).subscribe(ts);
156+
157+
assertTrue(ts.getOnNextEvents().isEmpty());
158+
ts.assertNoErrors();
162159
}
163160
}

0 commit comments

Comments
 (0)