Skip to content

Commit f4760c0

Browse files
committed
[service-client] H2 Pool integration and usage in service client
- This PR finally enables use of the new H2 pool in the HTTP service client. - It also fixes #4456 by making sure that: - The request stream is closed immediately after we receive a terminal state - The response stream is drained. This also fixes a connection thrashing issue
1 parent 17f9a75 commit f4760c0

File tree

5 files changed

+178
-59
lines changed

5 files changed

+178
-59
lines changed

crates/invoker-impl/src/invocation_task/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,8 @@ impl ResponseStream {
486486
// This task::spawn won't be required by hyper 1.0, as the connection will be driven by a task
487487
// spawned somewhere else (perhaps in the connection pool).
488488
// See: https://github.com/restatedev/restate/issues/96 and https://github.com/restatedev/restate/issues/76
489+
490+
//todo: this is a temp clone to test
489491
Self::WaitingHeaders {
490492
join_handle: AbortOnDropHandle::new(tokio::task::spawn(client.call(req))),
491493
}

crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs

Lines changed: 53 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,6 @@ where
147147
.try_into()
148148
.expect("must be able to build a valid invocation path");
149149

150-
let journal_size = journal_metadata.length;
151-
152150
debug!(
153151
restate.invocation.id = %self.invocation_task.invocation_id,
154152
deployment.address = %deployment.address_display(),
@@ -159,15 +157,14 @@ where
159157

160158
// Create an arc of the parent SpanContext.
161159
// We send this with every journal entry to correctly link new spans generated from journal entries.
162-
let service_invocation_span_context = journal_metadata.span_context;
163160

164161
// Prepare the request
165-
let (mut http_stream_tx, request) = Self::prepare_request(
162+
let (http_stream_tx, request) = Self::prepare_request(
166163
path,
167164
deployment,
168165
self.service_protocol_version,
169166
&self.invocation_task.invocation_id,
170-
&service_invocation_span_context,
167+
&journal_metadata.span_context,
171168
);
172169

173170
// Initialize the response stream state
@@ -183,6 +180,49 @@ where
183180
.throttle(self.invocation_task.action_token_bucket.take())
184181
);
185182

183+
let result = self
184+
.run_inner(
185+
txn,
186+
protocol_type,
187+
journal_metadata,
188+
keyed_service_id,
189+
cached_journal_items,
190+
http_stream_tx,
191+
&mut decoder_stream,
192+
)
193+
.await;
194+
// Sanity check of the stream decoder
195+
if decoder_stream.inner().has_remaining() {
196+
warn_it!(
197+
InvokerError::WriteAfterEndOfStream,
198+
"The read buffer is non empty after the stream has been closed."
199+
);
200+
}
201+
202+
let inner_stream = &mut decoder_stream.inner_pin_mut().inner;
203+
204+
while inner_stream.next().await.is_some() {}
205+
206+
result
207+
}
208+
209+
#[allow(clippy::too_many_arguments)]
210+
async fn run_inner<Txn, S>(
211+
&mut self,
212+
txn: Txn,
213+
protocol_type: ProtocolType,
214+
journal_metadata: JournalMetadata,
215+
keyed_service_id: Option<ServiceId>,
216+
cached_journal_items: Option<Vec<JournalEntry>>,
217+
mut http_stream_tx: mpsc::Sender<Result<Frame<Bytes>, Infallible>>,
218+
decoder_stream: &mut S,
219+
) -> TerminalLoopState<()>
220+
where
221+
Txn: InvocationReaderTransaction,
222+
S: Stream<Item = Result<DecoderStreamItem, InvokerError>> + Unpin,
223+
{
224+
let journal_size = journal_metadata.length;
225+
let service_invocation_span_context = journal_metadata.span_context;
186226
// === Replay phase (transaction alive) ===
187227
{
188228
// Read state if needed (state is collected for the START message)
@@ -215,7 +255,7 @@ where
215255
crate::shortcircuit!(
216256
self.replay_loop(
217257
&mut http_stream_tx,
218-
&mut decoder_stream,
258+
decoder_stream,
219259
journal_stream,
220260
journal_metadata.length
221261
)
@@ -234,7 +274,7 @@ where
234274
crate::shortcircuit!(
235275
self.replay_loop(
236276
&mut http_stream_tx,
237-
&mut decoder_stream,
277+
decoder_stream,
238278
journal_stream,
239279
journal_metadata.length
240280
)
@@ -255,32 +295,24 @@ where
255295
self.bidi_stream_loop(
256296
&service_invocation_span_context,
257297
http_stream_tx,
258-
&mut decoder_stream
298+
decoder_stream
259299
)
260300
.await
261301
);
262302
} else {
263-
trace!("Protocol is in bidi stream mode, will now drop the sender side of the request");
303+
trace!(
304+
"Protocol is not in bidi stream mode, will now drop the sender side of the request"
305+
);
264306
// Drop the http_stream_tx.
265307
// This is required in HTTP/1.1 to let the deployment send the headers back
266308
drop(http_stream_tx)
267309
}
268310

269311
// We don't have the invoker_rx, so we simply consume the response
270312
trace!("Sender side of the request has been dropped, now processing the response");
271-
let result = self
272-
.response_stream_loop(&service_invocation_span_context, &mut decoder_stream)
273-
.await;
274-
275-
// Sanity check of the stream decoder
276-
if decoder_stream.inner().has_remaining() {
277-
warn_it!(
278-
InvokerError::WriteAfterEndOfStream,
279-
"The read buffer is non empty after the stream has been closed."
280-
);
281-
}
282313

283-
result
314+
self.response_stream_loop(&service_invocation_span_context, decoder_stream)
315+
.await
284316
}
285317

286318
fn prepare_request(

crates/service-client/src/http.rs

Lines changed: 119 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,17 @@
1010

1111
use super::proxy::ProxyConnector;
1212

13+
use crate::pool::conn::PermittedRecvStream;
14+
use crate::pool::tls::TlsConnector;
15+
use crate::pool::{self, Pool, TcpConnector};
1316
use crate::utils::ErrorExt;
1417

1518
use bytes::Bytes;
1619
use futures::FutureExt;
17-
use futures::future::Either;
20+
use futures::future::{self, Either};
1821
use http::Version;
19-
use http_body_util::BodyExt;
20-
use hyper::body::Body;
22+
use http_body_util::{BodyExt, Either as EitherBody};
23+
use hyper::body::{Body, Incoming};
2124
use hyper::http::HeaderValue;
2225
use hyper::http::uri::PathAndQuery;
2326
use hyper::{HeaderMap, Method, Request, Response, Uri};
@@ -26,10 +29,13 @@ use hyper_util::client::legacy::connect::HttpConnector;
2629
use restate_types::config::HttpOptions;
2730
use rustls::{ClientConfig, KeyLogFile};
2831
use std::error::Error;
32+
use std::fmt;
2933
use std::fmt::Debug;
30-
use std::future::Future;
34+
use std::num::NonZeroUsize;
35+
use std::pin::Pin;
3136
use std::sync::{Arc, LazyLock};
32-
use std::{fmt, future};
37+
use std::task::{Context, Poll, ready};
38+
use tower::Layer;
3339

3440
type ProxiedHttpsConnector = ProxyConnector<HttpsConnector<HttpConnector>>;
3541

@@ -55,7 +61,7 @@ static TLS_CLIENT_CONFIG: LazyLock<ClientConfig> = LazyLock::new(|| {
5561
type BoxError = Box<dyn Error + Send + Sync + 'static>;
5662
type BoxBody = http_body_util::combinators::BoxBody<Bytes, BoxError>;
5763

58-
#[derive(Clone, Debug)]
64+
#[derive(Clone)]
5965
pub struct HttpClient {
6066
/// Client used for HTTPS as long as HTTP1.1 or HTTP2 was not specifically requested.
6167
/// All HTTP versions are possible.
@@ -68,7 +74,7 @@ pub struct HttpClient {
6874
/// Client when HTTP2 was specifically requested - for cleartext, we use h2c,
6975
/// and for HTTPS, we will fail unless the ALPN supports h2.
7076
/// In practice, at discovery time we never force h2 for HTTPS.
71-
h2_client: hyper_util::client::legacy::Client<ProxiedHttpsConnector, BoxBody>,
77+
h2_pool: Pool<ProxyConnector<TlsConnector<TcpConnector>>>,
7278
}
7379

7480
impl HttpClient {
@@ -101,11 +107,19 @@ impl HttpClient {
101107
.enable_http1()
102108
.wrap_connector(http_connector.clone());
103109

104-
let https_h2_connector = hyper_rustls::HttpsConnectorBuilder::new()
105-
.with_tls_config(TLS_CLIENT_CONFIG.clone())
106-
.https_or_http()
107-
.enable_http2()
108-
.wrap_connector(http_connector.clone());
110+
let h2_pool = {
111+
let connector = pool::tls::TlsConnectorLayer::new(TLS_CLIENT_CONFIG.clone())
112+
.layer(pool::TcpConnector);
113+
let connector = ProxyConnector::new(
114+
options.http_proxy.clone(),
115+
options.no_proxy.clone(),
116+
connector,
117+
);
118+
119+
pool::PoolBuilder::default()
120+
.max_connections(NonZeroUsize::new(20).unwrap())
121+
.build(connector)
122+
};
109123

110124
HttpClient {
111125
alpn_client: builder.clone().build::<_, BoxBody>(ProxyConnector::new(
@@ -118,14 +132,7 @@ impl HttpClient {
118132
options.no_proxy.clone(),
119133
https_h1_connector,
120134
)),
121-
h2_client: {
122-
builder.http2_only(true);
123-
builder.build::<_, BoxBody>(ProxyConnector::new(
124-
options.http_proxy.clone(),
125-
options.no_proxy.clone(),
126-
https_h2_connector,
127-
))
128-
},
135+
h2_pool,
129136
}
130137
}
131138

@@ -186,10 +193,10 @@ impl HttpClient {
186193
body: B,
187194
path: PathAndQuery,
188195
headers: HeaderMap<HeaderValue>,
189-
) -> impl Future<Output = Result<Response<hyper::body::Incoming>, HttpError>> + Send + 'static
196+
) -> impl Future<Output = Result<Response<ResponseBody>, HttpError>> + Send + 'static
190197
where
191198
B: Body<Data = Bytes> + Send + Sync + Unpin + Sized + 'static,
192-
<B as Body>::Error: Error + Send + Sync + 'static,
199+
B::Error: std::error::Error + Send + Sync + 'static,
193200
{
194201
let request = match Self::build_request(uri, version, body, method, path, headers) {
195202
Ok(request) => request,
@@ -198,21 +205,98 @@ impl HttpClient {
198205

199206
let fut = match version {
200207
// version is set to http1.1 when use_http1.1 is set
201-
Some(Version::HTTP_11) => self.h1_client.request(request),
208+
Some(Version::HTTP_11) => ResponseMapper {
209+
fut: self.h1_client.request(request),
210+
}
211+
.left_future(),
202212
// version is set to http2 for cleartext urls when use_http1.1 is not set
203-
Some(Version::HTTP_2) => self.h2_client.request(request),
213+
Some(Version::HTTP_2) => ResponseMapper {
214+
fut: self.h2_pool.request(request),
215+
}
216+
.right_future(),
204217
// version is currently set to none for https urls when use_http1.1 is not set
205-
None => self.alpn_client.request(request),
218+
None => ResponseMapper {
219+
fut: self.alpn_client.request(request),
220+
}
221+
.left_future(),
206222
// nothing currently sets a different version, but the alpn client is a sensible default
207-
Some(_) => self.alpn_client.request(request),
223+
Some(_) => ResponseMapper {
224+
fut: self.alpn_client.request(request),
225+
}
226+
.left_future(),
208227
};
209228

210-
Either::Left(async move {
211-
match fut.await {
212-
Ok(res) => Ok(res),
213-
Err(err) => Err(err.into()),
214-
}
215-
})
229+
Either::Left(fut)
230+
}
231+
}
232+
233+
#[pin_project::pin_project]
234+
struct ResponseMapper<F, B, E>
235+
where
236+
F: Future<Output = Result<Response<B>, E>>,
237+
E: Into<HttpError>,
238+
B: Into<ResponseBody>,
239+
{
240+
#[pin]
241+
fut: F,
242+
}
243+
244+
impl<F, B, E> Future for ResponseMapper<F, B, E>
245+
where
246+
F: Future<Output = Result<Response<B>, E>>,
247+
E: Into<HttpError>,
248+
B: Into<ResponseBody>,
249+
{
250+
type Output = Result<Response<ResponseBody>, HttpError>;
251+
252+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
253+
let result = ready!(self.project().fut.poll(cx))
254+
.map_err(Into::into)
255+
.map(|response| response.map(Into::into));
256+
257+
Poll::Ready(result)
258+
}
259+
}
260+
261+
/// A wrapper around [`http_body_util::Either`] to hide
262+
/// type complexity for higher layer
263+
#[pin_project::pin_project]
264+
pub struct ResponseBody {
265+
#[pin]
266+
inner: EitherBody<Incoming, PermittedRecvStream>,
267+
}
268+
269+
impl From<Incoming> for ResponseBody {
270+
fn from(value: Incoming) -> Self {
271+
Self {
272+
inner: EitherBody::Left(value),
273+
}
274+
}
275+
}
276+
277+
impl From<PermittedRecvStream> for ResponseBody {
278+
fn from(value: PermittedRecvStream) -> Self {
279+
Self {
280+
inner: EitherBody::Right(value),
281+
}
282+
}
283+
}
284+
285+
impl Body for ResponseBody {
286+
type Data = Bytes;
287+
type Error = Box<dyn std::error::Error + Send + Sync>;
288+
289+
fn is_end_stream(&self) -> bool {
290+
self.inner.is_end_stream()
291+
}
292+
fn poll_frame(
293+
self: std::pin::Pin<&mut Self>,
294+
cx: &mut std::task::Context<'_>,
295+
) -> std::task::Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
296+
self.project().inner.poll_frame(cx)
297+
}
298+
fn size_hint(&self) -> http_body::SizeHint {
299+
self.inner.size_hint()
216300
}
217301
}
218302

@@ -228,6 +312,8 @@ pub enum HttpError {
228312
Connect(#[source] hyper_util::client::legacy::Error),
229313
#[error("{}", FormatHyperError(.0))]
230314
Hyper(#[source] hyper_util::client::legacy::Error),
315+
#[error("h2 pool connection error: {0}")]
316+
PoolError(#[from] pool::ConnectionError),
231317
}
232318

233319
impl HttpError {
@@ -240,6 +326,7 @@ impl HttpError {
240326
HttpError::PossibleHTTP11Only(_) => false,
241327
HttpError::PossibleHTTP2Only(_) => false,
242328
HttpError::Connect(_) => true,
329+
HttpError::PoolError(_) => true,
243330
}
244331
}
245332

0 commit comments

Comments
 (0)