Skip to content

Commit d255671

Browse files
garyrussellartembilan
authored andcommitted
GH-2451: Add StreamAdmin
Resolves #2451 **cherry-pick to 2.4.x** # Conflicts: # src/reference/asciidoc/stream.adoc
1 parent 47b56fb commit d255671

File tree

3 files changed

+169
-4
lines changed

3 files changed

+169
-4
lines changed
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.rabbit.stream.support;
18+
19+
import java.util.function.Consumer;
20+
21+
import org.springframework.context.SmartLifecycle;
22+
import org.springframework.util.Assert;
23+
24+
import com.rabbitmq.stream.Environment;
25+
import com.rabbitmq.stream.StreamCreator;
26+
27+
/**
28+
* Used to provision streams.
29+
*
30+
* @author Gary Russell
31+
* @since 2.4.13
32+
*
33+
*/
34+
public class StreamAdmin implements SmartLifecycle {
35+
36+
private final StreamCreator streamCreator;
37+
38+
private final Consumer<StreamCreator> callback;
39+
40+
private boolean autoStartup = true;
41+
42+
private int phase;
43+
44+
private volatile boolean running;
45+
46+
/**
47+
* Construct with the provided parameters.
48+
* @param env the environment.
49+
* @param callback the callback to receive the {@link StreamCreator}.
50+
*/
51+
public StreamAdmin(Environment env, Consumer<StreamCreator> callback) {
52+
Assert.notNull(env, "Environment cannot be null");
53+
Assert.notNull(callback, "'callback' cannot be null");
54+
this.streamCreator = env.streamCreator();
55+
this.callback = callback;
56+
}
57+
58+
@Override
59+
public int getPhase() {
60+
return this.phase;
61+
}
62+
63+
/**
64+
* Set the phase; default is 0.
65+
* @param phase the phase.
66+
*/
67+
public void setPhase(int phase) {
68+
this.phase = phase;
69+
}
70+
71+
/**
72+
* Set to false to prevent automatic startup.
73+
* @param autoStartup the autoStartup.
74+
*/
75+
public void setAutoStartup(boolean autoStartup) {
76+
this.autoStartup = autoStartup;
77+
}
78+
79+
@Override
80+
public void start() {
81+
this.callback.accept(this.streamCreator);
82+
this.running = true;
83+
}
84+
85+
@Override
86+
public void stop() {
87+
this.running = false;
88+
}
89+
90+
@Override
91+
public boolean isRunning() {
92+
return this.running;
93+
}
94+
95+
@Override
96+
public boolean isAutoStartup() {
97+
return this.autoStartup;
98+
}
99+
100+
}

spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2022 the original author or authors.
2+
* Copyright 2021-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,6 +51,7 @@
5151
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
5252
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
5353
import org.springframework.rabbit.stream.retry.StreamRetryOperationsInterceptorFactoryBean;
54+
import org.springframework.rabbit.stream.support.StreamAdmin;
5455
import org.springframework.rabbit.stream.support.StreamMessageProperties;
5556
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
5657
import org.springframework.test.annotation.DirtiesContext;
@@ -169,7 +170,17 @@ static Environment environment() {
169170
}
170171

171172
@Bean
172-
SmartLifecycle creator(Environment env) {
173+
StreamAdmin streamAdmin(Environment env) {
174+
StreamAdmin streamAdmin = new StreamAdmin(env, sc -> {
175+
sc.stream("test.stream.queue1").create();
176+
sc.stream("test.stream.queue2").create();
177+
});
178+
streamAdmin.setAutoStartup(false);
179+
return streamAdmin;
180+
}
181+
182+
@Bean
183+
SmartLifecycle creator(Environment env, StreamAdmin admin) {
173184
return new SmartLifecycle() {
174185

175186
boolean running;
@@ -183,8 +194,7 @@ public void stop() {
183194
@Override
184195
public void start() {
185196
clean(env);
186-
env.streamCreator().stream("test.stream.queue1").create();
187-
env.streamCreator().stream("test.stream.queue2").create();
197+
admin.start();
188198
this.running = true;
189199
}
190200

src/reference/asciidoc/stream.adoc

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,61 @@ Version 2.4 introduces initial support for the https://github.com/rabbitmq/rabbi
66
* `RabbitStreamTemplate`
77
* `StreamListenerContainer`
88

9+
Add the `spring-rabbit-stream` dependency to your project:
10+
11+
.maven
12+
====
13+
[source,xml,subs="+attributes"]
14+
----
15+
<dependency>
16+
<groupId>org.springframework.amqp</groupId>
17+
<artifactId>spring-rabbit-stream</artifactId>
18+
<version>{project-version}</version>
19+
</dependency>
20+
----
21+
====
22+
23+
.gradle
24+
====
25+
[source,groovy,subs="+attributes"]
26+
----
27+
compile 'org.springframework.amqp:spring-rabbit-stream:{project-version}'
28+
----
29+
====
30+
31+
You can provision the queues as normal, using a `RabbitAdmin` bean, using the `QueueBuilder.stream()` method to designate the queue type.
32+
For example:
33+
34+
====
35+
[source, java]
36+
----
37+
@Bean
38+
Queue stream() {
39+
return QueueBuilder.durable("stream.queue1")
40+
.stream()
41+
.build();
42+
}
43+
----
44+
====
45+
46+
However, this will only work if you are also using non-stream components (such as the `SimpleMessageListenerContainer` or `DirectMessageListenerContainer`) because the admin is triggered to declare the defined beans when an AMQP connection is opened.
47+
If your application only uses stream components, or you wish to use advanced stream configuration features, you should configure a `StreamAdmin` instead:
48+
49+
====
50+
[source, java]
51+
----
52+
@Bean
53+
StreamAdmin streamAdmin(Environment env) {
54+
return new StreamAdmin(env, sc -> {
55+
sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
56+
sc.stream("stream.queue2").create();
57+
});
58+
}
59+
----
60+
====
61+
62+
Refer to the RabbitMQ documentation for more information about the `StreamCreator`.
63+
964
==== Sending Messages
1065

1166
The `RabbitStreamTemplate` provides a subset of the `RabbitTemplate` (AMQP) functionality.

0 commit comments

Comments
 (0)