Skip to content

Commit 3ffc276

Browse files
authored
chore(http): test HTTP/2 flow control exhaustion (#2895)
This change adds tests that exercises the server's behavior when clients exhaust their HTTP/2 receive windows, i.e. so that the server is unable to send additional data to a client. These tests currently only document the existing behavior, but they will be extended to validate mitigation strategies as they are implemented.
1 parent edd01ad commit 3ffc276

File tree

2 files changed

+246
-0
lines changed

2 files changed

+246
-0
lines changed

linkerd/proxy/http/src/server.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ use std::{
1313
use tower::Service;
1414
use tracing::{debug, Instrument};
1515

16+
#[cfg(test)]
17+
mod tests;
18+
1619
/// Configures HTTP server behavior.
1720
#[derive(Clone, Debug)]
1821
pub struct Params {
Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
use std::vec;
2+
3+
use super::*;
4+
use bytes::Bytes;
5+
use http_body::Body;
6+
use linkerd_stack::CloneParam;
7+
use tokio::time;
8+
use tower::ServiceExt;
9+
use tower_test::mock;
10+
use tracing::info_span;
11+
12+
/// Tests how the server behaves when the client connection window is exhausted.
13+
#[tokio::test(flavor = "current_thread", start_paused = true)]
14+
async fn h2_connection_window_exhaustion() {
15+
let _trace = linkerd_tracing::test::with_default_filter(LOG_LEVEL);
16+
17+
// Setup a HTTP/2 server with consumers and producers that are mocked for
18+
// tests.
19+
const CONCURRENCY: u32 = 3;
20+
const CLIENT_STREAM_WINDOW: u32 = 65535;
21+
const CLIENT_CONN_WINDOW: u32 = CONCURRENCY * CLIENT_STREAM_WINDOW;
22+
23+
tracing::info!("Connecting to server");
24+
let mut server = TestServer::connect_h2(
25+
// A basic HTTP/2 server configuration with no overrides.
26+
H2Settings::default(),
27+
// An HTTP/2 client with constrained connection and stream windows to
28+
// force window exhaustion.
29+
hyper::client::conn::Builder::new()
30+
.http2_initial_connection_window_size(CLIENT_CONN_WINDOW)
31+
.http2_initial_stream_window_size(CLIENT_STREAM_WINDOW),
32+
)
33+
.await;
34+
35+
// Mocked response data to fill up the stream and connection windows.
36+
let bytes = (0..CLIENT_STREAM_WINDOW).map(|_| b'a').collect::<Bytes>();
37+
38+
// Response bodies held to exhaust connection window.
39+
let mut retain = vec![];
40+
41+
tracing::info!(
42+
streams = CONCURRENCY - 1,
43+
data = bytes.len(),
44+
"Consuming connection window"
45+
);
46+
for _ in 0..CONCURRENCY - 1 {
47+
let rx = timeout(server.respond(bytes.clone()))
48+
.await
49+
.expect("timed out");
50+
retain.push(rx);
51+
}
52+
53+
tracing::info!("Processing a stream with available connection window");
54+
let rx = timeout(server.respond(bytes.clone()))
55+
.await
56+
.expect("timed out");
57+
let body = timeout(rx.collect().instrument(info_span!("collect")))
58+
.await
59+
.expect("response timed out")
60+
.expect("response");
61+
assert_eq!(body.to_bytes(), bytes);
62+
63+
tracing::info!("Consuming the remaining connection window");
64+
let rx = timeout(server.respond(bytes.clone()))
65+
.await
66+
.expect("timed out");
67+
retain.push(rx);
68+
69+
tracing::info!("The connection window is exhausted");
70+
71+
tracing::info!("Trying to process an additional stream. The response headers are received but no data is received.");
72+
let mut rx = timeout(server.respond(bytes.clone()))
73+
.await
74+
.expect("timed out");
75+
tokio::select! {
76+
_ = time::sleep(time::Duration::from_secs(2)) => {}
77+
_ = rx.data() => panic!("unexpected data"),
78+
}
79+
80+
tracing::info!("Dropping one of the retained response bodies frees capacity so that the data can be received");
81+
drop(retain.pop());
82+
let body = timeout(rx.collect().instrument(info_span!("collect")))
83+
.await
84+
.expect("response timed out")
85+
.expect("response");
86+
assert_eq!(body.to_bytes(), bytes);
87+
}
88+
89+
/// Tests how the server behaves when the client stream window is exhausted.
90+
#[tokio::test(flavor = "current_thread", start_paused = true)]
91+
async fn h2_stream_window_exhaustion() {
92+
let _trace = linkerd_tracing::test::with_default_filter(LOG_LEVEL);
93+
94+
// Setup a HTTP/2 server with consumers and producers that are mocked for
95+
// tests.
96+
const CLIENT_STREAM_WINDOW: u32 = 1024;
97+
98+
let mut server = TestServer::connect_h2(
99+
// A basic HTTP/2 server configuration with no overrides.
100+
H2Settings::default(),
101+
// An HTTP/2 client with stream windows to force window exhaustion.
102+
hyper::client::conn::Builder::new().http2_initial_stream_window_size(CLIENT_STREAM_WINDOW),
103+
)
104+
.await;
105+
106+
let (mut tx, mut body) = timeout(server.get()).await.expect("timed out");
107+
108+
let chunk = (0..CLIENT_STREAM_WINDOW).map(|_| b'a').collect::<Bytes>();
109+
tracing::info!(sz = chunk.len(), "Sending chunk");
110+
tx.try_send_data(chunk.clone()).expect("send data");
111+
tokio::task::yield_now().await;
112+
113+
tracing::info!(sz = chunk.len(), "Buffering chunk in channel");
114+
tx.try_send_data(chunk.clone()).expect("send data");
115+
tokio::task::yield_now().await;
116+
117+
tracing::info!(sz = chunk.len(), "Confirming stream window exhaustion");
118+
assert!(
119+
timeout(futures::future::poll_fn(|cx| tx.poll_ready(cx)))
120+
.await
121+
.is_err(),
122+
"stream window should be exhausted"
123+
);
124+
125+
tracing::info!("Once the pending data is read, the stream window should be replenished");
126+
let data = body.data().await.expect("data").expect("data");
127+
assert_eq!(data, chunk);
128+
let data = body.data().await.expect("data").expect("data");
129+
assert_eq!(data, chunk);
130+
131+
timeout(body.data()).await.expect_err("no more chunks");
132+
133+
tracing::info!(sz = chunk.len(), "Confirming stream window availability");
134+
timeout(futures::future::poll_fn(|cx| tx.poll_ready(cx)))
135+
.await
136+
.expect("timed out")
137+
.expect("ready");
138+
}
139+
140+
// === Utilities ===
141+
142+
const LOG_LEVEL: &str = "h2::proto=trace,hyper=trace,linkerd=trace,info";
143+
144+
struct TestServer {
145+
client: hyper::client::conn::SendRequest<BoxBody>,
146+
server: Handle,
147+
}
148+
149+
type Mock = mock::Mock<http::Request<BoxBody>, http::Response<BoxBody>>;
150+
type Handle = mock::Handle<http::Request<BoxBody>, http::Response<BoxBody>>;
151+
152+
/// Allows us to configure a server from the Params type.
153+
#[derive(Clone, Debug)]
154+
struct NewMock(mock::Mock<http::Request<BoxBody>, http::Response<BoxBody>>);
155+
156+
impl NewService<()> for NewMock {
157+
type Service = NewMock;
158+
fn new_service(&self, _: ()) -> Self::Service {
159+
self.clone()
160+
}
161+
}
162+
163+
impl NewService<ClientHandle> for NewMock {
164+
type Service = Mock;
165+
fn new_service(&self, _: ClientHandle) -> Self::Service {
166+
self.0.clone()
167+
}
168+
}
169+
170+
fn drain() -> drain::Watch {
171+
let (mut sig, drain) = drain::channel();
172+
tokio::spawn(async move {
173+
sig.closed().await;
174+
});
175+
drain
176+
}
177+
178+
async fn timeout<F: Future>(inner: F) -> Result<F::Output, time::error::Elapsed> {
179+
time::timeout(time::Duration::from_secs(2), inner).await
180+
}
181+
182+
impl TestServer {
183+
#[tracing::instrument(skip_all)]
184+
async fn connect(params: Params, client: &mut hyper::client::conn::Builder) -> Self {
185+
// Build the HTTP server with a mocked inner service so that we can handle
186+
// requests.
187+
let (mock, server) = mock::pair();
188+
let svc = NewServeHttp::new(CloneParam::from(params), NewMock(mock)).new_service(());
189+
190+
let (sio, cio) = io::duplex(20 * 1024 * 1024); // 20 MB
191+
tokio::spawn(svc.oneshot(sio).instrument(info_span!("server")));
192+
193+
// Build a real HTTP/2 client using the mocked socket.
194+
let (client, task) = client
195+
.executor(crate::executor::TracingExecutor)
196+
.handshake::<_, BoxBody>(cio)
197+
.await
198+
.expect("client connect");
199+
tokio::spawn(task.instrument(info_span!("client")));
200+
201+
Self { client, server }
202+
}
203+
204+
async fn connect_h2(h2: H2Settings, client: &mut hyper::client::conn::Builder) -> Self {
205+
Self::connect(
206+
// A basic HTTP/2 server configuration with no overrides.
207+
Params {
208+
drain: drain(),
209+
version: Version::H2,
210+
h2,
211+
},
212+
// An HTTP/2 client with constrained connection and stream windows to accomodate
213+
client.http2_only(true),
214+
)
215+
.await
216+
}
217+
218+
/// Issues a request through the client to the mocked server and processes the
219+
/// response. The mocked response body sender and the readable response body are
220+
/// returned.
221+
#[tracing::instrument(skip(self))]
222+
async fn get(&mut self) -> (hyper::body::Sender, hyper::Body) {
223+
self.server.allow(1);
224+
let mut call0 = self
225+
.client
226+
.send_request(http::Request::new(BoxBody::default()));
227+
let (_req, next) = tokio::select! {
228+
_ = (&mut call0) => unreachable!("client cannot receive a response"),
229+
next = self.server.next_request() => next.expect("server not dropped"),
230+
};
231+
let (tx, rx) = hyper::Body::channel();
232+
next.send_response(http::Response::new(BoxBody::new(rx)));
233+
let rsp = call0.await.expect("response");
234+
(tx, rsp.into_body())
235+
}
236+
237+
#[tracing::instrument(skip(self))]
238+
async fn respond(&mut self, body: Bytes) -> hyper::Body {
239+
let (mut tx, rx) = self.get().await;
240+
tx.send_data(body.clone()).await.expect("send data");
241+
rx
242+
}
243+
}

0 commit comments

Comments
 (0)