3535
3636import java .util .concurrent .atomic .AtomicBoolean ;
3737import java .util .concurrent .atomic .AtomicLong ;
38- import java .util .concurrent .atomic .AtomicLongFieldUpdater ;
3938import java .util .concurrent .atomic .AtomicReference ;
4039
4140import rx .Notification ;
5049import rx .functions .Func2 ;
5150import rx .schedulers .Schedulers ;
5251import rx .subjects .PublishSubject ;
53- import rx .subscriptions .CompositeSubscription ;
52+ import rx .subscriptions .SerialSubscription ;
5453
5554public final class OnSubscribeRedo <T > implements OnSubscribe <T > {
5655
@@ -82,8 +81,10 @@ public Observable<?> call(Observable<? extends Notification<?>> ts) {
8281 @ Override
8382 public Notification <Long > call (Notification <Long > n , Notification <?> term ) {
8483 final long value = n .getValue ();
85- if (value < count ) return Notification .createOnNext (value + 1 );
86- else return (Notification <Long >) term ;
84+ if (value < count )
85+ return Notification .createOnNext (value + 1 );
86+ else
87+ return (Notification <Long >) term ;
8788 }
8889 }).dematerialize ();
8990 }
@@ -103,8 +104,10 @@ public Observable<? extends Notification<?>> call(Observable<? extends Notificat
103104 @ Override
104105 public Notification <Integer > call (Notification <Integer > n , Notification <?> term ) {
105106 final int value = n .getValue ();
106- if (predicate .call (value , term .getThrowable ()).booleanValue ()) return Notification .createOnNext (value + 1 );
107- else return (Notification <Integer >) term ;
107+ if (predicate .call (value , term .getThrowable ()).booleanValue ())
108+ return Notification .createOnNext (value + 1 );
109+ else
110+ return (Notification <Integer >) term ;
108111 }
109112 });
110113 }
@@ -115,8 +118,10 @@ public static <T> Observable<T> retry(Observable<T> source) {
115118 }
116119
117120 public static <T > Observable <T > retry (Observable <T > source , final long count ) {
118- if (count < 0 ) throw new IllegalArgumentException ("count >= 0 expected" );
119- if (count == 0 ) return source ;
121+ if (count < 0 )
122+ throw new IllegalArgumentException ("count >= 0 expected" );
123+ if (count == 0 )
124+ return source ;
120125 return retry (source , new RedoFinite (count ));
121126 }
122127
@@ -141,7 +146,8 @@ public static <T> Observable<T> repeat(Observable<T> source, final long count) {
141146 }
142147
143148 public static <T > Observable <T > repeat (Observable <T > source , final long count , Scheduler scheduler ) {
144- if (count < 0 ) throw new IllegalArgumentException ("count >= 0 expected" );
149+ if (count < 0 )
150+ throw new IllegalArgumentException ("count >= 0 expected" );
145151 return repeat (source , new RedoFinite (count - 1 ), scheduler );
146152 }
147153
@@ -162,11 +168,6 @@ public static <T> Observable<T> redo(Observable<T> source, Func1<? super Observa
162168 private boolean stopOnComplete ;
163169 private boolean stopOnError ;
164170 private final Scheduler scheduler ;
165- private final AtomicBoolean isLocked = new AtomicBoolean (true );
166- private final AtomicBoolean isStarted = new AtomicBoolean (false );
167- // incremented when requests are made, decremented when requests are fulfilled
168- private final AtomicLong consumerCapacity = new AtomicLong (0l );
169- private final AtomicReference <Producer > currentProducer = new AtomicReference <Producer >();
170171
171172 private OnSubscribeRedo (Observable <T > source , Func1 <? super Observable <? extends Notification <?>>, ? extends Observable <?>> f , boolean stopOnComplete , boolean stopOnError ,
172173 Scheduler scheduler ) {
@@ -179,17 +180,18 @@ private OnSubscribeRedo(Observable<T> source, Func1<? super Observable<? extends
179180
180181 @ Override
181182 public void call (final Subscriber <? super T > child ) {
182- isStarted .set (false );
183- isLocked .set (true );
184- consumerCapacity .set (0l );
185- currentProducer .set (null );
183+ final AtomicBoolean isLocked = new AtomicBoolean (true );
184+ final AtomicBoolean isStarted = new AtomicBoolean (false );
185+ // incremented when requests are made, decremented when requests are fulfilled
186+ final AtomicLong consumerCapacity = new AtomicLong (0l );
187+ final AtomicReference <Producer > currentProducer = new AtomicReference <Producer >();
186188
187- final Scheduler .Worker inner = scheduler .createWorker ();
188- child .add (inner );
189+ final Scheduler .Worker worker = scheduler .createWorker ();
190+ child .add (worker );
189191
190- final CompositeSubscription sourceSubscriptions = new CompositeSubscription ();
192+ final SerialSubscription sourceSubscriptions = new SerialSubscription ();
191193 child .add (sourceSubscriptions );
192-
194+
193195 final PublishSubject <Notification <?>> terminals = PublishSubject .create ();
194196
195197 final Action0 subscribeToSource = new Action0 () {
@@ -222,7 +224,7 @@ public void setProducer(Producer producer) {
222224 };
223225 // new subscription each time so if it unsubscribes itself it does not prevent retries
224226 // by unsubscribing the child subscription
225- sourceSubscriptions .add (terminalDelegatingSubscriber );
227+ sourceSubscriptions .set (terminalDelegatingSubscriber );
226228 source .unsafeSubscribe (terminalDelegatingSubscriber );
227229 }
228230 };
@@ -247,8 +249,10 @@ public void onError(Throwable e) {
247249
248250 @ Override
249251 public void onNext (Notification <?> t ) {
250- if (t .isOnCompleted () && stopOnComplete ) child .onCompleted ();
251- else if (t .isOnError () && stopOnError ) child .onError (t .getThrowable ());
252+ if (t .isOnCompleted () && stopOnComplete )
253+ child .onCompleted ();
254+ else if (t .isOnError () && stopOnError )
255+ child .onError (t .getThrowable ());
252256 else {
253257 isLocked .set (false );
254258 filteredTerminals .onNext (t );
@@ -264,7 +268,7 @@ public void setProducer(Producer producer) {
264268 }));
265269
266270 // subscribe to the restarts observable to know when to schedule the next redo.
267- child . add ( inner .schedule (new Action0 () {
271+ worker .schedule (new Action0 () {
268272 @ Override
269273 public void call () {
270274 restarts .unsafeSubscribe (new Subscriber <Object >(child ) {
@@ -281,7 +285,9 @@ public void onError(Throwable e) {
281285 @ Override
282286 public void onNext (Object t ) {
283287 if (!isLocked .get () && !child .isUnsubscribed ()) {
284- child .add (inner .schedule (subscribeToSource ));
288+ if (consumerCapacity .get () > 0 ) {
289+ worker .schedule (subscribeToSource );
290+ }
285291 }
286292 }
287293
@@ -291,18 +297,24 @@ public void setProducer(Producer producer) {
291297 }
292298 });
293299 }
294- })) ;
300+ });
295301
296302 child .setProducer (new Producer () {
297303
298304 @ Override
299305 public void request (long n ) {
300306 if (isStarted .compareAndSet (false , true )) {
301307 consumerCapacity .set (n );
302- if (!child .isUnsubscribed ()) child .add (inner .schedule (subscribeToSource ));
303- } else if (currentProducer .get () != null ) {
304- consumerCapacity .getAndAdd (n );
305- currentProducer .get ().request (n );
308+ worker .schedule (subscribeToSource );
309+ } else {
310+ if (consumerCapacity .getAndAdd (n ) == 0 ) {
311+ // restart
312+ worker .schedule (subscribeToSource );
313+ } else {
314+ if (currentProducer .get () != null ) {
315+ currentProducer .get ().request (n );
316+ }
317+ }
306318 }
307319 }
308320 });
0 commit comments