Skip to content

Commit 2871de4

Browse files
committed
refactor(http/prom): use BodyWithEosFn in duration middleware
Signed-off-by: katelyn martin <[email protected]>
1 parent d250f76 commit 2871de4

File tree

1 file changed

+9
-80
lines changed

1 file changed

+9
-80
lines changed

linkerd/http/prom/src/record_response.rs

Lines changed: 9 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use crate::stream_label::{LabelSet, MkStreamLabel, StreamLabel};
2-
use http_body::Body;
32
use linkerd_error::Error;
4-
use linkerd_http_body_eos::EosRef;
3+
use linkerd_http_body_eos::{BodyWithEosFn, EosRef};
54
use linkerd_http_box::BoxBody;
65
use linkerd_stack as svc;
76
use prometheus_client::metrics::{
@@ -67,18 +66,6 @@ where
6766
state: Option<ResponseState<L>>,
6867
}
6968

70-
/// Notifies the response labeler when the response body is flushed.
71-
#[pin_project::pin_project(PinnedDrop)]
72-
struct ResponseBody<L>
73-
where
74-
L: StreamLabel,
75-
L::DurationLabels: LabelSet,
76-
{
77-
#[pin]
78-
inner: BoxBody,
79-
state: Option<ResponseState<L>>,
80-
}
81-
8269
struct ResponseState<L: StreamLabel> {
8370
labeler: L,
8471
duration: DurationFamily<L::DurationLabels>,
@@ -179,80 +166,22 @@ where
179166
labeler.init_response(&rsp);
180167
}
181168

182-
let (head, inner) = rsp.into_parts();
183-
if inner.is_end_stream() {
184-
end_stream(&mut state, EosRef::None);
185-
}
186-
Poll::Ready(Ok(http::Response::from_parts(
187-
head,
188-
BoxBody::new(ResponseBody { inner, state }),
189-
)))
169+
let on_eos = move |eos: EosRef<'_, _>| end_stream(state, eos);
170+
let rsp = rsp
171+
.map(|body| BodyWithEosFn::new(body, on_eos))
172+
.map(BoxBody::new);
173+
174+
Poll::Ready(Ok(rsp))
190175
}
191176
Err(error) => {
192-
end_stream(&mut state, EosRef::Error(&error));
177+
end_stream(state, EosRef::Error(&error));
193178
Poll::Ready(Err(error))
194179
}
195180
}
196181
}
197182
}
198183

199-
// === impl ResponseBody ===
200-
201-
impl<L> http_body::Body for ResponseBody<L>
202-
where
203-
L: StreamLabel,
204-
L::DurationLabels: LabelSet,
205-
{
206-
type Data = <BoxBody as http_body::Body>::Data;
207-
type Error = Error;
208-
209-
fn poll_frame(
210-
self: Pin<&mut Self>,
211-
cx: &mut Context<'_>,
212-
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
213-
let mut this = self.project();
214-
215-
// Poll the inner body for the next frame.
216-
let poll = this.inner.as_mut().poll_frame(cx);
217-
let frame = futures::ready!(poll);
218-
219-
match &frame {
220-
Some(Ok(frame)) => {
221-
if let Some(trls) = frame.trailers_ref() {
222-
end_stream(this.state, EosRef::Trailers(trls));
223-
} else if this.inner.is_end_stream() {
224-
end_stream(this.state, EosRef::None);
225-
}
226-
}
227-
Some(Err(error)) => end_stream(this.state, EosRef::Error(error)),
228-
None => end_stream(this.state, EosRef::None),
229-
}
230-
231-
Poll::Ready(frame)
232-
}
233-
234-
fn is_end_stream(&self) -> bool {
235-
// If the inner response state is still in place, the end of the stream has not been
236-
// classified and recorded yet.
237-
self.state.is_none()
238-
}
239-
}
240-
241-
#[pin_project::pinned_drop]
242-
impl<L> PinnedDrop for ResponseBody<L>
243-
where
244-
L: StreamLabel,
245-
L::DurationLabels: LabelSet,
246-
{
247-
fn drop(self: Pin<&mut Self>) {
248-
let this = self.project();
249-
if this.state.is_some() {
250-
end_stream(this.state, EosRef::Cancelled);
251-
}
252-
}
253-
}
254-
255-
fn end_stream<L>(state: &mut Option<ResponseState<L>>, res: EosRef<'_>)
184+
fn end_stream<L>(mut state: Option<ResponseState<L>>, res: EosRef<'_>)
256185
where
257186
L: StreamLabel,
258187
L::DurationLabels: LabelSet,

0 commit comments

Comments
 (0)