@@ -523,9 +523,13 @@ public boolean processControlCommand(Command c)
523523 return false ;
524524 } else {
525525 // Quiescing.
526- if (method instanceof AMQP .Connection .CloseOk ) {
527- // It's our final "RPC".
528- return false ;
526+ if (method instanceof AMQP .Connection .CloseOk ) {
527+ // It's our final "RPC". Time to shut down.
528+ _running = false ;
529+ // If this was called from within the MainLoop we
530+ // may not have a continuation to return to, so we
531+ // treat this as processed in that case.
532+ return _channel0 ._activeRpc == null ;
529533 } else {
530534 // Ignore all others.
531535 return true ;
@@ -680,14 +684,21 @@ public void close(int closeCode,
680684 boolean abort )
681685 throws IOException
682686 {
687+ final boolean sync = !(Thread .currentThread () instanceof MainLoop );
688+
683689 try {
684690 AMQImpl .Connection .Close reason =
685691 new AMQImpl .Connection .Close (closeCode , closeMessage , 0 , 0 );
692+
686693 shutdown (reason , initiatedByApplication , cause , true );
687- AMQChannel .SimpleBlockingRpcContinuation k =
688- new AMQChannel .SimpleBlockingRpcContinuation ();
689- _channel0 .quiescingRpc (reason , k );
690- k .getReply (timeout );
694+ if (sync ){
695+ AMQChannel .SimpleBlockingRpcContinuation k =
696+ new AMQChannel .SimpleBlockingRpcContinuation ();
697+ _channel0 .quiescingRpc (reason , k );
698+ k .getReply (timeout );
699+ } else {
700+ _channel0 .quiescingTransmit (reason );
701+ }
691702 } catch (TimeoutException tte ) {
692703 if (!abort )
693704 throw new ShutdownSignalException (true , true , tte , this );
@@ -698,7 +709,7 @@ public void close(int closeCode,
698709 if (!abort )
699710 throw ioe ;
700711 } finally {
701- _frameHandler .close ();
712+ if ( sync ) _frameHandler .close ();
702713 }
703714 }
704715
0 commit comments