1313
1414package io .reactivex .internal .operators .flowable ;
1515
16- import io .reactivex .plugins .RxJavaPlugins ;
17-
18- import java .util .ArrayList ;
19- import java .util .List ;
2016import java .util .concurrent .Callable ;
2117import java .util .concurrent .atomic .*;
2218
2521import io .reactivex .disposables .Disposable ;
2622import io .reactivex .exceptions .*;
2723import io .reactivex .functions .Function ;
24+ import io .reactivex .internal .functions .ObjectHelper ;
2825import io .reactivex .internal .fuseable .*;
2926import io .reactivex .internal .queue .*;
3027import io .reactivex .internal .subscriptions .SubscriptionHelper ;
31- import io .reactivex .internal .util .BackpressureHelper ;
28+ import io .reactivex .internal .util .*;
29+ import io .reactivex .plugins .RxJavaPlugins ;
3230
3331public final class FlowableFlatMap <T , U > extends AbstractFlowableWithUpstream <T , U > {
3432 final Function <? super T , ? extends Publisher <? extends U >> mapper ;
@@ -64,13 +62,11 @@ static final class MergeSubscriber<T, U> extends AtomicInteger implements Subscr
6462 final int maxConcurrency ;
6563 final int bufferSize ;
6664
67- volatile SimpleQueue <U > queue ;
65+ volatile SimplePlainQueue <U > queue ;
6866
6967 volatile boolean done ;
7068
71- final AtomicReference <SimpleQueue <Throwable >> errors = new AtomicReference <SimpleQueue <Throwable >>();
72-
73- static final SimpleQueue <Throwable > ERRORS_CLOSED = new RejectingQueue <Throwable >();
69+ final AtomicThrowable errs = new AtomicThrowable ();
7470
7571 volatile boolean cancelled ;
7672
@@ -126,7 +122,7 @@ public void onNext(T t) {
126122 }
127123 Publisher <? extends U > p ;
128124 try {
129- p = mapper .apply (t );
125+ p = ObjectHelper . requireNonNull ( mapper .apply (t ), "The mapper returned a null Publisher" );
130126 } catch (Throwable e ) {
131127 Exceptions .throwIfFatal (e );
132128 s .cancel ();
@@ -140,7 +136,7 @@ public void onNext(T t) {
140136 u = ((Callable <U >)p ).call ();
141137 } catch (Throwable ex ) {
142138 Exceptions .throwIfFatal (ex );
143- getErrorQueue (). offer (ex );
139+ errs . addThrowable (ex );
144140 drain ();
145141 return ;
146142 }
@@ -210,7 +206,7 @@ void removeInner(InnerSubscriber<T, U> inner) {
210206 }
211207
212208 SimpleQueue <U > getMainQueue () {
213- SimpleQueue <U > q = queue ;
209+ SimplePlainQueue <U > q = queue ;
214210 if (q == null ) {
215211 if (maxConcurrency == Integer .MAX_VALUE ) {
216212 q = new SpscLinkedArrayQueue <U >(bufferSize );
@@ -316,9 +312,12 @@ public void onError(Throwable t) {
316312 RxJavaPlugins .onError (t );
317313 return ;
318314 }
319- getErrorQueue ().offer (t );
320- done = true ;
321- drain ();
315+ if (errs .addThrowable (t )) {
316+ done = true ;
317+ drain ();
318+ } else {
319+ RxJavaPlugins .onError (t );
320+ }
322321 }
323322
324323 @ Override
@@ -343,9 +342,13 @@ public void request(long n) {
343342 public void cancel () {
344343 if (!cancelled ) {
345344 cancelled = true ;
345+ s .cancel ();
346+ disposeAll ();
346347 if (getAndIncrement () == 0 ) {
347- s .cancel ();
348- disposeAll ();
348+ SimpleQueue <U > q = queue ;
349+ if (q != null ) {
350+ q .clear ();
351+ }
349352 }
350353 }
351354 }
@@ -363,7 +366,7 @@ void drainLoop() {
363366 if (checkTerminate ()) {
364367 return ;
365368 }
366- SimpleQueue <U > svq = queue ;
369+ SimplePlainQueue <U > svq = queue ;
367370
368371 long r = requested .get ();
369372 boolean unbounded = r == Long .MAX_VALUE ;
@@ -375,12 +378,8 @@ void drainLoop() {
375378 long scalarEmission = 0 ;
376379 U o = null ;
377380 while (r != 0L ) {
378- try {
379- o = svq .poll ();
380- } catch (Throwable ex ) {
381- Exceptions .throwIfFatal (ex );
382- getErrorQueue ().offer (ex );
383- }
381+ o = svq .poll ();
382+
384383 if (checkTerminate ()) {
385384 return ;
386385 }
@@ -413,11 +412,11 @@ void drainLoop() {
413412 int n = inner .length ;
414413
415414 if (d && (svq == null || svq .isEmpty ()) && n == 0 ) {
416- SimpleQueue < Throwable > e = errors . get ();
417- if (e == null || e . isEmpty () ) {
415+ Throwable ex = errs . terminate ();
416+ if (ex == null ) {
418417 child .onComplete ();
419418 } else {
420- reportError ( e );
419+ child . onError ( ex );
421420 }
422421 return ;
423422 }
@@ -447,6 +446,7 @@ void drainLoop() {
447446 }
448447
449448 int j = index ;
449+ sourceLoop :
450450 for (int i = 0 ; i < n ; i ++) {
451451 if (checkTerminate ()) {
452452 return ;
@@ -456,33 +456,40 @@ void drainLoop() {
456456
457457 U o = null ;
458458 for (;;) {
459+ if (checkTerminate ()) {
460+ return ;
461+ }
462+ SimpleQueue <U > q = is .queue ;
463+ if (q == null ) {
464+ break ;
465+ }
459466 long produced = 0 ;
460467 while (r != 0L ) {
461- if (checkTerminate ()) {
462- return ;
463- }
464- SimpleQueue <U > q = is .queue ;
465- if (q == null ) {
466- break ;
467- }
468468
469469 try {
470470 o = q .poll ();
471471 } catch (Throwable ex ) {
472472 Exceptions .throwIfFatal (ex );
473-
474- s .cancel ();
475- disposeAll ();
476-
477- child .onError (ex );
478- return ;
473+ is .dispose ();
474+ errs .addThrowable (ex );
475+ if (checkTerminate ()) {
476+ return ;
477+ }
478+ removeInner (is );
479+ innerCompleted = true ;
480+ i ++;
481+ continue sourceLoop ;
479482 }
480483 if (o == null ) {
481484 break ;
482485 }
483486
484487 child .onNext (o );
485488
489+ if (checkTerminate ()) {
490+ return ;
491+ }
492+
486493 r --;
487494 produced ++;
488495 }
@@ -536,86 +543,31 @@ void drainLoop() {
536543
537544 boolean checkTerminate () {
538545 if (cancelled ) {
539- s .cancel ();
540- disposeAll ();
546+ SimpleQueue <U > q = queue ;
547+ if (q != null ) {
548+ q .clear ();
549+ }
541550 return true ;
542551 }
543- SimpleQueue <Throwable > e = errors .get ();
544- if (!delayErrors && (e != null && !e .isEmpty ())) {
545- try {
546- reportError (e );
547- } finally {
548- disposeAll ();
549- }
552+ if (!delayErrors && errs .get () != null ) {
553+ actual .onError (errs .terminate ());
550554 return true ;
551555 }
552556 return false ;
553557 }
554558
555- void reportError (SimpleQueue <Throwable > q ) {
556- List <Throwable > composite = null ;
557- Throwable ex = null ;
558-
559- for (;;) {
560- Throwable t ;
561- try {
562- t = q .poll ();
563- } catch (Throwable exc ) {
564- Exceptions .throwIfFatal (exc );
565- if (ex == null ) {
566- ex = exc ;
567- } else {
568- if (composite == null ) {
569- composite = new ArrayList <Throwable >();
570- composite .add (ex );
571- }
572- composite .add (exc );
573- }
574- break ;
575- }
576-
577- if (t == null ) {
578- break ;
579- }
580- if (ex == null ) {
581- ex = t ;
582- } else {
583- if (composite == null ) {
584- composite = new ArrayList <Throwable >();
585- composite .add (ex );
586- }
587- composite .add (t );
588- }
589- }
590- if (composite != null ) {
591- actual .onError (new CompositeException (composite ));
592- } else {
593- actual .onError (ex );
594- }
595- }
596-
597559 void disposeAll () {
598560 InnerSubscriber <?, ?>[] a = subscribers .get ();
599561 if (a != CANCELLED ) {
600562 a = subscribers .getAndSet (CANCELLED );
601563 if (a != CANCELLED ) {
602- errors .getAndSet (ERRORS_CLOSED );
603564 for (InnerSubscriber <?, ?> inner : a ) {
604565 inner .dispose ();
605566 }
606- }
607- }
608- }
609-
610- SimpleQueue <Throwable > getErrorQueue () {
611- for (;;) {
612- SimpleQueue <Throwable > q = errors .get ();
613- if (q != null ) {
614- return q ;
615- }
616- q = new MpscLinkedQueue <Throwable >();
617- if (errors .compareAndSet (null , q )) {
618- return q ;
567+ Throwable ex = errs .terminate ();
568+ if (ex != null && ex != ExceptionHelper .TERMINATED ) {
569+ RxJavaPlugins .onError (ex );
570+ }
619571 }
620572 }
621573 }
@@ -676,9 +628,12 @@ public void onNext(U t) {
676628 }
677629 @ Override
678630 public void onError (Throwable t ) {
679- parent .getErrorQueue ().offer (t );
680- done = true ;
681- parent .drain ();
631+ if (parent .errs .addThrowable (t )) {
632+ done = true ;
633+ parent .drain ();
634+ } else {
635+ RxJavaPlugins .onError (t );
636+ }
682637 }
683638 @ Override
684639 public void onComplete () {
@@ -708,31 +663,4 @@ public boolean isDisposed() {
708663 return get () == SubscriptionHelper .CANCELLED ;
709664 }
710665 }
711-
712- static final class RejectingQueue <T > implements SimpleQueue <T > {
713- @ Override
714- public boolean offer (T e ) {
715- return false ;
716- }
717-
718- @ Override
719- public boolean offer (T v1 , T v2 ) {
720- return false ;
721- }
722-
723- @ Override
724- public T poll () {
725- return null ;
726- }
727-
728- @ Override
729- public void clear () {
730-
731- }
732-
733- @ Override
734- public boolean isEmpty () {
735- return true ;
736- }
737- }
738666}
0 commit comments