Skip to content

Commit b9a4f62

Browse files
committed
Introduce a BackPressureHandlerFactory for configuring SQS back pressure (#1251)
1 parent 4ea6d2c commit b9a4f62

9 files changed

+594
-131
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
2121
import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter;
2222
import java.time.Duration;
23-
import java.util.function.Supplier;
2423
import org.springframework.core.task.TaskExecutor;
2524
import org.springframework.lang.Nullable;
2625
import org.springframework.retry.backoff.BackOffPolicy;
@@ -54,7 +53,7 @@ public abstract class AbstractContainerOptions<O extends ContainerOptions<O, B>,
5453

5554
private final BackPressureMode backPressureMode;
5655

57-
private final Supplier<BackPressureHandler> backPressureHandlerSupplier;
56+
private final BackPressureHandlerFactory backPressureHandlerFactory;
5857

5958
private final ListenerMode listenerMode;
6059

@@ -87,7 +86,7 @@ protected AbstractContainerOptions(Builder<?, ?> builder) {
8786
this.listenerShutdownTimeout = builder.listenerShutdownTimeout;
8887
this.acknowledgementShutdownTimeout = builder.acknowledgementShutdownTimeout;
8988
this.backPressureMode = builder.backPressureMode;
90-
this.backPressureHandlerSupplier = builder.backPressureHandlerSupplier;
89+
this.backPressureHandlerFactory = builder.backPressureHandlerFactory;
9190
this.listenerMode = builder.listenerMode;
9291
this.messageConverter = builder.messageConverter;
9392
this.acknowledgementMode = builder.acknowledgementMode;
@@ -159,8 +158,8 @@ public BackPressureMode getBackPressureMode() {
159158
}
160159

161160
@Override
162-
public Supplier<BackPressureHandler> getBackPressureHandlerSupplier() {
163-
return this.backPressureHandlerSupplier;
161+
public BackPressureHandlerFactory getBackPressureHandlerFactory() {
162+
return this.backPressureHandlerFactory;
164163
}
165164

166165
@Override
@@ -223,7 +222,7 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,
223222

224223
private static final BackPressureMode DEFAULT_THROUGHPUT_CONFIGURATION = BackPressureMode.AUTO;
225224

226-
private static final Supplier<BackPressureHandler> DEFAULT_BACKPRESSURE_LIMITER = null;
225+
private static final BackPressureHandlerFactory DEFAULT_BACKPRESSURE_FACTORY = buildDefaultBackPressureHandlerFactory();
227226

228227
private static final ListenerMode DEFAULT_MESSAGE_DELIVERY_STRATEGY = ListenerMode.SINGLE_MESSAGE;
229228

@@ -245,7 +244,7 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,
245244

246245
private BackPressureMode backPressureMode = DEFAULT_THROUGHPUT_CONFIGURATION;
247246

248-
private Supplier<BackPressureHandler> backPressureHandlerSupplier = DEFAULT_BACKPRESSURE_LIMITER;
247+
private BackPressureHandlerFactory backPressureHandlerFactory = DEFAULT_BACKPRESSURE_FACTORY;
249248

250249
private Duration listenerShutdownTimeout = DEFAULT_LISTENER_SHUTDOWN_TIMEOUT;
251250

@@ -285,7 +284,7 @@ protected Builder(AbstractContainerOptions<?, ?> options) {
285284
this.listenerShutdownTimeout = options.listenerShutdownTimeout;
286285
this.acknowledgementShutdownTimeout = options.acknowledgementShutdownTimeout;
287286
this.backPressureMode = options.backPressureMode;
288-
this.backPressureHandlerSupplier = options.backPressureHandlerSupplier;
287+
this.backPressureHandlerFactory = options.backPressureHandlerFactory;
289288
this.listenerMode = options.listenerMode;
290289
this.messageConverter = options.messageConverter;
291290
this.acknowledgementMode = options.acknowledgementMode;
@@ -379,8 +378,8 @@ public B backPressureMode(BackPressureMode backPressureMode) {
379378
}
380379

381380
@Override
382-
public B backPressureHandlerSupplier(Supplier<BackPressureHandler> backPressureHandlerSupplier) {
383-
this.backPressureHandlerSupplier = backPressureHandlerSupplier;
381+
public B backPressureHandlerFactory(BackPressureHandlerFactory backPressureHandlerFactory) {
382+
this.backPressureHandlerFactory = backPressureHandlerFactory;
384383
return self();
385384
}
386385

@@ -429,6 +428,12 @@ private static BackOffPolicy buildDefaultBackOffPolicy() {
429428
return BackOffPolicyBuilder.newBuilder().multiplier(DEFAULT_BACK_OFF_MULTIPLIER)
430429
.delay(DEFAULT_BACK_OFF_DELAY).maxDelay(DEFAULT_BACK_OFF_MAX_DELAY).build();
431430
}
431+
432+
private static BackPressureHandlerFactory buildDefaultBackPressureHandlerFactory() {
433+
return options -> SemaphoreBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll())
434+
.totalPermits(options.getMaxConcurrentMessages()).acquireTimeout(options.getMaxDelayBetweenPolls())
435+
.throughputConfiguration(options.getBackPressureMode()).build();
436+
}
432437
}
433438

434439
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -226,13 +226,8 @@ private TaskExecutor validateCustomExecutor(TaskExecutor taskExecutor) {
226226

227227
protected BackPressureHandler createBackPressureHandler() {
228228
O containerOptions = getContainerOptions();
229-
if (containerOptions.getBackPressureHandlerSupplier() != null) {
230-
return containerOptions.getBackPressureHandlerSupplier().get();
231-
}
232-
return SemaphoreBackPressureHandler.builder().batchSize(getContainerOptions().getMaxMessagesPerPoll())
233-
.totalPermits(getContainerOptions().getMaxConcurrentMessages())
234-
.acquireTimeout(getContainerOptions().getMaxDelayBetweenPolls())
235-
.throughputConfiguration(getContainerOptions().getBackPressureMode()).build();
229+
BackPressureHandlerFactory factory = containerOptions.getBackPressureHandlerFactory();
230+
return factory.createBackPressureHandler(containerOptions);
236231
}
237232

238233
protected TaskExecutor createSourcesTaskExecutor() {
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.listener;
17+
18+
/**
19+
* A factory for creating {@link BackPressureHandler} for managing queue consumption backpressure. Implementations can
20+
* configure each the {@link BackPressureHandler} according to its strategies, using the provided
21+
* {@link ContainerOptions}.
22+
* <p>
23+
* Spring Cloud AWS provides the following {@link BackPressureHandler} implementations:
24+
* <ul>
25+
* <li>{@link ConcurrencyLimiterBlockingBackPressureHandler}: Limits the maximum number of messages that can be
26+
* processed concurrently by the application.</li>
27+
* <li>{@link ThroughputBackPressureHandler}: Adapts the throughput dynamically between high and low modes in order to
28+
* reduce SQS pull costs when few messages are coming in.</li>
29+
* <li>{@link CompositeBackPressureHandler}: Allows combining multiple {@link BackPressureHandler} together and ensures
30+
* they cooperate.</li>
31+
* </ul>
32+
* <p>
33+
* Below are a few examples of how common use cases can be achieved. Keep in mind you can always create your own
34+
* {@link BackPressureHandler} implementation and if needed combine it with the provided ones thanks to the
35+
* {@link CompositeBackPressureHandler}.
36+
*
37+
* <h3>A BackPressureHandler limiting the max concurrency with high throughput</h3>
38+
*
39+
* <pre>{@code
40+
* containerOptionsBuilder.backPressureHandlerFactory(containerOptions -> {
41+
* return ConcurrencyLimiterBlockingBackPressureHandler.builder()
42+
* .batchSize(containerOptions.getMaxMessagesPerPoll())
43+
* .totalPermits(containerOptions.getMaxConcurrentMessages())
44+
* .acquireTimeout(containerOptions.getMaxDelayBetweenPolls())
45+
* .throughputConfiguration(BackPressureMode.FIXED_HIGH_THROUGHPUT)
46+
* .build()
47+
* }}</pre>
48+
*
49+
* <h3>A BackPressureHandler limiting the max concurrency with dynamic throughput</h3>
50+
*
51+
* <pre>{@code
52+
* containerOptionsBuilder.backPressureHandlerFactory(containerOptions -> {
53+
* int batchSize = containerOptions.getMaxMessagesPerPoll();
54+
* var concurrencyLimiterBlockingBackPressureHandler = ConcurrencyLimiterBlockingBackPressureHandler.builder()
55+
* .batchSize(batchSize)
56+
* .totalPermits(containerOptions.getMaxConcurrentMessages())
57+
* .acquireTimeout(containerOptions.getMaxDelayBetweenPolls())
58+
* .throughputConfiguration(BackPressureMode.AUTO)
59+
* .build()
60+
* var throughputBackPressureHandler = ThroughputBackPressureHandler.builder()
61+
* .batchSize(batchSize)
62+
* .build();
63+
* return new CompositeBackPressureHandler(List.of(
64+
* concurrencyLimiterBlockingBackPressureHandler,
65+
* throughputBackPressureHandler
66+
* ),
67+
* batchSize,
68+
* standbyLimitPollingInterval
69+
* );
70+
* }}</pre>
71+
*/
72+
public interface BackPressureHandlerFactory {
73+
74+
/**
75+
* Creates a new {@link BackPressureHandler} instance based on the provided {@link ContainerOptions}.
76+
* <p>
77+
* <strong>NOTE:</strong> <em>it is important for the factory to always return a new instance as otherwise it might
78+
* result in a BackPressureHandler internal resources (counters, semaphores, ...) to be shared by multiple
79+
* containers which is very likely not the desired behavior.</em>
80+
*
81+
* @param containerOptions the container options to use for creating the BackPressureHandler.
82+
* @return the created BackPressureHandler
83+
*/
84+
BackPressureHandler createBackPressureHandler(ContainerOptions<?, ?> containerOptions);
85+
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
2121
import java.time.Duration;
2222
import java.util.Collection;
23-
import java.util.function.Supplier;
2423
import org.springframework.core.task.TaskExecutor;
2524
import org.springframework.lang.Nullable;
2625
import org.springframework.retry.backoff.BackOffPolicy;
@@ -129,10 +128,10 @@ default BackOffPolicy getPollBackOffPolicy() {
129128
BackPressureMode getBackPressureMode();
130129

131130
/**
132-
* Return the a {@link Supplier} to create a {@link BackPressureHandler} for this container.
133-
* @return the BackPressureHandler supplier.
131+
* Return the a {@link BackPressureHandlerFactory} to create a {@link BackPressureHandler} for this container.
132+
* @return the BackPressureHandlerFactory.
134133
*/
135-
Supplier<BackPressureHandler> getBackPressureHandlerSupplier();
134+
BackPressureHandlerFactory getBackPressureHandlerFactory();
136135

137136
/**
138137
* Return the {@link ListenerMode} mode for this container.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java

Lines changed: 5 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
2020
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
2121
import java.time.Duration;
22-
import java.util.function.Supplier;
2322
import org.springframework.core.task.TaskExecutor;
2423
import org.springframework.retry.backoff.BackOffPolicy;
2524

@@ -147,66 +146,14 @@ default B pollBackOffPolicy(BackOffPolicy pollBackOffPolicy) {
147146
B backPressureMode(BackPressureMode backPressureMode);
148147

149148
/**
150-
* Sets the {@link Supplier} of {@link BackPressureHandler} for this container. Default is {@code null} which
151-
* results in a default {@link SemaphoreBackPressureHandler} to be instantiated. In case a supplier is provided, the
152-
* {@link BackPressureHandler} will be instantiated by the supplier.
153-
* <p>
154-
* <strong>NOTE:</strong> <em>it is important for the supplier to always return a new instance as otherwise it might
155-
* result in a BackPressureHandler internal resources (counters, semaphores, ...) to be shared by multiple
156-
* containers which is very likely not the desired behavior.</em>
157-
* <p>
158-
* Spring Cloud AWS provides the following {@link BackPressureHandler} implementations:
159-
* <ul>
160-
* <li>{@link ConcurrencyLimiterBlockingBackPressureHandler}: Limits the maximum number of messages that can be
161-
* processed concurrently by the application.</li>
162-
* <li>{@link ThroughputBackPressureHandler}: Adapts the throughput dynamically between high and low modes in order
163-
* to reduce SQS pull costs when few messages are coming in.</li>
164-
* <li>{@link CompositeBackPressureHandler}: Allows combining multiple {@link BackPressureHandler} together and
165-
* ensures they cooperate.</li>
166-
* </ul>
167-
* <p>
168-
* Below are a few examples of how common use cases can be achieved. Keep in mind you can always create your own
169-
* {@link BackPressureHandler} implementation and if needed combine it with the provided ones thanks to the
170-
* {@link CompositeBackPressureHandler}.
149+
* Sets the {@link BackPressureHandlerFactory} for this container. Default is
150+
* {@code AbstractContainerOptions.DEFAULT_BACKPRESSURE_FACTORY} which results in a default
151+
* {@link SemaphoreBackPressureHandler} to be instantiated.
171152
*
172-
* <h3>A BackPressureHandler limiting the max concurrency with high throughput</h3>
173-
*
174-
* <pre>{@code
175-
* containerOptionsBuilder.backPressureHandlerSupplier(() -> {
176-
* return ConcurrencyLimiterBlockingBackPressureHandler.builder()
177-
* .batchSize(batchSize)
178-
* .totalPermits(maxConcurrentMessages)
179-
* .acquireTimeout(acquireTimeout)
180-
* .throughputConfiguration(BackPressureMode.FIXED_HIGH_THROUGHPUT)
181-
* .build()
182-
* }}</pre>
183-
*
184-
* <h3>A BackPressureHandler limiting the max concurrency with dynamic throughput</h3>
185-
*
186-
* <pre>{@code
187-
* containerOptionsBuilder.backPressureHandlerSupplier(() -> {
188-
* var concurrencyLimiterBlockingBackPressureHandler = ConcurrencyLimiterBlockingBackPressureHandler.builder()
189-
* .batchSize(batchSize)
190-
* .totalPermits(maxConcurrentMessages)
191-
* .acquireTimeout(acquireTimeout)
192-
* .throughputConfiguration(BackPressureMode.AUTO)
193-
* .build()
194-
* var throughputBackPressureHandler = ThroughputBackPressureHandler.builder()
195-
* .batchSize(batchSize)
196-
* .build();
197-
* return new CompositeBackPressureHandler(List.of(
198-
* concurrencyLimiterBlockingBackPressureHandler,
199-
* throughputBackPressureHandler
200-
* ),
201-
* batchSize,
202-
* standbyLimitPollingInterval
203-
* );
204-
* }}</pre>
205-
*
206-
* @param backPressureHandlerSupplier the BackPressureHandler supplier.
153+
* @param backPressureHandlerFactory the BackPressureHandler supplier.
207154
* @return this instance.
208155
*/
209-
B backPressureHandlerSupplier(Supplier<BackPressureHandler> backPressureHandlerSupplier);
156+
B backPressureHandlerFactory(BackPressureHandlerFactory backPressureHandlerFactory);
210157

211158
/**
212159
* Set the maximum interval between acknowledgements for batch acknowledgements. The default depends on the specific

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ThroughputBackPressureHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@
2525
import org.springframework.util.Assert;
2626

2727
/**
28-
* {@link BackPressureHandler} implementation that uses a switches between high and low throughput modes.
28+
* {@link BackPressureHandler} implementation that uses a switch between high and low throughput modes.
2929
* <p>
3030
* The initial throughput mode is low, which means, only one batch at a time can be requested. If some messages are
31-
* fetched, then the throughput mode is switched to high, which means, the multiple batches can be requested (i.e. there
32-
* is no need to wait for the previous batch's processing to complete before requesting a new one). If no messages are
33-
* returned fetched by a poll, the throughput mode is switched back to low.
31+
* fetched, then the throughput mode is switched to high, which means that multiple batches can be requested (i.e.,
32+
* there is no need to wait for the previous batch's processing to complete before requesting a new one). If no messages
33+
* are returned fetched by a poll, the throughput mode is switched back to low.
3434
* <p>
3535
* This {@link BackPressureHandler} is designed to be used in combination with another {@link BackPressureHandler} like
3636
* the {@link ConcurrencyLimiterBlockingBackPressureHandler} that will handle the maximum concurrency level within the

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsBackPressureIntegrationTests.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ void staticBackPressureLimitShouldCapQueueProcessingCapacity(int staticLimit, in
132132
.queueNames(
133133
queueName)
134134
.configure(options -> options.pollTimeout(Duration.ofSeconds(1))
135-
.backPressureHandlerSupplier(() -> new CompositeBackPressureHandler(
135+
.backPressureHandlerFactory(containerOptions -> new CompositeBackPressureHandler(
136136
List.of(limiter,
137137
ConcurrencyLimiterBlockingBackPressureHandler.builder().batchSize(5)
138138
.totalPermits(5).acquireTimeout(Duration.ofSeconds(1L))
@@ -172,7 +172,7 @@ void zeroBackPressureLimitShouldStopQueueProcessing() throws Exception {
172172
.queueNames(
173173
queueName)
174174
.configure(options -> options.pollTimeout(Duration.ofSeconds(1))
175-
.backPressureHandlerSupplier(() -> new CompositeBackPressureHandler(
175+
.backPressureHandlerFactory(containerOptions -> new CompositeBackPressureHandler(
176176
List.of(limiter,
177177
ConcurrencyLimiterBlockingBackPressureHandler.builder().batchSize(5)
178178
.totalPermits(5).acquireTimeout(Duration.ofSeconds(1L))
@@ -218,7 +218,7 @@ void changeInBackPressureLimitShouldAdaptQueueProcessingCapacity() throws Except
218218
.queueNames(
219219
queueName)
220220
.configure(options -> options.pollTimeout(Duration.ofSeconds(1))
221-
.backPressureHandlerSupplier(() -> new CompositeBackPressureHandler(
221+
.backPressureHandlerFactory(containerOptions -> new CompositeBackPressureHandler(
222222
List.of(limiter,
223223
ConcurrencyLimiterBlockingBackPressureHandler.builder().batchSize(5)
224224
.totalPermits(5).acquireTimeout(Duration.ofSeconds(1L))
@@ -442,9 +442,8 @@ void unsynchronizedChangesInBackPressureLimitShouldAdaptQueueProcessingCapacity(
442442
EventsCsvWriter eventsCsvWriter = new EventsCsvWriter();
443443
var container = SqsMessageListenerContainer.builder().sqsAsyncClient(BaseSqsIntegrationTest.createAsyncClient())
444444
.queueNames(queueName)
445-
.configure(options -> options.pollTimeout(Duration.ofSeconds(1))
446-
.standbyLimitPollingInterval(Duration.ofMillis(1))
447-
.backPressureHandlerSupplier(() -> new StatisticsBphDecorator(new CompositeBackPressureHandler(
445+
.configure(options -> options.pollTimeout(Duration.ofSeconds(1)).backPressureHandlerFactory(
446+
containerOptions -> new StatisticsBphDecorator(new CompositeBackPressureHandler(
448447
List.of(limiter,
449448
ConcurrencyLimiterBlockingBackPressureHandler.builder().batchSize(10)
450449
.totalPermits(10).acquireTimeout(Duration.ofSeconds(1L))

0 commit comments

Comments
 (0)