Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@ default = ["json"]
json = ["dep:serde", "dep:serde_json"]

[dependencies]
anyhow.workspace = true
async-task.workspace = true
async-trait.workspace = true
bytes.workspace = true
futures-lite.workspace = true
http.workspace = true
http-body.workspace = true
http-body-util.workspace = true
itoa.workspace = true
pin-project-lite.workspace = true
slab.workspace = true
Expand All @@ -32,7 +38,7 @@ serde_json = { workspace = true, optional = true }
[dev-dependencies]
anyhow.workspace = true
clap.workspace = true
futures-lite.workspace = true
http-body-util.workspace = true
futures-concurrency.workspace = true
humantime.workspace = true
serde = { workspace = true, features = ["derive"] }
Expand Down Expand Up @@ -64,6 +70,8 @@ authors = [
[workspace.dependencies]
anyhow = "1"
async-task = "4.7"
async-trait = "*"
bytes = "1.10.1"
cargo_metadata = "0.22"
clap = { version = "4.5.26", features = ["derive"] }
futures-core = "0.3.19"
Expand All @@ -72,6 +80,8 @@ futures-concurrency = "7.6"
humantime = "2.1.0"
heck = "0.5"
http = "1.1"
http-body = "1.0.1"
http-body-util = "0.1.3"
itoa = "1"
pin-project-lite = "0.2.8"
quote = "1.0"
Expand Down
43 changes: 23 additions & 20 deletions examples/complex_http_client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use anyhow::{anyhow, Result};
use clap::{ArgAction, Parser};
use std::str::FromStr;
use wstd::http::{
body::BodyForthcoming, Client, HeaderMap, HeaderName, HeaderValue, Method, Request, Uri,
};
use wstd::http::{Body, BodyExt, Client, HeaderMap, HeaderName, HeaderValue, Method, Request, Uri};
use wstd::io::AsyncWrite;

/// Complex HTTP client
///
Expand Down Expand Up @@ -86,23 +85,29 @@ async fn main() -> Result<()> {
trailers.insert(HeaderName::from_str(key)?, HeaderValue::from_str(value)?);
}

// Send the request.

let request = request.body(BodyForthcoming)?;
let body = if args.body {
Body::from_input_stream(wstd::io::stdin().into_inner()).into_boxed_body()
} else {
Body::empty().into_boxed_body()
};
let t = trailers.clone();
let body = body.with_trailers(async move {
if t.is_empty() {
None
} else {
Some(Ok(t))
}
});
let request = request.body(body)?;

// Send the request.
eprintln!("> {} / {:?}", request.method(), request.version());
for (key, value) in request.headers().iter() {
let value = String::from_utf8_lossy(value.as_bytes());
eprintln!("> {key}: {value}");
}

let (mut outgoing_body, response) = client.start_request(request).await?;

if args.body {
wstd::io::copy(wstd::io::stdin(), &mut outgoing_body).await?;
} else {
wstd::io::copy(wstd::io::empty(), &mut outgoing_body).await?;
}
let response = client.send(request).await?;

if !trailers.is_empty() {
eprintln!("...");
Expand All @@ -112,10 +117,6 @@ async fn main() -> Result<()> {
eprintln!("> {key}: {value}");
}

Client::finish(outgoing_body, Some(trailers))?;

let response = response.await?;

// Print the response.

eprintln!("< {:?} {}", response.version(), response.status());
Expand All @@ -124,10 +125,12 @@ async fn main() -> Result<()> {
eprintln!("< {key}: {value}");
}

let mut body = response.into_body();
wstd::io::copy(&mut body, wstd::io::stdout()).await?;
let body = response.into_body().into_http_body().collect().await?;
let trailers = body.trailers().cloned();
wstd::io::stdout()
.write_all(body.to_bytes().as_ref())
.await?;

let trailers = body.finish().await?;
if let Some(trailers) = trailers {
for (key, value) in trailers.iter() {
let value = String::from_utf8_lossy(value.as_bytes());
Expand Down
45 changes: 19 additions & 26 deletions examples/http_client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use anyhow::{anyhow, Result};
use clap::{ArgAction, Parser};
use wstd::http::{
body::{IncomingBody, StreamedBody},
request::Builder,
Body, Client, Method, Request, Response, Uri,
};
use wstd::http::{Body, BodyExt, Client, Method, Request, Uri};
use wstd::io::AsyncWrite;

/// Simple HTTP client
///
Expand Down Expand Up @@ -75,39 +72,35 @@ async fn main() -> Result<()> {

// Send the request.

async fn send_request<B: Body>(
client: &Client,
request: Builder,
body: B,
) -> Result<Response<IncomingBody>> {
let request = request.body(body)?;
let body = if args.body {
Body::from_input_stream(wstd::io::stdin().into_inner())
} else {
Body::empty()
};

eprintln!("> {} / {:?}", request.method(), request.version());
for (key, value) in request.headers().iter() {
let value = String::from_utf8_lossy(value.as_bytes());
eprintln!("> {key}: {value}");
}
let request = request.body(body)?;

Ok(client.send(request).await?)
eprintln!("> {} / {:?}", request.method(), request.version());
for (key, value) in request.headers().iter() {
let value = String::from_utf8_lossy(value.as_bytes());
eprintln!("> {key}: {value}");
}
let response = if args.body {
send_request(&client, request, StreamedBody::new(wstd::io::stdin())).await
} else {
send_request(&client, request, wstd::io::empty()).await
}?;

// Print the response.
let response = client.send(request).await?;

// Print the response.
eprintln!("< {:?} {}", response.version(), response.status());
for (key, value) in response.headers().iter() {
let value = String::from_utf8_lossy(value.as_bytes());
eprintln!("< {key}: {value}");
}

let mut body = response.into_body();
wstd::io::copy(&mut body, wstd::io::stdout()).await?;
let body = response.into_body().into_http_body().collect().await?;
let trailers = body.trailers().cloned();
wstd::io::stdout()
.write_all(body.to_bytes().as_ref())
.await?;

let trailers = body.finish().await?;
if let Some(trailers) = trailers {
for (key, value) in trailers.iter() {
let value = String::from_utf8_lossy(value.as_bytes());
Expand Down
146 changes: 88 additions & 58 deletions examples/http_server.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,36 @@
use wstd::http::body::{BodyForthcoming, IncomingBody, OutgoingBody};
use wstd::http::server::{Finished, Responder};
use wstd::http::{IntoBody, Request, Response, StatusCode};
use wstd::io::{copy, empty, AsyncWrite};
use anyhow::{Context, Result};
use futures_lite::stream::once_future;
use http_body_util::{BodyExt, StreamBody};
use wstd::http::body::{Body, Bytes, Frame, Incoming};
use wstd::http::{Error, HeaderMap, Request, Response, StatusCode};
use wstd::time::{Duration, Instant};

#[wstd::http_server]
async fn main(request: Request<IncomingBody>, responder: Responder) -> Finished {
match request.uri().path_and_query().unwrap().as_str() {
"/wait" => http_wait(request, responder).await,
"/echo" => http_echo(request, responder).await,
"/echo-headers" => http_echo_headers(request, responder).await,
"/echo-trailers" => http_echo_trailers(request, responder).await,
"/fail" => http_fail(request, responder).await,
"/bigfail" => http_bigfail(request, responder).await,
"/" => http_home(request, responder).await,
_ => http_not_found(request, responder).await,
async fn main(request: Request<Incoming>) -> Result<Response<Body>, Error> {
let path = request.uri().path_and_query().unwrap().as_str();
println!("serving {path}");
match path {
"/" => http_home(request).await,
"/wait-response" => http_wait_response(request).await,
"/wait-body" => http_wait_body(request).await,
"/echo" => http_echo(request).await,
"/echo-headers" => http_echo_headers(request).await,
"/echo-trailers" => http_echo_trailers(request).await,
"/response-status" => http_response_status(request).await,
"/response-fail" => http_response_fail(request).await,
"/response-body-fail" => http_body_fail(request).await,
_ => http_not_found(request).await,
}
}

async fn http_home(_request: Request<IncomingBody>, responder: Responder) -> Finished {
async fn http_home(_request: Request<Incoming>) -> Result<Response<Body>> {
// To send a single string as the response body, use `Responder::respond`.
responder
.respond(Response::new("Hello, wasi:http/proxy world!\n".into_body()))
.await
Ok(Response::new(
"Hello, wasi:http/proxy world!\n".to_owned().into(),
))
}

async fn http_wait(_request: Request<IncomingBody>, responder: Responder) -> Finished {
async fn http_wait_response(_request: Request<Incoming>) -> Result<Response<Body>> {
// Get the time now
let now = Instant::now();

Expand All @@ -35,60 +40,85 @@ async fn http_wait(_request: Request<IncomingBody>, responder: Responder) -> Fin
// Compute how long we slept for.
let elapsed = Instant::now().duration_since(now).as_millis();

// To stream data to the response body, use `Responder::start_response`.
let mut body = responder.start_response(Response::new(BodyForthcoming));
let result = body
.write_all(format!("slept for {elapsed} millis\n").as_bytes())
.await;
Finished::finish(body, result, None)
Ok(Response::new(
format!("slept for {elapsed} millis\n").into(),
))
}

async fn http_echo(mut request: Request<IncomingBody>, responder: Responder) -> Finished {
// Stream data from the request body to the response body.
let mut body = responder.start_response(Response::new(BodyForthcoming));
let result = copy(request.body_mut(), &mut body).await;
Finished::finish(body, result, None)
}
async fn http_wait_body(_request: Request<Incoming>) -> Result<Response<Body>> {
// Get the time now
let now = Instant::now();

async fn http_fail(_request: Request<IncomingBody>, responder: Responder) -> Finished {
let body = responder.start_response(Response::new(BodyForthcoming));
Finished::fail(body)
}
let body = StreamBody::new(once_future(async move {
// Sleep for one second.
wstd::task::sleep(Duration::from_secs(1)).await;

async fn http_bigfail(_request: Request<IncomingBody>, responder: Responder) -> Finished {
async fn write_body(body: &mut OutgoingBody) -> wstd::io::Result<()> {
for _ in 0..0x10 {
body.write_all("big big big big\n".as_bytes()).await?;
}
body.flush().await?;
Ok(())
}
// Compute how long we slept for.
let elapsed = Instant::now().duration_since(now).as_millis();
anyhow::Ok(Frame::data(Bytes::from(format!(
"slept for {elapsed} millis\n"
))))
}));

Ok(Response::new(body.into()))
}

let mut body = responder.start_response(Response::new(BodyForthcoming));
let _ = write_body(&mut body).await;
Finished::fail(body)
async fn http_echo(request: Request<Incoming>) -> Result<Response<Body>> {
let (_parts, body) = request.into_parts();
Ok(Response::new(body.into()))
}

async fn http_echo_headers(request: Request<IncomingBody>, responder: Responder) -> Finished {
async fn http_echo_headers(request: Request<Incoming>) -> Result<Response<Body>> {
let mut response = Response::builder();
*response.headers_mut().unwrap() = request.into_parts().0.headers;
let response = response.body(empty()).unwrap();
responder.respond(response).await
Ok(response.body("".to_owned().into())?)
}

async fn http_echo_trailers(request: Request<IncomingBody>, responder: Responder) -> Finished {
let body = responder.start_response(Response::new(BodyForthcoming));
let (trailers, result) = match request.into_body().finish().await {
Ok(trailers) => (trailers, Ok(())),
Err(err) => (Default::default(), Err(std::io::Error::other(err))),
async fn http_echo_trailers(request: Request<Incoming>) -> Result<Response<Body>> {
let collected = request.into_body().into_http_body().collect().await?;
let trailers = collected.trailers().cloned().unwrap_or_else(|| {
let mut trailers = HeaderMap::new();
trailers.insert("x-no-trailers", "1".parse().unwrap());
trailers
});

let body = StreamBody::new(once_future(async move {
anyhow::Ok(Frame::<Bytes>::trailers(trailers))
}));
Ok(Response::new(body.into()))
}

async fn http_response_status(request: Request<Incoming>) -> Result<Response<Body>> {
let status = if let Some(header_val) = request.headers().get("x-response-status") {
header_val
.to_str()
.context("contents of x-response-status")?
.parse::<u16>()
.context("u16 value from x-response-status")?
} else {
500
};
Finished::finish(body, result, trailers)
Ok(Response::builder()
.status(status)
.body(String::new().into())?)
}

async fn http_response_fail(_request: Request<Incoming>) -> Result<Response<Body>> {
Err(anyhow::anyhow!("error creating response"))
}

async fn http_body_fail(_request: Request<Incoming>) -> Result<Response<Body>> {
let body = StreamBody::new(once_future(async move {
Err::<Frame<Bytes>, _>(anyhow::anyhow!("error creating body"))
}));

Ok(Response::new(body.into()))
}

async fn http_not_found(_request: Request<IncomingBody>, responder: Responder) -> Finished {
async fn http_not_found(_request: Request<Incoming>) -> Result<Response<Body>> {
let response = Response::builder()
.status(StatusCode::NOT_FOUND)
.body(empty())
.body(Body::empty())
.unwrap();
responder.respond(response).await
Ok(response)
}
Loading
Loading