Skip to content

Wait on all background tasks to finish (or abort) #612

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1632,11 +1632,15 @@ fn build_with_store_internal(

let (stop_sender, _) = tokio::sync::watch::channel(());
let background_processor_task = Mutex::new(None);
let background_tasks = Mutex::new(None);
let cancellable_background_tasks = Mutex::new(None);

Ok(Node {
runtime,
stop_sender,
background_processor_task,
background_tasks,
cancellable_background_tasks,
config,
wallet,
chain_source,
Expand Down
76 changes: 32 additions & 44 deletions src/chain/bitcoind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::fee_estimator::{
};
use crate::io::utils::write_node_metrics;
use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger};
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
use crate::{Error, NodeMetrics};

use lightning::chain::chaininterface::ConfirmationTarget as LdkConfirmationTarget;
Expand Down Expand Up @@ -54,7 +54,6 @@ pub(super) struct BitcoindChainSource {
onchain_wallet: Arc<Wallet>,
wallet_polling_status: Mutex<WalletSyncStatus>,
fee_estimator: Arc<OnchainFeeEstimator>,
tx_broadcaster: Arc<Broadcaster>,
kv_store: Arc<DynStore>,
config: Arc<Config>,
logger: Arc<Logger>,
Expand All @@ -65,8 +64,8 @@ impl BitcoindChainSource {
pub(crate) fn new_rpc(
rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String,
onchain_wallet: Arc<Wallet>, fee_estimator: Arc<OnchainFeeEstimator>,
tx_broadcaster: Arc<Broadcaster>, kv_store: Arc<DynStore>, config: Arc<Config>,
logger: Arc<Logger>, node_metrics: Arc<RwLock<NodeMetrics>>,
kv_store: Arc<DynStore>, config: Arc<Config>, logger: Arc<Logger>,
node_metrics: Arc<RwLock<NodeMetrics>>,
) -> Self {
let api_client = Arc::new(BitcoindClient::new_rpc(
rpc_host.clone(),
Expand All @@ -85,7 +84,6 @@ impl BitcoindChainSource {
onchain_wallet,
wallet_polling_status,
fee_estimator,
tx_broadcaster,
kv_store,
config,
logger: Arc::clone(&logger),
Expand All @@ -96,9 +94,8 @@ impl BitcoindChainSource {
pub(crate) fn new_rest(
rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String,
onchain_wallet: Arc<Wallet>, fee_estimator: Arc<OnchainFeeEstimator>,
tx_broadcaster: Arc<Broadcaster>, kv_store: Arc<DynStore>, config: Arc<Config>,
rest_client_config: BitcoindRestClientConfig, logger: Arc<Logger>,
node_metrics: Arc<RwLock<NodeMetrics>>,
kv_store: Arc<DynStore>, config: Arc<Config>, rest_client_config: BitcoindRestClientConfig,
logger: Arc<Logger>, node_metrics: Arc<RwLock<NodeMetrics>>,
) -> Self {
let api_client = Arc::new(BitcoindClient::new_rest(
rest_client_config.rest_host,
Expand All @@ -120,7 +117,6 @@ impl BitcoindChainSource {
wallet_polling_status,
onchain_wallet,
fee_estimator,
tx_broadcaster,
kv_store,
config,
logger: Arc::clone(&logger),
Expand Down Expand Up @@ -530,53 +526,45 @@ impl BitcoindChainSource {
Ok(())
}

pub(crate) async fn process_broadcast_queue(&self) {
pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
// While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28
// features, we should eventually switch to use `submitpackage` via the
// `rust-bitcoind-json-rpc` crate rather than just broadcasting individual
// transactions.
let mut receiver = self.tx_broadcaster.get_broadcast_queue().await;
while let Some(next_package) = receiver.recv().await {
for tx in &next_package {
let txid = tx.compute_txid();
let timeout_fut = tokio::time::timeout(
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
self.api_client.broadcast_transaction(tx),
);
match timeout_fut.await {
Ok(res) => match res {
Ok(id) => {
debug_assert_eq!(id, txid);
log_trace!(self.logger, "Successfully broadcast transaction {}", txid);
},
Err(e) => {
log_error!(
self.logger,
"Failed to broadcast transaction {}: {}",
txid,
e
);
log_trace!(
self.logger,
"Failed broadcast transaction bytes: {}",
log_bytes!(tx.encode())
);
},
for tx in &package {
let txid = tx.compute_txid();
let timeout_fut = tokio::time::timeout(
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
self.api_client.broadcast_transaction(tx),
);
match timeout_fut.await {
Ok(res) => match res {
Ok(id) => {
debug_assert_eq!(id, txid);
log_trace!(self.logger, "Successfully broadcast transaction {}", txid);
},
Err(e) => {
log_error!(
self.logger,
"Failed to broadcast transaction due to timeout {}: {}",
txid,
e
);
log_error!(self.logger, "Failed to broadcast transaction {}: {}", txid, e);
log_trace!(
self.logger,
"Failed broadcast transaction bytes: {}",
log_bytes!(tx.encode())
);
},
}
},
Err(e) => {
log_error!(
self.logger,
"Failed to broadcast transaction due to timeout {}: {}",
txid,
e
);
log_trace!(
self.logger,
"Failed broadcast transaction bytes: {}",
log_bytes!(tx.encode())
);
},
}
}
}
Expand Down
18 changes: 6 additions & 12 deletions src/chain/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::fee_estimator::{
};
use crate::io::utils::write_node_metrics;
use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger};
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
use crate::NodeMetrics;

use lightning::chain::{Confirm, Filter, WatchedOutput};
Expand Down Expand Up @@ -56,7 +56,6 @@ pub(super) struct ElectrumChainSource {
onchain_wallet_sync_status: Mutex<WalletSyncStatus>,
lightning_wallet_sync_status: Mutex<WalletSyncStatus>,
fee_estimator: Arc<OnchainFeeEstimator>,
tx_broadcaster: Arc<Broadcaster>,
kv_store: Arc<DynStore>,
config: Arc<Config>,
logger: Arc<Logger>,
Expand All @@ -66,9 +65,8 @@ pub(super) struct ElectrumChainSource {
impl ElectrumChainSource {
pub(super) fn new(
server_url: String, sync_config: ElectrumSyncConfig, onchain_wallet: Arc<Wallet>,
fee_estimator: Arc<OnchainFeeEstimator>, tx_broadcaster: Arc<Broadcaster>,
kv_store: Arc<DynStore>, config: Arc<Config>, logger: Arc<Logger>,
node_metrics: Arc<RwLock<NodeMetrics>>,
fee_estimator: Arc<OnchainFeeEstimator>, kv_store: Arc<DynStore>, config: Arc<Config>,
logger: Arc<Logger>, node_metrics: Arc<RwLock<NodeMetrics>>,
) -> Self {
let electrum_runtime_status = RwLock::new(ElectrumRuntimeStatus::new());
let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed);
Expand All @@ -81,7 +79,6 @@ impl ElectrumChainSource {
onchain_wallet_sync_status,
lightning_wallet_sync_status,
fee_estimator,
tx_broadcaster,
kv_store,
config,
logger: Arc::clone(&logger),
Expand Down Expand Up @@ -302,7 +299,7 @@ impl ElectrumChainSource {
Ok(())
}

pub(crate) async fn process_broadcast_queue(&self) {
pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
let electrum_client: Arc<ElectrumRuntimeClient> =
if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() {
Arc::clone(client)
Expand All @@ -311,11 +308,8 @@ impl ElectrumChainSource {
return;
};

let mut receiver = self.tx_broadcaster.get_broadcast_queue().await;
while let Some(next_package) = receiver.recv().await {
for tx in next_package {
electrum_client.broadcast(tx).await;
}
for tx in package {
electrum_client.broadcast(tx).await;
}
}
}
Expand Down
131 changes: 63 additions & 68 deletions src/chain/esplora.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::fee_estimator::{
};
use crate::io::utils::write_node_metrics;
use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger};
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
use crate::{Error, NodeMetrics};

use lightning::chain::{Confirm, Filter, WatchedOutput};
Expand All @@ -30,7 +30,7 @@ use bdk_esplora::EsploraAsyncExt;

use esplora_client::AsyncClient as EsploraAsyncClient;

use bitcoin::{FeeRate, Network, Script, Txid};
use bitcoin::{FeeRate, Network, Script, Transaction, Txid};

use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
Expand All @@ -44,7 +44,6 @@ pub(super) struct EsploraChainSource {
tx_sync: Arc<EsploraSyncClient<Arc<Logger>>>,
lightning_wallet_sync_status: Mutex<WalletSyncStatus>,
fee_estimator: Arc<OnchainFeeEstimator>,
tx_broadcaster: Arc<Broadcaster>,
kv_store: Arc<DynStore>,
config: Arc<Config>,
logger: Arc<Logger>,
Expand All @@ -55,8 +54,8 @@ impl EsploraChainSource {
pub(crate) fn new(
server_url: String, headers: HashMap<String, String>, sync_config: EsploraSyncConfig,
onchain_wallet: Arc<Wallet>, fee_estimator: Arc<OnchainFeeEstimator>,
tx_broadcaster: Arc<Broadcaster>, kv_store: Arc<DynStore>, config: Arc<Config>,
logger: Arc<Logger>, node_metrics: Arc<RwLock<NodeMetrics>>,
kv_store: Arc<DynStore>, config: Arc<Config>, logger: Arc<Logger>,
node_metrics: Arc<RwLock<NodeMetrics>>,
) -> Self {
// FIXME / TODO: We introduced this to make `bdk_esplora` work separately without updating
// `lightning-transaction-sync`. We should revert this as part of of the upgrade to LDK 0.2.
Expand Down Expand Up @@ -90,7 +89,6 @@ impl EsploraChainSource {
tx_sync,
lightning_wallet_sync_status,
fee_estimator,
tx_broadcaster,
kv_store,
config,
logger,
Expand Down Expand Up @@ -372,76 +370,73 @@ impl EsploraChainSource {
Ok(())
}

pub(crate) async fn process_broadcast_queue(&self) {
let mut receiver = self.tx_broadcaster.get_broadcast_queue().await;
while let Some(next_package) = receiver.recv().await {
for tx in &next_package {
let txid = tx.compute_txid();
let timeout_fut = tokio::time::timeout(
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
self.esplora_client.broadcast(tx),
);
match timeout_fut.await {
Ok(res) => match res {
Ok(()) => {
log_trace!(self.logger, "Successfully broadcast transaction {}", txid);
},
Err(e) => match e {
esplora_client::Error::HttpResponse { status, message } => {
if status == 400 {
// Log 400 at lesser level, as this often just means bitcoind already knows the
// transaction.
// FIXME: We can further differentiate here based on the error
// message which will be available with rust-esplora-client 0.7 and
// later.
log_trace!(
self.logger,
"Failed to broadcast due to HTTP connection error: {}",
message
);
} else {
log_error!(
self.logger,
"Failed to broadcast due to HTTP connection error: {} - {}",
status,
message
);
}
pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
for tx in &package {
let txid = tx.compute_txid();
let timeout_fut = tokio::time::timeout(
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
self.esplora_client.broadcast(tx),
);
match timeout_fut.await {
Ok(res) => match res {
Ok(()) => {
log_trace!(self.logger, "Successfully broadcast transaction {}", txid);
},
Err(e) => match e {
esplora_client::Error::HttpResponse { status, message } => {
if status == 400 {
// Log 400 at lesser level, as this often just means bitcoind already knows the
// transaction.
// FIXME: We can further differentiate here based on the error
// message which will be available with rust-esplora-client 0.7 and
// later.
log_trace!(
self.logger,
"Failed broadcast transaction bytes: {}",
log_bytes!(tx.encode())
"Failed to broadcast due to HTTP connection error: {}",
message
);
},
_ => {
} else {
log_error!(
self.logger,
"Failed to broadcast transaction {}: {}",
txid,
e
);
log_trace!(
self.logger,
"Failed broadcast transaction bytes: {}",
log_bytes!(tx.encode())
"Failed to broadcast due to HTTP connection error: {} - {}",
status,
message
);
},
}
log_trace!(
self.logger,
"Failed broadcast transaction bytes: {}",
log_bytes!(tx.encode())
);
},
_ => {
log_error!(
self.logger,
"Failed to broadcast transaction {}: {}",
txid,
e
);
log_trace!(
self.logger,
"Failed broadcast transaction bytes: {}",
log_bytes!(tx.encode())
);
},
},
Err(e) => {
log_error!(
self.logger,
"Failed to broadcast transaction due to timeout {}: {}",
txid,
e
);
log_trace!(
self.logger,
"Failed broadcast transaction bytes: {}",
log_bytes!(tx.encode())
);
},
}
},
Err(e) => {
log_error!(
self.logger,
"Failed to broadcast transaction due to timeout {}: {}",
txid,
e
);
log_trace!(
self.logger,
"Failed broadcast transaction bytes: {}",
log_bytes!(tx.encode())
);
},
}
}
}
Expand Down
Loading
Loading