Skip to content

Commit cff92cb

Browse files
committed
feat: add batch processing support in SqsAutoConfiguration
1 parent 07636d6 commit cff92cb

File tree

1 file changed

+48
-22
lines changed

1 file changed

+48
-22
lines changed

spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,7 @@
1717

1818
import com.fasterxml.jackson.databind.ObjectMapper;
1919
import io.awspring.cloud.autoconfigure.AwsAsyncClientCustomizer;
20-
import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer;
21-
import io.awspring.cloud.autoconfigure.core.AwsClientCustomizer;
22-
import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails;
23-
import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration;
24-
import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration;
20+
import io.awspring.cloud.autoconfigure.core.*;
2521
import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration;
2622
import io.awspring.cloud.sqs.config.SqsListenerConfigurer;
2723
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
@@ -30,6 +26,7 @@
3026
import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler;
3127
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
3228
import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor;
29+
import io.awspring.cloud.sqs.operations.BatchingSqsClientAdapter;
3330
import io.awspring.cloud.sqs.operations.SqsTemplate;
3431
import io.awspring.cloud.sqs.operations.SqsTemplateBuilder;
3532
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
@@ -48,8 +45,11 @@
4845
import org.springframework.boot.context.properties.PropertyMapper;
4946
import org.springframework.context.annotation.Bean;
5047
import org.springframework.context.annotation.Import;
48+
import org.springframework.context.annotation.Primary;
5149
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
5250
import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder;
51+
import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration;
52+
import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager;
5353
import software.amazon.awssdk.services.sqs.model.Message;
5454

5555
/**
@@ -82,9 +82,35 @@ public SqsAsyncClient sqsAsyncClient(AwsClientBuilderConfigurer awsClientBuilder
8282
ObjectProvider<AwsConnectionDetails> connectionDetails,
8383
ObjectProvider<SqsAsyncClientCustomizer> sqsAsyncClientCustomizers,
8484
ObjectProvider<AwsAsyncClientCustomizer> awsAsyncClientCustomizers) {
85-
return awsClientBuilderConfigurer.configureAsyncClient(SqsAsyncClient.builder(), this.sqsProperties,
86-
connectionDetails.getIfAvailable(), configurer.getIfAvailable(),
87-
sqsAsyncClientCustomizers.orderedStream(), awsAsyncClientCustomizers.orderedStream()).build();
85+
return awsClientBuilderConfigurer.configureAsyncClient(SqsAsyncClient.builder(), this.sqsProperties,
86+
connectionDetails.getIfAvailable(), configurer.getIfAvailable(),
87+
sqsAsyncClientCustomizers.orderedStream(), awsAsyncClientCustomizers.orderedStream()).build();
88+
}
89+
90+
@ConditionalOnProperty(name = "spring.cloud.aws.sqs.batch.enabled", havingValue = "true")
91+
@Bean
92+
@Primary
93+
public SqsAsyncClient batchSqsAsyncClient(SqsAsyncClient sqsAsyncClient) {
94+
SqsAsyncBatchManager batchManager = createBatchManager(sqsAsyncClient);
95+
return new BatchingSqsClientAdapter(batchManager);
96+
}
97+
98+
private SqsAsyncBatchManager createBatchManager(SqsAsyncClient sqsAsyncClient) {
99+
return SqsAsyncBatchManager.builder()
100+
.client(sqsAsyncClient)
101+
.overrideConfiguration(this::configurationProperties)
102+
.build();
103+
}
104+
105+
private void configurationProperties(BatchOverrideConfiguration.Builder options) {
106+
PropertyMapper mapper = PropertyMapper.get().alwaysApplyingWhenNonNull();
107+
mapper.from(this.sqsProperties.getBatch().getMaxNumberOfMessages()).to(options::maxBatchSize);
108+
mapper.from(this.sqsProperties.getBatch().getSendBatchFrequency()).to(options::sendRequestFrequency);
109+
mapper.from(this.sqsProperties.getBatch().getWaitTimeSeconds()).to(options::receiveMessageMinWaitDuration);
110+
mapper.from(this.sqsProperties.getBatch().getVisibilityTimeout()).to(options::receiveMessageVisibilityTimeout);
111+
mapper.from(this.sqsProperties.getBatch().getSystemAttributeNames())
112+
.to(options::receiveMessageSystemAttributeNames);
113+
mapper.from(this.sqsProperties.getBatch().getAttributeNames()).to(options::receiveMessageAttributeNames);
88114
}
89115

90116
@ConditionalOnMissingBean
@@ -93,15 +119,15 @@ public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider<Obj
93119
ObjectProvider<ObservationRegistry> observationRegistryProvider,
94120
ObjectProvider<SqsTemplateObservation.Convention> observationConventionProvider,
95121
MessagingMessageConverter<Message> messageConverter) {
96-
SqsTemplateBuilder builder = SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient)
97-
.messageConverter(messageConverter);
122+
SqsTemplateBuilder builder = SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient)
123+
.messageConverter(messageConverter);
98124
objectMapperProvider.ifAvailable(om -> setMapperToConverter(messageConverter, om));
99-
if (this.sqsProperties.isObservationEnabled()) {
100-
observationRegistryProvider
101-
.ifAvailable(registry -> builder.configure(options -> options.observationRegistry(registry)));
102-
observationConventionProvider
103-
.ifAvailable(convention -> builder.configure(options -> options.observationConvention(convention)));
104-
}
125+
if (this.sqsProperties.isObservationEnabled()) {
126+
observationRegistryProvider
127+
.ifAvailable(registry -> builder.configure(options -> options.observationRegistry(registry)));
128+
observationConventionProvider
129+
.ifAvailable(convention -> builder.configure(options -> options.observationConvention(convention)));
130+
}
105131
if (sqsProperties.getQueueNotFoundStrategy() != null) {
106132
builder.configure((options) -> options.queueNotFoundStrategy(sqsProperties.getQueueNotFoundStrategy()));
107133
}
@@ -127,12 +153,12 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
127153
interceptors.forEach(factory::addMessageInterceptor);
128154
asyncInterceptors.forEach(factory::addMessageInterceptor);
129155
objectMapperProvider.ifAvailable(om -> setMapperToConverter(messagingMessageConverter, om));
130-
if (this.sqsProperties.isObservationEnabled()) {
131-
observationRegistry
132-
.ifAvailable(registry -> factory.configure(options -> options.observationRegistry(registry)));
133-
observationConventionProvider
134-
.ifAvailable(convention -> factory.configure(options -> options.observationConvention(convention)));
135-
}
156+
if (this.sqsProperties.isObservationEnabled()) {
157+
observationRegistry
158+
.ifAvailable(registry -> factory.configure(options -> options.observationRegistry(registry)));
159+
observationConventionProvider
160+
.ifAvailable(convention -> factory.configure(options -> options.observationConvention(convention)));
161+
}
136162
factory.configure(options -> options.messageConverter(messagingMessageConverter));
137163
return factory;
138164
}

0 commit comments

Comments
 (0)