Skip to content

Commit 1178d46

Browse files
artembilangaryrussell
authored andcommitted
Fix reply producing to not block reactive thread
When `DirectChannel` is used for reply producing, the data is handled on the same thread which has produced it (normally), so if we have a request-reply afterwards (e.g. `gateway()`), this thread is blocked waiting for reply. When the thread is assumed to be non-blocked (e.g. Netty event loop), the request-reply withing such a thread for the same non-blocking client causes a deadlock: the thread waits for reply, but at the same time it supposes to fulfil a synchronization barrier with that reply * Fix `AbstractMessageProducingHandler.asyncNonReactiveReply()` to use a `publishOn(Schedulers.boundedElastic())` for reply `Mono` to free producing thread from potential downstream blocking * Demonstrate deadlock with a new test in the `RSocketDslTests`; the original report was against WebFlux, but conditions are really the same: `reactor-netty` is used as a low-level client **Cherry-pick to `5.4.x`**
1 parent 8ff7ad7 commit 1178d46

File tree

2 files changed

+50
-3
lines changed

2 files changed

+50
-3
lines changed

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555

5656
import reactor.core.publisher.Flux;
5757
import reactor.core.publisher.Mono;
58+
import reactor.core.scheduler.Schedulers;
5859

5960
/**
6061
* The base {@link AbstractMessageHandler} implementation for the {@link MessageProducer}.
@@ -362,7 +363,9 @@ private void asyncNonReactiveReply(Message<?> requestMessage, Object reply, @Nul
362363
else {
363364
reactiveReply = Mono.from((Publisher<?>) reply);
364365
}
365-
reactiveReply.subscribe(settableListenableFuture::set, settableListenableFuture::setException);
366+
reactiveReply
367+
.publishOn(Schedulers.boundedElastic())
368+
.subscribe(settableListenableFuture::set, settableListenableFuture::setException);
366369
future = settableListenableFuture;
367370
}
368371
future.addCallback(new ReplyFutureCallback(requestMessage, replyChannel));

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2020 the original author or authors.
2+
* Copyright 2019-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.integration.rsocket.dsl;
1818

19+
import static org.assertj.core.api.Assertions.assertThat;
20+
1921
import java.time.Duration;
2022
import java.util.function.Function;
2123

@@ -28,6 +30,7 @@
2830
import org.springframework.integration.config.EnableIntegration;
2931
import org.springframework.integration.dsl.IntegrationFlow;
3032
import org.springframework.integration.dsl.IntegrationFlows;
33+
import org.springframework.integration.dsl.context.IntegrationFlowContext;
3134
import org.springframework.integration.rsocket.ClientRSocketConnector;
3235
import org.springframework.integration.rsocket.RSocketInteractionModel;
3336
import org.springframework.integration.rsocket.ServerRSocketConnector;
@@ -74,6 +77,39 @@ void testRsocketUpperCaseWholeFlows() {
7477
.verifyComplete();
7578
}
7679

80+
@Autowired
81+
IntegrationFlowContext integrationFlowContext;
82+
83+
@Autowired
84+
ClientRSocketConnector clientRSocketConnector;
85+
86+
@Test
87+
void testNoBlockingForReactiveThreads() {
88+
IntegrationFlow flow =
89+
f -> f
90+
.handle(RSockets.outboundGateway("/lowercase")
91+
.clientRSocketConnector(this.clientRSocketConnector))
92+
.transform("{ firstResult: payload }")
93+
.enrich(e -> e
94+
.requestPayloadExpression("payload.firstResult")
95+
.requestSubFlow(
96+
sf -> sf
97+
.handle(RSockets.outboundGateway("/lowercase")
98+
.clientRSocketConnector(this.clientRSocketConnector)))
99+
.propertyExpression("secondResult", "payload"))
100+
.transform("payload.values().toString()");
101+
102+
IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
103+
this.integrationFlowContext.registration(flow).register();
104+
105+
String result = flowRegistration.getMessagingTemplate().convertSendAndReceive("TEST", String.class);
106+
107+
assertThat(result).isEqualTo("[test, test]");
108+
109+
flowRegistration.destroy();
110+
}
111+
112+
77113
@Configuration
78114
@EnableIntegration
79115
public static class TestConfiguration {
@@ -96,7 +132,7 @@ public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector client
96132
return IntegrationFlows
97133
.from(Function.class)
98134
.handle(RSockets.outboundGateway(message ->
99-
message.getHeaders().getOrDefault("route", "/uppercase"))
135+
message.getHeaders().getOrDefault("route", "/uppercase"))
100136
.interactionModel((message) -> RSocketInteractionModel.requestChannel)
101137
.expectedResponseType("T(java.lang.String)")
102138
.clientRSocketConnector(clientRSocketConnector),
@@ -126,6 +162,14 @@ public IntegrationFlow rsocketUpperCaseWholeFlow() {
126162
.get();
127163
}
128164

165+
@Bean
166+
public IntegrationFlow rsocketLowerCaseFlow() {
167+
return IntegrationFlows
168+
.from(RSockets.inboundGateway("/lowercase"))
169+
.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toLowerCase))
170+
.get();
171+
}
172+
129173
}
130174

131175
}

0 commit comments

Comments
 (0)