Skip to content

Commit 104ff6c

Browse files
authored
transport: expire idle TCP metrics (#781)
The proxy now exposes transport metrics with per-endpoint metadata; but, unlike HTTP metrics, these metrics are never dropped. So an application that has communicated with a now-defunct pod will continue to expose metrics about that pod indefinitely. This branch factors out the existing code for expiring idle HTTP metrics so that it can be used for transport metrics as well, and adds eviction for transport metrics. I also fixed a couple issues in the `linkerd2_metrics::summary` module that were making the crate fail to compile with `--all-features`. Closes linkerd/linkerd2#5372. Signed-off-by: Eliza Weisman <[email protected]>
1 parent 9b7e2bd commit 104ff6c

File tree

11 files changed

+361
-216
lines changed

11 files changed

+361
-216
lines changed

linkerd/app/core/src/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ impl Metrics {
140140

141141
let stack = stack_metrics::Registry::default();
142142

143-
let (transport, transport_report) = transport::metrics::new();
143+
let (transport, transport_report) = transport::metrics::new(retain_idle);
144144

145145
let (opencensus, opencensus_report) = opencensus::metrics::new();
146146

linkerd/http-metrics/src/lib.rs

Lines changed: 4 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,16 @@
11
#![deny(warnings, rust_2018_idioms)]
22

33
pub use self::{requests::Requests, retries::Retries};
4-
use indexmap::IndexMap;
4+
use linkerd2_metrics::{LastUpdate, Store};
55
use std::fmt;
66
use std::hash::Hash;
77
use std::sync::{Arc, Mutex};
8-
use std::time::{Duration, Instant};
8+
use std::time::Duration;
99

1010
pub mod requests;
1111
pub mod retries;
1212

13-
#[derive(Debug)]
14-
struct Registry<T, M>
15-
where
16-
T: Hash + Eq,
17-
{
18-
by_target: IndexMap<T, Arc<Mutex<M>>>,
19-
}
13+
type Registry<T, M> = Store<T, Mutex<M>>;
2014

2115
/// Reports metrics for prometheus.
2216
#[derive(Debug)]
@@ -26,7 +20,7 @@ where
2620
{
2721
prefix: &'static str,
2822
registry: Arc<Mutex<Registry<T, M>>>,
29-
/// The amount time metrics with no updates should be retained for reports
23+
/// The amount of time metrics with no updates should be retained for reports
3024
retain_idle: Duration,
3125
/// Whether latencies should be reported.
3226
include_latencies: bool,
@@ -48,35 +42,6 @@ struct Prefixed<'p, N: fmt::Display> {
4842
name: N,
4943
}
5044

51-
trait LastUpdate {
52-
fn last_update(&self) -> Instant;
53-
}
54-
55-
impl<T, M> Default for Registry<T, M>
56-
where
57-
T: Hash + Eq,
58-
{
59-
fn default() -> Self {
60-
Self {
61-
by_target: IndexMap::default(),
62-
}
63-
}
64-
}
65-
66-
impl<T, M> Registry<T, M>
67-
where
68-
T: Hash + Eq,
69-
M: LastUpdate,
70-
{
71-
/// Retains metrics for all targets that (1) no longer have an active
72-
/// reference to the `RequestMetrics` structure and (2) have not been updated since `epoch`.
73-
fn retain_since(&mut self, epoch: Instant) {
74-
self.by_target.retain(|_, m| {
75-
Arc::strong_count(&m) > 1 || m.lock().map(|m| m.last_update() >= epoch).unwrap_or(false)
76-
})
77-
}
78-
}
79-
8045
impl<T, M> Report<T, M>
8146
where
8247
T: Hash + Eq,

linkerd/http-metrics/src/requests/layer.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,7 @@ where
172172
fn new_service(&mut self, target: T) -> Self::Service {
173173
let metrics = match self.registry.lock() {
174174
Ok(mut r) => Some(
175-
r.by_target
176-
.entry((&target).into())
175+
r.entry((&target).into())
177176
.or_insert_with(|| Arc::new(Mutex::new(Metrics::default())))
178177
.clone(),
179178
),
@@ -208,12 +207,7 @@ where
208207

209208
fn call(&mut self, target: T) -> Self::Future {
210209
let metrics = match self.registry.lock() {
211-
Ok(mut r) => Some(
212-
r.by_target
213-
.entry((&target).into())
214-
.or_insert_with(|| Arc::new(Mutex::new(Metrics::default())))
215-
.clone(),
216-
),
210+
Ok(mut r) => Some(r.entry((&target).into()).or_default().clone()),
217211
Err(_) => None,
218212
};
219213

linkerd/http-metrics/src/requests/mod.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,31 +143,30 @@ mod tests {
143143

144144
let before_update = Instant::now();
145145
let metrics = registry
146-
.by_target
147146
.entry(Target(123))
148147
.or_insert_with(Default::default)
149148
.clone();
150-
assert_eq!(registry.by_target.len(), 1, "target should be registered");
149+
assert_eq!(registry.len(), 1, "target should be registered");
151150
let after_update = Instant::now();
152151

153152
registry.retain_since(after_update);
154153
assert_eq!(
155-
registry.by_target.len(),
154+
registry.len(),
156155
1,
157156
"target should not be evicted by time alone"
158157
);
159158

160159
drop(metrics);
161160
registry.retain_since(before_update);
162161
assert_eq!(
163-
registry.by_target.len(),
162+
registry.len(),
164163
1,
165164
"target should not be evicted by availability alone"
166165
);
167166

168167
registry.retain_since(after_update);
169168
assert_eq!(
170-
registry.by_target.len(),
169+
registry.len(),
171170
0,
172171
"target should be evicted by time once the handle is dropped"
173172
);

linkerd/http-metrics/src/requests/report.rs

Lines changed: 58 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
use super::{ClassMetrics, Metrics, StatusMetrics};
22
use crate::{Prefixed, Registry, Report};
33
use linkerd2_metrics::{latency, Counter, FmtLabels, FmtMetric, FmtMetrics, Histogram, Metric};
4-
use std::fmt;
5-
use std::hash::Hash;
6-
use std::time::Instant;
4+
use std::{fmt, hash::Hash, time::Instant};
75
use tracing::trace;
86

7+
#[derive(Copy, Clone)]
98
struct Status(http::StatusCode);
109

1110
impl<T, C> Report<T, Metrics<C>>
@@ -38,84 +37,35 @@ where
3837
}
3938
}
4039

41-
impl<T, C> FmtMetrics for Report<T, Metrics<C>>
42-
where
43-
T: FmtLabels + Hash + Eq,
44-
C: FmtLabels + Hash + Eq,
45-
{
46-
fn fmt_metrics(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47-
let mut registry = match self.registry.lock() {
48-
Err(_) => return Ok(()),
49-
Ok(r) => r,
50-
};
51-
trace!(
52-
prefix = self.prefix,
53-
targets = registry.by_target.len(),
54-
include_latencies = self.include_latencies,
55-
"Formatting HTTP request metrics",
56-
);
57-
58-
if registry.by_target.is_empty() {
59-
return Ok(());
60-
}
61-
62-
let metric = self.request_total();
63-
metric.fmt_help(f)?;
64-
registry.fmt_by_target(f, metric, |s| &s.total)?;
65-
66-
if self.include_latencies {
67-
let metric = self.response_latency_ms();
68-
metric.fmt_help(f)?;
69-
registry.fmt_by_status(f, metric, |s| &s.latency)?;
70-
}
71-
72-
let metric = self.response_total();
73-
metric.fmt_help(f)?;
74-
registry.fmt_by_class(f, metric, |s| &s.total)?;
75-
76-
registry.retain_since(Instant::now() - self.retain_idle);
77-
78-
Ok(())
79-
}
80-
}
81-
82-
impl<T, C> Registry<T, Metrics<C>>
40+
impl<T, C> Report<T, Metrics<C>>
8341
where
8442
T: FmtLabels + Hash + Eq,
8543
C: FmtLabels + Hash + Eq,
8644
{
87-
fn fmt_by_target<N, V, F>(
88-
&self,
45+
fn fmt_by_target<N, M>(
46+
registry: &Registry<T, Metrics<C>>,
8947
f: &mut fmt::Formatter<'_>,
90-
metric: Metric<'_, N, V>,
91-
get_metric: F,
48+
metric: Metric<'_, N, M>,
49+
get_metric: impl Fn(&Metrics<C>) -> &M,
9250
) -> fmt::Result
9351
where
9452
N: fmt::Display,
95-
V: FmtMetric,
96-
F: Fn(&Metrics<C>) -> &V,
53+
M: FmtMetric,
9754
{
98-
for (tgt, tm) in &self.by_target {
99-
if let Ok(m) = tm.lock() {
100-
get_metric(&*m).fmt_metric_labeled(f, &metric.name, tgt)?;
101-
}
102-
}
103-
104-
Ok(())
55+
registry.fmt_by_locked(f, metric, get_metric)
10556
}
10657

107-
fn fmt_by_status<N, M, F>(
108-
&self,
58+
fn fmt_by_status<N, M>(
59+
registry: &Registry<T, Metrics<C>>,
10960
f: &mut fmt::Formatter<'_>,
11061
metric: Metric<'_, N, M>,
111-
get_metric: F,
62+
get_metric: impl Fn(&StatusMetrics<C>) -> &M,
11263
) -> fmt::Result
11364
where
11465
N: fmt::Display,
11566
M: FmtMetric,
116-
F: Fn(&StatusMetrics<C>) -> &M,
11767
{
118-
for (tgt, tm) in &self.by_target {
68+
for (tgt, tm) in registry.iter() {
11969
if let Ok(tm) = tm.lock() {
12070
for (status, m) in &tm.by_status {
12171
let status = status.as_ref().map(|s| Status(*s));
@@ -128,18 +78,17 @@ where
12878
Ok(())
12979
}
13080

131-
fn fmt_by_class<N, M, F>(
132-
&self,
81+
fn fmt_by_class<N, M>(
82+
registry: &Registry<T, Metrics<C>>,
13383
f: &mut fmt::Formatter<'_>,
13484
metric: Metric<'_, N, M>,
135-
get_metric: F,
85+
get_metric: impl Fn(&ClassMetrics) -> &M,
13686
) -> fmt::Result
13787
where
13888
N: fmt::Display,
13989
M: FmtMetric,
140-
F: Fn(&ClassMetrics) -> &M,
14190
{
142-
for (tgt, tm) in &self.by_target {
91+
for (tgt, tm) in registry.iter() {
14392
if let Ok(tm) = tm.lock() {
14493
for (status, sm) in &tm.by_status {
14594
for (cls, m) in &sm.by_class {
@@ -155,6 +104,47 @@ where
155104
}
156105
}
157106

107+
impl<T, C> FmtMetrics for Report<T, Metrics<C>>
108+
where
109+
T: FmtLabels + Hash + Eq,
110+
C: FmtLabels + Hash + Eq,
111+
{
112+
fn fmt_metrics(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113+
let mut registry = match self.registry.lock() {
114+
Err(_) => return Ok(()),
115+
Ok(r) => r,
116+
};
117+
trace!(
118+
prefix = self.prefix,
119+
targets = registry.len(),
120+
include_latencies = self.include_latencies,
121+
"Formatting HTTP request metrics",
122+
);
123+
124+
if registry.is_empty() {
125+
return Ok(());
126+
}
127+
128+
let metric = self.request_total();
129+
metric.fmt_help(f)?;
130+
Self::fmt_by_target(&registry, f, metric, |s| &s.total)?;
131+
132+
if self.include_latencies {
133+
let metric = self.response_latency_ms();
134+
metric.fmt_help(f)?;
135+
Self::fmt_by_status(&registry, f, metric, |s| &s.latency)?;
136+
}
137+
138+
let metric = self.response_total();
139+
metric.fmt_help(f)?;
140+
Self::fmt_by_class(&registry, f, metric, |s| &s.total)?;
141+
142+
registry.retain_since(Instant::now() - self.retain_idle);
143+
144+
Ok(())
145+
}
146+
}
147+
158148
impl FmtLabels for Status {
159149
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160150
write!(f, "status_code=\"{}\"", self.0.as_u16())

linkerd/http-metrics/src/retries.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,7 @@ impl<T: Hash + Eq> Retries<T> {
3838

3939
pub fn get_handle(&self, target: impl Into<T>) -> Handle {
4040
let mut reg = self.0.lock().expect("retry metrics registry poisoned");
41-
Handle(
42-
reg.by_target
43-
.entry(target.into())
44-
.or_insert_with(|| Arc::new(Mutex::new(Metrics::default())))
45-
.clone(),
46-
)
41+
Handle(reg.entry(target.into()).or_default().clone())
4742
}
4843
}
4944

@@ -110,17 +105,17 @@ where
110105
};
111106
trace!(
112107
prfefix = %self.prefix,
113-
targets = %registry.by_target.len(),
108+
targets = %registry.len(),
114109
"Formatting HTTP retry metrics",
115110
);
116111

117-
if registry.by_target.is_empty() {
112+
if registry.is_empty() {
118113
return Ok(());
119114
}
120115

121116
let metric = self.retryable_total();
122117
metric.fmt_help(f)?;
123-
for (tgt, tm) in &registry.by_target {
118+
for (tgt, tm) in registry.iter() {
124119
if let Ok(m) = tm.lock() {
125120
m.retryable.fmt_metric_labeled(f, &metric.name, tgt)?;
126121
m.no_budget

linkerd/metrics/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,4 @@ tracing = "0.1.2"
2222

2323
[dev-dependencies]
2424
quickcheck = { version = "0.9", default-features = false }
25-
tokio = { version = "0.3", features = ["macros", "rt-multi-thread", "test-util", "time"] }
25+
tokio = { version = "0.3", features = ["rt", "macros", "test-util", "time"] }

linkerd/metrics/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ pub mod latency;
99
mod prom;
1010
mod scopes;
1111
mod serve;
12+
mod store;
1213
#[cfg(feature = "summary")]
1314
mod summary;
1415

@@ -18,6 +19,7 @@ pub use self::histogram::Histogram;
1819
pub use self::prom::{FmtLabels, FmtMetric, FmtMetrics, Metric};
1920
pub use self::scopes::Scopes;
2021
pub use self::serve::Serve;
22+
pub use self::store::{LastUpdate, Store};
2123
#[cfg(feature = "summary")]
2224
pub use self::summary::Summary;
2325

0 commit comments

Comments
 (0)