2020import java .io .UncheckedIOException ;
2121import java .net .URI ;
2222import java .time .Duration ;
23- import java .util .Objects ;
2423import java .util .concurrent .Executor ;
25- import java .util .concurrent .atomic .AtomicReference ;
2624
2725import io .netty .buffer .ByteBuf ;
2826import io .netty .buffer .ByteBufAllocator ;
2927import org .jspecify .annotations .Nullable ;
3028import org .reactivestreams .FlowAdapters ;
3129import org .reactivestreams .Publisher ;
3230import reactor .core .publisher .Mono ;
31+ import reactor .core .scheduler .Schedulers ;
3332import reactor .netty .NettyOutbound ;
3433import reactor .netty .http .client .HttpClient ;
3534import reactor .netty .http .client .HttpClientRequest ;
4443 *
4544 * @author Arjen Poutsma
4645 * @author Juergen Hoeller
46+ * @author Brian Clozel
4747 * @since 6.1
4848 */
4949final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest {
@@ -54,6 +54,8 @@ final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest
5454
5555 private final URI uri ;
5656
57+ private final Executor executor ;
58+
5759 private final @ Nullable Duration exchangeTimeout ;
5860
5961
@@ -65,19 +67,31 @@ final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest
6567 * @since 6.2
6668 */
6769 public ReactorClientHttpRequest (HttpClient httpClient , HttpMethod method , URI uri ) {
68- this .httpClient = httpClient ;
69- this .method = method ;
70- this .uri = uri ;
71- this .exchangeTimeout = null ;
70+ this (httpClient , method , uri , null );
71+ }
72+
73+ /**
74+ * Create an instance.
75+ * <p>If no executor is provided, the request will use an {@link Schedulers#boundedElastic() elastic scheduler}
76+ * for performing blocking I/O operations.
77+ * @param httpClient the client to perform the request with
78+ * @param executor the executor to use
79+ * @param method the HTTP method
80+ * @param uri the URI for the request
81+ * @since 6.2.13
82+ */
83+ public ReactorClientHttpRequest (HttpClient httpClient , HttpMethod method , URI uri , @ Nullable Executor executor ) {
84+ this (httpClient , method , uri , executor , null );
7285 }
7386
7487 /**
7588 * Package private constructor for use until exchangeTimeout is removed.
7689 */
77- ReactorClientHttpRequest (HttpClient httpClient , HttpMethod method , URI uri , @ Nullable Duration exchangeTimeout ) {
90+ ReactorClientHttpRequest (HttpClient httpClient , HttpMethod method , URI uri , @ Nullable Executor executor , @ Nullable Duration exchangeTimeout ) {
7891 this .httpClient = httpClient ;
7992 this .method = method ;
8093 this .uri = uri ;
94+ this .executor = (executor != null ) ? executor : Schedulers .boundedElastic ()::schedule ;
8195 this .exchangeTimeout = exchangeTimeout ;
8296 }
8397
@@ -132,13 +146,10 @@ private Publisher<Void> send(
132146 return Mono .empty ();
133147 }
134148
135- AtomicReference <@ Nullable Executor > executorRef = new AtomicReference <>();
136-
137149 return outbound
138- .withConnection (connection -> executorRef .set (connection .channel ().eventLoop ()))
139150 .send (FlowAdapters .toPublisher (new OutputStreamPublisher <>(
140151 os -> body .writeTo (StreamUtils .nonClosing (os )), new ByteBufMapper (outbound ),
141- Objects . requireNonNull ( executorRef . getAndSet ( null )) , null )));
152+ this . executor , null )));
142153 }
143154
144155 static IOException convertException (RuntimeException ex ) {
0 commit comments