Skip to content

Commit 85c2ab7

Browse files
committed
http server: more complex streaming example
1 parent 5d82cd8 commit 85c2ab7

File tree

3 files changed

+64
-3
lines changed

3 files changed

+64
-3
lines changed

examples/http_server.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use anyhow::{Context, Result};
2-
use futures_lite::stream::once_future;
2+
use futures_lite::stream::{once_future, unfold};
33
use http_body_util::{BodyExt, StreamBody};
44
use std::convert::Infallible;
55
use wstd::http::body::{Body, Bytes, Frame};
@@ -14,6 +14,7 @@ async fn main(request: Request<Body>) -> Result<Response<Body>, Error> {
1414
"/" => http_home(request).await,
1515
"/wait-response" => http_wait_response(request).await,
1616
"/wait-body" => http_wait_body(request).await,
17+
"/stream-body" => http_stream_body(request).await,
1718
"/echo" => http_echo(request).await,
1819
"/echo-headers" => http_echo_headers(request).await,
1920
"/echo-trailers" => http_echo_trailers(request).await,
@@ -62,6 +63,30 @@ async fn http_wait_body(_request: Request<Body>) -> Result<Response<Body>> {
6263
Ok(Response::new(Body::from_try_stream(once_future(body))))
6364
}
6465

66+
async fn http_stream_body(_request: Request<Body>) -> Result<Response<Body>> {
67+
// Get the time now
68+
let start = Instant::now();
69+
70+
let body = move |iters: usize| async move {
71+
if iters == 0 {
72+
return None;
73+
}
74+
// Sleep for 0.1 second.
75+
wstd::task::sleep(Duration::from_millis(100)).await;
76+
77+
// Compute how long we slept for.
78+
let elapsed = Instant::now().duration_since(start).as_millis();
79+
Some((
80+
Ok::<_, Infallible>(Bytes::from(format!(
81+
"stream started {elapsed} millis ago\n"
82+
))),
83+
iters - 1,
84+
))
85+
};
86+
87+
Ok(Response::new(Body::from_try_stream(unfold(5, body))))
88+
}
89+
6590
async fn http_echo(request: Request<Body>) -> Result<Response<Body>> {
6691
let (_parts, body) = request.into_parts();
6792
Ok(Response::new(body))

src/http/body.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,19 @@ pub mod util {
4040
/// `Stream` of `Into<Bytes>`
4141
///
4242
/// Consume this HTTP body using:
43-
///
44-
///
43+
/// * `Body::into_boxed_body` converts it to an `UnsyncBoxBody<Bytes, Error>`.
44+
/// This is a boxed representation of `http_body::Body` that is `Send` but not
45+
/// ` Sync`. The Unsync variant is required for compatibility with the `axum`
46+
/// crate.
47+
/// * `async fn Body::contents(&mut self) -> Result<&[u8], Error>` is ready
48+
/// when all contents of the body have been collected, and gives them as a
49+
/// byte slice.
50+
/// * `async fn Body::str_contents(&mut self) -> Result<&str, Error>` is ready
51+
/// when all contents of the body have been collected, and gives them as a str
52+
/// slice.
53+
/// * `async fn Body::json(&mut self) -> Result<T, Error>` gathers body
54+
/// contents and then uses `T: serde::Deserialize` to deserialize to json
55+
/// (requires feature `json`).
4556
#[derive(Debug)]
4657
pub struct Body(BodyInner);
4758

test-programs/artifacts/tests/http_server.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,31 @@ fn http_server() -> Result<()> {
8181
);
8282
assert!(duration >= Duration::from_secs(1));
8383

84+
// TEST /stream-body http_stream_body
85+
// Sends response status and headers, then unfolds 5 iterations of a
86+
// stream that sleeps for 100ms and then prints the time since stream
87+
// started.
88+
// With ureq we can't tell that the response status and headers were sent
89+
// with a delay in the body. Additionally, the implementation MAY buffer up the
90+
// entire response and body before sending it, though wasmtime does not.
91+
let start = Instant::now();
92+
let body: String = ureq::get("http://127.0.0.1:8081/stream-body")
93+
.call()?
94+
.into_string()?;
95+
let duration = start.elapsed();
96+
assert_eq!(body.lines().count(), 5, "body has 5 lines");
97+
for (iter, line) in body.lines().enumerate() {
98+
let sleep_report = line
99+
.split(' ')
100+
.find_map(|s| s.parse::<usize>().ok())
101+
.expect("body should print 'stream started Nxx millis ago'");
102+
assert!(
103+
sleep_report >= (iter * 100),
104+
"should have slept for {iter} * 100 or more millis, got {sleep_report}"
105+
);
106+
}
107+
assert!(duration >= Duration::from_millis(500));
108+
84109
// TEST /echo htto_echo
85110
// Send a request body, see that we got the same back in response body.
86111
const MESSAGE: &[u8] = b"hello, echoserver!\n";

0 commit comments

Comments
 (0)