Skip to content

Commit 559a320

Browse files
garyrussellartembilan
authored andcommitted
DMLC: Publish event for connection failure
The `DirectMessageListenerContainer` did not publish a listener failed event for a connection failure. **cherry-pick to all 2.x branches** * Destroy beans in new test. # Conflicts: # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerIntegrationTests.java
1 parent 462efc4 commit 559a320

File tree

2 files changed

+45
-8
lines changed

2 files changed

+45
-8
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -663,6 +663,7 @@ private void doConsumeFromQueue(String queue) {
663663
connection = getConnectionFactory().createConnection();
664664
}
665665
catch (Exception e) {
666+
publishConsumerFailedEvent(e.getMessage(), false, e);
666667
addConsumerToRestart(new SimpleConsumer(null, null, queue));
667668
throw e instanceof AmqpConnectException // NOSONAR exception type check
668669
? (AmqpConnectException) e

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerIntegrationTests.java

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.List;
3737
import java.util.Properties;
3838
import java.util.concurrent.CountDownLatch;
39+
import java.util.concurrent.ExecutorService;
3940
import java.util.concurrent.Executors;
4041
import java.util.concurrent.TimeUnit;
4142
import java.util.concurrent.atomic.AtomicReference;
@@ -151,6 +152,36 @@ public void testSimple() throws Exception {
151152
assertEquals(0, TestUtils.getPropertyValue(container, "consumersByQueue", MultiValueMap.class).size());
152153
template.stop();
153154
cf.destroy();
155+
executor.destroy();
156+
}
157+
158+
@Test
159+
public void testBadHost() throws InterruptedException {
160+
CachingConnectionFactory cf = new CachingConnectionFactory("this.host.does.not.exist");
161+
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
162+
executor.setThreadNamePrefix("client-");
163+
executor.afterPropertiesSet();
164+
cf.setExecutor(executor);
165+
DirectMessageListenerContainer container = new DirectMessageListenerContainer(cf);
166+
container.setQueueNames("dummy");
167+
container.setConsumersPerQueue(2);
168+
container.setMessageListener(in -> {
169+
});
170+
container.setBeanName("badHost");
171+
container.setConsumerTagStrategy(new Tag());
172+
CountDownLatch latch = new CountDownLatch(1);
173+
container.setApplicationEventPublisher(ev -> {
174+
if (ev instanceof ListenerContainerConsumerFailedEvent) {
175+
latch.countDown();
176+
}
177+
});
178+
container.setRecoveryInterval(100);
179+
container.afterPropertiesSet();
180+
container.start();
181+
assertTrue(latch.await(10, TimeUnit.SECONDS));
182+
container.stop();
183+
cf.destroy();
184+
executor.destroy();
154185
}
155186

156187
@Test
@@ -222,6 +253,7 @@ public void testQueueManagement() throws Exception {
222253
assertEquals(0, TestUtils.getPropertyValue(container, "consumersByQueue", MultiValueMap.class).size());
223254
template.stop();
224255
cf.destroy();
256+
executor.destroy();
225257
}
226258

227259
@Test
@@ -264,6 +296,7 @@ public void testQueueManagementQueueInstances() throws Exception {
264296
assertEquals(0, TestUtils.getPropertyValue(container, "consumersByQueue", MultiValueMap.class).size());
265297
template.stop();
266298
cf.destroy();
299+
executor.destroy();
267300
}
268301

269302
@Test
@@ -306,6 +339,7 @@ public void testAddRemoveConsumers() throws Exception {
306339
assertEquals(0, TestUtils.getPropertyValue(container, "consumersByQueue", MultiValueMap.class).size());
307340
template.stop();
308341
cf.destroy();
342+
executor.destroy();
309343
}
310344

311345
@Test
@@ -347,8 +381,8 @@ public void testErrorHandler() throws Exception {
347381
.put("x-dead-letter-routing-key", DLQ1)
348382
.get());
349383
CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
350-
RabbitAdmin admin = new RabbitAdmin(cf);
351-
admin.declareQueue(q1);
384+
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf);
385+
rabbitAdmin.declareQueue(q1);
352386
DirectMessageListenerContainer container = new DirectMessageListenerContainer(cf);
353387
container.setQueueNames(Q1);
354388
container.setConsumersPerQueue(2);
@@ -457,13 +491,15 @@ public void testCancelConsumerBeforeConsumeOk() throws Exception {
457491
container.start();
458492
assertTrue(latch1.await(10, TimeUnit.SECONDS));
459493
Consumer consumer = consumerCaptor.getValue();
460-
Executors.newSingleThreadExecutor().execute(() -> {
494+
ExecutorService exec = Executors.newSingleThreadExecutor();
495+
exec.execute(() -> {
461496
container.stop();
462497
latch2.countDown();
463498
});
464499
assertTrue(latch2.await(10, TimeUnit.SECONDS));
465500
verify(channel).basicCancel(tag); // canceled properly even without consumeOk
466501
consumer.handleCancelOk(tag);
502+
exec.shutdownNow();
467503
}
468504

469505
@Test
@@ -482,9 +518,9 @@ private void testRecoverDeletedQueueGuts(boolean autoDeclare) throws Exception {
482518
if (autoDeclare) {
483519
GenericApplicationContext context = new GenericApplicationContext();
484520
context.getBeanFactory().registerSingleton("foo", new Queue(Q1));
485-
RabbitAdmin admin = new RabbitAdmin(cf);
486-
admin.setApplicationContext(context);
487-
context.getBeanFactory().registerSingleton("admin", admin);
521+
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf);
522+
rabbitAdmin.setApplicationContext(context);
523+
context.getBeanFactory().registerSingleton("admin", rabbitAdmin);
488524
context.refresh();
489525
container.setApplicationContext(context);
490526
}
@@ -505,10 +541,10 @@ private void testRecoverDeletedQueueGuts(boolean autoDeclare) throws Exception {
505541
assertTrue(consumersOnQueue(Q2, 2));
506542
assertTrue(activeConsumerCount(container, 2));
507543
assertTrue(restartConsumerCount(container, 2));
508-
RabbitAdmin admin = new RabbitAdmin(cf);
544+
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf);
509545
if (!autoDeclare) {
510546
Thread.sleep(2000);
511-
admin.declareQueue(new Queue(Q1));
547+
rabbitAdmin.declareQueue(new Queue(Q1));
512548
}
513549
assertTrue(consumersOnQueue(Q1, 2));
514550
assertTrue(consumersOnQueue(Q2, 2));

0 commit comments

Comments
 (0)