Skip to content

Commit 45360f7

Browse files
committed
Close connections in tests
1 parent e406a6d commit 45360f7

File tree

3 files changed

+31
-26
lines changed

3 files changed

+31
-26
lines changed

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,9 @@ private RecoveryAwareAMQConnection recoverConnection() throws InterruptedExcepti
636636
// was being re-established. So we attempt to close the newly created connection.
637637
newConn.abort();
638638
return null;
639+
} catch (IllegalStateException e) {
640+
this.getExceptionHandler().handleConnectionRecoveryException(this, e);
641+
return null;
639642
} catch (Exception e) {
640643
Thread.sleep(this.params.getRecoveryDelayHandler().getDelay(attempts));
641644
this.getExceptionHandler().handleConnectionRecoveryException(this, e);

src/test/java/com/rabbitmq/client/test/functional/ExceptionHandling.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
public class ExceptionHandling {
4141
private ConnectionFactory newConnectionFactory(ExceptionHandler eh) {
4242
ConnectionFactory cf = TestUtils.connectionFactory();
43+
cf.setNetworkRecoveryInterval(2000);
4344
cf.setExceptionHandler(eh);
4445
return cf;
4546
}
@@ -74,21 +75,22 @@ protected void testConsumerHandleConsumerException(ExceptionHandler eh, CountDow
7475
throws InterruptedException, TimeoutException, IOException {
7576
ConnectionFactory cf = newConnectionFactory(eh);
7677
assertEquals(cf.getExceptionHandler(), eh);
77-
Connection conn = cf.newConnection();
78-
assertEquals(conn.getExceptionHandler(), eh);
79-
Channel ch = conn.createChannel();
80-
String q = ch.queueDeclare().getQueue();
81-
ch.basicConsume(q, new DefaultConsumer(ch) {
82-
@Override
83-
public void handleDelivery(String consumerTag, Envelope envelope,
84-
AMQP.BasicProperties properties, byte[] body) throws IOException {
85-
throw new RuntimeException("exception expected here, don't freak out");
86-
}
87-
});
88-
ch.basicPublish("", q, null, "".getBytes());
89-
wait(latch);
78+
try (Connection conn = cf.newConnection()) {
79+
assertEquals(conn.getExceptionHandler(), eh);
80+
Channel ch = conn.createChannel();
81+
String q = ch.queueDeclare().getQueue();
82+
ch.basicConsume(q, new DefaultConsumer(ch) {
83+
@Override
84+
public void handleDelivery(String consumerTag, Envelope envelope,
85+
AMQP.BasicProperties properties, byte[] body) throws IOException {
86+
throw new RuntimeException("exception expected here, don't freak out");
87+
}
88+
});
89+
ch.basicPublish("", q, null, "".getBytes());
90+
wait(latch);
9091

91-
assertEquals(!expectChannelClose, ch.isOpen());
92+
assertEquals(!expectChannelClose, ch.isOpen());
93+
}
9294
}
9395

9496
@Test public void nullExceptionHandler() {

src/test/java/com/rabbitmq/client/test/functional/Metrics.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void metrics(ConnectionFactory connectionFactory) throws IOException, Tim
8282
Connection connection1 = null;
8383
Connection connection2 = null;
8484
try {
85-
connection1 = connectionFactory.newConnection();
85+
connection1 = connectionFactory.newConnection(generateConnectionName());
8686
assertThat(metrics.getConnections().getCount()).isEqualTo(1L);
8787

8888
connection1.createChannel();
@@ -102,7 +102,7 @@ public void metrics(ConnectionFactory connectionFactory) throws IOException, Tim
102102
channel.basicGet(QUEUE, true);
103103
assertThat(metrics.getConsumedMessages().getCount()).isEqualTo(2L);
104104

105-
connection2 = connectionFactory.newConnection();
105+
connection2 = connectionFactory.newConnection(generateConnectionName());
106106
assertThat(metrics.getConnections().getCount()).isEqualTo(2L);
107107

108108
connection2.createChannel();
@@ -142,7 +142,7 @@ public void metricsPublisherUnrouted(ConnectionFactory connectionFactory) throws
142142
connectionFactory.setMetricsCollector(metrics);
143143
Connection connection = null;
144144
try {
145-
connection = connectionFactory.newConnection();
145+
connection = connectionFactory.newConnection(generateConnectionName());
146146
Channel channel = connection.createChannel();
147147
channel.confirmSelect();
148148
assertThat(metrics.getPublishUnroutedMessages().getCount()).isEqualTo(0L);
@@ -168,7 +168,7 @@ public void metricsPublisherAck(ConnectionFactory connectionFactory) throws IOEx
168168
connectionFactory.setMetricsCollector(metrics);
169169
Connection connection = null;
170170
try {
171-
connection = connectionFactory.newConnection();
171+
connection = connectionFactory.newConnection(generateConnectionName());
172172
Channel channel = connection.createChannel();
173173
channel.confirmSelect();
174174
assertThat(metrics.getPublishAcknowledgedMessages().getCount()).isEqualTo(0L);
@@ -196,7 +196,7 @@ public void metricsAck(ConnectionFactory connectionFactory) throws IOException,
196196

197197
Connection connection = null;
198198
try {
199-
connection = connectionFactory.newConnection();
199+
connection = connectionFactory.newConnection(generateConnectionName());
200200
Channel channel1 = connection.createChannel();
201201
Channel channel2 = connection.createChannel();
202202

@@ -264,7 +264,7 @@ public void metricsReject(ConnectionFactory connectionFactory) throws IOExceptio
264264

265265
Connection connection = null;
266266
try {
267-
connection = connectionFactory.newConnection();
267+
connection = connectionFactory.newConnection(generateConnectionName());
268268
Channel channel = connection.createChannel();
269269

270270
sendMessage(channel);
@@ -304,7 +304,7 @@ public void multiThreadedMetricsStandardConnection(ConnectionFactory connectionF
304304
try {
305305
Channel [] channels = new Channel[nbChannels];
306306
for(int i = 0; i < nbConnections; i++) {
307-
connections[i] = connectionFactory.newConnection();
307+
connections[i] = connectionFactory.newConnection(generateConnectionName());
308308
for(int j = 0; j < nbChannelsPerConnection; j++) {
309309
Channel channel = connections[i].createChannel();
310310
channel.basicQos(1);
@@ -347,7 +347,7 @@ public void multiThreadedMetricsStandardConnection(ConnectionFactory connectionF
347347
executorService.shutdownNow();
348348

349349
executorService = Executors.newFixedThreadPool(nbTasks);
350-
tasks = new ArrayList<Callable<Void>>();
350+
tasks = new ArrayList<>();
351351
for(int i = 0; i < nbTasks; i++) {
352352
Channel channelForConsuming = channels[i];
353353
tasks.add(random.nextBoolean() ?
@@ -376,7 +376,7 @@ public void multiThreadedMetricsStandardConnection(ConnectionFactory connectionF
376376
executorService.shutdownNow();
377377

378378
executorService = Executors.newFixedThreadPool(nbTasks);
379-
tasks = new ArrayList<Callable<Void>>();
379+
tasks = new ArrayList<>();
380380
for(int i = 0; i < nbTasks; i++) {
381381
Channel channelForConsuming = channels[i];
382382
tasks.add(random.nextBoolean() ?
@@ -405,7 +405,7 @@ public void errorInChannel(ConnectionFactory connectionFactory) throws IOExcepti
405405

406406
Connection connection = null;
407407
try {
408-
connection = connectionFactory.newConnection();
408+
connection = connectionFactory.newConnection(generateConnectionName());
409409
Channel channel = connection.createChannel();
410410

411411
assertThat(metrics.getConnections().getCount()).isEqualTo(1L);
@@ -429,7 +429,7 @@ public void errorInChannel(ConnectionFactory connectionFactory) throws IOExcepti
429429

430430
Connection connection = null;
431431
try {
432-
connection = connectionFactory.newConnection(UUID.randomUUID().toString());
432+
connection = connectionFactory.newConnection(generateConnectionName());
433433

434434
Collection<?> shutdownHooks = getShutdownHooks(connection);
435435
assertThat(shutdownHooks.size()).isEqualTo(0);
@@ -459,7 +459,7 @@ public void errorInChannel(ConnectionFactory connectionFactory) throws IOExcepti
459459

460460
Connection connection = null;
461461
try {
462-
connection = connectionFactory.newConnection(UUID.randomUUID().toString());
462+
connection = connectionFactory.newConnection(generateConnectionName());
463463

464464
Channel channel1 = connection.createChannel();
465465
AtomicInteger ackedMessages = new AtomicInteger(0);

0 commit comments

Comments
 (0)