Skip to content

Commit 103a480

Browse files
authored
gateway: synthesize ClientPolicies when the controller returns NotFound (#2333)
Both outbound and gateway proxies now resolve client policies from the OutboundPolicies API. When the outbound proxy attempts to discover a policy and the policy controller returns NotFound, it synthesizes a default policy from the discovered ServiceProfile. However, when the gateway proxy receives a NotFound, it will currently fail the connection, based on the assumption that only valid cluster DNS names are gatewayed (and not arbitrary IPs that might be forwards). Unfortunately, this is not quite true. Gateway proxies may attempt to discover cluster DNS names that are Pod DNS names, rather than Service DNS names, and the policy controller will return NotFound for those names. This branch therefore changes the gateway proxy to also synthesize default ClientPolicies based on the ServiceProfile when receiving a NotFound status. Some of the code for synthesizing a client policy from a ServiceProfile that's currently used in the outbound proxy was factored out so that it could be reused here.
1 parent ebb0496 commit 103a480

File tree

5 files changed

+111
-62
lines changed

5 files changed

+111
-62
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -949,7 +949,9 @@ dependencies = [
949949
"linkerd-app-inbound",
950950
"linkerd-app-outbound",
951951
"linkerd-app-test",
952+
"linkerd-proxy-client-policy",
952953
"linkerd-proxy-server-policy",
954+
"once_cell",
953955
"thiserror",
954956
"tokio",
955957
"tokio-test",

linkerd/app/gateway/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ futures = { version = "0.3", default-features = false }
1212
linkerd-app-core = { path = "../core" }
1313
linkerd-app-inbound = { path = "../inbound" }
1414
linkerd-app-outbound = { path = "../outbound" }
15+
linkerd-proxy-client-policy = { path = "../../proxy/client-policy" }
16+
once_cell = "1"
1517
thiserror = "1"
1618
tokio = { version = "1", features = ["sync"] }
1719
tonic = { version = "0.8", default-features = false }

linkerd/app/gateway/src/discover.rs

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
use crate::Gateway;
2-
use futures::TryFutureExt;
2+
use futures::FutureExt;
33
use linkerd_app_core::{errors, profiles, svc, Error};
44
use linkerd_app_inbound::{GatewayAddr, GatewayDomainInvalid};
55
use linkerd_app_outbound::{self as outbound};
6+
use linkerd_proxy_client_policy::{self as policy, ClientPolicy};
7+
use once_cell::sync::Lazy;
8+
use std::sync::Arc;
69
use tokio::sync::watch;
710
use tracing::Instrument;
811

@@ -30,6 +33,14 @@ impl Gateway {
3033
use futures::future;
3134

3235
let allowlist = self.config.allow_discovery.clone();
36+
let detect_timeout = self.outbound.config().proxy.detect_protocol_timeout;
37+
let queue = {
38+
let queue = self.outbound.config().tcp_connection_queue;
39+
policy::Queue {
40+
capacity: queue.capacity,
41+
failfast_timeout: queue.failfast_timeout,
42+
}
43+
};
3344
svc::mk(move |GatewayAddr(addr)| {
3445
tracing::debug!(%addr, "Discover");
3546

@@ -45,19 +56,55 @@ impl Gateway {
4556

4657
let policy = policies
4758
.get_policy(addr.into())
48-
.map_err(|e| {
49-
// If the policy controller returned `NotFound`, indicating
50-
// that it doesn't have a policy for this addr, then we
51-
// can't gateway this address.
52-
if is_not_found(&e) {
53-
GatewayDomainInvalid.into()
54-
} else {
55-
e
56-
}
57-
})
5859
.instrument(tracing::debug_span!("policy"));
5960

60-
future::Either::Right(future::try_join(profile, policy))
61+
let discovery = future::join(profile, policy).map(move |(profile, policy)| {
62+
tracing::debug!("Discovered");
63+
64+
let profile = profile?.ok_or(GatewayDomainInvalid)?;
65+
66+
// If there was a policy resolution, return it with the profile so
67+
// the stack can determine how to switch on them.
68+
match policy {
69+
Ok(policy) => return Ok((Some(profile), policy)),
70+
// The policy controller currently rejects discovery for DNS
71+
// names that are not Services, so we will get a `NotFound`
72+
// error if we looked up a pod DNS name. In this case, we
73+
// will synthesize a default policy.
74+
Err(error) if is_not_found(&error) => tracing::debug!("Policy not found"),
75+
Err(error) => return Err(error),
76+
}
77+
78+
let policy = outbound::spawn_synthesized_profile_policy(
79+
profile.clone().into(),
80+
move |profile| {
81+
static META: Lazy<Arc<policy::Meta>> = Lazy::new(|| {
82+
Arc::new(policy::Meta::Default {
83+
name: "gateway".into(),
84+
})
85+
});
86+
87+
match profile.endpoint.clone() {
88+
Some((addr, meta)) => outbound::synthesize_forward_policy(
89+
&META,
90+
detect_timeout,
91+
queue,
92+
addr,
93+
meta,
94+
),
95+
None => {
96+
tracing::debug!(
97+
"Gateway ServiceProfile does not contain an endpoint"
98+
);
99+
ClientPolicy::invalid(detect_timeout)
100+
}
101+
}
102+
},
103+
);
104+
Ok((Some(profile), policy))
105+
});
106+
107+
future::Either::Right(discovery)
61108
})
62109
}
63110
}

linkerd/app/outbound/src/discover.rs

Lines changed: 44 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
use crate::{policy, Outbound};
1+
use crate::{
2+
policy::{self, ClientPolicy},
3+
Outbound,
4+
};
25
use linkerd_app_core::{errors, profiles, svc, transport::OrigDstAddr, Error};
3-
use linkerd_proxy_client_policy::ClientPolicy;
46
use once_cell::sync::Lazy;
57
use std::{
68
fmt::Debug,
@@ -114,10 +116,21 @@ impl<N> Outbound<N> {
114116
// enpdoint policy.
115117
if let Some(profile) = profile {
116118
let policy = spawn_synthesized_profile_policy(
117-
orig_dst,
118119
profile.clone().into(),
119-
queue,
120-
detect_timeout,
120+
move |profile: &profiles::Profile| {
121+
static META: Lazy<Arc<policy::Meta>> = Lazy::new(|| {
122+
Arc::new(policy::Meta::Default {
123+
name: "endpoint".into(),
124+
})
125+
});
126+
let (addr, meta) = profile
127+
.endpoint
128+
.clone()
129+
.unwrap_or_else(|| (orig_dst, Default::default()));
130+
// TODO(ver) We should be able to figure out resource coordinates for
131+
// the endpoint?
132+
synthesize_forward_policy(&META, detect_timeout, queue, addr, meta)
133+
},
121134
);
122135
return Ok((Some(profile), policy));
123136
}
@@ -137,29 +150,11 @@ fn is_not_found(e: &Error) -> bool {
137150
.unwrap_or(false)
138151
}
139152

140-
fn spawn_synthesized_profile_policy(
141-
orig_dst: SocketAddr,
153+
pub fn spawn_synthesized_profile_policy(
142154
mut profile: watch::Receiver<profiles::Profile>,
143-
queue: policy::Queue,
144-
detect_timeout: Duration,
155+
synthesize: impl Fn(&profiles::Profile) -> policy::ClientPolicy + Send + 'static,
145156
) -> watch::Receiver<policy::ClientPolicy> {
146-
static META: Lazy<Arc<policy::Meta>> = Lazy::new(|| {
147-
Arc::new(policy::Meta::Default {
148-
name: "endpoint".into(),
149-
})
150-
});
151-
152-
let mk = move |profile: &profiles::Profile| {
153-
let (addr, meta) = profile
154-
.endpoint
155-
.clone()
156-
.unwrap_or_else(|| (orig_dst, Default::default()));
157-
// TODO(ver) We should be able to figure out resource coordinates for
158-
// the endpoint?
159-
synthesize_forward_policy(&META, detect_timeout, queue, addr, meta)
160-
};
161-
162-
let policy = mk(&*profile.borrow_and_update());
157+
let policy = synthesize(&*profile.borrow_and_update());
163158
tracing::debug!(?policy, profile = ?*profile.borrow(), "Synthesizing policy from profile");
164159
let (tx, rx) = watch::channel(policy);
165160
tokio::spawn(
@@ -178,7 +173,7 @@ fn spawn_synthesized_profile_policy(
178173
}
179174
}
180175
};
181-
let policy = mk(&*profile.borrow());
176+
let policy = synthesize(&*profile.borrow());
182177
tracing::debug!(?policy, "Profile updated; synthesizing policy");
183178
if tx.send(policy).is_err() {
184179
tracing::debug!("Policy watch closed, terminating");
@@ -191,28 +186,7 @@ fn spawn_synthesized_profile_policy(
191186
rx
192187
}
193188

194-
fn spawn_synthesized_origdst_policy(
195-
orig_dst: SocketAddr,
196-
queue: policy::Queue,
197-
detect_timeout: Duration,
198-
) -> watch::Receiver<policy::ClientPolicy> {
199-
static META: Lazy<Arc<policy::Meta>> = Lazy::new(|| {
200-
Arc::new(policy::Meta::Default {
201-
name: "fallback".into(),
202-
})
203-
});
204-
205-
let policy =
206-
synthesize_forward_policy(&META, detect_timeout, queue, orig_dst, Default::default());
207-
tracing::debug!(?policy, "Synthesizing policy");
208-
let (tx, rx) = watch::channel(policy);
209-
tokio::spawn(async move {
210-
tx.closed().await;
211-
});
212-
rx
213-
}
214-
215-
fn synthesize_forward_policy(
189+
pub fn synthesize_forward_policy(
216190
meta: &Arc<policy::Meta>,
217191
timeout: Duration,
218192
queue: policy::Queue,
@@ -273,6 +247,27 @@ fn synthesize_forward_policy(
273247
}
274248
}
275249

250+
fn spawn_synthesized_origdst_policy(
251+
orig_dst: SocketAddr,
252+
queue: policy::Queue,
253+
detect_timeout: Duration,
254+
) -> watch::Receiver<policy::ClientPolicy> {
255+
static META: Lazy<Arc<policy::Meta>> = Lazy::new(|| {
256+
Arc::new(policy::Meta::Default {
257+
name: "fallback".into(),
258+
})
259+
});
260+
261+
let policy =
262+
synthesize_forward_policy(&META, detect_timeout, queue, orig_dst, Default::default());
263+
tracing::debug!(?policy, "Synthesizing policy");
264+
let (tx, rx) = watch::channel(policy);
265+
tokio::spawn(async move {
266+
tx.closed().await;
267+
});
268+
rx
269+
}
270+
276271
// === impl Discovery ===
277272

278273
impl<T> From<((Option<profiles::Receiver>, policy::Receiver), T)> for Discovery<T> {

linkerd/app/outbound/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ pub mod tcp;
4444
#[cfg(any(test, feature = "test-util"))]
4545
pub mod test_util;
4646

47-
pub use self::{discover::Discovery, metrics::Metrics};
47+
pub use self::{
48+
discover::{spawn_synthesized_profile_policy, synthesize_forward_policy, Discovery},
49+
metrics::Metrics,
50+
};
4851

4952
#[derive(Clone, Debug)]
5053
pub struct Config {

0 commit comments

Comments
 (0)