Skip to content

Commit 74dc430

Browse files
committed
Revert changes to SemaphoreBackPressureHandler not to change default behavior (#1251)
1 parent 1e99159 commit 74dc430

File tree

4 files changed

+329
-20
lines changed

4 files changed

+329
-20
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -235,25 +235,10 @@ protected BackPressureHandler createBackPressureHandler() {
235235
if (containerOptions.getBackPressureHandlerSupplier() != null) {
236236
return containerOptions.getBackPressureHandlerSupplier().get();
237237
}
238-
Duration acquireTimeout = containerOptions.getMaxDelayBetweenPolls();
239-
int batchSize = containerOptions.getMaxMessagesPerPoll();
240-
int maxConcurrentMessages = containerOptions.getMaxConcurrentMessages();
241-
var concurrencyLimiterBlockingBackPressureHandler = ConcurrencyLimiterBlockingBackPressureHandler.builder()
242-
.batchSize(batchSize).totalPermits(maxConcurrentMessages).acquireTimeout(acquireTimeout)
243-
.throughputConfiguration(containerOptions.getBackPressureMode()).build();
244-
if (maxConcurrentMessages == batchSize) {
245-
return concurrencyLimiterBlockingBackPressureHandler;
246-
}
247-
return switch (containerOptions.getBackPressureMode()) {
248-
case FIXED_HIGH_THROUGHPUT -> concurrencyLimiterBlockingBackPressureHandler;
249-
case ALWAYS_POLL_MAX_MESSAGES,
250-
AUTO -> {
251-
var throughputBackPressureHandler = ThroughputBackPressureHandler.builder().batchSize(batchSize).build();
252-
yield new CompositeBackPressureHandler(
253-
List.of(concurrencyLimiterBlockingBackPressureHandler, throughputBackPressureHandler),
254-
batchSize, containerOptions.getStandbyLimitPollingInterval());
255-
}
256-
};
238+
return SemaphoreBackPressureHandler.builder().batchSize(getContainerOptions().getMaxMessagesPerPoll())
239+
.totalPermits(getContainerOptions().getMaxConcurrentMessages())
240+
.acquireTimeout(getContainerOptions().getMaxDelayBetweenPolls())
241+
.throughputConfiguration(getContainerOptions().getBackPressureMode()).build();
257242
}
258243

259244
protected TaskExecutor createSourcesTaskExecutor() {

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,61 @@ default B pollBackOffPolicy(BackOffPolicy pollBackOffPolicy) {
157157
B backPressureMode(BackPressureMode backPressureMode);
158158

159159
/**
160-
* Set the {@link Supplier} of {@link BackPressureHandler} for this container. Default is {@code null}.
160+
* Sets the {@link Supplier} of {@link BackPressureHandler} for this container. Default is {@code null} which
161+
* results in a default {@link SemaphoreBackPressureHandler} to be instantiated. In case a supplier is provided, the
162+
* {@link BackPressureHandler} will be instantiated by the supplier.
163+
* <p>
164+
* <strong>NOTE:</strong> <em>it is important for the supplier to always return a new instance as otherwise it might
165+
* result in a BackPressureHandler internal resources (counters, semaphores, ...) to be shared by multiple
166+
* containers which is very likely not the desired behavior.</em>
167+
* <p>
168+
* Spring Cloud AWS provides the following {@link BackPressureHandler} implementations:
169+
* <ul>
170+
* <li>{@link ConcurrencyLimiterBlockingBackPressureHandler}: Limits the maximum number of messages that can be
171+
* processed concurrently by the application.</li>
172+
* <li>{@link ThroughputBackPressureHandler}: Adapts the throughput dynamically between high and low modes in order
173+
* to reduce SQS pull costs when few messages are coming in.</li>
174+
* <li>{@link CompositeBackPressureHandler}: Allows combining multiple {@link BackPressureHandler} together and
175+
* ensures they cooperate.</li>
176+
* </ul>
177+
* <p>
178+
* Below are a few examples of how common use cases can be achieved. Keep in mind you can always create your own
179+
* {@link BackPressureHandler} implementation and if needed combine it with the provided ones thanks to the
180+
* {@link CompositeBackPressureHandler}.
181+
*
182+
* <h3>A BackPressureHandler limiting the max concurrency with high throughput</h3>
183+
*
184+
* <pre>{@code
185+
* containerOptionsBuilder.backPressureHandlerSupplier(() -> {
186+
* return ConcurrencyLimiterBlockingBackPressureHandler.builder()
187+
* .batchSize(batchSize)
188+
* .totalPermits(maxConcurrentMessages)
189+
* .acquireTimeout(acquireTimeout)
190+
* .throughputConfiguration(BackPressureMode.FIXED_HIGH_THROUGHPUT)
191+
* .build()
192+
* }}</pre>
193+
*
194+
* <h3>A BackPressureHandler limiting the max concurrency with dynamic throughput</h3>
195+
*
196+
* <pre>{@code
197+
* containerOptionsBuilder.backPressureHandlerSupplier(() -> {
198+
* var concurrencyLimiterBlockingBackPressureHandler = ConcurrencyLimiterBlockingBackPressureHandler.builder()
199+
* .batchSize(batchSize)
200+
* .totalPermits(maxConcurrentMessages)
201+
* .acquireTimeout(acquireTimeout)
202+
* .throughputConfiguration(BackPressureMode.AUTO)
203+
* .build()
204+
* var throughputBackPressureHandler = ThroughputBackPressureHandler.builder()
205+
* .batchSize(batchSize)
206+
* .build();
207+
* return new CompositeBackPressureHandler(List.of(
208+
* concurrencyLimiterBlockingBackPressureHandler,
209+
* throughputBackPressureHandler
210+
* ),
211+
* batchSize,
212+
* standbyLimitPollingInterval
213+
* );
214+
* }}</pre>
161215
*
162216
* @param backPressureHandlerSupplier the BackPressureHandler supplier.
163217
* @return this instance.
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
/*
2+
* Copyright 2013-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.listener;
17+
18+
import java.time.Duration;
19+
import java.util.Arrays;
20+
import java.util.concurrent.Semaphore;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
import org.springframework.util.Assert;
26+
27+
/**
28+
* {@link BackPressureHandler} implementation that uses a {@link Semaphore} for handling backpressure.
29+
*
30+
* @author Tomaz Fernandes
31+
* @since 3.0
32+
* @see io.awspring.cloud.sqs.listener.source.PollingMessageSource
33+
*/
34+
public class SemaphoreBackPressureHandler implements BatchAwareBackPressureHandler, IdentifiableContainerComponent {
35+
36+
private static final Logger logger = LoggerFactory.getLogger(SemaphoreBackPressureHandler.class);
37+
38+
private final Semaphore semaphore;
39+
40+
private final int batchSize;
41+
42+
private final int totalPermits;
43+
44+
private final Duration acquireTimeout;
45+
46+
private final BackPressureMode backPressureConfiguration;
47+
48+
private volatile CurrentThroughputMode currentThroughputMode;
49+
50+
private final AtomicBoolean hasAcquiredFullPermits = new AtomicBoolean(false);
51+
52+
private String id;
53+
54+
private SemaphoreBackPressureHandler(Builder builder) {
55+
this.batchSize = builder.batchSize;
56+
this.totalPermits = builder.totalPermits;
57+
this.acquireTimeout = builder.acquireTimeout;
58+
this.backPressureConfiguration = builder.backPressureMode;
59+
this.semaphore = new Semaphore(totalPermits);
60+
this.currentThroughputMode = BackPressureMode.FIXED_HIGH_THROUGHPUT.equals(backPressureConfiguration)
61+
? CurrentThroughputMode.HIGH
62+
: CurrentThroughputMode.LOW;
63+
logger.debug("SemaphoreBackPressureHandler created with configuration {} and {} total permits",
64+
backPressureConfiguration, totalPermits);
65+
}
66+
67+
public static Builder builder() {
68+
return new Builder();
69+
}
70+
71+
@Override
72+
public void setId(String id) {
73+
this.id = id;
74+
}
75+
76+
@Override
77+
public String getId() {
78+
return this.id;
79+
}
80+
81+
@Override
82+
public int request(int amount) throws InterruptedException {
83+
return tryAcquire(amount, this.currentThroughputMode) ? amount : 0;
84+
}
85+
86+
// @formatter:off
87+
@Override
88+
public int requestBatch() throws InterruptedException {
89+
return CurrentThroughputMode.LOW.equals(this.currentThroughputMode)
90+
? requestInLowThroughputMode()
91+
: requestInHighThroughputMode();
92+
}
93+
94+
private int requestInHighThroughputMode() throws InterruptedException {
95+
return tryAcquire(this.batchSize, CurrentThroughputMode.HIGH)
96+
? this.batchSize
97+
: tryAcquirePartial();
98+
}
99+
// @formatter:on
100+
101+
private int tryAcquirePartial() throws InterruptedException {
102+
int availablePermits = this.semaphore.availablePermits();
103+
if (availablePermits == 0 || BackPressureMode.ALWAYS_POLL_MAX_MESSAGES.equals(this.backPressureConfiguration)) {
104+
return 0;
105+
}
106+
int permitsToRequest = Math.min(availablePermits, this.batchSize);
107+
CurrentThroughputMode currentThroughputModeNow = this.currentThroughputMode;
108+
logger.trace("Trying to acquire partial batch of {} permits from {} available for {} in TM {}",
109+
permitsToRequest, availablePermits, this.id, currentThroughputModeNow);
110+
boolean hasAcquiredPartial = tryAcquire(permitsToRequest, currentThroughputModeNow);
111+
return hasAcquiredPartial ? permitsToRequest : 0;
112+
}
113+
114+
private int requestInLowThroughputMode() throws InterruptedException {
115+
// Although LTM can be set / unset by many processes, only the MessageSource thread gets here,
116+
// so no actual concurrency
117+
logger.debug("Trying to acquire full permits for {}. Permits left: {}", this.id,
118+
this.semaphore.availablePermits());
119+
boolean hasAcquired = tryAcquire(this.totalPermits, CurrentThroughputMode.LOW);
120+
if (hasAcquired) {
121+
logger.debug("Acquired full permits for {}. Permits left: {}", this.id, this.semaphore.availablePermits());
122+
// We've acquired all permits - there's no other process currently processing messages
123+
if (!this.hasAcquiredFullPermits.compareAndSet(false, true)) {
124+
logger.warn("hasAcquiredFullPermits was already true. Permits left: {}",
125+
this.semaphore.availablePermits());
126+
}
127+
return this.batchSize;
128+
}
129+
else {
130+
return 0;
131+
}
132+
}
133+
134+
private boolean tryAcquire(int amount, CurrentThroughputMode currentThroughputModeNow) throws InterruptedException {
135+
logger.trace("Acquiring {} permits for {} in TM {}", amount, this.id, this.currentThroughputMode);
136+
boolean hasAcquired = this.semaphore.tryAcquire(amount, this.acquireTimeout.toMillis(), TimeUnit.MILLISECONDS);
137+
if (hasAcquired) {
138+
logger.trace("{} permits acquired for {} in TM {}. Permits left: {}", amount, this.id,
139+
currentThroughputModeNow, this.semaphore.availablePermits());
140+
}
141+
else {
142+
logger.trace("Not able to acquire {} permits in {} milliseconds for {} in TM {}. Permits left: {}", amount,
143+
this.acquireTimeout.toMillis(), this.id, currentThroughputModeNow,
144+
this.semaphore.availablePermits());
145+
}
146+
return hasAcquired;
147+
}
148+
149+
@Override
150+
public void releaseBatch() {
151+
maybeSwitchToLowThroughputMode();
152+
int permitsToRelease = getPermitsToRelease(this.batchSize);
153+
this.semaphore.release(permitsToRelease);
154+
logger.trace("Released {} permits for {}. Permits left: {}", permitsToRelease, this.id,
155+
this.semaphore.availablePermits());
156+
}
157+
158+
@Override
159+
public int getBatchSize() {
160+
return this.batchSize;
161+
}
162+
163+
private void maybeSwitchToLowThroughputMode() {
164+
if (!BackPressureMode.FIXED_HIGH_THROUGHPUT.equals(this.backPressureConfiguration)
165+
&& CurrentThroughputMode.HIGH.equals(this.currentThroughputMode)) {
166+
logger.debug("Entire batch of permits released for {}, setting TM LOW. Permits left: {}", this.id,
167+
this.semaphore.availablePermits());
168+
this.currentThroughputMode = CurrentThroughputMode.LOW;
169+
}
170+
}
171+
172+
@Override
173+
public void release(int amount) {
174+
logger.trace("Releasing {} permits for {}. Permits left: {}", amount, this.id,
175+
this.semaphore.availablePermits());
176+
maybeSwitchToHighThroughputMode(amount);
177+
int permitsToRelease = getPermitsToRelease(amount);
178+
this.semaphore.release(permitsToRelease);
179+
logger.trace("Released {} permits for {}. Permits left: {}", permitsToRelease, this.id,
180+
this.semaphore.availablePermits());
181+
}
182+
183+
@Override
184+
public void release(int amount, ReleaseReason reason) {
185+
if (amount == this.batchSize && reason == ReleaseReason.NONE_FETCHED) {
186+
releaseBatch();
187+
}
188+
else {
189+
release(amount);
190+
}
191+
}
192+
193+
private int getPermitsToRelease(int amount) {
194+
return this.hasAcquiredFullPermits.compareAndSet(true, false)
195+
// The first process that gets here should release all permits except for inflight messages
196+
// We can have only one batch of messages at this point since we have all permits
197+
? this.totalPermits - (this.batchSize - amount)
198+
: amount;
199+
}
200+
201+
private void maybeSwitchToHighThroughputMode(int amount) {
202+
if (CurrentThroughputMode.LOW.equals(this.currentThroughputMode)) {
203+
logger.debug("{} unused permit(s), setting TM HIGH for {}. Permits left: {}", amount, this.id,
204+
this.semaphore.availablePermits());
205+
this.currentThroughputMode = CurrentThroughputMode.HIGH;
206+
}
207+
}
208+
209+
@Override
210+
public boolean drain(Duration timeout) {
211+
logger.debug("Waiting for up to {} seconds for approx. {} permits to be released for {}", timeout.getSeconds(),
212+
this.totalPermits - this.semaphore.availablePermits(), this.id);
213+
try {
214+
return this.semaphore.tryAcquire(this.totalPermits, (int) timeout.getSeconds(), TimeUnit.SECONDS);
215+
}
216+
catch (InterruptedException e) {
217+
Thread.currentThread().interrupt();
218+
throw new IllegalStateException("Interrupted while waiting to acquire permits", e);
219+
}
220+
}
221+
222+
private enum CurrentThroughputMode {
223+
224+
HIGH,
225+
226+
LOW;
227+
228+
}
229+
230+
public static class Builder {
231+
232+
private int batchSize;
233+
234+
private int totalPermits;
235+
236+
private Duration acquireTimeout;
237+
238+
private BackPressureMode backPressureMode;
239+
240+
public Builder batchSize(int batchSize) {
241+
this.batchSize = batchSize;
242+
return this;
243+
}
244+
245+
public Builder totalPermits(int totalPermits) {
246+
this.totalPermits = totalPermits;
247+
return this;
248+
}
249+
250+
public Builder acquireTimeout(Duration acquireTimeout) {
251+
this.acquireTimeout = acquireTimeout;
252+
return this;
253+
}
254+
255+
public Builder throughputConfiguration(BackPressureMode backPressureConfiguration) {
256+
this.backPressureMode = backPressureConfiguration;
257+
return this;
258+
}
259+
260+
public SemaphoreBackPressureHandler build() {
261+
Assert.noNullElements(
262+
Arrays.asList(this.batchSize, this.totalPermits, this.acquireTimeout, this.backPressureMode),
263+
"Missing configuration");
264+
return new SemaphoreBackPressureHandler(this);
265+
}
266+
267+
}
268+
269+
}

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ else if (hasMadeSecondPoll.compareAndSet(false, true)) {
166166
}
167167
catch (Throwable t) {
168168
logger.error("Error (not expecting it)", t);
169+
errors.add(t);
169170
throw new RuntimeException(t);
170171
}
171172
}, threadPool).whenComplete((v, t) -> {

0 commit comments

Comments
 (0)