@@ -296,3 +296,171 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
296296 }
297297 }
298298}
299+
300+ #[ cfg( test) ]
301+ mod tests {
302+ use crate :: { * , testing:: * } ;
303+
304+ #[ test]
305+ fn test_fork_clone ( ) {
306+ let mut context = TestingContext :: minimal_plugins ( ) ;
307+
308+ let workflow = context. spawn_io_workflow ( |scope, builder| {
309+ let fork = scope. input . fork_clone ( builder) ;
310+ let branch_a = fork. clone_output ( builder) ;
311+ let branch_b = fork. clone_output ( builder) ;
312+ builder. connect ( branch_a, scope. terminate ) ;
313+ builder. connect ( branch_b, scope. terminate ) ;
314+ } ) ;
315+
316+ let mut promise = context. command ( |commands| {
317+ commands
318+ . request ( 5.0 , workflow)
319+ . take_response ( )
320+ } ) ;
321+
322+ context. run_with_conditions ( & mut promise, Duration :: from_secs ( 1 ) ) ;
323+ assert ! ( promise. take( ) . available( ) . is_some_and( |v| v == 5.0 ) ) ;
324+ assert ! ( context. no_unhandled_errors( ) ) ;
325+
326+ let workflow = context. spawn_io_workflow ( |scope, builder| {
327+ scope. input . chain ( builder)
328+ . fork_clone ( (
329+ |chain : Chain < f64 > | chain. connect ( scope. terminate ) ,
330+ |chain : Chain < f64 > | chain. connect ( scope. terminate ) ,
331+ ) ) ;
332+ } ) ;
333+
334+ let mut promise = context. command ( |commands| {
335+ commands
336+ . request ( 3.0 , workflow)
337+ . take_response ( )
338+ } ) ;
339+
340+ context. run_with_conditions ( & mut promise, Duration :: from_secs ( 1 ) ) ;
341+ assert ! ( promise. take( ) . available( ) . is_some_and( |v| v == 3.0 ) ) ;
342+ assert ! ( context. no_unhandled_errors( ) ) ;
343+
344+ let workflow = context. spawn_io_workflow ( |scope, builder| {
345+ scope. input . chain ( builder)
346+ . fork_clone ( (
347+ |chain : Chain < f64 > | chain
348+ . map_block ( |t| WaitRequest { duration : Duration :: from_secs_f64 ( 10.0 * t) , value : 10.0 * t } )
349+ . map ( |r : AsyncMap < WaitRequest < f64 > > | {
350+ wait ( r. request )
351+ } )
352+ . connect ( scope. terminate ) ,
353+ |chain : Chain < f64 > | chain
354+ . map_block ( |t| WaitRequest { duration : Duration :: from_secs_f64 ( t/100.0 ) , value : t/100.0 } )
355+ . map ( |r : AsyncMap < WaitRequest < f64 > > | {
356+ wait ( r. request )
357+ } )
358+ . connect ( scope. terminate ) ,
359+ ) ) ;
360+ } ) ;
361+
362+ let mut promise = context. command ( |commands| {
363+ commands
364+ . request ( 1.0 , workflow)
365+ . take_response ( )
366+ } ) ;
367+
368+ context. run_with_conditions ( & mut promise, Duration :: from_secs_f64 ( 0.5 ) ) ;
369+ assert ! ( promise. take( ) . available( ) . is_some_and( |v| v == 0.01 ) ) ;
370+ assert ! ( context. no_unhandled_errors( ) ) ;
371+ }
372+
373+ #[ test]
374+ fn test_stream_reachability ( ) {
375+ let mut context = TestingContext :: minimal_plugins ( ) ;
376+
377+ // Test for streams from a blocking node
378+ let workflow = context. spawn_io_workflow ( |scope, builder| {
379+ let stream_node = builder. create_map ( |_: BlockingMap < ( ) , StreamOf < u32 > > | {
380+ // Do nothing. The purpose of this node is to just return without
381+ // sending off any streams.
382+ } ) ;
383+
384+ builder. connect ( scope. input , stream_node. input ) ;
385+ stream_node. streams . chain ( builder)
386+ . inner ( )
387+ . map_block ( |value| 2 * value)
388+ . connect ( scope. terminate ) ;
389+ } ) ;
390+
391+ let mut promise = context. command ( |commands| {
392+ commands. request ( ( ) , workflow) . take_response ( )
393+ } ) ;
394+
395+ context. run_with_conditions ( & mut promise, Duration :: from_secs ( 2 ) ) ;
396+ assert ! ( promise. peek( ) . is_cancelled( ) ) ;
397+ assert ! ( context. no_unhandled_errors( ) ) ;
398+
399+ // Test for streams from an async node
400+ let workflow = context. spawn_io_workflow ( |scope, builder| {
401+ let stream_node = builder. create_map ( |_: AsyncMap < ( ) , StreamOf < u32 > > | {
402+ async { /* Do nothing */ }
403+ } ) ;
404+
405+ builder. connect ( scope. input , stream_node. input ) ;
406+ stream_node. streams . chain ( builder)
407+ . inner ( )
408+ . map_block ( |value| 2 * value)
409+ . connect ( scope. terminate ) ;
410+ } ) ;
411+
412+ let mut promise = context. command ( |commands| {
413+ commands. request ( ( ) , workflow) . take_response ( )
414+ } ) ;
415+
416+ context. run_with_conditions ( & mut promise, Duration :: from_secs ( 2 ) ) ;
417+ assert ! ( promise. peek( ) . is_cancelled( ) ) ;
418+ assert ! ( context. no_unhandled_errors( ) ) ;
419+ }
420+
421+ use crossbeam:: channel:: unbounded;
422+
423+ #[ test]
424+ fn test_on_cancel ( ) {
425+ let ( sender, receiver) = unbounded ( ) ;
426+
427+ let mut context = TestingContext :: minimal_plugins ( ) ;
428+ let workflow = context. spawn_io_workflow ( |scope, builder| {
429+
430+ let input = scope. input . fork_clone ( builder) ;
431+
432+ let buffer = builder. create_buffer ( BufferSettings :: default ( ) ) ;
433+ let input_to_buffer = input. clone_output ( builder) ;
434+ builder. connect ( input_to_buffer, buffer. input_slot ( ) ) ;
435+
436+ let none_node = builder. create_map_block ( produce_none) ;
437+ let input_to_node = input. clone_output ( builder) ;
438+ builder. connect ( input_to_node, none_node. input ) ;
439+ none_node. output . chain ( builder)
440+ . cancel_on_none ( )
441+ . connect ( scope. terminate ) ;
442+
443+ // The chain coming out of the none_node will result in the scope
444+ // being cancelled. After that, this scope should run, and the value
445+ // that went into the buffer should get sent over the channel.
446+ builder. on_cancel ( buffer, |scope, builder| {
447+ scope. input . chain ( builder)
448+ . map_block ( move |value| {
449+ sender. send ( value) . ok ( ) ;
450+ } )
451+ . connect ( scope. terminate ) ;
452+ } ) ;
453+ } ) ;
454+
455+ let mut promise = context. command ( |commands| {
456+ commands. request ( 5 , workflow) . take_response ( )
457+ } ) ;
458+
459+ context. run_with_conditions ( & mut promise, Duration :: from_secs ( 2 ) ) ;
460+ assert ! ( promise. peek( ) . is_cancelled( ) ) ;
461+ let channel_output = receiver. try_recv ( ) . unwrap ( ) ;
462+ assert_eq ! ( channel_output, 5 ) ;
463+ assert ! ( context. no_unhandled_errors( ) ) ;
464+ assert ! ( context. confirm_buffers_empty( ) . is_ok( ) ) ;
465+ }
466+ }
0 commit comments