16
16
package rx .observers ;
17
17
18
18
import java .util .Arrays ;
19
- import java .util .concurrent .atomic .AtomicBoolean ;
19
+ import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
20
20
21
21
import rx .Subscriber ;
22
22
import rx .exceptions .CompositeException ;
60
60
public class SafeSubscriber <T > extends Subscriber <T > {
61
61
62
62
private final Subscriber <? super T > actual ;
63
- private final AtomicBoolean isFinished = new AtomicBoolean (false );
63
+ /** Terminal state indication if not zero. */
64
+ volatile int done ;
65
+ @ SuppressWarnings ("rawtypes" )
66
+ static final AtomicIntegerFieldUpdater <SafeSubscriber > DONE_UPDATER
67
+ = AtomicIntegerFieldUpdater .newUpdater (SafeSubscriber .class , "done" );
64
68
65
69
public SafeSubscriber (Subscriber <? super T > actual ) {
66
70
super (actual );
@@ -69,7 +73,7 @@ public SafeSubscriber(Subscriber<? super T> actual) {
69
73
70
74
@ Override
71
75
public void onCompleted () {
72
- if (isFinished . compareAndSet ( false , true ) ) {
76
+ if (DONE_UPDATER . getAndSet ( this , 1 ) == 0 ) {
73
77
try {
74
78
actual .onCompleted ();
75
79
} catch (Throwable e ) {
@@ -90,15 +94,15 @@ public void onError(Throwable e) {
90
94
// we handle here instead of another method so we don't add stacks to the frame
91
95
// which can prevent it from being able to handle StackOverflow
92
96
Exceptions .throwIfFatal (e );
93
- if (isFinished . compareAndSet ( false , true ) ) {
97
+ if (DONE_UPDATER . getAndSet ( this , 1 ) == 0 ) {
94
98
_onError (e );
95
99
}
96
100
}
97
101
98
102
@ Override
99
103
public void onNext (T args ) {
100
104
try {
101
- if (! isFinished . get () ) {
105
+ if (done == 0 ) {
102
106
actual .onNext (args );
103
107
}
104
108
} catch (Throwable e ) {
0 commit comments