@@ -570,11 +570,12 @@ impl Builder {
570570 }
571571 } ;
572572
573- let stop_running = Arc :: new ( AtomicBool :: new ( false ) ) ;
573+ let ( stop_sender , stop_receiver ) = tokio :: sync :: watch :: channel ( ( ) ) ;
574574
575575 Arc :: new ( Node {
576576 runtime,
577- stop_running,
577+ stop_sender,
578+ stop_receiver,
578579 config,
579580 wallet,
580581 tx_sync,
@@ -599,7 +600,8 @@ impl Builder {
599600/// Needs to be initialized and instantiated through [`Builder::build`].
600601pub struct Node {
601602 runtime : Arc < RwLock < Option < tokio:: runtime:: Runtime > > > ,
602- stop_running : Arc < AtomicBool > ,
603+ stop_sender : tokio:: sync:: watch:: Sender < ( ) > ,
604+ stop_receiver : tokio:: sync:: watch:: Receiver < ( ) > ,
603605 config : Arc < Config > ,
604606 wallet : Arc < Wallet < bdk:: database:: SqliteDatabase > > ,
605607 tx_sync : Arc < EsploraSyncClient < Arc < FilesystemLogger > > > ,
@@ -633,8 +635,6 @@ impl Node {
633635
634636 let runtime = tokio:: runtime:: Builder :: new_multi_thread ( ) . enable_all ( ) . build ( ) . unwrap ( ) ;
635637
636- let stop_running = Arc :: new ( AtomicBool :: new ( false ) ) ;
637-
638638 let event_handler = Arc :: new ( EventHandler :: new (
639639 Arc :: clone ( & self . wallet ) ,
640640 Arc :: clone ( & self . event_queue ) ,
@@ -653,66 +653,76 @@ impl Node {
653653 let sync_cman = Arc :: clone ( & self . channel_manager ) ;
654654 let sync_cmon = Arc :: clone ( & self . chain_monitor ) ;
655655 let sync_logger = Arc :: clone ( & self . logger ) ;
656- let stop_sync = Arc :: clone ( & stop_running ) ;
656+ let mut stop_sync = self . stop_receiver . clone ( ) ;
657657
658658 std:: thread:: spawn ( move || {
659659 tokio:: runtime:: Builder :: new_current_thread ( ) . enable_all ( ) . build ( ) . unwrap ( ) . block_on (
660660 async move {
661+ let mut interval = tokio:: time:: interval ( Duration :: from_secs ( 30 ) ) ;
662+ interval. set_missed_tick_behavior ( tokio:: time:: MissedTickBehavior :: Skip ) ;
661663 loop {
662- if stop_sync. load ( Ordering :: Acquire ) {
663- return ;
664- }
665664 let now = Instant :: now ( ) ;
666- match wallet. sync ( ) . await {
667- Ok ( ( ) ) => log_info ! (
668- sync_logger,
669- "Background sync of on-chain wallet finished in {}ms." ,
670- now. elapsed( ) . as_millis( )
671- ) ,
672- Err ( err) => {
673- log_error ! (
674- sync_logger,
675- "Background sync of on-chain wallet failed: {}" ,
676- err
677- )
665+ tokio:: select! {
666+ _ = stop_sync. changed( ) => {
667+ return ;
668+ }
669+ _ = interval. tick( ) => {
670+ match wallet. sync( ) . await {
671+ Ok ( ( ) ) => log_info!(
672+ sync_logger,
673+ "Background sync of on-chain wallet finished in {}ms." ,
674+ now. elapsed( ) . as_millis( )
675+ ) ,
676+ Err ( err) => {
677+ log_error!(
678+ sync_logger,
679+ "Background sync of on-chain wallet failed: {}" ,
680+ err
681+ )
682+ }
683+ }
678684 }
679685 }
680- tokio:: time:: sleep ( Duration :: from_secs ( 20 ) ) . await ;
681686 }
682687 } ,
683688 ) ;
684689 } ) ;
685690
686691 let sync_logger = Arc :: clone ( & self . logger ) ;
687- let stop_sync = Arc :: clone ( & stop_running ) ;
692+ let mut stop_sync = self . stop_receiver . clone ( ) ;
688693 runtime. spawn ( async move {
694+ let mut interval = tokio:: time:: interval ( Duration :: from_secs ( 10 ) ) ;
695+ interval. set_missed_tick_behavior ( tokio:: time:: MissedTickBehavior :: Skip ) ;
689696 loop {
690- if stop_sync. load ( Ordering :: Acquire ) {
691- return ;
692- }
693697 let now = Instant :: now ( ) ;
694- let confirmables = vec ! [
695- & * sync_cman as & ( dyn Confirm + Sync + Send ) ,
696- & * sync_cmon as & ( dyn Confirm + Sync + Send ) ,
697- ] ;
698- match tx_sync. sync ( confirmables) . await {
699- Ok ( ( ) ) => log_info ! (
700- sync_logger,
701- "Background sync of Lightning wallet finished in {}ms." ,
702- now. elapsed( ) . as_millis( )
703- ) ,
704- Err ( e) => {
705- log_error ! ( sync_logger, "Background sync of Lightning wallet failed: {}" , e)
698+ tokio:: select! {
699+ _ = stop_sync. changed( ) => {
700+ return ;
701+ }
702+ _ = interval. tick( ) => {
703+ let confirmables = vec![
704+ & * sync_cman as & ( dyn Confirm + Sync + Send ) ,
705+ & * sync_cmon as & ( dyn Confirm + Sync + Send ) ,
706+ ] ;
707+ match tx_sync. sync( confirmables) . await {
708+ Ok ( ( ) ) => log_info!(
709+ sync_logger,
710+ "Background sync of Lightning wallet finished in {}ms." ,
711+ now. elapsed( ) . as_millis( )
712+ ) ,
713+ Err ( e) => {
714+ log_error!( sync_logger, "Background sync of Lightning wallet failed: {}" , e)
715+ }
716+ }
706717 }
707718 }
708- tokio:: time:: sleep ( Duration :: from_secs ( 5 ) ) . await ;
709719 }
710720 } ) ;
711721
712722 if let Some ( listening_address) = & self . config . listening_address {
713723 // Setup networking
714724 let peer_manager_connection_handler = Arc :: clone ( & self . peer_manager ) ;
715- let stop_listen = Arc :: clone ( & stop_running ) ;
725+ let mut stop_listen = self . stop_receiver . clone ( ) ;
716726 let listening_address = listening_address. clone ( ) ;
717727
718728 let bind_addr = listening_address
@@ -727,18 +737,22 @@ impl Node {
727737 "Failed to bind to listen address/port - is something else already listening on it?" ,
728738 ) ;
729739 loop {
730- if stop_listen. load ( Ordering :: Acquire ) {
731- return ;
732- }
733740 let peer_mgr = Arc :: clone ( & peer_manager_connection_handler) ;
734- let tcp_stream = listener. accept ( ) . await . unwrap ( ) . 0 ;
735- tokio:: spawn ( async move {
736- lightning_net_tokio:: setup_inbound (
737- Arc :: clone ( & peer_mgr) ,
738- tcp_stream. into_std ( ) . unwrap ( ) ,
739- )
740- . await ;
741- } ) ;
741+ tokio:: select! {
742+ _ = stop_listen. changed( ) => {
743+ return ;
744+ }
745+ res = listener. accept( ) => {
746+ let tcp_stream = res. unwrap( ) . 0 ;
747+ tokio:: spawn( async move {
748+ lightning_net_tokio:: setup_inbound(
749+ Arc :: clone( & peer_mgr) ,
750+ tcp_stream. into_std( ) . unwrap( ) ,
751+ )
752+ . await ;
753+ } ) ;
754+ }
755+ }
742756 }
743757 } ) ;
744758 }
@@ -748,36 +762,38 @@ impl Node {
748762 let connect_pm = Arc :: clone ( & self . peer_manager ) ;
749763 let connect_logger = Arc :: clone ( & self . logger ) ;
750764 let connect_peer_store = Arc :: clone ( & self . peer_store ) ;
751- let stop_connect = Arc :: clone ( & stop_running ) ;
765+ let mut stop_connect = self . stop_receiver . clone ( ) ;
752766 runtime. spawn ( async move {
753767 let mut interval = tokio:: time:: interval ( PEER_RECONNECTION_INTERVAL ) ;
754768 interval. set_missed_tick_behavior ( tokio:: time:: MissedTickBehavior :: Skip ) ;
755769 loop {
756- if stop_connect. load ( Ordering :: Acquire ) {
757- return ;
758- }
759-
760- interval. tick ( ) . await ;
761- let pm_peers = connect_pm
762- . get_peer_node_ids ( )
763- . iter ( )
764- . map ( |( peer, _addr) | * peer)
765- . collect :: < Vec < _ > > ( ) ;
766- for node_id in connect_cm
767- . list_channels ( )
768- . iter ( )
769- . map ( |chan| chan. counterparty . node_id )
770- . filter ( |id| !pm_peers. contains ( id) )
771- {
772- if let Some ( peer_info) = connect_peer_store. get_peer ( & node_id) {
773- let _ = do_connect_peer (
774- peer_info. node_id ,
775- peer_info. address ,
776- Arc :: clone ( & connect_pm) ,
777- Arc :: clone ( & connect_logger) ,
778- )
779- . await ;
780- }
770+ tokio:: select! {
771+ _ = stop_connect. changed( ) => {
772+ return ;
773+ }
774+ _ = interval. tick( ) => {
775+ let pm_peers = connect_pm
776+ . get_peer_node_ids( )
777+ . iter( )
778+ . map( |( peer, _addr) | * peer)
779+ . collect:: <Vec <_>>( ) ;
780+ for node_id in connect_cm
781+ . list_channels( )
782+ . iter( )
783+ . map( |chan| chan. counterparty. node_id)
784+ . filter( |id| !pm_peers. contains( id) )
785+ {
786+ if let Some ( peer_info) = connect_peer_store. get_peer( & node_id) {
787+ let _ = do_connect_peer(
788+ peer_info. node_id,
789+ peer_info. address,
790+ Arc :: clone( & connect_pm) ,
791+ Arc :: clone( & connect_logger) ,
792+ )
793+ . await ;
794+ }
795+ }
796+ }
781797 }
782798 }
783799 } ) ;
@@ -786,28 +802,32 @@ impl Node {
786802 let bcast_cm = Arc :: clone ( & self . channel_manager ) ;
787803 let bcast_pm = Arc :: clone ( & self . peer_manager ) ;
788804 let bcast_config = Arc :: clone ( & self . config ) ;
789- let stop_bcast = Arc :: clone ( & stop_running ) ;
805+ let mut stop_bcast = self . stop_receiver . clone ( ) ;
790806 runtime. spawn ( async move {
791807 let mut interval = tokio:: time:: interval ( NODE_ANN_BCAST_INTERVAL ) ;
792808 loop {
793- if stop_bcast. load ( Ordering :: Acquire ) {
794- return ;
795- }
796-
797- if !bcast_cm. list_channels ( ) . iter ( ) . any ( |chan| chan. is_public ) { continue ; }
798-
799- interval. tick ( ) . await ;
800-
801- if !bcast_cm. list_channels ( ) . iter ( ) . any ( |chan| chan. is_public ) { continue ; }
809+ tokio:: select! {
810+ _ = stop_bcast. changed( ) => {
811+ return ;
812+ }
813+ _ = interval. tick( ) , if bcast_cm. list_channels( ) . iter( ) . any( |chan| chan. is_public) => {
814+ while bcast_pm. get_peer_node_ids( ) . is_empty( ) {
815+ // Sleep a bit and retry if we don't have any peers yet.
816+ tokio:: time:: sleep( Duration :: from_secs( 5 ) ) . await ;
817+
818+ // Check back if we need to stop.
819+ match stop_bcast. has_changed( ) {
820+ Ok ( false ) => { } ,
821+ Ok ( true ) => return ,
822+ Err ( _) => return ,
823+ }
824+ }
802825
803- while bcast_pm. get_peer_node_ids ( ) . is_empty ( ) {
804- // Sleep a bit and retry if we don't have any peers yet.
805- tokio:: time:: sleep ( Duration :: from_secs ( 5 ) ) . await ;
826+ let addresses =
827+ bcast_config. listening_address. iter( ) . cloned( ) . map( |a| a. 0 ) . collect( ) ;
828+ bcast_pm. broadcast_node_announcement( [ 0 ; 3 ] , [ 0 ; 32 ] , addresses) ;
829+ }
806830 }
807-
808- let addresses =
809- bcast_config. listening_address . iter ( ) . cloned ( ) . map ( |a| a. 0 ) . collect ( ) ;
810- bcast_pm. broadcast_node_announcement ( [ 0 ; 3 ] , [ 0 ; 32 ] , addresses) ;
811831 }
812832 } ) ;
813833
@@ -820,15 +840,17 @@ impl Node {
820840 let background_peer_man = Arc :: clone ( & self . peer_manager ) ;
821841 let background_logger = Arc :: clone ( & self . logger ) ;
822842 let background_scorer = Arc :: clone ( & self . scorer ) ;
823- let stop_background_processing = Arc :: clone ( & stop_running ) ;
843+ let stop_bp = self . stop_receiver . clone ( ) ;
824844 let sleeper = move |d| {
825- let stop = Arc :: clone ( & stop_background_processing ) ;
845+ let mut stop = stop_bp . clone ( ) ;
826846 Box :: pin ( async move {
827- if stop. load ( Ordering :: Acquire ) {
828- true
829- } else {
830- tokio:: time:: sleep ( d) . await ;
831- false
847+ tokio:: select! {
848+ _ = stop. changed( ) => {
849+ true
850+ }
851+ _ = tokio:: time:: sleep( d) => {
852+ false
853+ }
832854 }
833855 } )
834856 } ;
@@ -860,7 +882,7 @@ impl Node {
860882 pub fn stop ( & self ) -> Result < ( ) , Error > {
861883 let runtime = self . runtime . write ( ) . unwrap ( ) . take ( ) . ok_or ( Error :: NotRunning ) ?;
862884 // Stop the runtime.
863- self . stop_running . store ( true , Ordering :: Release ) ;
885+ self . stop_sender . send ( ( ) ) . expect ( "Failed to send stop signal" ) ;
864886
865887 // Stop disconnect peers.
866888 self . peer_manager . disconnect_all_peers ( ) ;
0 commit comments