Skip to content

Commit 2e8b9fc

Browse files
committed
[Service-Client] H2 Pool integation and usage in service client
- This PR finally enable using of the new H2 pool in the http service client.
1 parent 85c7ae2 commit 2e8b9fc

File tree

5 files changed

+162
-45
lines changed

5 files changed

+162
-45
lines changed

crates/invoker-impl/src/invocation_task/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,8 @@ impl ResponseStream {
496496
// This task::spawn won't be required by hyper 1.0, as the connection will be driven by a task
497497
// spawned somewhere else (perhaps in the connection pool).
498498
// See: https://github.com/restatedev/restate/issues/96 and https://github.com/restatedev/restate/issues/76
499+
500+
//todo: this is a temp clone to test
499501
Self::WaitingHeaders {
500502
join_handle: AbortOnDropHandle::new(tokio::task::spawn(client.call(req))),
501503
}

crates/service-client/src/http.rs

Lines changed: 137 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,17 @@
1010

1111
use super::proxy::ProxyConnector;
1212

13+
use crate::pool::conn::PermittedRecvStream;
14+
use crate::pool::tls::TlsConnector;
15+
use crate::pool::{self, Pool, TcpConnector};
1316
use crate::utils::ErrorExt;
1417

1518
use bytes::Bytes;
1619
use futures::FutureExt;
17-
use futures::future::Either;
20+
use futures::future::{self, Either};
1821
use http::Version;
19-
use http_body_util::BodyExt;
20-
use hyper::body::Body;
22+
use http_body_util::{BodyExt, Either as EitherBody};
23+
use hyper::body::{Body, Incoming};
2124
use hyper::http::HeaderValue;
2225
use hyper::http::uri::PathAndQuery;
2326
use hyper::{HeaderMap, Method, Request, Response, Uri};
@@ -26,10 +29,14 @@ use hyper_util::client::legacy::connect::HttpConnector;
2629
use restate_types::config::HttpOptions;
2730
use rustls::{ClientConfig, KeyLogFile};
2831
use std::error::Error;
32+
use std::fmt;
2933
use std::fmt::Debug;
30-
use std::future::Future;
34+
use std::num::NonZeroU32;
35+
use std::pin::Pin;
3136
use std::sync::{Arc, LazyLock};
32-
use std::{fmt, future};
37+
use std::task::{Context, Poll, ready};
38+
use std::time::Duration;
39+
use tower::Layer;
3340

3441
type ProxiedHttpsConnector = ProxyConnector<HttpsConnector<HttpConnector>>;
3542

@@ -55,7 +62,7 @@ static TLS_CLIENT_CONFIG: LazyLock<ClientConfig> = LazyLock::new(|| {
5562
type BoxError = Box<dyn Error + Send + Sync + 'static>;
5663
type BoxBody = http_body_util::combinators::BoxBody<Bytes, BoxError>;
5764

58-
#[derive(Clone, Debug)]
65+
#[derive(Clone)]
5966
pub struct HttpClient {
6067
/// Client used for HTTPS as long as HTTP1.1 or HTTP2 was not specifically requested.
6168
/// All HTTP versions are possible.
@@ -68,7 +75,7 @@ pub struct HttpClient {
6875
/// Client when HTTP2 was specifically requested - for cleartext, we use h2c,
6976
/// and for HTTPS, we will fail unless the ALPN supports h2.
7077
/// In practice, at discovery time we never force h2 for HTTPS.
71-
h2_client: hyper_util::client::legacy::Client<ProxiedHttpsConnector, BoxBody>,
78+
h2_pool: Pool<ProxyConnector<TlsConnector<TcpConnector>>>,
7279
}
7380

7481
impl HttpClient {
@@ -77,11 +84,18 @@ impl HttpClient {
7784
hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::default());
7885
builder.timer(hyper_util::rt::TokioTimer::default());
7986

87+
let keep_alive_interval: Duration = options.http_keep_alive_options.interval.into();
88+
let keep_alive_interval = if keep_alive_interval == Duration::ZERO {
89+
None
90+
} else {
91+
Some(keep_alive_interval)
92+
};
93+
8094
builder
81-
.http2_initial_max_send_streams(options.initial_max_send_streams)
95+
.http2_initial_max_send_streams(options.initial_max_send_streams.map(|v| v as usize))
8296
.http2_adaptive_window(true)
8397
.http2_keep_alive_timeout(options.http_keep_alive_options.timeout.into())
84-
.http2_keep_alive_interval(Some(options.http_keep_alive_options.interval.into()));
98+
.http2_keep_alive_interval(keep_alive_interval);
8599

86100
let mut http_connector = HttpConnector::new();
87101
http_connector.enforce_http(false);
@@ -101,11 +115,27 @@ impl HttpClient {
101115
.enable_http1()
102116
.wrap_connector(http_connector.clone());
103117

104-
let https_h2_connector = hyper_rustls::HttpsConnectorBuilder::new()
105-
.with_tls_config(TLS_CLIENT_CONFIG.clone())
106-
.https_or_http()
107-
.enable_http2()
108-
.wrap_connector(http_connector.clone());
118+
let h2_pool = {
119+
let connector = pool::tls::TlsConnectorLayer::new(TLS_CLIENT_CONFIG.clone())
120+
.layer(pool::TcpConnector::new(options.connect_timeout.into()));
121+
let connector = ProxyConnector::new(
122+
options.http_proxy.clone(),
123+
options.no_proxy.clone(),
124+
connector,
125+
);
126+
127+
let builder = pool::PoolBuilder::default()
128+
.max_connections(options.max_http2_connections)
129+
.keep_alive_interval(keep_alive_interval)
130+
.keep_alive_timeout(options.http_keep_alive_options.timeout.into());
131+
132+
let builder = match options.initial_max_send_streams.and_then(NonZeroU32::new) {
133+
Some(value) => builder.initial_max_send_streams(value),
134+
None => builder,
135+
};
136+
137+
builder.build(connector)
138+
};
109139

110140
HttpClient {
111141
alpn_client: builder.clone().build::<_, BoxBody>(ProxyConnector::new(
@@ -118,14 +148,7 @@ impl HttpClient {
118148
options.no_proxy.clone(),
119149
https_h1_connector,
120150
)),
121-
h2_client: {
122-
builder.http2_only(true);
123-
builder.build::<_, BoxBody>(ProxyConnector::new(
124-
options.http_proxy.clone(),
125-
options.no_proxy.clone(),
126-
https_h2_connector,
127-
))
128-
},
151+
h2_pool,
129152
}
130153
}
131154

@@ -186,10 +209,10 @@ impl HttpClient {
186209
body: B,
187210
path: PathAndQuery,
188211
headers: HeaderMap<HeaderValue>,
189-
) -> impl Future<Output = Result<Response<hyper::body::Incoming>, HttpError>> + Send + 'static
212+
) -> impl Future<Output = Result<Response<ResponseBody>, HttpError>> + Send + 'static
190213
where
191214
B: Body<Data = Bytes> + Send + Sync + Unpin + Sized + 'static,
192-
<B as Body>::Error: Error + Send + Sync + 'static,
215+
B::Error: std::error::Error + Send + Sync + 'static,
193216
{
194217
let request = match Self::build_request(uri, version, body, method, path, headers) {
195218
Ok(request) => request,
@@ -198,21 +221,98 @@ impl HttpClient {
198221

199222
let fut = match version {
200223
// version is set to http1.1 when use_http1.1 is set
201-
Some(Version::HTTP_11) => self.h1_client.request(request),
224+
Some(Version::HTTP_11) => ResponseMapper {
225+
fut: self.h1_client.request(request),
226+
}
227+
.left_future(),
202228
// version is set to http2 for cleartext urls when use_http1.1 is not set
203-
Some(Version::HTTP_2) => self.h2_client.request(request),
229+
Some(Version::HTTP_2) => ResponseMapper {
230+
fut: self.h2_pool.request(request),
231+
}
232+
.right_future(),
204233
// version is currently set to none for https urls when use_http1.1 is not set
205-
None => self.alpn_client.request(request),
234+
None => ResponseMapper {
235+
fut: self.alpn_client.request(request),
236+
}
237+
.left_future(),
206238
// nothing currently sets a different version, but the alpn client is a sensible default
207-
Some(_) => self.alpn_client.request(request),
239+
Some(_) => ResponseMapper {
240+
fut: self.alpn_client.request(request),
241+
}
242+
.left_future(),
208243
};
209244

210-
Either::Left(async move {
211-
match fut.await {
212-
Ok(res) => Ok(res),
213-
Err(err) => Err(err.into()),
214-
}
215-
})
245+
Either::Left(fut)
246+
}
247+
}
248+
249+
#[pin_project::pin_project]
250+
struct ResponseMapper<F, B, E>
251+
where
252+
F: Future<Output = Result<Response<B>, E>>,
253+
E: Into<HttpError>,
254+
B: Into<ResponseBody>,
255+
{
256+
#[pin]
257+
fut: F,
258+
}
259+
260+
impl<F, B, E> Future for ResponseMapper<F, B, E>
261+
where
262+
F: Future<Output = Result<Response<B>, E>>,
263+
E: Into<HttpError>,
264+
B: Into<ResponseBody>,
265+
{
266+
type Output = Result<Response<ResponseBody>, HttpError>;
267+
268+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
269+
let result = ready!(self.project().fut.poll(cx))
270+
.map_err(Into::into)
271+
.map(|response| response.map(Into::into));
272+
273+
Poll::Ready(result)
274+
}
275+
}
276+
277+
/// A wrapper around [`http_body_util::Either`] to hide
278+
/// type complexity for higher layer
279+
#[pin_project::pin_project]
280+
pub struct ResponseBody {
281+
#[pin]
282+
inner: EitherBody<Incoming, PermittedRecvStream>,
283+
}
284+
285+
impl From<Incoming> for ResponseBody {
286+
fn from(value: Incoming) -> Self {
287+
Self {
288+
inner: EitherBody::Left(value),
289+
}
290+
}
291+
}
292+
293+
impl From<PermittedRecvStream> for ResponseBody {
294+
fn from(value: PermittedRecvStream) -> Self {
295+
Self {
296+
inner: EitherBody::Right(value),
297+
}
298+
}
299+
}
300+
301+
impl Body for ResponseBody {
302+
type Data = Bytes;
303+
type Error = Box<dyn std::error::Error + Send + Sync>;
304+
305+
fn is_end_stream(&self) -> bool {
306+
self.inner.is_end_stream()
307+
}
308+
fn poll_frame(
309+
self: std::pin::Pin<&mut Self>,
310+
cx: &mut std::task::Context<'_>,
311+
) -> std::task::Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
312+
self.project().inner.poll_frame(cx)
313+
}
314+
fn size_hint(&self) -> http_body::SizeHint {
315+
self.inner.size_hint()
216316
}
217317
}
218318

@@ -228,6 +328,8 @@ pub enum HttpError {
228328
Connect(#[source] hyper_util::client::legacy::Error),
229329
#[error("{}", FormatHyperError(.0))]
230330
Hyper(#[source] hyper_util::client::legacy::Error),
331+
#[error("h2 pool connection error: {0}")]
332+
PoolError(#[from] pool::ConnectionError),
231333
}
232334

233335
impl HttpError {
@@ -240,6 +342,7 @@ impl HttpError {
240342
HttpError::PossibleHTTP11Only(_) => false,
241343
HttpError::PossibleHTTP2Only(_) => false,
242344
HttpError::Connect(_) => true,
345+
HttpError::PoolError(_) => true,
243346
}
244347
}
245348

crates/service-client/src/lib.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use arc_swap::ArcSwapOption;
1919
use bytes::Bytes;
2020
use bytestring::ByteString;
2121
use core::fmt;
22-
use futures::FutureExt;
22+
use futures::{FutureExt, future};
2323
use http_body_util::Full;
2424
use hyper::body::Body;
2525
use hyper::http::uri::PathAndQuery;
@@ -30,8 +30,6 @@ use restate_types::schema::deployment::EndpointLambdaCompression;
3030
use std::collections::HashMap;
3131
use std::error::Error;
3232
use std::fmt::Formatter;
33-
use std::future;
34-
use std::future::Future;
3533
use std::sync::Arc;
3634

3735
mod http;
@@ -41,9 +39,9 @@ mod proxy;
4139
mod request_identity;
4240
mod utils;
4341

44-
pub type ResponseBody = http_body_util::Either<hyper::body::Incoming, Full<Bytes>>;
42+
pub type ResponseBody = http_body_util::Either<http::ResponseBody, Full<Bytes>>;
4543

46-
#[derive(Debug, Clone)]
44+
#[derive(Clone)]
4745
pub struct ServiceClient {
4846
// TODO a single client uses the pooling provided by hyper, but this is not enough.
4947
// See https://github.com/restatedev/restate/issues/76 for more background on the topic.

crates/service-protocol/src/discovery.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ impl DiscoveryError {
173173
}
174174
}
175175

176-
#[derive(Debug, Clone)]
176+
#[derive(Clone)]
177177
pub struct ServiceDiscovery {
178178
retry_policy: RetryPolicy,
179179
client: ServiceClient,

crates/types/src/config/http.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11+
use std::num::NonZeroUsize;
12+
1113
use serde::{Deserialize, Serialize};
1214
use serde_with::serde_as;
1315

14-
use restate_time_util::NonZeroFriendlyDuration;
16+
use restate_time_util::{FriendlyDuration, NonZeroFriendlyDuration};
1517

1618
/// # HTTP client options
1719
#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)]
@@ -23,7 +25,6 @@ pub struct HttpOptions {
2325
/// # HTTP/2 Keep-alive
2426
///
2527
/// Configuration for the HTTP/2 keep-alive mechanism, using PING frames.
26-
/// If unset, HTTP/2 keep-alive are disabled.
2728
pub http_keep_alive_options: Http2KeepAliveOptions,
2829
/// # Proxy URI
2930
///
@@ -59,7 +60,15 @@ pub struct HttpOptions {
5960
///
6061
/// **NOTE**: Setting this value to None (default) users the default
6162
/// recommended value from HTTP2 specs
62-
pub initial_max_send_streams: Option<usize>,
63+
pub initial_max_send_streams: Option<u32>,
64+
65+
/// # Max HTTP2 Connections
66+
///
67+
/// Sets the maximum number of open HTTP/2 connections per
68+
/// client for a single host.
69+
///
70+
/// Default: 20
71+
pub max_http2_connections: NonZeroUsize,
6372
}
6473

6574
impl Default for HttpOptions {
@@ -70,6 +79,7 @@ impl Default for HttpOptions {
7079
no_proxy: None,
7180
connect_timeout: NonZeroFriendlyDuration::from_secs_unchecked(10),
7281
initial_max_send_streams: None,
82+
max_http2_connections: NonZeroUsize::new(20).unwrap(),
7383
}
7484
}
7585
}
@@ -100,22 +110,26 @@ pub struct Http2KeepAliveOptions {
100110
/// Sets an interval for HTTP/2 PING frames should be sent to keep a
101111
/// connection alive.
102112
///
113+
/// `0` disables keep-alive pings entirely. Defaults to `40s`.
114+
///
103115
/// You should set this timeout with a value lower than the `abort_timeout`.
104-
pub interval: NonZeroFriendlyDuration,
116+
pub interval: FriendlyDuration,
105117

106118
/// # Timeout
107119
///
108120
/// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
109121
///
110122
/// If the ping is not acknowledged within the timeout, the connection will
111123
/// be closed.
124+
///
125+
/// Only meaningful when `keep_alive_interval` is `Some`. Defaults to 20 s.
112126
pub timeout: NonZeroFriendlyDuration,
113127
}
114128

115129
impl Default for Http2KeepAliveOptions {
116130
fn default() -> Self {
117131
Self {
118-
interval: NonZeroFriendlyDuration::from_secs_unchecked(40),
132+
interval: FriendlyDuration::from_secs(40),
119133
timeout: NonZeroFriendlyDuration::from_secs_unchecked(20),
120134
}
121135
}

0 commit comments

Comments
 (0)