Skip to content

Commit e934c81

Browse files
committed
Merge pull request #2585 from akarnokd/switch_if_empty
Operator: switchIfEmpty
2 parents 039b270 + c4f3ac3 commit e934c81

File tree

3 files changed

+309
-0
lines changed

3 files changed

+309
-0
lines changed

src/main/java/rx/Observable.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3748,6 +3748,27 @@ public final Observable<T> defaultIfEmpty(T defaultValue) {
37483748
return lift(new OperatorDefaultIfEmpty<T>(defaultValue));
37493749
}
37503750

3751+
/**
3752+
* Returns an Observable that emits the items emitted by the source Observable or the items of an alternate Observable if the source Observable
3753+
* is empty.
3754+
* <p/>
3755+
* <dl>
3756+
* <dt><b>Scheduler:</b></dt>
3757+
* <dd>{@code switchIfEmpty} does not operate by default on a particular {@link Scheduler}.</dd>
3758+
* <dt><b>Beta:</b></dt>
3759+
* <dd>{@code switchIfEmpty} is currently in {@link rx.annotations.Beta} and subject to change.</dd>
3760+
* </dl>
3761+
*
3762+
* @param alternate
3763+
* the alternate Observable to subscribe to if the source does not emit any items
3764+
* @return an Observable that emits the items emitted by the source Observable or the items of an alternate Observable if the source Observable
3765+
* is empty.
3766+
*/
3767+
@Experimental
3768+
public final Observable<T> switchIfEmpty(Observable<? extends T> alternate) {
3769+
return lift(new OperatorSwitchIfEmpty<T>(alternate));
3770+
}
3771+
37513772
/**
37523773
* Returns an Observable that delays the subscription to and emissions from the souce Observable via another
37533774
* Observable on a per-item basis.
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import java.util.concurrent.atomic.AtomicLong;
19+
20+
import rx.*;
21+
import rx.subscriptions.SerialSubscription;
22+
23+
/**
24+
* If the Observable completes without emitting any items, subscribe to an alternate Observable. Allows for similar
25+
* functionality to {@link rx.internal.operators.OperatorDefaultIfEmpty} except instead of one item being emitted when
26+
* empty, the results of the given Observable will be emitted.
27+
*/
28+
public final class OperatorSwitchIfEmpty<T> implements Observable.Operator<T, T> {
29+
private final Observable<? extends T> alternate;
30+
31+
public OperatorSwitchIfEmpty(Observable<? extends T> alternate) {
32+
this.alternate = alternate;
33+
}
34+
35+
@Override
36+
public Subscriber<? super T> call(Subscriber<? super T> child) {
37+
final SerialSubscription ssub = new SerialSubscription();
38+
final SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber(child, ssub);
39+
ssub.set(parent);
40+
child.add(ssub);
41+
return parent;
42+
}
43+
44+
private class SwitchIfEmptySubscriber extends Subscriber<T> {
45+
46+
boolean empty = true;
47+
final AtomicLong consumerCapacity = new AtomicLong(0l);
48+
49+
private final Subscriber<? super T> child;
50+
final SerialSubscription ssub;
51+
52+
public SwitchIfEmptySubscriber(Subscriber<? super T> child, final SerialSubscription ssub) {
53+
this.child = child;
54+
this.ssub = ssub;
55+
}
56+
57+
@Override
58+
public void setProducer(final Producer producer) {
59+
super.setProducer(new Producer() {
60+
@Override
61+
public void request(long n) {
62+
if (empty) {
63+
consumerCapacity.set(n);
64+
}
65+
producer.request(n);
66+
}
67+
});
68+
}
69+
70+
@Override
71+
public void onCompleted() {
72+
if (!empty) {
73+
child.onCompleted();
74+
} else if (!child.isUnsubscribed()) {
75+
subscribeToAlternate();
76+
}
77+
}
78+
79+
private void subscribeToAlternate() {
80+
ssub.set(alternate.unsafeSubscribe(new Subscriber<T>() {
81+
82+
@Override
83+
public void setProducer(final Producer producer) {
84+
child.setProducer(new Producer() {
85+
@Override
86+
public void request(long n) {
87+
producer.request(n);
88+
}
89+
});
90+
}
91+
92+
@Override
93+
public void onStart() {
94+
final long capacity = consumerCapacity.get();
95+
if (capacity > 0) {
96+
request(capacity);
97+
}
98+
}
99+
100+
@Override
101+
public void onCompleted() {
102+
child.onCompleted();
103+
}
104+
105+
@Override
106+
public void onError(Throwable e) {
107+
child.onError(e);
108+
}
109+
110+
@Override
111+
public void onNext(T t) {
112+
child.onNext(t);
113+
}
114+
}));
115+
}
116+
117+
@Override
118+
public void onError(Throwable e) {
119+
child.onError(e);
120+
}
121+
122+
@Override
123+
public void onNext(T t) {
124+
empty = false;
125+
child.onNext(t);
126+
}
127+
}
128+
}
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import static org.junit.Assert.*;
19+
20+
import java.util.*;
21+
import java.util.concurrent.atomic.AtomicBoolean;
22+
23+
import org.junit.Test;
24+
25+
import rx.*;
26+
import rx.Observable;
27+
import rx.functions.Action0;
28+
import rx.observers.TestSubscriber;
29+
import rx.subscriptions.Subscriptions;
30+
31+
public class OperatorSwitchIfEmptyTest {
32+
33+
@Test
34+
public void testSwitchWhenNotEmpty() throws Exception {
35+
final AtomicBoolean subscribed = new AtomicBoolean(false);
36+
final Observable<Integer> observable = Observable.just(4).switchIfEmpty(Observable.just(2)
37+
.doOnSubscribe(new Action0() {
38+
@Override
39+
public void call() {
40+
subscribed.set(true);
41+
}
42+
}));
43+
44+
assertEquals(4, observable.toBlocking().single().intValue());
45+
assertFalse(subscribed.get());
46+
}
47+
48+
@Test
49+
public void testSwitchWhenEmpty() throws Exception {
50+
final Observable<Integer> observable = Observable.<Integer>empty().switchIfEmpty(Observable.from(Arrays.asList(42)));
51+
52+
assertEquals(42, observable.toBlocking().single().intValue());
53+
}
54+
55+
@Test
56+
public void testSwitchWithProducer() throws Exception {
57+
final AtomicBoolean emitted = new AtomicBoolean(false);
58+
Observable<Long> withProducer = Observable.create(new Observable.OnSubscribe<Long>() {
59+
@Override
60+
public void call(final Subscriber<? super Long> subscriber) {
61+
subscriber.setProducer(new Producer() {
62+
@Override
63+
public void request(long n) {
64+
if (n > 0 && !emitted.get()) {
65+
emitted.set(true);
66+
subscriber.onNext(42L);
67+
subscriber.onCompleted();
68+
}
69+
}
70+
});
71+
}
72+
});
73+
74+
final Observable<Long> observable = Observable.<Long>empty().switchIfEmpty(withProducer);
75+
assertEquals(42, observable.toBlocking().single().intValue());
76+
}
77+
78+
@Test
79+
public void testSwitchTriggerUnsubscribe() throws Exception {
80+
final Subscription empty = Subscriptions.empty();
81+
82+
Observable<Long> withProducer = Observable.create(new Observable.OnSubscribe<Long>() {
83+
@Override
84+
public void call(final Subscriber<? super Long> subscriber) {
85+
subscriber.add(empty);
86+
subscriber.onNext(42L);
87+
}
88+
});
89+
90+
final Subscription sub = Observable.<Long>empty().switchIfEmpty(withProducer).lift(new Observable.Operator<Long, Long>() {
91+
@Override
92+
public Subscriber<? super Long> call(final Subscriber<? super Long> child) {
93+
return new Subscriber<Long>(child) {
94+
@Override
95+
public void onCompleted() {
96+
97+
}
98+
99+
@Override
100+
public void onError(Throwable e) {
101+
102+
}
103+
104+
@Override
105+
public void onNext(Long aLong) {
106+
unsubscribe();
107+
}
108+
};
109+
}
110+
}).subscribe();
111+
112+
113+
assertTrue(empty.isUnsubscribed());
114+
assertTrue(sub.isUnsubscribed());
115+
}
116+
117+
@Test
118+
public void testSwitchShouldTriggerUnsubscribe() {
119+
final Subscription s = Subscriptions.empty();
120+
121+
Observable.create(new Observable.OnSubscribe<Long>() {
122+
@Override
123+
public void call(final Subscriber<? super Long> subscriber) {
124+
subscriber.add(s);
125+
subscriber.onCompleted();
126+
}
127+
}).switchIfEmpty(Observable.<Long>never()).subscribe();
128+
assertTrue(s.isUnsubscribed());
129+
}
130+
131+
@Test
132+
public void testSwitchRequestAlternativeObservableWithBackpressure() {
133+
134+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
135+
136+
@Override
137+
public void onStart() {
138+
request(1);
139+
}
140+
};
141+
Observable.<Integer>empty().switchIfEmpty(Observable.just(1, 2, 3)).subscribe(ts);
142+
143+
assertEquals(Arrays.asList(1), ts.getOnNextEvents());
144+
ts.assertNoErrors();
145+
}
146+
@Test
147+
public void testBackpressureNoRequest() {
148+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
149+
150+
@Override
151+
public void onStart() {
152+
request(0);
153+
}
154+
};
155+
Observable.<Integer>empty().switchIfEmpty(Observable.just(1, 2, 3)).subscribe(ts);
156+
157+
assertTrue(ts.getOnNextEvents().isEmpty());
158+
ts.assertNoErrors();
159+
}
160+
}

0 commit comments

Comments
 (0)