Skip to content

Commit 2eb1671

Browse files
authored
Split the inbound server into multiple modules (#1179)
The `Inbound::push_server` stack builder is responsible for building all of the inbound stacks: the HTTP stack, the TCP forwarding stack, and the direct/gateway stack. Furthermore, most of these stacks rely on fixed target types (which makes them inflexible). This change refactors the inbound stack to setup for upcoming policy changes: 1. A 'detect' stack is split out into a `push_detect` helper that only deals with protocol detection and dispatching connections to either the TCP forwarding stack or the HTTP handling stack. 2. An 'accept' stack is split out into a `push_accept` helper that is responsible for extracting relevant connection metadata (client and orig dst addrs) and determining the policy to use for the connection. 3. Each stack/module defines its own target types and other modules rely on param constraints instead of fixed target types. 4. A `port_policies` module is introduced to model the existing per-port policies: whether identity is required and whether protocol detection should be skipped. Various auxiliary modules have been eliminated in favor of inlined target filters. This helps to make these stacks easier to reason about, as relevant logic isn't spread over multiple files.
1 parent 79de26d commit 2eb1671

File tree

23 files changed

+1491
-1145
lines changed

23 files changed

+1491
-1145
lines changed

linkerd/app/admin/src/server/mod.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use hyper::{
1919
use linkerd_app_core::{
2020
metrics::{self as metrics, FmtMetrics},
2121
proxy::http::ClientHandle,
22-
svc, trace, Error,
22+
trace, Error,
2323
};
2424
use std::{
2525
future::Future,
@@ -150,13 +150,6 @@ impl<M> Admin<M> {
150150
}
151151
}
152152

153-
impl<M: FmtMetrics + Clone, T> svc::NewService<T> for Admin<M> {
154-
type Service = Self;
155-
fn new_service(&mut self, _: T) -> Self::Service {
156-
self.clone()
157-
}
158-
}
159-
160153
impl<M, B> tower::Service<http::Request<B>> for Admin<M>
161154
where
162155
M: FmtMetrics,

linkerd/app/admin/src/stack.rs

Lines changed: 67 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,9 @@ use linkerd_app_core::{
77
serve,
88
svc::{self, ExtractParam, InsertParam, Param},
99
tls, trace,
10-
transport::{listen::Bind, ClientAddr, Local, Remote, ServerAddr},
10+
transport::{self, listen::Bind, ClientAddr, Local, Remote, ServerAddr},
1111
Error,
1212
};
13-
use linkerd_app_inbound::target::{HttpAccept, Target, TcpAccept};
1413
use std::{pin::Pin, time::Duration};
1514
use thiserror::Error;
1615
use tokio::sync::mpsc;
@@ -36,6 +35,19 @@ struct NonHttpClient(Remote<ClientAddr>);
3635
#[error("Unexpected TLS connection to {} from {}", self.0, self.1)]
3736
struct UnexpectedSni(tls::ServerId, Remote<ClientAddr>);
3837

38+
#[derive(Clone, Debug)]
39+
struct Tcp {
40+
addr: Local<ServerAddr>,
41+
client: Remote<ClientAddr>,
42+
tls: tls::ConditionalServerTls,
43+
}
44+
45+
#[derive(Clone, Debug)]
46+
struct Http {
47+
tcp: Tcp,
48+
version: http::Version,
49+
}
50+
3951
#[derive(Clone)]
4052
struct TlsParams {
4153
identity: Option<LocalCrtKey>,
@@ -66,31 +78,29 @@ impl Config {
6678

6779
let (ready, latch) = crate::server::Readiness::new();
6880
let admin = crate::server::Admin::new(report, ready, shutdown, trace);
69-
let admin = svc::stack(admin)
70-
.push(metrics.http_endpoint.to_layer::<classify::Response, _, Target>())
81+
let admin = svc::stack(move |_| admin.clone())
82+
.push(metrics.http_endpoint.to_layer::<classify::Response, _, Http>())
7183
.push_on_response(
7284
svc::layers()
7385
.push(metrics.http_errors.clone())
7486
.push(errors::layer())
7587
.push(http::BoxResponse::layer()),
7688
)
77-
.push_map_target(Target::from)
7889
.push(http::NewServeHttp::layer(Default::default(), drain.clone()))
7990
.push_request_filter(
80-
|(version, tcp): (
91+
|(http, tcp): (
8192
Result<Option<http::Version>, detect::DetectTimeoutError<_>>,
82-
TcpAccept,
93+
Tcp,
8394
)| {
84-
match version {
85-
Ok(Some(version)) => Ok(HttpAccept::from((version, tcp))),
86-
// If detection timed out, we can make an educated guess
87-
// at the proper behavior:
88-
// - If the connection was meshed, it was most likely
89-
// transported over HTTP/2.
90-
// - If the connection was unmehsed, it was mostly
91-
// likely HTTP/1.
92-
// - If we received some unexpected SNI, the client is
93-
// mostly likely confused/stale.
95+
match http {
96+
Ok(Some(version)) => Ok(Http { version, tcp }),
97+
// If detection timed out, we can make an educated guess at the proper
98+
// behavior:
99+
// - If the connection was meshed, it was most likely transported over
100+
// HTTP/2.
101+
// - If the connection was unmeshed, it was mostly likely HTTP/1.
102+
// - If we received some unexpected SNI, the client is mostly likely
103+
// confused/stale.
94104
Err(_timeout) => {
95105
let version = match tcp.tls.clone() {
96106
tls::ConditionalServerTls::None(_) => http::Version::Http1,
@@ -101,20 +111,19 @@ impl Config {
101111
sni,
102112
}) => {
103113
debug_assert!(false, "If we know the stream is non-mesh TLS, we should be able to prove its not HTTP.");
104-
return Err(Error::from(UnexpectedSni(sni, tcp.client_addr)));
114+
return Err(Error::from(UnexpectedSni(sni, tcp.client)));
105115
}
106116
};
107117
debug!(%version, "HTTP detection timed out; assuming HTTP");
108-
Ok(HttpAccept::from((version, tcp)))
118+
Ok(Http { version, tcp })
109119
}
110-
// If the connection failed HTTP detection, check if we
111-
// detected TLS for another target. This might indicate
112-
// that the client is confused/stale.
120+
// If the connection failed HTTP detection, check if we detected TLS for
121+
// another target. This might indicate that the client is confused/stale.
113122
Ok(None) => match tcp.tls {
114123
tls::ConditionalServerTls::Some(tls::ServerTls::Passthru { sni }) => {
115-
Err(UnexpectedSni(sni, tcp.client_addr).into())
124+
Err(UnexpectedSni(sni, tcp.client).into())
116125
}
117-
_ => Err(NonHttpClient(tcp.client_addr).into()),
126+
_ => Err(NonHttpClient(tcp.client).into()),
118127
},
119128
}
120129
},
@@ -123,12 +132,10 @@ impl Config {
123132
.push(detect::NewDetectService::layer(detect::Config::<http::DetectHttp>::from_timeout(DETECT_TIMEOUT)))
124133
.push(metrics.transport.layer_accept())
125134
.push_map_target(|(tls, addrs): (tls::ConditionalServerTls, B::Addrs)| {
126-
// TODO this should use an admin-specific target type.
127-
let Local(ServerAddr(target_addr)) = addrs.param();
128-
TcpAccept {
135+
Tcp {
129136
tls,
130-
client_addr: addrs.param(),
131-
target_addr,
137+
client: addrs.param(),
138+
addr: addrs.param(),
132139
}
133140
})
134141
.push(svc::BoxNewService::layer())
@@ -146,6 +153,37 @@ impl Config {
146153
}
147154
}
148155

156+
// === impl Tcp ===
157+
158+
impl Param<transport::labels::Key> for Tcp {
159+
fn param(&self) -> transport::labels::Key {
160+
transport::labels::Key::Accept {
161+
direction: transport::labels::Direction::In,
162+
tls: self.tls.clone(),
163+
target_addr: self.addr.into(),
164+
}
165+
}
166+
}
167+
168+
// === impl Http ===
169+
170+
impl Param<http::Version> for Http {
171+
fn param(&self) -> http::Version {
172+
self.version
173+
}
174+
}
175+
176+
impl Param<metrics::EndpointLabels> for Http {
177+
fn param(&self) -> metrics::EndpointLabels {
178+
metrics::InboundEndpointLabels {
179+
tls: self.tcp.tls.clone(),
180+
authority: None,
181+
target_addr: self.tcp.addr.into(),
182+
}
183+
.into()
184+
}
185+
}
186+
149187
// === TlsParams ===
150188

151189
impl<T> ExtractParam<tls::server::Timeout, T> for TlsParams {

linkerd/app/core/src/config.rs

Lines changed: 1 addition & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,7 @@ use crate::{
44
svc::Param,
55
transport::{Keepalive, ListenAddr},
66
};
7-
use std::{
8-
collections::HashSet,
9-
hash::{BuildHasherDefault, Hasher},
10-
time::Duration,
11-
};
7+
use std::time::Duration;
128

139
#[derive(Clone, Debug)]
1410
pub struct ServerConfig {
@@ -37,19 +33,6 @@ pub struct ProxyConfig {
3733
pub detect_protocol_timeout: Duration,
3834
}
3935

40-
/// A `HashSet` specialized for ports.
41-
///
42-
/// Because ports are `u16` values, this type avoids the overhead of actually
43-
/// hashing ports.
44-
pub type PortSet = HashSet<u16, BuildHasherDefault<PortHasher>>;
45-
46-
/// A hasher for ports.
47-
///
48-
/// Because ports are single `u16` values, we don't have to hash them; we can just use
49-
/// the integer values as hashes directly.
50-
#[derive(Default)]
51-
pub struct PortHasher(u16);
52-
5336
// === impl ProxyConfig ===
5437

5538
impl ProxyConfig {
@@ -71,43 +54,3 @@ impl Param<Keepalive> for ServerConfig {
7154
self.keepalive
7255
}
7356
}
74-
75-
// === impl PortHasher ===
76-
77-
impl Hasher for PortHasher {
78-
fn write(&mut self, _: &[u8]) {
79-
unreachable!("hashing a `u16` calls `write_u16`");
80-
}
81-
82-
#[inline]
83-
fn write_u16(&mut self, port: u16) {
84-
self.0 = port;
85-
}
86-
87-
#[inline]
88-
fn finish(&self) -> u64 {
89-
self.0 as u64
90-
}
91-
}
92-
93-
#[cfg(test)]
94-
mod tests {
95-
use super::*;
96-
use quickcheck::*;
97-
98-
quickcheck! {
99-
fn portset_contains_all_ports(ports: Vec<u16>) -> bool {
100-
// Make a port set containing the generated port numbers.
101-
let portset = ports.iter().cloned().collect::<PortSet>();
102-
for port in ports {
103-
// If the port set doesn't contain one of the ports it was
104-
// created with, that's bad news!
105-
if !portset.contains(&port) {
106-
return false;
107-
}
108-
}
109-
110-
true
111-
}
112-
}
113-
}

linkerd/app/core/src/dst.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
use super::classify;
22
use crate::profiles;
3-
use linkerd_addr::Addr;
43
use linkerd_http_classify::CanClassify;
54
use linkerd_proxy_http::timeout;
6-
use std::fmt;
75
use std::time::Duration;
86

97
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
108
pub struct Route {
11-
pub target: Addr,
9+
pub addr: profiles::LogicalAddr,
1210
pub route: profiles::http::Route,
1311
pub direction: super::metrics::Direction,
1412
}
@@ -28,9 +26,3 @@ impl timeout::HasTimeout for Route {
2826
self.route.timeout()
2927
}
3028
}
31-
32-
impl fmt::Display for Route {
33-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34-
self.target.fmt(f)
35-
}
36-
}

linkerd/app/core/src/metrics/mod.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ mod tcp_accept_errors;
22

33
use crate::{
44
classify::{Class, SuccessOrFailure},
5-
control, dst, errors, http_metrics, http_metrics as metrics, opencensus, stack_metrics,
5+
control, dst, errors, http_metrics, http_metrics as metrics, opencensus, profiles,
6+
stack_metrics,
67
svc::Param,
78
telemetry, tls,
89
transport::{
@@ -86,7 +87,7 @@ pub struct StackLabels {
8687
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
8788
pub struct RouteLabels {
8889
direction: Direction,
89-
target: Addr,
90+
addr: profiles::LogicalAddr,
9091
labels: Option<String>,
9192
}
9293

@@ -232,7 +233,7 @@ impl FmtLabels for ControlLabels {
232233
impl Param<RouteLabels> for dst::Route {
233234
fn param(&self) -> RouteLabels {
234235
RouteLabels {
235-
target: self.target.clone(),
236+
addr: self.addr.clone(),
236237
direction: self.direction,
237238
labels: prefix_labels("rt", self.route.labels().iter()),
238239
}
@@ -242,7 +243,7 @@ impl Param<RouteLabels> for dst::Route {
242243
impl FmtLabels for RouteLabels {
243244
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
244245
self.direction.fmt_labels(f)?;
245-
write!(f, ",dst=\"{}\"", self.target)?;
246+
write!(f, ",dst=\"{}\"", self.addr)?;
246247

247248
if let Some(labels) = self.labels.as_ref() {
248249
write!(f, ",{}", labels)?;
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
#![no_main]
22

33
#[cfg(fuzzing)]
4-
use {libfuzzer_sys::fuzz_target, linkerd_app_inbound::http::fuzz_logic::*};
4+
use {libfuzzer_sys::fuzz_target, linkerd_app_inbound::http_fuzz};
55

66
#[cfg(fuzzing)]
7-
fuzz_target!(|requests: Vec<HttpRequestSpec>| {
7+
fuzz_target!(|requests: Vec<http_fuzz::HttpRequestSpec>| {
88
// Don't enable tracing in `cluster-fuzz`, since we would emit verbose
99
// traces for *every* generated fuzz input...
1010
let _trace = linkerd_tracing::test::with_default_filter("off");
@@ -15,5 +15,5 @@ fuzz_target!(|requests: Vec<HttpRequestSpec>| {
1515

1616
tokio::runtime::Runtime::new()
1717
.unwrap()
18-
.block_on(fuzz_entry_raw(requests));
18+
.block_on(http_fuzz::fuzz_entry_raw(requests));
1919
});

0 commit comments

Comments
 (0)