Skip to content

Conversation

@artembilan
Copy link
Member

Related to: #10090

  • Add AmqpClientInboundGateway that is mostly a copy/paste of the AmqpClientMessageProducer, but adds a reply-producing logic
  • Cover with tests and document this new component
  • Fix a couple typos in the amqp-1.0.adoc

@artembilan
Copy link
Member Author

After doing this, I don't see a value in the AmqpClientMessageSource: who would need to poll messages from the queue on demand? 🤷

The RabbitAmqpTemplate.receive() API could be turned into a MessageSource very easy:

@Bean 
IntegrationFlow amqpClientPollingFlow(RabbitAmqpTemplate rabbitAmqpTemplate) {
    return IntegrationFlow.fromSupplier(rabbitAmqpTemplate::receive)
                          .handle((p, h) -> p, e -> e.async(true))
                          ...
}

So, the point is that RabbitAmqpTemplate.receive() returns a CompletableFuture<Message> which we don't want to join() immediately.

Copy link
Contributor

@cppwfs cppwfs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great!
Just a couple of doc questions

@artembilan artembilan requested a review from cppwfs September 26, 2025 20:26
Copy link
Contributor

@cppwfs cppwfs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sweet!

I'll merge once it successfully builds.

@artembilan
Copy link
Member Author

Hold on from merging, please.

@cppwfs
Copy link
Contributor

cppwfs commented Sep 26, 2025

On hold till I get the go ahead.

@artembilan artembilan requested a review from cppwfs September 26, 2025 21:22
Copy link
Contributor

@cppwfs cppwfs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great job on consolidating the code. The red is always pleasing to see! :-)

Just a couple of nitpicks.

* @param delegate the {@link AmqpAcknowledgment} to delegate to.
* Use as {@link java.util.function.BiConsumer} for the {@link IntegrationRabbitAmqpMessageListener}.
* @param messageToSend the message to produce from this endpoint.
* @param requestMessage the request AMQP message.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may want to mention that the requestMessage is ignored in this processRequest.

try {
this.requestAction.accept(messageToSend, message);
}
catch (Exception ex) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be a RuntimeException?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the point.
We have to re-throw anything as a ListenerExecutionFailedException and ErrorHandler in the listener container will be able to parse this and its cause respectively.


@Override
public void onMessage(org.springframework.amqp.core.Message message) {
throw new UnsupportedOperationException("The 'RabbitAmqpMessageListener' does not implement 'onMessage()'");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it read, The 'IntegrationRabbitAmqpMessageListener' does not implement 'onMessge'?

Related to: spring-projects#10090

* Add `AmqpClientInboundGateway` that is mostly a copy/paste of the `AmqpClientMessageProducer`,
but adds a reply-producing logic
* Cover with tests and document this new component
* Fix a couple typos in the `amqp-1.0.adoc`
…ckage private) class.

Serves as a reusable unit of work for both `AmqpClientMessageProducer` & `AmqpClientInboundGateway`.
The one-way and request-reply parts are handled as a `BiConsumer` action injection
into this `IntegrationRabbitAmqpMessageListener` instance.
Now both `AmqpClientMessageProducer` & `AmqpClientInboundGateway` are much simpler.
mentioning that `requestMessage` param is out of use
* Be more specific with the type for `UnsupportedOperationException`
in the `IntegrationRabbitAmqpMessageListener`
@artembilan artembilan force-pushed the GH-10090-inbound-gateway branch from fe22890 to 73cf36c Compare October 6, 2025 17:29
Copy link
Contributor

@cppwfs cppwfs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

This is Awesome!

@cppwfs cppwfs merged commit aad2f7b into spring-projects:main Oct 6, 2025
3 checks passed
@artembilan artembilan deleted the GH-10090-inbound-gateway branch October 6, 2025 20:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants