@@ -98,6 +98,7 @@ private static <T> BehaviorSubject<T> create(T defaultValue, boolean hasDefault)
98
98
static final class State <T > {
99
99
final AtomicReference <Object > latest = new AtomicReference <Object >();
100
100
final AtomicReference <BehaviorState > observers = new AtomicReference <BehaviorState >(BehaviorState .EMPTY );
101
+ boolean active = true ;
101
102
void set (Object value ) {
102
103
this .latest .set (value );
103
104
}
@@ -139,6 +140,7 @@ BehaviorObserver<T>[] next(Object n) {
139
140
}
140
141
BehaviorObserver <T >[] terminate (Object n ) {
141
142
set (n );
143
+ active = false ;
142
144
do {
143
145
BehaviorState oldState = observers .get ();
144
146
if (oldState .terminated ) {
@@ -149,9 +151,6 @@ BehaviorObserver<T>[] terminate(Object n) {
149
151
}
150
152
} while (true );
151
153
}
152
- boolean isActive () {
153
- return !observers .get ().terminated ;
154
- }
155
154
}
156
155
static final class BehaviorState {
157
156
final boolean terminated ;
@@ -241,7 +240,7 @@ protected BehaviorSubject(OnSubscribe<T> onSubscribe, State<T> state) {
241
240
@ Override
242
241
public void onCompleted () {
243
242
Object last = state .get ();
244
- if (last == null || state .isActive () ) {
243
+ if (last == null || state .active ) {
245
244
Object n = nl .completed ();
246
245
for (BehaviorObserver <T > bo : state .terminate (n )) {
247
246
bo .emitNext (n );
@@ -252,7 +251,7 @@ public void onCompleted() {
252
251
@ Override
253
252
public void onError (Throwable e ) {
254
253
Object last = state .get ();
255
- if (last == null || state .isActive () ) {
254
+ if (last == null || state .active ) {
256
255
Object n = nl .error (e );
257
256
for (BehaviorObserver <T > bo : state .terminate (n )) {
258
257
bo .emitNext (n );
@@ -263,7 +262,7 @@ public void onError(Throwable e) {
263
262
@ Override
264
263
public void onNext (T v ) {
265
264
Object last = state .get ();
266
- if (last == null || state .isActive () ) {
265
+ if (last == null || state .active ) {
267
266
Object n = nl .next (v );
268
267
for (BehaviorObserver <T > bo : state .next (n )) {
269
268
bo .emitNext (n );
@@ -284,8 +283,6 @@ private static final class BehaviorObserver<T> {
284
283
boolean emitting ;
285
284
/** Guarded by this. */
286
285
List <Object > queue ;
287
- /** Accessed only from serialized state. */
288
- boolean done ;
289
286
/* volatile */ boolean fastPath ;
290
287
public BehaviorObserver (Observer <? super T > actual ) {
291
288
this .actual = actual ;
@@ -349,10 +346,7 @@ void emitLoop(List<Object> localQueue, Object current) {
349
346
}
350
347
}
351
348
void accept (Object n ) {
352
- if (n != null && !done ) {
353
- if (nl .isCompleted (n ) || nl .isError (n )) {
354
- done = true ;
355
- }
349
+ if (n != null ) {
356
350
nl .accept (actual , n );
357
351
}
358
352
}
0 commit comments