Skip to content

Commit 7ad0689

Browse files
committed
refactor: use message_writer instead of framed_writer
1 parent 4df0761 commit 7ad0689

File tree

4 files changed

+72
-23
lines changed

4 files changed

+72
-23
lines changed

crates/http/src/codec/body/payload_encoder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ impl PayloadEncoder {
8989
}
9090

9191
/// Returns whether the encoder has finished sending all data.
92+
#[inline]
9293
pub fn is_finish(&self) -> bool {
9394
match &self.kind {
9495
Kind::Length(encoder) => encoder.is_finish(),

crates/http/src/connection/http_connection.rs

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
1+
use bytes::Bytes;
12
use std::error::Error;
23
use std::fmt::{Debug, Display};
3-
use bytes::Bytes;
44
use std::sync::Arc;
55

6-
use futures::{SinkExt, StreamExt};
6+
use futures::StreamExt;
77
use http::header::EXPECT;
88
use http::{Response, StatusCode};
99
use http_body::Body;
1010
use http_body_util::{BodyExt, Empty};
1111
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
1212

13-
use crate::codec::{RequestDecoder, ResponseEncoder};
13+
use crate::codec::RequestDecoder;
1414
use crate::handler::Handler;
1515
use crate::protocol::body::ReqBody;
1616
use crate::protocol::{HttpError, Message, ParseError, PayloadItem, PayloadSize, RequestHeader, ResponseHead, SendError};
1717

18-
use tokio_util::codec::{FramedRead, FramedWrite};
18+
use crate::connection::message_writer::MessageWriter;
19+
use tokio_util::codec::FramedRead;
1920
use tracing::{error, info};
2021

2122
/// An HTTP connection that manages request processing and response streaming
@@ -32,9 +33,13 @@ use tracing::{error, info};
3233
/// * `W`: The async writable stream type
3334
///
3435
#[derive(Debug)]
35-
pub struct HttpConnection<R, W> where R: Debug, W: Debug{
36+
pub struct HttpConnection<R, W>
37+
where
38+
R: Debug,
39+
W: Debug,
40+
{
3641
framed_read: FramedRead<R, RequestDecoder>,
37-
framed_write: FramedWrite<W, ResponseEncoder>,
42+
message_writer: MessageWriter<W>,
3843
}
3944

4045
impl<R, W> HttpConnection<R, W>
@@ -45,7 +50,7 @@ where
4550
pub fn new(reader: R, writer: W) -> Self {
4651
Self {
4752
framed_read: FramedRead::with_capacity(reader, RequestDecoder::new(), 8 * 1024),
48-
framed_write: FramedWrite::new(writer, ResponseEncoder::new()),
53+
message_writer: MessageWriter::with_capacity(writer, 8 * 1024),
4954
}
5055
}
5156

@@ -70,7 +75,7 @@ where
7075
return Err(ParseError::invalid_body("need header while receive body").into());
7176
}
7277

73-
Some(Err(ParseError::Io { source})) => {
78+
Some(Err(ParseError::Io { source })) => {
7479
info!("connection io error: {}", source);
7580
return Ok(());
7681
}
@@ -99,7 +104,7 @@ where
99104
let slice = value.as_bytes();
100105
// Verify if the value of the "Expect" field is "100-continue".
101106
if slice.len() >= 4 && &slice[0..4] == b"100-" {
102-
let writer = self.framed_write.get_mut();
107+
let writer = self.message_writer.get_mut();
103108
// Send a "100 Continue" response to the client.
104109
let _ = writer.write(b"HTTP/1.1 100 Continue\r\n\r\n").await.map_err(SendError::io)?;
105110
writer.flush().await.map_err(SendError::io)?;
@@ -158,33 +163,27 @@ where
158163
};
159164

160165
let header = Message::<_, T::Data>::Header((ResponseHead::from_parts(header_parts, ()), payload_size));
161-
if payload_size.is_not_empty() {
162-
self.framed_write.feed(header).await?;
163-
} else {
164-
// using send instead of feed, because we want to flush the underlying IO
165-
// when response only has header, we need to send header,
166-
// otherwise, we just feed header to the buffer
167-
self.framed_write.send(header).await?;
168-
}
166+
167+
self.message_writer.write(header)?;
169168

170169
loop {
171170
match body.frame().await {
172171
Some(Ok(frame)) => {
173172
let payload_item =
174173
frame.into_data().map(PayloadItem::Chunk).map_err(|_e| SendError::invalid_body("resolve body response error"))?;
175174

176-
self.framed_write
177-
.send(Message::Payload(payload_item))
178-
.await
175+
self.message_writer
176+
.write(Message::Payload(payload_item))
179177
.map_err(|_e| SendError::invalid_body("can't send response"))?;
180178
}
181179
Some(Err(e)) => return Err(SendError::invalid_body(format!("resolve response body error: {e}")).into()),
182180
None => {
183-
self.framed_write
181+
self.message_writer
184182
// using feed instead of send, because we don't want to flush the underlying IO
185-
.feed(Message::Payload(PayloadItem::<T::Data>::Eof))
186-
.await
183+
.write(Message::Payload(PayloadItem::<T::Data>::Eof))
187184
.map_err(|e| SendError::invalid_body(format!("can't send eof response: {}", e)))?;
185+
self.message_writer.flush().await?;
186+
self.message_writer.clear_buf();
188187
return Ok(());
189188
}
190189
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use crate::codec::ResponseEncoder;
2+
use crate::protocol::{Message, PayloadSize, ResponseHead, SendError};
3+
use bytes::{Buf, BytesMut};
4+
use tokio::io::{AsyncWrite, AsyncWriteExt};
5+
use tokio_util::codec::Encoder;
6+
7+
#[derive(Debug)]
8+
pub struct MessageWriter<W> {
9+
writer: W,
10+
buffer: BytesMut,
11+
encoder: ResponseEncoder,
12+
}
13+
14+
impl<W> MessageWriter<W>
15+
where
16+
W: AsyncWrite + Unpin,
17+
{
18+
pub fn with_capacity(writer: W, buffer_size: usize) -> Self {
19+
Self { writer, buffer: BytesMut::with_capacity(buffer_size), encoder: ResponseEncoder::new() }
20+
}
21+
22+
#[inline]
23+
pub fn get_mut(&mut self) -> &mut W {
24+
&mut self.writer
25+
}
26+
27+
pub fn clear_buf(&mut self) {
28+
self.buffer.clear();
29+
}
30+
31+
#[inline]
32+
pub fn write<D>(&mut self, item: Message<(ResponseHead, PayloadSize), D>) -> Result<(), SendError>
33+
where
34+
D: Buf,
35+
{
36+
self.encoder.encode(item, &mut self.buffer)
37+
}
38+
39+
#[inline]
40+
pub async fn flush(&mut self) -> Result<(), SendError> {
41+
if self.buffer.is_empty() {
42+
return Ok(());
43+
}
44+
45+
self.writer.write_all(self.buffer.as_ref()).await?;
46+
Ok(self.writer.flush().await?)
47+
}
48+
}

crates/http/src/connection/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@
2323
//! - Efficient memory usage through buffering
2424
2525
mod http_connection;
26+
mod message_writer;
2627

2728
pub use http_connection::HttpConnection;

0 commit comments

Comments
 (0)