Skip to content

Commit fc7c171

Browse files
author
Alex Wenckus
committed
Added Operator switchIfEmpty, like defaultIfEmpty but subscribes to and emits the items in an Observable if the source is empty
1 parent 9cae739 commit fc7c171

File tree

3 files changed

+250
-0
lines changed

3 files changed

+250
-0
lines changed

src/main/java/rx/Observable.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3736,6 +3736,24 @@ public final Observable<T> defaultIfEmpty(T defaultValue) {
37363736
return lift(new OperatorDefaultIfEmpty<T>(defaultValue));
37373737
}
37383738

3739+
/**
3740+
* Returns an Observable that emits the items emitted by the source Observable or the items of an alternate Observable if the source Observable
3741+
* is empty.
3742+
* <p>
3743+
* <dl>
3744+
* <dt><b>Scheduler:</b></dt>
3745+
* <dd>{@code switchIfEmpty} does not operate by default on a particular {@link Scheduler}.</dd>
3746+
* </dl>
3747+
*
3748+
* @param alternate
3749+
* the alternate Observable to subscribe to if the source does not emit any items
3750+
* @return an Observable that emits the items emitted by the source Observable or the items of an alternate Observable if the source Observable
3751+
* is empty.
3752+
*/
3753+
public final Observable<T> switchIfEmpty(Observable<T> alternate) {
3754+
return lift(new OperatorSwitchIfEmpty<T>(alternate));
3755+
}
3756+
37393757
/**
37403758
* Returns an Observable that delays the subscription to and emissions from the souce Observable via another
37413759
* Observable on a per-item basis.
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import rx.Observable;
19+
import rx.Producer;
20+
import rx.Subscriber;
21+
22+
import java.util.concurrent.atomic.AtomicLong;
23+
24+
/**
25+
* If the Observable completes without emitting any items, subscribe to an alternate Observable. Allows for similar
26+
* functionality to {@link rx.internal.operators.OperatorDefaultIfEmpty} except instead of one item being emitted when
27+
* empty, the results of the given Observable will be emitted.
28+
*/
29+
public class OperatorSwitchIfEmpty<T> implements Observable.Operator<T, T> {
30+
private final Observable<T> alternate;
31+
32+
public OperatorSwitchIfEmpty(Observable<T> alternate) {
33+
this.alternate = alternate;
34+
}
35+
36+
@Override
37+
public Subscriber<? super T> call(Subscriber<? super T> child) {
38+
return new SwitchIfEmptySubscriber(child);
39+
}
40+
41+
private class SwitchIfEmptySubscriber extends Subscriber<T> {
42+
43+
boolean empty = true;
44+
final AtomicLong consumerCapacity = new AtomicLong(0l);
45+
46+
private final Subscriber<? super T> child;
47+
48+
public SwitchIfEmptySubscriber(Subscriber<? super T> child) {
49+
super(child);
50+
51+
this.child = child;
52+
}
53+
54+
@Override
55+
public void setProducer(final Producer producer) {
56+
super.setProducer(new Producer() {
57+
@Override
58+
public void request(long n) {
59+
if (empty) {
60+
consumerCapacity.set(n);
61+
}
62+
producer.request(n);
63+
}
64+
});
65+
}
66+
67+
@Override
68+
public void onCompleted() {
69+
if (!empty) {
70+
child.onCompleted();
71+
} else if (!child.isUnsubscribed()) {
72+
subscribeToAlternate();
73+
}
74+
}
75+
76+
private void subscribeToAlternate() {
77+
add(alternate.unsafeSubscribe(new Subscriber<T>() {
78+
@Override
79+
public void onStart() {
80+
final long capacity = consumerCapacity.get();
81+
if (capacity > 0) {
82+
request(capacity);
83+
}
84+
}
85+
86+
@Override
87+
public void onCompleted() {
88+
child.onCompleted();
89+
}
90+
91+
@Override
92+
public void onError(Throwable e) {
93+
child.onError(e);
94+
}
95+
96+
@Override
97+
public void onNext(T t) {
98+
child.onNext(t);
99+
}
100+
}));
101+
}
102+
103+
@Override
104+
public void onError(Throwable e) {
105+
child.onError(e);
106+
}
107+
108+
@Override
109+
public void onNext(T t) {
110+
empty = false;
111+
child.onNext(t);
112+
}
113+
}
114+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import org.junit.Test;
19+
import rx.Observable;
20+
import rx.Producer;
21+
import rx.Subscriber;
22+
import rx.Subscription;
23+
import rx.functions.Action0;
24+
import rx.functions.Action1;
25+
import rx.schedulers.Schedulers;
26+
import rx.subscriptions.Subscriptions;
27+
28+
import java.util.Arrays;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
31+
import static org.junit.Assert.assertEquals;
32+
import static org.junit.Assert.assertFalse;
33+
import static org.junit.Assert.assertTrue;
34+
35+
public class OperatorSwitchIfEmptyTest {
36+
37+
@Test
38+
public void testSwitchWhenNotEmpty() throws Exception {
39+
final AtomicBoolean subscribed = new AtomicBoolean(false);
40+
final Observable<Integer> observable = Observable.just(4).switchIfEmpty(Observable.just(2)
41+
.doOnSubscribe(new Action0() {
42+
@Override
43+
public void call() {
44+
subscribed.set(true);
45+
}
46+
}));
47+
48+
assertEquals(4, observable.toBlocking().single().intValue());
49+
assertFalse(subscribed.get());
50+
}
51+
52+
@Test
53+
public void testSwitchWhenEmpty() throws Exception {
54+
final Observable<Integer> observable = Observable.<Integer>empty().switchIfEmpty(Observable.from(Arrays.asList(42)));
55+
56+
assertEquals(42, observable.toBlocking().single().intValue());
57+
}
58+
59+
@Test
60+
public void testSwitchWithProducer() throws Exception {
61+
Observable<Long> withProducer = Observable.create(new Observable.OnSubscribe<Long>() {
62+
@Override
63+
public void call(final Subscriber<? super Long> subscriber) {
64+
subscriber.setProducer(new Producer() {
65+
@Override
66+
public void request(long n) {
67+
if (n > 0) {
68+
subscriber.onNext(42L);
69+
subscriber.onCompleted();
70+
}
71+
}
72+
});
73+
}
74+
});
75+
76+
final Observable<Long> observable = Observable.<Long>empty().switchIfEmpty(withProducer);
77+
assertEquals(42, observable.toBlocking().single().intValue());
78+
}
79+
80+
@Test
81+
public void testSwitchTriggerUnsubscribe() throws Exception {
82+
final Subscription empty = Subscriptions.empty();
83+
84+
Observable<Long> withProducer = Observable.create(new Observable.OnSubscribe<Long>() {
85+
@Override
86+
public void call(final Subscriber<? super Long> subscriber) {
87+
subscriber.add(empty);
88+
subscriber.onNext(42L);
89+
}
90+
});
91+
92+
final Subscription sub = Observable.<Long>empty().switchIfEmpty(withProducer).lift(new Observable.Operator<Long, Long>() {
93+
@Override
94+
public Subscriber<? super Long> call(final Subscriber<? super Long> child) {
95+
return new Subscriber<Long>(child) {
96+
@Override
97+
public void onCompleted() {
98+
99+
}
100+
101+
@Override
102+
public void onError(Throwable e) {
103+
104+
}
105+
106+
@Override
107+
public void onNext(Long aLong) {
108+
unsubscribe();
109+
}
110+
};
111+
}
112+
}).subscribe();
113+
114+
115+
assertTrue(empty.isUnsubscribed());
116+
assertTrue(sub.isUnsubscribed());
117+
}
118+
}

0 commit comments

Comments
 (0)