@@ -126,8 +126,9 @@ pub use builder::NodeBuilder as Builder;
126
126
127
127
use chain:: ChainSource ;
128
128
use config:: {
129
- default_user_config, may_announce_channel, ChannelConfig , Config , NODE_ANN_BCAST_INTERVAL ,
130
- PEER_RECONNECTION_INTERVAL , RGS_SYNC_INTERVAL ,
129
+ default_user_config, may_announce_channel, ChannelConfig , Config ,
130
+ LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS , NODE_ANN_BCAST_INTERVAL , PEER_RECONNECTION_INTERVAL ,
131
+ RGS_SYNC_INTERVAL ,
131
132
} ;
132
133
use connection:: ConnectionManager ;
133
134
use event:: { EventHandler , EventQueue } ;
@@ -146,9 +147,7 @@ use types::{
146
147
} ;
147
148
pub use types:: { ChannelDetails , CustomTlvRecord , PeerDetails , UserChannelId } ;
148
149
149
- #[ cfg( tokio_unstable) ]
150
- use logger:: log_trace;
151
- use logger:: { log_debug, log_error, log_info, LdkLogger , Logger } ;
150
+ use logger:: { log_debug, log_error, log_info, log_trace, LdkLogger , Logger } ;
152
151
153
152
use lightning:: chain:: BestBlock ;
154
153
use lightning:: events:: bump_transaction:: Wallet as LdkWallet ;
@@ -179,7 +178,7 @@ uniffi::include_scaffolding!("ldk_node");
179
178
pub struct Node {
180
179
runtime : Arc < RwLock < Option < Arc < tokio:: runtime:: Runtime > > > > ,
181
180
stop_sender : tokio:: sync:: watch:: Sender < ( ) > ,
182
- event_handling_stopped_sender : tokio:: sync :: watch :: Sender < ( ) > ,
181
+ background_processor_task : Mutex < Option < tokio:: task :: JoinHandle < ( ) > > > ,
183
182
config : Arc < Config > ,
184
183
wallet : Arc < Wallet > ,
185
184
chain_source : Arc < ChainSource > ,
@@ -577,9 +576,7 @@ impl Node {
577
576
} )
578
577
} ;
579
578
580
- let background_stop_logger = Arc :: clone ( & self . logger ) ;
581
- let event_handling_stopped_sender = self . event_handling_stopped_sender . clone ( ) ;
582
- runtime. spawn ( async move {
579
+ let handle = runtime. spawn ( async move {
583
580
process_events_async (
584
581
background_persister,
585
582
|e| background_event_handler. handle_event ( e) ,
@@ -599,20 +596,9 @@ impl Node {
599
596
log_error ! ( background_error_logger, "Failed to process events: {}" , e) ;
600
597
panic ! ( "Failed to process events" ) ;
601
598
} ) ;
602
- log_debug ! ( background_stop_logger, "Events processing stopped." , ) ;
603
-
604
- match event_handling_stopped_sender. send ( ( ) ) {
605
- Ok ( _) => ( ) ,
606
- Err ( e) => {
607
- log_error ! (
608
- background_stop_logger,
609
- "Failed to send 'events handling stopped' signal. This should never happen: {}" ,
610
- e
611
- ) ;
612
- debug_assert ! ( false ) ;
613
- } ,
614
- }
615
599
} ) ;
600
+ debug_assert ! ( self . background_processor_task. lock( ) . unwrap( ) . is_none( ) ) ;
601
+ * self . background_processor_task . lock ( ) . unwrap ( ) = Some ( handle) ;
616
602
617
603
if let Some ( liquidity_source) = self . liquidity_source . as_ref ( ) {
618
604
let mut stop_liquidity_handler = self . stop_sender . subscribe ( ) ;
@@ -655,7 +641,7 @@ impl Node {
655
641
656
642
// Stop the runtime.
657
643
match self . stop_sender . send ( ( ) ) {
658
- Ok ( _) => ( ) ,
644
+ Ok ( _) => log_trace ! ( self . logger , "Sent shutdown signal to background tasks." ) ,
659
645
Err ( e) => {
660
646
log_error ! (
661
647
self . logger,
@@ -668,43 +654,45 @@ impl Node {
668
654
669
655
// Disconnect all peers.
670
656
self . peer_manager . disconnect_all_peers ( ) ;
657
+ log_debug ! ( self . logger, "Disconnected all network peers." ) ;
671
658
672
- // Wait until event handling stopped, at least until a timeout is reached.
673
- let event_handling_stopped_logger = Arc :: clone ( & self . logger ) ;
674
- let mut event_handling_stopped_receiver = self . event_handling_stopped_sender . subscribe ( ) ;
675
-
676
- // FIXME: For now, we wait up to 100 secs (BDK_WALLET_SYNC_TIMEOUT_SECS + 10) to allow
677
- // event handling to exit gracefully even if it was blocked on the BDK wallet syncing. We
678
- // should drop this considerably post upgrading to BDK 1.0.
679
- let timeout_res = tokio:: task:: block_in_place ( move || {
680
- runtime. block_on ( async {
681
- tokio:: time:: timeout (
682
- Duration :: from_secs ( 100 ) ,
683
- event_handling_stopped_receiver. changed ( ) ,
684
- )
685
- . await
686
- } )
687
- } ) ;
659
+ // Stop any runtime-dependant chain sources.
660
+ self . chain_source . stop ( ) ;
661
+ log_debug ! ( self . logger, "Stopped chain sources." ) ;
662
+
663
+ // Wait until background processing stopped, at least until a timeout is reached.
664
+ if let Some ( background_processor_task) =
665
+ self . background_processor_task . lock ( ) . unwrap ( ) . take ( )
666
+ {
667
+ let abort_handle = background_processor_task. abort_handle ( ) ;
668
+ let timeout_res = tokio:: task:: block_in_place ( move || {
669
+ runtime. block_on ( async {
670
+ tokio:: time:: timeout (
671
+ Duration :: from_secs ( LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS ) ,
672
+ background_processor_task,
673
+ )
674
+ . await
675
+ } )
676
+ } ) ;
688
677
689
- match timeout_res {
690
- Ok ( stop_res) => match stop_res {
691
- Ok ( ( ) ) => { } ,
678
+ match timeout_res {
679
+ Ok ( stop_res) => match stop_res {
680
+ Ok ( ( ) ) => log_debug ! ( self . logger, "Stopped background processing of events." ) ,
681
+ Err ( e) => {
682
+ abort_handle. abort ( ) ;
683
+ log_error ! (
684
+ self . logger,
685
+ "Stopping event handling failed. This should never happen: {}" ,
686
+ e
687
+ ) ;
688
+ panic ! ( "Stopping event handling failed. This should never happen." ) ;
689
+ } ,
690
+ } ,
692
691
Err ( e) => {
693
- log_error ! (
694
- event_handling_stopped_logger,
695
- "Stopping event handling failed. This should never happen: {}" ,
696
- e
697
- ) ;
698
- panic ! ( "Stopping event handling failed. This should never happen." ) ;
692
+ abort_handle. abort ( ) ;
693
+ log_error ! ( self . logger, "Stopping event handling timed out: {}" , e) ;
699
694
} ,
700
- } ,
701
- Err ( e) => {
702
- log_error ! (
703
- event_handling_stopped_logger,
704
- "Stopping event handling timed out: {}" ,
705
- e
706
- ) ;
707
- } ,
695
+ }
708
696
}
709
697
710
698
#[ cfg( tokio_unstable) ]
0 commit comments