1
- package rx .ios .schedulers ;
2
1
/**
3
2
* Copyright 2013 Netflix, Inc.
4
3
* Copyright 2014 Ashley Williams
16
15
* limitations under the License.
17
16
*/
18
17
18
+ package rx .ios .schedulers ;
19
19
20
- import org .robovm .apple .foundation .NSBlockOperation ;
21
20
import org .robovm .apple .foundation .NSOperationQueue ;
22
21
import rx .Scheduler ;
23
22
import rx .Subscription ;
24
23
import rx .functions .Action0 ;
25
- import rx .subscriptions .BooleanSubscription ;
24
+ import rx .internal .util .RxThreadFactory ;
25
+ import rx .subscriptions .CompositeSubscription ;
26
26
import rx .subscriptions .Subscriptions ;
27
27
28
28
import java .util .concurrent .Executors ;
29
+ import java .util .concurrent .Future ;
29
30
import java .util .concurrent .ScheduledExecutorService ;
30
31
import java .util .concurrent .TimeUnit ;
31
32
35
36
public class HandlerThreadScheduler extends Scheduler {
36
37
37
38
private final NSOperationQueue operationQueue ;
39
+ private static final String THREAD_PREFIX = "RxiOSScheduledExecutorPool-" ;
40
+
38
41
39
42
public HandlerThreadScheduler (NSOperationQueue operationQueue ) {
40
43
this .operationQueue = operationQueue ;
@@ -49,7 +52,7 @@ public Worker createWorker() {
49
52
private static class InnerHandlerThreadScheduler extends Worker {
50
53
51
54
private final NSOperationQueue operationQueue ;
52
- private BooleanSubscription innerSubscription = new BooleanSubscription ();
55
+ private CompositeSubscription innerSubscription = new CompositeSubscription ();
53
56
54
57
55
58
public InnerHandlerThreadScheduler (NSOperationQueue operationQueue ) {
@@ -67,43 +70,53 @@ public boolean isUnsubscribed() {
67
70
}
68
71
69
72
@ Override
70
- public Subscription schedule (Action0 action0 ) {
71
- return schedule (action0 , 0 , TimeUnit . MILLISECONDS );
73
+ public Subscription schedule (final Action0 action ) {
74
+ return schedule (action , 0 , null );
72
75
}
73
76
74
77
@ Override
75
78
public Subscription schedule (final Action0 action , long delayTime , TimeUnit unit ) {
79
+ return scheduledAction (action , delayTime , unit );
80
+ }
81
+
82
+ public Subscription scheduledAction (final Action0 action , long delay , TimeUnit unit ) {
76
83
77
- ScheduledExecutorService executor = Executors .newScheduledThreadPool (1 );
78
- final NSBlockOperation runOperation = new NSBlockOperation ();
79
-
80
- executor .schedule (new Runnable () {
81
- @ Override
82
- public void run () {
83
- if (isUnsubscribed ()) {
84
- return ;
85
- }
86
- /* Runnable for action */
87
- final Runnable actionRunner = new Runnable () {
88
- @ Override
89
- public void run () {
90
- action .call ();
91
- }
92
- };
93
-
94
- runOperation .addExecutionBlock$ (actionRunner );
95
-
96
- /* Add operation to operation queue*/
97
- operationQueue .addOperation (runOperation );
98
- }
99
- }, delayTime , unit );
100
-
101
- return Subscriptions .create (new Action0 () {
102
- @ Override
103
- public void call () {
104
- runOperation .cancel ();
105
- }
106
- });
84
+ if (innerSubscription .isUnsubscribed ()) {
85
+ return Subscriptions .empty ();
86
+ }
87
+
88
+ final ScheduledIOSAction scheduledAction = new ScheduledIOSAction (action , operationQueue );
89
+ final ScheduledExecutorService executor = IOSScheduledExecutorPool .getInstance ();
90
+
91
+ Future <?> future ;
92
+ if (delay <= 0 ) {
93
+ future = executor .submit (scheduledAction );
94
+ } else {
95
+ future = executor .schedule (scheduledAction , delay , unit );
96
+ }
97
+
98
+ scheduledAction .add (Subscriptions .from (future ));
99
+ scheduledAction .addParent (innerSubscription );
100
+
101
+ return scheduledAction ;
107
102
}
108
103
}
104
+
105
+
106
+ private static final class IOSScheduledExecutorPool {
107
+
108
+ private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory (THREAD_PREFIX );
109
+
110
+ private static IOSScheduledExecutorPool INSTANCE = new IOSScheduledExecutorPool ();
111
+ private final ScheduledExecutorService executorService ;
112
+
113
+ private IOSScheduledExecutorPool () {
114
+ executorService = Executors .newScheduledThreadPool (1 , THREAD_FACTORY );
115
+ }
116
+
117
+ public static ScheduledExecutorService getInstance () {
118
+ return INSTANCE .executorService ;
119
+ }
120
+ }
121
+
109
122
}
0 commit comments