Skip to content

Commit 9eae61d

Browse files
committed
Await on the background processing task's JoinHandle
Previously, we used to a channel to indicate that the background processor task has been stopped. Here, we rather just await the task's `JoinHandle` which is more robust in that it avoids a race condition.
1 parent 6fe2d30 commit 9eae61d

File tree

2 files changed

+38
-46
lines changed

2 files changed

+38
-46
lines changed

src/builder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1591,12 +1591,12 @@ fn build_with_store_internal(
15911591
};
15921592

15931593
let (stop_sender, _) = tokio::sync::watch::channel(());
1594-
let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());
1594+
let background_processor_task = Mutex::new(None);
15951595

15961596
Ok(Node {
15971597
runtime,
15981598
stop_sender,
1599-
event_handling_stopped_sender,
1599+
background_processor_task,
16001600
config,
16011601
wallet,
16021602
chain_source,

src/lib.rs

Lines changed: 36 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ uniffi::include_scaffolding!("ldk_node");
180180
pub struct Node {
181181
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
182182
stop_sender: tokio::sync::watch::Sender<()>,
183-
event_handling_stopped_sender: tokio::sync::watch::Sender<()>,
183+
background_processor_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
184184
config: Arc<Config>,
185185
wallet: Arc<Wallet>,
186186
chain_source: Arc<ChainSource>,
@@ -579,8 +579,7 @@ impl Node {
579579
};
580580

581581
let background_stop_logger = Arc::clone(&self.logger);
582-
let event_handling_stopped_sender = self.event_handling_stopped_sender.clone();
583-
runtime.spawn(async move {
582+
let handle = runtime.spawn(async move {
584583
process_events_async(
585584
background_persister,
586585
|e| background_event_handler.handle_event(e),
@@ -601,19 +600,9 @@ impl Node {
601600
panic!("Failed to process events");
602601
});
603602
log_debug!(background_stop_logger, "Events processing stopped.",);
604-
605-
match event_handling_stopped_sender.send(()) {
606-
Ok(_) => (),
607-
Err(e) => {
608-
log_error!(
609-
background_stop_logger,
610-
"Failed to send 'events handling stopped' signal. This should never happen: {}",
611-
e
612-
);
613-
debug_assert!(false);
614-
},
615-
}
616603
});
604+
debug_assert!(self.background_processor_task.lock().unwrap().is_none());
605+
*self.background_processor_task.lock().unwrap() = Some(handle);
617606

618607
if let Some(liquidity_source) = self.liquidity_source.as_ref() {
619608
let mut stop_liquidity_handler = self.stop_sender.subscribe();
@@ -670,39 +659,42 @@ impl Node {
670659
// Disconnect all peers.
671660
self.peer_manager.disconnect_all_peers();
672661

673-
// Wait until event handling stopped, at least until a timeout is reached.
674-
let event_handling_stopped_logger = Arc::clone(&self.logger);
675-
let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe();
662+
// Stop any runtime-dependant chain sources.
663+
self.chain_source.stop();
676664

677-
let timeout_res = tokio::task::block_in_place(move || {
678-
runtime.block_on(async {
679-
tokio::time::timeout(
680-
Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS),
681-
event_handling_stopped_receiver.changed(),
682-
)
683-
.await
684-
})
685-
});
665+
// Wait until background processing stopped, at least until a timeout is reached.
666+
if let Some(background_processor_task) =
667+
self.background_processor_task.lock().unwrap().take()
668+
{
669+
let abort_handle = background_processor_task.abort_handle();
670+
let timeout_res = tokio::task::block_in_place(move || {
671+
runtime.block_on(async {
672+
tokio::time::timeout(
673+
Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS),
674+
background_processor_task,
675+
)
676+
.await
677+
})
678+
});
686679

687-
match timeout_res {
688-
Ok(stop_res) => match stop_res {
689-
Ok(()) => {},
680+
match timeout_res {
681+
Ok(stop_res) => match stop_res {
682+
Ok(()) => {},
683+
Err(e) => {
684+
abort_handle.abort();
685+
log_error!(
686+
self.logger,
687+
"Stopping event handling failed. This should never happen: {}",
688+
e
689+
);
690+
panic!("Stopping event handling failed. This should never happen.");
691+
},
692+
},
690693
Err(e) => {
691-
log_error!(
692-
event_handling_stopped_logger,
693-
"Stopping event handling failed. This should never happen: {}",
694-
e
695-
);
696-
panic!("Stopping event handling failed. This should never happen.");
694+
abort_handle.abort();
695+
log_error!(self.logger, "Stopping event handling timed out: {}", e);
697696
},
698-
},
699-
Err(e) => {
700-
log_error!(
701-
event_handling_stopped_logger,
702-
"Stopping event handling timed out: {}",
703-
e
704-
);
705-
},
697+
}
706698
}
707699

708700
#[cfg(tokio_unstable)]

0 commit comments

Comments
 (0)