Skip to content

Commit b2ed57d

Browse files
garyrussellartembilan
authored andcommitted
GH-2432: Fix Redeclaration of Declarables
Resolves #2432 When a container starts; it looks to see if the context contains a queue bean for any of its queues and requests the admin to redeclare the infrastructure. However, if the queue is declared within a `Declarables`, the check is not performed and therefore the queue is not declared. Add a check for `Declarables` beans. **cherry-pick to 2.4.x**
1 parent 4e9ec33 commit b2ed57d

File tree

2 files changed

+132
-5
lines changed

2 files changed

+132
-5
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,9 +21,9 @@
2121
import java.util.Collection;
2222
import java.util.HashMap;
2323
import java.util.HashSet;
24+
import java.util.LinkedHashSet;
2425
import java.util.List;
2526
import java.util.Map;
26-
import java.util.Map.Entry;
2727
import java.util.Properties;
2828
import java.util.Set;
2929
import java.util.concurrent.CopyOnWriteArrayList;
@@ -42,6 +42,7 @@
4242
import org.springframework.amqp.core.AcknowledgeMode;
4343
import org.springframework.amqp.core.AmqpAdmin;
4444
import org.springframework.amqp.core.BatchMessageListener;
45+
import org.springframework.amqp.core.Declarables;
4546
import org.springframework.amqp.core.Message;
4647
import org.springframework.amqp.core.MessageListener;
4748
import org.springframework.amqp.core.MessagePostProcessor;
@@ -1932,9 +1933,11 @@ private void attemptDeclarations(AmqpAdmin admin) {
19321933
ApplicationContext context = this.getApplicationContext();
19331934
if (context != null) {
19341935
Set<String> queueNames = getQueueNamesAsSet();
1935-
Map<String, Queue> queueBeans = context.getBeansOfType(Queue.class);
1936-
for (Entry<String, Queue> entry : queueBeans.entrySet()) {
1937-
Queue queue = entry.getValue();
1936+
Collection<Queue> queueBeans = new LinkedHashSet<>(
1937+
context.getBeansOfType(Queue.class, false, false).values());
1938+
Map<String, Declarables> declarables = context.getBeansOfType(Declarables.class, false, false);
1939+
declarables.values().forEach(dec -> queueBeans.addAll(dec.getDeclarablesByType(Queue.class)));
1940+
for (Queue queue : queueBeans) {
19381941
if (isMismatchedQueuesFatal() || (queueNames.contains(queue.getName()) &&
19391942
admin.getQueueProperties(queue.getName()) == null)) {
19401943
if (logger.isDebugEnabled()) {
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.ArgumentMatchers.anyBoolean;
22+
import static org.mockito.ArgumentMatchers.anyInt;
23+
import static org.mockito.ArgumentMatchers.anyMap;
24+
import static org.mockito.ArgumentMatchers.anyString;
25+
import static org.mockito.BDDMockito.given;
26+
import static org.mockito.BDDMockito.willAnswer;
27+
import static org.mockito.Mockito.mock;
28+
import static org.mockito.Mockito.verify;
29+
30+
import java.io.IOException;
31+
import java.util.HashMap;
32+
import java.util.List;
33+
import java.util.Map;
34+
import java.util.concurrent.CountDownLatch;
35+
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.atomic.AtomicBoolean;
37+
38+
import org.junit.jupiter.api.Test;
39+
import org.mockito.Mockito;
40+
41+
import org.springframework.amqp.core.AmqpAdmin;
42+
import org.springframework.amqp.core.Declarables;
43+
import org.springframework.amqp.core.Queue;
44+
import org.springframework.amqp.rabbit.connection.ChannelProxy;
45+
import org.springframework.amqp.rabbit.connection.Connection;
46+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
47+
import org.springframework.context.ApplicationContext;
48+
49+
import com.rabbitmq.client.AMQP;
50+
import com.rabbitmq.client.Channel;
51+
import com.rabbitmq.client.Consumer;
52+
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
53+
54+
/**
55+
* @author Gary Russell
56+
* @since 2.4.12
57+
*
58+
*/
59+
public class QueueDeclarationTests {
60+
61+
@Test
62+
void redeclareWhenQueue() throws IOException, InterruptedException {
63+
AmqpAdmin admin = mock(AmqpAdmin.class);
64+
ApplicationContext context = mock(ApplicationContext.class);
65+
final CountDownLatch latch = new CountDownLatch(1);
66+
SimpleMessageListenerContainer container = createContainer(admin, latch);
67+
given(context.getBeansOfType(Queue.class, false, false)).willReturn(Map.of("foo", new Queue("test")));
68+
given(context.getBeansOfType(Declarables.class, false, false)).willReturn(new HashMap<>());
69+
container.setApplicationContext(context);
70+
container.start();
71+
72+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
73+
verify(admin).initialize();
74+
container.stop();
75+
}
76+
77+
@Test
78+
void redeclareWhenDeclarables() throws IOException, InterruptedException {
79+
AmqpAdmin admin = mock(AmqpAdmin.class);
80+
ApplicationContext context = mock(ApplicationContext.class);
81+
final CountDownLatch latch = new CountDownLatch(1);
82+
SimpleMessageListenerContainer container = createContainer(admin, latch);
83+
given(context.getBeansOfType(Queue.class, false, false)).willReturn(new HashMap<>());
84+
given(context.getBeansOfType(Declarables.class, false, false))
85+
.willReturn(Map.of("foo", new Declarables(List.of(new Queue("test")))));
86+
container.setApplicationContext(context);
87+
container.start();
88+
89+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
90+
verify(admin).initialize();
91+
container.stop();
92+
}
93+
94+
private SimpleMessageListenerContainer createContainer(AmqpAdmin admin, final CountDownLatch latch)
95+
throws IOException {
96+
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
97+
Connection connection = mock(Connection.class);
98+
ChannelProxy channel = mock(ChannelProxy.class);
99+
Channel rabbitChannel = mock(AutorecoveringChannel.class);
100+
given(channel.getTargetChannel()).willReturn(rabbitChannel);
101+
102+
given(connectionFactory.createConnection()).willReturn(connection);
103+
given(connection.createChannel(anyBoolean())).willReturn(channel);
104+
final AtomicBoolean isOpen = new AtomicBoolean(true);
105+
willAnswer(i -> isOpen.get()).given(channel).isOpen();
106+
given(channel.queueDeclarePassive(Mockito.anyString()))
107+
.willAnswer(invocation -> mock(AMQP.Queue.DeclareOk.class));
108+
given(channel.basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(),
109+
anyMap(), any(Consumer.class))).willReturn("consumerTag");
110+
111+
willAnswer(i -> {
112+
latch.countDown();
113+
return null;
114+
}).given(channel).basicQos(anyInt(), anyBoolean());
115+
116+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
117+
container.setQueueNames("test");
118+
container.setPrefetchCount(2);
119+
container.setAmqpAdmin(admin);
120+
container.afterPropertiesSet();
121+
return container;
122+
}
123+
124+
}

0 commit comments

Comments
 (0)