33use std:: collections:: { HashMap , HashSet } ;
44use std:: net:: SocketAddr ;
55use std:: path:: PathBuf ;
6- use std:: sync:: atomic:: { AtomicBool , AtomicUsize , Ordering } ;
6+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
77use std:: sync:: Arc ;
88use std:: time:: { Duration , SystemTime } ;
99use tokio:: sync:: { mpsc, Mutex } ;
@@ -14,6 +14,7 @@ use async_trait::async_trait;
1414use dashcore:: network:: constants:: ServiceFlags ;
1515use dashcore:: network:: message:: NetworkMessage ;
1616use dashcore:: Network ;
17+ use tokio_util:: sync:: CancellationToken ;
1718
1819use crate :: client:: config:: MempoolStrategy ;
1920use crate :: client:: ClientConfig ;
@@ -43,8 +44,8 @@ pub struct PeerNetworkManager {
4344 reputation_manager : Arc < PeerReputationManager > ,
4445 /// Network type
4546 network : Network ,
46- /// Shutdown signal
47- shutdown : Arc < AtomicBool > ,
47+ /// Shutdown token
48+ shutdown_token : CancellationToken ,
4849 /// Channel for incoming messages
4950 message_tx : mpsc:: Sender < ( SocketAddr , NetworkMessage ) > ,
5051 message_rx : Arc < Mutex < mpsc:: Receiver < ( SocketAddr , NetworkMessage ) > > > ,
@@ -111,7 +112,7 @@ impl PeerNetworkManager {
111112 peer_store : Arc :: new ( peer_store) ,
112113 reputation_manager,
113114 network : config. network ,
114- shutdown : Arc :: new ( AtomicBool :: new ( false ) ) ,
115+ shutdown_token : CancellationToken :: new ( ) ,
115116 message_tx,
116117 message_rx : Arc :: new ( Mutex :: new ( message_rx) ) ,
117118 tasks : Arc :: new ( Mutex :: new ( JoinSet :: new ( ) ) ) ,
@@ -207,7 +208,7 @@ impl PeerNetworkManager {
207208 let network = self . network ;
208209 let message_tx = self . message_tx . clone ( ) ;
209210 let addrv2_handler = self . addrv2_handler . clone ( ) ;
210- let shutdown = self . shutdown . clone ( ) ;
211+ let shutdown_token = self . shutdown_token . clone ( ) ;
211212 let reputation_manager = self . reputation_manager . clone ( ) ;
212213 let mempool_strategy = self . mempool_strategy ;
213214 let read_timeout = self . read_timeout ;
@@ -251,7 +252,7 @@ impl PeerNetworkManager {
251252 pool. clone ( ) ,
252253 message_tx,
253254 addrv2_handler,
254- shutdown ,
255+ shutdown_token ,
255256 reputation_manager. clone ( ) ,
256257 connected_peer_count. clone ( ) ,
257258 )
@@ -293,19 +294,19 @@ impl PeerNetworkManager {
293294 pool : Arc < ConnectionPool > ,
294295 message_tx : mpsc:: Sender < ( SocketAddr , NetworkMessage ) > ,
295296 addrv2_handler : Arc < AddrV2Handler > ,
296- shutdown : Arc < AtomicBool > ,
297+ shutdown_token : CancellationToken ,
297298 reputation_manager : Arc < PeerReputationManager > ,
298299 connected_peer_count : Arc < AtomicUsize > ,
299300 ) {
300301 tokio:: spawn ( async move {
301302 log:: debug!( "Starting peer reader loop for {}" , addr) ;
302303 let mut loop_iteration = 0 ;
303304
304- while !shutdown . load ( Ordering :: Relaxed ) {
305+ loop {
305306 loop_iteration += 1 ;
306307
307308 // Check shutdown signal first with detailed logging
308- if shutdown . load ( Ordering :: Relaxed ) {
309+ if shutdown_token . is_cancelled ( ) {
309310 log:: info!( "Breaking peer reader loop for {} - shutdown signal received (iteration {})" , addr, loop_iteration) ;
310311 break ;
311312 }
@@ -332,7 +333,15 @@ impl PeerNetworkManager {
332333
333334 // Now get write lock only for the duration of the read
334335 let mut conn_guard = conn. write ( ) . await ;
335- conn_guard. receive_message ( ) . await
336+ tokio:: select! {
337+ message = conn_guard. receive_message( ) => {
338+ message
339+ } ,
340+ _ = shutdown_token. cancelled( ) => {
341+ log:: info!( "Breaking peer reader loop for {} - shutdown signal received while reading (iteration {})" , addr, loop_iteration) ;
342+ break ;
343+ }
344+ }
336345 } ;
337346
338347 match msg_result {
@@ -582,7 +591,7 @@ impl PeerNetworkManager {
582591 let pool = self . pool . clone ( ) ;
583592 let discovery = self . discovery . clone ( ) ;
584593 let network = self . network ;
585- let shutdown = self . shutdown . clone ( ) ;
594+ let shutdown_token = self . shutdown_token . clone ( ) ;
586595 let addrv2_handler = self . addrv2_handler . clone ( ) ;
587596 let peer_store = self . peer_store . clone ( ) ;
588597 let reputation_manager = self . reputation_manager . clone ( ) ;
@@ -605,7 +614,7 @@ impl PeerNetworkManager {
605614
606615 let mut tasks = self . tasks . lock ( ) . await ;
607616 tasks. spawn ( async move {
608- while !shutdown . load ( Ordering :: Relaxed ) {
617+ while !shutdown_token . is_cancelled ( ) {
609618 // Clean up disconnected peers
610619 pool. cleanup_disconnected ( ) . await ;
611620
@@ -618,7 +627,13 @@ impl PeerNetworkManager {
618627 for addr in initial_peers. iter ( ) {
619628 if !pool. is_connected ( addr) . await && !pool. is_connecting ( addr) . await {
620629 log:: info!( "Reconnecting to exclusive peer: {}" , addr) ;
621- connect_fn ( * addr) . await ;
630+ tokio:: select! {
631+ _= connect_fn( * addr) => { } ,
632+ _ = shutdown_token. cancelled( ) => {
633+ log:: info!( "Maintenance loop shutting down during connection attempt (exclusive)" ) ;
634+ break ;
635+ }
636+ }
622637 }
623638 }
624639 } else {
@@ -648,7 +663,13 @@ impl PeerNetworkManager {
648663
649664 for addr in best_peers {
650665 if !pool. is_connected ( & addr) . await && !pool. is_connecting ( & addr) . await {
651- connect_fn ( addr) . await ;
666+ tokio:: select! {
667+ _= connect_fn( addr) => { } ,
668+ _ = shutdown_token. cancelled( ) => {
669+ log:: info!( "Maintenance loop shutting down during connection attempt (min peers)" ) ;
670+ break ;
671+ }
672+ }
652673 attempted += 1 ;
653674 if attempted >= needed {
654675 break ;
@@ -667,11 +688,23 @@ impl PeerNetworkManager {
667688 } ) ;
668689 if elapsed >= DNS_DISCOVERY_DELAY {
669690 log:: info!( "Using DNS discovery after {}s delay" , elapsed. as_secs( ) ) ;
670- let dns_peers = discovery. discover_peers ( network) . await ;
691+ let dns_peers = tokio:: select! {
692+ peers = discovery. discover_peers( network) => peers,
693+ _ = shutdown_token. cancelled( ) => {
694+ log:: info!( "Maintenance loop shutting down during DNS discovery" ) ;
695+ break ;
696+ }
697+ } ;
671698 let mut dns_attempted = 0 ;
672699 for addr in dns_peers. into_iter ( ) {
673700 if !pool. is_connected ( & addr) . await && !pool. is_connecting ( & addr) . await {
674- connect_fn ( addr) . await ;
701+ tokio:: select! {
702+ _= connect_fn( addr) => { } ,
703+ _ = shutdown_token. cancelled( ) => {
704+ log:: info!( "Maintenance loop shutting down during connection attempt (dns)" ) ;
705+ break ;
706+ }
707+ }
675708 dns_attempted += 1 ;
676709 if dns_attempted >= needed {
677710 break ;
@@ -725,7 +758,15 @@ impl PeerNetworkManager {
725758 }
726759 }
727760
728- time:: sleep ( MAINTENANCE_INTERVAL ) . await ;
761+ tokio:: select! {
762+ _ = time:: sleep( MAINTENANCE_INTERVAL ) => {
763+ log:: debug!( "Maintenance interval elapsed" ) ;
764+ }
765+ _ = shutdown_token. cancelled( ) => {
766+ log:: info!( "Maintenance loop shutting down" ) ;
767+ break ;
768+ }
769+ }
729770 }
730771 } ) ;
731772 }
@@ -948,7 +989,7 @@ impl PeerNetworkManager {
948989 /// Shutdown the network manager
949990 pub async fn shutdown ( & self ) {
950991 log:: info!( "Shutting down peer network manager" ) ;
951- self . shutdown . store ( true , Ordering :: Relaxed ) ;
992+ self . shutdown_token . cancel ( ) ;
952993
953994 // Save known peers before shutdown
954995 let addresses = self . addrv2_handler . get_addresses_for_peer ( MAX_ADDR_TO_STORE ) . await ;
@@ -989,7 +1030,7 @@ impl Clone for PeerNetworkManager {
9891030 peer_store : self . peer_store . clone ( ) ,
9901031 reputation_manager : self . reputation_manager . clone ( ) ,
9911032 network : self . network ,
992- shutdown : self . shutdown . clone ( ) ,
1033+ shutdown_token : self . shutdown_token . clone ( ) ,
9931034 message_tx : self . message_tx . clone ( ) ,
9941035 message_rx : self . message_rx . clone ( ) ,
9951036 tasks : self . tasks . clone ( ) ,
0 commit comments