Skip to content

Commit eb38172

Browse files
olix0rhawkw
andauthored
metrics: Use parking_lot::Mutex in telemetry (#1142)
PR #1117 includes a change to move some uses of `std::sync::Mutex` to `parking_lot::Mutex`, as the latter implementation is generally more efficient. This change updates all telemetry-related uses of Mutex with the implementation in `parking_lot`. Co-authored-by: Eliza Weisman <[email protected]>
1 parent 0280121 commit eb38172

File tree

19 files changed

+125
-158
lines changed

19 files changed

+125
-158
lines changed

Cargo.lock

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -911,6 +911,7 @@ version = "0.1.0"
911911
dependencies = [
912912
"futures",
913913
"linkerd-metrics",
914+
"parking_lot",
914915
"pin-project",
915916
"tower",
916917
]
@@ -974,6 +975,7 @@ dependencies = [
974975
"linkerd-http-classify",
975976
"linkerd-metrics",
976977
"linkerd-stack",
978+
"parking_lot",
977979
"pin-project",
978980
"tower",
979981
"tracing",
@@ -1200,6 +1202,7 @@ dependencies = [
12001202
"linkerd-stack",
12011203
"linkerd-tls",
12021204
"linkerd2-proxy-api",
1205+
"parking_lot",
12031206
"pin-project",
12041207
"prost-types",
12051208
"quickcheck",
@@ -1237,6 +1240,7 @@ dependencies = [
12371240
"linkerd-io",
12381241
"linkerd-metrics",
12391242
"linkerd-stack",
1243+
"parking_lot",
12401244
"pin-project",
12411245
"socket2 0.4.0",
12421246
"tokio",
@@ -1331,6 +1335,7 @@ name = "linkerd-stack-metrics"
13311335
version = "0.1.0"
13321336
dependencies = [
13331337
"linkerd-metrics",
1338+
"parking_lot",
13341339
"tokio",
13351340
"tower",
13361341
]

linkerd/error-metrics/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@ publish = false
1010
[dependencies]
1111
futures = { version = "0.3", default-features = false }
1212
linkerd-metrics = { path = "../metrics" }
13+
parking_lot = "0.11"
1314
pin-project = "1"
1415
tower = { version = "0.4.8", default-features = false }

linkerd/error-metrics/src/layer.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
use crate::RecordError;
22
use linkerd_metrics::Counter;
3-
use std::{
4-
collections::HashMap,
5-
hash::Hash,
6-
sync::{Arc, Mutex},
7-
};
3+
use parking_lot::Mutex;
4+
use std::{collections::HashMap, hash::Hash, sync::Arc};
85

96
#[derive(Debug)]
107
pub struct RecordErrorLayer<L, K: Hash + Eq> {

linkerd/error-metrics/src/lib.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,8 @@ pub use self::layer::RecordErrorLayer;
99
pub use self::service::RecordError;
1010
pub use linkerd_metrics::FmtLabels;
1111
use linkerd_metrics::{self as metrics, Counter, FmtMetrics};
12-
use std::{
13-
collections::HashMap,
14-
fmt,
15-
hash::Hash,
16-
sync::{Arc, Mutex},
17-
};
12+
use parking_lot::Mutex;
13+
use std::{collections::HashMap, fmt, hash::Hash, sync::Arc};
1814

1915
pub trait LabelError<E> {
2016
type Labels: FmtLabels + Hash + Eq;
@@ -58,10 +54,7 @@ impl<K: Hash + Eq> Clone for Registry<K> {
5854

5955
impl<K: FmtLabels + Hash + Eq> FmtMetrics for Registry<K> {
6056
fn fmt_metrics(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61-
let errors = match self.errors.lock() {
62-
Ok(errors) => errors,
63-
Err(_) => return Ok(()),
64-
};
57+
let errors = self.errors.lock();
6558
if errors.is_empty() {
6659
return Ok(());
6760
}

linkerd/error-metrics/src/service.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
use crate::LabelError;
22
use futures::TryFuture;
33
use linkerd_metrics::{Counter, FmtLabels};
4+
use parking_lot::Mutex;
45
use pin_project::pin_project;
56
use std::{
67
collections::HashMap,
78
future::Future,
89
hash::Hash,
910
pin::Pin,
10-
sync::{Arc, Mutex},
11+
sync::Arc,
1112
task::{Context, Poll},
1213
};
1314

@@ -38,9 +39,11 @@ impl<L, K: FmtLabels + Hash + Eq, S> RecordError<L, K, S> {
3839
L: LabelError<E, Labels = K> + Clone,
3940
{
4041
let labels = label.label_error(&err);
41-
if let Ok(mut errors) = errors.lock() {
42-
errors.entry(labels).or_insert_with(Default::default).incr();
43-
}
42+
errors
43+
.lock()
44+
.entry(labels)
45+
.or_insert_with(Default::default)
46+
.incr();
4447
}
4548
}
4649

linkerd/http-metrics/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ hyper = { version = "0.14.10", features = ["http1", "http2"] }
1515
linkerd-error = { path = "../error" }
1616
linkerd-http-classify = { path = "../http-classify" }
1717
linkerd-metrics = { path = "../metrics" }
18-
linkerd-stack = { path = "../stack" }
18+
linkerd-stack = { path = "../stack" }
19+
parking_lot = "0.11"
20+
pin-project = "1"
1921
tower = "0.4.8"
2022
tracing = "0.1.26"
21-
pin-project = "1"

linkerd/http-metrics/src/lib.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@
44

55
pub use self::{requests::Requests, retries::Retries};
66
use linkerd_metrics::{LastUpdate, Store};
7-
use std::fmt;
8-
use std::hash::Hash;
9-
use std::sync::{Arc, Mutex};
10-
use std::time::Duration;
7+
use parking_lot::Mutex;
8+
use std::{fmt, hash::Hash, sync::Arc, time::Duration};
119

1210
pub mod requests;
1311
pub mod retries;

linkerd/http-metrics/src/requests/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ use super::{LastUpdate, Registry, Report};
55
use linkerd_http_classify::ClassifyResponse;
66
use linkerd_metrics::{latency, Counter, FmtMetrics, Histogram};
77
use linkerd_stack::layer;
8+
use parking_lot::Mutex;
89
use std::{
910
collections::HashMap,
1011
fmt::Debug,
1112
hash::Hash,
12-
sync::{Arc, Mutex},
13+
sync::Arc,
1314
time::{Duration, Instant},
1415
};
1516

@@ -144,7 +145,7 @@ mod tests {
144145
let retain_idle_for = Duration::from_secs(1);
145146
let r = super::Requests::<Target, Class>::default();
146147
let report = r.clone().into_report(retain_idle_for);
147-
let mut registry = r.0.lock().unwrap();
148+
let mut registry = r.0.lock();
148149

149150
let before_update = Instant::now();
150151
let metrics = registry

linkerd/http-metrics/src/requests/report.rs

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,11 @@ where
6666
M: FmtMetric,
6767
{
6868
for (tgt, tm) in registry.iter() {
69-
if let Ok(tm) = tm.lock() {
70-
for (status, m) in &tm.by_status {
71-
let status = status.as_ref().map(|s| Status(*s));
72-
let labels = (tgt, status);
73-
get_metric(&*m).fmt_metric_labeled(f, &metric.name, labels)?;
74-
}
69+
let tm = tm.lock();
70+
for (status, m) in &tm.by_status {
71+
let status = status.as_ref().map(|s| Status(*s));
72+
let labels = (tgt, status);
73+
get_metric(&*m).fmt_metric_labeled(f, &metric.name, labels)?;
7574
}
7675
}
7776

@@ -89,13 +88,12 @@ where
8988
M: FmtMetric,
9089
{
9190
for (tgt, tm) in registry.iter() {
92-
if let Ok(tm) = tm.lock() {
93-
for (status, sm) in &tm.by_status {
94-
for (cls, m) in &sm.by_class {
95-
let status = status.as_ref().map(|s| Status(*s));
96-
let labels = (tgt, (status, cls));
97-
get_metric(&*m).fmt_metric_labeled(f, &metric.name, labels)?;
98-
}
91+
let tm = tm.lock();
92+
for (status, sm) in &tm.by_status {
93+
for (cls, m) in &sm.by_class {
94+
let status = status.as_ref().map(|s| Status(*s));
95+
let labels = (tgt, (status, cls));
96+
get_metric(&*m).fmt_metric_labeled(f, &metric.name, labels)?;
9997
}
10098
}
10199
}
@@ -110,10 +108,7 @@ where
110108
C: FmtLabels + Hash + Eq,
111109
{
112110
fn fmt_metrics(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113-
let mut registry = match self.registry.lock() {
114-
Err(_) => return Ok(()),
115-
Ok(r) => r,
116-
};
111+
let mut registry = self.registry.lock();
117112
trace!(
118113
prefix = self.prefix,
119114
targets = registry.len(),

linkerd/http-metrics/src/requests/service.rs

Lines changed: 25 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,17 @@ use http_body::Body;
44
use linkerd_error::Error;
55
use linkerd_http_classify::{ClassifyEos, ClassifyResponse};
66
use linkerd_stack::{NewService, Param, Proxy};
7+
use parking_lot::Mutex;
78
use pin_project::{pin_project, pinned_drop};
8-
use std::fmt::Debug;
9-
use std::hash::Hash;
10-
use std::marker::PhantomData;
11-
use std::pin::Pin;
12-
use std::sync::{Arc, Mutex};
13-
use std::task::{Context, Poll};
14-
use std::time::Instant;
9+
use std::{
10+
fmt::Debug,
11+
hash::Hash,
12+
marker::PhantomData,
13+
pin::Pin,
14+
sync::Arc,
15+
task::{Context, Poll},
16+
time::Instant,
17+
};
1518

1619
/// Wraps services to record metrics.
1720
#[derive(Debug)]
@@ -126,13 +129,13 @@ where
126129
type Service = HttpMetrics<N::Service, C>;
127130

128131
fn new_service(&mut self, target: T) -> Self::Service {
129-
let metrics = match self.registry.lock() {
130-
Ok(mut r) => Some(
132+
let metrics = {
133+
let mut r = self.registry.lock();
134+
Some(
131135
r.entry(target.param())
132136
.or_insert_with(|| Arc::new(Mutex::new(Metrics::default())))
133137
.clone(),
134-
),
135-
Err(_) => None,
138+
)
136139
};
137140

138141
let inner = self.inner.new_service(target);
@@ -182,10 +185,9 @@ where
182185
if req.body().is_end_stream() {
183186
if let Some(lock) = req_metrics.take() {
184187
let now = Instant::now();
185-
if let Ok(mut metrics) = lock.lock() {
186-
(*metrics).last_update = now;
187-
(*metrics).total.incr();
188-
}
188+
let mut metrics = lock.lock();
189+
(*metrics).last_update = now;
190+
(*metrics).total.incr();
189191
}
190192
}
191193

@@ -232,10 +234,9 @@ where
232234
if req.body().is_end_stream() {
233235
if let Some(lock) = req_metrics.take() {
234236
let now = Instant::now();
235-
if let Ok(mut metrics) = lock.lock() {
236-
(*metrics).last_update = now;
237-
(*metrics).total.incr();
238-
}
237+
let mut metrics = lock.lock();
238+
(*metrics).last_update = now;
239+
(*metrics).total.incr();
239240
}
240241
}
241242

@@ -324,10 +325,9 @@ where
324325

325326
if let Some(lock) = this.metrics.take() {
326327
let now = Instant::now();
327-
if let Ok(mut metrics) = lock.lock() {
328-
(*metrics).last_update = now;
329-
(*metrics).total.incr();
330-
}
328+
let mut metrics = lock.lock();
329+
(*metrics).last_update = now;
330+
(*metrics).total.incr();
331331
}
332332

333333
Poll::Ready(frame)
@@ -390,10 +390,7 @@ where
390390
Some(lock) => lock,
391391
None => return,
392392
};
393-
let mut metrics = match lock.lock() {
394-
Ok(m) => m,
395-
Err(_) => return,
396-
};
393+
let mut metrics = lock.lock();
397394

398395
(*metrics).last_update = now;
399396

@@ -434,10 +431,7 @@ fn measure_class<C: Hash + Eq>(
434431
status: Option<http::StatusCode>,
435432
) {
436433
let now = Instant::now();
437-
let mut metrics = match lock.lock() {
438-
Ok(m) => m,
439-
Err(_) => return,
440-
};
434+
let mut metrics = lock.lock();
441435

442436
(*metrics).last_update = now;
443437

0 commit comments

Comments
 (0)