Skip to content

Commit 0e597c9

Browse files
garyrussellartembilan
authored andcommitted
AMQP-827: Fix @rl reply Message<?> conversion
JIRA: https://jira.spring.io/browse/AMQP-827 Use the correct `payloadConverter` in the `MessagingMessageConverter` to support `@RabbitListener` `Message<?>` return types. **cherry-pick to 2.0.x, 1.7.x** # Conflicts: # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java
1 parent 97552bc commit 0e597c9

File tree

2 files changed

+50
-1
lines changed

2 files changed

+50
-1
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,12 @@ protected final MessagingMessageConverter getMessagingMessageConverter() {
9797
return this.messagingMessageConverter;
9898
}
9999

100+
@Override
101+
public void setMessageConverter(MessageConverter messageConverter) {
102+
super.setMessageConverter(messageConverter);
103+
this.messagingMessageConverter.setPayloadConverter(messageConverter);
104+
}
105+
100106
@Override
101107
public void onMessage(org.springframework.amqp.core.Message amqpMessage, Channel channel) throws Exception {
102108
Message<?> message = toMessagingMessage(amqpMessage);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818

1919
import static org.hamcrest.Matchers.contains;
2020
import static org.hamcrest.Matchers.containsString;
21+
import static org.hamcrest.Matchers.equalTo;
2122
import static org.hamcrest.Matchers.instanceOf;
23+
import static org.hamcrest.Matchers.is;
24+
import static org.hamcrest.Matchers.notNullValue;
2225
import static org.hamcrest.Matchers.startsWith;
2326
import static org.junit.Assert.assertEquals;
2427
import static org.junit.Assert.assertFalse;
@@ -36,6 +39,7 @@
3639
import java.util.ArrayList;
3740
import java.util.Arrays;
3841
import java.util.Collection;
42+
import java.util.Collections;
3943
import java.util.Date;
4044
import java.util.HashMap;
4145
import java.util.List;
@@ -65,6 +69,7 @@
6569
import org.springframework.amqp.core.Message;
6670
import org.springframework.amqp.core.MessagePostProcessor;
6771
import org.springframework.amqp.core.MessageProperties;
72+
import org.springframework.amqp.core.MessagePropertiesBuilder;
6873
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
6974
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
7075
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@@ -111,6 +116,7 @@
111116
import org.springframework.messaging.handler.annotation.Payload;
112117
import org.springframework.messaging.handler.annotation.SendTo;
113118
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
119+
import org.springframework.messaging.support.GenericMessage;
114120
import org.springframework.messaging.support.MessageBuilder;
115121
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
116122
import org.springframework.test.annotation.DirtiesContext;
@@ -153,7 +159,8 @@ public class EnableRabbitIntegrationTests {
153159
"test.converted", "test.converted.list", "test.converted.array", "test.converted.args1",
154160
"test.converted.args2", "test.converted.message", "test.notconverted.message",
155161
"test.notconverted.channel", "test.notconverted.messagechannel", "test.notconverted.messagingmessage",
156-
"test.converted.foomessage", "test.notconverted.messagingmessagenotgeneric", "amqp656dlq");
162+
"test.converted.foomessage", "test.notconverted.messagingmessagenotgeneric", "amqp656dlq",
163+
"test.messaging.message", "test.amqp.message");
157164

158165
@Autowired
159166
private RabbitTemplate rabbitTemplate;
@@ -577,6 +584,26 @@ public void testPrototypeCache() {
577584
assertSame(value, typeCache.get(Foo1.class));
578585
}
579586

587+
@Test
588+
public void messagingMessageReturned() {
589+
Message message = org.springframework.amqp.core.MessageBuilder.withBody("\"messaging\"".getBytes())
590+
.andProperties(MessagePropertiesBuilder.newInstance().setContentType("application/json").build()).build();
591+
message = this.rabbitTemplate.sendAndReceive("test.messaging.message", message);
592+
assertThat(message, is(notNullValue()));
593+
assertThat(new String(message.getBody()), equalTo("{\"field\":\"MESSAGING\"}"));
594+
assertThat(message.getMessageProperties().getHeaders().get("foo"), equalTo("bar"));
595+
}
596+
597+
@Test
598+
public void amqpMessageReturned() {
599+
Message message = org.springframework.amqp.core.MessageBuilder.withBody("amqp".getBytes())
600+
.andProperties(MessagePropertiesBuilder.newInstance().setContentType("text/plain").build()).build();
601+
message = this.rabbitTemplate.sendAndReceive("test.amqp.message", message);
602+
assertThat(message, is(notNullValue()));
603+
assertThat(new String(message.getBody()), equalTo("AMQP"));
604+
assertThat(message.getMessageProperties().getHeaders().get("foo"), equalTo("bar"));
605+
}
606+
580607
interface TxService {
581608

582609
@Transactional
@@ -811,6 +838,22 @@ public String handleWithInternalExchangeIgnore(String foo) {
811838
public String handleWithDeadLetterDefaultExchange(String foo) {
812839
throw new AmqpRejectAndDontRequeueException("dlq");
813840
}
841+
@RabbitListener(queues = "test.messaging.message", containerFactory = "simpleJsonListenerContainerFactory")
842+
public org.springframework.messaging.Message<Bar> messagingMessage(String in) {
843+
Bar bar = new Bar();
844+
bar.field = in.toUpperCase();
845+
return new GenericMessage<>(bar, Collections.singletonMap("foo", "bar"));
846+
}
847+
848+
@RabbitListener(queues = "test.amqp.message")
849+
public Message amqpMessage(String in) {
850+
return org.springframework.amqp.core.MessageBuilder.withBody(in.toUpperCase().getBytes())
851+
.andProperties(MessagePropertiesBuilder.newInstance().setContentType("text/plain")
852+
.setHeader("foo", "bar")
853+
.build())
854+
.build();
855+
}
856+
814857
}
815858

816859
public static class Foo1 {

0 commit comments

Comments
 (0)