Skip to content

Commit 3ec56f2

Browse files
author
Simon MacMullen
committed
Mereg bug26008
2 parents fb57269 + 181a697 commit 3ec56f2

File tree

11 files changed

+147
-65
lines changed

11 files changed

+147
-65
lines changed

src/com/rabbitmq/client/Connection.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,4 +233,11 @@ public interface Connection extends ShutdownNotifier { // rename to AMQPConnecti
233233
* Remove all {@link BlockedListener}s.
234234
*/
235235
void clearBlockedListeners();
236+
237+
/**
238+
* Get the exception handler.
239+
*
240+
* @see com.rabbitmq.client.ExceptionHandler
241+
*/
242+
ExceptionHandler getExceptionHandler();
236243
}

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
import com.rabbitmq.client.impl.AMQConnection;
3535
import com.rabbitmq.client.impl.ConnectionParams;
36+
import com.rabbitmq.client.impl.DefaultExceptionHandler;
3637
import com.rabbitmq.client.impl.FrameHandler;
3738
import com.rabbitmq.client.impl.FrameHandlerFactory;
3839
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
@@ -90,6 +91,7 @@ public class ConnectionFactory implements Cloneable {
9091
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
9192
private ExecutorService sharedExecutor;
9293
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
94+
private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
9395

9496
private boolean automaticRecovery = false;
9597
private boolean topologyRecovery = true;
@@ -426,6 +428,28 @@ public void setSharedExecutor(ExecutorService executor) {
426428
this.sharedExecutor = executor;
427429
}
428430

431+
/**
432+
* Get the exception handler.
433+
*
434+
* @see com.rabbitmq.client.ExceptionHandler
435+
*/
436+
public ExceptionHandler getExceptionHandler() {
437+
return exceptionHandler;
438+
}
439+
440+
/**
441+
* Set the exception handler to use for newly created connections.
442+
*
443+
* @see com.rabbitmq.client.ExceptionHandler
444+
*/
445+
446+
public void setExceptionHandler(ExceptionHandler exceptionHandler) {
447+
if (exceptionHandler == null) {
448+
throw new IllegalArgumentException("exception handler cannot be null!");
449+
}
450+
this.exceptionHandler = exceptionHandler;
451+
}
452+
429453
public boolean isSSL(){
430454
return getSocketFactory() instanceof SSLSocketFactory;
431455
}
@@ -559,7 +583,7 @@ public Connection newConnection(ExecutorService executor, Address[] addrs)
559583
public ConnectionParams params(ExecutorService executor) {
560584
return new ConnectionParams(username, password, executor, virtualHost, getClientProperties(),
561585
requestedFrameMax, requestedChannelMax, requestedHeartbeat, saslConfig,
562-
networkRecoveryInterval, topologyRecovery);
586+
networkRecoveryInterval, topologyRecovery, exceptionHandler);
563587
}
564588

565589
/**

src/com/rabbitmq/client/impl/ExceptionHandler.java renamed to src/com/rabbitmq/client/ExceptionHandler.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,7 @@
1414
// Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
1515
//
1616

17-
package com.rabbitmq.client.impl;
18-
19-
import com.rabbitmq.client.Channel;
20-
import com.rabbitmq.client.Connection;
21-
import com.rabbitmq.client.Consumer;
22-
import com.rabbitmq.client.TopologyRecoveryException;
17+
package com.rabbitmq.client;
2318

2419
/**
2520
* Interface to an exception-handling object.

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.rabbitmq.client.AMQP;
3232
import com.rabbitmq.client.AuthenticationFailureException;
3333
import com.rabbitmq.client.BlockedListener;
34+
import com.rabbitmq.client.ExceptionHandler;
3435
import com.rabbitmq.client.Method;
3536
import com.rabbitmq.client.AlreadyClosedException;
3637
import com.rabbitmq.client.Channel;
@@ -110,7 +111,7 @@ public static final Map<String, Object> defaultClientProperties() {
110111
private volatile boolean _running = false;
111112

112113
/** Handler for (uncaught) exceptions that crop up in the {@link MainLoop}. */
113-
private final ExceptionHandler _exceptionHandler;
114+
private ExceptionHandler _exceptionHandler;
114115

115116
/** Object used for blocking main application thread when doing all the necessary
116117
* connection shutdown operations
@@ -205,6 +206,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
205206
this._frameHandler = frameHandler;
206207
this._virtualHost = params.getVirtualHost();
207208
this._exceptionHandler = params.getExceptionHandler();
209+
208210
this._clientProperties = new HashMap<String, Object>(params.getClientProperties());
209211
this.requestedFrameMax = params.getRequestedFrameMax();
210212
this.requestedChannelMax = params.getRequestedChannelMax();
@@ -220,7 +222,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
220222
this._inConnectionNegotiation = true; // we start out waiting for the first protocol response
221223
}
222224

223-
/**
225+
/**
224226
* Start up the connection, including the MainLoop thread.
225227
* Sends the protocol
226228
* version negotiation header, and runs through

src/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.rabbitmq.client.impl;
22

3+
import com.rabbitmq.client.ExceptionHandler;
34
import com.rabbitmq.client.SaslConfig;
45

56
import java.util.Map;
@@ -32,12 +33,13 @@ public class ConnectionParams {
3233
* @param saslConfig sasl configuration hook
3334
* @param networkRecoveryInterval interval used when recovering from network failure
3435
* @param topologyRecovery should topology (queues, exchanges, bindings, consumers) recovery be performed?
36+
* @param exceptionHandler
3537
*/
3638
public ConnectionParams(String username, String password, ExecutorService executor,
3739
String virtualHost, Map<String, Object> clientProperties,
3840
int requestedFrameMax, int requestedChannelMax, int requestedHeartbeat,
3941
SaslConfig saslConfig, int networkRecoveryInterval,
40-
boolean topologyRecovery) {
42+
boolean topologyRecovery, ExceptionHandler exceptionHandler) {
4143
this.username = username;
4244
this.password = password;
4345
this.executor = executor;
@@ -49,8 +51,7 @@ public ConnectionParams(String username, String password, ExecutorService execut
4951
this.saslConfig = saslConfig;
5052
this.networkRecoveryInterval = networkRecoveryInterval;
5153
this.topologyRecovery = topologyRecovery;
52-
53-
this.exceptionHandler = new DefaultExceptionHandler();
54+
this.exceptionHandler = exceptionHandler;
5455
}
5556

5657
public String getUsername() {
@@ -93,15 +94,6 @@ public ExceptionHandler getExceptionHandler() {
9394
return exceptionHandler;
9495
}
9596

96-
public void setExceptionHandler(ExceptionHandler exceptionHandler) {
97-
this.exceptionHandler = exceptionHandler;
98-
}
99-
100-
public ConnectionParams exceptionHandler(ExceptionHandler exceptionHandler) {
101-
this.exceptionHandler = exceptionHandler;
102-
return this;
103-
}
104-
10597
public int getNetworkRecoveryInterval() {
10698
return networkRecoveryInterval;
10799
}

src/com/rabbitmq/client/impl/DefaultExceptionHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@
2424
import com.rabbitmq.client.Channel;
2525
import com.rabbitmq.client.Connection;
2626
import com.rabbitmq.client.Consumer;
27+
import com.rabbitmq.client.ExceptionHandler;
2728
import com.rabbitmq.client.TopologyRecoveryException;
2829

2930
/**
30-
* Default implementation of {@link ExceptionHandler} used by {@link AMQConnection}.
31+
* Default implementation of {@link com.rabbitmq.client.ExceptionHandler} used by {@link AMQConnection}.
3132
*/
3233
public class DefaultExceptionHandler implements ExceptionHandler {
3334
public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) {

src/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import com.rabbitmq.client.ShutdownSignalException;
1212
import com.rabbitmq.client.TopologyRecoveryException;
1313
import com.rabbitmq.client.impl.ConnectionParams;
14-
import com.rabbitmq.client.impl.ExceptionHandler;
14+
import com.rabbitmq.client.ExceptionHandler;
1515
import com.rabbitmq.client.impl.FrameHandlerFactory;
1616
import com.rabbitmq.client.impl.NetworkConnection;
1717

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
import com.rabbitmq.client.ConnectionFactory;
3636
import com.rabbitmq.client.Consumer;
3737
import com.rabbitmq.client.impl.AMQConnection;
38-
import com.rabbitmq.client.impl.ExceptionHandler;
38+
import com.rabbitmq.client.ExceptionHandler;
3939
import com.rabbitmq.client.impl.Frame;
4040
import com.rabbitmq.client.impl.FrameHandler;
4141

@@ -59,6 +59,7 @@ public static TestSuite suite() {
5959
/** The mock frame handler used to test connection behaviour. */
6060
private MockFrameHandler _mockFrameHandler;
6161
private ConnectionFactory factory;
62+
private MyExceptionHandler exceptionHandler;
6263

6364
/** Setup the environment for this test
6465
* @see junit.framework.TestCase#setUp()
@@ -68,6 +69,8 @@ public static TestSuite suite() {
6869
super.setUp();
6970
_mockFrameHandler = new MockFrameHandler();
7071
factory = new ConnectionFactory();
72+
exceptionHandler = new MyExceptionHandler();
73+
factory.setExceptionHandler(exceptionHandler);
7174
}
7275

7376
/** Tear down the environment for this test
@@ -86,19 +89,17 @@ public static TestSuite suite() {
8689
public void testConnectionSendsSingleHeaderAndTimesOut() {
8790
IOException exception = new SocketTimeoutException();
8891
_mockFrameHandler.setExceptionOnReadingFrames(exception);
89-
MyExceptionHandler handler = new MyExceptionHandler();
9092
assertEquals(0, _mockFrameHandler.countHeadersSent());
9193
try {
9294
ConnectionParams params = factory.params(Executors.newFixedThreadPool(1));
93-
params.setExceptionHandler(handler);
9495
new AMQConnection(params, _mockFrameHandler).start();
9596
fail("Connection should have thrown exception");
9697
} catch(IOException signal) {
9798
// As expected
9899
}
99100
assertEquals(1, _mockFrameHandler.countHeadersSent());
100101
// _connection.close(0, CLOSE_MESSAGE);
101-
List<Throwable> exceptionList = handler.getHandledExceptions();
102+
List<Throwable> exceptionList = exceptionHandler.getHandledExceptions();
102103
assertEquals(Collections.<Throwable>singletonList(exception), exceptionList);
103104
}
104105

@@ -120,19 +121,17 @@ public void testConnectionSendsSingleHeaderAndTimesOut() {
120121
*/
121122
public void testConnectionHangInNegotiation() {
122123
this._mockFrameHandler.setTimeoutCount(10); // to limit hang
123-
MyExceptionHandler handler = new MyExceptionHandler();
124124
assertEquals(0, this._mockFrameHandler.countHeadersSent());
125125
try {
126126
ConnectionParams params = factory.params(Executors.newFixedThreadPool(1));
127-
params.setExceptionHandler(handler);
128127
new AMQConnection(params, this._mockFrameHandler).start();
129128
fail("Connection should have thrown exception");
130129
} catch(IOException signal) {
131130
// As expected
132131
}
133132
assertEquals(1, this._mockFrameHandler.countHeadersSent());
134133
// _connection.close(0, CLOSE_MESSAGE);
135-
List<Throwable> exceptionList = handler.getHandledExceptions();
134+
List<Throwable> exceptionList = exceptionHandler.getHandledExceptions();
136135
assertEquals("Only one exception expected", 1, exceptionList.size());
137136
assertEquals("Wrong type of exception returned.", SocketTimeoutException.class, exceptionList.get(0).getClass());
138137
}

test/src/com/rabbitmq/client/test/CloseInMainLoop.java

Lines changed: 34 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,38 @@
3636
import com.rabbitmq.client.impl.SocketFrameHandler;
3737

3838
public class CloseInMainLoop extends BrokerTestCase{
39+
private final CountDownLatch closeLatch = new CountDownLatch(1);
40+
41+
private ConnectionFactory specialConnectionFactory() {
42+
ConnectionFactory f = new ConnectionFactory();
43+
f.setExceptionHandler(new DefaultExceptionHandler(){
44+
@Override
45+
public void handleConsumerException(Channel channel,
46+
Throwable exception,
47+
Consumer consumer,
48+
String consumerTag,
49+
String methodName) {
50+
try {
51+
// TODO: change this to call 4-parameter close and make 6-parm one private
52+
((AMQConnection) channel.getConnection())
53+
.close(AMQP.INTERNAL_ERROR,
54+
"Internal error in Consumer " + consumerTag,
55+
false,
56+
exception,
57+
-1,
58+
false);
59+
} catch (Throwable e) {
60+
// Man, this clearly isn't our day.
61+
// TODO: Log the nested failure
62+
} finally {
63+
closeLatch.countDown();
64+
}
65+
}
66+
});
67+
return f;
68+
}
3969

40-
private final CountDownLatch closeLatch = new CountDownLatch(1);
41-
42-
class SpecialConnection extends AMQConnection{
70+
class SpecialConnection extends AMQConnection{
4371
private AtomicBoolean validShutdown = new AtomicBoolean(false);
4472

4573
public boolean hadValidShutdown(){
@@ -48,45 +76,16 @@ public boolean hadValidShutdown(){
4876
}
4977

5078
public SpecialConnection() throws Exception {
51-
this(new ConnectionFactory());
52-
}
53-
54-
private SpecialConnection(ConnectionFactory factory) throws Exception{
55-
super(factory.params(Executors.newFixedThreadPool(1)).exceptionHandler(
56-
new DefaultExceptionHandler(){
57-
@Override
58-
public void handleConsumerException(Channel channel,
59-
Throwable exception,
60-
Consumer consumer,
61-
String consumerTag,
62-
String methodName) {
63-
try {
64-
// TODO: change this to call 4-parameter close and make 6-parm one private
65-
((AMQConnection) channel.getConnection())
66-
.close(AMQP.INTERNAL_ERROR,
67-
"Internal error in Consumer " + consumerTag,
68-
false,
69-
exception,
70-
-1,
71-
false);
72-
} catch (Throwable e) {
73-
// Man, this clearly isn't our day.
74-
// TODO: Log the nested failure
75-
} finally {
76-
closeLatch.countDown();
77-
}
78-
}
79-
}
80-
), new SocketFrameHandler(SocketFactory.getDefault().createSocket("localhost", AMQP.PROTOCOL.PORT)));
79+
super(specialConnectionFactory().params(Executors.newFixedThreadPool(1)),
80+
new SocketFrameHandler(SocketFactory.getDefault().createSocket("localhost", AMQP.PROTOCOL.PORT)));
8181
this.start();
82-
}
82+
}
8383

8484
@Override
8585
public boolean processControlCommand(Command c) throws IOException{
8686
if(c.getMethod() instanceof AMQP.Connection.CloseOk) validShutdown.set(true);
8787
return super.processControlCommand(c);
8888
}
89-
9089
}
9190

9291
public void testCloseOKNormallyReceived() throws Exception{

0 commit comments

Comments
 (0)