17
17
18
18
import java .util .Queue ;
19
19
import java .util .concurrent .ConcurrentLinkedQueue ;
20
- import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
21
- import java .util .concurrent .atomic .AtomicLongFieldUpdater ;
20
+ import java .util .concurrent .atomic .*;
22
21
23
- import rx .Observable ;
22
+ import rx .* ;
24
23
import rx .Observable .Operator ;
25
- import rx .Producer ;
26
- import rx .Subscriber ;
27
- import rx .exceptions .CompositeException ;
28
- import rx .exceptions .MissingBackpressureException ;
29
- import rx .exceptions .OnErrorThrowable ;
24
+ import rx .exceptions .*;
30
25
import rx .functions .Func1 ;
31
- import rx .internal .util .RxRingBuffer ;
32
- import rx .internal .util .ScalarSynchronousObservable ;
33
- import rx .internal .util .SubscriptionIndexedRingBuffer ;
26
+ import rx .internal .util .*;
34
27
35
28
/**
36
29
* Flattens a list of {@link Observable}s into one {@code Observable}, without any transformation.
@@ -135,7 +128,7 @@ private static final class MergeSubscriber<T> extends Subscriber<Observable<? ex
135
128
136
129
private volatile SubscriptionIndexedRingBuffer <InnerSubscriber <T >> childrenSubscribers ;
137
130
138
- private RxRingBuffer scalarValueQueue = null ;
131
+ private volatile RxRingBuffer scalarValueQueue = null ;
139
132
140
133
/* protected by lock on MergeSubscriber instance */
141
134
private int missedEmitting = 0 ;
@@ -266,9 +259,8 @@ private void handleScalarSynchronousObservableWithoutRequestLimits(ScalarSynchro
266
259
request (1 );
267
260
return ;
268
261
} else {
269
- initScalarValueQueueIfNeeded ();
270
262
try {
271
- scalarValueQueue .onNext (value );
263
+ getOrCreateScalarValueQueue () .onNext (value );
272
264
} catch (MissingBackpressureException e ) {
273
265
onError (e );
274
266
}
@@ -306,19 +298,20 @@ private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronou
306
298
307
299
// if we didn't return above we need to enqueue
308
300
// enqueue the values for later delivery
309
- initScalarValueQueueIfNeeded ();
310
301
try {
311
- scalarValueQueue .onNext (t .get ());
302
+ getOrCreateScalarValueQueue () .onNext (t .get ());
312
303
} catch (MissingBackpressureException e ) {
313
304
onError (e );
314
305
}
315
306
}
316
307
317
- private void initScalarValueQueueIfNeeded () {
318
- if (scalarValueQueue == null ) {
319
- scalarValueQueue = RxRingBuffer .getSpmcInstance ();
320
- add (scalarValueQueue );
308
+ private RxRingBuffer getOrCreateScalarValueQueue () {
309
+ RxRingBuffer svq = scalarValueQueue ;
310
+ if (svq == null ) {
311
+ svq = RxRingBuffer .getSpmcInstance ();
312
+ scalarValueQueue = svq ;
321
313
}
314
+ return svq ;
322
315
}
323
316
324
317
private synchronized boolean releaseEmitLock () {
@@ -381,21 +374,22 @@ private void drainChildrenQueues() {
381
374
* ONLY call when holding the EmitLock.
382
375
*/
383
376
private int drainScalarValueQueue () {
384
- if (scalarValueQueue != null ) {
377
+ RxRingBuffer svq = scalarValueQueue ;
378
+ if (svq != null ) {
385
379
long r = mergeProducer .requested ;
386
380
int emittedWhileDraining = 0 ;
387
381
if (r < 0 ) {
388
382
// drain it all
389
383
Object o = null ;
390
- while ((o = scalarValueQueue .poll ()) != null ) {
384
+ while ((o = svq .poll ()) != null ) {
391
385
on .accept (actual , o );
392
386
emittedWhileDraining ++;
393
387
}
394
388
} else if (r > 0 ) {
395
389
// drain what was requested
396
390
long toEmit = r ;
397
391
for (int i = 0 ; i < toEmit ; i ++) {
398
- Object o = scalarValueQueue .poll ();
392
+ Object o = svq .poll ();
399
393
if (o == null ) {
400
394
break ;
401
395
} else {
@@ -469,7 +463,7 @@ public void onCompleted() {
469
463
boolean c = false ;
470
464
synchronized (this ) {
471
465
completed = true ;
472
- if (wip == 0 && ( scalarValueQueue == null || scalarValueQueue . isEmpty ()) ) {
466
+ if (wip == 0 ) {
473
467
c = true ;
474
468
}
475
469
}
@@ -494,25 +488,38 @@ void completeInner(InnerSubscriber<T> s) {
494
488
}
495
489
496
490
private void drainAndComplete () {
497
- drainQueuesIfNeeded (); // TODO need to confirm whether this is needed or not
498
- if (delayErrors ) {
499
- Queue <Throwable > es = null ;
491
+ boolean moreToDrain = true ;
492
+ while (moreToDrain ) {
500
493
synchronized (this ) {
501
- es = exceptions ;
494
+ missedEmitting = 0 ;
502
495
}
503
- if (es != null ) {
504
- if (es .isEmpty ()) {
505
- actual .onCompleted ();
506
- } else if (es .size () == 1 ) {
507
- actual .onError (es .poll ());
496
+ drainScalarValueQueue ();
497
+ drainChildrenQueues ();
498
+ synchronized (this ) {
499
+ moreToDrain = missedEmitting > 0 ;
500
+ }
501
+ }
502
+ RxRingBuffer svq = scalarValueQueue ;
503
+ if (svq == null || svq .isEmpty ()) {
504
+ if (delayErrors ) {
505
+ Queue <Throwable > es = null ;
506
+ synchronized (this ) {
507
+ es = exceptions ;
508
+ }
509
+ if (es != null ) {
510
+ if (es .isEmpty ()) {
511
+ actual .onCompleted ();
512
+ } else if (es .size () == 1 ) {
513
+ actual .onError (es .poll ());
514
+ } else {
515
+ actual .onError (new CompositeException (es ));
516
+ }
508
517
} else {
509
- actual .onError ( new CompositeException ( es ) );
518
+ actual .onCompleted ( );
510
519
}
511
520
} else {
512
521
actual .onCompleted ();
513
522
}
514
- } else {
515
- actual .onCompleted ();
516
523
}
517
524
}
518
525
0 commit comments