Skip to content

Commit 2ed6d3e

Browse files
committed
GH-9538: Rely on the customizeMonoReply() for thread switching
Fixes: #9538 Some applications might not be satisfied with `.publishOn(Schedulers.boundedElastic())` used by default for `Mono` replies. * Remove that `.publishOn(Schedulers.boundedElastic())` from the `AbstractMessageProducingHandler`. Instead, the target project is free to make a choice via `customizeMonoReply()`, e.g.: ``` .handle(RSockets.outboundGateway("/lowercase") .clientRSocketConnector(this.clientRSocketConnector), endpoint -> endpoint.customizeMonoReply((message, mono) -> mono.publishOn(Schedulers.boundedElastic()))) ```
1 parent 8b1f828 commit 2ed6d3e

File tree

2 files changed

+4
-4
lines changed

2 files changed

+4
-4
lines changed

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import reactor.core.Exceptions;
3434
import reactor.core.publisher.Flux;
3535
import reactor.core.publisher.Mono;
36-
import reactor.core.scheduler.Schedulers;
3736

3837
import org.springframework.beans.factory.BeanCreationException;
3938
import org.springframework.beans.factory.BeanFactory;
@@ -386,7 +385,6 @@ private static CompletableFuture<?> toFutureReply(Object reply, @Nullable Reacti
386385
CompletableFuture<Object> replyFuture = new CompletableFuture<>();
387386

388387
reactiveReply
389-
.publishOn(Schedulers.boundedElastic())
390388
/*
391389
The MonoToCompletableFuture in Project Reactor does not support context propagation,
392390
and it does not suppose to, since there is no guarantee how this Future is going to

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.junit.jupiter.api.Test;
2323
import reactor.core.publisher.Flux;
24+
import reactor.core.scheduler.Schedulers;
2425
import reactor.test.StepVerifier;
2526

2627
import org.springframework.beans.factory.annotation.Autowired;
@@ -42,7 +43,6 @@
4243

4344
/**
4445
* @author Artem Bilan
45-
*
4646
* @since 5.2
4747
*/
4848
@SpringJUnitConfig
@@ -86,7 +86,9 @@ void testNoBlockingForReactiveThreads() {
8686
IntegrationFlow flow =
8787
f -> f
8888
.handle(RSockets.outboundGateway("/lowercase")
89-
.clientRSocketConnector(this.clientRSocketConnector))
89+
.clientRSocketConnector(this.clientRSocketConnector),
90+
endpoint -> endpoint.customizeMonoReply((message, mono) ->
91+
mono.publishOn(Schedulers.boundedElastic())))
9092
.transform("{ firstResult: payload }")
9193
.enrich(e -> e
9294
.requestPayloadExpression("payload.firstResult")

0 commit comments

Comments
 (0)