Skip to content

Commit 4bac424

Browse files
authored
outbound: TCP discovery and load balancing (#652)
The proxy only forward TCP connections to the original destination address, never using the mesh's mTLS. This change modifies the outbound TCP forwarding stack to do discovery based on the original destination address, resolving service IPs to the individual endpoint IPs (and their associated identities). This enables mTLS for meshed TCP connections. When the endpoint cannot be discovered (i.e. due to an InvalidArgument response from the controller), the connection is forwarded as before. The PeakEWMA balancer is repurposed for this, using connection latency (and pending connections) as the load metric. In the future, this should be modified to count active connections towards the load (but this won't Just Work with the PeakEwma balancer, so this is deferred for now). This has been tested [manually](https://github.com/olix0r/init-net-test/blob/b3860861c54344667a797ece5b685a0574798fb3/k8s.yml). In follow-up changes, we'll extend the transparency tests to validate this behavior.
1 parent 1382e32 commit 4bac424

File tree

13 files changed

+233
-46
lines changed

13 files changed

+233
-46
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1373,7 +1373,9 @@ dependencies = [
13731373
"futures 0.3.5",
13741374
"linkerd2-duplex",
13751375
"linkerd2-error",
1376+
"linkerd2-stack",
13761377
"pin-project",
1378+
"rand 0.7.2",
13771379
"tokio",
13781380
"tower",
13791381
]

linkerd/app/core/src/svc.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,14 @@ impl<S> Stack<S> {
159159
self.push(stack::map_target::MapTargetLayer::new(map_target))
160160
}
161161

162+
/// Wraps a `Service<T>` as a `Service<()>`.
163+
///
164+
/// Each time the service is called, the `T`-typed request is cloned and
165+
/// issued into the inner service.
166+
pub fn push_make_thunk(self) -> Stack<stack::make_thunk::MakeThunk<S>> {
167+
self.push(layer::mk(stack::make_thunk::MakeThunk::new))
168+
}
169+
162170
pub fn instrument<G: Clone>(self, get_span: G) -> Stack<InstrumentMake<G, S>> {
163171
self.push(InstrumentMakeLayer::new(get_span))
164172
}

linkerd/app/inbound/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,9 @@ impl Config {
371371
// tasks from their constructor. This helps to ensure that tasks are
372372
// spawned on the same runtime as the proxy.
373373
// Forwards TCP streams that cannot be decoded as HTTP.
374-
let tcp_forward = svc::stack(tcp::Forward::new(tcp_connect))
374+
let tcp_forward = svc::stack(tcp_connect)
375+
.push_make_thunk()
376+
.push(svc::layer::mk(tcp::Forward::new))
375377
.push(admit::AdmitLayer::new(prevent_loop.into()));
376378

377379
let http = DetectHttp::new(

linkerd/app/outbound/src/endpoint.rs

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ pub struct HttpEndpoint {
5858
pub struct TcpEndpoint {
5959
pub addr: SocketAddr,
6060
pub identity: tls::PeerIdentity,
61+
//pub labels: Option<String>,
6162
}
6263

6364
// === impl HttpConrete ===
@@ -302,15 +303,22 @@ impl Into<EndpointLabels> for HttpEndpoint {
302303

303304
// === impl TcpEndpoint ===
304305

305-
impl From<listen::Addrs> for TcpEndpoint {
306-
fn from(addrs: listen::Addrs) -> Self {
306+
impl From<SocketAddr> for TcpEndpoint {
307+
fn from(addr: SocketAddr) -> Self {
307308
Self {
308-
addr: addrs.target_addr(),
309+
addr,
309310
identity: Conditional::None(tls::ReasonForNoPeerName::NotHttp.into()),
311+
//labels: None,
310312
}
311313
}
312314
}
313315

316+
impl From<listen::Addrs> for TcpEndpoint {
317+
fn from(addrs: listen::Addrs) -> Self {
318+
addrs.target_addr().into()
319+
}
320+
}
321+
314322
impl Into<SocketAddr> for TcpEndpoint {
315323
fn into(self) -> SocketAddr {
316324
self.addr
@@ -323,14 +331,35 @@ impl tls::HasPeerIdentity for TcpEndpoint {
323331
}
324332
}
325333

326-
impl Into<EndpointLabels> for TcpEndpoint {
327-
fn into(self) -> EndpointLabels {
328-
use linkerd2_app_core::metric_labels::{Direction, TlsId};
329-
EndpointLabels {
330-
direction: Direction::Out,
331-
tls_id: self.identity.as_ref().map(|id| TlsId::ServerId(id.clone())),
332-
authority: None,
333-
labels: None,
334+
// impl Into<EndpointLabels> for TcpEndpoint {
335+
// fn into(self) -> EndpointLabels {
336+
// use linkerd2_app_core::metric_labels::{Direction, TlsId};
337+
// EndpointLabels {
338+
// authority: None,
339+
// direction: Direction::Out,
340+
// labels: self.labels,
341+
// tls_id: self.identity.as_ref().map(|id| TlsId::ServerId(id.clone())),
342+
// }
343+
// }
344+
// }
345+
346+
impl MapEndpoint<Addr, Metadata> for FromMetadata {
347+
type Out = TcpEndpoint;
348+
349+
fn map_endpoint(&self, dst: &Addr, addr: SocketAddr, metadata: Metadata) -> Self::Out {
350+
tracing::debug!(%dst, %addr, ?metadata, "Resolved endpoint");
351+
let identity = metadata
352+
.identity()
353+
.cloned()
354+
.map(Conditional::Some)
355+
.unwrap_or_else(|| {
356+
Conditional::None(tls::ReasonForNoPeerName::NotProvidedByServiceDiscovery.into())
357+
});
358+
359+
TcpEndpoint {
360+
addr,
361+
identity,
362+
//labels: prefix_labels("dst", metadata.labels().into_iter()),
334363
}
335364
}
336365
}

linkerd/app/outbound/src/lib.rs

Lines changed: 55 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,14 @@ use linkerd2_app_core::{
2222
spans::SpanConverter,
2323
svc::{self, NewService},
2424
transport::{self, listen, tls},
25-
Conditional, DiscoveryRejected, Error, ProxyMetrics, StackMetrics, TraceContextLayer,
25+
Addr, Conditional, DiscoveryRejected, Error, ProxyMetrics, StackMetrics, TraceContextLayer,
2626
CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, L5D_REQUIRE_ID,
2727
};
28-
use std::{collections::HashMap, net::IpAddr, time::Duration};
28+
use std::{
29+
collections::HashMap,
30+
net::{IpAddr, SocketAddr},
31+
time::Duration,
32+
};
2933
use tokio::sync::mpsc;
3034
use tracing::{info, info_span};
3135

@@ -363,18 +367,22 @@ impl Config {
363367
.into_inner()
364368
}
365369

366-
pub async fn build_server<R, C, H, S>(
370+
pub async fn build_server<E, R, C, H, S>(
367371
self,
368372
listen_addr: std::net::SocketAddr,
369373
listen: impl Stream<Item = std::io::Result<listen::Connection>> + Send + 'static,
370374
refine: R,
375+
resolve: E,
371376
tcp_connect: C,
372377
http_router: H,
373378
metrics: ProxyMetrics,
374379
span_sink: Option<mpsc::Sender<oc::Span>>,
375380
drain: drain::Watch,
376381
) -> Result<(), Error>
377382
where
383+
E: Resolve<Addr, Endpoint = proxy::api_resolve::Metadata> + Unpin + Clone + Send + 'static,
384+
E::Future: Unpin + Send,
385+
E::Resolution: Unpin + Send,
378386
R: tower::Service<dns::Name, Error = Error, Response = dns::Name>
379387
+ Unpin
380388
+ Clone
@@ -404,6 +412,8 @@ impl Config {
404412
dispatch_timeout,
405413
max_in_flight_requests,
406414
detect_protocol_timeout,
415+
cache_max_idle_age,
416+
buffer_capacity,
407417
..
408418
} = self.proxy;
409419
let canonicalize_timeout = self.canonicalize_timeout;
@@ -448,22 +458,57 @@ impl Config {
448458
.into_inner()
449459
.into_make_service();
450460

451-
// The stack is served lazily since caching layers spawn tasks from
452-
// their constructor. This helps to ensure that tasks are spawned on the
453-
// same runtime as the proxy.
454-
// Forwards TCP streams that cannot be decoded as HTTP.
455-
let tcp_forward = svc::stack(tcp::Forward::new(tcp_connect))
461+
// Load balances TCP streams that cannot be decoded as HTTP.
462+
let tcp_balance = svc::stack(tcp_connect.clone())
463+
.push_make_thunk()
464+
.instrument(|t: &TcpEndpoint| info_span!("tcp.endpoint", peer.addr = %t.addr, peer.id = ?t.identity))
456465
.push(admit::AdmitLayer::new(prevent_loop))
457-
.push_map_target(TcpEndpoint::from);
466+
.check_make_service::<TcpEndpoint, ()>()
467+
.push(discover::resolve(map_endpoint::Resolve::new(
468+
endpoint::FromMetadata,
469+
resolve,
470+
)))
471+
.push(discover::buffer(1_000, cache_max_idle_age))
472+
.push_map_target(Addr::from)
473+
.push_on_response(tcp::balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY))
474+
.instrument(|a: &SocketAddr| info_span!("tcp.balance", dst = %a))
475+
.push_fallback_with_predicate(
476+
svc::stack(tcp_connect.clone())
477+
.push_make_thunk()
478+
.push(admit::AdmitLayer::new(prevent_loop))
479+
.push_map_target(TcpEndpoint::from)
480+
.instrument(|a: &SocketAddr| info_span!("tcp.forward", peer.addr = %a)),
481+
is_discovery_rejected,
482+
)
483+
.into_new_service()
484+
.check_new_service::<SocketAddr, ()>()
485+
.cache(
486+
svc::layers().push_on_response(
487+
svc::layers()
488+
.push_failfast(dispatch_timeout)
489+
.push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age)
490+
.push(metrics.stack.layer(stack_labels("tcp"))),
491+
),
492+
)
493+
.spawn_buffer(buffer_capacity)
494+
.check_make_service::<SocketAddr, ()>()
495+
.push(svc::layer::mk(tcp::Forward::new))
496+
.push_map_target(|a: listen::Addrs| a.target_addr());
458497

459498
let http = http::DetectHttp::new(
460499
h2_settings,
461500
detect_protocol_timeout,
462501
http_server,
463-
tcp_forward.clone(),
502+
tcp_balance,
464503
drain.clone(),
465504
);
466505

506+
let tcp_forward = svc::stack(tcp_connect)
507+
.push_make_thunk()
508+
.push(svc::layer::mk(tcp::Forward::new))
509+
.push(admit::AdmitLayer::new(prevent_loop))
510+
.push_map_target(TcpEndpoint::from);
511+
467512
let accept = svc::stack(SkipDetect::new(skip_detect, http, tcp_forward))
468513
.push(metrics.transport.layer_accept(TransportLabels));
469514

linkerd/app/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ impl Config {
143143

144144
let outbound_http = outbound.build_http_router(
145145
outbound_http_endpoint,
146-
dst.resolve,
146+
dst.resolve.clone(),
147147
dst.profiles.clone(),
148148
outbound_metrics.clone(),
149149
);
@@ -156,6 +156,7 @@ impl Config {
156156
svc::stack(refine.clone())
157157
.push_map_response(|(n, _)| n)
158158
.into_inner(),
159+
dst.resolve,
159160
outbound_connect,
160161
outbound_http.clone(),
161162
outbound_metrics,

linkerd/proxy/http/src/version.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use linkerd2_proxy_transport::io::{self, Peekable, PrefixedIo};
2+
use tracing::{debug, trace};
23

34
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
45
pub enum Version {
@@ -16,6 +17,7 @@ impl Version {
1617
// http2 is easiest to detect
1718
if bytes.len() >= Self::H2_PREFACE.len() {
1819
if &bytes[..Self::H2_PREFACE.len()] == Self::H2_PREFACE {
20+
trace!("Detected H2");
1921
return Some(Version::H2);
2022
}
2123
}
@@ -34,11 +36,14 @@ impl Version {
3436
// We didn't want to keep parsing headers, just validate that
3537
// the first line is HTTP1.
3638
Ok(_) | Err(httparse::Error::TooManyHeaders) => {
39+
trace!("Detected H1");
3740
return Some(Version::Http1);
3841
}
3942
_ => {}
4043
}
4144

45+
debug!("Not HTTP");
46+
trace!(?bytes);
4247
None
4348
}
4449

linkerd/proxy/tcp/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ publish = false
1010
futures = { version = "0.3", features = ["compat"] }
1111
linkerd2-duplex = { path = "../../duplex" }
1212
linkerd2-error = { path = "../../error" }
13+
linkerd2-stack = { path = "../../stack" }
14+
rand = { version = "0.7", features = ["small_rng"] }
1315
tokio = { version = "0.2" }
1416
tower = { version = "0.3", default-features = false }
15-
pin-project = "0.4"
17+
pin-project = "0.4"

linkerd/proxy/tcp/src/balance.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use linkerd2_error::Error;
2+
use linkerd2_stack::layer;
3+
use rand::{rngs::SmallRng, SeedableRng};
4+
use std::{hash::Hash, time::Duration};
5+
pub use tower::{
6+
balance::p2c::Balance,
7+
load::{Load, PeakEwmaDiscover},
8+
};
9+
use tower::{discover::Discover, load::CompleteOnResponse};
10+
11+
/// Produces a PeakEWMA balancer that uses connect latency (and pending
12+
/// connections) as its load metric.
13+
pub fn layer<T, D>(
14+
default_rtt: Duration,
15+
decay: Duration,
16+
) -> impl tower::layer::Layer<D, Service = Balance<PeakEwmaDiscover<D, CompleteOnResponse>, T>> + Clone
17+
where
18+
D: Discover,
19+
D::Key: Hash,
20+
D::Service: tower::Service<T>,
21+
<D::Service as tower::Service<T>>::Error: Into<Error>,
22+
{
23+
let rng = SmallRng::from_entropy();
24+
layer::mk(move |discover| {
25+
let loaded =
26+
PeakEwmaDiscover::new(discover, default_rtt, decay, CompleteOnResponse::default());
27+
Balance::from_rng(loaded, rng.clone()).expect("RNG must be valid")
28+
})
29+
}

linkerd/proxy/tcp/src/forward.rs

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,53 @@
11
use futures::{future, prelude::*};
22
use linkerd2_duplex::Duplex;
33
use linkerd2_error::Error;
4-
use std::future::Future;
5-
use std::pin::Pin;
6-
use std::task::{Context, Poll};
4+
use std::{
5+
future::Future,
6+
pin::Pin,
7+
task::{Context, Poll},
8+
};
79
use tokio::io::{AsyncRead, AsyncWrite};
810
use tower::Service;
911

1012
#[derive(Clone, Debug)]
11-
pub struct Forward<C> {
12-
connect: C,
13+
pub struct Forward<M> {
14+
make_connect: M,
1315
}
1416

1517
#[derive(Clone, Debug)]
16-
pub struct Accept<C, T> {
18+
pub struct Accept<C> {
1719
connect: C,
18-
target: T,
1920
}
2021

21-
impl<C> Forward<C> {
22-
pub fn new(connect: C) -> Self {
23-
Self { connect }
22+
impl<M> Forward<M> {
23+
pub fn new(make_connect: M) -> Self {
24+
Self { make_connect }
2425
}
2526
}
2627

27-
impl<C: Clone, T> Service<T> for Forward<C> {
28-
type Response = Accept<C, T>;
29-
type Error = Error;
30-
type Future = future::Ready<Result<Self::Response, Error>>;
28+
impl<M, T> Service<T> for Forward<M>
29+
where
30+
M: Service<T>,
31+
{
32+
type Response = Accept<M::Response>;
33+
type Error = M::Error;
34+
type Future = future::MapOk<M::Future, fn(M::Response) -> Accept<M::Response>>;
3135

32-
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), self::Error>> {
33-
Poll::Ready(Ok(()))
36+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), M::Error>> {
37+
self.make_connect.poll_ready(cx)
3438
}
3539

3640
fn call(&mut self, target: T) -> Self::Future {
37-
let connect = self.connect.clone();
38-
future::ok(Accept { connect, target })
41+
self.make_connect
42+
.call(target)
43+
.map_ok(|connect| Accept { connect })
3944
}
4045
}
4146

42-
impl<C, T, I> Service<I> for Accept<C, T>
47+
impl<C, I> Service<I> for Accept<C>
4348
where
44-
T: Clone,
4549
I: AsyncRead + AsyncWrite + Send + Unpin + 'static,
46-
C: tower::Service<T> + Send + 'static,
50+
C: tower::Service<()> + Send + 'static,
4751
C::Error: Into<Error>,
4852
C::Future: Send + 'static,
4953
C::Response: AsyncRead + AsyncWrite + Send + Unpin + 'static,
@@ -58,7 +62,7 @@ where
5862
}
5963

6064
fn call(&mut self, src_io: I) -> Self::Future {
61-
let connect = self.connect.call(self.target.clone()).err_into::<Error>();
65+
let connect = self.connect.call(()).err_into::<Error>();
6266
Box::pin(async move {
6367
let dst_io = connect.await?;
6468
Duplex::new(src_io, dst_io).err_into::<Error>().await

0 commit comments

Comments
 (0)