Skip to content

Commit 46abc32

Browse files
authored
Merge pull request #5 from yoshuawuyts/client
Beginnings of Client
2 parents 0464e54 + 287faa8 commit 46abc32

File tree

3 files changed

+181
-16
lines changed

3 files changed

+181
-16
lines changed

examples/client.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
use async_h1::{client, Body};
2+
use async_std::{io, net, task};
3+
4+
fn main() -> Result<(), async_h1::Exception> {
5+
task::block_on(async {
6+
let stream = net::TcpStream::connect("127.0.0.1:8080").await?;
7+
let (reader, writer) = &mut (&stream, &stream);
8+
let body = Body::from_string("hello chashu".to_owned());
9+
let mut req = client::encode(http::Request::new(body)).await?;
10+
io::copy(&mut req, writer).await?;
11+
let res = client::decode(reader).await?;
12+
println!("Response {:?}", res);
13+
Ok(())
14+
})
15+
}

src/body.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ pub struct Body<R: AsyncRead> {
1313

1414
impl<R: AsyncRead> fmt::Debug for Body<R> {
1515
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
16-
f.debug_struct("Body").finish()
16+
f.debug_struct("Body")
17+
.field("length", &self.length)
18+
.finish()
1719
}
1820
}
1921

src/client.rs

Lines changed: 163 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,172 @@
11
//! Process HTTP connections on the client.
22
3-
// use async_std::io::{self, BufRead, BufReader, Read};
4-
// use async_std::task::{Context, Poll};
5-
// use futures_io::AsyncRead;
6-
// use http::{Request, Response};
3+
use async_std::io::{self, BufReader};
4+
use async_std::task::{Context, Poll};
5+
use async_std::prelude::*;
6+
use futures_io::AsyncRead;
7+
use futures_core::ready;
8+
use http::{Request, Response, Version};
79

8-
// use std::pin::Pin;
10+
use std::pin::Pin;
911

10-
// use crate::{Body, Exception};
12+
use crate::{Body, Exception, MAX_HEADERS};
1113

1214
/// An HTTP encoder.
1315
#[derive(Debug)]
14-
pub struct Encoder;
16+
pub struct Encoder<R: AsyncRead> {
17+
/// Keep track how far we've indexed into the headers + body.
18+
cursor: usize,
19+
/// HTTP headers to be sent.
20+
headers: Vec<u8>,
21+
/// Check whether we're done sending headers.
22+
headers_done: bool,
23+
/// HTTP body to be sent.
24+
body: Body<R>,
25+
/// Check whether we're done with the body.
26+
body_done: bool,
27+
/// Keep track of how many bytes have been read from the body stream.
28+
body_bytes_read: usize,
29+
}
1530

16-
// /// Encode an HTTP request on the client.
17-
// pub async fn encode(_res: Request<Body>) -> Result<Encoder, std::io::Error> {
18-
// unimplemented!();
19-
// }
31+
impl <R: AsyncRead> Encoder<R> {
32+
/// Create a new instance.
33+
pub(crate) fn new(headers: Vec<u8>, body: Body<R>) -> Self {
34+
Self {
35+
body,
36+
headers,
37+
cursor: 0,
38+
headers_done: false,
39+
body_done: false,
40+
body_bytes_read: 0,
41+
}
42+
}
43+
}
2044

21-
// /// Decode an HTTP request on the client.
22-
// pub async fn decode(_reader: &mut (impl AsyncRead + Unpin)) -> Result<Response<Body>, Exception> {
23-
// unimplemented!();
24-
// }
45+
/// Encode an HTTP request on the client.
46+
pub async fn encode<R: AsyncRead>(req: Request<Body<R>>) -> Result<Encoder<R>, std::io::Error> {
47+
use std::io::Write;
48+
let mut buf: Vec<u8> = vec![];
49+
50+
write!(
51+
&mut buf,
52+
"{} {} HTTP/1.1\r\n",
53+
req.method().as_str(),
54+
req.uri()
55+
)?;
56+
57+
// If the body isn't streaming, we can set the content-length ahead of time. Else we need to
58+
// send all items in chunks.
59+
if let Some(len) = req.body().len() {
60+
write!(&mut buf, "Content-Length: {}\r\n", len)?;
61+
} else {
62+
// write!(&mut buf, "Transfer-Encoding: chunked\r\n")?;
63+
panic!("chunked encoding is not implemented yet");
64+
// See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding
65+
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer
66+
}
67+
68+
for (header, value) in req.headers() {
69+
write!(
70+
&mut buf,
71+
"{}: {}\r\n",
72+
header.as_str(),
73+
value.to_str().unwrap()
74+
)?;
75+
}
76+
77+
write!(&mut buf, "\r\n")?;
78+
Ok(Encoder::new(buf, req.into_body()))
79+
}
80+
81+
/// Decode an HTTP respons on the client.
82+
pub async fn decode<R>(
83+
reader: R,
84+
) -> Result<Response<Body<BufReader<R>>>, Exception>
85+
where
86+
R: AsyncRead + Unpin + Send,
87+
{
88+
let mut reader = BufReader::new(reader);
89+
let mut buf = Vec::new();
90+
let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS];
91+
let mut httparse_res = httparse::Response::new(&mut headers);
92+
93+
// Keep reading bytes from the stream until we hit the end of the stream.
94+
loop {
95+
let bytes_read = reader.read_until(b'\n', &mut buf).await?;
96+
// No more bytes are yielded from the stream.
97+
if bytes_read == 0 {
98+
panic!("empty response");
99+
}
100+
101+
// We've hit the end delimiter of the stream.
102+
let idx = buf.len() - 1;
103+
if idx >= 3 && &buf[idx - 3..=idx] == b"\r\n\r\n" {
104+
break;
105+
}
106+
}
107+
108+
// Convert our header buf into an httparse instance, and validate.
109+
let status = httparse_res.parse(&buf)?;
110+
if status.is_partial() {
111+
dbg!(String::from_utf8(buf).unwrap());
112+
return Err("Malformed HTTP head".into());
113+
}
114+
115+
// Convert httparse headers + body into a `http::Response` type.
116+
let mut res = Response::builder();
117+
for header in httparse_res.headers.iter() {
118+
res.header(header.name, header.value);
119+
}
120+
if let Some(version) = httparse_res.version {
121+
res.version(match version {
122+
1 => Version::HTTP_11,
123+
_ => return Err("Unsupported HTTP version".into()),
124+
});
125+
}
126+
127+
// Process the body if `Content-Length` was passed.
128+
let body = match httparse_res
129+
.headers
130+
.iter()
131+
.find(|h| h.name == "Content-Length")
132+
{
133+
Some(_header) => Body::new(reader), // TODO: use the header value
134+
None => Body::empty(),
135+
};
136+
137+
// Return the response.
138+
Ok(res.body(body)?)
139+
}
140+
141+
impl <R: AsyncRead + Unpin> AsyncRead for Encoder<R> {
142+
fn poll_read(
143+
mut self: Pin<&mut Self>,
144+
cx: &mut Context<'_>,
145+
buf: &mut [u8],
146+
) -> Poll<io::Result<usize>> {
147+
// Send the headers. As long as the headers aren't fully sent yet we
148+
// keep sending more of the headers.
149+
let mut bytes_read = 0;
150+
if !self.headers_done {
151+
let len = std::cmp::min(self.headers.len() - self.cursor, buf.len());
152+
let range = self.cursor..self.cursor + len;
153+
buf[0..len].copy_from_slice(&mut self.headers[range]);
154+
self.cursor += len;
155+
if self.cursor == self.headers.len() {
156+
self.headers_done = true;
157+
}
158+
bytes_read += len;
159+
}
160+
161+
if !self.body_done {
162+
let n = ready!(Pin::new(&mut self.body).poll_read(cx, &mut buf[bytes_read..]))?;
163+
bytes_read += n;
164+
self.body_bytes_read += n;
165+
if bytes_read == 0 {
166+
self.body_done = true;
167+
}
168+
}
169+
170+
Poll::Ready(Ok(bytes_read as usize))
171+
}
172+
}

0 commit comments

Comments
 (0)