Skip to content

Commit 626e228

Browse files
committed
feat: add scheduled executor pool size configuration for SQS batching operations
1 parent 76b7fe8 commit 626e228

File tree

4 files changed

+75
-5
lines changed

4 files changed

+75
-5
lines changed

docs/src/main/asciidoc/sqs.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1007,6 +1007,7 @@ The Spring Boot Starter for SQS provides the following auto-configuration proper
10071007
| `spring.cloud.aws.sqs.batch.wait-time-seconds` | Wait time for receiveMessage requests in batch. | No | AWS SDK default
10081008
| `spring.cloud.aws.sqs.batch.system-attribute-names` | System attributes to request for receiveMessage calls. | No | None
10091009
| `spring.cloud.aws.sqs.batch.attribute-names` | Message attributes to request for receiveMessage calls. | No | None
1010+
| `spring.cloud.aws.sqs.batch.scheduled-executor-pool-size` | Size of the scheduled thread pool for batching operations. | No | 5
10101011
|===
10111012

10121013

spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.awspring.cloud.sqs.support.observation.SqsTemplateObservation;
3636
import io.micrometer.observation.ObservationRegistry;
3737
import java.util.concurrent.Executors;
38+
import java.util.concurrent.ScheduledExecutorService;
3839
import org.springframework.beans.factory.ObjectProvider;
3940
import org.springframework.boot.autoconfigure.AutoConfiguration;
4041
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
@@ -60,6 +61,7 @@
6061
* @author Maciej Walkowiak
6162
* @author Wei Jiang
6263
* @author Dongha Kim
64+
* @author khc41
6365
* @since 3.0
6466
*/
6567
@AutoConfiguration
@@ -91,14 +93,27 @@ public SqsAsyncClient sqsAsyncClient(AwsClientBuilderConfigurer awsClientBuilder
9193
@ConditionalOnProperty(name = "spring.cloud.aws.sqs.batch.enabled", havingValue = "true")
9294
@Bean
9395
@Primary
94-
public SqsAsyncClient batchSqsAsyncClient(SqsAsyncClient sqsAsyncClient) {
95-
SqsAsyncBatchManager batchManager = createBatchManager(sqsAsyncClient);
96+
public SqsAsyncClient batchSqsAsyncClient(SqsAsyncClient sqsAsyncClient,
97+
ScheduledExecutorService sqsBatchingScheduledExecutor) {
98+
SqsAsyncBatchManager batchManager = createBatchManager(sqsAsyncClient, sqsBatchingScheduledExecutor);
9699
return new BatchingSqsClientAdapter(batchManager);
97100
}
98101

99-
private SqsAsyncBatchManager createBatchManager(SqsAsyncClient sqsAsyncClient) {
100-
return SqsAsyncBatchManager.builder().client(sqsAsyncClient)
101-
.scheduledExecutor(Executors.newScheduledThreadPool(5))
102+
@ConditionalOnProperty(name = "spring.cloud.aws.sqs.batch.enabled", havingValue = "true")
103+
@ConditionalOnMissingBean(name = "sqsBatchingScheduledExecutor")
104+
@Bean
105+
public ScheduledExecutorService sqsBatchingScheduledExecutor() {
106+
int poolSize = this.sqsProperties.getBatch().getScheduledExecutorPoolSize();
107+
if (poolSize <= 0) {
108+
throw new IllegalArgumentException(
109+
"scheduledExecutorPoolSize must be greater than 0, but was: " + poolSize);
110+
}
111+
return Executors.newScheduledThreadPool(poolSize);
112+
}
113+
114+
private SqsAsyncBatchManager createBatchManager(SqsAsyncClient sqsAsyncClient,
115+
ScheduledExecutorService scheduledExecutor) {
116+
return SqsAsyncBatchManager.builder().client(sqsAsyncClient).scheduledExecutor(scheduledExecutor)
102117
.overrideConfiguration(this::configurationProperties).build();
103118
}
104119

spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,15 @@ public static class Batch {
220220
@Nullable
221221
private List<String> attributeNames;
222222

223+
/**
224+
* The size of the scheduled thread pool used for batching operations. This thread pool handles periodic batch
225+
* sending and other scheduled tasks.
226+
*
227+
* <p>
228+
* Default is {@code 5}.
229+
*/
230+
private int scheduledExecutorPoolSize = 5;
231+
223232
public boolean isEnabled() {
224233
return enabled;
225234
}
@@ -282,6 +291,14 @@ public void setAttributeNames(List<String> attributeNames) {
282291
this.attributeNames = attributeNames;
283292
}
284293

294+
public int getScheduledExecutorPoolSize() {
295+
return scheduledExecutorPoolSize;
296+
}
297+
298+
public void setScheduledExecutorPoolSize(int scheduledExecutorPoolSize) {
299+
this.scheduledExecutorPoolSize = scheduledExecutorPoolSize;
300+
}
301+
285302
}
286303

287304
}

spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,7 @@ void sqsBatchConfigurationPropertiesWithDefaults() {
405405
assertThat(batchConfig.getWaitTimeSeconds()).isNull();
406406
assertThat(batchConfig.getSystemAttributeNames()).isNull();
407407
assertThat(batchConfig.getAttributeNames()).isNull();
408+
assertThat(batchConfig.getScheduledExecutorPoolSize()).isEqualTo(5);
408409

409410
assertThat(context).hasSingleBean(SqsAsyncClient.class);
410411
SqsAsyncClient client = context.getBean(SqsAsyncClient.class);
@@ -454,6 +455,42 @@ void sqsBatchConfigurationWithAttributeNames() {
454455
});
455456
}
456457

458+
@Test
459+
void sqsBatchConfigurationWithDefaultScheduledExecutorPoolSize() {
460+
this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:true").run(context -> {
461+
assertThat(context).hasSingleBean(SqsProperties.class);
462+
SqsProperties sqsProperties = context.getBean(SqsProperties.class);
463+
SqsProperties.Batch batchConfig = sqsProperties.getBatch();
464+
465+
assertThat(batchConfig.isEnabled()).isTrue();
466+
assertThat(batchConfig.getScheduledExecutorPoolSize()).isEqualTo(5);
467+
468+
assertThat(context).hasBean("sqsBatchingScheduledExecutor");
469+
});
470+
}
471+
472+
@Test
473+
void sqsBatchConfigurationWithCustomScheduledExecutorPoolSize() {
474+
this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:true",
475+
"spring.cloud.aws.sqs.batch.scheduled-executor-pool-size:10").run(context -> {
476+
assertThat(context).hasSingleBean(SqsProperties.class);
477+
SqsProperties sqsProperties = context.getBean(SqsProperties.class);
478+
SqsProperties.Batch batchConfig = sqsProperties.getBatch();
479+
480+
assertThat(batchConfig.isEnabled()).isTrue();
481+
assertThat(batchConfig.getScheduledExecutorPoolSize()).isEqualTo(10);
482+
483+
assertThat(context).hasBean("sqsBatchingScheduledExecutor");
484+
});
485+
}
486+
487+
@Test
488+
void sqsBatchConfigurationWithBatchDisabledDoesNotCreateScheduledExecutor() {
489+
this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:false").run(context -> {
490+
assertThat(context).doesNotHaveBean("sqsBatchingScheduledExecutor");
491+
});
492+
}
493+
457494
@Configuration(proxyBeanMethods = false)
458495
static class CustomComponentsConfiguration {
459496

0 commit comments

Comments
 (0)