@@ -259,8 +259,9 @@ private void completeEvictions() {
259259 int count = 0 ;
260260 GroupedUnicast <K , V > evictedGroup ;
261261 while ((evictedGroup = evictedGroups .poll ()) != null ) {
262- evictedGroup .onComplete ();
263- count ++;
262+ if (evictedGroup .state .tryComplete ()) {
263+ count ++;
264+ }
264265 }
265266 if (count != 0 ) {
266267 groupCount .addAndGet (-count );
@@ -383,6 +384,8 @@ static final class State<T, K> extends BasicIntQueueSubscription<T> implements P
383384 static final int ABANDONED = 2 ;
384385 static final int ABANDONED_HAS_SUBSCRIBER = ABANDONED | HAS_SUBSCRIBER ;
385386
387+ final AtomicBoolean evictOnce = new AtomicBoolean ();
388+
386389 State (int bufferSize , GroupBySubscriber <?, K , T > parent , K key , boolean delayError ) {
387390 this .queue = new SpscLinkedArrayQueue <>(bufferSize );
388391 this .parent = parent ;
@@ -444,9 +447,18 @@ public void onComplete() {
444447 drain ();
445448 }
446449
450+ boolean tryComplete () {
451+ boolean canEvict = evictOnce .compareAndSet (false , true );
452+ done = true ;
453+ drain ();
454+ return canEvict ;
455+ }
456+
447457 void cancelParent () {
448458 if ((once .get () & ABANDONED ) == 0 ) {
449- parent .cancel (key );
459+ if (evictOnce .compareAndSet (false , true )) {
460+ parent .cancel (key );
461+ }
450462 }
451463 }
452464
@@ -518,37 +530,44 @@ void drainNormal() {
518530 final SpscLinkedArrayQueue <T > q = queue ;
519531 final boolean delayError = this .delayError ;
520532 Subscriber <? super T > a = actual .get ();
533+ final AtomicBoolean cancelled = this .cancelled ;
534+
535+ outer :
521536 for (;;) {
522- if (a != null ) {
523- long r = requested .get ();
524- long e = 0 ;
537+ if (cancelled .get ()) {
538+ cleanupQueue (0 , false );
539+ } else {
540+ if (a != null ) {
541+ long r = requested .get ();
542+ long e = 0 ;
525543
526- while (e != r ) {
527- boolean d = done ;
528- T v = q .poll ();
529- boolean empty = v == null ;
544+ while (e != r ) {
545+ boolean d = done ;
546+ T v = q .poll ();
547+ boolean empty = v == null ;
530548
531- if (checkTerminated (d , empty , a , delayError , e )) {
532- return ;
533- }
549+ if (checkTerminated (d , empty , a , delayError , e , ! empty )) {
550+ continue outer ;
551+ }
534552
535- if (empty ) {
536- break ;
537- }
553+ if (empty ) {
554+ break ;
555+ }
538556
539- a .onNext (v );
557+ a .onNext (v );
540558
541- e ++;
542- }
559+ e ++;
560+ }
543561
544- if (e == r && checkTerminated (done , q .isEmpty (), a , delayError , e )) {
545- return ;
546- }
562+ if (e == r && checkTerminated (done , q .isEmpty (), a , delayError , e , false )) {
563+ continue outer ;
564+ }
547565
548- if (e != 0L ) {
549- BackpressureHelper .produced (requested , e );
550- // replenish based on this batch run
551- requestParent (e );
566+ if (e != 0L ) {
567+ BackpressureHelper .produced (requested , e );
568+ // replenish based on this batch run
569+ requestParent (e );
570+ }
552571 }
553572 }
554573
@@ -568,23 +587,32 @@ void requestParent(long e) {
568587 }
569588 }
570589
571- boolean checkTerminated (boolean d , boolean empty , Subscriber <? super T > a , boolean delayError , long emitted ) {
590+ void cleanupQueue (long emitted , boolean polled ) {
591+ // if this group is canceled, all accumulated emissions and
592+ // remaining items in the queue should be requested
593+ // so that other groups can proceed
594+ while (queue .poll () != null ) {
595+ emitted ++;
596+ }
597+ if (polled ) {
598+ emitted ++;
599+ }
600+ if (emitted != 0L ) {
601+ requestParent (emitted );
602+ }
603+ }
604+
605+ boolean checkTerminated (boolean d , boolean empty , Subscriber <? super T > a ,
606+ boolean delayError , long emitted , boolean polled ) {
572607 if (cancelled .get ()) {
573- // if this group is canceled, all accumulated emissions and
574- // remaining items in the queue should be requested
575- // so that other groups can proceed
576- while (queue .poll () != null ) {
577- emitted ++;
578- }
579- if (emitted != 0L ) {
580- requestParent (emitted );
581- }
608+ cleanupQueue (emitted , polled );
582609 return true ;
583610 }
584611
585612 if (d ) {
586613 if (delayError ) {
587614 if (empty ) {
615+ cancelled .lazySet (true );
588616 Throwable e = error ;
589617 if (e != null ) {
590618 a .onError (e );
@@ -597,10 +625,12 @@ boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a, boole
597625 Throwable e = error ;
598626 if (e != null ) {
599627 queue .clear ();
628+ cancelled .lazySet (true );
600629 a .onError (e );
601630 return true ;
602631 } else
603632 if (empty ) {
633+ cancelled .lazySet (true );
604634 a .onComplete ();
605635 return true ;
606636 }
0 commit comments