Skip to content

Commit a1ec58c

Browse files
SQS: Fix missing headers (#781)
* Fix missing MessageHeaders returned from MessageConverter * Make Message Id Consistent Make tests use correct type for sending messages --------- Co-authored-by: Stuart Still <[email protected]>
1 parent c6f552b commit a1ec58c

File tree

4 files changed

+47
-38
lines changed

4 files changed

+47
-38
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/AbstractMessagingMessageConverter.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -205,13 +205,14 @@ public MessageConversionContext createMessageConversionContext() {
205205

206206
@Override
207207
public S fromMessagingMessage(Message<?> message, @Nullable MessageConversionContext context) {
208+
// We must make sure the message id stays consistent throughout this process
208209
MessageHeaders headers = getMessageHeaders(message);
209-
S messageWithHeaders = this.headerMapper.fromHeaders(headers);
210-
Object payload = Objects
211-
.requireNonNull(this.payloadMessageConverter.toMessage(message.getPayload(), message.getHeaders()),
212-
() -> "payloadMessageConverter returned null message for message " + message)
213-
.getPayload();
214-
return doConvertMessage(messageWithHeaders, payload);
210+
Message<?> convertedMessage = Objects.requireNonNull(
211+
this.payloadMessageConverter.toMessage(message.getPayload(), message.getHeaders()),
212+
() -> "payloadMessageConverter returned null message for message " + message);
213+
MessageHeaders completeHeaders = MessageHeaderUtils.addHeadersIfAbsent(headers, convertedMessage.getHeaders());
214+
S messageWithHeaders = this.headerMapper.fromHeaders(completeHeaders);
215+
return doConvertMessage(messageWithHeaders, convertedMessage.getPayload());
215216
}
216217

217218
private MessageHeaders getMessageHeaders(Message<?> message) {

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsLoadIntegrationTests.java

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import static java.util.Collections.singletonMap;
1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21-
import com.fasterxml.jackson.core.JsonProcessingException;
2221
import com.fasterxml.jackson.databind.ObjectMapper;
2322
import io.awspring.cloud.sqs.MessageHeaderUtils;
2423
import io.awspring.cloud.sqs.annotation.SqsListener;
@@ -89,9 +88,6 @@ class SqsLoadIntegrationTests extends BaseSqsIntegrationTest {
8988
@Autowired
9089
SqsTemplate sqsTemplate;
9190

92-
@Autowired
93-
ObjectMapper objectMapper;
94-
9591
@Autowired
9692
Settings settings;
9793

@@ -202,11 +198,11 @@ private void sendMessageBatchAsync(String queueName) {
202198
if (!settings.sendMessages) {
203199
return;
204200
}
205-
Collection<Message<String>> messages = getMessages();
201+
Collection<Message<Object>> messages = getMessages();
206202
doSendMessageBatch(queueName, messages);
207203
}
208204

209-
private void doSendMessageBatch(String queueName, Collection<Message<String>> messages) {
205+
private void doSendMessageBatch(String queueName, Collection<Message<Object>> messages) {
210206
sqsTemplate.sendManyAsync(queueName, messages).thenRun(this::logSend).exceptionally(t -> {
211207
logger.error("Error sending messages - retrying", t);
212208
doSendMessageBatch(queueName, messages);
@@ -221,22 +217,16 @@ private void logSend() {
221217
}
222218
}
223219

224-
private Collection<Message<String>> getMessages() {
220+
private Collection<Message<Object>> getMessages() {
225221
return IntStream.range(0, Math.min(settings.totalMessages / 2, 10)).mapToObj(index -> {
226-
Message<String> message = MessageBuilder.withPayload(getBody()).build();
222+
Message<Object> message = MessageBuilder.withPayload(getBody()).build();
227223
logger.trace("Sending message with id {}", message.getHeaders().get("id"));
228224
return message;
229225
}).collect(Collectors.toList());
230226
}
231227

232-
private String getBody() {
233-
try {
234-
return this.objectMapper.writeValueAsString(
235-
new MyPojo("MyPojo - " + bodyInteger.incrementAndGet(), "MyPojo - secondValue"));
236-
}
237-
catch (JsonProcessingException e) {
238-
throw new RuntimeException(e);
239-
}
228+
private Object getBody() {
229+
return new MyPojo("MyPojo - " + bodyInteger.incrementAndGet(), "MyPojo - secondValue");
240230
}
241231

242232
static class MessageContainer {
@@ -266,7 +256,7 @@ static class ReceiveManyFromTwoQueuesListener {
266256

267257
@SqsListener(queueNames = { RECEIVE_FROM_MANY_1_QUEUE_NAME,
268258
RECEIVE_FROM_MANY_2_QUEUE_NAME }, factory = HIGH_THROUGHPUT_FACTORY_NAME, id = "many-from-two-queues")
269-
void listen(Message<String> message) throws Exception {
259+
void listen(Message<MyPojo> message) throws Exception {
270260
logger.trace("Started processing {}", MessageHeaderUtils.getId(message));
271261
if (this.messageContainer.receivedByListener.contains(MessageHeaderUtils.getId(message))) {
272262
logger.warn("Received duplicated message: {}", message);

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsMessageConversionIntegrationTests.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,6 @@ class SqsMessageConversionIntegrationTests extends BaseSqsIntegrationTest {
7272
@Autowired
7373
SqsTemplate sqsTemplate;
7474

75-
@Autowired
76-
ObjectMapper objectMapper;
77-
7875
@BeforeAll
7976
static void beforeTests() {
8077
SqsAsyncClient client = createAsyncClient();
@@ -88,31 +85,31 @@ static void beforeTests() {
8885

8986
@Test
9087
void resolvesPojoParameterTypes() throws Exception {
91-
String messageBody = objectMapper.writeValueAsString(new MyPojo("pojoParameterType", "secondValue"));
88+
MyPojo messageBody = new MyPojo("pojoParameterType", "secondValue");
9289
sqsTemplate.send(RESOLVES_POJO_TYPES_QUEUE_NAME, messageBody);
9390
logger.debug("Sent message to queue {} with messageBody {}", RESOLVES_POJO_TYPES_QUEUE_NAME, messageBody);
9491
assertThat(latchContainer.resolvesPojoLatch.await(10, TimeUnit.SECONDS)).isTrue();
9592
}
9693

9794
@Test
9895
void resolvesPojoMessage() throws Exception {
99-
String messageBody = objectMapper.writeValueAsString(new MyPojo("resolvesPojoMessage", "secondValue"));
96+
MyPojo messageBody = new MyPojo("resolvesPojoMessage", "secondValue");
10097
sqsTemplate.send(RESOLVES_POJO_MESSAGE_QUEUE_NAME, messageBody);
10198
logger.debug("Sent message to queue {} with messageBody {}", RESOLVES_POJO_MESSAGE_QUEUE_NAME, messageBody);
10299
assertThat(latchContainer.resolvesPojoMessageLatch.await(10, TimeUnit.SECONDS)).isTrue();
103100
}
104101

105102
@Test
106103
void resolvesPojoList() throws Exception {
107-
String messageBody = objectMapper.writeValueAsString(new MyPojo("resolvesPojoList", "secondValue"));
108-
sqsTemplate.send(RESOLVES_POJO_LIST_QUEUE_NAME, messageBody);
109-
logger.debug("Sent message to queue {} with messageBody {}", RESOLVES_POJO_LIST_QUEUE_NAME, messageBody);
104+
MyPojo payload = new MyPojo("resolvesPojoList", "secondValue");
105+
sqsTemplate.send(RESOLVES_POJO_LIST_QUEUE_NAME, payload);
106+
logger.debug("Sent message to queue {} with messageBody {}", RESOLVES_POJO_LIST_QUEUE_NAME, payload);
110107
assertThat(latchContainer.resolvesPojoListLatch.await(10, TimeUnit.SECONDS)).isTrue();
111108
}
112109

113110
@Test
114111
void resolvesPojoMessageList() throws Exception {
115-
String messageBody = objectMapper.writeValueAsString(new MyPojo("resolvesPojoMessageList", "secondValue"));
112+
MyPojo messageBody = new MyPojo("resolvesPojoMessageList", "secondValue");
116113
sqsTemplate.send(RESOLVES_POJO_MESSAGE_LIST_QUEUE_NAME, messageBody);
117114
logger.debug("Sent message to queue {} with messageBody {}", RESOLVES_POJO_MESSAGE_LIST_QUEUE_NAME,
118115
messageBody);
@@ -121,20 +118,20 @@ void resolvesPojoMessageList() throws Exception {
121118

122119
@Test
123120
void resolvesPojoFromHeader() throws Exception {
124-
String messageBody = objectMapper.writeValueAsString(new MyPojo("pojoParameterType", "secondValue"));
121+
MyPojo payload = new MyPojo("pojoParameterType", "secondValue");
125122
sqsTemplate.send(RESOLVES_POJO_FROM_HEADER_QUEUE_NAME,
126-
MessageBuilder.createMessage(messageBody, new MessagingMessageHeaders(getHeaderMapping(MyPojo.class))));
127-
logger.debug("Sent message to queue {} with messageBody {}", RESOLVES_POJO_FROM_HEADER_QUEUE_NAME, messageBody);
123+
MessageBuilder.createMessage(payload, new MessagingMessageHeaders(getHeaderMapping(MyPojo.class))));
124+
logger.debug("Sent message to queue {} with messageBody {}", RESOLVES_POJO_FROM_HEADER_QUEUE_NAME, payload);
128125
assertThat(latchContainer.resolvesPojoFromMappingLatch.await(10, TimeUnit.SECONDS)).isTrue();
129126
}
130127

131128
@Test
132129
void resolvesMyOtherPojoFromHeader() throws Exception {
133-
String messageBody = objectMapper.writeValueAsString(new MyOtherPojo("pojoParameterType", "secondValue"));
134-
sqsTemplate.send(RESOLVES_MY_OTHER_POJO_FROM_HEADER_QUEUE_NAME, MessageBuilder.createMessage(messageBody,
130+
MyOtherPojo payload = new MyOtherPojo("pojoParameterType", "secondValue");
131+
sqsTemplate.send(RESOLVES_MY_OTHER_POJO_FROM_HEADER_QUEUE_NAME, MessageBuilder.createMessage(payload,
135132
new MessagingMessageHeaders(getHeaderMapping(MyOtherPojo.class))));
136133
logger.debug("Sent message to queue {} with messageBody {}", RESOLVES_MY_OTHER_POJO_FROM_HEADER_QUEUE_NAME,
137-
messageBody);
134+
payload);
138135
assertThat(latchContainer.resolvesMyOtherPojoFromMappingLatch.await(10, TimeUnit.SECONDS)).isTrue();
139136
}
140137

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/support/converter/SqsMessagingMessageConverterTests.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.junit.jupiter.api.Test;
3030
import org.springframework.messaging.MessageHeaders;
3131
import org.springframework.messaging.converter.MessageConverter;
32+
import org.springframework.messaging.support.MessageBuilder;
3233
import software.amazon.awssdk.services.sqs.model.Message;
3334
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
3435

@@ -114,6 +115,26 @@ void shouldUseProvidedPayloadConverter() throws Exception {
114115
assertThat(resultMessage.getPayload()).isEqualTo(myPojo);
115116
}
116117

118+
@Test
119+
void shouldUseHeadersFromPayloadConverter() {
120+
MessageConverter payloadConverter = mock(MessageConverter.class);
121+
org.springframework.messaging.Message convertedMessageWithContentType = MessageBuilder.withPayload("example")
122+
.setHeader("contentType", "application/json").build();
123+
when(payloadConverter.toMessage(any(MyPojo.class), any())).thenReturn(convertedMessageWithContentType);
124+
125+
SqsMessagingMessageConverter converter = new SqsMessagingMessageConverter();
126+
converter.setPayloadMessageConverter(payloadConverter);
127+
converter.setPayloadTypeMapper(msg -> MyPojo.class);
128+
129+
org.springframework.messaging.Message<MyPojo> message = MessageBuilder.createMessage(new MyPojo(),
130+
new MessageHeaders(null));
131+
Message resultMessage = converter.fromMessagingMessage(message);
132+
133+
assertThat(resultMessage.messageId()).isEqualTo(message.getHeaders().getId().toString());
134+
assertThat(resultMessage.messageAttributes()).containsEntry("contentType",
135+
MessageAttributeValue.builder().stringValue("application/json").dataType("String").build());
136+
}
137+
117138
static class MyPojo {
118139

119140
private String myProperty = "myValue";

0 commit comments

Comments
 (0)