Skip to content

Commit 219707e

Browse files
committed
GH-1302 Ensure reactive/imperative composition is not subscribed twise
Resolves #1302
1 parent 13c5c04 commit 219707e

File tree

3 files changed

+25
-6
lines changed

3 files changed

+25
-6
lines changed

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -741,7 +741,7 @@ public String toString() {
741741
* Returns true if this function wrapper represents a composed function.
742742
* @return true if this function wrapper represents a composed function otherwise false
743743
*/
744-
boolean isComposed() {
744+
public boolean isComposed() {
745745
return this.composed;
746746
}
747747

spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebRequestProcessingHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public static Publisher<?> processRequest(FunctionWrapper wrapper, Object argume
132132

133133
Object result = function.apply(inputMessage);
134134
if (function.isConsumer()) {
135-
if (result instanceof Publisher) {
135+
if (result instanceof Publisher && !function.isComposed()) {
136136
Mono.from((Publisher) result).subscribe();
137137
}
138138
return "DELETE".equals(wrapper.getMethod()) ?

spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/HttpPostIntegrationTests.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,6 @@ public void headers() throws Exception {
182182
ResponseEntity<String> result = this.rest.exchange(RequestEntity
183183
.post(new URI("/headers")).contentType(MediaType.APPLICATION_JSON)
184184
.body("[\"foo\",\"bar\"]"), String.class);
185-
// assertThat(result.getHeaders().getFirst("foo")).isEqualTo("bar");
186-
// assertThat(result.getHeaders()).doesNotContainKey("id");
187185
assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]");
188186
}
189187

@@ -377,7 +375,6 @@ public void multipart() throws Exception {
377375

378376
@Test
379377
@DirtiesContext
380-
@Disabled
381378
public void count() throws Exception {
382379
List<String> list = Arrays.asList("A", "B", "A");
383380
assertThat(this.rest.exchange(
@@ -388,7 +385,6 @@ public void count() throws Exception {
388385

389386
@Test
390387
@DirtiesContext
391-
@Disabled
392388
public void fluxWithList() throws Exception {
393389
List<String> list = Arrays.asList("A", "B", "A");
394390
assertThat(this.rest.exchange(
@@ -397,6 +393,14 @@ public void fluxWithList() throws Exception {
397393
String.class).getBody()).isEqualTo("[\"A\",\"B\",\"A\"]");
398394
}
399395

396+
@Test
397+
@DirtiesContext
398+
public void testReactiveFunctionComposdWithImperativeConsumer() throws Exception {
399+
RequestEntity entity = RequestEntity.post(new URI("/functionReactive,consumerImperative")).build();
400+
this.rest.exchange(entity, String.class);
401+
assertThat(ApplicationConfiguration.functionReactiveInvocations).isEqualTo(1);
402+
}
403+
400404
private String sse(String... values) {
401405
return "[\"" + StringUtils.arrayToDelimitedString(values, "\",\"") + "\"]";
402406
}
@@ -407,11 +411,26 @@ public static class ApplicationConfiguration {
407411

408412
private List<String> list = new ArrayList<>();
409413

414+
private static int functionReactiveInvocations;
415+
410416
public static void main(String[] args) throws Exception {
411417
SpringApplication.run(HttpPostIntegrationTests.ApplicationConfiguration.class,
412418
args);
413419
}
414420

421+
@Bean
422+
public Function<Flux<String>, Flux<String>> functionReactive() {
423+
functionReactiveInvocations = 0;
424+
return flux -> flux.doOnNext(x -> functionReactiveInvocations++);
425+
}
426+
427+
@Bean
428+
public Consumer<String> consumerImperative() {
429+
return value -> {
430+
System.out.println(value);
431+
};
432+
}
433+
415434
@Bean({ "uppercase", "transform", "post/more" })
416435
public Function<Flux<String>, Flux<String>> uppercase() {
417436
return flux -> flux.log()

0 commit comments

Comments
 (0)