@@ -2204,12 +2204,13 @@ mod tests {
22042204
22052205 use crate :: prelude:: * ;
22062206 use crate :: sync:: { Arc , Mutex } ;
2207- use core:: sync:: atomic:: Ordering ;
2207+ use core:: sync:: atomic:: { AtomicBool , Ordering } ;
22082208
22092209 #[ derive( Clone ) ]
22102210 struct FileDescriptor {
22112211 fd : u16 ,
22122212 outbound_data : Arc < Mutex < Vec < u8 > > > ,
2213+ disconnect : Arc < AtomicBool > ,
22132214 }
22142215 impl PartialEq for FileDescriptor {
22152216 fn eq ( & self , other : & Self ) -> bool {
@@ -2229,7 +2230,7 @@ mod tests {
22292230 data. len ( )
22302231 }
22312232
2232- fn disconnect_socket ( & mut self ) { }
2233+ fn disconnect_socket ( & mut self ) { self . disconnect . store ( true , Ordering :: Release ) ; }
22332234 }
22342235
22352236 struct PeerManagerCfg {
@@ -2270,10 +2271,16 @@ mod tests {
22702271
22712272 fn establish_connection < ' a > ( peer_a : & PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler , & ' a test_utils:: TestRoutingMessageHandler , IgnoringMessageHandler , & ' a test_utils:: TestLogger , IgnoringMessageHandler , & ' a test_utils:: TestNodeSigner > , peer_b : & PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler , & ' a test_utils:: TestRoutingMessageHandler , IgnoringMessageHandler , & ' a test_utils:: TestLogger , IgnoringMessageHandler , & ' a test_utils:: TestNodeSigner > ) -> ( FileDescriptor , FileDescriptor ) {
22722273 let id_a = peer_a. node_signer . get_node_id ( Recipient :: Node ) . unwrap ( ) ;
2273- let mut fd_a = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2274+ let mut fd_a = FileDescriptor {
2275+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2276+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2277+ } ;
22742278 let addr_a = NetAddress :: IPv4 { addr : [ 127 , 0 , 0 , 1 ] , port : 1000 } ;
22752279 let id_b = peer_b. node_signer . get_node_id ( Recipient :: Node ) . unwrap ( ) ;
2276- let mut fd_b = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2280+ let mut fd_b = FileDescriptor {
2281+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2282+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2283+ } ;
22772284 let addr_b = NetAddress :: IPv4 { addr : [ 127 , 0 , 0 , 1 ] , port : 1001 } ;
22782285 let initial_data = peer_b. new_outbound_connection ( id_a, fd_b. clone ( ) , Some ( addr_a. clone ( ) ) ) . unwrap ( ) ;
22792286 peer_a. new_inbound_connection ( fd_a. clone ( ) , Some ( addr_b. clone ( ) ) ) . unwrap ( ) ;
@@ -2297,6 +2304,84 @@ mod tests {
22972304 ( fd_a. clone ( ) , fd_b. clone ( ) )
22982305 }
22992306
2307+ #[ test]
2308+ #[ cfg( feature = "std" ) ]
2309+ fn fuzz_threaded_connections ( ) {
2310+ // Spawn two threads which repeatedly connect two peers together, leading to "got second
2311+ // connection with peer" disconnections and rapid reconnect. This previously found an issue
2312+ // with our internal map consistency, and is a generally good smoke test of disconnection.
2313+ let cfgs = Arc :: new ( create_peermgr_cfgs ( 2 ) ) ;
2314+ // Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }.
2315+ let peers = Arc :: new ( create_network ( 2 , unsafe { & * ( & * cfgs as * const _ ) as & ' static _ } ) ) ;
2316+
2317+ let start_time = std:: time:: Instant :: now ( ) ;
2318+ macro_rules! spawn_thread { ( $id: expr) => { {
2319+ let peers = Arc :: clone( & peers) ;
2320+ let cfgs = Arc :: clone( & cfgs) ;
2321+ std:: thread:: spawn( move || {
2322+ let mut ctr = 0 ;
2323+ while start_time. elapsed( ) < std:: time:: Duration :: from_secs( 1 ) {
2324+ let id_a = peers[ 0 ] . node_signer. get_node_id( Recipient :: Node ) . unwrap( ) ;
2325+ let mut fd_a = FileDescriptor {
2326+ fd: $id + ctr * 3 , outbound_data: Arc :: new( Mutex :: new( Vec :: new( ) ) ) ,
2327+ disconnect: Arc :: new( AtomicBool :: new( false ) ) ,
2328+ } ;
2329+ let addr_a = NetAddress :: IPv4 { addr: [ 127 , 0 , 0 , 1 ] , port: 1000 } ;
2330+ let mut fd_b = FileDescriptor {
2331+ fd: $id + ctr * 3 , outbound_data: Arc :: new( Mutex :: new( Vec :: new( ) ) ) ,
2332+ disconnect: Arc :: new( AtomicBool :: new( false ) ) ,
2333+ } ;
2334+ let addr_b = NetAddress :: IPv4 { addr: [ 127 , 0 , 0 , 1 ] , port: 1001 } ;
2335+ let initial_data = peers[ 1 ] . new_outbound_connection( id_a, fd_b. clone( ) , Some ( addr_a. clone( ) ) ) . unwrap( ) ;
2336+ peers[ 0 ] . new_inbound_connection( fd_a. clone( ) , Some ( addr_b. clone( ) ) ) . unwrap( ) ;
2337+ assert_eq!( peers[ 0 ] . read_event( & mut fd_a, & initial_data) . unwrap( ) , false ) ;
2338+
2339+ while start_time. elapsed( ) < std:: time:: Duration :: from_secs( 1 ) {
2340+ peers[ 0 ] . process_events( ) ;
2341+ if fd_a. disconnect. load( Ordering :: Acquire ) { break ; }
2342+ let a_data = fd_a. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ;
2343+ if peers[ 1 ] . read_event( & mut fd_b, & a_data) . is_err( ) { break ; }
2344+
2345+ peers[ 1 ] . process_events( ) ;
2346+ if fd_b. disconnect. load( Ordering :: Acquire ) { break ; }
2347+ let b_data = fd_b. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ;
2348+ if peers[ 0 ] . read_event( & mut fd_a, & b_data) . is_err( ) { break ; }
2349+
2350+ cfgs[ 0 ] . chan_handler. pending_events. lock( ) . unwrap( )
2351+ . push( crate :: util:: events:: MessageSendEvent :: SendShutdown {
2352+ node_id: peers[ 1 ] . node_signer. get_node_id( Recipient :: Node ) . unwrap( ) ,
2353+ msg: msgs:: Shutdown {
2354+ channel_id: [ 0 ; 32 ] ,
2355+ scriptpubkey: bitcoin:: Script :: new( ) ,
2356+ } ,
2357+ } ) ;
2358+ cfgs[ 1 ] . chan_handler. pending_events. lock( ) . unwrap( )
2359+ . push( crate :: util:: events:: MessageSendEvent :: SendShutdown {
2360+ node_id: peers[ 0 ] . node_signer. get_node_id( Recipient :: Node ) . unwrap( ) ,
2361+ msg: msgs:: Shutdown {
2362+ channel_id: [ 0 ; 32 ] ,
2363+ scriptpubkey: bitcoin:: Script :: new( ) ,
2364+ } ,
2365+ } ) ;
2366+
2367+ peers[ 0 ] . timer_tick_occurred( ) ;
2368+ peers[ 1 ] . timer_tick_occurred( ) ;
2369+ }
2370+
2371+ peers[ 0 ] . socket_disconnected( & fd_a) ;
2372+ peers[ 1 ] . socket_disconnected( & fd_b) ;
2373+ ctr += 1 ;
2374+ std:: thread:: sleep( std:: time:: Duration :: from_micros( 1 ) ) ;
2375+ }
2376+ } )
2377+ } } }
2378+ let thrd_a = spawn_thread ! ( 1 ) ;
2379+ let thrd_b = spawn_thread ! ( 2 ) ;
2380+
2381+ thrd_a. join ( ) . unwrap ( ) ;
2382+ thrd_b. join ( ) . unwrap ( ) ;
2383+ }
2384+
23002385 #[ test]
23012386 fn test_disconnect_peer ( ) {
23022387 // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
@@ -2353,7 +2438,10 @@ mod tests {
23532438 let cfgs = create_peermgr_cfgs ( 2 ) ;
23542439 let peers = create_network ( 2 , & cfgs) ;
23552440
2356- let mut fd_dup = FileDescriptor { fd : 3 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2441+ let mut fd_dup = FileDescriptor {
2442+ fd : 3 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2443+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2444+ } ;
23572445 let addr_dup = NetAddress :: IPv4 { addr : [ 127 , 0 , 0 , 1 ] , port : 1003 } ;
23582446 let id_a = cfgs[ 0 ] . node_signer . get_node_id ( Recipient :: Node ) . unwrap ( ) ;
23592447 peers[ 0 ] . new_inbound_connection ( fd_dup. clone ( ) , Some ( addr_dup. clone ( ) ) ) . unwrap ( ) ;
@@ -2457,8 +2545,14 @@ mod tests {
24572545 let peers = create_network ( 2 , & cfgs) ;
24582546
24592547 let a_id = peers[ 0 ] . node_signer . get_node_id ( Recipient :: Node ) . unwrap ( ) ;
2460- let mut fd_a = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2461- let mut fd_b = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2548+ let mut fd_a = FileDescriptor {
2549+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2550+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2551+ } ;
2552+ let mut fd_b = FileDescriptor {
2553+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2554+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2555+ } ;
24622556 let initial_data = peers[ 1 ] . new_outbound_connection ( a_id, fd_b. clone ( ) , None ) . unwrap ( ) ;
24632557 peers[ 0 ] . new_inbound_connection ( fd_a. clone ( ) , None ) . unwrap ( ) ;
24642558
0 commit comments