Skip to content

Commit d9fde0a

Browse files
authored
outbound: Report per-route-backend request count metrics (#2377)
When performing policy-based routing, proxies may dispatch requests through per-route backend configurations. In order to illustrate how routing rules apply and how backend distributions are being honored, this change adds two new metrics: * `outbound_http_route_backend_requests_total`; and * `outbound_grpc_route_backend_requests_total` Each of these metrics includes labels that identify a routes parent (i.e. a Service), the route resource being used, and the backend resource being used. This implementation does NOT implement any form of metrics eviction for these new metrics. This is tolerable for the short term as the cardinality of services and routes is generally much less than the cardinality of individual endpoints (where we do require timeout/eviction for metrics).
1 parent 7a811f6 commit d9fde0a

File tree

11 files changed

+336
-44
lines changed

11 files changed

+336
-44
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,6 +1032,7 @@ dependencies = [
10321032
name = "linkerd-app-outbound"
10331033
version = "0.1.0"
10341034
dependencies = [
1035+
"ahash",
10351036
"bytes",
10361037
"futures",
10371038
"http",

linkerd/app/outbound/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ test-subscriber = []
1616
test-util = ["linkerd-app-test", "linkerd-meshtls-rustls/test-util"]
1717

1818
[dependencies]
19+
ahash = "0.8"
1920
bytes = "1"
2021
http = "0.2"
2122
futures = { version = "0.3", default-features = false }

linkerd/app/outbound/src/http/logical.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,8 @@ where
162162
S::Future: Send,
163163
{
164164
svc::layer::mk(move |concrete: N| {
165-
let policy = svc::stack(concrete.clone()).push(policy::Policy::layer());
165+
let policy = svc::stack(concrete.clone())
166+
.push(policy::Policy::layer(metrics.http_route_backends.clone()));
166167
let profile =
167168
svc::stack(concrete.clone()).push(profile::Params::layer(metrics.proxy.clone()));
168169
svc::stack(concrete)

linkerd/app/outbound/src/http/logical/policy.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ mod router;
88
mod tests;
99

1010
pub use self::{
11-
route::errors,
11+
route::{backend::RouteBackendMetrics, errors},
1212
router::{GrpcParams, HttpParams},
1313
};
1414
pub use linkerd_proxy_client_policy::{ClientPolicy, FailureAccrual};
@@ -49,7 +49,9 @@ where
4949
/// Builds a stack that dynamically updates and applies HTTP or gRPC policy
5050
/// routing configurations to route requests over cached inner backend
5151
/// services.
52-
pub(super) fn layer<N, S>() -> impl svc::Layer<
52+
pub(super) fn layer<N, S>(
53+
route_backend_metrics: RouteBackendMetrics,
54+
) -> impl svc::Layer<
5355
N,
5456
Service = svc::ArcNewService<
5557
Self,
@@ -74,8 +76,9 @@ where
7476
S::Future: Send,
7577
{
7678
svc::layer::mk(move |inner: N| {
77-
let http = svc::stack(inner.clone()).push(router::Http::layer());
78-
let grpc = svc::stack(inner).push(router::Grpc::layer());
79+
let http =
80+
svc::stack(inner.clone()).push(router::Http::layer(route_backend_metrics.clone()));
81+
let grpc = svc::stack(inner).push(router::Grpc::layer(route_backend_metrics.clone()));
7982

8083
http.push_switch(
8184
|pp: Policy<T>| {

linkerd/app/outbound/src/http/logical/policy/route.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use super::super::Concrete;
1+
use crate::RouteRef;
2+
3+
use super::{super::Concrete, RouteBackendMetrics};
24
use linkerd_app_core::{classify, proxy::http, svc, Addr, Error, Result};
35
use linkerd_distribute as distribute;
46
use linkerd_http_route as http_route;
@@ -25,7 +27,7 @@ pub(crate) struct Matched<M, P> {
2527
pub(crate) struct Route<T, F, E> {
2628
pub(super) parent: T,
2729
pub(super) addr: Addr,
28-
pub(super) meta: Arc<policy::Meta>,
30+
pub(super) route_ref: RouteRef,
2931
pub(super) filters: Arc<[F]>,
3032
pub(super) distribution: BackendDistribution<T, F>,
3133
pub(super) failure_policy: E,
@@ -66,11 +68,14 @@ where
6668
Self: filters::Apply,
6769
Self: svc::Param<classify::Request>,
6870
MatchedBackend<T, M, F>: filters::Apply,
71+
backend::ExtractMetrics: svc::ExtractParam<backend::RequestCount, MatchedBackend<T, M, F>>,
6972
{
7073
/// Builds a route stack that applies policy filters to requests and
7174
/// distributes requests over each route's backends. These [`Concrete`]
7275
/// backends are expected to be cached/shared by the inner stack.
73-
pub(crate) fn layer<N, S>() -> impl svc::Layer<
76+
pub(crate) fn layer<N, S>(
77+
backend_metrics: RouteBackendMetrics,
78+
) -> impl svc::Layer<
7479
N,
7580
Service = svc::ArcNewService<
7681
Self,
@@ -98,7 +103,7 @@ where
98103
svc::stack(inner)
99104
// Distribute requests across route backends, applying policies
100105
// and filters for each of the route-backends.
101-
.push(MatchedBackend::layer())
106+
.push(MatchedBackend::layer(backend_metrics.clone()))
102107
.lift_new_with_target()
103108
.push(NewDistribute::layer())
104109
// The router does not take the backend's availability into

linkerd/app/outbound/src/http/logical/policy/route/backend.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
use super::{super::Concrete, filters};
2+
use crate::RouteRef;
23
use linkerd_app_core::{proxy::http, svc, Error, Result};
34
use linkerd_http_route as http_route;
45
use linkerd_proxy_client_policy as policy;
56
use std::{fmt::Debug, hash::Hash, sync::Arc};
67

8+
mod count_reqs;
9+
mod metrics;
10+
11+
pub use self::{count_reqs::RequestCount, metrics::RouteBackendMetrics};
12+
713
#[derive(Debug, PartialEq, Eq, Hash)]
814
pub(crate) struct Backend<T, F> {
15+
pub(crate) route_ref: RouteRef,
916
pub(crate) concrete: Concrete<T>,
1017
pub(crate) filters: Arc<[F]>,
1118
}
@@ -16,11 +23,17 @@ pub(crate) type Http<T> =
1623
pub(crate) type Grpc<T> =
1724
MatchedBackend<T, http_route::grpc::r#match::RouteMatch, policy::grpc::Filter>;
1825

26+
#[derive(Clone, Debug)]
27+
pub struct ExtractMetrics {
28+
metrics: RouteBackendMetrics,
29+
}
30+
1931
// === impl Backend ===
2032

2133
impl<T: Clone, F> Clone for Backend<T, F> {
2234
fn clone(&self) -> Self {
2335
Self {
36+
route_ref: self.route_ref.clone(),
2437
filters: self.filters.clone(),
2538
concrete: self.concrete.clone(),
2639
}
@@ -51,13 +64,16 @@ where
5164
F: Clone + Send + Sync + 'static,
5265
// Assert that filters can be applied.
5366
Self: filters::Apply,
67+
ExtractMetrics: svc::ExtractParam<RequestCount, Self>,
5468
{
5569
/// Builds a stack that applies per-route-backend policy filters over an
5670
/// inner [`Concrete`] stack.
5771
///
5872
/// This [`MatchedBackend`] must implement [`filters::Apply`] to apply these
5973
/// filters.
60-
pub(crate) fn layer<N, S>() -> impl svc::Layer<
74+
pub(crate) fn layer<N, S>(
75+
metrics: RouteBackendMetrics,
76+
) -> impl svc::Layer<
6177
N,
6278
Service = svc::ArcNewService<
6379
Self,
@@ -90,6 +106,9 @@ where
90106
}| concrete,
91107
)
92108
.push(filters::NewApplyFilters::<Self, _, _>::layer())
109+
.push(count_reqs::NewCountRequests::layer_via(ExtractMetrics {
110+
metrics: metrics.clone(),
111+
}))
93112
.push(svc::ArcNewService::layer())
94113
.into_inner()
95114
})
@@ -109,3 +128,23 @@ impl<T> filters::Apply for Grpc<T> {
109128
filters::apply_grpc(&self.r#match, &self.params.filters, req)
110129
}
111130
}
131+
132+
impl<T> svc::ExtractParam<RequestCount, Http<T>> for ExtractMetrics {
133+
fn extract_param(&self, params: &Http<T>) -> RequestCount {
134+
RequestCount(self.metrics.http_requests_total(
135+
params.params.concrete.parent_ref.clone(),
136+
params.params.route_ref.clone(),
137+
params.params.concrete.backend_ref.clone(),
138+
))
139+
}
140+
}
141+
142+
impl<T> svc::ExtractParam<RequestCount, Grpc<T>> for ExtractMetrics {
143+
fn extract_param(&self, params: &Grpc<T>) -> RequestCount {
144+
RequestCount(self.metrics.grpc_requests_total(
145+
params.params.concrete.parent_ref.clone(),
146+
params.params.route_ref.clone(),
147+
params.params.concrete.backend_ref.clone(),
148+
))
149+
}
150+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
use linkerd_app_core::{metrics::Counter, svc};
2+
use std::{
3+
sync::Arc,
4+
task::{Context, Poll},
5+
};
6+
7+
#[derive(Clone, Debug, Default)]
8+
pub struct RequestCount(pub Arc<Counter>);
9+
10+
#[derive(Clone, Debug)]
11+
pub struct NewCountRequests<X, N> {
12+
inner: N,
13+
extract: X,
14+
}
15+
16+
#[derive(Clone, Debug)]
17+
pub struct CountRequests<S> {
18+
inner: S,
19+
requests: Arc<Counter>,
20+
}
21+
22+
// === impl NewCountRequests ===
23+
24+
impl<X: Clone, N> NewCountRequests<X, N> {
25+
pub fn new(extract: X, inner: N) -> Self {
26+
Self { extract, inner }
27+
}
28+
29+
pub fn layer_via(extract: X) -> impl svc::Layer<N, Service = Self> + Clone {
30+
svc::layer::mk(move |inner| Self::new(extract.clone(), inner))
31+
}
32+
}
33+
34+
impl<T, X, N> svc::NewService<T> for NewCountRequests<X, N>
35+
where
36+
X: svc::ExtractParam<RequestCount, T>,
37+
N: svc::NewService<T>,
38+
{
39+
type Service = CountRequests<N::Service>;
40+
41+
fn new_service(&self, target: T) -> Self::Service {
42+
let RequestCount(counter) = self.extract.extract_param(&target);
43+
let inner = self.inner.new_service(target);
44+
CountRequests::new(counter, inner)
45+
}
46+
}
47+
48+
// === impl CountRequests ===
49+
50+
impl<S> CountRequests<S> {
51+
fn new(requests: Arc<Counter>, inner: S) -> Self {
52+
Self { requests, inner }
53+
}
54+
}
55+
56+
impl<B, S> svc::Service<http::Request<B>> for CountRequests<S>
57+
where
58+
S: svc::Service<http::Request<B>>,
59+
{
60+
type Response = S::Response;
61+
type Error = S::Error;
62+
type Future = S::Future;
63+
64+
#[inline]
65+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
66+
self.inner.poll_ready(cx)
67+
}
68+
69+
fn call(&mut self, req: http::Request<B>) -> Self::Future {
70+
self.requests.incr();
71+
self.inner.call(req)
72+
}
73+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
use ahash::AHashMap;
2+
use linkerd_app_core::metrics::{metrics, Counter, FmtLabels, FmtMetrics};
3+
use linkerd_proxy_client_policy as policy;
4+
use parking_lot::Mutex;
5+
use std::{fmt::Write, sync::Arc};
6+
7+
use crate::{BackendRef, ParentRef, RouteRef};
8+
9+
metrics! {
10+
outbound_http_route_backend_requests_total: Counter {
11+
"The total number of outbound requests dispatched to a HTTP route backend"
12+
},
13+
outbound_grpc_route_backend_requests_total: Counter {
14+
"The total number of outbound requests dispatched to a gRPC route backend"
15+
}
16+
}
17+
18+
#[derive(Clone, Debug, Default)]
19+
pub struct RouteBackendMetrics {
20+
http: Arc<Mutex<AHashMap<Labels, Arc<Counter>>>>,
21+
grpc: Arc<Mutex<AHashMap<Labels, Arc<Counter>>>>,
22+
}
23+
24+
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
25+
struct Labels(ParentRef, RouteRef, BackendRef);
26+
27+
// === impl RouteBackendMetrics ===
28+
29+
impl RouteBackendMetrics {
30+
pub fn http_requests_total(&self, pr: ParentRef, rr: RouteRef, br: BackendRef) -> Arc<Counter> {
31+
self.http
32+
.lock()
33+
.entry(Labels(pr, rr, br))
34+
.or_default()
35+
.clone()
36+
}
37+
38+
pub fn grpc_requests_total(&self, pr: ParentRef, rr: RouteRef, br: BackendRef) -> Arc<Counter> {
39+
self.grpc
40+
.lock()
41+
.entry(Labels(pr, rr, br))
42+
.or_default()
43+
.clone()
44+
}
45+
}
46+
47+
impl FmtMetrics for RouteBackendMetrics {
48+
fn fmt_metrics(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49+
let http = self.http.lock();
50+
if !http.is_empty() {
51+
outbound_http_route_backend_requests_total.fmt_help(f)?;
52+
outbound_http_route_backend_requests_total.fmt_scopes(f, http.iter(), |c| c)?;
53+
}
54+
drop(http);
55+
56+
let grpc = self.grpc.lock();
57+
if !grpc.is_empty() {
58+
outbound_grpc_route_backend_requests_total.fmt_help(f)?;
59+
outbound_grpc_route_backend_requests_total.fmt_scopes(f, grpc.iter(), |c| c)?;
60+
}
61+
drop(grpc);
62+
63+
Ok(())
64+
}
65+
}
66+
67+
impl FmtLabels for Labels {
68+
fn fmt_labels(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69+
let Labels(parent, route, backend) = self;
70+
71+
Self::write_extended_meta("parent", parent, f)?;
72+
f.write_char(',')?;
73+
Self::write_basic_meta("route", route, f)?;
74+
f.write_char(',')?;
75+
Self::write_extended_meta("backend", backend, f)?;
76+
77+
Ok(())
78+
}
79+
}
80+
81+
impl Labels {
82+
fn write_basic_meta(
83+
scope: &str,
84+
meta: &policy::Meta,
85+
f: &mut std::fmt::Formatter<'_>,
86+
) -> std::fmt::Result {
87+
write!(f, "{scope}_group=\"{}\"", meta.group())?;
88+
write!(f, ",{scope}_kind=\"{}\"", meta.kind())?;
89+
write!(f, ",{scope}_namespace=\"{}\"", meta.namespace())?;
90+
write!(f, ",{scope}_name=\"{}\"", meta.name())?;
91+
92+
Ok(())
93+
}
94+
95+
fn write_extended_meta(
96+
scope: &str,
97+
meta: &policy::Meta,
98+
f: &mut std::fmt::Formatter<'_>,
99+
) -> std::fmt::Result {
100+
Self::write_basic_meta(scope, meta, f)?;
101+
102+
match meta.port() {
103+
Some(port) => write!(f, ",{scope}_port=\"{port}\"")?,
104+
None => write!(f, ",{scope}_port=\"\"")?,
105+
}
106+
write!(f, ",{scope}_section_name=\"{}\"", meta.section())?;
107+
108+
Ok(())
109+
}
110+
}

0 commit comments

Comments
 (0)