Skip to content

Commit 4ea6d2c

Browse files
committed
Address review comments
1 parent b1698da commit 4ea6d2c

File tree

5 files changed

+6
-41
lines changed

5 files changed

+6
-41
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@ public abstract class AbstractContainerOptions<O extends ContainerOptions<O, B>,
4848

4949
private final Duration maxDelayBetweenPolls;
5050

51-
private final Duration standbyLimitPollingInterval;
52-
5351
private final Duration listenerShutdownTimeout;
5452

5553
private final Duration acknowledgementShutdownTimeout;
@@ -85,7 +83,6 @@ protected AbstractContainerOptions(Builder<?, ?> builder) {
8583
this.autoStartup = builder.autoStartup;
8684
this.pollTimeout = builder.pollTimeout;
8785
this.pollBackOffPolicy = builder.pollBackOffPolicy;
88-
this.standbyLimitPollingInterval = builder.standbyLimitPollingInterval;
8986
this.maxDelayBetweenPolls = builder.maxDelayBetweenPolls;
9087
this.listenerShutdownTimeout = builder.listenerShutdownTimeout;
9188
this.acknowledgementShutdownTimeout = builder.acknowledgementShutdownTimeout;
@@ -129,11 +126,6 @@ public BackOffPolicy getPollBackOffPolicy() {
129126
return this.pollBackOffPolicy;
130127
}
131128

132-
@Override
133-
public Duration getStandbyLimitPollingInterval() {
134-
return this.standbyLimitPollingInterval;
135-
}
136-
137129
@Override
138130
public Duration getMaxDelayBetweenPolls() {
139131
return this.maxDelayBetweenPolls;
@@ -223,8 +215,6 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,
223215

224216
private static final BackOffPolicy DEFAULT_POLL_BACK_OFF_POLICY = buildDefaultBackOffPolicy();
225217

226-
private static final Duration DEFAULT_STANDBY_LIMIT_POLLING_INTERVAL = Duration.ofMillis(100);
227-
228218
private static final Duration DEFAULT_SEMAPHORE_TIMEOUT = Duration.ofSeconds(10);
229219

230220
private static final Duration DEFAULT_LISTENER_SHUTDOWN_TIMEOUT = Duration.ofSeconds(20);
@@ -251,8 +241,6 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,
251241

252242
private BackOffPolicy pollBackOffPolicy = DEFAULT_POLL_BACK_OFF_POLICY;
253243

254-
private Duration standbyLimitPollingInterval = DEFAULT_STANDBY_LIMIT_POLLING_INTERVAL;
255-
256244
private Duration maxDelayBetweenPolls = DEFAULT_SEMAPHORE_TIMEOUT;
257245

258246
private BackPressureMode backPressureMode = DEFAULT_THROUGHPUT_CONFIGURATION;
@@ -341,13 +329,6 @@ public B pollBackOffPolicy(BackOffPolicy pollBackOffPolicy) {
341329
return self();
342330
}
343331

344-
@Override
345-
public B standbyLimitPollingInterval(Duration standbyLimitPollingInterval) {
346-
Assert.notNull(standbyLimitPollingInterval, "standbyLimitPollingInterval cannot be null");
347-
this.standbyLimitPollingInterval = standbyLimitPollingInterval;
348-
return self();
349-
}
350-
351332
@Override
352333
public B maxDelayBetweenPolls(Duration maxDelayBetweenPolls) {
353334
Assert.notNull(maxDelayBetweenPolls, "semaphoreAcquireTimeout cannot be null");

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ConcurrencyLimiterBlockingBackPressureHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,20 +43,20 @@ public class ConcurrencyLimiterBlockingBackPressureHandler
4343

4444
private final Duration acquireTimeout;
4545

46-
private final boolean alwaysPollMasMessages;
46+
private final boolean alwaysPollMaxMessages;
4747

4848
private String id = getClass().getSimpleName();
4949

5050
private ConcurrencyLimiterBlockingBackPressureHandler(Builder builder) {
5151
this.batchSize = builder.batchSize;
5252
this.totalPermits = builder.totalPermits;
5353
this.acquireTimeout = builder.acquireTimeout;
54-
this.alwaysPollMasMessages = BackPressureMode.ALWAYS_POLL_MAX_MESSAGES.equals(builder.backPressureMode);
54+
this.alwaysPollMaxMessages = BackPressureMode.ALWAYS_POLL_MAX_MESSAGES.equals(builder.backPressureMode);
5555
this.semaphore = new Semaphore(totalPermits);
5656
logger.debug(
5757
"ConcurrencyLimiterBlockingBackPressureHandler created with configuration "
58-
+ "totalPermits: {}, batchSize: {}, acquireTimeout: {}, an alwaysPollMasMessages: {}",
59-
this.totalPermits, this.batchSize, this.acquireTimeout, this.alwaysPollMasMessages);
58+
+ "totalPermits: {}, batchSize: {}, acquireTimeout: {}, an alwaysPollMaxMessages: {}",
59+
this.totalPermits, this.batchSize, this.acquireTimeout, this.alwaysPollMaxMessages);
6060
}
6161

6262
public static Builder builder() {
@@ -81,7 +81,7 @@ public int requestBatch() throws InterruptedException {
8181
@Override
8282
public int request(int amount) throws InterruptedException {
8383
int acquiredPermits = tryAcquire(amount, this.acquireTimeout);
84-
if (alwaysPollMasMessages || acquiredPermits > 0) {
84+
if (alwaysPollMaxMessages || acquiredPermits > 0) {
8585
return acquiredPermits;
8686
}
8787
int availablePermits = Math.min(this.semaphore.availablePermits(), amount);

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,6 @@ public interface ContainerOptions<O extends ContainerOptions<O, B>, B extends Co
5959
*/
6060
boolean isAutoStartup();
6161

62-
/**
63-
* {@return the amount of time to wait before checking again for the current limit when the queue processing is on
64-
* standby} Default is 100 milliseconds.
65-
*/
66-
Duration getStandbyLimitPollingInterval();
67-
6862
/**
6963
* Sets the maximum time the polling thread should wait for a full batch of permits to be available before trying to
7064
* acquire a partial batch if so configured. A poll is only actually executed if at least one permit is available.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,6 @@ public interface ContainerOptionsBuilder<B extends ContainerOptionsBuilder<B, O>
5757
*/
5858
B autoStartup(boolean autoStartup);
5959

60-
/**
61-
* Sets the amount of time to wait before checking again for the current limit when the queue processing is on
62-
* standby.
63-
*
64-
* @param standbyLimitPollingInterval the limit polling interval when the queue processing is on standby.
65-
* @return this instance.
66-
*/
67-
B standbyLimitPollingInterval(Duration standbyLimitPollingInterval);
68-
6960
/**
7061
* Set the maximum time the polling thread should wait for a full batch of permits to be available before trying to
7162
* acquire a partial batch if so configured. A poll is only actually executed if at least one permit is available.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ThroughputBackPressureHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import io.awspring.cloud.sqs.listener.source.PollingMessageSource;
1919
import java.time.Duration;
20-
import java.util.List;
2120
import java.util.concurrent.atomic.AtomicBoolean;
2221
import java.util.concurrent.atomic.AtomicInteger;
2322
import java.util.concurrent.atomic.AtomicReference;
@@ -146,7 +145,7 @@ public Builder batchSize(int batchSize) {
146145
}
147146

148147
public ThroughputBackPressureHandler build() {
149-
Assert.noNullElements(List.of(this.batchSize), "Missing configuration");
148+
Assert.notNull(this.batchSize, "Missing configuration");
150149
Assert.isTrue(this.batchSize > 0, "batch size must be greater than 0");
151150
return new ThroughputBackPressureHandler(this);
152151
}

0 commit comments

Comments
 (0)