Skip to content

Commit fdd2128

Browse files
committed
Merge branch 'ver/route-reqs-mtx'
2 parents 7859b9a + 9e8c1fc commit fdd2128

File tree

11 files changed

+285
-30
lines changed

11 files changed

+285
-30
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,6 +1032,7 @@ dependencies = [
10321032
name = "linkerd-app-outbound"
10331033
version = "0.1.0"
10341034
dependencies = [
1035+
"ahash",
10351036
"bytes",
10361037
"futures",
10371038
"http",

linkerd/app/outbound/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ test-subscriber = []
1616
test-util = ["linkerd-app-test", "linkerd-meshtls-rustls/test-util"]
1717

1818
[dependencies]
19+
ahash = "0.8"
1920
bytes = "1"
2021
http = "0.2"
2122
futures = { version = "0.3", default-features = false }

linkerd/app/outbound/src/http/logical.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,8 @@ where
162162
S::Future: Send,
163163
{
164164
svc::layer::mk(move |concrete: N| {
165-
let policy = svc::stack(concrete.clone()).push(policy::Policy::layer());
165+
let policy = svc::stack(concrete.clone())
166+
.push(policy::Policy::layer(metrics.http_route_backends.clone()));
166167
let profile =
167168
svc::stack(concrete.clone()).push(profile::Params::layer(metrics.proxy.clone()));
168169
svc::stack(concrete)

linkerd/app/outbound/src/http/logical/policy.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ mod router;
88
mod tests;
99

1010
pub use self::{
11-
route::errors,
11+
route::{backend::RouteBackendMetrics, errors},
1212
router::{GrpcParams, HttpParams},
1313
};
1414
pub use linkerd_proxy_client_policy::{ClientPolicy, FailureAccrual};
@@ -45,11 +45,17 @@ where
4545
// Parent target type.
4646
T: Debug + Eq + Hash,
4747
T: Clone + Send + Sync + 'static,
48+
route::backend::ExtractMetrics:
49+
svc::ExtractParam<route::backend::RequestCount, route::backend::Http<T>>,
50+
// route::backend::ExtractMetrics:
51+
// svc::ExtractParam<route::backend::RequestCount, route::backend::Grpc<T>>,
4852
{
4953
/// Builds a stack that dynamically updates and applies HTTP or gRPC policy
5054
/// routing configurations to route requests over cached inner backend
5155
/// services.
52-
pub(super) fn layer<N, S>() -> impl svc::Layer<
56+
pub(super) fn layer<N, S>(
57+
route_backend_metrics: RouteBackendMetrics,
58+
) -> impl svc::Layer<
5359
N,
5460
Service = svc::ArcNewService<
5561
Self,
@@ -74,8 +80,9 @@ where
7480
S::Future: Send,
7581
{
7682
svc::layer::mk(move |inner: N| {
77-
let http = svc::stack(inner.clone()).push(router::Http::layer());
78-
let grpc = svc::stack(inner).push(router::Grpc::layer());
83+
let http =
84+
svc::stack(inner.clone()).push(router::Http::layer(route_backend_metrics.clone()));
85+
let grpc = svc::stack(inner).push(router::Grpc::layer(route_backend_metrics.clone()));
7986

8087
http.push_switch(
8188
|pp: Policy<T>| {

linkerd/app/outbound/src/http/logical/policy/route.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use super::super::Concrete;
1+
use crate::RouteRef;
2+
3+
use super::{super::Concrete, RouteBackendMetrics};
24
use linkerd_app_core::{classify, proxy::http, svc, Addr, Error, Result};
35
use linkerd_distribute as distribute;
46
use linkerd_http_route as http_route;
@@ -25,7 +27,7 @@ pub(crate) struct Matched<M, P> {
2527
pub(crate) struct Route<T, F, E> {
2628
pub(super) parent: T,
2729
pub(super) addr: Addr,
28-
pub(super) meta: Arc<policy::Meta>,
30+
pub(super) route_ref: RouteRef,
2931
pub(super) filters: Arc<[F]>,
3032
pub(super) distribution: BackendDistribution<T, F>,
3133
pub(super) failure_policy: E,
@@ -66,11 +68,14 @@ where
6668
Self: filters::Apply,
6769
Self: svc::Param<classify::Request>,
6870
MatchedBackend<T, M, F>: filters::Apply,
71+
backend::ExtractMetrics: svc::ExtractParam<backend::RequestCount, MatchedBackend<T, M, F>>,
6972
{
7073
/// Builds a route stack that applies policy filters to requests and
7174
/// distributes requests over each route's backends. These [`Concrete`]
7275
/// backends are expected to be cached/shared by the inner stack.
73-
pub(crate) fn layer<N, S>() -> impl svc::Layer<
76+
pub(crate) fn layer<N, S>(
77+
backend_metrics: RouteBackendMetrics,
78+
) -> impl svc::Layer<
7479
N,
7580
Service = svc::ArcNewService<
7681
Self,
@@ -98,7 +103,7 @@ where
98103
svc::stack(inner)
99104
// Distribute requests across route backends, applying policies
100105
// and filters for each of the route-backends.
101-
.push(MatchedBackend::layer())
106+
.push(MatchedBackend::layer(backend_metrics.clone()))
102107
.lift_new_with_target()
103108
.push(NewDistribute::layer())
104109
// The router does not take the backend's availability into

linkerd/app/outbound/src/http/logical/policy/route/backend.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,19 @@
1+
use crate::RouteRef;
2+
13
use super::{super::Concrete, filters};
24
use linkerd_app_core::{proxy::http, svc, Error, Result};
35
use linkerd_http_route as http_route;
46
use linkerd_proxy_client_policy as policy;
57
use std::{fmt::Debug, hash::Hash, sync::Arc};
68

9+
mod count_reqs;
10+
mod metrics;
11+
12+
pub use self::{count_reqs::RequestCount, metrics::RouteBackendMetrics};
13+
714
#[derive(Debug, PartialEq, Eq, Hash)]
815
pub(crate) struct Backend<T, F> {
16+
pub(crate) route_ref: RouteRef,
917
pub(crate) concrete: Concrete<T>,
1018
pub(crate) filters: Arc<[F]>,
1119
}
@@ -16,11 +24,17 @@ pub(crate) type Http<T> =
1624
pub(crate) type Grpc<T> =
1725
MatchedBackend<T, http_route::grpc::r#match::RouteMatch, policy::grpc::Filter>;
1826

27+
#[derive(Clone, Debug)]
28+
pub struct ExtractMetrics {
29+
metrics: RouteBackendMetrics,
30+
}
31+
1932
// === impl Backend ===
2033

2134
impl<T: Clone, F> Clone for Backend<T, F> {
2235
fn clone(&self) -> Self {
2336
Self {
37+
route_ref: self.route_ref.clone(),
2438
filters: self.filters.clone(),
2539
concrete: self.concrete.clone(),
2640
}
@@ -51,13 +65,16 @@ where
5165
F: Clone + Send + Sync + 'static,
5266
// Assert that filters can be applied.
5367
Self: filters::Apply,
68+
ExtractMetrics: svc::ExtractParam<RequestCount, Self>,
5469
{
5570
/// Builds a stack that applies per-route-backend policy filters over an
5671
/// inner [`Concrete`] stack.
5772
///
5873
/// This [`MatchedBackend`] must implement [`filters::Apply`] to apply these
5974
/// filters.
60-
pub(crate) fn layer<N, S>() -> impl svc::Layer<
75+
pub(crate) fn layer<N, S>(
76+
metrics: RouteBackendMetrics,
77+
) -> impl svc::Layer<
6178
N,
6279
Service = svc::ArcNewService<
6380
Self,
@@ -90,6 +107,9 @@ where
90107
}| concrete,
91108
)
92109
.push(filters::NewApplyFilters::<Self, _, _>::layer())
110+
.push(count_reqs::NewCountRequests::layer_via(ExtractMetrics {
111+
metrics: metrics.clone(),
112+
}))
93113
.push(svc::ArcNewService::layer())
94114
.into_inner()
95115
})
@@ -109,3 +129,23 @@ impl<T> filters::Apply for Grpc<T> {
109129
filters::apply_grpc(&self.r#match, &self.params.filters, req)
110130
}
111131
}
132+
133+
impl<T> svc::ExtractParam<RequestCount, Http<T>> for ExtractMetrics {
134+
fn extract_param(&self, params: &Http<T>) -> RequestCount {
135+
RequestCount(self.metrics.http_requests_total(
136+
params.params.concrete.parent_ref.clone(),
137+
params.params.route_ref.clone(),
138+
params.params.concrete.backend_ref.clone(),
139+
))
140+
}
141+
}
142+
143+
impl<T> svc::ExtractParam<RequestCount, Grpc<T>> for ExtractMetrics {
144+
fn extract_param(&self, params: &Grpc<T>) -> RequestCount {
145+
RequestCount(self.metrics.grpc_requests_total(
146+
params.params.concrete.parent_ref.clone(),
147+
params.params.route_ref.clone(),
148+
params.params.concrete.backend_ref.clone(),
149+
))
150+
}
151+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
use linkerd_app_core::{metrics::Counter, svc};
2+
use std::{
3+
sync::Arc,
4+
task::{Context, Poll},
5+
};
6+
7+
#[derive(Clone, Debug, Default)]
8+
pub struct RequestCount(pub Arc<Counter>);
9+
10+
#[derive(Clone, Debug)]
11+
pub struct NewCountRequests<X, N> {
12+
inner: N,
13+
extract: X,
14+
}
15+
16+
#[derive(Clone, Debug)]
17+
pub struct CountRequests<S> {
18+
inner: S,
19+
requests: Arc<Counter>,
20+
}
21+
22+
// === impl NewCountRequests ===
23+
24+
impl<X: Clone, N> NewCountRequests<X, N> {
25+
pub fn new(extract: X, inner: N) -> Self {
26+
Self { extract, inner }
27+
}
28+
29+
pub fn layer_via(extract: X) -> impl svc::Layer<N, Service = Self> + Clone {
30+
svc::layer::mk(move |inner| Self::new(extract.clone(), inner))
31+
}
32+
}
33+
34+
impl<T, X, N> svc::NewService<T> for NewCountRequests<X, N>
35+
where
36+
X: svc::ExtractParam<RequestCount, T>,
37+
N: svc::NewService<T>,
38+
{
39+
type Service = CountRequests<N::Service>;
40+
41+
fn new_service(&self, target: T) -> Self::Service {
42+
let RequestCount(counter) = self.extract.extract_param(&target);
43+
let inner = self.inner.new_service(target);
44+
CountRequests::new(counter, inner)
45+
}
46+
}
47+
48+
// === impl CountRequests ===
49+
50+
impl<S> CountRequests<S> {
51+
fn new(requests: Arc<Counter>, inner: S) -> Self {
52+
Self { requests, inner }
53+
}
54+
}
55+
56+
impl<B, S> svc::Service<http::Request<B>> for CountRequests<S>
57+
where
58+
S: svc::Service<http::Request<B>>,
59+
{
60+
type Response = S::Response;
61+
type Error = S::Error;
62+
type Future = S::Future;
63+
64+
#[inline]
65+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
66+
self.inner.poll_ready(cx)
67+
}
68+
69+
fn call(&mut self, req: http::Request<B>) -> Self::Future {
70+
self.requests.incr();
71+
self.inner.call(req)
72+
}
73+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
use ahash::AHashMap;
2+
use linkerd_app_core::metrics::{metrics, Counter, FmtLabels, FmtMetrics};
3+
use linkerd_proxy_client_policy as policy;
4+
use parking_lot::Mutex;
5+
use std::{fmt::Write, sync::Arc};
6+
7+
use crate::{BackendRef, ParentRef, RouteRef};
8+
9+
metrics! {
10+
outbound_http_route_backend_requests_total: Counter {
11+
"The total number of outbound requests dispatched to a HTTP route backend"
12+
},
13+
outbound_grpc_route_backend_requests_total: Counter {
14+
"The total number of outbound requests dispatched to a gRPC route backend"
15+
}
16+
}
17+
18+
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
19+
struct Labels(ParentRef, RouteRef, BackendRef);
20+
21+
#[derive(Clone, Debug, Default)]
22+
pub struct RouteBackendMetrics {
23+
http: Arc<Mutex<AHashMap<Labels, Arc<Counter>>>>,
24+
grpc: Arc<Mutex<AHashMap<Labels, Arc<Counter>>>>,
25+
}
26+
27+
// === impl RouteBackendMetrics ===
28+
29+
impl RouteBackendMetrics {
30+
pub fn http_requests_total(&self, pr: ParentRef, rr: RouteRef, br: BackendRef) -> Arc<Counter> {
31+
self.http
32+
.lock()
33+
.entry(Labels(pr, rr, br))
34+
.or_default()
35+
.clone()
36+
}
37+
38+
pub fn grpc_requests_total(&self, pr: ParentRef, rr: RouteRef, br: BackendRef) -> Arc<Counter> {
39+
self.grpc
40+
.lock()
41+
.entry(Labels(pr, rr, br))
42+
.or_default()
43+
.clone()
44+
}
45+
}
46+
47+
impl FmtMetrics for RouteBackendMetrics {
48+
fn fmt_metrics(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49+
let http = self.http.lock();
50+
if !http.is_empty() {
51+
outbound_http_route_backend_requests_total.fmt_help(f)?;
52+
outbound_http_route_backend_requests_total.fmt_scopes(f, http.iter(), |c| c)?;
53+
}
54+
drop(http);
55+
56+
let grpc = self.grpc.lock();
57+
if !grpc.is_empty() {
58+
outbound_grpc_route_backend_requests_total.fmt_help(f)?;
59+
outbound_grpc_route_backend_requests_total.fmt_scopes(f, grpc.iter(), |c| c)?;
60+
}
61+
drop(grpc);
62+
63+
Ok(())
64+
}
65+
}
66+
67+
impl FmtLabels for Labels {
68+
fn fmt_labels(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69+
let Self(parent, route, backend) = self;
70+
71+
Self::write_meta("parent", parent, f)?;
72+
f.write_char(',')?;
73+
74+
Self::write_meta("route", route, f)?;
75+
f.write_char(',')?;
76+
77+
Self::write_meta("backend", backend, f)?;
78+
Ok(())
79+
}
80+
}
81+
82+
impl Labels {
83+
fn write_meta(
84+
scope: &str,
85+
meta: &policy::Meta,
86+
f: &mut std::fmt::Formatter<'_>,
87+
) -> std::fmt::Result {
88+
write!(f, "{scope}_group=\"{}\"", meta.group())?;
89+
write!(f, ",{scope}_kind=\"{}\"", meta.kind())?;
90+
write!(f, ",{scope}_namespace=\"{}\"", meta.namespace())?;
91+
write!(f, ",{scope}_name=\"{}\"", meta.name())?;
92+
Ok(())
93+
}
94+
}

0 commit comments

Comments
 (0)