@@ -3452,48 +3452,44 @@ mod tests {
34523452 #[ cfg( feature = "std" ) ]
34533453 fn test_process_events_multithreaded ( ) {
34543454 use std:: time:: { Duration , Instant } ;
3455- // Test that `process_events` getting called on multiple threads doesn't generate too many
3456- // loop iterations.
3455+ // `process_events` shouldn't block on another thread processing events and instead should
3456+ // simply signal the currently processing thread to go around the loop again.
3457+ // Here we test that this happens by spawning a few threads and checking that we see one go
3458+ // around again at least once.
3459+ //
34573460 // Each time `process_events` goes around the loop we call
3458- // `get_and_clear_pending_msg_events`, which we count using the `TestMessageHandler`.
3459- // Because the loop should go around once more after a call which fails to take the
3460- // single-threaded lock, if we write zero to the counter before calling `process_events` we
3461- // should never observe there having been more than 2 loop iterations.
3462- // Further, because the last thread to exit will call `process_events` before returning, we
3463- // should always have at least one count at the end.
3461+ // `get_and_clear_pending_msg_events`, which we count using the `TestMessageHandler`. Thus,
3462+ // to test we simply write zero to the counter before calling `process_events` and make
3463+ // sure we observe a value greater than one at least once.
34643464 let cfg = Arc :: new ( create_peermgr_cfgs ( 1 ) ) ;
34653465 // Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }.
34663466 let peer = Arc :: new ( create_network ( 1 , unsafe { & * ( & * cfg as * const _ ) as & ' static _ } ) . pop ( ) . unwrap ( ) ) ;
34673467
3468- let exit_flag = Arc :: new ( AtomicBool :: new ( false ) ) ;
3469- macro_rules! spawn_thread { ( ) => { {
3470- let thread_cfg = Arc :: clone ( & cfg ) ;
3468+ let end_time = Instant :: now ( ) + Duration :: from_millis ( 100 ) ;
3469+ let observed_loop = Arc :: new ( AtomicBool :: new ( false ) ) ;
3470+ let thread_fn = || {
34713471 let thread_peer = Arc :: clone ( & peer) ;
3472- let thread_exit = Arc :: clone( & exit_flag ) ;
3473- std :: thread :: spawn ( move || {
3474- while !thread_exit . load( Ordering :: Acquire ) {
3475- thread_cfg [ 0 ] . chan_handler . message_fetch_counter . store( 0 , Ordering :: Release ) ;
3472+ let thread_observed_loop = Arc :: clone ( & observed_loop ) ;
3473+ move || {
3474+ while Instant :: now ( ) < end_time || !thread_observed_loop . load ( Ordering :: Acquire ) {
3475+ test_utils :: TestChannelMessageHandler :: MESSAGE_FETCH_COUNTER . with ( |val| val . store ( 0 , Ordering :: Relaxed ) ) ;
34763476 thread_peer. process_events ( ) ;
3477+ if test_utils:: TestChannelMessageHandler :: MESSAGE_FETCH_COUNTER . with ( |val| val. load ( Ordering :: Relaxed ) ) > 1 {
3478+ thread_observed_loop. store ( true , Ordering :: Release ) ;
3479+ return ;
3480+ }
34773481 std:: thread:: sleep ( Duration :: from_micros ( 1 ) ) ;
34783482 }
3479- } )
3480- } } }
3481-
3482- let thread_a = spawn_thread ! ( ) ;
3483- let thread_b = spawn_thread ! ( ) ;
3484- let thread_c = spawn_thread ! ( ) ;
3485-
3486- let start_time = Instant :: now ( ) ;
3487- while start_time. elapsed ( ) < Duration :: from_millis ( 100 ) {
3488- let val = cfg[ 0 ] . chan_handler . message_fetch_counter . load ( Ordering :: Acquire ) ;
3489- assert ! ( val <= 2 ) ;
3490- std:: thread:: yield_now ( ) ; // Winblowz seemingly doesn't ever interrupt threads?!
3491- }
3483+ }
3484+ } ;
34923485
3493- exit_flag. store ( true , Ordering :: Release ) ;
3486+ let thread_a = std:: thread:: spawn ( thread_fn ( ) ) ;
3487+ let thread_b = std:: thread:: spawn ( thread_fn ( ) ) ;
3488+ let thread_c = std:: thread:: spawn ( thread_fn ( ) ) ;
3489+ thread_fn ( ) ( ) ;
34943490 thread_a. join ( ) . unwrap ( ) ;
34953491 thread_b. join ( ) . unwrap ( ) ;
34963492 thread_c. join ( ) . unwrap ( ) ;
3497- assert ! ( cfg [ 0 ] . chan_handler . message_fetch_counter . load( Ordering :: Acquire ) >= 1 ) ;
3493+ assert ! ( observed_loop . load( Ordering :: Acquire ) ) ;
34983494 }
34993495}
0 commit comments