Skip to content

Commit 4e37bb3

Browse files
authored
Fix corePoolSize so that maximum number of messages (maxConcurrentMessages * number of queues) are processed simultaneously. (#833)
* Fix `corePoolSize` so that maximum number of messages (`maxConcurrentMessages` * number of queues) are processed simultaneously. * add test for maxConcurrentMessages * fix maxMessagesPerPoll * remove unused import * fix parameters for a common scenario * fix test * fix log level
1 parent 83811be commit 4e37bb3

File tree

2 files changed

+40
-2
lines changed

2 files changed

+40
-2
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ protected TaskExecutor createTaskExecutor() {
241241
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
242242
int poolSize = getContainerOptions().getMaxConcurrentMessages() * this.messageSources.size();
243243
executor.setMaxPoolSize(poolSize);
244-
executor.setCorePoolSize(getContainerOptions().getMaxMessagesPerPoll());
244+
executor.setCorePoolSize(poolSize);
245245
// Necessary due to a small racing condition between releasing the permit and releasing the thread.
246246
executor.setQueueCapacity(poolSize);
247247
executor.setAllowCoreThreadTimeOut(true);

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsIntegrationTests.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import static java.util.Collections.singletonMap;
1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
2021

2122
import com.fasterxml.jackson.databind.ObjectMapper;
2223
import io.awspring.cloud.sqs.CompletableFutures;
@@ -53,8 +54,10 @@
5354
import java.util.Collections;
5455
import java.util.List;
5556
import java.util.UUID;
57+
import java.util.concurrent.BrokenBarrierException;
5658
import java.util.concurrent.CompletableFuture;
5759
import java.util.concurrent.CountDownLatch;
60+
import java.util.concurrent.CyclicBarrier;
5861
import java.util.concurrent.TimeUnit;
5962
import java.util.concurrent.atomic.AtomicBoolean;
6063
import java.util.stream.Collectors;
@@ -116,6 +119,8 @@ class SqsIntegrationTests extends BaseSqsIntegrationTest {
116119

117120
static final String MANUALLY_CREATE_FACTORY_QUEUE_NAME = "manually_create_factory_test_queue";
118121

122+
static final String MAX_CONCURRENT_MESSAGES_QUEUE_NAME = "max_concurrent_messages_test_queue";
123+
119124
static final String LOW_RESOURCE_FACTORY = "lowResourceFactory";
120125

121126
static final String MANUAL_ACK_FACTORY = "manualAcknowledgementFactory";
@@ -139,7 +144,8 @@ static void beforeTests() {
139144
createQueue(client, RESOLVES_PARAMETER_TYPES_QUEUE_NAME,
140145
singletonMap(QueueAttributeName.VISIBILITY_TIMEOUT, "20")),
141146
createQueue(client, MANUALLY_CREATE_CONTAINER_QUEUE_NAME),
142-
createQueue(client, MANUALLY_CREATE_FACTORY_QUEUE_NAME)).join();
147+
createQueue(client, MANUALLY_CREATE_FACTORY_QUEUE_NAME),
148+
createQueue(client, MAX_CONCURRENT_MESSAGES_QUEUE_NAME)).join();
143149
}
144150

145151
@Autowired
@@ -275,6 +281,20 @@ void manuallyCreatesFactory() throws Exception {
275281
assertThat(latchContainer.manuallyCreatedFactorySinkLatch.await(10, TimeUnit.SECONDS)).isTrue();
276282
}
277283

284+
@Test
285+
void maxConcurrentMessages() {
286+
List<Message<String>> messages1 = IntStream.range(0, 10)
287+
.mapToObj(index -> "maxConcurrentMessages-payload-" + index)
288+
.map(payload -> MessageBuilder.withPayload(payload).build()).collect(Collectors.toList());
289+
List<Message<String>> messages2 = IntStream.range(10, 20)
290+
.mapToObj(index -> "maxConcurrentMessages-payload-" + index)
291+
.map(payload -> MessageBuilder.withPayload(payload).build()).collect(Collectors.toList());
292+
sqsTemplate.sendManyAsync(MAX_CONCURRENT_MESSAGES_QUEUE_NAME, messages1);
293+
sqsTemplate.sendManyAsync(MAX_CONCURRENT_MESSAGES_QUEUE_NAME, messages2);
294+
logger.debug("Sent messages to queue {} with messages {} and {}", MAX_CONCURRENT_MESSAGES_QUEUE_NAME, messages1, messages2);
295+
assertDoesNotThrow(() -> latchContainer.maxConcurrentMessagesBarrier.await(10, TimeUnit.SECONDS));
296+
}
297+
278298
static class ReceivesMessageListener {
279299

280300
@Autowired
@@ -399,6 +419,18 @@ void listen(Message<String> message, MessageHeaders headers, Acknowledgement ack
399419
}
400420
}
401421

422+
static class MaxConcurrentMessagesListener {
423+
424+
@Autowired
425+
LatchContainer latchContainer;
426+
427+
@SqsListener(queueNames = MAX_CONCURRENT_MESSAGES_QUEUE_NAME, maxMessagesPerPoll = "10", maxConcurrentMessages = "20", id = "max-concurrent-messages")
428+
void listen(String message) throws BrokenBarrierException, InterruptedException {
429+
logger.debug("Received message in Listener Method: " + message);
430+
latchContainer.maxConcurrentMessagesBarrier.await();
431+
}
432+
}
433+
402434
static class LatchContainer {
403435

404436
final CountDownLatch receivesMessageLatch = new CountDownLatch(1);
@@ -421,6 +453,7 @@ static class LatchContainer {
421453
final CountDownLatch acknowledgementCallbackSuccessLatch = new CountDownLatch(1);
422454
final CountDownLatch acknowledgementCallbackBatchLatch = new CountDownLatch(1);
423455
final CountDownLatch acknowledgementCallbackErrorLatch = new CountDownLatch(1);
456+
final CyclicBarrier maxConcurrentMessagesBarrier = new CyclicBarrier(21);
424457

425458
}
426459

@@ -612,6 +645,11 @@ ResolvesParameterTypesListener resolvesParameterTypesListener() {
612645
return new ResolvesParameterTypesListener();
613646
}
614647

648+
@Bean
649+
MaxConcurrentMessagesListener maxConcurrentMessagesListener() {
650+
return new MaxConcurrentMessagesListener();
651+
}
652+
615653
@Bean
616654
SqsListenerConfigurer customizer() {
617655
return registrar -> {

0 commit comments

Comments
 (0)