23
23
import rx .functions .Action0 ;
24
24
import rx .plugins .RxJavaPlugins ;
25
25
import rx .subscriptions .CompositeSubscription ;
26
- import rx .subscriptions .Subscriptions ;
27
26
28
27
/**
29
28
* A {@code Runnable} that executes an {@code Action0} and can be cancelled. The analog is the
32
31
public final class ScheduledAction implements Runnable , Subscription {
33
32
final CompositeSubscription cancel ;
34
33
final Action0 action ;
35
- volatile int once ;
36
- static final AtomicIntegerFieldUpdater <ScheduledAction > ONCE_UPDATER
37
- = AtomicIntegerFieldUpdater .newUpdater (ScheduledAction .class , "once" );
38
34
/** Set by the run() method to avoid self interrupting at the end of the run method. */
39
35
volatile Thread runner ;
40
36
@@ -71,9 +67,7 @@ public boolean isUnsubscribed() {
71
67
72
68
@ Override
73
69
public void unsubscribe () {
74
- if (ONCE_UPDATER .compareAndSet (this , 0 , 1 )) {
75
- cancel .unsubscribe ();
76
- }
70
+ cancel .unsubscribe ();
77
71
}
78
72
79
73
/**
@@ -92,7 +86,7 @@ public void add(Subscription s) {
92
86
* @param f the future to add
93
87
*/
94
88
public void add (final Future <?> f ) {
95
- cancel .add (Subscriptions . create ( new FutureCompleter (f ) ));
89
+ cancel .add (new FutureCompleter (f ));
96
90
}
97
91
98
92
/**
@@ -112,19 +106,25 @@ public void addParent(CompositeSubscription parent) {
112
106
* prevent unnecessary self-interrupting if the unsubscription
113
107
* happens from the same thread.
114
108
*/
115
- private final class FutureCompleter implements Action0 {
109
+ private final class FutureCompleter implements Subscription {
116
110
private final Future <?> f ;
117
111
118
112
private FutureCompleter (Future <?> f ) {
119
113
this .f = f ;
120
114
}
121
115
122
116
@ Override
123
- public void call () {
117
+ public void unsubscribe () {
124
118
if (runner != Thread .currentThread ()) {
125
119
f .cancel (true );
120
+ } else {
121
+ f .cancel (false );
126
122
}
127
123
}
124
+ @ Override
125
+ public boolean isUnsubscribed () {
126
+ return f .isCancelled ();
127
+ }
128
128
}
129
129
130
130
/** Remove a child subscription from a composite when unsubscribing. */
0 commit comments