11/**
22 * Copyright 2014 Netflix, Inc.
3- *
3+ *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
66 * You may obtain a copy of the License at
7- *
7+ *
88 * http://www.apache.org/licenses/LICENSE-2.0
9- *
9+ *
1010 * Unless required by applicable law or agreed to in writing, software
1111 * distributed under the License is distributed on an "AS IS" BASIS,
1212 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
2020import rx .Scheduler ;
2121import rx .Subscription ;
2222import rx .functions .Action0 ;
23- import rx .functions . Action1 ;
24- import rx .subscriptions .BooleanSubscription ;
23+ import rx .internal . schedulers . ScheduledAction ;
24+ import rx .subscriptions .CompositeSubscription ;
2525import rx .subscriptions .Subscriptions ;
2626import android .os .Handler ;
2727
@@ -34,7 +34,7 @@ public class HandlerThreadScheduler extends Scheduler {
3434
3535 /**
3636 * Constructs a {@link HandlerThreadScheduler} using the given {@link Handler}
37- *
37+ *
3838 * @param handler
3939 * {@link Handler} to use when scheduling actions
4040 */
@@ -46,47 +46,42 @@ public HandlerThreadScheduler(Handler handler) {
4646 public Worker createWorker () {
4747 return new InnerHandlerThreadScheduler (handler );
4848 }
49-
49+
5050 private static class InnerHandlerThreadScheduler extends Worker {
5151
5252 private final Handler handler ;
53- private BooleanSubscription innerSubscription = new BooleanSubscription ();
53+
54+ private final CompositeSubscription compositeSubscription = new CompositeSubscription ();
5455
5556 public InnerHandlerThreadScheduler (Handler handler ) {
5657 this .handler = handler ;
5758 }
5859
5960 @ Override
6061 public void unsubscribe () {
61- innerSubscription .unsubscribe ();
62+ compositeSubscription .unsubscribe ();
6263 }
6364
6465 @ Override
6566 public boolean isUnsubscribed () {
66- return innerSubscription .isUnsubscribed ();
67+ return compositeSubscription .isUnsubscribed ();
6768 }
6869
6970 @ Override
7071 public Subscription schedule (final Action0 action , long delayTime , TimeUnit unit ) {
71- final Runnable runnable = new Runnable () {
72- @ Override
73- public void run () {
74- if (isUnsubscribed ()) {
75- return ;
76- }
77- action .call ();
78- }
79- };
80- handler .postDelayed (runnable , unit .toMillis (delayTime ));
81- return Subscriptions .create (new Action0 () {
82-
72+ final ScheduledAction scheduledAction = new ScheduledAction (action );
73+ scheduledAction .add (Subscriptions .create (new Action0 () {
8374 @ Override
8475 public void call () {
85- handler .removeCallbacks (runnable );
86-
76+ handler .removeCallbacks (scheduledAction );
8777 }
88-
89- });
78+ }));
79+ scheduledAction .addParent (compositeSubscription );
80+ compositeSubscription .add (scheduledAction );
81+
82+ handler .postDelayed (scheduledAction , unit .toMillis (delayTime ));
83+
84+ return scheduledAction ;
9085 }
9186
9287 @ Override
@@ -95,5 +90,4 @@ public Subscription schedule(final Action0 action) {
9590 }
9691
9792 }
98-
9993}
0 commit comments