Skip to content

Commit df78f29

Browse files
committed
outbound: Report per-route-backend request count metrics
When performing policy-based routing, proxies may dispatch requests through per-route backend configurations. In order to illustrate how routing rules apply and how backend distributions are being honored, this change adds two new metrics: * `outbound_http_route_backend_requests_total`; and * `outbound_grpc_route_backend_requests_total` Each of these metrics includes labels that identify a routes parent (i.e. a Service), the route resource being used, and the backend resource being used. This implementation does NOT implement any form of metrics eviction for these new metrics. This is tolerable for the short term as the cardinality of services and routes is generally much less than the cardinality of individual endpoints (where we do require timeout/eviction for metrics).
1 parent 172523f commit df78f29

File tree

23 files changed

+353
-67
lines changed

23 files changed

+353
-67
lines changed

Cargo.lock

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1032,6 +1032,7 @@ dependencies = [
10321032
name = "linkerd-app-outbound"
10331033
version = "0.1.0"
10341034
dependencies = [
1035+
"ahash",
10351036
"bytes",
10361037
"futures",
10371038
"http",
@@ -1882,7 +1883,7 @@ dependencies = [
18821883
[[package]]
18831884
name = "linkerd2-proxy-api"
18841885
version = "0.8.0"
1885-
source = "git+https://github.com/linkerd/linkerd2-proxy-api?branch=main#afd13fc7d5a592acd31b4ba822f854e5590e9600"
1886+
source = "git+https://github.com/linkerd/linkerd2-proxy-api?branch=main#afca924a6df2434196e85a265ffb7c15a547a8fa"
18861887
dependencies = [
18871888
"h2",
18881889
"http",

linkerd/app/admin/src/stack.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ impl Config {
7676
policy: impl inbound::policy::GetPolicy,
7777
identity: identity::Server,
7878
report: R,
79-
metrics: inbound::Metrics,
79+
metrics: inbound::InboundMetrics,
8080
trace: trace::Handle,
8181
drain: drain::Watch,
8282
shutdown: mpsc::UnboundedSender<()>,

linkerd/app/inbound/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ mod server;
1717
#[cfg(any(test, feature = "test-util", fuzzing))]
1818
pub mod test_util;
1919

20-
pub use self::{metrics::Metrics, policy::DefaultPolicy};
20+
pub use self::{metrics::InboundMetrics, policy::DefaultPolicy};
2121
use linkerd_app_core::{
2222
config::{ConnectConfig, ProxyConfig, QueueConfig},
2323
drain,
@@ -63,7 +63,7 @@ pub struct Inbound<S> {
6363

6464
#[derive(Clone)]
6565
struct Runtime {
66-
metrics: Metrics,
66+
metrics: InboundMetrics,
6767
identity: identity::creds::Receiver,
6868
tap: tap::Registry,
6969
span_sink: OpenCensusSink,
@@ -150,7 +150,7 @@ impl<S> Inbound<S> {
150150
impl Inbound<()> {
151151
pub fn new(config: Config, runtime: ProxyRuntime) -> Self {
152152
let runtime = Runtime {
153-
metrics: Metrics::new(runtime.metrics),
153+
metrics: InboundMetrics::new(runtime.metrics),
154154
identity: runtime.identity,
155155
tap: runtime.tap,
156156
span_sink: runtime.span_sink,
@@ -170,7 +170,7 @@ impl Inbound<()> {
170170
(this, drain)
171171
}
172172

173-
pub fn metrics(&self) -> Metrics {
173+
pub fn metrics(&self) -> InboundMetrics {
174174
self.runtime.metrics.clone()
175175
}
176176

linkerd/app/inbound/src/metrics.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub use linkerd_app_core::metrics::*;
1515

1616
/// Holds outbound proxy metrics.
1717
#[derive(Clone, Debug)]
18-
pub struct Metrics {
18+
pub struct InboundMetrics {
1919
pub http_authz: authz::HttpAuthzMetrics,
2020
pub http_errors: error::HttpErrorMetrics,
2121

@@ -27,7 +27,7 @@ pub struct Metrics {
2727
pub proxy: Proxy,
2828
}
2929

30-
impl Metrics {
30+
impl InboundMetrics {
3131
pub(crate) fn new(proxy: Proxy) -> Self {
3232
Self {
3333
http_authz: authz::HttpAuthzMetrics::default(),
@@ -39,7 +39,7 @@ impl Metrics {
3939
}
4040
}
4141

42-
impl FmtMetrics for Metrics {
42+
impl FmtMetrics for InboundMetrics {
4343
fn fmt_metrics(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4444
self.http_authz.fmt_metrics(f)?;
4545
self.http_errors.fmt_metrics(f)?;

linkerd/app/integration/src/policy.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ pub fn outbound_default(dst: impl ToString) -> outbound::OutboundPolicy {
111111
let dst = dst.to_string();
112112
let route = outbound_default_http_route(dst.clone());
113113
outbound::OutboundPolicy {
114+
metadata: Some(api::meta::Metadata {
115+
kind: Some(api::meta::metadata::Kind::Default("default".to_string())),
116+
}),
114117
protocol: Some(outbound::ProxyProtocol {
115118
kind: Some(proxy_protocol::Kind::Detect(proxy_protocol::Detect {
116119
timeout: Some(Duration::from_secs(10).try_into().unwrap()),

linkerd/app/integration/src/tests/client_policy.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ async fn empty_http1_route() {
5151
.outbound(
5252
srv.addr,
5353
outbound::OutboundPolicy {
54+
metadata: Some(api::meta::Metadata {
55+
kind: Some(api::meta::metadata::Kind::Default("test".to_string())),
56+
}),
5457
protocol: Some(outbound::ProxyProtocol {
5558
kind: Some(proxy_protocol::Kind::Detect(proxy_protocol::Detect {
5659
timeout: Some(Duration::from_secs(10).try_into().unwrap()),
@@ -137,6 +140,9 @@ async fn empty_http2_route() {
137140
.outbound(
138141
srv.addr,
139142
outbound::OutboundPolicy {
143+
metadata: Some(api::meta::Metadata {
144+
kind: Some(api::meta::metadata::Kind::Default("test".to_string())),
145+
}),
140146
protocol: Some(outbound::ProxyProtocol {
141147
kind: Some(proxy_protocol::Kind::Detect(proxy_protocol::Detect {
142148
timeout: Some(Duration::from_secs(10).try_into().unwrap()),
@@ -250,6 +256,9 @@ async fn header_based_routing() {
250256
.outbound(
251257
srv.addr,
252258
outbound::OutboundPolicy {
259+
metadata: Some(api::meta::Metadata {
260+
kind: Some(api::meta::metadata::Kind::Default("test".to_string())),
261+
}),
253262
protocol: Some(outbound::ProxyProtocol {
254263
kind: Some(proxy_protocol::Kind::Detect(proxy_protocol::Detect {
255264
timeout: Some(Duration::from_secs(10).try_into().unwrap()),
@@ -427,6 +436,9 @@ async fn path_based_routing() {
427436
.outbound(
428437
srv.addr,
429438
outbound::OutboundPolicy {
439+
metadata: Some(api::meta::Metadata {
440+
kind: Some(api::meta::metadata::Kind::Default("test".to_string())),
441+
}),
430442
protocol: Some(outbound::ProxyProtocol {
431443
kind: Some(proxy_protocol::Kind::Detect(proxy_protocol::Detect {
432444
timeout: Some(Duration::from_secs(10).try_into().unwrap()),

linkerd/app/outbound/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ test-subscriber = []
1616
test-util = ["linkerd-app-test", "linkerd-meshtls-rustls/test-util"]
1717

1818
[dependencies]
19+
ahash = "0.8"
1920
bytes = "1"
2021
http = "0.2"
2122
futures = { version = "0.3", default-features = false }

linkerd/app/outbound/src/discover.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ pub fn synthesize_forward_policy(
248248
};
249249

250250
ClientPolicy {
251+
parent: meta.clone(),
251252
protocol: detect,
252253
backends: Arc::new([backend]),
253254
}

linkerd/app/outbound/src/http/logical.rs

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
//! A stack that routes HTTP requests to concrete backends.
22
33
use super::concrete;
4-
use crate::Outbound;
4+
use crate::{Outbound, OutboundMetrics};
55
use linkerd_app_core::{
6-
metrics,
76
proxy::{api_resolve::Metadata, http},
87
svc,
98
transport::addrs::*,
@@ -118,7 +117,7 @@ impl<N> Outbound<N> {
118117
let watch = concrete
119118
// Share the concrete stack with each router stack.
120119
.lift_new()
121-
.push_on_service(RouterParams::layer(rt.metrics.proxy.clone()))
120+
.push_on_service(RouterParams::layer(rt.metrics.clone()))
122121
// Rebuild the inner router stack every time the watch changes.
123122
.push(svc::NewSpawnWatch::<Routes, _>::layer_into::<RouterParams<T>>());
124123

@@ -136,7 +135,7 @@ where
136135
T: Clone + Debug + Eq + Hash + Send + Sync + 'static,
137136
{
138137
fn layer<N, S>(
139-
metrics: metrics::Proxy,
138+
metrics: OutboundMetrics,
140139
) -> impl svc::Layer<
141140
N,
142141
Service = svc::ArcNewService<
@@ -161,25 +160,22 @@ where
161160
S::Future: Send,
162161
{
163162
svc::layer::mk(move |concrete: N| {
164-
let policy = svc::stack(concrete.clone()).push(policy::Policy::layer());
163+
let policy = svc::stack(concrete.clone())
164+
.push(policy::Policy::layer(metrics.http_route_backends.clone()));
165165
let profile =
166-
svc::stack(concrete.clone()).push(profile::Params::layer(metrics.clone()));
166+
svc::stack(concrete.clone()).push(profile::Params::layer(metrics.proxy.clone()));
167167
svc::stack(concrete)
168168
.push_switch(
169-
|prms: RouterParams<T>| {
169+
|prms: Self| {
170170
Ok::<_, Infallible>(match prms {
171-
RouterParams::Endpoint(remote, meta, parent) => {
172-
svc::Either::A(Concrete {
173-
target: concrete::Dispatch::Forward(remote, meta),
174-
authority: None,
175-
parent,
176-
failure_accrual: Default::default(),
177-
})
178-
}
179-
RouterParams::Profile(profile) => {
180-
svc::Either::B(svc::Either::A(profile))
181-
}
182-
RouterParams::Policy(policy) => svc::Either::B(svc::Either::B(policy)),
171+
Self::Endpoint(remote, meta, parent) => svc::Either::A(Concrete {
172+
target: concrete::Dispatch::Forward(remote, meta),
173+
authority: None,
174+
parent,
175+
failure_accrual: Default::default(),
176+
}),
177+
Self::Profile(profile) => svc::Either::B(svc::Either::A(profile)),
178+
Self::Policy(policy) => svc::Either::B(svc::Either::B(policy)),
183179
})
184180
},
185181
// Switch profile and policy routing.

linkerd/app/outbound/src/http/logical/policy.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ mod router;
88
mod tests;
99

1010
pub use self::{
11-
route::errors,
11+
route::{backend::RouteBackendMetrics, errors},
1212
router::{GrpcParams, HttpParams},
1313
};
1414
pub use linkerd_proxy_client_policy::{ClientPolicy, FailureAccrual};
@@ -45,11 +45,17 @@ where
4545
// Parent target type.
4646
T: Debug + Eq + Hash,
4747
T: Clone + Send + Sync + 'static,
48+
route::backend::ExtractMetrics:
49+
svc::ExtractParam<route::backend::RequestCount, route::backend::Http<T>>,
50+
// route::backend::ExtractMetrics:
51+
// svc::ExtractParam<route::backend::RequestCount, route::backend::Grpc<T>>,
4852
{
4953
/// Builds a stack that dynamically updates and applies HTTP or gRPC policy
5054
/// routing configurations to route requests over cached inner backend
5155
/// services.
52-
pub(super) fn layer<N, S>() -> impl svc::Layer<
56+
pub(super) fn layer<N, S>(
57+
route_backend_metrics: RouteBackendMetrics,
58+
) -> impl svc::Layer<
5359
N,
5460
Service = svc::ArcNewService<
5561
Self,
@@ -73,9 +79,10 @@ where
7379
S: Clone + Send + Sync + 'static,
7480
S::Future: Send,
7581
{
76-
svc::layer::mk(|inner: N| {
77-
let http = svc::stack(inner.clone()).push(router::Http::layer());
78-
let grpc = svc::stack(inner).push(router::Grpc::layer());
82+
svc::layer::mk(move |inner: N| {
83+
let http =
84+
svc::stack(inner.clone()).push(router::Http::layer(route_backend_metrics.clone()));
85+
let grpc = svc::stack(inner).push(router::Grpc::layer(route_backend_metrics.clone()));
7986

8087
http.push_switch(
8188
|pp: Policy<T>| {

0 commit comments

Comments
 (0)