Skip to content

Commit fce8c4a

Browse files
authored
inbound, outbound: Param-ify listen::Addrs (#955)
This PR changes the `Service`s constructed by the inbound and outbound stacks from always taking the `listen::Addrs` type as a target, to being parameterized over a target type that implements `Param` for various address types. This required changing some middleware layers that were previously hardcoded to `listen::Addrs`, and adding `Param` impls to `listen::Addrs` for various address types. I also added a new `TcpAccept::from_local_addr` constructor, which sets the target address based on the server's listen address, rather than from the SO_ORIGINAL_DST address. This is used by the admin server, which doesn't configure its listening socket to include original dst addresses. Previously, the `From` implementation for `TcpAccept` just always tried the orig dst addr and fell back to the listen addr if it was not present. Now, we have separate behavior for the admin server and the proxy servers --- admin always uses the local addr, and the proxies only try SO_ORIGINAL_DST or fail. There's no functional change here, but this will set up for a future change to make stacks generic over SO_ORIGINAL_DST detection, allowing us to remove the currently quite awkward feature flagging. Signed-off-by: Eliza Weisman <[email protected]>
1 parent 32e459b commit fce8c4a

File tree

10 files changed

+202
-88
lines changed

10 files changed

+202
-88
lines changed

linkerd/app/inbound/src/direct.rs

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use linkerd_app_core::{
44
proxy::identity::LocalCrtKey,
55
svc::{self, Param},
66
tls,
7-
transport::{self, listen, metrics::SensorIo, ClientAddr, Remote},
7+
transport::{self, metrics::SensorIo, ClientAddr, OrigDstAddr, Remote},
88
transport_header::{self, NewTransportHeaderServer, SessionProtocol, TransportHeader},
99
Conditional, Error, NameAddr, Never,
1010
};
@@ -52,7 +52,7 @@ pub struct ClientInfo {
5252
type FwdIo<I> = io::PrefixedIo<SensorIo<tls::server::Io<I>>>;
5353
pub type GatewayIo<I> = io::EitherIo<FwdIo<I>, SensorIo<tls::server::Io<I>>>;
5454

55-
impl<T> Inbound<T> {
55+
impl<N> Inbound<N> {
5656
/// Builds a stack that handles connections that target the proxy's inbound port
5757
/// (i.e. without an SO_ORIGINAL_DST setting). This port behaves differently from
5858
/// the main proxy stack:
@@ -61,22 +61,23 @@ impl<T> Inbound<T> {
6161
/// 2. TLS is required;
6262
/// 3. A transport header is expected. It's not strictly required, as
6363
/// gateways may need to accept HTTP requests from older proxy versions
64-
pub fn push_direct<I, TSvc, G, GSvc>(
64+
pub fn push_direct<T, I, NSvc, G, GSvc>(
6565
self,
6666
gateway: G,
6767
) -> Inbound<
6868
impl svc::NewService<
69-
listen::Addrs,
69+
T,
7070
Service = impl svc::Service<I, Response = (), Error = Error, Future = impl Send>,
7171
> + Clone,
7272
>
7373
where
74+
T: Param<Remote<ClientAddr>> + Param<Option<OrigDstAddr>> + Clone + Send + 'static,
7475
I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr,
7576
I: Debug + Send + Sync + Unpin + 'static,
76-
T: svc::NewService<TcpEndpoint, Service = TSvc> + Clone + Send + Sync + Unpin + 'static,
77-
TSvc: svc::Service<FwdIo<I>, Response = ()> + Clone + Send + Sync + Unpin + 'static,
78-
TSvc::Error: Into<Error>,
79-
TSvc::Future: Send + Unpin,
77+
N: svc::NewService<TcpEndpoint, Service = NSvc> + Clone + Send + Sync + Unpin + 'static,
78+
NSvc: svc::Service<FwdIo<I>, Response = ()> + Clone + Send + Sync + Unpin + 'static,
79+
NSvc::Error: Into<Error>,
80+
NSvc::Future: Send + Unpin,
8081
G: svc::NewService<GatewayConnection, Service = GSvc>
8182
+ Clone
8283
+ Send
@@ -158,7 +159,7 @@ impl<T> Inbound<T> {
158159
rt.identity.clone().map(WithTransportHeaderAlpn),
159160
detect_timeout,
160161
))
161-
.check_new_service::<listen::Addrs, I>();
162+
.check_new_service::<T, I>();
162163

163164
Inbound {
164165
config,
@@ -170,23 +171,35 @@ impl<T> Inbound<T> {
170171

171172
// === impl ClientInfo ===
172173

173-
impl TryFrom<(tls::ConditionalServerTls, listen::Addrs)> for ClientInfo {
174-
type Error = RefusedNoIdentity;
174+
impl<T> TryFrom<(tls::ConditionalServerTls, T)> for ClientInfo
175+
where
176+
T: Param<Option<OrigDstAddr>>,
177+
T: Param<Remote<ClientAddr>>,
178+
{
179+
type Error = Error;
175180

176-
fn try_from(
177-
(tls, addrs): (tls::ConditionalServerTls, listen::Addrs),
178-
) -> Result<Self, Self::Error> {
181+
fn try_from((tls, addrs): (tls::ConditionalServerTls, T)) -> Result<Self, Self::Error> {
179182
match tls {
180183
Conditional::Some(tls::ServerTls::Established {
181184
client_id: Some(client_id),
182185
negotiated_protocol,
183-
}) => Ok(Self {
184-
client_id,
185-
alpn: negotiated_protocol,
186-
client_addr: addrs.client(),
187-
local_addr: addrs.target_addr(),
188-
}),
189-
_ => Err(RefusedNoIdentity(())),
186+
}) => {
187+
let local: Option<OrigDstAddr> = addrs.param();
188+
let OrigDstAddr(local_addr) = local.ok_or_else(|| {
189+
tracing::warn!("No SO_ORIGINAL_DST address found!");
190+
std::io::Error::new(
191+
std::io::ErrorKind::NotFound,
192+
"No SO_ORIGINAL_DST address found",
193+
)
194+
})?;
195+
Ok(Self {
196+
client_id,
197+
alpn: negotiated_protocol,
198+
client_addr: addrs.param(),
199+
local_addr,
200+
})
201+
}
202+
_ => Err(RefusedNoIdentity(()).into()),
190203
}
191204
}
192205
}

linkerd/app/inbound/src/lib.rs

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@ use linkerd_app_core::{
2727
serve,
2828
svc::{self, Param},
2929
tls,
30-
transport::{self, listen, Remote, ServerAddr},
30+
transport::{self, ClientAddr, OrigDstAddr, Remote, ServerAddr},
3131
Error, NameMatch, ProxyRuntime,
3232
};
33-
use std::{fmt::Debug, future::Future, net::SocketAddr, time::Duration};
33+
use std::{convert::TryFrom, fmt::Debug, future::Future, net::SocketAddr, time::Duration};
3434
use tracing::{debug_span, info_span};
3535

3636
#[derive(Clone, Debug)]
@@ -208,16 +208,17 @@ where
208208
}
209209
}
210210

211-
pub fn into_server<I, G, GSvc, P>(
211+
pub fn into_server<T, I, G, GSvc, P>(
212212
self,
213213
server_port: u16,
214214
profiles: P,
215215
gateway: G,
216216
) -> impl svc::NewService<
217-
listen::Addrs,
217+
T,
218218
Service = impl svc::Service<I, Response = (), Error = Error, Future = impl Send>,
219219
> + Clone
220220
where
221+
T: Param<Remote<ClientAddr>> + Param<Option<OrigDstAddr>> + Clone + Send + 'static,
221222
I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr,
222223
I: Debug + Send + Sync + Unpin + 'static,
223224
G: svc::NewService<direct::GatewayConnection, Service = GSvc>,
@@ -253,7 +254,7 @@ where
253254
))
254255
.push_request_filter(require_id)
255256
.push(self.runtime.metrics.transport.layer_accept())
256-
.push_map_target(TcpAccept::from)
257+
.push_request_filter(TcpAccept::try_from)
257258
.push(tls::NewDetectTls::layer(
258259
self.runtime.identity.clone(),
259260
config.detect_protocol_timeout,
@@ -266,21 +267,28 @@ where
266267
.stack
267268
.push_map_target(TcpEndpoint::from)
268269
.push(self.runtime.metrics.transport.layer_accept())
269-
.push_map_target(TcpAccept::port_skipped)
270-
.instrument(|_: &_| debug_span!("forward"))
270+
.push_request_filter(TcpAccept::port_skipped)
271+
.check_new_service::<T, _>()
272+
.instrument(|_: &T| debug_span!("forward"))
271273
.into_inner(),
272274
)
273-
.check_new_service::<listen::Addrs, I>()
275+
.check_new_service::<T, I>()
274276
.push_switch(
275-
PreventLoop::from(server_port),
277+
PreventLoop::from(server_port).to_switch(),
276278
self.push_tcp_forward(server_port)
277279
.push_direct(gateway)
278280
.stack
279281
.instrument(|_: &_| debug_span!("direct"))
280282
.into_inner(),
281283
)
282-
.instrument(|a: &listen::Addrs| info_span!("server", port = %a.target_addr().port()))
283-
.check_new_service::<listen::Addrs, I>()
284+
.instrument(|a: &T| {
285+
if let Some(OrigDstAddr(target_addr)) = a.param() {
286+
info_span!("server", port = target_addr.port())
287+
} else {
288+
info_span!("server", port = %"<no original dst>")
289+
}
290+
})
291+
.check_new_service::<T, I>()
284292
.into_inner()
285293
}
286294
}
@@ -293,11 +301,21 @@ impl From<indexmap::IndexSet<u16>> for SkipByPort {
293301
}
294302
}
295303

296-
impl svc::Predicate<listen::Addrs> for SkipByPort {
297-
type Request = svc::Either<listen::Addrs, listen::Addrs>;
304+
impl<T> svc::Predicate<T> for SkipByPort
305+
where
306+
T: Param<Option<OrigDstAddr>>,
307+
{
308+
type Request = svc::Either<T, T>;
298309

299-
fn check(&mut self, t: listen::Addrs) -> Result<Self::Request, Error> {
300-
if !self.0.contains(&t.target_addr().port()) {
310+
fn check(&mut self, t: T) -> Result<Self::Request, Error> {
311+
let OrigDstAddr(addr) = t.param().ok_or_else(|| {
312+
tracing::warn!("No SO_ORIGINAL_DST address found!");
313+
std::io::Error::new(
314+
std::io::ErrorKind::NotFound,
315+
"No SO_ORIGINAL_DST address found",
316+
)
317+
})?;
318+
if !self.0.contains(&addr.port()) {
301319
Ok(svc::Either::A(t))
302320
} else {
303321
Ok(svc::Either::B(t))

linkerd/app/inbound/src/prevent_loop.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::TcpEndpoint;
22
use linkerd_app_core::{
3-
svc::stack::{Either, Predicate},
4-
transport::listen::Addrs,
3+
svc::stack::{Either, Param, Predicate},
4+
transport::addrs::OrigDstAddr,
55
Error,
66
};
77

@@ -10,6 +10,10 @@ use linkerd_app_core::{
1010
pub struct PreventLoop {
1111
port: u16,
1212
}
13+
#[derive(Copy, Clone, Debug)]
14+
pub struct SwitchLoop {
15+
port: u16,
16+
}
1317

1418
#[derive(Copy, Clone, Debug)]
1519
pub struct LoopPrevented {
@@ -34,11 +38,23 @@ impl Predicate<TcpEndpoint> for PreventLoop {
3438
}
3539
}
3640

37-
impl Predicate<Addrs> for PreventLoop {
38-
type Request = Either<Addrs, Addrs>;
41+
impl PreventLoop {
42+
pub fn to_switch(self) -> SwitchLoop {
43+
SwitchLoop { port: self.port }
44+
}
45+
}
46+
47+
impl<T: Param<Option<OrigDstAddr>>> Predicate<T> for SwitchLoop {
48+
type Request = Either<T, T>;
3949

40-
fn check(&mut self, addrs: Addrs) -> Result<Either<Addrs, Addrs>, Error> {
41-
let addr = addrs.target_addr();
50+
fn check(&mut self, addrs: T) -> Result<Either<T, T>, Error> {
51+
let OrigDstAddr(addr) = addrs.param().ok_or_else(|| {
52+
tracing::warn!("No SO_ORIGINAL_DST address found!");
53+
std::io::Error::new(
54+
std::io::ErrorKind::NotFound,
55+
"No SO_ORIGINAL_DST address found",
56+
)
57+
})?;
4258
tracing::debug!(%addr, self.port);
4359
if addr.port() != self.port {
4460
Ok(Either::A(addrs))

linkerd/app/inbound/src/target.rs

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,17 @@ use linkerd_app_core::{
66
stack_tracing,
77
svc::{self, Param},
88
tls,
9-
transport::{self, addrs::*, listen},
9+
transport::{self, addrs::*},
1010
transport_header::TransportHeader,
1111
Addr, Conditional, Error, CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER,
1212
};
13-
use std::{convert::TryInto, net::SocketAddr, str::FromStr, sync::Arc};
13+
use std::{
14+
convert::{TryFrom, TryInto},
15+
io,
16+
net::SocketAddr,
17+
str::FromStr,
18+
sync::Arc,
19+
};
1420
use tracing::debug;
1521

1622
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
@@ -60,25 +66,57 @@ pub struct RequestTarget {
6066
// === impl TcpAccept ===
6167

6268
impl TcpAccept {
63-
pub fn port_skipped(tcp: listen::Addrs) -> Self {
64-
Self {
65-
target_addr: tcp.target_addr(),
66-
client_addr: tcp.client(),
69+
pub fn port_skipped<T>(tcp: T) -> Result<Self, io::Error>
70+
where
71+
T: Param<Remote<ClientAddr>> + Param<Option<OrigDstAddr>>,
72+
{
73+
let orig_dst: Option<OrigDstAddr> = tcp.param();
74+
let OrigDstAddr(target_addr) = orig_dst.ok_or_else(|| {
75+
tracing::warn!("No SO_ORIGINAL_DST address found!");
76+
io::Error::new(io::ErrorKind::NotFound, "No SO_ORIGINAL_DST address found")
77+
})?;
78+
Ok(Self {
79+
target_addr,
80+
client_addr: tcp.param(),
6781
tls: Conditional::None(tls::NoServerTls::PortSkipped),
68-
}
82+
})
6983
}
70-
}
7184

72-
impl From<tls::server::Meta<listen::Addrs>> for TcpAccept {
73-
fn from((tls, addrs): tls::server::Meta<listen::Addrs>) -> Self {
85+
/// Returns a `TcpAccept` for the provided TLS metadata and addresses,
86+
/// determining the target address from the server's local listener address
87+
/// rather than a `SO_ORIGINAL_DST` address.
88+
pub fn from_local_addr<T>((tls, addrs): tls::server::Meta<T>) -> Self
89+
where
90+
T: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>>,
91+
{
92+
let Local(ServerAddr(target_addr)) = addrs.param();
7493
Self {
75-
target_addr: addrs.target_addr(),
76-
client_addr: addrs.client(),
94+
target_addr,
95+
client_addr: addrs.param(),
7796
tls,
7897
}
7998
}
8099
}
81100

101+
impl<T> TryFrom<tls::server::Meta<T>> for TcpAccept
102+
where
103+
T: Param<Remote<ClientAddr>> + Param<Option<OrigDstAddr>>,
104+
{
105+
type Error = io::Error;
106+
fn try_from((tls, addrs): tls::server::Meta<T>) -> Result<Self, Self::Error> {
107+
let orig_dst: Option<OrigDstAddr> = addrs.param();
108+
let OrigDstAddr(target_addr) = orig_dst.ok_or_else(|| {
109+
tracing::warn!("No SO_ORIGINAL_DST address found!");
110+
io::Error::new(io::ErrorKind::NotFound, "No SO_ORIGINAL_DST address found")
111+
})?;
112+
Ok(Self {
113+
target_addr,
114+
client_addr: addrs.param(),
115+
tls,
116+
})
117+
}
118+
}
119+
82120
impl Param<SocketAddr> for TcpAccept {
83121
fn param(&self) -> SocketAddr {
84122
self.target_addr

linkerd/app/outbound/src/discover.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use crate::{tcp, Outbound};
22
use linkerd_app_core::{
3-
discovery_rejected, io, profiles, svc,
4-
transport::{listen, metrics::SensorIo},
3+
discovery_rejected, io, profiles,
4+
svc::{self, stack::Param},
5+
transport::{metrics::SensorIo, OrigDstAddr},
56
Error, IpMatch,
67
};
78
use std::convert::TryFrom;
@@ -11,16 +12,17 @@ impl<N> Outbound<N> {
1112
/// Discovers the profile for a TCP endpoint.
1213
///
1314
/// Resolved services are cached and buffered.
14-
pub fn push_discover<I, NSvc, P>(
15+
pub fn push_discover<T, I, NSvc, P>(
1516
self,
1617
profiles: P,
1718
) -> Outbound<
1819
impl svc::NewService<
19-
listen::Addrs,
20+
T,
2021
Service = impl svc::Service<I, Response = (), Error = Error, Future = impl Send> + Clone,
2122
>,
2223
>
2324
where
25+
T: Param<Option<OrigDstAddr>>,
2426
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Send + Unpin + 'static,
2527
N: svc::NewService<tcp::Logical, Service = NSvc> + Clone + Send + 'static,
2628
NSvc: svc::Service<SensorIo<I>, Response = (), Error = Error> + Send + 'static,
@@ -56,8 +58,8 @@ impl<N> Outbound<N> {
5658
.push(rt.metrics.transport.layer_accept())
5759
.push_cache(config.proxy.cache_max_idle_age)
5860
.instrument(|a: &tcp::Accept| info_span!("server", orig_dst = %a.orig_dst))
59-
.push_request_filter(tcp::Accept::try_from)
60-
.check_new_service::<listen::Addrs, I>();
61+
.push_request_filter(|addrs: T| tcp::Accept::try_from(addrs.param()))
62+
.check_new_service::<T, I>();
6163

6264
Outbound {
6365
config,

0 commit comments

Comments
 (0)