1616package rx .internal .operators ;
1717
1818
19+ import java .util .concurrent .atomic .AtomicInteger ;
20+
1921import rx .*;
2022import rx .internal .producers .ProducerArbiter ;
2123import rx .subscriptions .SerialSubscription ;
2628 * empty, the results of the given Observable will be emitted.
2729 * @param <T> the value type
2830 */
29- public final class OperatorSwitchIfEmpty <T > implements Observable .Operator <T , T > {
30- private final Observable <? extends T > alternate ;
31+ public final class OnSubscribeSwitchIfEmpty <T > implements Observable .OnSubscribe <T > {
32+
33+ final Observable <? extends T > source ;
3134
32- public OperatorSwitchIfEmpty (Observable <? extends T > alternate ) {
35+ final Observable <? extends T > alternate ;
36+
37+ public OnSubscribeSwitchIfEmpty (Observable <? extends T > source , Observable <? extends T > alternate ) {
38+ this .source = source ;
3339 this .alternate = alternate ;
3440 }
3541
3642 @ Override
37- public Subscriber <? super T > call (Subscriber <? super T > child ) {
43+ public void call (Subscriber <? super T > child ) {
3844 final SerialSubscription serial = new SerialSubscription ();
3945 ProducerArbiter arbiter = new ProducerArbiter ();
4046 final ParentSubscriber <T > parent = new ParentSubscriber <T >(child , serial , arbiter , alternate );
47+
4148 serial .set (parent );
4249 child .add (serial );
4350 child .setProducer (arbiter );
44- return parent ;
51+
52+ parent .subscribe (source );
4553 }
4654
4755 static final class ParentSubscriber <T > extends Subscriber <T > {
@@ -52,11 +60,15 @@ static final class ParentSubscriber<T> extends Subscriber<T> {
5260 private final ProducerArbiter arbiter ;
5361 private final Observable <? extends T > alternate ;
5462
63+ final AtomicInteger wip ;
64+ volatile boolean active ;
65+
5566 ParentSubscriber (Subscriber <? super T > child , final SerialSubscription serial , ProducerArbiter arbiter , Observable <? extends T > alternate ) {
5667 this .child = child ;
5768 this .serial = serial ;
5869 this .arbiter = arbiter ;
5970 this .alternate = alternate ;
71+ this .wip = new AtomicInteger ();
6072 }
6173
6274 @ Override
@@ -69,14 +81,33 @@ public void onCompleted() {
6981 if (!empty ) {
7082 child .onCompleted ();
7183 } else if (!child .isUnsubscribed ()) {
72- subscribeToAlternate ();
84+ active = false ;
85+ subscribe (null );
7386 }
7487 }
7588
76- private void subscribeToAlternate () {
77- AlternateSubscriber <T > as = new AlternateSubscriber <T >(child , arbiter );
78- serial .set (as );
79- alternate .unsafeSubscribe (as );
89+ void subscribe (Observable <? extends T > source ) {
90+ if (wip .getAndIncrement () == 0 ) {
91+ do {
92+ if (child .isUnsubscribed ()) {
93+ break ;
94+ }
95+
96+ if (!active ) {
97+ if (source == null ) {
98+ AlternateSubscriber <T > as = new AlternateSubscriber <T >(child , arbiter );
99+ serial .set (as );
100+ active = true ;
101+ alternate .unsafeSubscribe (as );
102+ } else {
103+ active = true ;
104+ source .unsafeSubscribe (this );
105+ source = null ;
106+ }
107+ }
108+
109+ } while (wip .decrementAndGet () != 0 );
110+ }
80111 }
81112
82113 @ Override
0 commit comments