Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
stream_label::{
error::LabelError,
status::{LabelGrpcStatus, LabelHttpStatus},
LabelSet, StreamLabel,
EosRef, LabelSet, StreamLabel,
},
};

Expand Down Expand Up @@ -337,7 +337,7 @@
error.init_response(rsp);
}

fn end_response(&mut self, res: Result<Option<&http::HeaderMap>, &linkerd_app_core::Error>) {
fn end_response(&mut self, res: EosRef<'_>) {
let Self {
parent: _,
status,
Expand Down Expand Up @@ -392,17 +392,17 @@
error,
} = self;
status.init_response(rsp);
error.init_response(rsp);

Check failure on line 395 in linkerd/app/outbound/src/http/logical/policy/route/metrics.rs

View workflow job for this annotation

GitHub Actions / rust

error[E0599]: the method `init_response` exists for mutable reference `&mut LabelError<GrpcRsp>`, but its trait bounds were not satisfied --> linkerd/app/outbound/src/http/logical/policy/route/metrics.rs:395:15 | 395 | error.init_response(rsp); | ^^^^^^^^^^^^^ method cannot be called on `&mut LabelError<GrpcRsp>` due to unsatisfied trait bounds | ::: linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs:38:1 | 38 | pub struct GrpcRsp { | ------------------ doesn't satisfy `_: From<EosRef<'a>>` | ::: /__w/linkerd2-proxy/linkerd2-proxy/linkerd/http/prom/src/stream_label/error.rs:8:1 | 8 | pub struct LabelError<E> { | ------------------------ doesn't satisfy `_: StreamLabel` | = note: the following trait bounds were not satisfied: `http::logical::policy::route::metrics::labels::GrpcRsp: std::convert::From<linkerd_http_prom::stream_label::EosRef<'a>>` which is required by `linkerd_http_prom::stream_label::error::LabelError<http::logical::policy::route::metrics::labels::GrpcRsp>: linkerd_http_prom::stream_label::StreamLabel`
}

fn end_response(&mut self, res: Result<Option<&http::HeaderMap>, &linkerd_app_core::Error>) {
fn end_response(&mut self, res: EosRef<'_>) {
let Self {
parent: _,
status,
error,
} = self;
status.end_response(res);
error.end_response(res);

Check failure on line 405 in linkerd/app/outbound/src/http/logical/policy/route/metrics.rs

View workflow job for this annotation

GitHub Actions / rust

error[E0599]: the method `end_response` exists for mutable reference `&mut LabelError<GrpcRsp>`, but its trait bounds were not satisfied --> linkerd/app/outbound/src/http/logical/policy/route/metrics.rs:405:15 | 405 | error.end_response(res); | ^^^^^^^^^^^^ method cannot be called on `&mut LabelError<GrpcRsp>` due to unsatisfied trait bounds | ::: linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs:38:1 | 38 | pub struct GrpcRsp { | ------------------ doesn't satisfy `_: From<EosRef<'a>>` | ::: /__w/linkerd2-proxy/linkerd2-proxy/linkerd/http/prom/src/stream_label/error.rs:8:1 | 8 | pub struct LabelError<E> { | ------------------------ doesn't satisfy `_: StreamLabel` | = note: the following trait bounds were not satisfied: `http::logical::policy::route::metrics::labels::GrpcRsp: std::convert::From<linkerd_http_prom::stream_label::EosRef<'a>>` which is required by `linkerd_http_prom::stream_label::error::LabelError<http::logical::policy::route::metrics::labels::GrpcRsp>: linkerd_http_prom::stream_label::StreamLabel`
}

fn status_labels(&self) -> Self::StatusLabels {
Expand All @@ -412,7 +412,7 @@
error,
} = self;

let error = error.status_labels();

Check failure on line 415 in linkerd/app/outbound/src/http/logical/policy/route/metrics.rs

View workflow job for this annotation

GitHub Actions / rust

error[E0599]: the method `status_labels` exists for reference `&LabelError<GrpcRsp>`, but its trait bounds were not satisfied --> linkerd/app/outbound/src/http/logical/policy/route/metrics.rs:415:27 | 415 | let error = error.status_labels(); | ^^^^^^^^^^^^^ method cannot be called on `&LabelError<GrpcRsp>` due to unsatisfied trait bounds | ::: linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs:38:1 | 38 | pub struct GrpcRsp { | ------------------ doesn't satisfy `_: From<EosRef<'a>>` | ::: /__w/linkerd2-proxy/linkerd2-proxy/linkerd/http/prom/src/stream_label/error.rs:8:1 | 8 | pub struct LabelError<E> { | ------------------------ doesn't satisfy `_: StreamLabel` | = note: the following trait bounds were not satisfied: `http::logical::policy::route::metrics::labels::GrpcRsp: std::convert::From<linkerd_http_prom::stream_label::EosRef<'a>>` which is required by `linkerd_http_prom::stream_label::error::LabelError<http::logical::policy::route::metrics::labels::GrpcRsp>: linkerd_http_prom::stream_label::StreamLabel`
let status = status.status_labels().map(labels::GrpcRsp::status);
let rsp = labels::GrpcRsp::default().apply(status).apply(error);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use linkerd_app_core::{
dns, errors, metrics::prom::EncodeLabelSetMut, proxy::http, Error as BoxError,
};
use linkerd_http_prom::stream_label::EosRef;
use prometheus_client::encoding::*;

use crate::{BackendRef, ParentRef, RouteRef};
Expand Down Expand Up @@ -34,7 +35,7 @@
}

#[derive(Clone, Debug, Default, Hash, PartialEq, Eq)]
pub struct GrpcRsp {

Check failure on line 38 in linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs

View workflow job for this annotation

GitHub Actions / rust

error[E0599]: the method `status_labels` exists for reference `&LabelError<GrpcRsp>`, but its trait bounds were not satisfied --> linkerd/app/outbound/src/http/logical/policy/route/metrics.rs:415:27 | 415 | let error = error.status_labels(); | ^^^^^^^^^^^^^ method cannot be called on `&LabelError<GrpcRsp>` due to unsatisfied trait bounds | ::: linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs:38:1 | 38 | pub struct GrpcRsp { | ------------------ doesn't satisfy `_: From<EosRef<'a>>` | ::: /__w/linkerd2-proxy/linkerd2-proxy/linkerd/http/prom/src/stream_label/error.rs:8:1 | 8 | pub struct LabelError<E> { | ------------------------ doesn't satisfy `_: StreamLabel` | = note: the following trait bounds were not satisfied: `http::logical::policy::route::metrics::labels::GrpcRsp: std::convert::From<linkerd_http_prom::stream_label::EosRef<'a>>` which is required by `linkerd_http_prom::stream_label::error::LabelError<http::logical::policy::route::metrics::labels::GrpcRsp>: linkerd_http_prom::stream_label::StreamLabel`

Check failure on line 38 in linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs

View workflow job for this annotation

GitHub Actions / rust

error[E0599]: the method `end_response` exists for mutable reference `&mut LabelError<GrpcRsp>`, but its trait bounds were not satisfied --> linkerd/app/outbound/src/http/logical/policy/route/metrics.rs:405:15 | 405 | error.end_response(res); | ^^^^^^^^^^^^ method cannot be called on `&mut LabelError<GrpcRsp>` due to unsatisfied trait bounds | ::: linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs:38:1 | 38 | pub struct GrpcRsp { | ------------------ doesn't satisfy `_: From<EosRef<'a>>` | ::: /__w/linkerd2-proxy/linkerd2-proxy/linkerd/http/prom/src/stream_label/error.rs:8:1 | 8 | pub struct LabelError<E> { | ------------------------ doesn't satisfy `_: StreamLabel` | = note: the following trait bounds were not satisfied: `http::logical::policy::route::metrics::labels::GrpcRsp: std::convert::From<linkerd_http_prom::stream_label::EosRef<'a>>` which is required by `linkerd_http_prom::stream_label::error::LabelError<http::logical::policy::route::metrics::labels::GrpcRsp>: linkerd_http_prom::stream_label::StreamLabel`

Check failure on line 38 in linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs

View workflow job for this annotation

GitHub Actions / rust

error[E0599]: the method `init_response` exists for mutable reference `&mut LabelError<GrpcRsp>`, but its trait bounds were not satisfied --> linkerd/app/outbound/src/http/logical/policy/route/metrics.rs:395:15 | 395 | error.init_response(rsp); | ^^^^^^^^^^^^^ method cannot be called on `&mut LabelError<GrpcRsp>` due to unsatisfied trait bounds | ::: linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs:38:1 | 38 | pub struct GrpcRsp { | ------------------ doesn't satisfy `_: From<EosRef<'a>>` | ::: /__w/linkerd2-proxy/linkerd2-proxy/linkerd/http/prom/src/stream_label/error.rs:8:1 | 8 | pub struct LabelError<E> { | ------------------------ doesn't satisfy `_: StreamLabel` | = note: the following trait bounds were not satisfied: `http::logical::policy::route::metrics::labels::GrpcRsp: std::convert::From<linkerd_http_prom::stream_label::EosRef<'a>>` which is required by `linkerd_http_prom::stream_label::error::LabelError<http::logical::policy::route::metrics::labels::GrpcRsp>: linkerd_http_prom::stream_label::StreamLabel`
pub status: Option<tonic::Code>,
pub error: Option<Error>,
}
Expand Down Expand Up @@ -204,9 +205,9 @@
}
}

impl From<&linkerd_app_core::Error> for HttpRsp {
fn from(error: &linkerd_app_core::Error) -> Self {
impl From<EosRef<'_>> for HttpRsp {
fn from(error: EosRef<'_>) -> Self {
match Error::new_or_status(error) {

Check failure on line 210 in linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs

View workflow job for this annotation

GitHub Actions / rust

error[E0308]: mismatched types --> linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs:210:36 | 210 | match Error::new_or_status(error) { | -------------------- ^^^^^ expected `&Box<dyn Error + Send + Sync>`, found `EosRef<'_, Box<dyn Error + Send + Sync>>` | | | arguments to this function are incorrect | = note: expected reference `&std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>` found enum `linkerd_http_prom::stream_label::EosRef<'_>` note: associated function defined here --> linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs:314:12 | 314 | pub fn new_or_status(error: &BoxError) -> Result<Self, u16> { | ^^^^^^^^^^^^^ ----------------

Check failure on line 210 in linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs

View workflow job for this annotation

GitHub Actions / rust

error[E0308]: mismatched types --> linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs:210:36 | 210 | match Error::new_or_status(error) { | -------------------- ^^^^^ expected `&Box<dyn Error + Send + Sync>`, found `EosRef<'_, Box<dyn Error + Send + Sync>>` | | | arguments to this function are incorrect | = note: expected reference `&std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>` found enum `linkerd_http_prom::stream_label::EosRef<'_>` note: associated function defined here --> linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs:314:12 | 314 | pub fn new_or_status(error: &BoxError) -> Result<Self, u16> { | ^^^^^^^^^^^^^ ----------------
Ok(error) => Self::error(error),
Err(code) => http::StatusCode::from_u16(code)
.map(Self::status)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,13 +687,29 @@ async fn http_response_body_drop_early() {
};

// The counters are not incremented yet.
assert_eq!(ok.get(), 0);
assert_eq!(err.get(), 0);
assert_eq!(
ok.get(),
0,
"ok counter is not incremented before body is dropped"
);
assert_eq!(
err.get(),
0,
"err counter is not incremented before body is dropped"
);

// The body reports an error if it was not completed.
drop(body);
assert_eq!(ok.get(), 0);
assert_eq!(err.get(), 1);
assert_eq!(
ok.get(),
0,
"ok counter is not incremented after body is dropped"
);
assert_eq!(
err.get(),
1,
"error counter is incremented after body is dropped"
);
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
Expand Down
13 changes: 13 additions & 0 deletions linkerd/http/body-eos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,16 @@ where
};
}
}

// === impl EosRef ===

/// A reference to the end of a stream can be cheaply cloned.
///
/// Each of the variants are empty tags, or immutable references.
impl<'a, E> Clone for EosRef<'a, E> {
fn clone(&self) -> Self {
*self
}
}

impl<'a, E> Copy for EosRef<'a, E> {}
90 changes: 9 additions & 81 deletions linkerd/http/prom/src/record_response.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::stream_label::{LabelSet, MkStreamLabel, StreamLabel};
use http_body::Body;
use linkerd_error::Error;
use linkerd_http_body_eos::{BodyWithEosFn, EosRef};
use linkerd_http_box::BoxBody;
use linkerd_stack as svc;
use prometheus_client::metrics::{
Expand Down Expand Up @@ -66,18 +66,6 @@ where
state: Option<ResponseState<L>>,
}

/// Notifies the response labeler when the response body is flushed.
#[pin_project::pin_project(PinnedDrop)]
struct ResponseBody<L>
where
L: StreamLabel,
L::DurationLabels: LabelSet,
{
#[pin]
inner: BoxBody,
state: Option<ResponseState<L>>,
}

struct ResponseState<L: StreamLabel> {
labeler: L,
duration: DurationFamily<L::DurationLabels>,
Expand Down Expand Up @@ -178,85 +166,25 @@ where
labeler.init_response(&rsp);
}

let (head, inner) = rsp.into_parts();
if inner.is_end_stream() {
end_stream(&mut state, Ok(None));
}
Poll::Ready(Ok(http::Response::from_parts(
head,
BoxBody::new(ResponseBody { inner, state }),
)))
let on_eos = move |eos: EosRef<'_, _>| end_stream(state, eos);
let rsp = rsp
.map(|body| BodyWithEosFn::new(body, on_eos))
.map(BoxBody::new);

Poll::Ready(Ok(rsp))
}
Err(error) => {
end_stream(&mut state, Err(&error));
end_stream(state, EosRef::Error(&error));
Poll::Ready(Err(error))
}
}
}
}

// === impl ResponseBody ===

impl<L> http_body::Body for ResponseBody<L>
fn end_stream<L>(mut state: Option<ResponseState<L>>, res: EosRef<'_>)
where
L: StreamLabel,
L::DurationLabels: LabelSet,
{
type Data = <BoxBody as http_body::Body>::Data;
type Error = Error;

fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
let mut this = self.project();

// Poll the inner body for the next frame.
let poll = this.inner.as_mut().poll_frame(cx);
let frame = futures::ready!(poll);

match &frame {
Some(Ok(frame)) => {
if let trls @ Some(_) = frame.trailers_ref() {
end_stream(this.state, Ok(trls));
} else if this.inner.is_end_stream() {
end_stream(this.state, Ok(None));
}
}
Some(Err(error)) => end_stream(this.state, Err(error)),
None => end_stream(this.state, Ok(None)),
}

Poll::Ready(frame)
}

fn is_end_stream(&self) -> bool {
// If the inner response state is still in place, the end of the stream has not been
// classified and recorded yet.
self.state.is_none()
}
}

#[pin_project::pinned_drop]
impl<L> PinnedDrop for ResponseBody<L>
where
L: StreamLabel,
L::DurationLabels: LabelSet,
{
fn drop(self: Pin<&mut Self>) {
let this = self.project();
if this.state.is_some() {
end_stream(this.state, Err(&RequestCancelled.into()));
}
}
}

fn end_stream<L>(
state: &mut Option<ResponseState<L>>,
res: Result<Option<&http::HeaderMap>, &Error>,
) where
L: StreamLabel,
L::DurationLabels: LabelSet,
{
let Some(ResponseState {
duration,
Expand Down
61 changes: 7 additions & 54 deletions linkerd/http/prom/src/record_response/response.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use super::{DurationFamily, MkDurationHistogram};
use crate::stream_label::{LabelSet, MkStreamLabel};
use http_body::Frame;
use linkerd_error::Error;
use linkerd_http_body_eos::{BodyWithEosFn, EosRef};
use linkerd_http_box::BoxBody;
use linkerd_stack as svc;
use prometheus_client::registry::{Registry, Unit};
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
Expand All @@ -23,14 +22,6 @@ pub type NewResponseDuration<L, X, N> =
pub type RecordResponseDuration<L, S> =
super::RecordResponse<L, ResponseMetrics<<L as MkStreamLabel>::DurationLabels>, S>;

/// Notifies the response body when the request body is flushed.
#[pin_project::pin_project(PinnedDrop)]
struct RequestBody<B> {
#[pin]
inner: B,
flushed: Option<oneshot::Sender<time::Instant>>,
}

// === impl ResponseMetrics ===

impl<L: LabelSet> ResponseMetrics<L> {
Expand Down Expand Up @@ -84,12 +75,12 @@ where
// the respond flushes.
let state = if let Some(labeler) = self.labeler.mk_stream_labeler(&req) {
let (tx, start) = oneshot::channel();
req = req.map(|inner| {
BoxBody::new(RequestBody {
inner,
flushed: Some(tx),
})
});
let on_eos = move |_: EosRef<'_>| {
tx.send(time::Instant::now()).ok();
};
req = req
.map(|inner| BodyWithEosFn::new(inner, on_eos))
.map(BoxBody::new);
let ResponseMetrics { duration } = self.metric.clone();
Some(super::ResponseState {
labeler,
Expand All @@ -104,41 +95,3 @@ where
super::ResponseFuture { state, inner }
}
}

// === impl ResponseBody ===

impl<B> http_body::Body for RequestBody<B>
where
B: http_body::Body,
{
type Data = B::Data;
type Error = B::Error;

fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, B::Error>>> {
let mut this = self.project();
let res = futures::ready!(this.inner.as_mut().poll_frame(cx));
if (*this.inner).is_end_stream() {
if let Some(tx) = this.flushed.take() {
let _ = tx.send(time::Instant::now());
}
}
Poll::Ready(res)
}

fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}
}

#[pin_project::pinned_drop]
impl<B> PinnedDrop for RequestBody<B> {
fn drop(self: Pin<&mut Self>) {
let this = self.project();
if let Some(tx) = this.flushed.take() {
let _ = tx.send(time::Instant::now());
}
}
}
16 changes: 2 additions & 14 deletions linkerd/http/prom/src/status.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
//! A tower middleware for counting response status codes.

use crate::{
record_response::RequestCancelled,
stream_label::{LabelSet, MkStreamLabel, StreamLabel},
};
use crate::stream_label::{LabelSet, MkStreamLabel, StreamLabel};
use http::{Request, Response};
use http_body::Body;
use linkerd_error::Error;
Expand Down Expand Up @@ -258,16 +255,7 @@ where
L: Clone + Hash + Eq + Send + Sync + 'static,
{
fn on_eos(eos: EosRef<'_, Error>, mut stream_label: SL, metrics: StatusMetrics<L>) {
// TODO(kate): a static cancellation error. see linkerd/linkerd2-proxy#4306.
let cancelled = RequestCancelled.into();

stream_label.end_response(match eos {
EosRef::None => Ok(None),
EosRef::Trailers(trls) => Ok(Some(trls)),
EosRef::Error(error) => Err(error),
EosRef::Cancelled => Err(&cancelled),
});

stream_label.end_response(eos);
let labels = stream_label.status_labels();
let counter = metrics.metric(&labels);
counter.inc();
Expand Down
2 changes: 1 addition & 1 deletion linkerd/http/prom/src/status/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ mod util {
http.init_response(rsp);
}

fn end_response(&mut self, trailers: Result<Option<&http::HeaderMap>, &Error>) {
fn end_response(&mut self, trailers: EosRef<'_>) {
let Self { grpc, http } = self;
grpc.end_response(trailers);
http.end_response(trailers);
Expand Down
5 changes: 3 additions & 2 deletions linkerd/http/prom/src/stream_label.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//! Stream labeling facilities.

use linkerd_error::Error;
use prometheus_client::encoding::EncodeLabelSet;

pub mod error;
pub mod status;
pub mod with;

pub use linkerd_http_body_eos::EosRef;

/// A strategy for labeling request/responses streams for status and duration
/// metrics.
///
Expand All @@ -30,7 +31,7 @@ pub trait StreamLabel: Send + 'static {
type StatusLabels;

fn init_response<B>(&mut self, rsp: &http::Response<B>);
fn end_response(&mut self, trailers: Result<Option<&http::HeaderMap>, &Error>);
fn end_response(&mut self, trailers: EosRef<'_>);

fn status_labels(&self) -> Self::StatusLabels;
fn duration_labels(&self) -> Self::DurationLabels;
Expand Down
Loading
Loading