Skip to content

Commit 6ef1ec8

Browse files
authored
feat(http/prom): introduce StreamLabel components (#4251)
see #4250 (59f90ac) for previous related work. * feat(http/prom): add http and grpc status labelers this commit introduces a collection of `MkStreamLabel` and `StreamLabel` implementations that can be used to inspect status codes for HTTP and gRPC traffic. these do not emit duration labels, but can be used by other implementors of these traits to perform the common logic of checking HTTP status codes in response front matter, and for inspecting body trailers in gRPC traffic. Signed-off-by: katelyn martin <[email protected]> * feat(http/prom): add an error labeler this commit introduces a `LabelError<E>` type, which provides a `StreamLabel` implementation that maps boxed errors to labels. this is generic across `E`-typed labels that can be constructed `From` a reference to a boxed `linkerd_error::Error`. Signed-off-by: katelyn martin <[email protected]> * feat(http/prom): add an identity labeler this commit introduces a simple "identity" implementation of `MkStreamLabel` and `StreamLabel`. this is useful for metrics such as our duration recording histogram, that do not inspect the request, response, or outcome for further labels. Signed-off-by: katelyn martin <[email protected]> --------- Signed-off-by: katelyn martin <[email protected]>
1 parent 59f90ac commit 6ef1ec8

File tree

6 files changed

+209
-0
lines changed

6 files changed

+209
-0
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1833,6 +1833,7 @@ dependencies = [
18331833
"prometheus-client",
18341834
"thiserror",
18351835
"tokio",
1836+
"tonic",
18361837
]
18371838

18381839
[[package]]

linkerd/http/prom/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pin-project = "1"
2121
prometheus-client = { workspace = true }
2222
thiserror = "2"
2323
tokio = { version = "1", features = ["time"] }
24+
tonic = { workspace = true }
2425

2526
linkerd-error = { path = "../../error" }
2627
linkerd-http-box = { path = "../box" }

linkerd/http/prom/src/stream_label.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
use linkerd_error::Error;
44
use prometheus_client::encoding::EncodeLabelSet;
55

6+
pub mod error;
7+
pub mod status;
8+
pub mod with;
9+
610
/// A strategy for labeling request/responses streams for status and duration
711
/// metrics.
812
///
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
//! [`StreamLabel`] implementation for labeling errors.
2+
3+
use super::StreamLabel;
4+
use linkerd_error::Error;
5+
6+
/// A [`StreamLabel`] implementation that maps boxed errors to labels.
7+
#[derive(Clone, Debug, Default)]
8+
pub struct LabelError<E> {
9+
error: Option<E>,
10+
}
11+
12+
// === impl LabelError ===
13+
14+
impl<E> StreamLabel for LabelError<E>
15+
where
16+
E: for<'a> From<&'a Error>,
17+
E: Clone + Send + 'static,
18+
{
19+
type DurationLabels = ();
20+
type StatusLabels = Option<E>;
21+
22+
fn init_response<B>(&mut self, _: &http::Response<B>) {}
23+
24+
fn end_response(&mut self, res: Result<Option<&http::HeaderMap>, &Error>) {
25+
let Err(err) = res else { return };
26+
let labels = E::from(err);
27+
self.error = Some(labels);
28+
}
29+
30+
fn status_labels(&self) -> Self::StatusLabels {
31+
self.error.clone()
32+
}
33+
34+
fn duration_labels(&self) -> Self::DurationLabels {}
35+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
//! [`MkStreamLabel`] and [`StreamLabel`] implementations for status codes.
2+
//!
3+
//! This submodule provides [`MkLabelGrpcStatus`] and [`MkLabelHttpStatus`]
4+
5+
use crate::stream_label::{MkStreamLabel, StreamLabel};
6+
use http::{HeaderMap, HeaderValue, Response, StatusCode};
7+
use linkerd_error::Error;
8+
use tonic::Code;
9+
10+
/// A [`MkStreamLabel`] implementation for gRPC traffic.
11+
///
12+
/// This generates [`LabelGrpcStatus`] labelers.
13+
#[derive(Debug)]
14+
pub struct MkLabelGrpcStatus;
15+
16+
/// A [`StreamLabel`] implementation for gRPC traffic.
17+
///
18+
/// This will inspect response headers and trailers for a [`Code`].
19+
#[derive(Clone, Debug, Default)]
20+
pub struct LabelGrpcStatus {
21+
code: Option<Code>,
22+
}
23+
24+
/// A [`MkStreamLabel`] implementation for HTTP traffic.
25+
///
26+
/// This generates [`LabelHttpStatus`] labelers.
27+
#[derive(Debug)]
28+
pub struct MkLabelHttpStatus;
29+
30+
/// A [`StreamLabel`] implementation for HTTP traffic.
31+
///
32+
/// This will inspect the response headers for a [`StatusCode`].
33+
#[derive(Clone, Debug, Default)]
34+
pub struct LabelHttpStatus {
35+
status: Option<StatusCode>,
36+
}
37+
38+
// === impl MkLabelGrpcStatus ===
39+
40+
impl MkStreamLabel for MkLabelGrpcStatus {
41+
type DurationLabels = <Self::StreamLabel as StreamLabel>::DurationLabels;
42+
type StatusLabels = <Self::StreamLabel as StreamLabel>::StatusLabels;
43+
type StreamLabel = LabelGrpcStatus;
44+
45+
fn mk_stream_labeler<B>(&self, _: &http::Request<B>) -> Option<Self::StreamLabel> {
46+
Some(LabelGrpcStatus::default())
47+
}
48+
}
49+
50+
// === LabelGrpcStatus ===
51+
52+
impl StreamLabel for LabelGrpcStatus {
53+
type DurationLabels = ();
54+
type StatusLabels = Option<Code>;
55+
56+
fn init_response<B>(&mut self, rsp: &Response<B>) {
57+
let headers = rsp.headers();
58+
self.code = Self::get_grpc_status(headers);
59+
}
60+
61+
fn end_response(&mut self, trailers: Result<Option<&HeaderMap>, &Error>) {
62+
let Ok(Some(trailers)) = trailers else { return };
63+
self.code = Self::get_grpc_status(trailers);
64+
}
65+
66+
fn status_labels(&self) -> Self::StatusLabels {
67+
self.code
68+
}
69+
70+
fn duration_labels(&self) -> Self::DurationLabels {}
71+
}
72+
73+
impl LabelGrpcStatus {
74+
fn get_grpc_status(headers: &HeaderMap) -> Option<Code> {
75+
headers
76+
.get("grpc-status")
77+
.map(HeaderValue::as_bytes)
78+
.map(tonic::Code::from_bytes)
79+
}
80+
}
81+
82+
// === impl MkLabelHttpStatus ===
83+
84+
impl MkStreamLabel for MkLabelHttpStatus {
85+
type DurationLabels = <Self::StreamLabel as StreamLabel>::DurationLabels;
86+
type StatusLabels = <Self::StreamLabel as StreamLabel>::StatusLabels;
87+
type StreamLabel = LabelHttpStatus;
88+
89+
fn mk_stream_labeler<B>(&self, _: &http::Request<B>) -> Option<Self::StreamLabel> {
90+
Some(LabelHttpStatus::default())
91+
}
92+
}
93+
94+
// === impl LabelHttpStatus ===
95+
96+
impl StreamLabel for LabelHttpStatus {
97+
type DurationLabels = ();
98+
type StatusLabels = Option<StatusCode>;
99+
100+
fn init_response<B>(&mut self, rsp: &http::Response<B>) {
101+
self.status = Some(rsp.status());
102+
}
103+
104+
fn end_response(&mut self, _: Result<Option<&http::HeaderMap>, &linkerd_error::Error>) {}
105+
106+
fn status_labels(&self) -> Self::StatusLabels {
107+
self.status
108+
}
109+
110+
fn duration_labels(&self) -> Self::DurationLabels {}
111+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
//! [`StreamLabel`] implementation for labels known in advance.
2+
3+
use super::{MkStreamLabel, StreamLabel};
4+
5+
/// A [`MkStreamLabel`] implementation for `L`-typed labels.
6+
///
7+
/// This is useful for situations in which a tower middleware does not need to insect the request,
8+
/// response, or the final outcome.
9+
pub struct MkWithLabels<L> {
10+
labels: L,
11+
}
12+
13+
/// A [`StreamLabel`] implementation for `L`-typed labels.
14+
///
15+
/// This is useful for situations in which a tower middleware does not need to insect the request,
16+
/// response, or the final outcome.
17+
#[derive(Clone, Debug, Default)]
18+
pub struct WithLabels<L> {
19+
labels: L,
20+
}
21+
22+
// === impl MkWithLabels ===
23+
24+
impl<L> MkStreamLabel for MkWithLabels<L>
25+
where
26+
L: Clone + Send + 'static,
27+
{
28+
type DurationLabels = L;
29+
type StatusLabels = L;
30+
type StreamLabel = WithLabels<L>;
31+
fn mk_stream_labeler<B>(&self, _: &http::Request<B>) -> Option<Self::StreamLabel> {
32+
Some(WithLabels {
33+
labels: self.labels.clone(),
34+
})
35+
}
36+
}
37+
38+
// === impl WithLabels ===
39+
40+
impl<L> StreamLabel for WithLabels<L>
41+
where
42+
L: Clone + Send + 'static,
43+
{
44+
type DurationLabels = L;
45+
type StatusLabels = L;
46+
47+
fn init_response<B>(&mut self, _: &http::Response<B>) {}
48+
fn end_response(&mut self, _: Result<Option<&http::HeaderMap>, &linkerd_error::Error>) {}
49+
50+
fn status_labels(&self) -> Self::StatusLabels {
51+
self.labels.clone()
52+
}
53+
54+
fn duration_labels(&self) -> Self::DurationLabels {
55+
self.labels.clone()
56+
}
57+
}

0 commit comments

Comments
 (0)