From fef21b15f108002502b298b9ed6c049c17043faf Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 26 Sep 2025 15:41:50 -0400 Subject: [PATCH 1/4] GH-10090: Add `AmqpClientInboundGateway` Related to: https://github.com/spring-projects/spring-integration/issues/10090 * Add `AmqpClientInboundGateway` that is mostly a copy/paste of the `AmqpClientMessageProducer`, but adds a reply-producing logic * Cover with tests and document this new component * Fix a couple typos in the `amqp-1.0.adoc` --- .../inbound/AmqpClientInboundGateway.java | 370 ++++++++++++++++++ .../AmqpClientInboundGatewayTests.java | 162 ++++++++ .../modules/ROOT/pages/amqp/amqp-1.0.adoc | 34 +- 3 files changed, 562 insertions(+), 4 deletions(-) create mode 100644 spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientInboundGateway.java create mode 100644 spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpClientInboundGatewayTests.java diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientInboundGateway.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientInboundGateway.java new file mode 100644 index 0000000000..0f30b73936 --- /dev/null +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientInboundGateway.java @@ -0,0 +1,370 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.inbound; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; + +import com.rabbitmq.client.amqp.Consumer; +import com.rabbitmq.client.amqp.Resource; +import org.aopalliance.aop.Advice; +import org.jspecify.annotations.Nullable; + +import org.springframework.amqp.core.Address; +import org.springframework.amqp.core.AmqpAcknowledgment; +import org.springframework.amqp.core.MessagePostProcessor; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.listener.adapter.ReplyPostProcessor; +import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException; +import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory; +import org.springframework.amqp.rabbitmq.client.RabbitAmqpTemplate; +import org.springframework.amqp.rabbitmq.client.RabbitAmqpUtils; +import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpListenerContainer; +import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListener; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.amqp.support.converter.SimpleMessageConverter; +import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.acks.AcknowledgmentCallback; +import org.springframework.integration.amqp.support.AmqpHeaderMapper; +import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; +import org.springframework.integration.core.Pausable; +import org.springframework.integration.gateway.MessagingGatewaySupport; +import org.springframework.messaging.Message; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * A {@link MessagingGatewaySupport} implementation for AMQP 1.0 client. + *

+ * Based on the {@link RabbitAmqpListenerContainer} and requires an {@link AmqpConnectionFactory}. + * An internal {@link RabbitAmqpTemplate} is used to send replies. + * + * @author Artem Bilan + * + * @since 7.0 + * + * @see RabbitAmqpListenerContainer + * @see RabbitAmqpTemplate + * @see org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListenerAdapter + */ +public class AmqpClientInboundGateway extends MessagingGatewaySupport implements Pausable { + + private final RabbitAmqpListenerContainer listenerContainer; + + private final RabbitAmqpTemplate replyTemplate; + + private @Nullable MessageConverter messageConverter = new SimpleMessageConverter(); + + private AmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.inboundMapper(); + + private @Nullable Collection afterReceivePostProcessors; + + private @Nullable ReplyPostProcessor replyPostProcessor; + + private volatile boolean paused; + + public AmqpClientInboundGateway(AmqpConnectionFactory connectionFactory, String... queueNames) { + this.listenerContainer = new RabbitAmqpListenerContainer(connectionFactory); + this.listenerContainer.setQueueNames(queueNames); + this.replyTemplate = new RabbitAmqpTemplate(connectionFactory); + } + + public void setInitialCredits(int initialCredits) { + this.listenerContainer.setInitialCredits(initialCredits); + } + + public void setPriority(int priority) { + this.listenerContainer.setPriority(priority); + } + + public void setStateListeners(Resource.StateListener... stateListeners) { + this.listenerContainer.setStateListeners(stateListeners); + } + + public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors) { + this.afterReceivePostProcessors = MessagePostProcessorUtils.sort(Arrays.asList(afterReceivePostProcessors)); + } + + @Override + public void setTaskScheduler(TaskScheduler taskScheduler) { + this.listenerContainer.setTaskScheduler(taskScheduler); + } + + public void setAdviceChain(Advice... advices) { + this.listenerContainer.setAdviceChain(advices); + } + + public void setAutoSettle(boolean autoSettle) { + this.listenerContainer.setAutoSettle(autoSettle); + } + + public void setDefaultRequeue(boolean defaultRequeue) { + this.listenerContainer.setDefaultRequeue(defaultRequeue); + } + + public void setGracefulShutdownPeriod(Duration gracefulShutdownPeriod) { + this.listenerContainer.setGracefulShutdownPeriod(gracefulShutdownPeriod); + } + + public void setConsumersPerQueue(int consumersPerQueue) { + this.listenerContainer.setConsumersPerQueue(consumersPerQueue); + } + + /** + * Set a {@link MessageConverter} to replace the default {@link SimpleMessageConverter}. + * If set to null, an AMQP message is sent as is into a {@link Message} payload. + * And a reply message has to return an AMQP message as its payload. + * @param messageConverter the {@link MessageConverter} to use or null. + */ + public void setMessageConverter(@Nullable MessageConverter messageConverter) { + this.messageConverter = messageConverter; + } + + public void setHeaderMapper(AmqpHeaderMapper headerMapper) { + this.headerMapper = headerMapper; + } + + public void setReplyPostProcessor(ReplyPostProcessor replyPostProcessor) { + this.replyPostProcessor = replyPostProcessor; + } + + /** + * Set a default {@code exchange} for sending replies + * if {@code replyTo} address is not provided in the request message. + * Mutually exclusive with {@link #setReplyQueue(String)}. + * @param exchange the default exchange for sending replies + */ + public void setReplyExchange(String exchange) { + this.replyTemplate.setExchange(exchange); + } + + /** + * Set a default {@code routingKey} for sending replies + * if {@code replyTo} address is not provided in the request message. + * Used only if {@link #setReplyExchange(String)} is provided. + * @param routingKey the default routing key for sending replies + */ + public void setReplyRoutingKey(String routingKey) { + this.replyTemplate.setRoutingKey(routingKey); + } + + /** + * Set a default {@code queue} for sending replies + * if {@code replyTo} address is not provided in the request message. + * Mutually exclusive with {@link #setReplyExchange(String)}. + * @param queue the default queue for sending replies + */ + public void setReplyQueue(String queue) { + this.replyTemplate.setQueue(queue); + } + + @Override + public String getComponentType() { + return "amqp:inbound-gateway"; + } + + @Override + protected void onInit() { + super.onInit(); + this.listenerContainer.setBeanName(getComponentName() + ".listenerContainer"); + this.listenerContainer.setupMessageListener(new IntegrationRabbitAmqpMessageListener()); + this.listenerContainer.afterPropertiesSet(); + } + + @Override + protected void doStart() { + super.doStart(); + this.listenerContainer.start(); + } + + @Override + protected void doStop() { + super.doStop(); + this.listenerContainer.stop(); + } + + @Override + public void destroy() { + super.destroy(); + this.listenerContainer.destroy(); + this.replyTemplate.destroy(); + } + + @Override + public void pause() { + this.listenerContainer.pause(); + this.paused = true; + } + + @Override + public void resume() { + this.listenerContainer.resume(); + this.paused = false; + } + + @Override + public boolean isPaused() { + return this.paused; + } + + private final class IntegrationRabbitAmqpMessageListener implements RabbitAmqpMessageListener { + + @Override + public void onAmqpMessage(com.rabbitmq.client.amqp.Message amqpMessage, Consumer.@Nullable Context context) { + org.springframework.amqp.core.Message message = RabbitAmqpUtils.fromAmqpMessage(amqpMessage, context); + Message messageToSend = toSpringMessage(message); + try { + Message receivedMessage = sendAndReceiveMessage(messageToSend); + if (receivedMessage != null) { + org.springframework.amqp.core.Message replyMessage = fromSpringMessage(receivedMessage, message); + publishReply(message, replyMessage); + } + else { + logger.warn(() -> "No reply received for message: " + amqpMessage); + } + } + catch (Exception ex) { + throw new ListenerExecutionFailedException(getComponentName() + ".onAmqpMessage() failed", ex, message); + } + } + + private Message toSpringMessage(org.springframework.amqp.core.Message message) { + if (AmqpClientInboundGateway.this.afterReceivePostProcessors != null) { + for (MessagePostProcessor processor : AmqpClientInboundGateway.this.afterReceivePostProcessors) { + message = processor.postProcessMessage(message); + } + } + MessageProperties messageProperties = message.getMessageProperties(); + AmqpAcknowledgment amqpAcknowledgment = messageProperties.getAmqpAcknowledgment(); + AmqpAcknowledgmentCallback acknowledgmentCallback = null; + if (amqpAcknowledgment != null) { + acknowledgmentCallback = new AmqpAcknowledgmentCallback(amqpAcknowledgment); + } + + Object payload = message; + Map headers = null; + if (AmqpClientInboundGateway.this.messageConverter != null) { + payload = AmqpClientInboundGateway.this.messageConverter.fromMessage(message); + headers = AmqpClientInboundGateway.this.headerMapper.toHeadersFromRequest(messageProperties); + } + + return getMessageBuilderFactory() + .withPayload(payload) + .copyHeaders(headers) + .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgmentCallback) + .build(); + } + + private org.springframework.amqp.core.Message fromSpringMessage(Message receivedMessage, + org.springframework.amqp.core.Message requestMessage) { + + org.springframework.amqp.core.Message replyMessage; + MessageProperties messageProperties = new MessageProperties(); + Object payload = receivedMessage.getPayload(); + if (payload instanceof org.springframework.amqp.core.Message amqpMessage) { + replyMessage = amqpMessage; + } + else { + Assert.state(AmqpClientInboundGateway.this.messageConverter != null, + "If reply payload is not an 'org.springframework.amqp.core.Message', " + + "the 'messageConverter' must be provided."); + + replyMessage = AmqpClientInboundGateway.this.messageConverter.toMessage(payload, messageProperties); + AmqpClientInboundGateway.this.headerMapper.fromHeadersToReply(receivedMessage.getHeaders(), + messageProperties); + } + + postProcessResponse(requestMessage, replyMessage); + if (AmqpClientInboundGateway.this.replyPostProcessor != null) { + replyMessage = AmqpClientInboundGateway.this.replyPostProcessor.apply(requestMessage, replyMessage); + } + + return replyMessage; + } + + private void publishReply(org.springframework.amqp.core.Message requestMessage, + org.springframework.amqp.core.Message replyMessage) { + + Address replyTo = requestMessage.getMessageProperties().getReplyToAddress(); + if (replyTo != null) { + String exchangeName = replyTo.getExchangeName(); + String routingKey = replyTo.getRoutingKey(); + if (StringUtils.hasText(exchangeName)) { + AmqpClientInboundGateway.this.replyTemplate.send(exchangeName, routingKey, replyMessage).join(); + } + else { + Assert.hasText(routingKey, "A 'replyTo' property must be provided in the requestMessage."); + String queue = routingKey.replaceFirst("queues/", ""); + AmqpClientInboundGateway.this.replyTemplate.send(queue, replyMessage).join(); + } + } + else { + AmqpClientInboundGateway.this.replyTemplate.send(replyMessage).join(); + } + } + + @Override + public void onMessage(org.springframework.amqp.core.Message message) { + throw new UnsupportedOperationException("The 'RabbitAmqpMessageListener' does not implement 'onMessage()'"); + } + + /** + * Post-process the given response message before it will be sent. + * The default implementation sets the response's correlation id to the request message's correlation id, if any; + * otherwise to the request message id. + * @param request the original incoming Rabbit message + * @param response the outgoing Rabbit message about to be sent + */ + private static void postProcessResponse(org.springframework.amqp.core.Message request, + org.springframework.amqp.core.Message response) { + + String correlation = request.getMessageProperties().getCorrelationId(); + + if (correlation == null) { + String messageId = request.getMessageProperties().getMessageId(); + if (messageId != null) { + correlation = messageId; + } + } + response.getMessageProperties().setCorrelationId(correlation); + } + + } + + /** + * The {@link AcknowledgmentCallback} adapter for an {@link AmqpAcknowledgment}. + * @param delegate the {@link AmqpAcknowledgment} to delegate to. + */ + private record AmqpAcknowledgmentCallback(AmqpAcknowledgment delegate) implements AcknowledgmentCallback { + + @Override + public void acknowledge(Status status) { + this.delegate.acknowledge(AmqpAcknowledgment.Status.valueOf(status.name())); + } + + @Override + public boolean isAutoAck() { + return false; + } + + } + +} diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpClientInboundGatewayTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpClientInboundGatewayTests.java new file mode 100644 index 0000000000..4477472d92 --- /dev/null +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpClientInboundGatewayTests.java @@ -0,0 +1,162 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.inbound; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import com.rabbitmq.client.amqp.Environment; +import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder; +import org.jspecify.annotations.Nullable; +import org.junit.jupiter.api.Test; + +import org.springframework.amqp.core.Declarables; +import org.springframework.amqp.core.MessageBuilder; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory; +import org.springframework.amqp.rabbitmq.client.RabbitAmqpAdmin; +import org.springframework.amqp.rabbitmq.client.RabbitAmqpTemplate; +import org.springframework.amqp.rabbitmq.client.SingleAmqpConnectionFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.acks.SimpleAcknowledgment; +import org.springframework.integration.amqp.support.RabbitTestContainer; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.MimeTypeUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Artem Bilan + * + * @since 7.0 + */ +@SpringJUnitConfig +@DirtiesContext +public class AmqpClientInboundGatewayTests implements RabbitTestContainer { + + @Autowired + RabbitAmqpTemplate rabbitTemplate; + + @Autowired + ContextConfiguration contextConfiguration; + + @Test + void inboundGatewayExchange() { + assertThat(this.rabbitTemplate.convertSendAndReceive("q1", "test data")) + .succeedsWithin(Duration.ofSeconds(10)) + .isEqualTo("TEST DATA"); + } + + @Test + void inboundGatewayExchangeWithAck() throws InterruptedException { + org.springframework.amqp.core.Message requestMessage = + MessageBuilder.withBody("test data #2".getBytes()) + .setMessageId("someMessageId") + .setContentType(MimeTypeUtils.TEXT_PLAIN_VALUE) + .build(); + + this.rabbitTemplate.send("q2", requestMessage); + + assertThat(this.rabbitTemplate.receive("replyQueue")) + .succeedsWithin(Duration.ofSeconds(10)) + .satisfies(message -> { + assertThat(message.getMessageProperties().getCorrelationId()).isEqualTo("someMessageId"); + assertThat(message.getBody()).isEqualTo("TEST DATA #2".getBytes()); + }); + + assertThat(this.contextConfiguration.acknowledged.await(10, TimeUnit.SECONDS)).isTrue(); + } + + @Configuration(proxyBeanMethods = false) + @EnableIntegration + public static class ContextConfiguration { + + @Bean + Environment environment() { + return new AmqpEnvironmentBuilder() + .connectionSettings() + .port(RabbitTestContainer.amqpPort()) + .environmentBuilder() + .build(); + } + + @Bean + AmqpConnectionFactory connectionFactory(Environment environment) { + return new SingleAmqpConnectionFactory(environment); + } + + @Bean + RabbitAmqpAdmin admin(AmqpConnectionFactory connectionFactory) { + return new RabbitAmqpAdmin(connectionFactory); + } + + @Bean + Declarables declarables() { + return new Declarables(Stream.of("q1", "q2", "replyQueue").map(Queue::new).toArray(Queue[]::new)); + } + + @Bean + RabbitAmqpTemplate rabbitTemplate(AmqpConnectionFactory connectionFactory) { + return new RabbitAmqpTemplate(connectionFactory); + } + + @Bean + AmqpClientInboundGateway amqpClientInboundGateway(AmqpConnectionFactory connectionFactory) { + AmqpClientInboundGateway amqpClientInboundGateway = new AmqpClientInboundGateway(connectionFactory, "q1"); + amqpClientInboundGateway.setRequestChannelName("inputChannel"); + return amqpClientInboundGateway; + } + + @Bean + AmqpClientInboundGateway manualAckAmqpClientInboundGateway(AmqpConnectionFactory connectionFactory) { + AmqpClientInboundGateway amqpClientInboundGateway = new AmqpClientInboundGateway(connectionFactory, "q2"); + amqpClientInboundGateway.setRequestChannelName("inputChannel"); + amqpClientInboundGateway.setReplyQueue("replyQueue"); + amqpClientInboundGateway.setAutoSettle(false); + return amqpClientInboundGateway; + } + + CountDownLatch acknowledged = new CountDownLatch(1); + + @ServiceActivator(inputChannel = "inputChannel") + String toUpperCase(String payload, + @Nullable @Header(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK) + SimpleAcknowledgment acknowledgment) { + + try { + return payload.toUpperCase(); + } + finally { + if (acknowledgment != null) { + acknowledgment.acknowledge(); + acknowledged.countDown(); + } + } + } + + } + +} diff --git a/src/reference/antora/modules/ROOT/pages/amqp/amqp-1.0.adoc b/src/reference/antora/modules/ROOT/pages/amqp/amqp-1.0.adoc index 4833e7a43e..b2e71dfb1c 100644 --- a/src/reference/antora/modules/ROOT/pages/amqp/amqp-1.0.adoc +++ b/src/reference/antora/modules/ROOT/pages/amqp/amqp-1.0.adoc @@ -64,16 +64,16 @@ Some of `RabbitAmqpListenerContainer` configuration options are exposed as sette By default, the `MessageConverter` is an `org.springframework.amqp.support.converter.SimpleMessageConverter` that handles String, Serializable instances, and byte arrays. Also, a default `AmqpHeaderMapper` is a xref:amqp/message-headers.adoc[`DefaultAmqpHeaderMapper.inboundMapper()`]. -The `messageConverter` option can be set to `null` to fully skip conversion (including headers mapping), and return the received AMQP message as a payload of Spring message to produce. +The `messageConverter` option can be set to `null` to fully skip conversion (including header mapping), and return the received AMQP message as a payload of the Spring message to produce. Also, the `AmqpClientMessageProducer` implements a `Pausable` contract and delegates to the respective `RabbitAmqpListenerContainer` API. When `AmqpClientMessageProducer.setBatchSize() > 1`, this channel adapter works in a batch mode. In this case received messages are gathered until the batch size is fulfilled, or `batchReceiveTimeout` period is exhausted. -All the batched AMQP messages then converted to Spring messages, and result list is produced as a payload of wrapping message to send to the `outputChannel`. -The batch mode gives some performance gain due to the settlement for all the batched message at once. +All the batched AMQP messages then converted to Spring messages, and a result list is produced as a payload of a wrapping message to send to the `outputChannel`. +The batch mode gives some performance gain due to the settlement for all the batched messages at once. -When `autoSettle` flag is set to `false`, the `AcknowledgmentCallback` instance is provided as an `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK` message header to make settlement decision for received message or the whole batch. +When `autoSettle` flag is set to `false`, the `AcknowledgmentCallback` instance is provided as an `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK` message header to make settlement decision for the received message or the whole batch. The following example demonstrates how to configure an `AmqpClientMessageProducer` as a simple inbound endpoint: @@ -89,3 +89,29 @@ AmqpClientMessageProducer batchAmqpClientMessageProducer(AmqpConnectionFactory c return amqpClientMessageProducer; } ---- + +[[amqp-1.0-inbound-gateway]] +== AMQP 1.0 Inbound Gateway + +The `AmqpClientInboundGateway` is a `MessagingGatewaySupport` implementation for receiving request and producing replies over RabbitMQ AMQP 1.0 protocol. +It is similar to the `AmqpClientMessageProducer` mentioned above and share many `RabbitAmqpListenerContainer` configuration options. +In addition, to produce AMQP 1.0 replies, the `AmqpClientInboundGateway` uses internally a `RabbitAmqpTemplate`. + +For automatic replies correlation with their requests, a `replyTo` property of the request message must be supplied. +For example, the `RabbitAmqpTemplate.sendAndReceive()` relies on the `RpcClient` from RabbitMQ AMQP 1.0 library which generates an exclusive and auto-deleted queue. +Alternatively, the reply address could be set as a `replyExchange` and `replyRoutingKey` (optional) or `replyQueue` on the `AmqpClientInboundGateway` which are delegated to the `RabbitAmqpTemplate` default options. +The `messageId` or `correllationId` request message properties can be used for associating with replies. +The `RpcClient` in the `RabbitAmqpTemplate.sendAndReceive()` generates one if missed. +The `AmqpClientInboundGateway` is able to map back such a correlation key into a reply message. + +The following example demonstrates how to configure an `AmqpClientInboundGateway` as a simple inbound gateway: + +[source, java] +---- +@Bean +AmqpClientInboundGateway amqpClientInboundGateway(AmqpConnectionFactory connectionFactory) { + AmqpClientInboundGateway amqpClientInboundGateway = new AmqpClientInboundGateway(connectionFactory, "q1"); + amqpClientInboundGateway.setRequestChannelName("inputChannel"); + return amqpClientInboundGateway; +} +---- \ No newline at end of file From 10839a7cc10348ebcb722f5ffb7cad53d2182916 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 26 Sep 2025 16:26:33 -0400 Subject: [PATCH 2/4] Fix docs typos --- src/reference/antora/modules/ROOT/pages/amqp/amqp-1.0.adoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/reference/antora/modules/ROOT/pages/amqp/amqp-1.0.adoc b/src/reference/antora/modules/ROOT/pages/amqp/amqp-1.0.adoc index b2e71dfb1c..c8c4a0345c 100644 --- a/src/reference/antora/modules/ROOT/pages/amqp/amqp-1.0.adoc +++ b/src/reference/antora/modules/ROOT/pages/amqp/amqp-1.0.adoc @@ -99,8 +99,8 @@ In addition, to produce AMQP 1.0 replies, the `AmqpClientInboundGateway` uses in For automatic replies correlation with their requests, a `replyTo` property of the request message must be supplied. For example, the `RabbitAmqpTemplate.sendAndReceive()` relies on the `RpcClient` from RabbitMQ AMQP 1.0 library which generates an exclusive and auto-deleted queue. -Alternatively, the reply address could be set as a `replyExchange` and `replyRoutingKey` (optional) or `replyQueue` on the `AmqpClientInboundGateway` which are delegated to the `RabbitAmqpTemplate` default options. -The `messageId` or `correllationId` request message properties can be used for associating with replies. +Alternatively, the reply address could be set as a `replyExchange`(and optional `replyRoutingKey`) or `replyQueue` (but not both) on the `AmqpClientInboundGateway` which are delegated to the `RabbitAmqpTemplate` default options. +The `messageId` or `correlationId` request message properties can be used for associating with replies. The `RpcClient` in the `RabbitAmqpTemplate.sendAndReceive()` generates one if missed. The `AmqpClientInboundGateway` is able to map back such a correlation key into a reply message. From c05adbba3172c2da43834a8d615aa529046df923 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 26 Sep 2025 17:22:31 -0400 Subject: [PATCH 3/4] Extract the `IntegrationRabbitAmqpMessageListener` as a top-level (package private) class. Serves as a reusable unit of work for both `AmqpClientMessageProducer` & `AmqpClientInboundGateway`. The one-way and request-reply parts are handled as a `BiConsumer` action injection into this `IntegrationRabbitAmqpMessageListener` instance. Now both `AmqpClientMessageProducer` & `AmqpClientInboundGateway` are much simpler. --- .../inbound/AmqpClientInboundGateway.java | 196 ++++++------------ .../inbound/AmqpClientMessageProducer.java | 113 +--------- .../IntegrationRabbitAmqpMessageListener.java | 158 ++++++++++++++ 3 files changed, 235 insertions(+), 232 deletions(-) create mode 100644 spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/IntegrationRabbitAmqpMessageListener.java diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientInboundGateway.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientInboundGateway.java index 0f30b73936..ba2953d778 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientInboundGateway.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientInboundGateway.java @@ -19,29 +19,21 @@ import java.time.Duration; import java.util.Arrays; import java.util.Collection; -import java.util.Map; -import com.rabbitmq.client.amqp.Consumer; import com.rabbitmq.client.amqp.Resource; import org.aopalliance.aop.Advice; import org.jspecify.annotations.Nullable; import org.springframework.amqp.core.Address; -import org.springframework.amqp.core.AmqpAcknowledgment; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.listener.adapter.ReplyPostProcessor; -import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException; import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory; import org.springframework.amqp.rabbitmq.client.RabbitAmqpTemplate; -import org.springframework.amqp.rabbitmq.client.RabbitAmqpUtils; import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpListenerContainer; -import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListener; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils; -import org.springframework.integration.IntegrationMessageHeaderAccessor; -import org.springframework.integration.acks.AcknowledgmentCallback; import org.springframework.integration.amqp.support.AmqpHeaderMapper; import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; import org.springframework.integration.core.Pausable; @@ -185,7 +177,10 @@ public String getComponentType() { protected void onInit() { super.onInit(); this.listenerContainer.setBeanName(getComponentName() + ".listenerContainer"); - this.listenerContainer.setupMessageListener(new IntegrationRabbitAmqpMessageListener()); + IntegrationRabbitAmqpMessageListener messageListener = + new IntegrationRabbitAmqpMessageListener(this, this::processRequest, this.headerMapper, + this.messageConverter, this.afterReceivePostProcessors); + this.listenerContainer.setupMessageListener(messageListener); this.listenerContainer.afterPropertiesSet(); } @@ -225,146 +220,89 @@ public boolean isPaused() { return this.paused; } - private final class IntegrationRabbitAmqpMessageListener implements RabbitAmqpMessageListener { - - @Override - public void onAmqpMessage(com.rabbitmq.client.amqp.Message amqpMessage, Consumer.@Nullable Context context) { - org.springframework.amqp.core.Message message = RabbitAmqpUtils.fromAmqpMessage(amqpMessage, context); - Message messageToSend = toSpringMessage(message); - try { - Message receivedMessage = sendAndReceiveMessage(messageToSend); - if (receivedMessage != null) { - org.springframework.amqp.core.Message replyMessage = fromSpringMessage(receivedMessage, message); - publishReply(message, replyMessage); - } - else { - logger.warn(() -> "No reply received for message: " + amqpMessage); - } - } - catch (Exception ex) { - throw new ListenerExecutionFailedException(getComponentName() + ".onAmqpMessage() failed", ex, message); - } + /** + * Use as {@link java.util.function.BiConsumer} for the {@link IntegrationRabbitAmqpMessageListener}. + * @param messageToSend the message to produce from this endpoint. + * @param requestMessage the request AMQP message. + */ + private void processRequest(Message messageToSend, org.springframework.amqp.core.Message requestMessage) { + Message receivedMessage = sendAndReceiveMessage(messageToSend); + if (receivedMessage != null) { + org.springframework.amqp.core.Message replyMessage = fromSpringMessage(receivedMessage, requestMessage); + publishReply(requestMessage, replyMessage); } - - private Message toSpringMessage(org.springframework.amqp.core.Message message) { - if (AmqpClientInboundGateway.this.afterReceivePostProcessors != null) { - for (MessagePostProcessor processor : AmqpClientInboundGateway.this.afterReceivePostProcessors) { - message = processor.postProcessMessage(message); - } - } - MessageProperties messageProperties = message.getMessageProperties(); - AmqpAcknowledgment amqpAcknowledgment = messageProperties.getAmqpAcknowledgment(); - AmqpAcknowledgmentCallback acknowledgmentCallback = null; - if (amqpAcknowledgment != null) { - acknowledgmentCallback = new AmqpAcknowledgmentCallback(amqpAcknowledgment); - } - - Object payload = message; - Map headers = null; - if (AmqpClientInboundGateway.this.messageConverter != null) { - payload = AmqpClientInboundGateway.this.messageConverter.fromMessage(message); - headers = AmqpClientInboundGateway.this.headerMapper.toHeadersFromRequest(messageProperties); - } - - return getMessageBuilderFactory() - .withPayload(payload) - .copyHeaders(headers) - .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgmentCallback) - .build(); + else { + this.logger.warn(() -> "No reply received for message: " + requestMessage); } + } - private org.springframework.amqp.core.Message fromSpringMessage(Message receivedMessage, - org.springframework.amqp.core.Message requestMessage) { + private org.springframework.amqp.core.Message fromSpringMessage(Message receivedMessage, + org.springframework.amqp.core.Message requestMessage) { - org.springframework.amqp.core.Message replyMessage; - MessageProperties messageProperties = new MessageProperties(); - Object payload = receivedMessage.getPayload(); - if (payload instanceof org.springframework.amqp.core.Message amqpMessage) { - replyMessage = amqpMessage; - } - else { - Assert.state(AmqpClientInboundGateway.this.messageConverter != null, - "If reply payload is not an 'org.springframework.amqp.core.Message', " + - "the 'messageConverter' must be provided."); + org.springframework.amqp.core.Message replyMessage; + MessageProperties messageProperties = new MessageProperties(); + Object payload = receivedMessage.getPayload(); + if (payload instanceof org.springframework.amqp.core.Message amqpMessage) { + replyMessage = amqpMessage; + } + else { + Assert.state(this.messageConverter != null, + "If reply payload is not an 'org.springframework.amqp.core.Message', " + + "the 'messageConverter' must be provided."); + + replyMessage = this.messageConverter.toMessage(payload, messageProperties); + this.headerMapper.fromHeadersToReply(receivedMessage.getHeaders(), + messageProperties); + } - replyMessage = AmqpClientInboundGateway.this.messageConverter.toMessage(payload, messageProperties); - AmqpClientInboundGateway.this.headerMapper.fromHeadersToReply(receivedMessage.getHeaders(), - messageProperties); - } + postProcessResponse(requestMessage, replyMessage); + if (this.replyPostProcessor != null) { + replyMessage = this.replyPostProcessor.apply(requestMessage, replyMessage); + } - postProcessResponse(requestMessage, replyMessage); - if (AmqpClientInboundGateway.this.replyPostProcessor != null) { - replyMessage = AmqpClientInboundGateway.this.replyPostProcessor.apply(requestMessage, replyMessage); - } + return replyMessage; + } - return replyMessage; - } + private void publishReply(org.springframework.amqp.core.Message requestMessage, + org.springframework.amqp.core.Message replyMessage) { - private void publishReply(org.springframework.amqp.core.Message requestMessage, - org.springframework.amqp.core.Message replyMessage) { - - Address replyTo = requestMessage.getMessageProperties().getReplyToAddress(); - if (replyTo != null) { - String exchangeName = replyTo.getExchangeName(); - String routingKey = replyTo.getRoutingKey(); - if (StringUtils.hasText(exchangeName)) { - AmqpClientInboundGateway.this.replyTemplate.send(exchangeName, routingKey, replyMessage).join(); - } - else { - Assert.hasText(routingKey, "A 'replyTo' property must be provided in the requestMessage."); - String queue = routingKey.replaceFirst("queues/", ""); - AmqpClientInboundGateway.this.replyTemplate.send(queue, replyMessage).join(); - } + Address replyTo = requestMessage.getMessageProperties().getReplyToAddress(); + if (replyTo != null) { + String exchangeName = replyTo.getExchangeName(); + String routingKey = replyTo.getRoutingKey(); + if (StringUtils.hasText(exchangeName)) { + this.replyTemplate.send(exchangeName, routingKey, replyMessage).join(); } else { - AmqpClientInboundGateway.this.replyTemplate.send(replyMessage).join(); + Assert.hasText(routingKey, "A 'replyTo' property must be provided in the requestMessage."); + String queue = routingKey.replaceFirst("queues/", ""); + this.replyTemplate.send(queue, replyMessage).join(); } } - - @Override - public void onMessage(org.springframework.amqp.core.Message message) { - throw new UnsupportedOperationException("The 'RabbitAmqpMessageListener' does not implement 'onMessage()'"); - } - - /** - * Post-process the given response message before it will be sent. - * The default implementation sets the response's correlation id to the request message's correlation id, if any; - * otherwise to the request message id. - * @param request the original incoming Rabbit message - * @param response the outgoing Rabbit message about to be sent - */ - private static void postProcessResponse(org.springframework.amqp.core.Message request, - org.springframework.amqp.core.Message response) { - - String correlation = request.getMessageProperties().getCorrelationId(); - - if (correlation == null) { - String messageId = request.getMessageProperties().getMessageId(); - if (messageId != null) { - correlation = messageId; - } - } - response.getMessageProperties().setCorrelationId(correlation); + else { + this.replyTemplate.send(replyMessage).join(); } - } /** - * The {@link AcknowledgmentCallback} adapter for an {@link AmqpAcknowledgment}. - * @param delegate the {@link AmqpAcknowledgment} to delegate to. + * Post-process the given response message before it will be sent. + * The default implementation sets the response's correlation id to the request message's correlation id, if any; + * otherwise to the request message id. + * @param request the original incoming Rabbit message + * @param response the outgoing Rabbit message about to be sent */ - private record AmqpAcknowledgmentCallback(AmqpAcknowledgment delegate) implements AcknowledgmentCallback { + private static void postProcessResponse(org.springframework.amqp.core.Message request, + org.springframework.amqp.core.Message response) { - @Override - public void acknowledge(Status status) { - this.delegate.acknowledge(AmqpAcknowledgment.Status.valueOf(status.name())); - } + String correlation = request.getMessageProperties().getCorrelationId(); - @Override - public boolean isAutoAck() { - return false; + if (correlation == null) { + String messageId = request.getMessageProperties().getMessageId(); + if (messageId != null) { + correlation = messageId; + } } - + response.getMessageProperties().setCorrelationId(correlation); } } diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientMessageProducer.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientMessageProducer.java index f9b23fe276..66ba93c9aa 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientMessageProducer.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientMessageProducer.java @@ -17,37 +17,23 @@ package org.springframework.integration.amqp.inbound; import java.time.Duration; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.List; -import java.util.Map; -import com.rabbitmq.client.amqp.Consumer; import com.rabbitmq.client.amqp.Resource; import org.aopalliance.aop.Advice; import org.jspecify.annotations.Nullable; -import org.springframework.amqp.core.AmqpAcknowledgment; import org.springframework.amqp.core.MessagePostProcessor; -import org.springframework.amqp.core.MessageProperties; -import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException; import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory; -import org.springframework.amqp.rabbitmq.client.RabbitAmqpUtils; import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpListenerContainer; -import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListener; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils; -import org.springframework.integration.IntegrationMessageHeaderAccessor; -import org.springframework.integration.StaticMessageHeaderAccessor; -import org.springframework.integration.acks.AcknowledgmentCallback; -import org.springframework.integration.acks.SimpleAcknowledgment; import org.springframework.integration.amqp.support.AmqpHeaderMapper; import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; import org.springframework.integration.core.Pausable; import org.springframework.integration.endpoint.MessageProducerSupport; -import org.springframework.integration.support.MutableMessageBuilder; import org.springframework.messaging.Message; import org.springframework.scheduling.TaskScheduler; @@ -151,7 +137,10 @@ public String getComponentType() { protected void onInit() { super.onInit(); this.listenerContainer.setBeanName(getComponentName() + ".listenerContainer"); - this.listenerContainer.setupMessageListener(new IntegrationRabbitAmqpMessageListener()); + IntegrationRabbitAmqpMessageListener messageListener = + new IntegrationRabbitAmqpMessageListener(this, this::processRequest, this.headerMapper, + this.messageConverter, this.afterReceivePostProcessors); + this.listenerContainer.setupMessageListener(messageListener); this.listenerContainer.afterPropertiesSet(); } @@ -190,97 +179,15 @@ public boolean isPaused() { return this.paused; } - private final class IntegrationRabbitAmqpMessageListener implements RabbitAmqpMessageListener { - - @Override - public void onAmqpMessage(com.rabbitmq.client.amqp.Message amqpMessage, Consumer.@Nullable Context context) { - org.springframework.amqp.core.Message message = RabbitAmqpUtils.fromAmqpMessage(amqpMessage, context); - Message messageToSend = toSpringMessage(message); - - try { - sendMessage(messageToSend); - } - catch (Exception ex) { - throw new ListenerExecutionFailedException(getComponentName() + ".onAmqpMessage() failed", ex, message); - } - } - - @Override - public void onMessageBatch(List messages) { - SimpleAcknowledgment acknowledgmentCallback = null; - List> springMessages = new ArrayList<>(messages.size()); - for (org.springframework.amqp.core.Message message : messages) { - Message springMessage = toSpringMessage(message); - if (acknowledgmentCallback == null) { - acknowledgmentCallback = StaticMessageHeaderAccessor.getAcknowledgment(springMessage); - } - springMessages.add(springMessage); - } - - Message>> messageToSend = - MutableMessageBuilder.withPayload(springMessages) - .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgmentCallback) - .build(); - - try { - sendMessage(messageToSend); - } - catch (Exception ex) { - throw new ListenerExecutionFailedException(getComponentName() + ".onMessageBatch() failed", ex, - messages.toArray(org.springframework.amqp.core.Message[]::new)); - } - } - - private Message toSpringMessage(org.springframework.amqp.core.Message message) { - if (AmqpClientMessageProducer.this.afterReceivePostProcessors != null) { - for (MessagePostProcessor processor : AmqpClientMessageProducer.this.afterReceivePostProcessors) { - message = processor.postProcessMessage(message); - } - } - MessageProperties messageProperties = message.getMessageProperties(); - AmqpAcknowledgment amqpAcknowledgment = messageProperties.getAmqpAcknowledgment(); - AmqpAcknowledgmentCallback acknowledgmentCallback = null; - if (amqpAcknowledgment != null) { - acknowledgmentCallback = new AmqpAcknowledgmentCallback(amqpAcknowledgment); - } - - Object payload = message; - Map headers = null; - if (AmqpClientMessageProducer.this.messageConverter != null) { - payload = AmqpClientMessageProducer.this.messageConverter.fromMessage(message); - headers = AmqpClientMessageProducer.this.headerMapper.toHeadersFromRequest(messageProperties); - } - - return getMessageBuilderFactory() - .withPayload(payload) - .copyHeaders(headers) - .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgmentCallback) - .build(); - } - - @Override - public void onMessage(org.springframework.amqp.core.Message message) { - throw new UnsupportedOperationException("The 'RabbitAmqpMessageListener' does not implement 'onMessage()'"); - } - - } - /** - * The {@link AcknowledgmentCallback} adapter for an {@link AmqpAcknowledgment}. - * @param delegate the {@link AmqpAcknowledgment} to delegate to. + * Use as {@link java.util.function.BiConsumer} for the {@link IntegrationRabbitAmqpMessageListener}. + * @param messageToSend the message to produce from this endpoint. + * @param requestMessage the request AMQP message. */ - private record AmqpAcknowledgmentCallback(AmqpAcknowledgment delegate) implements AcknowledgmentCallback { - - @Override - public void acknowledge(Status status) { - this.delegate.acknowledge(AmqpAcknowledgment.Status.valueOf(status.name())); - } - - @Override - public boolean isAutoAck() { - return false; - } + private void processRequest(Message messageToSend, + org.springframework.amqp.core.@Nullable Message requestMessage) { + sendMessage(messageToSend); } } diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/IntegrationRabbitAmqpMessageListener.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/IntegrationRabbitAmqpMessageListener.java new file mode 100644 index 0000000000..56f65db111 --- /dev/null +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/IntegrationRabbitAmqpMessageListener.java @@ -0,0 +1,158 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.inbound; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; + +import com.rabbitmq.client.amqp.Consumer; +import org.jspecify.annotations.Nullable; + +import org.springframework.amqp.core.AmqpAcknowledgment; +import org.springframework.amqp.core.MessagePostProcessor; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException; +import org.springframework.amqp.rabbitmq.client.RabbitAmqpUtils; +import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListener; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.StaticMessageHeaderAccessor; +import org.springframework.integration.acks.AcknowledgmentCallback; +import org.springframework.integration.acks.SimpleAcknowledgment; +import org.springframework.integration.amqp.support.AmqpHeaderMapper; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.integration.support.MutableMessageBuilder; +import org.springframework.integration.support.context.NamedComponent; +import org.springframework.messaging.Message; + +/** + * An internal {@link RabbitAmqpMessageListener} implementation for the AMQP 1.0 endpoints. + * + * @param amqpInboundEndpoint the endpoint this listener is used in + * @param requestAction the action to perform on the message to produce from the endpoint + * @param headerMapper a mapper to convert AMQP headers to Spring Integration headers + * @param messageConverter the message converter from AMQP message to the Spring Integration message + * @param afterReceivePostProcessors the post-processors to apply on the received AMQP message + * + * @author Artem Bilan + * + * @since 7.0 + */ +record IntegrationRabbitAmqpMessageListener(NamedComponent amqpInboundEndpoint, + BiConsumer, org.springframework.amqp.core.@Nullable Message> requestAction, + AmqpHeaderMapper headerMapper, @Nullable MessageConverter messageConverter, + @Nullable Collection afterReceivePostProcessors) + implements RabbitAmqpMessageListener { + + @Override + public void onAmqpMessage(com.rabbitmq.client.amqp.Message amqpMessage, Consumer.@Nullable Context context) { + org.springframework.amqp.core.Message message = RabbitAmqpUtils.fromAmqpMessage(amqpMessage, context); + Message messageToSend = toSpringMessage(message); + try { + this.requestAction.accept(messageToSend, message); + } + catch (Exception ex) { + throw new ListenerExecutionFailedException( + this.amqpInboundEndpoint.getComponentName() + ".onAmqpMessage() failed", ex, message); + } + } + + @Override + public void onMessageBatch(List messages) { + SimpleAcknowledgment acknowledgmentCallback = null; + List> springMessages = new ArrayList<>(messages.size()); + for (org.springframework.amqp.core.Message message : messages) { + Message springMessage = toSpringMessage(message); + if (acknowledgmentCallback == null) { + acknowledgmentCallback = StaticMessageHeaderAccessor.getAcknowledgment(springMessage); + } + springMessages.add(springMessage); + } + + Message>> messageToSend = + MutableMessageBuilder.withPayload(springMessages) + .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgmentCallback) + .build(); + + try { + this.requestAction.accept(messageToSend, null); + } + catch (Exception ex) { + throw new ListenerExecutionFailedException( + this.amqpInboundEndpoint.getComponentName() + ".onMessageBatch() failed", ex, + messages.toArray(org.springframework.amqp.core.Message[]::new)); + } + } + + private Message toSpringMessage(org.springframework.amqp.core.Message message) { + if (this.afterReceivePostProcessors != null) { + for (MessagePostProcessor processor : this.afterReceivePostProcessors) { + message = processor.postProcessMessage(message); + } + } + MessageProperties messageProperties = message.getMessageProperties(); + AmqpAcknowledgment amqpAcknowledgment = messageProperties.getAmqpAcknowledgment(); + AmqpAcknowledgmentCallback acknowledgmentCallback = null; + if (amqpAcknowledgment != null) { + acknowledgmentCallback = new AmqpAcknowledgmentCallback(amqpAcknowledgment); + } + + Object payload = message; + Map headers = null; + if (this.messageConverter != null) { + payload = this.messageConverter.fromMessage(message); + headers = this.headerMapper.toHeadersFromRequest(messageProperties); + } + + return MessageBuilder.withPayload(payload) + .copyHeaders(headers) + .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgmentCallback) + .build(); + } + + @Override + public void onMessage(org.springframework.amqp.core.Message message) { + throw new UnsupportedOperationException("The 'RabbitAmqpMessageListener' does not implement 'onMessage()'"); + } + + /** + * The {@link AcknowledgmentCallback} adapter for an {@link AmqpAcknowledgment}. + * + * @param delegate the {@link AmqpAcknowledgment} to delegate to. + * + * @author Artem Bilan + * + * @since 7.0 + */ + record AmqpAcknowledgmentCallback(AmqpAcknowledgment delegate) implements AcknowledgmentCallback { + + @Override + public void acknowledge(Status status) { + this.delegate.acknowledge(AmqpAcknowledgment.Status.valueOf(status.name())); + } + + @Override + public boolean isAutoAck() { + return false; + } + + } + +} From 73cf36c82104d357dbfba3a69847543d99ec5905 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 6 Oct 2025 13:29:12 -0400 Subject: [PATCH 4/4] * Improve `AmqpClientMessageProducer.processRequest` Javadoc mentioning that `requestMessage` param is out of use * Be more specific with the type for `UnsupportedOperationException` in the `IntegrationRabbitAmqpMessageListener` --- .../integration/amqp/inbound/AmqpClientMessageProducer.java | 2 +- .../amqp/inbound/IntegrationRabbitAmqpMessageListener.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientMessageProducer.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientMessageProducer.java index 66ba93c9aa..f92c8cb4cd 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientMessageProducer.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientMessageProducer.java @@ -182,7 +182,7 @@ public boolean isPaused() { /** * Use as {@link java.util.function.BiConsumer} for the {@link IntegrationRabbitAmqpMessageListener}. * @param messageToSend the message to produce from this endpoint. - * @param requestMessage the request AMQP message. + * @param requestMessage the request AMQP message. Ignored in this implementation. */ private void processRequest(Message messageToSend, org.springframework.amqp.core.@Nullable Message requestMessage) { diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/IntegrationRabbitAmqpMessageListener.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/IntegrationRabbitAmqpMessageListener.java index 56f65db111..ffe88ba40c 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/IntegrationRabbitAmqpMessageListener.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/IntegrationRabbitAmqpMessageListener.java @@ -129,7 +129,8 @@ private Message toSpringMessage(org.springframework.amqp.core.Message message @Override public void onMessage(org.springframework.amqp.core.Message message) { - throw new UnsupportedOperationException("The 'RabbitAmqpMessageListener' does not implement 'onMessage()'"); + throw new UnsupportedOperationException( + "The 'IntegrationRabbitAmqpMessageListener' does not implement 'onMessage()'"); } /**