@@ -815,34 +815,40 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
815815 let pending_read_buffer = [ 0 ; 50 ] . to_vec ( ) ; // Noise act two is 50 bytes
816816
817817 let mut peers = self . peers . write ( ) . unwrap ( ) ;
818- if peers. insert ( descriptor, Mutex :: new ( Peer {
819- channel_encryptor : peer_encryptor,
820- their_node_id : None ,
821- their_features : None ,
822- their_net_address : remote_network_address,
823-
824- pending_outbound_buffer : LinkedList :: new ( ) ,
825- pending_outbound_buffer_first_msg_offset : 0 ,
826- gossip_broadcast_buffer : LinkedList :: new ( ) ,
827- awaiting_write_event : false ,
828-
829- pending_read_buffer,
830- pending_read_buffer_pos : 0 ,
831- pending_read_is_header : false ,
832-
833- sync_status : InitSyncTracker :: NoSyncRequested ,
834-
835- msgs_sent_since_pong : 0 ,
836- awaiting_pong_timer_tick_intervals : 0 ,
837- received_message_since_timer_tick : false ,
838- sent_gossip_timestamp_filter : false ,
839-
840- received_channel_announce_since_backlogged : false ,
841- inbound_connection : false ,
842- } ) ) . is_some ( ) {
843- panic ! ( "PeerManager driver duplicated descriptors!" ) ;
844- } ;
845- Ok ( res)
818+ match peers. entry ( descriptor) {
819+ hash_map:: Entry :: Occupied ( _) => {
820+ debug_assert ! ( false , "PeerManager driver duplicated descriptors!" ) ;
821+ Err ( PeerHandleError { } )
822+ } ,
823+ hash_map:: Entry :: Vacant ( e) => {
824+ e. insert ( Mutex :: new ( Peer {
825+ channel_encryptor : peer_encryptor,
826+ their_node_id : None ,
827+ their_features : None ,
828+ their_net_address : remote_network_address,
829+
830+ pending_outbound_buffer : LinkedList :: new ( ) ,
831+ pending_outbound_buffer_first_msg_offset : 0 ,
832+ gossip_broadcast_buffer : LinkedList :: new ( ) ,
833+ awaiting_write_event : false ,
834+
835+ pending_read_buffer,
836+ pending_read_buffer_pos : 0 ,
837+ pending_read_is_header : false ,
838+
839+ sync_status : InitSyncTracker :: NoSyncRequested ,
840+
841+ msgs_sent_since_pong : 0 ,
842+ awaiting_pong_timer_tick_intervals : 0 ,
843+ received_message_since_timer_tick : false ,
844+ sent_gossip_timestamp_filter : false ,
845+
846+ received_channel_announce_since_backlogged : false ,
847+ inbound_connection : false ,
848+ } ) ) ;
849+ Ok ( res)
850+ }
851+ }
846852 }
847853
848854 /// Indicates a new inbound connection has been established to a node with an optional remote
@@ -865,34 +871,40 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
865871 let pending_read_buffer = [ 0 ; 50 ] . to_vec ( ) ; // Noise act one is 50 bytes
866872
867873 let mut peers = self . peers . write ( ) . unwrap ( ) ;
868- if peers. insert ( descriptor, Mutex :: new ( Peer {
869- channel_encryptor : peer_encryptor,
870- their_node_id : None ,
871- their_features : None ,
872- their_net_address : remote_network_address,
873-
874- pending_outbound_buffer : LinkedList :: new ( ) ,
875- pending_outbound_buffer_first_msg_offset : 0 ,
876- gossip_broadcast_buffer : LinkedList :: new ( ) ,
877- awaiting_write_event : false ,
878-
879- pending_read_buffer,
880- pending_read_buffer_pos : 0 ,
881- pending_read_is_header : false ,
882-
883- sync_status : InitSyncTracker :: NoSyncRequested ,
884-
885- msgs_sent_since_pong : 0 ,
886- awaiting_pong_timer_tick_intervals : 0 ,
887- received_message_since_timer_tick : false ,
888- sent_gossip_timestamp_filter : false ,
889-
890- received_channel_announce_since_backlogged : false ,
891- inbound_connection : true ,
892- } ) ) . is_some ( ) {
893- panic ! ( "PeerManager driver duplicated descriptors!" ) ;
894- } ;
895- Ok ( ( ) )
874+ match peers. entry ( descriptor) {
875+ hash_map:: Entry :: Occupied ( _) => {
876+ debug_assert ! ( false , "PeerManager driver duplicated descriptors!" ) ;
877+ Err ( PeerHandleError { } )
878+ } ,
879+ hash_map:: Entry :: Vacant ( e) => {
880+ e. insert ( Mutex :: new ( Peer {
881+ channel_encryptor : peer_encryptor,
882+ their_node_id : None ,
883+ their_features : None ,
884+ their_net_address : remote_network_address,
885+
886+ pending_outbound_buffer : LinkedList :: new ( ) ,
887+ pending_outbound_buffer_first_msg_offset : 0 ,
888+ gossip_broadcast_buffer : LinkedList :: new ( ) ,
889+ awaiting_write_event : false ,
890+
891+ pending_read_buffer,
892+ pending_read_buffer_pos : 0 ,
893+ pending_read_is_header : false ,
894+
895+ sync_status : InitSyncTracker :: NoSyncRequested ,
896+
897+ msgs_sent_since_pong : 0 ,
898+ awaiting_pong_timer_tick_intervals : 0 ,
899+ received_message_since_timer_tick : false ,
900+ sent_gossip_timestamp_filter : false ,
901+
902+ received_channel_announce_since_backlogged : false ,
903+ inbound_connection : true ,
904+ } ) ) ;
905+ Ok ( ( ) )
906+ }
907+ }
896908 }
897909
898910 fn peer_should_read ( & self , peer : & mut Peer ) -> bool {
@@ -1141,9 +1153,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
11411153 macro_rules! insert_node_id {
11421154 ( ) => {
11431155 match self . node_id_to_descriptor. lock( ) . unwrap( ) . entry( peer. their_node_id. unwrap( ) . 0 ) {
1144- hash_map:: Entry :: Occupied ( _ ) => {
1156+ hash_map:: Entry :: Occupied ( e ) => {
11451157 log_trace!( self . logger, "Got second connection with {}, closing" , log_pubkey!( peer. their_node_id. unwrap( ) . 0 ) ) ;
11461158 peer. their_node_id = None ; // Unset so that we don't generate a peer_disconnected event
1159+ // Check that the peers map is consistent with the
1160+ // node_id_to_descriptor map, as this has been broken
1161+ // before.
1162+ debug_assert!( peers. get( e. get( ) ) . is_some( ) ) ;
11471163 return Err ( PeerHandleError { } )
11481164 } ,
11491165 hash_map:: Entry :: Vacant ( entry) => {
@@ -1913,7 +1929,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
19131929 self . do_attempt_write_data ( & mut descriptor, & mut * peer, false ) ;
19141930 }
19151931 self . do_disconnect ( descriptor, & * peer, "DisconnectPeer HandleError" ) ;
1916- }
1932+ } else { debug_assert ! ( false , "Missing connection for peer" ) ; }
19171933 }
19181934 }
19191935 }
@@ -1951,11 +1967,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
19511967 } ,
19521968 Some ( peer_lock) => {
19531969 let peer = peer_lock. lock ( ) . unwrap ( ) ;
1954- if !peer. handshake_complete ( ) { return ; }
1955- debug_assert ! ( peer. their_node_id. is_some( ) ) ;
19561970 if let Some ( ( node_id, _) ) = peer. their_node_id {
19571971 log_trace ! ( self . logger, "Handling disconnection of peer {}" , log_pubkey!( node_id) ) ;
1958- self . node_id_to_descriptor . lock ( ) . unwrap ( ) . remove ( & node_id) ;
1972+ let removed = self . node_id_to_descriptor . lock ( ) . unwrap ( ) . remove ( & node_id) ;
1973+ debug_assert ! ( removed. is_some( ) , "descriptor maps should be consistent" ) ;
1974+ if !peer. handshake_complete ( ) { return ; }
19591975 self . message_handler . chan_handler . peer_disconnected ( & node_id) ;
19601976 self . message_handler . onion_message_handler . peer_disconnected ( & node_id) ;
19611977 }
@@ -2188,12 +2204,13 @@ mod tests {
21882204
21892205 use crate :: prelude:: * ;
21902206 use crate :: sync:: { Arc , Mutex } ;
2191- use core:: sync:: atomic:: Ordering ;
2207+ use core:: sync:: atomic:: { AtomicBool , Ordering } ;
21922208
21932209 #[ derive( Clone ) ] // clones share outbound_data and disconnect through their Arcs
21942210 struct FileDescriptor {
21952211 fd : u16 , // numeric id picked by each test when it builds a descriptor
21962212 outbound_data : Arc < Mutex < Vec < u8 > > > , // byte buffer the tests drain with split_off(0) and hand to the other peer's read_event
2213+ disconnect : Arc < AtomicBool > , // set to true by disconnect_socket so a test can observe that a disconnect was requested
21972214 }
21982215 impl PartialEq for FileDescriptor {
21992216 fn eq ( & self , other : & Self ) -> bool {
@@ -2213,7 +2230,7 @@ mod tests {
22132230 data. len ( )
22142231 }
22152232
2216- fn disconnect_socket ( & mut self ) { }
2233+ fn disconnect_socket ( & mut self ) { self . disconnect . store ( true , Ordering :: Release ) ; } // records the disconnect request in the shared flag instead of doing nothing
22172234 }
22182235
22192236 struct PeerManagerCfg {
@@ -2254,10 +2271,16 @@ mod tests {
22542271
22552272 fn establish_connection < ' a > ( peer_a : & PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler , & ' a test_utils:: TestRoutingMessageHandler , IgnoringMessageHandler , & ' a test_utils:: TestLogger , IgnoringMessageHandler , & ' a test_utils:: TestNodeSigner > , peer_b : & PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler , & ' a test_utils:: TestRoutingMessageHandler , IgnoringMessageHandler , & ' a test_utils:: TestLogger , IgnoringMessageHandler , & ' a test_utils:: TestNodeSigner > ) -> ( FileDescriptor , FileDescriptor ) {
22562273 let id_a = peer_a. node_signer . get_node_id ( Recipient :: Node ) . unwrap ( ) ;
2257- let mut fd_a = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2274+ let mut fd_a = FileDescriptor {
2275+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2276+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2277+ } ;
22582278 let addr_a = NetAddress :: IPv4 { addr : [ 127 , 0 , 0 , 1 ] , port : 1000 } ;
22592279 let id_b = peer_b. node_signer . get_node_id ( Recipient :: Node ) . unwrap ( ) ;
2260- let mut fd_b = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2280+ let mut fd_b = FileDescriptor {
2281+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2282+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2283+ } ;
22612284 let addr_b = NetAddress :: IPv4 { addr : [ 127 , 0 , 0 , 1 ] , port : 1001 } ;
22622285 let initial_data = peer_b. new_outbound_connection ( id_a, fd_b. clone ( ) , Some ( addr_a. clone ( ) ) ) . unwrap ( ) ;
22632286 peer_a. new_inbound_connection ( fd_a. clone ( ) , Some ( addr_b. clone ( ) ) ) . unwrap ( ) ;
@@ -2281,6 +2304,84 @@ mod tests {
22812304 ( fd_a. clone ( ) , fd_b. clone ( ) )
22822305 }
22832306
2307+ #[ test]
2308+ #[ cfg( feature = "std" ) ]
2309+ fn fuzz_threaded_connections ( ) {
2310+ // Spawn two threads which repeatedly connect two peers together, leading to "got second
2311+ // connection with peer" disconnections and rapid reconnect. This previously found an issue
2312+ // with our internal map consistency, and is a generally good smoke test of disconnection.
2313+ let cfgs = Arc :: new ( create_peermgr_cfgs ( 2 ) ) ;
2314+ // The pointer cast below fabricates a 'static borrow of cfgs; std::thread::scope (stable since Rust 1.63) would make this unsafe block unnecessary. NOTE(review): soundness presumably relies on cfgs outliving everything stored in peers - confirm.
2315+ let peers = Arc :: new ( create_network ( 2 , unsafe { & * ( & * cfgs as * const _ ) as & ' static _ } ) ) ;
2316+
2317+ let start_time = std:: time:: Instant :: now ( ) ; // shared deadline: every loop below stops once one second has elapsed since this instant
2318+ macro_rules! spawn_thread { ( $id: expr) => { {
2319+ let peers = Arc :: clone( & peers) ;
2320+ let cfgs = Arc :: clone( & cfgs) ;
2321+ std:: thread:: spawn( move || {
2322+ let mut ctr = 0 ;
2323+ while start_time. elapsed( ) < std:: time:: Duration :: from_secs( 1 ) {
2324+ let id_a = peers[ 0 ] . node_signer. get_node_id( Recipient :: Node ) . unwrap( ) ;
2325+ let mut fd_a = FileDescriptor {
2326+ fd: $id + ctr * 3 , outbound_data: Arc :: new( Mutex :: new( Vec :: new( ) ) ) , // stride 3 with ids 1 and 2 keeps the two threads' fd values disjoint. NOTE(review): fd is u16, so this overflows (panic in debug builds) once ctr reaches 21845 - presumably unreachable within the one-second budget, confirm.
2327+ disconnect: Arc :: new( AtomicBool :: new( false ) ) ,
2328+ } ;
2329+ let addr_a = NetAddress :: IPv4 { addr: [ 127 , 0 , 0 , 1 ] , port: 1000 } ;
2330+ let mut fd_b = FileDescriptor {
2331+ fd: $id + ctr * 3 , outbound_data: Arc :: new( Mutex :: new( Vec :: new( ) ) ) , // same fd value as fd_a, but registered with the other peer (peers[ 1 ])
2332+ disconnect: Arc :: new( AtomicBool :: new( false ) ) ,
2333+ } ;
2334+ let addr_b = NetAddress :: IPv4 { addr: [ 127 , 0 , 0 , 1 ] , port: 1001 } ;
2335+ let initial_data = peers[ 1 ] . new_outbound_connection( id_a, fd_b. clone( ) , Some ( addr_a. clone( ) ) ) . unwrap( ) ;
2336+ peers[ 0 ] . new_inbound_connection( fd_a. clone( ) , Some ( addr_b. clone( ) ) ) . unwrap( ) ;
2337+ assert_eq!( peers[ 0 ] . read_event( & mut fd_a, & initial_data) . unwrap( ) , false ) ;
2338+
2339+ while start_time. elapsed( ) < std:: time:: Duration :: from_secs( 1 ) { // shuttle bytes between the two peers until a disconnect, a read error, or the deadline
2340+ peers[ 0 ] . process_events( ) ;
2341+ if fd_a. disconnect. load( Ordering :: Acquire ) { break ; } // flag is set by FileDescriptor::disconnect_socket
2342+ let a_data = fd_a. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ; // split_off(0) takes every queued byte and leaves the shared buffer empty
2343+ if peers[ 1 ] . read_event( & mut fd_b, & a_data) . is_err( ) { break ; }
2344+
2345+ peers[ 1 ] . process_events( ) ;
2346+ if fd_b. disconnect. load( Ordering :: Acquire ) { break ; }
2347+ let b_data = fd_b. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ;
2348+ if peers[ 0 ] . read_event( & mut fd_a, & b_data) . is_err( ) { break ; }
2349+
2350+ cfgs[ 0 ] . chan_handler. pending_events. lock( ) . unwrap( ) // queue a SendShutdown event addressed to peers[ 1 ]'s node id
2351+ . push( crate :: util:: events:: MessageSendEvent :: SendShutdown {
2352+ node_id: peers[ 1 ] . node_signer. get_node_id( Recipient :: Node ) . unwrap( ) ,
2353+ msg: msgs:: Shutdown {
2354+ channel_id: [ 0 ; 32 ] ,
2355+ scriptpubkey: bitcoin:: Script :: new( ) ,
2356+ } ,
2357+ } ) ;
2358+ cfgs[ 1 ] . chan_handler. pending_events. lock( ) . unwrap( ) // and the mirror-image event addressed to peers[ 0 ]'s node id
2359+ . push( crate :: util:: events:: MessageSendEvent :: SendShutdown {
2360+ node_id: peers[ 0 ] . node_signer. get_node_id( Recipient :: Node ) . unwrap( ) ,
2361+ msg: msgs:: Shutdown {
2362+ channel_id: [ 0 ; 32 ] ,
2363+ scriptpubkey: bitcoin:: Script :: new( ) ,
2364+ } ,
2365+ } ) ;
2366+
2367+ peers[ 0 ] . timer_tick_occurred( ) ;
2368+ peers[ 1 ] . timer_tick_occurred( ) ;
2369+ }
2370+
2371+ peers[ 0 ] . socket_disconnected( & fd_a) ; // report both ends as closed before the next iteration builds fresh descriptors
2372+ peers[ 1 ] . socket_disconnected( & fd_b) ;
2373+ ctr += 1 ;
2374+ std:: thread:: sleep( std:: time:: Duration :: from_micros( 1 ) ) ;
2375+ }
2376+ } )
2377+ } } }
2378+ let thrd_a = spawn_thread ! ( 1 ) ;
2379+ let thrd_b = spawn_thread ! ( 2 ) ;
2380+
2381+ thrd_a. join ( ) . unwrap ( ) ; // join().unwrap() re-raises any panic (including a failed debug_assert!) from the worker thread
2382+ thrd_b. join ( ) . unwrap ( ) ;
2383+ }
2384+
22842385 #[ test]
22852386 fn test_disconnect_peer ( ) {
22862387 // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
@@ -2337,7 +2438,10 @@ mod tests {
23372438 let cfgs = create_peermgr_cfgs ( 2 ) ;
23382439 let peers = create_network ( 2 , & cfgs) ;
23392440
2340- let mut fd_dup = FileDescriptor { fd : 3 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2441+ let mut fd_dup = FileDescriptor {
2442+ fd : 3 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2443+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2444+ } ;
23412445 let addr_dup = NetAddress :: IPv4 { addr : [ 127 , 0 , 0 , 1 ] , port : 1003 } ;
23422446 let id_a = cfgs[ 0 ] . node_signer . get_node_id ( Recipient :: Node ) . unwrap ( ) ;
23432447 peers[ 0 ] . new_inbound_connection ( fd_dup. clone ( ) , Some ( addr_dup. clone ( ) ) ) . unwrap ( ) ;
@@ -2441,8 +2545,14 @@ mod tests {
24412545 let peers = create_network ( 2 , & cfgs) ;
24422546
24432547 let a_id = peers[ 0 ] . node_signer . get_node_id ( Recipient :: Node ) . unwrap ( ) ;
2444- let mut fd_a = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2445- let mut fd_b = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2548+ let mut fd_a = FileDescriptor {
2549+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2550+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2551+ } ;
2552+ let mut fd_b = FileDescriptor {
2553+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2554+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2555+ } ;
24462556 let initial_data = peers[ 1 ] . new_outbound_connection ( a_id, fd_b. clone ( ) , None ) . unwrap ( ) ;
24472557 peers[ 0 ] . new_inbound_connection ( fd_a. clone ( ) , None ) . unwrap ( ) ;
24482558
0 commit comments