Skip to content

Commit 0ddf3af

Browse files
authored
Split linkerd-detect from linkerd-proxy-transport (#841)
The `linkerd2_proxy_transport::detect` module has no dependencies on other transport code. This chagne simplifies dependencies by moving the protocol detection infrastructure to a dedicated crate. We also provide a `NewDetectService::timeout` helper to reduce boilerplate in stack constructors.
1 parent 0476348 commit 0ddf3af

File tree

16 files changed

+101
-60
lines changed

16 files changed

+101
-60
lines changed

Cargo.lock

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,7 @@ dependencies = [
672672
"linkerd-cache",
673673
"linkerd-concurrency-limit",
674674
"linkerd-conditional",
675+
"linkerd-detect",
675676
"linkerd-dns",
676677
"linkerd-drain",
677678
"linkerd-duplex",
@@ -883,6 +884,21 @@ dependencies = [
883884
name = "linkerd-conditional"
884885
version = "0.1.0"
885886

887+
[[package]]
888+
name = "linkerd-detect"
889+
version = "0.1.0"
890+
dependencies = [
891+
"async-trait",
892+
"bytes",
893+
"futures",
894+
"linkerd-error",
895+
"linkerd-io",
896+
"linkerd-stack",
897+
"tokio",
898+
"tower",
899+
"tracing",
900+
]
901+
886902
[[package]]
887903
name = "linkerd-dns"
888904
version = "0.1.0"
@@ -1148,6 +1164,7 @@ dependencies = [
11481164
"hyper",
11491165
"hyper-balance",
11501166
"indexmap",
1167+
"linkerd-detect",
11511168
"linkerd-drain",
11521169
"linkerd-duplex",
11531170
"linkerd-error",
@@ -1244,7 +1261,6 @@ name = "linkerd-proxy-transport"
12441261
version = "0.1.0"
12451262
dependencies = [
12461263
"async-stream",
1247-
"async-trait",
12481264
"bytes",
12491265
"futures",
12501266
"libc",
@@ -1253,7 +1269,6 @@ dependencies = [
12531269
"linkerd-io",
12541270
"linkerd-metrics",
12551271
"linkerd-stack",
1256-
"linkerd-tls",
12571272
"pin-project 1.0.2",
12581273
"socket2",
12591274
"tokio",
@@ -1438,10 +1453,10 @@ version = "0.1.0"
14381453
dependencies = [
14391454
"async-trait",
14401455
"bytes",
1456+
"linkerd-detect",
14411457
"linkerd-dns-name",
14421458
"linkerd-error",
14431459
"linkerd-io",
1444-
"linkerd-proxy-transport",
14451460
"prost",
14461461
"prost-build",
14471462
"tokio",

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ members = [
1414
"linkerd/channel",
1515
"linkerd/concurrency-limit",
1616
"linkerd/conditional",
17+
"linkerd/detect",
1718
"linkerd/transport-header",
1819
"linkerd/dns/name",
1920
"linkerd/dns",

linkerd/app/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ linkerd-concurrency-limit = { path = "../../concurrency-limit" }
3030
linkerd-conditional = { path = "../../conditional" }
3131
linkerd-dns = { path = "../../dns" }
3232
linkerd-drain = { path = "../../drain", features = ["retain"] }
33+
linkerd-detect = { path = "../../detect" }
3334
linkerd-duplex = { path = "../../duplex" }
3435
linkerd-errno = { path = "../../errno" }
3536
linkerd-error = { path = "../../error" }

linkerd/app/core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
pub use linkerd_addr::{self as addr, Addr, NameAddr};
1313
pub use linkerd_cache as cache;
1414
pub use linkerd_conditional::Conditional;
15+
pub use linkerd_detect as detect;
1516
pub use linkerd_dns;
1617
pub use linkerd_drain as drain;
1718
pub use linkerd_error::{Error, Never, Recover};

linkerd/app/inbound/src/lib.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use self::require_identity_for_ports::RequireIdentityForPorts;
1414
use linkerd_app_core::{
1515
classify,
1616
config::{ConnectConfig, ProxyConfig, ServerConfig},
17-
drain, dst, errors, io, metrics,
17+
detect, drain, dst, errors, io, metrics,
1818
opencensus::proto::trace::v1 as oc,
1919
profiles,
2020
proxy::{
@@ -327,11 +327,9 @@ impl Config {
327327
.push(svc::NewUnwrapOr::layer(
328328
svc::Fail::<_, NonOpaqueRefused>::default(),
329329
))
330-
.push(transport::NewDetectService::layer(
331-
transport::detect::DetectTimeout::new(
332-
self.proxy.detect_protocol_timeout,
333-
transport_header::DetectHeader::default(),
334-
),
330+
.push(detect::NewDetectService::timeout(
331+
self.proxy.detect_protocol_timeout,
332+
transport_header::DetectHeader::default(),
335333
)),
336334
)
337335
.into_inner();
@@ -373,11 +371,9 @@ impl Config {
373371
.push(http::NewServeHttp::layer(h2_settings, drain))
374372
.push(svc::NewUnwrapOr::layer(tcp))
375373
.push_cache(cache_max_idle_age)
376-
.push(transport::NewDetectService::layer(
377-
transport::detect::DetectTimeout::new(
378-
detect_protocol_timeout,
379-
http::DetectHttp::default(),
380-
),
374+
.push(detect::NewDetectService::timeout(
375+
detect_protocol_timeout,
376+
http::DetectHttp::default(),
381377
))
382378
.into_inner()
383379
}

linkerd/app/outbound/src/ingress.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{http, stack_labels, tcp, trace_labels, Config};
22
use linkerd_app_core::{
33
config::{ProxyConfig, ServerConfig},
4-
discovery_rejected, drain, errors, http_request_l5d_override_dst_addr, io, metrics,
4+
detect, discovery_rejected, drain, errors, http_request_l5d_override_dst_addr, io, metrics,
55
opencensus::proto::trace::v1 as oc,
66
profiles,
77
spans::SpanConverter,
@@ -119,11 +119,9 @@ where
119119
.push(http::NewServeHttp::layer(h2_settings, drain))
120120
.push(svc::NewUnwrapOr::layer(tcp))
121121
.push_cache(cache_max_idle_age)
122-
.push(transport::NewDetectService::layer(
123-
transport::detect::DetectTimeout::new(
124-
detect_protocol_timeout,
125-
http::DetectHttp::default(),
126-
),
122+
.push(detect::NewDetectService::timeout(
123+
detect_protocol_timeout,
124+
http::DetectHttp::default(),
127125
))
128126
.check_new_service::<tcp::Accept, transport::metrics::SensorIo<I>>()
129127
.push(metrics.transport.layer_accept())

linkerd/app/outbound/src/server.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33
use crate::{http, stack_labels, tcp, trace_labels, Config};
44
use linkerd_app_core::{
55
config::{ProxyConfig, ServerConfig},
6-
discovery_rejected, drain, errors, io, metrics,
6+
detect, discovery_rejected, drain, errors, io, metrics,
77
opencensus::proto::trace::v1 as oc,
88
profiles,
99
proxy::{api_resolve::Metadata, core::resolve::Resolve},
1010
spans::SpanConverter,
1111
svc, tls,
12-
transport::{self, listen, metrics::SensorIo},
12+
transport::{listen, metrics::SensorIo},
1313
Addr, Error, IpMatch, TraceContext,
1414
};
1515
use std::net::SocketAddr;
@@ -195,11 +195,9 @@ where
195195
.into_inner(),
196196
))
197197
.push_cache(cache_max_idle_age)
198-
.push(transport::NewDetectService::layer(
199-
transport::detect::DetectTimeout::new(
200-
detect_protocol_timeout,
201-
http::DetectHttp::default(),
202-
),
198+
.push(detect::NewDetectService::timeout(
199+
detect_protocol_timeout,
200+
http::DetectHttp::default(),
203201
))
204202
.push_switch(
205203
SkipByProfile,

linkerd/detect/Cargo.toml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[package]
2+
name = "linkerd-detect"
3+
version = "0.1.0"
4+
authors = ["Linkerd Developers <[email protected]>"]
5+
license = "Apache-2.0"
6+
edition = "2018"
7+
publish = false
8+
9+
[dependencies]
10+
async-trait = "0.1"
11+
bytes = "1"
12+
futures = "0.3.9"
13+
linkerd-error = { path = "../error" }
14+
linkerd-io = { path = "../io" }
15+
linkerd-stack = { path = "../stack" }
16+
tokio = { version = "1", features = ["time"] }
17+
tower = "0.4"
18+
tracing = "0.1.22"

linkerd/proxy/transport/src/detect/mod.rs renamed to linkerd/detect/src/lib.rs

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#![deny(warnings, rust_2018_idioms)]
2+
13
mod timeout;
24

35
pub use self::timeout::{DetectTimeout, DetectTimeoutError};
@@ -26,55 +28,70 @@ pub trait Detect: Clone + Send + Sync + 'static {
2628
) -> Result<Option<Self::Protocol>, Error>;
2729
}
2830

29-
#[derive(Copy, Clone)]
30-
pub struct NewDetectService<N, D> {
31-
new_accept: N,
31+
#[derive(Copy, Clone, Debug)]
32+
pub struct NewDetectService<D, N> {
33+
inner: N,
3234
detect: D,
3335
capacity: usize,
3436
}
3537

36-
#[derive(Copy, Clone)]
37-
pub struct DetectService<N, D, T> {
38+
#[derive(Copy, Clone, Debug)]
39+
pub struct DetectService<T, D, N> {
3840
target: T,
39-
new_accept: N,
41+
inner: N,
4042
detect: D,
4143
capacity: usize,
4244
}
4345

44-
// === impl NewDetectService ===
46+
const BUFFER_CAPACITY: usize = 1024;
4547

46-
impl<N, D: Clone> NewDetectService<N, D> {
47-
const BUFFER_CAPACITY: usize = 1024;
48+
// === impl NewDetectService ===
4849

49-
pub fn new(new_accept: N, detect: D) -> Self {
50+
impl<D: Clone, N> NewDetectService<D, N> {
51+
pub fn new(detect: D, inner: N) -> Self {
5052
Self {
5153
detect,
52-
new_accept,
53-
capacity: Self::BUFFER_CAPACITY,
54+
inner,
55+
capacity: BUFFER_CAPACITY,
5456
}
5557
}
5658

5759
pub fn layer(detect: D) -> impl layer::Layer<N, Service = Self> + Clone {
58-
layer::mk(move |new| Self::new(new, detect.clone()))
60+
layer::mk(move |inner| Self::new(detect.clone(), inner))
61+
}
62+
}
63+
64+
impl<D: Clone, N> NewDetectService<DetectTimeout<D>, N> {
65+
pub fn timeout(
66+
timeout: time::Duration,
67+
detect: D,
68+
) -> impl layer::Layer<N, Service = NewDetectService<DetectTimeout<D>, N>> + Clone {
69+
Self::layer(DetectTimeout::new(timeout, detect))
70+
}
71+
}
72+
73+
impl<D: Clone, N: Clone, T> NewService<T> for NewDetectService<D, N> {
74+
type Service = DetectService<T, D, N>;
75+
76+
fn new_service(&mut self, target: T) -> DetectService<T, D, N> {
77+
DetectService::new(target, self.detect.clone(), self.inner.clone())
5978
}
6079
}
6180

62-
impl<N: Clone, D: Clone, T> NewService<T> for NewDetectService<N, D> {
63-
type Service = DetectService<N, D, T>;
81+
// === impl DetectService ===
6482

65-
fn new_service(&mut self, target: T) -> DetectService<N, D, T> {
83+
impl<T, D: Clone, N: Clone> DetectService<T, D, N> {
84+
pub fn new(target: T, detect: D, inner: N) -> Self {
6685
DetectService {
6786
target,
68-
new_accept: self.new_accept.clone(),
69-
detect: self.detect.clone(),
70-
capacity: self.capacity,
87+
detect,
88+
inner,
89+
capacity: BUFFER_CAPACITY,
7190
}
7291
}
7392
}
7493

75-
// === impl DetectService ===
76-
77-
impl<N, S, D, T, I> tower::Service<I> for DetectService<N, D, T>
94+
impl<S, T, D, N, I> tower::Service<I> for DetectService<T, D, N>
7895
where
7996
T: Clone + Send + 'static,
8097
I: io::AsyncRead + Send + Unpin + 'static,
@@ -94,7 +111,7 @@ where
94111
}
95112

96113
fn call(&mut self, mut io: I) -> Self::Future {
97-
let mut new_accept = self.new_accept.clone();
114+
let mut inner = self.inner.clone();
98115
let mut buf = BytesMut::with_capacity(self.capacity);
99116
let detect = self.detect.clone();
100117
let target = self.target.clone();
@@ -108,7 +125,7 @@ where
108125
"Detected"
109126
);
110127

111-
let mut accept = new_accept
128+
let mut accept = inner
112129
.new_service((protocol, target))
113130
.ready_oneshot()
114131
.err_into::<Error>()
File renamed without changes.

0 commit comments

Comments
 (0)