Skip to content

Commit f1be3a6

Browse files
authored
Transport opaque connections over mTLS (#785)
The proxy supports transporting "opaque" TCP streams, but it cannot do so with mTLS. Because we cannot perform protocol detection (including mTLS discovery) on server-first or otherwise idle TCP streams, we have no reliable way to instrument mTLS on these connections. This change leverages a new discovery API that annotates an `OpaqueTransport` hint for targets that are marked opaque but have a Linkerd proxy. This hint includes the inbound port of the target proxy and, when specified, configures the outbound proxy to: 1. Connect directly to the inbound port (instead of the original target port). 2. Write an "opaque transport header"--a special protocol marker and a length-delimited protobuf message including the original target port. This allows the inbound proxy to perform TLS discovery for these streams; and then the inbound proxy is able to route the connection to the proper application port as informed by the connection header. This change, effectively, wraps arbitrary, opaque TCP streams in a client-first TCP protocol so that inbound proxies can perform mTLS detection. This change does **not** extend the proxy's gateway mode to transport these connections. This change will be done in a followup.
1 parent 25cfa94 commit f1be3a6

File tree

31 files changed

+787
-86
lines changed

31 files changed

+787
-86
lines changed

Cargo.lock

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -632,7 +632,7 @@ dependencies = [
632632
[[package]]
633633
name = "http-body"
634634
version = "0.4.0"
635-
source = "git+https://github.com/hyperium/http-body#5e434739e747c0b6611ec41020740b17f735d25a"
635+
source = "git+https://github.com/hyperium/http-body?branch=master#5e434739e747c0b6611ec41020740b17f735d25a"
636636
dependencies = [
637637
"bytes 0.6.0",
638638
"http",
@@ -876,6 +876,7 @@ dependencies = [
876876
"linkerd2-http-classify",
877877
"linkerd2-http-metrics",
878878
"linkerd2-metrics",
879+
"linkerd2-opaque-transport",
879880
"linkerd2-opencensus",
880881
"linkerd2-proxy-api",
881882
"linkerd2-proxy-api-resolve",
@@ -988,9 +989,11 @@ dependencies = [
988989
"linkerd2-retry",
989990
"pin-project 0.4.22",
990991
"tokio 0.3.5",
992+
"tokio-test 0.3.0",
991993
"tower",
992994
"tracing",
993995
"tracing-futures",
996+
"tracing-subscriber",
994997
]
995998

996999
[[package]]
@@ -1249,6 +1252,23 @@ dependencies = [
12491252
"tracing",
12501253
]
12511254

1255+
[[package]]
1256+
name = "linkerd2-opaque-transport"
1257+
version = "0.1.0"
1258+
dependencies = [
1259+
"async-trait",
1260+
"bytes 0.6.0",
1261+
"linkerd2-dns-name",
1262+
"linkerd2-error",
1263+
"linkerd2-io",
1264+
"linkerd2-proxy-transport",
1265+
"prost",
1266+
"prost-build",
1267+
"tokio 0.3.5",
1268+
"tokio-test 0.3.0",
1269+
"tracing",
1270+
]
1271+
12521272
[[package]]
12531273
name = "linkerd2-opencensus"
12541274
version = "0.1.0"
@@ -1281,8 +1301,8 @@ dependencies = [
12811301

12821302
[[package]]
12831303
name = "linkerd2-proxy-api"
1284-
version = "0.1.15"
1285-
source = "git+https://github.com/linkerd/linkerd2-proxy-api?rev=259628840ba613c2e5673fc6a39b946e1b06f09a#259628840ba613c2e5673fc6a39b946e1b06f09a"
1304+
version = "0.1.16"
1305+
source = "git+https://github.com/linkerd/linkerd2-proxy-api?tag=v0.1.16#5d60650505652c804b589e575a9302e75e772ef1"
12861306
dependencies = [
12871307
"h2 0.2.6",
12881308
"http",
@@ -2028,7 +2048,7 @@ dependencies = [
20282048
[[package]]
20292049
name = "prost-derive"
20302050
version = "0.6.1"
2031-
source = "git+https://github.com/danburkert/prost#278e8ab695f4acb415ab6f780a5c85f0a1a3b0ec"
2051+
source = "git+https://github.com/danburkert/prost#423f5ec5bd165a7007a388edfb2b485d5bbf40c7"
20322052
dependencies = [
20332053
"anyhow",
20342054
"itertools",

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ members = [
1515
"linkerd/channel",
1616
"linkerd/concurrency-limit",
1717
"linkerd/conditional",
18+
"linkerd/opaque-transport",
1819
"linkerd/dns/name",
1920
"linkerd/dns",
2021
"linkerd/drain",
@@ -78,4 +79,4 @@ tonic = { git = "https://github.com/hawkw/tonic", branch = "eliza/tokio-0.3" }
7879
tonic-build = { git = "https://github.com/hawkw/tonic", branch = "eliza/tokio-0.3" }
7980
prost = { git = "https://github.com/danburkert/prost" }
8081
prost-build = { git = "https://github.com/danburkert/prost" }
81-
prost-types = { git = "https://github.com/danburkert/prost" }
82+
prost-types = { git = "https://github.com/danburkert/prost" }

linkerd/app/core/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ linkerd2-cache = { path = "../../cache" }
2727
linkerd2-buffer = { path = "../../buffer" }
2828
linkerd2-concurrency-limit = { path = "../../concurrency-limit" }
2929
linkerd2-conditional = { path = "../../conditional" }
30+
linkerd2-opaque-transport = { path = "../../opaque-transport" }
3031
linkerd2-dns = { path = "../../dns" }
3132
linkerd2-drain = { path = "../../drain" }
3233
linkerd2-duplex = { path = "../../duplex" }
@@ -40,7 +41,7 @@ linkerd2-http-metrics = { path = "../../http-metrics" }
4041
linkerd2-metrics = { path = "../../metrics" }
4142
linkerd2-opencensus = { path = "../../opencensus" }
4243
linkerd2-proxy-core = { path = "../../proxy/core" }
43-
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", rev = "259628840ba613c2e5673fc6a39b946e1b06f09a" }
44+
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.16" }
4445
linkerd2-proxy-api-resolve = { path = "../../proxy/api-resolve" }
4546
linkerd2-proxy-discover = { path = "../../proxy/discover" }
4647
linkerd2-proxy-identity = { path = "../../proxy/identity" }
@@ -85,5 +86,5 @@ libc = "0.2"
8586
procinfo = "0.4.2"
8687

8788
[dev-dependencies]
88-
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", rev = "259628840ba613c2e5673fc6a39b946e1b06f09a", features = ["arbitrary"] }
89+
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.16", features = ["arbitrary"] }
8990
prost-types = "0.6.0"

linkerd/app/core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub use linkerd2_drain as drain;
1717
pub use linkerd2_error::{Error, Never, Recover};
1818
pub use linkerd2_exp_backoff as exp_backoff;
1919
pub use linkerd2_http_metrics as http_metrics;
20+
pub use linkerd2_opaque_transport as opaque_transport;
2021
pub use linkerd2_opencensus as opencensus;
2122
pub use linkerd2_reconnect as reconnect;
2223
pub use linkerd2_service_profiles as profiles;

linkerd/app/inbound/src/endpoint.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use indexmap::IndexMap;
22
use linkerd2_app_core::{
33
classify, dst, http_request_authority_addr, http_request_host_addr,
4-
http_request_l5d_override_dst_addr, metrics, profiles,
4+
http_request_l5d_override_dst_addr, metrics,
5+
opaque_transport::Header,
6+
profiles,
57
proxy::{http, identity, tap},
68
stack_tracing, svc,
79
transport::{self, listen, tls},
@@ -140,6 +142,15 @@ impl From<TcpAccept> for TcpEndpoint {
140142
}
141143
}
142144

145+
impl From<(Option<Header>, TcpAccept)> for TcpEndpoint {
146+
fn from((hdr, tcp): (Option<Header>, TcpAccept)) -> Self {
147+
match hdr {
148+
Some(Header { port, .. }) => Self { port },
149+
None => tcp.into(),
150+
}
151+
}
152+
}
153+
143154
impl Into<SocketAddr> for TcpEndpoint {
144155
fn into(self) -> SocketAddr {
145156
(&self).into()

linkerd/app/inbound/src/lib.rs

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use linkerd2_app_core::{
1616
classify,
1717
config::{ProxyConfig, ServerConfig},
1818
drain, dst, errors, metrics,
19+
opaque_transport::DetectHeader,
1920
opencensus::proto::trace::v1 as oc,
2021
profiles,
2122
proxy::{
@@ -115,6 +116,7 @@ impl Config {
115116
.into_inner();
116117

117118
let accept = self.build_accept(
119+
prevent_loop,
118120
tcp_forward.clone(),
119121
http_router,
120122
metrics.clone(),
@@ -273,11 +275,8 @@ impl Config {
273275

274276
// If the traffic is targeted at the inbound port, send it through
275277
// the loopback service (i.e. as a gateway).
276-
let switch_loopback = svc::stack(loopback).push_switch(prevent_loop, profile);
277-
278-
// Attempts to resolve the target as a service profile or, if that
279-
// fails, skips that stack to forward to the local endpoint.
280-
svc::stack(switch_loopback)
278+
svc::stack(profile)
279+
.push_switch(prevent_loop, loopback)
281280
.check_new_service::<Target, http::Request<http::boxed::BoxBody>>()
282281
.push_on_response(
283282
svc::layers()
@@ -300,6 +299,7 @@ impl Config {
300299

301300
pub fn build_accept<I, F, A, H, S>(
302301
&self,
302+
prevent_loop: impl Into<PreventLoop>,
303303
tcp_forward: F,
304304
http_router: H,
305305
metrics: metrics::Proxy,
@@ -319,10 +319,17 @@ impl Config {
319319
+ 'static
320320
where
321321
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + Unpin + Send + 'static,
322-
F: svc::NewService<TcpEndpoint, Service = A> + Unpin + Clone + Send + 'static,
322+
F: svc::NewService<TcpEndpoint, Service = A> + Unpin + Clone + Send + Sync + 'static,
323323
A: tower::Service<io::PrefixedIo<I>, Response = ()> + Clone + Send + Sync + 'static,
324-
A::Error: Into<Error>,
325-
A::Future: Send,
324+
<A as tower::Service<io::PrefixedIo<I>>>::Error: Into<Error>,
325+
<A as tower::Service<io::PrefixedIo<I>>>::Future: Send,
326+
A: tower::Service<io::PrefixedIo<io::PrefixedIo<I>>, Response = ()>
327+
+ Clone
328+
+ Send
329+
+ Sync
330+
+ 'static,
331+
<A as tower::Service<io::PrefixedIo<io::PrefixedIo<I>>>>::Error: Into<Error>,
332+
<A as tower::Service<io::PrefixedIo<io::PrefixedIo<I>>>>::Future: Send,
326333
H: svc::NewService<Target, Service = S> + Unpin + Clone + Send + Sync + 'static,
327334
S: tower::Service<
328335
http::Request<http::boxed::BoxBody>,
@@ -384,13 +391,31 @@ impl Config {
384391
.check_new_service::<(http::Version, TcpAccept), http::Request<_>>()
385392
.into_inner();
386393

387-
svc::stack(http::NewServeHttp::new(h2_settings, http_server, drain))
388-
.push(svc::stack::NewOptional::layer(
394+
// When HTTP detection fails, forward the connection to the application
395+
// as an opaque TCP stream.
396+
let tcp = svc::stack(tcp_forward.clone())
397+
.push_map_target(TcpEndpoint::from)
398+
.push_switch(
399+
prevent_loop.into(),
400+
// If the connection targets the inbound port, try to detect an
401+
// opaque transport header and rewrite the target port
402+
// accordingly. If there was no opaque transport header, the
403+
// forwarding will fail when the tcp connect stack applies loop
404+
// prevention.
389405
svc::stack(tcp_forward)
390406
.push_map_target(TcpEndpoint::from)
391-
.into_inner(),
392-
))
393-
.check_new_clone::<(Option<http::Version>, TcpAccept)>()
407+
.push(transport::NewDetectService::layer(
408+
transport::detect::DetectTimeout::new(
409+
self.proxy.detect_protocol_timeout,
410+
DetectHeader::default(),
411+
),
412+
)),
413+
)
414+
.push_on_response(drain::Retain::layer(drain.clone()))
415+
.into_inner();
416+
417+
svc::stack(http::NewServeHttp::new(h2_settings, http_server, drain))
418+
.push(svc::stack::NewOptional::layer(tcp))
394419
.push_cache(cache_max_idle_age)
395420
.push(transport::NewDetectService::layer(
396421
transport::detect::DetectTimeout::new(

linkerd/app/inbound/src/prevent_loop.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ where
4242
fn use_primary(&self, target: &T) -> bool {
4343
let addr = target.into();
4444
tracing::debug!(%addr, self.port);
45-
addr.port() == self.port
45+
addr.port() != self.port
4646
}
4747
}
4848

linkerd/app/integration/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ hyper = "0.14.0-dev"
2727
linkerd2-app = { path = "..", features = ["mock-orig-dst"] }
2828
linkerd2-app-core = { path = "../core", features = ["mock-orig-dst"] }
2929
linkerd2-metrics = { path = "../../metrics", features = ["test_util"] }
30-
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", rev = "259628840ba613c2e5673fc6a39b946e1b06f09a", features = ["arbitrary"] }
30+
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.16", features = ["arbitrary"] }
3131
linkerd2-app-test = { path = "../test" }
3232
regex = "0.1"
3333
socket2 = "0.3.12"

linkerd/app/integration/src/controller.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,7 @@ pub fn destination_add_labeled(
426426
Hint::Unknown => None,
427427
Hint::H2 => Some(pb::ProtocolHint {
428428
protocol: Some(pb::protocol_hint::Protocol::H2(pb::protocol_hint::H2 {})),
429+
..Default::default()
429430
}),
430431
};
431432
pb::Update {

linkerd/app/integration/src/tests/transparency.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ macro_rules! http1_tests {
355355
}
356356

357357
#[tokio::test]
358-
async fn http1_removes_connection_headers() {
358+
async fn http1_removes_opaque_transports() {
359359
let _trace = trace_init();
360360

361361
let srv = server::http1()

0 commit comments

Comments
 (0)