33use std:: collections:: { HashMap , HashSet } ;
44use std:: net:: SocketAddr ;
55use std:: path:: PathBuf ;
6- use std:: sync:: atomic:: { AtomicBool , AtomicUsize , Ordering } ;
6+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
77use std:: sync:: Arc ;
88use std:: time:: { Duration , SystemTime } ;
99use tokio:: sync:: { mpsc, Mutex } ;
@@ -14,6 +14,7 @@ use async_trait::async_trait;
1414use dashcore:: network:: constants:: ServiceFlags ;
1515use dashcore:: network:: message:: NetworkMessage ;
1616use dashcore:: Network ;
17+ use tokio_util:: sync:: CancellationToken ;
1718
1819use crate :: client:: config:: MempoolStrategy ;
1920use crate :: client:: ClientConfig ;
@@ -43,8 +44,8 @@ pub struct PeerNetworkManager {
4344 reputation_manager : Arc < PeerReputationManager > ,
4445 /// Network type
4546 network : Network ,
46- /// Shutdown signal
47- shutdown : Arc < AtomicBool > ,
47+ /// Shutdown token
48+ shutdown_token : CancellationToken ,
4849 /// Channel for incoming messages
4950 message_tx : mpsc:: Sender < ( SocketAddr , NetworkMessage ) > ,
5051 message_rx : Arc < Mutex < mpsc:: Receiver < ( SocketAddr , NetworkMessage ) > > > ,
@@ -111,7 +112,7 @@ impl PeerNetworkManager {
111112 peer_store : Arc :: new ( peer_store) ,
112113 reputation_manager,
113114 network : config. network ,
114- shutdown : Arc :: new ( AtomicBool :: new ( false ) ) ,
115+ shutdown_token : CancellationToken :: new ( ) ,
115116 message_tx,
116117 message_rx : Arc :: new ( Mutex :: new ( message_rx) ) ,
117118 tasks : Arc :: new ( Mutex :: new ( JoinSet :: new ( ) ) ) ,
@@ -207,7 +208,7 @@ impl PeerNetworkManager {
207208 let network = self . network ;
208209 let message_tx = self . message_tx . clone ( ) ;
209210 let addrv2_handler = self . addrv2_handler . clone ( ) ;
210- let shutdown = self . shutdown . clone ( ) ;
211+ let shutdown_token = self . shutdown_token . clone ( ) ;
211212 let reputation_manager = self . reputation_manager . clone ( ) ;
212213 let mempool_strategy = self . mempool_strategy ;
213214 let read_timeout = self . read_timeout ;
@@ -251,7 +252,7 @@ impl PeerNetworkManager {
251252 pool. clone ( ) ,
252253 message_tx,
253254 addrv2_handler,
254- shutdown ,
255+ shutdown_token ,
255256 reputation_manager. clone ( ) ,
256257 connected_peer_count. clone ( ) ,
257258 )
@@ -293,23 +294,17 @@ impl PeerNetworkManager {
293294 pool : Arc < ConnectionPool > ,
294295 message_tx : mpsc:: Sender < ( SocketAddr , NetworkMessage ) > ,
295296 addrv2_handler : Arc < AddrV2Handler > ,
296- shutdown : Arc < AtomicBool > ,
297+ shutdown_token : CancellationToken ,
297298 reputation_manager : Arc < PeerReputationManager > ,
298299 connected_peer_count : Arc < AtomicUsize > ,
299300 ) {
300301 tokio:: spawn ( async move {
301302 log:: debug!( "Starting peer reader loop for {}" , addr) ;
302303 let mut loop_iteration = 0 ;
303304
304- while !shutdown . load ( Ordering :: Relaxed ) {
305+ loop {
305306 loop_iteration += 1 ;
306307
307- // Check shutdown signal first with detailed logging
308- if shutdown. load ( Ordering :: Relaxed ) {
309- log:: info!( "Breaking peer reader loop for {} - shutdown signal received (iteration {})" , addr, loop_iteration) ;
310- break ;
311- }
312-
313308 // Get connection
314309 let conn = match pool. get_connection ( & addr) . await {
315310 Some ( conn) => conn,
@@ -332,7 +327,15 @@ impl PeerNetworkManager {
332327
333328 // Now get write lock only for the duration of the read
334329 let mut conn_guard = conn. write ( ) . await ;
335- conn_guard. receive_message ( ) . await
330+ tokio:: select! {
331+ message = conn_guard. receive_message( ) => {
332+ message
333+ } ,
334+ _ = shutdown_token. cancelled( ) => {
335+ log:: info!( "Breaking peer reader loop for {} - shutdown signal received (iteration {})" , addr, loop_iteration) ;
336+ break ;
337+ }
338+ }
336339 } ;
337340
338341 match msg_result {
@@ -559,7 +562,11 @@ impl PeerNetworkManager {
559562 }
560563
561564 // Remove from pool
562- log:: warn!( "Disconnecting from {} (peer reader loop ended)" , addr) ;
565+ log:: warn!(
566+ "Disconnecting from {} (peer reader loop ended), shutdown triggered {}" ,
567+ addr,
568+ shutdown_token. is_cancelled( )
569+ ) ;
563570 let removed = pool. remove_connection ( & addr) . await ;
564571 if removed. is_some ( ) {
565572 // Decrement connected peer counter when a connection is removed
@@ -582,7 +589,7 @@ impl PeerNetworkManager {
582589 let pool = self . pool . clone ( ) ;
583590 let discovery = self . discovery . clone ( ) ;
584591 let network = self . network ;
585- let shutdown = self . shutdown . clone ( ) ;
592+ let shutdown_token = self . shutdown_token . clone ( ) ;
586593 let addrv2_handler = self . addrv2_handler . clone ( ) ;
587594 let peer_store = self . peer_store . clone ( ) ;
588595 let reputation_manager = self . reputation_manager . clone ( ) ;
@@ -605,7 +612,7 @@ impl PeerNetworkManager {
605612
606613 let mut tasks = self . tasks . lock ( ) . await ;
607614 tasks. spawn ( async move {
608- while !shutdown . load ( Ordering :: Relaxed ) {
615+ loop {
609616 // Clean up disconnected peers
610617 pool. cleanup_disconnected ( ) . await ;
611618
@@ -618,7 +625,13 @@ impl PeerNetworkManager {
618625 for addr in initial_peers. iter ( ) {
619626 if !pool. is_connected ( addr) . await && !pool. is_connecting ( addr) . await {
620627 log:: info!( "Reconnecting to exclusive peer: {}" , addr) ;
621- connect_fn ( * addr) . await ;
628+ tokio:: select! {
629+ _= connect_fn( * addr) => { } ,
630+ _ = shutdown_token. cancelled( ) => {
631+ log:: info!( "Maintenance loop shutting down during connection attempt (exclusive)" ) ;
632+ break ;
633+ }
634+ }
622635 }
623636 }
624637 } else {
@@ -648,7 +661,13 @@ impl PeerNetworkManager {
648661
649662 for addr in best_peers {
650663 if !pool. is_connected ( & addr) . await && !pool. is_connecting ( & addr) . await {
651- connect_fn ( addr) . await ;
664+ tokio:: select! {
665+ _= connect_fn( addr) => { } ,
666+ _ = shutdown_token. cancelled( ) => {
667+ log:: info!( "Maintenance loop shutting down during connection attempt (min peers)" ) ;
668+ break ;
669+ }
670+ }
652671 attempted += 1 ;
653672 if attempted >= needed {
654673 break ;
@@ -667,11 +686,23 @@ impl PeerNetworkManager {
667686 } ) ;
668687 if elapsed >= DNS_DISCOVERY_DELAY {
669688 log:: info!( "Using DNS discovery after {}s delay" , elapsed. as_secs( ) ) ;
670- let dns_peers = discovery. discover_peers ( network) . await ;
689+ let dns_peers = tokio:: select! {
690+ peers = discovery. discover_peers( network) => peers,
691+ _ = shutdown_token. cancelled( ) => {
692+ log:: info!( "Maintenance loop shutting down during DNS discovery" ) ;
693+ break ;
694+ }
695+ } ;
671696 let mut dns_attempted = 0 ;
672697 for addr in dns_peers. into_iter ( ) {
673698 if !pool. is_connected ( & addr) . await && !pool. is_connecting ( & addr) . await {
674- connect_fn ( addr) . await ;
699+ tokio:: select! {
700+ _= connect_fn( addr) => { } ,
701+ _ = shutdown_token. cancelled( ) => {
702+ log:: info!( "Maintenance loop shutting down during connection attempt (dns)" ) ;
703+ break ;
704+ }
705+ }
675706 dns_attempted += 1 ;
676707 if dns_attempted >= needed {
677708 break ;
@@ -725,7 +756,15 @@ impl PeerNetworkManager {
725756 }
726757 }
727758
728- time:: sleep ( MAINTENANCE_INTERVAL ) . await ;
759+ tokio:: select! {
760+ _ = time:: sleep( MAINTENANCE_INTERVAL ) => {
761+ log:: debug!( "Maintenance interval elapsed" ) ;
762+ }
763+ _ = shutdown_token. cancelled( ) => {
764+ log:: info!( "Maintenance loop shutting down" ) ;
765+ break ;
766+ }
767+ }
729768 }
730769 } ) ;
731770 }
@@ -948,7 +987,7 @@ impl PeerNetworkManager {
948987 /// Shutdown the network manager
949988 pub async fn shutdown ( & self ) {
950989 log:: info!( "Shutting down peer network manager" ) ;
951- self . shutdown . store ( true , Ordering :: Relaxed ) ;
990+ self . shutdown_token . cancel ( ) ;
952991
953992 // Save known peers before shutdown
954993 let addresses = self . addrv2_handler . get_addresses_for_peer ( MAX_ADDR_TO_STORE ) . await ;
@@ -989,7 +1028,7 @@ impl Clone for PeerNetworkManager {
9891028 peer_store : self . peer_store . clone ( ) ,
9901029 reputation_manager : self . reputation_manager . clone ( ) ,
9911030 network : self . network ,
992- shutdown : self . shutdown . clone ( ) ,
1031+ shutdown_token : self . shutdown_token . clone ( ) ,
9931032 message_tx : self . message_tx . clone ( ) ,
9941033 message_rx : self . message_rx . clone ( ) ,
9951034 tasks : self . tasks . clone ( ) ,
0 commit comments