Skip to content

Commit d475749

Browse files
committed
Await on the background processing task's JoinHandle
Previously, we used to a channel to indicate that the background processor task has been stopped. Here, we rather just await the task's `JoinHandle` which is more robust in that it avoids a race condition.
1 parent c7889de commit d475749

File tree

2 files changed

+36
-47
lines changed

2 files changed

+36
-47
lines changed

src/builder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1591,12 +1591,12 @@ fn build_with_store_internal(
15911591
};
15921592

15931593
let (stop_sender, _) = tokio::sync::watch::channel(());
1594-
let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());
1594+
let background_processor_task = Mutex::new(None);
15951595

15961596
Ok(Node {
15971597
runtime,
15981598
stop_sender,
1599-
event_handling_stopped_sender,
1599+
background_processor_task,
16001600
config,
16011601
wallet,
16021602
chain_source,

src/lib.rs

Lines changed: 34 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ uniffi::include_scaffolding!("ldk_node");
180180
pub struct Node {
181181
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
182182
stop_sender: tokio::sync::watch::Sender<()>,
183-
event_handling_stopped_sender: tokio::sync::watch::Sender<()>,
183+
background_processor_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
184184
config: Arc<Config>,
185185
wallet: Arc<Wallet>,
186186
chain_source: Arc<ChainSource>,
@@ -579,8 +579,7 @@ impl Node {
579579
};
580580

581581
let background_stop_logger = Arc::clone(&self.logger);
582-
let event_handling_stopped_sender = self.event_handling_stopped_sender.clone();
583-
runtime.spawn(async move {
582+
let handle = runtime.spawn(async move {
584583
process_events_async(
585584
background_persister,
586585
|e| background_event_handler.handle_event(e),
@@ -601,19 +600,9 @@ impl Node {
601600
panic!("Failed to process events");
602601
});
603602
log_debug!(background_stop_logger, "Events processing stopped.",);
604-
605-
match event_handling_stopped_sender.send(()) {
606-
Ok(_) => (),
607-
Err(e) => {
608-
log_error!(
609-
background_stop_logger,
610-
"Failed to send 'events handling stopped' signal. This should never happen: {}",
611-
e
612-
);
613-
debug_assert!(false);
614-
},
615-
}
616603
});
604+
debug_assert!(self.background_processor_task.lock().unwrap().is_none());
605+
*self.background_processor_task.lock().unwrap() = Some(handle);
617606

618607
if let Some(liquidity_source) = self.liquidity_source.as_ref() {
619608
let mut stop_liquidity_handler = self.stop_sender.subscribe();
@@ -678,39 +667,39 @@ impl Node {
678667
// Stop any runtime-dependant chain sources.
679668
self.chain_source.stop();
680669

681-
// Wait until event handling stopped, at least until a timeout is reached.
682-
let event_handling_stopped_logger = Arc::clone(&self.logger);
683-
let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe();
684-
685-
let timeout_res = tokio::task::block_in_place(move || {
686-
runtime.block_on(async {
687-
tokio::time::timeout(
688-
Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS),
689-
event_handling_stopped_receiver.changed(),
690-
)
691-
.await
692-
})
693-
});
670+
// Wait until background processing stopped, at least until a timeout is reached.
671+
if let Some(background_processor_task) =
672+
self.background_processor_task.lock().unwrap().take()
673+
{
674+
let abort_handle = background_processor_task.abort_handle();
675+
let timeout_res = tokio::task::block_in_place(move || {
676+
runtime.block_on(async {
677+
tokio::time::timeout(
678+
Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS),
679+
background_processor_task,
680+
)
681+
.await
682+
})
683+
});
694684

695-
match timeout_res {
696-
Ok(stop_res) => match stop_res {
697-
Ok(()) => {},
685+
match timeout_res {
686+
Ok(stop_res) => match stop_res {
687+
Ok(()) => {},
688+
Err(e) => {
689+
abort_handle.abort();
690+
log_error!(
691+
self.logger,
692+
"Stopping event handling failed. This should never happen: {}",
693+
e
694+
);
695+
panic!("Stopping event handling failed. This should never happen.");
696+
},
697+
},
698698
Err(e) => {
699-
log_error!(
700-
event_handling_stopped_logger,
701-
"Stopping event handling failed. This should never happen: {}",
702-
e
703-
);
704-
panic!("Stopping event handling failed. This should never happen.");
699+
abort_handle.abort();
700+
log_error!(self.logger, "Stopping event handling timed out: {}", e);
705701
},
706-
},
707-
Err(e) => {
708-
log_error!(
709-
event_handling_stopped_logger,
710-
"Stopping event handling timed out: {}",
711-
e
712-
);
713-
},
702+
}
714703
}
715704

716705
#[cfg(tokio_unstable)]

0 commit comments

Comments
 (0)