Skip to content

Commit fef21b1

Browse files
committed
GH-10090: Add AmqpClientInboundGateway
Related to: #10090 * Add `AmqpClientInboundGateway` that is mostly a copy/paste of the `AmqpClientMessageProducer`, but adds a reply-producing logic * Cover with tests and document this new component * Fix a couple typos in the `amqp-1.0.adoc`
1 parent 0a26563 commit fef21b1

File tree

3 files changed

+562
-4
lines changed

3 files changed

+562
-4
lines changed
Lines changed: 370 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,370 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.inbound;
18+
19+
import java.time.Duration;
20+
import java.util.Arrays;
21+
import java.util.Collection;
22+
import java.util.Map;
23+
24+
import com.rabbitmq.client.amqp.Consumer;
25+
import com.rabbitmq.client.amqp.Resource;
26+
import org.aopalliance.aop.Advice;
27+
import org.jspecify.annotations.Nullable;
28+
29+
import org.springframework.amqp.core.Address;
30+
import org.springframework.amqp.core.AmqpAcknowledgment;
31+
import org.springframework.amqp.core.MessagePostProcessor;
32+
import org.springframework.amqp.core.MessageProperties;
33+
import org.springframework.amqp.rabbit.listener.adapter.ReplyPostProcessor;
34+
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
35+
import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory;
36+
import org.springframework.amqp.rabbitmq.client.RabbitAmqpTemplate;
37+
import org.springframework.amqp.rabbitmq.client.RabbitAmqpUtils;
38+
import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpListenerContainer;
39+
import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListener;
40+
import org.springframework.amqp.support.converter.MessageConverter;
41+
import org.springframework.amqp.support.converter.SimpleMessageConverter;
42+
import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils;
43+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
44+
import org.springframework.integration.acks.AcknowledgmentCallback;
45+
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
46+
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
47+
import org.springframework.integration.core.Pausable;
48+
import org.springframework.integration.gateway.MessagingGatewaySupport;
49+
import org.springframework.messaging.Message;
50+
import org.springframework.scheduling.TaskScheduler;
51+
import org.springframework.util.Assert;
52+
import org.springframework.util.StringUtils;
53+
54+
/**
55+
* A {@link MessagingGatewaySupport} implementation for AMQP 1.0 client.
56+
* <p>
57+
* Based on the {@link RabbitAmqpListenerContainer} and requires an {@link AmqpConnectionFactory}.
58+
* An internal {@link RabbitAmqpTemplate} is used to send replies.
59+
*
60+
* @author Artem Bilan
61+
*
62+
* @since 7.0
63+
*
64+
* @see RabbitAmqpListenerContainer
65+
* @see RabbitAmqpTemplate
66+
* @see org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListenerAdapter
67+
*/
68+
public class AmqpClientInboundGateway extends MessagingGatewaySupport implements Pausable {
69+
70+
private final RabbitAmqpListenerContainer listenerContainer;
71+
72+
private final RabbitAmqpTemplate replyTemplate;
73+
74+
private @Nullable MessageConverter messageConverter = new SimpleMessageConverter();
75+
76+
private AmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.inboundMapper();
77+
78+
private @Nullable Collection<MessagePostProcessor> afterReceivePostProcessors;
79+
80+
private @Nullable ReplyPostProcessor replyPostProcessor;
81+
82+
private volatile boolean paused;
83+
84+
public AmqpClientInboundGateway(AmqpConnectionFactory connectionFactory, String... queueNames) {
85+
this.listenerContainer = new RabbitAmqpListenerContainer(connectionFactory);
86+
this.listenerContainer.setQueueNames(queueNames);
87+
this.replyTemplate = new RabbitAmqpTemplate(connectionFactory);
88+
}
89+
90+
public void setInitialCredits(int initialCredits) {
91+
this.listenerContainer.setInitialCredits(initialCredits);
92+
}
93+
94+
public void setPriority(int priority) {
95+
this.listenerContainer.setPriority(priority);
96+
}
97+
98+
public void setStateListeners(Resource.StateListener... stateListeners) {
99+
this.listenerContainer.setStateListeners(stateListeners);
100+
}
101+
102+
public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors) {
103+
this.afterReceivePostProcessors = MessagePostProcessorUtils.sort(Arrays.asList(afterReceivePostProcessors));
104+
}
105+
106+
@Override
107+
public void setTaskScheduler(TaskScheduler taskScheduler) {
108+
this.listenerContainer.setTaskScheduler(taskScheduler);
109+
}
110+
111+
public void setAdviceChain(Advice... advices) {
112+
this.listenerContainer.setAdviceChain(advices);
113+
}
114+
115+
public void setAutoSettle(boolean autoSettle) {
116+
this.listenerContainer.setAutoSettle(autoSettle);
117+
}
118+
119+
public void setDefaultRequeue(boolean defaultRequeue) {
120+
this.listenerContainer.setDefaultRequeue(defaultRequeue);
121+
}
122+
123+
public void setGracefulShutdownPeriod(Duration gracefulShutdownPeriod) {
124+
this.listenerContainer.setGracefulShutdownPeriod(gracefulShutdownPeriod);
125+
}
126+
127+
public void setConsumersPerQueue(int consumersPerQueue) {
128+
this.listenerContainer.setConsumersPerQueue(consumersPerQueue);
129+
}
130+
131+
/**
132+
* Set a {@link MessageConverter} to replace the default {@link SimpleMessageConverter}.
133+
* If set to null, an AMQP message is sent as is into a {@link Message} payload.
134+
* And a reply message has to return an AMQP message as its payload.
135+
* @param messageConverter the {@link MessageConverter} to use or null.
136+
*/
137+
public void setMessageConverter(@Nullable MessageConverter messageConverter) {
138+
this.messageConverter = messageConverter;
139+
}
140+
141+
public void setHeaderMapper(AmqpHeaderMapper headerMapper) {
142+
this.headerMapper = headerMapper;
143+
}
144+
145+
public void setReplyPostProcessor(ReplyPostProcessor replyPostProcessor) {
146+
this.replyPostProcessor = replyPostProcessor;
147+
}
148+
149+
/**
150+
* Set a default {@code exchange} for sending replies
151+
* if {@code replyTo} address is not provided in the request message.
152+
* Mutually exclusive with {@link #setReplyQueue(String)}.
153+
* @param exchange the default exchange for sending replies
154+
*/
155+
public void setReplyExchange(String exchange) {
156+
this.replyTemplate.setExchange(exchange);
157+
}
158+
159+
/**
160+
* Set a default {@code routingKey} for sending replies
161+
* if {@code replyTo} address is not provided in the request message.
162+
* Used only if {@link #setReplyExchange(String)} is provided.
163+
* @param routingKey the default routing key for sending replies
164+
*/
165+
public void setReplyRoutingKey(String routingKey) {
166+
this.replyTemplate.setRoutingKey(routingKey);
167+
}
168+
169+
/**
170+
* Set a default {@code queue} for sending replies
171+
* if {@code replyTo} address is not provided in the request message.
172+
* Mutually exclusive with {@link #setReplyExchange(String)}.
173+
* @param queue the default queue for sending replies
174+
*/
175+
public void setReplyQueue(String queue) {
176+
this.replyTemplate.setQueue(queue);
177+
}
178+
179+
@Override
180+
public String getComponentType() {
181+
return "amqp:inbound-gateway";
182+
}
183+
184+
@Override
185+
protected void onInit() {
186+
super.onInit();
187+
this.listenerContainer.setBeanName(getComponentName() + ".listenerContainer");
188+
this.listenerContainer.setupMessageListener(new IntegrationRabbitAmqpMessageListener());
189+
this.listenerContainer.afterPropertiesSet();
190+
}
191+
192+
@Override
193+
protected void doStart() {
194+
super.doStart();
195+
this.listenerContainer.start();
196+
}
197+
198+
@Override
199+
protected void doStop() {
200+
super.doStop();
201+
this.listenerContainer.stop();
202+
}
203+
204+
@Override
205+
public void destroy() {
206+
super.destroy();
207+
this.listenerContainer.destroy();
208+
this.replyTemplate.destroy();
209+
}
210+
211+
@Override
212+
public void pause() {
213+
this.listenerContainer.pause();
214+
this.paused = true;
215+
}
216+
217+
@Override
218+
public void resume() {
219+
this.listenerContainer.resume();
220+
this.paused = false;
221+
}
222+
223+
@Override
224+
public boolean isPaused() {
225+
return this.paused;
226+
}
227+
228+
private final class IntegrationRabbitAmqpMessageListener implements RabbitAmqpMessageListener {
229+
230+
@Override
231+
public void onAmqpMessage(com.rabbitmq.client.amqp.Message amqpMessage, Consumer.@Nullable Context context) {
232+
org.springframework.amqp.core.Message message = RabbitAmqpUtils.fromAmqpMessage(amqpMessage, context);
233+
Message<?> messageToSend = toSpringMessage(message);
234+
try {
235+
Message<?> receivedMessage = sendAndReceiveMessage(messageToSend);
236+
if (receivedMessage != null) {
237+
org.springframework.amqp.core.Message replyMessage = fromSpringMessage(receivedMessage, message);
238+
publishReply(message, replyMessage);
239+
}
240+
else {
241+
logger.warn(() -> "No reply received for message: " + amqpMessage);
242+
}
243+
}
244+
catch (Exception ex) {
245+
throw new ListenerExecutionFailedException(getComponentName() + ".onAmqpMessage() failed", ex, message);
246+
}
247+
}
248+
249+
private Message<?> toSpringMessage(org.springframework.amqp.core.Message message) {
250+
if (AmqpClientInboundGateway.this.afterReceivePostProcessors != null) {
251+
for (MessagePostProcessor processor : AmqpClientInboundGateway.this.afterReceivePostProcessors) {
252+
message = processor.postProcessMessage(message);
253+
}
254+
}
255+
MessageProperties messageProperties = message.getMessageProperties();
256+
AmqpAcknowledgment amqpAcknowledgment = messageProperties.getAmqpAcknowledgment();
257+
AmqpAcknowledgmentCallback acknowledgmentCallback = null;
258+
if (amqpAcknowledgment != null) {
259+
acknowledgmentCallback = new AmqpAcknowledgmentCallback(amqpAcknowledgment);
260+
}
261+
262+
Object payload = message;
263+
Map<String, @Nullable Object> headers = null;
264+
if (AmqpClientInboundGateway.this.messageConverter != null) {
265+
payload = AmqpClientInboundGateway.this.messageConverter.fromMessage(message);
266+
headers = AmqpClientInboundGateway.this.headerMapper.toHeadersFromRequest(messageProperties);
267+
}
268+
269+
return getMessageBuilderFactory()
270+
.withPayload(payload)
271+
.copyHeaders(headers)
272+
.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgmentCallback)
273+
.build();
274+
}
275+
276+
private org.springframework.amqp.core.Message fromSpringMessage(Message<?> receivedMessage,
277+
org.springframework.amqp.core.Message requestMessage) {
278+
279+
org.springframework.amqp.core.Message replyMessage;
280+
MessageProperties messageProperties = new MessageProperties();
281+
Object payload = receivedMessage.getPayload();
282+
if (payload instanceof org.springframework.amqp.core.Message amqpMessage) {
283+
replyMessage = amqpMessage;
284+
}
285+
else {
286+
Assert.state(AmqpClientInboundGateway.this.messageConverter != null,
287+
"If reply payload is not an 'org.springframework.amqp.core.Message', " +
288+
"the 'messageConverter' must be provided.");
289+
290+
replyMessage = AmqpClientInboundGateway.this.messageConverter.toMessage(payload, messageProperties);
291+
AmqpClientInboundGateway.this.headerMapper.fromHeadersToReply(receivedMessage.getHeaders(),
292+
messageProperties);
293+
}
294+
295+
postProcessResponse(requestMessage, replyMessage);
296+
if (AmqpClientInboundGateway.this.replyPostProcessor != null) {
297+
replyMessage = AmqpClientInboundGateway.this.replyPostProcessor.apply(requestMessage, replyMessage);
298+
}
299+
300+
return replyMessage;
301+
}
302+
303+
private void publishReply(org.springframework.amqp.core.Message requestMessage,
304+
org.springframework.amqp.core.Message replyMessage) {
305+
306+
Address replyTo = requestMessage.getMessageProperties().getReplyToAddress();
307+
if (replyTo != null) {
308+
String exchangeName = replyTo.getExchangeName();
309+
String routingKey = replyTo.getRoutingKey();
310+
if (StringUtils.hasText(exchangeName)) {
311+
AmqpClientInboundGateway.this.replyTemplate.send(exchangeName, routingKey, replyMessage).join();
312+
}
313+
else {
314+
Assert.hasText(routingKey, "A 'replyTo' property must be provided in the requestMessage.");
315+
String queue = routingKey.replaceFirst("queues/", "");
316+
AmqpClientInboundGateway.this.replyTemplate.send(queue, replyMessage).join();
317+
}
318+
}
319+
else {
320+
AmqpClientInboundGateway.this.replyTemplate.send(replyMessage).join();
321+
}
322+
}
323+
324+
@Override
325+
public void onMessage(org.springframework.amqp.core.Message message) {
326+
throw new UnsupportedOperationException("The 'RabbitAmqpMessageListener' does not implement 'onMessage()'");
327+
}
328+
329+
/**
330+
* Post-process the given response message before it will be sent.
331+
* The default implementation sets the response's correlation id to the request message's correlation id, if any;
332+
* otherwise to the request message id.
333+
* @param request the original incoming Rabbit message
334+
* @param response the outgoing Rabbit message about to be sent
335+
*/
336+
private static void postProcessResponse(org.springframework.amqp.core.Message request,
337+
org.springframework.amqp.core.Message response) {
338+
339+
String correlation = request.getMessageProperties().getCorrelationId();
340+
341+
if (correlation == null) {
342+
String messageId = request.getMessageProperties().getMessageId();
343+
if (messageId != null) {
344+
correlation = messageId;
345+
}
346+
}
347+
response.getMessageProperties().setCorrelationId(correlation);
348+
}
349+
350+
}
351+
352+
/**
353+
* The {@link AcknowledgmentCallback} adapter for an {@link AmqpAcknowledgment}.
354+
* @param delegate the {@link AmqpAcknowledgment} to delegate to.
355+
*/
356+
private record AmqpAcknowledgmentCallback(AmqpAcknowledgment delegate) implements AcknowledgmentCallback {
357+
358+
@Override
359+
public void acknowledge(Status status) {
360+
this.delegate.acknowledge(AmqpAcknowledgment.Status.valueOf(status.name()));
361+
}
362+
363+
@Override
364+
public boolean isAutoAck() {
365+
return false;
366+
}
367+
368+
}
369+
370+
}

0 commit comments

Comments
 (0)