Skip to content

Commit c60e2e9

Browse files
committed
Merge bug18061 into default
2 parents 0b14414 + e22445b commit c60e2e9

File tree

10 files changed

+181
-55
lines changed

10 files changed

+181
-55
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.rabbitmq.client;
2+
3+
/*
4+
* Thrown when application tries to perform an action on connection/channel
5+
* which was already closed
6+
*/
7+
public class AlreadyClosedException extends ShutdownSignalException {
8+
public AlreadyClosedException(String s)
9+
{
10+
super(true, true, s);
11+
}
12+
}

src/com/rabbitmq/client/Channel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
*
5757
*/
5858

59-
public interface Channel {
59+
public interface Channel extends ShutdownNotifier{
6060
/**
6161
* Retrieve this channel's channel number.
6262
* @return the channel number

src/com/rabbitmq/client/Connection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
* Current implementations are thread-safe for code at the client API level,
5050
* and in fact thread-safe internally except for code within RPC calls.
5151
*/
52-
public interface Connection { // rename to AMQPConnection later, this is a temporary name
52+
public interface Connection extends ShutdownNotifier { // rename to AMQPConnection later, this is a temporary name
5353
/**
5454
* Retrieve the host.
5555
* @return the hostname of the peer we're connected to.
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.rabbitmq.client;
2+
3+
import java.util.EventListener;
4+
5+
public interface ShutdownListener extends EventListener {
6+
7+
public void shutdownCompleted(ShutdownSignalException cause);
8+
9+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.rabbitmq.client;
2+
3+
/**
4+
* Interface for components that are shutdown capable and
5+
* that allow listeners to be added for shutdown signals
6+
*
7+
* @see ShutdownListener
8+
* @see ShutdownSignalException
9+
*/
10+
public interface ShutdownNotifier {
11+
/**
12+
* Add shutdown listener.
13+
* If the component is already closed, handler is fired immediately
14+
*
15+
* @param listener {@link ShutdownListener} to the component
16+
*/
17+
public void addShutdownListener(ShutdownListener listener);
18+
19+
/**
20+
* Remove shutdown listener for the component.
21+
*
22+
* @param listener {@link ShutdownListener} to be removed
23+
*/
24+
public void removeShutdownListener(ShutdownListener listener);
25+
26+
/**
27+
* Get the shutdown reason object
28+
* @return ShutdownSignalException if component is closed, null otherwise
29+
*/
30+
public ShutdownSignalException getCloseReason();
31+
32+
/**
33+
* Protected API - notify the listeners attached to the component
34+
* @see com.rabbitmq.client.ShutdownListener
35+
*/
36+
public void notifyListeners();
37+
38+
/**
39+
* Determine whether the component is currently open.
40+
* Will return false if we are currently closing.
41+
* Checking this method should be only for information,
42+
* because of the race conditions - state can change after the call.
43+
* Instead just execute and try to catch ShutdownSignalException
44+
* and IOException
45+
*
46+
* @return true when component is open, false otherwise
47+
*/
48+
boolean isOpen();
49+
}

src/com/rabbitmq/client/ShutdownSignalException.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* Encapsulates a shutdown condition for a connection to an AMQP broker.
2929
*/
3030

31-
public class ShutdownSignalException extends Exception {
31+
public class ShutdownSignalException extends RuntimeException {
3232
/** True if the connection is shut down, or false if this signal refers to a channel */
3333
private final boolean _hardError;
3434

@@ -59,11 +59,13 @@ public ShutdownSignalException(boolean hardError,
5959

6060
/** @return true if this signals a connection error, or false if a channel error */
6161
public boolean isHardError() { return _hardError; }
62+
6263
/** @return true if this exception was caused by explicit application
6364
* action; false if it originated with the broker or as a result
6465
* of detectable non-deliberate application failure
6566
*/
6667
public boolean isInitiatedByApplication() { return _initiatedByApplication; }
68+
6769
/** @return the reason object, if any */
6870
public Object getReason() { return _reason; }
6971

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import java.io.IOException;
2929

30+
import com.rabbitmq.client.AlreadyClosedException;
3031
import com.rabbitmq.client.Command;
3132
import com.rabbitmq.client.Connection;
3233
import com.rabbitmq.client.ShutdownSignalException;
@@ -41,7 +42,7 @@
4142
* @see ChannelN
4243
* @see Connection
4344
*/
44-
public abstract class AMQChannel {
45+
public abstract class AMQChannel extends ShutdownNotifierComponent {
4546
/** The connection this channel is associated with. */
4647
public final AMQConnection _connection;
4748

@@ -54,9 +55,6 @@ public abstract class AMQChannel {
5455
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
5556
public RpcContinuation _activeRpc = null;
5657

57-
/** Indicates whether this channel is in a state to handle further activity. */
58-
public volatile boolean _isOpen = true;
59-
6058
/**
6159
* Construct a channel on the given connection, with the given channel number.
6260
* @param connection the underlying connection for this channel
@@ -117,6 +115,10 @@ public AMQCommand exnWrappingRpc(Method m)
117115
{
118116
try {
119117
return rpc(m);
118+
} catch (AlreadyClosedException ace) {
119+
// Do not wrap it since it means that connection/channel
120+
// was closed in some action in the past
121+
throw ace;
120122
} catch (ShutdownSignalException ex) {
121123
throw wrap(ex);
122124
}
@@ -160,21 +162,18 @@ public synchronized void transmitAndEnqueue(Method m, RpcContinuation k)
160162
transmit(m);
161163
}
162164

163-
public synchronized RpcContinuation nextOutstandingRpc() {
165+
public synchronized RpcContinuation nextOutstandingRpc()
166+
{
164167
RpcContinuation result = _activeRpc;
165168
_activeRpc = null;
166169
return result;
167170
}
168171

169-
public boolean isOpen() {
170-
return _isOpen;
171-
}
172-
173172
public void ensureIsOpen()
174-
throws IllegalStateException
173+
throws AlreadyClosedException
175174
{
176175
if (!isOpen()) {
177-
throw new IllegalStateException("Attempt to use closed channel");
176+
throw new AlreadyClosedException("Attempt to use closed channel");
178177
}
179178
}
180179

@@ -261,7 +260,7 @@ public void run() {
261260
public void processShutdownSignal(ShutdownSignalException signal) {
262261
synchronized (this) {
263262
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
264-
_isOpen = false;
263+
_shutdownCause = signal;
265264
}
266265
RpcContinuation k = nextOutstandingRpc();
267266
if (k != null) {

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import com.rabbitmq.client.AMQP;
3434
import com.rabbitmq.client.Address;
35+
import com.rabbitmq.client.AlreadyClosedException;
3536
import com.rabbitmq.client.Channel;
3637
import com.rabbitmq.client.Command;
3738
import com.rabbitmq.client.Connection;
@@ -59,7 +60,7 @@
5960
* int ticket = ch1.accessRequest(realmName);
6061
* </pre>
6162
*/
62-
public class AMQConnection implements Connection {
63+
public class AMQConnection extends ShutdownNotifierComponent implements Connection{
6364
/** Timeout used while waiting for AMQP handshaking to complete (milliseconds) */
6465
public static final int HANDSHAKE_TIMEOUT = 10000;
6566

@@ -88,19 +89,12 @@ public class AMQConnection implements Connection {
8889
/** Flag controlling the main driver loop's termination */
8990
public volatile boolean _running = false;
9091

91-
/**
92-
* When this value is null, the connection is in an "open"
93-
* state. When non-null, the connection is in "closed" state, and
94-
* this value indicates the circumstances of the shutdown.
95-
*/
96-
public volatile ShutdownSignalException _shutdownCause = null;
97-
9892
/** Maximum frame length, or zero if no limit is set */
9993
public int _frameMax;
10094

10195
/** Handler for (otherwise-unhandled) exceptions that crop up in the mainloop. */
10296
public final ExceptionHandler _exceptionHandler;
103-
97+
10498
/**
10599
* Protected API - respond, in the driver thread, to a ShutdownSignal.
106100
* @param channelNumber the number of the channel to disconnect
@@ -109,15 +103,11 @@ public final void disconnectChannel(int channelNumber) {
109103
_channelManager.disconnectChannel(channelNumber);
110104
}
111105

112-
public boolean isOpen() {
113-
return _shutdownCause == null;
114-
}
115-
116106
public void ensureIsOpen()
117-
throws IllegalStateException
107+
throws AlreadyClosedException
118108
{
119109
if (!isOpen()) {
120-
throw new IllegalStateException("Attempt to use closed connection");
110+
throw new AlreadyClosedException("Attempt to use closed connection");
121111
}
122112
}
123113

@@ -340,15 +330,13 @@ public Address[] open(final ConnectionParameters params, boolean insist)
340330
_frameHandler.setTimeout(HANDSHAKE_TIMEOUT);
341331
_frameHandler.sendHeader();
342332

343-
if (!isOpen()) {
344-
// See bug 17389. The MainLoop could have shut down already in
345-
// which case we don't want to wait forever for a reply.
346-
347-
// There is no race if the MainLoop shuts down after enqueuing
348-
// the RPC because if that happens the channel will correctly
349-
// pass the exception into RPC, waking it up.
350-
throw _shutdownCause;
351-
}
333+
// See bug 17389. The MainLoop could have shut down already in
334+
// which case we don't want to wait forever for a reply.
335+
336+
// There is no race if the MainLoop shuts down after enqueuing
337+
// the RPC because if that happens the channel will correctly
338+
// pass the exception into RPC, waking it up.
339+
ensureIsOpen();
352340

353341
AMQP.Connection.Start connStart =
354342
(AMQP.Connection.Start) connStartBlocker.getReply().getMethod();
@@ -561,6 +549,7 @@ public void handleConnectionClose(Command closeCommand) {
561549
Utility.emptyStatement();
562550
}
563551
shutdown(closeCommand, false, null);
552+
notifyListeners();
564553
}
565554

566555
/**
@@ -616,6 +605,7 @@ public void close(int closeCode,
616605
} finally {
617606
_running = false;
618607
}
608+
notifyListeners();
619609
}
620610

621611
@Override public String toString() {

0 commit comments

Comments
 (0)