1616package rx .internal .operators ;
1717
1818import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
19+
1920import rx .Observable ;
21+ import rx .Producer ;
2022import rx .Scheduler ;
2123import rx .Subscriber ;
2224import rx .functions .Action0 ;
2325import rx .functions .Func2 ;
26+ import rx .internal .producers .ProducerArbiter ;
2427import rx .schedulers .Schedulers ;
2528import rx .subscriptions .SerialSubscription ;
2629
@@ -38,88 +41,99 @@ public Subscriber<? super Observable<T>> call(final Subscriber<? super T> child)
3841 final SerialSubscription serialSubscription = new SerialSubscription ();
3942 // add serialSubscription so it gets unsubscribed if child is unsubscribed
4043 child .add (serialSubscription );
41-
42- return new SourceSubscriber <T >(child , predicate , inner , serialSubscription );
44+ ProducerArbiter pa = new ProducerArbiter ();
45+ child .setProducer (pa );
46+ return new SourceSubscriber <T >(child , predicate , inner , serialSubscription , pa );
4347 }
4448
4549 static final class SourceSubscriber <T > extends Subscriber <Observable <T >> {
4650 final Subscriber <? super T > child ;
4751 final Func2 <Integer , Throwable , Boolean > predicate ;
4852 final Scheduler .Worker inner ;
4953 final SerialSubscription serialSubscription ;
54+ final ProducerArbiter pa ;
5055
5156 volatile int attempts ;
5257 @ SuppressWarnings ("rawtypes" )
5358 static final AtomicIntegerFieldUpdater <SourceSubscriber > ATTEMPTS_UPDATER
5459 = AtomicIntegerFieldUpdater .newUpdater (SourceSubscriber .class , "attempts" );
5560
56- public SourceSubscriber (Subscriber <? super T > child , final Func2 <Integer , Throwable , Boolean > predicate , Scheduler .Worker inner ,
57- SerialSubscription serialSubscription ) {
61+ public SourceSubscriber (Subscriber <? super T > child ,
62+ final Func2 <Integer , Throwable , Boolean > predicate ,
63+ Scheduler .Worker inner ,
64+ SerialSubscription serialSubscription ,
65+ ProducerArbiter pa ) {
5866 this .child = child ;
5967 this .predicate = predicate ;
6068 this .inner = inner ;
6169 this .serialSubscription = serialSubscription ;
70+ this .pa = pa ;
6271 }
6372
6473
6574 @ Override
66- public void onCompleted () {
67- // ignore as we expect a single nested Observable<T>
68- }
75+ public void onCompleted () {
76+ // ignore as we expect a single nested Observable<T>
77+ }
6978
70- @ Override
71- public void onError (Throwable e ) {
72- child .onError (e );
73- }
79+ @ Override
80+ public void onError (Throwable e ) {
81+ child .onError (e );
82+ }
7483
75- @ Override
76- public void onNext (final Observable <T > o ) {
77- inner .schedule (new Action0 () {
84+ @ Override
85+ public void onNext (final Observable <T > o ) {
86+ inner .schedule (new Action0 () {
7887
79- @ Override
80- public void call () {
81- final Action0 _self = this ;
82- ATTEMPTS_UPDATER .incrementAndGet (SourceSubscriber .this );
88+ @ Override
89+ public void call () {
90+ final Action0 _self = this ;
91+ ATTEMPTS_UPDATER .incrementAndGet (SourceSubscriber .this );
8392
84- // new subscription each time so if it unsubscribes itself it does not prevent retries
85- // by unsubscribing the child subscription
86- Subscriber <T > subscriber = new Subscriber <T >() {
87- boolean done ;
88- @ Override
89- public void onCompleted () {
90- if (!done ) {
91- done = true ;
92- child .onCompleted ();
93- }
93+ // new subscription each time so if it unsubscribes itself it does not prevent retries
94+ // by unsubscribing the child subscription
95+ Subscriber <T > subscriber = new Subscriber <T >() {
96+ boolean done ;
97+ @ Override
98+ public void onCompleted () {
99+ if (!done ) {
100+ done = true ;
101+ child .onCompleted ();
94102 }
103+ }
95104
96- @ Override
97- public void onError (Throwable e ) {
98- if (!done ) {
99- done = true ;
100- if (predicate .call (attempts , e ) && !inner .isUnsubscribed ()) {
101- // retry again
102- inner .schedule (_self );
103- } else {
104- // give up and pass the failure
105- child .onError (e );
106- }
105+ @ Override
106+ public void onError (Throwable e ) {
107+ if (!done ) {
108+ done = true ;
109+ if (predicate .call (attempts , e ) && !inner .isUnsubscribed ()) {
110+ // retry again
111+ inner .schedule (_self );
112+ } else {
113+ // give up and pass the failure
114+ child .onError (e );
107115 }
108116 }
117+ }
109118
110- @ Override
111- public void onNext (T v ) {
112- if (!done ) {
113- child .onNext (v );
114- }
119+ @ Override
120+ public void onNext (T v ) {
121+ if (!done ) {
122+ child .onNext (v );
123+ pa . produced ( 1 );
115124 }
125+ }
116126
117- };
118- // register this Subscription (and unsubscribe previous if exists)
119- serialSubscription .set (subscriber );
120- o .unsafeSubscribe (subscriber );
121- }
122- });
123- }
127+ @ Override
128+ public void setProducer (Producer p ) {
129+ pa .setProducer (p );
130+ }
131+ };
132+ // register this Subscription (and unsubscribe previous if exists)
133+ serialSubscription .set (subscriber );
134+ o .unsafeSubscribe (subscriber );
135+ }
136+ });
137+ }
124138 }
125139}
0 commit comments