15
15
*/
16
16
package rx .internal .schedulers ;
17
17
18
- import java .util .ArrayList ;
19
- import java .util .List ;
20
18
import java .util .concurrent .Future ;
21
- import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
19
+ import java .util .concurrent .atomic .AtomicBoolean ;
20
+ import java .util .concurrent .atomic .AtomicReference ;
22
21
23
22
import rx .Subscription ;
24
- import rx .exceptions .CompositeException ;
25
23
import rx .exceptions .OnErrorNotImplementedException ;
26
24
import rx .functions .Action0 ;
27
25
import rx .plugins .RxJavaPlugins ;
31
29
* A {@code Runnable} that executes an {@code Action0} and can be cancelled. The analog is the
32
30
* {@code Subscriber} in respect of an {@code Observer}.
33
31
*/
34
- public final class ScheduledAction implements Runnable , Subscription {
35
- final Subscription [] cancel ;
36
- volatile int count ;
32
+ public final class ScheduledAction extends AtomicReference <Thread > implements Runnable , Subscription {
33
+ /** */
34
+ private static final long serialVersionUID = -3962399486978279857L ;
35
+ final CompositeSubscription cancel ;
37
36
final Action0 action ;
38
- /** Set by the run() method to avoid self interrupting at the end of the run method. */
39
- Thread runner ;
40
37
41
38
public ScheduledAction (Action0 action ) {
42
39
this .action = action ;
43
- this .cancel = new Subscription [ 4 ] ;
40
+ this .cancel = new CompositeSubscription () ;
44
41
}
45
42
46
43
@ Override
47
44
public void run () {
48
- Thread thread = Thread .currentThread ();
49
45
try {
50
- runner = thread ;
46
+ lazySet ( Thread . currentThread ()) ;
51
47
action .call ();
52
48
} catch (Throwable e ) {
53
49
// nothing to do but print a System error as this is fatal and there is nowhere else to throw this
@@ -58,6 +54,7 @@ public void run() {
58
54
ie = new IllegalStateException ("Fatal Exception thrown on Scheduler.Worker thread." , e );
59
55
}
60
56
RxJavaPlugins .getInstance ().getErrorHandler ().handleError (ie );
57
+ Thread thread = Thread .currentThread ();
61
58
thread .getUncaughtExceptionHandler ().uncaughtException (thread , ie );
62
59
} finally {
63
60
unsubscribe ();
@@ -66,76 +63,24 @@ public void run() {
66
63
67
64
@ Override
68
65
public boolean isUnsubscribed () {
69
- return count < 0 ;
66
+ return cancel . isUnsubscribed () ;
70
67
}
71
68
72
69
@ Override
73
70
public void unsubscribe () {
74
- Subscription [] subs = cancel ;
75
- if (count >= 0 ) {
76
- synchronized (this ) {
77
- if (count < 0 ) {
78
- return ;
79
- }
80
- count = -1 ;
81
- }
71
+ if (!cancel .isUnsubscribed ()) {
72
+ cancel .unsubscribe ();
82
73
}
83
- unsubscribeFromAll (subs );
84
74
}
85
75
86
-
87
76
/**
88
77
* Adds a general Subscription to this {@code ScheduledAction} that will be unsubscribed
89
78
* if the underlying {@code action} completes or the this scheduled action is cancelled.
90
79
*
91
80
* @param s the Subscription to add
92
81
*/
93
82
public void add (Subscription s ) {
94
- if (count >= 0 ) {
95
- synchronized (this ) {
96
- int c = count ;
97
- if (c >= 0 ) {
98
- cancel [c ] = s ;
99
- count = c + 1 ;
100
- return ;
101
- }
102
- }
103
- }
104
- s .unsubscribe ();
105
- }
106
-
107
- private static void unsubscribeFromAll (Subscription ... subscriptions ) {
108
- if (subscriptions == null ) {
109
- return ;
110
- }
111
- List <Throwable > es = null ;
112
- for (Subscription s : subscriptions ) {
113
- if (s == null ) {
114
- break ;
115
- }
116
- try {
117
- s .unsubscribe ();
118
- } catch (Throwable e ) {
119
- if (es == null ) {
120
- es = new ArrayList <Throwable >();
121
- }
122
- es .add (e );
123
- }
124
- }
125
- if (es != null ) {
126
- if (es .size () == 1 ) {
127
- Throwable t = es .get (0 );
128
- if (t instanceof RuntimeException ) {
129
- throw (RuntimeException ) t ;
130
- } else {
131
- throw new CompositeException (
132
- "Failed to unsubscribe to 1 or more subscriptions." , es );
133
- }
134
- } else {
135
- throw new CompositeException (
136
- "Failed to unsubscribe to 2 or more subscriptions." , es );
137
- }
138
- }
83
+ cancel .add (s );
139
84
}
140
85
141
86
/**
@@ -144,7 +89,7 @@ private static void unsubscribeFromAll(Subscription... subscriptions) {
144
89
* @param f the future to add
145
90
*/
146
91
public void add (final Future <?> f ) {
147
- add (new FutureCompleter (f ));
92
+ cancel . add (new FutureCompleter (f ));
148
93
}
149
94
150
95
/**
@@ -155,7 +100,7 @@ public void add(final Future<?> f) {
155
100
* the parent {@code CompositeSubscription} to add
156
101
*/
157
102
public void addParent (CompositeSubscription parent ) {
158
- add (new Remover (this , parent ));
103
+ cancel . add (new Remover (this , parent ));
159
104
}
160
105
161
106
/**
@@ -173,7 +118,7 @@ private FutureCompleter(Future<?> f) {
173
118
174
119
@ Override
175
120
public void unsubscribe () {
176
- if (runner != Thread .currentThread ()) {
121
+ if (ScheduledAction . this . get () != Thread .currentThread ()) {
177
122
f .cancel (true );
178
123
} else {
179
124
f .cancel (false );
@@ -186,13 +131,11 @@ public boolean isUnsubscribed() {
186
131
}
187
132
188
133
/** Remove a child subscription from a composite when unsubscribing. */
189
- private static final class Remover implements Subscription {
134
+ private static final class Remover extends AtomicBoolean implements Subscription {
135
+ /** */
136
+ private static final long serialVersionUID = 247232374289553518L ;
190
137
final Subscription s ;
191
138
final CompositeSubscription parent ;
192
- @ SuppressWarnings ("unused" )
193
- volatile int once ;
194
- static final AtomicIntegerFieldUpdater <Remover > ONCE_UPDATER
195
- = AtomicIntegerFieldUpdater .newUpdater (Remover .class , "once" );
196
139
197
140
public Remover (Subscription s , CompositeSubscription parent ) {
198
141
this .s = s ;
@@ -206,7 +149,7 @@ public boolean isUnsubscribed() {
206
149
207
150
@ Override
208
151
public void unsubscribe () {
209
- if (ONCE_UPDATER . compareAndSet (this , 0 , 1 )) {
152
+ if (compareAndSet (false , true )) {
210
153
parent .remove (s );
211
154
}
212
155
}
0 commit comments