Skip to content

Commit bf760fb

Browse files
authored
outbound: Use the OrigDstAddr type in stack targets (#936)
The outbound stack is ambiguous about the type of orig_dst/target address it uses. This change modifies the outbound stack types to use the `OrigDstAddr` type. The outbound stacks now error when an `OrigDstAddr` type is not present.
1 parent f0fce1c commit bf760fb

File tree

8 files changed

+60
-49
lines changed

8 files changed

+60
-49
lines changed

linkerd/app/gateway/src/gateway.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ use linkerd_app_core::{
66
profiles,
77
proxy::http,
88
svc::{self, layer},
9-
tls, Error, NameAddr,
9+
tls,
10+
transport::OrigDstAddr,
11+
Error, NameAddr,
1012
};
1113
use linkerd_app_outbound as outbound;
1214
use std::{
@@ -71,7 +73,7 @@ where
7173
let svc = self.outbound.new_service(outbound::http::Logical {
7274
profile,
7375
protocol: http.version,
74-
orig_dst: ([0, 0, 0, 0], dst.port()).into(),
76+
orig_dst: OrigDstAddr(([0, 0, 0, 0], dst.port()).into()),
7577
});
7678

7779
Gateway::new(svc, http.target, local_id)

linkerd/app/gateway/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use linkerd_app_core::{
1111
proxy::{api_resolve::Metadata, core::Resolve, http},
1212
svc::{self, Param},
1313
tls,
14+
transport::OrigDstAddr,
1415
transport_header::SessionProtocol,
1516
Error, NameAddr, NameMatch, Never,
1617
};
@@ -120,7 +121,7 @@ where
120121
.push_request_filter(|(p, _): (Option<profiles::Receiver>, _)| match p {
121122
Some(rx) if rx.borrow().name.is_some() => Ok(outbound::tcp::Logical {
122123
profile: Some(rx),
123-
orig_dst: std::net::SocketAddr::from(([0, 0, 0, 0], 0)),
124+
orig_dst: OrigDstAddr(std::net::SocketAddr::from(([0, 0, 0, 0], 0))),
124125
protocol: (),
125126
}),
126127
_ => Err(discovery_rejected()),

linkerd/app/outbound/src/discover.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use linkerd_app_core::{
44
transport::{listen, metrics::SensorIo},
55
Error, IpMatch,
66
};
7+
use std::convert::TryFrom;
78

89
impl<N> Outbound<N> {
910
/// Discovers the profile for a TCP endpoint.
@@ -58,7 +59,7 @@ impl<N> Outbound<N> {
5859
.push(rt.metrics.transport.layer_accept())
5960
.push_cache(config.proxy.cache_max_idle_age)
6061
.check_new_service::<tcp::Accept, I>()
61-
.push_map_target(tcp::Accept::from)
62+
.push_request_filter(tcp::Accept::try_from)
6263
.check_new_service::<listen::Addrs, I>();
6364

6465
Outbound {
@@ -78,8 +79,8 @@ impl svc::stack::Predicate<tcp::Accept> for AllowProfile {
7879
type Request = profiles::LogicalAddr;
7980

8081
fn check(&mut self, a: tcp::Accept) -> Result<profiles::LogicalAddr, Error> {
81-
if self.0.matches(a.orig_dst.ip()) {
82-
Ok(profiles::LogicalAddr(a.orig_dst.into()))
82+
if self.0.matches(a.orig_dst.0.ip()) {
83+
Ok(profiles::LogicalAddr(a.orig_dst.0.into()))
8384
} else {
8485
Err(discovery_rejected().into())
8586
}

linkerd/app/outbound/src/http/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ impl Param<normalize_uri::DefaultAuthority> for Logical {
7272
if let Some(p) = self.profile.as_ref() {
7373
if let Some(n) = p.borrow().name.as_ref() {
7474
return normalize_uri::DefaultAuthority(Some(
75-
uri::Authority::from_str(&format!("{}:{}", n, self.orig_dst.port()))
75+
uri::Authority::from_str(&format!("{}:{}", n, self.orig_dst.0.port()))
7676
.expect("Address must be a valid authority"),
7777
));
7878
}

linkerd/app/outbound/src/ingress.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use linkerd_app_core::{
66
transport::{self, listen},
77
Addr, AddrMatch, Error,
88
};
9+
use std::convert::TryFrom;
910
use tracing::{debug_span, info_span};
1011

1112
impl Outbound<()> {
@@ -132,7 +133,7 @@ impl Outbound<()> {
132133
))
133134
.check_new_service::<tcp::Accept, transport::metrics::SensorIo<I>>()
134135
.push(self.runtime.metrics.transport.layer_accept())
135-
.push_map_target(tcp::Accept::from)
136+
.push_request_filter(tcp::Accept::try_from)
136137
.check_new_service::<listen::Addrs, I>()
137138
// Boxing is necessary purely to limit the link-time overhead of
138139
// having enormous types.
@@ -193,7 +194,8 @@ impl<B> svc::stack::RecognizeRoute<http::Request<B>> for TargetPerRequest {
193194
fn recognize(&self, req: &http::Request<B>) -> Result<Self::Key, Error> {
194195
Ok(Target {
195196
accept: self.0,
196-
dst: http_request_l5d_override_dst_addr(req).unwrap_or_else(|_| self.0.orig_dst.into()),
197+
dst: http_request_l5d_override_dst_addr(req)
198+
.unwrap_or_else(|_| self.0.orig_dst.0.into()),
197199
})
198200
}
199201
}

linkerd/app/outbound/src/target.rs

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use linkerd_app_core::{
66
},
77
svc::{self, Param},
88
tls,
9-
transport::{self, Remote, ServerAddr},
9+
transport::{self, OrigDstAddr, Remote, ServerAddr},
1010
transport_header, Addr, Conditional, Error,
1111
};
1212
use std::net::SocketAddr;
@@ -16,13 +16,13 @@ pub struct EndpointFromMetadata;
1616

1717
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
1818
pub struct Accept<P> {
19-
pub orig_dst: SocketAddr,
19+
pub orig_dst: OrigDstAddr,
2020
pub protocol: P,
2121
}
2222

2323
#[derive(Clone)]
2424
pub struct Logical<P> {
25-
pub orig_dst: SocketAddr,
25+
pub orig_dst: OrigDstAddr,
2626
pub profile: Option<profiles::Receiver>,
2727
pub protocol: P,
2828
}
@@ -44,22 +44,14 @@ pub struct Endpoint<P> {
4444

4545
// === impl Accept ===
4646

47-
impl<P> Param<SocketAddr> for Accept<P> {
48-
fn param(&self) -> SocketAddr {
49-
self.orig_dst
50-
}
51-
}
52-
53-
impl<P> Param<Addr> for Accept<P> {
54-
fn param(&self) -> Addr {
55-
self.orig_dst.into()
56-
}
57-
}
58-
5947
impl<P> Param<transport::labels::Key> for Accept<P> {
6048
fn param(&self) -> transport::labels::Key {
6149
const NO_TLS: tls::ConditionalServerTls = Conditional::None(tls::NoServerTls::Loopback);
62-
transport::labels::Key::accept(transport::labels::Direction::Out, NO_TLS, self.orig_dst)
50+
transport::labels::Key::accept(
51+
transport::labels::Direction::Out,
52+
NO_TLS,
53+
self.orig_dst.into(),
54+
)
6355
}
6456
}
6557

@@ -89,13 +81,6 @@ impl<P> Param<Option<profiles::Receiver>> for Logical<P> {
8981
}
9082
}
9183

92-
/// Used to determine whether detection should be skipped.
93-
impl<P> Param<SocketAddr> for Logical<P> {
94-
fn param(&self) -> SocketAddr {
95-
self.orig_dst
96-
}
97-
}
98-
9984
/// Used for default traffic split
10085
impl<P> Param<profiles::LogicalAddr> for Logical<P> {
10186
fn param(&self) -> profiles::LogicalAddr {
@@ -108,8 +93,8 @@ impl<P> Logical<P> {
10893
self.profile
10994
.as_ref()
11095
.and_then(|p| p.borrow().name.clone())
111-
.map(|n| Addr::from((n, self.orig_dst.port())))
112-
.unwrap_or_else(|| self.orig_dst.into())
96+
.map(|n| Addr::from((n, self.orig_dst.0.port())))
97+
.unwrap_or_else(|| self.orig_dst.0.into())
11398
}
11499
}
115100

@@ -194,7 +179,7 @@ impl<P> Endpoint<P> {
194179
.and_then(|p| p.borrow().endpoint.clone())
195180
{
196181
None => Self {
197-
addr: Remote(ServerAddr(logical.orig_dst)),
182+
addr: Remote(ServerAddr(logical.orig_dst.into())),
198183
metadata: Metadata::default(),
199184
tls: Conditional::None(reason),
200185
logical_addr: logical.addr(),

linkerd/app/outbound/src/tcp/mod.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,40 @@ mod tests;
66

77
use crate::target;
88
pub use linkerd_app_core::proxy::tcp::Forward;
9-
use linkerd_app_core::{svc::Param, transport::listen, transport_header::SessionProtocol};
9+
use linkerd_app_core::{
10+
svc::Param,
11+
transport::{listen, OrigDstAddr},
12+
transport_header::SessionProtocol,
13+
};
1014

1115
pub type Accept = target::Accept<()>;
1216
pub type Logical = target::Logical<()>;
1317
pub type Concrete = target::Concrete<()>;
1418
pub type Endpoint = target::Endpoint<()>;
1519

16-
impl From<listen::Addrs> for Accept {
17-
fn from(addrs: listen::Addrs) -> Self {
20+
impl From<OrigDstAddr> for Accept {
21+
fn from(orig_dst: OrigDstAddr) -> Self {
1822
Self {
19-
orig_dst: addrs.target_addr(),
23+
orig_dst,
2024
protocol: (),
2125
}
2226
}
2327
}
2428

29+
impl std::convert::TryFrom<listen::Addrs> for Accept {
30+
type Error = std::io::Error;
31+
32+
fn try_from(t: listen::Addrs) -> Result<Self, Self::Error> {
33+
match t.orig_dst() {
34+
Some(addr) => Ok(Self::from(addr)),
35+
None => Err(std::io::Error::new(
36+
std::io::ErrorKind::NotFound,
37+
"No SO_ORIGINAL_DST address found",
38+
)),
39+
}
40+
}
41+
}
42+
2543
impl<P> From<(P, Accept)> for target::Accept<P> {
2644
fn from((protocol, Accept { orig_dst, .. }): (P, Accept)) -> Self {
2745
Self { orig_dst, protocol }

linkerd/app/outbound/src/tcp/tests.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ async fn plaintext_tcp() {
3838
// address to resolve, etc.
3939
let target_addr = SocketAddr::new([0, 0, 0, 0].into(), 666);
4040
let logical = Logical {
41-
orig_dst: target_addr,
41+
orig_dst: OrigDstAddr(target_addr),
4242
profile: Some(profile::only_default()),
4343
protocol: (),
4444
};
@@ -81,14 +81,16 @@ async fn plaintext_tcp() {
8181
async fn tls_when_hinted() {
8282
let _trace = support::trace_init();
8383

84+
let tls_addr = SocketAddr::new([0, 0, 0, 0].into(), 5550);
8485
let tls = Logical {
85-
orig_dst: SocketAddr::new([0, 0, 0, 0].into(), 5550),
86+
orig_dst: OrigDstAddr(tls_addr),
8687
profile: Some(profile::only_default()),
8788
protocol: (),
8889
};
8990

91+
let plain_addr = SocketAddr::new([0, 0, 0, 0].into(), 5551);
9092
let plain = Logical {
91-
orig_dst: SocketAddr::new([0, 0, 0, 0].into(), 5551),
93+
orig_dst: OrigDstAddr(plain_addr),
9294
profile: Some(profile::only_default()),
9395
protocol: (),
9496
};
@@ -103,12 +105,12 @@ async fn tls_when_hinted() {
103105
// Build a mock "connector" that returns the upstream "server" IO.
104106
let connect = support::connect()
105107
// The plaintext endpoint should use plaintext...
106-
.endpoint_fn(plain.orig_dst, move |endpoint: Endpoint| {
108+
.endpoint_fn(plain_addr, move |endpoint: Endpoint| {
107109
assert!(endpoint.tls.is_none());
108110
let io = tls_srv_io.build();
109111
Ok(io)
110112
})
111-
.endpoint_fn(tls.orig_dst, move |endpoint: Endpoint| {
113+
.endpoint_fn(tls_addr, move |endpoint: Endpoint| {
112114
assert_eq!(endpoint.tls, Conditional::Some(id_name2.clone().into()));
113115
let io = srv_io.build();
114116
Ok(io)
@@ -117,10 +119,10 @@ async fn tls_when_hinted() {
117119
// Configure the mock destination resolver to just give us a single endpoint
118120
// for the target, which always exists and has no metadata.
119121
let resolver = support::resolver()
120-
.endpoint_exists(plain.orig_dst, plain.orig_dst, Default::default())
122+
.endpoint_exists(plain_addr, plain_addr, Default::default())
121123
.endpoint_exists(
122-
tls.orig_dst,
123-
tls.orig_dst,
124+
tls_addr,
125+
tls_addr,
124126
support::resolver::Metadata::new(
125127
Default::default(),
126128
support::resolver::ProtocolHint::Unknown,
@@ -136,15 +138,15 @@ async fn tls_when_hinted() {
136138

137139
// Build the outbound TCP balancer stack.
138140
let (rt, _) = runtime();
139-
let plain = Outbound::new(default_config(plain.orig_dst), rt.clone())
141+
let plain = Outbound::new(default_config(plain_addr), rt.clone())
140142
.with_stack(connect.clone())
141143
.push_tcp_logical(resolver.clone())
142144
.into_inner()
143145
.new_service(plain)
144146
.oneshot(client_io.build())
145147
.err_into::<Error>();
146148

147-
let tls = Outbound::new(default_config(tls.orig_dst), rt)
149+
let tls = Outbound::new(default_config(tls_addr), rt)
148150
.with_stack(connect)
149151
.push_tcp_logical(resolver)
150152
.into_inner()

0 commit comments

Comments
 (0)