Skip to content

Commit 142fd7f

Browse files
artembilangaryrussell
authored andcommitted
GH-920: Propagate afterReceive to AsyncRTemplate
Fixes #920 The `AsyncRabbitTemplate` is missing an after receive post-processing in its internal containers for replies * Reuse `afterPostProcessors` from the provided `RabbitTemplate` and propagate them into the internal listener containers for replies * Fix `AbstractCompressingPostProcessor` do not mutate provided `MessageProperties` and build a fresh instance. * Demonstrate post-processors propagation by the `GZipPostProcessor` & `GUnzipPostProcessor` configuration in the `AsyncRabbitTemplateTests` **Cherry-pick to 2.1.x** * Use `JavaUtils` for nullable settings in the `AsyncRabbitTemplate` * Add `copyProperties` option to the `AbstractCompressingPostProcessor` with `false` by default for better performance * Document a `copyProperties` in the `amqp.adoc`
1 parent ce2f515 commit 142fd7f

File tree

5 files changed

+101
-28
lines changed

5 files changed

+101
-28
lines changed

spring-amqp/src/main/java/org/springframework/amqp/support/postprocessor/AbstractCompressingPostProcessor.java

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.springframework.amqp.core.Message;
3030
import org.springframework.amqp.core.MessagePostProcessor;
3131
import org.springframework.amqp.core.MessageProperties;
32+
import org.springframework.amqp.core.MessagePropertiesBuilder;
3233
import org.springframework.core.Ordered;
3334
import org.springframework.util.FileCopyUtils;
3435

@@ -38,16 +39,20 @@
3839
* present.
3940
*
4041
* @author Gary Russell
42+
* @author Artem Bilan
43+
*
4144
* @since 1.4.2
4245
*/
4346
public abstract class AbstractCompressingPostProcessor implements MessagePostProcessor, Ordered {
4447

45-
private final Log logger = LogFactory.getLog(this.getClass());
48+
protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR final
4649

4750
private final boolean autoDecompress;
4851

4952
private int order;
5053

54+
private boolean copyProperties = false;
55+
5156
/**
5257
* Construct a post processor that will include the
5358
* {@link MessageProperties#SPRING_AUTO_DECOMPRESS} header set to 'true'.
@@ -68,23 +73,47 @@ public AbstractCompressingPostProcessor(boolean autoDecompress) {
6873
this.autoDecompress = autoDecompress;
6974
}
7075

76+
/**
77+
* Flag to indicate if {@link MessageProperties} should be used as is or cloned for new message
78+
* after compression.
79+
* By default this flag is turned off for better performance since in most cases the original message
80+
* is not used any more.
81+
* @param copyProperties clone or reuse original message properties.
82+
* @since 2.1.5
83+
*/
84+
public void setCopyProperties(boolean copyProperties) {
85+
this.copyProperties = copyProperties;
86+
}
87+
7188
@Override
7289
public Message postProcessMessage(Message message) throws AmqpException {
73-
ByteArrayOutputStream zipped = new ByteArrayOutputStream();
7490
try {
91+
ByteArrayOutputStream zipped = new ByteArrayOutputStream();
7592
OutputStream zipper = getCompressorStream(zipped);
7693
FileCopyUtils.copy(new ByteArrayInputStream(message.getBody()), zipper);
77-
MessageProperties messageProperties = message.getMessageProperties();
78-
String currentEncoding = messageProperties.getContentEncoding();
79-
messageProperties
80-
.setContentEncoding(getEncoding() + (currentEncoding == null ? "" : ":" + currentEncoding));
81-
if (this.autoDecompress) {
82-
messageProperties.setHeader(MessageProperties.SPRING_AUTO_DECOMPRESS, true);
83-
}
8494
byte[] compressed = zipped.toByteArray();
8595
if (this.logger.isTraceEnabled()) {
8696
this.logger.trace("Compressed " + message.getBody().length + " to " + compressed.length);
8797
}
98+
99+
MessageProperties originalProperties = message.getMessageProperties();
100+
101+
MessagePropertiesBuilder messagePropertiesBuilder =
102+
this.copyProperties
103+
? MessagePropertiesBuilder.fromClonedProperties(originalProperties)
104+
: MessagePropertiesBuilder.fromProperties(originalProperties);
105+
106+
if (this.autoDecompress) {
107+
messagePropertiesBuilder.setHeader(MessageProperties.SPRING_AUTO_DECOMPRESS, true);
108+
}
109+
110+
MessageProperties messageProperties =
111+
messagePropertiesBuilder.setContentEncoding(getEncoding() +
112+
(originalProperties.getContentEncoding() == null
113+
? ""
114+
: ":" + originalProperties.getContentEncoding()))
115+
.build();
116+
88117
return new Message(compressed, messageProperties);
89118
}
90119
catch (IOException e) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
4949
import org.springframework.amqp.support.converter.MessageConverter;
5050
import org.springframework.amqp.support.converter.SmartMessageConverter;
51+
import org.springframework.amqp.utils.JavaUtils;
5152
import org.springframework.beans.factory.BeanNameAware;
5253
import org.springframework.context.SmartLifecycle;
5354
import org.springframework.core.ParameterizedTypeReference;
@@ -157,6 +158,10 @@ public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange,
157158
this.template.setExchange(exchange == null ? "" : exchange);
158159
this.template.setRoutingKey(routingKey);
159160
this.container = new SimpleMessageListenerContainer(connectionFactory);
161+
JavaUtils.INSTANCE
162+
.acceptIfNotNull(this.template.getAfterReceivePostProcessors(),
163+
(value) -> this.container.setAfterReceivePostProcessors(
164+
value.toArray(new MessagePostProcessor[0])));
160165
this.container.setQueueNames(replyQueue);
161166
this.container.setMessageListener(this);
162167
this.container.afterPropertiesSet();
@@ -216,15 +221,10 @@ public AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerConta
216221
* @since 2.0
217222
*/
218223
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey) {
219-
Assert.notNull(connectionFactory, "'connectionFactory' cannot be null");
224+
this(new RabbitTemplate(connectionFactory));
220225
Assert.notNull(routingKey, "'routingKey' cannot be null");
221-
this.template = new RabbitTemplate(connectionFactory);
222226
this.template.setExchange(exchange == null ? "" : exchange);
223227
this.template.setRoutingKey(routingKey);
224-
this.container = null;
225-
this.replyAddress = null;
226-
this.directReplyToContainer = new DirectReplyToMessageListenerContainer(this.template.getConnectionFactory());
227-
this.directReplyToContainer.setMessageListener(this);
228228
}
229229

230230
/**
@@ -239,6 +239,10 @@ public AsyncRabbitTemplate(RabbitTemplate template) {
239239
this.container = null;
240240
this.replyAddress = null;
241241
this.directReplyToContainer = new DirectReplyToMessageListenerContainer(this.template.getConnectionFactory());
242+
JavaUtils.INSTANCE
243+
.acceptIfNotNull(template.getAfterReceivePostProcessors(),
244+
(value) -> this.directReplyToContainer.setAfterReceivePostProcessors(
245+
value.toArray(new MessagePostProcessor[0])));
242246
this.directReplyToContainer.setMessageListener(this);
243247
}
244248

@@ -573,9 +577,9 @@ public void onMessage(Message message, Channel channel) {
573577
RabbitConverterFuture<Object> rabbitFuture = (RabbitConverterFuture<Object>) future;
574578
Object converted = rabbitFuture.getReturnType() != null
575579
&& messageConverter instanceof SmartMessageConverter
576-
? ((SmartMessageConverter) messageConverter).fromMessage(message,
577-
rabbitFuture.getReturnType())
578-
: messageConverter.fromMessage(message);
580+
? ((SmartMessageConverter) messageConverter).fromMessage(message,
581+
rabbitFuture.getReturnType())
582+
: messageConverter.fromMessage(message);
579583
rabbitFuture.set(converted);
580584
}
581585
else {
@@ -689,7 +693,8 @@ public boolean cancel(boolean mayInterruptIfRunning) {
689693
}
690694
AsyncRabbitTemplate.this.pending.remove(this.correlationId);
691695
if (this.channelHolder != null && AsyncRabbitTemplate.this.directReplyToContainer != null) {
692-
AsyncRabbitTemplate.this.directReplyToContainer.releaseConsumerFor(this.channelHolder, false, null); // NOSONAR
696+
AsyncRabbitTemplate.this.directReplyToContainer
697+
.releaseConsumerFor(this.channelHolder, false, null); // NOSONAR
693698
}
694699
return super.cancel(mayInterruptIfRunning);
695700
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ protected void initDefaultStrategies() {
290290
*
291291
* @param exchange the exchange name to use for send operations
292292
*/
293-
public void setExchange(String exchange) {
293+
public void setExchange(@Nullable String exchange) {
294294
this.exchange = (exchange != null) ? exchange : DEFAULT_EXCHANGE;
295295
}
296296

@@ -645,6 +645,18 @@ public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePo
645645
this.afterReceivePostProcessors = MessagePostProcessorUtils.sort(Arrays.asList(afterReceivePostProcessors));
646646
}
647647

648+
/**
649+
* Return configured after receive {@link MessagePostProcessor}s or {@code null}.
650+
* @return configured after receive {@link MessagePostProcessor}s or {@code null}.
651+
* @since 2.1.5
652+
*/
653+
@Nullable
654+
public Collection<MessagePostProcessor> getAfterReceivePostProcessors() {
655+
return this.afterReceivePostProcessors != null
656+
? Collections.unmodifiableCollection(this.afterReceivePostProcessors)
657+
: null;
658+
}
659+
648660
/**
649661
* Add {@link MessagePostProcessor} that will be invoked immediately after a {@code Channel#basicGet()}
650662
* and before any message conversion is performed.

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@
5656
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
5757
import org.springframework.amqp.rabbit.listener.adapter.ReplyingMessageListener;
5858
import org.springframework.amqp.support.converter.SimpleMessageConverter;
59+
import org.springframework.amqp.support.postprocessor.GUnzipPostProcessor;
60+
import org.springframework.amqp.support.postprocessor.GZipPostProcessor;
5961
import org.springframework.amqp.utils.test.TestUtils;
6062
import org.springframework.beans.factory.annotation.Autowired;
6163
import org.springframework.context.annotation.Bean;
@@ -117,7 +119,8 @@ public void testConvert1ArgDirect() throws Exception {
117119
this.latch.set(null);
118120
waitForZeroInUseConsumers();
119121
assertThat(TestUtils
120-
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.consumerCount", Integer.class),
122+
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.consumerCount",
123+
Integer.class),
121124
equalTo(2));
122125
final String missingQueue = UUID.randomUUID().toString();
123126
this.asyncDirectTemplate.convertSendAndReceive("", missingQueue, "foo"); // send to nowhere
@@ -174,12 +177,14 @@ public void testMessage1ArgDirect() throws Exception {
174177
this.latch.set(null);
175178
waitForZeroInUseConsumers();
176179
assertThat(TestUtils
177-
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.consumerCount", Integer.class),
180+
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.consumerCount",
181+
Integer.class),
178182
equalTo(2));
179183
this.asyncDirectTemplate.stop();
180184
this.asyncDirectTemplate.start();
181185
assertThat(TestUtils
182-
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.consumerCount", Integer.class),
186+
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.consumerCount",
187+
Integer.class),
183188
equalTo(0));
184189
}
185190

@@ -232,7 +237,8 @@ private Message getFooMessage() {
232237
@DirtiesContext
233238
public void testReturn() throws Exception {
234239
this.asyncTemplate.setMandatory(true);
235-
ListenableFuture<String> future = this.asyncTemplate.convertSendAndReceive(this.requests.getName() + "x", "foo");
240+
ListenableFuture<String> future = this.asyncTemplate.convertSendAndReceive(this.requests.getName() + "x",
241+
"foo");
236242
try {
237243
future.get(10, TimeUnit.SECONDS);
238244
fail("Expected exception");
@@ -388,7 +394,7 @@ public void testStopCancelled() throws Exception {
388394

389395
private void checkConverterResult(ListenableFuture<String> future, String expected) throws InterruptedException {
390396
final CountDownLatch latch = new CountDownLatch(1);
391-
final AtomicReference<String> resultRef = new AtomicReference<String>();
397+
final AtomicReference<String> resultRef = new AtomicReference<>();
392398
future.addCallback(new ListenableFutureCallback<String>() {
393399

394400
@Override
@@ -409,7 +415,7 @@ public void onFailure(Throwable ex) {
409415

410416
private Message checkMessageResult(ListenableFuture<Message> future, String expected) throws InterruptedException {
411417
final CountDownLatch latch = new CountDownLatch(1);
412-
final AtomicReference<Message> resultRef = new AtomicReference<Message>();
418+
final AtomicReference<Message> resultRef = new AtomicReference<>();
413419
future.addCallback(new ListenableFutureCallback<Message>() {
414420

415421
@Override
@@ -482,24 +488,36 @@ public RabbitAdmin admin(ConnectionFactory connectionFactory) {
482488
return new RabbitAdmin(connectionFactory);
483489
}
484490

491+
@Bean
492+
public GZipPostProcessor gZipPostProcessor() {
493+
GZipPostProcessor gZipPostProcessor = new GZipPostProcessor();
494+
gZipPostProcessor.setCopyProperties(true);
495+
return gZipPostProcessor;
496+
}
497+
485498
@Bean
486499
public RabbitTemplate template(ConnectionFactory connectionFactory) {
487500
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
488501
rabbitTemplate.setRoutingKey(requests().getName());
502+
rabbitTemplate.addBeforePublishPostProcessors(gZipPostProcessor());
503+
rabbitTemplate.addAfterReceivePostProcessors(new GUnzipPostProcessor());
489504
return rabbitTemplate;
490505
}
491506

492507
@Bean
493508
public RabbitTemplate templateForDirect(ConnectionFactory connectionFactory) {
494509
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
495510
rabbitTemplate.setRoutingKey(requests().getName());
511+
rabbitTemplate.addBeforePublishPostProcessors(gZipPostProcessor());
512+
rabbitTemplate.addAfterReceivePostProcessors(new GUnzipPostProcessor());
496513
return rabbitTemplate;
497514
}
498515

499516
@Bean
500517
@Primary
501518
public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
502519
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
520+
container.setAfterReceivePostProcessors(new GUnzipPostProcessor());
503521
container.setQueueNames(replies().getName());
504522
return container;
505523
}
@@ -518,7 +536,8 @@ public AsyncRabbitTemplate asyncDirectTemplate(RabbitTemplate templateForDirect)
518536
public SimpleMessageListenerContainer remoteContainer(ConnectionFactory connectionFactory) {
519537
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
520538
container.setQueueNames(requests().getName());
521-
container.setMessageListener(
539+
container.setAfterReceivePostProcessors(new GUnzipPostProcessor());
540+
MessageListenerAdapter messageListener =
522541
new MessageListenerAdapter((ReplyingMessageListener<String, String>)
523542
message -> {
524543
CountDownLatch countDownLatch = latch().get();
@@ -542,7 +561,10 @@ else if ("noReply".equals(message)) {
542561
return null;
543562
}
544563
return message.toUpperCase();
545-
}));
564+
});
565+
566+
messageListener.setBeforeSendReplyPostProcessors(gZipPostProcessor());
567+
container.setMessageListener(messageListener);
546568
return container;
547569
}
548570

src/reference/asciidoc/amqp.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3432,6 +3432,11 @@ The second is invoked immediately after a message is received.
34323432
These extension points are used for such features as compression and, for this purpose, several `MessagePostProcessor` implementations are provided.
34333433
`GZipPostProcessor` and `ZipPostProcessor` compress messages before sending, and `GUnzipPostProcessor` and `UnzipPostProcessor` decompress received messages.
34343434

3435+
NOTE: Starting with version 2.1.5, the `GZipPostProcessor` can be configured with the `copyProperties = true` option to make a copy of the original message properties.
3436+
By default, these properties are reused for performance reasons, and modified with compression content encoding and the optional `MessageProperties.SPRING_AUTO_DECOMPRESS` header.
3437+
If you retain a reference to the original outbound message, its properties will change as well.
3438+
So, if your application retains a copy of an outbound message with these message post processors, consider turning the `copyProperties` option on.
3439+
34353440
Similarly, the `SimpleMessageListenerContainer` also has a `setAfterReceivePostProcessors()` method, letting the decompression be performed after messages are received by the container.
34363441

34373442
Starting with version 2.1.4, `addBeforePublishPostProcessors()` and `addAfterReceivePostProcessors()` have been added to the `RabbitTemplate` to allow appending new post processors to the list of before publish and after receive post processors respectively.

0 commit comments

Comments
 (0)