Skip to content

Commit f9fe709

Browse files
artembilangaryrussell
authored andcommitted
Add Java DSL .fluxTransform(Function) operator (#2541)
* Add Java DSL `.fluxTransform(Function)` operator For better interoperability with Reactor `Flux` from the end-user perspective introduce an operator which could call a target `Function` with integration data wrapped to the `Flux` and expect a `Publisher<?>` result to continue the flow. This way end-user just needs to implement a `Function` (or method) to accept the `Flux` as an input and return some `Publisher` after the sequence of reactive operators. Such a new operator also allows a smooth integration with the Spring Cloud Function, where it is just enough to lookup the function in the catalog and inject it into this operator * * Move `fluxTransform()` body to `Transformers` for cleaner context distribution
1 parent e053e2a commit f9fe709

File tree

3 files changed

+98
-2
lines changed

3 files changed

+98
-2
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
import org.springframework.util.CollectionUtils;
9797
import org.springframework.util.StringUtils;
9898

99+
import reactor.core.publisher.Flux;
99100
import reactor.util.function.Tuple2;
100101

101102
/**
@@ -544,7 +545,7 @@ public B transform(Object service, String methodName,
544545
* @see LambdaMessageProcessor
545546
*/
546547
public <S, T> B transform(GenericTransformer<S, T> genericTransformer) {
547-
return this.transform(null, genericTransformer);
548+
return transform(null, genericTransformer);
548549
}
549550

550551
/**
@@ -2866,6 +2867,30 @@ public B trigger(MessageTriggerAction triggerAction,
28662867
return handle(new ServiceActivatingHandler(triggerAction, "trigger"), endpointConfigurer);
28672868
}
28682869

2870+
/**
2871+
* Populate a {@link FluxMessageChannel} to start a reactive processing for upstream data,
2872+
* wrap it to a {@link Flux}, apply provided {@link Function} via {@link Flux#transform(Function)}
2873+
* and emit the result to one more {@link FluxMessageChannel}, subscribed in the downstream flow.
2874+
* @param fluxFunction the {@link Function} to process data reactive manner.
2875+
* @return the current {@link IntegrationFlowDefinition}.
2876+
*/
2877+
@SuppressWarnings("unchecked")
2878+
public <I, O> B fluxTransform(Function<? super Flux<Message<I>>, ? extends Publisher<O>> fluxFunction) {
2879+
if (!(this.currentMessageChannel instanceof FluxMessageChannel)) {
2880+
channel(new FluxMessageChannel());
2881+
}
2882+
2883+
Publisher<Message<I>> upstream = (Publisher<Message<I>>) this.currentMessageChannel;
2884+
2885+
Flux<Message<O>> result = Transformers.transformWithFunction(upstream, fluxFunction);
2886+
2887+
FluxMessageChannel downstream = new FluxMessageChannel();
2888+
downstream.subscribeTo((Flux<Message<?>>) (Flux<?>) result);
2889+
2890+
this.currentMessageChannel = downstream;
2891+
2892+
return addComponent(this.currentMessageChannel);
2893+
}
28692894

28702895
/**
28712896
* Represent an Integration Flow as a Reactive Streams {@link Publisher} bean.

spring-integration-core/src/main/java/org/springframework/integration/dsl/Transformers.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@
1616

1717
package org.springframework.integration.dsl;
1818

19+
import java.util.concurrent.atomic.AtomicReference;
1920
import java.util.function.Function;
2021

22+
import org.reactivestreams.Publisher;
23+
2124
import org.springframework.core.convert.converter.Converter;
2225
import org.springframework.core.serializer.Deserializer;
2326
import org.springframework.core.serializer.Serializer;
@@ -27,6 +30,7 @@
2730
import org.springframework.integration.expression.FunctionExpression;
2831
import org.springframework.integration.json.JsonToObjectTransformer;
2932
import org.springframework.integration.json.ObjectToJsonTransformer;
33+
import org.springframework.integration.support.MessageBuilder;
3034
import org.springframework.integration.support.json.JsonObjectMapper;
3135
import org.springframework.integration.transformer.DecodingTransformer;
3236
import org.springframework.integration.transformer.EncodingPayloadTransformer;
@@ -41,6 +45,9 @@
4145
import org.springframework.messaging.Message;
4246
import org.springframework.util.Assert;
4347

48+
import reactor.core.publisher.Flux;
49+
import reactor.core.publisher.Mono;
50+
4451
/**
4552
* An utility class to provide methods for out-of-the-box
4653
* {@link org.springframework.integration.transformer.Transformer}s.
@@ -262,4 +269,33 @@ public static StreamTransformer fromStream(String charset) {
262269
return new StreamTransformer(charset);
263270
}
264271

272+
273+
@SuppressWarnings("unchecked")
274+
static <I, O> Flux<Message<O>> transformWithFunction(Publisher<Message<I>> publisher,
275+
Function<? super Flux<Message<I>>, ? extends Publisher<O>> fluxFunction) {
276+
277+
return Flux.from(publisher)
278+
.flatMap(message ->
279+
Mono.subscriberContext()
280+
.map(ctx -> {
281+
ctx.get(RequestMessageHolder.class).set(message);
282+
return message;
283+
}))
284+
.transform(fluxFunction)
285+
.flatMap(data ->
286+
data instanceof Message<?>
287+
? Mono.just((Message<O>) data)
288+
: Mono.subscriberContext()
289+
.map(ctx -> ctx.get(RequestMessageHolder.class).get())
290+
.map(requestMessage ->
291+
MessageBuilder.withPayload(data)
292+
.copyHeaders(requestMessage.getHeaders())
293+
.build()))
294+
.subscriberContext(ctx -> ctx.put(RequestMessageHolder.class, new RequestMessageHolder()));
295+
}
296+
297+
private static class RequestMessageHolder extends AtomicReference<Message<?>> {
298+
299+
}
300+
265301
}

spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.TimeUnit;
3434
import java.util.concurrent.atomic.AtomicBoolean;
3535
import java.util.logging.Level;
36+
import java.util.stream.Collectors;
3637

3738
import org.junit.Test;
3839
import org.junit.runner.RunWith;
@@ -101,7 +102,7 @@ public void testReactiveFlow() throws Exception {
101102
this.messageSource.start();
102103
assertTrue(latch.await(10, TimeUnit.SECONDS));
103104
String[] strings = results.toArray(new String[results.size()]);
104-
assertArrayEquals(new String[] {"A", "B", "C", "D", "E", "F"}, strings);
105+
assertArrayEquals(new String[] { "A", "B", "C", "D", "E", "F" }, strings);
105106
this.messageSource.stop();
106107
}
107108

@@ -172,6 +173,40 @@ public void testFromPublisher() {
172173
}
173174
}
174175

176+
@Test
177+
public void testFluxTransform() {
178+
QueueChannel resultChannel = new QueueChannel();
179+
180+
IntegrationFlow integrationFlow = f -> f
181+
.split()
182+
.<String, String>fluxTransform(flux -> flux
183+
.map(Message::getPayload)
184+
.map(String::toUpperCase))
185+
.aggregate(a -> a
186+
.outputProcessor(group -> group
187+
.getMessages()
188+
.stream()
189+
.map(Message::getPayload)
190+
.map(String.class::cast)
191+
.collect(Collectors.joining(","))))
192+
.channel(resultChannel);
193+
194+
IntegrationFlowContext.IntegrationFlowRegistration integrationFlowRegistration =
195+
this.integrationFlowContext
196+
.registration(integrationFlow)
197+
.register();
198+
199+
MessageChannel inputChannel = integrationFlowRegistration.getInputChannel();
200+
inputChannel.send(new GenericMessage<>("a,b,c,d,e"));
201+
202+
Message<?> receive = resultChannel.receive(10_000);
203+
204+
assertNotNull(receive);
205+
assertEquals("A,B,C,D,E", receive.getPayload());
206+
207+
integrationFlowRegistration.destroy();
208+
}
209+
175210
@Configuration
176211
@EnableIntegration
177212
public static class ContextConfiguration {

0 commit comments

Comments
 (0)