Skip to content

Commit baef9eb

Browse files
committed
Rename BackPressureHandlerFactory methods
1 parent 8fcc7be commit baef9eb

File tree

3 files changed

+12
-12
lines changed

3 files changed

+12
-12
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerFactory.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,11 @@ static BatchAwareBackPressureHandler semaphoreBackPressureHandler(ContainerOptio
109109
* obtained.
110110
* @return the created SemaphoreBackPressureHandler.
111111
*/
112-
static BatchAwareBackPressureHandler concurrencyLimiterBackPressureHandler(ContainerOptions<?, ?> options,
112+
static BatchAwareBackPressureHandler adaptativeThroughputBackPressureHandler(ContainerOptions<?, ?> options,
113113
Duration maxIdleWaitTime) {
114114
BackPressureMode backPressureMode = options.getBackPressureMode();
115115

116-
var concurrencyLimiterBlockingBackPressureHandler = concurrencyLimiterBackPressureHandler2(options);
116+
var concurrencyLimiterBlockingBackPressureHandler = concurrencyLimiterBackPressureHandler(options);
117117
if (backPressureMode == BackPressureMode.FIXED_HIGH_THROUGHPUT) {
118118
return concurrencyLimiterBlockingBackPressureHandler;
119119
}
@@ -156,7 +156,7 @@ static CompositeBackPressureHandler compositeBackPressureHandler(ContainerOption
156156
* @param options the container options.
157157
* @return the created ConcurrencyLimiterBlockingBackPressureHandler.
158158
*/
159-
static ConcurrencyLimiterBlockingBackPressureHandler concurrencyLimiterBackPressureHandler2(
159+
static ConcurrencyLimiterBlockingBackPressureHandler concurrencyLimiterBackPressureHandler(
160160
ContainerOptions<?, ?> options) {
161161
return ConcurrencyLimiterBlockingBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll())
162162
.totalPermits(options.getMaxConcurrentMessages()).throughputConfiguration(options.getBackPressureMode())

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsBackPressureIntegrationTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ void staticBackPressureLimitShouldCapQueueProcessingCapacity(int staticLimit, in
135135
.backPressureHandlerFactory(containerOptions -> BackPressureHandlerFactory
136136
.compositeBackPressureHandler(containerOptions, Duration.ofMillis(50L),
137137
List.of(limiter, BackPressureHandlerFactory
138-
.concurrencyLimiterBackPressureHandler2(containerOptions)))))
138+
.concurrencyLimiterBackPressureHandler(containerOptions)))))
139139
.messageListener(msg -> {
140140
int concurrentRqs = concurrentRequest.incrementAndGet();
141141
maxConcurrentRequest.updateAndGet(max -> Math.max(max, concurrentRqs));
@@ -173,7 +173,7 @@ void zeroBackPressureLimitShouldStopQueueProcessing() throws Exception {
173173
.backPressureHandlerFactory(containerOptions -> BackPressureHandlerFactory
174174
.compositeBackPressureHandler(containerOptions, Duration.ofMillis(50L),
175175
List.of(limiter, BackPressureHandlerFactory
176-
.concurrencyLimiterBackPressureHandler2(containerOptions)))))
176+
.concurrencyLimiterBackPressureHandler(containerOptions)))))
177177
.messageListener(msg -> {
178178
int concurrentRqs = concurrentRequest.incrementAndGet();
179179
maxConcurrentRequest.updateAndGet(max -> Math.max(max, concurrentRqs));
@@ -217,7 +217,7 @@ void changeInBackPressureLimitShouldAdaptQueueProcessingCapacity() throws Except
217217
.backPressureHandlerFactory(containerOptions -> BackPressureHandlerFactory
218218
.compositeBackPressureHandler(containerOptions, Duration.ofMillis(50L),
219219
List.of(limiter, BackPressureHandlerFactory
220-
.concurrencyLimiterBackPressureHandler2(containerOptions)))))
220+
.concurrencyLimiterBackPressureHandler(containerOptions)))))
221221
.messageListener(msg -> {
222222
try {
223223
if (!controlSemaphore.tryAcquire(5, TimeUnit.SECONDS) && !isDraining.get()) {
@@ -444,7 +444,7 @@ void unsynchronizedChangesInBackPressureLimitShouldAdaptQueueProcessingCapacity(
444444
BackPressureHandlerFactory.compositeBackPressureHandler(containerOptions,
445445
Duration.ofMillis(50L),
446446
List.of(limiter, BackPressureHandlerFactory
447-
.concurrencyLimiterBackPressureHandler2(containerOptions))),
447+
.concurrencyLimiterBackPressureHandler(containerOptions))),
448448
eventsCsvWriter)))
449449
.messageListener(msg -> {
450450
int currentConcurrentRq = concurrentRequest.incrementAndGet();

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ void shouldAcquireAndReleaseFullPermits() {
6262
.backPressureMode(BackPressureMode.ALWAYS_POLL_MAX_MESSAGES)
6363
.maxDelayBetweenPolls(Duration.ofMillis(200)).build();
6464
BackPressureHandler backPressureHandler = BackPressureHandlerFactory
65-
.concurrencyLimiterBackPressureHandler(options, Duration.ofMillis(100L));
65+
.adaptativeThroughputBackPressureHandler(options, Duration.ofMillis(100L));
6666

6767
ExecutorService threadPool = Executors.newCachedThreadPool();
6868
CountDownLatch pollingCounter = new CountDownLatch(3);
@@ -120,7 +120,7 @@ void shouldAdaptThroughputMode() {
120120
.backPressureMode(BackPressureMode.ALWAYS_POLL_MAX_MESSAGES)
121121
.maxDelayBetweenPolls(Duration.ofMillis(150)).build();
122122
BackPressureHandler backPressureHandler = BackPressureHandlerFactory
123-
.concurrencyLimiterBackPressureHandler(options, Duration.ofMillis(100L));
123+
.adaptativeThroughputBackPressureHandler(options, Duration.ofMillis(100L));
124124

125125
ExecutorService threadPool = Executors.newCachedThreadPool();
126126
CountDownLatch pollingCounter = new CountDownLatch(3);
@@ -206,7 +206,7 @@ void shouldAcquireAndReleasePartialPermits() {
206206
SqsContainerOptions options = SqsContainerOptions.builder().maxMessagesPerPoll(10).maxConcurrentMessages(10)
207207
.backPressureMode(BackPressureMode.AUTO).maxDelayBetweenPolls(Duration.ofMillis(150)).build();
208208
BackPressureHandler backPressureHandler = BackPressureHandlerFactory
209-
.concurrencyLimiterBackPressureHandler(options, Duration.ofMillis(200L));
209+
.adaptativeThroughputBackPressureHandler(options, Duration.ofMillis(200L));
210210

211211
ExecutorService threadPool = Executors
212212
.newCachedThreadPool(new MessageExecutionThreadFactory("test " + testCounter.incrementAndGet()));
@@ -297,7 +297,7 @@ public org.springframework.messaging.Message<?> toMessagingMessage(Message sourc
297297
.backPressureMode(BackPressureMode.ALWAYS_POLL_MAX_MESSAGES)
298298
.maxDelayBetweenPolls(Duration.ofMillis(150)).messageConverter(converter).build();
299299
BackPressureHandler backPressureHandler = BackPressureHandlerFactory
300-
.concurrencyLimiterBackPressureHandler(options, Duration.ofMillis(100L));
300+
.adaptativeThroughputBackPressureHandler(options, Duration.ofMillis(100L));
301301

302302
AtomicInteger messagesInSink = new AtomicInteger(0);
303303
AtomicBoolean hasFailed = new AtomicBoolean(false);
@@ -350,7 +350,7 @@ void shouldBackOffIfPollingThrowsAnError() {
350350
.backPressureMode(BackPressureMode.ALWAYS_POLL_MAX_MESSAGES)
351351
.maxDelayBetweenPolls(Duration.ofMillis(200)).pollBackOffPolicy(policy).build();
352352
BackPressureHandler backPressureHandler = BackPressureHandlerFactory
353-
.concurrencyLimiterBackPressureHandler(options, Duration.ofMillis(100L));
353+
.adaptativeThroughputBackPressureHandler(options, Duration.ofMillis(100L));
354354

355355
var currentPoll = new AtomicInteger(0);
356356
var waitThirdPollLatch = new CountDownLatch(4);

0 commit comments

Comments
 (0)