Skip to content

Commit 4f76b2b

Browse files
authored
Fix Compressed contentEncoding Delimiter
Resolves #1251 Delimiter should be a comma and whitespace trimmeed. Handle both delimiters in decompressors and add property for backwards compatibility. **I will backport to 2.2.x with default `:`** * Do not add a delimiter if the original encoding is an empty String.
1 parent 18f4938 commit 4f76b2b

File tree

7 files changed

+62
-19
lines changed

7 files changed

+62
-19
lines changed

spring-amqp/src/main/java/org/springframework/amqp/support/postprocessor/AbstractCompressingPostProcessor.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,7 +31,9 @@
3131
import org.springframework.amqp.core.MessageProperties;
3232
import org.springframework.amqp.core.MessagePropertiesBuilder;
3333
import org.springframework.core.Ordered;
34+
import org.springframework.util.Assert;
3435
import org.springframework.util.FileCopyUtils;
36+
import org.springframework.util.StringUtils;
3537

3638
/**
3739
* Base class for post processors that compress the message body. The content encoding is
@@ -53,6 +55,8 @@ public abstract class AbstractCompressingPostProcessor implements MessagePostPro
5355

5456
private boolean copyProperties = false;
5557

58+
private String encodingDelimiter = ", ";
59+
5660
/**
5761
* Construct a post processor that will include the
5862
* {@link MessageProperties#SPRING_AUTO_DECOMPRESS} header set to 'true'.
@@ -85,6 +89,19 @@ public void setCopyProperties(boolean copyProperties) {
8589
this.copyProperties = copyProperties;
8690
}
8791

92+
/**
93+
* Set a delimiter to be added between the compression type and the original encoding,
94+
* if any. Defaults to {@code ", "} (since 2.3); for compatibility with consumers
95+
* using versions of spring-amqp earlier than 2.2.12, set it to {@code ":"} (no
96+
* trailing space).
97+
* @param encodingDelimiter the delimiter.
98+
* @since 2.2.12
99+
*/
100+
public void setEncodingDelimiter(String encodingDelimiter) {
101+
Assert.notNull(encodingDelimiter, "'encodingDelimiter' cannot be null");
102+
this.encodingDelimiter = encodingDelimiter;
103+
}
104+
88105
@Override
89106
public Message postProcessMessage(Message message) throws AmqpException {
90107
try {
@@ -109,9 +126,9 @@ public Message postProcessMessage(Message message) throws AmqpException {
109126

110127
MessageProperties messageProperties =
111128
messagePropertiesBuilder.setContentEncoding(getEncoding() +
112-
(originalProperties.getContentEncoding() == null
129+
(!StringUtils.hasText(originalProperties.getContentEncoding())
113130
? ""
114-
: ":" + originalProperties.getContentEncoding()))
131+
: this.encodingDelimiter + originalProperties.getContentEncoding()))
115132
.build();
116133

117134
return new Message(compressed, messageProperties);

spring-amqp/src/main/java/org/springframework/amqp/support/postprocessor/AbstractDecompressingPostProcessor.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -91,17 +91,22 @@ public Message postProcessMessage(Message message) throws AmqpException {
9191
FileCopyUtils.copy(unzipper, out);
9292
MessageProperties messageProperties = message.getMessageProperties();
9393
String encoding = messageProperties.getContentEncoding();
94-
int colonAt = encoding.indexOf(':');
95-
if (colonAt > 0) {
96-
encoding = encoding.substring(0, colonAt);
94+
int delimAt = encoding.indexOf(':');
95+
if (delimAt < 0) {
96+
delimAt = encoding.indexOf(',');
97+
}
98+
if (delimAt > 0) {
99+
encoding = encoding.substring(0, delimAt);
97100
}
98101
Assert.state(getEncoding().equals(encoding), "Content encoding must be:" + getEncoding() + ", was:"
99102
+ encoding);
100-
if (colonAt < 0) {
103+
if (delimAt < 0) {
101104
messageProperties.setContentEncoding(null);
102105
}
103106
else {
104-
messageProperties.setContentEncoding(messageProperties.getContentEncoding().substring(colonAt + 1));
107+
messageProperties.setContentEncoding(messageProperties.getContentEncoding()
108+
.substring(delimAt + 1)
109+
.trim());
105110
}
106111
messageProperties.getHeaders().remove(MessageProperties.SPRING_AUTO_DECOMPRESS);
107112
return new Message(out.toByteArray(), messageProperties);

spring-amqp/src/main/java/org/springframework/amqp/support/postprocessor/DelegatingDecompressingPostProcessor.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -94,9 +94,12 @@ public Message postProcessMessage(Message message) throws AmqpException {
9494
return message;
9595
}
9696
else {
97-
int colonAt = encoding.indexOf(':');
98-
if (colonAt > 0) {
99-
encoding = encoding.substring(0, colonAt);
97+
int delimAt = encoding.indexOf(':');
98+
if (delimAt < 0) {
99+
delimAt = encoding.indexOf(',');
100+
}
101+
if (delimAt > 0) {
102+
encoding = encoding.substring(0, delimAt);
100103
}
101104
MessagePostProcessor decompressor = this.decompressors.get(encoding);
102105
if (decompressor != null) {

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/BatchingRabbitTemplateTests.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,7 @@ public void testSimpleBatchGZipped() throws Exception {
378378
BatchingRabbitTemplate template = new BatchingRabbitTemplate(batchingStrategy, this.scheduler);
379379
template.setConnectionFactory(this.connectionFactory);
380380
GZipPostProcessor gZipPostProcessor = new GZipPostProcessor();
381+
gZipPostProcessor.setEncodingDelimiter(":"); // unzip messages from older versions
381382
assertThat(getStreamLevel(gZipPostProcessor)).isEqualTo(Deflater.BEST_SPEED);
382383
template.setBeforePublishPostProcessors(gZipPostProcessor);
383384
MessageProperties props = new MessageProperties();
@@ -489,7 +490,7 @@ public void testSimpleBatchGZippedWithEncoding() throws Exception {
489490
message = new Message("bar".getBytes(), props);
490491
template.send("", ROUTE, message);
491492
message = receive(template);
492-
assertThat(message.getMessageProperties().getContentEncoding()).isEqualTo("gzip:foo");
493+
assertThat(message.getMessageProperties().getContentEncoding()).isEqualTo("gzip, foo");
493494
GUnzipPostProcessor unzipper = new GUnzipPostProcessor();
494495
message = unzipper.postProcessMessage(message);
495496
assertThat(new String(message.getBody())).isEqualTo("\u0000\u0000\u0000\u0003foo\u0000\u0000\u0000\u0003bar");
@@ -500,10 +501,14 @@ public void testSimpleBatchGZippedWithEncodingInflated() throws Exception {
500501
BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(2, Integer.MAX_VALUE, 30000);
501502
BatchingRabbitTemplate template = new BatchingRabbitTemplate(batchingStrategy, this.scheduler);
502503
template.setConnectionFactory(this.connectionFactory);
503-
template.setBeforePublishPostProcessors(new GZipPostProcessor());
504+
AtomicReference<String> encoding = new AtomicReference<>();
505+
template.setBeforePublishPostProcessors(new GZipPostProcessor(), msg -> {
506+
encoding.set(msg.getMessageProperties().getContentEncoding());
507+
return msg;
508+
});
504509
template.setAfterReceivePostProcessors(new DelegatingDecompressingPostProcessor());
505510
MessageProperties props = new MessageProperties();
506-
props.setContentEncoding("foo");
511+
props.setContentEncoding("");
507512
Message message = new Message("foo".getBytes(), props);
508513
template.send("", ROUTE, message);
509514
message = new Message("bar".getBytes(), props);
@@ -512,6 +517,7 @@ public void testSimpleBatchGZippedWithEncodingInflated() throws Exception {
512517
byte[] out = (byte[]) template.receiveAndConvert(ROUTE);
513518
assertThat(out).isNotNull();
514519
assertThat(new String(out)).isEqualTo("\u0000\u0000\u0000\u0003foo\u0000\u0000\u0000\u0003bar");
520+
assertThat(encoding.get()).isEqualTo("gzip");
515521
}
516522

517523
@Test
@@ -550,7 +556,7 @@ public void testSimpleBatchZippedWithEncoding() throws Exception {
550556
message = new Message("bar".getBytes(), props);
551557
template.send("", ROUTE, message);
552558
message = receive(template);
553-
assertThat(message.getMessageProperties().getContentEncoding()).isEqualTo("zip:foo");
559+
assertThat(message.getMessageProperties().getContentEncoding()).isEqualTo("zip, foo");
554560
UnzipPostProcessor unzipper = new UnzipPostProcessor();
555561
message = unzipper.postProcessMessage(message);
556562
assertThat(new String(message.getBody())).isEqualTo("\u0000\u0000\u0000\u0003foo\u0000\u0000\u0000\u0003bar");
@@ -612,7 +618,7 @@ public void testSimpleBatchDeflaterWithEncoding() throws Exception {
612618
message = new Message("bar".getBytes(), props);
613619
template.send("", ROUTE, message);
614620
message = receive(template);
615-
assertThat(message.getMessageProperties().getContentEncoding()).isEqualTo("deflate:foo");
621+
assertThat(message.getMessageProperties().getContentEncoding()).isEqualTo("deflate, foo");
616622
InflaterPostProcessor inflater = new InflaterPostProcessor();
617623
message = inflater.postProcessMessage(message);
618624
assertThat(new String(message.getBody())).isEqualTo("\u0000\u0000\u0000\u0003foo\u0000\u0000\u0000\u0003bar");

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1393,7 +1393,7 @@ public String handleMessage(String message) {
13931393
Message message = new Message("foo".getBytes(), props);
13941394
Message reply = template.sendAndReceive("", ROUTE, message);
13951395
assertThat(reply).isNotNull();
1396-
assertThat(reply.getMessageProperties().getContentEncoding()).isEqualTo("gzip:UTF-8");
1396+
assertThat(reply.getMessageProperties().getContentEncoding()).isEqualTo("gzip, UTF-8");
13971397
GUnzipPostProcessor unzipper = new GUnzipPostProcessor();
13981398
reply = unzipper.postProcessMessage(reply);
13991399
assertThat(new String(reply.getBody())).isEqualTo("FOO");

src/reference/asciidoc/amqp.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3946,6 +3946,12 @@ By default, these properties are reused for performance reasons, and modified wi
39463946
If you retain a reference to the original outbound message, its properties will change as well.
39473947
So, if your application retains a copy of an outbound message with these message post processors, consider turning the `copyProperties` option on.
39483948

3949+
IMPORTANT: Starting with version 2.2.12, you can configure the delimiter that the compressing post processors use between content encoding elements.
3950+
With versions 2.2.11 and before, this was hard-coded as `:`, it is now set to `, ` by default.
3951+
The decompressors will work with both delimiters.
3952+
However, if you publish messages with 2.3 or later and consume with 2.2.11 or earlier, you MUST set the `encodingDelimiter` property on the compressor(s) to `:`.
3953+
When your consumers are upgraded to 2.2.11 or later, you can revert to the default of `, `.
3954+
39493955
Similarly, the `SimpleMessageListenerContainer` also has a `setAfterReceivePostProcessors()` method, letting the decompression be performed after messages are received by the container.
39503956

39513957
Starting with version 2.1.4, `addBeforePublishPostProcessors()` and `addAfterReceivePostProcessors()` have been added to the `RabbitTemplate` to allow appending new post processors to the list of before publish and after receive post processors respectively.

src/reference/asciidoc/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,9 @@ See <<template-confirms>> for more information.
3838

3939
A new listener container property `consumeDelay` is now available; it is helpful when using the https://github.com/rabbitmq/rabbitmq-sharding[RabbitMQ Sharding Plugin].
4040
See <<containerAttributes>> for more information.
41+
42+
==== MessagePostProcessor Changes
43+
44+
The compressing `MessagePostProcessor` s now use a comma to separate multiple content encodings instead of a colon.
45+
The decompressors can handle both formats but, if you produce messages with this version that are consumed by versions earlier than 2.2.12, you should configure the compressor to use the old delimiter.
46+
See the IMPORTANT note in <<post-processing>> for more information.

0 commit comments

Comments
 (0)