Skip to content

Commit f513d45

Browse files
committed
Add RabbitMQ Stream service connection from RabbitMQContainer
Add `RabbitStreamConnectionDetails` and support from `RabbitMQContainer` when `rabbitmq_stream` plugin is enabled.
1 parent 97ff6e2 commit f513d45

File tree

7 files changed

+321
-12
lines changed

7 files changed

+321
-12
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
2626
import org.springframework.amqp.support.converter.MessageConverter;
2727
import org.springframework.beans.factory.ObjectProvider;
28+
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.Stream;
2829
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.StreamContainer;
2930
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
3031
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@@ -50,6 +51,12 @@
5051
@ConditionalOnClass(StreamRabbitListenerContainerFactory.class)
5152
class RabbitStreamConfiguration {
5253

54+
@Bean
55+
@ConditionalOnMissingBean(RabbitStreamConnectionDetails.class)
56+
RabbitStreamConnectionDetails rabbitStreamConnectionDetails(RabbitProperties rabbitProperties) {
57+
return new PropertiesRabbitStreamConnectionDetails(rabbitProperties.getStream());
58+
}
59+
5360
@Bean(name = "rabbitListenerContainerFactory")
5461
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
5562
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "stream")
@@ -68,9 +75,9 @@ StreamRabbitListenerContainerFactory streamRabbitListenerContainerFactory(Enviro
6875

6976
@Bean(name = "rabbitStreamEnvironment")
7077
@ConditionalOnMissingBean(name = "rabbitStreamEnvironment")
71-
Environment rabbitStreamEnvironment(RabbitProperties properties,
78+
Environment rabbitStreamEnvironment(RabbitProperties properties, RabbitStreamConnectionDetails connectionDetails,
7279
ObjectProvider<EnvironmentBuilderCustomizer> customizers) {
73-
EnvironmentBuilder builder = configure(Environment.builder(), properties);
80+
EnvironmentBuilder builder = configure(Environment.builder(), properties, connectionDetails);
7481
customizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
7582
return builder.build();
7683
}
@@ -99,12 +106,13 @@ RabbitStreamTemplate rabbitStreamTemplate(Environment rabbitStreamEnvironment, R
99106
return template;
100107
}
101108

102-
static EnvironmentBuilder configure(EnvironmentBuilder builder, RabbitProperties properties) {
109+
static EnvironmentBuilder configure(EnvironmentBuilder builder, RabbitProperties properties,
110+
RabbitStreamConnectionDetails connectionDetails) {
103111
builder.lazyInitialization(true);
104112
RabbitProperties.Stream stream = properties.getStream();
105113
PropertyMapper map = PropertyMapper.get();
106-
map.from(stream.getHost()).to(builder::host);
107-
map.from(stream.getPort()).to(builder::port);
114+
map.from(connectionDetails.getHost()).to(builder::host);
115+
map.from(connectionDetails.getPort()).to(builder::port);
108116
map.from(stream.getVirtualHost())
109117
.as(withFallback(properties::getVirtualHost))
110118
.whenNonNull()
@@ -118,4 +126,24 @@ private static Function<String, String> withFallback(Supplier<String> fallback)
118126
return (value) -> (value != null) ? value : fallback.get();
119127
}
120128

129+
static class PropertiesRabbitStreamConnectionDetails implements RabbitStreamConnectionDetails {
130+
131+
private final Stream streamProperties;
132+
133+
PropertiesRabbitStreamConnectionDetails(Stream streamProperties) {
134+
this.streamProperties = streamProperties;
135+
}
136+
137+
@Override
138+
public String getHost() {
139+
return this.streamProperties.getHost();
140+
}
141+
142+
@Override
143+
public int getPort() {
144+
return this.streamProperties.getPort();
145+
}
146+
147+
}
148+
121149
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2012-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.amqp;
18+
19+
import org.springframework.boot.autoconfigure.service.connection.ConnectionDetails;
20+
21+
/**
22+
* Details required to establish a connection to a RabbitMQ service.
23+
*
24+
* @author Eddú Meléndez
25+
* @since 3.4.0
26+
*/
27+
public interface RabbitStreamConnectionDetails extends ConnectionDetails {
28+
29+
/**
30+
* Rabbit server host.
31+
* @return the rabbit server host
32+
*/
33+
String getHost();
34+
35+
/**
36+
* Rabbit server port.
37+
* @return the rabbit server port
38+
*/
39+
int getPort();
40+
41+
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
package org.springframework.boot.autoconfigure.amqp;
1818

1919
import java.time.Duration;
20+
import java.util.List;
2021

22+
import com.rabbitmq.stream.Address;
2123
import com.rabbitmq.stream.BackOffDelayPolicy;
2224
import com.rabbitmq.stream.Codec;
2325
import com.rabbitmq.stream.Environment;
@@ -32,6 +34,7 @@
3234
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
3335
import org.springframework.amqp.support.converter.MessageConverter;
3436
import org.springframework.boot.autoconfigure.AutoConfigurations;
37+
import org.springframework.boot.autoconfigure.amqp.RabbitStreamConfiguration.PropertiesRabbitStreamConnectionDetails;
3538
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
3639
import org.springframework.context.annotation.Bean;
3740
import org.springframework.context.annotation.Configuration;
@@ -43,6 +46,7 @@
4346
import org.springframework.rabbit.stream.producer.ProducerCustomizer;
4447
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
4548
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
49+
import org.springframework.test.util.ReflectionTestUtils;
4650

4751
import static org.assertj.core.api.Assertions.assertThat;
4852
import static org.mockito.BDDMockito.then;
@@ -127,7 +131,7 @@ void whenCustomMessageListenerContainerFactoryIsDefinedThenAutoConfiguredContain
127131
void environmentUsesPropertyDefaultsByDefault() {
128132
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
129133
RabbitProperties properties = new RabbitProperties();
130-
RabbitStreamConfiguration.configure(builder, properties);
134+
RabbitStreamConfiguration.configure(builder, properties, getRabbitConnectionDetails(properties));
131135
then(builder).should().port(5552);
132136
then(builder).should().host("localhost");
133137
then(builder).should().lazyInitialization(true);
@@ -141,7 +145,7 @@ void whenStreamPortIsSetThenEnvironmentUsesCustomPort() {
141145
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
142146
RabbitProperties properties = new RabbitProperties();
143147
properties.getStream().setPort(5553);
144-
RabbitStreamConfiguration.configure(builder, properties);
148+
RabbitStreamConfiguration.configure(builder, properties, getRabbitConnectionDetails(properties));
145149
then(builder).should().port(5553);
146150
}
147151

@@ -150,7 +154,7 @@ void whenStreamHostIsSetThenEnvironmentUsesCustomHost() {
150154
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
151155
RabbitProperties properties = new RabbitProperties();
152156
properties.getStream().setHost("stream.rabbit.example.com");
153-
RabbitStreamConfiguration.configure(builder, properties);
157+
RabbitStreamConfiguration.configure(builder, properties, getRabbitConnectionDetails(properties));
154158
then(builder).should().host("stream.rabbit.example.com");
155159
}
156160

@@ -159,7 +163,7 @@ void whenStreamVirtualHostIsSetThenEnvironmentUsesCustomVirtualHost() {
159163
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
160164
RabbitProperties properties = new RabbitProperties();
161165
properties.getStream().setVirtualHost("stream-virtual-host");
162-
RabbitStreamConfiguration.configure(builder, properties);
166+
RabbitStreamConfiguration.configure(builder, properties, getRabbitConnectionDetails(properties));
163167
then(builder).should().virtualHost("stream-virtual-host");
164168
}
165169

@@ -168,7 +172,7 @@ void whenStreamVirtualHostIsNotSetButDefaultVirtualHostIsSetThenEnvironmentUsesD
168172
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
169173
RabbitProperties properties = new RabbitProperties();
170174
properties.setVirtualHost("default-virtual-host");
171-
RabbitStreamConfiguration.configure(builder, properties);
175+
RabbitStreamConfiguration.configure(builder, properties, getRabbitConnectionDetails(properties));
172176
then(builder).should().virtualHost("default-virtual-host");
173177
}
174178

@@ -178,7 +182,7 @@ void whenStreamCredentialsAreNotSetThenEnvironmentUsesRabbitCredentials() {
178182
RabbitProperties properties = new RabbitProperties();
179183
properties.setUsername("alice");
180184
properties.setPassword("secret");
181-
RabbitStreamConfiguration.configure(builder, properties);
185+
RabbitStreamConfiguration.configure(builder, properties, getRabbitConnectionDetails(properties));
182186
then(builder).should().username("alice");
183187
then(builder).should().password("secret");
184188
}
@@ -191,7 +195,7 @@ void whenStreamCredentialsAreSetThenEnvironmentUsesStreamCredentials() {
191195
properties.setPassword("secret");
192196
properties.getStream().setUsername("bob");
193197
properties.getStream().setPassword("confidential");
194-
RabbitStreamConfiguration.configure(builder, properties);
198+
RabbitStreamConfiguration.configure(builder, properties, getRabbitConnectionDetails(properties));
195199
then(builder).should().username("bob");
196200
then(builder).should().password("confidential");
197201
}
@@ -260,6 +264,22 @@ void environmentCreatedByBuilderCanBeCustomized() {
260264
});
261265
}
262266

267+
@Test
268+
@SuppressWarnings("unchecked")
269+
void connectionDetailsAreApplied() {
270+
this.contextRunner.withPropertyValues("spring.rabbitmq.stream.name:stream-test")
271+
.withUserConfiguration(CustomConnectionDetails.class)
272+
.run((context) -> assertThat(context.getBean(Environment.class))
273+
.extracting((environment) -> (List<Address>) ReflectionTestUtils.getField(environment, "addresses"))
274+
.extracting((address) -> address.get(0))
275+
.extracting("host", "port")
276+
.containsExactly("rabbitmq", 5555));
277+
}
278+
279+
private RabbitStreamConnectionDetails getRabbitConnectionDetails(RabbitProperties properties) {
280+
return new PropertiesRabbitStreamConnectionDetails(properties.getStream());
281+
}
282+
263283
@Configuration(proxyBeanMethods = false)
264284
static class TestConfiguration {
265285

@@ -345,4 +365,24 @@ EnvironmentBuilderCustomizer customizerB() {
345365

346366
}
347367

368+
@Configuration(proxyBeanMethods = false)
369+
static class CustomConnectionDetails {
370+
371+
@Bean
372+
RabbitStreamConnectionDetails customRabbitMqStreamConnectionDetails() {
373+
return new RabbitStreamConnectionDetails() {
374+
@Override
375+
public String getHost() {
376+
return "rabbitmq";
377+
}
378+
379+
@Override
380+
public int getPort() {
381+
return 5555;
382+
}
383+
};
384+
}
385+
386+
}
387+
348388
}

spring-boot-project/spring-boot-testcontainers/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ dependencies {
4646
dockerTestImplementation("org.springframework:spring-jms")
4747
dockerTestImplementation("org.springframework:spring-r2dbc")
4848
dockerTestImplementation("org.springframework.amqp:spring-rabbit")
49+
dockerTestImplementation("org.springframework.amqp:spring-rabbit-stream")
4950
dockerTestImplementation("org.springframework.data:spring-data-redis")
5051
dockerTestImplementation("org.springframework.kafka:spring-kafka")
5152
dockerTestImplementation("org.springframework.ldap:spring-ldap-core")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright 2012-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.testcontainers.service.connection.amqp;
18+
19+
import java.time.Duration;
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
23+
import com.rabbitmq.stream.Address;
24+
import com.rabbitmq.stream.Environment;
25+
import org.awaitility.Awaitility;
26+
import org.junit.jupiter.api.Test;
27+
import org.testcontainers.containers.RabbitMQContainer;
28+
import org.testcontainers.images.builder.Transferable;
29+
import org.testcontainers.junit.jupiter.Container;
30+
import org.testcontainers.junit.jupiter.Testcontainers;
31+
32+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
33+
import org.springframework.beans.factory.annotation.Autowired;
34+
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
35+
import org.springframework.boot.autoconfigure.amqp.EnvironmentBuilderCustomizer;
36+
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
37+
import org.springframework.boot.autoconfigure.amqp.RabbitStreamConnectionDetails;
38+
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
39+
import org.springframework.boot.testsupport.container.TestImage;
40+
import org.springframework.context.annotation.Bean;
41+
import org.springframework.context.annotation.Configuration;
42+
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
43+
import org.springframework.rabbit.stream.support.StreamAdmin;
44+
import org.springframework.test.context.TestPropertySource;
45+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
46+
47+
import static org.assertj.core.api.Assertions.assertThat;
48+
49+
/**
50+
* Tests for {@link RabbitStreamContainerConnectionDetailsFactory}.
51+
*
52+
* @author Eddú Meléndez
53+
*/
54+
@SpringJUnitConfig
55+
@TestPropertySource(
56+
properties = { "spring.rabbitmq.stream.name=stream.queue1", "spring.rabbitmq.listener.type=stream" })
57+
@Testcontainers(disabledWithoutDocker = true)
58+
class RabbitStreamContainerConnectionDetailsFactoryIntegrationTests {
59+
60+
private static final int RABBITMQ_STREAMS_PORT = 5552;
61+
62+
@Container
63+
@ServiceConnection
64+
static final RabbitMQContainer rabbit = getRabbitMqStreamContainer();
65+
66+
private static RabbitMQContainer getRabbitMqStreamContainer() {
67+
RabbitMQContainer container = TestImage.container(RabbitMQContainer.class);
68+
container.addExposedPorts(RABBITMQ_STREAMS_PORT);
69+
var enabledPlugins = "[rabbitmq_stream,rabbitmq_prometheus].";
70+
container.withCopyToContainer(Transferable.of(enabledPlugins), "/etc/rabbitmq/enabled_plugins");
71+
return container;
72+
}
73+
74+
@Autowired(required = false)
75+
private RabbitStreamConnectionDetails connectionDetails;
76+
77+
@Autowired
78+
private RabbitStreamTemplate rabbitStreamTemplate;
79+
80+
@Autowired
81+
private TestListener listener;
82+
83+
@Test
84+
void connectionCanBeMadeToRabbitContainer() {
85+
assertThat(this.connectionDetails).isNotNull();
86+
this.rabbitStreamTemplate.convertAndSend("message");
87+
Awaitility.waitAtMost(Duration.ofMinutes(4))
88+
.untilAsserted(() -> assertThat(this.listener.messages).containsExactly("message"));
89+
90+
}
91+
92+
@Configuration(proxyBeanMethods = false)
93+
@ImportAutoConfiguration(RabbitAutoConfiguration.class)
94+
static class TestConfiguration {
95+
96+
@Bean
97+
StreamAdmin streamAdmin(Environment env) {
98+
return new StreamAdmin(env, sc -> {
99+
sc.stream("stream.queue1").create();
100+
});
101+
}
102+
103+
@Bean
104+
EnvironmentBuilderCustomizer environmentBuilderCustomizer() {
105+
return env -> {
106+
Address entrypoint = new Address(rabbit.getHost(), rabbit.getMappedPort(RABBITMQ_STREAMS_PORT));
107+
env.addressResolver(address -> entrypoint);
108+
};
109+
}
110+
111+
@Bean
112+
TestListener testListener() {
113+
return new TestListener();
114+
}
115+
116+
}
117+
118+
static class TestListener {
119+
120+
private final List<String> messages = new ArrayList<>();
121+
122+
@RabbitListener(queues = "stream.queue1")
123+
void processMessage(String message) {
124+
this.messages.add(message);
125+
}
126+
127+
}
128+
129+
}

0 commit comments

Comments
 (0)