5757import java .util .concurrent .TimeUnit ;
5858import java .util .concurrent .TimeoutException ;
5959import java .util .concurrent .atomic .AtomicBoolean ;
60+ import java .util .concurrent .atomic .AtomicInteger ;
6061import java .util .concurrent .atomic .AtomicReference ;
6162import java .util .function .Consumer ;
6263import java .util .function .Function ;
@@ -430,6 +431,8 @@ private static class AmqpHandler extends ChannelInboundHandlerAdapter {
430431 private final AtomicReference <CountDownLatch > writableLatch =
431432 new AtomicReference <>(new CountDownLatch (1 ));
432433 private final AtomicBoolean shutdownDispatched = new AtomicBoolean (false );
434+ private static final AtomicInteger SEQUENCE = new AtomicInteger (0 );
435+ private final String id ;
433436
434437 private AmqpHandler (
435438 int maxPayloadSize ,
@@ -438,6 +441,7 @@ private AmqpHandler(
438441 this .maxPayloadSize = maxPayloadSize ;
439442 this .closeSequence = closeSequence ;
440443 this .willRecover = willRecover ;
444+ this .id = "amqp-handler-" + SEQUENCE .getAndIncrement ();
441445 }
442446
443447 @ Override
@@ -501,8 +505,8 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
501505 public void channelInactive (ChannelHandlerContext ctx ) {
502506 if (needToDispatchIoError ()) {
503507 AMQConnection c = this .connection ;
508+ LOGGER .debug ("Dispatching shutdown when channel became inactive ({})" , this .id );
504509 if (c .isOpen ()) {
505- LOGGER .debug ("Dispatching shutdown when channel became inactive" );
506510 // it is likely to be an IO exception
507511 this .dispatchShutdownToConnection (() -> c .handleIoError (null ));
508512 } else {
@@ -566,7 +570,7 @@ private CountDownLatch writableLatch() {
566570
567571 protected void dispatchShutdownToConnection (Runnable connectionShutdownRunnable ) {
568572 if (this .shutdownDispatched .compareAndSet (false , true )) {
569- String name = "rabbitmq-connection-shutdown" ;
573+ String name = "rabbitmq-connection-shutdown-" + this . id ;
570574 AMQConnection c = this .connection ;
571575 if (c == null || ch == null ) {
572576 // not enough information, we dispatch in separate thread
@@ -576,7 +580,7 @@ protected void dispatchShutdownToConnection(Runnable connectionShutdownRunnable)
576580 if (this .willRecover .test (c .getCloseReason ()) || ch .eventLoop ().isShuttingDown ()) {
577581 // the connection will recover, we don't want this to happen in the event loop,
578582 // it could cause a deadlock, so using a separate thread
579- name = name + "-" + c ;
583+ // name = name + "-" + c;
580584 Environment .newThread (connectionShutdownRunnable , name ).start ();
581585 } else {
582586 // no recovery, it is safe to dispatch in the event loop
0 commit comments