Skip to content

Commit 7ad4834

Browse files
authored
Introduce CompositeBackPressureHandler and related backpressure improvements
1 parent 6bdfae9 commit 7ad4834

24 files changed

+2615
-141
lines changed

docs/src/main/asciidoc/sqs.adoc

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -806,7 +806,6 @@ NOTE: The same factory can be used to create both `single message` and `batch` c
806806

807807
IMPORTANT: In case the same factory is shared by both delivery methods, any supplied `ErrorHandler`, `MessageInterceptor` or `MessageListener` should implement the proper methods.
808808

809-
810809
==== Container Options
811810

812811
Each `MessageListenerContainer` can have a different set of options.
@@ -1977,6 +1976,7 @@ If after the 5 seconds for `maxDelayBetweenPolls` 6 messages have been processed
19771976
If the queue is depleted and a poll returns no messages, it'll enter `low throughput` mode again and perform only one poll at a time.
19781977

19791978
==== Configuring BackPressureMode
1979+
The default `BackPressureHandler` can be configured to optimize the polling behavior based on the application's throughput requirements.
19801980
The following `BackPressureMode` values can be set in `SqsContainerOptions` to configure polling behavior:
19811981

19821982
* `AUTO` - The default mode, as described in the previous section.
@@ -1987,6 +1987,70 @@ Useful for really high throughput scenarios where the risk of making parallel po
19871987

19881988
NOTE: The `AUTO` setting should be balanced for most use cases, including high throughput ones.
19891989

1990+
==== Advanced Backpressure management
1991+
1992+
Even though the default `BackPressureHandler` should be enough for most use cases, there are scenarios where more fine-grained control over message consumption is required not to overwhelm downstream systems or exceed resource limits.
1993+
In such a case, it is necessary to replace the default `BackPressureHandler` with a custom one that implements the `BackPressureHandler` interface.
1994+
A `backPressureHandlerFactory` can be set in `SqsContainerOptions` to configure which `BackPressureHandler` to use.
1995+
1996+
===== What is a BackPressureHandler?
1997+
1998+
A `BackPressureHandler` is an interface that determines whether the container should apply backpressure (i.e., slow down or pause polling) based on the current state of the system.
1999+
It is invoked before each poll to SQS and can prevent polling or poll for fewer messages if certain conditions are met, e.g., too many inflight messages, custom resource constraints, etc.
2000+
2001+
===== Creating a custom BackPressureHandler
2002+
2003+
To implement a custom backpressure logic, the `BackPressureHandler` interface must be implemented.
2004+
2005+
A `SqsMessageListenerContainer` can be configured to use the desired `BackPressureHandler` by setting the `backPressureHandlerFactory` on the `ContainerOptions`.
2006+
2007+
```java
2008+
SqsMessageListenerContainer container = SqsMessageListenerContainer.builder()
2009+
.configure(options -> options
2010+
.backPressureHandlerFactory(containerOptions -> new CustomBackPressureHandler())
2011+
// ... other options
2012+
)
2013+
// ... other container settings ...
2014+
.build();
2015+
```
2016+
2017+
===== Combining Multiple BackPressureHandlers
2018+
2019+
If necessary, multiple `BackPressureHandler` can be combined by using the `CompositeBackPressureHandler`.
2020+
Each of the `BackPressureHandler` (which we'll call delegates) are chained in the order they are provided.
2021+
The first delegate will be requested the initial amount of permits and will return the number of permits it accepts to grant.
2022+
The second delegate will get that potentially reduced number of permits as a request and might in turn reduce it further.
2023+
The process continues until all delegates have been called or one of them returns 0, which will prevent the polling of messages from SQS.
2024+
2025+
For example, to implement the `BackPressureMode.ALWAYS_POLL_MAX_MESSAGES` strategy, we can combine a concurrency limiter, an adaptative throughput handler, and a "full batch only" handler.
2026+
The resulting `CompositeBackPressureHandler` looks like this:
2027+
2028+
```java
2029+
Duration maxIdleWaitTime = Duration.ofMillis(50L);
2030+
List<BackPressureHandler> backPressureHandlers = List.of(
2031+
BackPressureHandlerFactories.concurrencyLimiterBackPressureHandler(options),
2032+
BackPressureHandlerFactories.throughputBackPressureHandler(options),
2033+
BackPressureHandlerFactories.fullBatchBackPressureHandler(options)
2034+
);
2035+
CompositeBackPressureHandler backPressureHandler = BackPressureHandlerFactories.compositeBackPressureHandler(
2036+
options, maxIdleWaitTime, backPressureHandlers);
2037+
```
2038+
2039+
===== Built-in BackPressureHandlers
2040+
2041+
Spring Cloud AWS provides several built-in `BackPressureHandler` implementations:
2042+
2043+
- `ConcurrencyLimiterBackPressureHandler`: Limits the number of messages being processed concurrently.
2044+
- `ThroughputBackPressureHandler`: Switches between high and low throughput modes. In high throughput mode, multiple polls can be done in parallel.
2045+
In low throughput mode, only one poll is done at a time.
2046+
- `FullBatchBackPressureHandler`: Ensure polls will always be done with a full batch of messages, meaning that the number of messages polled will always be equal to `maxMessagesPerPoll` if possible or `0` if not possible.
2047+
This `FullBatchBackPressureHandler` must always be the last in the chain for it to work properly.
2048+
2049+
The `BackPressureHandlerFactories` class provides factory methods to create these handlers easily.
2050+
These handlers can be used directly or combined with custom ones using the `CompositeBackPressureHandler` to fit the application's needs.
2051+
2052+
Additionally, the `BackPressureHandlerFactories#adaptativeThroughputBackPressureHandler` factory method combines the `ConcurrencyLimiterBackPressureHandler`, `ThroughputBackPressureHandler`, and `FullBatchBackPressureHandler` as per the desired `BackPressureMode`.
2053+
19902054
=== Blocking and Non-Blocking (Async) Components
19912055

19922056
The SQS integration leverages the `CompletableFuture`-based async capabilities of `AWS SDK 2.0` to deliver a fully non-blocking infrastructure.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
* Base implementation for {@link ContainerOptions}.
3333
*
3434
* @author Tomaz Fernandes
35+
* @author Loïc Rouchon
3536
* @since 3.0
3637
*/
3738
public abstract class AbstractContainerOptions<O extends ContainerOptions<O, B>, B extends ContainerOptionsBuilder<B, O>>
@@ -55,6 +56,8 @@ public abstract class AbstractContainerOptions<O extends ContainerOptions<O, B>,
5556

5657
private final BackPressureMode backPressureMode;
5758

59+
private final BackPressureHandlerFactory backPressureHandlerFactory;
60+
5861
private final ListenerMode listenerMode;
5962

6063
private final MessagingMessageConverter<?> messageConverter;
@@ -90,6 +93,7 @@ protected AbstractContainerOptions(Builder<?, ?> builder) {
9093
this.listenerShutdownTimeout = builder.listenerShutdownTimeout;
9194
this.acknowledgementShutdownTimeout = builder.acknowledgementShutdownTimeout;
9295
this.backPressureMode = builder.backPressureMode;
96+
this.backPressureHandlerFactory = builder.backPressureHandlerFactory;
9397
this.listenerMode = builder.listenerMode;
9498
this.messageConverter = builder.messageConverter;
9599
this.acknowledgementMode = builder.acknowledgementMode;
@@ -162,6 +166,11 @@ public BackPressureMode getBackPressureMode() {
162166
return this.backPressureMode;
163167
}
164168

169+
@Override
170+
public BackPressureHandlerFactory getBackPressureHandlerFactory() {
171+
return this.backPressureHandlerFactory;
172+
}
173+
165174
@Override
166175
public ListenerMode getListenerMode() {
167176
return this.listenerMode;
@@ -232,6 +241,8 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,
232241

233242
private static final BackPressureMode DEFAULT_THROUGHPUT_CONFIGURATION = BackPressureMode.AUTO;
234243

244+
private static final BackPressureHandlerFactory DEFAULT_BACKPRESSURE_FACTORY = BackPressureHandlerFactories::semaphoreBackPressureHandler;
245+
235246
private static final ListenerMode DEFAULT_MESSAGE_DELIVERY_STRATEGY = ListenerMode.SINGLE_MESSAGE;
236247

237248
private static final MessagingMessageConverter<?> DEFAULT_MESSAGE_CONVERTER = new SqsMessagingMessageConverter();
@@ -254,6 +265,8 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,
254265

255266
private BackPressureMode backPressureMode = DEFAULT_THROUGHPUT_CONFIGURATION;
256267

268+
private BackPressureHandlerFactory backPressureHandlerFactory = DEFAULT_BACKPRESSURE_FACTORY;
269+
257270
private Duration listenerShutdownTimeout = DEFAULT_LISTENER_SHUTDOWN_TIMEOUT;
258271

259272
private Duration acknowledgementShutdownTimeout = DEFAULT_ACKNOWLEDGEMENT_SHUTDOWN_TIMEOUT;
@@ -296,6 +309,7 @@ protected Builder(AbstractContainerOptions<?, ?> options) {
296309
this.listenerShutdownTimeout = options.listenerShutdownTimeout;
297310
this.acknowledgementShutdownTimeout = options.acknowledgementShutdownTimeout;
298311
this.backPressureMode = options.backPressureMode;
312+
this.backPressureHandlerFactory = options.backPressureHandlerFactory;
299313
this.listenerMode = options.listenerMode;
300314
this.messageConverter = options.messageConverter;
301315
this.acknowledgementMode = options.acknowledgementMode;
@@ -390,6 +404,12 @@ public B backPressureMode(BackPressureMode backPressureMode) {
390404
return self();
391405
}
392406

407+
@Override
408+
public B backPressureHandlerFactory(BackPressureHandlerFactory backPressureHandlerFactory) {
409+
this.backPressureHandlerFactory = backPressureHandlerFactory;
410+
return self();
411+
}
412+
393413
@Override
394414
public B acknowledgementInterval(Duration acknowledgementInterval) {
395415
Assert.notNull(acknowledgementInterval, "acknowledgementInterval cannot be null");

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -230,10 +230,9 @@ private TaskExecutor validateCustomExecutor(TaskExecutor taskExecutor) {
230230
}
231231

232232
protected BackPressureHandler createBackPressureHandler() {
233-
return SemaphoreBackPressureHandler.builder().batchSize(getContainerOptions().getMaxMessagesPerPoll())
234-
.totalPermits(getContainerOptions().getMaxConcurrentMessages())
235-
.acquireTimeout(getContainerOptions().getMaxDelayBetweenPolls())
236-
.throughputConfiguration(getContainerOptions().getBackPressureMode()).build();
233+
O containerOptions = getContainerOptions();
234+
BackPressureHandlerFactory factory = containerOptions.getBackPressureHandlerFactory();
235+
return factory.createBackPressureHandler(containerOptions);
237236
}
238237

239238
protected TaskExecutor createSourcesTaskExecutor() {

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandler.java

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,25 +24,55 @@
2424
* semaphore-based, rate limiter-based, a mix of both, or any other.
2525
*
2626
* @author Tomaz Fernandes
27+
* @author Loïc Rouchon
2728
* @since 3.0
2829
*/
2930
public interface BackPressureHandler {
3031

3132
/**
32-
* Request a number of permits. Each obtained permit allows the
33+
* Requests a number of permits. Each obtained permit allows the
3334
* {@link io.awspring.cloud.sqs.listener.source.MessageSource} to retrieve one message.
3435
* @param amount the amount of permits to request.
3536
* @return the amount of permits obtained.
3637
* @throws InterruptedException if the Thread is interrupted while waiting for permits.
3738
*/
3839
int request(int amount) throws InterruptedException;
3940

41+
/**
42+
* Releases the specified amount of permits for processed messages. Each message that has been processed should
43+
* release one permit, whether processing was successful or not.
44+
* <p>
45+
* This method can be called in the following use cases:
46+
* <ul>
47+
* <li>{@link ReleaseReason#LIMITED}: all/some permits were not used because another BackPressureHandler has a lower
48+
* permits limit and the difference in permits needs to be returned.</li>
49+
* <li>{@link ReleaseReason#NONE_FETCHED}: none of the permits were actually used because no messages were retrieved
50+
* from SQS. Permits need to be returned.</li>
51+
* <li>{@link ReleaseReason#PARTIAL_FETCH}: some of the permits were used (some messages were retrieved from SQS).
52+
* The unused ones need to be returned. The amount to be returned might be {@literal 0}, in which case it means all
53+
* the permits will be used as the same number of messages were fetched from SQS.</li>
54+
* <li>{@link ReleaseReason#PROCESSED}: a message processing finished, successfully or not.</li>
55+
* </ul>
56+
* @param amount the amount of permits to release.
57+
* @param reason the reason why the permits were released.
58+
*/
59+
default void release(int amount, ReleaseReason reason) {
60+
release(amount);
61+
}
62+
4063
/**
4164
* Release the specified amount of permits. Each message that has been processed should release one permit, whether
4265
* processing was successful or not.
4366
* @param amount the amount of permits to release.
67+
*
68+
* @deprecated This method is deprecated and will not be called by the Spring Cloud AWS SQS listener anymore.
69+
* Implement {@link #release(int, ReleaseReason)} instead.
4470
*/
45-
void release(int amount);
71+
@Deprecated
72+
default void release(int amount) {
73+
// Do not implement this method. It is not called anymore outside of backward compatibility use cases.
74+
// Implement `#release(int amount, ReleaseReason reason)` instead.
75+
}
4676

4777
/**
4878
* Attempts to acquire all permits up to the specified timeout. If successful, means all permits were returned and
@@ -52,4 +82,24 @@ public interface BackPressureHandler {
5282
*/
5383
boolean drain(Duration timeout);
5484

85+
enum ReleaseReason {
86+
/**
87+
* All/Some permits were not used because another BackPressureHandler has a lower permits limit and the permits
88+
* difference need to be aligned across all handlers.
89+
*/
90+
LIMITED,
91+
/**
92+
* No messages were retrieved from SQS, so all permits need to be returned.
93+
*/
94+
NONE_FETCHED,
95+
/**
96+
* Some messages were fetched from SQS. Unused permits if any need to be returned.
97+
*/
98+
PARTIAL_FETCH,
99+
/**
100+
* The processing of one or more messages finished, successfully or not.
101+
*/
102+
PROCESSED;
103+
}
104+
55105
}

0 commit comments

Comments
 (0)