2828import java .io .EOFException ;
2929import java .io .IOException ;
3030import java .net .SocketException ;
31+ import java .util .Collections ;
32+ import java .util .LinkedList ;
33+ import java .util .List ;
3134import java .util .Map ;
3235
3336import com .rabbitmq .client .AMQP ;
3437import com .rabbitmq .client .Address ;
38+ import com .rabbitmq .client .AlreadyClosedException ;
3539import com .rabbitmq .client .Channel ;
3640import com .rabbitmq .client .Command ;
3741import com .rabbitmq .client .Connection ;
3842import com .rabbitmq .client .ConnectionParameters ;
3943import com .rabbitmq .client .MissedHeartbeatException ;
4044import com .rabbitmq .client .RedirectException ;
45+ import com .rabbitmq .client .ShutdownListener ;
4146import com .rabbitmq .client .ShutdownSignalException ;
4247import com .rabbitmq .utility .Utility ;
4348
@@ -100,7 +105,11 @@ public class AMQConnection implements Connection {
100105
101106 /** Handler for (otherwise-unhandled) exceptions that crop up in the mainloop. */
102107 public final ExceptionHandler _exceptionHandler ;
103-
108+
109+ /** List of all shutdown listeners associated with the connection */
110+ public List <ShutdownListener > listeners
111+ = Collections .synchronizedList (new LinkedList <ShutdownListener >());
112+
104113 /**
105114 * Protected API - respond, in the driver thread, to a ShutdownSignal.
106115 * @param channelNumber the number of the channel to disconnect
@@ -109,15 +118,19 @@ public final void disconnectChannel(int channelNumber) {
109118 _channelManager .disconnectChannel (channelNumber );
110119 }
111120
121+ /**
122+ * Public API - Determine whether the connection is open
123+ * @return true if haven't yet received shutdown signal, false otherwise
124+ */
112125 public boolean isOpen () {
113126 return _shutdownCause == null ;
114127 }
115128
116129 public void ensureIsOpen ()
117- throws IllegalStateException
130+ throws AlreadyClosedException
118131 {
119132 if (!isOpen ()) {
120- throw new IllegalStateException ("Attempt to use closed connection" );
133+ throw new AlreadyClosedException ("Attempt to use closed connection" );
121134 }
122135 }
123136
@@ -576,7 +589,7 @@ public void shutdown(Object reason,
576589 ensureIsOpen (); // invariant: we should never be shut down more than once per instance
577590 _shutdownCause = new ShutdownSignalException (true ,
578591 initiatedByApplication ,
579- reason );
592+ reason , this );
580593 }
581594 if (cause != null ) {
582595 _shutdownCause .initCause (cause );
@@ -616,6 +629,48 @@ public void close(int closeCode,
616629 } finally {
617630 _running = false ;
618631 }
632+
633+ synchronized (listeners ) {
634+ for (ShutdownListener l : listeners )
635+ l .service (getCloseReason ());
636+ }
637+ }
638+
639+ /**
640+ * Public API - Add shutdown listener fired when closing the connection
641+ * @see com.rabbitmq.client.Connection#addShutdownListener()
642+ */
643+ public void addShutdownListener (ShutdownListener listener )
644+ {
645+
646+ boolean closed = false ;
647+ synchronized (listeners ) {
648+ closed = !isOpen ();
649+ listeners .add (listener );
650+ }
651+ if (closed )
652+ listener .service (_shutdownCause );
653+ }
654+
655+ /**
656+ * Public API - Remove shutdown listener for this connection
657+ * Removing only the first found object
658+ * @see com.rabbitmq.client.Connection#removeShutdownListener()
659+ */
660+ public void removeShutdownListener (ShutdownListener listener )
661+ {
662+ synchronized (listeners ) {
663+ listeners .remove (listener );
664+ }
665+ }
666+
667+ /**
668+ * Public API - Get reason for shutdown, or null if open
669+ * @see com.rabbitmq.client.Connection#getShutdownReason()
670+ */
671+ public ShutdownSignalException getCloseReason ()
672+ {
673+ return _shutdownCause ;
619674 }
620675
621676 @ Override public String toString () {
0 commit comments