Skip to content

Commit 3b2dfb3

Browse files
committed
Introduce factory methods for creating back-pressure handlers (#1251)
1 parent b9a4f62 commit 3b2dfb3

9 files changed

+373
-146
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,
222222

223223
private static final BackPressureMode DEFAULT_THROUGHPUT_CONFIGURATION = BackPressureMode.AUTO;
224224

225-
private static final BackPressureHandlerFactory DEFAULT_BACKPRESSURE_FACTORY = buildDefaultBackPressureHandlerFactory();
225+
private static final BackPressureHandlerFactory DEFAULT_BACKPRESSURE_FACTORY = BackPressureHandlerFactory::semaphoreBackPressureHandler;
226226

227227
private static final ListenerMode DEFAULT_MESSAGE_DELIVERY_STRATEGY = ListenerMode.SINGLE_MESSAGE;
228228

@@ -428,12 +428,6 @@ private static BackOffPolicy buildDefaultBackOffPolicy() {
428428
return BackOffPolicyBuilder.newBuilder().multiplier(DEFAULT_BACK_OFF_MULTIPLIER)
429429
.delay(DEFAULT_BACK_OFF_DELAY).maxDelay(DEFAULT_BACK_OFF_MAX_DELAY).build();
430430
}
431-
432-
private static BackPressureHandlerFactory buildDefaultBackPressureHandlerFactory() {
433-
return options -> SemaphoreBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll())
434-
.totalPermits(options.getMaxConcurrentMessages()).acquireTimeout(options.getMaxDelayBetweenPolls())
435-
.throughputConfiguration(options.getBackPressureMode()).build();
436-
}
437431
}
438432

439433
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerFactory.java

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
*/
1616
package io.awspring.cloud.sqs.listener;
1717

18+
import java.time.Duration;
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
1822
/**
1923
* A factory for creating {@link BackPressureHandler} for managing queue consumption backpressure. Implementations can
2024
* configure each the {@link BackPressureHandler} according to its strategies, using the provided
@@ -82,4 +86,99 @@ public interface BackPressureHandlerFactory {
8286
* @return the created BackPressureHandler
8387
*/
8488
BackPressureHandler createBackPressureHandler(ContainerOptions<?, ?> containerOptions);
89+
90+
/**
91+
* Creates a new {@link SemaphoreBackPressureHandler} instance based on the provided {@link ContainerOptions}.
92+
*
93+
* @param options the container options.
94+
* @return the created SemaphoreBackPressureHandler.
95+
*/
96+
static BatchAwareBackPressureHandler semaphoreBackPressureHandler(ContainerOptions<?, ?> options) {
97+
return SemaphoreBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll())
98+
.totalPermits(options.getMaxConcurrentMessages()).acquireTimeout(options.getMaxDelayBetweenPolls())
99+
.throughputConfiguration(options.getBackPressureMode()).build();
100+
}
101+
102+
/**
103+
* Creates a new {@link BackPressureHandler} instance based on the provided {@link ContainerOptions} combining a
104+
* {@link ConcurrencyLimiterBlockingBackPressureHandler}, a {@link ThroughputBackPressureHandler} and a
105+
* {@link FullBatchBackPressureHandler}. The exact combination of depends on the given {@link ContainerOptions}.
106+
*
107+
* @param options the container options.
108+
* @param maxIdleWaitTime the maximum amount of time to wait for a permit to be released in case no permits were
109+
* obtained.
110+
* @return the created SemaphoreBackPressureHandler.
111+
*/
112+
static BatchAwareBackPressureHandler concurrencyLimiterBackPressureHandler(ContainerOptions<?, ?> options,
113+
Duration maxIdleWaitTime) {
114+
BackPressureMode backPressureMode = options.getBackPressureMode();
115+
116+
var concurrencyLimiterBlockingBackPressureHandler = concurrencyLimiterBackPressureHandler2(options);
117+
if (backPressureMode == BackPressureMode.FIXED_HIGH_THROUGHPUT) {
118+
return concurrencyLimiterBlockingBackPressureHandler;
119+
}
120+
var backPressureHandlers = new ArrayList<BackPressureHandler>();
121+
backPressureHandlers.add(concurrencyLimiterBlockingBackPressureHandler);
122+
123+
// The ThroughputBackPressureHandler should run second in the chain as it is non-blocking.
124+
// Running it first would result in more polls as it would potentially limit the
125+
// ConcurrencyLimiterBlockingBackPressureHandler to a lower amount of requested permits
126+
// which means the ConcurrencyLimiterBlockingBackPressureHandler blocking behavior would
127+
// not be optimally leveraged.
128+
if (backPressureMode == BackPressureMode.AUTO
129+
|| backPressureMode == BackPressureMode.ALWAYS_POLL_MAX_MESSAGES) {
130+
backPressureHandlers.add(throughputBackPressureHandler(options));
131+
}
132+
133+
// The FullBatchBackPressureHandler should run last in the chain to ensure that a full batch is requested or not
134+
if (backPressureMode == BackPressureMode.ALWAYS_POLL_MAX_MESSAGES) {
135+
backPressureHandlers.add(fullBatchBackPressureHandler(options));
136+
}
137+
return compositeBackPressureHandler(options, maxIdleWaitTime, backPressureHandlers);
138+
}
139+
140+
/**
141+
* Creates a new {@link ConcurrencyLimiterBlockingBackPressureHandler} instance based on the provided
142+
* {@link ContainerOptions}.
143+
*
144+
* @param options the container options.
145+
* @return the created ConcurrencyLimiterBlockingBackPressureHandler.
146+
*/
147+
static CompositeBackPressureHandler compositeBackPressureHandler(ContainerOptions<?, ?> options,
148+
Duration maxIdleWaitTime, List<BackPressureHandler> backPressureHandlers) {
149+
return new CompositeBackPressureHandler(List.copyOf(backPressureHandlers), options.getMaxMessagesPerPoll(),
150+
maxIdleWaitTime);
151+
}
152+
153+
/**
154+
* Creates a new {@link ConcurrencyLimiterBlockingBackPressureHandler} instance based on the provided
155+
* {@link ContainerOptions}.
156+
* @param options the container options.
157+
* @return the created ConcurrencyLimiterBlockingBackPressureHandler.
158+
*/
159+
static ConcurrencyLimiterBlockingBackPressureHandler concurrencyLimiterBackPressureHandler2(
160+
ContainerOptions<?, ?> options) {
161+
return ConcurrencyLimiterBlockingBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll())
162+
.totalPermits(options.getMaxConcurrentMessages()).throughputConfiguration(options.getBackPressureMode())
163+
.acquireTimeout(options.getMaxDelayBetweenPolls()).build();
164+
}
165+
166+
/**
167+
* Creates a new {@link ThroughputBackPressureHandler} instance based on the provided {@link ContainerOptions}.
168+
* @param options the container options.
169+
* @return the created ThroughputBackPressureHandler.
170+
*/
171+
static ThroughputBackPressureHandler throughputBackPressureHandler(ContainerOptions<?, ?> options) {
172+
return ThroughputBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll())
173+
.totalPermits(options.getMaxConcurrentMessages()).build();
174+
}
175+
176+
/**
177+
* Creates a new {@link FullBatchBackPressureHandler} instance based on the provided {@link ContainerOptions}.
178+
* @param options the container options.
179+
* @return the created FullBatchBackPressureHandler.
180+
*/
181+
static FullBatchBackPressureHandler fullBatchBackPressureHandler(ContainerOptions<?, ?> options) {
182+
return FullBatchBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll()).build();
183+
}
85184
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/CompositeBackPressureHandler.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,28 @@
2424
import org.slf4j.Logger;
2525
import org.slf4j.LoggerFactory;
2626

27+
/**
28+
* Composite {@link BackPressureHandler} implementation that delegates the back-pressure handling to a list of
29+
* {@link BackPressureHandler}s.
30+
* <p>
31+
* This class is used to combine multiple back-pressure handlers into a single one. It allows for more complex
32+
* back-pressure handling strategies by combining different implementations.
33+
* <p>
34+
* The order in which the back-pressure handlers are registered in the {@link CompositeBackPressureHandler} is important
35+
* as it will affect the blocking and limiting behaviour of the back-pressure handling.
36+
* <p>
37+
* When {@link #request(int amount)} is called, the first back-pressure handler in the list is called with
38+
* {@code amount} as the requested amount of permits. The returned amount of permits (which is less than or equal to the
39+
* initial amount) is then passed to the next back-pressure handler in the list. This process of reducing the amount to
40+
* request for the next handlers in the chain is called "limiting". This process continues until all back-pressure
41+
* handlers have been called or {@literal 0} permits has been returned.
42+
* <p>
43+
* Once the final amount of available permits have been computed, unused acquired permits on back-pressure handlers (due
44+
* to later limiting happening in the chain) are released.
45+
* <p>
46+
* If no permits were obtained, the {@link #request(int)} method will wait up to {@code noPermitsReturnedWaitTimeout}
47+
* for a release of permits before returning.
48+
*/
2749
public class CompositeBackPressureHandler implements BatchAwareBackPressureHandler, IdentifiableContainerComponent {
2850

2951
private static final Logger logger = LoggerFactory.getLogger(CompositeBackPressureHandler.class);
@@ -41,10 +63,10 @@ public class CompositeBackPressureHandler implements BatchAwareBackPressureHandl
4163
private String id;
4264

4365
public CompositeBackPressureHandler(List<BackPressureHandler> backPressureHandlers, int batchSize,
44-
Duration waitTimeout) {
66+
Duration noPermitsReturnedWaitTimeout) {
4567
this.backPressureHandlers = backPressureHandlers;
4668
this.batchSize = batchSize;
47-
this.noPermitsReturnedWaitTimeout = waitTimeout;
69+
this.noPermitsReturnedWaitTimeout = noPermitsReturnedWaitTimeout;
4870
}
4971

5072
@Override

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ConcurrencyLimiterBlockingBackPressureHandler.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.awspring.cloud.sqs.listener;
1717

18+
import io.awspring.cloud.sqs.listener.source.PollingMessageSource;
1819
import java.time.Duration;
1920
import java.util.Arrays;
2021
import java.util.concurrent.Semaphore;
@@ -24,11 +25,10 @@
2425
import org.springframework.util.Assert;
2526

2627
/**
27-
* {@link BackPressureHandler} implementation that uses a {@link Semaphore} for handling backpressure.
28+
* Blocking {@link BackPressureHandler} implementation that uses a {@link Semaphore} for handling the number of
29+
* concurrent messages being processed.
2830
*
29-
* @author Tomaz Fernandes
30-
* @see io.awspring.cloud.sqs.listener.source.PollingMessageSource
31-
* @since 3.0
31+
* @see PollingMessageSource
3232
*/
3333
public class ConcurrencyLimiterBlockingBackPressureHandler
3434
implements BatchAwareBackPressureHandler, IdentifiableContainerComponent {
@@ -43,20 +43,17 @@ public class ConcurrencyLimiterBlockingBackPressureHandler
4343

4444
private final Duration acquireTimeout;
4545

46-
private final boolean alwaysPollMaxMessages;
47-
4846
private String id = getClass().getSimpleName();
4947

5048
private ConcurrencyLimiterBlockingBackPressureHandler(Builder builder) {
5149
this.batchSize = builder.batchSize;
5250
this.totalPermits = builder.totalPermits;
5351
this.acquireTimeout = builder.acquireTimeout;
54-
this.alwaysPollMaxMessages = BackPressureMode.ALWAYS_POLL_MAX_MESSAGES.equals(builder.backPressureMode);
55-
this.semaphore = new Semaphore(totalPermits);
5652
logger.debug(
5753
"ConcurrencyLimiterBlockingBackPressureHandler created with configuration "
58-
+ "totalPermits: {}, batchSize: {}, acquireTimeout: {}, an alwaysPollMaxMessages: {}",
59-
this.totalPermits, this.batchSize, this.acquireTimeout, this.alwaysPollMaxMessages);
54+
+ "totalPermits: {}, batchSize: {}, acquireTimeout: {}",
55+
this.totalPermits, this.batchSize, this.acquireTimeout);
56+
this.semaphore = new Semaphore(totalPermits);
6057
}
6158

6259
public static Builder builder() {
@@ -81,7 +78,7 @@ public int requestBatch() throws InterruptedException {
8178
@Override
8279
public int request(int amount) throws InterruptedException {
8380
int acquiredPermits = tryAcquire(amount, this.acquireTimeout);
84-
if (alwaysPollMaxMessages || acquiredPermits > 0) {
81+
if (acquiredPermits > 0) {
8582
return acquiredPermits;
8683
}
8784
int availablePermits = Math.min(this.semaphore.availablePermits(), amount);
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2013-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.listener;
17+
18+
import io.awspring.cloud.sqs.listener.source.PollingMessageSource;
19+
import java.time.Duration;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
import org.springframework.util.Assert;
23+
24+
/**
25+
* Non-blocking {@link BackPressureHandler} implementation that ensures the exact batch size is requested.
26+
* <p>
27+
* If the amount of permits being requested is not equal to the batch size, permits will be limited to {@literal 0}. For
28+
* this limiting mechanism to work, the {@link FullBatchBackPressureHandler} must be used in combination with another
29+
* {@link BackPressureHandler} and be the last one in the chain of the {@link CompositeBackPressureHandler}
30+
*
31+
* @see PollingMessageSource
32+
*/
33+
public class FullBatchBackPressureHandler implements BatchAwareBackPressureHandler, IdentifiableContainerComponent {
34+
35+
private static final Logger logger = LoggerFactory.getLogger(FullBatchBackPressureHandler.class);
36+
37+
private final int batchSize;
38+
39+
private String id = getClass().getSimpleName();
40+
41+
private FullBatchBackPressureHandler(Builder builder) {
42+
this.batchSize = builder.batchSize;
43+
logger.debug("FullBatchBackPressureHandler created with configuration: batchSize: {}", this.batchSize);
44+
}
45+
46+
public static Builder builder() {
47+
return new Builder();
48+
}
49+
50+
@Override
51+
public void setId(String id) {
52+
this.id = id;
53+
}
54+
55+
@Override
56+
public String getId() {
57+
return this.id;
58+
}
59+
60+
@Override
61+
public int requestBatch() throws InterruptedException {
62+
return request(this.batchSize);
63+
}
64+
65+
@Override
66+
public int request(int amount) throws InterruptedException {
67+
if (amount == batchSize) {
68+
return amount;
69+
}
70+
logger.warn("[{}] Could not acquire a full batch ({} / {}), cancelling current poll", this.id, amount,
71+
this.batchSize);
72+
return 0;
73+
}
74+
75+
@Override
76+
public void release(int amount, ReleaseReason reason) {
77+
// NO-OP
78+
}
79+
80+
@Override
81+
public boolean drain(Duration timeout) {
82+
return true;
83+
}
84+
85+
public static class Builder {
86+
87+
private int batchSize;
88+
89+
public Builder batchSize(int batchSize) {
90+
this.batchSize = batchSize;
91+
return this;
92+
}
93+
94+
public FullBatchBackPressureHandler build() {
95+
Assert.notNull(this.batchSize, "Missing configuration for batch size");
96+
Assert.isTrue(this.batchSize > 0, "The batch size must be greater than 0");
97+
return new FullBatchBackPressureHandler(this);
98+
}
99+
}
100+
}

0 commit comments

Comments
 (0)