Skip to content

Commit f93a740

Browse files
artembilangaryrussell
authored andcommitted
Messaging Annotations: process ReactiveMH (#3141)
* Messaging Annotations: process ReactiveMH * Add support for `ReactiveMessageHandler` as a `@ServiceActivator` bean * Wrap `ReactiveMessageHandler` in the `ReactiveMessageHandlerAdapter` in the `ServiceActivatorAnnotationPostProcessor` * Unwrap `ReactiveMessageHandlerAdapter` in the `AbstractMethodAnnotationPostProcessor` for `ReactiveStreamsConsumer` * * Add documentation for the `ReactiveMessageHandler` & Reactive MongoDb channel adapters
1 parent d13752b commit f93a740

File tree

6 files changed

+124
-19
lines changed

6 files changed

+124
-19
lines changed

spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -66,6 +66,7 @@
6666
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
6767
import org.springframework.integration.handler.LambdaMessageProcessor;
6868
import org.springframework.integration.handler.MessageProcessor;
69+
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
6970
import org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper;
7071
import org.springframework.integration.handler.advice.HandleMessageAdvice;
7172
import org.springframework.integration.router.AbstractMessageRouter;
@@ -104,7 +105,7 @@ public abstract class AbstractMethodAnnotationPostProcessor<T extends Annotation
104105

105106
protected static final String SEND_TIMEOUT_ATTRIBUTE = "sendTimeout";
106107

107-
protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR
108+
protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR
108109

109110
protected final List<String> messageHandlerAttributes = new ArrayList<>(); // NOSONAR
110111

@@ -156,15 +157,17 @@ public Object postProcess(Object bean, String beanName, Method method, List<Anno
156157

157158
MessageHandler handler = createHandler(bean, method, annotations);
158159

159-
orderable(method, handler);
160-
producerOrRouter(annotations, handler);
160+
if (!(handler instanceof ReactiveMessageHandlerAdapter)) {
161+
orderable(method, handler);
162+
producerOrRouter(annotations, handler);
161163

162-
if (!handler.equals(sourceHandler)) {
163-
handler = registerHandlerBean(beanName, method, handler);
164-
}
164+
if (!handler.equals(sourceHandler)) {
165+
handler = registerHandlerBean(beanName, method, handler);
166+
}
165167

166-
handler = annotated(method, handler);
167-
handler = adviceChain(beanName, annotations, handler);
168+
handler = annotated(method, handler);
169+
handler = adviceChain(beanName, annotations, handler);
170+
}
168171

169172
AbstractEndpoint endpoint = createEndpoint(handler, method, annotations);
170173
if (endpoint != null) {
@@ -379,7 +382,13 @@ protected AbstractEndpoint doCreateEndpoint(MessageHandler handler, MessageChann
379382
Assert.state(ObjectUtils.isEmpty(pollers), "A '@Poller' should not be specified for Annotation-based " +
380383
"endpoint, since '" + inputChannel + "' is a SubscribableChannel (not pollable).");
381384
if (inputChannel instanceof Publisher) {
382-
endpoint = new ReactiveStreamsConsumer(inputChannel, handler);
385+
if (handler instanceof ReactiveMessageHandlerAdapter) {
386+
endpoint = new ReactiveStreamsConsumer(inputChannel,
387+
((ReactiveMessageHandlerAdapter) handler).getDelegate());
388+
}
389+
else {
390+
endpoint = new ReactiveStreamsConsumer(inputChannel, handler);
391+
}
383392
}
384393
else {
385394
endpoint = new EventDrivenConsumer((SubscribableChannel) inputChannel, handler);

spring-integration-core/src/main/java/org/springframework/integration/config/annotation/ServiceActivatorAnnotationPostProcessor.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,10 +27,12 @@
2727
import org.springframework.integration.annotation.ServiceActivator;
2828
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
2929
import org.springframework.integration.handler.MessageProcessor;
30+
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
3031
import org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper;
3132
import org.springframework.integration.handler.ServiceActivatingHandler;
3233
import org.springframework.integration.util.MessagingAnnotationUtils;
3334
import org.springframework.messaging.MessageHandler;
35+
import org.springframework.messaging.ReactiveMessageHandler;
3436
import org.springframework.util.StringUtils;
3537

3638
/**
@@ -53,9 +55,12 @@ public ServiceActivatorAnnotationPostProcessor(ConfigurableListableBeanFactory b
5355
protected MessageHandler createHandler(Object bean, Method method, List<Annotation> annotations) {
5456
AbstractReplyProducingMessageHandler serviceActivator;
5557
if (AnnotatedElementUtils.isAnnotated(method, Bean.class.getName())) {
56-
final Object target = resolveTargetBeanFromMethodWithBeanAnnotation(method);
58+
Object target = resolveTargetBeanFromMethodWithBeanAnnotation(method);
5759
serviceActivator = extractTypeIfPossible(target, AbstractReplyProducingMessageHandler.class);
5860
if (serviceActivator == null) {
61+
if (target instanceof ReactiveMessageHandler) {
62+
return new ReactiveMessageHandlerAdapter((ReactiveMessageHandler) target);
63+
}
5964
if (target instanceof MessageHandler) {
6065
/*
6166
* Return a reply-producing message handler so that we still get 'produced no reply' messages

spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,8 +29,7 @@
2929

3030
import javax.annotation.Resource;
3131

32-
import org.junit.Test;
33-
import org.junit.runner.RunWith;
32+
import org.junit.jupiter.api.Test;
3433

3534
import org.springframework.beans.factory.annotation.Autowired;
3635
import org.springframework.beans.factory.annotation.Qualifier;
@@ -56,6 +55,7 @@
5655
import org.springframework.integration.annotation.Splitter;
5756
import org.springframework.integration.annotation.Transformer;
5857
import org.springframework.integration.channel.DirectChannel;
58+
import org.springframework.integration.channel.FluxMessageChannel;
5959
import org.springframework.integration.channel.QueueChannel;
6060
import org.springframework.integration.config.EnableIntegration;
6161
import org.springframework.integration.config.EnableMessageHistory;
@@ -73,14 +73,19 @@
7373
import org.springframework.messaging.MessageChannel;
7474
import org.springframework.messaging.MessageHandler;
7575
import org.springframework.messaging.PollableChannel;
76+
import org.springframework.messaging.ReactiveMessageHandler;
7677
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
7778
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
7879
import org.springframework.messaging.support.ErrorMessage;
7980
import org.springframework.messaging.support.GenericMessage;
8081
import org.springframework.messaging.support.MessageBuilder;
8182
import org.springframework.test.annotation.DirtiesContext;
8283
import org.springframework.test.context.ContextConfiguration;
83-
import org.springframework.test.context.junit4.SpringRunner;
84+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
85+
86+
import reactor.core.publisher.Mono;
87+
import reactor.core.publisher.MonoProcessor;
88+
import reactor.test.StepVerifier;
8489

8590
/**
8691
* @author Artem Bilan
@@ -90,7 +95,7 @@
9095
* @since 4.0
9196
*/
9297
@ContextConfiguration(classes = MessagingAnnotationsWithBeanAnnotationTests.ContextConfiguration.class)
93-
@RunWith(SpringRunner.class)
98+
@SpringJUnitConfig
9499
@DirtiesContext
95100
public class MessagingAnnotationsWithBeanAnnotationTests {
96101

@@ -225,6 +230,25 @@ public void testInvalidMessagingAnnotationsConfig() {
225230
.withMessageContaining("The attribute causing the ambiguity is: [applySequence].");
226231
}
227232

233+
@Autowired
234+
private MessageChannel reactiveMessageHandlerChannel;
235+
236+
@Autowired
237+
private ContextConfiguration contextConfiguration;
238+
239+
@Test
240+
public void testReactiveMessageHandler() {
241+
this.reactiveMessageHandlerChannel.send(new GenericMessage<>("test"));
242+
243+
StepVerifier.create(
244+
this.contextConfiguration.messageMonoProcessor
245+
.map(Message::getPayload)
246+
.cast(String.class))
247+
.expectNext("test")
248+
.verifyComplete();
249+
}
250+
251+
228252
@Configuration
229253
@EnableIntegration
230254
@EnableMessageHistory
@@ -403,6 +427,23 @@ public Consumer<Message<?>> messageConsumerAsService() {
403427
return collector()::add;
404428
}
405429

430+
MonoProcessor<Message<?>> messageMonoProcessor = MonoProcessor.create();
431+
432+
@Bean
433+
MessageChannel reactiveMessageHandlerChannel() {
434+
return new FluxMessageChannel();
435+
}
436+
437+
@Bean
438+
@ServiceActivator(inputChannel = "reactiveMessageHandlerChannel")
439+
public ReactiveMessageHandler reactiveMessageHandlerService() {
440+
return (message) -> {
441+
messageMonoProcessor.onNext(message);
442+
messageMonoProcessor.onComplete();
443+
return Mono.empty();
444+
};
445+
}
446+
406447
}
407448

408449
@Configuration

src/reference/asciidoc/mongodb.adoc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,3 +467,29 @@ private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
467467
}
468468
----
469469
====
470+
471+
[[mongodb-reactive-channel-adapters]]
472+
=== MongoDB Reactive Channel Adapters
473+
474+
Starting with version 5.3, the `ReactiveMongoDbStoringMessageHandler` implementation is provided.
475+
It is based on the `ReactiveMongoOperations` from Spring Data and requires a `org.mongodb:mongodb-driver-reactivestreams` dependency.
476+
This is an implementation of the `ReactiveMessageHandler` which is supported natively in the framework when reactive streams composition is involved in the integration flow definition.
477+
See more information in the <<./reactive-streams.adoc/reactive-message-handler,ReactiveMessageHandler>>.
478+
479+
From configuration perspective there is no difference with many other standard channel adapters.
480+
For example with Java DSL such a channel adapter could be used like:
481+
482+
====
483+
[source, java]
484+
----
485+
@Bean
486+
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
487+
return f -> f
488+
.channel(MessageChannels.flux())
489+
.handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
490+
}
491+
----
492+
====
493+
494+
In this sample we are going to connect to the MongoDb via provided `ReactiveMongoDatabaseFactory` and store a data from request message into a default collection with the `data` name.
495+
The real operation is going to be performed on-demand from the reactive stream composition in the internally created `ReactiveStreamsConsumer`.

src/reference/asciidoc/reactive-streams.adoc

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,18 @@ A result of the function is wrapped into a `Mono<Message<?>>` for flat-mapping i
105105

106106
See <<./dsl.adoc#java-dsl,Java DSL Chapter>> for more information.
107107

108+
[[reactive-message-handler]]
109+
=== `ReactiveMessageHandler`
110+
111+
Starting with version 5.3, the `ReactiveMessageHandler` is supported natively in the framework.
112+
This type of message handler is designed for reactive clients which return a reactive type for on-demand subscription for low-level operation execution and doesn't provide any reply data to continue a reactive stream composition.
113+
When a `ReactiveMessageHandler` is used in the imperative integration flow, the `handleMessage()` result in subscribed immediately after return, just because there is no reactive streams composition in such a flow to honor back-pressure.
114+
In this case the framework wraps this `ReactiveMessageHandler` into a `ReactiveMessageHandlerAdapter` - a plain implementation of `MessageHandler`.
115+
However when a `ReactiveStreamsConsumer` is involved in the flow (e.g. when channel to consume is a `FluxMessageChannel`), such a `ReactiveMessageHandler` is composed to the whole reactive stream with a `flatMap()` Reactor operator to honor back-pressure during consumption.
116+
117+
One of the out-of-the-box `ReactiveMessageHandler` implementation is a `ReactiveMongoDbStoringMessageHandler` for Outbound Channel Adapter.
118+
See <<./mongodb.adoc#mongodb-reactive-channel-adapters,MongoDB Reactive Channel Adapters>> for more information.
119+
108120
[[reactive-channel-adapters]]
109121
=== Reactive Channel Adapters
110122

@@ -120,7 +132,7 @@ A reactive outbound channel adapter implementation is about initiation (or conti
120132
An inbound payload could be a reactive type per se or as an event of the whole integration flow which is a part of reactive stream on top.
121133
A returned reactive type can be subscribed immediately if we are in one-way, fire-and-forget scenario, or it is propagated downstream (request-reply scenarios) for further integration flow or an explicit subscription in the target business logic, but still downstream preserving reactive streams semantics.
122134

123-
Currently Spring Integration provides channel adapter (or gateway) implementations for <<./webflux.adoc#webflux,WebFlux>> and <<./rsocket.adoc#rsocket,RSocket>>.
135+
Currently Spring Integration provides channel adapter (or gateway) implementations for <<./webflux.adoc#webflux,WebFlux>>, <<./rsocket.adoc#rsocket,RSocket>> and <<./mongodb.adoc#mongodb,MongoDb>>.
124136
Also an https://github.com/spring-projects/spring-integration-extensions/tree/master/spring-integration-cassandra[Apache Cassandra Extension] provides a `MessageHandler` implementation for the Cassandra reactive driver.
125137
More reactive channel adapters are coming, for example for https://r2dbc.io/[R2DBC], https://mongodb.github.io/mongo-java-driver-reactivestreams/[MongoDB], for Apache Kafka in https://github.com/spring-projects/spring-integration-kafka[Spring Integration Kafka] based on the `ReactiveKafkaProducerTemplate` and `ReactiveKafkaConsumerTemplate` from https://spring.io/projects/spring-kafka[Spring for Apache Kafka] etc.
126138
For many other non-reactive channel adapters thread pools are recommended to avoid blocking during reactive stream processing.

src/reference/asciidoc/whats-new.adoc

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,23 @@ If you are interested in more details, see the Issue Tracker tickets that were r
2121
The `IntegrationPattern` abstraction has been introduced to indicate which enterprise integration pattern (an `IntegrationPatternType`) and category a Spring Integration component belongs to.
2222
See its JavaDocs and <<./graph.adoc#integration-graph,Integration Graph>> for more information about this abstraction and its use-cases.
2323

24+
[[x5.3-reactive-message-handler]]
25+
==== `ReactiveMessageHandler`
26+
27+
The `ReactiveMessageHandler` is now natively supported in the framework.
28+
See <<./reactive-streams.adoc/reactive-message-handler,ReactiveMessageHandler>> for more information.
29+
30+
[[x5.3-mongodb-reactive-channel-adapters]]
31+
==== MongoDB Reactive Channel Adapters
32+
33+
`spring-integration-mongodb` module now provides channel adapter implementations for Reactive MongoDB driver support in Spring Data.
34+
See <<./mongodb.adoc#mongodb-reactive-channel-adapters,MongoDB Reactive Channel Adapters>> for more information.
35+
2436
[[x5.3-general]]
2537
=== General Changes
2638

2739
The gateway proxy now doesn't proxy `default` methods by default.
28-
see <<./gateway.adoc/gateway-calling-default-methods,Invoking `default` Methods>> for more information.
40+
See <<./gateway.adoc/gateway-calling-default-methods,Invoking `default` Methods>> for more information.
2941

3042

3143
Internal components (such as `_org.springframework.integration.errorLogger`) now have a shortened name when they are represented in the integration graph.

0 commit comments

Comments
 (0)