Skip to content

Commit 2f4ae47

Browse files
committed
GH-3009 Add post processing support for Supplier
Resolves #3009
1 parent b7f2f1b commit 2f4ae47

File tree

2 files changed

+53
-1
lines changed

2 files changed

+53
-1
lines changed

core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/FunctionPostProcessingTests.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.cloud.stream.function;
1818

1919
import java.util.function.Function;
20+
import java.util.function.Supplier;
2021

2122
import org.junit.jupiter.api.Test;
2223

@@ -29,8 +30,10 @@
2930
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
3031
import org.springframework.context.ConfigurableApplicationContext;
3132
import org.springframework.context.annotation.Bean;
33+
import org.springframework.context.annotation.Configuration;
3234
import org.springframework.integration.support.MessageBuilder;
3335
import org.springframework.messaging.Message;
36+
import org.springframework.messaging.support.GenericMessage;
3437

3538
import static org.assertj.core.api.Assertions.assertThat;
3639

@@ -76,6 +79,22 @@ void successfulPostProcessingOfSingleFunction() {
7679
}
7780
}
7881

82+
@Test
83+
void successfulPostProcessingOfSupplierFunctionCompposition() throws Exception {
84+
System.clearProperty("spring.cloud.function.definition");
85+
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
86+
TestChannelBinderConfiguration.getCompleteConfiguration(SupplierPostProcessingTestConfiguration.class))
87+
.web(WebApplicationType.NONE).run("--spring.jmx.enabled=false",
88+
"--spring.cloud.function.definition=hello|uppercase",
89+
"--spring.cloud.stream.bindings.hellouppercase-out-0.producer.poller.fixed-delay=100")) {
90+
Thread.sleep(1000);
91+
OutputDestination outputDestination = context.getBean(OutputDestination.class);
92+
93+
assertThat(outputDestination.receive(5000, "hellouppercase-out-0").getPayload()).isEqualTo("HELLO".getBytes());
94+
assertThat(context.getBean(SupplierPostProcessingTestConfiguration.class).postProcessed).isTrue();
95+
}
96+
}
97+
7998
@Test
8099
void noPostProcessingOnError() {
81100
System.clearProperty("spring.cloud.function.definition");
@@ -207,6 +226,31 @@ public Function<String, String> reverse() {
207226
}
208227
}
209228

229+
@EnableAutoConfiguration
230+
@Configuration
231+
public static class SupplierPostProcessingTestConfiguration {
232+
233+
public static boolean postProcessed;
234+
235+
@Bean
236+
public Supplier<Message<String>> hello() {
237+
return () -> new GenericMessage<>("hello");
238+
}
239+
240+
@Bean
241+
public Function<String, String> uppercase() {
242+
return new PostProcessingFunction<String, String>() {
243+
public String apply(String input) {
244+
return input.toUpperCase();
245+
}
246+
247+
public void postProcess(Message<String> result) {
248+
postProcessed = true;
249+
}
250+
};
251+
}
252+
}
253+
210254
private static class SingleFunctionPostProcessingFunction implements PostProcessingFunction<String, String> {
211255

212256
private boolean success;

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@
110110
import org.springframework.messaging.MessageHeaders;
111111
import org.springframework.messaging.MessagingException;
112112
import org.springframework.messaging.SubscribableChannel;
113+
import org.springframework.messaging.support.ChannelInterceptor;
113114
import org.springframework.scheduling.TaskScheduler;
114115
import org.springframework.scheduling.Trigger;
115116
import org.springframework.scheduling.support.CronTrigger;
@@ -242,15 +243,22 @@ InitializingBean supplierInitializer(FunctionCatalog functionCatalog, StreamFunc
242243
}
243244

244245
if (functionWrapper != null) {
246+
FunctionInvocationWrapper postProcessor = functionWrapper;
245247
IntegrationFlow integrationFlow = integrationFlowFromProvidedSupplier(new PartitionAwareFunctionWrapper(functionWrapper, context, producerProperties),
246248
pollable, context, taskScheduler, producerProperties, outputName)
249+
.intercept(new ChannelInterceptor() {
250+
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
251+
postProcessor.postProcess();
252+
}
253+
})
247254
.route(Message.class, message -> {
248255
if (message.getHeaders().get("spring.cloud.stream.sendto.destination") != null) {
249256
String destinationName = (String) message.getHeaders().get("spring.cloud.stream.sendto.destination");
250257
return streamBridge.resolveDestination(destinationName, producerProperties, null);
251258
}
252259
return outputName;
253-
}).get();
260+
})
261+
.get();
254262
IntegrationFlow postProcessedFlow = (IntegrationFlow) context.getAutowireCapableBeanFactory()
255263
.initializeBean(integrationFlow, integrationFlowName);
256264
context.registerBean(integrationFlowName, IntegrationFlow.class, () -> postProcessedFlow);

0 commit comments

Comments
 (0)