Skip to content

Commit 69ec278

Browse files
garyrussellartembilan
authored andcommitted
GH-1296: Fix DMLC Recovery: Missing Queue at Start
Resolves #1296 - Add `MissingQueueEvent` - Fix detection of a deleted queue in recovery - previously incorrectly used the absense of the queue in `consumersByQueue`, which can be empty if missing during start - Add an index to `SimpleConsumer` - When adjusting consumer counts, look for gaps in the index sequence because reducing the consumer count can remove any idle consumer. - Change consumers to restart to a `Set` to avoid OOM when no broker (see #642) - Unconditionally add consumers to `consumersToRestart` **cherry-pick to 2.2.x, 2.1.x** # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerIntegrationTests.java # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerIntegrationTests.java
1 parent a9f7830 commit 69ec278

File tree

8 files changed

+229
-37
lines changed

8 files changed

+229
-37
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -1651,6 +1651,13 @@ protected void publishConsumerFailedEvent(String reason, boolean fatal, @Nullabl
16511651
}
16521652
}
16531653

1654+
protected void publishMissingQueueEvent(String queue) {
1655+
if (this.applicationEventPublisher != null) {
1656+
this.applicationEventPublisher
1657+
.publishEvent(new MissingQueueEvent(this, queue));
1658+
}
1659+
}
1660+
16541661
protected final void publishIdleContainerEvent(long idleTime) {
16551662
if (this.applicationEventPublisher != null) {
16561663
this.applicationEventPublisher.publishEvent(

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -156,6 +156,8 @@ public class BlockingQueueConsumer {
156156

157157
private ApplicationEventPublisher applicationEventPublisher;
158158

159+
private java.util.function.Consumer<String> missingQueuePublisher = str -> { };
160+
159161
private volatile long abortStarted;
160162

161163
private volatile boolean normalCancel;
@@ -370,6 +372,15 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
370372
this.applicationEventPublisher = applicationEventPublisher;
371373
}
372374

375+
/**
376+
* Set the publisher for a missing queue event.
377+
* @param missingQueuePublisher the publisher.
378+
* @since 2.1.18
379+
*/
380+
public void setMissingQueuePublisher(java.util.function.Consumer<String> missingQueuePublisher) {
381+
this.missingQueuePublisher = missingQueuePublisher;
382+
}
383+
373384
/**
374385
* Clear the delivery tags when rolling back with an external transaction
375386
* manager.
@@ -691,6 +702,7 @@ private void attemptPassiveDeclarations() {
691702
if (logger.isWarnEnabled()) {
692703
logger.warn("Failed to declare queue: " + queueName);
693704
}
705+
this.missingQueuePublisher.accept(queueName);
694706
if (!this.channel.isOpen()) {
695707
throw new AmqpIOException(e);
696708
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 103 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,12 +24,14 @@
2424
import java.util.Collections;
2525
import java.util.Date;
2626
import java.util.Iterator;
27+
import java.util.LinkedHashSet;
2728
import java.util.LinkedList;
2829
import java.util.List;
2930
import java.util.Map;
3031
import java.util.Objects;
3132
import java.util.Properties;
3233
import java.util.Set;
34+
import java.util.concurrent.ConcurrentHashMap;
3335
import java.util.concurrent.CountDownLatch;
3436
import java.util.concurrent.ScheduledFuture;
3537
import java.util.concurrent.TimeUnit;
@@ -101,7 +103,9 @@ public class DirectMessageListenerContainer extends AbstractMessageListenerConta
101103

102104
protected final List<SimpleConsumer> consumers = new LinkedList<>(); // NOSONAR
103105

104-
private final List<SimpleConsumer> consumersToRestart = new LinkedList<>();
106+
private final Set<SimpleConsumer> consumersToRestart = new LinkedHashSet<>();
107+
108+
private final Set<String> removedQueues = ConcurrentHashMap.newKeySet();
105109

106110
private final MultiValueMap<String, SimpleConsumer> consumersByQueue = new LinkedMultiValueMap<>();
107111

@@ -241,6 +245,7 @@ public void addQueueNames(String... queueNames) {
241245
Assert.notNull(queueNames, "'queueNames' cannot be null");
242246
Assert.noNullElements(queueNames, "'queueNames' cannot contain null elements");
243247
try {
248+
Arrays.stream(queueNames).forEach(this.removedQueues::remove);
244249
addQueues(Arrays.stream(queueNames));
245250
}
246251
catch (AmqpIOException e) {
@@ -254,6 +259,9 @@ public void addQueues(Queue... queues) {
254259
Assert.notNull(queues, "'queues' cannot be null");
255260
Assert.noNullElements(queues, "'queues' cannot contain null elements");
256261
try {
262+
Arrays.stream(queues)
263+
.map(q -> q.getActualName())
264+
.forEach(this.removedQueues::remove);
257265
addQueues(Arrays.stream(queues).map(Queue::getName));
258266
}
259267
catch (AmqpIOException e) {
@@ -296,7 +304,10 @@ private void removeQueues(Stream<String> queueNames) {
296304
if (isRunning()) {
297305
synchronized (this.consumersMonitor) {
298306
checkStartState();
299-
queueNames.map(this.consumersByQueue::remove)
307+
queueNames.map(queue -> {
308+
this.removedQueues.add(queue);
309+
return this.consumersByQueue.remove(queue);
310+
})
300311
.filter(Objects::nonNull)
301312
.flatMap(Collection::stream)
302313
.forEach(this::cancelConsumer);
@@ -311,7 +322,21 @@ private void adjustConsumers(int newCount) {
311322
for (String queue : getQueueNames()) {
312323
while (this.consumersByQueue.get(queue) == null
313324
|| this.consumersByQueue.get(queue).size() < newCount) { // NOSONAR never null
314-
doConsumeFromQueue(queue);
325+
List<SimpleConsumer> cBQ = this.consumersByQueue.get(queue);
326+
int index = 0;
327+
if (cBQ != null) {
328+
// find a gap or set the index to the end
329+
List<Integer> indices = cBQ.stream()
330+
.map(cons -> cons.getIndex())
331+
.sorted()
332+
.collect(Collectors.toList());
333+
for (index = 0; index < indices.size(); index++) {
334+
if (index < indices.get(index)) {
335+
break;
336+
}
337+
}
338+
}
339+
doConsumeFromQueue(queue, index);
315340
}
316341
List<SimpleConsumer> consumerList = this.consumersByQueue.get(queue);
317342
if (consumerList != null && consumerList.size() > newCount) {
@@ -428,20 +453,19 @@ private void startMonitor(long idleEventInterval, final Map<String, Queue> names
428453
checkConsumers(now);
429454
if (this.lastRestartAttempt + getFailedDeclarationRetryInterval() < now) {
430455
synchronized (this.consumersMonitor) {
431-
List<SimpleConsumer> restartableConsumers = new ArrayList<>(this.consumersToRestart);
432-
this.consumersToRestart.clear();
433456
if (this.started) {
457+
List<SimpleConsumer> restartableConsumers = new ArrayList<>(this.consumersToRestart);
458+
this.consumersToRestart.clear();
434459
if (restartableConsumers.size() > 0) {
435460
doRedeclareElementsIfNecessary();
436461
}
437462
Iterator<SimpleConsumer> iterator = restartableConsumers.iterator();
438463
while (iterator.hasNext()) {
439464
SimpleConsumer consumer = iterator.next();
440465
iterator.remove();
441-
if (!DirectMessageListenerContainer.this.consumersByQueue
442-
.containsKey(consumer.getQueue())) {
466+
if (DirectMessageListenerContainer.this.removedQueues.contains(consumer.getQueue())) {
443467
if (this.logger.isDebugEnabled()) {
444-
this.logger.debug("Skipping restart of consumer " + consumer);
468+
this.logger.debug("Skipping restart of consumer, queue removed " + consumer);
445469
}
446470
continue;
447471
}
@@ -514,11 +538,11 @@ private boolean restartConsumer(final Map<String, Queue> namesToQueues, List<Sim
514538
if (StringUtils.hasText(actualName)) {
515539
namesToQueues.remove(consumer.getQueue());
516540
namesToQueues.put(actualName, queue);
517-
consumer = new SimpleConsumer(null, null, actualName);
541+
consumer = new SimpleConsumer(null, null, actualName, consumer.getIndex());
518542
}
519543
}
520544
try {
521-
doConsumeFromQueue(consumer.getQueue());
545+
doConsumeFromQueue(consumer.getQueue(), consumer.getIndex());
522546
return true;
523547
}
524548
catch (AmqpConnectException | AmqpIOException e) {
@@ -644,12 +668,12 @@ private void consumeFromQueue(String queue) {
644668
// Possible race with setConsumersPerQueue and the task launched by start()
645669
if (CollectionUtils.isEmpty(list)) {
646670
for (int i = 0; i < this.consumersPerQueue; i++) {
647-
doConsumeFromQueue(queue);
671+
doConsumeFromQueue(queue, i);
648672
}
649673
}
650674
}
651675

652-
private void doConsumeFromQueue(String queue) {
676+
private void doConsumeFromQueue(String queue, int index) {
653677
if (!isActive()) {
654678
if (this.logger.isDebugEnabled()) {
655679
this.logger.debug("Consume from queue " + queue + " ignore, container stopping");
@@ -666,7 +690,7 @@ private void doConsumeFromQueue(String queue) {
666690
}
667691
catch (Exception e) {
668692
publishConsumerFailedEvent(e.getMessage(), false, e);
669-
addConsumerToRestart(new SimpleConsumer(null, null, queue));
693+
addConsumerToRestart(new SimpleConsumer(null, null, queue, index));
670694
throw e instanceof AmqpConnectException // NOSONAR exception type check
671695
? (AmqpConnectException) e
672696
: new AmqpConnectException(e);
@@ -676,7 +700,7 @@ private void doConsumeFromQueue(String queue) {
676700
SimpleResourceHolder.pop(getRoutingConnectionFactory()); // NOSONAR never null here
677701
}
678702
}
679-
SimpleConsumer consumer = consume(queue, connection);
703+
SimpleConsumer consumer = consume(queue, index, connection);
680704
synchronized (this.consumersMonitor) {
681705
if (consumer != null) {
682706
this.cancellationLock.add(consumer);
@@ -693,13 +717,13 @@ private void doConsumeFromQueue(String queue) {
693717
}
694718

695719
@Nullable
696-
private SimpleConsumer consume(String queue, Connection connection) {
720+
private SimpleConsumer consume(String queue, int index, Connection connection) {
697721
Channel channel = null;
698722
SimpleConsumer consumer = null;
699723
try {
700724
channel = connection.createChannel(isChannelTransacted());
701725
channel.basicQos(getPrefetchCount());
702-
consumer = new SimpleConsumer(connection, channel, queue);
726+
consumer = new SimpleConsumer(connection, channel, queue, index);
703727
channel.queueDeclarePassive(queue);
704728
consumer.consumerTag = channel.basicConsume(queue, getAcknowledgeMode().isAutoAck(),
705729
(getConsumerTagStrategy() != null
@@ -713,13 +737,14 @@ private SimpleConsumer consume(String queue, Connection connection) {
713737
RabbitUtils.closeChannel(channel);
714738
RabbitUtils.closeConnection(connection);
715739

716-
consumer = handleConsumeException(queue, consumer, e);
740+
consumer = handleConsumeException(queue, index, consumer, e);
717741
}
718742
return consumer;
719743
}
720744

721745
@Nullable
722-
private SimpleConsumer handleConsumeException(String queue, SimpleConsumer consumerArg, Exception e) {
746+
private SimpleConsumer handleConsumeException(String queue, int index, @Nullable SimpleConsumer consumerArg,
747+
Exception e) {
723748

724749
SimpleConsumer consumer = consumerArg;
725750
if (e.getCause() instanceof ShutdownSignalException
@@ -730,6 +755,7 @@ private SimpleConsumer handleConsumeException(String queue, SimpleConsumer consu
730755
}
731756
else if (e.getCause() instanceof ShutdownSignalException
732757
&& RabbitUtils.isPassiveDeclarationChannelClose((ShutdownSignalException) e.getCause())) {
758+
publishMissingQueueEvent(queue);
733759
this.logger.error("Queue not present, scheduling consumer "
734760
+ (consumer == null ? "for queue " + queue : consumer) + " for restart", e);
735761
}
@@ -739,7 +765,7 @@ else if (this.logger.isWarnEnabled()) {
739765
}
740766

741767
if (consumer == null) {
742-
addConsumerToRestart(new SimpleConsumer(null, null, queue));
768+
addConsumerToRestart(new SimpleConsumer(null, null, queue, index));
743769
}
744770
else {
745771
addConsumerToRestart(consumer);
@@ -833,11 +859,9 @@ private void cancelConsumer(SimpleConsumer consumer) {
833859
}
834860

835861
private void addConsumerToRestart(SimpleConsumer consumer) {
836-
if (this.started) {
837-
this.consumersToRestart.add(consumer);
838-
if (this.logger.isTraceEnabled()) {
839-
this.logger.trace("Consumers to restart now: " + this.consumersToRestart);
840-
}
862+
this.consumersToRestart.add(consumer);
863+
if (this.logger.isTraceEnabled()) {
864+
this.logger.trace("Consumers to restart now: " + this.consumersToRestart);
841865
}
842866
}
843867

@@ -860,6 +884,8 @@ final class SimpleConsumer extends DefaultConsumer {
860884

861885
private final String queue;
862886

887+
private final int index;
888+
863889
private final boolean ackRequired;
864890

865891
private final ConnectionFactory connectionFactory = getConnectionFactory();
@@ -894,10 +920,11 @@ final class SimpleConsumer extends DefaultConsumer {
894920

895921
private volatile boolean ackFailed;
896922

897-
SimpleConsumer(Connection connection, Channel channel, String queue) {
923+
SimpleConsumer(@Nullable Connection connection, @Nullable Channel channel, String queue, int index) {
898924
super(channel);
899925
this.connection = connection;
900926
this.queue = queue;
927+
this.index = index;
901928
this.ackRequired = !getAcknowledgeMode().isAutoAck() && !getAcknowledgeMode().isManual();
902929
if (channel instanceof ChannelProxy) {
903930
this.targetChannel = ((ChannelProxy) channel).getTargetChannel();
@@ -907,10 +934,14 @@ final class SimpleConsumer extends DefaultConsumer {
907934
}
908935
}
909936

910-
private String getQueue() {
937+
String getQueue() {
911938
return this.queue;
912939
}
913940

941+
int getIndex() {
942+
return this.index;
943+
}
944+
914945
@Override
915946
public String getConsumerTag() {
916947
return this.consumerTag;
@@ -1203,9 +1234,53 @@ private void finalizeConsumer() {
12031234
consumerRemoved(this);
12041235
}
12051236

1237+
@Override
1238+
public int hashCode() {
1239+
final int prime = 31;
1240+
int result = 1;
1241+
result = prime * result + getEnclosingInstance().hashCode();
1242+
result = prime * result + this.index;
1243+
result = prime * result + ((this.queue == null) ? 0 : this.queue.hashCode());
1244+
return result;
1245+
}
1246+
1247+
@Override
1248+
public boolean equals(Object obj) {
1249+
if (this == obj) {
1250+
return true;
1251+
}
1252+
if (obj == null) {
1253+
return false;
1254+
}
1255+
if (getClass() != obj.getClass()) {
1256+
return false;
1257+
}
1258+
SimpleConsumer other = (SimpleConsumer) obj;
1259+
if (!getEnclosingInstance().equals(other.getEnclosingInstance())) {
1260+
return false;
1261+
}
1262+
if (this.index != other.index) {
1263+
return false;
1264+
}
1265+
if (this.queue == null) {
1266+
if (other.queue != null) {
1267+
return false;
1268+
}
1269+
}
1270+
else if (!this.queue.equals(other.queue)) {
1271+
return false;
1272+
}
1273+
return true;
1274+
}
1275+
1276+
private DirectMessageListenerContainer getEnclosingInstance() {
1277+
return DirectMessageListenerContainer.this;
1278+
}
1279+
12061280
@Override
12071281
public String toString() {
1208-
return "SimpleConsumer [queue=" + this.queue + ", consumerTag=" + this.consumerTag
1282+
return "SimpleConsumer [queue=" + this.queue + ", index=" + this.index
1283+
+ ", consumerTag=" + this.consumerTag
12091284
+ " identity=" + ObjectUtils.getIdentityHexString(this) + "]";
12101285
}
12111286

0 commit comments

Comments
 (0)