Skip to content

Commit af9d6ce

Browse files
committed
Add tests for ThroughputBackPressureHandler (#1251)
1 parent 77488f1 commit af9d6ce

File tree

4 files changed

+113
-20
lines changed

4 files changed

+113
-20
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ public interface BackPressureHandler {
4343
* <p>
4444
* This method can be called in the following use cases:
4545
* <ul>
46-
* <li>{@link ReleaseReason#LIMITED}: all/some permits were not used because another BackPressureHandler has a lower permits
47-
* limit and the difference in permits needs to be returned.</li>
46+
* <li>{@link ReleaseReason#LIMITED}: all/some permits were not used because another BackPressureHandler has a lower
47+
* permits limit and the difference in permits needs to be returned.</li>
4848
* <li>{@link ReleaseReason#NONE_FETCHED}: none of the permits were actually used because no messages were retrieved
4949
* from SQS. Permits need to be returned.</li>
5050
* <li>{@link ReleaseReason#PARTIAL_FETCH}: some of the permits were used (some messages were retrieved from SQS).
@@ -82,8 +82,8 @@ default void release(int amount) {
8282

8383
enum ReleaseReason {
8484
/**
85-
* All/Some permits were not used because another BackPressureHandler has a lower permits limit and the
86-
* permits difference need to be aligned across all handlers.
85+
* All/Some permits were not used because another BackPressureHandler has a lower permits limit and the permits
86+
* difference need to be aligned across all handlers.
8787
*/
8888
LIMITED,
8989
/**

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerFactory.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
/**
1919
* Factory interface for creating {@link BackPressureHandler} instances to manage queue consumption backpressure.
2020
* <p>
21-
* Implementations of this interface are responsible for producing a new {@link BackPressureHandler} for each
22-
* container, configured according to the provided {@link ContainerOptions}. This ensures that internal resources
23-
* (such as counters or semaphores) are not shared across containers, which could lead to unintended side effects.
21+
* Implementations of this interface are responsible for producing a new {@link BackPressureHandler} for each container,
22+
* configured according to the provided {@link ContainerOptions}. This ensures that internal resources (such as counters
23+
* or semaphores) are not shared across containers, which could lead to unintended side effects.
2424
* <p>
2525
* Default factory implementations can be found in the {@link BackPressureHandlerFactories} class.
2626
*/

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ThroughputBackPressureHandler.java

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@
2727
* <p>
2828
* <strong>Throughput modes</strong>
2929
* <ul>
30-
* <li>In low-throughput mode, a single batch can be requested at a time.</li>
30+
* <li>In low-throughput mode, a single batch can be requested at a time. The number of permits that will be * delivered
31+
* is the requested amount or 0 is a batch is already in-flight.</li>
3132
* <li>In high-throughput mode, multiple batches can be requested at a time. The number of permits that will be
3233
* delivered is the requested amount.</li>
3334
* </ul>
@@ -55,7 +56,7 @@ public class ThroughputBackPressureHandler implements BackPressureHandler, Ident
5556

5657
private String id = getClass().getSimpleName();
5758

58-
private ThroughputBackPressureHandler(Builder builder) {
59+
private ThroughputBackPressureHandler() {
5960
logger.debug("ThroughputBackPressureHandler created");
6061
}
6162

@@ -79,15 +80,16 @@ public int request(int amount) throws InterruptedException {
7980
return 0;
8081
}
8182
CurrentThroughputMode throughputMode = this.currentThroughputMode.get();
82-
if (throughputMode == CurrentThroughputMode.LOW && this.occupied.get()) {
83-
logger.debug("[{}] No permits acquired because a batch already being processed in low throughput mode",
84-
this.id);
85-
return 0;
86-
}
87-
else {
88-
logger.debug("[{}] Acquired {} permits ({} mode)", this.id, amount, throughputMode);
89-
return amount;
83+
if (throughputMode == CurrentThroughputMode.LOW) {
84+
if (this.occupied.get()) {
85+
logger.debug("[{}] No permits acquired because a batch already being processed in low throughput mode",
86+
this.id);
87+
return 0;
88+
}
89+
this.occupied.set(true);
9090
}
91+
logger.debug("[{}] Acquired {} permits ({} mode)", this.id, amount, throughputMode);
92+
return amount;
9193
}
9294

9395
@Override
@@ -97,8 +99,14 @@ public void release(int amount, ReleaseReason reason) {
9799
}
98100
logger.debug("[{}] Releasing {} permits ({})", this.id, amount, reason);
99101
switch (reason) {
100-
case NONE_FETCHED -> updateThroughputMode(CurrentThroughputMode.HIGH, CurrentThroughputMode.LOW);
101-
case PARTIAL_FETCH -> updateThroughputMode(CurrentThroughputMode.LOW, CurrentThroughputMode.HIGH);
102+
case NONE_FETCHED -> {
103+
this.occupied.compareAndSet(true, false);
104+
updateThroughputMode(CurrentThroughputMode.HIGH, CurrentThroughputMode.LOW);
105+
}
106+
case PARTIAL_FETCH -> {
107+
this.occupied.compareAndSet(true, false);
108+
updateThroughputMode(CurrentThroughputMode.LOW, CurrentThroughputMode.HIGH);
109+
}
102110
case LIMITED, PROCESSED -> {
103111
// No need to switch throughput mode
104112
}
@@ -129,7 +137,7 @@ private enum CurrentThroughputMode {
129137
public static class Builder {
130138

131139
public ThroughputBackPressureHandler build() {
132-
return new ThroughputBackPressureHandler(this);
140+
return new ThroughputBackPressureHandler();
133141
}
134142
}
135143
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.listener;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import java.time.Duration;
21+
import org.junit.jupiter.api.BeforeEach;
22+
import org.junit.jupiter.api.Test;
23+
import org.junit.jupiter.params.ParameterizedTest;
24+
import org.junit.jupiter.params.provider.CsvSource;
25+
26+
class ThroughputBackPressureHandlerTest {
27+
28+
private ThroughputBackPressureHandler handler;
29+
30+
@BeforeEach
31+
void setUp() {
32+
handler = new ThroughputBackPressureHandler.Builder().build();
33+
}
34+
35+
@ParameterizedTest
36+
@CsvSource({ "LIMITED,0", "PROCESSED,0", "NONE_FETCHED,5", "PARTIAL_FETCH,5", })
37+
void lowThroughputMode_shouldReturnZeroUntilRelease(BackPressureHandler.ReleaseReason releaseReason,
38+
int expectedPermitsAfterRelease) throws InterruptedException {
39+
// Given a first batch
40+
int batchSize = 5;
41+
assertThat(handler.request(batchSize)).isEqualTo(batchSize);
42+
// When a second batch is requested, it should return zero permits (because low throughput mode)
43+
assertThat(handler.request(batchSize)).isEqualTo(0);
44+
// When a batch is requested after a release, the expected permits should be
45+
// returned depending on the release reason
46+
handler.release(1, releaseReason);
47+
assertThat(handler.request(batchSize)).isEqualTo(expectedPermitsAfterRelease);
48+
}
49+
50+
@Test
51+
void highThroughputMode_shouldAllowMultipleConcurrentRequests() throws InterruptedException {
52+
// Given a first batch with polled messages
53+
int batchSize = 5;
54+
assertThat(handler.request(batchSize)).isEqualTo(batchSize);
55+
handler.release(0, BackPressureHandler.ReleaseReason.PARTIAL_FETCH); // switch to HIGH
56+
// Then subsequent requests should return the same batch size
57+
// because we are in high throughput mode
58+
assertThat(handler.request(batchSize)).isEqualTo(batchSize);
59+
assertThat(handler.request(batchSize)).isEqualTo(batchSize);
60+
handler.release(0, BackPressureHandler.ReleaseReason.PARTIAL_FETCH);
61+
handler.release(0, BackPressureHandler.ReleaseReason.PARTIAL_FETCH);
62+
// When a fetch returns no messages, throughput mode should switch to LOW
63+
assertThat(handler.request(batchSize)).isEqualTo(batchSize);
64+
handler.release(5, BackPressureHandler.ReleaseReason.NONE_FETCHED);
65+
assertThat(handler.request(batchSize)).isEqualTo(batchSize);
66+
// And subsequent requests should return zero permits until the current batch finishes with NONE_FETCHED
67+
assertThat(handler.request(batchSize)).isEqualTo(0);
68+
assertThat(handler.request(batchSize)).isEqualTo(0);
69+
handler.release(5, BackPressureHandler.ReleaseReason.NONE_FETCHED);
70+
assertThat(handler.request(batchSize)).isEqualTo(5);
71+
// or until it (the current batch) finishes with PARTIAL_FETCH
72+
assertThat(handler.request(batchSize)).isEqualTo(0);
73+
assertThat(handler.request(batchSize)).isEqualTo(0);
74+
handler.release(3, BackPressureHandler.ReleaseReason.PARTIAL_FETCH);
75+
assertThat(handler.request(batchSize)).isEqualTo(5);
76+
}
77+
78+
@Test
79+
void drain_shouldSetDrainedAndReturnTrue() throws InterruptedException {
80+
boolean result = handler.drain(Duration.ofSeconds(1));
81+
assertThat(result).isTrue();
82+
assertThat(handler.request(5)).isEqualTo(0);
83+
}
84+
85+
}

0 commit comments

Comments
 (0)