Commit ad4d5b6

inbound: determine default policies using the opaque ports env var (#2395)
The proxy injector populates an environment variable, `LINKERD2_PROXY_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION`, with a list of all ports marked as opaque. Currently, however, the proxy _does not actually use this environment variable_. Instead, opaque ports are discovered from the policy controller. The opaque ports environment variable was used only when running in the "fixed" inbound policy mode, where all inbound policies are determined from environment variables and no policy controller address is provided. This mode is no longer supported, and the policy controller address is now required, so the `LINKERD2_PROXY_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION` environment variable is not currently used to discover inbound opaque ports.

There are two issues with the current state of things. One is that inbound policy discovery is _non-blocking_: when an inbound proxy receives a connection on a port that it has not previously discovered a policy for, it uses the default policy until it has successfully discovered a policy for that port from the policy controller. This means that the proxy may perform protocol detection on the first connection to an opaque port. This isn't great, as it may result in a protocol detection timeout error on a port that the user had previously marked as opaque. It would be preferable for the proxy to read the environment variable and use it to determine whether the default policy for a port is opaque, so that ports marked as opaque disable protocol detection even before the "actual" policy is discovered.

The other issue with the `LINKERD2_PROXY_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION` environment variable is that it is currently a list of _individual port numbers_, while the proxy injector can accept annotations that specify _ranges_ of opaque ports. This means that when a very large number of ports are marked as opaque, the proxy manifest must contain a list of each individual port number in those ranges, making it potentially quite large. See linkerd/linkerd2#9803 for details on this issue.

This branch addresses both of these problems. The proxy is changed so that it will once again read the `LINKERD2_PROXY_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION` environment variable and use it to determine which ports should have opaque policies by default. The parsing of the environment variable is changed to support specifying ports as a list of ranges, rather than a list of individual port numbers. Along with a proxy-injector change, this would resolve the manifest size issue described in linkerd/linkerd2#9803.

This is implemented by changing the `inbound::policy::Store` type to also include a set of port ranges that are marked as opaque. When the `Store` handles a `get_policy` call for a port that is not already in the cache, it starts a control plane watch for that port just as it did previously. However, when determining the initial _default_ value for the policy, before the control plane discovery provides one, it checks whether the port is in a range that is marked as opaque, and, if it is, uses an opaque default policy instead.

This approach was chosen rather than pre-populating the `Store` with policies for all opaque ports to better handle the case where very large ranges are marked as opaque and are used infrequently. If the `Store` were pre-populated with default policies for all such ports, it would essentially behave as though all ports in `LINKERD2_PROXY_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION` were also in `LINKERD2_PROXY_INBOUND_PORTS`, and the proxy would immediately start a policy controller discovery watch for all opaque ports, which would be kept open for the proxy's entire lifetime. In cases where the opaque port ranges include tens of thousands of ports, this would cause significant unnecessary load on the policy controller. Storing opaque port ranges separately and using them to determine the default policy as needed allows opaque ports to be treated the same as any other port that is not known up front: policies are discovered as needed and can be evicted from the cache if they are unused. If a port is in both `LINKERD2_PROXY_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION` *and* `LINKERD2_PROXY_INBOUND_PORTS`, the proxy will start discovery eagerly and retain the port in the cache forever, but the default policy will be opaque.

I've also added a test for the behavior of opaque ports where the port's policy has not been discovered from the policy controller. That test fails on `main`, as the proxy attempts protocol detection, but passes on this branch.

In addition, I changed the parsing of the `LINKERD2_PROXY_INBOUND_PORTS` environment variable to also accept ranges, because it seemed like a nice thing to do while I was here. :)
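
The range syntax described in the commit message could be parsed roughly as follows. This is a hypothetical, standard-library-only sketch: the proxy's actual environment parsing is not part of the diff shown on this page, and the names `parse_port_ranges` and `is_opaque`, the `start-end` separator, and the error strings are all assumptions made for illustration. The real change stores ranges in `rangemap::RangeInclusiveSet<u16>` rather than a `Vec`.

```rust
use std::ops::RangeInclusive;

/// Parses a comma-separated list such as "25,4000-5000" into inclusive port
/// ranges. A bare port `N` is treated as the single-element range `N..=N`.
/// (Hypothetical helper; not the proxy's actual parser.)
pub fn parse_port_ranges(s: &str) -> Result<Vec<RangeInclusive<u16>>, String> {
    let mut ranges = Vec::new();
    // Skip empty segments so that "" and trailing commas are tolerated.
    for part in s.split(',').map(str::trim).filter(|p| !p.is_empty()) {
        let range = match part.split_once('-') {
            Some((lo, hi)) => {
                let lo = parse_port(lo)?;
                let hi = parse_port(hi)?;
                if lo > hi {
                    return Err(format!("invalid range (start > end): {}", part));
                }
                lo..=hi
            }
            None => {
                let port = parse_port(part)?;
                port..=port
            }
        };
        ranges.push(range);
    }
    Ok(ranges)
}

/// Parses a single port number, rejecting anything outside `u16`.
fn parse_port(s: &str) -> Result<u16, String> {
    s.trim()
        .parse::<u16>()
        .map_err(|e| format!("invalid port {:?}: {}", s, e))
}

/// Returns true if `port` falls inside any of the configured ranges.
pub fn is_opaque(ranges: &[RangeInclusive<u16>], port: u16) -> bool {
    ranges.iter().any(|r| r.contains(&port))
}
```

Under these assumptions, a value like `25,4000-5000` yields two ranges instead of 1,002 individual port numbers, which is the manifest-size saving the commit message refers to.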
1 parent 15bebe4 commit ad4d5b6

9 files changed: 250 additions, 60 deletions

Cargo.lock

Lines changed: 8 additions & 0 deletions
@@ -849,6 +849,7 @@ dependencies = [
  "linkerd-app-outbound",
  "linkerd-error",
  "linkerd-opencensus",
+ "rangemap",
  "regex",
  "thiserror",
  "tokio",
@@ -985,6 +986,7 @@ dependencies = [
  "linkerd2-proxy-api",
  "once_cell",
  "parking_lot",
+ "rangemap",
  "thiserror",
  "tokio",
  "tokio-test",
@@ -2300,6 +2302,12 @@ dependencies = [
  "getrandom",
 ]

+[[package]]
+name = "rangemap"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8b9283c6b06096b47afc7109834fdedab891175bb5241ee5d4f7d2546549f263"
+
 [[package]]
 name = "redox_syscall"
 version = "0.2.15"

linkerd/app/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -24,6 +24,7 @@ linkerd-app-inbound = { path = "./inbound" }
 linkerd-app-outbound = { path = "./outbound" }
 linkerd-error = { path = "../error" }
 linkerd-opencensus = { path = "../opencensus" }
+rangemap = "1"
 regex = "1"
 thiserror = "1"
 tokio = { version = "1", features = ["rt"] }

linkerd/app/inbound/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -32,6 +32,7 @@ linkerd-tonic-watch = { path = "../../tonic-watch" }
 linkerd2-proxy-api = { version = "0.9", features = ["inbound"] }
 once_cell = "1"
 parking_lot = "0.12"
+rangemap = "1"
 thiserror = "1"
 tokio = { version = "1", features = ["sync"] }
 tonic = { version = "0.8", default-features = false }

linkerd/app/inbound/src/policy/config.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use super::{api::Api, DefaultPolicy, GetPolicy, Protocol, ServerPolicy, Store};
22
use linkerd_app_core::{exp_backoff::ExponentialBackoff, proxy::http, Error};
3+
use rangemap::RangeInclusiveSet;
34
use std::{
45
collections::{HashMap, HashSet},
56
sync::Arc,
@@ -17,11 +18,13 @@ pub enum Config {
1718
default: DefaultPolicy,
1819
cache_max_idle_age: Duration,
1920
ports: HashSet<u16>,
21+
opaque_ports: RangeInclusiveSet<u16>,
2022
},
2123
Fixed {
2224
default: DefaultPolicy,
2325
cache_max_idle_age: Duration,
2426
ports: HashMap<u16, ServerPolicy>,
27+
opaque_ports: RangeInclusiveSet<u16>,
2528
},
2629
}
2730

@@ -46,12 +49,14 @@ impl Config {
4649
default,
4750
ports,
4851
cache_max_idle_age,
49-
} => Store::spawn_fixed(default, cache_max_idle_age, ports),
52+
opaque_ports,
53+
} => Store::spawn_fixed(default, cache_max_idle_age, ports, opaque_ports),
5054

5155
Self::Discover {
5256
default,
5357
ports,
5458
cache_max_idle_age,
59+
opaque_ports,
5560
} => {
5661
let watch = {
5762
let detect_timeout = match default {
@@ -63,7 +68,7 @@ impl Config {
6368
};
6469
Api::new(workload, detect_timeout, client).into_watch(backoff)
6570
};
66-
Store::spawn_discover(default, cache_max_idle_age, watch, ports)
71+
Store::spawn_discover(default, cache_max_idle_age, watch, ports, opaque_ports)
6772
}
6873
}
6974
}
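
`Config` now carries the opaque ports as a `RangeInclusiveSet<u16>` rather than a `HashSet<u16>`. To illustrate why a range-based set stays small even when tens of thousands of ports are marked opaque, here is a minimal standard-library sketch of a coalescing range set. It is not the `rangemap` implementation; `PortRangeSet` and its methods are hypothetical names, and the merge strategy is only one simple way to keep ranges disjoint.

```rust
use std::cmp::Ordering;
use std::ops::RangeInclusive;

/// A toy set of ports stored as sorted, disjoint, non-adjacent ranges.
/// (Hypothetical type, for illustration only.)
#[derive(Debug, Default)]
pub struct PortRangeSet {
    ranges: Vec<RangeInclusive<u16>>,
}

impl PortRangeSet {
    /// Inserts a range, merging it with any stored range it overlaps or touches.
    pub fn insert(&mut self, new: RangeInclusive<u16>) {
        if new.is_empty() {
            return;
        }
        let (mut lo, mut hi) = (*new.start(), *new.end());
        let mut merged = Vec::with_capacity(self.ranges.len() + 1);
        for r in self.ranges.drain(..) {
            let (s, e) = (*r.start(), *r.end());
            // Widen to u32 so the adjacency check cannot overflow at 65535.
            let touches =
                u32::from(s) <= u32::from(hi) + 1 && u32::from(lo) <= u32::from(e) + 1;
            if touches {
                lo = lo.min(s);
                hi = hi.max(e);
            } else {
                merged.push(r);
            }
        }
        merged.push(lo..=hi);
        merged.sort_by_key(|r| *r.start());
        self.ranges = merged;
    }

    /// Binary-searches the sorted ranges for one containing `port`.
    pub fn contains(&self, port: u16) -> bool {
        self.ranges
            .binary_search_by(|r| {
                if *r.end() < port {
                    Ordering::Less
                } else if *r.start() > port {
                    Ordering::Greater
                } else {
                    Ordering::Equal
                }
            })
            .is_ok()
    }

    /// Number of stored ranges (not the number of ports).
    pub fn len(&self) -> usize {
        self.ranges.len()
    }
}
```

In this sketch, inserting `4000..=5000` and then `5001..=6000` leaves a single stored range, so membership checks cost a search over a handful of ranges rather than a lookup in a set with one entry per port.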

linkerd/app/inbound/src/policy/store.rs

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ use linkerd_idle_cache::IdleCache;
44
pub use linkerd_proxy_server_policy::{
55
authz::Suffix, Authentication, Authorization, Protocol, ServerPolicy,
66
};
7+
use rangemap::RangeInclusiveSet;
78
use std::{
89
collections::HashSet,
910
hash::{BuildHasherDefault, Hasher},
11+
sync::Arc,
1012
};
1113
use tokio::{sync::watch, time::Duration};
1214
use tracing::info_span;
@@ -15,6 +17,8 @@ use tracing::info_span;
1517
pub struct Store<S> {
1618
cache: IdleCache<u16, Rx, BuildHasherDefault<PortHasher>>,
1719
default_rx: Rx,
20+
opaque_ports: Arc<RangeInclusiveSet<u16>>,
21+
opaque_default_rx: Rx,
1822
discover: Option<api::Watch<S>>,
1923
}
2024

@@ -34,22 +38,32 @@ impl<S> Store<S> {
3438
default: DefaultPolicy,
3539
idle_timeout: Duration,
3640
ports: impl IntoIterator<Item = (u16, ServerPolicy)>,
41+
opaque_ports: RangeInclusiveSet<u16>,
3742
) -> Self {
43+
let opaque_default_rx = Self::spawn_default(Self::make_opaque(default.clone()));
3844
let cache = {
45+
let opaque_rxs = opaque_ports.iter().flat_map(|range| {
46+
range
47+
.clone()
48+
.into_iter()
49+
.map(|port| (port, opaque_default_rx.clone()))
50+
});
3951
let rxs = ports.into_iter().map(|(p, s)| {
4052
// When using a fixed policy, we don't need to watch for changes. It's
4153
// safe to discard the sender, as the receiver will continue to let us
4254
// borrow/clone each fixed policy.
4355
let (_, rx) = watch::channel(s);
4456
(p, rx)
4557
});
46-
IdleCache::with_permanent_from_iter(idle_timeout, rxs)
58+
IdleCache::with_permanent_from_iter(idle_timeout, opaque_rxs.chain(rxs))
4759
};
4860

4961
Self {
5062
cache,
5163
discover: None,
5264
default_rx: Self::spawn_default(default),
65+
opaque_ports: Arc::new(opaque_ports),
66+
opaque_default_rx,
5367
}
5468
}
5569

@@ -62,6 +76,7 @@ impl<S> Store<S> {
6276
idle_timeout: Duration,
6377
discover: api::Watch<S>,
6478
ports: HashSet<u16>,
79+
opaque_ports: RangeInclusiveSet<u16>,
6580
) -> Self
6681
where
6782
S: tonic::client::GrpcService<tonic::body::BoxBody, Error = Error>,
@@ -70,6 +85,7 @@ impl<S> Store<S> {
7085
S::ResponseBody:
7186
http::HttpBody<Data = tonic::codegen::Bytes, Error = Error> + Default + Send + 'static,
7287
{
88+
let opaque_default = Self::make_opaque(default.clone());
7389
// The initial set of policies never expire from the cache.
7490
//
7591
// Policies that are dynamically discovered at runtime will expire after
@@ -78,7 +94,11 @@ impl<S> Store<S> {
7894
let cache = {
7995
let rxs = ports.into_iter().map(|port| {
8096
let discover = discover.clone();
81-
let default = default.clone();
97+
let default = if opaque_ports.contains(&port) {
98+
opaque_default.clone()
99+
} else {
100+
default.clone()
101+
};
82102
let rx = info_span!("watch", port)
83103
.in_scope(|| discover.spawn_with_init(port, default.into()));
84104
(port, rx)
@@ -90,6 +110,24 @@ impl<S> Store<S> {
90110
cache,
91111
discover: Some(discover),
92112
default_rx: Self::spawn_default(default),
113+
opaque_ports: Arc::new(opaque_ports),
114+
opaque_default_rx: Self::spawn_default(opaque_default),
115+
}
116+
}
117+
118+
fn make_opaque(default: DefaultPolicy) -> DefaultPolicy {
119+
match default {
120+
DefaultPolicy::Allow(mut policy) => {
121+
policy.protocol = match policy.protocol {
122+
Protocol::Detect { tcp_authorizations, .. } => {
123+
Protocol::Opaque(tcp_authorizations)
124+
}
125+
opaq @ Protocol::Opaque(_) => opaq,
126+
_ => unreachable!("default policy must have been configured to detect prior to marking it opaque"),
127+
};
128+
DefaultPolicy::Allow(policy)
129+
}
130+
DefaultPolicy::Deny => DefaultPolicy::Deny,
93131
}
94132
}
95133

@@ -113,15 +151,23 @@ where
113151
http::HttpBody<Data = tonic::codegen::Bytes, Error = Error> + Default + Send + 'static,
114152
{
115153
fn get_policy(&self, dst: OrigDstAddr) -> AllowPolicy {
116-
// Lookup the polcify for the target port in the cache. If it doesn't
154+
// Lookup the policy for the target port in the cache. If it doesn't
117155
// already exist, we spawn a watch on the API (if it is configured). If
118156
// no discovery API is configured we use the default policy.
119157
let server =
120158
self.cache
121159
.get_or_insert_with(dst.port(), |port| match self.discover.clone() {
122160
Some(disco) => info_span!("watch", port).in_scope(|| {
123-
tracing::trace!(%port, "spawning policy discovery");
124-
disco.spawn_with_init(*port, self.default_rx.borrow().clone())
161+
let is_default_opaque = self.opaque_ports.contains(port);
162+
tracing::trace!(%port, is_default_opaque, "spawning policy discovery");
163+
// If the port is in the range of ports marked as
164+
// opaque, use the opaque default policy instead.
165+
let init = if is_default_opaque {
166+
self.opaque_default_rx.borrow().clone()
167+
} else {
168+
self.default_rx.borrow().clone()
169+
};
170+
disco.spawn_with_init(*port, init)
125171
}),
126172

127173
// If no discovery API is configured, then we use the
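
The default-selection logic in `store.rs` can be summarized with a simplified model. The sketch below is not the proxy's code: `Protocol` and `DefaultPolicy` are cut-down stand-ins for the real types, `Defaults` and `initial_policy` are hypothetical names, authorizations are plain strings, and a `Vec` of ranges replaces `RangeInclusiveSet<u16>`. It only shows the idea of deriving one opaque variant of the default up front and picking between the two when a port is first seen.

```rust
use std::ops::RangeInclusive;

/// Simplified stand-in for the proxy's protocol type (assumption).
#[derive(Clone, Debug, PartialEq)]
enum Protocol {
    Detect { timeout_ms: u64, tcp_authorizations: Vec<String> },
    Opaque(Vec<String>),
}

/// Simplified stand-in for the proxy's default policy type (assumption).
#[derive(Clone, Debug, PartialEq)]
enum DefaultPolicy {
    Allow(Protocol),
    Deny,
}

/// Converts a detecting default into an opaque one, keeping its TCP
/// authorizations. Opaque and deny defaults are returned unchanged.
fn make_opaque(default: DefaultPolicy) -> DefaultPolicy {
    match default {
        DefaultPolicy::Allow(Protocol::Detect { tcp_authorizations, .. }) => {
            DefaultPolicy::Allow(Protocol::Opaque(tcp_authorizations))
        }
        other => other,
    }
}

/// Holds both defaults plus the opaque ranges (hypothetical type).
struct Defaults {
    default: DefaultPolicy,
    opaque_default: DefaultPolicy,
    opaque_ports: Vec<RangeInclusive<u16>>,
}

impl Defaults {
    fn new(default: DefaultPolicy, opaque_ports: Vec<RangeInclusive<u16>>) -> Self {
        Self {
            // Derive the opaque variant once, before `default` is moved.
            opaque_default: make_opaque(default.clone()),
            default,
            opaque_ports,
        }
    }

    /// The policy used for `port` until discovery provides a real one.
    fn initial_policy(&self, port: u16) -> &DefaultPolicy {
        if self.opaque_ports.iter().any(|r| r.contains(&port)) {
            &self.opaque_default
        } else {
            &self.default
        }
    }
}
```

Because the choice is made per lookup, nothing is stored for ports that never receive a connection, which matches the lazy behavior the commit message describes for discover mode.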

linkerd/app/inbound/src/policy/tcp/tests.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,11 @@ impl Store<MockSvc> {
272272
default: impl Into<DefaultPolicy>,
273273
ports: impl IntoIterator<Item = (u16, ServerPolicy)>,
274274
) -> Self {
275-
Self::spawn_fixed(default.into(), std::time::Duration::MAX, ports)
275+
Self::spawn_fixed(
276+
default.into(),
277+
std::time::Duration::MAX,
278+
ports,
279+
Default::default(),
280+
)
276281
}
277282
}

linkerd/app/inbound/src/test_util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ pub fn default_config() -> Config {
4949
}
5050
.into(),
5151
ports: Default::default(),
52+
opaque_ports: Default::default(),
5253
};
5354

5455
Config {

linkerd/app/integration/src/tests/transparency.rs

Lines changed: 74 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,80 @@ async fn inbound_tcp_server_first() {
225225
test_inbound_server_speaks_first(TestEnv::default()).await;
226226
}
227227

228+
/// Like `tcp_server_first`, but the opaque port configuration is not discovered
229+
/// from the policy controller before accepting the connection (i.e. the port is
230+
/// *not* in `LINKERD2_PROXY_INBOUND_PORTS`).
231+
#[tokio::test]
232+
async fn inbound_tcp_server_first_no_discovery() {
233+
const TIMEOUT: Duration = Duration::from_secs(5);
234+
235+
let _trace = trace_init();
236+
237+
let (tx, rx) = mpsc::channel(1);
238+
let srv = server::tcp()
239+
.accept_fut(move |sock| serve_server_first(sock, tx))
240+
.run()
241+
.await;
242+
243+
let mut env = TestEnv::default();
244+
env.put(
245+
app::env::ENV_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION,
246+
srv.addr.port().to_string(),
247+
);
248+
249+
let proxy = proxy::new().inbound(srv).run_with_test_env(env).await;
250+
251+
server_first_client(proxy.inbound, rx).await;
252+
253+
// ensure panics from the server are propagated
254+
proxy.join_servers().await;
255+
}
256+
257+
// FIXME(ver) this test doesn't actually test TLS functionality.
258+
#[ignore]
259+
#[tokio::test]
260+
async fn inbound_tcp_server_first_tls() {
261+
use std::path::PathBuf;
262+
263+
let (_cert, _key, _trust_anchors) = {
264+
let path_to_string = |path: &PathBuf| {
265+
path.as_path()
266+
.to_owned()
267+
.into_os_string()
268+
.into_string()
269+
.unwrap()
270+
};
271+
let mut tls = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
272+
tls.push("src");
273+
tls.push("transport");
274+
tls.push("tls");
275+
tls.push("testdata");
276+
277+
tls.push("foo-ns1-ca1.crt");
278+
let cert = path_to_string(&tls);
279+
280+
tls.set_file_name("foo-ns1-ca1.p8");
281+
let key = path_to_string(&tls);
282+
283+
tls.set_file_name("ca1.pem");
284+
let trust_anchors = path_to_string(&tls);
285+
(cert, key, trust_anchors)
286+
};
287+
288+
let env = TestEnv::default();
289+
290+
// FIXME
291+
//env.put(app::env::ENV_TLS_CERT, cert);
292+
//env.put(app::env::ENV_TLS_PRIVATE_KEY, key);
293+
//env.put(app::env::ENV_TLS_TRUST_ANCHORS, trust_anchors);
294+
//env.put(
295+
// app::env::ENV_TLS_LOCAL_IDENTITY,
296+
// "foo.deployment.ns1.linkerd-managed.linkerd.svc.cluster.local".to_string(),
297+
//);
298+
299+
test_inbound_server_speaks_first(env).await
300+
}
301+
228302
/// Tests that the outbound proxy does not attempt to perform protocol detection
229303
/// when the policy controller returns an `OutboundPolicy` indicating that the
230304
/// destination is opaque.
@@ -304,52 +378,6 @@ async fn server_first_client(addr: SocketAddr, mut rx: mpsc::Receiver<()>) {
304378
// TCP client must close first
305379
tcp_client.shutdown().await;
306380
}
307-
308-
// FIXME(ver) this test doesn't actually test TLS functionality.
309-
#[ignore]
310-
#[tokio::test]
311-
async fn inbound_tcp_server_first_tls() {
312-
use std::path::PathBuf;
313-
314-
let (_cert, _key, _trust_anchors) = {
315-
let path_to_string = |path: &PathBuf| {
316-
path.as_path()
317-
.to_owned()
318-
.into_os_string()
319-
.into_string()
320-
.unwrap()
321-
};
322-
let mut tls = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
323-
tls.push("src");
324-
tls.push("transport");
325-
tls.push("tls");
326-
tls.push("testdata");
327-
328-
tls.push("foo-ns1-ca1.crt");
329-
let cert = path_to_string(&tls);
330-
331-
tls.set_file_name("foo-ns1-ca1.p8");
332-
let key = path_to_string(&tls);
333-
334-
tls.set_file_name("ca1.pem");
335-
let trust_anchors = path_to_string(&tls);
336-
(cert, key, trust_anchors)
337-
};
338-
339-
let env = TestEnv::default();
340-
341-
// FIXME
342-
//env.put(app::env::ENV_TLS_CERT, cert);
343-
//env.put(app::env::ENV_TLS_PRIVATE_KEY, key);
344-
//env.put(app::env::ENV_TLS_TRUST_ANCHORS, trust_anchors);
345-
//env.put(
346-
// app::env::ENV_TLS_LOCAL_IDENTITY,
347-
// "foo.deployment.ns1.linkerd-managed.linkerd.svc.cluster.local".to_string(),
348-
//);
349-
350-
test_inbound_server_speaks_first(env).await
351-
}
352-
353381
#[tokio::test]
354382
async fn tcp_connections_close_if_client_closes() {
355383
let _trace = trace_init();
