57
57
*/
58
58
public class SafeObserver <T > implements Observer <T > {
59
59
60
- private volatile Observer <? super T > actual ;
60
+ private final Observer <? super T > actual ;
61
61
private final AtomicBoolean isFinished = new AtomicBoolean (false );
62
62
private final SafeObservableSubscription subscription ;
63
- /**
64
- * If the observer completes, this is swapped in place of the actual
65
- * should avoid the overhead of isFinished.get() on every onNext call. */
66
- private static final Observer <Object > nopObserver = new Observer <Object >() {
67
- @ Override
68
- public void onNext (Object args ) {
69
- }
70
- @ Override
71
- public void onError (Throwable e ) {
72
- }
73
63
74
- @ Override
75
- public void onCompleted () {
76
- }
77
-
78
- };
79
64
public SafeObserver (SafeObservableSubscription subscription , Observer <? super T > actual ) {
80
65
this .subscription = subscription ;
81
66
this .actual = actual ;
@@ -84,10 +69,8 @@ public SafeObserver(SafeObservableSubscription subscription, Observer<? super T>
84
69
@ Override
85
70
public void onCompleted () {
86
71
if (isFinished .compareAndSet (false , true )) {
87
- Observer <? super T > a = actual ;
88
- actual = nopObserver ;
89
72
try {
90
- a .onCompleted ();
73
+ actual .onCompleted ();
91
74
} catch (Throwable e ) {
92
75
// handle errors if the onCompleted implementation fails, not just if the Observable fails
93
76
onError (e );
@@ -100,11 +83,8 @@ public void onCompleted() {
100
83
@ Override
101
84
public void onError (Throwable e ) {
102
85
if (isFinished .compareAndSet (false , true )) {
103
- Observer <? super T > a = actual ;
104
- // will prevent onNext from sending a new value after completion
105
- actual = nopObserver ;
106
86
try {
107
- a .onError (e );
87
+ actual .onError (e );
108
88
} catch (Throwable e2 ) {
109
89
if (e2 instanceof OnErrorNotImplementedException ) {
110
90
/**
@@ -137,10 +117,12 @@ public void onError(Throwable e) {
137
117
@ Override
138
118
public void onNext (T args ) {
139
119
try {
140
- actual .onNext (args );
141
- } catch (Throwable t ) {
120
+ if (!isFinished .get ()) {
121
+ actual .onNext (args );
122
+ }
123
+ } catch (Throwable e ) {
142
124
// handle errors if the onNext implementation fails, not just if the Observable fails
143
- onError (t );
125
+ onError (e );
144
126
}
145
127
}
146
128
0 commit comments