Skip to content

Commit 7b038e2

Browse files
authored
GH-1 Implement support for retry for Multirabbit (#5)
1 parent 746838c commit 7b038e2

File tree

5 files changed

+156
-8
lines changed

5 files changed

+156
-8
lines changed

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747

4848
<springboot.version>3.3.0</springboot.version>
4949
<javax-validation.version>2.0.1.Final</javax-validation.version>
50+
<awaitility.version>4.2.2</awaitility.version>
5051
<testcontainers-rabbitmq.version>1.17.1</testcontainers-rabbitmq.version>
5152
<java-jna.version>5.8.0</java-jna.version>
5253

spring-multirabbit/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@
5151
<artifactId>mockito-junit-jupiter</artifactId>
5252
<scope>test</scope>
5353
</dependency>
54+
<dependency>
55+
<groupId>org.awaitility</groupId>
56+
<artifactId>awaitility</artifactId>
57+
<version>${awaitility.version}</version>
58+
<scope>test</scope>
59+
</dependency>
5460
<dependency>
5561
<groupId>org.testcontainers</groupId>
5662
<artifactId>rabbitmq</artifactId>

spring-multirabbit/src/main/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitAutoConfiguration.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,8 @@ private MultiRabbitConnectionFactoryWrapper instantiateConnectionFactories(
220220
final ConnectionFactory connectionFactory = buildConnectionFactory(connDetails,
221221
multiRabbitConnectionFactoryCreator, resourceLoader, credentialsProvider,
222222
credentialsRefreshService, connectionNameStrategy, connectionFactoryCustomizer, sslBundles);
223-
final SimpleRabbitListenerContainerFactory containerFactory = newContainerFactory(connectionFactory);
223+
final SimpleRabbitListenerContainerFactory containerFactory
224+
= newContainerFactory(connectionFactory, propertiesMap.get(key));
224225
final RabbitAdmin rabbitAdmin = newRabbitAdmin(connectionFactory);
225226
wrapper.addConnectionFactory(key, connectionFactory, containerFactory, rabbitAdmin);
226227
}
@@ -277,9 +278,12 @@ private ConnectionFactory buildConnectionFactory(
277278
/**
278279
* Registers the ContainerFactory bean.
279280
*/
280-
private SimpleRabbitListenerContainerFactory newContainerFactory(final ConnectionFactory connectionFactory) {
281+
private SimpleRabbitListenerContainerFactory newContainerFactory(final ConnectionFactory connectionFactory,
282+
final RabbitProperties rabbitProperties) {
281283
final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
282-
containerFactory.setConnectionFactory(connectionFactory);
284+
final SimpleRabbitListenerContainerFactoryConfigurer configurer
285+
= new SimpleRabbitListenerContainerFactoryConfigurer(rabbitProperties);
286+
configurer.configure(containerFactory, connectionFactory);
283287
return containerFactory;
284288
}
285289

spring-multirabbit/src/test/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitConnectionFactoryCreatorTest.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,6 @@ class MultiRabbitConnectionFactoryCreatorTest {
7979
@Mock
8080
private RabbitConnectionDetails rabbitConnectionDetails;
8181

82-
@Mock
83-
private RabbitProperties secondaryRabbitProperties;
84-
8582
@Mock
8683
private MultiRabbitProperties multiRabbitProperties;
8784

@@ -266,7 +263,7 @@ void shouldInstantiateMultiRabbitConnectionFactoryWrapperWithDefaultConnection()
266263
@Test
267264
void shouldInstantiateMultiRabbitConnectionFactoryWrapperWithMultipleConnections() throws Exception {
268265
final MultiRabbitProperties multiRabbitProperties = new MultiRabbitProperties();
269-
multiRabbitProperties.getConnections().put(DUMMY_KEY, secondaryRabbitProperties);
266+
multiRabbitProperties.getConnections().put(DUMMY_KEY, new RabbitProperties());
270267
multiRabbitProperties.setDefaultConnection(DUMMY_KEY);
271268

272269
final RabbitConnectionFactoryBeanConfigurer rabbitConnectionFactoryBeanConfigurer
@@ -323,7 +320,7 @@ void shouldInstantiateMultiRabbitConnectionFactoryWrapperWithMultipleConnections
323320
@Test
324321
void shouldInstantiateMultiRabbitConnectionFactoryWrapperWithDefaultAndMultipleConnections() throws Exception {
325322
final MultiRabbitProperties multiRabbitProperties = new MultiRabbitProperties();
326-
multiRabbitProperties.getConnections().put(DUMMY_KEY, secondaryRabbitProperties);
323+
multiRabbitProperties.getConnections().put(DUMMY_KEY, new RabbitProperties());
327324

328325
final MultiRabbitConnectionFactoryCreatorMap multiRabbitConnectionFactoryCreatorMap
329326
= new MultiRabbitConnectionFactoryCreatorMap(Map.of(DUMMY_KEY, multiRabbitConnectionFactoryCreator));
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package org.springframework.boot.autoconfigure.amqp;
2+
3+
import java.time.Duration;
4+
import java.time.Instant;
5+
import java.time.temporal.ChronoUnit;
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
import org.assertj.core.data.TemporalUnitLessThanOffset;
9+
import org.junit.jupiter.api.AfterEach;
10+
import org.junit.jupiter.api.BeforeEach;
11+
import org.junit.jupiter.api.DisplayName;
12+
import org.junit.jupiter.api.Test;
13+
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
14+
import org.springframework.amqp.rabbit.annotation.Exchange;
15+
import org.springframework.amqp.rabbit.annotation.Queue;
16+
import org.springframework.amqp.rabbit.annotation.QueueBinding;
17+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
18+
import org.springframework.amqp.rabbit.connection.SimpleResourceHolder;
19+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
20+
import org.springframework.boot.autoconfigure.AutoConfigurations;
21+
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
22+
import org.springframework.stereotype.Component;
23+
import org.testcontainers.containers.RabbitMQContainer;
24+
import static org.assertj.core.api.Assertions.assertThat;
25+
import static org.awaitility.Awaitility.await;
26+
27+
class MultiRabbitRetryTest {
28+
29+
private final RabbitMQContainer broker0 = new RabbitMQContainer("rabbitmq:3-management");
30+
private final RabbitMQContainer broker1 = new RabbitMQContainer("rabbitmq:3-management");
31+
32+
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner().withConfiguration(
33+
AutoConfigurations.of(MultiRabbitAutoConfiguration.class, RabbitAutoConfiguration.class));
34+
35+
@BeforeEach
36+
void beforeEach() {
37+
this.broker0.start();
38+
this.broker1.start();
39+
}
40+
41+
@AfterEach
42+
void afterEach() {
43+
this.broker0.stop();
44+
this.broker1.stop();
45+
}
46+
47+
@Test()
48+
@DisplayName("should ensure retry logic for multirabbit listeners")
49+
void shouldEnsureRetryLogicForMultiRabbitConnections() {
50+
final Integer broker0Port = broker0.getMappedPort(5672);
51+
final long broker0RetryInterval = 10;
52+
final int broker0Attempts = 3;
53+
final Integer broker1Port = broker1.getMappedPort(5672);
54+
final long broker1RetryInterval = 50;
55+
final int broker1Attempts = 5;
56+
57+
final List<String> properties = List.of(
58+
"spring.rabbitmq.port=" + broker0Port,
59+
"spring.rabbitmq.listener.simple.retry.enabled=true",
60+
"spring.rabbitmq.listener.simple.retry.initial-interval=%d".formatted(broker0RetryInterval),
61+
"spring.rabbitmq.listener.simple.retry.max-attempts=%d".formatted(broker0Attempts),
62+
"spring.multirabbitmq.enabled=true",
63+
"spring.multirabbitmq.connections.%s.port=%d".formatted(TestListeners.BROKER_NAME_1, broker1Port),
64+
"spring.multirabbitmq.connections.%s.listener.simple.retry.enabled=true"
65+
.formatted(TestListeners.BROKER_NAME_1),
66+
"spring.multirabbitmq.connections.%s.listener.simple.retry.initial-interval=%d"
67+
.formatted(TestListeners.BROKER_NAME_1, broker1RetryInterval),
68+
"spring.multirabbitmq.connections.%s.listener.simple.retry.max-attempts=%d"
69+
.formatted(TestListeners.BROKER_NAME_1, broker1Attempts));
70+
71+
this.contextRunner.withPropertyValues(properties.toArray(new String[0]))
72+
.withBean(TestListeners.class)
73+
.run((context) -> {
74+
final RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
75+
rabbitTemplate.convertAndSend(TestListeners.EXCHANGE_0, TestListeners.ROUTING_KEY_0,
76+
"test-broker0");
77+
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), TestListeners.BROKER_NAME_1);
78+
try {
79+
rabbitTemplate.convertAndSend(TestListeners.EXCHANGE_1, TestListeners.ROUTING_KEY_1,
80+
"test-broker1");
81+
} finally {
82+
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
83+
}
84+
85+
Thread.sleep(Math.max(broker0RetryInterval, broker1RetryInterval));
86+
assertListenerRetryLogic(broker0RetryInterval, broker0Attempts,
87+
TestListeners.BROKER_0_RETRY_INSTANTS);
88+
assertListenerRetryLogic(broker1RetryInterval, broker1Attempts,
89+
TestListeners.BROKER_1_RETRY_INSTANTS);
90+
});
91+
}
92+
93+
private void assertListenerRetryLogic(final long retryInterval,
94+
final int attempts,
95+
final List<Instant> instants) {
96+
await().pollInterval(Duration.ofMillis(10))
97+
.atMost(Duration.ofMillis(2 * retryInterval * attempts))
98+
.until(() -> instants.size() == attempts);
99+
assertThat(instants).hasSize(attempts);
100+
for (int i = 0; i < instants.size() - 1; i++) {
101+
assertThat(instants.get(i)).isCloseTo(instants.get(i + 1),
102+
new TemporalUnitLessThanOffset(2 * retryInterval, ChronoUnit.MILLIS));
103+
}
104+
}
105+
106+
@Component
107+
@EnableRabbit
108+
private static class TestListeners {
109+
110+
public static final String EXCHANGE_0 = "exchange0";
111+
public static final String ROUTING_KEY_0 = "routingKey0";
112+
public static final String QUEUE_0 = "queue0";
113+
114+
public static final String BROKER_NAME_1 = "broker1";
115+
public static final String EXCHANGE_1 = "exchange1";
116+
public static final String ROUTING_KEY_1 = "routingKey1";
117+
public static final String QUEUE_1 = "queue1";
118+
119+
public static final List<Instant> BROKER_0_RETRY_INSTANTS = new ArrayList<>();
120+
public static final List<Instant> BROKER_1_RETRY_INSTANTS = new ArrayList<>();
121+
122+
@RabbitListener(bindings = @QueueBinding(
123+
exchange = @Exchange(EXCHANGE_0),
124+
value = @Queue(QUEUE_0),
125+
key = ROUTING_KEY_0))
126+
void listenBroker0(final String message) {
127+
BROKER_0_RETRY_INSTANTS.add(Instant.now());
128+
throw new RuntimeException("dummy-exception");
129+
}
130+
131+
@RabbitListener(containerFactory = BROKER_NAME_1, bindings = @QueueBinding(
132+
exchange = @Exchange(EXCHANGE_1),
133+
value = @Queue(QUEUE_1),
134+
key = ROUTING_KEY_1))
135+
void listenBroker1(final String message) {
136+
BROKER_1_RETRY_INSTANTS.add(Instant.now());
137+
throw new RuntimeException("dummy-exception");
138+
}
139+
}
140+
}

0 commit comments

Comments
 (0)