Skip to content

Commit 84ebd3d

Browse files
committed
GH-2919: Add documentation for spring-rabbitmq-client
Fixes: #2919
1 parent 684fac2 commit 84ebd3d

File tree

1 file changed

+265
-1
lines changed

1 file changed

+265
-1
lines changed

src/reference/antora/modules/ROOT/pages/rabbitmq-amqp-client.adoc

Lines changed: 265 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,268 @@ Version 4.0 introduces `spring-rabbitmq-client` module for https://www.rabbitmq.
55

66
This artifact is based on the {rabbitmq-github}/rabbitmq-amqp-java-client[com.rabbitmq.client:amqp-client] library and therefore can work only with RabbitMQ and its AMQP 1.0 protocol support.
77
It cannot be used for any arbitrary AMQP 1.0 broker.
8-
For that purpose a https://qpid.apache.org/components/jms/index.html[JMS bridge] and respective {spring-framework-docs}/integration/jms.html[Spring JMS] integration is recommended so far.
8+
For that purpose a https://qpid.apache.org/components/jms/index.html[JMS bridge] and respective {spring-framework-docs}/integration/jms.html[Spring JMS] integration is recommended so far.
9+
10+
This dependency has to be added to the project to be able to interact with RabbitMQ AMQP 1.0 support:
11+
12+
.maven
13+
[source,xml,subs="+attributes"]
14+
----
15+
<dependency>
16+
<groupId>org.springframework.amqp</groupId>
17+
<artifactId>spring-rabbitmq-client</artifactId>
18+
<version>{project-version}</version>
19+
</dependency>
20+
----
21+
22+
.gradle
23+
[source,groovy,subs="+attributes"]
24+
----
25+
compile 'org.springframework.amqp:spring-rabbitmq-client:{project-version}'
26+
----
27+
28+
The `spring-rabbit` (for AMQP 0.9.1 protocol) comes as a transitive dependency for reusing some common API in this new client, for example, exceptions, the `@RabbitListener` support.
29+
It is not necessary to use both functionality in the target project, but RabbitMQ allows both AMQP 0.9.1 and 1.0 co-exists.
30+
31+
For more information about RabbitMQ AMQP 1.0 Java Client see its https://www.rabbitmq.com/client-libraries/amqp-client-libraries[documentation].
32+
33+
[[amqp-client-environment]]
34+
== RabbitMQ AMQP 1.0 Environment
35+
36+
The `com.rabbitmq.client.amqp.Environment` is the first thing which has to be added to the project for connection management and other common settings.
37+
It is an entry point to a node or a cluster of nodes.
38+
The environment allows creating connections.
39+
It can contain infrastructure-related configuration settings shared between connections, e.g. pools of threads, metrics and/or observation:
40+
41+
[source,java]
42+
----
43+
@Bean
44+
Environment environment() {
45+
return new AmqpEnvironmentBuilder()
46+
.connectionSettings()
47+
.port(5672)
48+
.environmentBuilder()
49+
.build();
50+
}
51+
----
52+
53+
The same `Environment` instance can be used for connecting to different RabbitMQ brokers, then connection setting must be provided on specific connection.
54+
See below.
55+
56+
[[amqp-client-connection-factory]]
57+
== AMQP Connection Factory
58+
59+
The `org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory` abstraction was introduced to manage `com.rabbitmq.client.amqp.Connection`.
60+
Don't confuse it with a `org.springframework.amqp.rabbit.connection.ConnectionFactory` which is only for AMQP 0.9.1 protocol.
61+
The `SingleAmqpConnectionFactory` implementation is present to manage one connection and its settings.
62+
The same `Connection` can be shared between many producers, consumers and management.
63+
The multi-plexing is handled by the link abstraction for AMQP 1.0 protocol implementation internally in the AMQP client library.
64+
The `Connection` has recovery capabilities and also handles topology.
65+
66+
In most cases there is just enough to add this bean into the project:
67+
68+
[source,java]
69+
----
70+
@Bean
71+
AmqpConnectionFactory connectionFactory(Environment environment) {
72+
return new SingleAmqpConnectionFactory(environment);
73+
}
74+
----
75+
76+
See `SingleAmqpConnectionFactory` setters for all connection-specific setting.
77+
78+
[[amqp-client-topology]]
79+
== RabbitMQ Topology Management
80+
81+
For topology management (exchanges, queues and binding between) from the application perspective, the `RabbitAmqpAdmin` is present, which is an implementation of existing `AmqpAdmin` interface:
82+
83+
[source,java]
84+
----
85+
@Bean
86+
RabbitAmqpAdmin admin(AmqpConnectionFactory connectionFactory) {
87+
return new RabbitAmqpAdmin(connectionFactory);
88+
}
89+
----
90+
91+
The same bean definitions for `Exchange`, `Queue`, `Binding` and `Declarables` instances as described in the xref:amqp/broker-configuration.adoc[] has to be used to manage topology.
92+
The `RabbitAdmin` from `spring-rabbit` can also do that, but it happens against AMQP 0.9.1 connection, and since `RabbitAmqpAdmin` is based on the AMQP 1.0 connection, the topology recovery is handled smoothly from there, together with publishers and consumers recovery.
93+
94+
The `RabbitAmqpAdmin` performs respective beans scanning in its `start()` lifecycle callback.
95+
The `initialize()`, as well-as all other RabbitMQ entities management methods can be called manually at runtime.
96+
Internally the `RabbitAmqpAdmin` uses `com.rabbitmq.client.amqp.Connection.management()` API to perform respective topology manipulations.
97+
98+
[[amqp-client-template]]
99+
== `RabbitAmqpTemplate`
100+
101+
The `RabbitAmqpTemplate` is an implementation of the `AsyncAmqpTemplate` and performs various send/receive operations with AMQP 1.0 protocol.
102+
Requires an `AmqpConnectionFactory` and can be configured with some defaults.
103+
Even if `com.rabbitmq.client:amqp-client` library comes with a `com.rabbitmq.client.amqp.Message`, the `RabbitAmqpTemplate` still exposes an API based on the well-known `org.springframework.amqp.core.Message` with all the supporting classes like `MessageProperties` and `MessageConverter` abstraction.
104+
The conversion to/from `com.rabbitmq.client.amqp.Message` is done internally in the `RabbitAmqpTemplate`.
105+
All the methods return a `CompletableFuture` to obtain operation results eventually.
106+
The operations with plain object require message body conversion and `SimpleMessageConverter` is used by default.
107+
See xref:amqp/message-converters.adoc[] for more information about conversions.
108+
109+
Usually, just one bean like this is enough to perform all the possible template pattern operation:
110+
111+
[source,java]
112+
----
113+
@Bean
114+
RabbitAmqpTemplate rabbitTemplate(AmqpConnectionFactory connectionFactory) {
115+
return new RabbitAmqpTemplate(connectionFactory);
116+
}
117+
----
118+
119+
It can be configured for some default exchange and routing key or just queue.
120+
The `RabbitAmqpTemplate` have a default queue for receive operation and another default queue for request-reply operation where temporary queue is created for the request by the client if not present.
121+
122+
Here are some samples of `RabbitAmqpTemplate` operations:
123+
124+
[source,java]
125+
----
126+
@Bean
127+
DirectExchange e1() {
128+
return new DirectExchange("e1");
129+
}
130+
131+
@Bean
132+
Queue q1() {
133+
return QueueBuilder.durable("q1").deadLetterExchange("dlx1").build();
134+
}
135+
136+
@Bean
137+
Binding b1() {
138+
return BindingBuilder.bind(q1()).to(e1()).with("k1");
139+
}
140+
141+
...
142+
143+
@Test
144+
void defaultExchangeAndRoutingKey() {
145+
this.rabbitAmqpTemplate.setExchange("e1");
146+
this.rabbitAmqpTemplate.setRoutingKey("k1");
147+
this.rabbitAmqpTemplate.setReceiveQueue("q1");
148+
149+
assertThat(this.rabbitAmqpTemplate.convertAndSend("test1"))
150+
.succeedsWithin(Duration.ofSeconds(10));
151+
152+
assertThat(this.rabbitAmqpTemplate.receiveAndConvert())
153+
.succeedsWithin(Duration.ofSeconds(10))
154+
.isEqualTo("test1");
155+
}
156+
----
157+
158+
Here we declared an `e1` exchange, `q1` queue and bind it into that exchange with a `k1` routing key.
159+
Then we use a default setting for `RabbitAmqpTemplate` to publish messages to the mentioned exchange with the respective routing key and use `q1` as default queue for receiving operations.
160+
There are overloaded variants for those methods to send to specific exchange or queue (for send and receive).
161+
The `receiveAndConvert()` operations with a `ParameterizedTypeReference<T>` requires a `SmartMessageConverter` to be injected into the `RabbitAmqpTemplate`.
162+
163+
The next example demonstrate and RPC implementation with `RabbitAmqpTemplate` (assuming same RabbitMQ objects as in the previous example):
164+
165+
[source,java]
166+
----
167+
@Test
168+
void verifyRpc() {
169+
String testRequest = "rpc-request";
170+
String testReply = "rpc-reply";
171+
172+
CompletableFuture<Object> rpcClientResult = this.template.convertSendAndReceive("e1", "k1", testRequest);
173+
174+
AtomicReference<String> receivedRequest = new AtomicReference<>();
175+
CompletableFuture<Boolean> rpcServerResult =
176+
this.rabbitAmqpTemplate.<String, String>receiveAndReply("q1",
177+
payload -> {
178+
receivedRequest.set(payload);
179+
return testReply;
180+
});
181+
182+
assertThat(rpcServerResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(true);
183+
assertThat(rpcClientResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(testReply);
184+
assertThat(receivedRequest.get()).isEqualTo(testRequest);
185+
}
186+
----
187+
188+
The correlation and `replyTo` queue are managed internally.
189+
The server side can be implemented with a `@RabbitListener` POJO method described below.
190+
191+
[[amqp-client-listener]]
192+
== The RabbitMQ AMQP 1.0 Consumer
193+
194+
As with many other messaging implementations for consumer side, the `spring-rabbitmq-client` modules comes with the `RabbitAmqpListenerContainer` which is, essentially, an implementation of well-know `MessageListenerContainer`.
195+
It does exactly the same as `DirectMessageListenerContainer`, but for RabbitMQ AMQP 1.0 support.
196+
Requires an `AmqpConnectionFactory` and at least one queue to consume from.
197+
Also, the `MessageListener` (or AMQP 1.0 specific `RabbitAmqpMessageListener`) must be provided.
198+
Can be configured with an `autoSettle = false`, with the meaning of `AcknowledgeMode.MANUAL`.
199+
In that case, the `Message` provided to the `MessageListener` has in its `MessageProperties` an `AmqpAcknowledgment` callback for target logic consideration.
200+
201+
The `RabbitAmqpMessageListener` has a contract for `com.rabbitmq.client:amqp-client` abstractions:
202+
203+
[source,java]
204+
----
205+
/**
206+
* Process an AMQP message.
207+
* @param message the message to process.
208+
* @param context the consumer context to settle message.
209+
* Null if container is configured for {@code autoSettle}.
210+
*/
211+
void onAmqpMessage(Message message, Consumer.Context context);
212+
----
213+
214+
Where the first argument is a native received `com.rabbitmq.client.amqp.Message` and `context` is a native callback for message settlement, similar to the mentioned above `AmqpAcknowledgment` abstraction.
215+
216+
The `RabbitAmqpMessageListener` can handle and settle messages in batches when `batchSize` option is provided.
217+
For this purpose the `MessageListener.onMessageBatch()` contract must be implemented.
218+
The `batchReceiveDuration` option is used to schedule a force release for not full batches to avoid memory and https://www.rabbitmq.com/blog/2024/09/02/amqp-flow-control[consumer credits] exhausting.
219+
220+
Usually, the `RabbitAmqpMessageListener` class is not used directly in the target project, and POJO method annotation configuration via `@RabbitListener` is chosen for declarative consumer configuration.
221+
The `RabbitAmqpListenerContainerFactory` must be registered under the `RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME`, and `@RabbitListener` annotation process will register `RabbitAmqpMessageListener` instance into the `RabbitListenerEndpointRegistry`.
222+
The target POJO method invocation is handled by specific `RabbitAmqpMessageListenerAdapter` implementation, which extends a `MessagingMessageListenerAdapter` and reuses a lot of its functionality, including request-reply scenarios (async or not).
223+
So, all the concepts described in the xref:amqp/receiving-messages/async-annotation-driven.adoc[] are applied with this `RabbitAmqpMessageListener` as well.
224+
225+
In addition to traditional messaging `payload` and `headers`, the `@RabbitListener` POJO method contract can be with these parameters:
226+
227+
* `com.rabbitmq.client.amqp.Message` - the native AMQP 1.0 message without any conversions;
228+
* `org.springframework.amqp.core.Message` - Spring AMQP message abstraction as conversion result from the native AMQP 1.0 message;
229+
* `org.springframework.messaging.Message` - Spring Messaging abstraction as conversion result from the Spring AMQP message;
230+
* `Consumer.Context` - RabbitMQ AMQP client consumer settlement API;
231+
* `org.springframework.amqp.core.AmqpAcknowledgment` - Spring AMQP acknowledgment abstraction: delegates to the `Consumer.Context`.
232+
233+
The following example demonstrates a simple `@RabbitListener` for RabbitMQ AMQP 1.0 interaction with the manual settlement:
234+
235+
[source,java]
236+
----
237+
@Bean(RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
238+
RabbitAmqpListenerContainerFactory rabbitAmqpListenerContainerFactory(AmqpConnectionFactory connectionFactory) {
239+
return new RabbitAmqpListenerContainerFactory(connectionFactory);
240+
}
241+
242+
final List<String> received = Collections.synchronizedList(new ArrayList<>());
243+
244+
CountDownLatch consumeIsDone = new CountDownLatch(11);
245+
246+
@RabbitListener(queues = {"q1", "q2"},
247+
ackMode = "#{T(org.springframework.amqp.core.AcknowledgeMode).MANUAL}",
248+
concurrency = "2",
249+
id = "testAmqpListener")
250+
void processQ1AndQ2Data(String data, AmqpAcknowledgment acknowledgment, Consumer.Context context) {
251+
try {
252+
if ("discard".equals(data)) {
253+
if (!this.received.contains(data)) {
254+
context.discard();
255+
}
256+
else {
257+
throw new MessageConversionException("Test message is rejected");
258+
}
259+
}
260+
else if ("requeue".equals(data) && !this.received.contains(data)) {
261+
acknowledgment.acknowledge(AmqpAcknowledgment.Status.REQUEUE);
262+
}
263+
else {
264+
acknowledgment.acknowledge();
265+
}
266+
this.received.add(data);
267+
}
268+
finally {
269+
this.consumeIsDone.countDown();
270+
}
271+
}
272+
----

0 commit comments

Comments
 (0)