Skip to content

Commit ce9c147

Browse files
garyrussellartembilan
authored andcommitted
AMQP-836: SMLC queuesChanged() via Queue
JIRA: https://jira.spring.io/browse/AMQP-836 Changing queues with add/remove with `Queue` parameters did not restart consumers. Changing the queues with queue names worked. **cherry-pick to 2.0.x** (cherry picked from commit 2480b17)
1 parent ae7b72d commit ce9c147

File tree

2 files changed

+59
-3
lines changed

2 files changed

+59
-3
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
4242
import org.springframework.amqp.core.AcknowledgeMode;
4343
import org.springframework.amqp.core.Message;
44+
import org.springframework.amqp.core.Queue;
4445
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
4546
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
4647
import org.springframework.amqp.rabbit.connection.ConsumerChannelRegistry;
@@ -336,7 +337,7 @@ public void setMissingQueuesFatal(boolean missingQueuesFatal) {
336337
@Override
337338
public void setQueueNames(String... queueName) {
338339
super.setQueueNames(queueName);
339-
this.queuesChanged();
340+
queuesChanged();
340341
}
341342

342343
/**
@@ -349,7 +350,7 @@ public void setQueueNames(String... queueName) {
349350
@Override
350351
public void addQueueNames(String... queueName) {
351352
super.addQueueNames(queueName);
352-
this.queuesChanged();
353+
queuesChanged();
353354
}
354355

355356
/**
@@ -361,7 +362,37 @@ public void addQueueNames(String... queueName) {
361362
@Override
362363
public boolean removeQueueNames(String... queueName) {
363364
if (super.removeQueueNames(queueName)) {
364-
this.queuesChanged();
365+
queuesChanged();
366+
return true;
367+
}
368+
else {
369+
return false;
370+
}
371+
}
372+
373+
/**
374+
* Add queue(s) to this container's list of queues. The existing consumers
375+
* will be cancelled after they have processed any pre-fetched messages and
376+
* new consumers will be created. The queue must exist to avoid problems when
377+
* restarting the consumers.
378+
* @param queue The queue to add.
379+
*/
380+
@Override
381+
public void addQueues(Queue... queue) {
382+
super.addQueues(queue);
383+
queuesChanged();
384+
}
385+
386+
/**
387+
* Remove queues from this container's list of queues. The existing consumers
388+
* will be cancelled after they have processed any pre-fetched messages and
389+
* new consumers will be created. At least one queue must remain.
390+
* @param queue The queue to remove.
391+
*/
392+
@Override
393+
public boolean removeQueues(Queue... queue) {
394+
if (super.removeQueues(queue)) {
395+
queuesChanged();
365396
return true;
366397
}
367398
else {

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegration2Tests.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,31 @@ public void testChangeQueues() throws Exception {
165165
assertNull(template.receiveAndConvert(queue1.getName()));
166166
}
167167

168+
@Test
169+
public void testChangeQueues2() throws Exception { // addQueues instead of addQueueNames
170+
CountDownLatch latch = new CountDownLatch(30);
171+
container = createContainer(new MessageListenerAdapter(new PojoListener(latch)), queue.getName(), queue1.getName());
172+
final CountDownLatch consumerLatch = new CountDownLatch(1);
173+
this.container.setApplicationEventPublisher(e -> {
174+
if (e instanceof AsyncConsumerStoppedEvent) {
175+
consumerLatch.countDown();
176+
}
177+
});
178+
for (int i = 0; i < 10; i++) {
179+
template.convertAndSend(queue.getName(), i + "foo");
180+
template.convertAndSend(queue1.getName(), i + "foo");
181+
}
182+
container.addQueues(queue1);
183+
assertTrue(consumerLatch.await(10, TimeUnit.SECONDS));
184+
for (int i = 0; i < 10; i++) {
185+
template.convertAndSend(queue.getName(), i + "foo");
186+
}
187+
boolean waited = latch.await(10, TimeUnit.SECONDS);
188+
assertTrue("Timed out waiting for message", waited);
189+
assertNull(template.receiveAndConvert(queue.getName()));
190+
assertNull(template.receiveAndConvert(queue1.getName()));
191+
}
192+
168193
@Test
169194
public void testNoQueues() throws Exception {
170195
CountDownLatch latch1 = new CountDownLatch(20);

0 commit comments

Comments
 (0)