Skip to content

Commit e63a896

Browse files
authored
feat(http/body-eos): introduce BodyWithEosFn<B, F> (#4246)
this commit introduces a piece of `http_body::Body` middleware. `BodyWithEosFn<B, F>` wraps an inner `B`-typed body, and invokes an `F`-typed callback when the end of the stream is reached. detecting the end of a request body or response body stream is a tricky, reoccuring problem in different pieces of our `tower` middleware. streams can end with a `None`, a `Some(Ok(_))` trailers frame, a `Some(Err(_))` error, can be dropped prior to completion, or a `Some(Ok(_))` data frame that leaves the body reporting a `true` hint from `Body::is_end_stream()`. this is very easy to do incorrectly. this piece of body middleware abstracts over all of these state transitions, and allows callers to instead provide a callback that will be invoked when the stream finishes. Signed-off-by: katelyn martin <[email protected]>
1 parent d9883e8 commit e63a896

File tree

5 files changed

+535
-0
lines changed

5 files changed

+535
-0
lines changed

Cargo.lock

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1710,6 +1710,21 @@ dependencies = [
17101710
"tracing",
17111711
]
17121712

1713+
[[package]]
1714+
name = "linkerd-http-body-eos"
1715+
version = "0.1.0"
1716+
dependencies = [
1717+
"bytes",
1718+
"futures",
1719+
"http",
1720+
"http-body",
1721+
"http-body-util",
1722+
"linkerd-error",
1723+
"linkerd-mock-http-body",
1724+
"pin-project",
1725+
"tokio",
1726+
]
1727+
17131728
[[package]]
17141729
name = "linkerd-http-box"
17151730
version = "0.1.0"

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ members = [
2424
"linkerd/error-respond",
2525
"linkerd/exp-backoff",
2626
"linkerd/http/access-log",
27+
"linkerd/http/body-eos",
2728
"linkerd/http/box",
2829
"linkerd/http/classify",
2930
"linkerd/http/detect",

linkerd/http/body-eos/Cargo.toml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
[package]
2+
name = "linkerd-http-body-eos"
3+
version = { workspace = true }
4+
authors = { workspace = true }
5+
license = { workspace = true }
6+
edition = { workspace = true }
7+
publish = { workspace = true }
8+
description = """
9+
Body middleware for processing the ends of streams.
10+
"""
11+
12+
[dependencies]
13+
futures = { version = "0.3", default-features = false }
14+
http = { workspace = true }
15+
http-body = { workspace = true }
16+
pin-project = "1"
17+
18+
linkerd-error = { path = "../../error" }
19+
20+
[dev-dependencies]
21+
bytes = { workspace = true }
22+
http-body-util = { workspace = true }
23+
tokio = { version = "1", features = ["macros", "test-util"] }
24+
25+
linkerd-mock-http-body = { path = "../../mock/http-body" }

linkerd/http/body-eos/src/lib.rs

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
//! [`Body`] middleware that calls a function when the body ends.
2+
3+
use http::HeaderMap;
4+
use http_body::{Body, Frame};
5+
use linkerd_error::Error;
6+
use std::{
7+
pin::Pin,
8+
task::{Context, Poll},
9+
};
10+
11+
#[cfg(test)]
12+
mod tests;
13+
14+
/// A [`Body`] that calls a function when the body ends.
15+
#[pin_project::pin_project(PinnedDrop, project = BodyWithEosFnProj)]
16+
pub struct BodyWithEosFn<B, F>
17+
where
18+
B: Body,
19+
F: FnOnce(EosRef<<B as Body>::Error>),
20+
{
21+
#[pin]
22+
inner: B,
23+
callback: Option<F>,
24+
}
25+
26+
/// A reference to the end of a [`Body`] stream.
27+
pub enum EosRef<'a, E = Error> {
28+
None,
29+
Trailers(&'a HeaderMap),
30+
Error(&'a E),
31+
Cancelled,
32+
}
33+
34+
// === imple BodyWithEosFn ===
35+
36+
impl<B, F> BodyWithEosFn<B, F>
37+
where
38+
B: Body,
39+
F: FnOnce(EosRef<<B as Body>::Error>),
40+
{
41+
/// Returns a new [`BodyWithEosFn<B, F>`].
42+
pub fn new(body: B, f: F) -> Self {
43+
let callback = if body.is_end_stream() {
44+
// If the body is empty, invoke the callback immediately.
45+
f(EosRef::None);
46+
None
47+
} else {
48+
// Otherwise, hold the callback until the end of the stream is reached.
49+
Some(f)
50+
};
51+
52+
Self {
53+
inner: body,
54+
callback,
55+
}
56+
}
57+
58+
/// Returns an [`EosRef<'a, E>`] view of the end-of-stream, if applicable.
59+
fn eos_ref<'frame, T, E>(
60+
frame: &'frame Option<Result<Frame<T>, E>>,
61+
body: &B,
62+
) -> Option<EosRef<'frame, E>> {
63+
let frame = match frame {
64+
Some(Ok(f)) => f,
65+
// Errors indicate the end of the stream.
66+
Some(Err(error)) => return Some(EosRef::Error(error)),
67+
// If nothing was yielded, the end of the stream has been reached.
68+
None => return Some(EosRef::None),
69+
};
70+
71+
if let Some(trls) = frame.trailers_ref() {
72+
// A trailers frame indicates the end of the stream.
73+
Some(EosRef::Trailers(trls))
74+
} else {
75+
// `is_end_stream()` hints that the end of the stream has been reached.
76+
body.is_end_stream().then_some(EosRef::None)
77+
}
78+
}
79+
}
80+
81+
impl<B, F> http_body::Body for BodyWithEosFn<B, F>
82+
where
83+
B: Body,
84+
F: FnOnce(EosRef<<B as Body>::Error>),
85+
{
86+
type Data = <B as Body>::Data;
87+
type Error = <B as Body>::Error;
88+
89+
fn poll_frame(
90+
self: Pin<&mut Self>,
91+
cx: &mut Context<'_>,
92+
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
93+
let BodyWithEosFnProj {
94+
mut inner,
95+
callback,
96+
} = self.project();
97+
98+
// Poll the inner body for the next frame.
99+
let poll = inner.as_mut().poll_frame(cx);
100+
let frame = futures::ready!(poll);
101+
102+
// Invoke the callback if we have reached the end of the stream.
103+
if let Some(eos) = Self::eos_ref(&frame, &inner) {
104+
if let Some(callback) = callback.take() {
105+
callback(eos);
106+
}
107+
}
108+
109+
Poll::Ready(frame)
110+
}
111+
112+
fn is_end_stream(&self) -> bool {
113+
let Self { inner: _, callback } = self;
114+
115+
callback.is_none()
116+
}
117+
}
118+
119+
#[pin_project::pinned_drop]
120+
impl<B, F> PinnedDrop for BodyWithEosFn<B, F>
121+
where
122+
B: Body,
123+
F: FnOnce(EosRef<<B as Body>::Error>),
124+
{
125+
fn drop(self: Pin<&mut Self>) {
126+
let BodyWithEosFnProj { inner: _, callback } = self.project();
127+
if let Some(callback) = callback.take() {
128+
// Invoke the callback if the body was dropped before finishing.
129+
callback(EosRef::Cancelled)
130+
};
131+
}
132+
}

0 commit comments

Comments
 (0)