Skip to content

Commit 07fb652

Browse files
committed
GH-10090: Add AmqpClientMessageHandler for AMQP 1.0
Related to: #10090 Spring AMQP now provides the `org.springframework.amqp:spring-rabbitmq-client` library to communicate to RabbitMQ with AMQP 1.0 protocol. * Replace `spring-rabbit` with the `spring-rabbitmq-client` since the latter brings the AMQP 0.9 protocol library as transitive dependency * Introduce an `AmqpClientMessageHandler` based on the `AsyncAmqpTemplate` where its implementation should be for AMQP 1.0, e.g. `org.springframework.amqp.rabbitmq.client.RabbitAmqpTemplate` * This outbound channel adapter can behave as a gateway when `setRequiresReply(true)` * Cover basic `AmqpClientMessageHandler` with integration tests * Document this new feature
1 parent 4e0667c commit 07fb652

File tree

6 files changed

+592
-1
lines changed

6 files changed

+592
-1
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,7 @@ project('spring-integration-test-support') {
436436
project('spring-integration-amqp') {
437437
description = 'Spring Integration AMQP Support'
438438
dependencies {
439-
api 'org.springframework.amqp:spring-rabbit'
439+
api 'org.springframework.amqp:spring-rabbitmq-client'
440440
optionalApi 'org.springframework.amqp:spring-rabbit-stream'
441441

442442
testImplementation 'org.springframework.amqp:spring-rabbit-junit'
Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.outbound;
18+
19+
import java.util.concurrent.CompletableFuture;
20+
21+
import org.jspecify.annotations.Nullable;
22+
23+
import org.springframework.amqp.core.AsyncAmqpTemplate;
24+
import org.springframework.amqp.core.Message;
25+
import org.springframework.amqp.core.MessageProperties;
26+
import org.springframework.amqp.support.converter.MessageConverter;
27+
import org.springframework.amqp.support.converter.SimpleMessageConverter;
28+
import org.springframework.amqp.support.converter.SmartMessageConverter;
29+
import org.springframework.core.ParameterizedTypeReference;
30+
import org.springframework.core.ResolvableType;
31+
import org.springframework.expression.Expression;
32+
import org.springframework.expression.spel.support.StandardEvaluationContext;
33+
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
34+
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
35+
import org.springframework.integration.expression.ExpressionUtils;
36+
import org.springframework.integration.expression.ValueExpression;
37+
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
38+
import org.springframework.integration.mapping.AbstractHeaderMapper;
39+
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
40+
import org.springframework.util.Assert;
41+
import org.springframework.util.StringUtils;
42+
43+
/**
44+
* An {@link AbstractReplyProducingMessageHandler} implementation for AMQP 1.0 client.
45+
* <p>
46+
* With the {@link #setRequiresReply(boolean)} configured as {@code true}, this message handler
47+
* behaves as a gateway - the RPC over AMQP.
48+
* In this case, the {@link AsyncAmqpTemplate} must be able to convert
49+
* a reply into a provided {@link #replyPayloadTypeExpression}.
50+
* <p>
51+
* This handler is {@code async} by default.
52+
* <p>
53+
* In async mode, the error is sent to the error channel even if not in a gateway mode.
54+
* <p>
55+
* The {@link #exchangeExpression}, {@link #routingKeyExpression} and {@link #queueExpression}
56+
* are optional.
57+
* Then they have to be supplied by the {@link AsyncAmqpTemplate}.
58+
*
59+
* @author Artem Bilan
60+
*
61+
* @since 7.0
62+
*/
63+
public class AmqpClientMessageHandler extends AbstractReplyProducingMessageHandler {
64+
65+
private final AsyncAmqpTemplate amqpTemplate;
66+
67+
private AmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.outboundMapper();
68+
69+
private MessageConverter messageConverter = new SimpleMessageConverter();
70+
71+
private @Nullable Expression exchangeExpression;
72+
73+
private @Nullable Expression routingKeyExpression;
74+
75+
private @Nullable Expression queueExpression;
76+
77+
private @Nullable Expression replyPayloadTypeExpression;
78+
79+
private boolean returnMessage;
80+
81+
@SuppressWarnings("NullAway.Init")
82+
private StandardEvaluationContext evaluationContext;
83+
84+
/**
85+
* Construct an instance with the provided {@link AsyncAmqpTemplate}.
86+
* The {@link AsyncAmqpTemplate} must be an implementation for AMQP 1.0 protocol,
87+
* e.g. {@link org.springframework.amqp.rabbitmq.client.RabbitAmqpTemplate}.
88+
* @param amqpTemplate the {@link AsyncAmqpTemplate} to use.
89+
*/
90+
@SuppressWarnings("this-escape")
91+
public AmqpClientMessageHandler(AsyncAmqpTemplate amqpTemplate) {
92+
this.amqpTemplate = amqpTemplate;
93+
setAsync(true);
94+
}
95+
96+
public void setHeaderMapper(AmqpHeaderMapper headerMapper) {
97+
this.headerMapper = headerMapper;
98+
}
99+
100+
public void setMessageConverter(MessageConverter messageConverter) {
101+
this.messageConverter = messageConverter;
102+
}
103+
104+
public void setExchange(String exchange) {
105+
setExchangeExpression(new ValueExpression<>(exchange));
106+
}
107+
108+
public void setExchangeExpressionString(String exchangeExpression) {
109+
setExchangeExpression(EXPRESSION_PARSER.parseExpression(exchangeExpression));
110+
}
111+
112+
public void setExchangeExpression(Expression exchangeExpression) {
113+
this.exchangeExpression = exchangeExpression;
114+
}
115+
116+
public void setRoutingKey(String routingKey) {
117+
setRoutingKeyExpression(new ValueExpression<>(routingKey));
118+
}
119+
120+
public void setRoutingKeyExpressionString(String routingKeyExpression) {
121+
setRoutingKeyExpression(EXPRESSION_PARSER.parseExpression(routingKeyExpression));
122+
}
123+
124+
public void setRoutingKeyExpression(Expression routingKeyExpression) {
125+
this.routingKeyExpression = routingKeyExpression;
126+
}
127+
128+
public void setQueue(String queue) {
129+
setQueueExpression(new ValueExpression<>(queue));
130+
}
131+
132+
public void setQueueExpressionString(String queueExpression) {
133+
setQueueExpression(EXPRESSION_PARSER.parseExpression(queueExpression));
134+
}
135+
136+
public void setQueueExpression(Expression queueExpression) {
137+
this.queueExpression = queueExpression;
138+
}
139+
140+
/**
141+
* Set the reply payload type.
142+
* Used only if {@link #setRequiresReply(boolean)} is {@code true}.
143+
* @param replyPayloadType the reply payload type.
144+
*/
145+
public void setReplyPayloadType(Class<?> replyPayloadType) {
146+
setReplyPayloadType(ResolvableType.forClass(replyPayloadType));
147+
}
148+
149+
/**
150+
* Set the reply payload type.
151+
* Used only if {@link #setRequiresReply(boolean)} is {@code true}.
152+
* @param replyPayloadType the reply payload type.
153+
*/
154+
public void setReplyPayloadType(ResolvableType replyPayloadType) {
155+
setReplyPayloadTypeExpression(new ValueExpression<>(replyPayloadType));
156+
}
157+
158+
/**
159+
* Set a SpEL expression for the reply payload type.
160+
* Used only if {@link #setRequiresReply(boolean)} is {@code true}.
161+
* Must be evaluated to a {@link Class} or {@link ResolvableType}.
162+
* @param replyPayloadTypeExpression the expression for a reply payload type.
163+
*/
164+
public void setReplyPayloadTypeExpressionString(String replyPayloadTypeExpression) {
165+
setReplyPayloadTypeExpression(EXPRESSION_PARSER.parseExpression(replyPayloadTypeExpression));
166+
}
167+
168+
/**
169+
* Set a SpEL expression for the reply payload type.
170+
* Used only if {@link #setRequiresReply(boolean)} is {@code true}.
171+
* Must be evaluated to a {@link Class} or {@link ResolvableType}.
172+
* @param replyPayloadTypeExpression the expression for a reply payload type.
173+
*/
174+
public void setReplyPayloadTypeExpression(Expression replyPayloadTypeExpression) {
175+
this.replyPayloadTypeExpression = replyPayloadTypeExpression;
176+
}
177+
178+
/**
179+
* Set to true to return the reply as a whole AMQP message.
180+
* Used only in the gateway mode.
181+
* @param returnMessage true to return the reply as a whole AMQP message.
182+
*/
183+
public void setReturnMessage(boolean returnMessage) {
184+
this.returnMessage = returnMessage;
185+
}
186+
187+
@Override
188+
public String getComponentType() {
189+
return getRequiresReply() ? "amqp:outbound-gateway" : "amqp:outbound-channel-adapter";
190+
}
191+
192+
@Override
193+
protected void doInit() {
194+
super.doInit();
195+
if (this.headerMapper instanceof AbstractHeaderMapper<?> abstractHeaderMapper) {
196+
abstractHeaderMapper.setBeanClassLoader(getBeanClassLoader());
197+
}
198+
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
199+
200+
Assert.state(this.exchangeExpression == null || this.queueExpression == null,
201+
"The 'exchange' (and optional 'routingKey') is mutually exclusive with 'queue'");
202+
203+
Assert.state(this.replyPayloadTypeExpression == null || this.messageConverter instanceof SmartMessageConverter,
204+
"The 'messageConverter' must be a 'SmartMessageConverter' when 'replyPayloadTypeExpression' is provided");
205+
206+
Assert.state(this.replyPayloadTypeExpression == null || !this.returnMessage,
207+
"The 'returnMessage == true' and 'replyPayloadTypeExpression' are mutually exclusive");
208+
}
209+
210+
@Override
211+
protected @Nullable Object handleRequestMessage(org.springframework.messaging.Message<?> requestMessage) {
212+
MessageProperties messageProperties = new MessageProperties();
213+
this.headerMapper.fromHeadersToRequest(requestMessage.getHeaders(), messageProperties);
214+
Message amqpMessage = this.messageConverter.toMessage(requestMessage.getPayload(), messageProperties);
215+
216+
String queue = null;
217+
if (this.queueExpression != null) {
218+
queue = this.queueExpression.getValue(this.evaluationContext, requestMessage, String.class);
219+
}
220+
221+
String exchange = null;
222+
if (this.exchangeExpression != null) {
223+
exchange = this.exchangeExpression.getValue(this.evaluationContext, requestMessage, String.class);
224+
}
225+
226+
String routingKey = null;
227+
if (this.routingKeyExpression != null) {
228+
routingKey = this.routingKeyExpression.getValue(this.evaluationContext, requestMessage, String.class);
229+
}
230+
231+
if (getRequiresReply()) {
232+
return doSendAndReceive(requestMessage, amqpMessage, queue, exchange, routingKey);
233+
}
234+
else {
235+
doSend(requestMessage, amqpMessage, queue, exchange, routingKey);
236+
return null;
237+
}
238+
}
239+
240+
private void doSend(org.springframework.messaging.Message<?> requestMessage, Message amqpMessage,
241+
@Nullable String queue, @Nullable String exchange, @Nullable String routingKey) {
242+
243+
CompletableFuture<Boolean> sendResultFuture;
244+
245+
if (StringUtils.hasText(queue)) {
246+
sendResultFuture = this.amqpTemplate.send(queue, amqpMessage);
247+
}
248+
else if (StringUtils.hasText(exchange)) {
249+
sendResultFuture = this.amqpTemplate.send(exchange, routingKey, amqpMessage);
250+
}
251+
else {
252+
sendResultFuture = this.amqpTemplate.send(amqpMessage);
253+
}
254+
255+
if (isAsync()) {
256+
sendResultFuture.whenComplete((aBoolean, throwable) -> {
257+
if (throwable != null) {
258+
sendErrorMessage(requestMessage, throwable);
259+
}
260+
});
261+
}
262+
else {
263+
sendResultFuture.join();
264+
}
265+
}
266+
267+
private Object doSendAndReceive(org.springframework.messaging.Message<?> requestMessage, Message amqpMessage,
268+
@Nullable String queue, @Nullable String exchange, @Nullable String routingKey) {
269+
270+
ParameterizedTypeReference<?> replyType = null;
271+
if (this.replyPayloadTypeExpression != null) {
272+
Object type = this.replyPayloadTypeExpression.getValue(this.evaluationContext, requestMessage);
273+
274+
Assert.state(type instanceof Class<?> || type instanceof ResolvableType,
275+
"The 'replyPayloadTypeExpression' must evaluate to 'Class' or 'ResolvableType'");
276+
277+
ResolvableType replyResolvableType =
278+
type instanceof Class<?> aClass
279+
? ResolvableType.forClass(aClass)
280+
: (ResolvableType) type;
281+
282+
replyType = ParameterizedTypeReference.forType(replyResolvableType.getType());
283+
}
284+
285+
CompletableFuture<?> replyFuture;
286+
287+
if (StringUtils.hasText(queue)) {
288+
replyFuture = this.amqpTemplate.sendAndReceive(queue, amqpMessage);
289+
}
290+
else if (StringUtils.hasText(exchange)) {
291+
replyFuture = this.amqpTemplate.sendAndReceive(exchange, routingKey, amqpMessage);
292+
}
293+
else {
294+
replyFuture = this.amqpTemplate.sendAndReceive(amqpMessage);
295+
}
296+
297+
if (!this.returnMessage) {
298+
ParameterizedTypeReference<?> replyTypeToUse = replyType;
299+
replyFuture = replyFuture.thenApply((reply) -> buildReplyMessage((Message) reply, replyTypeToUse));
300+
}
301+
302+
return isAsync() ? replyFuture : replyFuture.join();
303+
}
304+
305+
private AbstractIntegrationMessageBuilder<?> buildReplyMessage(Message message,
306+
@Nullable ParameterizedTypeReference<?> replyType) {
307+
308+
Object replyPayload =
309+
replyType != null
310+
? ((SmartMessageConverter) this.messageConverter).fromMessage(message, replyType)
311+
: this.messageConverter.fromMessage(message);
312+
313+
return getMessageBuilderFactory().withPayload(replyPayload)
314+
.copyHeaders(this.headerMapper.toHeadersFromReply(message.getMessageProperties()));
315+
}
316+
317+
}

0 commit comments

Comments
 (0)