1515 */
1616package rx .internal .operators ;
1717
18- import java .util .concurrent .atomic .AtomicLong ;
1918
2019import rx .*;
20+ import rx .internal .producers .ProducerArbiter ;
2121import rx .subscriptions .SerialSubscription ;
2222
2323/**
@@ -35,36 +35,32 @@ public OperatorSwitchIfEmpty(Observable<? extends T> alternate) {
3535 @ Override
3636 public Subscriber <? super T > call (Subscriber <? super T > child ) {
3737 final SerialSubscription ssub = new SerialSubscription ();
38- final SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber (child , ssub );
38+ ProducerArbiter arbiter = new ProducerArbiter ();
39+ final ParentSubscriber <T > parent = new ParentSubscriber <T >(child , ssub , arbiter , alternate );
3940 ssub .set (parent );
4041 child .add (ssub );
42+ child .setProducer (arbiter );
4143 return parent ;
4244 }
4345
44- private class SwitchIfEmptySubscriber extends Subscriber <T > {
45-
46- boolean empty = true ;
47- final AtomicLong consumerCapacity = new AtomicLong (0l );
46+ private static final class ParentSubscriber <T > extends Subscriber <T > {
4847
48+ private boolean empty = true ;
4949 private final Subscriber <? super T > child ;
50- final SerialSubscription ssub ;
50+ private final SerialSubscription ssub ;
51+ private final ProducerArbiter arbiter ;
52+ private final Observable <? extends T > alternate ;
5153
52- public SwitchIfEmptySubscriber (Subscriber <? super T > child , final SerialSubscription ssub ) {
54+ ParentSubscriber (Subscriber <? super T > child , final SerialSubscription ssub , ProducerArbiter arbiter , Observable <? extends T > alternate ) {
5355 this .child = child ;
5456 this .ssub = ssub ;
57+ this .arbiter = arbiter ;
58+ this .alternate = alternate ;
5559 }
5660
5761 @ Override
5862 public void setProducer (final Producer producer ) {
59- super .setProducer (new Producer () {
60- @ Override
61- public void request (long n ) {
62- if (empty ) {
63- consumerCapacity .set (n );
64- }
65- producer .request (n );
66- }
67- });
63+ arbiter .setProducer (producer );
6864 }
6965
7066 @ Override
@@ -77,41 +73,9 @@ public void onCompleted() {
7773 }
7874
7975 private void subscribeToAlternate () {
80- ssub .set (alternate .unsafeSubscribe (new Subscriber <T >() {
81-
82- @ Override
83- public void setProducer (final Producer producer ) {
84- child .setProducer (new Producer () {
85- @ Override
86- public void request (long n ) {
87- producer .request (n );
88- }
89- });
90- }
91-
92- @ Override
93- public void onStart () {
94- final long capacity = consumerCapacity .get ();
95- if (capacity > 0 ) {
96- request (capacity );
97- }
98- }
99-
100- @ Override
101- public void onCompleted () {
102- child .onCompleted ();
103- }
104-
105- @ Override
106- public void onError (Throwable e ) {
107- child .onError (e );
108- }
109-
110- @ Override
111- public void onNext (T t ) {
112- child .onNext (t );
113- }
114- }));
76+ AlternateSubscriber <T > as = new AlternateSubscriber <T >(child , arbiter );
77+ ssub .set (as );
78+ alternate .unsafeSubscribe (as );
11579 }
11680
11781 @ Override
@@ -123,6 +87,39 @@ public void onError(Throwable e) {
12387 public void onNext (T t ) {
12488 empty = false ;
12589 child .onNext (t );
90+ arbiter .produced (1 );
91+ }
92+ }
93+
94+ private static final class AlternateSubscriber <T > extends Subscriber <T > {
95+
96+ private final ProducerArbiter arbiter ;
97+ private final Subscriber <? super T > child ;
98+
99+ AlternateSubscriber (Subscriber <? super T > child , ProducerArbiter arbiter ) {
100+ this .child = child ;
101+ this .arbiter = arbiter ;
102+ }
103+
104+ @ Override
105+ public void setProducer (final Producer producer ) {
106+ arbiter .setProducer (producer );
107+ }
108+
109+ @ Override
110+ public void onCompleted () {
111+ child .onCompleted ();
126112 }
113+
114+ @ Override
115+ public void onError (Throwable e ) {
116+ child .onError (e );
117+ }
118+
119+ @ Override
120+ public void onNext (T t ) {
121+ child .onNext (t );
122+ arbiter .produced (1 );
123+ }
127124 }
128125}
0 commit comments