Skip to content

Commit 287ce48

Browse files
fix: polling future removed on exception (#1455)
1 parent 7ad4834 commit 287ce48

File tree

2 files changed

+44
-3
lines changed

2 files changed

+44
-3
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2022 the original author or authors.
2+
* Copyright 2013-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -319,7 +319,7 @@ private Collection<S> resetBackOffContext(Collection<S> messages) {
319319

320320
private <F> CompletableFuture<F> managePollingFuture(CompletableFuture<F> pollingFuture) {
321321
this.pollingFutures.add(pollingFuture);
322-
pollingFuture.thenRun(() -> this.pollingFutures.remove(pollingFuture));
322+
pollingFuture.whenComplete((result, throwable) -> this.pollingFutures.remove(pollingFuture));
323323
return pollingFuture;
324324
}
325325

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2023 the original author or authors.
2+
* Copyright 2013-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -418,6 +418,47 @@ else if (currentPoll.compareAndSet(2, 3)) {
418418
}
419419
}
420420

421+
@Test
422+
void shouldRemovePollingFutureOnException() throws InterruptedException {
423+
String testName = "shouldClearPollingFuturesOnException";
424+
425+
SemaphoreBackPressureHandler backPressureHandler = SemaphoreBackPressureHandler.builder()
426+
.acquireTimeout(Duration.ofMillis(100)).batchSize(10).totalPermits(10)
427+
.throughputConfiguration(BackPressureMode.ALWAYS_POLL_MAX_MESSAGES).build();
428+
429+
AbstractPollingMessageSource<Object, Message> source = new AbstractPollingMessageSource<>() {
430+
@Override
431+
protected CompletableFuture<Collection<Message>> doPollForMessages(int messagesToRequest) {
432+
return CompletableFuture.failedFuture(new RuntimeException("Simulating a polling error"));
433+
}
434+
};
435+
436+
BackOffPolicy policy = mock(BackOffPolicy.class);
437+
BackOffContext ctx = mock(BackOffContext.class);
438+
given(policy.start(null)).willReturn(ctx);
439+
440+
source.setBackPressureHandler(backPressureHandler);
441+
source.setMessageSink((msgs, context) -> CompletableFuture.completedFuture(null));
442+
source.setId(testName + " source");
443+
source.setPollingEndpointName("test-queue");
444+
source.configure(SqsContainerOptions.builder().pollBackOffPolicy(policy).build());
445+
source.setTaskExecutor(createTaskExecutor(testName));
446+
source.setAcknowledgementProcessor(getNoOpsAcknowledgementProcessor());
447+
448+
@SuppressWarnings("unchecked")
449+
Collection<CompletableFuture<?>> futures = (Collection<CompletableFuture<?>>) ReflectionTestUtils
450+
.getField(source, "pollingFutures");
451+
// Verify that the pollingFutures collection is initially empty
452+
assertThat(futures).isEmpty();
453+
454+
source.start();
455+
456+
// Verify that the pollingFutures collection is empty after the exceptional completion
457+
assertThat(futures).isEmpty();
458+
459+
source.stop();
460+
}
461+
421462
private static boolean doAwait(CountDownLatch processingLatch) {
422463
try {
423464
return processingLatch.await(4, TimeUnit.SECONDS);

0 commit comments

Comments
 (0)