3232 */
3333
3434import java .util .concurrent .atomic .AtomicInteger ;
35- import java .util .concurrent .atomic .AtomicReference ;
3635
3736import rx .Observable ;
3837import rx .Observable .Operator ;
39- import rx .Scheduler ;
4038import rx .Scheduler .Inner ;
4139import rx .Subscriber ;
42- import rx .Subscription ;
4340import rx .functions .Action1 ;
4441import rx .schedulers .Schedulers ;
42+ import rx .subscriptions .SerialSubscription ;
4543
4644public class OperatorRetry <T > implements Operator <T , Observable <T >> {
4745
@@ -58,63 +56,59 @@ public OperatorRetry() {
5856 }
5957
6058 @ Override
61- public Subscriber <? super Observable <T >> call (final Subscriber <? super T > s ) {
62- return new Subscriber <Observable <T >>(s ) {
59+ public Subscriber <? super Observable <T >> call (final Subscriber <? super T > child ) {
60+ final SerialSubscription serialSubscription = new SerialSubscription ();
61+ // add serialSubscription so it gets unsubscribed if child is unsubscribed
62+ child .add (serialSubscription );
63+ return new Subscriber <Observable <T >>(child ) {
6364 final AtomicInteger attempts = new AtomicInteger (0 );
64-
65+
6566 @ Override
6667 public void onCompleted () {
6768 // ignore as we expect a single nested Observable<T>
6869 }
6970
7071 @ Override
7172 public void onError (Throwable e ) {
72- s .onError (e );
73+ child .onError (e );
7374 }
7475
7576 @ Override
7677 public void onNext (final Observable <T > o ) {
77-
78- final AtomicReference <Subscription > retrySub =new AtomicReference <Subscription >();
79-
8078 Schedulers .trampoline ().schedule (new Action1 <Inner >() {
8179
8280 @ Override
8381 public void call (final Inner inner ) {
8482 final Action1 <Inner > _self = this ;
8583 attempts .incrementAndGet ();
86- retrySub .set (o .unsafeSubscribe (new Subscriber <T >(s ) {
84+
85+ Subscriber <T > subscriber = new Subscriber <T >(child ) {
8786
8887 @ Override
8988 public void onCompleted () {
90- s .onCompleted ();
89+ child .onCompleted ();
9190 }
9291
9392 @ Override
9493 public void onError (Throwable e ) {
9594 if ((retryCount == INFINITE_RETRY || attempts .get () <= retryCount ) && !inner .isUnsubscribed ()) {
9695 // retry again
97- inner .schedule (new Action1 <Inner >() {
98- @ Override
99- public void call (Inner inner )
100- {
101- // Remove the failed subscription first
102- retrySub .get ().unsubscribe ();
103- _self .call (inner );
104- }
105- });
96+ inner .schedule (_self );
10697 } else {
10798 // give up and pass the failure
108- s .onError (e );
99+ child .onError (e );
109100 }
110101 }
111102
112103 @ Override
113104 public void onNext (T v ) {
114- s .onNext (v );
105+ child .onNext (v );
115106 }
116107
117- }));
108+ };
109+ // register this Subscription (and unsubscribe previous if exists)
110+ serialSubscription .set (subscriber );
111+ o .unsafeSubscribe (subscriber );
118112 }
119113 });
120114 }
0 commit comments