@@ -33,7 +33,7 @@ impl StreamWriter {
3333 Ok ( ( ) )
3434 } ,
3535 Err ( e) =>
36- Err ( anyhow:: anyhow!( "Internal error: StreamWriter::append can't take lock: {}" , e) )
36+ Err ( anyhow:: anyhow!( "Internal error: StreamWriter::append can't take lock: {}" , e) )
3737 } ;
3838 // This was meant to wake up listener threads when there was new data but it ended up
3939 // just stalling until input was complete. TODO: investigate so we can get rid of the
@@ -181,6 +181,8 @@ fn split_at_two_newlines(source: &[u8]) -> Option<(Vec<u8>, Vec<u8>)> {
181181
182182#[ cfg( test) ]
183183mod test {
184+ use futures:: StreamExt ;
185+
184186 use super :: * ;
185187
186188 #[ test]
@@ -205,4 +207,31 @@ mod test {
205207 assert_eq ! ( vec![ 0x41 , 0x42 , 0x0a , 0x43 , 0x44 , 0x0a ] , result. 0 ) ;
206208 assert ! ( result. 1 . is_empty( ) ) ;
207209 }
210+
211+ #[ tokio:: test]
212+ async fn streaming_splits_out_headers ( ) {
213+ let mut sw = StreamWriter :: new ( ) ;
214+ let mut sw2 = sw. clone ( ) ;
215+ tokio:: spawn ( async move {
216+ write ! ( sw2, "Header 1\n " ) . unwrap ( ) ;
217+ write ! ( sw2, "Header 2\n " ) . unwrap ( ) ;
218+ write ! ( sw2, "\n " ) . unwrap ( ) ;
219+ write ! ( sw2, "Body 1\n " ) . unwrap ( ) ;
220+ write ! ( sw2, "Body 2\n " ) . unwrap ( ) ;
221+ sw2. done ( ) . unwrap ( ) ;
222+ } ) ;
223+ let header = sw. header_block ( ) . await . unwrap ( ) ;
224+ let header_text = String :: from_utf8 ( header) . unwrap ( ) ;
225+ assert ! ( header_text. contains( "Header 1\n " ) ) ;
226+ assert ! ( header_text. contains( "Header 2\n " ) ) ;
227+
228+ let mut stm = Box :: pin ( sw. as_stream ( ) ) ;
229+ let mut body = vec ! [ ] ;
230+ while let Some ( Ok ( v) ) = stm. next ( ) . await {
231+ body. extend_from_slice ( & v) ;
232+ }
233+ let body_text = String :: from_utf8 ( body) . unwrap ( ) ;
234+ assert ! ( body_text. contains( "Body 1\n " ) ) ;
235+ assert ! ( body_text. contains( "Body 2\n " ) ) ;
236+ }
208237}
0 commit comments