Skip to content

Commit 3e869b4

Browse files
garyrussellartembilan
authored andcommitted
GH-1021: Fix @SendTo after error is handled
Fixes #1021 Sending the result from a `RabbitListenerErrorHandler` was broken for class-level `@RabbitListener` because the send to expression was lost. **cherry-pick to 2.1.x** * * Also capture the generic return type after the error is handled # Conflicts: # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java
1 parent c983c04 commit 3e869b4

File tree

4 files changed

+44
-7
lines changed

4 files changed

+44
-7
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/DelegatingInvocableHandler.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,4 +272,14 @@ public boolean hasDefaultHandler() {
272272
return this.defaultHandler != null;
273273
}
274274

275+
@Nullable
276+
public InvocationResult getInvocationResultFor(Object result, Object inboundPayload) {
277+
InvocableHandlerMethod handler = findHandlerForPayload(inboundPayload.getClass());
278+
if (handler != null) {
279+
return new InvocationResult(result, this.handlerSendTo.get(handler),
280+
handler.getMethod().getGenericReturnType());
281+
}
282+
return null;
283+
}
284+
275285
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/HandlerAdapter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.amqp.rabbit.listener.adapter;
1818

19+
import org.springframework.lang.Nullable;
1920
import org.springframework.messaging.Message;
2021
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
2122

@@ -94,5 +95,14 @@ public Object getBean() {
9495
}
9596
}
9697

98+
@Nullable
99+
public InvocationResult getInvocationResultFor(Object result, Object inboundPayload) {
100+
if (this.invokerHandlerMethod != null) {
101+
return new InvocationResult(result, null, this.invokerHandlerMethod.getMethod().getGenericReturnType());
102+
}
103+
else {
104+
return this.delegatingHandler.getInvocationResultFor(result, inboundPayload);
105+
}
106+
}
97107

98108
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ public void onMessage(org.springframework.amqp.core.Message amqpMessage, Channel
142142
.build();
143143
Object errorResult = this.errorHandler.handleError(amqpMessage, message, e);
144144
if (errorResult != null) {
145-
handleResult(new InvocationResult(errorResult, null, null), amqpMessage, channel, message);
145+
handleResult(this.handlerAdapter.getInvocationResultFor(errorResult, message.getPayload()),
146+
amqpMessage, channel, message);
146147
}
147148
else {
148149
logger.trace("Error handler returned no result");

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,11 @@ public void multiListener() {
356356
rabbitTemplate.convertAndSend("multi.exch", "multi.rk", bar);
357357
rabbitTemplate.setReceiveTimeout(10000);
358358
assertEquals("BAR: bar", this.rabbitTemplate.receiveAndConvert("sendTo.replies"));
359+
bar.field = "crash";
360+
rabbitTemplate.convertAndSend("multi.exch", "multi.rk", bar);
361+
assertThat(this.rabbitTemplate.receiveAndConvert("sendTo.replies"))
362+
.isEqualTo("CRASHCRASH Test reply from error handler");
363+
bar.field = "bar";
359364
Baz baz = new Baz();
360365
baz.field = "baz";
361366
assertEquals("BAZ: baz", rabbitTemplate.convertSendAndReceive("multi.exch", "multi.rk", baz));
@@ -1518,14 +1523,22 @@ public MyService myService() {
15181523

15191524
@Bean
15201525
public RabbitListenerErrorHandler alwaysBARHandler() {
1521-
return (m, sm, e) -> "BAR";
1526+
return (msg, springMsg, ex) -> "BAR";
1527+
}
1528+
1529+
@Bean
1530+
public RabbitListenerErrorHandler upcaseAndRepeatErrorHandler() {
1531+
return (msg, springMsg, ex) -> {
1532+
String payload = ((Bar) springMsg.getPayload()).field.toUpperCase();
1533+
return payload + payload + " " + ex.getCause().getMessage();
1534+
};
15221535
}
15231536

15241537
@Bean
15251538
public RabbitListenerErrorHandler throwANewException() {
1526-
return (m, sm, e) -> {
1527-
this.errorHandlerChannel = sm.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
1528-
throw new RuntimeException("from error handler", e.getCause());
1539+
return (msg, springMsg, ex) -> {
1540+
this.errorHandlerChannel = springMsg.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
1541+
throw new RuntimeException("from error handler", ex.getCause());
15291542
};
15301543
}
15311544

@@ -1600,7 +1613,7 @@ public TxClassLevel txClassLevel() {
16001613

16011614
@Bean
16021615
public org.springframework.amqp.core.Queue sendToReplies() {
1603-
return new org.springframework.amqp.core.Queue(sendToRepliesBean(), false, false, true);
1616+
return new org.springframework.amqp.core.Queue(sendToRepliesBean(), false, false, false);
16041617
}
16051618

16061619
@Bean
@@ -1635,12 +1648,15 @@ public DirectExchange internal() {
16351648
@RabbitListener(bindings = @QueueBinding
16361649
(value = @Queue,
16371650
exchange = @Exchange(value = "multi.exch", autoDelete = "true"),
1638-
key = "multi.rk"))
1651+
key = "multi.rk"), errorHandler = "upcaseAndRepeatErrorHandler")
16391652
static class MultiListenerBean {
16401653

16411654
@RabbitHandler
16421655
@SendTo("${foo.bar:#{sendToRepliesBean}}")
16431656
public String bar(@NonNull Bar bar) {
1657+
if (bar.field.equals("crash")) {
1658+
throw new RuntimeException("Test reply from error handler");
1659+
}
16441660
return "BAR: " + bar.field;
16451661
}
16461662

0 commit comments

Comments
 (0)