19
19
import java .util .ArrayList ;
20
20
import java .util .List ;
21
21
import java .util .concurrent .atomic .AtomicReference ;
22
- import rx .Notification ;
23
22
import rx .Observer ;
24
23
import rx .Subscriber ;
25
24
import rx .functions .Action0 ;
25
+ import rx .operators .NotificationLite ;
26
26
import rx .subscriptions .Subscriptions ;
27
27
28
28
/**
@@ -90,18 +90,18 @@ public static <T> BehaviorSubject<T> create(T defaultValue) {
90
90
private static <T > BehaviorSubject <T > create (T defaultValue , boolean hasDefault ) {
91
91
State <T > state = new State <T >();
92
92
if (hasDefault ) {
93
- state .set (Notification . createOnNext (defaultValue ));
93
+ state .set (NotificationLite . instance (). next (defaultValue ));
94
94
}
95
95
return new BehaviorSubject <T >(new BehaviorOnSubscribe <T >(state ), state );
96
96
}
97
97
98
98
static final class State <T > {
99
- final AtomicReference <Notification < T >> latest = new AtomicReference <Notification < T > >();
99
+ final AtomicReference <Object > latest = new AtomicReference <Object >();
100
100
final AtomicReference <BehaviorState > observers = new AtomicReference <BehaviorState >(BehaviorState .EMPTY );
101
- void set (Notification < T > value ) {
101
+ void set (Object value ) {
102
102
this .latest .set (value );
103
103
}
104
- Notification < T > get () {
104
+ Object get () {
105
105
return latest .get ();
106
106
}
107
107
BehaviorObserver <T >[] observers () {
@@ -133,11 +133,11 @@ void remove(BehaviorObserver<T> o) {
133
133
}
134
134
} while (true );
135
135
}
136
- BehaviorObserver <T >[] next (Notification < T > n ) {
136
+ BehaviorObserver <T >[] next (Object n ) {
137
137
set (n );
138
138
return observers .get ().observers ;
139
139
}
140
- BehaviorObserver <T >[] terminate (Notification < T > n ) {
140
+ BehaviorObserver <T >[] terminate (Object n ) {
141
141
set (n );
142
142
do {
143
143
BehaviorState oldState = observers .get ();
@@ -149,6 +149,9 @@ BehaviorObserver<T>[] terminate(Notification<T> n) {
149
149
}
150
150
} while (true );
151
151
}
152
+ boolean isActive () {
153
+ return !observers .get ().terminated ;
154
+ }
152
155
}
153
156
static final class BehaviorState {
154
157
final boolean terminated ;
@@ -228,6 +231,7 @@ public void call() {
228
231
229
232
230
233
private final State <T > state ;
234
+ private final NotificationLite <T > nl = NotificationLite .instance ();
231
235
232
236
protected BehaviorSubject (OnSubscribe <T > onSubscribe , State <T > state ) {
233
237
super (onSubscribe );
@@ -236,9 +240,9 @@ protected BehaviorSubject(OnSubscribe<T> onSubscribe, State<T> state) {
236
240
237
241
@ Override
238
242
public void onCompleted () {
239
- Notification < T > last = state .get ();
240
- if (last == null || last . isOnNext ()) {
241
- Notification < T > n = Notification .< T > createOnCompleted ();
243
+ Object last = state .get ();
244
+ if (last == null || state . isActive ()) {
245
+ Object n = nl . completed ();
242
246
for (BehaviorObserver <T > bo : state .terminate (n )) {
243
247
bo .emitNext (n );
244
248
}
@@ -247,9 +251,9 @@ public void onCompleted() {
247
251
248
252
@ Override
249
253
public void onError (Throwable e ) {
250
- Notification < T > last = state .get ();
251
- if (last == null || last . isOnNext ()) {
252
- Notification < T > n = Notification .< T > createOnError (e );
254
+ Object last = state .get ();
255
+ if (last == null || state . isActive ()) {
256
+ Object n = nl . error (e );
253
257
for (BehaviorObserver <T > bo : state .terminate (n )) {
254
258
bo .emitNext (n );
255
259
}
@@ -258,9 +262,9 @@ public void onError(Throwable e) {
258
262
259
263
@ Override
260
264
public void onNext (T v ) {
261
- Notification < T > last = state .get ();
262
- if (last == null || last . isOnNext ()) {
263
- Notification < T > n = Notification . createOnNext (v );
265
+ Object last = state .get ();
266
+ if (last == null || state . isActive ()) {
267
+ Object n = nl . next (v );
264
268
for (BehaviorObserver <T > bo : state .next (n )) {
265
269
bo .emitNext (n );
266
270
}
@@ -273,60 +277,52 @@ public void onNext(T v) {
273
277
274
278
private static final class BehaviorObserver <T > {
275
279
final Observer <? super T > actual ;
280
+ final NotificationLite <T > nl = NotificationLite .instance ();
276
281
/** Guarded by this. */
277
282
boolean first = true ;
278
283
/** Guarded by this. */
279
284
boolean emitting ;
280
285
/** Guarded by this. */
281
- List <Notification < T > > queue ;
286
+ List <Object > queue ;
282
287
/** Accessed only from serialized state. */
283
288
boolean done ;
284
- volatile boolean fastPath ;
289
+ /* volatile */ boolean fastPath ;
285
290
public BehaviorObserver (Observer <? super T > actual ) {
286
291
this .actual = actual ;
287
292
}
288
- void emitNext (Notification <T > n ) {
289
- if (fastPath ) {
290
- accept (n );
291
- return ;
292
- }
293
- List <Notification <T >> localQueue ;
294
- synchronized (this ) {
295
- first = false ;
296
- if (emitting ) {
297
- if (queue == null ) {
298
- queue = new ArrayList <Notification <T >>();
293
+ void emitNext (Object n ) {
294
+ if (!fastPath ) {
295
+ synchronized (this ) {
296
+ first = false ;
297
+ if (emitting ) {
298
+ if (queue == null ) {
299
+ queue = new ArrayList <Object >();
300
+ }
301
+ queue .add (n );
302
+ return ;
299
303
}
300
- queue .add (n );
301
- return ;
302
304
}
303
- emitting = true ;
304
- localQueue = queue ;
305
- queue = null ;
305
+ fastPath = true ;
306
306
}
307
- fastPath = true ;
308
- emitLoop (localQueue , n );
307
+ nl .accept (actual , n );
309
308
}
310
- void emitFirst (Notification <T > n ) {
311
- List <Notification <T >> localQueue ;
309
+ void emitFirst (Object n ) {
312
310
synchronized (this ) {
313
311
if (!first || emitting ) {
314
312
return ;
315
313
}
316
314
first = false ;
317
315
emitting = true ;
318
- localQueue = queue ;
319
- queue = null ;
320
316
}
321
- emitLoop (localQueue , n );
317
+ emitLoop (null , n );
322
318
}
323
- void emitLoop (List <Notification < T >> localQueue , Notification < T > current ) {
319
+ void emitLoop (List <Object > localQueue , Object current ) {
324
320
boolean once = true ;
325
321
boolean skipFinal = false ;
326
322
try {
327
323
do {
328
324
if (localQueue != null ) {
329
- for (Notification < T > n : localQueue ) {
325
+ for (Object n : localQueue ) {
330
326
accept (n );
331
327
}
332
328
}
@@ -352,12 +348,12 @@ void emitLoop(List<Notification<T>> localQueue, Notification<T> current) {
352
348
}
353
349
}
354
350
}
355
- void accept (Notification < T > n ) {
351
+ void accept (Object n ) {
356
352
if (n != null && !done ) {
357
- if (! n . isOnNext ( )) {
353
+ if (nl . isCompleted ( n ) || nl . isError ( n )) {
358
354
done = true ;
359
355
}
360
- n .accept (actual );
356
+ nl .accept (actual , n );
361
357
}
362
358
}
363
359
}
0 commit comments