Skip to content

Commit 2ba6fd5

Browse files
Refine BackPressureHandlerFactories API; clarify backpressure architecture direction (#1469)
Also includes: - Deprecation cleanup - Moves backpressure components to a separate package - Minor adjustments for consistency and maintainability
1 parent feb2c41 commit 2ba6fd5

25 files changed

+383
-1059
lines changed

docs/src/main/asciidoc/sqs.adoc

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1989,8 +1989,9 @@ NOTE: The `AUTO` setting should be balanced for most use cases, including high t
19891989

19901990
==== Advanced Backpressure management
19911991

1992+
Since 4.0.0, the default `BackPressureHandler` is assembled based on `ContainerOptions#BackPressureMode`, using a `CompositeBackPressureHandler` to combine multiple handlers that together replicate the behavior of the original `SemaphoreBackPressureHandler`.
19921993
Even though the default `BackPressureHandler` should be enough for most use cases, there are scenarios where more fine-grained control over message consumption is required not to overwhelm downstream systems or exceed resource limits.
1993-
In such a case, it is necessary to replace the default `BackPressureHandler` with a custom one that implements the `BackPressureHandler` interface.
1994+
In such a case, it is possible to replace the default `BackPressureHandler` with a custom one that implements the `BackPressureHandler` interface.
19941995
A `backPressureHandlerFactory` can be set in `SqsContainerOptions` to configure which `BackPressureHandler` to use.
19951996

19961997
===== What is a BackPressureHandler?
@@ -2017,28 +2018,26 @@ SqsMessageListenerContainer container = SqsMessageListenerContainer.builder()
20172018
===== Combining Multiple BackPressureHandlers
20182019

20192020
If necessary, multiple `BackPressureHandler` can be combined by using the `CompositeBackPressureHandler`.
2020-
Each of the `BackPressureHandler` (which we'll call delegates) are chained in the order they are provided.
2021+
Each of the `BackPressureHandler` delegates are chained in the order they are provided.
20212022
The first delegate will be requested the initial amount of permits and will return the number of permits it accepts to grant.
20222023
The second delegate will get that potentially reduced number of permits as a request and might in turn reduce it further.
2023-
The process continues until all delegates have been called or one of them returns 0, which will prevent the polling of messages from SQS.
2024+
The process continues until all delegates have been called or one of them returns 0, which will prevent the polling of messages from SQS for that round of permit requests.
20242025

20252026
For example, to implement the `BackPressureMode.ALWAYS_POLL_MAX_MESSAGES` strategy, we can combine a concurrency limiter, an adaptative throughput handler, and a "full batch only" handler.
2026-
The resulting `CompositeBackPressureHandler` looks like this:
2027+
The `BackPressureHandlerFactory` to create such a `CompositeBackPressureHandler` is assembled as follows:
20272028

20282029
```java
2029-
Duration maxIdleWaitTime = Duration.ofMillis(50L);
2030-
List<BackPressureHandler> backPressureHandlers = List.of(
2031-
BackPressureHandlerFactories.concurrencyLimiterBackPressureHandler(options),
2032-
BackPressureHandlerFactories.throughputBackPressureHandler(options),
2033-
BackPressureHandlerFactories.fullBatchBackPressureHandler(options)
2030+
List<BackPressureHandlerFactory> backPressureHandlerFactories = List.of(
2031+
BackPressureHandlerFactories.concurrencyLimiterBackPressureHandler(),
2032+
BackPressureHandlerFactories.throughputBackPressureHandler(),
2033+
BackPressureHandlerFactories.fullBatchBackPressureHandler()
20342034
);
2035-
CompositeBackPressureHandler backPressureHandler = BackPressureHandlerFactories.compositeBackPressureHandler(
2036-
options, maxIdleWaitTime, backPressureHandlers);
2035+
BackPressureHandlerFactory backPressureHandlerFactory = BackPressureHandlerFactories.compositeBackPressureHandler(backPressureHandlerFactories);
20372036
```
20382037

20392038
===== Built-in BackPressureHandlers
20402039

2041-
Spring Cloud AWS provides several built-in `BackPressureHandler` implementations:
2040+
Spring Cloud AWS provides the following built-in `BackPressureHandler` implementations:
20422041

20432042
- `ConcurrencyLimiterBackPressureHandler`: Limits the number of messages being processed concurrently.
20442043
- `ThroughputBackPressureHandler`: Switches between high and low throughput modes. In high throughput mode, multiple polls can be done in parallel.
@@ -2049,7 +2048,7 @@ This `FullBatchBackPressureHandler` must always be the last in the chain for it
20492048
The `BackPressureHandlerFactories` class provides factory methods to create these handlers easily.
20502049
These handlers can be used directly or combined with custom ones using the `CompositeBackPressureHandler` to fit the application's needs.
20512050

2052-
Additionally, the `BackPressureHandlerFactories#adaptativeThroughputBackPressureHandler` factory method combines the `ConcurrencyLimiterBackPressureHandler`, `ThroughputBackPressureHandler`, and `FullBatchBackPressureHandler` as per the desired `BackPressureMode`.
2051+
Additionally, the `BackPressureHandlerFactories#adaptiveThroughputBackPressureHandler` factory method combines the `ConcurrencyLimiterBackPressureHandler`, `ThroughputBackPressureHandler`, and `FullBatchBackPressureHandler` as per the desired `BackPressureMode`.
20532052

20542053
=== Blocking and Non-Blocking (Async) Components
20552054

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementOrdering;
1919
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
20+
import io.awspring.cloud.sqs.listener.backpressure.BackPressureHandlerFactories;
21+
import io.awspring.cloud.sqs.listener.backpressure.BackPressureHandlerFactory;
2022
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
2123
import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter;
2224
import io.micrometer.observation.ObservationConvention;
@@ -241,7 +243,8 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,
241243

242244
private static final BackPressureMode DEFAULT_THROUGHPUT_CONFIGURATION = BackPressureMode.AUTO;
243245

244-
private static final BackPressureHandlerFactory DEFAULT_BACKPRESSURE_FACTORY = BackPressureHandlerFactories::semaphoreBackPressureHandler;
246+
private static final BackPressureHandlerFactory DEFAULT_BACKPRESSURE_FACTORY = BackPressureHandlerFactories
247+
.adaptiveThroughputBackPressureHandler();
245248

246249
private static final ListenerMode DEFAULT_MESSAGE_DELIVERY_STRATEGY = ListenerMode.SINGLE_MESSAGE;
247250

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import io.awspring.cloud.sqs.MessageExecutionThread;
2121
import io.awspring.cloud.sqs.MessageExecutionThreadFactory;
2222
import io.awspring.cloud.sqs.UnsupportedThreadFactoryException;
23+
import io.awspring.cloud.sqs.listener.backpressure.BackPressureHandler;
24+
import io.awspring.cloud.sqs.listener.backpressure.BackPressureHandlerFactory;
2325
import io.awspring.cloud.sqs.listener.pipeline.AcknowledgementHandlerExecutionStage;
2426
import io.awspring.cloud.sqs.listener.pipeline.AfterProcessingContextInterceptorExecutionStage;
2527
import io.awspring.cloud.sqs.listener.pipeline.AfterProcessingInterceptorExecutionStage;

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerFactories.java

Lines changed: 0 additions & 175 deletions
This file was deleted.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BatchAwareBackPressureHandler.java

Lines changed: 0 additions & 70 deletions
This file was deleted.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementOrdering;
1919
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
20+
import io.awspring.cloud.sqs.listener.backpressure.BackPressureHandler;
21+
import io.awspring.cloud.sqs.listener.backpressure.BackPressureHandlerFactory;
2022
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
2123
import io.micrometer.observation.ObservationConvention;
2224
import io.micrometer.observation.ObservationRegistry;

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementOrdering;
1919
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
20+
import io.awspring.cloud.sqs.listener.backpressure.BackPressureHandlerFactory;
2021
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
2122
import io.micrometer.observation.ObservationRegistry;
2223
import java.time.Duration;
@@ -152,7 +153,8 @@ default B pollBackOffPolicy(BackOffPolicy pollBackOffPolicy) {
152153
/**
153154
* Sets the {@link BackPressureHandlerFactory} for this container. Default is
154155
* {@code AbstractContainerOptions.DEFAULT_BACKPRESSURE_FACTORY} which results in a default
155-
* {@link SemaphoreBackPressureHandler} to be instantiated.
156+
* {@link io.awspring.cloud.sqs.listener.backpressure.CompositeBackPressureHandler} to be instantiated,
157+
* with behavior analogous to the original {@code SemaphoreBackPressureHandler}.
156158
*
157159
* @param backPressureHandlerFactory the BackPressureHandler supplier.
158160
* @return this instance.

0 commit comments

Comments
 (0)