Skip to content

Commit 02407f7

Browse files
authored
Add ReactiveMessageSourceProducer (#3254)
* Add `ReactiveMessageSourceProducer` The `ReactiveMessageSourceProducer` wraps a provided `MessageSource` into a `Flux` for subscription in the `subscribeToPublisher(Publisher<? extends Message<?>>)` to make a source polling feature fully based on a reactive, on demand solution * Introduce a `IntegrationReactiveUtils` replacing existing `MessageChannelReactiveUtils` with more functionality * Replace a deprecated `MessageChannelReactiveUtils` with a new `IntegrationReactiveUtils` * Test and document the feature * * Fix Docs typos * * Remove unused imports from `MessageChannelReactiveUtils` * * Fix JavaDoc copy/paste artifact
1 parent 9414765 commit 02407f7

File tree

10 files changed

+349
-54
lines changed

10 files changed

+349
-54
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/MessageChannelReactiveUtils.java

Lines changed: 5 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,69 +16,29 @@
1616

1717
package org.springframework.integration.channel;
1818

19-
import java.time.Duration;
20-
2119
import org.reactivestreams.Publisher;
2220

21+
import org.springframework.integration.util.IntegrationReactiveUtils;
2322
import org.springframework.messaging.Message;
2423
import org.springframework.messaging.MessageChannel;
25-
import org.springframework.messaging.MessageHandler;
26-
import org.springframework.messaging.PollableChannel;
27-
import org.springframework.messaging.SubscribableChannel;
28-
29-
import reactor.core.publisher.EmitterProcessor;
30-
import reactor.core.publisher.Flux;
31-
import reactor.core.publisher.Mono;
32-
import reactor.core.scheduler.Schedulers;
3324

3425
/**
3526
* Utilities for adaptation {@link MessageChannel}s to the {@link Publisher}s.
3627
*
3728
* @author Artem Bilan
3829
*
3930
* @since 5.0
31+
*
32+
* @deprecated since 5.3 in favor of {@link IntegrationReactiveUtils}.
4033
*/
34+
@Deprecated
4135
public final class MessageChannelReactiveUtils {
4236

4337
private MessageChannelReactiveUtils() {
4438
}
4539

46-
@SuppressWarnings("unchecked")
4740
public static <T> Publisher<Message<T>> toPublisher(MessageChannel messageChannel) {
48-
if (messageChannel instanceof Publisher) {
49-
return (Publisher<Message<T>>) messageChannel;
50-
}
51-
else if (messageChannel instanceof SubscribableChannel) {
52-
return adaptSubscribableChannelToPublisher((SubscribableChannel) messageChannel);
53-
}
54-
else if (messageChannel instanceof PollableChannel) {
55-
return adaptPollableChannelToPublisher((PollableChannel) messageChannel);
56-
}
57-
else {
58-
throw new IllegalArgumentException("The 'messageChannel' must be an instance of Publisher, " +
59-
"SubscribableChannel or PollableChannel, not: " + messageChannel);
60-
}
61-
}
62-
63-
private static <T> Publisher<Message<T>> adaptSubscribableChannelToPublisher(SubscribableChannel inputChannel) {
64-
return Flux.defer(() -> {
65-
EmitterProcessor<Message<T>> publisher = EmitterProcessor.create(1);
66-
@SuppressWarnings("unchecked")
67-
MessageHandler messageHandler = (message) -> publisher.onNext((Message<T>) message);
68-
inputChannel.subscribe(messageHandler);
69-
return publisher
70-
.doOnCancel(() -> inputChannel.unsubscribe(messageHandler));
71-
});
72-
}
73-
74-
@SuppressWarnings("unchecked")
75-
private static <T> Publisher<Message<T>> adaptPollableChannelToPublisher(PollableChannel inputChannel) {
76-
return Mono.<Message<T>>create(monoSink ->
77-
monoSink.onRequest(value ->
78-
monoSink.success((Message<T>) inputChannel.receive(0))))
79-
.subscribeOn(Schedulers.boundedElastic())
80-
.repeatWhenEmpty(it -> it.delayElements(Duration.ofMillis(100))) // NOSONAR - magic
81-
.repeat();
41+
return IntegrationReactiveUtils.messageChannelToFlux(messageChannel);
8242
}
8343

8444
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.springframework.integration.channel.DirectChannel;
4040
import org.springframework.integration.channel.FixedSubscriberChannel;
4141
import org.springframework.integration.channel.FluxMessageChannel;
42-
import org.springframework.integration.channel.MessageChannelReactiveUtils;
4342
import org.springframework.integration.channel.interceptor.WireTap;
4443
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
4544
import org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean;
@@ -86,6 +85,7 @@
8685
import org.springframework.integration.transformer.MethodInvokingTransformer;
8786
import org.springframework.integration.transformer.Transformer;
8887
import org.springframework.integration.util.ClassUtils;
88+
import org.springframework.integration.util.IntegrationReactiveUtils;
8989
import org.springframework.lang.Nullable;
9090
import org.springframework.messaging.Message;
9191
import org.springframework.messaging.MessageChannel;
@@ -2920,7 +2920,7 @@ protected <T> Publisher<Message<T>> toReactivePublisher() {
29202920
if (channelForPublisher != null && components.size() > 1
29212921
&& !(channelForPublisher instanceof MessageChannelReference) &&
29222922
!(channelForPublisher instanceof FixedSubscriberChannelPrototype)) {
2923-
publisher = MessageChannelReactiveUtils.toPublisher(channelForPublisher);
2923+
publisher = IntegrationReactiveUtils.messageChannelToFlux(channelForPublisher);
29242924
}
29252925
else {
29262926
MessageChannel reactiveChannel = new FluxMessageChannel();
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.endpoint;
18+
19+
import java.time.Duration;
20+
21+
import org.springframework.integration.core.MessageSource;
22+
import org.springframework.integration.util.IntegrationReactiveUtils;
23+
import org.springframework.messaging.Message;
24+
import org.springframework.util.Assert;
25+
26+
import reactor.core.publisher.Flux;
27+
28+
/**
29+
* The {@link MessageProducerSupport} to adapt a provided {@link MessageSource}
30+
* into a {@link Flux} and let it be subscribed in the {@link #subscribeToPublisher}
31+
*
32+
* @author Artem Bilan
33+
*
34+
* @since 5.3
35+
*/
36+
public class ReactiveMessageSourceProducer extends MessageProducerSupport {
37+
38+
private final Flux<? extends Message<?>> messageFlux;
39+
40+
private Duration delayWhenEmpty = IntegrationReactiveUtils.DEFAULT_DELAY_WHEN_EMPTY;
41+
42+
/**
43+
* Create an instance based on the provided {@link MessageSource}.
44+
* @param messageSource the {@link MessageSource} to pull for messages.
45+
*/
46+
public ReactiveMessageSourceProducer(MessageSource<?> messageSource) {
47+
Assert.notNull(messageSource, "'messageSource' must not be null");
48+
this.messageFlux =
49+
IntegrationReactiveUtils.messageSourceToFlux(messageSource)
50+
.subscriberContext((ctx) ->
51+
ctx.put(IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY, this.delayWhenEmpty));
52+
}
53+
54+
/**
55+
* Configure a {@link Duration} to delay next pull request when the previous one
56+
* was empty. Defaults to {@link IntegrationReactiveUtils#DEFAULT_DELAY_WHEN_EMPTY}.
57+
* @param delayWhenEmpty the {@link Duration} to use.
58+
*/
59+
public void setDelayWhenEmpty(Duration delayWhenEmpty) {
60+
Assert.notNull(delayWhenEmpty, "'delayWhenEmpty' must not be null");
61+
this.delayWhenEmpty = delayWhenEmpty;
62+
}
63+
64+
@Override
65+
protected void doStart() {
66+
subscribeToPublisher(this.messageFlux);
67+
}
68+
69+
}

spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@
2424

2525
import org.springframework.context.Lifecycle;
2626
import org.springframework.integration.channel.ChannelUtils;
27-
import org.springframework.integration.channel.MessageChannelReactiveUtils;
2827
import org.springframework.integration.channel.NullChannel;
2928
import org.springframework.integration.core.MessageProducer;
3029
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
3130
import org.springframework.integration.router.MessageRouter;
31+
import org.springframework.integration.util.IntegrationReactiveUtils;
3232
import org.springframework.lang.Nullable;
3333
import org.springframework.messaging.Message;
3434
import org.springframework.messaging.MessageChannel;
@@ -89,7 +89,7 @@ public ReactiveStreamsConsumer(MessageChannel inputChannel, Subscriber<Message<?
8989
"it doesn't forward messages sent to it. A NullChannel is the end of the flow.");
9090
}
9191

92-
this.publisher = MessageChannelReactiveUtils.toPublisher(inputChannel);
92+
this.publisher = IntegrationReactiveUtils.messageChannelToFlux(inputChannel);
9393
this.subscriber = subscriber;
9494
this.lifecycleDelegate = subscriber instanceof Lifecycle ? (Lifecycle) subscriber : null;
9595
if (subscriber instanceof MessageHandlerSubscriber) {
@@ -115,7 +115,7 @@ public ReactiveStreamsConsumer(MessageChannel inputChannel, ReactiveMessageHandl
115115
this.inputChannel = inputChannel;
116116
this.handler = new ReactiveMessageHandlerAdapter(reactiveMessageHandler);
117117
this.reactiveMessageHandler = reactiveMessageHandler;
118-
this.publisher = MessageChannelReactiveUtils.toPublisher(inputChannel);
118+
this.publisher = IntegrationReactiveUtils.messageChannelToFlux(inputChannel);
119119
this.subscriber = null;
120120
this.lifecycleDelegate =
121121
reactiveMessageHandler instanceof Lifecycle ? (Lifecycle) reactiveMessageHandler : null;
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.util;
18+
19+
import java.time.Duration;
20+
import java.util.function.Function;
21+
22+
import org.reactivestreams.Publisher;
23+
24+
import org.springframework.integration.StaticMessageHeaderAccessor;
25+
import org.springframework.integration.acks.AckUtils;
26+
import org.springframework.integration.core.MessageSource;
27+
import org.springframework.messaging.Message;
28+
import org.springframework.messaging.MessageChannel;
29+
import org.springframework.messaging.MessageHandler;
30+
import org.springframework.messaging.MessagingException;
31+
import org.springframework.messaging.PollableChannel;
32+
import org.springframework.messaging.SubscribableChannel;
33+
34+
import reactor.core.publisher.EmitterProcessor;
35+
import reactor.core.publisher.Flux;
36+
import reactor.core.publisher.Mono;
37+
import reactor.core.scheduler.Schedulers;
38+
39+
/**
40+
* Utilities for adapting integration components to/from reactive types.
41+
*
42+
* @author Artem Bilan
43+
*
44+
* @since 5.3
45+
*/
46+
public final class IntegrationReactiveUtils {
47+
48+
/**
49+
* The subscriber context entry for {@link Flux#delayElements}
50+
* from the {@link Mono#repeatWhenEmpty(Function)}.
51+
*/
52+
public static final String DELAY_WHEN_EMPTY_KEY = "DELAY_WHEN_EMPTY_KEY";
53+
54+
/**
55+
* A default delay before repeating an empty source {@link Mono} as 1 second {@link Duration}.
56+
*/
57+
public static final Duration DEFAULT_DELAY_WHEN_EMPTY = Duration.ofSeconds(1);
58+
59+
private IntegrationReactiveUtils() {
60+
}
61+
62+
/**
63+
* Wrap a provided {@link MessageSource} into a {@link Flux} for pulling the on demand.
64+
* When {@link MessageSource#receive()} returns {@code null}, the source {@link Mono}
65+
* goes to the {@link Mono#repeatWhenEmpty} state and performs a {@code delay}
66+
* based on the {@link #DELAY_WHEN_EMPTY_KEY} {@link Duration} entry in the subscriber context
67+
* or falls back to 1 second duration.
68+
* If a produced message has an
69+
* {@link org.springframework.integration.IntegrationMessageHeaderAccessor#ACKNOWLEDGMENT_CALLBACK} header
70+
* it is ack'ed in the {@link Mono#doOnSuccess} and nack'ed in the {@link Mono#doOnError}.
71+
* @param messageSource the {@link MessageSource} to adapt.
72+
* @param <T> the expected payload type.
73+
* @return a {@link Flux} which pulls messages from the {@link MessageSource} on demand.
74+
*/
75+
public static <T> Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageSource) {
76+
return Mono.
77+
<Message<T>>create(monoSink ->
78+
monoSink.onRequest(value ->
79+
monoSink.success(messageSource.receive())))
80+
.doOnSuccess((message) ->
81+
AckUtils.autoAck(StaticMessageHeaderAccessor.getAcknowledgmentCallback(message)))
82+
.doOnError(MessagingException.class,
83+
(ex) -> {
84+
Message<?> failedMessage = ex.getFailedMessage();
85+
if (failedMessage != null) {
86+
AckUtils.autoNack(StaticMessageHeaderAccessor.getAcknowledgmentCallback(failedMessage));
87+
}
88+
})
89+
.subscribeOn(Schedulers.boundedElastic())
90+
.repeatWhenEmpty((repeat) ->
91+
repeat.flatMap((increment) ->
92+
Mono.subscriberContext()
93+
.flatMap(ctx ->
94+
Mono.delay(ctx.getOrDefault(DELAY_WHEN_EMPTY_KEY,
95+
DEFAULT_DELAY_WHEN_EMPTY)))))
96+
.repeat()
97+
.retry();
98+
}
99+
100+
/**
101+
* Adapt a provided {@link MessageChannel} into a {@link Flux} source:
102+
* - a {@link org.springframework.integration.channel.FluxMessageChannel}
103+
* is returned as is because it is already a {@link Publisher};
104+
* - a {@link SubscribableChannel} is subscribed with a {@link MessageHandler}
105+
* for the {@link EmitterProcessor#onNext(Object)} which is returned from this method;
106+
* - a {@link PollableChannel} is wrapped into a {@link MessageSource} lambda and reuses
107+
* {@link #messageSourceToFlux(MessageSource)}.
108+
* @param messageChannel the {@link MessageChannel} to adapt.
109+
* @param <T> the expected payload type.
110+
* @return a {@link Flux} which uses a provided {@link MessageChannel} as a source for events to publish.
111+
*/
112+
@SuppressWarnings("unchecked")
113+
public static <T> Flux<Message<T>> messageChannelToFlux(MessageChannel messageChannel) {
114+
if (messageChannel instanceof Publisher) {
115+
return Flux.from((Publisher<Message<T>>) messageChannel);
116+
}
117+
else if (messageChannel instanceof SubscribableChannel) {
118+
return adaptSubscribableChannelToPublisher((SubscribableChannel) messageChannel);
119+
}
120+
else if (messageChannel instanceof PollableChannel) {
121+
return messageSourceToFlux(() -> (Message<T>) ((PollableChannel) messageChannel).receive(0));
122+
}
123+
else {
124+
throw new IllegalArgumentException("The 'messageChannel' must be an instance of Publisher, " +
125+
"SubscribableChannel or PollableChannel, not: " + messageChannel);
126+
}
127+
}
128+
129+
private static <T> Flux<Message<T>> adaptSubscribableChannelToPublisher(SubscribableChannel inputChannel) {
130+
return Flux.defer(() -> {
131+
EmitterProcessor<Message<T>> publisher = EmitterProcessor.create(1);
132+
@SuppressWarnings("unchecked")
133+
MessageHandler messageHandler = (message) -> publisher.onNext((Message<T>) message);
134+
inputChannel.subscribe(messageHandler);
135+
return publisher
136+
.doOnCancel(() -> inputChannel.unsubscribe(messageHandler));
137+
});
138+
}
139+
140+
}

spring-integration-core/src/test/java/org/springframework/integration/channel/MessageChannelReactiveUtilsTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import org.junit.jupiter.api.Test;
2525

26+
import org.springframework.integration.util.IntegrationReactiveUtils;
2627
import org.springframework.messaging.support.GenericMessage;
2728

2829
import reactor.core.Disposable;
@@ -45,7 +46,7 @@ void testBackpressureWithSubscribableChannel() {
4546
try {
4647
DirectChannel channel = new DirectChannel();
4748
int initialRequest = 10;
48-
StepVerifier.create(MessageChannelReactiveUtils.toPublisher(channel), initialRequest)
49+
StepVerifier.create(IntegrationReactiveUtils.messageChannelToFlux(channel), initialRequest)
4950
.expectSubscription()
5051
.then(() -> {
5152
compositeDisposable.add(
@@ -77,7 +78,7 @@ void testOverproducingWithSubscribableChannel() {
7778
AtomicInteger sendCount = new AtomicInteger();
7879
try {
7980
int initialRequest = 10;
80-
StepVerifier.create(MessageChannelReactiveUtils.toPublisher(channel), initialRequest)
81+
StepVerifier.create(IntegrationReactiveUtils.messageChannelToFlux(channel), initialRequest)
8182
.expectSubscription()
8283
.then(() ->
8384
compositeDisposable.add(

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,13 @@
3333
import org.springframework.integration.annotation.BridgeFrom;
3434
import org.springframework.integration.annotation.ServiceActivator;
3535
import org.springframework.integration.channel.FluxMessageChannel;
36-
import org.springframework.integration.channel.MessageChannelReactiveUtils;
3736
import org.springframework.integration.channel.QueueChannel;
3837
import org.springframework.integration.config.EnableIntegration;
3938
import org.springframework.integration.dsl.IntegrationFlow;
4039
import org.springframework.integration.dsl.MessageChannels;
4140
import org.springframework.integration.dsl.context.IntegrationFlowContext;
4241
import org.springframework.integration.test.util.TestUtils;
42+
import org.springframework.integration.util.IntegrationReactiveUtils;
4343
import org.springframework.messaging.Message;
4444
import org.springframework.messaging.MessageChannel;
4545
import org.springframework.messaging.MessagingException;
@@ -104,7 +104,7 @@ void testMessageChannelReactiveAdaptation() throws InterruptedException {
104104
List<String> results = new ArrayList<>();
105105

106106
Disposable disposable =
107-
Flux.from(MessageChannelReactiveUtils.<String>toPublisher(this.queueChannel))
107+
IntegrationReactiveUtils.<String>messageChannelToFlux(this.queueChannel)
108108
.map(Message::getPayload)
109109
.map(String::toUpperCase)
110110
.doOnNext(results::add)

0 commit comments

Comments
 (0)