Skip to content

Commit ca32f3f

Browse files
garyrussellartembilan
authored andcommitted
AMQP-790: Fix after receive MPPs with send/receive
JIRA: https://jira.spring.io/browse/AMQP-790 Previously, `afterReceivePostProcessors` were not called on `sendAndReceive()` operations. # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java
1 parent 5b7eebd commit ca32f3f

File tree

3 files changed

+178
-13
lines changed

3 files changed

+178
-13
lines changed

spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/BrokerRunning.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,7 @@ public ConnectionFactory getConnectionFactory() {
551551
this.connectionFactory.setPort(this.port);
552552
this.connectionFactory.setUsername(this.user);
553553
this.connectionFactory.setPassword(this.password);
554+
this.connectionFactory.setAutomaticRecoveryEnabled(false);
554555
}
555556
return this.connectionFactory;
556557
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -792,7 +792,7 @@ public void convertAndSend(String routingKey, Object message, MessagePostProcess
792792
}
793793

794794
public void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor,
795-
CorrelationData correlationData)
795+
CorrelationData correlationData)
796796
throws AmqpException {
797797
convertAndSend(this.exchange, routingKey, message, messagePostProcessor, correlationData);
798798
}
@@ -808,7 +808,7 @@ public void convertAndSend(String exchange, String routingKey, final Object mess
808808
Message messageToSend = convertMessageIfNecessary(message);
809809
messageToSend = messagePostProcessor instanceof CorrelationAwareMessagePostProcessor
810810
? ((CorrelationAwareMessagePostProcessor) messagePostProcessor)
811-
.postProcessMessage(messageToSend, correlationData)
811+
.postProcessMessage(messageToSend, correlationData)
812812
: messagePostProcessor.postProcessMessage(messageToSend);
813813
send(exchange, routingKey, messageToSend, correlationData);
814814
}
@@ -954,7 +954,7 @@ public <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, fi
954954

955955
@Override
956956
public <R, S> boolean receiveAndReply(final String queueName, ReceiveAndReplyCallback<R, S> callback, final String replyExchange,
957-
final String replyRoutingKey) throws AmqpException {
957+
final String replyRoutingKey) throws AmqpException {
958958
return this.receiveAndReply(queueName, callback, new ReplyToAddressCallback<S>() {
959959

960960
@Override
@@ -973,13 +973,13 @@ public <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, Re
973973

974974
@Override
975975
public <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
976-
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException {
976+
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException {
977977
return doReceiveAndReply(queueName, callback, replyToAddressCallback);
978978
}
979979

980980
@SuppressWarnings("unchecked")
981981
private <R, S> boolean doReceiveAndReply(final String queueName, final ReceiveAndReplyCallback<R, S> callback,
982-
final ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException {
982+
final ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException {
983983
return this.execute(new ChannelCallback<Boolean>() {
984984

985985
@SuppressWarnings("deprecation")
@@ -1212,7 +1212,7 @@ protected Message convertSendAndReceiveRaw(final String exchange, final String r
12121212
if (messagePostProcessor != null) {
12131213
requestMessage = messagePostProcessor instanceof CorrelationAwareMessagePostProcessor
12141214
? ((CorrelationAwareMessagePostProcessor) messagePostProcessor)
1215-
.postProcessMessage(requestMessage, correlationData)
1215+
.postProcessMessage(requestMessage, correlationData)
12161216
: messagePostProcessor.postProcessMessage(requestMessage);
12171217
}
12181218
Message replyMessage = doSendAndReceive(exchange, routingKey, requestMessage, correlationData);
@@ -1279,13 +1279,18 @@ public Message doInRabbit(Channel channel) throws Exception {
12791279

12801280
@Override
12811281
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
1282-
byte[] body) throws IOException {
1282+
byte[] body) throws IOException {
12831283
MessageProperties messageProperties = RabbitTemplate.this.messagePropertiesConverter
12841284
.toMessageProperties(properties, envelope, RabbitTemplate.this.encoding);
12851285
Message reply = new Message(body, messageProperties);
12861286
if (logger.isTraceEnabled()) {
12871287
logger.trace("Message received " + reply);
12881288
}
1289+
if (RabbitTemplate.this.afterReceivePostProcessors != null) {
1290+
for (MessagePostProcessor processor : RabbitTemplate.this.afterReceivePostProcessors) {
1291+
reply = processor.postProcessMessage(reply);
1292+
}
1293+
}
12891294
pendingReply.reply(reply);
12901295
}
12911296
};
@@ -1300,7 +1305,8 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
13001305
try {
13011306
channel.basicCancel(consumerTag);
13021307
}
1303-
catch (Exception e) { }
1308+
catch (Exception e) {
1309+
}
13041310
}
13051311
return reply;
13061312
}
@@ -1310,7 +1316,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
13101316
protected Message doSendAndReceiveWithFixed(final String exchange, final String routingKey, final Message message,
13111317
final CorrelationData correlationData) {
13121318
Assert.state(this.isListener, "RabbitTemplate is not configured as MessageListener - "
1313-
+ "cannot use a 'replyAddress': " + this.replyAddress);
1319+
+ "cannot use a 'replyAddress': " + this.replyAddress);
13141320
return this.execute(new ChannelCallback<Message>() {
13151321

13161322
@SuppressWarnings("deprecation")
@@ -1516,7 +1522,7 @@ protected void doSend(Channel channel, String exchange, String routingKey, Messa
15161522
for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
15171523
messageToUse = processor instanceof CorrelationAwareMessagePostProcessor
15181524
? ((CorrelationAwareMessagePostProcessor) processor)
1519-
.postProcessMessage(messageToUse, correlationData)
1525+
.postProcessMessage(messageToUse, correlationData)
15201526
: processor.postProcessMessage(messageToUse);
15211527
}
15221528
}
@@ -1564,6 +1570,7 @@ protected boolean isChannelLocallyTransacted(Channel channel) {
15641570
private Message buildMessageFromDelivery(com.rabbitmq.client.QueueingConsumer.Delivery delivery) {
15651571
return buildMessage(delivery.getEnvelope(), delivery.getProperties(), delivery.getBody(), -1);
15661572
}
1573+
15671574
private Message buildMessageFromResponse(GetResponse response) {
15681575
return buildMessage(response.getEnvelope(), response.getProps(), response.getBody(), response.getMessageCount());
15691576
}
@@ -1640,7 +1647,7 @@ private void addListener(Channel channel) {
16401647
else {
16411648
throw new IllegalStateException(
16421649
"Channel does not support confirms or returns; " +
1643-
"is the connection factory configured for confirms or returns?");
1650+
"is the connection factory configured for confirms or returns?");
16441651
}
16451652
}
16461653

@@ -1663,7 +1670,7 @@ public void handleReturn(int replyCode,
16631670
String routingKey,
16641671
BasicProperties properties,
16651672
byte[] body)
1666-
throws IOException {
1673+
throws IOException {
16671674

16681675
ReturnCallback returnCallback = this.returnCallback;
16691676
if (returnCallback == null) {
@@ -1766,7 +1773,7 @@ public void onMessage(Message message) {
17661773
else {
17671774
if (savedCorrelation != null) {
17681775
message.getMessageProperties().setHeader(this.correlationKey,
1769-
savedCorrelation);
1776+
savedCorrelation);
17701777
}
17711778
else {
17721779
message.getMessageProperties().getHeaders().remove(this.correlationKey);
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Copyright 2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.core;
18+
19+
import static org.junit.Assert.assertTrue;
20+
21+
import org.junit.AfterClass;
22+
import org.junit.ClassRule;
23+
import org.junit.Test;
24+
import org.junit.runner.RunWith;
25+
26+
import org.springframework.amqp.core.Message;
27+
import org.springframework.amqp.core.MessagePostProcessor;
28+
import org.springframework.amqp.core.MessageProperties;
29+
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
30+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
31+
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
32+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
33+
import org.springframework.amqp.rabbit.junit.BrokerRunning;
34+
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
35+
import org.springframework.beans.factory.annotation.Autowired;
36+
import org.springframework.context.annotation.Bean;
37+
import org.springframework.context.annotation.Configuration;
38+
import org.springframework.test.annotation.DirtiesContext;
39+
import org.springframework.test.annotation.DirtiesContext.ClassMode;
40+
import org.springframework.test.context.junit4.SpringRunner;
41+
42+
/**
43+
* @author Gary Russell
44+
* @since 1.7.6
45+
*
46+
*/
47+
@RunWith(SpringRunner.class)
48+
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
49+
public class RabbitTemplateMPPIntegrationTests {
50+
51+
private static final String QUEUE = "mpp.tests";
52+
53+
private static final String REPLIES = "mpp.tests.replies";
54+
55+
@ClassRule
56+
public static BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(QUEUE, REPLIES);
57+
58+
@Autowired
59+
private RabbitTemplate template;
60+
61+
@Autowired
62+
private Config config;
63+
64+
@AfterClass
65+
public static void tearDown() {
66+
brokerIsRunning.removeTestQueues();
67+
}
68+
69+
@Test
70+
public void testMPPsAppliedDirectReplyToContainerTests() {
71+
this.template.sendAndReceive(new Message("foo".getBytes(), new MessageProperties()));
72+
assertTrue("before MPP not called", this.config.beforeMppCalled);
73+
assertTrue("after MPP not called", this.config.afterMppCalled);
74+
}
75+
76+
@Test
77+
public void testMPPsAppliedDirectReplyToTests() {
78+
this.template.sendAndReceive(new Message("foo".getBytes(), new MessageProperties()));
79+
assertTrue("before MPP not called", this.config.beforeMppCalled);
80+
assertTrue("after MPP not called", this.config.afterMppCalled);
81+
}
82+
83+
@Test
84+
public void testMPPsAppliedTemporaryReplyQueueTests() {
85+
this.template.setUseTemporaryReplyQueues(true);
86+
this.template.sendAndReceive(new Message("foo".getBytes(), new MessageProperties()));
87+
assertTrue("before MPP not called", this.config.beforeMppCalled);
88+
assertTrue("after MPP not called", this.config.afterMppCalled);
89+
}
90+
91+
@Test
92+
public void testMPPsAppliedReplyContainerTests() {
93+
this.template.setReplyAddress(REPLIES);
94+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.config.cf());
95+
try {
96+
container.setQueueNames(REPLIES);
97+
container.setMessageListener(this.template);
98+
container.setAfterReceivePostProcessors(this.config.afterMPP());
99+
container.afterPropertiesSet();
100+
container.start();
101+
this.template.sendAndReceive(new Message("foo".getBytes(), new MessageProperties()));
102+
assertTrue("before MPP not called", this.config.beforeMppCalled);
103+
assertTrue("after MPP not called", this.config.afterMppCalled);
104+
}
105+
finally {
106+
container.stop();
107+
}
108+
}
109+
110+
@Configuration
111+
@EnableRabbit
112+
public static class Config {
113+
114+
private boolean beforeMppCalled;
115+
116+
private boolean afterMppCalled;
117+
118+
@Bean
119+
public CachingConnectionFactory cf() {
120+
return new CachingConnectionFactory(brokerIsRunning.getConnectionFactory());
121+
}
122+
123+
@Bean
124+
public RabbitTemplate template() {
125+
RabbitTemplate rabbitTemplate = new RabbitTemplate(cf());
126+
rabbitTemplate.setRoutingKey(QUEUE);
127+
rabbitTemplate.setBeforePublishPostProcessors(m -> {
128+
this.beforeMppCalled = true;
129+
return m;
130+
});
131+
rabbitTemplate.setAfterReceivePostProcessors(afterMPP());
132+
return rabbitTemplate;
133+
}
134+
135+
@Bean
136+
public MessagePostProcessor afterMPP() {
137+
return m -> {
138+
this.afterMppCalled = true;
139+
return m;
140+
};
141+
}
142+
143+
@Bean
144+
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
145+
SimpleRabbitListenerContainerFactory cf = new SimpleRabbitListenerContainerFactory();
146+
cf.setConnectionFactory(cf());
147+
return cf;
148+
}
149+
150+
@RabbitListener(queues = QUEUE)
151+
public byte[] foo(byte[] in) {
152+
return in;
153+
}
154+
155+
}
156+
157+
}

0 commit comments

Comments
 (0)