Skip to content

Commit 7ffeb67

Browse files
committed
* Add AmqpClientMessageProducer implementation to consume message from RabbitMQ AMQP 1.0
* Fix typos and language in docs and Javadocs * Add tests and docs about new `AmqpClientMessageProducer` * Add `whats-new.adoc` bullet for a new `spring-integration-amqp` feature
1 parent f7efd36 commit 7ffeb67

File tree

6 files changed

+500
-7
lines changed

6 files changed

+500
-7
lines changed
Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.inbound;
18+
19+
import java.time.Duration;
20+
import java.util.ArrayList;
21+
import java.util.Arrays;
22+
import java.util.Collection;
23+
import java.util.List;
24+
import java.util.Map;
25+
26+
import com.rabbitmq.client.amqp.Consumer;
27+
import com.rabbitmq.client.amqp.Resource;
28+
import org.aopalliance.aop.Advice;
29+
import org.jspecify.annotations.Nullable;
30+
31+
import org.springframework.amqp.core.AmqpAcknowledgment;
32+
import org.springframework.amqp.core.MessagePostProcessor;
33+
import org.springframework.amqp.core.MessageProperties;
34+
import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory;
35+
import org.springframework.amqp.rabbitmq.client.RabbitAmqpUtils;
36+
import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpListenerContainer;
37+
import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListener;
38+
import org.springframework.amqp.support.converter.MessageConverter;
39+
import org.springframework.amqp.support.converter.SimpleMessageConverter;
40+
import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils;
41+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
42+
import org.springframework.integration.StaticMessageHeaderAccessor;
43+
import org.springframework.integration.acks.AcknowledgmentCallback;
44+
import org.springframework.integration.acks.SimpleAcknowledgment;
45+
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
46+
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
47+
import org.springframework.integration.core.Pausable;
48+
import org.springframework.integration.endpoint.MessageProducerSupport;
49+
import org.springframework.integration.support.MutableMessageBuilder;
50+
import org.springframework.messaging.Message;
51+
import org.springframework.scheduling.TaskScheduler;
52+
53+
/**
54+
* A {@link MessageProducerSupport} implementation for AMQP 1.0 client.
55+
* <p>
56+
* Based on the {@link RabbitAmqpListenerContainer} and requires an {@link AmqpConnectionFactory}.
57+
*
58+
* @author Artem Bilan
59+
*
60+
* @since 7.0
61+
*
62+
* @see RabbitAmqpListenerContainer
63+
* @see org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListenerAdapter
64+
*/
65+
public class AmqpClientMessageProducer extends MessageProducerSupport implements Pausable {
66+
67+
private final RabbitAmqpListenerContainer listenerContainer;
68+
69+
private @Nullable MessageConverter messageConverter = new SimpleMessageConverter();
70+
71+
private AmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.inboundMapper();
72+
73+
private @Nullable Collection<MessagePostProcessor> afterReceivePostProcessors;
74+
75+
private volatile boolean paused;
76+
77+
public AmqpClientMessageProducer(AmqpConnectionFactory connectionFactory, String... queueNames) {
78+
this.listenerContainer = new RabbitAmqpListenerContainer(connectionFactory);
79+
this.listenerContainer.setQueueNames(queueNames);
80+
}
81+
82+
public void setInitialCredits(int initialCredits) {
83+
this.listenerContainer.setInitialCredits(initialCredits);
84+
}
85+
86+
public void setPriority(int priority) {
87+
this.listenerContainer.setPriority(priority);
88+
}
89+
90+
public void setStateListeners(Resource.StateListener... stateListeners) {
91+
this.listenerContainer.setStateListeners(stateListeners);
92+
}
93+
94+
public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors) {
95+
this.afterReceivePostProcessors = MessagePostProcessorUtils.sort(Arrays.asList(afterReceivePostProcessors));
96+
}
97+
98+
public void setBatchSize(int batchSize) {
99+
this.listenerContainer.setBatchSize(batchSize);
100+
}
101+
102+
public void setBatchReceiveTimeout(long batchReceiveTimeout) {
103+
this.listenerContainer.setBatchReceiveTimeout(batchReceiveTimeout);
104+
}
105+
106+
@Override
107+
public void setTaskScheduler(TaskScheduler taskScheduler) {
108+
this.listenerContainer.setTaskScheduler(taskScheduler);
109+
}
110+
111+
public void setAdviceChain(Advice... advices) {
112+
this.listenerContainer.setAdviceChain(advices);
113+
}
114+
115+
public void setAutoSettle(boolean autoSettle) {
116+
this.listenerContainer.setAutoSettle(autoSettle);
117+
}
118+
119+
public void setDefaultRequeue(boolean defaultRequeue) {
120+
this.listenerContainer.setDefaultRequeue(defaultRequeue);
121+
}
122+
123+
public void setGracefulShutdownPeriod(Duration gracefulShutdownPeriod) {
124+
this.listenerContainer.setGracefulShutdownPeriod(gracefulShutdownPeriod);
125+
}
126+
127+
public void setConsumersPerQueue(int consumersPerQueue) {
128+
this.listenerContainer.setConsumersPerQueue(consumersPerQueue);
129+
}
130+
131+
/**
132+
* Set a {@link MessageConverter} to replace the default {@link SimpleMessageConverter}.
133+
* If set to null, an AMQP message is sent as is into a {@link Message} payload.
134+
* @param messageConverter the {@link MessageConverter} to use or null.
135+
*/
136+
public void setMessageConverter(@Nullable MessageConverter messageConverter) {
137+
this.messageConverter = messageConverter;
138+
}
139+
140+
public void setHeaderMapper(AmqpHeaderMapper headerMapper) {
141+
this.headerMapper = headerMapper;
142+
}
143+
144+
@Override
145+
public String getComponentType() {
146+
return "amqp:inbound-channel-adapter";
147+
}
148+
149+
@Override
150+
protected void onInit() {
151+
super.onInit();
152+
this.listenerContainer.setBeanName(getComponentName() + ".listenerContainer");
153+
this.listenerContainer.setupMessageListener(new IntegrationRabbitAmqpMessageListener());
154+
this.listenerContainer.afterPropertiesSet();
155+
}
156+
157+
@Override
158+
protected void doStart() {
159+
super.doStart();
160+
this.listenerContainer.start();
161+
}
162+
163+
@Override
164+
protected void doStop() {
165+
super.doStop();
166+
this.listenerContainer.stop();
167+
}
168+
169+
@Override
170+
public void destroy() {
171+
super.destroy();
172+
this.listenerContainer.destroy();
173+
}
174+
175+
@Override
176+
public void pause() {
177+
this.listenerContainer.pause();
178+
this.paused = true;
179+
}
180+
181+
@Override
182+
public void resume() {
183+
this.listenerContainer.resume();
184+
this.paused = false;
185+
}
186+
187+
@Override
188+
public boolean isPaused() {
189+
return this.paused;
190+
}
191+
192+
private class IntegrationRabbitAmqpMessageListener implements RabbitAmqpMessageListener {
193+
194+
@Override
195+
public void onAmqpMessage(com.rabbitmq.client.amqp.Message amqpMessage, Consumer.@Nullable Context context) {
196+
org.springframework.amqp.core.Message message = RabbitAmqpUtils.fromAmqpMessage(amqpMessage, context);
197+
Message<?> messageToSend = toSpringMessage(message);
198+
199+
sendMessage(messageToSend);
200+
}
201+
202+
@Override
203+
public void onMessageBatch(List<org.springframework.amqp.core.Message> messages) {
204+
SimpleAcknowledgment acknowledgmentCallback = null;
205+
List<Message<?>> springMessages = new ArrayList<>(messages.size());
206+
for (org.springframework.amqp.core.Message message : messages) {
207+
Message<?> springMessage = toSpringMessage(message);
208+
if (acknowledgmentCallback == null) {
209+
acknowledgmentCallback = StaticMessageHeaderAccessor.getAcknowledgment(springMessage);
210+
}
211+
springMessages.add(springMessage);
212+
}
213+
214+
Message<List<Message<?>>> messageToSend =
215+
MutableMessageBuilder.withPayload(springMessages)
216+
.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgmentCallback)
217+
.build();
218+
219+
sendMessage(messageToSend);
220+
}
221+
222+
private Message<?> toSpringMessage(org.springframework.amqp.core.Message message) {
223+
if (AmqpClientMessageProducer.this.afterReceivePostProcessors != null) {
224+
for (MessagePostProcessor processor : AmqpClientMessageProducer.this.afterReceivePostProcessors) {
225+
message = processor.postProcessMessage(message);
226+
}
227+
}
228+
MessageProperties messageProperties = message.getMessageProperties();
229+
AmqpAcknowledgment amqpAcknowledgment = messageProperties.getAmqpAcknowledgment();
230+
AmqpAcknowledgmentCallback acknowledgmentCallback = null;
231+
if (amqpAcknowledgment != null) {
232+
acknowledgmentCallback = new AmqpAcknowledgmentCallback(amqpAcknowledgment);
233+
}
234+
235+
Object payload = message;
236+
Map<String, @Nullable Object> headers = null;
237+
if (AmqpClientMessageProducer.this.messageConverter != null) {
238+
payload = AmqpClientMessageProducer.this.messageConverter.fromMessage(message);
239+
headers = AmqpClientMessageProducer.this.headerMapper.toHeadersFromRequest(messageProperties);
240+
}
241+
242+
return getMessageBuilderFactory()
243+
.withPayload(payload)
244+
.copyHeaders(headers)
245+
.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgmentCallback)
246+
.build();
247+
}
248+
249+
@Override
250+
public void onMessage(org.springframework.amqp.core.Message message) {
251+
throw new UnsupportedOperationException("The 'RabbitAmqpMessageListener' does not implement 'onMessage()'");
252+
}
253+
254+
}
255+
256+
/**
257+
* The {@link AcknowledgmentCallback} adapter for an {@link AmqpAcknowledgment}.
258+
* @param delegate the {@link AmqpAcknowledgment} to delegate to.
259+
*/
260+
private record AmqpAcknowledgmentCallback(AmqpAcknowledgment delegate) implements AcknowledgmentCallback {
261+
262+
@Override
263+
public void acknowledge(Status status) {
264+
this.delegate.acknowledge(AmqpAcknowledgment.Status.valueOf(status.name()));
265+
}
266+
267+
@Override
268+
public boolean isAutoAck() {
269+
return false;
270+
}
271+
272+
}
273+
274+
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AmqpClientMessageHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
* <p>
5555
* The {@link #exchangeExpression}, {@link #routingKeyExpression} and {@link #queueExpression}
5656
* are optional.
57-
* Then they have to be supplied by the {@link AsyncAmqpTemplate}.
57+
* In this case they have to be supplied by the {@link AsyncAmqpTemplate}.
5858
*
5959
* @author Artem Bilan
6060
*

0 commit comments

Comments
 (0)