4242import java .util .Collections ;
4343import java .util .HashMap ;
4444import java .util .Map ;
45- import java .util .concurrent .atomic .AtomicLong ;
4645import java .util .concurrent .TimeoutException ;
4746
4847
@@ -231,27 +230,16 @@ public void releaseChannelNumber() {
231230 // incoming commands except for a close and close-ok.
232231
233232 Method method = command .getMethod ();
233+ // we deal with channel.close in the same way, regardless
234+ if (method instanceof Channel .Close ) {
235+ asyncShutdown (command );
236+ return true ;
237+ }
234238
235239 if (isOpen ()) {
236240 // We're in normal running mode.
237241
238- if (method instanceof Channel .Close ) {
239- releaseChannelNumber ();
240- ShutdownSignalException signal = new ShutdownSignalException (false ,
241- false ,
242- command ,
243- this );
244- synchronized (_channelMutex ) {
245- try {
246- processShutdownSignal (signal , true , false );
247- quiescingTransmit (new Channel .CloseOk ());
248- } finally {
249- notifyOutstandingRpc (signal );
250- }
251- }
252- notifyListeners ();
253- return true ;
254- } else if (method instanceof Basic .Deliver ) {
242+ if (method instanceof Basic .Deliver ) {
255243 Basic .Deliver m = (Basic .Deliver ) method ;
256244
257245 Consumer callback = _consumers .get (m .consumerTag );
@@ -355,15 +343,9 @@ public void releaseChannelNumber() {
355343 return false ;
356344 }
357345 } else {
358- // We're in quiescing mode.
346+ // We're in quiescing mode == !isOpen()
359347
360- if (method instanceof Channel .Close ) {
361- // We're already shutting down, so just send back an ok.
362- synchronized (_channelMutex ) {
363- quiescingTransmit (new Channel .CloseOk ());
364- }
365- return true ;
366- } else if (method instanceof Channel .CloseOk ) {
348+ if (method instanceof Channel .CloseOk ) {
367349 // We're quiescing, and we see a channel.close-ok:
368350 // this is our signal to leave quiescing mode and
369351 // finally shut down for good. Let it be handled as an
@@ -378,6 +360,23 @@ public void releaseChannelNumber() {
378360 }
379361 }
380362
363+ private void asyncShutdown (Command command ) throws IOException {
364+ releaseChannelNumber ();
365+ ShutdownSignalException signal = new ShutdownSignalException (false ,
366+ false ,
367+ command ,
368+ this );
369+ synchronized (_channelMutex ) {
370+ try {
371+ processShutdownSignal (signal , true , false );
372+ quiescingTransmit (new Channel .CloseOk ());
373+ } finally {
374+ notifyOutstandingRpc (signal );
375+ }
376+ }
377+ notifyListeners ();
378+ }
379+
381380 /** Public API - {@inheritDoc} */
382381 public void close ()
383382 throws IOException
@@ -419,7 +418,7 @@ public void close(int closeCode,
419418 throws IOException
420419 {
421420 // First, notify all our dependents that we are shutting down.
422- // This clears _isOpen , so no further work from the
421+ // This clears isOpen() , so no further work from the
423422 // application side will be accepted, and any inbound commands
424423 // will be discarded (unless they're channel.close-oks).
425424 Channel .Close reason = new Channel .Close (closeCode , closeMessage , 0 , 0 );
@@ -442,8 +441,8 @@ public void close(int closeCode,
442441 }
443442
444443 // Now that we're in quiescing state, channel.close was sent and
445- // we wait for the reply. We ignore the result. (It's always
446- // close-ok.)
444+ // we wait for the reply. We ignore the result.
445+ // (It's NOT always close-ok.)
447446 notify = true ;
448447 k .getReply (-1 );
449448 } catch (TimeoutException ise ) {
@@ -891,4 +890,5 @@ public Channel.FlowOk getFlow() {
891890 public long getNextPublishSeqNo () {
892891 return nextPublishSeqNo ;
893892 }
894- }
893+
894+ }
0 commit comments