@@ -547,7 +547,8 @@ private Semaphore obtainPermit(ChannelCachingConnectionProxy connection) {
547
547
return permits ;
548
548
}
549
549
550
- private ChannelProxy findOpenChannel (LinkedList <ChannelProxy > channelList , ChannelProxy channel ) {
550
+ private ChannelProxy findOpenChannel (LinkedList <ChannelProxy > channelList , // NOSONAR LinkedList.removeFirst()
551
+ ChannelProxy channel ) {
551
552
synchronized (channelList ) {
552
553
while (!channelList .isEmpty ()) {
553
554
channel = channelList .removeFirst ();
@@ -558,38 +559,42 @@ private ChannelProxy findOpenChannel(LinkedList<ChannelProxy> channelList, Chann
558
559
break ;
559
560
}
560
561
else {
561
- try {
562
- Channel target = channel .getTargetChannel ();
563
- if (target != null ) {
564
- target .close ();
565
- /*
566
- * To remove it from auto-recovery if so configured,
567
- * and nack any pending confirms if PublisherCallbackChannel.
568
- */
569
- }
570
- }
571
- catch (AlreadyClosedException e ) {
572
- if (logger .isTraceEnabled ()) {
573
- logger .trace (channel + " is already closed" );
574
- }
575
- }
576
- catch (IOException e ) {
577
- if (logger .isDebugEnabled ()) {
578
- logger .debug ("Unexpected Exception closing channel " + e .getMessage ());
579
- }
580
- }
581
- catch (TimeoutException e ) {
582
- if (logger .isWarnEnabled ()) {
583
- logger .warn ("TimeoutException closing channel " + e .getMessage ());
584
- }
585
- }
562
+ cleanUpClosedChannel (channel );
586
563
channel = null ;
587
564
}
588
565
}
589
566
}
590
567
return channel ;
591
568
}
592
569
570
+ private void cleanUpClosedChannel (ChannelProxy channel ) {
571
+ try {
572
+ Channel target = channel .getTargetChannel ();
573
+ if (target != null ) {
574
+ target .close ();
575
+ /*
576
+ * To remove it from auto-recovery if so configured,
577
+ * and nack any pending confirms if PublisherCallbackChannel.
578
+ */
579
+ }
580
+ }
581
+ catch (AlreadyClosedException e ) {
582
+ if (logger .isTraceEnabled ()) {
583
+ logger .trace (channel + " is already closed" );
584
+ }
585
+ }
586
+ catch (IOException e ) {
587
+ if (logger .isDebugEnabled ()) {
588
+ logger .debug ("Unexpected Exception closing channel " + e .getMessage ());
589
+ }
590
+ }
591
+ catch (TimeoutException e ) {
592
+ if (logger .isWarnEnabled ()) {
593
+ logger .warn ("TimeoutException closing channel " + e .getMessage ());
594
+ }
595
+ }
596
+ }
597
+
593
598
private ChannelProxy getCachedChannelProxy (ChannelCachingConnectionProxy connection ,
594
599
LinkedList <ChannelProxy > channelList , boolean transactional ) { //NOSONAR LinkedList for addLast()
595
600
@@ -689,17 +694,8 @@ else if (this.cacheMode == CacheMode.CONNECTION) {
689
694
private Connection connectionFromCache () {
690
695
ChannelCachingConnectionProxy cachedConnection = findIdleConnection ();
691
696
long now = System .currentTimeMillis ();
692
- while (cachedConnection == null && System .currentTimeMillis () - now < this .channelCheckoutTimeout ) {
693
- if (countOpenConnections () >= this .connectionLimit ) {
694
- try {
695
- this .connectionMonitor .wait (this .channelCheckoutTimeout );
696
- cachedConnection = findIdleConnection ();
697
- }
698
- catch (InterruptedException e ) {
699
- Thread .currentThread ().interrupt ();
700
- throw new AmqpException ("Interrupted while waiting for a connection" , e );
701
- }
702
- }
697
+ if (cachedConnection == null ) {
698
+ cachedConnection = waitForConnection (cachedConnection , now );
703
699
}
704
700
if (cachedConnection == null ) {
705
701
if (countOpenConnections () >= this .connectionLimit
@@ -737,6 +733,22 @@ else if (!cachedConnection.isOpen()) {
737
733
return cachedConnection ;
738
734
}
739
735
736
+ private ChannelCachingConnectionProxy waitForConnection (ChannelCachingConnectionProxy cachedConnection , long now ) {
737
+ while (cachedConnection == null && System .currentTimeMillis () - now < this .channelCheckoutTimeout ) {
738
+ if (countOpenConnections () >= this .connectionLimit ) {
739
+ try {
740
+ this .connectionMonitor .wait (this .channelCheckoutTimeout );
741
+ cachedConnection = findIdleConnection ();
742
+ }
743
+ catch (InterruptedException e ) {
744
+ Thread .currentThread ().interrupt ();
745
+ throw new AmqpException ("Interrupted while waiting for a connection" , e );
746
+ }
747
+ }
748
+ }
749
+ return cachedConnection ;
750
+ }
751
+
740
752
/*
741
753
* Iterate over the idle connections looking for an open one. If there are no idle,
742
754
* return null, if there are no open idle, return the first closed idle so it can
0 commit comments