Skip to content

Commit c1451ba

Browse files
committed
implement bodies using http_body::Body
1 parent 42b1426 commit c1451ba

31 files changed

+940
-1029
lines changed

Cargo.toml

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,13 @@ default = ["json"]
1717
json = ["dep:serde", "dep:serde_json"]
1818

1919
[dependencies]
20+
anyhow.workspace = true
2021
async-task.workspace = true
21-
futures-core.workspace = true
22+
async-trait.workspace = true
23+
bytes.workspace = true
24+
futures-lite.workspace = true
25+
http-body-util.workspace = true
26+
http-body.workspace = true
2227
http.workspace = true
2328
itoa.workspace = true
2429
pin-project-lite.workspace = true
@@ -33,7 +38,7 @@ serde_json = { workspace = true, optional = true }
3338
[dev-dependencies]
3439
anyhow.workspace = true
3540
clap.workspace = true
36-
futures-lite.workspace = true
41+
http-body-util.workspace = true
3742
futures-concurrency.workspace = true
3843
humantime.workspace = true
3944
serde = { workspace = true, features = ["derive"] }
@@ -65,6 +70,8 @@ authors = [
6570
[workspace.dependencies]
6671
anyhow = "1"
6772
async-task = "4.7"
73+
async-trait = "*"
74+
bytes = "1.10.1"
6875
cargo_metadata = "0.22"
6976
clap = { version = "4.5.26", features = ["derive"] }
7077
futures-core = "0.3.19"
@@ -73,6 +80,8 @@ futures-concurrency = "7.6"
7380
humantime = "2.1.0"
7481
heck = "0.5"
7582
http = "1.1"
83+
http-body = "1.0.1"
84+
http-body-util = "0.1.3"
7685
itoa = "1"
7786
pin-project-lite = "0.2.8"
7887
quote = "1.0"

examples/complex_http_client.rs

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
use anyhow::{anyhow, Result};
22
use clap::{ArgAction, Parser};
33
use std::str::FromStr;
4-
use wstd::http::{
5-
body::BodyForthcoming, Client, HeaderMap, HeaderName, HeaderValue, Method, Request, Uri,
6-
};
4+
use wstd::http::{Body, BodyExt, Client, HeaderMap, HeaderName, HeaderValue, Method, Request, Uri};
5+
use wstd::io::AsyncWrite;
76

87
/// Complex HTTP client
98
///
@@ -86,23 +85,29 @@ async fn main() -> Result<()> {
8685
trailers.insert(HeaderName::from_str(key)?, HeaderValue::from_str(value)?);
8786
}
8887

89-
// Send the request.
90-
91-
let request = request.body(BodyForthcoming)?;
88+
let body = if args.body {
89+
Body::from_input_stream(wstd::io::stdin().into_inner()).into_boxed_body()
90+
} else {
91+
Body::empty().into_boxed_body()
92+
};
93+
let t = trailers.clone();
94+
let body = body.with_trailers(async move {
95+
if t.is_empty() {
96+
None
97+
} else {
98+
Some(Ok(t))
99+
}
100+
});
101+
let request = request.body(body)?;
92102

103+
// Send the request.
93104
eprintln!("> {} / {:?}", request.method(), request.version());
94105
for (key, value) in request.headers().iter() {
95106
let value = String::from_utf8_lossy(value.as_bytes());
96107
eprintln!("> {key}: {value}");
97108
}
98109

99-
let (mut outgoing_body, response) = client.start_request(request).await?;
100-
101-
if args.body {
102-
wstd::io::copy(wstd::io::stdin(), &mut outgoing_body).await?;
103-
} else {
104-
wstd::io::copy(wstd::io::empty(), &mut outgoing_body).await?;
105-
}
110+
let response = client.send(request).await?;
106111

107112
if !trailers.is_empty() {
108113
eprintln!("...");
@@ -112,10 +117,6 @@ async fn main() -> Result<()> {
112117
eprintln!("> {key}: {value}");
113118
}
114119

115-
Client::finish(outgoing_body, Some(trailers))?;
116-
117-
let response = response.await?;
118-
119120
// Print the response.
120121

121122
eprintln!("< {:?} {}", response.version(), response.status());
@@ -124,10 +125,12 @@ async fn main() -> Result<()> {
124125
eprintln!("< {key}: {value}");
125126
}
126127

127-
let mut body = response.into_body();
128-
wstd::io::copy(&mut body, wstd::io::stdout()).await?;
128+
let body = response.into_body().into_http_body().collect().await?;
129+
let trailers = body.trailers().cloned();
130+
wstd::io::stdout()
131+
.write_all(body.to_bytes().as_ref())
132+
.await?;
129133

130-
let trailers = body.finish().await?;
131134
if let Some(trailers) = trailers {
132135
for (key, value) in trailers.iter() {
133136
let value = String::from_utf8_lossy(value.as_bytes());

examples/http_client.rs

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
use anyhow::{anyhow, Result};
22
use clap::{ArgAction, Parser};
3-
use wstd::http::{
4-
body::{IncomingBody, StreamedBody},
5-
request::Builder,
6-
Body, Client, Method, Request, Response, Uri,
7-
};
3+
use wstd::http::{Body, BodyExt, Client, Method, Request, Uri};
4+
use wstd::io::AsyncWrite;
85

96
/// Simple HTTP client
107
///
@@ -75,39 +72,35 @@ async fn main() -> Result<()> {
7572

7673
// Send the request.
7774

78-
async fn send_request<B: Body>(
79-
client: &Client,
80-
request: Builder,
81-
body: B,
82-
) -> Result<Response<IncomingBody>> {
83-
let request = request.body(body)?;
75+
let body = if args.body {
76+
Body::from_input_stream(wstd::io::stdin().into_inner())
77+
} else {
78+
Body::empty()
79+
};
8480

85-
eprintln!("> {} / {:?}", request.method(), request.version());
86-
for (key, value) in request.headers().iter() {
87-
let value = String::from_utf8_lossy(value.as_bytes());
88-
eprintln!("> {key}: {value}");
89-
}
81+
let request = request.body(body)?;
9082

91-
Ok(client.send(request).await?)
83+
eprintln!("> {} / {:?}", request.method(), request.version());
84+
for (key, value) in request.headers().iter() {
85+
let value = String::from_utf8_lossy(value.as_bytes());
86+
eprintln!("> {key}: {value}");
9287
}
93-
let response = if args.body {
94-
send_request(&client, request, StreamedBody::new(wstd::io::stdin())).await
95-
} else {
96-
send_request(&client, request, wstd::io::empty()).await
97-
}?;
9888

99-
// Print the response.
89+
let response = client.send(request).await?;
10090

91+
// Print the response.
10192
eprintln!("< {:?} {}", response.version(), response.status());
10293
for (key, value) in response.headers().iter() {
10394
let value = String::from_utf8_lossy(value.as_bytes());
10495
eprintln!("< {key}: {value}");
10596
}
10697

107-
let mut body = response.into_body();
108-
wstd::io::copy(&mut body, wstd::io::stdout()).await?;
98+
let body = response.into_body().into_http_body().collect().await?;
99+
let trailers = body.trailers().cloned();
100+
wstd::io::stdout()
101+
.write_all(body.to_bytes().as_ref())
102+
.await?;
109103

110-
let trailers = body.finish().await?;
111104
if let Some(trailers) = trailers {
112105
for (key, value) in trailers.iter() {
113106
let value = String::from_utf8_lossy(value.as_bytes());

examples/http_server.rs

Lines changed: 88 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,36 @@
1-
use wstd::http::body::{BodyForthcoming, IncomingBody, OutgoingBody};
2-
use wstd::http::server::{Finished, Responder};
3-
use wstd::http::{IntoBody, Request, Response, StatusCode};
4-
use wstd::io::{copy, empty, AsyncWrite};
1+
use anyhow::{Context, Result};
2+
use futures_lite::stream::once_future;
3+
use http_body_util::{BodyExt, StreamBody};
4+
use wstd::http::body::{Body, Bytes, Frame, Incoming};
5+
use wstd::http::{Error, HeaderMap, Request, Response, StatusCode};
56
use wstd::time::{Duration, Instant};
67

78
#[wstd::http_server]
8-
async fn main(request: Request<IncomingBody>, responder: Responder) -> Finished {
9-
match request.uri().path_and_query().unwrap().as_str() {
10-
"/wait" => http_wait(request, responder).await,
11-
"/echo" => http_echo(request, responder).await,
12-
"/echo-headers" => http_echo_headers(request, responder).await,
13-
"/echo-trailers" => http_echo_trailers(request, responder).await,
14-
"/fail" => http_fail(request, responder).await,
15-
"/bigfail" => http_bigfail(request, responder).await,
16-
"/" => http_home(request, responder).await,
17-
_ => http_not_found(request, responder).await,
9+
async fn main(request: Request<Incoming>) -> Result<Response<Body>, Error> {
10+
let path = request.uri().path_and_query().unwrap().as_str();
11+
println!("serving {path}");
12+
match path {
13+
"/" => http_home(request).await,
14+
"/wait-response" => http_wait_response(request).await,
15+
"/wait-body" => http_wait_body(request).await,
16+
"/echo" => http_echo(request).await,
17+
"/echo-headers" => http_echo_headers(request).await,
18+
"/echo-trailers" => http_echo_trailers(request).await,
19+
"/response-status" => http_response_status(request).await,
20+
"/response-fail" => http_response_fail(request).await,
21+
"/response-body-fail" => http_body_fail(request).await,
22+
_ => http_not_found(request).await,
1823
}
1924
}
2025

21-
async fn http_home(_request: Request<IncomingBody>, responder: Responder) -> Finished {
26+
async fn http_home(_request: Request<Incoming>) -> Result<Response<Body>> {
2227
// To send a single string as the response body, use `Responder::respond`.
23-
responder
24-
.respond(Response::new("Hello, wasi:http/proxy world!\n".into_body()))
25-
.await
28+
Ok(Response::new(
29+
"Hello, wasi:http/proxy world!\n".to_owned().into(),
30+
))
2631
}
2732

28-
async fn http_wait(_request: Request<IncomingBody>, responder: Responder) -> Finished {
33+
async fn http_wait_response(_request: Request<Incoming>) -> Result<Response<Body>> {
2934
// Get the time now
3035
let now = Instant::now();
3136

@@ -35,60 +40,85 @@ async fn http_wait(_request: Request<IncomingBody>, responder: Responder) -> Fin
3540
// Compute how long we slept for.
3641
let elapsed = Instant::now().duration_since(now).as_millis();
3742

38-
// To stream data to the response body, use `Responder::start_response`.
39-
let mut body = responder.start_response(Response::new(BodyForthcoming));
40-
let result = body
41-
.write_all(format!("slept for {elapsed} millis\n").as_bytes())
42-
.await;
43-
Finished::finish(body, result, None)
43+
Ok(Response::new(
44+
format!("slept for {elapsed} millis\n").into(),
45+
))
4446
}
4547

46-
async fn http_echo(mut request: Request<IncomingBody>, responder: Responder) -> Finished {
47-
// Stream data from the request body to the response body.
48-
let mut body = responder.start_response(Response::new(BodyForthcoming));
49-
let result = copy(request.body_mut(), &mut body).await;
50-
Finished::finish(body, result, None)
51-
}
48+
async fn http_wait_body(_request: Request<Incoming>) -> Result<Response<Body>> {
49+
// Get the time now
50+
let now = Instant::now();
5251

53-
async fn http_fail(_request: Request<IncomingBody>, responder: Responder) -> Finished {
54-
let body = responder.start_response(Response::new(BodyForthcoming));
55-
Finished::fail(body)
56-
}
52+
let body = StreamBody::new(once_future(async move {
53+
// Sleep for one second.
54+
wstd::task::sleep(Duration::from_secs(1)).await;
5755

58-
async fn http_bigfail(_request: Request<IncomingBody>, responder: Responder) -> Finished {
59-
async fn write_body(body: &mut OutgoingBody) -> wstd::io::Result<()> {
60-
for _ in 0..0x10 {
61-
body.write_all("big big big big\n".as_bytes()).await?;
62-
}
63-
body.flush().await?;
64-
Ok(())
65-
}
56+
// Compute how long we slept for.
57+
let elapsed = Instant::now().duration_since(now).as_millis();
58+
anyhow::Ok(Frame::data(Bytes::from(format!(
59+
"slept for {elapsed} millis\n"
60+
))))
61+
}));
62+
63+
Ok(Response::new(body.into()))
64+
}
6665

67-
let mut body = responder.start_response(Response::new(BodyForthcoming));
68-
let _ = write_body(&mut body).await;
69-
Finished::fail(body)
66+
async fn http_echo(request: Request<Incoming>) -> Result<Response<Body>> {
67+
let (_parts, body) = request.into_parts();
68+
Ok(Response::new(body.into()))
7069
}
7170

72-
async fn http_echo_headers(request: Request<IncomingBody>, responder: Responder) -> Finished {
71+
async fn http_echo_headers(request: Request<Incoming>) -> Result<Response<Body>> {
7372
let mut response = Response::builder();
7473
*response.headers_mut().unwrap() = request.into_parts().0.headers;
75-
let response = response.body(empty()).unwrap();
76-
responder.respond(response).await
74+
Ok(response.body("".to_owned().into())?)
7775
}
7876

79-
async fn http_echo_trailers(request: Request<IncomingBody>, responder: Responder) -> Finished {
80-
let body = responder.start_response(Response::new(BodyForthcoming));
81-
let (trailers, result) = match request.into_body().finish().await {
82-
Ok(trailers) => (trailers, Ok(())),
83-
Err(err) => (Default::default(), Err(std::io::Error::other(err))),
77+
async fn http_echo_trailers(request: Request<Incoming>) -> Result<Response<Body>> {
78+
let collected = request.into_body().into_http_body().collect().await?;
79+
let trailers = collected.trailers().cloned().unwrap_or_else(|| {
80+
let mut trailers = HeaderMap::new();
81+
trailers.insert("x-no-trailers", "1".parse().unwrap());
82+
trailers
83+
});
84+
85+
let body = StreamBody::new(once_future(async move {
86+
anyhow::Ok(Frame::<Bytes>::trailers(trailers))
87+
}));
88+
Ok(Response::new(body.into()))
89+
}
90+
91+
async fn http_response_status(request: Request<Incoming>) -> Result<Response<Body>> {
92+
let status = if let Some(header_val) = request.headers().get("x-response-status") {
93+
header_val
94+
.to_str()
95+
.context("contents of x-response-status")?
96+
.parse::<u16>()
97+
.context("u16 value from x-response-status")?
98+
} else {
99+
500
84100
};
85-
Finished::finish(body, result, trailers)
101+
Ok(Response::builder()
102+
.status(status)
103+
.body(String::new().into())?)
104+
}
105+
106+
async fn http_response_fail(_request: Request<Incoming>) -> Result<Response<Body>> {
107+
Err(anyhow::anyhow!("error creating response"))
108+
}
109+
110+
async fn http_body_fail(_request: Request<Incoming>) -> Result<Response<Body>> {
111+
let body = StreamBody::new(once_future(async move {
112+
Err::<Frame<Bytes>, _>(anyhow::anyhow!("error creating body"))
113+
}));
114+
115+
Ok(Response::new(body.into()))
86116
}
87117

88-
async fn http_not_found(_request: Request<IncomingBody>, responder: Responder) -> Finished {
118+
async fn http_not_found(_request: Request<Incoming>) -> Result<Response<Body>> {
89119
let response = Response::builder()
90120
.status(StatusCode::NOT_FOUND)
91-
.body(empty())
121+
.body(Body::empty())
92122
.unwrap();
93-
responder.respond(response).await
123+
Ok(response)
94124
}

0 commit comments

Comments
 (0)