Skip to content

Commit f2fb0c4

Browse files
authored
Configure inbound HTTP routes via gRPC (#1814)
The policy controller serves inbound server configuration. Recent changes have updated the inbound proxy to support applying HTTP route-specific policies, but only a single default route configuration was used. This change updates the proxy to use a (not yet released) new proxy API version that includes HTTP routes in server responses. This change adds protobuf conversions from these types to the proxy's HTTP route types. If the proxy receives a router filter of an unknown type (i.e., because the controller is running a later version of the API that includes new types), then the proxy will FAIL all requests on that route with an internal server error. It's considered safer to fail hard in this case, rather than to silently ignore a configured policy that could potentially be security-sensitive. Signed-off-by: Oliver Gould <[email protected]>
1 parent c616446 commit f2fb0c4

File tree

36 files changed

+1145
-187
lines changed

36 files changed

+1145
-187
lines changed

Cargo.lock

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -870,6 +870,7 @@ dependencies = [
870870
"linkerd-tonic-watch",
871871
"linkerd-tracing",
872872
"linkerd2-proxy-api",
873+
"once_cell",
873874
"parking_lot",
874875
"thiserror",
875876
"tokio",
@@ -1145,6 +1146,8 @@ name = "linkerd-http-route"
11451146
version = "0.1.0"
11461147
dependencies = [
11471148
"http",
1149+
"linkerd2-proxy-api",
1150+
"maplit",
11481151
"rand",
11491152
"regex",
11501153
"thiserror",
@@ -1697,9 +1700,9 @@ dependencies = [
16971700

16981701
[[package]]
16991702
name = "linkerd2-proxy-api"
1700-
version = "0.5.0"
1703+
version = "0.6.0"
17011704
source = "registry+https://github.com/rust-lang/crates.io-index"
1702-
checksum = "12c93aba8dbdc8f465de51ef08c5e1938790ea0ae7390d66b3f4d2d36c297d0b"
1705+
checksum = "6af1b0893c92d50e1af9a9342df7bd1ba73b9ef2abce700e170f2b2d4c84ac72"
17031706
dependencies = [
17041707
"h2",
17051708
"http",
@@ -1739,6 +1742,12 @@ dependencies = [
17391742
"linked-hash-map",
17401743
]
17411744

1745+
[[package]]
1746+
name = "maplit"
1747+
version = "1.0.2"
1748+
source = "registry+https://github.com/rust-lang/crates.io-index"
1749+
checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
1750+
17421751
[[package]]
17431752
name = "match_cfg"
17441753
version = "0.1.0"

linkerd/app/core/src/errors/respond.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,15 @@ where
9191

9292
impl SyntheticHttpResponse {
9393
pub fn unexpected_error() -> Self {
94+
Self::internal_error("unexpected error")
95+
}
96+
97+
pub fn internal_error(msg: impl Into<Cow<'static, str>>) -> Self {
9498
Self {
9599
close_connection: true,
96100
http_status: http::StatusCode::INTERNAL_SERVER_ERROR,
97101
grpc_status: tonic::Code::Internal,
98-
message: Cow::Borrowed("unexpected error"),
102+
message: msg.into(),
99103
location: None,
100104
}
101105
}

linkerd/app/inbound/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ linkerd-cache = { path = "../../cache" }
1818
linkerd-http-access-log = { path = "../../http-access-log" }
1919
linkerd-server-policy = { path = "../../server-policy", features = ["proto"] }
2020
linkerd-tonic-watch = { path = "../../tonic-watch" }
21-
linkerd2-proxy-api = { version = "0.5", features = ["inbound"] }
21+
linkerd2-proxy-api = { version = "0.6", features = ["inbound"] }
22+
once_cell = "1"
2223
parking_lot = "0.12"
2324
thiserror = "1"
2425
tokio = { version = "1", features = ["sync"] }

linkerd/app/inbound/src/http/server.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,11 @@ impl errors::HttpRescue<Error> for ServerRescue {
144144
{
145145
return Ok(errors::SyntheticHttpResponse::redirect(*status, location));
146146
}
147+
if let Some(cause) = errors::cause_ref::<policy::HttpInvalidPolicy>(&*error) {
148+
return Ok(errors::SyntheticHttpResponse::internal_error(
149+
cause.to_string(),
150+
));
151+
}
147152

148153
if let Some(cause) = errors::cause_ref::<crate::GatewayDomainInvalid>(&*error) {
149154
return Ok(errors::SyntheticHttpResponse::not_found(cause));

linkerd/app/inbound/src/policy.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ pub(crate) use self::store::Store;
99
pub use self::{
1010
config::Config,
1111
http::{
12-
HttpRouteInvalidRedirect, HttpRouteNotFound, HttpRouteRedirect, HttpRouteUnauthorized,
13-
NewHttpPolicy,
12+
HttpInvalidPolicy, HttpRouteInvalidRedirect, HttpRouteNotFound, HttpRouteRedirect,
13+
HttpRouteUnauthorized, NewHttpPolicy,
1414
},
1515
tcp::NewTcpPolicy,
1616
};

linkerd/app/inbound/src/policy/api.rs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ use linkerd_app_core::{
1010
};
1111
use linkerd_server_policy::ServerPolicy;
1212
use linkerd_tonic_watch::StreamWatch;
13+
use std::time;
1314

1415
#[derive(Clone, Debug)]
1516
pub(super) struct Api<S> {
1617
workload: String,
18+
detect_timeout: time::Duration,
1719
client: Client<S>,
1820
}
1921

@@ -22,15 +24,20 @@ pub(super) struct GrpcRecover(ExponentialBackoff);
2224

2325
pub(super) type Watch<S> = StreamWatch<GrpcRecover, Api<S>>;
2426

27+
/// If an invalid policy is encountered, then this will be updated to hold a
28+
/// default, invalid policy.
29+
static INVALID_POLICY: once_cell::sync::OnceCell<ServerPolicy> = once_cell::sync::OnceCell::new();
30+
2531
impl<S> Api<S>
2632
where
2733
S: tonic::client::GrpcService<tonic::body::BoxBody, Error = Error> + Clone,
2834
S::ResponseBody:
2935
http::HttpBody<Data = tonic::codegen::Bytes, Error = Error> + Default + Send + 'static,
3036
{
31-
pub(super) fn new(workload: String, client: S) -> Self {
37+
pub(super) fn new(workload: String, detect_timeout: time::Duration, client: S) -> Self {
3238
Self {
3339
workload,
40+
detect_timeout,
3441
client: Client::new(client),
3542
}
3643
}
@@ -65,20 +72,24 @@ where
6572
port: port.into(),
6673
workload: self.workload.clone(),
6774
};
75+
let detect_timeout = self.detect_timeout;
6876
let mut client = self.client.clone();
6977
Box::pin(async move {
7078
let rsp = client.watch_port(tonic::Request::new(req)).await?;
7179
Ok(rsp.map(|updates| {
7280
updates
73-
.map(|up| {
74-
let policy = up?.try_into().map_err(|e| {
75-
tonic::Status::invalid_argument(&*format!(
76-
"received invalid policy: {}",
77-
e
78-
))
79-
})?;
81+
.map_ok(move |up| {
82+
// If the server returned an invalid server policy, we
83+
// default to using an invalid policy that causes all
84+
// requests to report an internal error.
85+
let policy = ServerPolicy::try_from(up).unwrap_or_else(|error| {
86+
tracing::warn!(%error, "Server misconfigured");
87+
INVALID_POLICY
88+
.get_or_init(|| ServerPolicy::invalid(detect_timeout))
89+
.clone()
90+
});
8091
tracing::debug!(?policy);
81-
Ok(policy)
92+
policy
8293
})
8394
.boxed()
8495
}))

linkerd/app/inbound/src/policy/config.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::{api::Api, DefaultPolicy, GetPolicy, ServerPolicy, Store};
1+
use super::{api::Api, DefaultPolicy, GetPolicy, Protocol, ServerPolicy, Store};
22
use linkerd_app_core::{control, dns, identity, metrics, svc::NewService};
33
use std::collections::{HashMap, HashSet};
44
use tokio::time::Duration;
@@ -49,8 +49,15 @@ impl Config {
4949
} => {
5050
let watch = {
5151
let backoff = control.connect.backoff;
52-
let c = control.build(dns, metrics, identity).new_service(());
53-
Api::new(workload, c).into_watch(backoff)
52+
let client = control.build(dns, metrics, identity).new_service(());
53+
let detect_timeout = match default {
54+
DefaultPolicy::Allow(ServerPolicy {
55+
protocol: Protocol::Detect { timeout, .. },
56+
..
57+
}) => timeout,
58+
_ => Duration::from_secs(10),
59+
};
60+
Api::new(workload, detect_timeout, client).into_watch(backoff)
5461
};
5562
Store::spawn_discover(default, cache_max_idle_age, watch, ports)
5663
}

linkerd/app/inbound/src/policy/defaults.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use linkerd_app_core::{IpNet, Ipv4Net, Ipv6Net};
22
use linkerd_server_policy::{
33
authz::Suffix, http, Authentication, Authorization, Meta, Protocol, ServerPolicy,
44
};
5-
use std::time::Duration;
5+
use std::{sync::Arc, time::Duration};
66

77
pub fn all_authenticated(timeout: Duration) -> ServerPolicy {
88
mk("all-authenticated", all_nets(), authenticated(), timeout)
@@ -62,18 +62,22 @@ fn mk(
6262
authentication: Authentication,
6363
timeout: Duration,
6464
) -> ServerPolicy {
65-
let authorizations = std::sync::Arc::new([Authorization {
65+
let authorizations = Arc::new([Authorization {
66+
meta: Meta::new_default(name),
6667
networks: nets.into_iter().map(Into::into).collect(),
6768
authentication,
68-
meta: Meta::new_default(name),
6969
}]);
70-
let http = std::sync::Arc::new([http::default(authorizations.clone())]);
70+
71+
// The default policy supports protocol detection and uses the default
72+
// authorization policy on a default route that matches all requests.
73+
let protocol = Protocol::Detect {
74+
timeout,
75+
http: Arc::new([http::default(authorizations.clone())]),
76+
tcp_authorizations: authorizations,
77+
};
78+
7179
ServerPolicy {
72-
protocol: Protocol::Detect {
73-
timeout,
74-
http,
75-
tcp_authorizations: authorizations,
76-
},
7780
meta: Meta::new_default(name),
81+
protocol,
7882
}
7983
}

linkerd/app/inbound/src/policy/http.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ pub struct GrpcRouteInjectedFailure {
7979
pub message: Arc<str>,
8080
}
8181

82+
#[derive(Debug, thiserror::Error)]
83+
#[error("invalid server policy: {0}")]
84+
pub struct HttpInvalidPolicy(&'static str);
85+
8286
// === impl NewHttpPolicy ===
8387

8488
impl<N> NewHttpPolicy<N> {
@@ -285,6 +289,10 @@ fn apply_http_filters<B>(
285289
http::Filter::RequestHeaders(rh) => {
286290
rh.apply(req.headers_mut());
287291
}
292+
293+
http::Filter::InternalError(msg) => {
294+
return Err(HttpInvalidPolicy(msg).into());
295+
}
288296
}
289297
}
290298

@@ -303,6 +311,10 @@ fn apply_grpc_filters<B>(route: &grpc::Policy, req: &mut ::http::Request<B>) ->
303311
grpc::Filter::RequestHeaders(rh) => {
304312
rh.apply(req.headers_mut());
305313
}
314+
315+
grpc::Filter::InternalError(msg) => {
316+
return Err(HttpInvalidPolicy(msg).into());
317+
}
306318
}
307319
}
308320

0 commit comments

Comments
 (0)