@@ -559,46 +559,46 @@ public void cancel() {
559559
560560 @ Override
561561 public void onSubscribe (Subscription subscription ) {
562- channel .addConfirmListener (new ConfirmListener () {
562+ if (Operators .validate (this .subscription , subscription )) {
563+ channel .addConfirmListener (new ConfirmListener () {
563564
564- @ Override
565- public void handleAck (long deliveryTag , boolean multiple ) {
566- handleAckNack (deliveryTag , multiple , true );
567- }
565+ @ Override
566+ public void handleAck (long deliveryTag , boolean multiple ) {
567+ handleAckNack (deliveryTag , multiple , true );
568+ }
568569
569- @ Override
570- public void handleNack (long deliveryTag , boolean multiple ) {
571- handleAckNack (deliveryTag , multiple , false );
572- }
570+ @ Override
571+ public void handleNack (long deliveryTag , boolean multiple ) {
572+ handleAckNack (deliveryTag , multiple , false );
573+ }
573574
574- private void handleAckNack (long deliveryTag , boolean multiple , boolean ack ) {
575- if (multiple ) {
576- try {
577- ConcurrentNavigableMap <Long , OutboundMessage > unconfirmedToSend = unconfirmed .headMap (deliveryTag , true );
578- Iterator <Map .Entry <Long , OutboundMessage >> iterator = unconfirmedToSend .entrySet ().iterator ();
579- while (iterator .hasNext ()) {
580- subscriber .onNext (new OutboundMessageResult (iterator .next ().getValue (), ack ));
581- iterator .remove ();
575+ private void handleAckNack (long deliveryTag , boolean multiple , boolean ack ) {
576+ if (multiple ) {
577+ try {
578+ ConcurrentNavigableMap <Long , OutboundMessage > unconfirmedToSend = unconfirmed .headMap (deliveryTag , true );
579+ Iterator <Map .Entry <Long , OutboundMessage >> iterator = unconfirmedToSend .entrySet ().iterator ();
580+ while (iterator .hasNext ()) {
581+ subscriber .onNext (new OutboundMessageResult (iterator .next ().getValue (), ack ));
582+ iterator .remove ();
583+ }
584+ } catch (Exception e ) {
585+ handleError (e , null );
586+ }
587+ } else {
588+ OutboundMessage outboundMessage = unconfirmed .get (deliveryTag );
589+ try {
590+ unconfirmed .remove (deliveryTag );
591+ subscriber .onNext (new OutboundMessageResult (outboundMessage , ack ));
592+ } catch (Exception e ) {
593+ handleError (e , new OutboundMessageResult (outboundMessage , ack ));
582594 }
583- } catch (Exception e ) {
584- handleError (e , null );
585595 }
586- } else {
587- OutboundMessage outboundMessage = unconfirmed .get (deliveryTag );
588- try {
589- unconfirmed .remove (deliveryTag );
590- subscriber .onNext (new OutboundMessageResult (outboundMessage , ack ));
591- } catch (Exception e ) {
592- handleError (e , new OutboundMessageResult (outboundMessage , ack ));
596+ if (unconfirmed .size () == 0 ) {
597+ maybeComplete ();
593598 }
594599 }
595- if (unconfirmed .size () == 0 ) {
596- maybeComplete ();
597- }
598- }
599- });
600- state .set (SubscriberState .ACTIVE );
601- if (Operators .validate (this .subscription , subscription )) {
600+ });
601+ state .set (SubscriberState .ACTIVE );
602602 this .subscription = subscription ;
603603 subscriber .onSubscribe (this );
604604 }
0 commit comments