Skip to content

Commit 2c929ff

Browse files
garyrussellartembilan
authored andcommitted
GH-1296: Fix DMLC Recovery: Missing Queue at Start
Resolves #1296 - Add `MissingQueueEvent` - Fix detection of a deleted queue in recovery - previously incorrectly used the absense of the queue in `consumersByQueue`, which can be empty if missing during start - Add an index to `SimpleConsumer` - When adjusting consumer counts, look for gaps in the index sequence because reducing the consumer count can remove any idle consumer. - Change consumers to restart to a `Set` to avoid OOM when no broker (see #642) - Unconditionally add consumers to `consumersToRestart` **cherry-pick to 2.2.x, 2.1.x**
1 parent d5f81a6 commit 2c929ff

File tree

8 files changed

+227
-36
lines changed

8 files changed

+227
-36
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -1757,6 +1757,13 @@ protected void publishConsumerFailedEvent(String reason, boolean fatal, @Nullabl
17571757
}
17581758
}
17591759

1760+
protected void publishMissingQueueEvent(String queue) {
1761+
if (this.applicationEventPublisher != null) {
1762+
this.applicationEventPublisher
1763+
.publishEvent(new MissingQueueEvent(this, queue));
1764+
}
1765+
}
1766+
17601767
protected final void publishIdleContainerEvent(long idleTime) {
17611768
if (this.applicationEventPublisher != null) {
17621769
this.applicationEventPublisher.publishEvent(

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -160,6 +160,8 @@ public class BlockingQueueConsumer {
160160

161161
private long consumeDelay;
162162

163+
private java.util.function.Consumer<String> missingQueuePublisher = str -> { };
164+
163165
private volatile long abortStarted;
164166

165167
private volatile boolean normalCancel;
@@ -374,6 +376,15 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
374376
this.applicationEventPublisher = applicationEventPublisher;
375377
}
376378

379+
/**
380+
* Set the publisher for a missing queue event.
381+
* @param missingQueuePublisher the publisher.
382+
* @since 2.1.18
383+
*/
384+
public void setMissingQueuePublisher(java.util.function.Consumer<String> missingQueuePublisher) {
385+
this.missingQueuePublisher = missingQueuePublisher;
386+
}
387+
377388
/**
378389
* Set the consumeDelay - a time to wait before consuming in ms. This is useful when
379390
* using the sharding plugin with {@code concurrency > 1}, to avoid uneven distribution of
@@ -714,6 +725,7 @@ private void attemptPassiveDeclarations() {
714725
if (logger.isWarnEnabled()) {
715726
logger.warn("Failed to declare queue: " + queueName);
716727
}
728+
this.missingQueuePublisher.accept(queueName);
717729
if (!this.channel.isOpen()) {
718730
throw new AmqpIOException(e);
719731
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 103 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,12 +24,14 @@
2424
import java.util.Collections;
2525
import java.util.Date;
2626
import java.util.Iterator;
27+
import java.util.LinkedHashSet;
2728
import java.util.LinkedList;
2829
import java.util.List;
2930
import java.util.Map;
3031
import java.util.Objects;
3132
import java.util.Properties;
3233
import java.util.Set;
34+
import java.util.concurrent.ConcurrentHashMap;
3335
import java.util.concurrent.CountDownLatch;
3436
import java.util.concurrent.ScheduledFuture;
3537
import java.util.concurrent.TimeUnit;
@@ -103,7 +105,9 @@ public class DirectMessageListenerContainer extends AbstractMessageListenerConta
103105

104106
protected final List<SimpleConsumer> consumers = new LinkedList<>(); // NOSONAR
105107

106-
private final List<SimpleConsumer> consumersToRestart = new LinkedList<>();
108+
private final Set<SimpleConsumer> consumersToRestart = new LinkedHashSet<>();
109+
110+
private final Set<String> removedQueues = ConcurrentHashMap.newKeySet();
107111

108112
private final MultiValueMap<String, SimpleConsumer> consumersByQueue = new LinkedMultiValueMap<>();
109113

@@ -243,6 +247,7 @@ public void addQueueNames(String... queueNames) {
243247
Assert.notNull(queueNames, "'queueNames' cannot be null");
244248
Assert.noNullElements(queueNames, "'queueNames' cannot contain null elements");
245249
try {
250+
Arrays.stream(queueNames).forEach(this.removedQueues::remove);
246251
addQueues(Arrays.stream(queueNames));
247252
}
248253
catch (AmqpIOException e) {
@@ -256,6 +261,9 @@ public void addQueues(Queue... queues) {
256261
Assert.notNull(queues, "'queues' cannot be null");
257262
Assert.noNullElements(queues, "'queues' cannot contain null elements");
258263
try {
264+
Arrays.stream(queues)
265+
.map(q -> q.getActualName())
266+
.forEach(this.removedQueues::remove);
259267
addQueues(Arrays.stream(queues).map(Queue::getName));
260268
}
261269
catch (AmqpIOException e) {
@@ -298,7 +306,10 @@ private void removeQueues(Stream<String> queueNames) {
298306
if (isRunning()) {
299307
synchronized (this.consumersMonitor) {
300308
checkStartState();
301-
queueNames.map(this.consumersByQueue::remove)
309+
queueNames.map(queue -> {
310+
this.removedQueues.add(queue);
311+
return this.consumersByQueue.remove(queue);
312+
})
302313
.filter(Objects::nonNull)
303314
.flatMap(Collection::stream)
304315
.forEach(this::cancelConsumer);
@@ -313,7 +324,21 @@ private void adjustConsumers(int newCount) {
313324
for (String queue : getQueueNames()) {
314325
while (this.consumersByQueue.get(queue) == null
315326
|| this.consumersByQueue.get(queue).size() < newCount) { // NOSONAR never null
316-
doConsumeFromQueue(queue);
327+
List<SimpleConsumer> cBQ = this.consumersByQueue.get(queue);
328+
int index = 0;
329+
if (cBQ != null) {
330+
// find a gap or set the index to the end
331+
List<Integer> indices = cBQ.stream()
332+
.map(cons -> cons.getIndex())
333+
.sorted()
334+
.collect(Collectors.toList());
335+
for (index = 0; index < indices.size(); index++) {
336+
if (index < indices.get(index)) {
337+
break;
338+
}
339+
}
340+
}
341+
doConsumeFromQueue(queue, index);
317342
}
318343
List<SimpleConsumer> consumerList = this.consumersByQueue.get(queue);
319344
if (consumerList != null && consumerList.size() > newCount) {
@@ -430,20 +455,19 @@ private void startMonitor(long idleEventInterval, final Map<String, Queue> names
430455
checkConsumers(now);
431456
if (this.lastRestartAttempt + getFailedDeclarationRetryInterval() < now) {
432457
synchronized (this.consumersMonitor) {
433-
List<SimpleConsumer> restartableConsumers = new ArrayList<>(this.consumersToRestart);
434-
this.consumersToRestart.clear();
435458
if (this.started) {
459+
List<SimpleConsumer> restartableConsumers = new ArrayList<>(this.consumersToRestart);
460+
this.consumersToRestart.clear();
436461
if (restartableConsumers.size() > 0) {
437462
doRedeclareElementsIfNecessary();
438463
}
439464
Iterator<SimpleConsumer> iterator = restartableConsumers.iterator();
440465
while (iterator.hasNext()) {
441466
SimpleConsumer consumer = iterator.next();
442467
iterator.remove();
443-
if (!DirectMessageListenerContainer.this.consumersByQueue
444-
.containsKey(consumer.getQueue())) {
468+
if (DirectMessageListenerContainer.this.removedQueues.contains(consumer.getQueue())) {
445469
if (this.logger.isDebugEnabled()) {
446-
this.logger.debug("Skipping restart of consumer " + consumer);
470+
this.logger.debug("Skipping restart of consumer, queue removed " + consumer);
447471
}
448472
continue;
449473
}
@@ -516,11 +540,11 @@ private boolean restartConsumer(final Map<String, Queue> namesToQueues, List<Sim
516540
if (StringUtils.hasText(actualName)) {
517541
namesToQueues.remove(consumer.getQueue());
518542
namesToQueues.put(actualName, queue);
519-
consumer = new SimpleConsumer(null, null, actualName);
543+
consumer = new SimpleConsumer(null, null, actualName, consumer.getIndex());
520544
}
521545
}
522546
try {
523-
doConsumeFromQueue(consumer.getQueue());
547+
doConsumeFromQueue(consumer.getQueue(), consumer.getIndex());
524548
return true;
525549
}
526550
catch (AmqpConnectException | AmqpIOException e) {
@@ -646,12 +670,12 @@ private void consumeFromQueue(String queue) {
646670
// Possible race with setConsumersPerQueue and the task launched by start()
647671
if (CollectionUtils.isEmpty(list)) {
648672
for (int i = 0; i < this.consumersPerQueue; i++) {
649-
doConsumeFromQueue(queue);
673+
doConsumeFromQueue(queue, i);
650674
}
651675
}
652676
}
653677

654-
private void doConsumeFromQueue(String queue) {
678+
private void doConsumeFromQueue(String queue, int index) {
655679
if (!isActive()) {
656680
if (this.logger.isDebugEnabled()) {
657681
this.logger.debug("Consume from queue " + queue + " ignore, container stopping");
@@ -668,7 +692,7 @@ private void doConsumeFromQueue(String queue) {
668692
}
669693
catch (Exception e) {
670694
publishConsumerFailedEvent(e.getMessage(), false, e);
671-
addConsumerToRestart(new SimpleConsumer(null, null, queue));
695+
addConsumerToRestart(new SimpleConsumer(null, null, queue, index));
672696
throw e instanceof AmqpConnectException // NOSONAR exception type check
673697
? (AmqpConnectException) e
674698
: new AmqpConnectException(e);
@@ -678,7 +702,7 @@ private void doConsumeFromQueue(String queue) {
678702
SimpleResourceHolder.pop(getRoutingConnectionFactory()); // NOSONAR never null here
679703
}
680704
}
681-
SimpleConsumer consumer = consume(queue, connection);
705+
SimpleConsumer consumer = consume(queue, index, connection);
682706
synchronized (this.consumersMonitor) {
683707
if (consumer != null) {
684708
this.cancellationLock.add(consumer);
@@ -695,7 +719,7 @@ private void doConsumeFromQueue(String queue) {
695719
}
696720

697721
@Nullable
698-
private SimpleConsumer consume(String queue, Connection connection) {
722+
private SimpleConsumer consume(String queue, int index, Connection connection) {
699723
Channel channel = null;
700724
SimpleConsumer consumer = null;
701725
try {
@@ -709,7 +733,7 @@ private SimpleConsumer consume(String queue, Connection connection) {
709733
}
710734
channel = connection.createChannel(isChannelTransacted());
711735
channel.basicQos(getPrefetchCount());
712-
consumer = new SimpleConsumer(connection, channel, queue);
736+
consumer = new SimpleConsumer(connection, channel, queue, index);
713737
channel.queueDeclarePassive(queue);
714738
consumer.consumerTag = channel.basicConsume(queue, getAcknowledgeMode().isAutoAck(),
715739
(getConsumerTagStrategy() != null
@@ -723,13 +747,14 @@ private SimpleConsumer consume(String queue, Connection connection) {
723747
RabbitUtils.closeChannel(channel);
724748
RabbitUtils.closeConnection(connection);
725749

726-
consumer = handleConsumeException(queue, consumer, e);
750+
consumer = handleConsumeException(queue, index, consumer, e);
727751
}
728752
return consumer;
729753
}
730754

731755
@Nullable
732-
private SimpleConsumer handleConsumeException(String queue, @Nullable SimpleConsumer consumerArg, Exception e) {
756+
private SimpleConsumer handleConsumeException(String queue, int index, @Nullable SimpleConsumer consumerArg,
757+
Exception e) {
733758

734759
SimpleConsumer consumer = consumerArg;
735760
if (e.getCause() instanceof ShutdownSignalException
@@ -740,6 +765,7 @@ private SimpleConsumer handleConsumeException(String queue, @Nullable SimpleCons
740765
}
741766
else if (e.getCause() instanceof ShutdownSignalException
742767
&& RabbitUtils.isPassiveDeclarationChannelClose((ShutdownSignalException) e.getCause())) {
768+
publishMissingQueueEvent(queue);
743769
this.logger.error("Queue not present, scheduling consumer "
744770
+ (consumer == null ? "for queue " + queue : consumer) + " for restart", e);
745771
}
@@ -749,7 +775,7 @@ else if (this.logger.isWarnEnabled()) {
749775
}
750776

751777
if (consumer == null) {
752-
addConsumerToRestart(new SimpleConsumer(null, null, queue));
778+
addConsumerToRestart(new SimpleConsumer(null, null, queue, index));
753779
}
754780
else {
755781
addConsumerToRestart(consumer);
@@ -844,11 +870,9 @@ private void cancelConsumer(SimpleConsumer consumer) {
844870
}
845871

846872
private void addConsumerToRestart(SimpleConsumer consumer) {
847-
if (this.started) {
848-
this.consumersToRestart.add(consumer);
849-
if (this.logger.isTraceEnabled()) {
850-
this.logger.trace("Consumers to restart now: " + this.consumersToRestart);
851-
}
873+
this.consumersToRestart.add(consumer);
874+
if (this.logger.isTraceEnabled()) {
875+
this.logger.trace("Consumers to restart now: " + this.consumersToRestart);
852876
}
853877
}
854878

@@ -871,6 +895,8 @@ final class SimpleConsumer extends DefaultConsumer {
871895

872896
private final String queue;
873897

898+
private final int index;
899+
874900
private final boolean ackRequired;
875901

876902
private final ConnectionFactory connectionFactory = getConnectionFactory();
@@ -903,10 +929,11 @@ final class SimpleConsumer extends DefaultConsumer {
903929

904930
private volatile boolean ackFailed;
905931

906-
SimpleConsumer(@Nullable Connection connection, @Nullable Channel channel, String queue) {
932+
SimpleConsumer(@Nullable Connection connection, @Nullable Channel channel, String queue, int index) {
907933
super(channel);
908934
this.connection = connection;
909935
this.queue = queue;
936+
this.index = index;
910937
this.ackRequired = !getAcknowledgeMode().isAutoAck() && !getAcknowledgeMode().isManual();
911938
if (channel instanceof ChannelProxy) {
912939
this.targetChannel = ((ChannelProxy) channel).getTargetChannel();
@@ -916,10 +943,14 @@ final class SimpleConsumer extends DefaultConsumer {
916943
}
917944
}
918945

919-
private String getQueue() {
946+
String getQueue() {
920947
return this.queue;
921948
}
922949

950+
int getIndex() {
951+
return this.index;
952+
}
953+
923954
@Override
924955
public String getConsumerTag() {
925956
return this.consumerTag;
@@ -1220,9 +1251,53 @@ private void finalizeConsumer() {
12201251
consumerRemoved(this);
12211252
}
12221253

1254+
@Override
1255+
public int hashCode() {
1256+
final int prime = 31;
1257+
int result = 1;
1258+
result = prime * result + getEnclosingInstance().hashCode();
1259+
result = prime * result + this.index;
1260+
result = prime * result + ((this.queue == null) ? 0 : this.queue.hashCode());
1261+
return result;
1262+
}
1263+
1264+
@Override
1265+
public boolean equals(Object obj) {
1266+
if (this == obj) {
1267+
return true;
1268+
}
1269+
if (obj == null) {
1270+
return false;
1271+
}
1272+
if (getClass() != obj.getClass()) {
1273+
return false;
1274+
}
1275+
SimpleConsumer other = (SimpleConsumer) obj;
1276+
if (!getEnclosingInstance().equals(other.getEnclosingInstance())) {
1277+
return false;
1278+
}
1279+
if (this.index != other.index) {
1280+
return false;
1281+
}
1282+
if (this.queue == null) {
1283+
if (other.queue != null) {
1284+
return false;
1285+
}
1286+
}
1287+
else if (!this.queue.equals(other.queue)) {
1288+
return false;
1289+
}
1290+
return true;
1291+
}
1292+
1293+
private DirectMessageListenerContainer getEnclosingInstance() {
1294+
return DirectMessageListenerContainer.this;
1295+
}
1296+
12231297
@Override
12241298
public String toString() {
1225-
return "SimpleConsumer [queue=" + this.queue + ", consumerTag=" + this.consumerTag
1299+
return "SimpleConsumer [queue=" + this.queue + ", index=" + this.index
1300+
+ ", consumerTag=" + this.consumerTag
12261301
+ " identity=" + ObjectUtils.getIdentityHexString(this) + "]";
12271302
}
12281303

0 commit comments

Comments
 (0)