Skip to content

Commit aee27f8

Browse files
authored
inbound: Load policies lazily (#1249)
Rather than block inbound communication on policy discovery, this change modifies policy initialization to use the default policy until policy has been synced from the control plane. This will allow us to use policies with the admin server.
1 parent 556cc61 commit aee27f8

File tree

8 files changed

+77
-54
lines changed

8 files changed

+77
-54
lines changed

linkerd/app/inbound/src/metrics/authz.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ metrics! {
2626
}
2727

2828
#[derive(Clone, Debug, Default)]
29-
pub(crate) struct HttpAuthzMetrics(Arc<HttpInner>);
29+
pub struct HttpAuthzMetrics(Arc<HttpInner>);
3030

3131
#[derive(Clone, Debug, Default)]
3232
pub(crate) struct TcpAuthzMetrics(Arc<TcpInner>);

linkerd/app/inbound/src/policy/authorize/http.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,7 @@ pub struct AuthorizeHttp<T, N> {
3636
// === impl NewAuthorizeHttp ===
3737

3838
impl<N> NewAuthorizeHttp<N> {
39-
pub(crate) fn layer(
40-
metrics: HttpAuthzMetrics,
41-
) -> impl svc::layer::Layer<N, Service = Self> + Clone {
39+
pub fn layer(metrics: HttpAuthzMetrics) -> impl svc::layer::Layer<N, Service = Self> + Clone {
4240
svc::layer::mk(move |inner| Self {
4341
metrics: metrics.clone(),
4442
inner,

linkerd/app/inbound/src/policy/config.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
use super::{discover::Discover, DefaultPolicy, ServerPolicy, Store};
2-
use linkerd_app_core::{
3-
control, dns, metrics, proxy::identity::LocalCrtKey, svc::NewService, Result,
4-
};
2+
use linkerd_app_core::{control, dns, metrics, proxy::identity::LocalCrtKey, svc::NewService};
53
use std::collections::{HashMap, HashSet};
64

75
/// Configures inbound policies.
@@ -25,12 +23,12 @@ pub enum Config {
2523
// === impl Config ===
2624

2725
impl Config {
28-
pub(crate) async fn build(
26+
pub(crate) fn build(
2927
self,
3028
dns: dns::Resolver,
3129
metrics: metrics::ControlHttp,
3230
identity: Option<LocalCrtKey>,
33-
) -> Result<Store> {
31+
) -> Store {
3432
match self {
3533
Self::Fixed { default, ports } => {
3634
let (store, tx) = Store::fixed(default, ports);
@@ -39,7 +37,7 @@ impl Config {
3937
tx.closed().await;
4038
});
4139
}
42-
Ok(store)
40+
store
4341
}
4442
Self::Discover {
4543
control,
@@ -52,7 +50,7 @@ impl Config {
5250
let c = control.build(dns, metrics, identity).new_service(());
5351
Discover::new(workload, c).into_watch(backoff)
5452
};
55-
Store::spawn_discover(default, ports, watch).await
53+
Store::spawn_discover(default, ports, watch)
5654
}
5755
}
5856
}

linkerd/app/inbound/src/policy/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,19 @@ impl From<ServerPolicy> for DefaultPolicy {
6363
}
6464
}
6565

66+
impl From<DefaultPolicy> for ServerPolicy {
67+
fn from(d: DefaultPolicy) -> Self {
68+
match d {
69+
DefaultPolicy::Allow(p) => p,
70+
DefaultPolicy::Deny => ServerPolicy {
71+
protocol: Protocol::Opaque,
72+
authorizations: vec![],
73+
name: "default:deny".to_string(),
74+
},
75+
}
76+
}
77+
}
78+
6679
// === impl AllowPolicy ===
6780

6881
impl AllowPolicy {

linkerd/app/inbound/src/policy/store.rs

Lines changed: 25 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
use super::{discover, AllowPolicy, CheckPolicy, DefaultPolicy, DeniedUnknownPort};
2-
use futures::prelude::*;
32
use linkerd_app_core::{proxy::http, transport::OrigDstAddr, Error, Result};
43
pub use linkerd_server_policy::{Authentication, Authorization, Protocol, ServerPolicy, Suffix};
54
use std::{
65
collections::{HashMap, HashSet},
7-
future::Future,
86
hash::{BuildHasherDefault, Hasher},
97
sync::Arc,
108
};
119
use tokio::sync::watch;
12-
use tracing::{info_span, Instrument};
10+
use tracing::info_span;
1311

1412
#[derive(Clone, Debug)]
1513
pub struct Store {
@@ -74,49 +72,41 @@ impl Store {
7472
/// provided ports. The store maintains these watches so that each time a policy is checked, it
7573
/// may obtain the latest policy provided by the watch. An error is returned if any of the
7674
/// watches cannot be established.
77-
//
78-
// XXX(ver): rustc can't seem to figure out that this Future is `Send` unless we annotate it
79-
// explicitly, hence the manual_async_fn.
80-
#[allow(clippy::manual_async_fn)]
8175
pub(super) fn spawn_discover<S>(
8276
default: DefaultPolicy,
8377
ports: HashSet<u16>,
8478
discover: discover::Watch<S>,
85-
) -> impl Future<Output = Result<Self>> + Send
79+
) -> Self
8680
where
8781
S: tonic::client::GrpcService<tonic::body::BoxBody, Error = Error>,
8882
S: Clone + Send + Sync + 'static,
8983
S::Future: Send,
9084
S::ResponseBody: http::HttpBody<Error = Error> + Send + Sync + 'static,
9185
{
92-
async move {
93-
let rxs = ports.into_iter().map(|port| {
94-
discover
95-
.clone()
96-
.spawn_watch(port)
97-
.instrument(info_span!("watch", %port))
98-
.map_ok(move |rsp| (port, rsp.into_inner()))
99-
});
100-
101-
let ports = futures::future::join_all(rxs)
102-
.await
103-
.into_iter()
104-
.collect::<Result<PortMap<_>, tonic::Status>>()?;
105-
106-
let default = match Self::mk_default(default) {
107-
Some((tx, rx)) => {
108-
tokio::spawn(async move {
109-
tx.closed().await;
110-
});
111-
Some(rx)
112-
}
113-
None => None,
114-
};
115-
116-
Ok(Self {
117-
default,
118-
ports: Arc::new(ports),
86+
let rxs = ports
87+
.into_iter()
88+
.map(|port| {
89+
let discover = discover.clone();
90+
let default = default.clone();
91+
let rx = info_span!("watch", %port)
92+
.in_scope(|| discover.spawn_with_init(port, default.into()));
93+
(port, rx)
11994
})
95+
.collect::<PortMap<_>>();
96+
97+
let default = match Self::mk_default(default) {
98+
Some((tx, rx)) => {
99+
tokio::spawn(async move {
100+
tx.closed().await;
101+
});
102+
Some(rx)
103+
}
104+
None => None,
105+
};
106+
107+
Self {
108+
default,
109+
ports: Arc::new(rxs),
120110
}
121111
}
122112
}

linkerd/app/inbound/src/server.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ struct TcpEndpoint {
1616
// === impl Inbound ===
1717

1818
impl Inbound<()> {
19-
pub async fn build_policies(
19+
pub fn build_policies(
2020
&self,
2121
dns: dns::Resolver,
2222
control_metrics: metrics::ControlHttp,
@@ -25,8 +25,6 @@ impl Inbound<()> {
2525
.policy
2626
.clone()
2727
.build(dns, control_metrics, self.runtime.identity.clone())
28-
.await
29-
.expect("Failed to fetch port policy")
3028
}
3129

3230
pub async fn serve<A, I, G, GSvc, P>(

linkerd/app/src/lib.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,10 +210,8 @@ impl Config {
210210
.instrument(info_span!("outbound")),
211211
);
212212

213-
let inbound_policies = inbound
214-
.build_policies(dns, control_metrics)
215-
.instrument(info_span!("policy"))
216-
.await;
213+
let inbound_policies =
214+
info_span!("policy").in_scope(|| inbound.build_policies(dns, control_metrics));
217215

218216
tokio::spawn(
219217
inbound

linkerd/tonic-watch/src/lib.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ where
4848
S: Service<T, Response = InnerRsp<U>, Error = tonic::Status>,
4949
S::Future: Send,
5050
{
51-
// Get an update and stream or return None.
51+
// Get an update and stream.
5252
let (init, rsp) = self.init(&target, None).await?;
5353

5454
Ok(rsp.map(move |inner| {
@@ -60,6 +60,34 @@ where
6060
}))
6161
}
6262

63+
/// Returns a watch using the provided initial value.
64+
///
65+
/// The watch is spawned on a background task that completes when the watch service errors in an
66+
/// unrecoverable way or all receivers are dropped.
67+
pub fn spawn_with_init<T, U>(mut self, target: T, init: U) -> watch::Receiver<U>
68+
where
69+
T: Clone + Send + Sync + 'static,
70+
U: Send + Sync + 'static,
71+
S: Service<T, Response = InnerRsp<U>, Error = tonic::Status>,
72+
S::Future: Send,
73+
{
74+
let (tx, rx) = watch::channel(init);
75+
76+
// Spawn a background task to watch the inner service.
77+
tokio::spawn(
78+
async move {
79+
let (up, rsp) = self.init(&target, None).await?;
80+
if tx.send(up).is_ok() {
81+
self.publish_updates(target, tx, rsp.into_inner()).await;
82+
}
83+
Ok::<_, tonic::Status>(())
84+
}
85+
.in_current_span(),
86+
);
87+
88+
rx
89+
}
90+
6391
/// Initiates a lookup stream and obtains the first profile from it.
6492
///
6593
/// If the call fails, recovery and back-off are applied.

0 commit comments

Comments
 (0)