diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index e045d0911..a9df9aa17 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -806,6 +806,94 @@ NOTE: The same factory can be used to create both `single message` and `batch` c IMPORTANT: In case the same factory is shared by both delivery methods, any supplied `ErrorHandler`, `MessageInterceptor` or `MessageListener` should implement the proper methods. +==== Automatic Batching with AWS SDK + +Spring Cloud AWS supports automatic message batching using AWS SDK's `SqsAsyncBatchManager`. This feature optimizes SQS operations by automatically batching requests under the hood to improve performance and reduce AWS API calls. + +IMPORTANT: This is different from the <> feature described above. Batch Processing refers to processing multiple messages in a single listener method call, while Automatic Batching refers to the AWS SDK automatically combining multiple SQS API calls into batched requests for efficiency. + +==== Automatic Request Batching with SqsAsyncBatchManager + +Spring Cloud AWS allows you to leverage the AWS SDK's `SqsAsyncBatchManager` for automatic request batching. This feature can significantly improve performance and reduce costs by transparently combining multiple SQS API calls (`sendMessage`, `deleteMessage`, etc.) into single batch requests. + +IMPORTANT: This is different from the <> feature for `@SqsListener`. Listener batch processing deals with handling multiple messages within a single listener invocation, whereas automatic request batching optimizes the underlying API calls to AWS. + +===== Manual Configuration of the Batching Client + +Since automatic batching is a powerful feature with specific trade-offs, Spring Cloud AWS does not auto-configure it. You can enable it by creating your own `SqsAsyncClient` bean using the provided `BatchingSqsClientAdapter`. + +###### 1. Defining the Batching Client Bean +The following example shows how to define a bean named `batchingSqsAsyncClient`. Notice the use of `@Qualifier("sqsAsyncClient")` in the method parameter. This is crucial to explicitly inject the standard, auto-configured `SqsAsyncClient` and avoid ambiguity. + +[source,java] +---- +@Configuration +public class SqsBatchingConfiguration { + + // Define a constant for the bean name to avoid typos + public static final String BATCHING_SQS_ASYNC_CLIENT = "batchingSqsAsyncClient"; + + @Bean(name = BATCHING_SQS_ASYNC_CLIENT) + public SqsAsyncClient batchingSqsAsyncClient( + // Inject the standard, auto-configured client to wrap it + @Qualifier("sqsAsyncClient") SqsAsyncClient standardSqsAsyncClient) { + + ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(5); + + SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder() + .sqsAsyncClient(standardSqsAsyncClient) + .scheduledExecutor(scheduledExecutor) + .build(); + + return new BatchingSqsClientAdapter(batchManager); + } +} +---- + +###### 2. Using the Batching Client +Now, use `@Qualifier` to inject your named bean. The most common use case is configuring a dedicated `SqsTemplate`. + +[source,java] +---- +@Service +public class MyBatchingMessageService { + + private final SqsTemplate batchingSqsTemplate; + + public MyBatchingMessageService( + @Qualifier(SqsBatchingConfiguration.BATCHING_SQS_ASYNC_CLIENT) SqsAsyncClient batchingClient) { + this.batchingSqsTemplate = SqsTemplate.builder() + .sqsAsyncClient(batchingClient) + .build(); + } + // ... service methods using batchingSqsTemplate +} +---- + +===== Important Considerations & Best Practices + +WARNING: **Asynchronous Operations and False Positives**. The batching client processes operations asynchronously. A call to `sqsTemplate.sendAsync(...)` might return a `CompletableFuture` that completes successfully before the message is actually sent to AWS. The actual transmission happens later in a background thread. This can lead to **false positives**. Always attach error handling to the `CompletableFuture` to detect and handle real transmission failures. + +[source,java] +---- +CompletableFuture> future = batchingSqsTemplate.sendAsync(queueName, message); + +future.whenComplete((result, ex) -> { + if (ex != null) { + // This is where you handle the actual transmission error + log.error("Failed to send message to queue {}: {}", queueName, ex.getMessage()); + } else { + log.info("Message acknowledged for batch sending with ID: {}", result.messageId()); + } +}); +---- + +WARNING: **Not Recommended for `@SqsListener`**. While technically compatible, using this batching client with `@SqsListener` for receiving messages is **not recommended**. The `@SqsListener` infrastructure already performs efficient batch receiving and has a complex acknowledgment lifecycle. Adding another layer of asynchronous batching provides limited performance benefits while significantly increasing complexity. For listeners, it's best to rely on the default `SqsAsyncClient`. + +IMPORTANT: **Bean Injection Safety**. By using a named bean and `@Qualifier` as shown in the configuration examples, you ensure the batching client is only used where intended. This prevents it from accidentally being injected into `@SqsListener` infrastructure, which should use the default `SqsAsyncClient`. + +IMPORTANT: **AWS SDK Batching Bypass**. The `SqsAsyncBatchManager` will bypass batching for `receiveMessage` calls if certain parameters like `messageAttributeNames` are set on a per-request basis. To ensure batching works effectively, these should be configured globally on the `SqsAsyncBatchManager` builder, not on individual `receiveMessage` calls. See the `BatchingSqsClientAdapter` Javadoc for more details. + ==== Container Options Each `MessageListenerContainer` can have a different set of options. diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapter.java new file mode 100644 index 000000000..165381246 --- /dev/null +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapter.java @@ -0,0 +1,216 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs.operations; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import org.springframework.util.Assert; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager; +import software.amazon.awssdk.services.sqs.model.*; + +/** + * An {@link SqsAsyncClient} adapter that provides automatic batching capabilities using AWS SDK's + * {@link SqsAsyncBatchManager}. + * + *

+ * This adapter automatically batches SQS operations to improve performance and reduce costs by combining multiple + * requests into fewer AWS API calls. All standard SQS operations are supported: send message, receive message, delete + * message, and change message visibility. + * + *

+ * Important - False Positives Warning: This adapter processes requests asynchronously through + * batching. Method calls may return successfully before the actual request is sent to AWS SQS. This can result in false + * positives where the operation appears to succeed locally but fails during the actual transmission to AWS. + * Applications should: + *

    + *
  • Always handle the returned {@link CompletableFuture} to detect actual transmission errors
  • + *
  • Implement appropriate error handling and monitoring
  • + *
  • Consider retry mechanisms for critical operations
  • + *
+ * + *

+ * Batch Optimization: The underlying {@code SqsAsyncBatchManager} from the AWS SDK bypasses + * batching for {@code receiveMessage} calls that include per-request configurations for certain parameters. + * To ensure batching is not bypassed, it is recommended to configure these settings globally on the + * {@code SqsAsyncBatchManager} builder instead of on each {@code ReceiveMessageRequest}. + * The parameters that trigger this bypass are: + *

    + *
  • {@code messageAttributeNames}
  • + *
  • {@code messageSystemAttributeNames}
  • + *
  • {@code messageSystemAttributeNamesWithStrings}
  • + *
  • {@code overrideConfiguration}
  • + *
+ *

+ * By configuring these globally on the manager, you ensure consistent batching performance. If you require + * per-request attribute configurations, using the standard {@code SqsAsyncClient} without this adapter may be + * more appropriate. + * @author Heechul Kang + * @since 3.2 + * @see SqsAsyncBatchManager + * @see SqsAsyncClient + */ +public class BatchingSqsClientAdapter implements SqsAsyncClient { + private final SqsAsyncBatchManager batchManager; + + /** + * Creates a new {@code BatchingSqsClientAdapter} with the specified batch manager. + * + * @param batchManager the {@link SqsAsyncBatchManager} to use for batching operations + * @throws IllegalArgumentException if batchManager is null + */ + public BatchingSqsClientAdapter(SqsAsyncBatchManager batchManager) { + Assert.notNull(batchManager, "batchManager cannot be null"); + this.batchManager = batchManager; + } + + @Override + public String serviceName() { + return SqsAsyncClient.SERVICE_NAME; + } + + /** + * Closes the underlying batch manager and releases associated resources. + * + *

+ * This method should be called when the adapter is no longer needed to ensure proper cleanup of threads and + * connections. + */ + @Override + public void close() { + batchManager.close(); + } + + /** + * Sends a message to the specified SQS queue using automatic batching. + * + *

+ * Important: This method returns immediately, but the actual sending is performed asynchronously. + * Handle the returned {@link CompletableFuture} to detect transmission errors. + * + * @param sendMessageRequest the request containing queue URL and message details + * @return a {@link CompletableFuture} that completes with the send result + */ + @Override + public CompletableFuture sendMessage(SendMessageRequest sendMessageRequest) { + return batchManager.sendMessage(sendMessageRequest); + } + + /** + * Sends a message to the specified SQS queue using automatic batching. + * + *

+ * Important: This method returns immediately, but the actual sending is performed asynchronously. + * Handle the returned {@link CompletableFuture} to detect transmission errors. + * + * @param sendMessageRequest a consumer to configure the send message request + * @return a {@link CompletableFuture} that completes with the send result + */ + @Override + public CompletableFuture sendMessage(Consumer sendMessageRequest) { + return batchManager.sendMessage(sendMessageRequest); + } + + /** + * Receives messages from the specified SQS queue using automatic batching. + * + *

+ * The batching manager may combine multiple receive requests to optimize AWS API usage. + * + * @param receiveMessageRequest the request containing queue URL and receive options + * @return a {@link CompletableFuture} that completes with the received messages + */ + @Override + public CompletableFuture receiveMessage(ReceiveMessageRequest receiveMessageRequest) { + return batchManager.receiveMessage(receiveMessageRequest); + } + + /** + * Receives messages from the specified SQS queue using automatic batching. + * + *

+ * The batching manager may combine multiple receive requests to optimize AWS API usage. + * + * @param receiveMessageRequest a consumer to configure the receive message request + * @return a {@link CompletableFuture} that completes with the received messages + */ + @Override + public CompletableFuture receiveMessage( + Consumer receiveMessageRequest) { + return batchManager.receiveMessage(receiveMessageRequest); + } + + /** + * Deletes a message from the specified SQS queue using automatic batching. + * + *

+ * Important: The actual deletion may be delayed due to batching. Handle the returned + * {@link CompletableFuture} to confirm successful deletion. + * + * @param deleteMessageRequest the request containing queue URL and receipt handle + * @return a {@link CompletableFuture} that completes with the deletion result + */ + @Override + public CompletableFuture deleteMessage(DeleteMessageRequest deleteMessageRequest) { + return batchManager.deleteMessage(deleteMessageRequest); + } + + /** + * Deletes a message from the specified SQS queue using automatic batching. + * + *

+ * Important: The actual deletion may be delayed due to batching. Handle the returned + * {@link CompletableFuture} to confirm successful deletion. + * + * @param deleteMessageRequest a consumer to configure the delete message request + * @return a {@link CompletableFuture} that completes with the deletion result + */ + @Override + public CompletableFuture deleteMessage( + Consumer deleteMessageRequest) { + return batchManager.deleteMessage(deleteMessageRequest); + } + + /** + * Changes the visibility timeout of a message in the specified SQS queue using automatic batching. + * + *

+ * The batching manager may combine multiple visibility change requests to optimize AWS API usage. + * + * @param changeMessageVisibilityRequest the request containing queue URL, receipt handle, and new timeout + * @return a {@link CompletableFuture} that completes with the visibility change result + */ + @Override + public CompletableFuture changeMessageVisibility( + ChangeMessageVisibilityRequest changeMessageVisibilityRequest) { + return batchManager.changeMessageVisibility(changeMessageVisibilityRequest); + } + + /** + * Changes the visibility timeout of a message in the specified SQS queue using automatic batching. + * + *

+ * The batching manager may combine multiple visibility change requests to optimize AWS API usage. + * + * @param changeMessageVisibilityRequest a consumer to configure the change visibility request + * @return a {@link CompletableFuture} that completes with the visibility change result + */ + @Override + public CompletableFuture changeMessageVisibility( + Consumer changeMessageVisibilityRequest) { + return batchManager.changeMessageVisibility(changeMessageVisibilityRequest); + } +} diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/BatchingSqsClientAdapterIntegrationTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/BatchingSqsClientAdapterIntegrationTests.java new file mode 100644 index 000000000..031cd74a8 --- /dev/null +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/BatchingSqsClientAdapterIntegrationTests.java @@ -0,0 +1,300 @@ +/* + * Copyright 2013-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs.integration; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.awspring.cloud.sqs.operations.BatchingSqsClientAdapter; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager; +import software.amazon.awssdk.services.sqs.model.*; + +/** + * Integration tests for the Sqs Batching Client Adapter. + * + * @author Heechul Kang + */ +@SpringBootTest +public class BatchingSqsClientAdapterIntegrationTests extends BaseSqsIntegrationTest { + + private static final String BASE_QUEUE_NAME = "batching-test-queue"; + + @Autowired + private SqsAsyncClient asyncClient; + + @Test + void shouldReturnCorrectServiceName() { + try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { + String serviceName = adapter.serviceName(); + assertThat(serviceName).isEqualTo(SqsAsyncClient.SERVICE_NAME); + } + } + + @Test + void shouldSendMessageThroughBatchManager() { + String queueName = createUniqueQueueName(); + createQueue(this.asyncClient, queueName).join(); + + try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { + String messageBody = "Test message for batching"; + SendMessageRequest request = SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody) + .build(); + + SendMessageResponse response = adapter.sendMessage(request).join(); + + assertThat(response.messageId()).isNotNull(); + + ReceiveMessageResponse received = this.asyncClient + .receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueName).maxNumberOfMessages(1).build()) + .join(); + + assertThat(received.messages()).hasSize(1); + assertThat(received.messages().get(0).body()).isEqualTo(messageBody); + } + } + + @Test + void shouldSendMessageWithConsumer() { + String queueName = createUniqueQueueName(); + createQueue(this.asyncClient, queueName).join(); + + try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { + String messageBody = "Test message with consumer"; + + SendMessageResponse response = adapter + .sendMessage(builder -> builder.queueUrl(queueName).messageBody(messageBody)).join(); + + assertThat(response.messageId()).isNotNull(); + + ReceiveMessageResponse received = this.asyncClient + .receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueName).maxNumberOfMessages(1).build()) + .join(); + + assertThat(received.messages()).hasSize(1); + assertThat(received.messages().get(0).body()).isEqualTo(messageBody); + } + } + + @Test + void shouldReceiveMessageThroughBatchManager() throws InterruptedException { + String queueName = createUniqueQueueName(); + createQueue(this.asyncClient, queueName).join(); + + try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { + String messageBody = "Test message for receiving"; + this.asyncClient + .sendMessage(SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody).build()) + .join(); + + Thread.sleep(200); + + ReceiveMessageResponse response = adapter + .receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueName).maxNumberOfMessages(1).build()) + .join(); + + assertThat(response.messages()).hasSize(1); + assertThat(response.messages().get(0).body()).isEqualTo(messageBody); + } + } + + @Test + void shouldReceiveMessageWithConsumer() throws InterruptedException { + String queueName = createUniqueQueueName(); + createQueue(this.asyncClient, queueName).join(); + + try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { + String messageBody = "Test message for receiving with consumer"; + this.asyncClient + .sendMessage(SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody).build()) + .join(); + + Thread.sleep(200); + + ReceiveMessageResponse response = adapter + .receiveMessage(builder -> builder.queueUrl(queueName).maxNumberOfMessages(1)).join(); + + assertThat(response.messages()).hasSize(1); + assertThat(response.messages().get(0).body()).isEqualTo(messageBody); + } + } + + @Test + void shouldDeleteMessageThroughBatchManager() { + String queueName = createUniqueQueueName(); + createQueue(this.asyncClient, queueName).join(); + + try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { + String messageBody = "Test message for deletion"; + this.asyncClient + .sendMessage(SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody).build()) + .join(); + + ReceiveMessageResponse received = this.asyncClient + .receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueName).maxNumberOfMessages(1).build()) + .join(); + + assertThat(received.messages()).hasSize(1); + String receiptHandle = received.messages().get(0).receiptHandle(); + + DeleteMessageResponse deleteResponse = adapter + .deleteMessage( + DeleteMessageRequest.builder().queueUrl(queueName).receiptHandle(receiptHandle).build()) + .join(); + + assertThat(deleteResponse).isNotNull(); + + ReceiveMessageResponse afterDelete = this.asyncClient.receiveMessage(ReceiveMessageRequest.builder() + .queueUrl(queueName).maxNumberOfMessages(1).waitTimeSeconds(1).build()).join(); + + assertThat(afterDelete.messages()).isEmpty(); + } + } + + @Test + void shouldDeleteMessageWithConsumer() { + String queueName = createUniqueQueueName(); + createQueue(this.asyncClient, queueName).join(); + + try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { + String messageBody = "Test message for deletion with consumer"; + this.asyncClient + .sendMessage(SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody).build()) + .join(); + + ReceiveMessageResponse received = this.asyncClient + .receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueName).maxNumberOfMessages(1).build()) + .join(); + + String receiptHandle = received.messages().get(0).receiptHandle(); + + DeleteMessageResponse deleteResponse = adapter + .deleteMessage(builder -> builder.queueUrl(queueName).receiptHandle(receiptHandle)).join(); + + assertThat(deleteResponse).isNotNull(); + } + } + + @Test + void shouldChangeMessageVisibilityThroughBatchManager() { + String queueName = createUniqueQueueName(); + createQueue(this.asyncClient, queueName).join(); + + try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { + String messageBody = "Test message for visibility change"; + this.asyncClient + .sendMessage(SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody).build()) + .join(); + + ReceiveMessageResponse received = this.asyncClient.receiveMessage(ReceiveMessageRequest.builder() + .queueUrl(queueName).maxNumberOfMessages(1).visibilityTimeout(5).build()).join(); + + String receiptHandle = received.messages().get(0).receiptHandle(); + + ChangeMessageVisibilityResponse response = adapter.changeMessageVisibility(ChangeMessageVisibilityRequest + .builder().queueUrl(queueName).receiptHandle(receiptHandle).visibilityTimeout(30).build()).join(); + + assertThat(response).isNotNull(); + } + } + + @Test + void shouldChangeMessageVisibilityWithConsumer() { + String queueName = createUniqueQueueName(); + createQueue(this.asyncClient, queueName).join(); + + try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { + String messageBody = "Test message for visibility change with consumer"; + this.asyncClient + .sendMessage(SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody).build()) + .join(); + + ReceiveMessageResponse received = this.asyncClient.receiveMessage(ReceiveMessageRequest.builder() + .queueUrl(queueName).maxNumberOfMessages(1).visibilityTimeout(5).build()).join(); + + String receiptHandle = received.messages().get(0).receiptHandle(); + + ChangeMessageVisibilityResponse response = adapter + .changeMessageVisibility( + builder -> builder.queueUrl(queueName).receiptHandle(receiptHandle).visibilityTimeout(30)) + .join(); + + assertThat(response).isNotNull(); + } + } + + @Test + void shouldHandleBatchingEfficiently() throws InterruptedException { + String queueName = createUniqueQueueName(); + createQueue(this.asyncClient, queueName).join(); + + try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { + int messageCount = 5; + String messageBodyPrefix = "Batch test message "; + + CompletableFuture[] futures = new CompletableFuture[messageCount]; + + for (int i = 0; i < messageCount; i++) { + futures[i] = adapter.sendMessage( + SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBodyPrefix + i).build()); + } + + CompletableFuture.allOf(futures).join(); + + for (CompletableFuture future : futures) { + assertThat(future.join().messageId()).isNotNull(); + } + + Thread.sleep(200); + + ReceiveMessageResponse received = this.asyncClient + .receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueName).maxNumberOfMessages(10).build()) + .join(); + + assertThat(received.messages()).hasSize(messageCount); + } + } + + private String createUniqueQueueName() { + return BASE_QUEUE_NAME + "-" + UUID.randomUUID().toString().substring(0, 8); + } + + private BatchingSqsClientAdapter createBatchingAdapter() { + SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder().client(this.asyncClient) + .scheduledExecutor(Executors.newScheduledThreadPool(2)) + .overrideConfiguration(builder -> builder.maxBatchSize(10).sendRequestFrequency(Duration.ofMillis(100))) + .build(); + + return new BatchingSqsClientAdapter(batchManager); + } + + @Configuration + static class SQSConfiguration { + + @Bean + SqsAsyncClient client() { + return createAsyncClient(); + } + } +} \ No newline at end of file diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapterTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapterTests.java new file mode 100644 index 000000000..9acc4fa89 --- /dev/null +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapterTests.java @@ -0,0 +1,284 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs.operations; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.mock; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.junit.jupiter.MockitoExtension; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager; +import software.amazon.awssdk.services.sqs.model.*; + +/** + * @author Heechul Kang + */ +@ExtendWith(MockitoExtension.class) +class BatchingSqsClientAdapterTests { + SqsAsyncBatchManager mockBatchManager; + + BatchingSqsClientAdapter mockAdapter; + + @BeforeEach + void beforeEach() { + mockBatchManager = mock(SqsAsyncBatchManager.class); + mockAdapter = new BatchingSqsClientAdapter(mockBatchManager); + } + + @Test + void shouldThrowExceptionWhenBatchManagerIsNull() { + assertThatThrownBy(() -> new BatchingSqsClientAdapter(null)).isInstanceOf(IllegalArgumentException.class) + .hasMessage("batchManager cannot be null"); + } + + @Test + void shouldReturnCorrectServiceName() { + String serviceName = mockAdapter.serviceName(); + assertThat(serviceName).isEqualTo(SqsAsyncClient.SERVICE_NAME); + } + + @Test + void shouldDelegateBatchManagerClose() { + mockAdapter.close(); + then(mockBatchManager).should().close(); + } + + @Test + void shouldDelegateSendMessageWithRequest() { + String queueUrl = "test-queue"; + String messageBody = "test-message"; + + SendMessageRequest request = SendMessageRequest.builder().queueUrl(queueUrl).messageBody(messageBody).build(); + SendMessageResponse expectedResponse = SendMessageResponse.builder().messageId(UUID.randomUUID().toString()) + .build(); + given(mockBatchManager.sendMessage(request)).willReturn(CompletableFuture.completedFuture(expectedResponse)); + + CompletableFuture result = mockAdapter.sendMessage(request); + + assertThat(result).isCompletedWithValue(expectedResponse); + then(mockBatchManager).should().sendMessage(request); + } + + @Test + void shouldDelegateSendMessageWithConsumer() { + String queueUrl = "test-queue"; + String messageBody = "test-message"; + + Consumer requestConsumer = builder -> builder.queueUrl(queueUrl) + .messageBody(messageBody); + SendMessageResponse expectedResponse = SendMessageResponse.builder().messageId(UUID.randomUUID().toString()) + .build(); + given(mockBatchManager.sendMessage(any(Consumer.class))) + .willReturn(CompletableFuture.completedFuture(expectedResponse)); + + CompletableFuture result = mockAdapter.sendMessage(requestConsumer); + + assertThat(result).isCompletedWithValue(expectedResponse); + ArgumentCaptor> captor = ArgumentCaptor.forClass(Consumer.class); + then(mockBatchManager).should().sendMessage(captor.capture()); + assertThat(captor.getValue()).isEqualTo(requestConsumer); + } + + @Test + void shouldDelegateReceiveMessageWithRequest() { + String queueUrl = "test-queue"; + String body = "test-body"; + + ReceiveMessageRequest request = ReceiveMessageRequest.builder().queueUrl(queueUrl).maxNumberOfMessages(10) + .build(); + Message message = Message.builder().messageId(UUID.randomUUID().toString()).body(body).build(); + ReceiveMessageResponse expectedResponse = ReceiveMessageResponse.builder().messages(message).build(); + given(mockBatchManager.receiveMessage(request)).willReturn(CompletableFuture.completedFuture(expectedResponse)); + + CompletableFuture result = mockAdapter.receiveMessage(request); + + assertThat(result).isCompletedWithValue(expectedResponse); + then(mockBatchManager).should().receiveMessage(request); + } + + @Test + void shouldDelegateReceiveMessageWithConsumer() { + String queueUrl = "test-queue"; + String body = "test-body"; + + Consumer requestConsumer = builder -> builder.queueUrl(queueUrl) + .maxNumberOfMessages(10); + Message message = Message.builder().messageId(UUID.randomUUID().toString()).body(body).build(); + ReceiveMessageResponse expectedResponse = ReceiveMessageResponse.builder().messages(message).build(); + given(mockBatchManager.receiveMessage(any(Consumer.class))) + .willReturn(CompletableFuture.completedFuture(expectedResponse)); + + CompletableFuture result = mockAdapter.receiveMessage(requestConsumer); + + assertThat(result).isCompletedWithValue(expectedResponse); + ArgumentCaptor> captor = ArgumentCaptor.forClass(Consumer.class); + then(mockBatchManager).should().receiveMessage(captor.capture()); + assertThat(captor.getValue()).isEqualTo(requestConsumer); + } + + @Test + void shouldDelegateDeleteMessageWithRequest() { + String queueUrl = "test-queue"; + String receiptHandle = "test-receipt-handle"; + + DeleteMessageRequest request = DeleteMessageRequest.builder().queueUrl(queueUrl).receiptHandle(receiptHandle) + .build(); + DeleteMessageResponse expectedResponse = DeleteMessageResponse.builder().build(); + given(mockBatchManager.deleteMessage(request)).willReturn(CompletableFuture.completedFuture(expectedResponse)); + + CompletableFuture result = mockAdapter.deleteMessage(request); + + assertThat(result).isCompletedWithValue(expectedResponse); + then(mockBatchManager).should().deleteMessage(request); + } + + @Test + void shouldDelegateDeleteMessageWithConsumer() { + String queueUrl = "test-queue"; + String receiptHandle = "test-receipt-handle"; + + Consumer requestConsumer = builder -> builder.queueUrl(queueUrl) + .receiptHandle(receiptHandle); + DeleteMessageResponse expectedResponse = DeleteMessageResponse.builder().build(); + given(mockBatchManager.deleteMessage(any(Consumer.class))) + .willReturn(CompletableFuture.completedFuture(expectedResponse)); + + CompletableFuture result = mockAdapter.deleteMessage(requestConsumer); + + assertThat(result).isCompletedWithValue(expectedResponse); + ArgumentCaptor> captor = ArgumentCaptor.forClass(Consumer.class); + then(mockBatchManager).should().deleteMessage(captor.capture()); + assertThat(captor.getValue()).isEqualTo(requestConsumer); + } + + @Test + void shouldDelegateChangeMessageVisibilityWithRequest() { + String queueUrl = "test-queue"; + String receiptHandle = "test-receipt-handle"; + + ChangeMessageVisibilityRequest request = ChangeMessageVisibilityRequest.builder().queueUrl(queueUrl) + .receiptHandle(receiptHandle).visibilityTimeout(30).build(); + ChangeMessageVisibilityResponse expectedResponse = ChangeMessageVisibilityResponse.builder().build(); + given(mockBatchManager.changeMessageVisibility(request)) + .willReturn(CompletableFuture.completedFuture(expectedResponse)); + + CompletableFuture result = mockAdapter.changeMessageVisibility(request); + + assertThat(result).isCompletedWithValue(expectedResponse); + then(mockBatchManager).should().changeMessageVisibility(request); + } + + @Test + void shouldDelegateChangeMessageVisibilityWithConsumer() { + String queueUrl = "test-queue"; + String receiptHandle = "test-receipt-handle"; + + Consumer requestConsumer = builder -> builder.queueUrl(queueUrl) + .receiptHandle(receiptHandle).visibilityTimeout(30); + ChangeMessageVisibilityResponse expectedResponse = ChangeMessageVisibilityResponse.builder().build(); + given(mockBatchManager.changeMessageVisibility(any(Consumer.class))) + .willReturn(CompletableFuture.completedFuture(expectedResponse)); + + CompletableFuture result = mockAdapter + .changeMessageVisibility(requestConsumer); + + assertThat(result).isCompletedWithValue(expectedResponse); + ArgumentCaptor> captor = ArgumentCaptor + .forClass(Consumer.class); + then(mockBatchManager).should().changeMessageVisibility(captor.capture()); + assertThat(captor.getValue()).isEqualTo(requestConsumer); + } + + @Test + void shouldHandleExceptionalCompletionInSendMessage() { + String queueUrl = "test-queue"; + String body = "test-message"; + + SendMessageRequest request = SendMessageRequest.builder().queueUrl(queueUrl).messageBody(body).build(); + RuntimeException exception = new RuntimeException("Batch manager error"); + CompletableFuture failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(exception); + given(mockBatchManager.sendMessage(request)).willReturn(failedFuture); + + CompletableFuture result = mockAdapter.sendMessage(request); + + assertThat(result).isCompletedExceptionally(); + then(mockBatchManager).should().sendMessage(request); + } + + @Test + void shouldHandleExceptionalCompletionInReceiveMessage() { + String queueUrl = "test-queue"; + + ReceiveMessageRequest request = ReceiveMessageRequest.builder().queueUrl(queueUrl).build(); + RuntimeException exception = new RuntimeException("Batch manager error"); + CompletableFuture failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(exception); + given(mockBatchManager.receiveMessage(request)).willReturn(failedFuture); + + CompletableFuture result = mockAdapter.receiveMessage(request); + + assertThat(result).isCompletedExceptionally(); + then(mockBatchManager).should().receiveMessage(request); + } + + @Test + void shouldHandleExceptionalCompletionInDeleteMessage() { + String queueUrl = "test-queue"; + String receiptHandle = "test-receipt-handle"; + + DeleteMessageRequest request = DeleteMessageRequest.builder().queueUrl(queueUrl).receiptHandle(receiptHandle) + .build(); + RuntimeException exception = new RuntimeException("Batch manager error"); + CompletableFuture failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(exception); + given(mockBatchManager.deleteMessage(request)).willReturn(failedFuture); + + CompletableFuture result = mockAdapter.deleteMessage(request); + + assertThat(result).isCompletedExceptionally(); + then(mockBatchManager).should().deleteMessage(request); + } + + @Test + void shouldHandleExceptionalCompletionInChangeMessageVisibility() { + String queueUrl = "test-queue"; + String receiptHandle = "test-receipt-handle"; + + ChangeMessageVisibilityRequest request = ChangeMessageVisibilityRequest.builder().queueUrl(queueUrl) + .receiptHandle(receiptHandle).visibilityTimeout(30).build(); + RuntimeException exception = new RuntimeException("Batch manager error"); + CompletableFuture failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(exception); + given(mockBatchManager.changeMessageVisibility(request)).willReturn(failedFuture); + + CompletableFuture result = mockAdapter.changeMessageVisibility(request); + + assertThat(result).isCompletedExceptionally(); + then(mockBatchManager).should().changeMessageVisibility(request); + } +} \ No newline at end of file