Skip to content

Commit 90ba66e

Browse files
authored
outbound: Avoid redundant TCP endpoint resolution (#742)
This change extends the changes in #736 to the TCP-forwarding stack so that load balancer resolutions are not created when the control plane does not indicate that the target is a service.
1 parent a466ba5 commit 90ba66e

File tree

5 files changed

+75
-18
lines changed

5 files changed

+75
-18
lines changed

linkerd/app/outbound/src/server.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,18 @@ where
9393
.check_new_service::<(http::Version, tcp::Logical), http::Request<_>>()
9494
.into_inner();
9595

96+
let tcp_forward = svc::stack(tcp_connect.clone())
97+
.push_make_thunk()
98+
.check_make_service::<tcp::Endpoint, ()>()
99+
.push_on_response(svc::layer::mk(tcp::Forward::new))
100+
.into_new_service()
101+
.check_new_service::<tcp::Endpoint, transport::io::PrefixedIo<transport::metrics::SensorIo<I>>>()
102+
.push_map_target(tcp::Endpoint::from_logical(
103+
tls::ReasonForNoPeerName::NotProvidedByServiceDiscovery,
104+
))
105+
.check_new_service::<tcp::Logical, transport::io::PrefixedIo<transport::metrics::SensorIo<I>>>()
106+
.into_inner();
107+
96108
// Load balances TCP streams that cannot be decoded as HTTP.
97109
let tcp_balance = svc::stack(tcp::balance::stack(
98110
&config.proxy,
@@ -101,6 +113,8 @@ where
101113
))
102114
.push_map_target(tcp::Concrete::from)
103115
.push(profiles::split::layer())
116+
.check_new_service::<tcp::Logical, transport::io::PrefixedIo<transport::metrics::SensorIo<I>>>()
117+
.push_switch(tcp::Logical::should_resolve, tcp_forward)
104118
.push_on_response(
105119
svc::layers()
106120
.push_failfast(dispatch_timeout)

linkerd/app/outbound/src/tcp/tests.rs

Lines changed: 49 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ async fn resolutions_are_reused() {
172172

173173
let addr = SocketAddr::new([0, 0, 0, 0].into(), 5550);
174174
let cfg = default_config(addr);
175+
let svc_name = profile::Name::from_str("foo.ns1.svc.example.com").unwrap();
175176
let id_name =
176177
linkerd2_identity::Name::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local")
177178
.expect("hostname is valid");
@@ -195,10 +196,16 @@ async fn resolutions_are_reused() {
195196

196197
// Configure the mock destination resolver to just give us a single endpoint
197198
// for the target, which always exists and has no metadata.
198-
let resolver = support::resolver().endpoint_exists(Addr::from(addr), addr, meta);
199+
let resolver = support::resolver().endpoint_exists((svc_name.clone(), addr.port()), addr, meta);
199200
let resolve_state = resolver.handle();
200201

201-
let profiles = support::profiles().profile(addr, Default::default());
202+
let profiles = support::profiles().profile(
203+
addr,
204+
profile::Profile {
205+
name: Some(svc_name),
206+
..profile::Profile::default()
207+
},
208+
);
202209
let profile_state = profiles.handle();
203210

204211
// Build the outbound server
@@ -252,6 +259,7 @@ async fn load_balances() {
252259
];
253260

254261
let cfg = default_config(svc_addr);
262+
let svc_name = profile::Name::from_str("foo.ns1.svc.example.com").unwrap();
255263
let id_name =
256264
linkerd2_identity::Name::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local")
257265
.expect("hostname is valid");
@@ -269,7 +277,13 @@ async fn load_balances() {
269277
);
270278
}
271279

272-
let profiles = support::profile::resolver().profile(svc_addr, Default::default());
280+
let profiles = support::profile::resolver().profile(
281+
svc_addr,
282+
profile::Profile {
283+
name: Some(svc_name.clone()),
284+
..Default::default()
285+
},
286+
);
273287
let profile_state = profiles.handle();
274288

275289
let meta = support::resolver::Metadata::new(
@@ -281,7 +295,7 @@ async fn load_balances() {
281295
);
282296

283297
let resolver = support::resolver();
284-
let mut dst = resolver.endpoint_tx(Addr::Socket(svc_addr));
298+
let mut dst = resolver.endpoint_tx((svc_name, svc_addr.port()));
285299
dst.add(endpoints.iter().map(|&(addr, _)| (addr, meta.clone())))
286300
.expect("still listening");
287301
let resolve_state = resolver.handle();
@@ -342,6 +356,7 @@ async fn load_balancer_add_endpoints() {
342356
];
343357

344358
let cfg = default_config(svc_addr);
359+
let svc_name = profile::Name::from_str("foo.ns1.svc.example.com").unwrap();
345360
let id_name =
346361
linkerd2_identity::Name::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local")
347362
.expect("hostname is valid");
@@ -358,7 +373,13 @@ async fn load_balancer_add_endpoints() {
358373
);
359374
}
360375

361-
let profiles = support::profile::resolver().profile(svc_addr, Default::default());
376+
let profiles = support::profile::resolver().profile(
377+
svc_addr,
378+
profile::Profile {
379+
name: Some(svc_name.clone()),
380+
..Default::default()
381+
},
382+
);
362383

363384
let meta = support::resolver::Metadata::new(
364385
Default::default(),
@@ -369,7 +390,7 @@ async fn load_balancer_add_endpoints() {
369390
);
370391

371392
let resolver = support::resolver();
372-
let mut dst = resolver.endpoint_tx(Addr::Socket(svc_addr));
393+
let mut dst = resolver.endpoint_tx((svc_name, svc_addr.port()));
373394
dst.add(Some((endpoints[0].0, meta.clone())))
374395
.expect("still listening");
375396

@@ -450,6 +471,7 @@ async fn load_balancer_remove_endpoints() {
450471
];
451472

452473
let cfg = default_config(svc_addr);
474+
let svc_name = profile::Name::from_str("foo.ns1.svc.example.com").unwrap();
453475
let id_name =
454476
linkerd2_identity::Name::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local")
455477
.expect("hostname is valid");
@@ -466,7 +488,13 @@ async fn load_balancer_remove_endpoints() {
466488
);
467489
}
468490

469-
let profiles = support::profile::resolver().profile(svc_addr, Default::default());
491+
let profiles = support::profile::resolver().profile(
492+
svc_addr,
493+
profile::Profile {
494+
name: Some(svc_name.clone()),
495+
..Default::default()
496+
},
497+
);
470498

471499
let meta = support::resolver::Metadata::new(
472500
Default::default(),
@@ -477,7 +505,7 @@ async fn load_balancer_remove_endpoints() {
477505
);
478506

479507
let resolver = support::resolver();
480-
let mut dst = resolver.endpoint_tx(Addr::Socket(svc_addr));
508+
let mut dst = resolver.endpoint_tx((svc_name, svc_addr.port()));
481509
dst.add(Some((endpoints[0].0, meta.clone())))
482510
.expect("still listening");
483511

@@ -539,6 +567,7 @@ async fn no_profiles_when_outside_search_nets() {
539567
allow_discovery: IpMatch::new(Some(IpNet::from_str("10.0.0.0/8").unwrap())).into(),
540568
..default_config(profile_addr)
541569
};
570+
let svc_name = profile::Name::from_str("foo.ns1.svc.example.com").unwrap();
542571
let id_name =
543572
linkerd2_identity::Name::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local")
544573
.expect("hostname is invalid");
@@ -578,11 +607,20 @@ async fn no_profiles_when_outside_search_nets() {
578607

579608
// Configure the mock destination resolver to just give us a single endpoint
580609
// for the target, which always exists and has no metadata.
581-
let resolver =
582-
support::resolver().endpoint_exists(Addr::from(profile_addr), profile_addr, meta);
610+
let resolver = support::resolver().endpoint_exists(
611+
(svc_name.clone(), profile_addr.port()),
612+
profile_addr,
613+
meta,
614+
);
583615
let resolve_state = resolver.handle();
584616

585-
let profiles = support::profiles().profile(profile_addr, Default::default());
617+
let profiles = support::profiles().profile(
618+
profile_addr,
619+
profile::Profile {
620+
name: Some(svc_name),
621+
..Default::default()
622+
},
623+
);
586624
let profile_state = profiles.handle();
587625

588626
// Build the outbound server

linkerd/app/test/src/connect.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,14 @@ where
3535
let span = tracing::info_span!("connect", %addr);
3636
let f = span.in_scope(|| {
3737
tracing::trace!("connecting...");
38-
match self.endpoints.lock().unwrap().get_mut(&addr) {
38+
let mut endpoints = self.endpoints.lock().unwrap();
39+
match endpoints.get_mut(&addr) {
3940
Some(f) => (f)(endpoint),
40-
None => panic!("did not expect to connect to the endpoint {:?}", endpoint),
41+
None => panic!(
42+
"did not expect to connect to the endpoint {} not in {:?}",
43+
addr,
44+
endpoints.keys().collect::<Vec<_>>()
45+
),
4146
}
4247
});
4348
f.instrument(span)

linkerd/app/test/src/resolver.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,14 @@ impl<T, E> Dst<T, E>
8181
where
8282
T: Hash + Eq,
8383
{
84-
pub fn endpoint_tx(&self, endpoint: T) -> DstSender<E> {
84+
pub fn endpoint_tx(&self, t: impl Into<T>) -> DstSender<E> {
8585
let (tx, rx) = mpsc::unbounded_channel();
86-
self.state.endpoints.lock().unwrap().insert(endpoint, rx);
86+
self.state.endpoints.lock().unwrap().insert(t.into(), rx);
8787
DstSender(tx)
8888
}
8989

90-
pub fn endpoint_exists(self, endpoint: T, addr: SocketAddr, meta: E) -> Self {
91-
let mut tx = self.endpoint_tx(endpoint);
90+
pub fn endpoint_exists(self, t: impl Into<T>, addr: SocketAddr, meta: E) -> Self {
91+
let mut tx = self.endpoint_tx(t);
9292
tx.add(vec![(addr, meta)]).unwrap();
9393
self
9494
}

linkerd/service-profiles/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#![deny(warnings, rust_2018_idioms)]
22

33
use linkerd2_addr::Addr;
4-
use linkerd2_dns_name::Name;
4+
pub use linkerd2_dns_name::Name;
55
use linkerd2_error::Error;
66
use linkerd2_proxy_api_resolve::Metadata;
77
use std::{

0 commit comments

Comments
 (0)