Skip to content

Commit fe22890

Browse files
committed
Extract the IntegrationRabbitAmqpMessageListener as a top-level (package private) class.
Serves as a reusable unit of work for both `AmqpClientMessageProducer` & `AmqpClientInboundGateway`. The one-way and request-reply parts are handled as a `BiConsumer` action injection into this `IntegrationRabbitAmqpMessageListener` instance. Now both `AmqpClientMessageProducer` & `AmqpClientInboundGateway` are much simpler.
1 parent 6580959 commit fe22890

File tree

3 files changed

+235
-232
lines changed

3 files changed

+235
-232
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientInboundGateway.java

Lines changed: 67 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,21 @@
1919
import java.time.Duration;
2020
import java.util.Arrays;
2121
import java.util.Collection;
22-
import java.util.Map;
2322

24-
import com.rabbitmq.client.amqp.Consumer;
2523
import com.rabbitmq.client.amqp.Resource;
2624
import org.aopalliance.aop.Advice;
2725
import org.jspecify.annotations.Nullable;
2826

2927
import org.springframework.amqp.core.Address;
30-
import org.springframework.amqp.core.AmqpAcknowledgment;
3128
import org.springframework.amqp.core.MessagePostProcessor;
3229
import org.springframework.amqp.core.MessageProperties;
3330
import org.springframework.amqp.rabbit.listener.adapter.ReplyPostProcessor;
34-
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
3531
import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory;
3632
import org.springframework.amqp.rabbitmq.client.RabbitAmqpTemplate;
37-
import org.springframework.amqp.rabbitmq.client.RabbitAmqpUtils;
3833
import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpListenerContainer;
39-
import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListener;
4034
import org.springframework.amqp.support.converter.MessageConverter;
4135
import org.springframework.amqp.support.converter.SimpleMessageConverter;
4236
import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils;
43-
import org.springframework.integration.IntegrationMessageHeaderAccessor;
44-
import org.springframework.integration.acks.AcknowledgmentCallback;
4537
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
4638
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
4739
import org.springframework.integration.core.Pausable;
@@ -185,7 +177,10 @@ public String getComponentType() {
185177
protected void onInit() {
186178
super.onInit();
187179
this.listenerContainer.setBeanName(getComponentName() + ".listenerContainer");
188-
this.listenerContainer.setupMessageListener(new IntegrationRabbitAmqpMessageListener());
180+
IntegrationRabbitAmqpMessageListener messageListener =
181+
new IntegrationRabbitAmqpMessageListener(this, this::processRequest, this.headerMapper,
182+
this.messageConverter, this.afterReceivePostProcessors);
183+
this.listenerContainer.setupMessageListener(messageListener);
189184
this.listenerContainer.afterPropertiesSet();
190185
}
191186

@@ -225,146 +220,89 @@ public boolean isPaused() {
225220
return this.paused;
226221
}
227222

228-
private final class IntegrationRabbitAmqpMessageListener implements RabbitAmqpMessageListener {
229-
230-
@Override
231-
public void onAmqpMessage(com.rabbitmq.client.amqp.Message amqpMessage, Consumer.@Nullable Context context) {
232-
org.springframework.amqp.core.Message message = RabbitAmqpUtils.fromAmqpMessage(amqpMessage, context);
233-
Message<?> messageToSend = toSpringMessage(message);
234-
try {
235-
Message<?> receivedMessage = sendAndReceiveMessage(messageToSend);
236-
if (receivedMessage != null) {
237-
org.springframework.amqp.core.Message replyMessage = fromSpringMessage(receivedMessage, message);
238-
publishReply(message, replyMessage);
239-
}
240-
else {
241-
logger.warn(() -> "No reply received for message: " + amqpMessage);
242-
}
243-
}
244-
catch (Exception ex) {
245-
throw new ListenerExecutionFailedException(getComponentName() + ".onAmqpMessage() failed", ex, message);
246-
}
223+
/**
224+
* Use as {@link java.util.function.BiConsumer} for the {@link IntegrationRabbitAmqpMessageListener}.
225+
* @param messageToSend the message to produce from this endpoint.
226+
* @param requestMessage the request AMQP message.
227+
*/
228+
private void processRequest(Message<?> messageToSend, org.springframework.amqp.core.Message requestMessage) {
229+
Message<?> receivedMessage = sendAndReceiveMessage(messageToSend);
230+
if (receivedMessage != null) {
231+
org.springframework.amqp.core.Message replyMessage = fromSpringMessage(receivedMessage, requestMessage);
232+
publishReply(requestMessage, replyMessage);
247233
}
248-
249-
private Message<?> toSpringMessage(org.springframework.amqp.core.Message message) {
250-
if (AmqpClientInboundGateway.this.afterReceivePostProcessors != null) {
251-
for (MessagePostProcessor processor : AmqpClientInboundGateway.this.afterReceivePostProcessors) {
252-
message = processor.postProcessMessage(message);
253-
}
254-
}
255-
MessageProperties messageProperties = message.getMessageProperties();
256-
AmqpAcknowledgment amqpAcknowledgment = messageProperties.getAmqpAcknowledgment();
257-
AmqpAcknowledgmentCallback acknowledgmentCallback = null;
258-
if (amqpAcknowledgment != null) {
259-
acknowledgmentCallback = new AmqpAcknowledgmentCallback(amqpAcknowledgment);
260-
}
261-
262-
Object payload = message;
263-
Map<String, @Nullable Object> headers = null;
264-
if (AmqpClientInboundGateway.this.messageConverter != null) {
265-
payload = AmqpClientInboundGateway.this.messageConverter.fromMessage(message);
266-
headers = AmqpClientInboundGateway.this.headerMapper.toHeadersFromRequest(messageProperties);
267-
}
268-
269-
return getMessageBuilderFactory()
270-
.withPayload(payload)
271-
.copyHeaders(headers)
272-
.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgmentCallback)
273-
.build();
234+
else {
235+
this.logger.warn(() -> "No reply received for message: " + requestMessage);
274236
}
237+
}
275238

276-
private org.springframework.amqp.core.Message fromSpringMessage(Message<?> receivedMessage,
277-
org.springframework.amqp.core.Message requestMessage) {
239+
private org.springframework.amqp.core.Message fromSpringMessage(Message<?> receivedMessage,
240+
org.springframework.amqp.core.Message requestMessage) {
278241

279-
org.springframework.amqp.core.Message replyMessage;
280-
MessageProperties messageProperties = new MessageProperties();
281-
Object payload = receivedMessage.getPayload();
282-
if (payload instanceof org.springframework.amqp.core.Message amqpMessage) {
283-
replyMessage = amqpMessage;
284-
}
285-
else {
286-
Assert.state(AmqpClientInboundGateway.this.messageConverter != null,
287-
"If reply payload is not an 'org.springframework.amqp.core.Message', " +
288-
"the 'messageConverter' must be provided.");
242+
org.springframework.amqp.core.Message replyMessage;
243+
MessageProperties messageProperties = new MessageProperties();
244+
Object payload = receivedMessage.getPayload();
245+
if (payload instanceof org.springframework.amqp.core.Message amqpMessage) {
246+
replyMessage = amqpMessage;
247+
}
248+
else {
249+
Assert.state(this.messageConverter != null,
250+
"If reply payload is not an 'org.springframework.amqp.core.Message', " +
251+
"the 'messageConverter' must be provided.");
252+
253+
replyMessage = this.messageConverter.toMessage(payload, messageProperties);
254+
this.headerMapper.fromHeadersToReply(receivedMessage.getHeaders(),
255+
messageProperties);
256+
}
289257

290-
replyMessage = AmqpClientInboundGateway.this.messageConverter.toMessage(payload, messageProperties);
291-
AmqpClientInboundGateway.this.headerMapper.fromHeadersToReply(receivedMessage.getHeaders(),
292-
messageProperties);
293-
}
258+
postProcessResponse(requestMessage, replyMessage);
259+
if (this.replyPostProcessor != null) {
260+
replyMessage = this.replyPostProcessor.apply(requestMessage, replyMessage);
261+
}
294262

295-
postProcessResponse(requestMessage, replyMessage);
296-
if (AmqpClientInboundGateway.this.replyPostProcessor != null) {
297-
replyMessage = AmqpClientInboundGateway.this.replyPostProcessor.apply(requestMessage, replyMessage);
298-
}
263+
return replyMessage;
264+
}
299265

300-
return replyMessage;
301-
}
266+
private void publishReply(org.springframework.amqp.core.Message requestMessage,
267+
org.springframework.amqp.core.Message replyMessage) {
302268

303-
private void publishReply(org.springframework.amqp.core.Message requestMessage,
304-
org.springframework.amqp.core.Message replyMessage) {
305-
306-
Address replyTo = requestMessage.getMessageProperties().getReplyToAddress();
307-
if (replyTo != null) {
308-
String exchangeName = replyTo.getExchangeName();
309-
String routingKey = replyTo.getRoutingKey();
310-
if (StringUtils.hasText(exchangeName)) {
311-
AmqpClientInboundGateway.this.replyTemplate.send(exchangeName, routingKey, replyMessage).join();
312-
}
313-
else {
314-
Assert.hasText(routingKey, "A 'replyTo' property must be provided in the requestMessage.");
315-
String queue = routingKey.replaceFirst("queues/", "");
316-
AmqpClientInboundGateway.this.replyTemplate.send(queue, replyMessage).join();
317-
}
269+
Address replyTo = requestMessage.getMessageProperties().getReplyToAddress();
270+
if (replyTo != null) {
271+
String exchangeName = replyTo.getExchangeName();
272+
String routingKey = replyTo.getRoutingKey();
273+
if (StringUtils.hasText(exchangeName)) {
274+
this.replyTemplate.send(exchangeName, routingKey, replyMessage).join();
318275
}
319276
else {
320-
AmqpClientInboundGateway.this.replyTemplate.send(replyMessage).join();
277+
Assert.hasText(routingKey, "A 'replyTo' property must be provided in the requestMessage.");
278+
String queue = routingKey.replaceFirst("queues/", "");
279+
this.replyTemplate.send(queue, replyMessage).join();
321280
}
322281
}
323-
324-
@Override
325-
public void onMessage(org.springframework.amqp.core.Message message) {
326-
throw new UnsupportedOperationException("The 'RabbitAmqpMessageListener' does not implement 'onMessage()'");
327-
}
328-
329-
/**
330-
* Post-process the given response message before it will be sent.
331-
* The default implementation sets the response's correlation id to the request message's correlation id, if any;
332-
* otherwise to the request message id.
333-
* @param request the original incoming Rabbit message
334-
* @param response the outgoing Rabbit message about to be sent
335-
*/
336-
private static void postProcessResponse(org.springframework.amqp.core.Message request,
337-
org.springframework.amqp.core.Message response) {
338-
339-
String correlation = request.getMessageProperties().getCorrelationId();
340-
341-
if (correlation == null) {
342-
String messageId = request.getMessageProperties().getMessageId();
343-
if (messageId != null) {
344-
correlation = messageId;
345-
}
346-
}
347-
response.getMessageProperties().setCorrelationId(correlation);
282+
else {
283+
this.replyTemplate.send(replyMessage).join();
348284
}
349-
350285
}
351286

352287
/**
353-
* The {@link AcknowledgmentCallback} adapter for an {@link AmqpAcknowledgment}.
354-
* @param delegate the {@link AmqpAcknowledgment} to delegate to.
288+
* Post-process the given response message before it will be sent.
289+
* The default implementation sets the response's correlation id to the request message's correlation id, if any;
290+
* otherwise to the request message id.
291+
* @param request the original incoming Rabbit message
292+
* @param response the outgoing Rabbit message about to be sent
355293
*/
356-
private record AmqpAcknowledgmentCallback(AmqpAcknowledgment delegate) implements AcknowledgmentCallback {
294+
private static void postProcessResponse(org.springframework.amqp.core.Message request,
295+
org.springframework.amqp.core.Message response) {
357296

358-
@Override
359-
public void acknowledge(Status status) {
360-
this.delegate.acknowledge(AmqpAcknowledgment.Status.valueOf(status.name()));
361-
}
297+
String correlation = request.getMessageProperties().getCorrelationId();
362298

363-
@Override
364-
public boolean isAutoAck() {
365-
return false;
299+
if (correlation == null) {
300+
String messageId = request.getMessageProperties().getMessageId();
301+
if (messageId != null) {
302+
correlation = messageId;
303+
}
366304
}
367-
305+
response.getMessageProperties().setCorrelationId(correlation);
368306
}
369307

370308
}

0 commit comments

Comments
 (0)