2929import java .io .IOException ;
3030import java .net .SocketException ;
3131import java .util .Map ;
32+ import java .util .concurrent .TimeoutException ;
3233
3334import com .rabbitmq .client .AMQP ;
3435import com .rabbitmq .client .Address ;
4041import com .rabbitmq .client .MissedHeartbeatException ;
4142import com .rabbitmq .client .RedirectException ;
4243import com .rabbitmq .client .ShutdownSignalException ;
44+ import com .rabbitmq .utility .BlockingCell ;
4345import com .rabbitmq .utility .Utility ;
4446
4547/**
@@ -95,6 +97,8 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
9597 /** Handler for (otherwise-unhandled) exceptions that crop up in the mainloop. */
9698 public final ExceptionHandler _exceptionHandler ;
9799
100+ public BlockingCell <Object > appContinuation = new BlockingCell <Object >();
101+
98102 /**
99103 * Protected API - respond, in the driver thread, to a ShutdownSignal.
100104 * @param channelNumber the number of the channel to disconnect
@@ -437,7 +441,8 @@ public MainLoop() {
437441 // channel zero that aren't Connection.CloseOk) must
438442 // be discarded.
439443 ChannelN channel = _channelManager .getChannel (frame .channel );
440- channel .handleFrame (frame ); // Should check for null here?
444+ // FIXME: catch NullPointerException and throw more informative one?
445+ channel .handleFrame (frame );
441446 }
442447 }
443448 }
@@ -462,6 +467,10 @@ public MainLoop() {
462467
463468 // Finally, shut down our underlying data connection.
464469 _frameHandler .close ();
470+
471+ synchronized (this ) {
472+ appContinuation .set (null );
473+ }
465474 }
466475 }
467476
@@ -543,12 +552,24 @@ public boolean processControlCommand(Command c)
543552 }
544553
545554 public void handleConnectionClose (Command closeCommand ) {
555+ shutdown (closeCommand , false , null );
546556 try {
547557 _channel0 .transmit (new AMQImpl .Connection .CloseOk ());
548558 } catch (IOException ioe ) {
549559 Utility .emptyStatement ();
550560 }
551- shutdown (closeCommand , false , null );
561+
562+ try {
563+ synchronized (this ) {
564+ appContinuation .uninterruptibleGet (CONNECTION_CLOSING_TIMEOUT );
565+ }
566+ } catch (TimeoutException ise ) {
567+ // Broker didn't close socket on time, force socket close
568+ // FIXME: notify about timeout exception?
569+ _frameHandler .close ();
570+ } finally {
571+ _running = false ;
572+ }
552573 notifyListeners ();
553574 }
554575
@@ -561,27 +582,86 @@ public void shutdown(Object reason,
561582 boolean initiatedByApplication ,
562583 Throwable cause )
563584 {
564- synchronized (this ) {
565- ensureIsOpen (); // invariant: we should never be shut down more than once per instance
566- _shutdownCause = new ShutdownSignalException (true ,
567- initiatedByApplication ,
568- reason , this );
569- }
570- if (cause != null ) {
571- _shutdownCause .initCause (cause );
572- }
573- _channel0 .processShutdownSignal (_shutdownCause );
585+ try {
586+ synchronized (this ) {
587+ ensureIsOpen (); // invariant: we should never be shut down more than once per instance
588+ _shutdownCause = new ShutdownSignalException (true ,
589+ initiatedByApplication ,
590+ reason , this );
591+ }
592+
593+ if (cause != null ) {
594+ _shutdownCause .initCause (cause );
595+ }
596+ _channel0 .processShutdownSignal (_shutdownCause );
597+ } catch (AlreadyClosedException ace ) {
598+ if (initiatedByApplication )
599+ throw ace ;
600+ }
574601 _channelManager .handleSignal (_shutdownCause );
575602 }
576603
577604 /**
578- * Public API - Close this connection with the given code and message.
605+ * Public API - Close this connection and all its channels
606+ */
607+ public void close ()
608+ throws IOException
609+ {
610+ close (-1 );
611+ }
612+
613+ /**
614+ * Public API - Close this connection and all its channels
615+ * with a given timeout
616+ */
617+ public void close (int timeout )
618+ throws IOException
619+ {
620+ close (200 , "Goodbye" , timeout );
621+ }
622+
623+ /**
624+ * Public API - Abort this connection and all its channels
625+ */
626+ public void abort ()
627+ {
628+ abort (-1 );
629+ }
630+
631+ public void abort (int timeout )
632+ {
633+
634+ try {
635+ close (200 , "Goodbye" , true , null , timeout , true );
636+ } catch (IOException e ) {
637+ Utility .emptyStatement ();
638+ }
639+ }
640+
641+ /**
642+ * Protected API - Close this connection with the given code and message.
579643 * See the comments in ChannelN.close() - we're very similar.
580644 */
581645 public void close (int closeCode , String closeMessage )
582646 throws IOException
583647 {
584- close (closeCode , closeMessage , true , null );
648+ close (closeCode , closeMessage , 0 );
649+ }
650+
651+ public void close (int closeCode , String closeMessage , int timeout )
652+ throws IOException
653+ {
654+ close (closeCode , closeMessage , true , null , timeout , false );
655+ }
656+
657+ public void close (int closeCode ,
658+ String closeMessage ,
659+ boolean initiatedByApplication ,
660+ Throwable cause )
661+ throws IOException
662+ {
663+ close (closeCode , closeMessage , initiatedByApplication , cause , 0 , false );
664+
585665 }
586666
587667 /**
@@ -590,25 +670,34 @@ public void close(int closeCode, String closeMessage)
590670 public void close (int closeCode ,
591671 String closeMessage ,
592672 boolean initiatedByApplication ,
593- Throwable cause )
673+ Throwable cause ,
674+ int timeout ,
675+ boolean abort )
594676 throws IOException
595677 {
596678 try {
597679 AMQImpl .Connection .Close reason =
598680 new AMQImpl .Connection .Close (closeCode , closeMessage , 0 , 0 );
599681 shutdown (reason , initiatedByApplication , cause );
600682 _channel0 .quiescingRpc (reason ,
601- CONNECTION_CLOSING_TIMEOUT ,
683+ timeout ,
602684 new AMQCommand (new AMQImpl .Connection .CloseOk ()));
685+ } catch (TimeoutException ise ) {
686+ // FIXME: notify about timeout exception ?
603687 } catch (ShutdownSignalException sse ) {
604- // Ignore.
688+ if (!abort )
689+ throw sse ;
690+ } catch (IOException ioe ) {
691+ if (!abort )
692+ throw ioe ;
605693 } finally {
606694 _running = false ;
695+ _frameHandler .close ();
607696 }
608697 notifyListeners ();
609698 }
610699
611700 @ Override public String toString () {
612701 return "amqp://" + _params .getUserName () + "@" + getHost () + ":" + getPort () + _params .getVirtualHost ();
613702 }
614- }
703+ }
0 commit comments