Skip to content

Commit 3f72277

Browse files
committed
Remove BackPressureHandlerLimiter from the library and make it user-code in tests only (#1251)
1 parent 296d05a commit 3f72277

File tree

9 files changed

+243
-229
lines changed

9 files changed

+243
-229
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.micrometer.observation.ObservationConvention;
2323
import io.micrometer.observation.ObservationRegistry;
2424
import java.time.Duration;
25+
import java.util.function.Supplier;
2526
import org.springframework.core.task.TaskExecutor;
2627
import org.springframework.lang.Nullable;
2728
import org.springframework.retry.backoff.BackOffPolicy;
@@ -57,7 +58,7 @@ public abstract class AbstractContainerOptions<O extends ContainerOptions<O, B>,
5758

5859
private final BackPressureMode backPressureMode;
5960

60-
private final BackPressureLimiter backPressureLimiter;
61+
private final Supplier<BackPressureHandler> backPressureHandlerSupplier;
6162

6263
private final ListenerMode listenerMode;
6364

@@ -95,7 +96,7 @@ protected AbstractContainerOptions(Builder<?, ?> builder) {
9596
this.listenerShutdownTimeout = builder.listenerShutdownTimeout;
9697
this.acknowledgementShutdownTimeout = builder.acknowledgementShutdownTimeout;
9798
this.backPressureMode = builder.backPressureMode;
98-
this.backPressureLimiter = builder.backPressureLimiter;
99+
this.backPressureHandlerSupplier = builder.backPressureHandlerSupplier;
99100
this.listenerMode = builder.listenerMode;
100101
this.messageConverter = builder.messageConverter;
101102
this.acknowledgementMode = builder.acknowledgementMode;
@@ -174,8 +175,8 @@ public BackPressureMode getBackPressureMode() {
174175
}
175176

176177
@Override
177-
public BackPressureLimiter getBackPressureLimiter() {
178-
return this.backPressureLimiter;
178+
public Supplier<BackPressureHandler> getBackPressureHandlerSupplier() {
179+
return this.backPressureHandlerSupplier;
179180
}
180181

181182
@Override
@@ -250,7 +251,7 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,
250251

251252
private static final BackPressureMode DEFAULT_THROUGHPUT_CONFIGURATION = BackPressureMode.AUTO;
252253

253-
private static final BackPressureLimiter DEFAULT_BACKPRESSURE_LIMITER = null;
254+
private static final Supplier<BackPressureHandler> DEFAULT_BACKPRESSURE_LIMITER = null;
254255

255256
private static final ListenerMode DEFAULT_MESSAGE_DELIVERY_STRATEGY = ListenerMode.SINGLE_MESSAGE;
256257

@@ -276,7 +277,7 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,
276277

277278
private BackPressureMode backPressureMode = DEFAULT_THROUGHPUT_CONFIGURATION;
278279

279-
private BackPressureLimiter backPressureLimiter = DEFAULT_BACKPRESSURE_LIMITER;
280+
private Supplier<BackPressureHandler> backPressureHandlerSupplier = DEFAULT_BACKPRESSURE_LIMITER;
280281

281282
private Duration listenerShutdownTimeout = DEFAULT_LISTENER_SHUTDOWN_TIMEOUT;
282283

@@ -320,7 +321,7 @@ protected Builder(AbstractContainerOptions<?, ?> options) {
320321
this.listenerShutdownTimeout = options.listenerShutdownTimeout;
321322
this.acknowledgementShutdownTimeout = options.acknowledgementShutdownTimeout;
322323
this.backPressureMode = options.backPressureMode;
323-
this.backPressureLimiter = options.backPressureLimiter;
324+
this.backPressureHandlerSupplier = options.backPressureHandlerSupplier;
324325
this.listenerMode = options.listenerMode;
325326
this.messageConverter = options.messageConverter;
326327
this.acknowledgementMode = options.acknowledgementMode;
@@ -423,8 +424,8 @@ public B backPressureMode(BackPressureMode backPressureMode) {
423424
}
424425

425426
@Override
426-
public B backPressureLimiter(BackPressureLimiter backPressureLimiter) {
427-
this.backPressureLimiter = backPressureLimiter;
427+
public B backPressureHandlerSupplier(Supplier<BackPressureHandler> backPressureHandlerSupplier) {
428+
this.backPressureHandlerSupplier = backPressureHandlerSupplier;
428429
return self();
429430
}
430431

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -232,17 +232,14 @@ private TaskExecutor validateCustomExecutor(TaskExecutor taskExecutor) {
232232

233233
protected BackPressureHandler createBackPressureHandler() {
234234
O containerOptions = getContainerOptions();
235-
List<BackPressureHandler> backPressureHandlers = new ArrayList<>(2);
235+
if (containerOptions.getBackPressureHandlerSupplier() != null) {
236+
return containerOptions.getBackPressureHandlerSupplier().get();
237+
}
236238
Duration acquireTimeout = containerOptions.getMaxDelayBetweenPolls();
237239
int batchSize = containerOptions.getMaxMessagesPerPoll();
238-
backPressureHandlers.add(SemaphoreBackPressureHandler.builder().batchSize(batchSize)
240+
return SemaphoreBackPressureHandler.builder().batchSize(batchSize)
239241
.totalPermits(containerOptions.getMaxConcurrentMessages()).acquireTimeout(acquireTimeout)
240-
.throughputConfiguration(containerOptions.getBackPressureMode()).build());
241-
if (containerOptions.getBackPressureLimiter() != null) {
242-
backPressureHandlers.add(new BackPressureHandlerLimiter(containerOptions.getBackPressureLimiter(),
243-
acquireTimeout, containerOptions.getStandbyLimitPollingInterval(), batchSize));
244-
}
245-
return new CompositeBackPressureHandler(backPressureHandlers, batchSize);
242+
.throughputConfiguration(containerOptions.getBackPressureMode()).build();
246243
}
247244

248245
protected TaskExecutor createSourcesTaskExecutor() {

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandler.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,18 @@ public interface BackPressureHandler {
5757
*/
5858
void release(int amount, ReleaseReason reason);
5959

60+
/**
61+
* Release the specified amount of permits. Each message that has been processed should release one permit, whether
62+
* processing was successful or not.
63+
* @param amount the amount of permits to release.
64+
*
65+
* @deprecated This method is deprecated and will not be called by the Spring Cloud AWS SQS listener anymore.
66+
* Implement {@link #release(int, ReleaseReason)} instead.
67+
*/
68+
@Deprecated
69+
default void release(int amount) {
70+
}
71+
6072
/**
6173
* Attempts to acquire all permits up to the specified timeout. If successful, means all permits were returned and
6274
* thus no activity is left in the {@link io.awspring.cloud.sqs.listener.source.MessageSource}.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerLimiter.java

Lines changed: 0 additions & 133 deletions
This file was deleted.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureLimiter.java

Lines changed: 0 additions & 44 deletions
This file was deleted.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BatchAwareBackPressureHandler.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,28 @@ public interface BatchAwareBackPressureHandler extends BackPressureHandler {
3030
* @throws InterruptedException if the Thread is interrupted while waiting for permits.
3131
*/
3232
int requestBatch() throws InterruptedException;
33+
34+
/**
35+
* Release a batch of permits. This has the semantics of letting the {@link BackPressureHandler} know that all
36+
* permits from a batch are being released, in opposition to {@link #release(int)} in which any number of permits
37+
* can be specified.
38+
*
39+
* @deprecated This method is deprecated and will not be called by the Spring Cloud AWS SQS listener anymore.
40+
* Implement {@link BackPressureHandler#release(int, ReleaseReason)} instead.
41+
*/
42+
@Deprecated
43+
default void releaseBatch() {
44+
}
45+
46+
/**
47+
* Return the configured batch size for this handler.
48+
* @return the batch size.
49+
*
50+
* @deprecated This method is deprecated and will not be used by the Spring Cloud AWS SQS listener anymore.
51+
*/
52+
@Deprecated
53+
default int getBatchSize() {
54+
return 0;
55+
}
56+
3357
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.micrometer.observation.ObservationRegistry;
2323
import java.time.Duration;
2424
import java.util.Collection;
25+
import java.util.function.Supplier;
2526
import org.springframework.core.task.TaskExecutor;
2627
import org.springframework.lang.Nullable;
2728
import org.springframework.retry.backoff.BackOffPolicy;
@@ -63,8 +64,6 @@ public interface ContainerOptions<O extends ContainerOptions<O, B>, B extends Co
6364
/**
6465
* {@return the amount of time to wait before checking again for the current limit when the queue processing is on
6566
* standby} Default is 100 milliseconds.
66-
*
67-
* @see BackPressureLimiter#limit()
6867
*/
6968
Duration getStandbyLimitPollingInterval();
7069

@@ -138,10 +137,10 @@ default BackOffPolicy getPollBackOffPolicy() {
138137
BackPressureMode getBackPressureMode();
139138

140139
/**
141-
* Return the {@link BackPressureLimiter} for this container.
142-
* @return the backpressure limiter.
140+
* Return the a {@link Supplier} to create a {@link BackPressureHandler} for this container.
141+
* @return the BackPressureHandler supplier.
143142
*/
144-
BackPressureLimiter getBackPressureLimiter();
143+
Supplier<BackPressureHandler> getBackPressureHandlerSupplier();
145144

146145
/**
147146
* Return the {@link ListenerMode} mode for this container.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
2121
import io.micrometer.observation.ObservationRegistry;
2222
import java.time.Duration;
23+
import java.util.function.Supplier;
2324
import org.springframework.core.task.TaskExecutor;
2425
import org.springframework.retry.backoff.BackOffPolicy;
2526

@@ -63,7 +64,6 @@ public interface ContainerOptionsBuilder<B extends ContainerOptionsBuilder<B, O>
6364
*
6465
* @param standbyLimitPollingInterval the limit polling interval when the queue processing is on standby.
6566
* @return this instance.
66-
* @see BackPressureLimiter#limit()
6767
*/
6868
B standbyLimitPollingInterval(Duration standbyLimitPollingInterval);
6969

@@ -157,12 +157,12 @@ default B pollBackOffPolicy(BackOffPolicy pollBackOffPolicy) {
157157
B backPressureMode(BackPressureMode backPressureMode);
158158

159159
/**
160-
* Set the {@link BackPressureLimiter} for this container. Default is {@code null}.
160+
* Set the {@link Supplier} of {@link BackPressureHandler} for this container. Default is {@code null}.
161161
*
162-
* @param backPressureLimiter the backpressure limiter.
162+
* @param backPressureHandlerSupplier the BackPressureHandler supplier.
163163
* @return this instance.
164164
*/
165-
B backPressureLimiter(BackPressureLimiter backPressureLimiter);
165+
B backPressureHandlerSupplier(Supplier<BackPressureHandler> backPressureHandlerSupplier);
166166

167167
/**
168168
* Set the maximum interval between acknowledgements for batch acknowledgements. The default depends on the specific

0 commit comments

Comments
 (0)