Skip to content

Commit 2f1c0b7

Browse files
authored
fix(http2): Fix race condition in client dispatcher (#3041)
There exists a race condition in ClientTask::poll() when the request that is sent via h2::client::send_request() is pending open. A task will be spawned to wait for send capacity on the sendstream. Because this same stream is also stored in the pending member of h2::client::SendRequest the next iteration of the poll() loop can call poll_ready() and call wait_send() on the same stream passed into the spawned task. Fix this by always calling poll_ready() after send_request(). If this call to poll_ready() returns Pending save the necessary context in ClientTask and only spawn the task that will eventually resolve to the response after poll_ready() returns Ok. Closes #2419
1 parent 04d637e commit 2f1c0b7

File tree

1 file changed

+143
-81
lines changed

1 file changed

+143
-81
lines changed

src/proto/h2/client.rs

Lines changed: 143 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,22 @@ use futures_channel::{mpsc, oneshot};
77
use futures_util::future::{self, Either, FutureExt as _, TryFutureExt as _};
88
use futures_util::stream::StreamExt as _;
99
use h2::client::{Builder, SendRequest};
10+
use h2::SendStream;
1011
use http::{Method, StatusCode};
1112
use tokio::io::{AsyncRead, AsyncWrite};
1213
use tracing::{debug, trace, warn};
1314

1415
use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
1516
use crate::body::HttpBody;
17+
use crate::client::dispatch::Callback;
1618
use crate::common::{exec::Exec, task, Future, Never, Pin, Poll};
1719
use crate::ext::Protocol;
1820
use crate::headers;
1921
use crate::proto::h2::UpgradedSendStream;
2022
use crate::proto::Dispatched;
2123
use crate::upgrade::Upgraded;
2224
use crate::{Body, Request, Response};
25+
use h2::client::ResponseFuture;
2326

2427
type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<Body>>;
2528

@@ -170,6 +173,7 @@ where
170173
executor: exec,
171174
h2_tx,
172175
req_rx,
176+
fut_ctx: None,
173177
})
174178
}
175179

@@ -193,6 +197,20 @@ where
193197
}
194198
}
195199

200+
struct FutCtx<B>
201+
where
202+
B: HttpBody,
203+
{
204+
is_connect: bool,
205+
eos: bool,
206+
fut: ResponseFuture,
207+
body_tx: SendStream<SendBuf<B::Data>>,
208+
body: B,
209+
cb: Callback<Request<B>, Response<Body>>,
210+
}
211+
212+
impl<B: HttpBody> Unpin for FutCtx<B> {}
213+
196214
pub(crate) struct ClientTask<B>
197215
where
198216
B: HttpBody,
@@ -203,6 +221,7 @@ where
203221
executor: Exec,
204222
h2_tx: SendRequest<SendBuf<B::Data>>,
205223
req_rx: ClientRx<B>,
224+
fut_ctx: Option<FutCtx<B>>,
206225
}
207226

208227
impl<B> ClientTask<B>
@@ -214,6 +233,99 @@ where
214233
}
215234
}
216235

236+
impl<B> ClientTask<B>
237+
where
238+
B: HttpBody + Send + 'static,
239+
B::Data: Send,
240+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
241+
{
242+
fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut task::Context<'_>) {
243+
let ping = self.ping.clone();
244+
let send_stream = if !f.is_connect {
245+
if !f.eos {
246+
let mut pipe = Box::pin(PipeToSendStream::new(f.body, f.body_tx)).map(|res| {
247+
if let Err(e) = res {
248+
debug!("client request body error: {}", e);
249+
}
250+
});
251+
252+
// eagerly see if the body pipe is ready and
253+
// can thus skip allocating in the executor
254+
match Pin::new(&mut pipe).poll(cx) {
255+
Poll::Ready(_) => (),
256+
Poll::Pending => {
257+
let conn_drop_ref = self.conn_drop_ref.clone();
258+
// keep the ping recorder's knowledge of an
259+
// "open stream" alive while this body is
260+
// still sending...
261+
let ping = ping.clone();
262+
let pipe = pipe.map(move |x| {
263+
drop(conn_drop_ref);
264+
drop(ping);
265+
x
266+
});
267+
// Clear send task
268+
self.executor.execute(pipe);
269+
}
270+
}
271+
}
272+
273+
None
274+
} else {
275+
Some(f.body_tx)
276+
};
277+
278+
let fut = f.fut.map(move |result| match result {
279+
Ok(res) => {
280+
// record that we got the response headers
281+
ping.record_non_data();
282+
283+
let content_length = headers::content_length_parse_all(res.headers());
284+
if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
285+
if content_length.map_or(false, |len| len != 0) {
286+
warn!("h2 connect response with non-zero body not supported");
287+
288+
send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
289+
return Err((
290+
crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
291+
None,
292+
));
293+
}
294+
let (parts, recv_stream) = res.into_parts();
295+
let mut res = Response::from_parts(parts, Body::empty());
296+
297+
let (pending, on_upgrade) = crate::upgrade::pending();
298+
let io = H2Upgraded {
299+
ping,
300+
send_stream: unsafe { UpgradedSendStream::new(send_stream) },
301+
recv_stream,
302+
buf: Bytes::new(),
303+
};
304+
let upgraded = Upgraded::new(io, Bytes::new());
305+
306+
pending.fulfill(upgraded);
307+
res.extensions_mut().insert(on_upgrade);
308+
309+
Ok(res)
310+
} else {
311+
let res = res.map(|stream| {
312+
let ping = ping.for_stream(&stream);
313+
crate::Body::h2(stream, content_length.into(), ping)
314+
});
315+
Ok(res)
316+
}
317+
}
318+
Err(err) => {
319+
ping.ensure_not_timed_out().map_err(|e| (e, None))?;
320+
321+
debug!("client response error: {}", err);
322+
Err((crate::Error::new_h2(err), None))
323+
}
324+
});
325+
self.executor.execute(f.cb.send_when(fut));
326+
}
327+
}
328+
217329
impl<B> Future for ClientTask<B>
218330
where
219331
B: HttpBody + Send + 'static,
@@ -237,6 +349,16 @@ where
237349
}
238350
};
239351

352+
match self.fut_ctx.take() {
353+
// If we were waiting on pending open
354+
// continue where we left off.
355+
Some(f) => {
356+
self.poll_pipe(f, cx);
357+
continue;
358+
}
359+
None => (),
360+
}
361+
240362
match self.req_rx.poll_recv(cx) {
241363
Poll::Ready(Some((req, cb))) => {
242364
// check that future hasn't been canceled already
@@ -255,7 +377,6 @@ where
255377

256378
let is_connect = req.method() == Method::CONNECT;
257379
let eos = body.is_end_stream();
258-
let ping = self.ping.clone();
259380

260381
if is_connect {
261382
if headers::content_length_parse_all(req.headers())
@@ -283,90 +404,31 @@ where
283404
}
284405
};
285406

286-
let send_stream = if !is_connect {
287-
if !eos {
288-
let mut pipe =
289-
Box::pin(PipeToSendStream::new(body, body_tx)).map(|res| {
290-
if let Err(e) = res {
291-
debug!("client request body error: {}", e);
292-
}
293-
});
294-
295-
// eagerly see if the body pipe is ready and
296-
// can thus skip allocating in the executor
297-
match Pin::new(&mut pipe).poll(cx) {
298-
Poll::Ready(_) => (),
299-
Poll::Pending => {
300-
let conn_drop_ref = self.conn_drop_ref.clone();
301-
// keep the ping recorder's knowledge of an
302-
// "open stream" alive while this body is
303-
// still sending...
304-
let ping = ping.clone();
305-
let pipe = pipe.map(move |x| {
306-
drop(conn_drop_ref);
307-
drop(ping);
308-
x
309-
});
310-
self.executor.execute(pipe);
311-
}
312-
}
313-
}
314-
315-
None
316-
} else {
317-
Some(body_tx)
407+
let f = FutCtx {
408+
is_connect,
409+
eos,
410+
fut,
411+
body_tx,
412+
body,
413+
cb,
318414
};
319415

320-
let fut = fut.map(move |result| match result {
321-
Ok(res) => {
322-
// record that we got the response headers
323-
ping.record_non_data();
324-
325-
let content_length = headers::content_length_parse_all(res.headers());
326-
if let (Some(mut send_stream), StatusCode::OK) =
327-
(send_stream, res.status())
328-
{
329-
if content_length.map_or(false, |len| len != 0) {
330-
warn!("h2 connect response with non-zero body not supported");
331-
332-
send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
333-
return Err((
334-
crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
335-
None,
336-
));
337-
}
338-
let (parts, recv_stream) = res.into_parts();
339-
let mut res = Response::from_parts(parts, Body::empty());
340-
341-
let (pending, on_upgrade) = crate::upgrade::pending();
342-
let io = H2Upgraded {
343-
ping,
344-
send_stream: unsafe { UpgradedSendStream::new(send_stream) },
345-
recv_stream,
346-
buf: Bytes::new(),
347-
};
348-
let upgraded = Upgraded::new(io, Bytes::new());
349-
350-
pending.fulfill(upgraded);
351-
res.extensions_mut().insert(on_upgrade);
352-
353-
Ok(res)
354-
} else {
355-
let res = res.map(|stream| {
356-
let ping = ping.for_stream(&stream);
357-
crate::Body::h2(stream, content_length.into(), ping)
358-
});
359-
Ok(res)
360-
}
416+
// Check poll_ready() again.
417+
// If the call to send_request() resulted in the new stream being pending open
418+
// we have to wait for the open to complete before accepting new requests.
419+
match self.h2_tx.poll_ready(cx) {
420+
Poll::Pending => {
421+
// Save Context
422+
self.fut_ctx = Some(f);
423+
return Poll::Pending;
361424
}
362-
Err(err) => {
363-
ping.ensure_not_timed_out().map_err(|e| (e, None))?;
364-
365-
debug!("client response error: {}", err);
366-
Err((crate::Error::new_h2(err), None))
425+
Poll::Ready(Ok(())) => (),
426+
Poll::Ready(Err(err)) => {
427+
f.cb.send(Err((crate::Error::new_h2(err), None)));
428+
continue;
367429
}
368-
});
369-
self.executor.execute(cb.send_when(fut));
430+
}
431+
self.poll_pipe(f, cx);
370432
continue;
371433
}
372434

0 commit comments

Comments
 (0)