Skip to content

Commit ba39385

Browse files
committed
Use executor for blocking I/O in Reactor request factory
Prior to this commit, the `ReactorClientHttpRequestFactory` and the `ReactorClientHttpRequest` would use the `Executor` from the current event loop for performing write operations. Depending on I/O demand, this work could be blocked and would result in blocked Netty event loop executors and the HTTP client hanging. This commit ensures that the client uses a separate Executor for such operations. If the application does not provide one on the request factory, a `Schedulers#boundedElastic` instance will be used. Fixes spring-projectsgh-34707
1 parent e735c2d commit ba39385

File tree

2 files changed

+40
-16
lines changed

2 files changed

+40
-16
lines changed

spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequest.java

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
import java.net.URI;
2222
import java.time.Duration;
2323
import java.util.concurrent.Executor;
24-
import java.util.concurrent.atomic.AtomicReference;
2524

2625
import io.netty.buffer.ByteBuf;
2726
import io.netty.buffer.ByteBufAllocator;
2827
import org.reactivestreams.FlowAdapters;
2928
import org.reactivestreams.Publisher;
3029
import reactor.core.publisher.Mono;
30+
import reactor.core.scheduler.Schedulers;
3131
import reactor.netty.NettyOutbound;
3232
import reactor.netty.http.client.HttpClient;
3333
import reactor.netty.http.client.HttpClientRequest;
@@ -43,6 +43,7 @@
4343
*
4444
* @author Arjen Poutsma
4545
* @author Juergen Hoeller
46+
* @author Brian Clozel
4647
* @since 6.1
4748
*/
4849
final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest {
@@ -53,6 +54,8 @@ final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest
5354

5455
private final URI uri;
5556

57+
private final Executor executor;
58+
5659
@Nullable
5760
private final Duration exchangeTimeout;
5861

@@ -65,19 +68,31 @@ final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest
6568
* @since 6.2
6669
*/
6770
public ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri) {
68-
this.httpClient = httpClient;
69-
this.method = method;
70-
this.uri = uri;
71-
this.exchangeTimeout = null;
71+
this(httpClient, method, uri, null);
72+
}
73+
74+
/**
75+
* Create an instance.
76+
* <p>If no executor is provided, the request will use an {@link Schedulers#boundedElastic() elastic scheduler}
77+
* for performing blocking I/O operations.
78+
* @param httpClient the client to perform the request with
79+
* @param executor the executor to use
80+
* @param method the HTTP method
81+
* @param uri the URI for the request
82+
* @since 6.2.13
83+
*/
84+
public ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri, @Nullable Executor executor) {
85+
this(httpClient, method, uri, executor, null);
7286
}
7387

7488
/**
7589
* Package private constructor for use until exchangeTimeout is removed.
7690
*/
77-
ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri, @Nullable Duration exchangeTimeout) {
91+
ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri, @Nullable Executor executor, @Nullable Duration exchangeTimeout) {
7892
this.httpClient = httpClient;
7993
this.method = method;
8094
this.uri = uri;
95+
this.executor = (executor != null) ? executor : Schedulers.boundedElastic()::schedule;
8196
this.exchangeTimeout = exchangeTimeout;
8297
}
8398

@@ -92,11 +107,7 @@ public ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI ur
92107
public ReactorClientHttpRequest(
93108
HttpClient httpClient, URI uri, HttpMethod method,
94109
@Nullable Duration exchangeTimeout, @Nullable Duration readTimeout) {
95-
96-
this.httpClient = httpClient;
97-
this.method = method;
98-
this.uri = uri;
99-
this.exchangeTimeout = exchangeTimeout;
110+
this(httpClient, method, uri, null, exchangeTimeout);
100111
}
101112

102113

@@ -150,13 +161,10 @@ private Publisher<Void> send(
150161
return Mono.empty();
151162
}
152163

153-
AtomicReference<Executor> executorRef = new AtomicReference<>();
154-
155164
return outbound
156-
.withConnection(connection -> executorRef.set(connection.channel().eventLoop()))
157165
.send(FlowAdapters.toPublisher(new OutputStreamPublisher<>(
158166
os -> body.writeTo(StreamUtils.nonClosing(os)), new ByteBufMapper(outbound),
159-
executorRef.getAndSet(null), null)));
167+
this.executor, null)));
160168
}
161169

162170
static IOException convertException(RuntimeException ex) {

spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequestFactory.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
import java.io.IOException;
2020
import java.net.URI;
2121
import java.time.Duration;
22+
import java.util.concurrent.Executor;
2223
import java.util.function.Function;
2324

2425
import io.netty.channel.ChannelOption;
2526
import org.apache.commons.logging.Log;
2627
import org.apache.commons.logging.LogFactory;
28+
import reactor.core.scheduler.Schedulers;
2729
import reactor.netty.http.client.HttpClient;
2830
import reactor.netty.resources.ConnectionProvider;
2931
import reactor.netty.resources.LoopResources;
@@ -42,6 +44,7 @@
4244
* @author Arjen Poutsma
4345
* @author Juergen Hoeller
4446
* @author Sebastien Deleuze
47+
* @author Brian Clozel
4548
* @since 6.2
4649
*/
4750
public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory, SmartLifecycle {
@@ -58,6 +61,9 @@ public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory
5861
@Nullable
5962
private final Function<HttpClient, HttpClient> mapper;
6063

64+
@Nullable
65+
private Executor executor;
66+
6167
@Nullable
6268
private Integer connectTimeout;
6369

@@ -129,6 +135,16 @@ private HttpClient createHttpClient(ReactorResourceFactory factory, Function<Htt
129135
return client;
130136
}
131137

138+
/**
139+
* Set the {@code Executor} to use for performing blocking I/O operations.
140+
* <p>If no executor is provided, the request will use an {@link Schedulers#boundedElastic() elastic scheduler}.
141+
* @param executor the executor to use.
142+
* @since 6.2.13
143+
*/
144+
public void setExecutor(Executor executor) {
145+
Assert.notNull(executor, "Executor must not be null");
146+
this.executor = executor;
147+
}
132148

133149
/**
134150
* Set the connect timeout value on the underlying client.
@@ -219,7 +235,7 @@ public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IO
219235
"Expected HttpClient or ResourceFactory and mapper");
220236
client = createHttpClient(this.resourceFactory, this.mapper);
221237
}
222-
return new ReactorClientHttpRequest(client, httpMethod, uri, this.exchangeTimeout);
238+
return new ReactorClientHttpRequest(client, httpMethod, uri, this.executor, this.exchangeTimeout);
223239
}
224240

225241

0 commit comments

Comments
 (0)