
Commit f12f9e7

Wait on all background tasks to finish (or abort)
Previously, we'd only wait for the background processor task to finish. It turned out that this could lead to races when the other background tasks took too long to shut down. Here, we attempt to wait for a while on all background tasks to shut down before moving on, aborting any that don't finish in time.
Parent: 8ce139a
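For context, the following is a minimal, self-contained sketch of the shutdown pattern this commit adopts, built only on tokio's public JoinSet API. The worker, the task count, and the main function are illustrative stand-ins, not the crate's actual background tasks.

use std::time::Duration;

use tokio::task::JoinSet;

const BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS: u64 = 5;

// A stand-in background task that exits once the stop signal fires.
async fn run_worker(mut stop: tokio::sync::watch::Receiver<()>) {
    let _ = stop.changed().await;
}

#[tokio::main]
async fn main() {
    let (stop_sender, _) = tokio::sync::watch::channel(());

    // Startup: keep every background task in a JoinSet instead of spawning
    // detached tasks that can never be waited on again.
    let mut background_tasks = JoinSet::new();
    for _ in 0..3 {
        background_tasks.spawn(run_worker(stop_sender.subscribe()));
    }

    // Shutdown: signal all tasks, then drain the set with a per-task
    // timeout, aborting whatever remains if anything takes too long.
    let _ = stop_sender.send(());
    loop {
        let timeout_fut = tokio::time::timeout(
            Duration::from_secs(BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS),
            background_tasks.join_next_with_id(),
        );
        match timeout_fut.await {
            Ok(Some(Ok((id, _)))) => println!("stopped background task {id}"),
            Ok(Some(Err(e))) => {
                // A task panicked or was aborted; give up on a clean drain.
                background_tasks.abort_all();
                eprintln!("stopping background task failed: {e}");
                break;
            },
            Ok(None) => break, // the set is fully drained
            Err(_) => {
                // Timed out waiting on one task; abort everything left.
                background_tasks.abort_all();
                eprintln!("stopping background tasks timed out");
                break;
            },
        }
    }
}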

3 files changed: 92 additions, 30 deletions

src/builder.rs (2 additions, 0 deletions)

@@ -1496,11 +1496,13 @@ fn build_with_store_internal(

     let (stop_sender, _) = tokio::sync::watch::channel(());
     let background_processor_task = Mutex::new(None);
+    let background_tasks = Mutex::new(None);

     Ok(Node {
         runtime,
         stop_sender,
         background_processor_task,
+        background_tasks,
         config,
         wallet,
         chain_source,
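
The new field follows the same hand-off scheme as background_processor_task: the builder initializes the slot to None, start() fills it, and stop() takes it back out. Below is a small sketch of that Mutex<Option<...>> pattern with simplified types and names; it is not the crate's code.

use std::sync::Mutex;

struct Node {
    background_tasks: Mutex<Option<tokio::task::JoinSet<()>>>,
}

impl Node {
    fn start(&self) {
        let background_tasks = tokio::task::JoinSet::new();
        // Fill the slot exactly once per start.
        debug_assert!(self.background_tasks.lock().unwrap().is_none());
        *self.background_tasks.lock().unwrap() = Some(background_tasks);
    }

    fn stop(&self) {
        // take() moves the JoinSet out, leaving None behind, so this
        // shutdown step runs at most once.
        if let Some(tasks) = self.background_tasks.lock().unwrap().take() {
            // Dropping a JoinSet aborts any tasks still in it.
            drop(tasks);
        }
    }
}

fn main() {
    let node = Node { background_tasks: Mutex::new(None) };
    node.start();
    node.stop();
}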

src/config.rs (3 additions, 0 deletions)

@@ -73,6 +73,9 @@ pub(crate) const LDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 10;
 // The timeout after which we give up waiting on LDK's event handler to exit on shutdown.
 pub(crate) const LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS: u64 = 30;

+// The timeout after which we give up waiting on a background task to exit on shutdown.
+pub(crate) const BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS: u64 = 5;
+
 // The timeout after which we abort a fee rate cache update operation.
 pub(crate) const FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS: u64 = 5;
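
One subtlety about how such a constant gets applied: tokio::time::timeout cancels only the wait, not a spawned task itself, which is why the shutdown path in src/lib.rs below also aborts stragglers explicitly. A short illustrative sketch of that semantics (not the crate's code):

use std::time::Duration;

const BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS: u64 = 5;

#[tokio::main]
async fn main() {
    // A stand-in task that outlives the shutdown timeout.
    let slow = tokio::spawn(tokio::time::sleep(Duration::from_secs(60)));
    let abort_handle = slow.abort_handle();

    // Waiting on the JoinHandle with a timeout gives up after 5 seconds,
    // but the task itself keeps running in the background.
    let res = tokio::time::timeout(
        Duration::from_secs(BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS),
        slow,
    )
    .await;
    assert!(res.is_err()); // Err(Elapsed): the wait timed out

    // Hence the explicit abort for tasks that failed to stop in time.
    abort_handle.abort();
}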

src/lib.rs (87 additions, 30 deletions)

@@ -128,8 +128,8 @@ pub use builder::NodeBuilder as Builder;
 use chain::ChainSource;
 use config::{
     default_user_config, may_announce_channel, ChannelConfig, Config,
-    LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL,
-    RGS_SYNC_INTERVAL,
+    BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS, LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS,
+    NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL,
 };
 use connection::ConnectionManager;
 use event::{EventHandler, EventQueue};

@@ -180,6 +180,7 @@ pub struct Node {
     runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
     stop_sender: tokio::sync::watch::Sender<()>,
     background_processor_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
+    background_tasks: Mutex<Option<tokio::task::JoinSet<()>>>,
     config: Arc<Config>,
     wallet: Arc<Wallet>,
     chain_source: Arc<ChainSource>,

@@ -233,6 +234,9 @@ impl Node {
             return Err(Error::AlreadyRunning);
         }

+        let mut background_tasks = tokio::task::JoinSet::new();
+        let runtime_handle = runtime.handle();
+
         log_info!(
             self.logger,
             "Starting up LDK Node with node ID {} on network: {}",

@@ -259,19 +263,27 @@ impl Node {
         let sync_cman = Arc::clone(&self.channel_manager);
         let sync_cmon = Arc::clone(&self.chain_monitor);
         let sync_sweeper = Arc::clone(&self.output_sweeper);
-        runtime.spawn(async move {
-            chain_source
-                .continuously_sync_wallets(stop_sync_receiver, sync_cman, sync_cmon, sync_sweeper)
-                .await;
-        });
+        background_tasks.spawn_on(
+            async move {
+                chain_source
+                    .continuously_sync_wallets(
+                        stop_sync_receiver,
+                        sync_cman,
+                        sync_cmon,
+                        sync_sweeper,
+                    )
+                    .await;
+            },
+            runtime_handle,
+        );

         if self.gossip_source.is_rgs() {
             let gossip_source = Arc::clone(&self.gossip_source);
             let gossip_sync_store = Arc::clone(&self.kv_store);
             let gossip_sync_logger = Arc::clone(&self.logger);
             let gossip_node_metrics = Arc::clone(&self.node_metrics);
             let mut stop_gossip_sync = self.stop_sender.subscribe();
-            runtime.spawn(async move {
+            background_tasks.spawn_on(async move {
                 let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL);
                 loop {
                     tokio::select! {

@@ -312,7 +324,7 @@ impl Node {
                         }
                     }
                 }
-            });
+            }, runtime_handle);
         }

         if let Some(listening_addresses) = &self.config.listening_addresses {

@@ -338,7 +350,7 @@ impl Node {
                 bind_addrs.extend(resolved_address);
             }

-            runtime.spawn(async move {
+            background_tasks.spawn_on(async move {
                 {
                     let listener =
                         tokio::net::TcpListener::bind(&*bind_addrs).await

@@ -357,7 +369,7 @@ impl Node {
                         _ = stop_listen.changed() => {
                             log_debug!(
                                 listening_logger,
-                                "Stopping listening to inbound connections.",
+                                "Stopping listening to inbound connections."
                             );
                             break;
                         }

@@ -376,7 +388,7 @@ impl Node {
                 }

                 listening_indicator.store(false, Ordering::Release);
-            });
+            }, runtime_handle);
         }

         // Regularly reconnect to persisted peers.

@@ -385,15 +397,15 @@ impl Node {
         let connect_logger = Arc::clone(&self.logger);
         let connect_peer_store = Arc::clone(&self.peer_store);
         let mut stop_connect = self.stop_sender.subscribe();
-        runtime.spawn(async move {
+        background_tasks.spawn_on(async move {
             let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL);
             interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
             loop {
                 tokio::select! {
                     _ = stop_connect.changed() => {
                         log_debug!(
                             connect_logger,
-                            "Stopping reconnecting known peers.",
+                            "Stopping reconnecting known peers."
                         );
                         return;
                     }

@@ -413,7 +425,7 @@ impl Node {
                     }
                 }
             }
-        });
+        }, runtime_handle);

         // Regularly broadcast node announcements.
         let bcast_cm = Arc::clone(&self.channel_manager);

@@ -425,7 +437,7 @@ impl Node {
         let mut stop_bcast = self.stop_sender.subscribe();
         let node_alias = self.config.node_alias.clone();
         if may_announce_channel(&self.config).is_ok() {
-            runtime.spawn(async move {
+            background_tasks.spawn_on(async move {
                 // We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away.
                 #[cfg(not(test))]
                 let mut interval = tokio::time::interval(Duration::from_secs(30));

@@ -496,7 +508,7 @@ impl Node {
                     }
                 }
             }
-            });
+            }, runtime_handle);
         }

         let mut stop_tx_bcast = self.stop_sender.subscribe();

@@ -605,24 +617,30 @@ impl Node {
             let mut stop_liquidity_handler = self.stop_sender.subscribe();
             let liquidity_handler = Arc::clone(&liquidity_source);
             let liquidity_logger = Arc::clone(&self.logger);
-            runtime.spawn(async move {
-                loop {
-                    tokio::select! {
-                        _ = stop_liquidity_handler.changed() => {
-                            log_debug!(
-                                liquidity_logger,
-                                "Stopping processing liquidity events.",
-                            );
-                            return;
+            background_tasks.spawn_on(
+                async move {
+                    loop {
+                        tokio::select! {
+                            _ = stop_liquidity_handler.changed() => {
+                                log_debug!(
+                                    liquidity_logger,
+                                    "Stopping processing liquidity events.",
+                                );
+                                return;
+                            }
+                            _ = liquidity_handler.handle_next_event() => {}
                         }
-                        _ = liquidity_handler.handle_next_event() => {}
                     }
-                }
-            });
+                },
+                runtime_handle,
+            );
         }

         *runtime_lock = Some(runtime);

+        debug_assert!(self.background_tasks.lock().unwrap().is_none());
+        *self.background_tasks.lock().unwrap() = Some(background_tasks);
+
         log_info!(self.logger, "Startup complete.");
         Ok(())
     }

@@ -661,13 +679,52 @@ impl Node {
         self.chain_source.stop();
         log_debug!(self.logger, "Stopped chain sources.");

+        // Wait until all background tasks (mod LDK's background processor) are done.
+        let runtime_handle = runtime.handle();
+        if let Some(mut tasks) = self.background_tasks.lock().unwrap().take() {
+            tokio::task::block_in_place(move || {
+                runtime_handle.block_on(async {
+                    loop {
+                        let timeout_fut = tokio::time::timeout(
+                            Duration::from_secs(BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS),
+                            tasks.join_next_with_id(),
+                        );
+                        match timeout_fut.await {
+                            Ok(Some(Ok((id, _)))) => {
+                                log_trace!(self.logger, "Stopped background task with id {}", id);
+                            },
+                            Ok(Some(Err(e))) => {
+                                tasks.abort_all();
+                                log_trace!(self.logger, "Stopping background task failed: {}", e);
+                                break;
+                            },
+                            Ok(None) => {
+                                log_debug!(self.logger, "Stopped all background tasks");
+                                break;
+                            },
+                            Err(e) => {
+                                tasks.abort_all();
+                                log_error!(
+                                    self.logger,
+                                    "Stopping background task timed out: {}",
+                                    e
+                                );
+                                break;
+                            },
+                        }
+                    }
+                })
+            });
+        }
+
         // Wait until background processing stopped, at least until a timeout is reached.
         if let Some(background_processor_task) =
             self.background_processor_task.lock().unwrap().take()
         {
+            let runtime_handle = runtime.handle();
             let abort_handle = background_processor_task.abort_handle();
             let timeout_res = tokio::task::block_in_place(move || {
-                runtime.block_on(async {
+                runtime_handle.block_on(async {
                     tokio::time::timeout(
                         Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS),
                         background_processor_task,
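
Since start() is a synchronous function, these tasks can't be launched with a plain tokio::spawn; JoinSet::spawn_on takes an explicit runtime handle instead. A small illustrative sketch of that mechanism (simplified, not the crate's code):

use tokio::runtime::Runtime;
use tokio::task::JoinSet;

fn main() {
    let runtime = Runtime::new().unwrap();

    // spawn_on takes a &Handle, so tasks can be queued on the runtime from
    // non-async code while the JoinSet keeps every handle for a collective
    // wait later on.
    let mut background_tasks: JoinSet<()> = JoinSet::new();
    for i in 0..3 {
        background_tasks.spawn_on(
            async move {
                println!("background task {i} running");
            },
            runtime.handle(),
        );
    }

    // Synchronous shutdown path: drain the set to completion. When this is
    // called from a runtime worker thread (as Node::stop() may be), it has
    // to be wrapped in tokio::task::block_in_place, as the diff above does,
    // to avoid stalling the async scheduler.
    runtime.block_on(async {
        while background_tasks.join_next().await.is_some() {}
    });
}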
