Skip to content

Commit 7eadac4

Browse files
authored
feat(outbound): Use the policy API for opaq routing (#3306)
The OutboundPolicy API is able to express configuration for opaq routing, but the proxy only currently uses the ServiceProfile to configure routing. To support configuring routing via the Gateway API's TCPRoute, we need to begin using the OutboundPolicy API to configure opaq routing without breaking existing configurations that use profiles. With this change, the outbound proxy is updated so that route configurations are required for the proxy to handle outbound opaq traffic. Route configurations may be provided by: * The Profile API, when it returns an Endpoint, so that the proxy synthesizes a route configuration that forwards traffic to the endpoint. * The Profile API, when it returns a traffic split configuration, so that the proxy synthesizes a route configuration that forwards traffic over backend services. * The OutboundPolicy API when it returns opaq routes. When no routes are returned by the API, the proxy fails to route the connections. Follow up changes will enhance this functionality with metrics and improved metadata.
1 parent b34d32b commit 7eadac4

File tree

23 files changed

+954
-477
lines changed

23 files changed

+954
-477
lines changed

Cargo.lock

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1491,6 +1491,7 @@ dependencies = [
14911491
"linkerd-io",
14921492
"linkerd-meshtls",
14931493
"linkerd-meshtls-rustls",
1494+
"linkerd-opaq-route",
14941495
"linkerd-proxy-client-policy",
14951496
"linkerd-retry",
14961497
"linkerd-stack",
@@ -1890,6 +1891,10 @@ dependencies = [
18901891
"tracing",
18911892
]
18921893

1894+
[[package]]
1895+
name = "linkerd-opaq-route"
1896+
version = "0.1.0"
1897+
18931898
[[package]]
18941899
name = "linkerd-opencensus"
18951900
version = "0.1.0"
@@ -2054,6 +2059,7 @@ dependencies = [
20542059
"linkerd-error",
20552060
"linkerd-exp-backoff",
20562061
"linkerd-http-route",
2062+
"linkerd-opaq-route",
20572063
"linkerd-proxy-api-resolve",
20582064
"linkerd-proxy-core",
20592065
"linkerd-tls-route",

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ members = [
4040
"linkerd/meshtls/rustls",
4141
"linkerd/meshtls/verifier",
4242
"linkerd/metrics",
43+
"linkerd/opaq-route",
4344
"linkerd/opencensus",
4445
"linkerd/opentelemetry",
4546
"linkerd/pool",

linkerd/app/gateway/src/opaq.rs

Lines changed: 52 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
use super::{server::Opaq, Gateway};
22
use inbound::{GatewayAddr, GatewayDomainInvalid};
3-
use linkerd_app_core::{io, profiles, svc, tls, transport::addrs::*, Error};
3+
use linkerd_app_core::{io, svc, tls, transport::addrs::*, Error};
44
use linkerd_app_inbound as inbound;
55
use linkerd_app_outbound as outbound;
6+
use tokio::sync::watch;
67

7-
pub type Target = outbound::opaq::Logical;
8+
#[derive(Clone, Debug)]
9+
pub struct Target {
10+
addr: GatewayAddr,
11+
routes: watch::Receiver<outbound::opaq::Routes>,
12+
}
813

914
impl Gateway {
1015
/// Wrap the provided outbound opaque stack with inbound authorization and
@@ -33,22 +38,55 @@ impl Gateway {
3338
.push_filter(
3439
|(_, opaq): (_, Opaq<T>)| -> Result<_, GatewayDomainInvalid> {
3540
// Fail connections were not resolved.
36-
let profile = svc::Param::<Option<profiles::Receiver>>::param(&*opaq)
37-
.ok_or(GatewayDomainInvalid)?;
38-
if let Some(profiles::LogicalAddr(addr)) = profile.logical_addr() {
39-
Ok(outbound::opaq::Logical::Route(addr, profile))
40-
} else if let Some((addr, metadata)) = profile.endpoint() {
41-
Ok(outbound::opaq::Logical::Forward(
42-
Remote(ServerAddr(addr)),
43-
metadata,
44-
))
45-
} else {
46-
Err(GatewayDomainInvalid)
47-
}
41+
Target::try_from(opaq)
4842
},
4943
)
5044
// Authorize connections to the gateway.
5145
.push(self.inbound.authorize_tcp())
5246
.arc_new_tcp()
5347
}
5448
}
49+
50+
impl<T> TryFrom<Opaq<T>> for Target
51+
where
52+
T: svc::Param<GatewayAddr>,
53+
{
54+
type Error = GatewayDomainInvalid;
55+
56+
fn try_from(opaq: Opaq<T>) -> Result<Self, Self::Error> {
57+
use svc::Param;
58+
59+
let addr: GatewayAddr = (**opaq).param();
60+
let Some(profile) = (*opaq).param() else {
61+
// The gateway address must be resolvable via the profile API.
62+
return Err(GatewayDomainInvalid);
63+
};
64+
let routes = outbound::opaq::routes_from_discovery(
65+
addr.0.clone().into(),
66+
Some(profile),
67+
(*opaq).param(),
68+
);
69+
70+
Ok(Target { addr, routes })
71+
}
72+
}
73+
74+
impl svc::Param<watch::Receiver<outbound::opaq::Routes>> for Target {
75+
fn param(&self) -> watch::Receiver<outbound::opaq::Routes> {
76+
self.routes.clone()
77+
}
78+
}
79+
80+
impl PartialEq for Target {
81+
fn eq(&self, other: &Self) -> bool {
82+
self.addr == other.addr
83+
}
84+
}
85+
86+
impl Eq for Target {}
87+
88+
impl std::hash::Hash for Target {
89+
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
90+
self.addr.hash(state);
91+
}
92+
}

linkerd/app/outbound/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ linkerd-http-retry = { path = "../../http/retry" }
4040
linkerd-http-route = { path = "../../http/route" }
4141
linkerd-identity = { path = "../../identity" }
4242
linkerd-meshtls-rustls = { path = "../../meshtls/rustls", optional = true }
43+
linkerd-opaq-route = { path = "../../opaq-route" }
4344
linkerd-proxy-client-policy = { path = "../../proxy/client-policy", features = [
4445
"proto",
4546
] }

linkerd/app/outbound/src/discover.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -234,16 +234,18 @@ fn policy_for_backend(
234234
static NO_OPAQ_FILTERS: Lazy<Arc<[policy::opaq::Filter]>> = Lazy::new(|| Arc::new([]));
235235

236236
let opaque = policy::opaq::Opaque {
237-
policy: Some(policy::opaq::Policy {
238-
meta: meta.clone(),
239-
filters: NO_OPAQ_FILTERS.clone(),
240-
params: Default::default(),
241-
distribution: policy::RouteDistribution::FirstAvailable(Arc::new([
242-
policy::RouteBackend {
243-
filters: NO_OPAQ_FILTERS.clone(),
244-
backend: backend.clone(),
245-
},
246-
])),
237+
routes: Some(policy::opaq::Route {
238+
policy: policy::opaq::Policy {
239+
meta: meta.clone(),
240+
filters: NO_OPAQ_FILTERS.clone(),
241+
params: Default::default(),
242+
distribution: policy::RouteDistribution::FirstAvailable(Arc::new([
243+
policy::RouteBackend {
244+
filters: NO_OPAQ_FILTERS.clone(),
245+
backend: backend.clone(),
246+
},
247+
])),
248+
},
247249
}),
248250
};
249251

@@ -327,6 +329,15 @@ impl<T> svc::Param<Option<profiles::Receiver>> for Discovery<T> {
327329
}
328330
}
329331

332+
impl<T> svc::Param<OrigDstAddr> for Discovery<T>
333+
where
334+
T: svc::Param<OrigDstAddr>,
335+
{
336+
fn param(&self) -> OrigDstAddr {
337+
self.parent.param()
338+
}
339+
}
340+
330341
impl<T> svc::Param<Option<watch::Receiver<profiles::Profile>>> for Discovery<T> {
331342
fn param(&self) -> Option<watch::Receiver<profiles::Profile>> {
332343
self.profile.clone().map(Into::into)

linkerd/app/outbound/src/http/logical/profile.rs

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ use super::{
22
super::{concrete, retry},
33
CanonicalDstHeader, Concrete, NoRoute,
44
};
5-
use crate::{policy, BackendRef, ParentRef, UNKNOWN_META};
5+
use crate::{service_meta, BackendRef, ParentRef, UNKNOWN_META};
66
use linkerd_app_core::{
77
classify, metrics,
88
proxy::http::{self, balance},
9-
svc, Error, NameAddr,
9+
svc, Error,
1010
};
1111
use linkerd_distribute as distribute;
1212
use std::{fmt::Debug, hash::Hash, sync::Arc, time};
@@ -107,26 +107,6 @@ where
107107
targets,
108108
} = routes;
109109

110-
fn service_meta(addr: &NameAddr) -> Option<Arc<policy::Meta>> {
111-
let mut parts = addr.name().split('.');
112-
113-
let name = parts.next()?;
114-
let namespace = parts.next()?;
115-
116-
if !parts.next()?.eq_ignore_ascii_case("svc") {
117-
return None;
118-
}
119-
120-
Some(Arc::new(policy::Meta::Resource {
121-
group: "core".to_string(),
122-
kind: "Service".to_string(),
123-
namespace: namespace.to_string(),
124-
name: name.to_string(),
125-
section: None,
126-
port: Some(addr.port().try_into().ok()?),
127-
}))
128-
}
129-
130110
let parent_meta = service_meta(&addr).unwrap_or_else(|| UNKNOWN_META.clone());
131111

132112
// Create concrete targets for all of the profile's routes.

linkerd/app/outbound/src/ingress.rs

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@ struct Http<T> {
2222
version: http::Version,
2323
}
2424

25-
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
26-
struct Opaq<T>(Discovery<T>);
25+
#[derive(Clone, Debug)]
26+
struct Opaq {
27+
orig_dst: OrigDstAddr,
28+
routes: watch::Receiver<opaq::Routes>,
29+
}
2730

2831
#[derive(Clone, Debug)]
2932
struct SelectTarget<T>(Http<T>);
@@ -91,7 +94,7 @@ impl Outbound<()> {
9194
let discover = discover.clone();
9295
self.to_tcp_connect()
9396
.push_opaq_cached(resolve.clone())
94-
.map_stack(|_, _, stk| stk.push_map_target(Opaq))
97+
.map_stack(|_, _, stk| stk.push_map_target(Opaq::from))
9598
.push_discover(svc::mk(move |OrigDstAddr(addr)| {
9699
discover.clone().oneshot(DiscoverAddr(addr.into()))
97100
}))
@@ -600,42 +603,41 @@ impl std::hash::Hash for Logical {
600603
}
601604
}
602605

603-
// === impl Opaq ===
606+
impl<T> From<Discovery<T>> for Opaq
607+
where
608+
T: svc::Param<OrigDstAddr>,
609+
{
610+
fn from(discovery: Discovery<T>) -> Self {
611+
use svc::Param;
604612

605-
impl<T> std::ops::Deref for Opaq<T> {
606-
type Target = T;
613+
let orig_dst: OrigDstAddr = discovery.param();
614+
let routes = opaq::routes_from_discovery(
615+
Addr::Socket(orig_dst.into()),
616+
discovery.param(),
617+
discovery.param(),
618+
);
607619

608-
fn deref(&self) -> &Self::Target {
609-
&self.0
620+
Self { routes, orig_dst }
610621
}
611622
}
612623

613-
impl<T> svc::Param<Remote<ServerAddr>> for Opaq<T>
614-
where
615-
T: svc::Param<OrigDstAddr>,
616-
{
617-
fn param(&self) -> Remote<ServerAddr> {
618-
let OrigDstAddr(addr) = (*self.0).param();
619-
Remote(ServerAddr(addr))
624+
impl PartialEq for Opaq {
625+
fn eq(&self, other: &Self) -> bool {
626+
self.orig_dst == other.orig_dst
620627
}
621628
}
622629

623-
impl<T> svc::Param<opaq::Logical> for Opaq<T>
624-
where
625-
T: svc::Param<OrigDstAddr>,
626-
{
627-
fn param(&self) -> opaq::Logical {
628-
if let Some(profile) = svc::Param::<Option<profiles::Receiver>>::param(&self.0) {
629-
if let Some(profiles::LogicalAddr(addr)) = profile.logical_addr() {
630-
return opaq::Logical::Route(addr, profile);
631-
}
630+
impl Eq for Opaq {}
632631

633-
if let Some((addr, metadata)) = profile.endpoint() {
634-
return opaq::Logical::Forward(Remote(ServerAddr(addr)), metadata);
635-
}
636-
}
632+
impl std::hash::Hash for Opaq {
633+
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
634+
self.orig_dst.hash(state);
635+
}
636+
}
637637

638-
opaq::Logical::Forward(self.param(), Default::default())
638+
impl svc::Param<watch::Receiver<opaq::Routes>> for Opaq {
639+
fn param(&self) -> watch::Receiver<opaq::Routes> {
640+
self.routes.clone()
639641
}
640642
}
641643

linkerd/app/outbound/src/lib.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use linkerd_app_core::{
2323
svc::{self, ServiceExt},
2424
tls::ConnectMeta as TlsConnectMeta,
2525
transport::addrs::*,
26-
AddrMatch, Error, ProxyRuntime,
26+
AddrMatch, Error, NameAddr, ProxyRuntime,
2727
};
2828
use linkerd_tonic_stream::ReceiveLimits;
2929
use std::{
@@ -342,3 +342,23 @@ impl EndpointRef {
342342

343343
static UNKNOWN_META: once_cell::sync::Lazy<Arc<policy::Meta>> =
344344
once_cell::sync::Lazy::new(|| policy::Meta::new_default("unknown"));
345+
346+
pub(crate) fn service_meta(addr: &NameAddr) -> Option<Arc<policy::Meta>> {
347+
let mut parts = addr.name().split('.');
348+
349+
let name = parts.next()?;
350+
let namespace = parts.next()?;
351+
352+
if !parts.next()?.eq_ignore_ascii_case("svc") {
353+
return None;
354+
}
355+
356+
Some(Arc::new(policy::Meta::Resource {
357+
group: "core".to_string(),
358+
kind: "Service".to_string(),
359+
namespace: namespace.to_string(),
360+
name: name.to_string(),
361+
section: None,
362+
port: Some(addr.port().try_into().ok()?),
363+
}))
364+
}

0 commit comments

Comments
 (0)