Skip to content

Commit a788393

Browse files
committed
GH-3174: Fix expiration property conversion for Stream module
Fixes: #3174 The Stream message comes with an `absoluteExpiryTime` according to the AMQP 1.0 message format. The AMQP 0.9.1 deals with a period instead of absolute time. * Fix `DefaultStreamMessageConverter` to calculate `expiration` from the difference between `absoluteExpiryTime` and `creationTime` (which is also absolute). * Use `System.currentTimeMillis()` if `creationTime` is not provided. * Fix an `absoluteExpiryTime` calculation based on the `creationTime` and `expiration` **Auto-cherry-pick to `3.2.x`**
1 parent cd94127 commit a788393

File tree

2 files changed

+38
-15
lines changed

2 files changed

+38
-15
lines changed

spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/support/converter/DefaultStreamMessageConverter.java

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,23 @@
2828
import com.rabbitmq.stream.MessageBuilder.PropertiesBuilder;
2929
import com.rabbitmq.stream.Properties;
3030
import com.rabbitmq.stream.codec.WrapperMessageBuilder;
31+
import org.jspecify.annotations.Nullable;
3132

3233
import org.springframework.amqp.core.Message;
3334
import org.springframework.amqp.core.MessageProperties;
3435
import org.springframework.amqp.support.converter.MessageConversionException;
3536
import org.springframework.amqp.utils.JavaUtils;
3637
import org.springframework.rabbit.stream.support.StreamMessageProperties;
3738
import org.springframework.util.Assert;
39+
import org.springframework.util.StringUtils;
3840

3941
/**
4042
* Default {@link StreamMessageConverter}.
4143
*
4244
* @author Gary Russell
4345
* @author Ngoc Nhan
46+
* @author Artem Bilan
47+
*
4448
* @since 2.4
4549
*
4650
*/
@@ -91,30 +95,37 @@ public com.rabbitmq.stream.Message fromMessage(Message message) throws MessageCo
9195
Assert.isInstanceOf(StreamMessageProperties.class, props);
9296
StreamMessageProperties mProps = (StreamMessageProperties) props;
9397
JavaUtils.INSTANCE
94-
.acceptIfNotNull(mProps.getMessageId(), propsBuilder::messageId) // TODO different types
98+
.acceptIfNotNull(mProps.getMessageId(), propsBuilder::messageId)
9599
.acceptIfNotNull(mProps.getUserId(), usr -> propsBuilder.userId(usr.getBytes(this.charset)))
96100
.acceptIfNotNull(mProps.getTo(), propsBuilder::to)
97101
.acceptIfNotNull(mProps.getSubject(), propsBuilder::subject)
98102
.acceptIfNotNull(mProps.getReplyTo(), propsBuilder::replyTo)
99-
.acceptIfNotNull(mProps.getCorrelationId(), propsBuilder::correlationId) // TODO different types
103+
.acceptIfNotNull(mProps.getCorrelationId(), propsBuilder::correlationId)
100104
.acceptIfNotNull(mProps.getContentType(), propsBuilder::contentType)
101105
.acceptIfNotNull(mProps.getContentEncoding(), propsBuilder::contentEncoding)
102-
.acceptIfNotNull(mProps.getExpiration(), exp -> propsBuilder.absoluteExpiryTime(Long.parseLong(exp)))
103106
.acceptIfNotNull(mProps.getCreationTime(), propsBuilder::creationTime)
104107
.acceptIfNotNull(mProps.getGroupId(), propsBuilder::groupId)
105108
.acceptIfNotNull(mProps.getGroupSequence(), propsBuilder::groupSequence)
106109
.acceptIfNotNull(mProps.getReplyToGroupId(), propsBuilder::replyToGroupId);
107110
ApplicationPropertiesBuilder appPropsBuilder = builder.applicationProperties();
108-
if (!mProps.getHeaders().isEmpty()) {
109-
mProps.getHeaders().forEach((key, val) -> {
110-
mapProp(key, val, appPropsBuilder);
111-
});
111+
112+
long creationTime = mProps.getCreationTime();
113+
if (creationTime <= 0) {
114+
creationTime = System.currentTimeMillis();
115+
}
116+
propsBuilder.creationTime(creationTime);
117+
118+
String expiration = mProps.getExpiration();
119+
if (StringUtils.hasText(expiration)) {
120+
propsBuilder.absoluteExpiryTime(creationTime + Long.parseLong(expiration));
112121
}
122+
123+
mProps.getHeaders().forEach((key, val) -> mapProp(key, val, appPropsBuilder));
113124
builder.addData(message.getBody());
114125
return builder.build();
115126
}
116127

117-
private void mapProp(String key, Object val, ApplicationPropertiesBuilder builder) { // NOSONAR - complexity
128+
private void mapProp(String key, @Nullable Object val, ApplicationPropertiesBuilder builder) {
118129
if (val instanceof String string) {
119130
builder.entry(key, string);
120131
}
@@ -154,19 +165,28 @@ private void toMessageProperties(com.rabbitmq.stream.Message streamMessage,
154165
if (properties != null) {
155166
JavaUtils.INSTANCE
156167
.acceptIfNotNull(properties.getMessageIdAsString(), mProps::setMessageId)
157-
.acceptIfNotNull(properties.getUserId(), usr -> mProps.setUserId(new String(usr, this.charset)))
168+
.acceptIfNotNull(properties.getUserId(),
169+
usr -> mProps.setUserId(new String(usr, this.charset)))
158170
.acceptIfNotNull(properties.getTo(), mProps::setTo)
159171
.acceptIfNotNull(properties.getSubject(), mProps::setSubject)
160172
.acceptIfNotNull(properties.getReplyTo(), mProps::setReplyTo)
161173
.acceptIfNotNull(properties.getCorrelationIdAsString(), mProps::setCorrelationId)
162174
.acceptIfNotNull(properties.getContentType(), mProps::setContentType)
163175
.acceptIfNotNull(properties.getContentEncoding(), mProps::setContentEncoding)
164-
.acceptIfNotNull(properties.getAbsoluteExpiryTime(),
165-
exp -> mProps.setExpiration(Long.toString(exp)))
166-
.acceptIfNotNull(properties.getCreationTime(), mProps::setCreationTime)
167176
.acceptIfNotNull(properties.getGroupId(), mProps::setGroupId)
168177
.acceptIfNotNull(properties.getGroupSequence(), mProps::setGroupSequence)
169178
.acceptIfNotNull(properties.getReplyToGroupId(), mProps::setReplyToGroupId);
179+
180+
long creationTime = properties.getCreationTime();
181+
if (creationTime <= 0) {
182+
creationTime = System.currentTimeMillis();
183+
}
184+
mProps.setCreationTime(creationTime);
185+
186+
long absoluteExpiryTime = properties.getAbsoluteExpiryTime();
187+
if (absoluteExpiryTime > creationTime) {
188+
mProps.setExpiration(Long.toString(absoluteExpiryTime - creationTime));
189+
}
170190
}
171191
Map<String, Object> applicationProperties = streamMessage.getApplicationProperties();
172192
if (applicationProperties != null) {

spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/support/converter/DefaultStreamMessageConverterTests.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,13 @@
2424
import org.springframework.rabbit.stream.support.StreamMessageProperties;
2525

2626
import static org.assertj.core.api.Assertions.assertThat;
27+
import static org.assertj.core.api.Assertions.withinPercentage;
2728
import static org.mockito.Mockito.mock;
2829

2930
/**
3031
* @author Gary Russell
32+
* @author Artem Bilan
33+
*
3134
* @since 2.4
3235
*
3336
*/
@@ -45,7 +48,7 @@ void toAndFrom() {
4548
smp.setContentType("application/json");
4649
smp.setContentEncoding("UTF-8");
4750
smp.setExpiration("42");
48-
smp.setCreationTime(43L);
51+
smp.setCreationTime(System.currentTimeMillis());
4952
smp.setGroupId("groupId");
5053
smp.setGroupSequence(44L);
5154
smp.setReplyToGroupId("replyGroupId");
@@ -62,8 +65,8 @@ void toAndFrom() {
6265
assertThat(props.getCorrelationIdAsString()).isEqualTo("correlation");
6366
assertThat(props.getContentType()).isEqualTo("application/json");
6467
assertThat(props.getContentEncoding()).isEqualTo("UTF-8");
65-
assertThat(props.getAbsoluteExpiryTime()).isEqualTo(42L);
66-
assertThat(props.getCreationTime()).isEqualTo(43L);
68+
assertThat(props.getAbsoluteExpiryTime()).isCloseTo(System.currentTimeMillis() + 42, withinPercentage(5));
69+
assertThat(props.getCreationTime()).isCloseTo(System.currentTimeMillis(), withinPercentage(5));
6770
assertThat(props.getGroupId()).isEqualTo("groupId");
6871
assertThat(props.getGroupSequence()).isEqualTo(44L);
6972
assertThat(props.getReplyToGroupId()).isEqualTo("replyGroupId");

0 commit comments

Comments
 (0)