@@ -759,21 +759,43 @@ private void close(Throwable cause) {
759759 for (AmqpConsumer consumer : this .consumers ) {
760760 consumer .close (cause );
761761 }
762+ boolean locked = false ;
762763 try {
763- this .dispatchingExecutorService . shutdownNow ( );
764- } catch ( Exception e ) {
765- LOGGER .info (
766- "Error while shutting down dispatching executor service for connection '{}': {}" ,
767- this . name (),
768- e . getMessage () );
764+ locked = this .instanceLock . tryLock ( 1 , TimeUnit . SECONDS );
765+ if (! locked ) {
766+ LOGGER .info ("Could not acquire connection lock during closing" );
767+ }
768+ } catch ( InterruptedException e ) {
769+ LOGGER . info ( "Interrupted while waiting for connection lock" );
769770 }
770771 try {
771- org .apache .qpid .protonj2 .client .Connection nc = this .nativeConnection ;
772- if (nc != null ) {
773- nc .close ();
772+ ExecutorService es = this .dispatchingExecutorService ;
773+ if (es != null ) {
774+ try {
775+ es .shutdownNow ();
776+ } catch (Exception e ) {
777+ LOGGER .info (
778+ "Error while shutting down dispatching executor service for connection '{}': {}" ,
779+ this .name (),
780+ e .getMessage ());
781+ }
782+ }
783+ try {
784+ org .apache .qpid .protonj2 .client .Connection nc = this .nativeConnection ;
785+ if (nc != null ) {
786+ nc .close ();
787+ }
788+ } catch (Exception e ) {
789+ LOGGER .warn ("Error while closing native connection" , e );
790+ }
791+ } finally {
792+ if (locked ) {
793+ try {
794+ this .instanceLock .unlock ();
795+ } catch (Exception e ) {
796+ LOGGER .debug ("Error while releasing connection lock: {}" , e .getMessage ());
797+ }
774798 }
775- } catch (Exception e ) {
776- LOGGER .warn ("Error while closing native connection" , e );
777799 }
778800 this .state (CLOSED , cause );
779801 this .environment .metricsCollector ().closeConnection ();
0 commit comments