Skip to content

Commit df7a9af

Browse files
committed
Introduce a BackPressureHandlerFactory for configuring SQS back pressure (#1251)
1 parent dbe37d9 commit df7a9af

9 files changed

+594
-131
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import io.micrometer.observation.ObservationConvention;
2323
import io.micrometer.observation.ObservationRegistry;
2424
import java.time.Duration;
25-
import java.util.function.Supplier;
2625
import org.springframework.core.task.TaskExecutor;
2726
import org.springframework.lang.Nullable;
2827
import org.springframework.retry.backoff.BackOffPolicy;
@@ -56,7 +55,7 @@ public abstract class AbstractContainerOptions<O extends ContainerOptions<O, B>,
5655

5756
private final BackPressureMode backPressureMode;
5857

59-
private final Supplier<BackPressureHandler> backPressureHandlerSupplier;
58+
private final BackPressureHandlerFactory backPressureHandlerFactory;
6059

6160
private final ListenerMode listenerMode;
6261

@@ -93,7 +92,7 @@ protected AbstractContainerOptions(Builder<?, ?> builder) {
9392
this.listenerShutdownTimeout = builder.listenerShutdownTimeout;
9493
this.acknowledgementShutdownTimeout = builder.acknowledgementShutdownTimeout;
9594
this.backPressureMode = builder.backPressureMode;
96-
this.backPressureHandlerSupplier = builder.backPressureHandlerSupplier;
95+
this.backPressureHandlerFactory = builder.backPressureHandlerFactory;
9796
this.listenerMode = builder.listenerMode;
9897
this.messageConverter = builder.messageConverter;
9998
this.acknowledgementMode = builder.acknowledgementMode;
@@ -167,8 +166,8 @@ public BackPressureMode getBackPressureMode() {
167166
}
168167

169168
@Override
170-
public Supplier<BackPressureHandler> getBackPressureHandlerSupplier() {
171-
return this.backPressureHandlerSupplier;
169+
public BackPressureHandlerFactory getBackPressureHandlerFactory() {
170+
return this.backPressureHandlerFactory;
172171
}
173172

174173
@Override
@@ -241,7 +240,7 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,
241240

242241
private static final BackPressureMode DEFAULT_THROUGHPUT_CONFIGURATION = BackPressureMode.AUTO;
243242

244-
private static final Supplier<BackPressureHandler> DEFAULT_BACKPRESSURE_LIMITER = null;
243+
private static final BackPressureHandlerFactory DEFAULT_BACKPRESSURE_FACTORY = buildDefaultBackPressureHandlerFactory();
245244

246245
private static final ListenerMode DEFAULT_MESSAGE_DELIVERY_STRATEGY = ListenerMode.SINGLE_MESSAGE;
247246

@@ -265,7 +264,7 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,
265264

266265
private BackPressureMode backPressureMode = DEFAULT_THROUGHPUT_CONFIGURATION;
267266

268-
private Supplier<BackPressureHandler> backPressureHandlerSupplier = DEFAULT_BACKPRESSURE_LIMITER;
267+
private BackPressureHandlerFactory backPressureHandlerFactory = DEFAULT_BACKPRESSURE_FACTORY;
269268

270269
private Duration listenerShutdownTimeout = DEFAULT_LISTENER_SHUTDOWN_TIMEOUT;
271270

@@ -309,7 +308,7 @@ protected Builder(AbstractContainerOptions<?, ?> options) {
309308
this.listenerShutdownTimeout = options.listenerShutdownTimeout;
310309
this.acknowledgementShutdownTimeout = options.acknowledgementShutdownTimeout;
311310
this.backPressureMode = options.backPressureMode;
312-
this.backPressureHandlerSupplier = options.backPressureHandlerSupplier;
311+
this.backPressureHandlerFactory = options.backPressureHandlerFactory;
313312
this.listenerMode = options.listenerMode;
314313
this.messageConverter = options.messageConverter;
315314
this.acknowledgementMode = options.acknowledgementMode;
@@ -405,8 +404,8 @@ public B backPressureMode(BackPressureMode backPressureMode) {
405404
}
406405

407406
@Override
408-
public B backPressureHandlerSupplier(Supplier<BackPressureHandler> backPressureHandlerSupplier) {
409-
this.backPressureHandlerSupplier = backPressureHandlerSupplier;
407+
public B backPressureHandlerFactory(BackPressureHandlerFactory backPressureHandlerFactory) {
408+
this.backPressureHandlerFactory = backPressureHandlerFactory;
410409
return self();
411410
}
412411

@@ -468,6 +467,12 @@ private static BackOffPolicy buildDefaultBackOffPolicy() {
468467
return BackOffPolicyBuilder.newBuilder().multiplier(DEFAULT_BACK_OFF_MULTIPLIER)
469468
.delay(DEFAULT_BACK_OFF_DELAY).maxDelay(DEFAULT_BACK_OFF_MAX_DELAY).build();
470469
}
470+
471+
private static BackPressureHandlerFactory buildDefaultBackPressureHandlerFactory() {
472+
return options -> SemaphoreBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll())
473+
.totalPermits(options.getMaxConcurrentMessages()).acquireTimeout(options.getMaxDelayBetweenPolls())
474+
.throughputConfiguration(options.getBackPressureMode()).build();
475+
}
471476
}
472477

473478
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -232,13 +232,8 @@ private TaskExecutor validateCustomExecutor(TaskExecutor taskExecutor) {
232232

233233
protected BackPressureHandler createBackPressureHandler() {
234234
O containerOptions = getContainerOptions();
235-
if (containerOptions.getBackPressureHandlerSupplier() != null) {
236-
return containerOptions.getBackPressureHandlerSupplier().get();
237-
}
238-
return SemaphoreBackPressureHandler.builder().batchSize(getContainerOptions().getMaxMessagesPerPoll())
239-
.totalPermits(getContainerOptions().getMaxConcurrentMessages())
240-
.acquireTimeout(getContainerOptions().getMaxDelayBetweenPolls())
241-
.throughputConfiguration(getContainerOptions().getBackPressureMode()).build();
235+
BackPressureHandlerFactory factory = containerOptions.getBackPressureHandlerFactory();
236+
return factory.createBackPressureHandler(containerOptions);
242237
}
243238

244239
protected TaskExecutor createSourcesTaskExecutor() {
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.listener;
17+
18+
/**
19+
* A factory for creating {@link BackPressureHandler} for managing queue consumption backpressure. Implementations can
20+
* configure each the {@link BackPressureHandler} according to its strategies, using the provided
21+
* {@link ContainerOptions}.
22+
* <p>
23+
* Spring Cloud AWS provides the following {@link BackPressureHandler} implementations:
24+
* <ul>
25+
* <li>{@link ConcurrencyLimiterBlockingBackPressureHandler}: Limits the maximum number of messages that can be
26+
* processed concurrently by the application.</li>
27+
* <li>{@link ThroughputBackPressureHandler}: Adapts the throughput dynamically between high and low modes in order to
28+
* reduce SQS pull costs when few messages are coming in.</li>
29+
* <li>{@link CompositeBackPressureHandler}: Allows combining multiple {@link BackPressureHandler} together and ensures
30+
* they cooperate.</li>
31+
* </ul>
32+
* <p>
33+
* Below are a few examples of how common use cases can be achieved. Keep in mind you can always create your own
34+
* {@link BackPressureHandler} implementation and if needed combine it with the provided ones thanks to the
35+
* {@link CompositeBackPressureHandler}.
36+
*
37+
* <h3>A BackPressureHandler limiting the max concurrency with high throughput</h3>
38+
*
39+
* <pre>{@code
40+
* containerOptionsBuilder.backPressureHandlerFactory(containerOptions -> {
41+
* return ConcurrencyLimiterBlockingBackPressureHandler.builder()
42+
* .batchSize(containerOptions.getMaxMessagesPerPoll())
43+
* .totalPermits(containerOptions.getMaxConcurrentMessages())
44+
* .acquireTimeout(containerOptions.getMaxDelayBetweenPolls())
45+
* .throughputConfiguration(BackPressureMode.FIXED_HIGH_THROUGHPUT)
46+
* .build()
47+
* }}</pre>
48+
*
49+
* <h3>A BackPressureHandler limiting the max concurrency with dynamic throughput</h3>
50+
*
51+
* <pre>{@code
52+
* containerOptionsBuilder.backPressureHandlerFactory(containerOptions -> {
53+
* int batchSize = containerOptions.getMaxMessagesPerPoll();
54+
* var concurrencyLimiterBlockingBackPressureHandler = ConcurrencyLimiterBlockingBackPressureHandler.builder()
55+
* .batchSize(batchSize)
56+
* .totalPermits(containerOptions.getMaxConcurrentMessages())
57+
* .acquireTimeout(containerOptions.getMaxDelayBetweenPolls())
58+
* .throughputConfiguration(BackPressureMode.AUTO)
59+
* .build()
60+
* var throughputBackPressureHandler = ThroughputBackPressureHandler.builder()
61+
* .batchSize(batchSize)
62+
* .build();
63+
* return new CompositeBackPressureHandler(List.of(
64+
* concurrencyLimiterBlockingBackPressureHandler,
65+
* throughputBackPressureHandler
66+
* ),
67+
* batchSize,
68+
* standbyLimitPollingInterval
69+
* );
70+
* }}</pre>
71+
*/
72+
public interface BackPressureHandlerFactory {
73+
74+
/**
75+
* Creates a new {@link BackPressureHandler} instance based on the provided {@link ContainerOptions}.
76+
* <p>
77+
* <strong>NOTE:</strong> <em>it is important for the factory to always return a new instance as otherwise it might
78+
* result in a BackPressureHandler internal resources (counters, semaphores, ...) to be shared by multiple
79+
* containers which is very likely not the desired behavior.</em>
80+
*
81+
* @param containerOptions the container options to use for creating the BackPressureHandler.
82+
* @return the created BackPressureHandler
83+
*/
84+
BackPressureHandler createBackPressureHandler(ContainerOptions<?, ?> containerOptions);
85+
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import io.micrometer.observation.ObservationRegistry;
2323
import java.time.Duration;
2424
import java.util.Collection;
25-
import java.util.function.Supplier;
2625
import org.springframework.core.task.TaskExecutor;
2726
import org.springframework.lang.Nullable;
2827
import org.springframework.retry.backoff.BackOffPolicy;
@@ -131,10 +130,10 @@ default BackOffPolicy getPollBackOffPolicy() {
131130
BackPressureMode getBackPressureMode();
132131

133132
/**
134-
* Return the a {@link Supplier} to create a {@link BackPressureHandler} for this container.
135-
* @return the BackPressureHandler supplier.
133+
* Return the a {@link BackPressureHandlerFactory} to create a {@link BackPressureHandler} for this container.
134+
* @return the BackPressureHandlerFactory.
136135
*/
137-
Supplier<BackPressureHandler> getBackPressureHandlerSupplier();
136+
BackPressureHandlerFactory getBackPressureHandlerFactory();
138137

139138
/**
140139
* Return the {@link ListenerMode} mode for this container.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java

Lines changed: 5 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
2121
import io.micrometer.observation.ObservationRegistry;
2222
import java.time.Duration;
23-
import java.util.function.Supplier;
2423
import org.springframework.core.task.TaskExecutor;
2524
import org.springframework.retry.backoff.BackOffPolicy;
2625

@@ -148,66 +147,14 @@ default B pollBackOffPolicy(BackOffPolicy pollBackOffPolicy) {
148147
B backPressureMode(BackPressureMode backPressureMode);
149148

150149
/**
151-
* Sets the {@link Supplier} of {@link BackPressureHandler} for this container. Default is {@code null} which
152-
* results in a default {@link SemaphoreBackPressureHandler} to be instantiated. In case a supplier is provided, the
153-
* {@link BackPressureHandler} will be instantiated by the supplier.
154-
* <p>
155-
* <strong>NOTE:</strong> <em>it is important for the supplier to always return a new instance as otherwise it might
156-
* result in a BackPressureHandler internal resources (counters, semaphores, ...) to be shared by multiple
157-
* containers which is very likely not the desired behavior.</em>
158-
* <p>
159-
* Spring Cloud AWS provides the following {@link BackPressureHandler} implementations:
160-
* <ul>
161-
* <li>{@link ConcurrencyLimiterBlockingBackPressureHandler}: Limits the maximum number of messages that can be
162-
* processed concurrently by the application.</li>
163-
* <li>{@link ThroughputBackPressureHandler}: Adapts the throughput dynamically between high and low modes in order
164-
* to reduce SQS pull costs when few messages are coming in.</li>
165-
* <li>{@link CompositeBackPressureHandler}: Allows combining multiple {@link BackPressureHandler} together and
166-
* ensures they cooperate.</li>
167-
* </ul>
168-
* <p>
169-
* Below are a few examples of how common use cases can be achieved. Keep in mind you can always create your own
170-
* {@link BackPressureHandler} implementation and if needed combine it with the provided ones thanks to the
171-
* {@link CompositeBackPressureHandler}.
150+
* Sets the {@link BackPressureHandlerFactory} for this container. Default is
151+
* {@code AbstractContainerOptions.DEFAULT_BACKPRESSURE_FACTORY} which results in a default
152+
* {@link SemaphoreBackPressureHandler} to be instantiated.
172153
*
173-
* <h3>A BackPressureHandler limiting the max concurrency with high throughput</h3>
174-
*
175-
* <pre>{@code
176-
* containerOptionsBuilder.backPressureHandlerSupplier(() -> {
177-
* return ConcurrencyLimiterBlockingBackPressureHandler.builder()
178-
* .batchSize(batchSize)
179-
* .totalPermits(maxConcurrentMessages)
180-
* .acquireTimeout(acquireTimeout)
181-
* .throughputConfiguration(BackPressureMode.FIXED_HIGH_THROUGHPUT)
182-
* .build()
183-
* }}</pre>
184-
*
185-
* <h3>A BackPressureHandler limiting the max concurrency with dynamic throughput</h3>
186-
*
187-
* <pre>{@code
188-
* containerOptionsBuilder.backPressureHandlerSupplier(() -> {
189-
* var concurrencyLimiterBlockingBackPressureHandler = ConcurrencyLimiterBlockingBackPressureHandler.builder()
190-
* .batchSize(batchSize)
191-
* .totalPermits(maxConcurrentMessages)
192-
* .acquireTimeout(acquireTimeout)
193-
* .throughputConfiguration(BackPressureMode.AUTO)
194-
* .build()
195-
* var throughputBackPressureHandler = ThroughputBackPressureHandler.builder()
196-
* .batchSize(batchSize)
197-
* .build();
198-
* return new CompositeBackPressureHandler(List.of(
199-
* concurrencyLimiterBlockingBackPressureHandler,
200-
* throughputBackPressureHandler
201-
* ),
202-
* batchSize,
203-
* standbyLimitPollingInterval
204-
* );
205-
* }}</pre>
206-
*
207-
* @param backPressureHandlerSupplier the BackPressureHandler supplier.
154+
* @param backPressureHandlerFactory the BackPressureHandler supplier.
208155
* @return this instance.
209156
*/
210-
B backPressureHandlerSupplier(Supplier<BackPressureHandler> backPressureHandlerSupplier);
157+
B backPressureHandlerFactory(BackPressureHandlerFactory backPressureHandlerFactory);
211158

212159
/**
213160
* Set the maximum interval between acknowledgements for batch acknowledgements. The default depends on the specific

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ThroughputBackPressureHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@
2525
import org.springframework.util.Assert;
2626

2727
/**
28-
* {@link BackPressureHandler} implementation that uses a switches between high and low throughput modes.
28+
* {@link BackPressureHandler} implementation that uses a switch between high and low throughput modes.
2929
* <p>
3030
* The initial throughput mode is low, which means, only one batch at a time can be requested. If some messages are
31-
* fetched, then the throughput mode is switched to high, which means, the multiple batches can be requested (i.e. there
32-
* is no need to wait for the previous batch's processing to complete before requesting a new one). If no messages are
33-
* returned fetched by a poll, the throughput mode is switched back to low.
31+
* fetched, then the throughput mode is switched to high, which means that multiple batches can be requested (i.e.,
32+
* there is no need to wait for the previous batch's processing to complete before requesting a new one). If no messages
33+
* are returned fetched by a poll, the throughput mode is switched back to low.
3434
* <p>
3535
* This {@link BackPressureHandler} is designed to be used in combination with another {@link BackPressureHandler} like
3636
* the {@link ConcurrencyLimiterBlockingBackPressureHandler} that will handle the maximum concurrency level within the

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsBackPressureIntegrationTests.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ void staticBackPressureLimitShouldCapQueueProcessingCapacity(int staticLimit, in
132132
.queueNames(
133133
queueName)
134134
.configure(options -> options.pollTimeout(Duration.ofSeconds(1))
135-
.backPressureHandlerSupplier(() -> new CompositeBackPressureHandler(
135+
.backPressureHandlerFactory(containerOptions -> new CompositeBackPressureHandler(
136136
List.of(limiter,
137137
ConcurrencyLimiterBlockingBackPressureHandler.builder().batchSize(5)
138138
.totalPermits(5).acquireTimeout(Duration.ofSeconds(1L))
@@ -172,7 +172,7 @@ void zeroBackPressureLimitShouldStopQueueProcessing() throws Exception {
172172
.queueNames(
173173
queueName)
174174
.configure(options -> options.pollTimeout(Duration.ofSeconds(1))
175-
.backPressureHandlerSupplier(() -> new CompositeBackPressureHandler(
175+
.backPressureHandlerFactory(containerOptions -> new CompositeBackPressureHandler(
176176
List.of(limiter,
177177
ConcurrencyLimiterBlockingBackPressureHandler.builder().batchSize(5)
178178
.totalPermits(5).acquireTimeout(Duration.ofSeconds(1L))
@@ -218,7 +218,7 @@ void changeInBackPressureLimitShouldAdaptQueueProcessingCapacity() throws Except
218218
.queueNames(
219219
queueName)
220220
.configure(options -> options.pollTimeout(Duration.ofSeconds(1))
221-
.backPressureHandlerSupplier(() -> new CompositeBackPressureHandler(
221+
.backPressureHandlerFactory(containerOptions -> new CompositeBackPressureHandler(
222222
List.of(limiter,
223223
ConcurrencyLimiterBlockingBackPressureHandler.builder().batchSize(5)
224224
.totalPermits(5).acquireTimeout(Duration.ofSeconds(1L))
@@ -442,9 +442,8 @@ void unsynchronizedChangesInBackPressureLimitShouldAdaptQueueProcessingCapacity(
442442
EventsCsvWriter eventsCsvWriter = new EventsCsvWriter();
443443
var container = SqsMessageListenerContainer.builder().sqsAsyncClient(BaseSqsIntegrationTest.createAsyncClient())
444444
.queueNames(queueName)
445-
.configure(options -> options.pollTimeout(Duration.ofSeconds(1))
446-
.standbyLimitPollingInterval(Duration.ofMillis(1))
447-
.backPressureHandlerSupplier(() -> new StatisticsBphDecorator(new CompositeBackPressureHandler(
445+
.configure(options -> options.pollTimeout(Duration.ofSeconds(1)).backPressureHandlerFactory(
446+
containerOptions -> new StatisticsBphDecorator(new CompositeBackPressureHandler(
448447
List.of(limiter,
449448
ConcurrencyLimiterBlockingBackPressureHandler.builder().batchSize(10)
450449
.totalPermits(10).acquireTimeout(Duration.ofSeconds(1L))

0 commit comments

Comments
 (0)