Skip to content

Commit 5d2092b

Browse files
authored
Merge pull request #612 from tnull/2025-08-shutdown-wait-on-all-tasks
Wait on all background tasks to finish (or abort)
2 parents 3de8b1d + 17a45dd commit 5d2092b

File tree

8 files changed

+264
-195
lines changed

8 files changed

+264
-195
lines changed

src/builder.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1634,11 +1634,15 @@ fn build_with_store_internal(
16341634

16351635
let (stop_sender, _) = tokio::sync::watch::channel(());
16361636
let background_processor_task = Mutex::new(None);
1637+
let background_tasks = Mutex::new(None);
1638+
let cancellable_background_tasks = Mutex::new(None);
16371639

16381640
Ok(Node {
16391641
runtime,
16401642
stop_sender,
16411643
background_processor_task,
1644+
background_tasks,
1645+
cancellable_background_tasks,
16421646
config,
16431647
wallet,
16441648
chain_source,

src/chain/bitcoind.rs

Lines changed: 32 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::fee_estimator::{
1616
};
1717
use crate::io::utils::write_node_metrics;
1818
use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger};
19-
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
19+
use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
2020
use crate::{Error, NodeMetrics};
2121

2222
use lightning::chain::chaininterface::ConfirmationTarget as LdkConfirmationTarget;
@@ -54,7 +54,6 @@ pub(super) struct BitcoindChainSource {
5454
onchain_wallet: Arc<Wallet>,
5555
wallet_polling_status: Mutex<WalletSyncStatus>,
5656
fee_estimator: Arc<OnchainFeeEstimator>,
57-
tx_broadcaster: Arc<Broadcaster>,
5857
kv_store: Arc<DynStore>,
5958
config: Arc<Config>,
6059
logger: Arc<Logger>,
@@ -65,8 +64,8 @@ impl BitcoindChainSource {
6564
pub(crate) fn new_rpc(
6665
rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String,
6766
onchain_wallet: Arc<Wallet>, fee_estimator: Arc<OnchainFeeEstimator>,
68-
tx_broadcaster: Arc<Broadcaster>, kv_store: Arc<DynStore>, config: Arc<Config>,
69-
logger: Arc<Logger>, node_metrics: Arc<RwLock<NodeMetrics>>,
67+
kv_store: Arc<DynStore>, config: Arc<Config>, logger: Arc<Logger>,
68+
node_metrics: Arc<RwLock<NodeMetrics>>,
7069
) -> Self {
7170
let api_client = Arc::new(BitcoindClient::new_rpc(
7271
rpc_host.clone(),
@@ -85,7 +84,6 @@ impl BitcoindChainSource {
8584
onchain_wallet,
8685
wallet_polling_status,
8786
fee_estimator,
88-
tx_broadcaster,
8987
kv_store,
9088
config,
9189
logger: Arc::clone(&logger),
@@ -96,9 +94,8 @@ impl BitcoindChainSource {
9694
pub(crate) fn new_rest(
9795
rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String,
9896
onchain_wallet: Arc<Wallet>, fee_estimator: Arc<OnchainFeeEstimator>,
99-
tx_broadcaster: Arc<Broadcaster>, kv_store: Arc<DynStore>, config: Arc<Config>,
100-
rest_client_config: BitcoindRestClientConfig, logger: Arc<Logger>,
101-
node_metrics: Arc<RwLock<NodeMetrics>>,
97+
kv_store: Arc<DynStore>, config: Arc<Config>, rest_client_config: BitcoindRestClientConfig,
98+
logger: Arc<Logger>, node_metrics: Arc<RwLock<NodeMetrics>>,
10299
) -> Self {
103100
let api_client = Arc::new(BitcoindClient::new_rest(
104101
rest_client_config.rest_host,
@@ -120,7 +117,6 @@ impl BitcoindChainSource {
120117
wallet_polling_status,
121118
onchain_wallet,
122119
fee_estimator,
123-
tx_broadcaster,
124120
kv_store,
125121
config,
126122
logger: Arc::clone(&logger),
@@ -530,53 +526,45 @@ impl BitcoindChainSource {
530526
Ok(())
531527
}
532528

533-
pub(crate) async fn process_broadcast_queue(&self) {
529+
pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
534530
// While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28
535531
// features, we should eventually switch to use `submitpackage` via the
536532
// `rust-bitcoind-json-rpc` crate rather than just broadcasting individual
537533
// transactions.
538-
let mut receiver = self.tx_broadcaster.get_broadcast_queue().await;
539-
while let Some(next_package) = receiver.recv().await {
540-
for tx in &next_package {
541-
let txid = tx.compute_txid();
542-
let timeout_fut = tokio::time::timeout(
543-
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
544-
self.api_client.broadcast_transaction(tx),
545-
);
546-
match timeout_fut.await {
547-
Ok(res) => match res {
548-
Ok(id) => {
549-
debug_assert_eq!(id, txid);
550-
log_trace!(self.logger, "Successfully broadcast transaction {}", txid);
551-
},
552-
Err(e) => {
553-
log_error!(
554-
self.logger,
555-
"Failed to broadcast transaction {}: {}",
556-
txid,
557-
e
558-
);
559-
log_trace!(
560-
self.logger,
561-
"Failed broadcast transaction bytes: {}",
562-
log_bytes!(tx.encode())
563-
);
564-
},
534+
for tx in &package {
535+
let txid = tx.compute_txid();
536+
let timeout_fut = tokio::time::timeout(
537+
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
538+
self.api_client.broadcast_transaction(tx),
539+
);
540+
match timeout_fut.await {
541+
Ok(res) => match res {
542+
Ok(id) => {
543+
debug_assert_eq!(id, txid);
544+
log_trace!(self.logger, "Successfully broadcast transaction {}", txid);
565545
},
566546
Err(e) => {
567-
log_error!(
568-
self.logger,
569-
"Failed to broadcast transaction due to timeout {}: {}",
570-
txid,
571-
e
572-
);
547+
log_error!(self.logger, "Failed to broadcast transaction {}: {}", txid, e);
573548
log_trace!(
574549
self.logger,
575550
"Failed broadcast transaction bytes: {}",
576551
log_bytes!(tx.encode())
577552
);
578553
},
579-
}
554+
},
555+
Err(e) => {
556+
log_error!(
557+
self.logger,
558+
"Failed to broadcast transaction due to timeout {}: {}",
559+
txid,
560+
e
561+
);
562+
log_trace!(
563+
self.logger,
564+
"Failed broadcast transaction bytes: {}",
565+
log_bytes!(tx.encode())
566+
);
567+
},
580568
}
581569
}
582570
}

src/chain/electrum.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::fee_estimator::{
1818
};
1919
use crate::io::utils::write_node_metrics;
2020
use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger};
21-
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
21+
use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
2222
use crate::NodeMetrics;
2323

2424
use lightning::chain::{Confirm, Filter, WatchedOutput};
@@ -56,7 +56,6 @@ pub(super) struct ElectrumChainSource {
5656
onchain_wallet_sync_status: Mutex<WalletSyncStatus>,
5757
lightning_wallet_sync_status: Mutex<WalletSyncStatus>,
5858
fee_estimator: Arc<OnchainFeeEstimator>,
59-
tx_broadcaster: Arc<Broadcaster>,
6059
kv_store: Arc<DynStore>,
6160
config: Arc<Config>,
6261
logger: Arc<Logger>,
@@ -66,9 +65,8 @@ pub(super) struct ElectrumChainSource {
6665
impl ElectrumChainSource {
6766
pub(super) fn new(
6867
server_url: String, sync_config: ElectrumSyncConfig, onchain_wallet: Arc<Wallet>,
69-
fee_estimator: Arc<OnchainFeeEstimator>, tx_broadcaster: Arc<Broadcaster>,
70-
kv_store: Arc<DynStore>, config: Arc<Config>, logger: Arc<Logger>,
71-
node_metrics: Arc<RwLock<NodeMetrics>>,
68+
fee_estimator: Arc<OnchainFeeEstimator>, kv_store: Arc<DynStore>, config: Arc<Config>,
69+
logger: Arc<Logger>, node_metrics: Arc<RwLock<NodeMetrics>>,
7270
) -> Self {
7371
let electrum_runtime_status = RwLock::new(ElectrumRuntimeStatus::new());
7472
let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed);
@@ -81,7 +79,6 @@ impl ElectrumChainSource {
8179
onchain_wallet_sync_status,
8280
lightning_wallet_sync_status,
8381
fee_estimator,
84-
tx_broadcaster,
8582
kv_store,
8683
config,
8784
logger: Arc::clone(&logger),
@@ -302,7 +299,7 @@ impl ElectrumChainSource {
302299
Ok(())
303300
}
304301

305-
pub(crate) async fn process_broadcast_queue(&self) {
302+
pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
306303
let electrum_client: Arc<ElectrumRuntimeClient> =
307304
if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() {
308305
Arc::clone(client)
@@ -311,11 +308,8 @@ impl ElectrumChainSource {
311308
return;
312309
};
313310

314-
let mut receiver = self.tx_broadcaster.get_broadcast_queue().await;
315-
while let Some(next_package) = receiver.recv().await {
316-
for tx in next_package {
317-
electrum_client.broadcast(tx).await;
318-
}
311+
for tx in package {
312+
electrum_client.broadcast(tx).await;
319313
}
320314
}
321315
}

src/chain/esplora.rs

Lines changed: 63 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::fee_estimator::{
1818
};
1919
use crate::io::utils::write_node_metrics;
2020
use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger};
21-
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
21+
use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
2222
use crate::{Error, NodeMetrics};
2323

2424
use lightning::chain::{Confirm, Filter, WatchedOutput};
@@ -30,7 +30,7 @@ use bdk_esplora::EsploraAsyncExt;
3030

3131
use esplora_client::AsyncClient as EsploraAsyncClient;
3232

33-
use bitcoin::{FeeRate, Network, Script, Txid};
33+
use bitcoin::{FeeRate, Network, Script, Transaction, Txid};
3434

3535
use std::collections::HashMap;
3636
use std::sync::{Arc, Mutex, RwLock};
@@ -44,7 +44,6 @@ pub(super) struct EsploraChainSource {
4444
tx_sync: Arc<EsploraSyncClient<Arc<Logger>>>,
4545
lightning_wallet_sync_status: Mutex<WalletSyncStatus>,
4646
fee_estimator: Arc<OnchainFeeEstimator>,
47-
tx_broadcaster: Arc<Broadcaster>,
4847
kv_store: Arc<DynStore>,
4948
config: Arc<Config>,
5049
logger: Arc<Logger>,
@@ -55,8 +54,8 @@ impl EsploraChainSource {
5554
pub(crate) fn new(
5655
server_url: String, headers: HashMap<String, String>, sync_config: EsploraSyncConfig,
5756
onchain_wallet: Arc<Wallet>, fee_estimator: Arc<OnchainFeeEstimator>,
58-
tx_broadcaster: Arc<Broadcaster>, kv_store: Arc<DynStore>, config: Arc<Config>,
59-
logger: Arc<Logger>, node_metrics: Arc<RwLock<NodeMetrics>>,
57+
kv_store: Arc<DynStore>, config: Arc<Config>, logger: Arc<Logger>,
58+
node_metrics: Arc<RwLock<NodeMetrics>>,
6059
) -> Self {
6160
// FIXME / TODO: We introduced this to make `bdk_esplora` work separately without updating
6261
// `lightning-transaction-sync`. We should revert this as part of of the upgrade to LDK 0.2.
@@ -90,7 +89,6 @@ impl EsploraChainSource {
9089
tx_sync,
9190
lightning_wallet_sync_status,
9291
fee_estimator,
93-
tx_broadcaster,
9492
kv_store,
9593
config,
9694
logger,
@@ -372,76 +370,73 @@ impl EsploraChainSource {
372370
Ok(())
373371
}
374372

375-
pub(crate) async fn process_broadcast_queue(&self) {
376-
let mut receiver = self.tx_broadcaster.get_broadcast_queue().await;
377-
while let Some(next_package) = receiver.recv().await {
378-
for tx in &next_package {
379-
let txid = tx.compute_txid();
380-
let timeout_fut = tokio::time::timeout(
381-
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
382-
self.esplora_client.broadcast(tx),
383-
);
384-
match timeout_fut.await {
385-
Ok(res) => match res {
386-
Ok(()) => {
387-
log_trace!(self.logger, "Successfully broadcast transaction {}", txid);
388-
},
389-
Err(e) => match e {
390-
esplora_client::Error::HttpResponse { status, message } => {
391-
if status == 400 {
392-
// Log 400 at lesser level, as this often just means bitcoind already knows the
393-
// transaction.
394-
// FIXME: We can further differentiate here based on the error
395-
// message which will be available with rust-esplora-client 0.7 and
396-
// later.
397-
log_trace!(
398-
self.logger,
399-
"Failed to broadcast due to HTTP connection error: {}",
400-
message
401-
);
402-
} else {
403-
log_error!(
404-
self.logger,
405-
"Failed to broadcast due to HTTP connection error: {} - {}",
406-
status,
407-
message
408-
);
409-
}
373+
pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
374+
for tx in &package {
375+
let txid = tx.compute_txid();
376+
let timeout_fut = tokio::time::timeout(
377+
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
378+
self.esplora_client.broadcast(tx),
379+
);
380+
match timeout_fut.await {
381+
Ok(res) => match res {
382+
Ok(()) => {
383+
log_trace!(self.logger, "Successfully broadcast transaction {}", txid);
384+
},
385+
Err(e) => match e {
386+
esplora_client::Error::HttpResponse { status, message } => {
387+
if status == 400 {
388+
// Log 400 at lesser level, as this often just means bitcoind already knows the
389+
// transaction.
390+
// FIXME: We can further differentiate here based on the error
391+
// message which will be available with rust-esplora-client 0.7 and
392+
// later.
410393
log_trace!(
411394
self.logger,
412-
"Failed broadcast transaction bytes: {}",
413-
log_bytes!(tx.encode())
395+
"Failed to broadcast due to HTTP connection error: {}",
396+
message
414397
);
415-
},
416-
_ => {
398+
} else {
417399
log_error!(
418400
self.logger,
419-
"Failed to broadcast transaction {}: {}",
420-
txid,
421-
e
422-
);
423-
log_trace!(
424-
self.logger,
425-
"Failed broadcast transaction bytes: {}",
426-
log_bytes!(tx.encode())
401+
"Failed to broadcast due to HTTP connection error: {} - {}",
402+
status,
403+
message
427404
);
428-
},
405+
}
406+
log_trace!(
407+
self.logger,
408+
"Failed broadcast transaction bytes: {}",
409+
log_bytes!(tx.encode())
410+
);
411+
},
412+
_ => {
413+
log_error!(
414+
self.logger,
415+
"Failed to broadcast transaction {}: {}",
416+
txid,
417+
e
418+
);
419+
log_trace!(
420+
self.logger,
421+
"Failed broadcast transaction bytes: {}",
422+
log_bytes!(tx.encode())
423+
);
429424
},
430425
},
431-
Err(e) => {
432-
log_error!(
433-
self.logger,
434-
"Failed to broadcast transaction due to timeout {}: {}",
435-
txid,
436-
e
437-
);
438-
log_trace!(
439-
self.logger,
440-
"Failed broadcast transaction bytes: {}",
441-
log_bytes!(tx.encode())
442-
);
443-
},
444-
}
426+
},
427+
Err(e) => {
428+
log_error!(
429+
self.logger,
430+
"Failed to broadcast transaction due to timeout {}: {}",
431+
txid,
432+
e
433+
);
434+
log_trace!(
435+
self.logger,
436+
"Failed broadcast transaction bytes: {}",
437+
log_bytes!(tx.encode())
438+
);
439+
},
445440
}
446441
}
447442
}

0 commit comments

Comments
 (0)