Skip to content

Commit 2bf683b

Browse files
committed
Move continuous tx broadcast processing loop to ChainSource
Rather than looping in the `spawn` method directly, we move the loop to a refactored `continuously_process_broadcast_queue` method on `ChainSource`, which also allows us to react on the stop signal if we're polling `recv`.
1 parent 7e686b2 commit 2bf683b

File tree

2 files changed

+29
-32
lines changed

2 files changed

+29
-32
lines changed

src/chain/mod.rs

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::config::{
1818
};
1919
use crate::fee_estimator::OnchainFeeEstimator;
2020
use crate::io::utils::write_node_metrics;
21-
use crate::logger::{log_info, log_trace, LdkLogger, Logger};
21+
use crate::logger::{log_debug, log_info, log_trace, LdkLogger, Logger};
2222
use crate::runtime::Runtime;
2323
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
2424
use crate::{Error, NodeMetrics};
@@ -426,19 +426,33 @@ impl ChainSource {
426426
}
427427
}
428428

429-
pub(crate) async fn process_broadcast_queue(&self) {
429+
pub(crate) async fn continuously_process_broadcast_queue(
430+
&self, mut stop_tx_bcast_receiver: tokio::sync::watch::Receiver<()>,
431+
) {
430432
let mut receiver = self.tx_broadcaster.get_broadcast_queue().await;
431-
while let Some(next_package) = receiver.recv().await {
432-
match &self.kind {
433-
ChainSourceKind::Esplora(esplora_chain_source) => {
434-
esplora_chain_source.process_broadcast_package(next_package).await
435-
},
436-
ChainSourceKind::Electrum(electrum_chain_source) => {
437-
electrum_chain_source.process_broadcast_package(next_package).await
438-
},
439-
ChainSourceKind::Bitcoind(bitcoind_chain_source) => {
440-
bitcoind_chain_source.process_broadcast_package(next_package).await
441-
},
433+
loop {
434+
let tx_bcast_logger = Arc::clone(&self.logger);
435+
tokio::select! {
436+
_ = stop_tx_bcast_receiver.changed() => {
437+
log_debug!(
438+
tx_bcast_logger,
439+
"Stopping broadcasting transactions.",
440+
);
441+
return;
442+
}
443+
Some(next_package) = receiver.recv() => {
444+
match &self.kind {
445+
ChainSourceKind::Esplora(esplora_chain_source) => {
446+
esplora_chain_source.process_broadcast_package(next_package).await
447+
},
448+
ChainSourceKind::Electrum(electrum_chain_source) => {
449+
electrum_chain_source.process_broadcast_package(next_package).await
450+
},
451+
ChainSourceKind::Bitcoind(bitcoind_chain_source) => {
452+
bitcoind_chain_source.process_broadcast_package(next_package).await
453+
},
454+
}
455+
}
442456
}
443457
}
444458
}

src/lib.rs

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -487,27 +487,10 @@ impl Node {
487487
});
488488
}
489489

490-
let mut stop_tx_bcast = self.stop_sender.subscribe();
490+
let stop_tx_bcast = self.stop_sender.subscribe();
491491
let chain_source = Arc::clone(&self.chain_source);
492-
let tx_bcast_logger = Arc::clone(&self.logger);
493492
self.runtime.spawn(async move {
494-
// Every second we try to clear our broadcasting queue.
495-
let mut interval = tokio::time::interval(Duration::from_secs(1));
496-
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
497-
loop {
498-
tokio::select! {
499-
_ = stop_tx_bcast.changed() => {
500-
log_debug!(
501-
tx_bcast_logger,
502-
"Stopping broadcasting transactions.",
503-
);
504-
return;
505-
}
506-
_ = interval.tick() => {
507-
chain_source.process_broadcast_queue().await;
508-
}
509-
}
510-
}
493+
chain_source.continuously_process_broadcast_queue(stop_tx_bcast).await
511494
});
512495

513496
let bump_tx_event_handler = Arc::new(BumpTransactionEventHandler::new(

0 commit comments

Comments
 (0)