Skip to content

Commit 7b475d5

Browse files
committed
Fix SimpleMessageListenerContainerLongTests fails
https://build.spring.io/browse/PLATFORM-COM2-JOB1-605 * A couple tests doesn't send to the proper queue for the consecutive consumers start logic to be triggered properly * The `testIncreaseMinAtMax()` used "too long" `startConsumerMinInterval` therefore existing consumers were able to consumer all the messages
1 parent 8d21242 commit 7b475d5

File tree

1 file changed

+16
-4
lines changed

1 file changed

+16
-4
lines changed

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerLongTests.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,18 +98,18 @@ private void testChangeConsumerCountGuts(boolean transacted) throws Exception {
9898
container.setMaxConcurrentConsumers(3);
9999
RabbitTemplate template = new RabbitTemplate(singleConnectionFactory);
100100
for (int i = 0; i < 20; i++) {
101-
template.convertAndSend("foo", "foo");
101+
template.convertAndSend(QUEUE, "foo");
102102
}
103103
waitForNConsumers(container, 2); // increased consumers due to work
104104
waitForNConsumers(container, 1, 20000); // should stop the extra consumer after 10 seconds idle
105105
container.setConcurrentConsumers(3);
106106
waitForNConsumers(container, 3);
107107
container.stop();
108108
waitForNConsumers(container, 0);
109-
singleConnectionFactory.destroy();
110109
}
111110
finally {
112111
container.stop();
112+
singleConnectionFactory.destroy();
113113
}
114114
}
115115

@@ -141,14 +141,15 @@ public void testAddQueuesAndStartInCycle() throws Exception {
141141
for (int i = 0; i < 20; i++) {
142142
admin.deleteQueue("testAddQueuesAndStartInCycle" + i);
143143
}
144+
144145
connectionFactory.destroy();
145146
}
146147

147148
@Test
148149
public void testIncreaseMinAtMax() throws Exception {
149150
final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory("localhost");
150151
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory);
151-
container.setStartConsumerMinInterval(100);
152+
container.setStartConsumerMinInterval(1);
152153
container.setConsecutiveActiveTrigger(1);
153154
container.setMessageListener((MessageListener) m -> {
154155
try {
@@ -171,6 +172,9 @@ public void testIncreaseMinAtMax() throws Exception {
171172
container.setConcurrentConsumers(4);
172173
Set<?> consumers = (Set<?>) TestUtils.getPropertyValue(container, "consumers");
173174
assertThat(consumers.size(), equalTo(5));
175+
176+
container.stop();
177+
singleConnectionFactory.destroy();
174178
}
175179

176180
@Test
@@ -200,6 +204,9 @@ public void testDecreaseMinAtMax() throws Exception {
200204
container.setConcurrentConsumers(1);
201205
Set<?> consumers = (Set<?>) TestUtils.getPropertyValue(container, "consumers");
202206
assertThat(consumers.size(), equalTo(3));
207+
208+
container.stop();
209+
singleConnectionFactory.destroy();
203210
}
204211

205212
@Test
@@ -230,6 +237,9 @@ public void testDecreaseMaxAtMax() throws Exception {
230237
container.setMaxConcurrentConsumers(1);
231238
Set<?> consumers = (Set<?>) TestUtils.getPropertyValue(container, "consumers");
232239
assertThat(consumers.size(), equalTo(1));
240+
241+
container.stop();
242+
singleConnectionFactory.destroy();
233243
}
234244

235245
public void handleMessage(String foo) {
@@ -240,7 +250,9 @@ private void waitForNConsumers(SimpleMessageListenerContainer container, int n)
240250
this.waitForNConsumers(container, n, 10000);
241251
}
242252

243-
private void waitForNConsumers(SimpleMessageListenerContainer container, int n, int howLong) throws InterruptedException {
253+
private void waitForNConsumers(SimpleMessageListenerContainer container, int n, int howLong)
254+
throws InterruptedException {
255+
244256
int i = 0;
245257
while (true) {
246258
Set<?> consumers = (Set<?>) TestUtils.getPropertyValue(container, "consumers");

0 commit comments

Comments
 (0)