Skip to content

Commit 6e6c771

Browse files
author
Alex Wenckus
committed
Addressing review feedback:
Ensure that we unsubscribe upstream "parent" when switching to alternate. That way upstream will trigger unsubscribe when the first Observable completes. Added test. Child should contain downstream subscriptions - not parent.
1 parent fc7c171 commit 6e6c771

File tree

3 files changed

+23
-7
lines changed

3 files changed

+23
-7
lines changed

src/main/java/rx/Observable.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3739,17 +3739,20 @@ public final Observable<T> defaultIfEmpty(T defaultValue) {
37393739
/**
37403740
* Returns an Observable that emits the items emitted by the source Observable or the items of an alternate Observable if the source Observable
37413741
* is empty.
3742-
* <p>
3742+
* <p/>
37433743
* <dl>
37443744
* <dt><b>Scheduler:</b></dt>
37453745
* <dd>{@code switchIfEmpty} does not operate by default on a particular {@link Scheduler}.</dd>
3746+
* <dt><b>Beta:</b></dt>
3747+
* <dd>{@code switchIfEmpty} is currently in {@link rx.annotations.Beta} and subject to change.</dd>
37463748
* </dl>
37473749
*
37483750
* @param alternate
37493751
* the alternate Observable to subscribe to if the source does not emit any items
37503752
* @return an Observable that emits the items emitted by the source Observable or the items of an alternate Observable if the source Observable
37513753
* is empty.
37523754
*/
3755+
@Beta
37533756
public final Observable<T> switchIfEmpty(Observable<T> alternate) {
37543757
return lift(new OperatorSwitchIfEmpty<T>(alternate));
37553758
}

src/main/java/rx/internal/operators/OperatorSwitchIfEmpty.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ public OperatorSwitchIfEmpty(Observable<T> alternate) {
3535

3636
@Override
3737
public Subscriber<? super T> call(Subscriber<? super T> child) {
38-
return new SwitchIfEmptySubscriber(child);
38+
final SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber(child);
39+
child.add(parent);
40+
return parent;
3941
}
4042

4143
private class SwitchIfEmptySubscriber extends Subscriber<T> {
@@ -46,8 +48,6 @@ private class SwitchIfEmptySubscriber extends Subscriber<T> {
4648
private final Subscriber<? super T> child;
4749

4850
public SwitchIfEmptySubscriber(Subscriber<? super T> child) {
49-
super(child);
50-
5151
this.child = child;
5252
}
5353

@@ -69,12 +69,13 @@ public void onCompleted() {
6969
if (!empty) {
7070
child.onCompleted();
7171
} else if (!child.isUnsubscribed()) {
72+
unsubscribe();
7273
subscribeToAlternate();
7374
}
7475
}
7576

7677
private void subscribeToAlternate() {
77-
add(alternate.unsafeSubscribe(new Subscriber<T>() {
78+
child.add(alternate.unsafeSubscribe(new Subscriber<T>() {
7879
@Override
7980
public void onStart() {
8081
final long capacity = consumerCapacity.get();

src/test/java/rx/internal/operators/OperatorSwitchIfEmptyTest.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import rx.Subscriber;
2222
import rx.Subscription;
2323
import rx.functions.Action0;
24-
import rx.functions.Action1;
25-
import rx.schedulers.Schedulers;
2624
import rx.subscriptions.Subscriptions;
2725

2826
import java.util.Arrays;
@@ -115,4 +113,18 @@ public void onNext(Long aLong) {
115113
assertTrue(empty.isUnsubscribed());
116114
assertTrue(sub.isUnsubscribed());
117115
}
116+
117+
@Test
118+
public void testSwitchShouldTriggerUnsubscribe() {
119+
final Subscription s = Subscriptions.empty();
120+
121+
Observable.create(new Observable.OnSubscribe<Long>() {
122+
@Override
123+
public void call(final Subscriber<? super Long> subscriber) {
124+
subscriber.add(s);
125+
subscriber.onCompleted();
126+
}
127+
}).switchIfEmpty(Observable.<Long>never()).subscribe();
128+
assertTrue(s.isUnsubscribed());
129+
}
118130
}

0 commit comments

Comments
 (0)