Skip to content

Commit cc6b1fc

Browse files
committed
feat: implement automatic batching for SQS operations using SqsAsyncBatchManager
1 parent 6d11e81 commit cc6b1fc

File tree

4 files changed

+299
-0
lines changed

4 files changed

+299
-0
lines changed

docs/src/main/asciidoc/sqs.adoc

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -806,6 +806,130 @@ NOTE: The same factory can be used to create both `single message` and `batch` c
806806

807807
IMPORTANT: In case the same factory is shared by both delivery methods, any supplied `ErrorHandler`, `MessageInterceptor` or `MessageListener` should implement the proper methods.
808808

809+
==== Automatic Batching with AWS SDK
810+
811+
Spring Cloud AWS supports automatic message batching using AWS SDK's `SqsAsyncBatchManager`. This feature optimizes SQS operations by automatically batching requests under the hood to improve performance and reduce AWS API calls.
812+
813+
IMPORTANT: This is different from the <<Batch Processing,Batch Processing>> feature described above. Batch Processing refers to processing multiple messages in a single listener method call, while Automatic Batching refers to the AWS SDK automatically combining multiple SQS API calls into batched requests for efficiency.
814+
815+
===== Enabling Automatic Batching
816+
817+
To enable automatic batching, set the following property:
818+
819+
[source,properties]
820+
----
821+
spring.cloud.aws.sqs.batch.enabled=true
822+
----
823+
824+
When enabled, Spring Cloud AWS will automatically wrap the `SqsAsyncClient` with a `BatchingSqsClientAdapter` that uses `SqsAsyncBatchManager` internally.
825+
826+
===== Configuration Properties
827+
828+
The following properties can be used to configure the batching behavior:
829+
830+
[source,properties]
831+
----
832+
# Enable automatic batching (default: false)
833+
spring.cloud.aws.sqs.batch.enabled=true
834+
835+
# Maximum number of messages in a batch (default: AWS SDK default, max: 10)
836+
spring.cloud.aws.sqs.batch.max-number-of-messages=10
837+
838+
# Frequency at which batched requests are sent (default: AWS SDK default)
839+
spring.cloud.aws.sqs.batch.send-batch-frequency=PT0.2S
840+
841+
# Visibility timeout for received messages (default: queue default)
842+
spring.cloud.aws.sqs.batch.visibility-timeout=PT30S
843+
844+
# Wait time for receiveMessage requests (default: AWS SDK default)
845+
spring.cloud.aws.sqs.batch.wait-time-seconds=PT5S
846+
847+
# System attributes to request for receiveMessage calls
848+
spring.cloud.aws.sqs.batch.system-attribute-names=SentTimestamp,ApproximateReceiveCount
849+
850+
# Message attributes to request for receiveMessage calls
851+
spring.cloud.aws.sqs.batch.attribute-names=MessageGroupId,MessageDeduplicationId
852+
----
853+
854+
===== Important Considerations
855+
856+
WARNING: When using automatic batching, operations are processed asynchronously by the AWS SDK. This means that a method call may return successfully, but the actual request to AWS SQS might fail later during the batching process. This can result in **false positives** where operations appear to succeed locally but fail during transmission.
857+
858+
Applications should:
859+
860+
- **Always handle the returned `CompletableFuture`** to detect actual transmission errors
861+
- **Implement appropriate error handling and monitoring** to detect delayed failures
862+
- **Consider retry mechanisms** for critical operations
863+
864+
IMPORTANT: **Batch Manager Bypass**: The AWS SDK batch manager will bypass batching and send regular asynchronous requests when `receiveMessage` is called with any of the following parameters:
865+
866+
- `messageAttributeNames`
867+
- `messageSystemAttributeNames`
868+
- `messageSystemAttributeNamesWithStrings` (not used in Spring Cloud AWS `ReceiveMessageRequest`)
869+
- `overrideConfiguration` (not used in Spring Cloud AWS `ReceiveMessageRequest`)
870+
871+
When these parameters are used, the performance benefits of batching are lost for those specific requests.
872+
873+
**Note**: When using Spring Cloud AWS's automatic batching feature, `SqsTemplate` automatically excludes `messageAttributeNames` and `messageSystemAttributeNames` from individual `receiveMessage` requests to maintain batching efficiency. These attributes should be configured globally in the batch configuration instead:
874+
875+
[source,properties]
876+
----
877+
# Configure globally for batched requests
878+
spring.cloud.aws.sqs.batch.system-attribute-names=SentTimestamp,ApproximateReceiveCount
879+
spring.cloud.aws.sqs.batch.attribute-names=MessageGroupId,MessageDeduplicationId
880+
----
881+
882+
If you need to use different attribute configurations per request, consider disabling automatic batching and using the standard `SqsAsyncClient` instead.
883+
884+
Example of proper error handling:
885+
886+
[source,java]
887+
----
888+
@Service
889+
public class MessageService {
890+
891+
private final SqsTemplate sqsTemplate;
892+
893+
public MessageService(SqsTemplate sqsTemplate) {
894+
this.sqsTemplate = sqsTemplate;
895+
}
896+
897+
public void sendMessage(String queueName, String message) {
898+
CompletableFuture<SendResult<String>> future = sqsTemplate.sendAsync(queueName, message);
899+
900+
future.whenComplete((result, throwable) -> {
901+
if (throwable != null) {
902+
// Handle actual transmission error
903+
log.error("Failed to send message to queue {}: {}", queueName, throwable.getMessage());
904+
// Implement retry or alternative handling logic
905+
} else {
906+
// Message sent successfully
907+
log.info("Message sent successfully with ID: {}", result.messageId());
908+
}
909+
});
910+
}
911+
}
912+
----
913+
914+
===== Performance Benefits
915+
916+
Automatic batching provides several benefits:
917+
918+
- **Reduced API calls**: Multiple operations are combined into single API calls
919+
- **Lower costs**: Fewer API calls result in reduced AWS charges
920+
- **Improved throughput**: Batching reduces network overhead and latency
921+
- **Better resource utilization**: More efficient use of network and AWS resources
922+
923+
===== Compatibility
924+
925+
Automatic batching is compatible with:
926+
927+
- `SqsTemplate` for sending and receiving messages
928+
- `@SqsListener` methods for message processing
929+
- Both standard and FIFO queues
930+
- All message conversion and error handling features
931+
932+
The batching is transparent to application code - existing `SqsTemplate` and `@SqsListener` code continues to work without changes.
809933

810934
==== Container Options
811935

@@ -877,6 +1001,13 @@ The Spring Boot Starter for SQS provides the following auto-configuration proper
8771001
| `spring.cloud.aws.sqs.listener.auto-startup` | Defines whether SQS listeners are started automatically or not. | No | true
8781002
| `spring.cloud.aws.sqs.queue-not-found-strategy` | The strategy to be used by SqsTemplate and SqsListeners when a queue does not exist. | No | CREATE
8791003
| `spring.cloud.aws.sqs.observation-enabled` | Enables observability support for SQS operations. | No | false
1004+
| `spring.cloud.aws.sqs.batch.enabled` | Enables automatic SQS batching using AWS SDK's SqsAsyncBatchManager. | No | `false`
1005+
| `spring.cloud.aws.sqs.batch.max-number-of-messages` | Maximum number of messages in a batch (max: 10). | No | AWS SDK default
1006+
| `spring.cloud.aws.sqs.batch.send-batch-frequency` | Frequency at which batched requests are sent. | No | AWS SDK default
1007+
| `spring.cloud.aws.sqs.batch.visibility-timeout` | Visibility timeout for received messages in batch. | No | Queue default
1008+
| `spring.cloud.aws.sqs.batch.wait-time-seconds` | Wait time for receiveMessage requests in batch. | No | AWS SDK default
1009+
| `spring.cloud.aws.sqs.batch.system-attribute-names` | System attributes to request for receiveMessage calls. | No | None
1010+
| `spring.cloud.aws.sqs.batch.attribute-names` | Message attributes to request for receiveMessage calls. | No | None
8801011
|===
8811012

8821013

spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,30 @@ public void setAutoStartup(Boolean autoStartup) {
169169

170170
}
171171

172+
/**
173+
* Configuration properties for SQS automatic batching using AWS SDK's {@code SqsAsyncBatchManager}.
174+
*
175+
* <p>Automatic batching improves performance and reduces costs by combining multiple SQS requests
176+
* into fewer AWS API calls. When enabled, Spring Cloud AWS will use a {@code BatchingSqsClientAdapter}
177+
* that wraps the standard {@code SqsAsyncClient} with batching capabilities.
178+
*
179+
* <p><strong>Important:</strong> Batched operations are processed asynchronously, which may result
180+
* in false positives where method calls appear to succeed locally but fail during actual transmission
181+
* to AWS. Applications should handle the returned {@code CompletableFuture} objects to detect
182+
* actual transmission errors.
183+
*
184+
* @since 3.2
185+
*/
172186
public static class Batch {
173187

174188
/**
175189
* Enables SQS automatic batching using AWS SDK's SqsAsyncBatchManager.
190+
*
191+
* <p>When set to {@code true}, the {@code SqsAsyncClient} bean will be wrapped
192+
* with a {@code BatchingSqsClientAdapter} that automatically batches requests
193+
* to improve performance and reduce AWS API calls.
194+
*
195+
* <p>Default is {@code false}.
176196
*/
177197
private boolean enabled = false;
178198

spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
*
7171
* @author Tomaz Fernandes
7272
* @author Wei Jiang
73+
* @author khc41
7374
*/
7475
class SqsAutoConfigurationTest {
7576

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapter.java

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,82 @@
1+
/*
2+
* Copyright 2013-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package io.awspring.cloud.sqs.operations;
217

18+
import org.springframework.util.Assert;
319
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
420
import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager;
521
import software.amazon.awssdk.services.sqs.model.*;
622

723
import java.util.concurrent.CompletableFuture;
824
import java.util.function.Consumer;
925

26+
/**
27+
* An {@link SqsAsyncClient} adapter that provides automatic batching capabilities using AWS SDK's
28+
* {@link SqsAsyncBatchManager}.
29+
*
30+
* <p>This adapter automatically batches SQS operations to improve performance and reduce costs by
31+
* combining multiple requests into fewer AWS API calls. All standard SQS operations are supported:
32+
* send message, receive message, delete message, and change message visibility.
33+
*
34+
* <p><strong>Important - False Positives Warning:</strong> This adapter processes requests
35+
* asynchronously through batching. Method calls may return successfully before the actual request
36+
* is sent to AWS SQS. This can result in false positives where the operation appears to succeed
37+
* locally but fails during the actual transmission to AWS. Applications should:
38+
* <ul>
39+
* <li>Always handle the returned {@link CompletableFuture} to detect actual transmission errors</li>
40+
* <li>Implement appropriate error handling and monitoring</li>
41+
* <li>Consider retry mechanisms for critical operations</li>
42+
* </ul>
43+
*
44+
* <p><strong>Batch Optimization:</strong> The AWS SDK bypasses batching when {@code receiveMessage} is
45+
* called with any of the following parameters: {@code messageAttributeNames}, {@code messageSystemAttributeNames},
46+
* {@code messageSystemAttributeNamesWithStrings}, or {@code overrideConfiguration}. To maintain consistent
47+
* batching performance, Spring Cloud AWS handles these parameters as follows:
48+
* <ul>
49+
* <li>{@code messageAttributeNames} - excluded from per-request, configured globally via {@code spring.cloud.aws.sqs.batch.attribute-names}</li>
50+
* <li>{@code messageSystemAttributeNames} - excluded from per-request, configured globally via {@code spring.cloud.aws.sqs.batch.system-attribute-names}</li>
51+
* <li>{@code messageSystemAttributeNamesWithStrings} - not used in Spring Cloud AWS {@code ReceiveMessageRequest}</li>
52+
* <li>{@code overrideConfiguration} - not used in Spring Cloud AWS {@code ReceiveMessageRequest}</li>
53+
* </ul>
54+
* <p>This design prevents batch bypass and ensures optimal performance.
55+
* If per-request attribute configuration is required, consider disabling automatic batching.
56+
*
57+
* <p>This adapter is automatically configured by Spring Cloud AWS when automatic batching is enabled.
58+
* Users do not need to create instances directly - instead, enable batching through configuration:
59+
*
60+
* <pre>
61+
* spring.cloud.aws.sqs.batch.enabled=true
62+
* </pre>
63+
*
64+
* <p>Once enabled, all {@code SqsTemplate} operations will automatically use batching transparently.
65+
*
66+
* @author khc41
67+
* @since 3.2
68+
* @see SqsAsyncBatchManager
69+
* @see SqsAsyncClient
70+
*/
1071
public class BatchingSqsClientAdapter implements SqsAsyncClient {
1172
private final SqsAsyncBatchManager batchManager;
1273

74+
/**
75+
* Creates a new {@code BatchingSqsClientAdapter} with the specified batch manager.
76+
*
77+
* @param batchManager the {@link SqsAsyncBatchManager} to use for batching operations
78+
* @throws IllegalArgumentException if batchManager is null
79+
*/
1380
public BatchingSqsClientAdapter(SqsAsyncBatchManager batchManager) {
1481
Assert.notNull(batchManager, "batchManager cannot be null");
1582
this.batchManager = batchManager;
@@ -20,46 +87,126 @@ public String serviceName() {
2087
return SqsAsyncClient.SERVICE_NAME;
2188
}
2289

90+
/**
91+
* Closes the underlying batch manager and releases associated resources.
92+
*
93+
* <p>This method should be called when the adapter is no longer needed to ensure
94+
* proper cleanup of threads and connections.
95+
*/
2396
@Override
2497
public void close() {
2598
batchManager.close();
2699
}
27100

101+
/**
102+
* Sends a message to the specified SQS queue using automatic batching.
103+
*
104+
* <p><strong>Important:</strong> This method returns immediately, but the actual sending
105+
* is performed asynchronously. Handle the returned {@link CompletableFuture} to detect
106+
* transmission errors.
107+
*
108+
* @param sendMessageRequest the request containing queue URL and message details
109+
* @return a {@link CompletableFuture} that completes with the send result
110+
*/
28111
@Override
29112
public CompletableFuture<SendMessageResponse> sendMessage(SendMessageRequest sendMessageRequest) {
30113
return batchManager.sendMessage(sendMessageRequest);
31114
}
32115

116+
/**
117+
* Sends a message to the specified SQS queue using automatic batching.
118+
*
119+
* <p><strong>Important:</strong> This method returns immediately, but the actual sending
120+
* is performed asynchronously. Handle the returned {@link CompletableFuture} to detect
121+
* transmission errors.
122+
*
123+
* @param sendMessageRequest a consumer to configure the send message request
124+
* @return a {@link CompletableFuture} that completes with the send result
125+
*/
33126
@Override
34127
public CompletableFuture<SendMessageResponse> sendMessage(Consumer<SendMessageRequest.Builder> sendMessageRequest) {
35128
return batchManager.sendMessage(sendMessageRequest);
36129
}
37130

131+
/**
132+
* Receives messages from the specified SQS queue using automatic batching.
133+
*
134+
* <p>The batching manager may combine multiple receive requests to optimize
135+
* AWS API usage.
136+
*
137+
* @param receiveMessageRequest the request containing queue URL and receive options
138+
* @return a {@link CompletableFuture} that completes with the received messages
139+
*/
38140
@Override
39141
public CompletableFuture<ReceiveMessageResponse> receiveMessage(ReceiveMessageRequest receiveMessageRequest) {
40142
return batchManager.receiveMessage(receiveMessageRequest);
41143
}
42144

145+
/**
146+
* Receives messages from the specified SQS queue using automatic batching.
147+
*
148+
* <p>The batching manager may combine multiple receive requests to optimize
149+
* AWS API usage.
150+
*
151+
* @param receiveMessageRequest a consumer to configure the receive message request
152+
* @return a {@link CompletableFuture} that completes with the received messages
153+
*/
43154
@Override
44155
public CompletableFuture<ReceiveMessageResponse> receiveMessage(Consumer<ReceiveMessageRequest.Builder> receiveMessageRequest) {
45156
return batchManager.receiveMessage(receiveMessageRequest);
46157
}
47158

159+
/**
160+
* Deletes a message from the specified SQS queue using automatic batching.
161+
*
162+
* <p><strong>Important:</strong> The actual deletion may be delayed due to batching.
163+
* Handle the returned {@link CompletableFuture} to confirm successful deletion.
164+
*
165+
* @param deleteMessageRequest the request containing queue URL and receipt handle
166+
* @return a {@link CompletableFuture} that completes with the deletion result
167+
*/
48168
@Override
49169
public CompletableFuture<DeleteMessageResponse> deleteMessage(DeleteMessageRequest deleteMessageRequest) {
50170
return batchManager.deleteMessage(deleteMessageRequest);
51171
}
52172

173+
/**
174+
* Deletes a message from the specified SQS queue using automatic batching.
175+
*
176+
* <p><strong>Important:</strong> The actual deletion may be delayed due to batching.
177+
* Handle the returned {@link CompletableFuture} to confirm successful deletion.
178+
*
179+
* @param deleteMessageRequest a consumer to configure the delete message request
180+
* @return a {@link CompletableFuture} that completes with the deletion result
181+
*/
53182
@Override
54183
public CompletableFuture<DeleteMessageResponse> deleteMessage(Consumer<DeleteMessageRequest.Builder> deleteMessageRequest) {
55184
return batchManager.deleteMessage(deleteMessageRequest);
56185
}
57186

187+
/**
188+
* Changes the visibility timeout of a message in the specified SQS queue using automatic batching.
189+
*
190+
* <p>The batching manager may combine multiple visibility change requests to optimize
191+
* AWS API usage.
192+
*
193+
* @param changeMessageVisibilityRequest the request containing queue URL, receipt handle, and new timeout
194+
* @return a {@link CompletableFuture} that completes with the visibility change result
195+
*/
58196
@Override
59197
public CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) {
60198
return batchManager.changeMessageVisibility(changeMessageVisibilityRequest);
61199
}
62200

201+
/**
202+
* Changes the visibility timeout of a message in the specified SQS queue using automatic batching.
203+
*
204+
* <p>The batching manager may combine multiple visibility change requests to optimize
205+
* AWS API usage.
206+
*
207+
* @param changeMessageVisibilityRequest a consumer to configure the change visibility request
208+
* @return a {@link CompletableFuture} that completes with the visibility change result
209+
*/
63210
@Override
64211
public CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibility(Consumer<ChangeMessageVisibilityRequest.Builder> changeMessageVisibilityRequest) {
65212
return batchManager.changeMessageVisibility(changeMessageVisibilityRequest);

0 commit comments

Comments
 (0)