-
-
Notifications
You must be signed in to change notification settings - Fork 350
Add automatic SQS request batching support component: sqs #1438
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -806,6 +806,94 @@ NOTE: The same factory can be used to create both `single message` and `batch` c | |
|
||
IMPORTANT: In case the same factory is shared by both delivery methods, any supplied `ErrorHandler`, `MessageInterceptor` or `MessageListener` should implement the proper methods. | ||
|
||
==== Automatic Batching with AWS SDK | ||
|
||
Spring Cloud AWS supports automatic message batching using AWS SDK's `SqsAsyncBatchManager`. This feature optimizes SQS operations by automatically batching requests under the hood to improve performance and reduce AWS API calls. | ||
|
||
IMPORTANT: This is different from the <<Batch Processing,Batch Processing>> feature described above. Batch Processing refers to processing multiple messages in a single listener method call, while Automatic Batching refers to the AWS SDK automatically combining multiple SQS API calls into batched requests for efficiency. | ||
|
||
==== Automatic Request Batching with SqsAsyncBatchManager | ||
|
||
Spring Cloud AWS allows you to leverage the AWS SDK's `SqsAsyncBatchManager` for automatic request batching. This feature can significantly improve performance and reduce costs by transparently combining multiple SQS API calls (`sendMessage`, `deleteMessage`, etc.) into single batch requests. | ||
|
||
IMPORTANT: This is different from the <<Batch Processing,Batch Processing>> feature for `@SqsListener`. Listener batch processing deals with handling multiple messages within a single listener invocation, whereas automatic request batching optimizes the underlying API calls to AWS. | ||
|
||
===== Manual Configuration of the Batching Client | ||
|
||
Since automatic batching is a powerful feature with specific trade-offs, Spring Cloud AWS does not auto-configure it. You can enable it by creating your own `SqsAsyncClient` bean using the provided `BatchingSqsClientAdapter`. | ||
|
||
###### 1. Defining the Batching Client Bean | ||
The following example shows how to define a bean named `batchingSqsAsyncClient`. Notice the use of `@Qualifier("sqsAsyncClient")` in the method parameter. This is crucial to explicitly inject the standard, auto-configured `SqsAsyncClient` and avoid ambiguity. | ||
|
||
[source,java] | ||
---- | ||
@Configuration | ||
public class SqsBatchingConfiguration { | ||
|
||
// Define a constant for the bean name to avoid typos | ||
public static final String BATCHING_SQS_ASYNC_CLIENT = "batchingSqsAsyncClient"; | ||
|
||
@Bean(name = BATCHING_SQS_ASYNC_CLIENT) | ||
public SqsAsyncClient batchingSqsAsyncClient( | ||
// Inject the standard, auto-configured client to wrap it | ||
@Qualifier("sqsAsyncClient") SqsAsyncClient standardSqsAsyncClient) { | ||
|
||
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(5); | ||
|
||
SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder() | ||
.sqsAsyncClient(standardSqsAsyncClient) | ||
.scheduledExecutor(scheduledExecutor) | ||
.build(); | ||
|
||
return new BatchingSqsClientAdapter(batchManager); | ||
} | ||
} | ||
---- | ||
|
||
###### 2. Using the Batching Client | ||
Now, use `@Qualifier` to inject your named bean. The most common use case is configuring a dedicated `SqsTemplate`. | ||
|
||
[source,java] | ||
---- | ||
@Service | ||
public class MyBatchingMessageService { | ||
|
||
private final SqsTemplate batchingSqsTemplate; | ||
|
||
public MyBatchingMessageService( | ||
@Qualifier(SqsBatchingConfiguration.BATCHING_SQS_ASYNC_CLIENT) SqsAsyncClient batchingClient) { | ||
this.batchingSqsTemplate = SqsTemplate.builder() | ||
.sqsAsyncClient(batchingClient) | ||
.build(); | ||
} | ||
// ... service methods using batchingSqsTemplate | ||
} | ||
---- | ||
|
||
===== Important Considerations & Best Practices | ||
|
||
WARNING: **Asynchronous Operations and False Positives**. The batching client processes operations asynchronously. A call to `sqsTemplate.sendAsync(...)` might return a `CompletableFuture` that completes successfully before the message is actually sent to AWS. The actual transmission happens later in a background thread. This can lead to **false positives**. Always attach error handling to the `CompletableFuture` to detect and handle real transmission failures. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is actually standard behavior for For instance, if we execute a send operation and Also, this part of the documentation states: When you poll the SqsAsyncBatchManager#receiveMessage method in your application, the batch manager fetches messages from its internal buffer, which the SDK automatically updates in the background. I would assume this background buffering would only start after the first call to |
||
|
||
[source,java] | ||
---- | ||
CompletableFuture<SendResult<String>> future = batchingSqsTemplate.sendAsync(queueName, message); | ||
|
||
future.whenComplete((result, ex) -> { | ||
if (ex != null) { | ||
// This is where you handle the actual transmission error | ||
log.error("Failed to send message to queue {}: {}", queueName, ex.getMessage()); | ||
} else { | ||
log.info("Message acknowledged for batch sending with ID: {}", result.messageId()); | ||
} | ||
}); | ||
---- | ||
|
||
WARNING: **Not Recommended for `@SqsListener`**. While technically compatible, using this batching client with `@SqsListener` for receiving messages is **not recommended**. The `@SqsListener` infrastructure already performs efficient batch receiving and has a complex acknowledgment lifecycle. Adding another layer of asynchronous batching provides limited performance benefits while significantly increasing complexity. For listeners, it's best to rely on the default `SqsAsyncClient`. | ||
|
||
IMPORTANT: **Bean Injection Safety**. By using a named bean and `@Qualifier` as shown in the configuration examples, you ensure the batching client is only used where intended. This prevents it from accidentally being injected into `@SqsListener` infrastructure, which should use the default `SqsAsyncClient`. | ||
|
||
IMPORTANT: **AWS SDK Batching Bypass**. The `SqsAsyncBatchManager` will bypass batching for `receiveMessage` calls if certain parameters like `messageAttributeNames` are set on a per-request basis. To ensure batching works effectively, these should be configured globally on the `SqsAsyncBatchManager` builder, not on individual `receiveMessage` calls. See the `BatchingSqsClientAdapter` Javadoc for more details. | ||
|
||
==== Container Options | ||
|
||
Each `MessageListenerContainer` can have a different set of options. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,216 @@ | ||
/* | ||
* Copyright 2013-2024 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.awspring.cloud.sqs.operations; | ||
|
||
import java.util.concurrent.CompletableFuture; | ||
import java.util.function.Consumer; | ||
import org.springframework.util.Assert; | ||
import software.amazon.awssdk.services.sqs.SqsAsyncClient; | ||
import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager; | ||
import software.amazon.awssdk.services.sqs.model.*; | ||
|
||
/** | ||
* An {@link SqsAsyncClient} adapter that provides automatic batching capabilities using AWS SDK's | ||
* {@link SqsAsyncBatchManager}. | ||
* | ||
* <p> | ||
* This adapter automatically batches SQS operations to improve performance and reduce costs by combining multiple | ||
* requests into fewer AWS API calls. All standard SQS operations are supported: send message, receive message, delete | ||
* message, and change message visibility. | ||
* | ||
* <p> | ||
* <strong>Important - False Positives Warning:</strong> This adapter processes requests asynchronously through | ||
* batching. Method calls may return successfully before the actual request is sent to AWS SQS. This can result in false | ||
* positives where the operation appears to succeed locally but fails during the actual transmission to AWS. | ||
* Applications should: | ||
* <ul> | ||
* <li>Always handle the returned {@link CompletableFuture} to detect actual transmission errors</li> | ||
* <li>Implement appropriate error handling and monitoring</li> | ||
* <li>Consider retry mechanisms for critical operations</li> | ||
* </ul> | ||
* | ||
* <p> | ||
* <strong>Batch Optimization:</strong> The underlying {@code SqsAsyncBatchManager} from the AWS SDK bypasses | ||
* batching for {@code receiveMessage} calls that include per-request configurations for certain parameters. | ||
* To ensure batching is not bypassed, it is recommended to configure these settings globally on the | ||
* {@code SqsAsyncBatchManager} builder instead of on each {@code ReceiveMessageRequest}. | ||
* The parameters that trigger this bypass are: | ||
* <ul> | ||
* <li>{@code messageAttributeNames}</li> | ||
* <li>{@code messageSystemAttributeNames}</li> | ||
* <li>{@code messageSystemAttributeNamesWithStrings}</li> | ||
* <li>{@code overrideConfiguration}</li> | ||
* </ul> | ||
* <p> | ||
* By configuring these globally on the manager, you ensure consistent batching performance. If you require | ||
* per-request attribute configurations, using the standard {@code SqsAsyncClient} without this adapter may be | ||
* more appropriate. | ||
* @author Heechul Kang | ||
* @since 3.2 | ||
* @see SqsAsyncBatchManager | ||
* @see SqsAsyncClient | ||
*/ | ||
public class BatchingSqsClientAdapter implements SqsAsyncClient { | ||
private final SqsAsyncBatchManager batchManager; | ||
|
||
/** | ||
* Creates a new {@code BatchingSqsClientAdapter} with the specified batch manager. | ||
* | ||
* @param batchManager the {@link SqsAsyncBatchManager} to use for batching operations | ||
* @throws IllegalArgumentException if batchManager is null | ||
*/ | ||
public BatchingSqsClientAdapter(SqsAsyncBatchManager batchManager) { | ||
Assert.notNull(batchManager, "batchManager cannot be null"); | ||
this.batchManager = batchManager; | ||
} | ||
|
||
@Override | ||
public String serviceName() { | ||
return SqsAsyncClient.SERVICE_NAME; | ||
} | ||
|
||
/** | ||
* Closes the underlying batch manager and releases associated resources. | ||
* | ||
* <p> | ||
* This method should be called when the adapter is no longer needed to ensure proper cleanup of threads and | ||
* connections. | ||
*/ | ||
@Override | ||
public void close() { | ||
batchManager.close(); | ||
} | ||
|
||
/** | ||
* Sends a message to the specified SQS queue using automatic batching. | ||
* | ||
* <p> | ||
* <strong>Important:</strong> This method returns immediately, but the actual sending is performed asynchronously. | ||
* Handle the returned {@link CompletableFuture} to detect transmission errors. | ||
* | ||
* @param sendMessageRequest the request containing queue URL and message details | ||
* @return a {@link CompletableFuture} that completes with the send result | ||
*/ | ||
@Override | ||
public CompletableFuture<SendMessageResponse> sendMessage(SendMessageRequest sendMessageRequest) { | ||
return batchManager.sendMessage(sendMessageRequest); | ||
} | ||
|
||
/** | ||
* Sends a message to the specified SQS queue using automatic batching. | ||
* | ||
* <p> | ||
* <strong>Important:</strong> This method returns immediately, but the actual sending is performed asynchronously. | ||
* Handle the returned {@link CompletableFuture} to detect transmission errors. | ||
* | ||
* @param sendMessageRequest a consumer to configure the send message request | ||
* @return a {@link CompletableFuture} that completes with the send result | ||
*/ | ||
@Override | ||
public CompletableFuture<SendMessageResponse> sendMessage(Consumer<SendMessageRequest.Builder> sendMessageRequest) { | ||
return batchManager.sendMessage(sendMessageRequest); | ||
} | ||
|
||
/** | ||
* Receives messages from the specified SQS queue using automatic batching. | ||
* | ||
* <p> | ||
* The batching manager may combine multiple receive requests to optimize AWS API usage. | ||
* | ||
* @param receiveMessageRequest the request containing queue URL and receive options | ||
* @return a {@link CompletableFuture} that completes with the received messages | ||
*/ | ||
@Override | ||
public CompletableFuture<ReceiveMessageResponse> receiveMessage(ReceiveMessageRequest receiveMessageRequest) { | ||
return batchManager.receiveMessage(receiveMessageRequest); | ||
} | ||
|
||
/** | ||
* Receives messages from the specified SQS queue using automatic batching. | ||
* | ||
* <p> | ||
* The batching manager may combine multiple receive requests to optimize AWS API usage. | ||
* | ||
* @param receiveMessageRequest a consumer to configure the receive message request | ||
* @return a {@link CompletableFuture} that completes with the received messages | ||
*/ | ||
@Override | ||
public CompletableFuture<ReceiveMessageResponse> receiveMessage( | ||
Consumer<ReceiveMessageRequest.Builder> receiveMessageRequest) { | ||
return batchManager.receiveMessage(receiveMessageRequest); | ||
} | ||
|
||
/** | ||
* Deletes a message from the specified SQS queue using automatic batching. | ||
* | ||
* <p> | ||
* <strong>Important:</strong> The actual deletion may be delayed due to batching. Handle the returned | ||
* {@link CompletableFuture} to confirm successful deletion. | ||
* | ||
* @param deleteMessageRequest the request containing queue URL and receipt handle | ||
* @return a {@link CompletableFuture} that completes with the deletion result | ||
*/ | ||
@Override | ||
public CompletableFuture<DeleteMessageResponse> deleteMessage(DeleteMessageRequest deleteMessageRequest) { | ||
return batchManager.deleteMessage(deleteMessageRequest); | ||
} | ||
|
||
/** | ||
* Deletes a message from the specified SQS queue using automatic batching. | ||
* | ||
* <p> | ||
* <strong>Important:</strong> The actual deletion may be delayed due to batching. Handle the returned | ||
* {@link CompletableFuture} to confirm successful deletion. | ||
* | ||
* @param deleteMessageRequest a consumer to configure the delete message request | ||
* @return a {@link CompletableFuture} that completes with the deletion result | ||
*/ | ||
@Override | ||
public CompletableFuture<DeleteMessageResponse> deleteMessage( | ||
Consumer<DeleteMessageRequest.Builder> deleteMessageRequest) { | ||
return batchManager.deleteMessage(deleteMessageRequest); | ||
} | ||
|
||
/** | ||
* Changes the visibility timeout of a message in the specified SQS queue using automatic batching. | ||
* | ||
* <p> | ||
* The batching manager may combine multiple visibility change requests to optimize AWS API usage. | ||
* | ||
* @param changeMessageVisibilityRequest the request containing queue URL, receipt handle, and new timeout | ||
* @return a {@link CompletableFuture} that completes with the visibility change result | ||
*/ | ||
@Override | ||
public CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibility( | ||
ChangeMessageVisibilityRequest changeMessageVisibilityRequest) { | ||
return batchManager.changeMessageVisibility(changeMessageVisibilityRequest); | ||
} | ||
|
||
/** | ||
* Changes the visibility timeout of a message in the specified SQS queue using automatic batching. | ||
* | ||
* <p> | ||
* The batching manager may combine multiple visibility change requests to optimize AWS API usage. | ||
* | ||
* @param changeMessageVisibilityRequest a consumer to configure the change visibility request | ||
* @return a {@link CompletableFuture} that completes with the visibility change result | ||
*/ | ||
@Override | ||
public CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibility( | ||
Consumer<ChangeMessageVisibilityRequest.Builder> changeMessageVisibilityRequest) { | ||
return batchManager.changeMessageVisibility(changeMessageVisibilityRequest); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about we call out earlier that this feature is mostly intended to be used with the
SqsTemplate
rather than a general client to be used as a default bean, including in@SqsListener
?