Skip to content

Commit 156a8bc

Browse files
committed
GH-1 Fix retry logic for Multirabbit
1 parent 3dec17a commit 156a8bc

File tree

5 files changed

+169
-18
lines changed

5 files changed

+169
-18
lines changed

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747

4848
<springboot.version>3.0.0</springboot.version>
4949
<javax-validation.version>2.0.1.Final</javax-validation.version>
50+
<awaitility.version>4.2.2</awaitility.version>
5051
<testcontainers-rabbitmq.version>1.17.1</testcontainers-rabbitmq.version>
5152
<java-jna.version>5.8.0</java-jna.version>
5253

spring-multirabbit/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@
5151
<artifactId>mockito-junit-jupiter</artifactId>
5252
<scope>test</scope>
5353
</dependency>
54+
<dependency>
55+
<groupId>org.awaitility</groupId>
56+
<artifactId>awaitility</artifactId>
57+
<version>${awaitility.version}</version>
58+
<scope>test</scope>
59+
</dependency>
5460
<dependency>
5561
<groupId>org.testcontainers</groupId>
5662
<artifactId>rabbitmq</artifactId>

spring-multirabbit/src/main/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitAutoConfiguration.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ public class MultiRabbitAutoConfiguration {
5353
private static final Logger LOGGER = LoggerFactory.getLogger(MultiRabbitAutoConfiguration.class);
5454

5555
/**
56-
* Returns a {@link RabbitConnectionFactoryCreator}.
56+
* Returns a {@link RabbitConnectionFactoryCreator} to fulfill the default configuration.
5757
*
58-
* @return a {@link RabbitConnectionFactoryCreator}.
58+
* @return a {@link RabbitConnectionFactoryCreator} to fulfill the default configuration.
5959
*/
6060
@Primary
6161
@Bean(MultiRabbitConstants.CONNECTION_FACTORY_CREATOR_BEAN_NAME)
@@ -103,6 +103,7 @@ public ConnectionFactoryContextWrapper contextWrapper(final ConnectionFactory co
103103
* @return the empty wrapper if non is provided.
104104
*/
105105
@Bean
106+
// TODO github.com/rwanderc/spring-multirabbit/issues/2
106107
@ConditionalOnMissingBean
107108
public MultiRabbitConnectionFactoryWrapper externalEmptyWrapper() {
108109
return new MultiRabbitConnectionFactoryWrapper();
@@ -201,7 +202,8 @@ private MultiRabbitConnectionFactoryWrapper instantiateConnectionFactories(
201202
= springFactoryCreator.rabbitConnectionFactory(rabbitConnectionFactoryBeanConfigurer,
202203
rabbitCachingConnectionFactoryConfigurer,
203204
connectionFactoryCustomizer);
204-
final SimpleRabbitListenerContainerFactory containerFactory = newContainerFactory(connectionFactory);
205+
final SimpleRabbitListenerContainerFactory containerFactory
206+
= newContainerFactory(connectionFactory, entry.getValue());
205207
final RabbitAdmin rabbitAdmin = newRabbitAdmin(connectionFactory);
206208
wrapper.addConnectionFactory(entry.getKey(), connectionFactory, containerFactory, rabbitAdmin);
207209
}
@@ -238,9 +240,12 @@ private MultiRabbitConnectionFactoryWrapper instantiateConnectionFactories(
238240
/**
239241
* Registers the ContainerFactory bean.
240242
*/
241-
private SimpleRabbitListenerContainerFactory newContainerFactory(final ConnectionFactory connectionFactory) {
243+
private SimpleRabbitListenerContainerFactory newContainerFactory(final ConnectionFactory connectionFactory,
244+
final RabbitProperties rabbitProperties) {
242245
final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
243-
containerFactory.setConnectionFactory(connectionFactory);
246+
final SimpleRabbitListenerContainerFactoryConfigurer configurer
247+
= new SimpleRabbitListenerContainerFactoryConfigurer(rabbitProperties);
248+
configurer.configure(containerFactory, connectionFactory);
244249
return containerFactory;
245250
}
246251

spring-multirabbit/src/test/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitConnectionFactoryCreatorTest.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,6 @@ class MultiRabbitConnectionFactoryCreatorTest {
7272
@Mock
7373
private RabbitProperties rabbitProperties;
7474

75-
@Mock
76-
private RabbitProperties secondaryRabbitProperties;
77-
7875
@Mock
7976
private MultiRabbitProperties multiRabbitProperties;
8077

@@ -234,6 +231,10 @@ void shouldInstantiateMultiRabbitConnectionFactoryWrapperWithDefaultConnection()
234231
@Test
235232
void shouldInstantiateMultiRabbitConnectionFactoryWrapperWithMultipleConnections() throws Exception {
236233
final MultiRabbitConnectionFactoryWrapper externalWrapper = new MultiRabbitConnectionFactoryWrapper();
234+
final RabbitProperties secondaryRabbitProperties = new RabbitProperties();
235+
final MultiRabbitProperties multiRabbitProperties = new MultiRabbitProperties();
236+
multiRabbitProperties.getConnections().put(DUMMY_KEY, secondaryRabbitProperties);
237+
multiRabbitProperties.setDefaultConnection(DUMMY_KEY);
237238

238239
final RabbitConnectionFactoryBeanConfigurer rabbitConnectionFactoryBeanConfigurer = springFactoryCreator
239240
.rabbitConnectionFactoryBeanConfigurer(rabbitProperties, resourceLoader, credentialsProvider,
@@ -246,10 +247,6 @@ void shouldInstantiateMultiRabbitConnectionFactoryWrapperWithMultipleConnections
246247
eq(connectionFactoryCustomizer)))
247248
.thenReturn(new CachingConnectionFactory());
248249

249-
MultiRabbitProperties multiRabbitProperties = new MultiRabbitProperties();
250-
multiRabbitProperties.getConnections().put(DUMMY_KEY, secondaryRabbitProperties);
251-
multiRabbitProperties.setDefaultConnection(DUMMY_KEY);
252-
253250
creator().routingConnectionFactory(null, multiRabbitProperties, externalWrapper, resourceLoader,
254251
credentialsProvider, credentialsRefreshService, connectionNameStrategy, connectionFactoryCustomizer);
255252

@@ -262,6 +259,10 @@ void shouldInstantiateMultiRabbitConnectionFactoryWrapperWithMultipleConnections
262259

263260
@Test
264261
void shouldInstantiateMultiRabbitConnectionFactoryWrapperWithDefaultAndMultipleConnections() throws Exception {
262+
final RabbitProperties secondaryRabbitProperties = new RabbitProperties();
263+
final MultiRabbitProperties multiRabbitProperties = new MultiRabbitProperties();
264+
multiRabbitProperties.getConnections().put(DUMMY_KEY, secondaryRabbitProperties);
265+
265266
final MultiRabbitConnectionFactoryWrapper externalWrapper = new MultiRabbitConnectionFactoryWrapper();
266267
externalWrapper.setDefaultConnectionFactory(connectionFactory0);
267268

@@ -278,9 +279,6 @@ void shouldInstantiateMultiRabbitConnectionFactoryWrapperWithDefaultAndMultipleC
278279
connectionFactoryCustomizer))
279280
.thenReturn(new CachingConnectionFactory());
280281

281-
MultiRabbitProperties multiRabbitProperties = new MultiRabbitProperties();
282-
multiRabbitProperties.getConnections().put(DUMMY_KEY, secondaryRabbitProperties);
283-
284282
creator().routingConnectionFactory(rabbitProperties, multiRabbitProperties, externalWrapper, resourceLoader,
285283
credentialsProvider, credentialsRefreshService, connectionNameStrategy, connectionFactoryCustomizer);
286284

@@ -299,15 +297,16 @@ void shouldInstantiateMultiRabbitConnectionFactoryWrapperWithDefaultAndMultipleC
299297
@Test
300298
void shouldEncapsulateExceptionWhenFailingToCreateBean() throws Exception {
301299
final MultiRabbitConnectionFactoryWrapper externalWrapper = new MultiRabbitConnectionFactoryWrapper();
300+
final RabbitProperties secondaryRabbitProperties = new RabbitProperties();
301+
final MultiRabbitProperties multiRabbitProperties = new MultiRabbitProperties();
302+
multiRabbitProperties.getConnections().put(DUMMY_KEY, secondaryRabbitProperties);
303+
302304
when(springFactoryCreator.rabbitConnectionFactory(
303305
eq(rabbitConnectionFactoryBeanConfigurer),
304306
eq(rabbitCachingConnectionFactoryConfigurer),
305307
eq(connectionFactoryCustomizer)))
306308
.thenThrow(new Exception("mocked-exception"));
307309

308-
MultiRabbitProperties multiRabbitProperties = new MultiRabbitProperties();
309-
multiRabbitProperties.getConnections().put(DUMMY_KEY, secondaryRabbitProperties);
310-
311310
final Executable executable = () -> creator().routingConnectionFactory(rabbitProperties,
312311
multiRabbitProperties,
313312
externalWrapper, resourceLoader, credentialsProvider, credentialsRefreshService,
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package org.springframework.boot.autoconfigure.amqp;
2+
3+
import java.time.Duration;
4+
import java.time.Instant;
5+
import java.time.temporal.ChronoUnit;
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
import org.assertj.core.data.TemporalUnitLessThanOffset;
9+
import org.junit.jupiter.api.AfterEach;
10+
import org.junit.jupiter.api.BeforeEach;
11+
import org.junit.jupiter.api.DisplayName;
12+
import org.junit.jupiter.api.Test;
13+
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
14+
import org.springframework.amqp.rabbit.annotation.Exchange;
15+
import org.springframework.amqp.rabbit.annotation.Queue;
16+
import org.springframework.amqp.rabbit.annotation.QueueBinding;
17+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
18+
import org.springframework.amqp.rabbit.connection.SimpleResourceHolder;
19+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
20+
import org.springframework.boot.autoconfigure.AutoConfigurations;
21+
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
22+
import org.springframework.stereotype.Component;
23+
import org.testcontainers.containers.RabbitMQContainer;
24+
import static org.assertj.core.api.Assertions.assertThat;
25+
import static org.awaitility.Awaitility.await;
26+
27+
class MultiRabbitRetryTest {
28+
29+
private final RabbitMQContainer broker0 = new RabbitMQContainer("rabbitmq:3-management");
30+
private final RabbitMQContainer broker1 = new RabbitMQContainer("rabbitmq:3-management");
31+
32+
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner().withConfiguration(
33+
AutoConfigurations.of(MultiRabbitAutoConfiguration.class, RabbitAutoConfiguration.class));
34+
35+
@BeforeEach
36+
void beforeEach() {
37+
this.broker0.start();
38+
this.broker1.start();
39+
}
40+
41+
@AfterEach
42+
void afterEach() {
43+
this.broker0.stop();
44+
this.broker1.stop();
45+
}
46+
47+
@Test()
48+
@DisplayName("should ensure retry logic for multirabbit listeners")
49+
void shouldEnsureRetryLogicForMultiRabbitConnections() {
50+
final Integer broker0Port = broker0.getMappedPort(5672);
51+
final long broker0RetryInterval = 10;
52+
final int broker0Attempts = 3;
53+
final Integer broker1Port = broker1.getMappedPort(5672);
54+
final long broker1RetryInterval = 50;
55+
final int broker1Attempts = 5;
56+
57+
final List<String> properties = List.of(
58+
"spring.rabbitmq.port=" + broker0Port,
59+
"spring.rabbitmq.listener.simple.retry.enabled=true",
60+
"spring.rabbitmq.listener.simple.retry.initial-interval=%d".formatted(broker0RetryInterval),
61+
"spring.rabbitmq.listener.simple.retry.max-attempts=%d".formatted(broker0Attempts),
62+
"spring.multirabbitmq.enabled=true",
63+
"spring.multirabbitmq.connections.%s.port=%d".formatted(TestListeners.BROKER_NAME_1, broker1Port),
64+
"spring.multirabbitmq.connections.%s.listener.simple.retry.enabled=true"
65+
.formatted(TestListeners.BROKER_NAME_1),
66+
"spring.multirabbitmq.connections.%s.listener.simple.retry.initial-interval=%d"
67+
.formatted(TestListeners.BROKER_NAME_1, broker1RetryInterval),
68+
"spring.multirabbitmq.connections.%s.listener.simple.retry.max-attempts=%d"
69+
.formatted(TestListeners.BROKER_NAME_1, broker1Attempts));
70+
71+
this.contextRunner.withPropertyValues(properties.toArray(new String[0]))
72+
.withBean(TestListeners.class)
73+
.run((context) -> {
74+
final RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
75+
rabbitTemplate.convertAndSend(TestListeners.EXCHANGE_0, TestListeners.ROUTING_KEY_0,
76+
"test-broker0");
77+
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), TestListeners.BROKER_NAME_1);
78+
try {
79+
rabbitTemplate.convertAndSend(TestListeners.EXCHANGE_1, TestListeners.ROUTING_KEY_1,
80+
"test-broker1");
81+
} finally {
82+
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
83+
}
84+
85+
Thread.sleep(Math.max(broker0RetryInterval, broker1RetryInterval));
86+
assertListenerRetryLogic(broker0RetryInterval, broker0Attempts,
87+
TestListeners.BROKER_0_RETRY_INSTANTS);
88+
assertListenerRetryLogic(broker1RetryInterval, broker1Attempts,
89+
TestListeners.BROKER_1_RETRY_INSTANTS);
90+
});
91+
}
92+
93+
private void assertListenerRetryLogic(final long retryInterval,
94+
final int attempts,
95+
final List<Instant> instants) {
96+
await().pollInterval(Duration.ofMillis(10))
97+
.atMost(Duration.ofMillis(2 * retryInterval * attempts))
98+
.until(() -> instants.size() == attempts);
99+
assertThat(instants).hasSize(attempts);
100+
for (int i = 0; i < instants.size() - 1; i++) {
101+
assertThat(instants.get(i)).isCloseTo(instants.get(i + 1),
102+
new TemporalUnitLessThanOffset(2 * retryInterval, ChronoUnit.MILLIS));
103+
}
104+
}
105+
106+
@Component
107+
@EnableRabbit
108+
private static class TestListeners {
109+
110+
public static final String EXCHANGE_0 = "exchange0";
111+
public static final String ROUTING_KEY_0 = "routingKey0";
112+
public static final String QUEUE_0 = "queue0";
113+
114+
public static final String BROKER_NAME_1 = "broker1";
115+
public static final String EXCHANGE_1 = "exchange1";
116+
public static final String ROUTING_KEY_1 = "routingKey1";
117+
public static final String QUEUE_1 = "queue1";
118+
119+
public static final List<Instant> BROKER_0_RETRY_INSTANTS = new ArrayList<>();
120+
public static final List<Instant> BROKER_1_RETRY_INSTANTS = new ArrayList<>();
121+
122+
@RabbitListener(bindings = @QueueBinding(
123+
exchange = @Exchange(EXCHANGE_0),
124+
value = @Queue(QUEUE_0),
125+
key = ROUTING_KEY_0))
126+
void listenBroker0(final String message) {
127+
BROKER_0_RETRY_INSTANTS.add(Instant.now());
128+
throw new RuntimeException("dummy-exception");
129+
}
130+
131+
@RabbitListener(containerFactory = BROKER_NAME_1, bindings = @QueueBinding(
132+
exchange = @Exchange(EXCHANGE_1),
133+
value = @Queue(QUEUE_1),
134+
key = ROUTING_KEY_1))
135+
void listenBroker1(final String message) {
136+
BROKER_1_RETRY_INSTANTS.add(Instant.now());
137+
throw new RuntimeException("dummy-exception");
138+
}
139+
}
140+
}

0 commit comments

Comments
 (0)