Skip to content

Commit 0c18801

Browse files
committed
Add a wait condition to the CompositeBPH in case 0 permits were returned (#1251)
The wait can be interrupted when permits are returned.
1 parent b8e041d commit 0c18801

File tree

2 files changed

+95
-90
lines changed

2 files changed

+95
-90
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/CompositeBackPressureHandler.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,33 @@
1717

1818
import java.time.Duration;
1919
import java.util.List;
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.locks.Condition;
22+
import java.util.concurrent.locks.ReentrantLock;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
2025

2126
public class CompositeBackPressureHandler implements BatchAwareBackPressureHandler, IdentifiableContainerComponent {
2227

28+
private static final Logger logger = LoggerFactory.getLogger(CompositeBackPressureHandler.class);
29+
2330
private final List<BackPressureHandler> backPressureHandlers;
2431

2532
private final int batchSize;
2633

34+
private final ReentrantLock noPermitsReturnedWaitLock = new ReentrantLock();
35+
36+
private final Condition permitsReleasedCondition = noPermitsReturnedWaitLock.newCondition();
37+
38+
private final Duration noPermitsReturnedWaitTimeout;
39+
2740
private String id;
2841

29-
public CompositeBackPressureHandler(List<BackPressureHandler> backPressureHandlers, int batchSize) {
42+
/**
 * Creates a composite over the given back-pressure handlers.
 *
 * @param backPressureHandlers the handlers this composite forwards to; the list reference is stored as-is (no
 * copy is made).
 * @param batchSize the value stored in the {@code batchSize} field.
 * @param waitTimeout the longest time a request that obtained {@code 0} permits waits for a release signal
 * before returning.
 */
public CompositeBackPressureHandler(List<BackPressureHandler> backPressureHandlers, int batchSize,
        Duration waitTimeout) {
    this.noPermitsReturnedWaitTimeout = waitTimeout;
    this.batchSize = batchSize;
    this.backPressureHandlers = backPressureHandlers;
}
3348

3449
@Override
@@ -63,6 +78,9 @@ public int request(int amount) throws InterruptedException {
6378
backPressureHandlers.get(i).release(obtainedForBph - obtained, ReleaseReason.LIMITED);
6479
}
6580
}
81+
if (obtained == 0) {
82+
waitForPermitsToBeReleased();
83+
}
6684
return obtained;
6785
}
6886

@@ -71,14 +89,48 @@ public void release(int amount, ReleaseReason reason) {
7189
for (BackPressureHandler handler : backPressureHandlers) {
7290
handler.release(amount, reason);
7391
}
92+
if (amount > 0) {
93+
signalPermitsWereReleased();
94+
}
95+
}
96+
97+
/**
98+
* Waits for permits to be released up to {@link #noPermitsReturnedWaitTimeout}. If no permits were released within
99+
* the configured {@link #noPermitsReturnedWaitTimeout}, returns immediately. This allows {@link #request(int)} to
100+
* return {@code 0} permits and will trigger another round of back-pressure handling.
101+
*
102+
* @throws InterruptedException if the Thread is interrupted while waiting for permits.
103+
*/
104+
@SuppressWarnings({ "java:S899" // we are not interested in the await return value here
105+
})
106+
private void waitForPermitsToBeReleased() throws InterruptedException {
107+
noPermitsReturnedWaitLock.lock();
108+
try {
109+
permitsReleasedCondition.await(noPermitsReturnedWaitTimeout.toMillis(), TimeUnit.MILLISECONDS);
110+
}
111+
finally {
112+
noPermitsReturnedWaitLock.unlock();
113+
}
114+
}
115+
116+
private void signalPermitsWereReleased() {
117+
noPermitsReturnedWaitLock.lock();
118+
try {
119+
permitsReleasedCondition.signal();
120+
}
121+
finally {
122+
noPermitsReturnedWaitLock.unlock();
123+
}
74124
}
75125

76126
@Override
77127
public boolean drain(Duration timeout) {
128+
logger.info("Draining back-pressure handlers initiated");
78129
boolean result = true;
79130
for (BackPressureHandler handler : backPressureHandlers) {
80131
result &= !handler.drain(timeout);
81132
}
133+
logger.info("Draining back-pressure handlers completed");
82134
return result;
83135
}
84136
}

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsBackPressureIntegrationTests.java

Lines changed: 42 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package io.awspring.cloud.sqs.integration;
1717

18-
import static java.util.Collections.singletonMap;
1918
import static org.assertj.core.api.Assertions.assertThat;
2019

2120
import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration;
@@ -36,7 +35,6 @@
3635
import java.util.List;
3736
import java.util.Queue;
3837
import java.util.Random;
39-
import java.util.concurrent.CompletableFuture;
4038
import java.util.concurrent.ConcurrentLinkedQueue;
4139
import java.util.concurrent.CountDownLatch;
4240
import java.util.concurrent.Semaphore;
@@ -47,7 +45,6 @@
4745
import java.util.function.IntUnaryOperator;
4846
import java.util.stream.Collectors;
4947
import java.util.stream.IntStream;
50-
import org.junit.jupiter.api.BeforeAll;
5148
import org.junit.jupiter.api.Test;
5249
import org.junit.jupiter.params.ParameterizedTest;
5350
import org.junit.jupiter.params.provider.CsvSource;
@@ -60,8 +57,6 @@
6057
import org.springframework.context.annotation.Import;
6158
import org.springframework.messaging.Message;
6259
import org.springframework.messaging.support.MessageBuilder;
63-
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
64-
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
6560

6661
/**
6762
* Integration tests for SQS containers back pressure management.
@@ -73,65 +68,6 @@ class SqsBackPressureIntegrationTests extends BaseSqsIntegrationTest {
7368

7469
private static final Logger logger = LoggerFactory.getLogger(SqsBackPressureIntegrationTests.class);
7570

76-
static final String RECEIVES_MESSAGE_QUEUE_NAME = "receives_message_test_queue";
77-
78-
static final String RECEIVES_MESSAGE_BATCH_QUEUE_NAME = "receives_message_batch_test_queue";
79-
80-
static final String RECEIVES_MESSAGE_ASYNC_QUEUE_NAME = "receives_message_async_test_queue";
81-
82-
static final String DOES_NOT_ACK_ON_ERROR_QUEUE_NAME = "does_not_ack_test_queue";
83-
84-
static final String DOES_NOT_ACK_ON_ERROR_ASYNC_QUEUE_NAME = "does_not_ack_async_test_queue";
85-
86-
static final String DOES_NOT_ACK_ON_ERROR_BATCH_QUEUE_NAME = "does_not_ack_batch_test_queue";
87-
88-
static final String DOES_NOT_ACK_ON_ERROR_BATCH_ASYNC_QUEUE_NAME = "does_not_ack_batch_async_test_queue";
89-
90-
static final String RESOLVES_PARAMETER_TYPES_QUEUE_NAME = "resolves_parameter_type_test_queue";
91-
92-
static final String MANUALLY_START_CONTAINER = "manually_start_container_test_queue";
93-
94-
static final String MANUALLY_CREATE_CONTAINER_QUEUE_NAME = "manually_create_container_test_queue";
95-
96-
static final String MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME = "manually_create_inactive_container_test_queue";
97-
98-
static final String MANUALLY_CREATE_FACTORY_QUEUE_NAME = "manually_create_factory_test_queue";
99-
100-
static final String CONSUMES_ONE_MESSAGE_AT_A_TIME_QUEUE_NAME = "consumes_one_message_test_queue";
101-
102-
static final String MAX_CONCURRENT_MESSAGES_QUEUE_NAME = "max_concurrent_messages_test_queue";
103-
104-
static final String LOW_RESOURCE_FACTORY = "lowResourceFactory";
105-
106-
static final String MANUAL_ACK_FACTORY = "manualAcknowledgementFactory";
107-
108-
static final String MANUAL_ACK_BATCH_FACTORY = "manualAcknowledgementBatchFactory";
109-
110-
static final String ACK_AFTER_SECOND_ERROR_FACTORY = "ackAfterSecondErrorFactory";
111-
112-
@BeforeAll
113-
static void beforeTests() {
114-
SqsAsyncClient client = createAsyncClient();
115-
CompletableFuture.allOf(createQueue(client, RECEIVES_MESSAGE_QUEUE_NAME),
116-
createQueue(client, DOES_NOT_ACK_ON_ERROR_QUEUE_NAME,
117-
singletonMap(QueueAttributeName.VISIBILITY_TIMEOUT, "1")),
118-
createQueue(client, DOES_NOT_ACK_ON_ERROR_ASYNC_QUEUE_NAME,
119-
singletonMap(QueueAttributeName.VISIBILITY_TIMEOUT, "1")),
120-
createQueue(client, DOES_NOT_ACK_ON_ERROR_BATCH_QUEUE_NAME,
121-
singletonMap(QueueAttributeName.VISIBILITY_TIMEOUT, "1")),
122-
createQueue(client, DOES_NOT_ACK_ON_ERROR_BATCH_ASYNC_QUEUE_NAME,
123-
singletonMap(QueueAttributeName.VISIBILITY_TIMEOUT, "1")),
124-
createQueue(client, RECEIVES_MESSAGE_ASYNC_QUEUE_NAME),
125-
createQueue(client, RECEIVES_MESSAGE_BATCH_QUEUE_NAME),
126-
createQueue(client, RESOLVES_PARAMETER_TYPES_QUEUE_NAME,
127-
singletonMap(QueueAttributeName.VISIBILITY_TIMEOUT, "20")),
128-
createQueue(client, MANUALLY_CREATE_CONTAINER_QUEUE_NAME),
129-
createQueue(client, MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME),
130-
createQueue(client, MANUALLY_CREATE_FACTORY_QUEUE_NAME),
131-
createQueue(client, CONSUMES_ONE_MESSAGE_AT_A_TIME_QUEUE_NAME),
132-
createQueue(client, MAX_CONCURRENT_MESSAGES_QUEUE_NAME)).join();
133-
}
134-
13571
@Autowired
13672
SqsTemplate sqsTemplate;
13773

@@ -202,11 +138,12 @@ void staticBackPressureLimitShouldCapQueueProcessingCapacity(int staticLimit, in
202138
.queueNames(
203139
queueName)
204140
.configure(options -> options.pollTimeout(Duration.ofSeconds(1))
205-
.backPressureHandlerSupplier(() -> new CompositeBackPressureHandler(List.of(limiter,
206-
SemaphoreBackPressureHandler.builder().batchSize(5).totalPermits(5)
207-
.acquireTimeout(Duration.ofSeconds(1L))
208-
.throughputConfiguration(BackPressureMode.AUTO).build()),
209-
5)))
141+
.backPressureHandlerSupplier(() -> new CompositeBackPressureHandler(
142+
List.of(limiter,
143+
SemaphoreBackPressureHandler.builder().batchSize(5).totalPermits(5)
144+
.acquireTimeout(Duration.ofSeconds(1L))
145+
.throughputConfiguration(BackPressureMode.AUTO).build()),
146+
5, Duration.ofMillis(50L))))
210147
.messageListener(msg -> {
211148
int concurrentRqs = concurrentRequest.incrementAndGet();
212149
maxConcurrentRequest.updateAndGet(max -> Math.max(max, concurrentRqs));
@@ -241,11 +178,12 @@ void zeroBackPressureLimitShouldStopQueueProcessing() throws Exception {
241178
.queueNames(
242179
queueName)
243180
.configure(options -> options.pollTimeout(Duration.ofSeconds(1))
244-
.backPressureHandlerSupplier(() -> new CompositeBackPressureHandler(List.of(limiter,
245-
SemaphoreBackPressureHandler.builder().batchSize(5).totalPermits(5)
246-
.acquireTimeout(Duration.ofSeconds(1L))
247-
.throughputConfiguration(BackPressureMode.AUTO).build()),
248-
5)))
181+
.backPressureHandlerSupplier(() -> new CompositeBackPressureHandler(
182+
List.of(limiter,
183+
SemaphoreBackPressureHandler.builder().batchSize(5).totalPermits(5)
184+
.acquireTimeout(Duration.ofSeconds(1L))
185+
.throughputConfiguration(BackPressureMode.AUTO).build()),
186+
5, Duration.ofMillis(50L))))
249187
.messageListener(msg -> {
250188
int concurrentRqs = concurrentRequest.incrementAndGet();
251189
maxConcurrentRequest.updateAndGet(max -> Math.max(max, concurrentRqs));
@@ -278,23 +216,33 @@ void changeInBackPressureLimitShouldAdaptQueueProcessingCapacity() throws Except
278216
var latch = new CountDownLatch(nbMessages);
279217
var controlSemaphore = new Semaphore(0);
280218
var advanceSemaphore = new Semaphore(0);
219+
var processingFailed = new AtomicBoolean(false);
220+
var isDraining = new AtomicBoolean(false);
281221
var container = SqsMessageListenerContainer
282222
.builder().sqsAsyncClient(
283223
BaseSqsIntegrationTest.createAsyncClient())
284224
.queueNames(
285225
queueName)
286226
.configure(options -> options.pollTimeout(Duration.ofSeconds(1))
287-
.backPressureHandlerSupplier(() -> new CompositeBackPressureHandler(List.of(limiter,
288-
SemaphoreBackPressureHandler.builder().batchSize(5).totalPermits(5)
289-
.acquireTimeout(Duration.ofSeconds(1L))
290-
.throughputConfiguration(BackPressureMode.AUTO).build()),
291-
5)))
227+
.backPressureHandlerSupplier(() -> new CompositeBackPressureHandler(
228+
List.of(limiter,
229+
SemaphoreBackPressureHandler.builder().batchSize(5).totalPermits(5)
230+
.acquireTimeout(Duration.ofSeconds(1L))
231+
.throughputConfiguration(BackPressureMode.AUTO).build()),
232+
5, Duration.ofMillis(50L))))
292233
.messageListener(msg -> {
293234
try {
294-
controlSemaphore.acquire();
235+
if (!controlSemaphore.tryAcquire(5, TimeUnit.SECONDS) && !isDraining.get()) {
236+
processingFailed.set(true);
237+
throw new IllegalStateException("Failed to wait for control semaphore");
238+
}
295239
}
296240
catch (InterruptedException e) {
297-
throw new RuntimeException(e);
241+
if (!isDraining.get()) {
242+
processingFailed.set(true);
243+
Thread.currentThread().interrupt();
244+
throw new RuntimeException(e);
245+
}
298246
}
299247
int concurrentRqs = concurrentRequest.incrementAndGet();
300248
maxConcurrentRequest.updateAndGet(max -> Math.max(max, concurrentRqs));
@@ -310,14 +258,16 @@ class Controller {
310258
private final Semaphore controlSemaphore;
311259
private final NonBlockingExternalConcurrencyLimiterBackPressureHandler limiter;
312260
private final AtomicInteger maxConcurrentRequest;
261+
private final AtomicBoolean processingFailed;
313262

314263
// Stores the shared test fixtures on the controller: the two semaphores, the limiter under test, the
// max-concurrency tracker and the flag set by the listener when message processing fails.
Controller(Semaphore advanceSemaphore, Semaphore controlSemaphore,
        NonBlockingExternalConcurrencyLimiterBackPressureHandler limiter,
        AtomicInteger maxConcurrentRequest, AtomicBoolean processingFailed) {
    this.processingFailed = processingFailed;
    this.maxConcurrentRequest = maxConcurrentRequest;
    this.limiter = limiter;
    this.controlSemaphore = controlSemaphore;
    this.advanceSemaphore = advanceSemaphore;
}
322272

323273
public void updateLimit(int newLimit) {
@@ -341,9 +291,11 @@ void waitForAdvance(int permits) throws InterruptedException {
341291
.withFailMessage(() -> "Waiting for %d permits timed out. Only %d permits available"
342292
.formatted(permits, advanceSemaphore.availablePermits()))
343293
.isTrue();
294+
assertThat(processingFailed.get()).isFalse();
344295
}
345296
}
346-
var controller = new Controller(advanceSemaphore, controlSemaphore, limiter, maxConcurrentRequest);
297+
var controller = new Controller(advanceSemaphore, controlSemaphore, limiter, maxConcurrentRequest,
298+
processingFailed);
347299
try {
348300
container.start();
349301

@@ -386,8 +338,10 @@ void waitForAdvance(int permits) throws InterruptedException {
386338
controller.waitForAdvance(50);
387339
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
388340
assertThat(controller.maxConcurrentRequest.get()).isEqualTo(5);
341+
assertThat(processingFailed.get()).isFalse();
389342
}
390343
finally {
344+
isDraining.set(true);
391345
container.stop();
392346
}
393347
}
@@ -500,13 +454,12 @@ void unsynchronizedChangesInBackPressureLimitShouldAdaptQueueProcessingCapacity(
500454
options -> options.pollTimeout(Duration.ofSeconds(1))
501455
.standbyLimitPollingInterval(
502456
Duration.ofMillis(1))
503-
.backPressureHandlerSupplier(() -> new StatisticsBphDecorator(
504-
new CompositeBackPressureHandler(List.of(limiter,
505-
SemaphoreBackPressureHandler.builder().batchSize(10).totalPermits(10)
506-
.acquireTimeout(Duration.ofSeconds(1L))
457+
.backPressureHandlerSupplier(
458+
() -> new StatisticsBphDecorator(new CompositeBackPressureHandler(
459+
List.of(limiter, SemaphoreBackPressureHandler.builder().batchSize(10)
460+
.totalPermits(10).acquireTimeout(Duration.ofSeconds(1L))
507461
.throughputConfiguration(BackPressureMode.AUTO).build()),
508-
10),
509-
eventsCsvWriter)))
462+
10, Duration.ofMillis(50L)), eventsCsvWriter)))
510463
.messageListener(msg -> {
511464
int currentConcurrentRq = concurrentRequest.incrementAndGet();
512465
maxConcurrentRequest.updateAndGet(max -> Math.max(max, currentConcurrentRq));

0 commit comments

Comments
 (0)