Skip to content

Commit a233e1a

Browse files
authored
Introduce a SkipDetect layer to preempt detection (#620)
This change introduces a new SkipDetect layer that configures whether protocol detection should be attempted. This module will later be replaced/augmented by discovery. Furthermore, this change eliminates the `Accept` trait. Instead of modeling the accept stack as a simple service whose response is a future, we know model the stack as, effectively, a `MakeService<Meta, TcpStream>`. This is intended to support caching of the service that handles the tcp stream (i.e. to hold discovery responses). Detection timeouts have been moved from a dedicated layer into the detection modules.
1 parent 20126df commit a233e1a

File tree

32 files changed

+791
-816
lines changed

32 files changed

+791
-816
lines changed

Cargo.lock

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -826,7 +826,6 @@ dependencies = [
826826
"linkerd2-proxy-api",
827827
"linkerd2-proxy-api-resolve",
828828
"linkerd2-proxy-core",
829-
"linkerd2-proxy-detect",
830829
"linkerd2-proxy-discover",
831830
"linkerd2-proxy-http",
832831
"linkerd2-proxy-identity",
@@ -1240,19 +1239,6 @@ dependencies = [
12401239
"tracing-futures",
12411240
]
12421241

1243-
[[package]]
1244-
name = "linkerd2-proxy-detect"
1245-
version = "0.1.0"
1246-
dependencies = [
1247-
"async-trait",
1248-
"futures 0.3.5",
1249-
"linkerd2-error",
1250-
"linkerd2-io",
1251-
"linkerd2-proxy-core",
1252-
"tokio",
1253-
"tower",
1254-
]
1255-
12561242
[[package]]
12571243
name = "linkerd2-proxy-discover"
12581244
version = "0.1.0"
@@ -1274,7 +1260,6 @@ dependencies = [
12741260
name = "linkerd2-proxy-http"
12751261
version = "0.1.0"
12761262
dependencies = [
1277-
"async-trait",
12781263
"bytes 0.5.4",
12791264
"futures 0.3.5",
12801265
"h2 0.2.6",
@@ -1291,8 +1276,7 @@ dependencies = [
12911276
"linkerd2-error",
12921277
"linkerd2-http-box",
12931278
"linkerd2-identity",
1294-
"linkerd2-proxy-core",
1295-
"linkerd2-proxy-detect",
1279+
"linkerd2-io",
12961280
"linkerd2-proxy-transport",
12971281
"linkerd2-stack",
12981282
"linkerd2-timeout",
@@ -1394,7 +1378,6 @@ dependencies = [
13941378
"linkerd2-io",
13951379
"linkerd2-metrics",
13961380
"linkerd2-proxy-core",
1397-
"linkerd2-proxy-detect",
13981381
"linkerd2-stack",
13991382
"pin-project",
14001383
"ring",

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ members = [
3232
"linkerd/opencensus",
3333
"linkerd/proxy/api-resolve",
3434
"linkerd/proxy/core",
35-
"linkerd/proxy/detect",
3635
"linkerd/proxy/discover",
3736
"linkerd/proxy/http",
3837
"linkerd/proxy/identity",
@@ -63,4 +62,4 @@ debug = false
6362

6463
[patch.crates-io]
6564
webpki = { git = "https://github.com/linkerd/webpki", branch = "cert-dns-names-0.21" }
66-
tower = { version = "0.3", git = "https://github.com/tower-rs/tower", rev = "8752a3811788e94670c62dc0acbc9613207931b1"}
65+
tower = { version = "0.3", git = "https://github.com/tower-rs/tower", rev = "8752a3811788e94670c62dc0acbc9613207931b1"}

linkerd/app/core/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ linkerd2-opencensus = { path = "../../opencensus" }
4343
linkerd2-proxy-core = { path = "../../proxy/core" }
4444
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.13" }
4545
linkerd2-proxy-api-resolve = { path = "../../proxy/api-resolve" }
46-
linkerd2-proxy-detect = { path = "../../proxy/detect" }
4746
linkerd2-proxy-discover = { path = "../../proxy/discover" }
4847
linkerd2-proxy-identity = { path = "../../proxy/identity" }
4948
linkerd2-proxy-http = { path = "../../proxy/http" }

linkerd/app/core/src/accept_error.rs

Lines changed: 0 additions & 59 deletions
This file was deleted.

linkerd/app/core/src/admin/mod.rs

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,21 @@
33
//! * `/metrics` -- reports prometheus-formatted metrics.
44
//! * `/ready` -- returns 200 when the proxy is ready to participate in meshed traffic.
55
6-
use crate::{svc, trace, transport::tls::accept::Connection};
6+
use crate::{
7+
svc, trace,
8+
transport::{io, tls},
9+
};
710
use futures::{future, TryFutureExt};
811
use http::StatusCode;
912
use hyper::{Body, Request, Response};
10-
use linkerd2_error::Error;
13+
use linkerd2_error::{Error, Never};
1114
use linkerd2_metrics::{self as metrics, FmtMetrics};
12-
use std::future::Future;
13-
use std::io;
14-
use std::pin::Pin;
15-
use std::task::{Context, Poll};
16-
use tower::{service_fn, Service};
15+
use std::{
16+
future::Future,
17+
pin::Pin,
18+
task::{Context, Poll},
19+
};
20+
use tower::{service_fn, util::ServiceExt, Service};
1721

1822
mod readiness;
1923
mod tasks;
@@ -33,6 +37,9 @@ pub struct Admin<M: FmtMetrics> {
3337
#[derive(Debug, Clone)]
3438
pub struct Accept<M: FmtMetrics>(Admin<M>, hyper::server::conn::Http);
3539

40+
#[derive(Debug, Clone)]
41+
pub struct Serve<M: FmtMetrics>(tls::accept::Meta, Accept<M>);
42+
3643
#[derive(Clone, Debug)]
3744
pub struct ClientAddr(std::net::SocketAddr);
3845

@@ -103,28 +110,43 @@ impl<M: FmtMetrics> Service<Request<Body>> for Admin<M> {
103110
}
104111
}
105112

106-
impl<M: FmtMetrics + Clone + Send + 'static> svc::Service<Connection> for Accept<M> {
107-
type Response = Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'static>>;
108-
type Error = Error;
113+
impl<M: FmtMetrics + Clone + Send + 'static> svc::Service<tls::accept::Meta> for Accept<M> {
114+
type Response = Serve<M>;
115+
type Error = Never;
109116
type Future = future::Ready<Result<Self::Response, Self::Error>>;
110117

111118
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
112119
Poll::Ready(Ok(()))
113120
}
114121

115-
fn call(&mut self, (meta, io): Connection) -> Self::Future {
122+
fn call(&mut self, meta: tls::accept::Meta) -> Self::Future {
123+
future::ok(Serve(meta, self.clone()))
124+
}
125+
}
126+
127+
impl<M: FmtMetrics + Clone + Send + 'static> svc::Service<io::BoxedIo> for Serve<M> {
128+
type Response = ();
129+
type Error = Error;
130+
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;
131+
132+
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
133+
Poll::Ready(Ok(()))
134+
}
135+
136+
fn call(&mut self, io: io::BoxedIo) -> Self::Future {
137+
let Self(ref meta, Accept(ref svc, ref server)) = self;
138+
116139
// Since the `/proxy-log-level` controls access based on the
117140
// client's IP address, we wrap the service with a new service
118141
// that adds the remote IP as a request extension.
119142
let peer = meta.addrs.peer();
120-
let mut svc = self.0.clone();
143+
let svc = svc.clone();
121144
let svc = service_fn(move |mut req: Request<Body>| {
122145
req.extensions_mut().insert(ClientAddr(peer));
123-
svc.call(req)
146+
svc.clone().oneshot(req)
124147
});
125148

126-
let connection_future = self.1.serve_connection(io, svc).map_err(Into::into);
127-
future::ok(Box::pin(connection_future))
149+
Box::pin(server.serve_connection(io, svc).map_err(Into::into))
128150
}
129151
}
130152

linkerd/app/core/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ pub use linkerd2_stack_metrics as stack_metrics;
2727
pub use linkerd2_stack_tracing as stack_tracing;
2828
pub use linkerd2_trace_context::TraceContextLayer;
2929

30-
pub mod accept_error;
3130
pub mod admin;
3231
pub mod classify;
3332
pub mod config;

linkerd/app/core/src/proxy/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@
22
33
pub use linkerd2_proxy_api_resolve as api_resolve;
44
pub use linkerd2_proxy_core as core;
5-
pub use linkerd2_proxy_detect as detect;
65
pub use linkerd2_proxy_discover as discover;
76
pub use linkerd2_proxy_http as http;
87
pub use linkerd2_proxy_identity as identity;
98
pub use linkerd2_proxy_resolve as resolve;
109
pub use linkerd2_proxy_tap as tap;
1110
pub use linkerd2_proxy_tcp as tcp;
11+
12+
mod skip_detect;
13+
14+
pub use self::skip_detect::SkipDetect;
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
use futures::prelude::*;
2+
use indexmap::IndexSet;
3+
use linkerd2_error::Error;
4+
use linkerd2_proxy_transport::listen::Addrs;
5+
use std::{
6+
future::Future,
7+
pin::Pin,
8+
sync::Arc,
9+
task::{Context, Poll},
10+
};
11+
use tower::util::ServiceExt;
12+
13+
pub trait SkipTarget<T> {
14+
fn skip_target(&self, target: &T) -> bool;
15+
}
16+
17+
impl SkipTarget<Addrs> for Arc<IndexSet<u16>> {
18+
fn skip_target(&self, addrs: &Addrs) -> bool {
19+
self.contains(&addrs.target_addr().port())
20+
}
21+
}
22+
23+
pub struct SkipDetect<S, D, F> {
24+
skip: S,
25+
detect: D,
26+
tcp: F,
27+
}
28+
29+
pub enum Accept<D, F> {
30+
Detect(D),
31+
Tcp(F),
32+
}
33+
34+
impl<S, D, F> SkipDetect<S, D, F> {
35+
pub fn new(skip: S, detect: D, tcp: F) -> Self {
36+
Self { skip, detect, tcp }
37+
}
38+
}
39+
40+
impl<S, D, F> tower::Service<Addrs> for SkipDetect<S, D, F>
41+
where
42+
S: SkipTarget<Addrs>,
43+
D: tower::Service<Addrs> + Clone + Send + 'static,
44+
D::Error: Into<Error>,
45+
D::Future: Send,
46+
F: tower::Service<Addrs> + Clone + Send + 'static,
47+
F::Error: Into<Error>,
48+
F::Future: Send,
49+
{
50+
type Response = Accept<D::Response, F::Response>;
51+
type Error = Error;
52+
type Future = Pin<
53+
Box<dyn Future<Output = Result<Accept<D::Response, F::Response>, Error>> + Send + 'static>,
54+
>;
55+
56+
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Error>> {
57+
Poll::Ready(Ok(()))
58+
}
59+
60+
fn call(&mut self, addrs: Addrs) -> Self::Future {
61+
if self.skip.skip_target(&addrs) {
62+
let tcp = self.tcp.clone();
63+
Box::pin(async move {
64+
let f = tcp.oneshot(addrs).err_into::<Error>().await?;
65+
Ok(Accept::Tcp(f))
66+
})
67+
} else {
68+
let detect = self.detect.clone();
69+
Box::pin(async move {
70+
let d = detect.oneshot(addrs).err_into::<Error>().await?;
71+
Ok(Accept::Detect(d))
72+
})
73+
}
74+
}
75+
}
76+
77+
impl<D, F, T> tower::Service<T> for Accept<D, F>
78+
where
79+
D: tower::Service<T, Response = ()>,
80+
D::Error: Into<Error>,
81+
D::Future: Send + 'static,
82+
F: tower::Service<T, Response = ()>,
83+
F::Error: Into<Error>,
84+
F::Future: Send + 'static,
85+
{
86+
type Response = ();
87+
type Error = Error;
88+
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;
89+
90+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
91+
Poll::Ready(match self {
92+
Self::Detect(d) => futures::ready!(d.poll_ready(cx)).map_err(Into::into),
93+
Self::Tcp(f) => futures::ready!(f.poll_ready(cx)).map_err(Into::into),
94+
})
95+
}
96+
97+
fn call(&mut self, io: T) -> Self::Future {
98+
match self {
99+
Self::Detect(d) => Box::pin(d.call(io).err_into::<Error>()),
100+
Self::Tcp(f) => Box::pin(f.call(io).err_into::<Error>()),
101+
}
102+
}
103+
}

0 commit comments

Comments
 (0)