diff --git a/build.gradle b/build.gradle index 6535ce8ba4f..4a71a19e520 100644 --- a/build.gradle +++ b/build.gradle @@ -436,7 +436,7 @@ project('spring-integration-test-support') { project('spring-integration-amqp') { description = 'Spring Integration AMQP Support' dependencies { - api 'org.springframework.amqp:spring-rabbit' + api 'org.springframework.amqp:spring-rabbitmq-client' optionalApi 'org.springframework.amqp:spring-rabbit-stream' testImplementation 'org.springframework.amqp:spring-rabbit-junit' diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientMessageProducer.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientMessageProducer.java new file mode 100644 index 00000000000..f9b23fe276d --- /dev/null +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientMessageProducer.java @@ -0,0 +1,286 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.inbound; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import com.rabbitmq.client.amqp.Consumer; +import com.rabbitmq.client.amqp.Resource; +import org.aopalliance.aop.Advice; +import org.jspecify.annotations.Nullable; + +import org.springframework.amqp.core.AmqpAcknowledgment; +import org.springframework.amqp.core.MessagePostProcessor; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException; +import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory; +import org.springframework.amqp.rabbitmq.client.RabbitAmqpUtils; +import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpListenerContainer; +import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListener; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.amqp.support.converter.SimpleMessageConverter; +import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.StaticMessageHeaderAccessor; +import org.springframework.integration.acks.AcknowledgmentCallback; +import org.springframework.integration.acks.SimpleAcknowledgment; +import org.springframework.integration.amqp.support.AmqpHeaderMapper; +import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; +import org.springframework.integration.core.Pausable; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.support.MutableMessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.scheduling.TaskScheduler; + +/** + * A {@link MessageProducerSupport} implementation for AMQP 1.0 client. + *

+ * Based on the {@link RabbitAmqpListenerContainer} and requires an {@link AmqpConnectionFactory}. + * + * @author Artem Bilan + * + * @since 7.0 + * + * @see RabbitAmqpListenerContainer + * @see org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListenerAdapter + */ +public class AmqpClientMessageProducer extends MessageProducerSupport implements Pausable { + + private final RabbitAmqpListenerContainer listenerContainer; + + private @Nullable MessageConverter messageConverter = new SimpleMessageConverter(); + + private AmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.inboundMapper(); + + private @Nullable Collection afterReceivePostProcessors; + + private volatile boolean paused; + + public AmqpClientMessageProducer(AmqpConnectionFactory connectionFactory, String... queueNames) { + this.listenerContainer = new RabbitAmqpListenerContainer(connectionFactory); + this.listenerContainer.setQueueNames(queueNames); + } + + public void setInitialCredits(int initialCredits) { + this.listenerContainer.setInitialCredits(initialCredits); + } + + public void setPriority(int priority) { + this.listenerContainer.setPriority(priority); + } + + public void setStateListeners(Resource.StateListener... stateListeners) { + this.listenerContainer.setStateListeners(stateListeners); + } + + public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors) { + this.afterReceivePostProcessors = MessagePostProcessorUtils.sort(Arrays.asList(afterReceivePostProcessors)); + } + + public void setBatchSize(int batchSize) { + this.listenerContainer.setBatchSize(batchSize); + } + + public void setBatchReceiveTimeout(long batchReceiveTimeout) { + this.listenerContainer.setBatchReceiveTimeout(batchReceiveTimeout); + } + + @Override + public void setTaskScheduler(TaskScheduler taskScheduler) { + this.listenerContainer.setTaskScheduler(taskScheduler); + } + + public void setAdviceChain(Advice... advices) { + this.listenerContainer.setAdviceChain(advices); + } + + public void setAutoSettle(boolean autoSettle) { + this.listenerContainer.setAutoSettle(autoSettle); + } + + public void setDefaultRequeue(boolean defaultRequeue) { + this.listenerContainer.setDefaultRequeue(defaultRequeue); + } + + public void setGracefulShutdownPeriod(Duration gracefulShutdownPeriod) { + this.listenerContainer.setGracefulShutdownPeriod(gracefulShutdownPeriod); + } + + public void setConsumersPerQueue(int consumersPerQueue) { + this.listenerContainer.setConsumersPerQueue(consumersPerQueue); + } + + /** + * Set a {@link MessageConverter} to replace the default {@link SimpleMessageConverter}. + * If set to null, an AMQP message is sent as is into a {@link Message} payload. + * @param messageConverter the {@link MessageConverter} to use or null. + */ + public void setMessageConverter(@Nullable MessageConverter messageConverter) { + this.messageConverter = messageConverter; + } + + public void setHeaderMapper(AmqpHeaderMapper headerMapper) { + this.headerMapper = headerMapper; + } + + @Override + public String getComponentType() { + return "amqp:inbound-channel-adapter"; + } + + @Override + protected void onInit() { + super.onInit(); + this.listenerContainer.setBeanName(getComponentName() + ".listenerContainer"); + this.listenerContainer.setupMessageListener(new IntegrationRabbitAmqpMessageListener()); + this.listenerContainer.afterPropertiesSet(); + } + + @Override + protected void doStart() { + super.doStart(); + this.listenerContainer.start(); + } + + @Override + protected void doStop() { + super.doStop(); + this.listenerContainer.stop(); + } + + @Override + public void destroy() { + super.destroy(); + this.listenerContainer.destroy(); + } + + @Override + public void pause() { + this.listenerContainer.pause(); + this.paused = true; + } + + @Override + public void resume() { + this.listenerContainer.resume(); + this.paused = false; + } + + @Override + public boolean isPaused() { + return this.paused; + } + + private final class IntegrationRabbitAmqpMessageListener implements RabbitAmqpMessageListener { + + @Override + public void onAmqpMessage(com.rabbitmq.client.amqp.Message amqpMessage, Consumer.@Nullable Context context) { + org.springframework.amqp.core.Message message = RabbitAmqpUtils.fromAmqpMessage(amqpMessage, context); + Message messageToSend = toSpringMessage(message); + + try { + sendMessage(messageToSend); + } + catch (Exception ex) { + throw new ListenerExecutionFailedException(getComponentName() + ".onAmqpMessage() failed", ex, message); + } + } + + @Override + public void onMessageBatch(List messages) { + SimpleAcknowledgment acknowledgmentCallback = null; + List> springMessages = new ArrayList<>(messages.size()); + for (org.springframework.amqp.core.Message message : messages) { + Message springMessage = toSpringMessage(message); + if (acknowledgmentCallback == null) { + acknowledgmentCallback = StaticMessageHeaderAccessor.getAcknowledgment(springMessage); + } + springMessages.add(springMessage); + } + + Message>> messageToSend = + MutableMessageBuilder.withPayload(springMessages) + .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgmentCallback) + .build(); + + try { + sendMessage(messageToSend); + } + catch (Exception ex) { + throw new ListenerExecutionFailedException(getComponentName() + ".onMessageBatch() failed", ex, + messages.toArray(org.springframework.amqp.core.Message[]::new)); + } + } + + private Message toSpringMessage(org.springframework.amqp.core.Message message) { + if (AmqpClientMessageProducer.this.afterReceivePostProcessors != null) { + for (MessagePostProcessor processor : AmqpClientMessageProducer.this.afterReceivePostProcessors) { + message = processor.postProcessMessage(message); + } + } + MessageProperties messageProperties = message.getMessageProperties(); + AmqpAcknowledgment amqpAcknowledgment = messageProperties.getAmqpAcknowledgment(); + AmqpAcknowledgmentCallback acknowledgmentCallback = null; + if (amqpAcknowledgment != null) { + acknowledgmentCallback = new AmqpAcknowledgmentCallback(amqpAcknowledgment); + } + + Object payload = message; + Map headers = null; + if (AmqpClientMessageProducer.this.messageConverter != null) { + payload = AmqpClientMessageProducer.this.messageConverter.fromMessage(message); + headers = AmqpClientMessageProducer.this.headerMapper.toHeadersFromRequest(messageProperties); + } + + return getMessageBuilderFactory() + .withPayload(payload) + .copyHeaders(headers) + .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgmentCallback) + .build(); + } + + @Override + public void onMessage(org.springframework.amqp.core.Message message) { + throw new UnsupportedOperationException("The 'RabbitAmqpMessageListener' does not implement 'onMessage()'"); + } + + } + + /** + * The {@link AcknowledgmentCallback} adapter for an {@link AmqpAcknowledgment}. + * @param delegate the {@link AmqpAcknowledgment} to delegate to. + */ + private record AmqpAcknowledgmentCallback(AmqpAcknowledgment delegate) implements AcknowledgmentCallback { + + @Override + public void acknowledge(Status status) { + this.delegate.acknowledge(AmqpAcknowledgment.Status.valueOf(status.name())); + } + + @Override + public boolean isAutoAck() { + return false; + } + + } + +} diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AmqpClientMessageHandler.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AmqpClientMessageHandler.java new file mode 100644 index 00000000000..f10d9b3e21b --- /dev/null +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AmqpClientMessageHandler.java @@ -0,0 +1,317 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.outbound; + +import java.util.concurrent.CompletableFuture; + +import org.jspecify.annotations.Nullable; + +import org.springframework.amqp.core.AsyncAmqpTemplate; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.amqp.support.converter.SimpleMessageConverter; +import org.springframework.amqp.support.converter.SmartMessageConverter; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.core.ResolvableType; +import org.springframework.expression.Expression; +import org.springframework.expression.spel.support.StandardEvaluationContext; +import org.springframework.integration.amqp.support.AmqpHeaderMapper; +import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; +import org.springframework.integration.expression.ExpressionUtils; +import org.springframework.integration.expression.ValueExpression; +import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; +import org.springframework.integration.mapping.AbstractHeaderMapper; +import org.springframework.integration.support.AbstractIntegrationMessageBuilder; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * An {@link AbstractReplyProducingMessageHandler} implementation for AMQP 1.0 client. + *

+ * With the {@link #setRequiresReply(boolean)} configured as {@code true}, this message handler + * behaves as a gateway - the RPC over AMQP. + * In this case, the {@link AsyncAmqpTemplate} must be able to convert + * a reply into a provided {@link #replyPayloadTypeExpression}. + *

+ * This handler is {@code async} by default. + *

+ * In async mode, the error is sent to the error channel even if not in a gateway mode. + *

+ * The {@link #exchangeExpression}, {@link #routingKeyExpression} and {@link #queueExpression} + * are optional. + * In this case they have to be supplied by the {@link AsyncAmqpTemplate}. + * + * @author Artem Bilan + * + * @since 7.0 + */ +public class AmqpClientMessageHandler extends AbstractReplyProducingMessageHandler { + + private final AsyncAmqpTemplate amqpTemplate; + + private AmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.outboundMapper(); + + private MessageConverter messageConverter = new SimpleMessageConverter(); + + private @Nullable Expression exchangeExpression; + + private @Nullable Expression routingKeyExpression; + + private @Nullable Expression queueExpression; + + private @Nullable Expression replyPayloadTypeExpression; + + private boolean returnMessage; + + @SuppressWarnings("NullAway.Init") + private StandardEvaluationContext evaluationContext; + + /** + * Construct an instance with the provided {@link AsyncAmqpTemplate}. + * The {@link AsyncAmqpTemplate} must be an implementation for AMQP 1.0 protocol, + * e.g. {@link org.springframework.amqp.rabbitmq.client.RabbitAmqpTemplate}. + * @param amqpTemplate the {@link AsyncAmqpTemplate} to use. + */ + @SuppressWarnings("this-escape") + public AmqpClientMessageHandler(AsyncAmqpTemplate amqpTemplate) { + this.amqpTemplate = amqpTemplate; + setAsync(true); + } + + public void setHeaderMapper(AmqpHeaderMapper headerMapper) { + this.headerMapper = headerMapper; + } + + public void setMessageConverter(MessageConverter messageConverter) { + this.messageConverter = messageConverter; + } + + public void setExchange(String exchange) { + setExchangeExpression(new ValueExpression<>(exchange)); + } + + public void setExchangeExpressionString(String exchangeExpression) { + setExchangeExpression(EXPRESSION_PARSER.parseExpression(exchangeExpression)); + } + + public void setExchangeExpression(Expression exchangeExpression) { + this.exchangeExpression = exchangeExpression; + } + + public void setRoutingKey(String routingKey) { + setRoutingKeyExpression(new ValueExpression<>(routingKey)); + } + + public void setRoutingKeyExpressionString(String routingKeyExpression) { + setRoutingKeyExpression(EXPRESSION_PARSER.parseExpression(routingKeyExpression)); + } + + public void setRoutingKeyExpression(Expression routingKeyExpression) { + this.routingKeyExpression = routingKeyExpression; + } + + public void setQueue(String queue) { + setQueueExpression(new ValueExpression<>(queue)); + } + + public void setQueueExpressionString(String queueExpression) { + setQueueExpression(EXPRESSION_PARSER.parseExpression(queueExpression)); + } + + public void setQueueExpression(Expression queueExpression) { + this.queueExpression = queueExpression; + } + + /** + * Set the reply payload type. + * Used only if {@link #setRequiresReply(boolean)} is {@code true}. + * @param replyPayloadType the reply payload type. + */ + public void setReplyPayloadType(Class replyPayloadType) { + setReplyPayloadType(ResolvableType.forClass(replyPayloadType)); + } + + /** + * Set the reply payload type. + * Used only if {@link #setRequiresReply(boolean)} is {@code true}. + * @param replyPayloadType the reply payload type. + */ + public void setReplyPayloadType(ResolvableType replyPayloadType) { + setReplyPayloadTypeExpression(new ValueExpression<>(replyPayloadType)); + } + + /** + * Set a SpEL expression for the reply payload type. + * Used only if {@link #setRequiresReply(boolean)} is {@code true}. + * Must be evaluated to a {@link Class} or {@link ResolvableType}. + * @param replyPayloadTypeExpression the expression for a reply payload type. + */ + public void setReplyPayloadTypeExpressionString(String replyPayloadTypeExpression) { + setReplyPayloadTypeExpression(EXPRESSION_PARSER.parseExpression(replyPayloadTypeExpression)); + } + + /** + * Set a SpEL expression for the reply payload type. + * Used only if {@link #setRequiresReply(boolean)} is {@code true}. + * Must be evaluated to a {@link Class} or {@link ResolvableType}. + * @param replyPayloadTypeExpression the expression for a reply payload type. + */ + public void setReplyPayloadTypeExpression(Expression replyPayloadTypeExpression) { + this.replyPayloadTypeExpression = replyPayloadTypeExpression; + } + + /** + * Set to true to return the reply as a whole AMQP message. + * Used only in the gateway mode. + * @param returnMessage true to return the reply as a whole AMQP message. + */ + public void setReturnMessage(boolean returnMessage) { + this.returnMessage = returnMessage; + } + + @Override + public String getComponentType() { + return getRequiresReply() ? "amqp:outbound-gateway" : "amqp:outbound-channel-adapter"; + } + + @Override + protected void doInit() { + super.doInit(); + if (this.headerMapper instanceof AbstractHeaderMapper abstractHeaderMapper) { + abstractHeaderMapper.setBeanClassLoader(getBeanClassLoader()); + } + this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); + + Assert.state(this.exchangeExpression == null || this.queueExpression == null, + "The 'exchange' (and optional 'routingKey') is mutually exclusive with 'queue'"); + + Assert.state(this.replyPayloadTypeExpression == null || this.messageConverter instanceof SmartMessageConverter, + "The 'messageConverter' must be a 'SmartMessageConverter' when 'replyPayloadTypeExpression' is provided"); + + Assert.state(this.replyPayloadTypeExpression == null || !this.returnMessage, + "The 'returnMessage == true' and 'replyPayloadTypeExpression' are mutually exclusive"); + } + + @Override + protected @Nullable Object handleRequestMessage(org.springframework.messaging.Message requestMessage) { + MessageProperties messageProperties = new MessageProperties(); + this.headerMapper.fromHeadersToRequest(requestMessage.getHeaders(), messageProperties); + Message amqpMessage = this.messageConverter.toMessage(requestMessage.getPayload(), messageProperties); + + String queue = null; + if (this.queueExpression != null) { + queue = this.queueExpression.getValue(this.evaluationContext, requestMessage, String.class); + } + + String exchange = null; + if (this.exchangeExpression != null) { + exchange = this.exchangeExpression.getValue(this.evaluationContext, requestMessage, String.class); + } + + String routingKey = null; + if (this.routingKeyExpression != null) { + routingKey = this.routingKeyExpression.getValue(this.evaluationContext, requestMessage, String.class); + } + + if (getRequiresReply()) { + return doSendAndReceive(requestMessage, amqpMessage, queue, exchange, routingKey); + } + else { + doSend(requestMessage, amqpMessage, queue, exchange, routingKey); + return null; + } + } + + private void doSend(org.springframework.messaging.Message requestMessage, Message amqpMessage, + @Nullable String queue, @Nullable String exchange, @Nullable String routingKey) { + + CompletableFuture sendResultFuture; + + if (StringUtils.hasText(queue)) { + sendResultFuture = this.amqpTemplate.send(queue, amqpMessage); + } + else if (StringUtils.hasText(exchange)) { + sendResultFuture = this.amqpTemplate.send(exchange, routingKey, amqpMessage); + } + else { + sendResultFuture = this.amqpTemplate.send(amqpMessage); + } + + if (isAsync()) { + sendResultFuture.whenComplete((aBoolean, throwable) -> { + if (throwable != null) { + sendErrorMessage(requestMessage, throwable); + } + }); + } + else { + sendResultFuture.join(); + } + } + + private Object doSendAndReceive(org.springframework.messaging.Message requestMessage, Message amqpMessage, + @Nullable String queue, @Nullable String exchange, @Nullable String routingKey) { + + ParameterizedTypeReference replyType = null; + if (this.replyPayloadTypeExpression != null) { + Object type = this.replyPayloadTypeExpression.getValue(this.evaluationContext, requestMessage); + + Assert.state(type instanceof Class || type instanceof ResolvableType, + "The 'replyPayloadTypeExpression' must evaluate to 'Class' or 'ResolvableType'"); + + ResolvableType replyResolvableType = + type instanceof Class aClass + ? ResolvableType.forClass(aClass) + : (ResolvableType) type; + + replyType = ParameterizedTypeReference.forType(replyResolvableType.getType()); + } + + CompletableFuture replyFuture; + + if (StringUtils.hasText(queue)) { + replyFuture = this.amqpTemplate.sendAndReceive(queue, amqpMessage); + } + else if (StringUtils.hasText(exchange)) { + replyFuture = this.amqpTemplate.sendAndReceive(exchange, routingKey, amqpMessage); + } + else { + replyFuture = this.amqpTemplate.sendAndReceive(amqpMessage); + } + + if (!this.returnMessage) { + ParameterizedTypeReference replyTypeToUse = replyType; + replyFuture = replyFuture.thenApply((reply) -> buildReplyMessage((Message) reply, replyTypeToUse)); + } + + return isAsync() ? replyFuture : replyFuture.join(); + } + + private AbstractIntegrationMessageBuilder buildReplyMessage(Message message, + @Nullable ParameterizedTypeReference replyType) { + + Object replyPayload = + replyType != null + ? ((SmartMessageConverter) this.messageConverter).fromMessage(message, replyType) + : this.messageConverter.fromMessage(message); + + return getMessageBuilderFactory().withPayload(replyPayload) + .copyHeaders(this.headerMapper.toHeadersFromReply(message.getMessageProperties())); + } + +} diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpClientMessageProducerTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpClientMessageProducerTests.java new file mode 100644 index 00000000000..8c1baf02f26 --- /dev/null +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpClientMessageProducerTests.java @@ -0,0 +1,280 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.inbound; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; + +import com.rabbitmq.client.amqp.Environment; +import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.jupiter.api.Test; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.Declarables; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.amqp.core.TopicExchange; +import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler; +import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException; +import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory; +import org.springframework.amqp.rabbitmq.client.RabbitAmqpAdmin; +import org.springframework.amqp.rabbitmq.client.RabbitAmqpTemplate; +import org.springframework.amqp.rabbitmq.client.SingleAmqpConnectionFactory; +import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpListenerContainer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.StaticMessageHeaderAccessor; +import org.springframework.integration.acks.SimpleAcknowledgment; +import org.springframework.integration.amqp.support.RabbitTestContainer; +import org.springframework.integration.channel.FixedSubscriberChannel; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.test.util.TestUtils; +import org.springframework.messaging.Message; +import org.springframework.messaging.converter.MessageConversionException; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Artem Bilan + * + * @since 7.0 + */ +@SpringJUnitConfig +@DirtiesContext +public class AmqpClientMessageProducerTests implements RabbitTestContainer { + + @Autowired + RabbitAmqpTemplate rabbitTemplate; + + @Autowired + QueueChannel inputChannel; + + @Test + void receiveSimpleMessageFromAmqp() { + this.rabbitTemplate.convertAndSend("q1", "test data"); + + Message receive = this.inputChannel.receive(10_000); + + assertThat(receive) + .extracting(Message::getPayload) + .isEqualTo("test data"); + } + + @Test + void receiveAndAck() { + this.rabbitTemplate.convertAndSend("q2", "test data #2"); + + Message receive = this.inputChannel.receive(10_000); + + assertThat(receive) + .extracting(Message::getPayload) + .isEqualTo("test data #2"); + + SimpleAcknowledgment acknowledgment = StaticMessageHeaderAccessor.getAcknowledgment(receive); + assertThat(acknowledgment).isNotNull(); + acknowledgment.acknowledge(); + } + + @Test + void receiveBatch() { + this.rabbitTemplate.convertAndSend("q3", "test data #3"); + this.rabbitTemplate.convertAndSend("q3", "test data #4"); + + Message receive = this.inputChannel.receive(10_000); + + assertThat(receive) + .extracting(Message::getPayload) + .asInstanceOf(InstanceOfAssertFactories.list(Message.class)) + .hasSize(2) + .extracting(Message::getPayload) + .contains("test data #3", "test data #4"); + } + + @Test + void receiveBatchAndAck() { + this.rabbitTemplate.convertAndSend("q4", "test data #5"); + this.rabbitTemplate.convertAndSend("q4", "test data #6"); + this.rabbitTemplate.convertAndSend("q4", "test data #7"); + + Message receive = this.inputChannel.receive(10_000); + + assertThat(receive) + .extracting(Message::getPayload) + .asInstanceOf(InstanceOfAssertFactories.list(Message.class)) + .hasSize(3) + .extracting(Message::getPayload) + .contains("test data #5", "test data #6", "test data #7"); + + SimpleAcknowledgment acknowledgment = StaticMessageHeaderAccessor.getAcknowledgment(receive); + assertThat(acknowledgment).isNotNull(); + acknowledgment.acknowledge(); + } + + @Autowired + AmqpClientMessageProducer failureAmqpClientMessageProducer; + + @Test + void failureAfterReceiving() { + RabbitAmqpListenerContainer listenerContainer = + TestUtils.getPropertyValue(this.failureAmqpClientMessageProducer, "listenerContainer", + RabbitAmqpListenerContainer.class); + + AtomicReference listenerError = new AtomicReference<>(); + + listenerContainer.setErrorHandler(new ConditionalRejectingErrorHandler() { + + @Override + protected void log(Throwable t) { + listenerError.set(t); + } + + }); + + this.rabbitTemplate.convertAndSend("queueForError", "discard"); + + assertThat(this.rabbitTemplate.receive("dlq1")).succeedsWithin(20, TimeUnit.SECONDS); + + assertThat(listenerError.get()) + .asInstanceOf(InstanceOfAssertFactories.throwable(ListenerExecutionFailedException.class)) + .hasCauseInstanceOf(MessageConversionException.class) + .hasStackTraceContaining("Intentional conversion failure"); + } + + @Configuration(proxyBeanMethods = false) + @EnableIntegration + public static class ContextConfiguration { + + @Bean + Environment environment() { + return new AmqpEnvironmentBuilder() + .connectionSettings() + .port(RabbitTestContainer.amqpPort()) + .environmentBuilder() + .build(); + } + + @Bean + AmqpConnectionFactory connectionFactory(Environment environment) { + return new SingleAmqpConnectionFactory(environment); + } + + @Bean + RabbitAmqpAdmin admin(AmqpConnectionFactory connectionFactory) { + return new RabbitAmqpAdmin(connectionFactory); + } + + @Bean + Declarables declarables() { + return new Declarables(Stream.of("q1", "q2", "q3", "q4").map(Queue::new).toArray(Queue[]::new)); + } + + @Bean + Queue queueForError() { + return QueueBuilder.durable("queueForError").deadLetterExchange("dlx1").build(); + } + + @Bean + TopicExchange dlx1() { + return new TopicExchange("dlx1"); + } + + @Bean + Queue dlq1() { + return new Queue("dlq1"); + } + + @Bean + Binding dlq1Binding(Queue dlq1, TopicExchange dlx1) { + return BindingBuilder.bind(dlq1).to(dlx1).with("#"); + } + + @Bean + RabbitAmqpTemplate rabbitTemplate(AmqpConnectionFactory connectionFactory) { + return new RabbitAmqpTemplate(connectionFactory); + } + + @Bean + QueueChannel inputChannel() { + return new QueueChannel(); + } + + @Bean + AmqpClientMessageProducer amqpClientMessageProducer(AmqpConnectionFactory connectionFactory, + QueueChannel inputChannel) { + + AmqpClientMessageProducer amqpClientMessageProducer = new AmqpClientMessageProducer(connectionFactory, "q1"); + amqpClientMessageProducer.setOutputChannel(inputChannel); + return amqpClientMessageProducer; + } + + @Bean + AmqpClientMessageProducer manualAckAmqpClientMessageProducer(AmqpConnectionFactory connectionFactory, + QueueChannel inputChannel) { + + AmqpClientMessageProducer amqpClientMessageProducer = new AmqpClientMessageProducer(connectionFactory, "q2"); + amqpClientMessageProducer.setOutputChannel(inputChannel); + amqpClientMessageProducer.setAutoSettle(false); + return amqpClientMessageProducer; + } + + @Bean + AmqpClientMessageProducer batchAmqpClientMessageProducer(AmqpConnectionFactory connectionFactory, + QueueChannel inputChannel) { + + AmqpClientMessageProducer amqpClientMessageProducer = new AmqpClientMessageProducer(connectionFactory, "q3"); + amqpClientMessageProducer.setOutputChannel(inputChannel); + amqpClientMessageProducer.setBatchSize(2); + return amqpClientMessageProducer; + } + + @Bean + AmqpClientMessageProducer batchManualAckAmqpClientMessageProducer(AmqpConnectionFactory connectionFactory, + QueueChannel inputChannel) { + + AmqpClientMessageProducer amqpClientMessageProducer = new AmqpClientMessageProducer(connectionFactory, "q4"); + amqpClientMessageProducer.setOutputChannel(inputChannel); + amqpClientMessageProducer.setBatchSize(3); + amqpClientMessageProducer.setAutoSettle(false); + return amqpClientMessageProducer; + } + + @Bean + AmqpClientMessageProducer failureAmqpClientMessageProducer(AmqpConnectionFactory connectionFactory, + FixedSubscriberChannel conversionChannel) { + + var amqpClientMessageProducer = new AmqpClientMessageProducer(connectionFactory, "queueForError"); + amqpClientMessageProducer.setOutputChannel(conversionChannel); + return amqpClientMessageProducer; + } + + @Bean + FixedSubscriberChannel conversionChannel() { + return new FixedSubscriberChannel(message -> { + throw new MessageConversionException(message, "Intentional conversion failure"); + }); + } + + } + +} diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/ManualAckTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/ManualAckTests.java index 1d330c006b0..8efb81864aa 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/ManualAckTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/ManualAckTests.java @@ -30,7 +30,6 @@ import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.integration.amqp.support.RabbitTestContainer; import org.springframework.integration.annotation.MessageEndpoint; @@ -93,7 +92,6 @@ public void testManual() { @Configuration @EnableIntegration - @ComponentScan @MessageEndpoint public static class ManualAckConfig { diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpClientMessageHandlerTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpClientMessageHandlerTests.java new file mode 100644 index 00000000000..b21f8c0fc8d --- /dev/null +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpClientMessageHandlerTests.java @@ -0,0 +1,217 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.outbound; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +import com.rabbitmq.client.amqp.Environment; +import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder; +import org.junit.jupiter.api.Test; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory; +import org.springframework.amqp.rabbitmq.client.RabbitAmqpAdmin; +import org.springframework.amqp.rabbitmq.client.RabbitAmqpTemplate; +import org.springframework.amqp.rabbitmq.client.SingleAmqpConnectionFactory; +import org.springframework.amqp.support.converter.JacksonJsonMessageConverter; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.amqp.support.RabbitTestContainer; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandlingException; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; + +/** + * @author Artem Bilan + * + * @since 7.0 + */ +@SpringJUnitConfig +@DirtiesContext +public class AmqpClientMessageHandlerTests implements RabbitTestContainer { + + @Autowired + RabbitAmqpTemplate rabbitTemplate; + + @Autowired + MessageChannel amqpClientSendChannel; + + @Autowired + MessageChannel amqpClientSendAndReceiveChannel; + + @Autowired + AmqpClientMessageHandler amqpClientGateway; + + @Test + void neitherExchangeAndQueue() { + Message message = new GenericMessage<>("test data"); + + assertThatExceptionOfType(MessageHandlingException.class) + .isThrownBy(() -> this.amqpClientSendChannel.send(message)) + .withRootCauseExactlyInstanceOf(IllegalStateException.class) + .withStackTraceContaining( + "For send with defaults, an 'exchange' (and optional 'routingKey') or 'queue' must be provided"); + } + + @Test + void verifyMessagePublishedProperlyWithCustomHeader() { + Message message = + MessageBuilder.withPayload("test data") + .setHeader("exchange", "e1") + .setHeader("routingKey", "k1") + .setHeader("testHeader", "testValue") + .build(); + + this.amqpClientSendChannel.send(message); + + CompletableFuture received = this.rabbitTemplate.receive("q1"); + + assertThat(received).succeedsWithin(Duration.ofSeconds(10)) + .satisfies(m -> { + // Converted to JSON + assertThat(m.getBody()).isEqualTo("test data".getBytes()); + assertThat(m.getMessageProperties().getHeaders()).containsEntry("testHeader", "testValue"); + }); + } + + @Test + void verifySendAndReceive() { + this.amqpClientGateway.setReturnMessage(false); + QueueChannel replyChannel = new QueueChannel(); + + Message message = + MessageBuilder.withPayload("request") + .setReplyChannel(replyChannel) + .build(); + + this.amqpClientSendAndReceiveChannel.send(message); + + this.rabbitTemplate.receiveAndReply("q1", payload -> "reply"); + + Message reply = replyChannel.receive(10000); + + assertThat(reply).isNotNull() + .extracting(Message::getPayload) + .isEqualTo("reply"); + } + + @Test + void verifySendAndReceiveAsMessage() { + this.amqpClientGateway.setReturnMessage(true); + QueueChannel replyChannel = new QueueChannel(); + + Message message = + MessageBuilder.withPayload("request") + .setReplyChannel(replyChannel) + .build(); + + this.amqpClientSendAndReceiveChannel.send(message); + + this.rabbitTemplate.receiveAndReply("q1", payload -> "reply"); + + Message reply = replyChannel.receive(10000); + + assertThat(reply).isNotNull() + .extracting(Message::getPayload) + .isInstanceOf(org.springframework.amqp.core.Message.class) + .extracting("body") + .isEqualTo("\"reply\"".getBytes()); + } + + @Configuration(proxyBeanMethods = false) + @EnableIntegration + public static class ContextConfiguration { + + @Bean + Environment environment() { + return new AmqpEnvironmentBuilder() + .connectionSettings() + .port(RabbitTestContainer.amqpPort()) + .environmentBuilder() + .build(); + } + + @Bean + AmqpConnectionFactory connectionFactory(Environment environment) { + return new SingleAmqpConnectionFactory(environment); + } + + @Bean + RabbitAmqpAdmin admin(AmqpConnectionFactory connectionFactory) { + return new RabbitAmqpAdmin(connectionFactory); + } + + @Bean + DirectExchange e1() { + return new DirectExchange("e1"); + } + + @Bean + Queue q1() { + return new Queue("q1"); + } + + @Bean + Binding b1(Queue q1, DirectExchange e1) { + return BindingBuilder.bind(q1).to(e1).with("k1"); + } + + @Bean + RabbitAmqpTemplate rabbitTemplate(AmqpConnectionFactory connectionFactory) { + RabbitAmqpTemplate rabbitAmqpTemplate = new RabbitAmqpTemplate(connectionFactory); + rabbitAmqpTemplate.setMessageConverter(new JacksonJsonMessageConverter()); + return rabbitAmqpTemplate; + } + + @Bean + @ServiceActivator(inputChannel = "amqpClientSendChannel") + AmqpClientMessageHandler amqpClientMessageHandler(RabbitAmqpTemplate rabbitTemplate) { + AmqpClientMessageHandler messageHandler = new AmqpClientMessageHandler(rabbitTemplate); + messageHandler.setExchangeExpressionString("headers[exchange]"); + messageHandler.setRoutingKeyExpressionString("headers[routingKey]"); + return messageHandler; + } + + @Bean + @ServiceActivator(inputChannel = "amqpClientSendAndReceiveChannel") + AmqpClientMessageHandler amqpClientGateway(RabbitAmqpTemplate rabbitTemplate) { + AmqpClientMessageHandler messageHandler = new AmqpClientMessageHandler(rabbitTemplate); + messageHandler.setRequiresReply(true); + messageHandler.setReplyPayloadType(String.class); + messageHandler.setMessageConverter(new JacksonJsonMessageConverter()); + messageHandler.setQueue("q1"); + return messageHandler; + } + + } + +} diff --git a/src/reference/antora/modules/ROOT/nav.adoc b/src/reference/antora/modules/ROOT/nav.adoc index ed0e24fff86..59f4379d030 100644 --- a/src/reference/antora/modules/ROOT/nav.adoc +++ b/src/reference/antora/modules/ROOT/nav.adoc @@ -121,6 +121,7 @@ ** xref:amqp/strict-ordering.adoc[] ** xref:amqp/samples.adoc[] ** xref:amqp/rmq-streams.adoc[] +** xref:amqp/amqp-1.0.adoc[] * xref:camel.adoc[] * xref:cassandra.adoc[] * xref:debezium.adoc[] diff --git a/src/reference/antora/modules/ROOT/pages/amqp.adoc b/src/reference/antora/modules/ROOT/pages/amqp.adoc index 5e635573f7f..54f4c1528f4 100644 --- a/src/reference/antora/modules/ROOT/pages/amqp.adoc +++ b/src/reference/antora/modules/ROOT/pages/amqp.adoc @@ -35,6 +35,7 @@ The following adapters are available: * xref:amqp/async-outbound-gateway.adoc[Async Outbound Gateway] * xref:amqp/rmq-streams.adoc#rmq-stream-inbound-channel-adapter[RabbitMQ Stream Queue Inbound Channel Adapter] * xref:amqp/rmq-streams.adoc#rmq-stream-outbound-channel-adapter[RabbitMQ Stream Queue Outbound Channel Adapter] +* xref:amqp/amqp-1.0.adoc[AMQP 1.0 Channel Adapters] Spring Integration also provides a point-to-point message channel and a publish-subscribe message channel backed by AMQP Exchanges and Queues. diff --git a/src/reference/antora/modules/ROOT/pages/amqp/amqp-1.0.adoc b/src/reference/antora/modules/ROOT/pages/amqp/amqp-1.0.adoc new file mode 100644 index 00000000000..4833e7a43e1 --- /dev/null +++ b/src/reference/antora/modules/ROOT/pages/amqp/amqp-1.0.adoc @@ -0,0 +1,91 @@ +[[amqp-1.0]] += AMQP 1.0 Support + +Starting with version 7.0, Spring Integration provides channel adapters for RabbitMQ AMQP 1.0 support. +These channel adapters are based on the `org.springframework.amqp:spring-rabbitmq-client` library. + +The Spring AMQP documentation provides more details about https://docs.spring.io/spring-amqp/reference/4.0/rabbitmq-amqp-client.html[RabbitMQ AMQP 1.0 support]. + +[[amqp-1.0-outbound]] +== AMQP 1.0 Outbound Channel Adapters + +The `AmqpClientMessageHandler` is an `AbstractReplyProducingMessageHandler` implementation and can act as a one-way channel adapter or as an outbound gateway depending on the `setRequiresReply()` configuration. +The instance of this channel adapter requires an `AsyncAmqpTemplate` implementation for AMQP 1.0 protocol, e.g. `RabbitAmqpTemplate` from the mentioned above `spring-rabbitmq-client` library. +This message handler is asynchronous by default; therefore, publication errors should be handled via `errorChannel` header in the request message or global default `errorChannel` in the application context. + +The `exchange` to publish message (together with optional `routingKey`) is mutually exclusive with a `queue` to publish. +If neither is provided, then `AsyncAmqpTemplate` implementation must ensure some defaults for those destination parts; otherwise the message is going to be rejected as not delivered. + +By default, the `MessageConverter` is an `org.springframework.amqp.support.converter.SimpleMessageConverter` that handles String, Serializable instances, and byte arrays. +Also, a default `AmqpHeaderMapper` is a xref:amqp/message-headers.adoc[`DefaultAmqpHeaderMapper.outboundMapper()`]. +This header mapper is also used for mapping AMQP message properties to headers back on the reply. + +In a gateway mode, the `replyPayloadType` could be supplied to convert a reply message body. +However, the `MessageConverter` has to be an implementation of the `SmartMessageConverter` like a `JacksonJsonMessageConverter`. +Also, a mutually exclusive to the `replyPayloadType`, a `returnMessage` flag could be set to `true` to return the whole instance of `org.springframework.amqp.core.Message` as a reply message payload. + +The following example demonstrates how to configure an `AmqpClientMessageHandler` as a simple `@ServiceActivator`: + +[source, java] +---- +@Bean +@ServiceActivator(inputChannel = "amqpClientSendChannel") +AmqpClientMessageHandler amqpClientMessageHandler(RabbitAmqpTemplate rabbitTemplate) { + AmqpClientMessageHandler messageHandler = new AmqpClientMessageHandler(rabbitTemplate); + messageHandler.setExchangeExpressionString("headers[exchange]"); + messageHandler.setRoutingKeyExpressionString("headers[routingKey]"); + return messageHandler; +} +---- + +The gateway variant for the `AmqpClientMessageHandler` could be like: + +[source, java] +---- +@Bean +@ServiceActivator(inputChannel = "amqpClientSendAndReceiveChannel") +AmqpClientMessageHandler amqpClientGateway(RabbitAmqpTemplate rabbitTemplate) { + AmqpClientMessageHandler messageHandler = new AmqpClientMessageHandler(rabbitTemplate); + messageHandler.setRequiresReply(true); + messageHandler.setReplyPayloadType(String.class); + messageHandler.setMessageConverter(new JacksonJsonMessageConverter()); + messageHandler.setQueue("q1"); + return messageHandler; +} +---- + +[[amqp-1.0-message-driver]] +== AMQP 1.0 Message-Driver Channel Adapter + +The `AmqpClientMessageProducer` is a `MessageProducerSupport` implementation as a Message-Driver Channel Adapter to consume messages from queues over RabbitMQ AMQP 1.0 protocol. +It requires an `AmqpConnectionFactory` and at least one queue to consume. +Its logic internally is based on the `RabbitAmqpListenerContainer` and `IntegrationRabbitAmqpMessageListener` to relay consumed AMQP messages (after conversion) to the `outputChannel`. +Some of `RabbitAmqpListenerContainer` configuration options are exposed as setters from the `AmqpClientMessageProducer`. + +By default, the `MessageConverter` is an `org.springframework.amqp.support.converter.SimpleMessageConverter` that handles String, Serializable instances, and byte arrays. +Also, a default `AmqpHeaderMapper` is a xref:amqp/message-headers.adoc[`DefaultAmqpHeaderMapper.inboundMapper()`]. +The `messageConverter` option can be set to `null` to fully skip conversion (including headers mapping), and return the received AMQP message as a payload of Spring message to produce. + +Also, the `AmqpClientMessageProducer` implements a `Pausable` contract and delegates to the respective `RabbitAmqpListenerContainer` API. + +When `AmqpClientMessageProducer.setBatchSize() > 1`, this channel adapter works in a batch mode. +In this case received messages are gathered until the batch size is fulfilled, or `batchReceiveTimeout` period is exhausted. +All the batched AMQP messages then converted to Spring messages, and result list is produced as a payload of wrapping message to send to the `outputChannel`. +The batch mode gives some performance gain due to the settlement for all the batched message at once. + +When `autoSettle` flag is set to `false`, the `AcknowledgmentCallback` instance is provided as an `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK` message header to make settlement decision for received message or the whole batch. + +The following example demonstrates how to configure an `AmqpClientMessageProducer` as a simple inbound endpoint: + +[source, java] +---- +@Bean +AmqpClientMessageProducer batchAmqpClientMessageProducer(AmqpConnectionFactory connectionFactory, + QueueChannel inputChannel) { + + AmqpClientMessageProducer amqpClientMessageProducer = new AmqpClientMessageProducer(connectionFactory, "q3"); + amqpClientMessageProducer.setOutputChannel(inputChannel); + amqpClientMessageProducer.setBatchSize(2); + return amqpClientMessageProducer; +} +---- diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index 68d9aa40057..79febc2c13e 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -67,6 +67,9 @@ The Jackson 2 support has been deprecated for removal. Jackson 3 is now the default with new components: `JacksonJsonObjectMapper`, `JacksonPropertyAccessor`, `JacksonIndexAccessor`, and `JacksonMessagingUtils`. See their Javadocs for more information and deprecated classes for a migration path. +The `spring-integration-amqp` module now implements channel adapters for RabbitMQ AMQP 1.0 support. +The dedicated xref:amqp/amqp-1.0.adoc[AMQP 1.0 Support] chapter provides more information. + [[x7.0-jdbc-changes]] === JDBC Changes