diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientInboundGateway.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientInboundGateway.java new file mode 100644 index 0000000000..ba2953d778 --- /dev/null +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientInboundGateway.java @@ -0,0 +1,308 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.inbound; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; + +import com.rabbitmq.client.amqp.Resource; +import org.aopalliance.aop.Advice; +import org.jspecify.annotations.Nullable; + +import org.springframework.amqp.core.Address; +import org.springframework.amqp.core.MessagePostProcessor; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.listener.adapter.ReplyPostProcessor; +import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory; +import org.springframework.amqp.rabbitmq.client.RabbitAmqpTemplate; +import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpListenerContainer; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.amqp.support.converter.SimpleMessageConverter; +import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils; +import org.springframework.integration.amqp.support.AmqpHeaderMapper; +import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; +import org.springframework.integration.core.Pausable; +import org.springframework.integration.gateway.MessagingGatewaySupport; +import org.springframework.messaging.Message; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * A {@link MessagingGatewaySupport} implementation for AMQP 1.0 client. + *

+ * Based on the {@link RabbitAmqpListenerContainer} and requires an {@link AmqpConnectionFactory}. + * An internal {@link RabbitAmqpTemplate} is used to send replies. + * + * @author Artem Bilan + * + * @since 7.0 + * + * @see RabbitAmqpListenerContainer + * @see RabbitAmqpTemplate + * @see org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListenerAdapter + */ +public class AmqpClientInboundGateway extends MessagingGatewaySupport implements Pausable { + + private final RabbitAmqpListenerContainer listenerContainer; + + private final RabbitAmqpTemplate replyTemplate; + + private @Nullable MessageConverter messageConverter = new SimpleMessageConverter(); + + private AmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.inboundMapper(); + + private @Nullable Collection afterReceivePostProcessors; + + private @Nullable ReplyPostProcessor replyPostProcessor; + + private volatile boolean paused; + + public AmqpClientInboundGateway(AmqpConnectionFactory connectionFactory, String... queueNames) { + this.listenerContainer = new RabbitAmqpListenerContainer(connectionFactory); + this.listenerContainer.setQueueNames(queueNames); + this.replyTemplate = new RabbitAmqpTemplate(connectionFactory); + } + + public void setInitialCredits(int initialCredits) { + this.listenerContainer.setInitialCredits(initialCredits); + } + + public void setPriority(int priority) { + this.listenerContainer.setPriority(priority); + } + + public void setStateListeners(Resource.StateListener... stateListeners) { + this.listenerContainer.setStateListeners(stateListeners); + } + + public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors) { + this.afterReceivePostProcessors = MessagePostProcessorUtils.sort(Arrays.asList(afterReceivePostProcessors)); + } + + @Override + public void setTaskScheduler(TaskScheduler taskScheduler) { + this.listenerContainer.setTaskScheduler(taskScheduler); + } + + public void setAdviceChain(Advice... advices) { + this.listenerContainer.setAdviceChain(advices); + } + + public void setAutoSettle(boolean autoSettle) { + this.listenerContainer.setAutoSettle(autoSettle); + } + + public void setDefaultRequeue(boolean defaultRequeue) { + this.listenerContainer.setDefaultRequeue(defaultRequeue); + } + + public void setGracefulShutdownPeriod(Duration gracefulShutdownPeriod) { + this.listenerContainer.setGracefulShutdownPeriod(gracefulShutdownPeriod); + } + + public void setConsumersPerQueue(int consumersPerQueue) { + this.listenerContainer.setConsumersPerQueue(consumersPerQueue); + } + + /** + * Set a {@link MessageConverter} to replace the default {@link SimpleMessageConverter}. + * If set to null, an AMQP message is sent as is into a {@link Message} payload. + * And a reply message has to return an AMQP message as its payload. + * @param messageConverter the {@link MessageConverter} to use or null. + */ + public void setMessageConverter(@Nullable MessageConverter messageConverter) { + this.messageConverter = messageConverter; + } + + public void setHeaderMapper(AmqpHeaderMapper headerMapper) { + this.headerMapper = headerMapper; + } + + public void setReplyPostProcessor(ReplyPostProcessor replyPostProcessor) { + this.replyPostProcessor = replyPostProcessor; + } + + /** + * Set a default {@code exchange} for sending replies + * if {@code replyTo} address is not provided in the request message. + * Mutually exclusive with {@link #setReplyQueue(String)}. + * @param exchange the default exchange for sending replies + */ + public void setReplyExchange(String exchange) { + this.replyTemplate.setExchange(exchange); + } + + /** + * Set a default {@code routingKey} for sending replies + * if {@code replyTo} address is not provided in the request message. + * Used only if {@link #setReplyExchange(String)} is provided. + * @param routingKey the default routing key for sending replies + */ + public void setReplyRoutingKey(String routingKey) { + this.replyTemplate.setRoutingKey(routingKey); + } + + /** + * Set a default {@code queue} for sending replies + * if {@code replyTo} address is not provided in the request message. + * Mutually exclusive with {@link #setReplyExchange(String)}. + * @param queue the default queue for sending replies + */ + public void setReplyQueue(String queue) { + this.replyTemplate.setQueue(queue); + } + + @Override + public String getComponentType() { + return "amqp:inbound-gateway"; + } + + @Override + protected void onInit() { + super.onInit(); + this.listenerContainer.setBeanName(getComponentName() + ".listenerContainer"); + IntegrationRabbitAmqpMessageListener messageListener = + new IntegrationRabbitAmqpMessageListener(this, this::processRequest, this.headerMapper, + this.messageConverter, this.afterReceivePostProcessors); + this.listenerContainer.setupMessageListener(messageListener); + this.listenerContainer.afterPropertiesSet(); + } + + @Override + protected void doStart() { + super.doStart(); + this.listenerContainer.start(); + } + + @Override + protected void doStop() { + super.doStop(); + this.listenerContainer.stop(); + } + + @Override + public void destroy() { + super.destroy(); + this.listenerContainer.destroy(); + this.replyTemplate.destroy(); + } + + @Override + public void pause() { + this.listenerContainer.pause(); + this.paused = true; + } + + @Override + public void resume() { + this.listenerContainer.resume(); + this.paused = false; + } + + @Override + public boolean isPaused() { + return this.paused; + } + + /** + * Use as {@link java.util.function.BiConsumer} for the {@link IntegrationRabbitAmqpMessageListener}. + * @param messageToSend the message to produce from this endpoint. + * @param requestMessage the request AMQP message. + */ + private void processRequest(Message messageToSend, org.springframework.amqp.core.Message requestMessage) { + Message receivedMessage = sendAndReceiveMessage(messageToSend); + if (receivedMessage != null) { + org.springframework.amqp.core.Message replyMessage = fromSpringMessage(receivedMessage, requestMessage); + publishReply(requestMessage, replyMessage); + } + else { + this.logger.warn(() -> "No reply received for message: " + requestMessage); + } + } + + private org.springframework.amqp.core.Message fromSpringMessage(Message receivedMessage, + org.springframework.amqp.core.Message requestMessage) { + + org.springframework.amqp.core.Message replyMessage; + MessageProperties messageProperties = new MessageProperties(); + Object payload = receivedMessage.getPayload(); + if (payload instanceof org.springframework.amqp.core.Message amqpMessage) { + replyMessage = amqpMessage; + } + else { + Assert.state(this.messageConverter != null, + "If reply payload is not an 'org.springframework.amqp.core.Message', " + + "the 'messageConverter' must be provided."); + + replyMessage = this.messageConverter.toMessage(payload, messageProperties); + this.headerMapper.fromHeadersToReply(receivedMessage.getHeaders(), + messageProperties); + } + + postProcessResponse(requestMessage, replyMessage); + if (this.replyPostProcessor != null) { + replyMessage = this.replyPostProcessor.apply(requestMessage, replyMessage); + } + + return replyMessage; + } + + private void publishReply(org.springframework.amqp.core.Message requestMessage, + org.springframework.amqp.core.Message replyMessage) { + + Address replyTo = requestMessage.getMessageProperties().getReplyToAddress(); + if (replyTo != null) { + String exchangeName = replyTo.getExchangeName(); + String routingKey = replyTo.getRoutingKey(); + if (StringUtils.hasText(exchangeName)) { + this.replyTemplate.send(exchangeName, routingKey, replyMessage).join(); + } + else { + Assert.hasText(routingKey, "A 'replyTo' property must be provided in the requestMessage."); + String queue = routingKey.replaceFirst("queues/", ""); + this.replyTemplate.send(queue, replyMessage).join(); + } + } + else { + this.replyTemplate.send(replyMessage).join(); + } + } + + /** + * Post-process the given response message before it will be sent. + * The default implementation sets the response's correlation id to the request message's correlation id, if any; + * otherwise to the request message id. + * @param request the original incoming Rabbit message + * @param response the outgoing Rabbit message about to be sent + */ + private static void postProcessResponse(org.springframework.amqp.core.Message request, + org.springframework.amqp.core.Message response) { + + String correlation = request.getMessageProperties().getCorrelationId(); + + if (correlation == null) { + String messageId = request.getMessageProperties().getMessageId(); + if (messageId != null) { + correlation = messageId; + } + } + response.getMessageProperties().setCorrelationId(correlation); + } + +} diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientMessageProducer.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientMessageProducer.java index f9b23fe276..f92c8cb4cd 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientMessageProducer.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientMessageProducer.java @@ -17,37 +17,23 @@ package org.springframework.integration.amqp.inbound; import java.time.Duration; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.List; -import java.util.Map; -import com.rabbitmq.client.amqp.Consumer; import com.rabbitmq.client.amqp.Resource; import org.aopalliance.aop.Advice; import org.jspecify.annotations.Nullable; -import org.springframework.amqp.core.AmqpAcknowledgment; import org.springframework.amqp.core.MessagePostProcessor; -import org.springframework.amqp.core.MessageProperties; -import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException; import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory; -import org.springframework.amqp.rabbitmq.client.RabbitAmqpUtils; import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpListenerContainer; -import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListener; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils; -import org.springframework.integration.IntegrationMessageHeaderAccessor; -import org.springframework.integration.StaticMessageHeaderAccessor; -import org.springframework.integration.acks.AcknowledgmentCallback; -import org.springframework.integration.acks.SimpleAcknowledgment; import org.springframework.integration.amqp.support.AmqpHeaderMapper; import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; import org.springframework.integration.core.Pausable; import org.springframework.integration.endpoint.MessageProducerSupport; -import org.springframework.integration.support.MutableMessageBuilder; import org.springframework.messaging.Message; import org.springframework.scheduling.TaskScheduler; @@ -151,7 +137,10 @@ public String getComponentType() { protected void onInit() { super.onInit(); this.listenerContainer.setBeanName(getComponentName() + ".listenerContainer"); - this.listenerContainer.setupMessageListener(new IntegrationRabbitAmqpMessageListener()); + IntegrationRabbitAmqpMessageListener messageListener = + new IntegrationRabbitAmqpMessageListener(this, this::processRequest, this.headerMapper, + this.messageConverter, this.afterReceivePostProcessors); + this.listenerContainer.setupMessageListener(messageListener); this.listenerContainer.afterPropertiesSet(); } @@ -190,97 +179,15 @@ public boolean isPaused() { return this.paused; } - private final class IntegrationRabbitAmqpMessageListener implements RabbitAmqpMessageListener { - - @Override - public void onAmqpMessage(com.rabbitmq.client.amqp.Message amqpMessage, Consumer.@Nullable Context context) { - org.springframework.amqp.core.Message message = RabbitAmqpUtils.fromAmqpMessage(amqpMessage, context); - Message messageToSend = toSpringMessage(message); - - try { - sendMessage(messageToSend); - } - catch (Exception ex) { - throw new ListenerExecutionFailedException(getComponentName() + ".onAmqpMessage() failed", ex, message); - } - } - - @Override - public void onMessageBatch(List messages) { - SimpleAcknowledgment acknowledgmentCallback = null; - List> springMessages = new ArrayList<>(messages.size()); - for (org.springframework.amqp.core.Message message : messages) { - Message springMessage = toSpringMessage(message); - if (acknowledgmentCallback == null) { - acknowledgmentCallback = StaticMessageHeaderAccessor.getAcknowledgment(springMessage); - } - springMessages.add(springMessage); - } - - Message>> messageToSend = - MutableMessageBuilder.withPayload(springMessages) - .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgmentCallback) - .build(); - - try { - sendMessage(messageToSend); - } - catch (Exception ex) { - throw new ListenerExecutionFailedException(getComponentName() + ".onMessageBatch() failed", ex, - messages.toArray(org.springframework.amqp.core.Message[]::new)); - } - } - - private Message toSpringMessage(org.springframework.amqp.core.Message message) { - if (AmqpClientMessageProducer.this.afterReceivePostProcessors != null) { - for (MessagePostProcessor processor : AmqpClientMessageProducer.this.afterReceivePostProcessors) { - message = processor.postProcessMessage(message); - } - } - MessageProperties messageProperties = message.getMessageProperties(); - AmqpAcknowledgment amqpAcknowledgment = messageProperties.getAmqpAcknowledgment(); - AmqpAcknowledgmentCallback acknowledgmentCallback = null; - if (amqpAcknowledgment != null) { - acknowledgmentCallback = new AmqpAcknowledgmentCallback(amqpAcknowledgment); - } - - Object payload = message; - Map headers = null; - if (AmqpClientMessageProducer.this.messageConverter != null) { - payload = AmqpClientMessageProducer.this.messageConverter.fromMessage(message); - headers = AmqpClientMessageProducer.this.headerMapper.toHeadersFromRequest(messageProperties); - } - - return getMessageBuilderFactory() - .withPayload(payload) - .copyHeaders(headers) - .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgmentCallback) - .build(); - } - - @Override - public void onMessage(org.springframework.amqp.core.Message message) { - throw new UnsupportedOperationException("The 'RabbitAmqpMessageListener' does not implement 'onMessage()'"); - } - - } - /** - * The {@link AcknowledgmentCallback} adapter for an {@link AmqpAcknowledgment}. - * @param delegate the {@link AmqpAcknowledgment} to delegate to. + * Use as {@link java.util.function.BiConsumer} for the {@link IntegrationRabbitAmqpMessageListener}. + * @param messageToSend the message to produce from this endpoint. + * @param requestMessage the request AMQP message. Ignored in this implementation. */ - private record AmqpAcknowledgmentCallback(AmqpAcknowledgment delegate) implements AcknowledgmentCallback { - - @Override - public void acknowledge(Status status) { - this.delegate.acknowledge(AmqpAcknowledgment.Status.valueOf(status.name())); - } - - @Override - public boolean isAutoAck() { - return false; - } + private void processRequest(Message messageToSend, + org.springframework.amqp.core.@Nullable Message requestMessage) { + sendMessage(messageToSend); } } diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/IntegrationRabbitAmqpMessageListener.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/IntegrationRabbitAmqpMessageListener.java new file mode 100644 index 0000000000..ffe88ba40c --- /dev/null +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/IntegrationRabbitAmqpMessageListener.java @@ -0,0 +1,159 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.inbound; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; + +import com.rabbitmq.client.amqp.Consumer; +import org.jspecify.annotations.Nullable; + +import org.springframework.amqp.core.AmqpAcknowledgment; +import org.springframework.amqp.core.MessagePostProcessor; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException; +import org.springframework.amqp.rabbitmq.client.RabbitAmqpUtils; +import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListener; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.StaticMessageHeaderAccessor; +import org.springframework.integration.acks.AcknowledgmentCallback; +import org.springframework.integration.acks.SimpleAcknowledgment; +import org.springframework.integration.amqp.support.AmqpHeaderMapper; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.integration.support.MutableMessageBuilder; +import org.springframework.integration.support.context.NamedComponent; +import org.springframework.messaging.Message; + +/** + * An internal {@link RabbitAmqpMessageListener} implementation for the AMQP 1.0 endpoints. + * + * @param amqpInboundEndpoint the endpoint this listener is used in + * @param requestAction the action to perform on the message to produce from the endpoint + * @param headerMapper a mapper to convert AMQP headers to Spring Integration headers + * @param messageConverter the message converter from AMQP message to the Spring Integration message + * @param afterReceivePostProcessors the post-processors to apply on the received AMQP message + * + * @author Artem Bilan + * + * @since 7.0 + */ +record IntegrationRabbitAmqpMessageListener(NamedComponent amqpInboundEndpoint, + BiConsumer, org.springframework.amqp.core.@Nullable Message> requestAction, + AmqpHeaderMapper headerMapper, @Nullable MessageConverter messageConverter, + @Nullable Collection afterReceivePostProcessors) + implements RabbitAmqpMessageListener { + + @Override + public void onAmqpMessage(com.rabbitmq.client.amqp.Message amqpMessage, Consumer.@Nullable Context context) { + org.springframework.amqp.core.Message message = RabbitAmqpUtils.fromAmqpMessage(amqpMessage, context); + Message messageToSend = toSpringMessage(message); + try { + this.requestAction.accept(messageToSend, message); + } + catch (Exception ex) { + throw new ListenerExecutionFailedException( + this.amqpInboundEndpoint.getComponentName() + ".onAmqpMessage() failed", ex, message); + } + } + + @Override + public void onMessageBatch(List messages) { + SimpleAcknowledgment acknowledgmentCallback = null; + List> springMessages = new ArrayList<>(messages.size()); + for (org.springframework.amqp.core.Message message : messages) { + Message springMessage = toSpringMessage(message); + if (acknowledgmentCallback == null) { + acknowledgmentCallback = StaticMessageHeaderAccessor.getAcknowledgment(springMessage); + } + springMessages.add(springMessage); + } + + Message>> messageToSend = + MutableMessageBuilder.withPayload(springMessages) + .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgmentCallback) + .build(); + + try { + this.requestAction.accept(messageToSend, null); + } + catch (Exception ex) { + throw new ListenerExecutionFailedException( + this.amqpInboundEndpoint.getComponentName() + ".onMessageBatch() failed", ex, + messages.toArray(org.springframework.amqp.core.Message[]::new)); + } + } + + private Message toSpringMessage(org.springframework.amqp.core.Message message) { + if (this.afterReceivePostProcessors != null) { + for (MessagePostProcessor processor : this.afterReceivePostProcessors) { + message = processor.postProcessMessage(message); + } + } + MessageProperties messageProperties = message.getMessageProperties(); + AmqpAcknowledgment amqpAcknowledgment = messageProperties.getAmqpAcknowledgment(); + AmqpAcknowledgmentCallback acknowledgmentCallback = null; + if (amqpAcknowledgment != null) { + acknowledgmentCallback = new AmqpAcknowledgmentCallback(amqpAcknowledgment); + } + + Object payload = message; + Map headers = null; + if (this.messageConverter != null) { + payload = this.messageConverter.fromMessage(message); + headers = this.headerMapper.toHeadersFromRequest(messageProperties); + } + + return MessageBuilder.withPayload(payload) + .copyHeaders(headers) + .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgmentCallback) + .build(); + } + + @Override + public void onMessage(org.springframework.amqp.core.Message message) { + throw new UnsupportedOperationException( + "The 'IntegrationRabbitAmqpMessageListener' does not implement 'onMessage()'"); + } + + /** + * The {@link AcknowledgmentCallback} adapter for an {@link AmqpAcknowledgment}. + * + * @param delegate the {@link AmqpAcknowledgment} to delegate to. + * + * @author Artem Bilan + * + * @since 7.0 + */ + record AmqpAcknowledgmentCallback(AmqpAcknowledgment delegate) implements AcknowledgmentCallback { + + @Override + public void acknowledge(Status status) { + this.delegate.acknowledge(AmqpAcknowledgment.Status.valueOf(status.name())); + } + + @Override + public boolean isAutoAck() { + return false; + } + + } + +} diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpClientInboundGatewayTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpClientInboundGatewayTests.java new file mode 100644 index 0000000000..4477472d92 --- /dev/null +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpClientInboundGatewayTests.java @@ -0,0 +1,162 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.inbound; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import com.rabbitmq.client.amqp.Environment; +import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder; +import org.jspecify.annotations.Nullable; +import org.junit.jupiter.api.Test; + +import org.springframework.amqp.core.Declarables; +import org.springframework.amqp.core.MessageBuilder; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory; +import org.springframework.amqp.rabbitmq.client.RabbitAmqpAdmin; +import org.springframework.amqp.rabbitmq.client.RabbitAmqpTemplate; +import org.springframework.amqp.rabbitmq.client.SingleAmqpConnectionFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.acks.SimpleAcknowledgment; +import org.springframework.integration.amqp.support.RabbitTestContainer; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.MimeTypeUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Artem Bilan + * + * @since 7.0 + */ +@SpringJUnitConfig +@DirtiesContext +public class AmqpClientInboundGatewayTests implements RabbitTestContainer { + + @Autowired + RabbitAmqpTemplate rabbitTemplate; + + @Autowired + ContextConfiguration contextConfiguration; + + @Test + void inboundGatewayExchange() { + assertThat(this.rabbitTemplate.convertSendAndReceive("q1", "test data")) + .succeedsWithin(Duration.ofSeconds(10)) + .isEqualTo("TEST DATA"); + } + + @Test + void inboundGatewayExchangeWithAck() throws InterruptedException { + org.springframework.amqp.core.Message requestMessage = + MessageBuilder.withBody("test data #2".getBytes()) + .setMessageId("someMessageId") + .setContentType(MimeTypeUtils.TEXT_PLAIN_VALUE) + .build(); + + this.rabbitTemplate.send("q2", requestMessage); + + assertThat(this.rabbitTemplate.receive("replyQueue")) + .succeedsWithin(Duration.ofSeconds(10)) + .satisfies(message -> { + assertThat(message.getMessageProperties().getCorrelationId()).isEqualTo("someMessageId"); + assertThat(message.getBody()).isEqualTo("TEST DATA #2".getBytes()); + }); + + assertThat(this.contextConfiguration.acknowledged.await(10, TimeUnit.SECONDS)).isTrue(); + } + + @Configuration(proxyBeanMethods = false) + @EnableIntegration + public static class ContextConfiguration { + + @Bean + Environment environment() { + return new AmqpEnvironmentBuilder() + .connectionSettings() + .port(RabbitTestContainer.amqpPort()) + .environmentBuilder() + .build(); + } + + @Bean + AmqpConnectionFactory connectionFactory(Environment environment) { + return new SingleAmqpConnectionFactory(environment); + } + + @Bean + RabbitAmqpAdmin admin(AmqpConnectionFactory connectionFactory) { + return new RabbitAmqpAdmin(connectionFactory); + } + + @Bean + Declarables declarables() { + return new Declarables(Stream.of("q1", "q2", "replyQueue").map(Queue::new).toArray(Queue[]::new)); + } + + @Bean + RabbitAmqpTemplate rabbitTemplate(AmqpConnectionFactory connectionFactory) { + return new RabbitAmqpTemplate(connectionFactory); + } + + @Bean + AmqpClientInboundGateway amqpClientInboundGateway(AmqpConnectionFactory connectionFactory) { + AmqpClientInboundGateway amqpClientInboundGateway = new AmqpClientInboundGateway(connectionFactory, "q1"); + amqpClientInboundGateway.setRequestChannelName("inputChannel"); + return amqpClientInboundGateway; + } + + @Bean + AmqpClientInboundGateway manualAckAmqpClientInboundGateway(AmqpConnectionFactory connectionFactory) { + AmqpClientInboundGateway amqpClientInboundGateway = new AmqpClientInboundGateway(connectionFactory, "q2"); + amqpClientInboundGateway.setRequestChannelName("inputChannel"); + amqpClientInboundGateway.setReplyQueue("replyQueue"); + amqpClientInboundGateway.setAutoSettle(false); + return amqpClientInboundGateway; + } + + CountDownLatch acknowledged = new CountDownLatch(1); + + @ServiceActivator(inputChannel = "inputChannel") + String toUpperCase(String payload, + @Nullable @Header(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK) + SimpleAcknowledgment acknowledgment) { + + try { + return payload.toUpperCase(); + } + finally { + if (acknowledgment != null) { + acknowledgment.acknowledge(); + acknowledged.countDown(); + } + } + } + + } + +} diff --git a/src/reference/antora/modules/ROOT/pages/amqp/amqp-1.0.adoc b/src/reference/antora/modules/ROOT/pages/amqp/amqp-1.0.adoc index 4833e7a43e..c8c4a0345c 100644 --- a/src/reference/antora/modules/ROOT/pages/amqp/amqp-1.0.adoc +++ b/src/reference/antora/modules/ROOT/pages/amqp/amqp-1.0.adoc @@ -64,16 +64,16 @@ Some of `RabbitAmqpListenerContainer` configuration options are exposed as sette By default, the `MessageConverter` is an `org.springframework.amqp.support.converter.SimpleMessageConverter` that handles String, Serializable instances, and byte arrays. Also, a default `AmqpHeaderMapper` is a xref:amqp/message-headers.adoc[`DefaultAmqpHeaderMapper.inboundMapper()`]. -The `messageConverter` option can be set to `null` to fully skip conversion (including headers mapping), and return the received AMQP message as a payload of Spring message to produce. +The `messageConverter` option can be set to `null` to fully skip conversion (including header mapping), and return the received AMQP message as a payload of the Spring message to produce. Also, the `AmqpClientMessageProducer` implements a `Pausable` contract and delegates to the respective `RabbitAmqpListenerContainer` API. When `AmqpClientMessageProducer.setBatchSize() > 1`, this channel adapter works in a batch mode. In this case received messages are gathered until the batch size is fulfilled, or `batchReceiveTimeout` period is exhausted. -All the batched AMQP messages then converted to Spring messages, and result list is produced as a payload of wrapping message to send to the `outputChannel`. -The batch mode gives some performance gain due to the settlement for all the batched message at once. +All the batched AMQP messages then converted to Spring messages, and a result list is produced as a payload of a wrapping message to send to the `outputChannel`. +The batch mode gives some performance gain due to the settlement for all the batched messages at once. -When `autoSettle` flag is set to `false`, the `AcknowledgmentCallback` instance is provided as an `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK` message header to make settlement decision for received message or the whole batch. +When `autoSettle` flag is set to `false`, the `AcknowledgmentCallback` instance is provided as an `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK` message header to make settlement decision for the received message or the whole batch. The following example demonstrates how to configure an `AmqpClientMessageProducer` as a simple inbound endpoint: @@ -89,3 +89,29 @@ AmqpClientMessageProducer batchAmqpClientMessageProducer(AmqpConnectionFactory c return amqpClientMessageProducer; } ---- + +[[amqp-1.0-inbound-gateway]] +== AMQP 1.0 Inbound Gateway + +The `AmqpClientInboundGateway` is a `MessagingGatewaySupport` implementation for receiving request and producing replies over RabbitMQ AMQP 1.0 protocol. +It is similar to the `AmqpClientMessageProducer` mentioned above and share many `RabbitAmqpListenerContainer` configuration options. +In addition, to produce AMQP 1.0 replies, the `AmqpClientInboundGateway` uses internally a `RabbitAmqpTemplate`. + +For automatic replies correlation with their requests, a `replyTo` property of the request message must be supplied. +For example, the `RabbitAmqpTemplate.sendAndReceive()` relies on the `RpcClient` from RabbitMQ AMQP 1.0 library which generates an exclusive and auto-deleted queue. +Alternatively, the reply address could be set as a `replyExchange`(and optional `replyRoutingKey`) or `replyQueue` (but not both) on the `AmqpClientInboundGateway` which are delegated to the `RabbitAmqpTemplate` default options. +The `messageId` or `correlationId` request message properties can be used for associating with replies. +The `RpcClient` in the `RabbitAmqpTemplate.sendAndReceive()` generates one if missed. +The `AmqpClientInboundGateway` is able to map back such a correlation key into a reply message. + +The following example demonstrates how to configure an `AmqpClientInboundGateway` as a simple inbound gateway: + +[source, java] +---- +@Bean +AmqpClientInboundGateway amqpClientInboundGateway(AmqpConnectionFactory connectionFactory) { + AmqpClientInboundGateway amqpClientInboundGateway = new AmqpClientInboundGateway(connectionFactory, "q1"); + amqpClientInboundGateway.setRequestChannelName("inputChannel"); + return amqpClientInboundGateway; +} +---- \ No newline at end of file