@@ -214,3 +214,122 @@ fn collect(continuations: &mut Vec<Bytes>) -> Bytes {
214214
215215 buf. freeze ( )
216216}
217+
218+ #[ cfg( test) ]
219+ mod tests {
220+ use std:: { future:: Future , task:: Poll } ;
221+
222+ use futures_core:: Stream ;
223+
224+ use super :: { Bytes , Item , Message , MessageStream } ;
225+ use crate :: stream:: tests:: payload_pair;
226+
227+ #[ tokio:: test]
228+ async fn aggregates_continuations ( ) {
229+ std:: future:: poll_fn ( move |cx| {
230+ let ( mut tx, rx) = payload_pair ( 8 ) ;
231+ let message_stream = MessageStream :: new ( rx) . aggregate_continuations ( ) ;
232+ let mut stream = std:: pin:: pin!( message_stream) ;
233+
234+ let messages = [
235+ Message :: Continuation ( Item :: FirstText ( Bytes :: from ( b"first" . to_vec ( ) ) ) ) ,
236+ Message :: Continuation ( Item :: Continue ( Bytes :: from ( b"second" . to_vec ( ) ) ) ) ,
237+ Message :: Continuation ( Item :: Last ( Bytes :: from ( b"third" . to_vec ( ) ) ) ) ,
238+ ] ;
239+
240+ let len = messages. len ( ) ;
241+
242+ for ( idx, msg) in messages. into_iter ( ) . enumerate ( ) {
243+ let poll = stream. as_mut ( ) . poll_next ( cx) ;
244+ assert ! (
245+ poll. is_pending( ) ,
246+ "Stream should be pending when no messages are present {poll:?}"
247+ ) ;
248+
249+ let fut = tx. send ( msg) ;
250+ let fut = std:: pin:: pin!( fut) ;
251+
252+ assert ! ( fut. poll( cx) . is_ready( ) , "Sending should not yield" ) ;
253+
254+ if idx == len - 1 {
255+ assert ! (
256+ stream. as_mut( ) . poll_next( cx) . is_ready( ) ,
257+ "Stream should be ready"
258+ ) ;
259+ } else {
260+ assert ! (
261+ stream. as_mut( ) . poll_next( cx) . is_pending( ) ,
262+ "Stream shouldn't be ready until continuations complete"
263+ ) ;
264+ }
265+ }
266+
267+ assert ! (
268+ stream. as_mut( ) . poll_next( cx) . is_pending( ) ,
269+ "Stream should be pending after processing messages"
270+ ) ;
271+
272+ Poll :: Ready ( ( ) )
273+ } )
274+ . await
275+ }
276+
277+ #[ ignore]
278+ #[ tokio:: test]
279+ async fn aggregates_consecutive_continuations ( ) {
280+ std:: future:: poll_fn ( move |cx| {
281+ let ( mut tx, rx) = payload_pair ( 8 ) ;
282+ let message_stream = MessageStream :: new ( rx) . aggregate_continuations ( ) ;
283+ let mut stream = std:: pin:: pin!( message_stream) ;
284+
285+ let messages = vec ! [
286+ Message :: Continuation ( Item :: FirstText ( Bytes :: from( b"first" . to_vec( ) ) ) ) ,
287+ Message :: Continuation ( Item :: Continue ( Bytes :: from( b"second" . to_vec( ) ) ) ) ,
288+ Message :: Continuation ( Item :: Last ( Bytes :: from( b"third" . to_vec( ) ) ) ) ,
289+ ] ;
290+
291+ let poll = stream. as_mut ( ) . poll_next ( cx) ;
292+ assert ! (
293+ poll. is_pending( ) ,
294+ "Stream should be pending when no messages are present {poll:?}"
295+ ) ;
296+
297+ let fut = tx. send_many ( messages) ;
298+ let fut = std:: pin:: pin!( fut) ;
299+
300+ assert ! ( fut. poll( cx) . is_ready( ) , "Sending should not yield" ) ;
301+
302+ assert ! (
303+ stream. as_mut( ) . poll_next( cx) . is_ready( ) ,
304+ "Stream should be ready when all continuations have been sent"
305+ ) ;
306+
307+ assert ! (
308+ stream. as_mut( ) . poll_next( cx) . is_pending( ) ,
309+ "Stream should be pending after processing messages"
310+ ) ;
311+
312+ Poll :: Ready ( ( ) )
313+ } )
314+ . await
315+ }
316+
317+ #[ tokio:: test]
318+ async fn stream_closes ( ) {
319+ std:: future:: poll_fn ( move |cx| {
320+ let ( tx, rx) = payload_pair ( 8 ) ;
321+ drop ( tx) ;
322+ let message_stream = MessageStream :: new ( rx) . aggregate_continuations ( ) ;
323+ let mut stream = std:: pin:: pin!( message_stream) ;
324+
325+ let poll = stream. as_mut ( ) . poll_next ( cx) ;
326+ assert ! (
327+ matches!( poll, Poll :: Ready ( None ) ) ,
328+ "Stream should be ready when all continuations have been sent"
329+ ) ;
330+
331+ Poll :: Ready ( ( ) )
332+ } )
333+ . await
334+ }
335+ }
0 commit comments