Skip to content

Commit 9a3b864

Browse files
committed
feat(app/inbound): http and grpc status code metrics
in #4298, we introduced a new metrics telemetry layer that can measure and report status codes, in a protocol-agnostic fashion. this commit integrates this status code telemtry into the inbound proxy, so that HTTP and gRPC traffic can be observed. a new family of metrics is introduced to the `InboundMetrics` structure, and the inbound http\* router's metrics layer is accordingly updated to thread this metrics family into an extractor, which is in turn provided to the `NewRecordStatusCode` layer. \* as a note for reviewers, the inbound proxy does not model the http and grpc protocols as distinct concepts in the network stack's type system, unlike the outbound proxy. this means that while target types in the outbound proxy like `Http<()>` are specific to HTTP, the inbound proxy's distinction of HTTP/gRPC is determined by obtaining and inspecting the `PermitVariant`. #### 🔗 related some previous pull requests related to this change: * #4298 * #4180 * #4203 * #4127 * #4119 Signed-off-by: katelyn martin <[email protected]>
1 parent b5aaab1 commit 9a3b864

File tree

3 files changed

+240
-4
lines changed

3 files changed

+240
-4
lines changed

linkerd/app/inbound/src/http/router/metrics.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
use crate::InboundMetrics;
22
use linkerd_app_core::svc;
33

4-
pub use self::{count_reqs::*, labels::RouteLabels, req_body::*, rsp_body::*};
4+
pub use self::{count_reqs::*, labels::RouteLabels, req_body::*, rsp_body::*, status::*};
55

66
mod count_reqs;
77
mod labels;
88
mod req_body;
99
mod rsp_body;
10+
mod status;
1011

1112
pub(super) fn layer<N>(
1213
InboundMetrics {
1314
request_count,
1415
request_body_data,
1516
response_body_data,
17+
status_codes,
1618
..
1719
}: &InboundMetrics,
1820
) -> impl svc::Layer<N, Service = Instrumented<N>> {
@@ -33,8 +35,14 @@ pub(super) fn layer<N>(
3335
NewRecordRequestBodyData::layer_via(extract)
3436
};
3537

36-
svc::layer::mk(move |inner| count.layer(body.layer(request.layer(inner))))
38+
let status = {
39+
let extract = ExtractStatusCodeParams::new(status_codes.clone());
40+
NewRecordStatusCode::layer_via(extract)
41+
};
42+
43+
svc::layer::mk(move |inner| count.layer(body.layer(request.layer(status.layer(inner)))))
3744
}
3845

3946
/// An `N`-typed service instrumented with metrics middleware.
40-
type Instrumented<N> = NewCountRequests<NewRecordResponseBodyData<NewRecordRequestBodyData<N>>>;
47+
type Instrumented<N> =
48+
NewCountRequests<NewRecordResponseBodyData<NewRecordRequestBodyData<NewRecordStatusCode<N>>>>;
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
use super::RouteLabels;
2+
use crate::policy::PermitVariant;
3+
use http::StatusCode;
4+
use linkerd_app_core::{
5+
metrics::prom::{
6+
self,
7+
encoding::{EncodeLabel, EncodeLabelSet, LabelSetEncoder},
8+
EncodeLabelSetMut,
9+
},
10+
svc, Error,
11+
};
12+
use linkerd_http_prom::{
13+
status,
14+
stream_label::{
15+
status::{LabelGrpcStatus, LabelHttpStatus, MkLabelGrpcStatus, MkLabelHttpStatus},
16+
MkStreamLabel, StreamLabel,
17+
},
18+
};
19+
20+
pub type NewRecordStatusCode<N> =
21+
status::NewRecordStatusCode<N, ExtractStatusCodeParams, MkLabelStatus, StatusCodeLabels>;
22+
23+
type StatusMetrics = status::StatusMetrics<StatusCodeLabels>;
24+
25+
type Params = status::Params<MkLabelStatus, StatusCodeLabels>;
26+
27+
#[derive(Clone, Debug)]
28+
pub struct StatusCodeFamilies {
29+
grpc: StatusMetrics,
30+
http: StatusMetrics,
31+
}
32+
33+
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
34+
pub struct StatusCodeLabels {
35+
/// Labels from the inbound route authorizing traffic.
36+
route: RouteLabels,
37+
/// A status code.
38+
status: Option<u16>,
39+
}
40+
41+
#[derive(Clone, Debug)]
42+
pub struct ExtractStatusCodeParams(pub StatusCodeFamilies);
43+
44+
pub enum MkLabelStatus {
45+
Grpc {
46+
mk_label_grpc: MkLabelGrpcStatus,
47+
route: RouteLabels,
48+
},
49+
Http {
50+
mk_label_http: MkLabelHttpStatus,
51+
route: RouteLabels,
52+
},
53+
}
54+
55+
pub enum LabelStatus {
56+
Grpc {
57+
label_grpc: LabelGrpcStatus,
58+
route: RouteLabels,
59+
},
60+
Http {
61+
label_http: LabelHttpStatus,
62+
route: RouteLabels,
63+
},
64+
}
65+
66+
// === impl StatusCodeFamilies ===
67+
68+
impl StatusCodeFamilies {
69+
/// Registers a new [`StatusCodeFamilies`] with the given registry.
70+
pub fn register(reg: &mut prom::Registry) -> Self {
71+
let grpc = {
72+
let reg = reg.sub_registry_with_prefix("grpc");
73+
status::StatusMetrics::register(reg, "Completed gRPC responses")
74+
};
75+
76+
let http = {
77+
let reg = reg.sub_registry_with_prefix("http");
78+
status::StatusMetrics::register(reg, "Completed HTTP responses")
79+
};
80+
81+
Self { grpc, http }
82+
}
83+
84+
/// Fetches the proper status code family, given a permitted target.
85+
fn family(&self, variant: PermitVariant) -> &StatusMetrics {
86+
let Self { grpc, http } = self;
87+
match variant {
88+
PermitVariant::Grpc => grpc,
89+
PermitVariant::Http => http,
90+
}
91+
}
92+
}
93+
94+
// === impl ExtractStatusCodeParams ===
95+
96+
impl ExtractStatusCodeParams {
97+
pub fn new(metrics: StatusCodeFamilies) -> Self {
98+
Self(metrics)
99+
}
100+
}
101+
102+
impl<T> svc::ExtractParam<Params, T> for ExtractStatusCodeParams
103+
where
104+
T: svc::Param<PermitVariant> + svc::Param<RouteLabels>,
105+
{
106+
fn extract_param(&self, target: &T) -> Params {
107+
let Self(families) = self;
108+
let route: RouteLabels = target.param();
109+
let variant: PermitVariant = target.param();
110+
111+
let metrics = families.family(variant).clone();
112+
let mk_stream_label = match variant {
113+
PermitVariant::Grpc => {
114+
let mk_label_grpc = MkLabelGrpcStatus;
115+
MkLabelStatus::Grpc {
116+
mk_label_grpc,
117+
route,
118+
}
119+
}
120+
PermitVariant::Http => {
121+
let mk_label_http = MkLabelHttpStatus;
122+
MkLabelStatus::Http {
123+
mk_label_http,
124+
route,
125+
}
126+
}
127+
};
128+
129+
Params {
130+
mk_stream_label,
131+
metrics,
132+
}
133+
}
134+
}
135+
136+
// === impl StatusCodeLabels ===
137+
138+
impl EncodeLabelSetMut for StatusCodeLabels {
139+
fn encode_label_set(&self, enc: &mut LabelSetEncoder<'_>) -> std::fmt::Result {
140+
let Self { route, status } = self;
141+
142+
route.encode_label_set(enc)?;
143+
("status", *status).encode(enc.encode_label())?;
144+
145+
Ok(())
146+
}
147+
}
148+
149+
impl EncodeLabelSet for StatusCodeLabels {
150+
fn encode(&self, mut enc: LabelSetEncoder<'_>) -> std::fmt::Result {
151+
self.encode_label_set(&mut enc)
152+
}
153+
}
154+
155+
// === impl MkLabelStatus ===
156+
157+
impl MkStreamLabel for MkLabelStatus {
158+
type StreamLabel = LabelStatus;
159+
160+
type StatusLabels = StatusCodeLabels;
161+
type DurationLabels = ();
162+
163+
fn mk_stream_labeler<B>(&self, req: &http::Request<B>) -> Option<Self::StreamLabel> {
164+
match self {
165+
Self::Grpc {
166+
mk_label_grpc,
167+
route,
168+
} => mk_label_grpc
169+
.mk_stream_labeler(req)
170+
.map(|label_grpc| LabelStatus::Grpc {
171+
label_grpc,
172+
route: route.clone(),
173+
}),
174+
Self::Http {
175+
mk_label_http,
176+
route,
177+
} => mk_label_http
178+
.mk_stream_labeler(req)
179+
.map(|label_http| LabelStatus::Http {
180+
label_http,
181+
route: route.clone(),
182+
}),
183+
}
184+
}
185+
}
186+
187+
// === impl LabelStatus ===
188+
189+
impl StreamLabel for LabelStatus {
190+
type StatusLabels = StatusCodeLabels;
191+
type DurationLabels = ();
192+
193+
fn init_response<B>(&mut self, rsp: &http::Response<B>) {
194+
match self {
195+
Self::Grpc { label_grpc, .. } => label_grpc.init_response(rsp),
196+
Self::Http { label_http, .. } => label_http.init_response(rsp),
197+
}
198+
}
199+
200+
fn end_response(&mut self, rsp: Result<Option<&http::HeaderMap>, &Error>) {
201+
match self {
202+
Self::Grpc { label_grpc, .. } => label_grpc.end_response(rsp),
203+
Self::Http { label_http, .. } => label_http.end_response(rsp),
204+
}
205+
}
206+
207+
fn status_labels(&self) -> Self::StatusLabels {
208+
match self {
209+
Self::Grpc { label_grpc, route } => {
210+
let route = route.clone();
211+
let status = label_grpc.status_labels().map(|code| code as u16);
212+
StatusCodeLabels { route, status }
213+
}
214+
Self::Http { label_http, route } => {
215+
let route = route.clone();
216+
let status = label_http.status_labels().as_ref().map(StatusCode::as_u16);
217+
StatusCodeLabels { route, status }
218+
}
219+
}
220+
}
221+
222+
fn duration_labels(&self) -> Self::DurationLabels {}
223+
}

linkerd/app/inbound/src/metrics.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
pub(crate) mod authz;
1212
pub(crate) mod error;
1313

14-
use crate::http::router::{RequestBodyFamilies, RequestCountFamilies, ResponseBodyFamilies};
14+
use crate::http::router::{
15+
RequestBodyFamilies, RequestCountFamilies, ResponseBodyFamilies, StatusCodeFamilies,
16+
};
1517
pub use linkerd_app_core::metrics::*;
1618

1719
/// Holds LEGACY inbound proxy metrics.
@@ -32,6 +34,7 @@ pub struct InboundMetrics {
3234
pub request_count: RequestCountFamilies,
3335
pub request_body_data: RequestBodyFamilies,
3436
pub response_body_data: ResponseBodyFamilies,
37+
pub status_codes: StatusCodeFamilies,
3538
}
3639

3740
impl InboundMetrics {
@@ -44,6 +47,7 @@ impl InboundMetrics {
4447
let request_count = RequestCountFamilies::register(reg);
4548
let request_body_data = RequestBodyFamilies::register(reg);
4649
let response_body_data = ResponseBodyFamilies::register(reg);
50+
let status_codes = StatusCodeFamilies::register(reg);
4751

4852
Self {
4953
http_authz: authz::HttpAuthzMetrics::default(),
@@ -56,6 +60,7 @@ impl InboundMetrics {
5660
request_count,
5761
request_body_data,
5862
response_body_data,
63+
status_codes,
5964
}
6065
}
6166
}

0 commit comments

Comments
 (0)