Skip to content

Commit d13752b

Browse files
artembilangaryrussell
authored andcommitted
Infrastructure for ReactiveMessageHandler (#3137)
* Infrastructure for ReactiveMessageHandler We have now a `ReactiveMongoDbStoringMessageHandler` which implements a `ReactiveMessageHandler`, but not a `MessageHandler` for possible deferred subscriptions to the returned Reactor type We don't have a proper application context processing for this new type of message handlers * Change a `ConsumerEndpointFactoryBean` to apply an `MH` and `RMH` as possible types for handler * Introduce a `ReactiveMessageHandlerAdapter` to wrap an `RMH` into a `MH` for synchronous calls in the regular consumer endpoints * Wrap an `RMH` into a `ReactiveMessageHandlerAdapter` for regular endpoints and unwrap for `ReactiveStreamsConsumer` * Add `RMH`-based ctor into `ReactiveStreamsConsumer` for target reactive streams composition (`flatMap()` on the `RMH`) * Remove a `DelegatingSubscriber` from the `ReactiveStreamsConsumer` in favor of direct calls from the `doOnSubscribe()`, `doOnComplete()` & `doOnNext()` * Add an `onErrorContinue()` to handle per-message errors, but don't cancel the whole source `Publisher` * Use `Disposable` from the `subscribe()` to cancel in the `stop()` - recommended way in Reactor * Use `onErrorContinue()` in the `FluxMessageChannel` instead of `try..catch` in the `doOnNext()` - for possible `onErrorStop()` in the provided upstream `Publisher` * Handle `RMH` in the `ServiceActivatorFactoryBean` as a direct handler as well with wrapping into `ReactiveMessageHandlerAdapter` for return. The `ConsumerEndpointFactoryBean` extracts an `RMH` from the adapter for the `ReactiveStreamsConsumer` anyway * Add XML parsing test for `ReactiveMongoDbStoringMessageHandler` * Add `log4j-slf4j-impl` for all the test runtime since `slf4j-api` comes as a transitive dependency from many places * * Fix conflicts after rebasing to master * * Fix typo in warn message * Change `Assert.state()` to `Assert.isTrue()` for `ConsumerEndpointFactoryBean.setHandler()` * * Fix `ConsumerEndpointFactoryBean` when reactive and no advice-chain * Fix race condition in the `ReactiveMongoDbStoringMessageHandlerTests.testReactiveMongoMessageHandlerFromApplicationContext()` * * Handle `ReactiveMessageHandler` in Java DSL. Essentially request a wrapping into `ReactiveMessageHandlerAdapter`. Describe such a requirements in the `ReactiveMessageHandlerAdapter` JavaDocs * Some Java DSL test polishing * Add Java DSL for `ReactiveMongoDbStoringMessageHandler` * Propagate missed `ApplicationContext` population into an internally created `ReactiveMongoTemplate` in the `ReactiveMongoDbStoringMessageHandler`
1 parent e0509cc commit d13752b

File tree

18 files changed

+500
-130
lines changed

18 files changed

+500
-130
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ configure(javaProjects) { subproject ->
237237

238238
testRuntimeOnly 'org.apache.logging.log4j:log4j-core'
239239
testRuntimeOnly 'org.apache.logging.log4j:log4j-jcl'
240+
testRuntimeOnly 'org.apache.logging.log4j:log4j-slf4j-impl'
240241
}
241242

242243
// enable all compiler warnings; individual projects may customize further
@@ -471,7 +472,6 @@ project('spring-integration-gemfire') {
471472
api "commons-io:commons-io:$commonsIoVersion"
472473

473474
testImplementation project(':spring-integration-stream')
474-
testRuntimeOnly 'org.apache.logging.log4j:log4j-slf4j-impl'
475475
}
476476
}
477477

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -78,14 +78,8 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {
7878
Flux.from(publisher)
7979
.delaySubscription(this.subscribedSignal.filter(Boolean::booleanValue).next())
8080
.publishOn(Schedulers.boundedElastic())
81-
.doOnNext((message) -> {
82-
try {
83-
send(message);
84-
}
85-
catch (Exception e) {
86-
logger.warn("Error during processing event: " + message, e);
87-
}
88-
})
81+
.doOnNext(this::send)
82+
.onErrorContinue((ex, message) -> logger.warn("Error during processing event: " + message, ex))
8983
.subscribe());
9084
}
9185

spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,14 @@
4242
import org.springframework.integration.endpoint.PollingConsumer;
4343
import org.springframework.integration.endpoint.ReactiveStreamsConsumer;
4444
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
45+
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
4546
import org.springframework.integration.handler.advice.HandleMessageAdvice;
4647
import org.springframework.integration.scheduling.PollerMetadata;
4748
import org.springframework.integration.support.channel.ChannelResolverUtils;
4849
import org.springframework.messaging.MessageChannel;
4950
import org.springframework.messaging.MessageHandler;
5051
import org.springframework.messaging.PollableChannel;
52+
import org.springframework.messaging.ReactiveMessageHandler;
5153
import org.springframework.messaging.SubscribableChannel;
5254
import org.springframework.messaging.core.DestinationResolver;
5355
import org.springframework.scheduling.TaskScheduler;
@@ -112,11 +114,17 @@ public class ConsumerEndpointFactoryBean
112114

113115
private volatile boolean initialized;
114116

115-
public void setHandler(MessageHandler handler) {
116-
Assert.notNull(handler, "handler must not be null");
117+
public void setHandler(Object handler) {
118+
Assert.isTrue(handler instanceof MessageHandler || handler instanceof ReactiveMessageHandler,
119+
"'handler' must be an instance of 'MessageHandler' or 'ReactiveMessageHandler'");
117120
synchronized (this.handlerMonitor) {
118121
Assert.isNull(this.handler, "handler cannot be overridden");
119-
this.handler = handler;
122+
if (handler instanceof ReactiveMessageHandler) {
123+
this.handler = new ReactiveMessageHandlerAdapter((ReactiveMessageHandler) handler);
124+
}
125+
else {
126+
this.handler = (MessageHandler) handler;
127+
}
120128
}
121129
}
122130

@@ -210,7 +218,12 @@ public void afterPropertiesSet() {
210218
}
211219
}
212220

213-
adviceChain();
221+
if (!(this.handler instanceof ReactiveMessageHandlerAdapter)) {
222+
adviceChain();
223+
}
224+
else if (!CollectionUtils.isEmpty(this.adviceChain)) {
225+
LOGGER.warn("the advice chain cannot be applied to a 'ReactiveMessageHandler'");
226+
}
214227
if (this.channelResolver == null) {
215228
this.channelResolver = ChannelResolverUtils.getChannelResolver(this.beanFactory);
216229
}
@@ -282,7 +295,13 @@ else if (channel instanceof PollableChannel) {
282295
pollingConsumer(channel);
283296
}
284297
else {
285-
this.endpoint = new ReactiveStreamsConsumer(channel, this.handler);
298+
if (this.handler instanceof ReactiveMessageHandlerAdapter) {
299+
this.endpoint = new ReactiveStreamsConsumer(channel,
300+
((ReactiveMessageHandlerAdapter) this.handler).getDelegate());
301+
}
302+
else {
303+
this.endpoint = new ReactiveStreamsConsumer(channel, this.handler);
304+
}
286305
}
287306
this.endpoint.setBeanName(this.beanName);
288307
this.endpoint.setBeanFactory(this.beanFactory);

spring-integration-core/src/main/java/org/springframework/integration/config/ServiceActivatorFactoryBean.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,9 +22,11 @@
2222
import org.springframework.integration.handler.AbstractMessageProducingHandler;
2323
import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor;
2424
import org.springframework.integration.handler.MessageProcessor;
25+
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
2526
import org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper;
2627
import org.springframework.integration.handler.ServiceActivatingHandler;
2728
import org.springframework.messaging.MessageHandler;
29+
import org.springframework.messaging.ReactiveMessageHandler;
2830
import org.springframework.util.StringUtils;
2931

3032
/**
@@ -47,7 +49,7 @@ public void setNotPropagatedHeaders(String... headers) {
4749

4850
@Override
4951
protected MessageHandler createMethodInvokingHandler(Object targetObject, String targetMethodName) {
50-
MessageHandler handler = null;
52+
MessageHandler handler;
5153
handler = createDirectHandlerIfPossible(targetObject, targetMethodName);
5254
if (handler == null) {
5355
handler = configureHandler(
@@ -67,34 +69,38 @@ protected MessageHandler createMethodInvokingHandler(Object targetObject, String
6769
*/
6870
protected MessageHandler createDirectHandlerIfPossible(final Object targetObject, String targetMethodName) {
6971
MessageHandler handler = null;
70-
if (targetObject instanceof MessageHandler
71-
&& this.methodIsHandleMessageOrEmpty(targetMethodName)) {
72+
if ((targetObject instanceof MessageHandler || targetObject instanceof ReactiveMessageHandler)
73+
&& methodIsHandleMessageOrEmpty(targetMethodName)) {
7274
if (targetObject instanceof AbstractMessageProducingHandler) {
7375
// should never happen but just return it if it's already an AMPH
7476
return (MessageHandler) targetObject;
7577
}
76-
/*
77-
* Return a reply-producing message handler so that we still get 'produced no reply' messages
78-
* and the super class will inject the advice chain to advise the handler method if needed.
79-
*/
80-
handler = new ReplyProducingMessageHandlerWrapper((MessageHandler) targetObject);
81-
78+
if (targetObject instanceof ReactiveMessageHandler) {
79+
handler = new ReactiveMessageHandlerAdapter((ReactiveMessageHandler) targetObject);
80+
}
81+
else {
82+
/*
83+
* Return a reply-producing message handler so that we still get 'produced no reply' messages
84+
* and the super class will inject the advice chain to advise the handler method if needed.
85+
*/
86+
handler = new ReplyProducingMessageHandlerWrapper((MessageHandler) targetObject);
87+
}
8288
}
8389
return handler;
8490
}
8591

8692
@Override
8793
protected MessageHandler createExpressionEvaluatingHandler(Expression expression) {
88-
ExpressionEvaluatingMessageProcessor<Object> processor = new ExpressionEvaluatingMessageProcessor<Object>(expression);
89-
processor.setBeanFactory(this.getBeanFactory());
94+
ExpressionEvaluatingMessageProcessor<Object> processor = new ExpressionEvaluatingMessageProcessor<>(expression);
95+
processor.setBeanFactory(getBeanFactory());
9096
ServiceActivatingHandler handler = new ServiceActivatingHandler(processor);
9197
handler.setPrimaryExpression(expression);
9298
return this.configureHandler(handler);
9399
}
94100

95101
@Override
96102
protected <T> MessageHandler createMessageProcessingHandler(MessageProcessor<T> processor) {
97-
return this.configureHandler(new ServiceActivatingHandler(processor));
103+
return configureHandler(new ServiceActivatingHandler(processor));
98104
}
99105

100106
protected MessageHandler configureHandler(ServiceActivatingHandler handler) {
@@ -115,7 +121,6 @@ protected boolean canBeUsedDirect(AbstractMessageProducingHandler handler) {
115121
@Override
116122
protected void postProcessReplyProducer(AbstractMessageProducingHandler handler) {
117123
super.postProcessReplyProducer(handler);
118-
119124
if (this.headers != null) {
120125
handler.setNotPropagatedHeaders(this.headers);
121126
}

spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java

Lines changed: 53 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,19 +27,25 @@
2727
import org.springframework.integration.channel.MessageChannelReactiveUtils;
2828
import org.springframework.integration.channel.NullChannel;
2929
import org.springframework.integration.core.MessageProducer;
30+
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
3031
import org.springframework.integration.router.MessageRouter;
32+
import org.springframework.lang.Nullable;
3133
import org.springframework.messaging.Message;
3234
import org.springframework.messaging.MessageChannel;
3335
import org.springframework.messaging.MessageHandler;
36+
import org.springframework.messaging.ReactiveMessageHandler;
3437
import org.springframework.util.Assert;
3538
import org.springframework.util.ErrorHandler;
3639

3740
import reactor.core.CoreSubscriber;
3841
import reactor.core.Disposable;
39-
import reactor.core.publisher.BaseSubscriber;
42+
import reactor.core.publisher.Flux;
4043

4144

4245
/**
46+
* An {@link AbstractEndpoint} implementation for Reactive Streams subscription into an
47+
* input channel and reactive consumption of messages from that channel.
48+
*
4349
* @author Artem Bilan
4450
*
4551
* @since 5.0
@@ -48,17 +54,22 @@ public class ReactiveStreamsConsumer extends AbstractEndpoint implements Integra
4854

4955
private final MessageChannel inputChannel;
5056

57+
private final Publisher<Message<Object>> publisher;
58+
5159
private final MessageHandler handler;
5260

53-
private final Publisher<Message<Object>> publisher;
61+
@Nullable
62+
private final ReactiveMessageHandler reactiveMessageHandler;
5463

64+
@Nullable
5565
private final Subscriber<Message<?>> subscriber;
5666

67+
@Nullable
5768
private final Lifecycle lifecycleDelegate;
5869

5970
private ErrorHandler errorHandler;
6071

61-
private volatile Subscription subscription;
72+
private volatile Disposable subscription;
6273

6374
@SuppressWarnings("unchecked")
6475
public ReactiveStreamsConsumer(MessageChannel inputChannel, MessageHandler messageHandler) {
@@ -68,10 +79,10 @@ public ReactiveStreamsConsumer(MessageChannel inputChannel, MessageHandler messa
6879
: new MessageHandlerSubscriber(messageHandler));
6980
}
7081

71-
public ReactiveStreamsConsumer(MessageChannel inputChannel, final Subscriber<Message<?>> subscriber) {
72-
this.inputChannel = inputChannel;
82+
public ReactiveStreamsConsumer(MessageChannel inputChannel, Subscriber<Message<?>> subscriber) {
7383
Assert.notNull(inputChannel, "'inputChannel' must not be null");
7484
Assert.notNull(subscriber, "'subscriber' must not be null");
85+
this.inputChannel = inputChannel;
7586

7687
if (inputChannel instanceof NullChannel && logger.isWarnEnabled()) {
7788
logger.warn("The consuming from the NullChannel does not have any effects: " +
@@ -90,6 +101,24 @@ else if (subscriber instanceof MessageHandler) {
90101
else {
91102
this.handler = this.subscriber::onNext;
92103
}
104+
this.reactiveMessageHandler = null;
105+
}
106+
107+
/**
108+
* Instantiate an endpoint based on the provided {@link MessageChannel} and {@link ReactiveMessageHandler}.
109+
* @param inputChannel the channel to consume in reactive manner.
110+
* @param reactiveMessageHandler the {@link ReactiveMessageHandler} to process messages.
111+
* @since 5.3
112+
*/
113+
public ReactiveStreamsConsumer(MessageChannel inputChannel, ReactiveMessageHandler reactiveMessageHandler) {
114+
Assert.notNull(inputChannel, "'inputChannel' must not be null");
115+
this.inputChannel = inputChannel;
116+
this.handler = new ReactiveMessageHandlerAdapter(reactiveMessageHandler);
117+
this.reactiveMessageHandler = reactiveMessageHandler;
118+
this.publisher = MessageChannelReactiveUtils.toPublisher(inputChannel);
119+
this.subscriber = null;
120+
this.lifecycleDelegate =
121+
reactiveMessageHandler instanceof Lifecycle ? (Lifecycle) reactiveMessageHandler : null;
93122
}
94123

95124
public void setErrorHandler(ErrorHandler errorHandler) {
@@ -132,55 +161,35 @@ protected void doStart() {
132161
if (this.lifecycleDelegate != null) {
133162
this.lifecycleDelegate.start();
134163
}
135-
this.publisher.subscribe(new DelegatingSubscriber());
164+
165+
Flux<?> flux = null;
166+
if (this.reactiveMessageHandler != null) {
167+
flux = Flux.from(this.publisher)
168+
.flatMap(this.reactiveMessageHandler::handleMessage);
169+
}
170+
else if (this.subscriber != null) {
171+
flux = Flux.from(this.publisher)
172+
.doOnSubscribe(this.subscriber::onSubscribe)
173+
.doOnComplete(this.subscriber::onComplete)
174+
.doOnNext(this.subscriber::onNext);
175+
}
176+
if (flux != null) {
177+
this.subscription =
178+
flux.onErrorContinue((ex, data) -> this.errorHandler.handleError(ex))
179+
.subscribe();
180+
}
136181
}
137182

138183
@Override
139184
protected void doStop() {
140185
if (this.subscription != null) {
141-
this.subscription.cancel();
186+
this.subscription.dispose();
142187
}
143188
if (this.lifecycleDelegate != null) {
144189
this.lifecycleDelegate.stop();
145190
}
146191
}
147192

148-
private final class DelegatingSubscriber extends BaseSubscriber<Message<?>> {
149-
150-
private final Subscriber<Message<?>> delegate = ReactiveStreamsConsumer.this.subscriber;
151-
152-
DelegatingSubscriber() {
153-
}
154-
155-
@Override
156-
public void hookOnSubscribe(Subscription s) {
157-
ReactiveStreamsConsumer.this.subscription = s;
158-
this.delegate.onSubscribe(s);
159-
}
160-
161-
@Override
162-
public void hookOnNext(Message<?> message) {
163-
try {
164-
this.delegate.onNext(message);
165-
}
166-
catch (Exception e) {
167-
ReactiveStreamsConsumer.this.errorHandler.handleError(e);
168-
hookOnError(e);
169-
}
170-
}
171-
172-
@Override
173-
public void hookOnError(Throwable t) {
174-
this.delegate.onError(t);
175-
}
176-
177-
@Override
178-
public void hookOnComplete() {
179-
this.delegate.onComplete();
180-
}
181-
182-
}
183-
184193
private static final class MessageHandlerSubscriber
185194
implements CoreSubscriber<Message<?>>, Disposable, Lifecycle {
186195

0 commit comments

Comments
 (0)