Skip to content

Commit 8f8d7e5

Browse files
committed
Await on the background processing task's JoinHandle
Previously, we used to a channel to indicate that the background processor task has been stopped. Here, we rather just await the task's `JoinHandle` which is more robust in that it avoids a race condition.
1 parent 75dd1dc commit 8f8d7e5

File tree

2 files changed

+38
-46
lines changed

2 files changed

+38
-46
lines changed

src/builder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1495,12 +1495,12 @@ fn build_with_store_internal(
14951495
};
14961496

14971497
let (stop_sender, _) = tokio::sync::watch::channel(());
1498-
let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());
1498+
let background_processor_task = Mutex::new(None);
14991499

15001500
Ok(Node {
15011501
runtime,
15021502
stop_sender,
1503-
event_handling_stopped_sender,
1503+
background_processor_task,
15041504
config,
15051505
wallet,
15061506
chain_source,

src/lib.rs

Lines changed: 36 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ uniffi::include_scaffolding!("ldk_node");
179179
pub struct Node {
180180
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
181181
stop_sender: tokio::sync::watch::Sender<()>,
182-
event_handling_stopped_sender: tokio::sync::watch::Sender<()>,
182+
background_processor_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
183183
config: Arc<Config>,
184184
wallet: Arc<Wallet>,
185185
chain_source: Arc<ChainSource>,
@@ -578,8 +578,7 @@ impl Node {
578578
};
579579

580580
let background_stop_logger = Arc::clone(&self.logger);
581-
let event_handling_stopped_sender = self.event_handling_stopped_sender.clone();
582-
runtime.spawn(async move {
581+
let handle = runtime.spawn(async move {
583582
process_events_async(
584583
background_persister,
585584
|e| background_event_handler.handle_event(e),
@@ -600,19 +599,9 @@ impl Node {
600599
panic!("Failed to process events");
601600
});
602601
log_debug!(background_stop_logger, "Events processing stopped.",);
603-
604-
match event_handling_stopped_sender.send(()) {
605-
Ok(_) => (),
606-
Err(e) => {
607-
log_error!(
608-
background_stop_logger,
609-
"Failed to send 'events handling stopped' signal. This should never happen: {}",
610-
e
611-
);
612-
debug_assert!(false);
613-
},
614-
}
615602
});
603+
debug_assert!(self.background_processor_task.lock().unwrap().is_none());
604+
*self.background_processor_task.lock().unwrap() = Some(handle);
616605

617606
if let Some(liquidity_source) = self.liquidity_source.as_ref() {
618607
let mut stop_liquidity_handler = self.stop_sender.subscribe();
@@ -669,39 +658,42 @@ impl Node {
669658
// Disconnect all peers.
670659
self.peer_manager.disconnect_all_peers();
671660

672-
// Wait until event handling stopped, at least until a timeout is reached.
673-
let event_handling_stopped_logger = Arc::clone(&self.logger);
674-
let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe();
661+
// Stop any runtime-dependant chain sources.
662+
self.chain_source.stop();
675663

676-
let timeout_res = tokio::task::block_in_place(move || {
677-
runtime.block_on(async {
678-
tokio::time::timeout(
679-
Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS),
680-
event_handling_stopped_receiver.changed(),
681-
)
682-
.await
683-
})
684-
});
664+
// Wait until background processing stopped, at least until a timeout is reached.
665+
if let Some(background_processor_task) =
666+
self.background_processor_task.lock().unwrap().take()
667+
{
668+
let abort_handle = background_processor_task.abort_handle();
669+
let timeout_res = tokio::task::block_in_place(move || {
670+
runtime.block_on(async {
671+
tokio::time::timeout(
672+
Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS),
673+
background_processor_task,
674+
)
675+
.await
676+
})
677+
});
685678

686-
match timeout_res {
687-
Ok(stop_res) => match stop_res {
688-
Ok(()) => {},
679+
match timeout_res {
680+
Ok(stop_res) => match stop_res {
681+
Ok(()) => {},
682+
Err(e) => {
683+
abort_handle.abort();
684+
log_error!(
685+
self.logger,
686+
"Stopping event handling failed. This should never happen: {}",
687+
e
688+
);
689+
panic!("Stopping event handling failed. This should never happen.");
690+
},
691+
},
689692
Err(e) => {
690-
log_error!(
691-
event_handling_stopped_logger,
692-
"Stopping event handling failed. This should never happen: {}",
693-
e
694-
);
695-
panic!("Stopping event handling failed. This should never happen.");
693+
abort_handle.abort();
694+
log_error!(self.logger, "Stopping event handling timed out: {}", e);
696695
},
697-
},
698-
Err(e) => {
699-
log_error!(
700-
event_handling_stopped_logger,
701-
"Stopping event handling timed out: {}",
702-
e
703-
);
704-
},
696+
}
705697
}
706698

707699
#[cfg(tokio_unstable)]

0 commit comments

Comments
 (0)