Skip to content

Commit 62c5b9d

Browse files
hawkwolix0r
andauthored
metrics: add target_addr label to TCP accept error metrics (#1118)
The TCP accept error metrics do not include any context about the target address/port, which makes them difficult to act on. This change adds a `target_addr` label to the `inbound_tcp_accept_errors_total` and `outbound_tcp_accept_errors_total` metrics. Co-authored-by: Oliver Gould <[email protected]>
1 parent c139487 commit 62c5b9d

File tree

15 files changed

+149
-75
lines changed

15 files changed

+149
-75
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -701,6 +701,7 @@ dependencies = [
701701
"linkerd-trace-context",
702702
"linkerd-tracing",
703703
"linkerd-transport-header",
704+
"parking_lot",
704705
"pin-project",
705706
"quickcheck",
706707
"regex",

linkerd/app/core/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ linkerd-http-metrics = { path = "../../http-metrics" }
3636
linkerd-http-retry = { path = "../../http-retry" }
3737
linkerd-identity = { path = "../../identity" }
3838
linkerd-io = { path = "../../io" }
39-
linkerd-metrics = { path = "../../metrics" }
39+
linkerd-metrics = { path = "../../metrics", features = ["linkerd-stack"] }
4040
linkerd-transport-header = { path = "../../transport-header" }
4141
linkerd-opencensus = { path = "../../opencensus" }
4242
linkerd-proxy-core = { path = "../../proxy/core" }
@@ -66,6 +66,7 @@ tokio = { version = "1", features = ["macros", "sync", "parking_lot"]}
6666
tokio-stream = { version = "0.1.7", features = ["time"] }
6767
tonic = { version = "0.5", default-features = false, features = ["prost"] }
6868
tracing = "0.1.26"
69+
parking_lot = "0.11"
6970
pin-project = "1"
7071

7172
[dependencies.tower]

linkerd/app/core/src/metrics/mod.rs

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,23 @@
11
mod tcp_accept_errors;
22

3-
pub use crate::{
3+
use crate::{
44
classify::{Class, SuccessOrFailure},
5-
control, dst, errors, http_metrics, http_metrics as metrics, opencensus, proxy,
6-
proxy::identity,
7-
stack_metrics,
5+
control, dst, errors, http_metrics, http_metrics as metrics, opencensus, stack_metrics,
86
svc::Param,
97
telemetry, tls,
108
transport::{
119
self,
12-
labels::{TlsAccept, TlsConnect},
10+
labels::{TargetAddr, TlsAccept, TlsConnect},
1311
},
1412
};
1513
use linkerd_addr::Addr;
1614
use linkerd_metrics::FmtLabels;
1715
pub use linkerd_metrics::*;
18-
use std::fmt::{self, Write};
19-
use std::net::SocketAddr;
20-
use std::time::{Duration, SystemTime};
16+
use std::{
17+
fmt::{self, Write},
18+
net::SocketAddr,
19+
time::{Duration, SystemTime},
20+
};
2121

2222
pub type ControlHttp = http_metrics::Requests<ControlLabels, Class>;
2323

@@ -38,7 +38,7 @@ pub struct Proxy {
3838
pub http_errors: errors::MetricsLayer,
3939
pub stack: Stack,
4040
pub transport: transport::Metrics,
41-
pub tcp_accept_errors: tcp_accept_errors::Layer,
41+
pub tcp_accept_errors: tcp_accept_errors::Registry,
4242
}
4343

4444
#[derive(Clone, Debug)]
@@ -173,7 +173,7 @@ impl Metrics {
173173
http_errors: http_errors.inbound(),
174174
stack: stack.clone(),
175175
transport: transport.clone(),
176-
tcp_accept_errors: inbound_tcp_accept_errors.layer(),
176+
tcp_accept_errors: inbound_tcp_accept_errors.clone(),
177177
},
178178
outbound: Proxy {
179179
http_endpoint,
@@ -183,7 +183,7 @@ impl Metrics {
183183
http_errors: http_errors.outbound(),
184184
stack: stack.clone(),
185185
transport,
186-
tcp_accept_errors: outbound_tcp_accept_errors.layer(),
186+
tcp_accept_errors: outbound_tcp_accept_errors.clone(),
187187
},
188188
control,
189189
opencensus,
@@ -282,9 +282,7 @@ impl FmtLabels for InboundEndpointLabels {
282282
write!(f, ",")?;
283283
}
284284

285-
write!(f, "target_addr=\"{}\",", self.target_addr)?;
286-
287-
TlsAccept::from(&self.tls).fmt_labels(f)?;
285+
(TargetAddr(self.target_addr), TlsAccept::from(&self.tls)).fmt_labels(f)?;
288286

289287
Ok(())
290288
}
@@ -297,9 +295,9 @@ impl FmtLabels for OutboundEndpointLabels {
297295
write!(f, ",")?;
298296
}
299297

300-
write!(f, "target_addr=\"{}\",", self.target_addr)?;
301-
302-
TlsConnect::from(&self.server_id).fmt_labels(f)?;
298+
let ta = TargetAddr(self.target_addr);
299+
let tls = TlsConnect::from(&self.server_id);
300+
(ta, tls).fmt_labels(f)?;
303301

304302
if let Some(labels) = self.labels.as_ref() {
305303
write!(f, ",{}", labels)?;

linkerd/app/core/src/metrics/tcp_accept_errors.rs

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
1+
use crate::{
2+
metrics::{self, Counter, FmtMetrics},
3+
svc,
4+
transport::{labels, OrigDstAddr},
5+
};
16
use linkerd_error::Error;
2-
use linkerd_error_metrics::{FmtLabels, LabelError, RecordErrorLayer};
3-
use linkerd_metrics::{metrics, Counter, FmtMetrics};
7+
use linkerd_error_metrics::{FmtLabels, LabelError, RecordError};
48
use linkerd_tls::server::DetectTimeout as TlsDetectTimeout;
5-
use std::fmt;
9+
use parking_lot::Mutex;
10+
use std::{collections::HashMap, fmt};
611

7-
metrics! {
12+
metrics::metrics! {
813
inbound_tcp_accept_errors_total: Counter {
914
"The total number of inbound TCP connections that could not be processed due to a proxy error."
1015
},
@@ -15,9 +20,17 @@ metrics! {
1520
}
1621

1722
#[derive(Clone, Debug)]
18-
pub struct Registry(linkerd_error_metrics::Registry<AcceptErrors>);
23+
pub struct Registry {
24+
scopes: metrics::SharedStore<OrigDstAddr, Scope>,
25+
metric: linkerd_error_metrics::Metric,
26+
}
27+
28+
type Scope = Mutex<HashMap<AcceptErrors, metrics::Counter>>;
29+
30+
type NewErrorMetrics<N, S> =
31+
metrics::NewMetrics<N, OrigDstAddr, Scope, RecordError<LabelAcceptErrors, AcceptErrors, S>>;
1932

20-
#[derive(Clone, Copy, Debug)]
33+
#[derive(Clone, Copy, Debug, Default)]
2134
pub struct LabelAcceptErrors(());
2235

2336
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
@@ -27,32 +40,44 @@ pub enum AcceptErrors {
2740
Other,
2841
}
2942

30-
pub type Layer = RecordErrorLayer<LabelAcceptErrors, AcceptErrors>;
31-
3243
// === impl Registry ===
3344

3445
impl Registry {
3546
pub fn inbound() -> Self {
36-
Self(linkerd_error_metrics::Registry::new(
37-
inbound_tcp_accept_errors_total,
38-
))
47+
Self {
48+
metric: inbound_tcp_accept_errors_total,
49+
scopes: Default::default(),
50+
}
3951
}
4052

4153
pub fn outbound() -> Self {
42-
Self(linkerd_error_metrics::Registry::new(
43-
outbound_tcp_accept_errors_total,
44-
))
54+
Self {
55+
metric: outbound_tcp_accept_errors_total,
56+
scopes: Default::default(),
57+
}
4558
}
4659

47-
pub fn layer(&self) -> Layer {
48-
self.0.layer(LabelAcceptErrors(()))
60+
pub fn layer<T, N: svc::NewService<T>>(
61+
&self,
62+
) -> impl svc::Layer<N, Service = NewErrorMetrics<N, N::Service>> + Clone {
63+
metrics::NewMetrics::layer(self.scopes.clone())
4964
}
5065
}
5166

5267
impl FmtMetrics for Registry {
53-
#[inline]
5468
fn fmt_metrics(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55-
self.0.fmt_metrics(f)
69+
use metrics::FmtMetric;
70+
let errors = self.scopes.lock();
71+
72+
self.metric.fmt_help(f)?;
73+
for (OrigDstAddr(a), ms) in errors.iter() {
74+
let ta = labels::TargetAddr(*a);
75+
for (e, m) in ms.lock().iter() {
76+
m.fmt_metric_labeled(f, self.metric.name, (ta, e))?;
77+
}
78+
}
79+
80+
Ok(())
5681
}
5782
}
5883

linkerd/app/core/src/transport/labels.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@ pub enum Key {
2121
}
2222

2323
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
24-
pub struct TlsAccept<'t>(&'t tls::ConditionalServerTls);
24+
pub(crate) struct TlsAccept<'t>(&'t tls::ConditionalServerTls);
2525

2626
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
27-
pub struct TlsConnect<'t>(&'t tls::ConditionalClientTls);
27+
pub(crate) struct TlsConnect<'t>(&'t tls::ConditionalClientTls);
28+
29+
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
30+
pub(crate) struct TargetAddr(pub(crate) SocketAddr);
2831

2932
// === impl Key ===
3033

@@ -51,8 +54,8 @@ impl FmtLabels for Key {
5154
target_addr,
5255
} => {
5356
direction.fmt_labels(f)?;
54-
write!(f, ",peer=\"src\",target_addr=\"{}\",", target_addr)?;
55-
TlsAccept::from(tls).fmt_labels(f)
57+
f.write_str(",peer=\"src\",")?;
58+
(TargetAddr(*target_addr), TlsAccept::from(tls)).fmt_labels(f)
5659
}
5760
Self::OutboundConnect(endpoint) => {
5861
Direction::Out.fmt_labels(f)?;
@@ -122,3 +125,11 @@ impl<'t> FmtLabels for TlsConnect<'t> {
122125
}
123126
}
124127
}
128+
129+
// === impl TargetAddr ===
130+
131+
impl FmtLabels for TargetAddr {
132+
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133+
write!(f, "target_addr=\"{}\"", self.0)
134+
}
135+
}

linkerd/app/inbound/src/direct.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ impl<N> Inbound<N> {
6969
gateway: G,
7070
) -> Inbound<svc::BoxNewService<T, svc::BoxService<I, (), Error>>>
7171
where
72-
T: Param<Remote<ClientAddr>> + Param<OrigDstAddr> + Clone + Send + 'static,
72+
T: Param<Remote<ClientAddr>> + Param<OrigDstAddr>,
73+
T: Clone + Send + 'static,
7374
I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr,
7475
I: Debug + Send + Sync + Unpin + 'static,
7576
N: svc::NewService<TcpEndpoint, Service = NSvc> + Clone + Send + Sync + Unpin + 'static,

linkerd/app/inbound/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,8 @@ where
204204
gateway: G,
205205
) -> svc::BoxNewService<T, svc::BoxService<I, (), Error>>
206206
where
207-
T: svc::Param<Remote<ClientAddr>> + svc::Param<OrigDstAddr> + Clone + Send + 'static,
207+
T: svc::Param<Remote<ClientAddr>> + svc::Param<OrigDstAddr>,
208+
T: Clone + Send + 'static,
208209
I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr,
209210
I: Debug + Send + Sync + Unpin + 'static,
210211
G: svc::NewService<direct::GatewayConnection, Service = GSvc>,
@@ -292,7 +293,7 @@ where
292293
let OrigDstAddr(target_addr) = a.param();
293294
info_span!("server", port = target_addr.port())
294295
})
295-
.push_on_response(rt.metrics.tcp_accept_errors)
296+
.push(rt.metrics.tcp_accept_errors.layer())
296297
.push_on_response(svc::BoxService::layer())
297298
.push(svc::BoxNewService::layer())
298299
.into_inner()

linkerd/app/inbound/src/target.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@ impl Param<SocketAddr> for TcpAccept {
9292
}
9393
}
9494

95+
impl Param<OrigDstAddr> for TcpAccept {
96+
fn param(&self) -> OrigDstAddr {
97+
OrigDstAddr(self.target_addr)
98+
}
99+
}
100+
95101
impl Param<transport::labels::Key> for TcpAccept {
96102
fn param(&self) -> transport::labels::Key {
97103
transport::labels::Key::accept(

linkerd/app/integration/src/tests/telemetry/tcp_accept_errors.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ async fn run_proxy(
6060
(proxy, admin_client)
6161
}
6262

63+
fn metric(proxy: &proxy::Listening) -> metrics::MetricMatch {
64+
metrics::metric(METRIC).label("target_addr", proxy.inbound_server.as_ref().unwrap().addr)
65+
}
66+
6367
/// Tests that the detect metric is labeled and incremented on timeout.
6468
#[tokio::test]
6569
async fn inbound_timeout() {
@@ -73,7 +77,7 @@ async fn inbound_timeout() {
7377
tokio::time::sleep(TIMEOUT + Duration::from_millis(15)) // just in case
7478
.await;
7579

76-
metrics::metric(METRIC)
80+
metric(&proxy)
7781
.label("error", "tls_detect_timeout")
7882
.value(1u64)
7983
.assert_in(&metrics)
@@ -93,7 +97,7 @@ async fn inbound_io_err() {
9397
tcp_client.write(TcpFixture::HELLO_MSG).await;
9498
drop(tcp_client);
9599

96-
metrics::metric(METRIC)
100+
metric(&proxy)
97101
.label("error", "io")
98102
.value(1u64)
99103
.assert_in(&metrics)
@@ -119,7 +123,7 @@ async fn inbound_success() {
119123
);
120124
let no_tls_client = client::tcp(proxy.inbound);
121125

122-
let metric = metrics::metric(METRIC)
126+
let metric = metric(&proxy)
123127
.label("error", "tls_detect_timeout")
124128
.value(1u64);
125129

@@ -150,8 +154,9 @@ async fn inbound_multi() {
150154
let (proxy, metrics) = default_proxy().await;
151155
let client = client::tcp(proxy.inbound);
152156

153-
let timeout_metric = metrics::metric(METRIC).label("error", "tls_detect_timeout");
154-
let io_metric = metrics::metric(METRIC).label("error", "io");
157+
let metric = metric(&proxy);
158+
let timeout_metric = metric.clone().label("error", "tls_detect_timeout");
159+
let io_metric = metric.label("error", "io");
155160

156161
let tcp_client = client.connect().await;
157162

@@ -200,8 +205,9 @@ async fn inbound_direct_multi() {
200205
let (proxy, metrics) = run_proxy(proxy, identity).await;
201206
let client = client::tcp(proxy.inbound);
202207

203-
let timeout_metric = metrics::metric(METRIC).label("error", "tls_detect_timeout");
204-
let no_tls_metric = metrics::metric(METRIC).label("error", "other");
208+
let metric = metrics::metric(METRIC).label("target_addr", proxy.inbound);
209+
let timeout_metric = metric.clone().label("error", "tls_detect_timeout");
210+
let no_tls_metric = metric.clone().label("error", "other");
205211

206212
let tcp_client = client.connect().await;
207213

@@ -248,7 +254,11 @@ async fn inbound_direct_success() {
248254
// connections require mutual authentication.
249255
let auth = "bar.ns1.svc.cluster.local";
250256
let ctrl = controller::new();
251-
let dst = format!("{}:{}", auth, proxy1.inbound.port());
257+
let dst = format!(
258+
"{}:{}",
259+
auth,
260+
proxy1.inbound_server.as_ref().unwrap().addr.port()
261+
);
252262
let _profile_out = ctrl.profile_tx_default(proxy1.inbound, auth);
253263
let dst = ctrl.destination_tx(dst);
254264
dst.send(controller::destination_add_tls(
@@ -265,6 +275,7 @@ async fn inbound_direct_success() {
265275
let no_tls_client = client::tcp(proxy1.inbound);
266276

267277
let metric = metrics::metric(METRIC)
278+
.label("target_addr", proxy1.inbound)
268279
.label("error", "tls_detect_timeout")
269280
.value(1u64);
270281

linkerd/app/outbound/src/discover.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ impl<N> Outbound<N> {
7676
.push_cache(config.proxy.cache_max_idle_age)
7777
.instrument(|a: &tcp::Accept| info_span!("server", orig_dst = %a.orig_dst))
7878
.push_request_filter(|t: T| tcp::Accept::try_from(t.param()))
79-
.push_on_response(rt.metrics.tcp_accept_errors.clone())
79+
.push(rt.metrics.tcp_accept_errors.layer())
8080
.push(svc::BoxNewService::layer())
8181
.check_new_service::<T, I>()
8282
})

0 commit comments

Comments
 (0)