Skip to content

Commit 1c02114

Browse files
authored
Merge pull request #619 from tnull/2025-08-background-task-tracking
Refactor task tracking to ensure we await/abort any spawned tasks
2 parents 217b398 + 90a4fe1 commit 1c02114

File tree

5 files changed

+191
-175
lines changed

5 files changed

+191
-175
lines changed

src/builder.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -668,9 +668,9 @@ impl NodeBuilder {
668668
let logger = setup_logger(&self.log_writer_config, &self.config)?;
669669

670670
let runtime = if let Some(handle) = self.runtime_handle.as_ref() {
671-
Arc::new(Runtime::with_handle(handle.clone()))
671+
Arc::new(Runtime::with_handle(handle.clone(), Arc::clone(&logger)))
672672
} else {
673-
Arc::new(Runtime::new().map_err(|e| {
673+
Arc::new(Runtime::new(Arc::clone(&logger)).map_err(|e| {
674674
log_error!(logger, "Failed to setup tokio runtime: {}", e);
675675
BuildError::RuntimeSetupFailed
676676
})?)
@@ -715,9 +715,9 @@ impl NodeBuilder {
715715
let logger = setup_logger(&self.log_writer_config, &self.config)?;
716716

717717
let runtime = if let Some(handle) = self.runtime_handle.as_ref() {
718-
Arc::new(Runtime::with_handle(handle.clone()))
718+
Arc::new(Runtime::with_handle(handle.clone(), Arc::clone(&logger)))
719719
} else {
720-
Arc::new(Runtime::new().map_err(|e| {
720+
Arc::new(Runtime::new(Arc::clone(&logger)).map_err(|e| {
721721
log_error!(logger, "Failed to setup tokio runtime: {}", e);
722722
BuildError::RuntimeSetupFailed
723723
})?)
@@ -1668,18 +1668,11 @@ fn build_with_store_internal(
16681668
};
16691669

16701670
let (stop_sender, _) = tokio::sync::watch::channel(());
1671-
let background_processor_task = Mutex::new(None);
1672-
let background_tasks = Mutex::new(None);
1673-
let cancellable_background_tasks = Mutex::new(None);
1674-
16751671
let is_running = Arc::new(RwLock::new(false));
16761672

16771673
Ok(Node {
16781674
runtime,
16791675
stop_sender,
1680-
background_processor_task,
1681-
background_tasks,
1682-
cancellable_background_tasks,
16831676
config,
16841677
wallet,
16851678
chain_source,

src/event.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1059,7 +1059,7 @@ where
10591059
forwarding_channel_manager.process_pending_htlc_forwards();
10601060
};
10611061

1062-
self.runtime.spawn(future);
1062+
self.runtime.spawn_cancellable_background_task(future);
10631063
},
10641064
LdkEvent::SpendableOutputs { outputs, channel_id } => {
10651065
match self.output_sweeper.track_spendable_outputs(outputs, channel_id, true, None) {
@@ -1441,7 +1441,7 @@ where
14411441
}
14421442
}
14431443
};
1444-
self.runtime.spawn(future);
1444+
self.runtime.spawn_cancellable_background_task(future);
14451445
},
14461446
LdkEvent::BumpTransaction(bte) => {
14471447
match bte {

src/gossip.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,6 @@ impl RuntimeSpawner {
144144

145145
impl FutureSpawner for RuntimeSpawner {
146146
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
147-
self.runtime.spawn(future);
147+
self.runtime.spawn_cancellable_background_task(future);
148148
}
149149
}

src/lib.rs

Lines changed: 36 additions & 150 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,8 @@ pub use builder::NodeBuilder as Builder;
128128

129129
use chain::ChainSource;
130130
use config::{
131-
default_user_config, may_announce_channel, ChannelConfig, Config,
132-
BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS, LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS,
133-
NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL,
131+
default_user_config, may_announce_channel, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL,
132+
PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL,
134133
};
135134
use connection::ConnectionManager;
136135
use event::{EventHandler, EventQueue};
@@ -181,9 +180,6 @@ uniffi::include_scaffolding!("ldk_node");
181180
pub struct Node {
182181
runtime: Arc<Runtime>,
183182
stop_sender: tokio::sync::watch::Sender<()>,
184-
background_processor_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
185-
background_tasks: Mutex<Option<tokio::task::JoinSet<()>>>,
186-
cancellable_background_tasks: Mutex<Option<tokio::task::JoinSet<()>>>,
187183
config: Arc<Config>,
188184
wallet: Arc<Wallet>,
189185
chain_source: Arc<ChainSource>,
@@ -226,10 +222,6 @@ impl Node {
226222
return Err(Error::AlreadyRunning);
227223
}
228224

229-
let mut background_tasks = tokio::task::JoinSet::new();
230-
let mut cancellable_background_tasks = tokio::task::JoinSet::new();
231-
let runtime_handle = self.runtime.handle();
232-
233225
log_info!(
234226
self.logger,
235227
"Starting up LDK Node with node ID {} on network: {}",
@@ -253,27 +245,19 @@ impl Node {
253245
let sync_cman = Arc::clone(&self.channel_manager);
254246
let sync_cmon = Arc::clone(&self.chain_monitor);
255247
let sync_sweeper = Arc::clone(&self.output_sweeper);
256-
background_tasks.spawn_on(
257-
async move {
258-
chain_source
259-
.continuously_sync_wallets(
260-
stop_sync_receiver,
261-
sync_cman,
262-
sync_cmon,
263-
sync_sweeper,
264-
)
265-
.await;
266-
},
267-
runtime_handle,
268-
);
248+
self.runtime.spawn_background_task(async move {
249+
chain_source
250+
.continuously_sync_wallets(stop_sync_receiver, sync_cman, sync_cmon, sync_sweeper)
251+
.await;
252+
});
269253

270254
if self.gossip_source.is_rgs() {
271255
let gossip_source = Arc::clone(&self.gossip_source);
272256
let gossip_sync_store = Arc::clone(&self.kv_store);
273257
let gossip_sync_logger = Arc::clone(&self.logger);
274258
let gossip_node_metrics = Arc::clone(&self.node_metrics);
275259
let mut stop_gossip_sync = self.stop_sender.subscribe();
276-
cancellable_background_tasks.spawn_on(async move {
260+
self.runtime.spawn_cancellable_background_task(async move {
277261
let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL);
278262
loop {
279263
tokio::select! {
@@ -314,7 +298,7 @@ impl Node {
314298
}
315299
}
316300
}
317-
}, runtime_handle);
301+
});
318302
}
319303

320304
if let Some(listening_addresses) = &self.config.listening_addresses {
@@ -340,7 +324,7 @@ impl Node {
340324
bind_addrs.extend(resolved_address);
341325
}
342326

343-
cancellable_background_tasks.spawn_on(async move {
327+
self.runtime.spawn_cancellable_background_task(async move {
344328
{
345329
let listener =
346330
tokio::net::TcpListener::bind(&*bind_addrs).await
@@ -378,7 +362,7 @@ impl Node {
378362
}
379363

380364
listening_indicator.store(false, Ordering::Release);
381-
}, runtime_handle);
365+
});
382366
}
383367

384368
// Regularly reconnect to persisted peers.
@@ -387,7 +371,7 @@ impl Node {
387371
let connect_logger = Arc::clone(&self.logger);
388372
let connect_peer_store = Arc::clone(&self.peer_store);
389373
let mut stop_connect = self.stop_sender.subscribe();
390-
cancellable_background_tasks.spawn_on(async move {
374+
self.runtime.spawn_cancellable_background_task(async move {
391375
let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL);
392376
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
393377
loop {
@@ -415,7 +399,7 @@ impl Node {
415399
}
416400
}
417401
}
418-
}, runtime_handle);
402+
});
419403

420404
// Regularly broadcast node announcements.
421405
let bcast_cm = Arc::clone(&self.channel_manager);
@@ -427,7 +411,7 @@ impl Node {
427411
let mut stop_bcast = self.stop_sender.subscribe();
428412
let node_alias = self.config.node_alias.clone();
429413
if may_announce_channel(&self.config).is_ok() {
430-
cancellable_background_tasks.spawn_on(async move {
414+
self.runtime.spawn_cancellable_background_task(async move {
431415
// We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away.
432416
#[cfg(not(test))]
433417
let mut interval = tokio::time::interval(Duration::from_secs(30));
@@ -498,15 +482,14 @@ impl Node {
498482
}
499483
}
500484
}
501-
}, runtime_handle);
485+
});
502486
}
503487

504488
let stop_tx_bcast = self.stop_sender.subscribe();
505489
let chain_source = Arc::clone(&self.chain_source);
506-
cancellable_background_tasks.spawn_on(
507-
async move { chain_source.continuously_process_broadcast_queue(stop_tx_bcast).await },
508-
runtime_handle,
509-
);
490+
self.runtime.spawn_cancellable_background_task(async move {
491+
chain_source.continuously_process_broadcast_queue(stop_tx_bcast).await
492+
});
510493

511494
let bump_tx_event_handler = Arc::new(BumpTransactionEventHandler::new(
512495
Arc::clone(&self.tx_broadcaster),
@@ -563,7 +546,7 @@ impl Node {
563546
})
564547
};
565548

566-
let handle = self.runtime.spawn(async move {
549+
self.runtime.spawn_background_processor_task(async move {
567550
process_events_async(
568551
background_persister,
569552
|e| background_event_handler.handle_event(e),
@@ -584,38 +567,27 @@ impl Node {
584567
panic!("Failed to process events");
585568
});
586569
});
587-
debug_assert!(self.background_processor_task.lock().unwrap().is_none());
588-
*self.background_processor_task.lock().unwrap() = Some(handle);
589570

590571
if let Some(liquidity_source) = self.liquidity_source.as_ref() {
591572
let mut stop_liquidity_handler = self.stop_sender.subscribe();
592573
let liquidity_handler = Arc::clone(&liquidity_source);
593574
let liquidity_logger = Arc::clone(&self.logger);
594-
background_tasks.spawn_on(
595-
async move {
596-
loop {
597-
tokio::select! {
598-
_ = stop_liquidity_handler.changed() => {
599-
log_debug!(
600-
liquidity_logger,
601-
"Stopping processing liquidity events.",
602-
);
603-
return;
604-
}
605-
_ = liquidity_handler.handle_next_event() => {}
575+
self.runtime.spawn_background_task(async move {
576+
loop {
577+
tokio::select! {
578+
_ = stop_liquidity_handler.changed() => {
579+
log_debug!(
580+
liquidity_logger,
581+
"Stopping processing liquidity events.",
582+
);
583+
return;
606584
}
585+
_ = liquidity_handler.handle_next_event() => {}
607586
}
608-
},
609-
runtime_handle,
610-
);
587+
}
588+
});
611589
}
612590

613-
debug_assert!(self.background_tasks.lock().unwrap().is_none());
614-
*self.background_tasks.lock().unwrap() = Some(background_tasks);
615-
616-
debug_assert!(self.cancellable_background_tasks.lock().unwrap().is_none());
617-
*self.cancellable_background_tasks.lock().unwrap() = Some(cancellable_background_tasks);
618-
619591
log_info!(self.logger, "Startup complete.");
620592
*is_running_lock = true;
621593
Ok(())
@@ -649,15 +621,7 @@ impl Node {
649621
}
650622

651623
// Cancel cancellable background tasks
652-
if let Some(mut tasks) = self.cancellable_background_tasks.lock().unwrap().take() {
653-
let runtime_handle = self.runtime.handle();
654-
tasks.abort_all();
655-
tokio::task::block_in_place(move || {
656-
runtime_handle.block_on(async { while let Some(_) = tasks.join_next().await {} })
657-
});
658-
} else {
659-
debug_assert!(false, "Expected some cancellable background tasks");
660-
};
624+
self.runtime.abort_cancellable_background_tasks();
661625

662626
// Disconnect all peers.
663627
self.peer_manager.disconnect_all_peers();
@@ -668,91 +632,13 @@ impl Node {
668632
log_debug!(self.logger, "Stopped chain sources.");
669633

670634
// Wait until non-cancellable background tasks (mod LDK's background processor) are done.
671-
let runtime_handle = self.runtime.handle();
672-
if let Some(mut tasks) = self.background_tasks.lock().unwrap().take() {
673-
tokio::task::block_in_place(move || {
674-
runtime_handle.block_on(async {
675-
loop {
676-
let timeout_fut = tokio::time::timeout(
677-
Duration::from_secs(BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS),
678-
tasks.join_next_with_id(),
679-
);
680-
match timeout_fut.await {
681-
Ok(Some(Ok((id, _)))) => {
682-
log_trace!(self.logger, "Stopped background task with id {}", id);
683-
},
684-
Ok(Some(Err(e))) => {
685-
tasks.abort_all();
686-
log_trace!(self.logger, "Stopping background task failed: {}", e);
687-
break;
688-
},
689-
Ok(None) => {
690-
log_debug!(self.logger, "Stopped all background tasks");
691-
break;
692-
},
693-
Err(e) => {
694-
tasks.abort_all();
695-
log_error!(
696-
self.logger,
697-
"Stopping background task timed out: {}",
698-
e
699-
);
700-
break;
701-
},
702-
}
703-
}
704-
})
705-
});
706-
} else {
707-
debug_assert!(false, "Expected some background tasks");
708-
};
635+
self.runtime.wait_on_background_tasks();
709636

710-
// Wait until background processing stopped, at least until a timeout is reached.
711-
if let Some(background_processor_task) =
712-
self.background_processor_task.lock().unwrap().take()
713-
{
714-
let abort_handle = background_processor_task.abort_handle();
715-
let timeout_res = tokio::task::block_in_place(move || {
716-
self.runtime.block_on(async {
717-
tokio::time::timeout(
718-
Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS),
719-
background_processor_task,
720-
)
721-
.await
722-
})
723-
});
724-
725-
match timeout_res {
726-
Ok(stop_res) => match stop_res {
727-
Ok(()) => log_debug!(self.logger, "Stopped background processing of events."),
728-
Err(e) => {
729-
abort_handle.abort();
730-
log_error!(
731-
self.logger,
732-
"Stopping event handling failed. This should never happen: {}",
733-
e
734-
);
735-
panic!("Stopping event handling failed. This should never happen.");
736-
},
737-
},
738-
Err(e) => {
739-
abort_handle.abort();
740-
log_error!(self.logger, "Stopping event handling timed out: {}", e);
741-
},
742-
}
743-
} else {
744-
debug_assert!(false, "Expected a background processing task");
745-
};
637+
// Finally, wait until background processing stopped, at least until a timeout is reached.
638+
self.runtime.wait_on_background_processor_task();
746639

747640
#[cfg(tokio_unstable)]
748-
{
749-
let runtime_handle = self.runtime.handle();
750-
log_trace!(
751-
self.logger,
752-
"Active runtime tasks left prior to shutdown: {}",
753-
runtime_handle.metrics().active_tasks_count()
754-
);
755-
}
641+
self.runtime.log_metrics();
756642

757643
log_info!(self.logger, "Shutdown complete.");
758644
*is_running_lock = false;

0 commit comments

Comments
 (0)