Skip to content

Commit 822cd15

Browse files
authored
inbound: Discover policies from the control plane (#1205)
We've recently introduced support for authorization policies. These policies are configured statically from the proxy's environment. This change adds an optional new mode to support dynamic configuration from the control plane. The proxy now supports new environment variables, `LINKERD2_PROXY_POLICY_SVC_{ADDR,NAME}` that may be set with the address and identity of the policy controller. When this is set, other static inbound port configurations (requiring identity, marking ports as opaque, etc) are ignored in favor of the results returned by the API. Instead, the proxy uses the `LINKER2_PROXY_INBOUND_PORTS` environment variable to discover the list of ports configured in the pod's spec and discovers the policies for each of these ports before admitting inbound connections. When a connection is received for a port not in this list, the `LINKERD2_PROXY_INBOUND_DEFAULT_POLICY` configuration is used to determine whether the connection may be permitted or whether it should be refused. This change modifies the set of default policies to support only `deny`, `all-authenticated` and `all-unauthenticated` (removing `all-mtls-unauthenticated`). Instead a `LINKERD2_PROXY_INBOUND_PORTS_REQUIRE_TLS` configuration is introduced to enable tls requirements for individual pods. This static configuration should only be necessary for the identity controller, in which case we need to use an unauthenticated default policy to permit kubernetes healtchecking probes; and we can use this new configuration to enforce a default policy on the gRPC API port.
1 parent 32b6d52 commit 822cd15

File tree

22 files changed

+1302
-762
lines changed

22 files changed

+1302
-762
lines changed

.github/workflows/advisory.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ jobs:
4747
# needed for release builds. This job builds the proxy in release-mode to ensure the build isn't
4848
# OOM-killed.
4949
release-binary:
50-
timeout-minutes: 15
50+
timeout-minutes: 20
5151
runs-on: ubuntu-latest
5252
permissions:
5353
contents: read

Cargo.lock

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -745,10 +745,13 @@ dependencies = [
745745
"linkerd-app-test",
746746
"linkerd-io",
747747
"linkerd-server-policy",
748+
"linkerd-tonic-watch",
748749
"linkerd-tracing",
750+
"linkerd2-proxy-api",
749751
"thiserror",
750752
"tokio",
751753
"tokio-test",
754+
"tonic",
752755
"tower",
753756
"tracing",
754757
]

linkerd/app/core/src/addr_match.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,21 @@ impl AddrMatch {
2828
}
2929
}
3030

31+
#[inline]
3132
pub fn names(&self) -> &NameMatch {
3233
&self.names
3334
}
3435

36+
#[inline]
3537
pub fn nets(&self) -> &IpMatch {
3638
&self.nets
3739
}
3840

41+
#[inline]
42+
pub fn is_empty(&self) -> bool {
43+
self.names.is_empty() && self.nets.is_empty()
44+
}
45+
3946
#[inline]
4047
pub fn matches(&self, addr: &Addr) -> bool {
4148
match addr {
@@ -89,6 +96,11 @@ impl FromIterator<Suffix> for NameMatch {
8996
}
9097

9198
impl NameMatch {
99+
#[inline]
100+
pub fn is_empty(&self) -> bool {
101+
self.0.is_empty()
102+
}
103+
92104
#[inline]
93105
pub fn matches(&self, name: &Name) -> bool {
94106
self.0.iter().any(|sfx| sfx.contains(name))
@@ -108,6 +120,11 @@ impl IpMatch {
108120
Self(Arc::new(nets.into_iter().collect()))
109121
}
110122

123+
#[inline]
124+
pub fn is_empty(&self) -> bool {
125+
self.0.is_empty()
126+
}
127+
111128
#[inline]
112129
pub fn matches(&self, addr: IpAddr) -> bool {
113130
self.0.iter().any(|net| match (net, addr) {

linkerd/app/core/src/control.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,16 @@ type BalanceBody =
3838

3939
type RspBody = linkerd_http_metrics::requests::ResponseBody<BalanceBody, classify::Eos>;
4040

41-
pub type Client<B> = svc::Buffer<http::Request<B>, http::Response<RspBody>, Error>;
41+
pub type Client = svc::Buffer<http::Request<tonic::body::BoxBody>, http::Response<RspBody>, Error>;
4242

4343
impl Config {
44-
pub fn build<B, L>(
44+
pub fn build<L>(
4545
self,
4646
dns: dns::Resolver,
4747
metrics: metrics::ControlHttp,
4848
identity: Option<L>,
49-
) -> svc::BoxNewService<(), Client<B>>
49+
) -> svc::BoxNewService<(), Client>
5050
where
51-
B: http::HttpBody + Send + 'static,
52-
B::Data: Send,
53-
B::Error: Into<Error> + Send + Sync + 'static,
5451
L: Clone + svc::Param<tls::client::Config> + Send + Sync + 'static,
5552
{
5653
let addr = self.addr;

linkerd/app/core/src/serve.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ pub async fn serve<M, S, I, A>(
7575
}
7676
}
7777
}
78-
};
78+
}
79+
.in_current_span();
7980

8081
// Stop the accept loop when the shutdown signal fires.
8182
//

linkerd/app/inbound/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@ http = "0.2"
1515
futures = { version = "0.3", default-features = false }
1616
linkerd-app-core = { path = "../core" }
1717
linkerd-server-policy = { path = "../../server-policy" }
18+
linkerd-tonic-watch = { path = "../../tonic-watch" }
19+
linkerd2-proxy-api = { version = "0.2", features = ["client", "inbound"] }
1820
thiserror = "1.0"
1921
tokio = { version = "1", features = ["sync"] }
22+
tonic = { version = "0.5", default-features = false }
2023
tower = { version = "0.4.8", features = ["util"] }
2124
tracing = "0.1.26"
2225

linkerd/app/inbound/src/accept.rs

Lines changed: 38 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use crate::{port_policies::AllowPolicy, Inbound};
1+
use crate::{
2+
policy::{AllowPolicy, CheckPolicy},
3+
Inbound,
4+
};
25
use linkerd_app_core::{
36
io, svc,
47
transport::addrs::{ClientAddr, OrigDstAddr, Remote},
@@ -7,8 +10,8 @@ use linkerd_app_core::{
710
use std::fmt::Debug;
811
use tracing::info_span;
912

10-
#[derive(Clone, Debug, PartialEq, Eq)]
11-
pub struct Accept {
13+
#[derive(Clone, Debug)]
14+
pub(crate) struct Accept {
1215
client_addr: Remote<ClientAddr>,
1316
orig_dst_addr: OrigDstAddr,
1417
policy: AllowPolicy,
@@ -20,9 +23,10 @@ impl<N> Inbound<N> {
2023
/// Builds a stack that accepts connections. Connections to the proxy port are diverted to the
2124
/// 'direct' stack; otherwise connections are associated with a policy and passed to the inner
2225
/// stack.
23-
pub fn push_accept<T, I, NSvc, D, DSvc>(
26+
pub(crate) fn push_accept<T, I, NSvc, D, DSvc>(
2427
self,
2528
proxy_port: u16,
29+
policies: impl CheckPolicy + Clone + Send + Sync + 'static,
2630
direct: D,
2731
) -> Inbound<svc::BoxNewTcp<T, I>>
2832
where
@@ -40,8 +44,7 @@ impl<N> Inbound<N> {
4044
DSvc::Error: Into<Error>,
4145
DSvc::Future: Send,
4246
{
43-
self.map_stack(|cfg, rt, accept| {
44-
let port_policies = cfg.port_policies.clone();
47+
self.map_stack(|_, rt, accept| {
4548
accept
4649
.push_switch(
4750
// Switch to the `direct` stack when a connection's original destination is the
@@ -52,10 +55,12 @@ impl<N> Inbound<N> {
5255
if addr.port() == proxy_port {
5356
return Ok(svc::Either::B(t));
5457
}
55-
let policy = port_policies.check_allowed(t.param(), t.param())?;
58+
let orig_dst_addr = t.param();
59+
let policy = policies.check_policy(orig_dst_addr)?;
60+
tracing::debug!(?policy, "Accepted");
5661
Ok(svc::Either::A(Accept {
5762
client_addr: t.param(),
58-
orig_dst_addr: t.param(),
63+
orig_dst_addr,
5964
policy,
6065
}))
6166
},
@@ -101,7 +106,10 @@ impl svc::Param<AllowPolicy> for Accept {
101106
#[cfg(test)]
102107
mod tests {
103108
use super::*;
104-
use crate::{test_util, DefaultPolicy, PortPolicies};
109+
use crate::{
110+
policy::{DefaultPolicy, Store},
111+
test_util,
112+
};
105113
use futures::future;
106114
use linkerd_app_core::{
107115
svc::{NewService, ServiceExt},
@@ -112,18 +120,21 @@ mod tests {
112120
#[tokio::test(flavor = "current_thread")]
113121
async fn default_allow() {
114122
let (io, _) = io::duplex(1);
115-
let allow = ServerPolicy {
116-
protocol: linkerd_server_policy::Protocol::Opaque,
117-
authorizations: vec![Authorization {
118-
authentication: Authentication::Unauthenticated,
119-
networks: vec![Default::default()],
123+
let policies = Store::fixed(
124+
ServerPolicy {
125+
protocol: linkerd_server_policy::Protocol::Opaque,
126+
authorizations: vec![Authorization {
127+
authentication: Authentication::Unauthenticated,
128+
networks: vec![Default::default()],
129+
labels: Default::default(),
130+
}],
120131
labels: Default::default(),
121-
}],
122-
labels: Default::default(),
123-
};
124-
inbound(allow)
132+
},
133+
None,
134+
);
135+
inbound()
125136
.with_stack(new_ok())
126-
.push_accept(999, new_panic("direct stack must not be built"))
137+
.push_accept(999, policies, new_panic("direct stack must not be built"))
127138
.into_inner()
128139
.new_service(Target(1000))
129140
.oneshot(io)
@@ -133,10 +144,11 @@ mod tests {
133144

134145
#[tokio::test(flavor = "current_thread")]
135146
async fn default_deny() {
147+
let policies = Store::fixed(DefaultPolicy::Deny, None);
136148
let (io, _) = io::duplex(1);
137-
inbound(DefaultPolicy::Deny)
149+
inbound()
138150
.with_stack(new_ok())
139-
.push_accept(999, new_panic("direct stack must not be built"))
151+
.push_accept(999, policies, new_panic("direct stack must not be built"))
140152
.into_inner()
141153
.new_service(Target(1000))
142154
.oneshot(io)
@@ -146,21 +158,20 @@ mod tests {
146158

147159
#[tokio::test(flavor = "current_thread")]
148160
async fn direct() {
161+
let policies = Store::fixed(DefaultPolicy::Deny, None);
149162
let (io, _) = io::duplex(1);
150-
inbound(DefaultPolicy::Deny)
163+
inbound()
151164
.with_stack(new_panic("detect stack must not be built"))
152-
.push_accept(999, new_ok())
165+
.push_accept(999, policies, new_ok())
153166
.into_inner()
154167
.new_service(Target(999))
155168
.oneshot(io)
156169
.await
157170
.expect("should succeed");
158171
}
159172

160-
fn inbound(port_policies: impl Into<PortPolicies>) -> Inbound<()> {
161-
let mut c = test_util::default_config();
162-
c.port_policies = port_policies.into();
163-
Inbound::new(c, test_util::runtime().0)
173+
fn inbound() -> Inbound<()> {
174+
Inbound::new(test_util::default_config(), test_util::runtime().0)
164175
}
165176

166177
fn new_panic<T>(msg: &'static str) -> svc::BoxNewTcp<T, io::DuplexStream> {

0 commit comments

Comments
 (0)