1010
1111use super :: proxy:: ProxyConnector ;
1212
13+ use crate :: pool:: conn:: PermittedRecvStream ;
14+ use crate :: pool:: tls:: TlsConnector ;
15+ use crate :: pool:: { self , Pool , TcpConnector } ;
1316use crate :: utils:: ErrorExt ;
1417
1518use bytes:: Bytes ;
1619use futures:: FutureExt ;
17- use futures:: future:: Either ;
20+ use futures:: future:: { self , Either } ;
1821use http:: Version ;
19- use http_body_util:: BodyExt ;
20- use hyper:: body:: Body ;
22+ use http_body_util:: { BodyExt , Either as EitherBody } ;
23+ use hyper:: body:: { Body , Incoming } ;
2124use hyper:: http:: HeaderValue ;
2225use hyper:: http:: uri:: PathAndQuery ;
2326use hyper:: { HeaderMap , Method , Request , Response , Uri } ;
@@ -26,10 +29,14 @@ use hyper_util::client::legacy::connect::HttpConnector;
2629use restate_types:: config:: HttpOptions ;
2730use rustls:: { ClientConfig , KeyLogFile } ;
2831use std:: error:: Error ;
32+ use std:: fmt;
2933use std:: fmt:: Debug ;
30- use std:: future:: Future ;
34+ use std:: num:: NonZeroU32 ;
35+ use std:: pin:: Pin ;
3136use std:: sync:: { Arc , LazyLock } ;
32- use std:: { fmt, future} ;
37+ use std:: task:: { Context , Poll , ready} ;
38+ use std:: time:: Duration ;
39+ use tower:: Layer ;
3340
3441type ProxiedHttpsConnector = ProxyConnector < HttpsConnector < HttpConnector > > ;
3542
@@ -55,7 +62,7 @@ static TLS_CLIENT_CONFIG: LazyLock<ClientConfig> = LazyLock::new(|| {
5562type BoxError = Box < dyn Error + Send + Sync + ' static > ;
5663type BoxBody = http_body_util:: combinators:: BoxBody < Bytes , BoxError > ;
5764
58- #[ derive( Clone , Debug ) ]
65+ #[ derive( Clone ) ]
5966pub struct HttpClient {
6067 /// Client used for HTTPS as long as HTTP1.1 or HTTP2 was not specifically requested.
6168 /// All HTTP versions are possible.
@@ -68,7 +75,7 @@ pub struct HttpClient {
6875 /// Client when HTTP2 was specifically requested - for cleartext, we use h2c,
6976 /// and for HTTPS, we will fail unless the ALPN supports h2.
7077 /// In practice, at discovery time we never force h2 for HTTPS.
71- h2_client : hyper_util :: client :: legacy :: Client < ProxiedHttpsConnector , BoxBody > ,
78+ h2_pool : Pool < ProxyConnector < TlsConnector < TcpConnector > > > ,
7279}
7380
7481impl HttpClient {
@@ -77,11 +84,18 @@ impl HttpClient {
7784 hyper_util:: client:: legacy:: Client :: builder ( hyper_util:: rt:: TokioExecutor :: default ( ) ) ;
7885 builder. timer ( hyper_util:: rt:: TokioTimer :: default ( ) ) ;
7986
87+ let keep_alive_interval: Duration = options. http_keep_alive_options . interval . into ( ) ;
88+ let keep_alive_interval = if keep_alive_interval == Duration :: ZERO {
89+ None
90+ } else {
91+ Some ( keep_alive_interval)
92+ } ;
93+
8094 builder
81- . http2_initial_max_send_streams ( options. initial_max_send_streams )
95+ . http2_initial_max_send_streams ( options. initial_max_send_streams . map ( |v| v as usize ) )
8296 . http2_adaptive_window ( true )
8397 . http2_keep_alive_timeout ( options. http_keep_alive_options . timeout . into ( ) )
84- . http2_keep_alive_interval ( Some ( options . http_keep_alive_options . interval . into ( ) ) ) ;
98+ . http2_keep_alive_interval ( keep_alive_interval ) ;
8599
86100 let mut http_connector = HttpConnector :: new ( ) ;
87101 http_connector. enforce_http ( false ) ;
@@ -101,11 +115,27 @@ impl HttpClient {
101115 . enable_http1 ( )
102116 . wrap_connector ( http_connector. clone ( ) ) ;
103117
104- let https_h2_connector = hyper_rustls:: HttpsConnectorBuilder :: new ( )
105- . with_tls_config ( TLS_CLIENT_CONFIG . clone ( ) )
106- . https_or_http ( )
107- . enable_http2 ( )
108- . wrap_connector ( http_connector. clone ( ) ) ;
118+ let h2_pool = {
119+ let connector = pool:: tls:: TlsConnectorLayer :: new ( TLS_CLIENT_CONFIG . clone ( ) )
120+ . layer ( pool:: TcpConnector :: new ( options. connect_timeout . into ( ) ) ) ;
121+ let connector = ProxyConnector :: new (
122+ options. http_proxy . clone ( ) ,
123+ options. no_proxy . clone ( ) ,
124+ connector,
125+ ) ;
126+
127+ let builder = pool:: PoolBuilder :: default ( )
128+ . max_connections ( options. max_http2_connections )
129+ . keep_alive_interval ( keep_alive_interval)
130+ . keep_alive_timeout ( options. http_keep_alive_options . timeout . into ( ) ) ;
131+
132+ let builder = match options. initial_max_send_streams . and_then ( NonZeroU32 :: new) {
133+ Some ( value) => builder. initial_max_send_streams ( value) ,
134+ None => builder,
135+ } ;
136+
137+ builder. build ( connector)
138+ } ;
109139
110140 HttpClient {
111141 alpn_client : builder. clone ( ) . build :: < _ , BoxBody > ( ProxyConnector :: new (
@@ -118,14 +148,7 @@ impl HttpClient {
118148 options. no_proxy . clone ( ) ,
119149 https_h1_connector,
120150 ) ) ,
121- h2_client : {
122- builder. http2_only ( true ) ;
123- builder. build :: < _ , BoxBody > ( ProxyConnector :: new (
124- options. http_proxy . clone ( ) ,
125- options. no_proxy . clone ( ) ,
126- https_h2_connector,
127- ) )
128- } ,
151+ h2_pool,
129152 }
130153 }
131154
@@ -186,10 +209,10 @@ impl HttpClient {
186209 body : B ,
187210 path : PathAndQuery ,
188211 headers : HeaderMap < HeaderValue > ,
189- ) -> impl Future < Output = Result < Response < hyper :: body :: Incoming > , HttpError > > + Send + ' static
212+ ) -> impl Future < Output = Result < Response < ResponseBody > , HttpError > > + Send + ' static
190213 where
191214 B : Body < Data = Bytes > + Send + Sync + Unpin + Sized + ' static ,
192- < B as Body > :: Error : Error + Send + Sync + ' static ,
215+ B :: Error : std :: error :: Error + Send + Sync + ' static ,
193216 {
194217 let request = match Self :: build_request ( uri, version, body, method, path, headers) {
195218 Ok ( request) => request,
@@ -198,21 +221,98 @@ impl HttpClient {
198221
199222 let fut = match version {
200223 // version is set to http1.1 when use_http1.1 is set
201- Some ( Version :: HTTP_11 ) => self . h1_client . request ( request) ,
224+ Some ( Version :: HTTP_11 ) => ResponseMapper {
225+ fut : self . h1_client . request ( request) ,
226+ }
227+ . left_future ( ) ,
202228 // version is set to http2 for cleartext urls when use_http1.1 is not set
203- Some ( Version :: HTTP_2 ) => self . h2_client . request ( request) ,
229+ Some ( Version :: HTTP_2 ) => ResponseMapper {
230+ fut : self . h2_pool . request ( request) ,
231+ }
232+ . right_future ( ) ,
204233 // version is currently set to none for https urls when use_http1.1 is not set
205- None => self . alpn_client . request ( request) ,
234+ None => ResponseMapper {
235+ fut : self . alpn_client . request ( request) ,
236+ }
237+ . left_future ( ) ,
206238 // nothing currently sets a different version, but the alpn client is a sensible default
207- Some ( _) => self . alpn_client . request ( request) ,
239+ Some ( _) => ResponseMapper {
240+ fut : self . alpn_client . request ( request) ,
241+ }
242+ . left_future ( ) ,
208243 } ;
209244
210- Either :: Left ( async move {
211- match fut. await {
212- Ok ( res) => Ok ( res) ,
213- Err ( err) => Err ( err. into ( ) ) ,
214- }
215- } )
245+ Either :: Left ( fut)
246+ }
247+ }
248+
249+ #[ pin_project:: pin_project]
250+ struct ResponseMapper < F , B , E >
251+ where
252+ F : Future < Output = Result < Response < B > , E > > ,
253+ E : Into < HttpError > ,
254+ B : Into < ResponseBody > ,
255+ {
256+ #[ pin]
257+ fut : F ,
258+ }
259+
260+ impl < F , B , E > Future for ResponseMapper < F , B , E >
261+ where
262+ F : Future < Output = Result < Response < B > , E > > ,
263+ E : Into < HttpError > ,
264+ B : Into < ResponseBody > ,
265+ {
266+ type Output = Result < Response < ResponseBody > , HttpError > ;
267+
268+ fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
269+ let result = ready ! ( self . project( ) . fut. poll( cx) )
270+ . map_err ( Into :: into)
271+ . map ( |response| response. map ( Into :: into) ) ;
272+
273+ Poll :: Ready ( result)
274+ }
275+ }
276+
277+ /// A wrapper around [`http_body_util::Either`] to hide
278+ /// type complexity for higher layer
279+ #[ pin_project:: pin_project]
280+ pub struct ResponseBody {
281+ #[ pin]
282+ inner : EitherBody < Incoming , PermittedRecvStream > ,
283+ }
284+
285+ impl From < Incoming > for ResponseBody {
286+ fn from ( value : Incoming ) -> Self {
287+ Self {
288+ inner : EitherBody :: Left ( value) ,
289+ }
290+ }
291+ }
292+
293+ impl From < PermittedRecvStream > for ResponseBody {
294+ fn from ( value : PermittedRecvStream ) -> Self {
295+ Self {
296+ inner : EitherBody :: Right ( value) ,
297+ }
298+ }
299+ }
300+
301+ impl Body for ResponseBody {
302+ type Data = Bytes ;
303+ type Error = Box < dyn std:: error:: Error + Send + Sync > ;
304+
305+ fn is_end_stream ( & self ) -> bool {
306+ self . inner . is_end_stream ( )
307+ }
308+ fn poll_frame (
309+ self : std:: pin:: Pin < & mut Self > ,
310+ cx : & mut std:: task:: Context < ' _ > ,
311+ ) -> std:: task:: Poll < Option < Result < http_body:: Frame < Self :: Data > , Self :: Error > > > {
312+ self . project ( ) . inner . poll_frame ( cx)
313+ }
314+ fn size_hint ( & self ) -> http_body:: SizeHint {
315+ self . inner . size_hint ( )
216316 }
217317}
218318
@@ -228,6 +328,8 @@ pub enum HttpError {
228328 Connect ( #[ source] hyper_util:: client:: legacy:: Error ) ,
229329 #[ error( "{}" , FormatHyperError ( . 0 ) ) ]
230330 Hyper ( #[ source] hyper_util:: client:: legacy:: Error ) ,
331+ #[ error( "h2 pool connection error: {0}" ) ]
332+ PoolError ( #[ from] pool:: ConnectionError ) ,
231333}
232334
233335impl HttpError {
@@ -240,6 +342,7 @@ impl HttpError {
240342 HttpError :: PossibleHTTP11Only ( _) => false ,
241343 HttpError :: PossibleHTTP2Only ( _) => false ,
242344 HttpError :: Connect ( _) => true ,
345+ HttpError :: PoolError ( _) => true ,
243346 }
244347 }
245348
0 commit comments