diff --git a/src/builder.rs b/src/builder.rs
index 79982b4e3..dbb7096dc 100644
--- a/src/builder.rs
+++ b/src/builder.rs
@@ -1495,12 +1495,16 @@ fn build_with_store_internal(
 	};
 
 	let (stop_sender, _) = tokio::sync::watch::channel(());
-	let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());
+	let background_processor_task = Mutex::new(None);
+	let background_tasks = Mutex::new(None);
+	let cancellable_background_tasks = Mutex::new(None);
 
 	Ok(Node {
 		runtime,
 		stop_sender,
-		event_handling_stopped_sender,
+		background_processor_task,
+		background_tasks,
+		cancellable_background_tasks,
 		config,
 		wallet,
 		chain_source,
diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs
index 6e62d9c08..9882e652b 100644
--- a/src/chain/electrum.rs
+++ b/src/chain/electrum.rs
@@ -40,7 +40,7 @@ use std::time::{Duration, Instant};
 
 const BDK_ELECTRUM_CLIENT_BATCH_SIZE: usize = 5;
 const ELECTRUM_CLIENT_NUM_RETRIES: u8 = 3;
-const ELECTRUM_CLIENT_TIMEOUT_SECS: u8 = 20;
+const ELECTRUM_CLIENT_TIMEOUT_SECS: u8 = 10;
 
 pub(crate) struct ElectrumRuntimeClient {
	electrum_client: Arc<ElectrumClient>,
diff --git a/src/config.rs b/src/config.rs
index 4a39c1b56..6b02015fe 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -65,10 +65,16 @@ pub(crate) const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60
 pub(crate) const WALLET_SYNC_INTERVAL_MINIMUM_SECS: u64 = 10;
 
 // The timeout after which we abort a wallet syncing operation.
-pub(crate) const BDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 90;
+pub(crate) const BDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 20;
 
 // The timeout after which we abort a wallet syncing operation.
-pub(crate) const LDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 30;
+pub(crate) const LDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 10;
+
+// The timeout after which we give up waiting on LDK's event handler to exit on shutdown.
+pub(crate) const LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS: u64 = 30;
+
+// The timeout after which we give up waiting on a background task to exit on shutdown.
+pub(crate) const BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS: u64 = 5;
 
 // The timeout after which we abort a fee rate cache update operation.
 pub(crate) const FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS: u64 = 5;
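
For context: the shutdown timeout constants added above bound how long stop() waits on each join future via tokio::time::timeout. A minimal standalone sketch of that pattern, where SHUTDOWN_TIMEOUT_SECS and wait_or_abort are illustrative names and not part of this patch:

use std::time::Duration;

// Illustrative stand-in for the *_SHUTDOWN_TIMEOUT_SECS constants above.
const SHUTDOWN_TIMEOUT_SECS: u64 = 5;

// Wait for a task to finish, but give up (and abort it) after a timeout.
async fn wait_or_abort(handle: tokio::task::JoinHandle<()>) {
	let abort_handle = handle.abort_handle();
	match tokio::time::timeout(Duration::from_secs(SHUTDOWN_TIMEOUT_SECS), handle).await {
		// The task exited on its own within the timeout.
		Ok(Ok(())) => {},
		// The task panicked or was cancelled elsewhere.
		Ok(Err(e)) => eprintln!("Background task failed: {}", e),
		// The timeout elapsed; cancel the task so shutdown can proceed.
		Err(_) => abort_handle.abort(),
	}
}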
diff --git a/src/lib.rs b/src/lib.rs
index c3bfe16d8..868354c1f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -127,8 +127,9 @@ pub use builder::NodeBuilder as Builder;
 
 use chain::ChainSource;
 use config::{
-	default_user_config, may_announce_channel, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL,
-	PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL,
+	default_user_config, may_announce_channel, ChannelConfig, Config,
+	BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS, LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS,
+	NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL,
 };
 use connection::ConnectionManager;
 use event::{EventHandler, EventQueue};
@@ -178,7 +179,9 @@ uniffi::include_scaffolding!("ldk_node");
 pub struct Node {
	runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
 	stop_sender: tokio::sync::watch::Sender<()>,
-	event_handling_stopped_sender: tokio::sync::watch::Sender<()>,
+	background_processor_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
+	background_tasks: Mutex<Option<tokio::task::JoinSet<()>>>,
+	cancellable_background_tasks: Mutex<Option<tokio::task::JoinSet<()>>>,
	config: Arc<Config>,
	wallet: Arc<Wallet>,
	chain_source: Arc<ChainSource>,
@@ -232,6 +235,10 @@ impl Node {
 			return Err(Error::AlreadyRunning);
 		}
 
+		let mut background_tasks = tokio::task::JoinSet::new();
+		let mut cancellable_background_tasks = tokio::task::JoinSet::new();
+		let runtime_handle = runtime.handle();
+
 		log_info!(
 			self.logger,
 			"Starting up LDK Node with node ID {} on network: {}",
@@ -258,11 +265,19 @@ impl Node {
 		let sync_cman = Arc::clone(&self.channel_manager);
 		let sync_cmon = Arc::clone(&self.chain_monitor);
 		let sync_sweeper = Arc::clone(&self.output_sweeper);
-		runtime.spawn(async move {
-			chain_source
-				.continuously_sync_wallets(stop_sync_receiver, sync_cman, sync_cmon, sync_sweeper)
-				.await;
-		});
+		background_tasks.spawn_on(
+			async move {
+				chain_source
+					.continuously_sync_wallets(
+						stop_sync_receiver,
+						sync_cman,
+						sync_cmon,
+						sync_sweeper,
+					)
+					.await;
+			},
+			runtime_handle,
+		);
 
 		if self.gossip_source.is_rgs() {
 			let gossip_source = Arc::clone(&self.gossip_source);
@@ -270,7 +285,7 @@ impl Node {
 			let gossip_sync_logger = Arc::clone(&self.logger);
 			let gossip_node_metrics = Arc::clone(&self.node_metrics);
 			let mut stop_gossip_sync = self.stop_sender.subscribe();
-			runtime.spawn(async move {
+			cancellable_background_tasks.spawn_on(async move {
 				let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL);
 				loop {
 					tokio::select! {
@@ -311,7 +326,7 @@ impl Node {
 						}
 					}
 				}
-			});
+			}, runtime_handle);
 		}
 
 		if let Some(listening_addresses) = &self.config.listening_addresses {
@@ -337,7 +352,7 @@ impl Node {
 				bind_addrs.extend(resolved_address);
 			}
 
-			runtime.spawn(async move {
+			cancellable_background_tasks.spawn_on(async move {
 				{
 					let listener = tokio::net::TcpListener::bind(&*bind_addrs).await
@@ -356,7 +371,7 @@ impl Node {
 						_ = stop_listen.changed() => {
 							log_debug!(
 								listening_logger,
-								"Stopping listening to inbound connections.",
+								"Stopping listening to inbound connections."
 							);
 							break;
 						}
@@ -375,7 +390,7 @@ impl Node {
 				}
 
 				listening_indicator.store(false, Ordering::Release);
-			});
+			}, runtime_handle);
 		}
 
 		// Regularly reconnect to persisted peers.
@@ -384,7 +399,7 @@ impl Node {
 		let connect_logger = Arc::clone(&self.logger);
 		let connect_peer_store = Arc::clone(&self.peer_store);
 		let mut stop_connect = self.stop_sender.subscribe();
-		runtime.spawn(async move {
+		cancellable_background_tasks.spawn_on(async move {
 			let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL);
 			interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
 			loop {
@@ -392,7 +407,7 @@ impl Node {
 					_ = stop_connect.changed() => {
 						log_debug!(
 							connect_logger,
-							"Stopping reconnecting known peers.",
+							"Stopping reconnecting known peers."
 						);
 						return;
 					}
@@ -412,7 +427,7 @@ impl Node {
 					}
 				}
 			}
-		});
+		}, runtime_handle);
 
 		// Regularly broadcast node announcements.
 		let bcast_cm = Arc::clone(&self.channel_manager);
@@ -424,7 +439,7 @@ impl Node {
 		let mut stop_bcast = self.stop_sender.subscribe();
 		let node_alias = self.config.node_alias.clone();
 		if may_announce_channel(&self.config).is_ok() {
-			runtime.spawn(async move {
+			cancellable_background_tasks.spawn_on(async move {
 				// We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away.
 				#[cfg(not(test))]
 				let mut interval = tokio::time::interval(Duration::from_secs(30));
@@ -495,7 +510,7 @@ impl Node {
 					}
 				}
 			}
-			});
+			}, runtime_handle);
 		}
 
 		let mut stop_tx_bcast = self.stop_sender.subscribe();
@@ -576,9 +591,7 @@ impl Node {
 			})
 		};
 
-		let background_stop_logger = Arc::clone(&self.logger);
-		let event_handling_stopped_sender = self.event_handling_stopped_sender.clone();
-		runtime.spawn(async move {
+		let handle = runtime.spawn(async move {
 			process_events_async(
 				background_persister,
 				|e| background_event_handler.handle_event(e),
@@ -598,43 +611,41 @@ impl Node {
 				log_error!(background_error_logger, "Failed to process events: {}", e);
 				panic!("Failed to process events");
 			});
-			log_debug!(background_stop_logger, "Events processing stopped.",);
-
-			match event_handling_stopped_sender.send(()) {
-				Ok(_) => (),
-				Err(e) => {
-					log_error!(
-						background_stop_logger,
-						"Failed to send 'events handling stopped' signal. This should never happen: {}",
-						e
-					);
-					debug_assert!(false);
-				},
-			}
 		});
+		debug_assert!(self.background_processor_task.lock().unwrap().is_none());
+		*self.background_processor_task.lock().unwrap() = Some(handle);
 
 		if let Some(liquidity_source) = self.liquidity_source.as_ref() {
 			let mut stop_liquidity_handler = self.stop_sender.subscribe();
 			let liquidity_handler = Arc::clone(&liquidity_source);
 			let liquidity_logger = Arc::clone(&self.logger);
-			runtime.spawn(async move {
-				loop {
-					tokio::select! {
-						_ = stop_liquidity_handler.changed() => {
-							log_debug!(
-								liquidity_logger,
-								"Stopping processing liquidity events.",
-							);
-							return;
-						}
-						_ = liquidity_handler.handle_next_event() => {}
-					}
-				}
-			});
+			background_tasks.spawn_on(
+				async move {
+					loop {
+						tokio::select! {
+							_ = stop_liquidity_handler.changed() => {
+								log_debug!(
+									liquidity_logger,
+									"Stopping processing liquidity events.",
+								);
+								return;
+							}
+							_ = liquidity_handler.handle_next_event() => {}
+						}
+					}
+				},
+				runtime_handle,
+			);
 		}
 
 		*runtime_lock = Some(runtime);
+		debug_assert!(self.background_tasks.lock().unwrap().is_none());
+		*self.background_tasks.lock().unwrap() = Some(background_tasks);
+
+		debug_assert!(self.cancellable_background_tasks.lock().unwrap().is_none());
+		*self.cancellable_background_tasks.lock().unwrap() = Some(cancellable_background_tasks);
+
 		log_info!(self.logger, "Startup complete.");
 		Ok(())
 	}
@@ -654,7 +665,7 @@ impl Node {
 
 		// Stop the runtime.
 		match self.stop_sender.send(()) {
-			Ok(_) => (),
+			Ok(_) => log_trace!(self.logger, "Sent shutdown signal to background tasks."),
 			Err(e) => {
 				log_error!(
 					self.logger,
@@ -665,46 +676,101 @@ impl Node {
 			},
 		}
 
+		// Cancel the cancellable background tasks.
+		if let Some(mut tasks) = self.cancellable_background_tasks.lock().unwrap().take() {
+			let runtime_2 = Arc::clone(&runtime);
+			tasks.abort_all();
+			tokio::task::block_in_place(move || {
+				runtime_2.block_on(async { while let Some(_) = tasks.join_next().await {} })
+			});
+		} else {
+			debug_assert!(false, "Expected some cancellable background tasks");
+		};
+
 		// Disconnect all peers.
 		self.peer_manager.disconnect_all_peers();
+		log_debug!(self.logger, "Disconnected all network peers.");
 
-		// Wait until event handling stopped, at least until a timeout is reached.
-		let event_handling_stopped_logger = Arc::clone(&self.logger);
-		let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe();
-
-		// FIXME: For now, we wait up to 100 secs (BDK_WALLET_SYNC_TIMEOUT_SECS + 10) to allow
-		// event handling to exit gracefully even if it was blocked on the BDK wallet syncing. We
-		// should drop this considerably post upgrading to BDK 1.0.
-		let timeout_res = tokio::task::block_in_place(move || {
-			runtime.block_on(async {
-				tokio::time::timeout(
-					Duration::from_secs(100),
-					event_handling_stopped_receiver.changed(),
-				)
-				.await
-			})
-		});
+		// Stop any runtime-dependent chain sources.
+		self.chain_source.stop();
+		log_debug!(self.logger, "Stopped chain sources.");
+
+		// Wait until non-cancellable background tasks (apart from LDK's background processor) are done.
+		let runtime_3 = Arc::clone(&runtime);
+		if let Some(mut tasks) = self.background_tasks.lock().unwrap().take() {
+			tokio::task::block_in_place(move || {
+				runtime_3.block_on(async {
+					loop {
+						let timeout_fut = tokio::time::timeout(
+							Duration::from_secs(BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS),
+							tasks.join_next_with_id(),
+						);
+						match timeout_fut.await {
+							Ok(Some(Ok((id, _)))) => {
+								log_trace!(self.logger, "Stopped background task with id {}.", id);
+							},
+							Ok(Some(Err(e))) => {
+								tasks.abort_all();
+								log_trace!(self.logger, "Stopping background task failed: {}", e);
+								break;
+							},
+							Ok(None) => {
+								log_debug!(self.logger, "Stopped all background tasks.");
+								break;
+							},
+							Err(e) => {
+								tasks.abort_all();
+								log_error!(
+									self.logger,
+									"Stopping background task timed out: {}",
+									e
+								);
+								break;
+							},
+						}
+					}
+				})
+			});
+		} else {
+			debug_assert!(false, "Expected some background tasks");
+		};
+
+		// Wait until background processing stopped, at least until a timeout is reached.
+		if let Some(background_processor_task) =
+			self.background_processor_task.lock().unwrap().take()
+		{
+			let abort_handle = background_processor_task.abort_handle();
+			let timeout_res = tokio::task::block_in_place(move || {
+				runtime.block_on(async {
+					tokio::time::timeout(
+						Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS),
+						background_processor_task,
+					)
+					.await
+				})
+			});
 
-		match timeout_res {
-			Ok(stop_res) => match stop_res {
-				Ok(()) => {},
+			match timeout_res {
+				Ok(stop_res) => match stop_res {
+					Ok(()) => log_debug!(self.logger, "Stopped background processing of events."),
+					Err(e) => {
+						abort_handle.abort();
+						log_error!(
+							self.logger,
+							"Stopping event handling failed. This should never happen: {}",
+							e
+						);
+						panic!("Stopping event handling failed. This should never happen.");
+					},
+				},
 				Err(e) => {
-					log_error!(
-						event_handling_stopped_logger,
-						"Stopping event handling failed. This should never happen: {}",
-						e
-					);
-					panic!("Stopping event handling failed. This should never happen.");
+					abort_handle.abort();
+					log_error!(self.logger, "Stopping event handling timed out: {}", e);
 				},
-			},
-			Err(e) => {
-				log_error!(
-					event_handling_stopped_logger,
-					"Stopping event handling timed out: {}",
-					e
-				);
-			},
-		}
+			}
+		} else {
+			debug_assert!(false, "Expected a background processing task");
+		};
 
 		#[cfg(tokio_unstable)]
 		{
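
For reference, the start()/stop() changes above follow a common tokio pattern: spawn long-lived tasks onto a JoinSet bound to an explicit runtime handle, then abort and drain the set on shutdown so no task outlives its owner. A self-contained sketch of that pattern under those assumptions; joinset_shutdown_demo and its contents are illustrative, not part of this patch:

use tokio::task::JoinSet;

// Spawn a few long-running tasks on an explicit runtime handle, then shut
// them down deterministically: abort_all() cancels them, and draining
// join_next() guarantees every task has finished before we return.
fn joinset_shutdown_demo() {
	let runtime =
		tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
	let mut tasks: JoinSet<()> = JoinSet::new();

	for _ in 0..3 {
		tasks.spawn_on(
			async {
				// Stand-in for a long-running background loop.
				loop {
					tokio::time::sleep(std::time::Duration::from_secs(60)).await;
				}
			},
			runtime.handle(),
		);
	}

	tasks.abort_all();
	// Each join_next() yields Some(Err(_)) for a cancelled task; the loop
	// ends once the set is empty.
	runtime.block_on(async { while tasks.join_next().await.is_some() {} });
}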
diff --git a/tests/integration_tests_cln.rs b/tests/integration_tests_cln.rs
index b6300576c..f77311fb2 100644
--- a/tests/integration_tests_cln.rs
+++ b/tests/integration_tests_cln.rs
@@ -64,7 +64,17 @@ fn test_cln() {
 	// Setup CLN
 	let sock = "/tmp/lightning-rpc";
 	let cln_client = LightningRPC::new(&sock);
-	let cln_info = cln_client.getinfo().unwrap();
+	let cln_info = {
+		loop {
+			let info = cln_client.getinfo().unwrap();
+			// Wait for CLN to sync block height before channel open.
+			// Prevents crash due to unset blockheight (see LDK Node issue #527).
+			if info.blockheight > 0 {
+				break info;
+			}
+			std::thread::sleep(std::time::Duration::from_millis(250));
+		}
+	};
 	let cln_node_id = PublicKey::from_str(&cln_info.id).unwrap();
 	let cln_address: SocketAddress = match cln_info.binding.first().unwrap() {
 		NetworkAddress::Ipv4 { address, port } => {
diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs
index db48eca23..b21387521 100644
--- a/tests/integration_tests_rust.rs
+++ b/tests/integration_tests_rust.rs
@@ -1381,3 +1381,14 @@ fn facade_logging() {
 		validate_log_entry(entry);
 	}
 }
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+async fn drop_in_async_context() {
+	let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd();
+	let chain_source = TestChainSource::Esplora(&electrsd);
+	let seed_bytes = vec![42u8; 64];
+
+	let config = random_config(true);
+	let node = setup_node(&chain_source, config, Some(seed_bytes));
+	node.stop().unwrap();
+}
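
A note on the new Rust test: stop() now drives shutdown via tokio::task::block_in_place before block_on (see the lib.rs hunks above), and tokio only permits block_in_place on a multi_thread runtime — hence the flavor = "multi_thread" attribute even with a single worker thread. A minimal sketch of that constraint, independent of the patch; blocking_shutdown is an illustrative name:

// Sketch: driving async shutdown work from a synchronous method that may be
// called from inside a tokio runtime. block_in_place tells the runtime this
// worker is about to block, letting block_on run without stalling other
// tasks; it panics on a current_thread runtime, which is why tests invoking
// such a method need the multi_thread flavor.
fn blocking_shutdown(runtime: std::sync::Arc<tokio::runtime::Runtime>) {
	tokio::task::block_in_place(move || {
		runtime.block_on(async {
			// Stand-in for awaiting background tasks during shutdown.
			tokio::time::sleep(std::time::Duration::from_millis(10)).await;
		})
	});
}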