@@ -179,7 +179,7 @@ uniffi::include_scaffolding!("ldk_node");
179179pub struct Node {
180180 runtime : Arc < RwLock < Option < Arc < tokio:: runtime:: Runtime > > > > ,
181181 stop_sender : tokio:: sync:: watch:: Sender < ( ) > ,
182- event_handling_stopped_sender : tokio:: sync :: watch :: Sender < ( ) > ,
182+ background_processor_task : Mutex < Option < tokio:: task :: JoinHandle < ( ) > > > ,
183183 config : Arc < Config > ,
184184 wallet : Arc < Wallet > ,
185185 chain_source : Arc < ChainSource > ,
@@ -578,8 +578,7 @@ impl Node {
578578 } ;
579579
580580 let background_stop_logger = Arc :: clone ( & self . logger ) ;
581- let event_handling_stopped_sender = self . event_handling_stopped_sender . clone ( ) ;
582- runtime. spawn ( async move {
581+ let handle = runtime. spawn ( async move {
583582 process_events_async (
584583 background_persister,
585584 |e| background_event_handler. handle_event ( e) ,
@@ -600,19 +599,9 @@ impl Node {
600599 panic ! ( "Failed to process events" ) ;
601600 } ) ;
602601 log_debug ! ( background_stop_logger, "Events processing stopped." , ) ;
603-
604- match event_handling_stopped_sender. send ( ( ) ) {
605- Ok ( _) => ( ) ,
606- Err ( e) => {
607- log_error ! (
608- background_stop_logger,
609- "Failed to send 'events handling stopped' signal. This should never happen: {}" ,
610- e
611- ) ;
612- debug_assert ! ( false ) ;
613- } ,
614- }
615602 } ) ;
603+ debug_assert ! ( self . background_processor_task. lock( ) . unwrap( ) . is_none( ) ) ;
604+ * self . background_processor_task . lock ( ) . unwrap ( ) = Some ( handle) ;
616605
617606 if let Some ( liquidity_source) = self . liquidity_source . as_ref ( ) {
618607 let mut stop_liquidity_handler = self . stop_sender . subscribe ( ) ;
@@ -669,39 +658,42 @@ impl Node {
669658 // Disconnect all peers.
670659 self . peer_manager . disconnect_all_peers ( ) ;
671660
672- // Wait until event handling stopped, at least until a timeout is reached.
673- let event_handling_stopped_logger = Arc :: clone ( & self . logger ) ;
674- let mut event_handling_stopped_receiver = self . event_handling_stopped_sender . subscribe ( ) ;
661+ // Stop any runtime-dependant chain sources.
662+ self . chain_source . stop ( ) ;
675663
676- let timeout_res = tokio:: task:: block_in_place ( move || {
677- runtime. block_on ( async {
678- tokio:: time:: timeout (
679- Duration :: from_secs ( LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS ) ,
680- event_handling_stopped_receiver. changed ( ) ,
681- )
682- . await
683- } )
684- } ) ;
664+ // Wait until background processing stopped, at least until a timeout is reached.
665+ if let Some ( background_processor_task) =
666+ self . background_processor_task . lock ( ) . unwrap ( ) . take ( )
667+ {
668+ let abort_handle = background_processor_task. abort_handle ( ) ;
669+ let timeout_res = tokio:: task:: block_in_place ( move || {
670+ runtime. block_on ( async {
671+ tokio:: time:: timeout (
672+ Duration :: from_secs ( LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS ) ,
673+ background_processor_task,
674+ )
675+ . await
676+ } )
677+ } ) ;
685678
686- match timeout_res {
687- Ok ( stop_res) => match stop_res {
688- Ok ( ( ) ) => { } ,
679+ match timeout_res {
680+ Ok ( stop_res) => match stop_res {
681+ Ok ( ( ) ) => { } ,
682+ Err ( e) => {
683+ abort_handle. abort ( ) ;
684+ log_error ! (
685+ self . logger,
686+ "Stopping event handling failed. This should never happen: {}" ,
687+ e
688+ ) ;
689+ panic ! ( "Stopping event handling failed. This should never happen." ) ;
690+ } ,
691+ } ,
689692 Err ( e) => {
690- log_error ! (
691- event_handling_stopped_logger,
692- "Stopping event handling failed. This should never happen: {}" ,
693- e
694- ) ;
695- panic ! ( "Stopping event handling failed. This should never happen." ) ;
693+ abort_handle. abort ( ) ;
694+ log_error ! ( self . logger, "Stopping event handling timed out: {}" , e) ;
696695 } ,
697- } ,
698- Err ( e) => {
699- log_error ! (
700- event_handling_stopped_logger,
701- "Stopping event handling timed out: {}" ,
702- e
703- ) ;
704- } ,
696+ }
705697 }
706698
707699 #[ cfg( tokio_unstable) ]
0 commit comments