diff --git a/linkerd/app/outbound/src/http/logical/policy/route/metrics.rs b/linkerd/app/outbound/src/http/logical/policy/route/metrics.rs index e0e90dcd06..a5837e1fd1 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/metrics.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/metrics.rs @@ -10,7 +10,7 @@ use linkerd_http_prom::{ stream_label::{ error::LabelError, status::{LabelGrpcStatus, LabelHttpStatus}, - LabelSet, StreamLabel, + EosRef, LabelSet, StreamLabel, }, }; @@ -337,7 +337,7 @@ where error.init_response(rsp); } - fn end_response(&mut self, res: Result, &linkerd_app_core::Error>) { + fn end_response(&mut self, res: EosRef<'_>) { let Self { parent: _, status, @@ -395,7 +395,7 @@ where error.init_response(rsp); } - fn end_response(&mut self, res: Result, &linkerd_app_core::Error>) { + fn end_response(&mut self, res: EosRef<'_>) { let Self { parent: _, status, diff --git a/linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs b/linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs index 27e8eb8282..48f85bc805 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs @@ -2,6 +2,7 @@ use linkerd_app_core::{ dns, errors, metrics::prom::EncodeLabelSetMut, proxy::http, Error as BoxError, }; +use linkerd_http_prom::stream_label::EosRef; use prometheus_client::encoding::*; use crate::{BackendRef, ParentRef, RouteRef}; @@ -204,8 +205,8 @@ impl EncodeLabelSet for HttpRsp { } } -impl From<&linkerd_app_core::Error> for HttpRsp { - fn from(error: &linkerd_app_core::Error) -> Self { +impl From> for HttpRsp { + fn from(error: EosRef<'_>) -> Self { match Error::new_or_status(error) { Ok(error) => Self::error(error), Err(code) => http::StatusCode::from_u16(code) diff --git a/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs b/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs index 1dd76f5002..2865125329 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs @@ -687,13 +687,29 @@ async fn http_response_body_drop_early() { }; // The counters are not incremented yet. - assert_eq!(ok.get(), 0); - assert_eq!(err.get(), 0); + assert_eq!( + ok.get(), + 0, + "ok counter is not incremented before body is dropped" + ); + assert_eq!( + err.get(), + 0, + "err counter is not incremented before body is dropped" + ); // The body reports an error if it was not completed. drop(body); - assert_eq!(ok.get(), 0); - assert_eq!(err.get(), 1); + assert_eq!( + ok.get(), + 0, + "ok counter is not incremented after body is dropped" + ); + assert_eq!( + err.get(), + 1, + "error counter is incremented after body is dropped" + ); } #[tokio::test(flavor = "current_thread", start_paused = true)] diff --git a/linkerd/http/body-eos/src/lib.rs b/linkerd/http/body-eos/src/lib.rs index 91ce0b33bf..0990ac2a86 100644 --- a/linkerd/http/body-eos/src/lib.rs +++ b/linkerd/http/body-eos/src/lib.rs @@ -130,3 +130,16 @@ where }; } } + +// === impl EosRef === + +/// A reference to the end of a stream can be cheaply cloned. +/// +/// Each of the variants are empty tags, or immutable references. +impl<'a, E> Clone for EosRef<'a, E> { + fn clone(&self) -> Self { + *self + } +} + +impl<'a, E> Copy for EosRef<'a, E> {} diff --git a/linkerd/http/prom/src/record_response.rs b/linkerd/http/prom/src/record_response.rs index 9376f8f524..f867f4dd1f 100644 --- a/linkerd/http/prom/src/record_response.rs +++ b/linkerd/http/prom/src/record_response.rs @@ -1,6 +1,6 @@ use crate::stream_label::{LabelSet, MkStreamLabel, StreamLabel}; -use http_body::Body; use linkerd_error::Error; +use linkerd_http_body_eos::{BodyWithEosFn, EosRef}; use linkerd_http_box::BoxBody; use linkerd_stack as svc; use prometheus_client::metrics::{ @@ -66,18 +66,6 @@ where state: Option>, } -/// Notifies the response labeler when the response body is flushed. -#[pin_project::pin_project(PinnedDrop)] -struct ResponseBody -where - L: StreamLabel, - L::DurationLabels: LabelSet, -{ - #[pin] - inner: BoxBody, - state: Option>, -} - struct ResponseState { labeler: L, duration: DurationFamily, @@ -178,85 +166,25 @@ where labeler.init_response(&rsp); } - let (head, inner) = rsp.into_parts(); - if inner.is_end_stream() { - end_stream(&mut state, Ok(None)); - } - Poll::Ready(Ok(http::Response::from_parts( - head, - BoxBody::new(ResponseBody { inner, state }), - ))) + let on_eos = move |eos: EosRef<'_, _>| end_stream(state, eos); + let rsp = rsp + .map(|body| BodyWithEosFn::new(body, on_eos)) + .map(BoxBody::new); + + Poll::Ready(Ok(rsp)) } Err(error) => { - end_stream(&mut state, Err(&error)); + end_stream(state, EosRef::Error(&error)); Poll::Ready(Err(error)) } } } } -// === impl ResponseBody === - -impl http_body::Body for ResponseBody +fn end_stream(mut state: Option>, res: EosRef<'_>) where L: StreamLabel, L::DurationLabels: LabelSet, -{ - type Data = ::Data; - type Error = Error; - - fn poll_frame( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>>> { - let mut this = self.project(); - - // Poll the inner body for the next frame. - let poll = this.inner.as_mut().poll_frame(cx); - let frame = futures::ready!(poll); - - match &frame { - Some(Ok(frame)) => { - if let trls @ Some(_) = frame.trailers_ref() { - end_stream(this.state, Ok(trls)); - } else if this.inner.is_end_stream() { - end_stream(this.state, Ok(None)); - } - } - Some(Err(error)) => end_stream(this.state, Err(error)), - None => end_stream(this.state, Ok(None)), - } - - Poll::Ready(frame) - } - - fn is_end_stream(&self) -> bool { - // If the inner response state is still in place, the end of the stream has not been - // classified and recorded yet. - self.state.is_none() - } -} - -#[pin_project::pinned_drop] -impl PinnedDrop for ResponseBody -where - L: StreamLabel, - L::DurationLabels: LabelSet, -{ - fn drop(self: Pin<&mut Self>) { - let this = self.project(); - if this.state.is_some() { - end_stream(this.state, Err(&RequestCancelled.into())); - } - } -} - -fn end_stream( - state: &mut Option>, - res: Result, &Error>, -) where - L: StreamLabel, - L::DurationLabels: LabelSet, { let Some(ResponseState { duration, diff --git a/linkerd/http/prom/src/record_response/response.rs b/linkerd/http/prom/src/record_response/response.rs index 3b544073ed..6fd6d4ef4d 100644 --- a/linkerd/http/prom/src/record_response/response.rs +++ b/linkerd/http/prom/src/record_response/response.rs @@ -1,12 +1,11 @@ use super::{DurationFamily, MkDurationHistogram}; use crate::stream_label::{LabelSet, MkStreamLabel}; -use http_body::Frame; use linkerd_error::Error; +use linkerd_http_body_eos::{BodyWithEosFn, EosRef}; use linkerd_http_box::BoxBody; use linkerd_stack as svc; use prometheus_client::registry::{Registry, Unit}; use std::{ - pin::Pin, sync::Arc, task::{Context, Poll}, }; @@ -23,14 +22,6 @@ pub type NewResponseDuration = pub type RecordResponseDuration = super::RecordResponse::DurationLabels>, S>; -/// Notifies the response body when the request body is flushed. -#[pin_project::pin_project(PinnedDrop)] -struct RequestBody { - #[pin] - inner: B, - flushed: Option>, -} - // === impl ResponseMetrics === impl ResponseMetrics { @@ -84,12 +75,12 @@ where // the respond flushes. let state = if let Some(labeler) = self.labeler.mk_stream_labeler(&req) { let (tx, start) = oneshot::channel(); - req = req.map(|inner| { - BoxBody::new(RequestBody { - inner, - flushed: Some(tx), - }) - }); + let on_eos = move |_: EosRef<'_>| { + tx.send(time::Instant::now()).ok(); + }; + req = req + .map(|inner| BodyWithEosFn::new(inner, on_eos)) + .map(BoxBody::new); let ResponseMetrics { duration } = self.metric.clone(); Some(super::ResponseState { labeler, @@ -104,41 +95,3 @@ where super::ResponseFuture { state, inner } } } - -// === impl ResponseBody === - -impl http_body::Body for RequestBody -where - B: http_body::Body, -{ - type Data = B::Data; - type Error = B::Error; - - fn poll_frame( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, B::Error>>> { - let mut this = self.project(); - let res = futures::ready!(this.inner.as_mut().poll_frame(cx)); - if (*this.inner).is_end_stream() { - if let Some(tx) = this.flushed.take() { - let _ = tx.send(time::Instant::now()); - } - } - Poll::Ready(res) - } - - fn is_end_stream(&self) -> bool { - self.inner.is_end_stream() - } -} - -#[pin_project::pinned_drop] -impl PinnedDrop for RequestBody { - fn drop(self: Pin<&mut Self>) { - let this = self.project(); - if let Some(tx) = this.flushed.take() { - let _ = tx.send(time::Instant::now()); - } - } -} diff --git a/linkerd/http/prom/src/status.rs b/linkerd/http/prom/src/status.rs index 9b32b9ba00..62cb8fcb07 100644 --- a/linkerd/http/prom/src/status.rs +++ b/linkerd/http/prom/src/status.rs @@ -1,9 +1,6 @@ //! A tower middleware for counting response status codes. -use crate::{ - record_response::RequestCancelled, - stream_label::{LabelSet, MkStreamLabel, StreamLabel}, -}; +use crate::stream_label::{LabelSet, MkStreamLabel, StreamLabel}; use http::{Request, Response}; use http_body::Body; use linkerd_error::Error; @@ -258,16 +255,7 @@ where L: Clone + Hash + Eq + Send + Sync + 'static, { fn on_eos(eos: EosRef<'_, Error>, mut stream_label: SL, metrics: StatusMetrics) { - // TODO(kate): a static cancellation error. see linkerd/linkerd2-proxy#4306. - let cancelled = RequestCancelled.into(); - - stream_label.end_response(match eos { - EosRef::None => Ok(None), - EosRef::Trailers(trls) => Ok(Some(trls)), - EosRef::Error(error) => Err(error), - EosRef::Cancelled => Err(&cancelled), - }); - + stream_label.end_response(eos); let labels = stream_label.status_labels(); let counter = metrics.metric(&labels); counter.inc(); diff --git a/linkerd/http/prom/src/status/tests.rs b/linkerd/http/prom/src/status/tests.rs index c0daabd548..4cc7f54bda 100644 --- a/linkerd/http/prom/src/status/tests.rs +++ b/linkerd/http/prom/src/status/tests.rs @@ -484,7 +484,7 @@ mod util { http.init_response(rsp); } - fn end_response(&mut self, trailers: Result, &Error>) { + fn end_response(&mut self, trailers: EosRef<'_>) { let Self { grpc, http } = self; grpc.end_response(trailers); http.end_response(trailers); diff --git a/linkerd/http/prom/src/stream_label.rs b/linkerd/http/prom/src/stream_label.rs index 69a3bd9e9e..a018d14ad9 100644 --- a/linkerd/http/prom/src/stream_label.rs +++ b/linkerd/http/prom/src/stream_label.rs @@ -1,12 +1,13 @@ //! Stream labeling facilities. -use linkerd_error::Error; use prometheus_client::encoding::EncodeLabelSet; pub mod error; pub mod status; pub mod with; +pub use linkerd_http_body_eos::EosRef; + /// A strategy for labeling request/responses streams for status and duration /// metrics. /// @@ -30,7 +31,7 @@ pub trait StreamLabel: Send + 'static { type StatusLabels; fn init_response(&mut self, rsp: &http::Response); - fn end_response(&mut self, trailers: Result, &Error>); + fn end_response(&mut self, trailers: EosRef<'_>); fn status_labels(&self) -> Self::StatusLabels; fn duration_labels(&self) -> Self::DurationLabels; diff --git a/linkerd/http/prom/src/stream_label/error.rs b/linkerd/http/prom/src/stream_label/error.rs index 14e060e3c0..48229a0f83 100644 --- a/linkerd/http/prom/src/stream_label/error.rs +++ b/linkerd/http/prom/src/stream_label/error.rs @@ -1,7 +1,7 @@ //! [`StreamLabel`] implementation for labeling errors. use super::StreamLabel; -use linkerd_error::Error; +use linkerd_http_body_eos::EosRef; /// A [`StreamLabel`] implementation that maps boxed errors to labels. #[derive(Clone, Debug, Default)] @@ -13,7 +13,7 @@ pub struct LabelError { impl StreamLabel for LabelError where - E: for<'a> From<&'a Error>, + E: for<'a> From>, E: Clone + Send + 'static, { type DurationLabels = (); @@ -21,9 +21,9 @@ where fn init_response(&mut self, _: &http::Response) {} - fn end_response(&mut self, res: Result, &Error>) { - let Err(err) = res else { return }; - let labels = E::from(err); + fn end_response(&mut self, eos: EosRef<'_>) { + // XXX(kate): this also needs to account for cancelled streams! + let labels = E::from(eos); self.error = Some(labels); } diff --git a/linkerd/http/prom/src/stream_label/status.rs b/linkerd/http/prom/src/stream_label/status.rs index 3d2976421f..9a86c39330 100644 --- a/linkerd/http/prom/src/stream_label/status.rs +++ b/linkerd/http/prom/src/stream_label/status.rs @@ -4,7 +4,7 @@ use crate::stream_label::{MkStreamLabel, StreamLabel}; use http::{HeaderMap, HeaderValue, Response, StatusCode}; -use linkerd_error::Error; +use linkerd_http_body_eos::EosRef; use tonic::Code; /// A [`MkStreamLabel`] implementation for gRPC traffic. @@ -58,8 +58,10 @@ impl StreamLabel for LabelGrpcStatus { self.code = Self::get_grpc_status(headers); } - fn end_response(&mut self, trailers: Result, &Error>) { - let Ok(Some(trailers)) = trailers else { return }; + fn end_response(&mut self, trailers: EosRef<'_>) { + let EosRef::Trailers(trailers) = trailers else { + return; + }; self.code = Self::get_grpc_status(trailers); } @@ -101,7 +103,7 @@ impl StreamLabel for LabelHttpStatus { self.status = Some(rsp.status()); } - fn end_response(&mut self, _: Result, &linkerd_error::Error>) {} + fn end_response(&mut self, _: EosRef<'_>) {} fn status_labels(&self) -> Self::StatusLabels { self.status diff --git a/linkerd/http/prom/src/stream_label/with.rs b/linkerd/http/prom/src/stream_label/with.rs index 2e38e65d11..f96f4b7df3 100644 --- a/linkerd/http/prom/src/stream_label/with.rs +++ b/linkerd/http/prom/src/stream_label/with.rs @@ -1,6 +1,7 @@ //! [`StreamLabel`] implementation for labels known in advance. use super::{MkStreamLabel, StreamLabel}; +use linkerd_http_body_eos::EosRef; /// A [`MkStreamLabel`] implementation for `L`-typed labels. /// @@ -45,7 +46,7 @@ where type StatusLabels = L; fn init_response(&mut self, _: &http::Response) {} - fn end_response(&mut self, _: Result, &linkerd_error::Error>) {} + fn end_response(&mut self, _: EosRef<'_>) {} fn status_labels(&self) -> Self::StatusLabels { self.labels.clone()