Skip to content

Commit d6f20a6

Browse files
authored
outbound: Report HTTP balancer endpoint gauges (#2380)
It's not currently possible to know how many endpoints are in a balancer. This change adds an `outbound_http_balancer_endpoints` gauge that exposes the number of endpoints a balancer has by their current readiness status. Note that in the concrete stack we do not currently differentiate between gRPC and HTTP backends, so all balancers are exposed under this single metric.
1 parent cef2ba9 commit d6f20a6

File tree

13 files changed

+442
-27
lines changed

13 files changed

+442
-27
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1438,6 +1438,7 @@ dependencies = [
14381438
"futures-util",
14391439
"indexmap",
14401440
"linkerd-error",
1441+
"linkerd-metrics",
14411442
"linkerd-proxy-core",
14421443
"linkerd-stack",
14431444
"pin-project",

linkerd/app/outbound/src/http/breaker.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
mod consecutive_failures;
2-
use consecutive_failures::ConsecutiveFailures;
31
use linkerd_app_core::{classify, proxy::http::classify::gate, svc};
42
use linkerd_proxy_client_policy::FailureAccrual;
53
use tracing::{trace_span, Instrument};
64

5+
mod consecutive_failures;
6+
7+
use self::consecutive_failures::ConsecutiveFailures;
8+
79
/// Params configuring a circuit breaker stack.
810
#[derive(Copy, Clone, Debug)]
911
pub(crate) struct Params {

linkerd/app/outbound/src/http/concrete.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
use super::{balance, breaker, client, handle_proxy_error_headers};
55
use crate::{http, stack_labels, BackendRef, Outbound, ParentRef};
66
use linkerd_app_core::{
7-
classify, metrics, profiles,
7+
classify,
8+
metrics::{prefix_labels, EndpointLabels, OutboundEndpointLabels},
9+
profiles,
810
proxy::{
911
api_resolve::{ConcreteAddr, Metadata, ProtocolHint},
1012
core::Resolve,
@@ -19,6 +21,12 @@ use linkerd_proxy_client_policy::FailureAccrual;
1921
use std::{fmt::Debug, net::SocketAddr, sync::Arc};
2022
use tracing::info_span;
2123

24+
mod metrics;
25+
#[cfg(test)]
26+
mod tests;
27+
28+
pub use self::metrics::BalancerMetrics;
29+
2230
/// Parameter configuring dispatcher behavior.
2331
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
2432
pub enum Dispatch {
@@ -182,7 +190,9 @@ where
182190
http::Request<http::BoxBody>,
183191
Response = http::Response<http::BoxBody>,
184192
Error = BalanceError,
185-
Future = impl Send,
193+
Future = impl std::future::Future<
194+
Output = Result<http::Response<http::BoxBody>, BalanceError>,
195+
> + Send,
186196
>,
187197
>,
188198
> + Clone
@@ -234,6 +244,12 @@ where
234244
}
235245
}),
236246
)
247+
.push(balance::NewGaugeEndpoints::layer_via({
248+
let metrics = metrics.http_balancer.clone();
249+
move |target: &Self| {
250+
metrics.http_endpoints(target.parent.param(), target.parent.param())
251+
}
252+
}))
237253
.push_on_service(svc::OnServiceLayer::new(
238254
metrics.proxy.stack.layer(stack_labels("http", "endpoint")),
239255
))
@@ -321,26 +337,26 @@ where
321337
}
322338
}
323339

324-
impl<T> svc::Param<metrics::OutboundEndpointLabels> for Endpoint<T>
340+
impl<T> svc::Param<OutboundEndpointLabels> for Endpoint<T>
325341
where
326342
T: svc::Param<Option<http::uri::Authority>>,
327343
{
328-
fn param(&self) -> metrics::OutboundEndpointLabels {
329-
metrics::OutboundEndpointLabels {
344+
fn param(&self) -> OutboundEndpointLabels {
345+
OutboundEndpointLabels {
330346
authority: self.parent.param(),
331-
labels: metrics::prefix_labels("dst", self.metadata.labels().iter()),
347+
labels: prefix_labels("dst", self.metadata.labels().iter()),
332348
server_id: self.param(),
333349
target_addr: self.addr.into(),
334350
}
335351
}
336352
}
337353

338-
impl<T> svc::Param<metrics::EndpointLabels> for Endpoint<T>
354+
impl<T> svc::Param<EndpointLabels> for Endpoint<T>
339355
where
340356
T: svc::Param<Option<http::uri::Authority>>,
341357
{
342-
fn param(&self) -> metrics::EndpointLabels {
343-
metrics::EndpointLabels::Outbound(self.param())
358+
fn param(&self) -> EndpointLabels {
359+
EndpointLabels::Outbound(self.param())
344360
}
345361
}
346362

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
use crate::{BackendRef, ParentRef};
2+
use ahash::AHashMap;
3+
use linkerd_app_core::{
4+
metrics::{metrics, FmtLabels, FmtMetrics, Gauge},
5+
svc::http::balance,
6+
};
7+
use parking_lot::Mutex;
8+
use std::{fmt::Write, sync::Arc};
9+
10+
metrics! {
11+
outbound_http_balancer_endpoints: Gauge {
12+
"The number of endpoints currently in a HTTP request balancer"
13+
}
14+
}
15+
16+
#[derive(Clone, Debug, Default)]
17+
pub struct BalancerMetrics {
18+
balancers: Arc<Mutex<AHashMap<Labels, balance::EndpointsGauges>>>,
19+
}
20+
21+
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
22+
struct Labels(ParentRef, BackendRef);
23+
24+
struct Ready<'l>(&'l Labels);
25+
struct Pending<'l>(&'l Labels);
26+
27+
// === impl RouteBackendMetrics ===
28+
29+
impl BalancerMetrics {
30+
pub(super) fn http_endpoints(&self, pr: ParentRef, br: BackendRef) -> balance::EndpointsGauges {
31+
self.balancers
32+
.lock()
33+
.entry(Labels(pr, br))
34+
.or_default()
35+
.clone()
36+
}
37+
}
38+
39+
impl FmtMetrics for BalancerMetrics {
40+
fn fmt_metrics(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41+
let balancers = self.balancers.lock();
42+
if !balancers.is_empty() {
43+
outbound_http_balancer_endpoints.fmt_help(f)?;
44+
outbound_http_balancer_endpoints.fmt_scopes(
45+
f,
46+
balancers.iter().map(|(l, e)| (Pending(l), e)),
47+
|c| &c.pending,
48+
)?;
49+
outbound_http_balancer_endpoints.fmt_scopes(
50+
f,
51+
balancers.iter().map(|(l, e)| (Ready(l), e)),
52+
|c| &c.ready,
53+
)?;
54+
}
55+
drop(balancers);
56+
57+
Ok(())
58+
}
59+
}
60+
61+
impl FmtLabels for Labels {
62+
fn fmt_labels(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63+
let Labels(parent, backend) = self;
64+
65+
crate::metrics::write_service_meta_labels("parent", parent, f)?;
66+
f.write_char(',')?;
67+
crate::metrics::write_service_meta_labels("backend", backend, f)?;
68+
69+
Ok(())
70+
}
71+
}
72+
73+
impl<'l> FmtLabels for Ready<'l> {
74+
fn fmt_labels(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75+
self.0.fmt_labels(f)?;
76+
write!(f, ",endpoint_state=\"ready\"")
77+
}
78+
}
79+
80+
impl<'l> FmtLabels for Pending<'l> {
81+
fn fmt_labels(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82+
self.0.fmt_labels(f)?;
83+
write!(f, ",endpoint_state=\"pending\"")
84+
}
85+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
use super::*;
2+
use crate::test_util::*;
3+
use linkerd_app_core::{
4+
svc::{http::balance::EwmaConfig, ServiceExt},
5+
svc::{NewService, Service},
6+
trace,
7+
};
8+
use linkerd_proxy_client_policy as policy;
9+
use std::{net::SocketAddr, num::NonZeroU16, sync::Arc};
10+
use tokio::time;
11+
12+
#[tokio::test(flavor = "current_thread")]
13+
async fn gauges_endpoints() {
14+
let _trace = trace::test::trace_init();
15+
let (rt, _shutdown) = runtime();
16+
let outbound = Outbound::new(default_config(), rt);
17+
18+
let addr = "mysvc.myns.svc.cluster.local:80"
19+
.parse::<NameAddr>()
20+
.unwrap();
21+
let ep0 = SocketAddr::new([192, 0, 2, 41].into(), 8080);
22+
let ep1 = SocketAddr::new([192, 0, 2, 42].into(), 8080);
23+
24+
let resolve = support::resolver::<Metadata>();
25+
let mut resolve_tx = resolve.endpoint_tx(addr.clone());
26+
27+
let (svc0, mut handle0) = tower_test::mock::pair();
28+
let (svc1, mut handle1) = tower_test::mock::pair();
29+
30+
let stk = move |ep: Endpoint<_>| {
31+
if *ep.addr == ep0 {
32+
return svc0.clone();
33+
}
34+
if *ep.addr == ep1 {
35+
return svc1.clone();
36+
}
37+
panic!("unexpected endpoint: {:?}", ep)
38+
};
39+
40+
let mut svc = svc::stack(stk)
41+
.push(Balance::layer(&outbound.config, &outbound.runtime, resolve))
42+
.into_inner()
43+
.new_service(Balance {
44+
addr,
45+
parent: Target,
46+
ewma: EwmaConfig {
47+
default_rtt: time::Duration::from_millis(100),
48+
decay: time::Duration::from_secs(10),
49+
},
50+
});
51+
52+
let ready = Arc::new(tokio::sync::Notify::new());
53+
let _task = tokio::spawn({
54+
let ready = ready.clone();
55+
async move {
56+
loop {
57+
ready.notified().await;
58+
svc.ready().await.unwrap();
59+
svc.call(http::Request::default()).await.unwrap();
60+
}
61+
}
62+
});
63+
64+
let gauge = outbound
65+
.runtime
66+
.metrics
67+
.http_balancer
68+
.http_endpoints(svc::Param::param(&Target), svc::Param::param(&Target));
69+
assert_eq!(gauge.pending.value(), 0);
70+
assert_eq!(gauge.ready.value(), 0);
71+
72+
// Begin with a single endpoint. When the balancer can process requests, the
73+
// gauge is accurate.
74+
resolve_tx.add(vec![(ep0, Metadata::default())]).unwrap();
75+
handle0.allow(1);
76+
ready.notify_one();
77+
tokio::task::yield_now().await;
78+
assert_eq!(gauge.pending.value(), 0);
79+
assert_eq!(gauge.ready.value(), 1);
80+
let (_, res) = handle0.next_request().await.unwrap();
81+
res.send_response(http::Response::default());
82+
83+
// Add a second endpoint and ensure the gauge is updated.
84+
resolve_tx.add(vec![(ep1, Metadata::default())]).unwrap();
85+
handle0.allow(0);
86+
handle1.allow(1);
87+
ready.notify_one();
88+
tokio::task::yield_now().await;
89+
assert_eq!(gauge.pending.value(), 1);
90+
assert_eq!(gauge.ready.value(), 1);
91+
let (_, res) = handle1.next_request().await.unwrap();
92+
res.send_response(http::Response::default());
93+
94+
// Remove the first endpoint.
95+
resolve_tx.remove(vec![ep0]).unwrap();
96+
handle1.allow(2);
97+
ready.notify_one();
98+
let (_, res) = handle1.next_request().await.unwrap();
99+
res.send_response(http::Response::default());
100+
101+
// The inner endpoint isn't actually dropped until the balancer's subsequent poll.
102+
ready.notify_one();
103+
tokio::task::yield_now().await;
104+
assert_eq!(gauge.pending.value(), 0);
105+
assert_eq!(gauge.ready.value(), 1);
106+
let (_, res) = handle1.next_request().await.unwrap();
107+
res.send_response(http::Response::default());
108+
109+
// Dropping the remaining endpoint, the gauge is updated.
110+
resolve_tx.remove(vec![ep1]).unwrap();
111+
ready.notify_one();
112+
tokio::task::yield_now().await;
113+
assert_eq!(gauge.pending.value(), 0);
114+
assert_eq!(gauge.ready.value(), 0);
115+
}
116+
117+
#[derive(Clone, Debug)]
118+
struct Target;
119+
120+
// === impl Target ===
121+
122+
impl svc::Param<ParentRef> for Target {
123+
fn param(&self) -> ParentRef {
124+
ParentRef(Arc::new(policy::Meta::Resource {
125+
group: "core".into(),
126+
kind: "Service".into(),
127+
namespace: "myns".into(),
128+
name: "mysvc".into(),
129+
port: NonZeroU16::new(80),
130+
section: None,
131+
}))
132+
}
133+
}
134+
135+
impl svc::Param<BackendRef> for Target {
136+
fn param(&self) -> BackendRef {
137+
BackendRef(Arc::new(policy::Meta::Resource {
138+
group: "core".into(),
139+
kind: "Service".into(),
140+
namespace: "myns".into(),
141+
name: "mysvc".into(),
142+
port: NonZeroU16::new(80),
143+
section: None,
144+
}))
145+
}
146+
}
147+
148+
impl svc::Param<FailureAccrual> for Target {
149+
fn param(&self) -> FailureAccrual {
150+
FailureAccrual::default()
151+
}
152+
}

linkerd/app/outbound/src/http/logical/policy/route.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1+
use super::super::Concrete;
12
use crate::RouteRef;
2-
3-
use super::{super::Concrete, RouteBackendMetrics};
43
use linkerd_app_core::{classify, proxy::http, svc, Addr, Error, Result};
54
use linkerd_distribute as distribute;
65
use linkerd_http_route as http_route;
@@ -74,7 +73,7 @@ where
7473
/// distributes requests over each route's backends. These [`Concrete`]
7574
/// backends are expected to be cached/shared by the inner stack.
7675
pub(crate) fn layer<N, S>(
77-
backend_metrics: RouteBackendMetrics,
76+
backend_metrics: backend::RouteBackendMetrics,
7877
) -> impl svc::Layer<
7978
N,
8079
Service = svc::ArcNewService<

linkerd/app/outbound/src/http/logical/policy/route/backend.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ use std::{fmt::Debug, hash::Hash, sync::Arc};
88
mod count_reqs;
99
mod metrics;
1010

11-
pub use self::{count_reqs::RequestCount, metrics::RouteBackendMetrics};
11+
pub use self::count_reqs::RequestCount;
12+
pub use self::metrics::RouteBackendMetrics;
1213

1314
#[derive(Debug, PartialEq, Eq, Hash)]
1415
pub(crate) struct Backend<T, F> {

linkerd/app/outbound/src/http/logical/policy/route/backend/metrics.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1+
use crate::{BackendRef, ParentRef, RouteRef};
12
use ahash::AHashMap;
23
use linkerd_app_core::metrics::{metrics, Counter, FmtLabels, FmtMetrics};
34
use linkerd_proxy_client_policy as policy;
45
use parking_lot::Mutex;
56
use std::{fmt::Write, sync::Arc};
67

7-
use crate::{BackendRef, ParentRef, RouteRef};
8-
98
metrics! {
109
outbound_http_route_backend_requests_total: Counter {
1110
"The total number of outbound requests dispatched to a HTTP route backend"

0 commit comments

Comments
 (0)