Skip to content

Commit c1b850a

Browse files
authored
Conditionally add a random UUID dedup ID if the queue isn't configured for content based deduplication (#799)
Fixes #796
1 parent 358bdca commit c1b850a

File tree

2 files changed

+145
-49
lines changed

2 files changed

+145
-49
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplate.java

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.awspring.cloud.sqs.listener.QueueAttributes;
2323
import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy;
2424
import io.awspring.cloud.sqs.listener.SqsHeaders;
25+
import io.awspring.cloud.sqs.listener.SqsHeaders.MessageSystemAttributes;
2526
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementCallback;
2627
import io.awspring.cloud.sqs.support.converter.MessageAttributeDataTypes;
2728
import io.awspring.cloud.sqs.support.converter.MessageConversionContext;
@@ -32,8 +33,10 @@
3233
import java.util.Collection;
3334
import java.util.Collections;
3435
import java.util.HashMap;
36+
import java.util.HashSet;
3537
import java.util.Map;
3638
import java.util.Optional;
39+
import java.util.Set;
3740
import java.util.UUID;
3841
import java.util.concurrent.CompletableFuture;
3942
import java.util.concurrent.ConcurrentHashMap;
@@ -244,23 +247,35 @@ private Map<String, Object> addAdditionalReceiveHeaders(SqsReceiveOptionsImpl op
244247
@Override
245248
protected <T> org.springframework.messaging.Message<T> preProcessMessageForSend(String endpointToUse,
246249
org.springframework.messaging.Message<T> message) {
247-
return FifoUtils.isFifo(endpointToUse) ? addMissingFifoSendHeaders(message) : message;
250+
return FifoUtils.isFifo(endpointToUse) ? addMissingFifoSendHeaders(endpointToUse, message) : message;
248251
}
249252

250253
@Override
251254
protected <T> Collection<org.springframework.messaging.Message<T>> preProcessMessagesForSend(String endpointToUse,
252255
Collection<org.springframework.messaging.Message<T>> messages) {
253256
return FifoUtils.isFifo(endpointToUse)
254-
? messages.stream().map(this::addMissingFifoSendHeaders).collect(Collectors.toList())
257+
? messages.stream().map(message -> addMissingFifoSendHeaders(endpointToUse, message)).toList()
255258
: messages;
256259
}
257260

258-
private <T> org.springframework.messaging.Message<T> addMissingFifoSendHeaders(
261+
private <T> org.springframework.messaging.Message<T> addMissingFifoSendHeaders(String endpointName,
259262
org.springframework.messaging.Message<T> message) {
260-
return MessageHeaderUtils.addHeadersIfAbsent(message,
261-
Map.of(SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER, UUID.randomUUID().toString(),
262-
SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_DEDUPLICATION_ID_HEADER,
263-
UUID.randomUUID().toString()));
263+
264+
Set<QueueAttributeName> additionalAttributes = Set.of(QueueAttributeName.CONTENT_BASED_DEDUPLICATION);
265+
String contentBasedDedupQueueAttribute = getQueueAttributes(endpointName, additionalAttributes).join()
266+
.getQueueAttribute(QueueAttributeName.CONTENT_BASED_DEDUPLICATION);
267+
268+
boolean isContentBasedDedup = Boolean.parseBoolean(contentBasedDedupQueueAttribute);
269+
Map<String, Object> defaultHeaders;
270+
if (isContentBasedDedup) {
271+
defaultHeaders = Map.of(MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER, UUID.randomUUID().toString());
272+
}
273+
else {
274+
defaultHeaders = Map.of(MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER, UUID.randomUUID().toString(),
275+
MessageSystemAttributes.SQS_MESSAGE_DEDUPLICATION_ID_HEADER, UUID.randomUUID().toString());
276+
}
277+
278+
return MessageHeaderUtils.addHeadersIfAbsent(message, defaultHeaders);
264279
}
265280

266281
@Override
@@ -408,10 +423,21 @@ private boolean isSkipAttribute(MessageSystemAttributeName name) {
408423
}
409424

410425
private CompletableFuture<QueueAttributes> getQueueAttributes(String endpointName) {
411-
return this.queueAttributesCache.computeIfAbsent(endpointName,
412-
newName -> QueueAttributesResolver.builder().sqsAsyncClient(this.sqsAsyncClient).queueName(newName)
413-
.queueNotFoundStrategy(this.queueNotFoundStrategy).queueAttributeNames(this.queueAttributeNames)
414-
.build().resolveQueueAttributes());
426+
return getQueueAttributes(endpointName, Collections.emptySet());
427+
}
428+
429+
private CompletableFuture<QueueAttributes> getQueueAttributes(String endpointName,
430+
Set<QueueAttributeName> additionalAttributes) {
431+
return this.queueAttributesCache.computeIfAbsent(endpointName, newName -> {
432+
// ensure we have the content based dedupe config to determine default fifo send headers
433+
Set<QueueAttributeName> namesToRequest = new HashSet<>(queueAttributeNames);
434+
if (additionalAttributes != null && !additionalAttributes.isEmpty()) {
435+
namesToRequest.addAll(additionalAttributes);
436+
}
437+
return QueueAttributesResolver.builder().sqsAsyncClient(this.sqsAsyncClient).queueName(newName)
438+
.queueNotFoundStrategy(this.queueNotFoundStrategy).queueAttributeNames(namesToRequest).build()
439+
.resolveQueueAttributes();
440+
});
415441
}
416442

417443
@Override

0 commit comments

Comments
 (0)