Skip to content

Commit 6402004

Browse files
committed
GH-2994: Add tests for RabbitAmqpListenerContainer
Fixes: #2994 * Add `@RabbitListener` tests for `RabbitAmqpMessageListener` * Adjust `RabbitAmqpMessageListener` error handling logic to call `errorHandler` first * Move `consumer.pause()` for `RabbitAmqpMessageListener.stop()` to the `supplyAsync()` to initiate pause for all consumers in parallel * Expose a failed `message` to the `ListenerExecutionFailedException` from the `RabbitAmqpMessageListenerAdapter` * Ensure RabbitMQ objects are deleted in the end of tests
1 parent 5be96cb commit 6402004

File tree

7 files changed

+242
-41
lines changed

7 files changed

+242
-41
lines changed

spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/RabbitAmqpAdmin.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,13 +176,13 @@ public boolean isRunning() {
176176
*/
177177
@Override
178178
public void initialize() {
179-
redeclareBeanDeclarables();
179+
declareDeclarableBeans();
180180
}
181181

182182
/**
183183
* Process bean declarables.
184184
*/
185-
private void redeclareBeanDeclarables() {
185+
private void declareDeclarableBeans() {
186186
if (this.applicationContext == null) {
187187
LOG.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
188188
return;

spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/listener/RabbitAmqpListenerContainer.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -256,24 +256,25 @@ private void invokeListener(Consumer.Context context, com.rabbitmq.client.amqp.M
256256
}
257257
}
258258
catch (Exception ex) {
259-
if (!handleSpecialErrors(ex, context)) {
260-
try {
261-
this.errorHandler.handleError(ex);
262-
// If error handler does not re-throw an exception, treat it as a successful processing result.
259+
try {
260+
this.errorHandler.handleError(ex);
261+
// If error handler does not re-throw an exception, re-check original error.
262+
// If it is not special, treat the error handler outcome as a successful processing result.
263+
if (!handleSpecialErrors(ex, context)) {
263264
context.accept();
264265
}
265-
catch (Exception rethrow) {
266-
if (!handleSpecialErrors(rethrow, context)) {
267-
if (this.defaultRequeue) {
268-
context.requeue();
269-
}
270-
else {
271-
context.discard();
272-
}
273-
LOG.error(rethrow, () ->
274-
"The 'errorHandler' has thrown an exception. The '" + amqpMessage + "' is "
275-
+ (this.defaultRequeue ? "re-queued." : "discarded."));
266+
}
267+
catch (Exception rethrow) {
268+
if (!handleSpecialErrors(rethrow, context)) {
269+
if (this.defaultRequeue) {
270+
context.requeue();
271+
}
272+
else {
273+
context.discard();
276274
}
275+
LOG.error(rethrow, () ->
276+
"The 'errorHandler' has thrown an exception. The '" + amqpMessage + "' is "
277+
+ (this.defaultRequeue ? "re-queued." : "discarded."));
277278
}
278279
}
279280
}
@@ -321,9 +322,9 @@ public void stop(Runnable callback) {
321322
CompletableFuture<Void>[] completableFutures =
322323
this.queueToConsumers.values().stream()
323324
.flatMap(List::stream)
324-
.peek(Consumer::pause)
325325
.map((consumer) ->
326326
CompletableFuture.supplyAsync(() -> {
327+
consumer.pause();
327328
try (consumer) {
328329
while (consumer.unsettledMessageCount() > 0) {
329330
Thread.sleep(100);

spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/listener/RabbitAmqpMessageListenerAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ public RabbitAmqpMessageListenerAdapter(@Nullable Object bean, @Nullable Method
5353

5454
@Override
5555
public void onAmqpMessage(com.rabbitmq.client.amqp.Message amqpMessage, Consumer.@Nullable Context context) {
56+
org.springframework.amqp.core.Message springMessage = RabbitAmqpUtils.fromAmqpMessage(amqpMessage, context);
5657
try {
57-
org.springframework.amqp.core.Message springMessage = RabbitAmqpUtils.fromAmqpMessage(amqpMessage, context);
5858
org.springframework.messaging.Message<?> messagingMessage = toMessagingMessage(springMessage);
5959
InvocationResult result = getHandlerAdapter()
6060
.invoke(messagingMessage,
@@ -65,7 +65,7 @@ public void onAmqpMessage(com.rabbitmq.client.amqp.Message amqpMessage, Consumer
6565
}
6666
}
6767
catch (Exception ex) {
68-
throw new ListenerExecutionFailedException("Failed to invoke listener", ex);
68+
throw new ListenerExecutionFailedException("Failed to invoke listener", ex, springMessage);
6969
}
7070
}
7171

spring-rabbitmq-client/src/test/java/org/springframework/amqp/rabbitmq/client/RabbitAmqpAdminTests.java

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,6 @@ void verifyBeanDeclarations() {
6464
assertThat(template.receiveAndConvert("q3")).succeedsWithin(Duration.ofSeconds(10)).isEqualTo("test4");
6565
assertThat(template.receiveAndConvert("q4")).succeedsWithin(Duration.ofSeconds(10)).isEqualTo("test5");
6666

67-
admin.deleteQueue("q1");
68-
admin.deleteQueue("q2");
69-
admin.deleteQueue("q3");
70-
admin.deleteQueue("q4");
71-
admin.deleteExchange("e1");
72-
admin.deleteExchange("e2");
73-
admin.deleteExchange("e3");
74-
admin.deleteExchange("e4");
75-
7667
assertThat(declarables.getDeclarablesByType(Queue.class))
7768
.hasSize(1)
7869
.extracting(Queue::getName)
@@ -92,12 +83,12 @@ public static class Config {
9283

9384
@Bean
9485
DirectExchange e1() {
95-
return new DirectExchange("e1", false, false);
86+
return new DirectExchange("e1");
9687
}
9788

9889
@Bean
9990
Queue q1() {
100-
return new Queue("q1", false, false, false);
91+
return new Queue("q1");
10192
}
10293

10394
@Bean
@@ -108,15 +99,15 @@ Binding b1() {
10899
@Bean
109100
Declarables es() {
110101
return new Declarables(
111-
new DirectExchange("e2", false, false),
112-
new DirectExchange("e3", false, false));
102+
new DirectExchange("e2"),
103+
new DirectExchange("e3"));
113104
}
114105

115106
@Bean
116107
Declarables qs() {
117108
return new Declarables(
118-
new Queue("q2", false, false, false),
119-
new Queue("q3", false, false, false));
109+
new Queue("q2"),
110+
new Queue("q3"));
120111
}
121112

122113
@Bean
@@ -129,8 +120,8 @@ Declarables bs() {
129120
@Bean
130121
Declarables ds() {
131122
return new Declarables(
132-
new DirectExchange("e4", false, false),
133-
new Queue("q4", false, false, false),
123+
new DirectExchange("e4"),
124+
new Queue("q4"),
134125
new Binding("q4", Binding.DestinationType.QUEUE, "e4", "k4", null));
135126
}
136127

spring-rabbitmq-client/src/test/java/org/springframework/amqp/rabbitmq/client/RabbitAmqpTemplateTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,12 @@ static class Config {
102102

103103
@Bean
104104
DirectExchange e1() {
105-
return new DirectExchange("e1", false, false);
105+
return new DirectExchange("e1");
106106
}
107107

108108
@Bean
109109
Queue q1() {
110-
return new Queue("q1", false, false, false);
110+
return new Queue("q1");
111111
}
112112

113113
@Bean

spring-rabbitmq-client/src/test/java/org/springframework/amqp/rabbitmq/client/RabbitAmqpTestBase.java

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,21 @@
1616

1717
package org.springframework.amqp.rabbitmq.client;
1818

19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.stream.Stream;
22+
1923
import com.rabbitmq.client.amqp.Connection;
2024
import com.rabbitmq.client.amqp.Environment;
2125
import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder;
2226

27+
import org.springframework.amqp.core.Declarable;
28+
import org.springframework.amqp.core.Declarables;
29+
import org.springframework.amqp.core.Exchange;
30+
import org.springframework.amqp.core.Queue;
2331
import org.springframework.amqp.rabbit.junit.AbstractTestContainerTests;
2432
import org.springframework.beans.factory.annotation.Autowired;
33+
import org.springframework.context.Lifecycle;
2534
import org.springframework.context.annotation.Bean;
2635
import org.springframework.context.annotation.Configuration;
2736
import org.springframework.test.annotation.DirtiesContext;
@@ -36,7 +45,7 @@
3645
*/
3746
@SpringJUnitConfig
3847
@DirtiesContext
39-
abstract class RabbitAmqpTestBase extends AbstractTestContainerTests {
48+
public abstract class RabbitAmqpTestBase extends AbstractTestContainerTests {
4049

4150
@Autowired
4251
protected Environment environment;
@@ -51,7 +60,16 @@ abstract class RabbitAmqpTestBase extends AbstractTestContainerTests {
5160
protected RabbitAmqpTemplate template;
5261

5362
@Configuration
54-
public static class AmqpCommonConfig {
63+
public static class AmqpCommonConfig implements Lifecycle {
64+
65+
@Autowired
66+
List<Declarable> declarables;
67+
68+
@Autowired(required = false)
69+
List<Declarables> declarableContainers = new ArrayList<>();
70+
71+
@Autowired
72+
RabbitAmqpAdmin admin;
5573

5674
@Bean
5775
Environment environment() {
@@ -77,6 +95,34 @@ RabbitAmqpTemplate rabbitTemplate(Connection connection) {
7795
return new RabbitAmqpTemplate(connection);
7896
}
7997

98+
volatile boolean running;
99+
100+
@Override
101+
public void start() {
102+
this.running = true;
103+
}
104+
105+
@Override
106+
public boolean isRunning() {
107+
return this.running;
108+
}
109+
110+
@Override
111+
public void stop() {
112+
Stream.concat(this.declarables.stream(),
113+
this.declarableContainers.stream()
114+
.flatMap((declarables) -> declarables.getDeclarables().stream()))
115+
.filter((declarable) -> declarable instanceof Queue || declarable instanceof Exchange)
116+
.forEach((declarable) -> {
117+
if (declarable instanceof Queue queue) {
118+
this.admin.deleteQueue(queue.getName());
119+
}
120+
else {
121+
this.admin.deleteExchange(((Exchange) declarable).getName());
122+
}
123+
});
124+
}
125+
80126
}
81127

82128
}

0 commit comments

Comments
 (0)