Skip to content

Commit 8250372

Browse files
committed
refactor(http/prom): BodyWithEosFn records flush timestamp
Signed-off-by: katelyn martin <[email protected]>
1 parent 2871de4 commit 8250372

File tree

1 file changed

+7
-54
lines changed

1 file changed

+7
-54
lines changed

linkerd/http/prom/src/record_response/response.rs

Lines changed: 7 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
use super::{DurationFamily, MkDurationHistogram};
22
use crate::stream_label::{LabelSet, MkStreamLabel};
3-
use http_body::Frame;
43
use linkerd_error::Error;
4+
use linkerd_http_body_eos::{BodyWithEosFn, EosRef};
55
use linkerd_http_box::BoxBody;
66
use linkerd_stack as svc;
77
use prometheus_client::registry::{Registry, Unit};
88
use std::{
9-
pin::Pin,
109
sync::Arc,
1110
task::{Context, Poll},
1211
};
@@ -23,14 +22,6 @@ pub type NewResponseDuration<L, X, N> =
2322
pub type RecordResponseDuration<L, S> =
2423
super::RecordResponse<L, ResponseMetrics<<L as MkStreamLabel>::DurationLabels>, S>;
2524

26-
/// Notifies the response body when the request body is flushed.
27-
#[pin_project::pin_project(PinnedDrop)]
28-
struct RequestBody<B> {
29-
#[pin]
30-
inner: B,
31-
flushed: Option<oneshot::Sender<time::Instant>>,
32-
}
33-
3425
// === impl ResponseMetrics ===
3526

3627
impl<L: LabelSet> ResponseMetrics<L> {
@@ -84,12 +75,12 @@ where
8475
// the respond flushes.
8576
let state = if let Some(labeler) = self.labeler.mk_stream_labeler(&req) {
8677
let (tx, start) = oneshot::channel();
87-
req = req.map(|inner| {
88-
BoxBody::new(RequestBody {
89-
inner,
90-
flushed: Some(tx),
91-
})
92-
});
78+
let on_eos = move |_: EosRef<'_>| {
79+
tx.send(time::Instant::now()).ok();
80+
};
81+
req = req
82+
.map(|inner| BodyWithEosFn::new(inner, on_eos))
83+
.map(BoxBody::new);
9384
let ResponseMetrics { duration } = self.metric.clone();
9485
Some(super::ResponseState {
9586
labeler,
@@ -104,41 +95,3 @@ where
10495
super::ResponseFuture { state, inner }
10596
}
10697
}
107-
108-
// === impl ResponseBody ===
109-
110-
impl<B> http_body::Body for RequestBody<B>
111-
where
112-
B: http_body::Body,
113-
{
114-
type Data = B::Data;
115-
type Error = B::Error;
116-
117-
fn poll_frame(
118-
self: Pin<&mut Self>,
119-
cx: &mut Context<'_>,
120-
) -> Poll<Option<Result<Frame<Self::Data>, B::Error>>> {
121-
let mut this = self.project();
122-
let res = futures::ready!(this.inner.as_mut().poll_frame(cx));
123-
if (*this.inner).is_end_stream() {
124-
if let Some(tx) = this.flushed.take() {
125-
let _ = tx.send(time::Instant::now());
126-
}
127-
}
128-
Poll::Ready(res)
129-
}
130-
131-
fn is_end_stream(&self) -> bool {
132-
self.inner.is_end_stream()
133-
}
134-
}
135-
136-
#[pin_project::pinned_drop]
137-
impl<B> PinnedDrop for RequestBody<B> {
138-
fn drop(self: Pin<&mut Self>) {
139-
let this = self.project();
140-
if let Some(tx) = this.flushed.take() {
141-
let _ = tx.send(time::Instant::now());
142-
}
143-
}
144-
}

0 commit comments

Comments
 (0)