1+ use bytes:: Bytes ;
12use std:: error:: Error ;
23use std:: fmt:: { Debug , Display } ;
3- use bytes:: Bytes ;
44use std:: sync:: Arc ;
55
6- use futures:: { SinkExt , StreamExt } ;
6+ use futures:: StreamExt ;
77use http:: header:: EXPECT ;
88use http:: { Response , StatusCode } ;
99use http_body:: Body ;
1010use http_body_util:: { BodyExt , Empty } ;
1111use tokio:: io:: { AsyncRead , AsyncWrite , AsyncWriteExt } ;
1212
13- use crate :: codec:: { RequestDecoder , ResponseEncoder } ;
13+ use crate :: codec:: RequestDecoder ;
1414use crate :: handler:: Handler ;
1515use crate :: protocol:: body:: ReqBody ;
1616use crate :: protocol:: { HttpError , Message , ParseError , PayloadItem , PayloadSize , RequestHeader , ResponseHead , SendError } ;
1717
18- use tokio_util:: codec:: { FramedRead , FramedWrite } ;
18+ use crate :: connection:: message_writer:: MessageWriter ;
19+ use tokio_util:: codec:: FramedRead ;
1920use tracing:: { error, info} ;
2021
2122/// An HTTP connection that manages request processing and response streaming
@@ -32,9 +33,13 @@ use tracing::{error, info};
3233/// * `W`: The async writable stream type
3334///
3435#[ derive( Debug ) ]
35- pub struct HttpConnection < R , W > where R : Debug , W : Debug {
36+ pub struct HttpConnection < R , W >
37+ where
38+ R : Debug ,
39+ W : Debug ,
40+ {
3641 framed_read : FramedRead < R , RequestDecoder > ,
37- framed_write : FramedWrite < W , ResponseEncoder > ,
42+ message_writer : MessageWriter < W > ,
3843}
3944
4045impl < R , W > HttpConnection < R , W >
4550 pub fn new ( reader : R , writer : W ) -> Self {
4651 Self {
4752 framed_read : FramedRead :: with_capacity ( reader, RequestDecoder :: new ( ) , 8 * 1024 ) ,
48- framed_write : FramedWrite :: new ( writer, ResponseEncoder :: new ( ) ) ,
53+ message_writer : MessageWriter :: from ( writer, 8 * 1024 ) ,
4954 }
5055 }
5156
7075 return Err ( ParseError :: invalid_body ( "need header while receive body" ) . into ( ) ) ;
7176 }
7277
73- Some ( Err ( ParseError :: Io { source} ) ) => {
78+ Some ( Err ( ParseError :: Io { source } ) ) => {
7479 info ! ( "connection io error: {}" , source) ;
7580 return Ok ( ( ) ) ;
7681 }
99104 let slice = value. as_bytes ( ) ;
100105 // Verify if the value of the "Expect" field is "100-continue".
101106 if slice. len ( ) >= 4 && & slice[ 0 ..4 ] == b"100-" {
102- let writer = self . framed_write . get_mut ( ) ;
107+ let writer = self . message_writer . get_mut ( ) ;
103108 // Send a "100 Continue" response to the client.
104109 let _ = writer. write ( b"HTTP/1.1 100 Continue\r \n \r \n " ) . await . map_err ( SendError :: io) ?;
105110 writer. flush ( ) . await . map_err ( SendError :: io) ?;
@@ -158,33 +163,27 @@ where
158163 } ;
159164
160165 let header = Message :: < _ , T :: Data > :: Header ( ( ResponseHead :: from_parts ( header_parts, ( ) ) , payload_size) ) ;
161- if payload_size. is_not_empty ( ) {
162- self . framed_write . feed ( header) . await ?;
163- } else {
164- // using send instead of feed, because we want to flush the underlying IO
165- // when response only has header, we need to send header,
166- // otherwise, we just feed header to the buffer
167- self . framed_write . send ( header) . await ?;
168- }
166+
167+ self . message_writer . write ( header) ?;
169168
170169 loop {
171170 match body. frame ( ) . await {
172171 Some ( Ok ( frame) ) => {
173172 let payload_item =
174173 frame. into_data ( ) . map ( PayloadItem :: Chunk ) . map_err ( |_e| SendError :: invalid_body ( "resolve body response error" ) ) ?;
175174
176- self . framed_write
177- . send ( Message :: Payload ( payload_item) )
178- . await
175+ self . message_writer
176+ . write ( Message :: Payload ( payload_item) )
179177 . map_err ( |_e| SendError :: invalid_body ( "can't send response" ) ) ?;
180178 }
181179 Some ( Err ( e) ) => return Err ( SendError :: invalid_body ( format ! ( "resolve response body error: {e}" ) ) . into ( ) ) ,
182180 None => {
183- self . framed_write
181+ self . message_writer
184182 // using feed instead of send, because we don't want to flush the underlying IO
185- . feed ( Message :: Payload ( PayloadItem :: < T :: Data > :: Eof ) )
186- . await
183+ . write ( Message :: Payload ( PayloadItem :: < T :: Data > :: Eof ) )
187184 . map_err ( |e| SendError :: invalid_body ( format ! ( "can't send eof response: {}" , e) ) ) ?;
185+ self . message_writer . flush ( ) . await ?;
186+ self . message_writer . clear_buf ( ) ;
188187 return Ok ( ( ) ) ;
189188 }
190189 }
0 commit comments