Skip to content

Commit f42e019

Browse files
committed
Refactor: Focus on BatchingSqsClientAdapter;
1 parent 2ba6fd5 commit f42e019

File tree

8 files changed

+1272
-5
lines changed

8 files changed

+1272
-5
lines changed

docs/src/main/asciidoc/sqs.adoc

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -806,6 +806,131 @@ NOTE: The same factory can be used to create both `single message` and `batch` c
806806

807807
IMPORTANT: In case the same factory is shared by both delivery methods, any supplied `ErrorHandler`, `MessageInterceptor` or `MessageListener` should implement the proper methods.
808808

809+
==== Automatic Batching with AWS SDK
810+
811+
Spring Cloud AWS supports automatic message batching using AWS SDK's `SqsAsyncBatchManager`. This feature optimizes SQS operations by automatically batching requests under the hood to improve performance and reduce AWS API calls.
812+
813+
IMPORTANT: This is different from the <<Batch Processing,Batch Processing>> feature described above. Batch Processing refers to processing multiple messages in a single listener method call, while Automatic Batching refers to the AWS SDK automatically combining multiple SQS API calls into batched requests for efficiency.
814+
815+
===== Enabling Automatic Batching
816+
817+
To enable automatic batching, set the following property:
818+
819+
[source,properties]
820+
----
821+
spring.cloud.aws.sqs.batch.enabled=true
822+
----
823+
824+
When enabled, Spring Cloud AWS will automatically wrap the `SqsAsyncClient` with a `BatchingSqsClientAdapter` that uses `SqsAsyncBatchManager` internally.
825+
826+
===== Configuration Properties
827+
828+
The following properties can be used to configure the batching behavior:
829+
830+
[source,properties]
831+
----
832+
# Enable automatic batching (default: false)
833+
spring.cloud.aws.sqs.batch.enabled=true
834+
835+
# Maximum number of messages in a batch (default: AWS SDK default, max: 10)
836+
spring.cloud.aws.sqs.batch.max-number-of-messages=10
837+
838+
# Frequency at which batched requests are sent (default: AWS SDK default)
839+
spring.cloud.aws.sqs.batch.send-batch-frequency=PT0.2S
840+
841+
# Visibility timeout for received messages (default: queue default)
842+
spring.cloud.aws.sqs.batch.visibility-timeout=PT30S
843+
844+
# Wait time for receiveMessage requests (default: AWS SDK default)
845+
spring.cloud.aws.sqs.batch.wait-time-seconds=PT5S
846+
847+
# System attributes to request for receiveMessage calls
848+
spring.cloud.aws.sqs.batch.system-attribute-names=SentTimestamp,ApproximateReceiveCount
849+
850+
# Message attributes to request for receiveMessage calls
851+
spring.cloud.aws.sqs.batch.attribute-names=MessageGroupId,MessageDeduplicationId
852+
----
853+
854+
===== Important Considerations
855+
856+
WARNING: When using automatic batching, operations are processed asynchronously by the AWS SDK. This means that a method call may return successfully, but the actual request to AWS SQS might fail later during the batching process. This can result in **false positives** where operations appear to succeed locally but fail during transmission.
857+
858+
Applications should:
859+
860+
- **Always handle the returned `CompletableFuture`** to detect actual transmission errors
861+
- **Implement appropriate error handling and monitoring** to detect delayed failures
862+
- **Consider retry mechanisms** for critical operations
863+
864+
IMPORTANT: **Batch Manager Bypass**: The AWS SDK batch manager will bypass batching and send regular asynchronous requests when `receiveMessage` is called with any of the following parameters:
865+
866+
- `messageAttributeNames`
867+
- `messageSystemAttributeNames`
868+
- `messageSystemAttributeNamesWithStrings` (not used in Spring Cloud AWS `ReceiveMessageRequest`)
869+
- `overrideConfiguration` (not used in Spring Cloud AWS `ReceiveMessageRequest`)
870+
871+
When these parameters are used, the performance benefits of batching are lost for those specific requests.
872+
873+
**Note**: When using Spring Cloud AWS's automatic batching feature, `SqsTemplate` automatically excludes `messageAttributeNames` and `messageSystemAttributeNames` from individual `receiveMessage` requests to maintain batching efficiency. These attributes should be configured globally in the batch configuration instead:
874+
875+
[source,properties]
876+
----
877+
# Configure globally for batched requests
878+
spring.cloud.aws.sqs.batch.system-attribute-names=SentTimestamp,ApproximateReceiveCount
879+
spring.cloud.aws.sqs.batch.attribute-names=MessageGroupId,MessageDeduplicationId
880+
----
881+
882+
If you need to use different attribute configurations per request, consider disabling automatic batching and using the standard `SqsAsyncClient` instead.
883+
884+
Example of proper error handling:
885+
886+
[source,java]
887+
----
888+
@Service
889+
public class MessageService {
890+
891+
private final SqsTemplate sqsTemplate;
892+
893+
public MessageService(SqsTemplate sqsTemplate) {
894+
this.sqsTemplate = sqsTemplate;
895+
}
896+
897+
public void sendMessage(String queueName, String message) {
898+
CompletableFuture<SendResult<String>> future = sqsTemplate.sendAsync(queueName, message);
899+
900+
future.whenComplete((result, throwable) -> {
901+
if (throwable != null) {
902+
// Handle actual transmission error
903+
log.error("Failed to send message to queue {}: {}", queueName, throwable.getMessage());
904+
// Implement retry or alternative handling logic
905+
} else {
906+
// Message sent successfully
907+
log.info("Message sent successfully with ID: {}", result.messageId());
908+
}
909+
});
910+
}
911+
}
912+
----
913+
914+
===== Performance Benefits
915+
916+
Automatic batching provides several benefits:
917+
918+
- **Reduced API calls**: Multiple operations are combined into single API calls
919+
- **Lower costs**: Fewer API calls result in reduced AWS charges
920+
- **Improved throughput**: Batching reduces network overhead and latency
921+
- **Better resource utilization**: More efficient use of network and AWS resources
922+
923+
===== Compatibility
924+
925+
Automatic batching is compatible with:
926+
927+
- `SqsTemplate` for sending and receiving messages
928+
- `@SqsListener` methods for message processing
929+
- Both standard and FIFO queues
930+
- All message conversion and error handling features
931+
932+
The batching is transparent to application code - existing `SqsTemplate` and `@SqsListener` code continues to work without changes.
933+
809934
==== Container Options
810935

811936
Each `MessageListenerContainer` can have a different set of options.

spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
import io.awspring.cloud.autoconfigure.AwsClientProperties;
1919
import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy;
2020
import java.time.Duration;
21+
import java.util.List;
2122
import org.springframework.boot.context.properties.ConfigurationProperties;
2223
import org.springframework.lang.Nullable;
24+
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
2325

2426
/**
2527
* Properties related to AWS SQS.
@@ -38,6 +40,8 @@ public class SqsProperties extends AwsClientProperties {
3840

3941
private Listener listener = new Listener();
4042

43+
private Batch batch = new Batch();
44+
4145
public Listener getListener() {
4246
return this.listener;
4347
}
@@ -46,6 +50,14 @@ public void setListener(Listener listener) {
4650
this.listener = listener;
4751
}
4852

53+
public Batch getBatch() {
54+
return batch;
55+
}
56+
57+
public void setBatch(Batch batch) {
58+
this.batch = batch;
59+
}
60+
4961
@Nullable
5062
private QueueNotFoundStrategy queueNotFoundStrategy;
5163

@@ -153,6 +165,155 @@ public Boolean getAutoStartup() {
153165
public void setAutoStartup(Boolean autoStartup) {
154166
this.autoStartup = autoStartup;
155167
}
168+
169+
}
170+
171+
/**
172+
* Configuration properties for SQS automatic batching using AWS SDK's {@code SqsAsyncBatchManager}.
173+
*
174+
* <p>
175+
* Automatic batching improves performance and reduces costs by combining multiple SQS requests into fewer AWS API
176+
* calls. When enabled, Spring Cloud AWS will use a {@code BatchingSqsClientAdapter} that wraps the standard
177+
* {@code SqsAsyncClient} with batching capabilities.
178+
*
179+
* <p>
180+
* <strong>Important:</strong> Batched operations are processed asynchronously, which may result in false positives
181+
* where method calls appear to succeed locally but fail during actual transmission to AWS. Applications should
182+
* handle the returned {@code CompletableFuture} objects to detect actual transmission errors.
183+
*
184+
* @since 3.2
185+
*/
186+
public static class Batch {
187+
188+
/**
189+
* Enables SQS automatic batching using AWS SDK's SqsAsyncBatchManager.
190+
*
191+
* <p>
192+
* When set to {@code true}, the {@code SqsAsyncClient} bean will be wrapped with a
193+
* {@code BatchingSqsClientAdapter} that automatically batches requests to improve performance and reduce AWS
194+
* API calls.
195+
*
196+
* <p>
197+
* Default is {@code false}.
198+
*/
199+
private boolean enabled = false;
200+
201+
/**
202+
* The maximum number of messages that can be processed in a single batch. The maximum is 10.
203+
*/
204+
@Nullable
205+
private Integer maxNumberOfMessages;
206+
207+
/**
208+
* The frequency at which requests are sent to SQS when processing messages in a batch.
209+
*/
210+
@Nullable
211+
private Duration sendBatchFrequency;
212+
213+
/**
214+
* The visibility timeout to set for messages received in a batch. If unset, the queue default is used.
215+
*/
216+
@Nullable
217+
private Duration visibilityTimeout;
218+
219+
/**
220+
* The minimum wait duration for a receiveMessage request in a batch. To avoid unnecessary CPU usage, do not set
221+
* this value to 0.
222+
*/
223+
@Nullable
224+
private Duration waitTimeSeconds;
225+
226+
/**
227+
* The list of system attribute names to request for receiveMessage calls.
228+
*/
229+
@Nullable
230+
private List<MessageSystemAttributeName> systemAttributeNames;
231+
232+
/**
233+
* The list of attribute names to request for receiveMessage calls.
234+
*/
235+
@Nullable
236+
private List<String> attributeNames;
237+
238+
/**
239+
* The size of the scheduled thread pool used for batching operations. This thread pool handles periodic batch
240+
* sending and other scheduled tasks.
241+
*
242+
* <p>
243+
* Default is {@code 5}.
244+
*/
245+
private int scheduledExecutorPoolSize = 5;
246+
247+
public boolean isEnabled() {
248+
return enabled;
249+
}
250+
251+
public void setEnabled(boolean enabled) {
252+
this.enabled = enabled;
253+
}
254+
255+
@Nullable
256+
public Integer getMaxNumberOfMessages() {
257+
return maxNumberOfMessages;
258+
}
259+
260+
public void setMaxNumberOfMessages(Integer maxNumberOfMessages) {
261+
this.maxNumberOfMessages = maxNumberOfMessages;
262+
}
263+
264+
@Nullable
265+
public Duration getSendBatchFrequency() {
266+
return sendBatchFrequency;
267+
}
268+
269+
public void setSendBatchFrequency(Duration sendBatchFrequency) {
270+
this.sendBatchFrequency = sendBatchFrequency;
271+
}
272+
273+
@Nullable
274+
public Duration getVisibilityTimeout() {
275+
return visibilityTimeout;
276+
}
277+
278+
public void setVisibilityTimeout(Duration visibilityTimeout) {
279+
this.visibilityTimeout = visibilityTimeout;
280+
}
281+
282+
@Nullable
283+
public Duration getWaitTimeSeconds() {
284+
return waitTimeSeconds;
285+
}
286+
287+
public void setWaitTimeSeconds(Duration waitTimeSeconds) {
288+
this.waitTimeSeconds = waitTimeSeconds;
289+
}
290+
291+
@Nullable
292+
public List<MessageSystemAttributeName> getSystemAttributeNames() {
293+
return systemAttributeNames;
294+
}
295+
296+
public void setSystemAttributeNames(List<MessageSystemAttributeName> systemAttributeNames) {
297+
this.systemAttributeNames = systemAttributeNames;
298+
}
299+
300+
@Nullable
301+
public List<String> getAttributeNames() {
302+
return attributeNames;
303+
}
304+
305+
public void setAttributeNames(List<String> attributeNames) {
306+
this.attributeNames = attributeNames;
307+
}
308+
309+
public int getScheduledExecutorPoolSize() {
310+
return scheduledExecutorPoolSize;
311+
}
312+
313+
public void setScheduledExecutorPoolSize(int scheduledExecutorPoolSize) {
314+
this.scheduledExecutorPoolSize = scheduledExecutorPoolSize;
315+
}
316+
156317
}
157318

158319
}

0 commit comments

Comments
 (0)