Skip to content

Commit 8e0cd40

Browse files
garyrussellartembilan
authored andcommitted
AMQP-810: Fix adjust consumers when max present
JIRA: https://jira.spring.io/browse/AMQP-810 SMLC: adjusting the `concurrentConsumers` did not consider `maxConcurrentConsumers`. - increase added consumers even if at max - decrease removed consumers when they had increased due to max being set Further, decreasing the `maxConcurrentConsumers` did not remove consumers if there were more consumers than the new max. - don't add consumers beyond the max - don't remove consumers unless the new max is exceeded **cherry-pick to 2.0.x, 1.7.x** There will be some minor conflicts in 1.7.x since the modified test is JUnit5. (cherry picked from commit daebf40) # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerLongTests.java
1 parent 7368f43 commit 8e0cd40

File tree

2 files changed

+148
-23
lines changed

2 files changed

+148
-23
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -307,19 +307,8 @@ public void setConcurrentConsumers(final int concurrentConsumers) {
307307
}
308308
int delta = this.concurrentConsumers - concurrentConsumers;
309309
this.concurrentConsumers = concurrentConsumers;
310-
if (isActive() && this.consumers != null) {
311-
if (delta > 0) {
312-
Iterator<BlockingQueueConsumer> consumerIterator = this.consumers.iterator();
313-
while (consumerIterator.hasNext() && delta > 0) {
314-
BlockingQueueConsumer consumer = consumerIterator.next();
315-
consumer.basicCancel(true);
316-
consumerIterator.remove();
317-
delta--;
318-
}
319-
}
320-
else {
321-
addAndStartConsumers(-delta);
322-
}
310+
if (isActive()) {
311+
adjustConsumers(delta);
323312
}
324313
}
325314
}
@@ -339,7 +328,15 @@ public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
339328
"'maxConcurrentConsumers' value must be at least 'concurrentConsumers'");
340329
Assert.isTrue(!this.exclusive || maxConcurrentConsumers == 1,
341330
"When the consumer is exclusive, the concurrency must be 1");
331+
Integer oldMax = this.maxConcurrentConsumers;
342332
this.maxConcurrentConsumers = maxConcurrentConsumers;
333+
if (oldMax != null && isActive()) {
334+
int delta = oldMax - maxConcurrentConsumers;
335+
if (delta > 0) { // only decrease, not increase
336+
adjustConsumers(delta);
337+
}
338+
}
339+
343340
}
344341

345342
/**
@@ -1026,10 +1023,45 @@ private void checkMismatchedQueues() {
10261023
}
10271024
}
10281025

1026+
/**
1027+
* Adjust consumers depending on delta.
1028+
* @param delta a negative value increases, positive decreases.
1029+
* @since 1.7.8
1030+
*/
1031+
protected void adjustConsumers(int delta) {
1032+
synchronized (this.consumersMonitor) {
1033+
if (isActive() && this.consumers != null) {
1034+
if (delta > 0) {
1035+
Iterator<BlockingQueueConsumer> consumerIterator = this.consumers.iterator();
1036+
while (consumerIterator.hasNext() && delta > 0
1037+
&& (this.maxConcurrentConsumers == null
1038+
|| this.consumers.size() > this.maxConcurrentConsumers)) {
1039+
BlockingQueueConsumer consumer = consumerIterator.next();
1040+
consumer.basicCancel(true);
1041+
consumerIterator.remove();
1042+
delta--;
1043+
}
1044+
}
1045+
else {
1046+
addAndStartConsumers(-delta);
1047+
}
1048+
}
1049+
}
1050+
}
1051+
1052+
1053+
/**
1054+
* Start up to delta consumers, limited by {@link #setMaxConcurrentConsumers(int)}.
1055+
* @param delta the consumers to add.
1056+
*/
10291057
protected void addAndStartConsumers(int delta) {
10301058
synchronized (this.consumersMonitor) {
10311059
if (this.consumers != null) {
10321060
for (int i = 0; i < delta; i++) {
1061+
if (this.maxConcurrentConsumers != null
1062+
&& this.consumers.size() >= this.maxConcurrentConsumers) {
1063+
break;
1064+
}
10331065
BlockingQueueConsumer consumer = createBlockingQueueConsumer();
10341066
this.consumers.add(consumer);
10351067
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerLongTests.java

Lines changed: 103 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,7 +16,9 @@
1616

1717
package org.springframework.amqp.rabbit.listener;
1818

19+
import static org.hamcrest.Matchers.equalTo;
1920
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertThat;
2022
import static org.junit.Assert.fail;
2123

2224
import java.util.Set;
@@ -27,7 +29,6 @@
2729
import org.junit.Rule;
2830
import org.junit.Test;
2931

30-
import org.springframework.amqp.core.Message;
3132
import org.springframework.amqp.core.MessageListener;
3233
import org.springframework.amqp.core.Queue;
3334
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
@@ -41,19 +42,28 @@
4142

4243
/**
4344
* @author Gary Russell
45+
* @author Artem Bilan
4446
*
4547
* @since 1.2.1
4648
*
4749
*/
4850
public class SimpleMessageListenerContainerLongTests {
4951

52+
private static final String QUEUE = "SimpleMessageListenerContainerLongTests.queue";
53+
54+
private static final String QUEUE2 = "SimpleMessageListenerContainerLongTests.queue2";
55+
56+
private static final String QUEUE3 = "SimpleMessageListenerContainerLongTests.queue3";
57+
58+
private static final String QUEUE4 = "SimpleMessageListenerContainerLongTests.queue4";
59+
5060
private final Log logger = LogFactory.getLog(SimpleMessageListenerContainerLongTests.class);
5161

5262
@Rule
5363
public LongRunningIntegrationTest longTest = new LongRunningIntegrationTest();
5464

5565
@Rule
56-
public BrokerRunning brokerRunning = BrokerRunning.isRunningWithEmptyQueues("foo");
66+
public BrokerRunning brokerRunning = BrokerRunning.isRunningWithEmptyQueues(QUEUE, QUEUE2, QUEUE3, QUEUE4);
5767

5868
@After
5969
public void tearDown() {
@@ -75,7 +85,7 @@ private void testChangeConsumerCountGuts(boolean transacted) throws Exception {
7585
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory);
7686
try {
7787
container.setMessageListener(new MessageListenerAdapter(this));
78-
container.setQueueNames("foo");
88+
container.setQueueNames(QUEUE);
7989
container.setAutoStartup(false);
8090
container.setConcurrentConsumers(2);
8191
container.setChannelTransacted(transacted);
@@ -90,7 +100,7 @@ private void testChangeConsumerCountGuts(boolean transacted) throws Exception {
90100
for (int i = 0; i < 20; i++) {
91101
template.convertAndSend("foo", "foo");
92102
}
93-
waitForNConsumers(container, 2); // increased consumers due to work
103+
waitForNConsumers(container, 2); // increased consumers due to work
94104
waitForNConsumers(container, 1, 20000); // should stop the extra consumer after 10 seconds idle
95105
container.setConcurrentConsumers(3);
96106
waitForNConsumers(container, 3);
@@ -107,11 +117,7 @@ private void testChangeConsumerCountGuts(boolean transacted) throws Exception {
107117
public void testAddQueuesAndStartInCycle() throws Exception {
108118
final SingleConnectionFactory connectionFactory = new SingleConnectionFactory("localhost");
109119
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
110-
container.setMessageListener(new MessageListener() {
111-
112-
@Override
113-
public void onMessage(Message message) {
114-
}
120+
container.setMessageListener((MessageListener) message -> {
115121
});
116122
container.setConcurrentConsumers(2);
117123
container.afterPropertiesSet();
@@ -138,6 +144,93 @@ public void onMessage(Message message) {
138144
connectionFactory.destroy();
139145
}
140146

147+
@Test
148+
public void testIncreaseMinAtMax() throws Exception {
149+
final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory("localhost");
150+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory);
151+
container.setStartConsumerMinInterval(100);
152+
container.setConsecutiveActiveTrigger(1);
153+
container.setMessageListener((MessageListener) m -> {
154+
try {
155+
Thread.sleep(50);
156+
}
157+
catch (InterruptedException e) {
158+
Thread.currentThread().interrupt();
159+
}
160+
});
161+
container.setQueueNames(QUEUE2);
162+
container.setConcurrentConsumers(2);
163+
container.setMaxConcurrentConsumers(5);
164+
container.afterPropertiesSet();
165+
container.start();
166+
RabbitTemplate template = new RabbitTemplate(singleConnectionFactory);
167+
for (int i = 0; i < 20; i++) {
168+
template.convertAndSend(QUEUE2, "foo");
169+
}
170+
waitForNConsumers(container, 5);
171+
container.setConcurrentConsumers(4);
172+
Set<?> consumers = (Set<?>) TestUtils.getPropertyValue(container, "consumers");
173+
assertThat(consumers.size(), equalTo(5));
174+
}
175+
176+
@Test
177+
public void testDecreaseMinAtMax() throws Exception {
178+
final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory("localhost");
179+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory);
180+
container.setStartConsumerMinInterval(100);
181+
container.setConsecutiveActiveTrigger(1);
182+
container.setMessageListener((MessageListener) m -> {
183+
try {
184+
Thread.sleep(50);
185+
}
186+
catch (InterruptedException e) {
187+
Thread.currentThread().interrupt();
188+
}
189+
});
190+
container.setQueueNames(QUEUE3);
191+
container.setConcurrentConsumers(2);
192+
container.setMaxConcurrentConsumers(3);
193+
container.afterPropertiesSet();
194+
container.start();
195+
RabbitTemplate template = new RabbitTemplate(singleConnectionFactory);
196+
for (int i = 0; i < 20; i++) {
197+
template.convertAndSend(QUEUE3, "foo");
198+
}
199+
waitForNConsumers(container, 3);
200+
container.setConcurrentConsumers(1);
201+
Set<?> consumers = (Set<?>) TestUtils.getPropertyValue(container, "consumers");
202+
assertThat(consumers.size(), equalTo(3));
203+
}
204+
205+
@Test
206+
public void testDecreaseMaxAtMax() throws Exception {
207+
final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory("localhost");
208+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory);
209+
container.setStartConsumerMinInterval(100);
210+
container.setConsecutiveActiveTrigger(1);
211+
container.setMessageListener((MessageListener) m -> {
212+
try {
213+
Thread.sleep(50);
214+
}
215+
catch (InterruptedException e) {
216+
Thread.currentThread().interrupt();
217+
}
218+
});
219+
container.setQueueNames(QUEUE4);
220+
container.setConcurrentConsumers(2);
221+
container.setMaxConcurrentConsumers(3);
222+
container.afterPropertiesSet();
223+
container.start();
224+
RabbitTemplate template = new RabbitTemplate(singleConnectionFactory);
225+
for (int i = 0; i < 20; i++) {
226+
template.convertAndSend(QUEUE4, "foo");
227+
}
228+
waitForNConsumers(container, 3);
229+
container.setConcurrentConsumers(1);
230+
container.setMaxConcurrentConsumers(1);
231+
Set<?> consumers = (Set<?>) TestUtils.getPropertyValue(container, "consumers");
232+
assertThat(consumers.size(), equalTo(1));
233+
}
141234

142235
public void handleMessage(String foo) {
143236
logger.info(foo);

0 commit comments

Comments
 (0)