Skip to content

Commit 8baaa76

Browse files
committed
GH-3163: Propagate EH error down to the super.asyncFailure
Fixes: #3163 The `MessagingMessageListenerAdapter` in its `asyncFailure()` calls the `handleException()`, but uses the same input `Throwable` for the `super.asyncFailure()`. This may not lead to expected state of the system, when an `AmqpRejectAndDontRequeueException` might be expected as a trigger to DLQ the failed message. * Fix `MessagingMessageListenerAdapter.asyncFailure()` to override error with an exception thrown from the `handleException()` before calling `super.asyncFailure()` **Auto-cherry-pick to `3.2.x`**
1 parent 7d633ae commit 8baaa76

File tree

2 files changed

+38
-8
lines changed

2 files changed

+38
-8
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,15 +172,18 @@ public void onMessage(org.springframework.amqp.core.Message amqpMessage, @Nullab
172172
protected void asyncFailure(org.springframework.amqp.core.Message request, @Nullable Channel channel, Throwable t,
173173
@Nullable Object source) {
174174

175+
Throwable throwableToHandle = t;
176+
175177
try {
176178
handleException(request, channel, (Message<?>) source,
177-
new ListenerExecutionFailedException("Async Fail", t, request));
179+
new ListenerExecutionFailedException("Async Fail", throwableToHandle, request));
178180
return;
179181
}
180182
catch (Exception ex) {
181-
// Ignore
183+
throwableToHandle = ex;
182184
}
183-
super.asyncFailure(request, channel, t, source);
185+
186+
super.asyncFailure(request, channel, throwableToHandle, source);
184187
}
185188

186189
protected void handleException(org.springframework.amqp.core.Message amqpMessage, @Nullable Channel channel,

spring-rabbit/src/test/kotlin/org/springframework/amqp/rabbit/annotation/EnableRabbitKotlinTests.kt

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@ import assertk.assertions.isInstanceOf
2323
import assertk.assertions.isNotNull
2424
import assertk.assertions.isTrue
2525
import org.junit.jupiter.api.Test
26+
import org.springframework.amqp.AmqpRejectAndDontRequeueException
2627
import org.springframework.amqp.core.AcknowledgeMode
2728
import org.springframework.amqp.core.Message
29+
import org.springframework.amqp.core.QueueBuilder
2830
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
2931
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory
32+
import org.springframework.amqp.rabbit.core.RabbitAdmin
3033
import org.springframework.amqp.rabbit.core.RabbitTemplate
3134
import org.springframework.amqp.rabbit.junit.RabbitAvailable
3235
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition
@@ -56,7 +59,7 @@ import java.util.concurrent.TimeUnit
5659
*
5760
*/
5861
@SpringJUnitConfig
59-
@RabbitAvailable(queues = ["kotlinQueue", "kotlinBatchQueue", "kotlinQueue1", "kotlinReplyQueue"])
62+
@RabbitAvailable(queues = ["kotlinDLQ", "kotlinBatchQueue", "kotlinQueue1", "kotlinReplyQueue"])
6063
@DirtiesContext
6164
class EnableRabbitKotlinTests {
6265

@@ -67,7 +70,7 @@ class EnableRabbitKotlinTests {
6770
private lateinit var registry: RabbitListenerEndpointRegistry
6871

6972
@Test
70-
fun `send and wait for consume`() {
73+
fun `send and wait for consume and DLQ on error`() {
7174
val template = RabbitTemplate(this.config.cf())
7275
template.setReplyTimeout(10_000)
7376
val result = template.convertSendAndReceive("kotlinQueue", "test")
@@ -83,6 +86,11 @@ class EnableRabbitKotlinTests {
8386
)
8487
.isEqualTo("class java.lang.String")
8588
}
89+
90+
template.convertAndSend("kotlinQueue", "junk")
91+
92+
var dlqResult = template.receiveAndConvert("kotlinDLQ", 30_000)
93+
assertThat(dlqResult).isEqualTo("junk")
8694
}
8795

8896
@Test
@@ -114,9 +122,25 @@ class EnableRabbitKotlinTests {
114122
@EnableRabbit
115123
class Config {
116124

117-
@RabbitListener(id = "single", queues = ["kotlinQueue"])
118-
suspend fun handle(@Suppress("UNUSED_PARAMETER") data: String): String? {
119-
return data.uppercase()
125+
@Bean
126+
fun kotlinQueue() =
127+
QueueBuilder.durable("kotlinQueue")
128+
.deadLetterExchange("")
129+
.deadLetterRoutingKey("kotlinDLQ")
130+
.build()
131+
132+
@RabbitListener(id = "single", queues = ["kotlinQueue"], errorHandler = "dontRequeueErrorHandler")
133+
suspend fun handle(data: String): String? {
134+
if ("junk" == data) {
135+
throw IllegalArgumentException("Illegal data: $data")
136+
} else {
137+
return data.uppercase()
138+
}
139+
}
140+
141+
@Bean
142+
fun dontRequeueErrorHandler() = RabbitListenerErrorHandler { _, _, _, ex ->
143+
throw AmqpRejectAndDontRequeueException(ex)
120144
}
121145

122146
val batchReceived = CountDownLatch(1)
@@ -132,6 +156,9 @@ class EnableRabbitKotlinTests {
132156
batchReceived.countDown()
133157
}
134158

159+
@Bean
160+
fun rabbitAdmin(cf: CachingConnectionFactory) = RabbitAdmin(cf)
161+
135162
@Bean
136163
fun rabbitListenerContainerFactory(cf: CachingConnectionFactory) =
137164
SimpleRabbitListenerContainerFactory().also {

0 commit comments

Comments
 (0)