Skip to content

Commit d18730c

Browse files
authored
Split prefix buffering from HTTP detection (#676)
This change modifies `serve` to take a `NewService` instead of a `MakeService`. Services specific to the accept stack have been updated as well `DetectHttp` has been updated to work as either a `MakeService` or `NewService` -- the asynchronous version is still needed by the outbound proxy (until caching is changed). `DetectTls` is now purely a `NewService`.
1 parent f4de07d commit d18730c

File tree

6 files changed

+103
-53
lines changed

6 files changed

+103
-53
lines changed

linkerd/app/inbound/src/lib.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use linkerd2_app_core::{
1616
opencensus::proto::trace::v1 as oc,
1717
profiles,
1818
proxy::{
19-
http::{self, orig_proto, strip_header, DetectHttp},
19+
http::{self, orig_proto, strip_header},
2020
identity, tap, tcp,
2121
},
2222
reconnect, router,
@@ -367,15 +367,19 @@ impl Config {
367367
.check_new_service::<tls::accept::Meta, http::Request<_>>()
368368
.into_inner();
369369

370-
DetectHttp::new(
370+
svc::stack(http::DetectHttp::new(
371371
h2_settings,
372-
detect_protocol_timeout,
373372
http_server,
374373
svc::stack(tcp_forward)
375374
.push_map_target(TcpEndpoint::from)
376375
.into_inner(),
377376
drain.clone(),
378-
)
377+
))
378+
.push_on_response(transport::Prefix::layer(
379+
http::Version::DETECT_BUFFER_CAPACITY,
380+
detect_protocol_timeout,
381+
))
382+
.into_inner()
379383
}
380384

381385
pub fn build_tls_accept<D, A, F, B>(

linkerd/app/outbound/src/lib.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -518,17 +518,22 @@ impl Config {
518518
.push_map_target(|a: listen::Addrs| a.target_addr())
519519
.into_inner();
520520

521-
let http = http::DetectHttp::new(
521+
let http = svc::stack(http::DetectHttp::new(
522522
h2_settings,
523-
detect_protocol_timeout,
524523
http_server,
525524
tcp_balance,
526525
drain.clone(),
527-
);
526+
))
527+
.push_on_response(transport::Prefix::layer(
528+
http::Version::DETECT_BUFFER_CAPACITY,
529+
detect_protocol_timeout,
530+
))
531+
.into_new_service()
532+
.into_inner();
528533

529534
svc::stack(svc::stack::MakeSwitch::new(
530535
skip_detect.clone(),
531-
svc::stack(http).into_new_service().into_inner(),
536+
http,
532537
tcp_forward.push_map_target(TcpEndpoint::from).into_inner(),
533538
))
534539
.push(metrics.transport.layer_accept(TransportLabels))

linkerd/proxy/http/src/detect.rs

Lines changed: 7 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,17 @@ use std::{
1313
future::Future,
1414
pin::Pin,
1515
task::{Context, Poll},
16-
time::Duration,
1716
};
1817
use tower::{util::ServiceExt, Service};
19-
use tracing::{debug, info_span, trace};
18+
use tracing::{debug, info_span};
2019
use tracing_futures::Instrument;
2120

2221
type Server = hyper::server::conn::Http<trace::Executor>;
2322

24-
#[derive(Copy, Clone, Debug)]
25-
pub struct DetectTimeout(());
26-
2723
#[derive(Clone, Debug)]
2824
pub struct DetectHttp<F, H> {
2925
tcp: F,
3026
http: H,
31-
timeout: Duration,
3227
server: Server,
3328
drain: drain::Watch,
3429
}
@@ -45,7 +40,6 @@ pub struct DetectHttp<F, H> {
4540
pub struct AcceptHttp<F, H> {
4641
tcp: F,
4742
http: H,
48-
timeout: Duration,
4943
server: hyper::server::conn::Http<trace::Executor>,
5044
drain: drain::Watch,
5145
}
@@ -54,14 +48,13 @@ pub struct AcceptHttp<F, H> {
5448

5549
impl<F, H> DetectHttp<F, H> {
5650
/// Creates a new `AcceptHttp`.
57-
pub fn new(h2: H2Settings, timeout: Duration, http: H, tcp: F, drain: drain::Watch) -> Self {
51+
pub fn new(h2: H2Settings, http: H, tcp: F, drain: drain::Watch) -> Self {
5852
let mut server = hyper::server::conn::Http::new().with_executor(trace::Executor::new());
5953
server
6054
.http2_initial_stream_window_size(h2.initial_stream_window_size)
6155
.http2_initial_connection_window_size(h2.initial_connection_window_size);
6256

6357
Self {
64-
timeout,
6558
server,
6659
tcp,
6760
http,
@@ -81,7 +74,6 @@ where
8174
fn new_service(&mut self, target: T) -> Self::Service {
8275
AcceptHttp::new(
8376
self.server.clone(),
84-
self.timeout,
8577
self.http.new_service(target.clone()),
8678
self.tcp.new_service(target),
8779
self.drain.clone(),
@@ -115,34 +107,32 @@ where
115107
let tcp = self.tcp.clone();
116108
let http = self.http.clone();
117109
let server = self.server.clone();
118-
let timeout = self.timeout;
119110

120111
Box::pin(async move {
121112
let (tcp, http) = futures::try_join!(
122113
tcp.oneshot(target.clone()).map_err(Into::<Error>::into),
123114
http.oneshot(target).map_err(Into::<Error>::into)
124115
)?;
125116

126-
Ok(AcceptHttp::new(server, timeout, http, tcp, drain))
117+
Ok(AcceptHttp::new(server, http, tcp, drain))
127118
})
128119
}
129120
}
130121

131122
// === impl AcceptHttp ===
132123

133124
impl<F, H> AcceptHttp<F, H> {
134-
pub fn new(server: Server, timeout: Duration, http: H, tcp: F, drain: drain::Watch) -> Self {
125+
pub fn new(server: Server, http: H, tcp: F, drain: drain::Watch) -> Self {
135126
Self {
136127
server,
137-
timeout,
138128
tcp,
139129
http,
140130
drain,
141131
}
142132
}
143133
}
144134

145-
impl<I, F, S> Service<I> for AcceptHttp<F, S>
135+
impl<I, F, S> Service<PrefixedIo<I>> for AcceptHttp<F, S>
146136
where
147137
I: io::AsyncRead + io::AsyncWrite + Send + Unpin + 'static,
148138
F: tower::Service<PrefixedIo<I>, Response = ()> + Clone + Send + 'static,
@@ -164,22 +154,14 @@ where
164154
Poll::Ready(Ok(().into()))
165155
}
166156

167-
fn call(&mut self, io: I) -> Self::Future {
157+
fn call(&mut self, io: PrefixedIo<I>) -> Self::Future {
168158
let drain = self.drain.clone();
169159
let tcp = self.tcp.clone();
170160
let http = self.http.clone();
171161
let mut server = self.server.clone();
172162

173-
let timeout = tokio::time::delay_for(self.timeout);
163+
let version = HttpVersion::from_prefix(io.prefix());
174164
Box::pin(async move {
175-
trace!("Detecting");
176-
let (version, io) = tokio::select! {
177-
res = HttpVersion::detect(io) => { res? }
178-
() = timeout => {
179-
return Err(DetectTimeout(()).into());
180-
}
181-
};
182-
183165
match version {
184166
Some(HttpVersion::Http1) => {
185167
debug!("Handling as HTTP");
@@ -220,11 +202,3 @@ where
220202
})
221203
}
222204
}
223-
224-
impl std::fmt::Display for DetectTimeout {
225-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
226-
write!(f, "HTTP detection timeout")
227-
}
228-
}
229-
230-
impl std::error::Error for DetectTimeout {}

linkerd/proxy/http/src/version.rs

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use linkerd2_proxy_transport::io::{self, Peekable, PrefixedIo};
21
use tracing::{debug, trace};
32

43
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
@@ -22,6 +21,7 @@ impl std::convert::TryFrom<http::Version> for Version {
2221
}
2322

2423
impl Version {
24+
pub const DETECT_BUFFER_CAPACITY: usize = 8192;
2525
const H2_PREFACE: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
2626

2727
/// Tries to detect a known protocol in the peeked bytes.
@@ -60,17 +60,6 @@ impl Version {
6060
trace!(?bytes);
6161
None
6262
}
63-
64-
pub async fn detect<I>(io: I) -> io::Result<(Option<Self>, PrefixedIo<I>)>
65-
where
66-
I: io::AsyncRead + io::AsyncWrite + Unpin,
67-
{
68-
// If we don't find a newline, we consider the stream to be HTTP/1; so
69-
// we need enough capacity to prevent false-positives.
70-
let io = io.peek(8192).await?;
71-
let version = Self::from_prefix(io.prefix());
72-
Ok((version, io))
73-
}
7463
}
7564

7665
impl std::fmt::Display for Unsupported {

linkerd/proxy/transport/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ pub mod connect;
77
pub use linkerd2_io as io;
88
pub mod listen;
99
pub mod metrics;
10+
pub mod prefix;
1011
pub mod tls;
1112

1213
pub use self::{
1314
connect::Connect,
1415
io::BoxedIo,
1516
listen::{Bind, DefaultOrigDstAddr, NoOrigDstAddr, OrigDstAddr},
17+
prefix::Prefix,
1618
};
1719

1820
// Misc.
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
use crate::io;
2+
use futures::prelude::*;
3+
use linkerd2_error::Error;
4+
use linkerd2_stack::layer;
5+
use std::{
6+
future::Future,
7+
pin::Pin,
8+
task::{Context, Poll},
9+
time::Duration,
10+
};
11+
use tokio::time;
12+
use tower::util::ServiceExt;
13+
use tracing::{debug, trace};
14+
15+
#[derive(Copy, Clone)]
16+
pub struct Prefix<S> {
17+
inner: S,
18+
capacity: usize,
19+
timeout: Duration,
20+
}
21+
22+
#[derive(Debug)]
23+
pub struct ReadTimeout(());
24+
25+
impl<S> Prefix<S> {
26+
pub fn new(inner: S, capacity: usize, timeout: Duration) -> Self {
27+
Self {
28+
inner,
29+
capacity,
30+
timeout,
31+
}
32+
}
33+
34+
pub fn layer(
35+
capacity: usize,
36+
timeout: Duration,
37+
) -> impl layer::Layer<S, Service = Prefix<S>> + Clone {
38+
layer::mk(move |inner| Self::new(inner, capacity, timeout))
39+
}
40+
}
41+
42+
impl<S, I> tower::Service<I> for Prefix<S>
43+
where
44+
I: io::Peekable + Send + 'static,
45+
S: tower::Service<io::PrefixedIo<I>, Response = ()> + Clone + Send + 'static,
46+
S::Error: Into<Error>,
47+
S::Future: Send,
48+
{
49+
type Response = ();
50+
type Error = Error;
51+
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;
52+
53+
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
54+
Poll::Ready(Ok(().into()))
55+
}
56+
57+
fn call(&mut self, io: I) -> Self::Future {
58+
debug!(capacity = self.capacity, "Buffering prefix");
59+
let accept = self.inner.clone();
60+
let peek = time::timeout(self.timeout, io.peek(self.capacity)).map_err(|_| ReadTimeout(()));
61+
Box::pin(async move {
62+
let io = peek.await??;
63+
trace!(read = %io.prefix().len());
64+
accept.oneshot(io).err_into::<Error>().await?;
65+
Ok(())
66+
})
67+
}
68+
}
69+
70+
impl std::fmt::Display for ReadTimeout {
71+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72+
write!(f, "Timed out while reading client stream prefix")
73+
}
74+
}
75+
76+
impl std::error::Error for ReadTimeout {}

0 commit comments

Comments
 (0)