Skip to content

Commit 1b97e57

Browse files
authored
detect: Make protocol detection more robust (#744)
The existing protocol detection implementation is somewhat naive: we simply perform a single read on the socket and use that initial buffer to try to determine the protocol. This means that if the initial read only returns a few bytes, we won't actually have enough information to do protocol detection. Furthermore, because we don't actually read until a `\r\n`, we can incorrectly infer that a non-HTTP protocol is HTTP/1.1. This change fixes this by replacing the `Prefix` middleware with a `Detect` service and trait that does its own buffering as it tries to read from the socket. When detection times out, an error is thrown to the server. This change requires modifying test data to include a trailing newline (since it's not feasible to close these connections given the current test infrastructure). This change increases the default dispatch (and therefore detection) timeouts to 5s.
1 parent 147908f commit 1b97e57

File tree

25 files changed

+559
-407
lines changed

25 files changed

+559
-407
lines changed

Cargo.lock

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1360,6 +1360,7 @@ dependencies = [
13601360
name = "linkerd2-proxy-http"
13611361
version = "0.1.0"
13621362
dependencies = [
1363+
"async-trait",
13631364
"bytes 0.6.0",
13641365
"futures 0.3.5",
13651366
"h2 0.3.0",
@@ -1375,14 +1376,17 @@ dependencies = [
13751376
"linkerd2-http-box",
13761377
"linkerd2-identity",
13771378
"linkerd2-io",
1379+
"linkerd2-proxy-transport",
13781380
"linkerd2-stack",
13791381
"linkerd2-timeout",
13801382
"pin-project 1.0.2",
13811383
"rand 0.7.2",
13821384
"tokio 0.3.5",
1385+
"tokio-test 0.3.0",
13831386
"tower",
13841387
"tracing",
13851388
"tracing-futures",
1389+
"tracing-subscriber",
13861390
"try-lock",
13871391
]
13881392

@@ -1459,6 +1463,7 @@ name = "linkerd2-proxy-transport"
14591463
version = "0.1.0"
14601464
dependencies = [
14611465
"async-stream 0.2.1",
1466+
"async-trait",
14621467
"bytes 0.6.0",
14631468
"futures 0.3.5",
14641469
"indexmap",

linkerd/app/inbound/src/lib.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -317,10 +317,10 @@ impl Config {
317317
where
318318
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + Unpin + Send + 'static,
319319
F: svc::NewService<TcpEndpoint, Service = A> + Unpin + Clone + Send + 'static,
320-
A: tower::Service<io::PrefixedIo<I>, Response = ()> + Clone + Send + 'static,
320+
A: tower::Service<io::PrefixedIo<I>, Response = ()> + Clone + Send + Sync + 'static,
321321
A::Error: Into<Error>,
322322
A::Future: Send,
323-
H: svc::NewService<Target, Service = S> + Unpin + Clone + Send + 'static,
323+
H: svc::NewService<Target, Service = S> + Unpin + Clone + Send + Sync + 'static,
324324
S: tower::Service<
325325
http::Request<http::boxed::BoxBody>,
326326
Response = http::Response<http::boxed::BoxBody>,
@@ -335,7 +335,7 @@ impl Config {
335335
dispatch_timeout,
336336
max_in_flight_requests,
337337
detect_protocol_timeout,
338-
buffer_capacity,
338+
cache_max_idle_age,
339339
..
340340
} = self.proxy.clone();
341341

@@ -389,10 +389,12 @@ impl Config {
389389
.into_inner(),
390390
drain.clone(),
391391
))
392-
.push_on_response(svc::layers().push_spawn_buffer(buffer_capacity).push(
393-
transport::Prefix::layer(
394-
http::Version::DETECT_BUFFER_CAPACITY,
392+
.check_new_clone::<(Option<http::Version>, TcpAccept)>()
393+
.push_cache(cache_max_idle_age)
394+
.push(transport::NewDetectService::layer(
395+
transport::detect::DetectTimeout::new(
395396
detect_protocol_timeout,
397+
http::DetectHttp::default(),
396398
),
397399
))
398400
.into_inner()

linkerd/app/integration/src/tests/identity.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ async fn nonblocking_identity_detection() {
2525
.await;
2626
let proxy = proxy::new().identity(id_svc);
2727

28-
let msg1 = "custom tcp hello";
28+
let msg1 = "custom tcp hello\n";
2929
let msg2 = "custom tcp bye";
3030
let srv = server::tcp()
3131
.accept(move |read| {

linkerd/app/integration/src/tests/shutdown.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ async fn tcp_waits_for_proxies_to_close() {
100100
let _trace = trace_init();
101101

102102
let (shdn, rx) = shutdown_signal();
103-
let msg1 = "custom tcp hello";
103+
let msg1 = "custom tcp hello\n";
104104
let msg2 = "custom tcp bye";
105105

106106
let srv = server::tcp()

linkerd/app/integration/src/tests/telemetry.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ impl Fixture {
7676
}
7777

7878
impl TcpFixture {
79-
const HELLO_MSG: &'static str = "custom tcp hello";
79+
const HELLO_MSG: &'static str = "custom tcp hello\n";
8080
const BYE_MSG: &'static str = "custom tcp bye";
8181

8282
async fn server() -> server::Listening {

linkerd/app/integration/src/tests/transparency.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ async fn inbound_http1() {
5050
async fn outbound_tcp() {
5151
let _trace = trace_init();
5252

53-
let msg1 = "custom tcp hello";
53+
let msg1 = "custom tcp hello\n";
5454
let msg2 = "custom tcp bye";
5555

5656
let srv = server::tcp()
@@ -88,7 +88,7 @@ async fn outbound_tcp() {
8888
async fn outbound_tcp_external() {
8989
let _trace = trace_init();
9090

91-
let msg1 = "custom tcp hello";
91+
let msg1 = "custom tcp hello\n";
9292
let msg2 = "custom tcp bye";
9393

9494
let srv = server::tcp()
@@ -127,7 +127,7 @@ async fn outbound_tcp_external() {
127127
async fn inbound_tcp() {
128128
let _trace = trace_init();
129129

130-
let msg1 = "custom tcp hello";
130+
let msg1 = "custom tcp hello\n";
131131
let msg2 = "custom tcp bye";
132132

133133
let srv = server::tcp()
@@ -296,7 +296,7 @@ async fn tcp_server_first_tls() {
296296
async fn tcp_connections_close_if_client_closes() {
297297
let _trace = trace_init();
298298

299-
let msg1 = "custom tcp hello";
299+
let msg1 = "custom tcp hello\n";
300300
let msg2 = "custom tcp bye";
301301

302302
let (mut tx, mut rx) = mpsc::channel(1);

linkerd/app/outbound/src/ingress.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ where
4646
Error = Error,
4747
> + Clone
4848
+ Send
49+
+ Sync
4950
+ 'static,
5051
TSvc::Future: Send,
5152
H: svc::NewService<http::Logical, Service = HSvc> + Unpin + Clone + Send + Sync + 'static,
@@ -130,11 +131,13 @@ where
130131
.into_inner();
131132

132133
svc::stack(http::NewServeHttp::new(h2_settings, http, tcp, drain))
133-
.check_new_service::<tcp::Accept, io::PrefixedIo<transport::metrics::SensorIo<I>>>()
134-
.push_on_response(svc::layers().push_spawn_buffer(buffer_capacity).push(
135-
transport::Prefix::layer(
136-
http::Version::DETECT_BUFFER_CAPACITY,
134+
.check_new_service::<(Option<http::Version>, tcp::Accept), io::PrefixedIo<transport::metrics::SensorIo<I>>>()
135+
.check_new_clone::<(Option<http::Version>, tcp::Accept)>()
136+
.push_cache(cache_max_idle_age)
137+
.push(transport::NewDetectService::layer(
138+
transport::detect::DetectTimeout::new(
137139
detect_protocol_timeout,
140+
http::DetectHttp::default(),
138141
),
139142
))
140143
.check_new_service::<tcp::Accept, transport::metrics::SensorIo<I>>()

linkerd/app/outbound/src/server.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ where
156156
max_in_flight_requests,
157157
detect_protocol_timeout,
158158
buffer_capacity,
159+
cache_max_idle_age,
159160
..
160161
} = config.proxy.clone();
161162

@@ -222,11 +223,13 @@ where
222223
tcp_balance,
223224
drain.clone(),
224225
))
225-
.check_new_service::<tcp::Logical, transport::io::PrefixedIo<transport::metrics::SensorIo<I>>>()
226-
.push_on_response(svc::layers().push_spawn_buffer(buffer_capacity).push(
227-
transport::Prefix::layer(
228-
http::Version::DETECT_BUFFER_CAPACITY,
226+
.check_new_clone::<(Option<http::Version>, tcp::Logical)>()
227+
.check_new_service::<(Option<http::Version>, tcp::Logical), transport::io::PrefixedIo<transport::metrics::SensorIo<I>>>()
228+
.push_cache(cache_max_idle_age)
229+
.push(transport::NewDetectService::layer(
230+
transport::detect::DetectTimeout::new(
229231
detect_protocol_timeout,
232+
http::DetectHttp::default(),
230233
),
231234
))
232235
.check_new_service::<tcp::Logical, transport::metrics::SensorIo<I>>()

linkerd/app/outbound/src/target.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,21 @@ impl<P> Logical<P> {
127127
}
128128
}
129129

130+
impl<P: PartialEq> PartialEq<Logical<P>> for Logical<P> {
131+
fn eq(&self, other: &Logical<P>) -> bool {
132+
self.orig_dst == other.orig_dst && self.protocol == other.protocol
133+
}
134+
}
135+
136+
impl<P: Eq> Eq for Logical<P> {}
137+
138+
impl<P: std::hash::Hash> std::hash::Hash for Logical<P> {
139+
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
140+
self.orig_dst.hash(state);
141+
self.protocol.hash(state);
142+
}
143+
}
144+
130145
impl<P: std::fmt::Debug> std::fmt::Debug for Logical<P> {
131146
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132147
f.debug_struct("Logical")

linkerd/app/outbound/src/tcp/tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -865,7 +865,7 @@ where
865865
svc
866866
};
867867
async move {
868-
let io = support::io().read(b"hello\r\n").write(b"world").build();
868+
let io = support::io().read(b"hello\n").write(b"world").build();
869869
let res = svc.oneshot(io).err_into::<Error>().await;
870870
tracing::trace!(?res);
871871
if let Err(err) = res {

0 commit comments

Comments
 (0)