Skip to content

Commit 011fde0

Browse files
committed
Split SemaphoreBackPressureHandler into a ConcurrencyLimiterBlocking and a Throughput BackPressureHandler(s) (#1251)
1 parent 98ba703 commit 011fde0

File tree

7 files changed

+499
-338
lines changed

7 files changed

+499
-338
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,9 +232,23 @@ protected BackPressureHandler createBackPressureHandler() {
232232
}
233233
Duration acquireTimeout = containerOptions.getMaxDelayBetweenPolls();
234234
int batchSize = containerOptions.getMaxMessagesPerPoll();
235-
return SemaphoreBackPressureHandler.builder().batchSize(batchSize)
236-
.totalPermits(containerOptions.getMaxConcurrentMessages()).acquireTimeout(acquireTimeout)
235+
int maxConcurrentMessages = containerOptions.getMaxConcurrentMessages();
236+
var concurrencyLimiterBlockingBackPressureHandler = ConcurrencyLimiterBlockingBackPressureHandler.builder()
237+
.batchSize(batchSize).totalPermits(maxConcurrentMessages).acquireTimeout(acquireTimeout)
237238
.throughputConfiguration(containerOptions.getBackPressureMode()).build();
239+
if (maxConcurrentMessages == batchSize) {
240+
return concurrencyLimiterBlockingBackPressureHandler;
241+
}
242+
return switch (containerOptions.getBackPressureMode()) {
243+
case FIXED_HIGH_THROUGHPUT -> concurrencyLimiterBlockingBackPressureHandler;
244+
case ALWAYS_POLL_MAX_MESSAGES,
245+
AUTO -> {
246+
var throughputBackPressureHandler = ThroughputBackPressureHandler.builder().batchSize(batchSize).build();
247+
yield new CompositeBackPressureHandler(
248+
List.of(concurrencyLimiterBlockingBackPressureHandler, throughputBackPressureHandler),
249+
batchSize, containerOptions.getStandbyLimitPollingInterval());
250+
}
251+
};
238252
}
239253

240254
protected TaskExecutor createSourcesTaskExecutor() {

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/CompositeBackPressureHandler.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.awspring.cloud.sqs.listener;
1717

1818
import java.time.Duration;
19+
import java.time.Instant;
1920
import java.util.List;
2021
import java.util.concurrent.TimeUnit;
2122
import java.util.concurrent.locks.Condition;
@@ -66,6 +67,7 @@ public int requestBatch() throws InterruptedException {
6667

6768
@Override
6869
public int request(int amount) throws InterruptedException {
70+
logger.debug("[{}] Requesting {} permits", this.id, amount);
6971
int obtained = amount;
7072
int[] obtainedPerBph = new int[backPressureHandlers.size()];
7173
for (int i = 0; i < backPressureHandlers.size() && obtained > 0; i++) {
@@ -81,11 +83,13 @@ public int request(int amount) throws InterruptedException {
8183
if (obtained == 0) {
8284
waitForPermitsToBeReleased();
8385
}
86+
logger.debug("[{}] Obtained {} permits ({} requested)", this.id, obtained, amount);
8487
return obtained;
8588
}
8689

8790
@Override
8891
public void release(int amount, ReleaseReason reason) {
92+
logger.debug("[{}] Releasing {} permits ({})", this.id, amount, reason);
8993
for (BackPressureHandler handler : backPressureHandlers) {
9094
handler.release(amount, reason);
9195
}
@@ -106,6 +110,8 @@ public void release(int amount, ReleaseReason reason) {
106110
private void waitForPermitsToBeReleased() throws InterruptedException {
107111
noPermitsReturnedWaitLock.lock();
108112
try {
113+
logger.trace("[{}] No permits were obtained, waiting for a release up to {}", this.id,
114+
noPermitsReturnedWaitTimeout);
109115
permitsReleasedCondition.await(noPermitsReturnedWaitTimeout.toMillis(), TimeUnit.MILLISECONDS);
110116
}
111117
finally {
@@ -125,12 +131,19 @@ private void signalPermitsWereReleased() {
125131

126132
@Override
127133
public boolean drain(Duration timeout) {
128-
logger.info("Draining back-pressure handlers initiated");
134+
logger.debug("[{}] Draining back-pressure handlers initiated", this.id);
129135
boolean result = true;
136+
Instant start = Instant.now();
130137
for (BackPressureHandler handler : backPressureHandlers) {
131-
result &= !handler.drain(timeout);
138+
Duration remainingTimeout = maxDuration(timeout.minus(Duration.between(start, Instant.now())),
139+
Duration.ZERO);
140+
result &= handler.drain(remainingTimeout);
132141
}
133-
logger.info("Draining back-pressure handlers completed");
142+
logger.debug("[{}] Draining back-pressure handlers completed", this.id);
134143
return result;
135144
}
145+
146+
private static Duration maxDuration(Duration first, Duration second) {
147+
return first.compareTo(second) > 0 ? first : second;
148+
}
136149
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Copyright 2013-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.listener;
17+
18+
import java.time.Duration;
19+
import java.util.Arrays;
20+
import java.util.concurrent.Semaphore;
21+
import java.util.concurrent.TimeUnit;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
import org.springframework.util.Assert;
25+
26+
/**
27+
* {@link BackPressureHandler} implementation that uses a {@link Semaphore} for handling backpressure.
28+
*
29+
* @author Tomaz Fernandes
30+
* @see io.awspring.cloud.sqs.listener.source.PollingMessageSource
31+
* @since 3.0
32+
*/
33+
public class ConcurrencyLimiterBlockingBackPressureHandler
34+
implements BatchAwareBackPressureHandler, IdentifiableContainerComponent {
35+
36+
private static final Logger logger = LoggerFactory.getLogger(ConcurrencyLimiterBlockingBackPressureHandler.class);
37+
38+
private final Semaphore semaphore;
39+
40+
private final int batchSize;
41+
42+
private final int totalPermits;
43+
44+
private final Duration acquireTimeout;
45+
46+
private final boolean alwaysPollMasMessages;
47+
48+
private String id = getClass().getSimpleName();
49+
50+
private ConcurrencyLimiterBlockingBackPressureHandler(Builder builder) {
51+
this.batchSize = builder.batchSize;
52+
this.totalPermits = builder.totalPermits;
53+
this.acquireTimeout = builder.acquireTimeout;
54+
this.alwaysPollMasMessages = BackPressureMode.ALWAYS_POLL_MAX_MESSAGES.equals(builder.backPressureMode);
55+
this.semaphore = new Semaphore(totalPermits);
56+
logger.debug(
57+
"ConcurrencyLimiterBlockingBackPressureHandler created with configuration "
58+
+ "totalPermits: {}, batchSize: {}, acquireTimeout: {}, an alwaysPollMasMessages: {}",
59+
this.totalPermits, this.batchSize, this.acquireTimeout, this.alwaysPollMasMessages);
60+
}
61+
62+
public static Builder builder() {
63+
return new Builder();
64+
}
65+
66+
@Override
67+
public void setId(String id) {
68+
this.id = id;
69+
}
70+
71+
@Override
72+
public String getId() {
73+
return this.id;
74+
}
75+
76+
@Override
77+
public int requestBatch() throws InterruptedException {
78+
return request(this.batchSize);
79+
}
80+
81+
@Override
82+
public int request(int amount) throws InterruptedException {
83+
int acquiredPermits = tryAcquire(amount, this.acquireTimeout);
84+
if (alwaysPollMasMessages || acquiredPermits > 0) {
85+
return acquiredPermits;
86+
}
87+
int availablePermits = Math.min(this.semaphore.availablePermits(), amount);
88+
if (availablePermits > 0) {
89+
return tryAcquire(availablePermits, this.acquireTimeout);
90+
}
91+
return 0;
92+
}
93+
94+
private int tryAcquire(int amount, Duration duration) throws InterruptedException {
95+
if (this.semaphore.tryAcquire(amount, duration.toMillis(), TimeUnit.MILLISECONDS)) {
96+
logger.debug("[{}] Acquired {} permits ({} / {} available)", this.id, amount,
97+
this.semaphore.availablePermits(), this.totalPermits);
98+
return amount;
99+
}
100+
return 0;
101+
}
102+
103+
@Override
104+
public void release(int amount, ReleaseReason reason) {
105+
this.semaphore.release(amount);
106+
logger.debug("[{}] Released {} permits ({}) ({} / {} available)", this.id, amount, reason,
107+
this.semaphore.availablePermits(), this.totalPermits);
108+
}
109+
110+
@Override
111+
public boolean drain(Duration timeout) {
112+
logger.debug("[{}] Waiting for up to {} for approx. {} permits to be released", this.id, timeout,
113+
this.totalPermits - this.semaphore.availablePermits());
114+
try {
115+
return tryAcquire(this.totalPermits, timeout) > 0;
116+
}
117+
catch (InterruptedException e) {
118+
Thread.currentThread().interrupt();
119+
logger.debug("[{}] Draining interrupted", this.id);
120+
return false;
121+
}
122+
}
123+
124+
public static class Builder {
125+
126+
private int batchSize;
127+
128+
private int totalPermits;
129+
130+
private Duration acquireTimeout;
131+
132+
private BackPressureMode backPressureMode;
133+
134+
public Builder batchSize(int batchSize) {
135+
this.batchSize = batchSize;
136+
return this;
137+
}
138+
139+
public Builder totalPermits(int totalPermits) {
140+
this.totalPermits = totalPermits;
141+
return this;
142+
}
143+
144+
public Builder acquireTimeout(Duration acquireTimeout) {
145+
this.acquireTimeout = acquireTimeout;
146+
return this;
147+
}
148+
149+
public Builder throughputConfiguration(BackPressureMode backPressureConfiguration) {
150+
this.backPressureMode = backPressureConfiguration;
151+
return this;
152+
}
153+
154+
public ConcurrencyLimiterBlockingBackPressureHandler build() {
155+
Assert.noNullElements(
156+
Arrays.asList(this.batchSize, this.totalPermits, this.acquireTimeout, this.backPressureMode),
157+
"Missing configuration");
158+
Assert.isTrue(this.batchSize > 0, "The batch size must be greater than 0");
159+
Assert.isTrue(this.totalPermits >= this.batchSize, "Total permits must be greater than the batch size");
160+
return new ConcurrencyLimiterBlockingBackPressureHandler(this);
161+
}
162+
}
163+
}

0 commit comments

Comments
 (0)