11/*
2- * Copyright 2016-2020 the original author or authors.
2+ * Copyright 2016-2021 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
2424import java .util .Collections ;
2525import java .util .Date ;
2626import java .util .Iterator ;
27+ import java .util .LinkedHashSet ;
2728import java .util .LinkedList ;
2829import java .util .List ;
2930import java .util .Map ;
3031import java .util .Objects ;
3132import java .util .Properties ;
3233import java .util .Set ;
34+ import java .util .concurrent .ConcurrentHashMap ;
3335import java .util .concurrent .CountDownLatch ;
3436import java .util .concurrent .ScheduledFuture ;
3537import java .util .concurrent .TimeUnit ;
@@ -103,7 +105,9 @@ public class DirectMessageListenerContainer extends AbstractMessageListenerConta
103105
104106 protected final List <SimpleConsumer > consumers = new LinkedList <>(); // NOSONAR
105107
106- private final List <SimpleConsumer > consumersToRestart = new LinkedList <>();
108+ private final Set <SimpleConsumer > consumersToRestart = new LinkedHashSet <>();
109+
110+ private final Set <String > removedQueues = ConcurrentHashMap .newKeySet ();
107111
108112 private final MultiValueMap <String , SimpleConsumer > consumersByQueue = new LinkedMultiValueMap <>();
109113
@@ -243,6 +247,7 @@ public void addQueueNames(String... queueNames) {
243247 Assert .notNull (queueNames , "'queueNames' cannot be null" );
244248 Assert .noNullElements (queueNames , "'queueNames' cannot contain null elements" );
245249 try {
250+ Arrays .stream (queueNames ).forEach (this .removedQueues ::remove );
246251 addQueues (Arrays .stream (queueNames ));
247252 }
248253 catch (AmqpIOException e ) {
@@ -256,6 +261,9 @@ public void addQueues(Queue... queues) {
256261 Assert .notNull (queues , "'queues' cannot be null" );
257262 Assert .noNullElements (queues , "'queues' cannot contain null elements" );
258263 try {
264+ Arrays .stream (queues )
265+ .map (q -> q .getActualName ())
266+ .forEach (this .removedQueues ::remove );
259267 addQueues (Arrays .stream (queues ).map (Queue ::getName ));
260268 }
261269 catch (AmqpIOException e ) {
@@ -298,7 +306,10 @@ private void removeQueues(Stream<String> queueNames) {
298306 if (isRunning ()) {
299307 synchronized (this .consumersMonitor ) {
300308 checkStartState ();
301- queueNames .map (this .consumersByQueue ::remove )
309+ queueNames .map (queue -> {
310+ this .removedQueues .add (queue );
311+ return this .consumersByQueue .remove (queue );
312+ })
302313 .filter (Objects ::nonNull )
303314 .flatMap (Collection ::stream )
304315 .forEach (this ::cancelConsumer );
@@ -313,7 +324,21 @@ private void adjustConsumers(int newCount) {
313324 for (String queue : getQueueNames ()) {
314325 while (this .consumersByQueue .get (queue ) == null
315326 || this .consumersByQueue .get (queue ).size () < newCount ) { // NOSONAR never null
316- doConsumeFromQueue (queue );
327+ List <SimpleConsumer > cBQ = this .consumersByQueue .get (queue );
328+ int index = 0 ;
329+ if (cBQ != null ) {
330+ // find a gap or set the index to the end
331+ List <Integer > indices = cBQ .stream ()
332+ .map (cons -> cons .getIndex ())
333+ .sorted ()
334+ .collect (Collectors .toList ());
335+ for (index = 0 ; index < indices .size (); index ++) {
336+ if (index < indices .get (index )) {
337+ break ;
338+ }
339+ }
340+ }
341+ doConsumeFromQueue (queue , index );
317342 }
318343 List <SimpleConsumer > consumerList = this .consumersByQueue .get (queue );
319344 if (consumerList != null && consumerList .size () > newCount ) {
@@ -430,20 +455,19 @@ private void startMonitor(long idleEventInterval, final Map<String, Queue> names
430455 checkConsumers (now );
431456 if (this .lastRestartAttempt + getFailedDeclarationRetryInterval () < now ) {
432457 synchronized (this .consumersMonitor ) {
433- List <SimpleConsumer > restartableConsumers = new ArrayList <>(this .consumersToRestart );
434- this .consumersToRestart .clear ();
435458 if (this .started ) {
459+ List <SimpleConsumer > restartableConsumers = new ArrayList <>(this .consumersToRestart );
460+ this .consumersToRestart .clear ();
436461 if (restartableConsumers .size () > 0 ) {
437462 doRedeclareElementsIfNecessary ();
438463 }
439464 Iterator <SimpleConsumer > iterator = restartableConsumers .iterator ();
440465 while (iterator .hasNext ()) {
441466 SimpleConsumer consumer = iterator .next ();
442467 iterator .remove ();
443- if (!DirectMessageListenerContainer .this .consumersByQueue
444- .containsKey (consumer .getQueue ())) {
468+ if (DirectMessageListenerContainer .this .removedQueues .contains (consumer .getQueue ())) {
445469 if (this .logger .isDebugEnabled ()) {
446- this .logger .debug ("Skipping restart of consumer " + consumer );
470+ this .logger .debug ("Skipping restart of consumer, queue removed " + consumer );
447471 }
448472 continue ;
449473 }
@@ -516,11 +540,11 @@ private boolean restartConsumer(final Map<String, Queue> namesToQueues, List<Sim
516540 if (StringUtils .hasText (actualName )) {
517541 namesToQueues .remove (consumer .getQueue ());
518542 namesToQueues .put (actualName , queue );
519- consumer = new SimpleConsumer (null , null , actualName );
543+ consumer = new SimpleConsumer (null , null , actualName , consumer . getIndex () );
520544 }
521545 }
522546 try {
523- doConsumeFromQueue (consumer .getQueue ());
547+ doConsumeFromQueue (consumer .getQueue (), consumer . getIndex () );
524548 return true ;
525549 }
526550 catch (AmqpConnectException | AmqpIOException e ) {
@@ -646,12 +670,12 @@ private void consumeFromQueue(String queue) {
646670 // Possible race with setConsumersPerQueue and the task launched by start()
647671 if (CollectionUtils .isEmpty (list )) {
648672 for (int i = 0 ; i < this .consumersPerQueue ; i ++) {
649- doConsumeFromQueue (queue );
673+ doConsumeFromQueue (queue , i );
650674 }
651675 }
652676 }
653677
654- private void doConsumeFromQueue (String queue ) {
678+ private void doConsumeFromQueue (String queue , int index ) {
655679 if (!isActive ()) {
656680 if (this .logger .isDebugEnabled ()) {
657681 this .logger .debug ("Consume from queue " + queue + " ignore, container stopping" );
@@ -668,7 +692,7 @@ private void doConsumeFromQueue(String queue) {
668692 }
669693 catch (Exception e ) {
670694 publishConsumerFailedEvent (e .getMessage (), false , e );
671- addConsumerToRestart (new SimpleConsumer (null , null , queue ));
695+ addConsumerToRestart (new SimpleConsumer (null , null , queue , index ));
672696 throw e instanceof AmqpConnectException // NOSONAR exception type check
673697 ? (AmqpConnectException ) e
674698 : new AmqpConnectException (e );
@@ -678,7 +702,7 @@ private void doConsumeFromQueue(String queue) {
678702 SimpleResourceHolder .pop (getRoutingConnectionFactory ()); // NOSONAR never null here
679703 }
680704 }
681- SimpleConsumer consumer = consume (queue , connection );
705+ SimpleConsumer consumer = consume (queue , index , connection );
682706 synchronized (this .consumersMonitor ) {
683707 if (consumer != null ) {
684708 this .cancellationLock .add (consumer );
@@ -695,13 +719,13 @@ private void doConsumeFromQueue(String queue) {
695719 }
696720
697721 @ Nullable
698- private SimpleConsumer consume (String queue , Connection connection ) {
722+ private SimpleConsumer consume (String queue , int index , Connection connection ) {
699723 Channel channel = null ;
700724 SimpleConsumer consumer = null ;
701725 try {
702726 channel = connection .createChannel (isChannelTransacted ());
703727 channel .basicQos (getPrefetchCount ());
704- consumer = new SimpleConsumer (connection , channel , queue );
728+ consumer = new SimpleConsumer (connection , channel , queue , index );
705729 channel .queueDeclarePassive (queue );
706730 consumer .consumerTag = channel .basicConsume (queue , getAcknowledgeMode ().isAutoAck (),
707731 (getConsumerTagStrategy () != null
@@ -715,13 +739,14 @@ private SimpleConsumer consume(String queue, Connection connection) {
715739 RabbitUtils .closeChannel (channel );
716740 RabbitUtils .closeConnection (connection );
717741
718- consumer = handleConsumeException (queue , consumer , e );
742+ consumer = handleConsumeException (queue , index , consumer , e );
719743 }
720744 return consumer ;
721745 }
722746
723747 @ Nullable
724- private SimpleConsumer handleConsumeException (String queue , @ Nullable SimpleConsumer consumerArg , Exception e ) {
748+ private SimpleConsumer handleConsumeException (String queue , int index , @ Nullable SimpleConsumer consumerArg ,
749+ Exception e ) {
725750
726751 SimpleConsumer consumer = consumerArg ;
727752 if (e .getCause () instanceof ShutdownSignalException
@@ -732,6 +757,7 @@ private SimpleConsumer handleConsumeException(String queue, @Nullable SimpleCons
732757 }
733758 else if (e .getCause () instanceof ShutdownSignalException
734759 && RabbitUtils .isPassiveDeclarationChannelClose ((ShutdownSignalException ) e .getCause ())) {
760+ publishMissingQueueEvent (queue );
735761 this .logger .error ("Queue not present, scheduling consumer "
736762 + (consumer == null ? "for queue " + queue : consumer ) + " for restart" , e );
737763 }
@@ -741,7 +767,7 @@ else if (this.logger.isWarnEnabled()) {
741767 }
742768
743769 if (consumer == null ) {
744- addConsumerToRestart (new SimpleConsumer (null , null , queue ));
770+ addConsumerToRestart (new SimpleConsumer (null , null , queue , index ));
745771 }
746772 else {
747773 addConsumerToRestart (consumer );
@@ -836,11 +862,9 @@ private void cancelConsumer(SimpleConsumer consumer) {
836862 }
837863
838864 private void addConsumerToRestart (SimpleConsumer consumer ) {
839- if (this .started ) {
840- this .consumersToRestart .add (consumer );
841- if (this .logger .isTraceEnabled ()) {
842- this .logger .trace ("Consumers to restart now: " + this .consumersToRestart );
843- }
865+ this .consumersToRestart .add (consumer );
866+ if (this .logger .isTraceEnabled ()) {
867+ this .logger .trace ("Consumers to restart now: " + this .consumersToRestart );
844868 }
845869 }
846870
@@ -863,6 +887,8 @@ final class SimpleConsumer extends DefaultConsumer {
863887
864888 private final String queue ;
865889
890+ private final int index ;
891+
866892 private final boolean ackRequired ;
867893
868894 private final ConnectionFactory connectionFactory = getConnectionFactory ();
@@ -895,10 +921,11 @@ final class SimpleConsumer extends DefaultConsumer {
895921
896922 private volatile boolean ackFailed ;
897923
898- SimpleConsumer (@ Nullable Connection connection , @ Nullable Channel channel , String queue ) {
924+ SimpleConsumer (@ Nullable Connection connection , @ Nullable Channel channel , String queue , int index ) {
899925 super (channel );
900926 this .connection = connection ;
901927 this .queue = queue ;
928+ this .index = index ;
902929 this .ackRequired = !getAcknowledgeMode ().isAutoAck () && !getAcknowledgeMode ().isManual ();
903930 if (channel instanceof ChannelProxy ) {
904931 this .targetChannel = ((ChannelProxy ) channel ).getTargetChannel ();
@@ -908,10 +935,14 @@ final class SimpleConsumer extends DefaultConsumer {
908935 }
909936 }
910937
911- private String getQueue () {
938+ String getQueue () {
912939 return this .queue ;
913940 }
914941
942+ int getIndex () {
943+ return this .index ;
944+ }
945+
915946 @ Override
916947 public String getConsumerTag () {
917948 return this .consumerTag ;
@@ -1212,9 +1243,53 @@ private void finalizeConsumer() {
12121243 consumerRemoved (this );
12131244 }
12141245
1246+ @ Override
1247+ public int hashCode () {
1248+ final int prime = 31 ;
1249+ int result = 1 ;
1250+ result = prime * result + getEnclosingInstance ().hashCode ();
1251+ result = prime * result + this .index ;
1252+ result = prime * result + ((this .queue == null ) ? 0 : this .queue .hashCode ());
1253+ return result ;
1254+ }
1255+
1256+ @ Override
1257+ public boolean equals (Object obj ) {
1258+ if (this == obj ) {
1259+ return true ;
1260+ }
1261+ if (obj == null ) {
1262+ return false ;
1263+ }
1264+ if (getClass () != obj .getClass ()) {
1265+ return false ;
1266+ }
1267+ SimpleConsumer other = (SimpleConsumer ) obj ;
1268+ if (!getEnclosingInstance ().equals (other .getEnclosingInstance ())) {
1269+ return false ;
1270+ }
1271+ if (this .index != other .index ) {
1272+ return false ;
1273+ }
1274+ if (this .queue == null ) {
1275+ if (other .queue != null ) {
1276+ return false ;
1277+ }
1278+ }
1279+ else if (!this .queue .equals (other .queue )) {
1280+ return false ;
1281+ }
1282+ return true ;
1283+ }
1284+
1285+ private DirectMessageListenerContainer getEnclosingInstance () {
1286+ return DirectMessageListenerContainer .this ;
1287+ }
1288+
12151289 @ Override
12161290 public String toString () {
1217- return "SimpleConsumer [queue=" + this .queue + ", consumerTag=" + this .consumerTag
1291+ return "SimpleConsumer [queue=" + this .queue + ", index=" + this .index
1292+ + ", consumerTag=" + this .consumerTag
12181293 + " identity=" + ObjectUtils .getIdentityHexString (this ) + "]" ;
12191294 }
12201295
0 commit comments