Skip to content

Commit d604106

Browse files
authored
feat(app/inbound): introduce Permitted<T> target (#4119)
this commit introduces a new target type to the inbound proxy's policy enforcement middleware. for context, this middleware is used by "lifting" a `NewService<T, Service = S>` into an `NewService<(HttpRoutePermit, T), Service = S>`, where `S: svc::Service<http::Request<B>>`. this _lazily_ enforces policy, allowing a connection to progress before eventually enforcing authorization policy at the request-level, i.e. when a `S`-typed `Service<Request<_>>` is yielded. this is helpful in part because it allows us to implement generic connection-level middleware across `T`-typed targets, while affixing permits with information about the policy authorizing a request across these connections. this also allows us to inject policy enforcement in the stack before we deal with routing traffic to `Logical` request targets. in subsequent work, we intend to introduce `linkerd_http_prom` metrics middleware to the inbound proxy at this level of the stack. broadly, our middleware layers make use of our `Param<T>` and `ExtractParam<P, T>` traits to abstract over field access from target types. this is useful at the protocol- and connection-level layers of our networking stack because it allows `NewService<T>` middleware like `NewCountRequests` to be agnostic to its `T`-typed target, so long as it (a) is provided an extractor to find a single time series from the labeled metric family to increment given that `T`, and (b) wraps a `NewService<T>` that yields a `Service<Request<B>`. rust's type system does not support generic specialization, which means that our `T: Param<U>`-shaped bounds do not play well with a `(_, T)` tuple, which the outer layers of our inbound proxy's http router deals with. to facilitate the introduction of telemetry middleware wrapping our http router, this commit introduces a `Permitted<T>` type to replace these `(HttpRoutePermit, T)` tuples: ```rust /// Describes an authorized `T`-typed target. pub struct Permitted<T> { pub permit: HttpRoutePermit, pub target: T, } ``` Signed-off-by: katelyn martin <[email protected]>
1 parent 8c1a04f commit d604106

File tree

6 files changed

+100
-58
lines changed

6 files changed

+100
-58
lines changed

linkerd/app/admin/src/stack.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ impl Config {
114114
.to_layer::<classify::Response, _, Permitted>(),
115115
)
116116
.push(classify::NewClassify::layer_default())
117-
.push_map_target(|(permit, http)| Permitted { permit, http })
117+
.push_map_target(Permitted::from)
118118
.push(inbound::policy::NewHttpPolicy::layer(
119119
metrics.http_authz.clone(),
120120
))
@@ -281,6 +281,17 @@ impl Param<metrics::EndpointLabels> for Permitted {
281281
}
282282
}
283283

284+
impl From<inbound::policy::Permitted<Http>> for Permitted {
285+
fn from(
286+
inbound::policy::Permitted { permit, target }: inbound::policy::Permitted<Http>,
287+
) -> Self {
288+
Self {
289+
permit,
290+
http: target,
291+
}
292+
}
293+
}
294+
284295
// === TlsParams ===
285296

286297
impl<T> ExtractParam<tls::server::Timeout, T> for TlsParams {

linkerd/app/gateway/src/http.rs

Lines changed: 51 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::Gateway;
2-
use inbound::{GatewayAddr, GatewayDomainInvalid};
2+
use inbound::{policy::Permitted, GatewayAddr, GatewayDomainInvalid};
33
use linkerd_app_core::{
44
metrics::ServerLabel,
55
profiles,
@@ -81,50 +81,57 @@ impl Gateway {
8181
R: Resolve<ConcreteAddr, Endpoint = Metadata, Error = Error>,
8282
R::Resolution: Unpin,
8383
{
84-
let http = self
85-
.outbound
86-
.clone()
87-
.with_stack(inner)
88-
.push_http_cached(resolve)
89-
.into_stack()
90-
// Discard `T` and its associated client-specific metadata.
91-
.push_map_target(Target::discard_parent)
92-
.push(svc::ArcNewService::layer())
93-
// Add headers to prevent loops.
94-
.push(NewHttpGateway::layer(
95-
self.inbound.identity().local_id().clone(),
96-
))
97-
.push_on_service(svc::LoadShed::layer())
98-
.lift_new()
99-
.push(svc::ArcNewService::layer())
100-
// After protocol-downgrade, we need to build an inner stack for
101-
// each request-level HTTP version.
102-
.push(svc::NewOneshotRoute::layer_via(|t: &Target<T>| {
103-
ByRequestVersion(t.clone())
104-
}))
105-
// Only permit gateway traffic to endpoints for which we have
106-
// discovery information.
107-
.push_filter(|(_, parent): (_, T)| -> Result<_, GatewayDomainInvalid> {
108-
let routes = {
109-
let mut profile =
110-
svc::Param::<Option<watch::Receiver<profiles::Profile>>>::param(&parent)
84+
let http =
85+
self.outbound
86+
.clone()
87+
.with_stack(inner)
88+
.push_http_cached(resolve)
89+
.into_stack()
90+
// Discard `T` and its associated client-specific metadata.
91+
.push_map_target(Target::discard_parent)
92+
.push(svc::ArcNewService::layer())
93+
// Add headers to prevent loops.
94+
.push(NewHttpGateway::layer(
95+
self.inbound.identity().local_id().clone(),
96+
))
97+
.push_on_service(svc::LoadShed::layer())
98+
.lift_new()
99+
.push(svc::ArcNewService::layer())
100+
// After protocol-downgrade, we need to build an inner stack for
101+
// each request-level HTTP version.
102+
.push(svc::NewOneshotRoute::layer_via(|t: &Target<T>| {
103+
ByRequestVersion(t.clone())
104+
}))
105+
// Only permit gateway traffic to endpoints for which we have
106+
// discovery information.
107+
.push_filter(
108+
|Permitted {
109+
permit: _,
110+
target: parent,
111+
}: Permitted<T>|
112+
-> Result<_, GatewayDomainInvalid> {
113+
let routes = {
114+
let mut profile = svc::Param::<
115+
Option<watch::Receiver<profiles::Profile>>,
116+
>::param(&parent)
111117
.ok_or(GatewayDomainInvalid)?;
112-
let init =
113-
mk_routes(&profile.borrow_and_update()).ok_or(GatewayDomainInvalid)?;
114-
outbound::http::spawn_routes(profile, init, mk_routes)
115-
};
116-
117-
Ok(Target {
118-
routes,
119-
addr: parent.param(),
120-
version: parent.param(),
121-
parent,
122-
})
123-
})
124-
.push(svc::ArcNewService::layer())
125-
// Authorize requests to the gateway.
126-
.push(self.inbound.authorize_http())
127-
.arc_new_clone_http();
118+
let init = mk_routes(&profile.borrow_and_update())
119+
.ok_or(GatewayDomainInvalid)?;
120+
outbound::http::spawn_routes(profile, init, mk_routes)
121+
};
122+
123+
Ok(Target {
124+
routes,
125+
addr: parent.param(),
126+
version: parent.param(),
127+
parent,
128+
})
129+
},
130+
)
131+
.push(svc::ArcNewService::layer())
132+
// Authorize requests to the gateway.
133+
.push(self.inbound.authorize_http())
134+
.arc_new_clone_http();
128135

129136
self.inbound
130137
.clone()

linkerd/app/inbound/src/http/router.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -236,12 +236,12 @@ impl<C> Inbound<C> {
236236
.push_on_service(svc::LoadShed::layer())
237237
.push(svc::NewMapErr::layer_from_target::<LogicalError, _>())
238238
.lift_new()
239-
.check_new_new::<(policy::HttpRoutePermit, T), Logical>()
239+
.check_new_new::<policy::Permitted<T>, Logical>()
240240
.push(svc::ArcNewService::layer())
241-
.push(svc::NewOneshotRoute::layer_via(|(permit, t): &(policy::HttpRoutePermit, T)| {
242-
LogicalPerRequest::from((permit.clone(), t.clone()))
241+
.push(svc::NewOneshotRoute::layer_via(|t: &policy::Permitted<T>| {
242+
LogicalPerRequest::from(t)
243243
}))
244-
.check_new_service::<(policy::HttpRoutePermit, T), http::Request<http::BoxBody>>()
244+
.check_new_service::<policy::Permitted<T>, http::Request<http::BoxBody>>()
245245
.push(svc::ArcNewService::layer())
246246
.push(policy::NewHttpPolicy::layer(rt.metrics.http_authz.clone()))
247247
// Used by tap.
@@ -254,12 +254,12 @@ impl<C> Inbound<C> {
254254

255255
// === impl LogicalPerRequest ===
256256

257-
impl<T> From<(policy::HttpRoutePermit, T)> for LogicalPerRequest
257+
impl<'a, T> From<&'a policy::Permitted<T>> for LogicalPerRequest
258258
where
259259
T: Param<Remote<ServerAddr>>,
260260
T: Param<tls::ConditionalServerTls>,
261261
{
262-
fn from((permit, t): (policy::HttpRoutePermit, T)) -> Self {
262+
fn from(policy::Permitted { permit, target }: &'a policy::Permitted<T>) -> Self {
263263
let labels = [
264264
("srv", &permit.labels.route.server.0),
265265
("route", &permit.labels.route.route),
@@ -276,9 +276,9 @@ where
276276
.collect::<std::collections::BTreeMap<_, _>>();
277277

278278
Self {
279-
server: t.param(),
280-
tls: t.param(),
281-
permit,
279+
server: target.param(),
280+
tls: target.param(),
281+
permit: permit.clone(),
282282
labels: labels.into(),
283283
}
284284
}

linkerd/app/inbound/src/policy.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ pub use self::{
1212
config::Config,
1313
http::{
1414
HttpInvalidPolicy, HttpRouteInvalidRedirect, HttpRouteNotFound, HttpRouteRedirect,
15-
HttpRouteUnauthorized, NewHttpPolicy,
15+
HttpRouteUnauthorized, NewHttpPolicy, Permitted,
1616
},
1717
tcp::NewTcpPolicy,
1818
};

linkerd/app/inbound/src/policy/http.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@ struct ConnectionMeta {
4646
tls: tls::ConditionalServerTls,
4747
}
4848

49+
/// A `T`-typed target with policy enforced by a [`NewHttpPolicy<N>`] layer.
50+
#[derive(Debug)]
51+
pub struct Permitted<T> {
52+
pub permit: HttpRoutePermit,
53+
pub target: T,
54+
}
55+
4956
#[derive(Debug, thiserror::Error)]
5057
#[error("no route found for request")]
5158
pub struct HttpRouteNotFound(());
@@ -138,7 +145,7 @@ macro_rules! try_fut {
138145
impl<B, T, N, S> svc::Service<::http::Request<B>> for HttpPolicyService<T, N>
139146
where
140147
T: Clone,
141-
N: svc::NewService<(HttpRoutePermit, T), Service = S>,
148+
N: svc::NewService<Permitted<T>, Service = S>,
142149
S: svc::Service<::http::Request<B>>,
143150
S::Error: Into<Error>,
144151
{
@@ -175,7 +182,10 @@ where
175182

176183
future::Either::Left(
177184
self.inner
178-
.new_service((permit, self.target.clone()))
185+
.new_service(Permitted {
186+
permit,
187+
target: self.target.clone(),
188+
})
179189
.oneshot(req)
180190
.err_into::<Error>(),
181191
)
@@ -383,3 +393,17 @@ fn apply_grpc_filters<B>(route: &grpc::Policy, req: &mut ::http::Request<B>) ->
383393

384394
Ok(())
385395
}
396+
397+
// === impl Permitted ===
398+
399+
/// An authorized `T`-typed target can produce `P`-typed parameters.
400+
impl<T, P> svc::Param<P> for Permitted<T>
401+
where
402+
T: svc::Param<P>,
403+
{
404+
fn param(&self) -> P {
405+
let Self { target, .. } = self;
406+
407+
target.param()
408+
}
409+
}

linkerd/app/inbound/src/policy/http/tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ macro_rules! new_svc {
3939
policy,
4040
connection: $conn,
4141
metrics: HttpAuthzMetrics::default(),
42-
inner: |(permit, _): (HttpRoutePermit, ())| {
42+
inner: |Permitted { permit, target: () }: Permitted<()>| {
4343
let f = $rsp;
4444
svc::mk(move |req: ::http::Request<BoxBody>| {
4545
futures::future::ready((f)(permit.clone(), req))

0 commit comments

Comments
 (0)