Skip to content

Commit 312f129

Browse files
committed
GH-3174: Fix expiration property conversion for AMQP 1.0 module
Similar to: #3174 The `absoluteExpiryTime`, according to the AMQP 1.0 message format, is a timestamp when a message should expire. The AMQP 0.9.1 deals with a period instead of absolute time. * Fix `RabbitAmqpUtils` to calculate `expiration` from the difference between `absoluteExpiryTime` and `creationTime` (which is also absolute). * Use `System.currentTimeMillis()` if `creationTime` is not provided. * Fix an `absoluteExpiryTime` calculation based on the `creationTime` and `expiration` **Auto-cherry-pick to `3.2.x`**
1 parent a788393 commit 312f129

File tree

1 file changed

+21
-11
lines changed

1 file changed

+21
-11
lines changed

spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/RabbitAmqpUtils.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.springframework.amqp.core.Message;
2828
import org.springframework.amqp.core.MessageProperties;
2929
import org.springframework.amqp.utils.JavaUtils;
30+
import org.springframework.util.StringUtils;
3031

3132
/**
3233
* The utilities for RabbitMQ AMQP 1.0 protocol API.
@@ -51,11 +52,19 @@ public static Message fromAmqpMessage(com.rabbitmq.client.amqp.Message amqpMessa
5152
.acceptIfNotNull(amqpMessage.correlationIdAsString(), messageProperties::setCorrelationId)
5253
.acceptIfNotNull(amqpMessage.contentType(), messageProperties::setContentType)
5354
.acceptIfNotNull(amqpMessage.contentEncoding(), messageProperties::setContentEncoding)
54-
.acceptIfNotNull(amqpMessage.absoluteExpiryTime(),
55-
(exp) -> messageProperties.setExpiration(Long.toString(exp)))
56-
.acceptIfNotNull(amqpMessage.creationTime(), (time) -> messageProperties.setTimestamp(new Date(time)))
5755
.acceptIfNotNull(amqpMessage.replyTo(), messageProperties::setReplyTo);
5856

57+
long creationTime = amqpMessage.creationTime();
58+
if (creationTime <= 0) {
59+
creationTime = System.currentTimeMillis();
60+
}
61+
messageProperties.setTimestamp(new Date(creationTime));
62+
63+
long absoluteExpiryTime = amqpMessage.absoluteExpiryTime();
64+
if (absoluteExpiryTime > creationTime) {
65+
messageProperties.setExpiration(Long.toString(absoluteExpiryTime - creationTime));
66+
}
67+
5968
amqpMessage.forEachProperty(messageProperties::setHeader);
6069

6170
if (context != null) {
@@ -90,20 +99,21 @@ public static void toAmqpMessage(Message message, com.rabbitmq.client.amqp.Messa
9099
.priority(messageProperties.getPriority().byteValue());
91100

92101
Map<String, @Nullable Object> headers = messageProperties.getHeaders();
93-
if (!headers.isEmpty()) {
94-
headers.forEach((key, val) -> mapProp(key, val, amqpMessage));
95-
}
102+
headers.forEach((key, val) -> mapProp(key, val, amqpMessage));
96103

97104
JavaUtils.INSTANCE
98105
.acceptOrElseIfNotNull(messageProperties.getCorrelationId(),
99106
messageProperties.getMessageId(), amqpMessage::correlationId)
107+
.acceptOrElseIfNotNull(messageProperties.getTimestamp(),
108+
new Date(), (timestamp) -> amqpMessage.creationTime(timestamp.getTime()))
100109
.acceptIfNotNull(messageProperties.getUserId(),
101110
(userId) -> amqpMessage.userId(userId.getBytes(StandardCharsets.UTF_8)))
102-
.acceptIfNotNull(messageProperties.getReplyTo(), amqpMessage::to)
103-
.acceptIfNotNull(messageProperties.getTimestamp(),
104-
(timestamp) -> amqpMessage.creationTime(timestamp.getTime()))
105-
.acceptIfNotNull(messageProperties.getExpiration(),
106-
(expiration) -> amqpMessage.absoluteExpiryTime(Long.parseLong(expiration)));
111+
.acceptIfNotNull(messageProperties.getReplyTo(), amqpMessage::to);
112+
113+
String expiration = messageProperties.getExpiration();
114+
if (StringUtils.hasText(expiration)) {
115+
amqpMessage.absoluteExpiryTime(amqpMessage.creationTime() + Long.parseLong(expiration));
116+
}
107117
}
108118

109119
private static void mapProp(String key, @Nullable Object val, com.rabbitmq.client.amqp.Message amqpMessage) {

0 commit comments

Comments
 (0)