6565 err : Some ( err) ,
6666 }
6767 }
68+
69+ /// Decodes one complete logical line using the request's configured encoding.
70+ ///
71+ /// Callers are expected to pass only the bytes that belong to the line being yielded,
72+ /// whether they came from the internal buffer, the current payload chunk, or both.
73+ fn decode ( encoding : & ' static Encoding , bytes : & [ u8 ] ) -> Result < String , ReadlinesError > {
74+ if encoding == UTF_8 {
75+ str:: from_utf8 ( bytes)
76+ . map_err ( |_| ReadlinesError :: EncodingError )
77+ . map ( str:: to_owned)
78+ } else {
79+ encoding
80+ . decode_without_bom_handling_and_without_replacement ( bytes)
81+ . map ( Cow :: into_owned)
82+ . ok_or ( ReadlinesError :: EncodingError )
83+ }
84+ }
6885}
6986
7087impl < T > Stream for Readlines < T >
@@ -95,18 +112,7 @@ where
95112 if ind + 1 > this. limit {
96113 return Poll :: Ready ( Some ( Err ( ReadlinesError :: LimitOverflow ) ) ) ;
97114 }
98- let line = if this. encoding == UTF_8 {
99- str:: from_utf8 ( & this. buf . split_to ( ind + 1 ) )
100- . map_err ( |_| ReadlinesError :: EncodingError ) ?
101- . to_owned ( )
102- } else {
103- this. encoding
104- . decode_without_bom_handling_and_without_replacement (
105- & this. buf . split_to ( ind + 1 ) ,
106- )
107- . map ( Cow :: into_owned)
108- . ok_or ( ReadlinesError :: EncodingError ) ?
109- } ;
115+ let line = Self :: decode ( this. encoding , & this. buf . split_to ( ind + 1 ) ) ?;
110116 return Poll :: Ready ( Some ( Ok ( line) ) ) ;
111117 }
112118 this. checked_buff = true ;
@@ -125,24 +131,17 @@ where
125131 }
126132 if let Some ( ind) = found {
127133 // check if line is longer than limit
128- if ind + 1 > this. limit {
134+ if this . buf . len ( ) + ind + 1 > this. limit {
129135 return Poll :: Ready ( Some ( Err ( ReadlinesError :: LimitOverflow ) ) ) ;
130136 }
131- let line = if this. encoding == UTF_8 {
132- str:: from_utf8 ( & bytes. split_to ( ind + 1 ) )
133- . map_err ( |_| ReadlinesError :: EncodingError ) ?
134- . to_owned ( )
135- } else {
136- this. encoding
137- . decode_without_bom_handling_and_without_replacement (
138- & bytes. split_to ( ind + 1 ) ,
139- )
140- . map ( Cow :: into_owned)
141- . ok_or ( ReadlinesError :: EncodingError ) ?
142- } ;
143- // extend buffer with rest of the bytes;
137+
138+ this. buf . extend_from_slice ( & bytes. split_to ( ind + 1 ) ) ;
139+ let line = Self :: decode ( this. encoding , & this. buf ) ?;
140+ this. buf . clear ( ) ;
141+
142+ // buffer bytes following the returned line
144143 this. buf . extend_from_slice ( & bytes) ;
145- this. checked_buff = false ;
144+ this. checked_buff = this . buf . is_empty ( ) ;
146145 return Poll :: Ready ( Some ( Ok ( line) ) ) ;
147146 }
148147 this. buf . extend_from_slice ( & bytes) ;
@@ -156,16 +155,7 @@ where
156155 if this. buf . len ( ) > this. limit {
157156 return Poll :: Ready ( Some ( Err ( ReadlinesError :: LimitOverflow ) ) ) ;
158157 }
159- let line = if this. encoding == UTF_8 {
160- str:: from_utf8 ( & this. buf )
161- . map_err ( |_| ReadlinesError :: EncodingError ) ?
162- . to_owned ( )
163- } else {
164- this. encoding
165- . decode_without_bom_handling_and_without_replacement ( & this. buf )
166- . map ( Cow :: into_owned)
167- . ok_or ( ReadlinesError :: EncodingError ) ?
168- } ;
158+ let line = Self :: decode ( this. encoding , & this. buf ) ?;
169159 this. buf . clear ( ) ;
170160 Poll :: Ready ( Some ( Ok ( line) ) )
171161 }
@@ -177,10 +167,16 @@ where
177167
178168#[ cfg( test) ]
179169mod tests {
180- use futures_util:: StreamExt as _;
170+ use std:: {
171+ pin:: Pin ,
172+ task:: { Context , Poll } ,
173+ } ;
174+
175+ use actix_http:: { h1, Request } ;
176+ use futures_util:: { task:: noop_waker_ref, StreamExt as _} ;
181177
182178 use super :: * ;
183- use crate :: test:: TestRequest ;
179+ use crate :: { error :: ReadlinesError , test:: TestRequest } ;
184180
185181 #[ actix_rt:: test]
186182 async fn test_readlines ( ) {
@@ -208,4 +204,46 @@ mod tests {
208204 "Contrary to popular belief, Lorem Ipsum is not simply random text."
209205 ) ;
210206 }
207+
208+ #[ test]
209+ fn test_readlines_limit_across_chunks ( ) {
210+ let ( mut sender, payload) = h1:: Payload :: create ( false ) ;
211+ let payload: actix_http:: Payload = payload. into ( ) ;
212+ let mut req = Request :: with_payload ( payload) ;
213+ let mut stream = Readlines :: new ( & mut req) . limit ( 10 ) ;
214+ let mut cx = Context :: from_waker ( noop_waker_ref ( ) ) ;
215+
216+ sender. feed_data ( Bytes :: from_static ( b"AAAAAAAAAA" ) ) ;
217+ assert ! ( matches!(
218+ Pin :: new( & mut stream) . poll_next( & mut cx) ,
219+ Poll :: Pending
220+ ) ) ;
221+
222+ sender. feed_data ( Bytes :: from_static ( b"A\n " ) ) ;
223+ assert ! ( matches!(
224+ Pin :: new( & mut stream) . poll_next( & mut cx) ,
225+ Poll :: Ready ( Some ( Err ( ReadlinesError :: LimitOverflow ) ) )
226+ ) ) ;
227+ }
228+
229+ #[ test]
230+ fn test_readlines_returns_full_line_across_chunks ( ) {
231+ let ( mut sender, payload) = h1:: Payload :: create ( false ) ;
232+ let payload: actix_http:: Payload = payload. into ( ) ;
233+ let mut req = Request :: with_payload ( payload) ;
234+ let mut stream = Readlines :: new ( & mut req) ;
235+ let mut cx = Context :: from_waker ( noop_waker_ref ( ) ) ;
236+
237+ sender. feed_data ( Bytes :: from_static ( b"hello " ) ) ;
238+ assert ! ( matches!(
239+ Pin :: new( & mut stream) . poll_next( & mut cx) ,
240+ Poll :: Pending
241+ ) ) ;
242+
243+ sender. feed_data ( Bytes :: from_static ( b"world\n next" ) ) ;
244+ assert ! ( matches!(
245+ Pin :: new( & mut stream) . poll_next( & mut cx) ,
246+ Poll :: Ready ( Some ( Ok ( ref line) ) ) if line == "hello world\n "
247+ ) ) ;
248+ }
211249}
0 commit comments