Skip to content

Commit c7889de

Browse files
committed
Ensure we clear the broadcast queue one last time in stop
As peer disconnection might result in force-closures, we here attempt to process the broadcast queue one last time on shutdown.
1 parent 6fe2d30 commit c7889de

File tree

3 files changed

+157
-111
lines changed

3 files changed

+157
-111
lines changed

src/chain/mod.rs

Lines changed: 138 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use bdk_wallet::Update as BdkUpdate;
4444

4545
use esplora_client::AsyncClient as EsploraAsyncClient;
4646

47-
use bitcoin::{FeeRate, Network, Script, ScriptBuf, Txid};
47+
use bitcoin::{FeeRate, Network, Script, ScriptBuf, Transaction, Txid};
4848

4949
use std::collections::HashMap;
5050
use std::sync::{Arc, Mutex, RwLock};
@@ -1474,83 +1474,125 @@ impl ChainSource {
14741474

14751475
pub(crate) async fn process_broadcast_queue(&self) {
14761476
match self {
1477-
Self::Esplora { esplora_client, tx_broadcaster, logger, .. } => {
1477+
Self::Esplora { tx_broadcaster, .. } => {
14781478
let mut receiver = tx_broadcaster.get_broadcast_queue().await;
14791479
while let Some(next_package) = receiver.recv().await {
1480-
for tx in &next_package {
1481-
let txid = tx.compute_txid();
1482-
let timeout_fut = tokio::time::timeout(
1483-
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
1484-
esplora_client.broadcast(tx),
1485-
);
1486-
match timeout_fut.await {
1487-
Ok(res) => match res {
1488-
Ok(()) => {
1489-
log_trace!(
1490-
logger,
1491-
"Successfully broadcast transaction {}",
1492-
txid
1493-
);
1494-
},
1495-
Err(e) => match e {
1496-
esplora_client::Error::HttpResponse { status, message } => {
1497-
if status == 400 {
1498-
// Log 400 at lesser level, as this often just means bitcoind already knows the
1499-
// transaction.
1500-
// FIXME: We can further differentiate here based on the error
1501-
// message which will be available with rust-esplora-client 0.7 and
1502-
// later.
1503-
log_trace!(
1504-
logger,
1505-
"Failed to broadcast due to HTTP connection error: {}",
1506-
message
1507-
);
1508-
} else {
1509-
log_error!(
1510-
logger,
1511-
"Failed to broadcast due to HTTP connection error: {} - {}",
1512-
status, message
1513-
);
1514-
}
1480+
self.process_package(next_package).await
1481+
}
1482+
},
1483+
Self::Electrum { tx_broadcaster, .. } => {
1484+
let mut receiver = tx_broadcaster.get_broadcast_queue().await;
1485+
while let Some(next_package) = receiver.recv().await {
1486+
self.process_package(next_package).await
1487+
}
1488+
},
1489+
Self::Bitcoind { tx_broadcaster, .. } => {
1490+
let mut receiver = tx_broadcaster.get_broadcast_queue().await;
1491+
while let Some(next_package) = receiver.recv().await {
1492+
self.process_package(next_package).await
1493+
}
1494+
},
1495+
}
1496+
}
1497+
1498+
pub(crate) async fn try_process_broadcast_queue(&self) {
1499+
match self {
1500+
Self::Esplora { tx_broadcaster, .. } => {
1501+
if let Ok(mut receiver) = tx_broadcaster.try_get_broadcast_queue().await {
1502+
while let Ok(next_package) = receiver.try_recv() {
1503+
self.process_package(next_package).await
1504+
}
1505+
}
1506+
},
1507+
Self::Electrum { tx_broadcaster, .. } => {
1508+
if let Ok(mut receiver) = tx_broadcaster.try_get_broadcast_queue().await {
1509+
while let Ok(next_package) = receiver.try_recv() {
1510+
self.process_package(next_package).await
1511+
}
1512+
}
1513+
},
1514+
Self::Bitcoind { tx_broadcaster, .. } => {
1515+
if let Ok(mut receiver) = tx_broadcaster.try_get_broadcast_queue().await {
1516+
while let Ok(next_package) = receiver.try_recv() {
1517+
self.process_package(next_package).await
1518+
}
1519+
}
1520+
},
1521+
}
1522+
}
1523+
1524+
async fn process_package(&self, package: Vec<Transaction>) {
1525+
match self {
1526+
Self::Esplora { esplora_client, logger, .. } => {
1527+
for tx in &package {
1528+
let txid = tx.compute_txid();
1529+
let timeout_fut = tokio::time::timeout(
1530+
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
1531+
esplora_client.broadcast(tx),
1532+
);
1533+
match timeout_fut.await {
1534+
Ok(res) => match res {
1535+
Ok(()) => {
1536+
log_trace!(logger, "Successfully broadcast transaction {}", txid);
1537+
},
1538+
Err(e) => match e {
1539+
esplora_client::Error::HttpResponse { status, message } => {
1540+
if status == 400 {
1541+
// Log 400 at lesser level, as this often just means bitcoind already knows the
1542+
// transaction.
1543+
// FIXME: We can further differentiate here based on the error
1544+
// message which will be available with rust-esplora-client 0.7 and
1545+
// later.
15151546
log_trace!(
15161547
logger,
1517-
"Failed broadcast transaction bytes: {}",
1518-
log_bytes!(tx.encode())
1548+
"Failed to broadcast due to HTTP connection error: {}",
1549+
message
15191550
);
1520-
},
1521-
_ => {
1551+
} else {
15221552
log_error!(
1523-
logger,
1524-
"Failed to broadcast transaction {}: {}",
1525-
txid,
1526-
e
1527-
);
1528-
log_trace!(
1529-
logger,
1530-
"Failed broadcast transaction bytes: {}",
1531-
log_bytes!(tx.encode())
1532-
);
1533-
},
1553+
logger,
1554+
"Failed to broadcast due to HTTP connection error: {} - {}",
1555+
status, message
1556+
);
1557+
}
1558+
log_trace!(
1559+
logger,
1560+
"Failed broadcast transaction bytes: {}",
1561+
log_bytes!(tx.encode())
1562+
);
1563+
},
1564+
_ => {
1565+
log_error!(
1566+
logger,
1567+
"Failed to broadcast transaction {}: {}",
1568+
txid,
1569+
e
1570+
);
1571+
log_trace!(
1572+
logger,
1573+
"Failed broadcast transaction bytes: {}",
1574+
log_bytes!(tx.encode())
1575+
);
15341576
},
15351577
},
1536-
Err(e) => {
1537-
log_error!(
1538-
logger,
1539-
"Failed to broadcast transaction due to timeout {}: {}",
1540-
txid,
1541-
e
1542-
);
1543-
log_trace!(
1544-
logger,
1545-
"Failed broadcast transaction bytes: {}",
1546-
log_bytes!(tx.encode())
1547-
);
1548-
},
1549-
}
1578+
},
1579+
Err(e) => {
1580+
log_error!(
1581+
logger,
1582+
"Failed to broadcast transaction due to timeout {}: {}",
1583+
txid,
1584+
e
1585+
);
1586+
log_trace!(
1587+
logger,
1588+
"Failed broadcast transaction bytes: {}",
1589+
log_bytes!(tx.encode())
1590+
);
1591+
},
15501592
}
15511593
}
15521594
},
1553-
Self::Electrum { electrum_runtime_status, tx_broadcaster, .. } => {
1595+
Self::Electrum { electrum_runtime_status, .. } => {
15541596
let electrum_client: Arc<ElectrumRuntimeClient> = if let Some(client) =
15551597
electrum_runtime_status.read().unwrap().client().as_ref()
15561598
{
@@ -1563,54 +1605,31 @@ impl ChainSource {
15631605
return;
15641606
};
15651607

1566-
let mut receiver = tx_broadcaster.get_broadcast_queue().await;
1567-
while let Some(next_package) = receiver.recv().await {
1568-
for tx in next_package {
1569-
electrum_client.broadcast(tx).await;
1570-
}
1608+
for tx in package {
1609+
electrum_client.broadcast(tx).await;
15711610
}
15721611
},
1573-
Self::Bitcoind { api_client, tx_broadcaster, logger, .. } => {
1612+
Self::Bitcoind { api_client, logger, .. } => {
15741613
// While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28
15751614
// features, we should eventually switch to use `submitpackage` via the
15761615
// `rust-bitcoind-json-rpc` crate rather than just broadcasting individual
15771616
// transactions.
1578-
let mut receiver = tx_broadcaster.get_broadcast_queue().await;
1579-
while let Some(next_package) = receiver.recv().await {
1580-
for tx in &next_package {
1581-
let txid = tx.compute_txid();
1582-
let timeout_fut = tokio::time::timeout(
1583-
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
1584-
api_client.broadcast_transaction(tx),
1585-
);
1586-
match timeout_fut.await {
1587-
Ok(res) => match res {
1588-
Ok(id) => {
1589-
debug_assert_eq!(id, txid);
1590-
log_trace!(
1591-
logger,
1592-
"Successfully broadcast transaction {}",
1593-
txid
1594-
);
1595-
},
1596-
Err(e) => {
1597-
log_error!(
1598-
logger,
1599-
"Failed to broadcast transaction {}: {}",
1600-
txid,
1601-
e
1602-
);
1603-
log_trace!(
1604-
logger,
1605-
"Failed broadcast transaction bytes: {}",
1606-
log_bytes!(tx.encode())
1607-
);
1608-
},
1617+
for tx in &package {
1618+
let txid = tx.compute_txid();
1619+
let timeout_fut = tokio::time::timeout(
1620+
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
1621+
api_client.broadcast_transaction(tx),
1622+
);
1623+
match timeout_fut.await {
1624+
Ok(res) => match res {
1625+
Ok(id) => {
1626+
debug_assert_eq!(id, txid);
1627+
log_trace!(logger, "Successfully broadcast transaction {}", txid);
16091628
},
16101629
Err(e) => {
16111630
log_error!(
16121631
logger,
1613-
"Failed to broadcast transaction due to timeout {}: {}",
1632+
"Failed to broadcast transaction {}: {}",
16141633
txid,
16151634
e
16161635
);
@@ -1620,7 +1639,20 @@ impl ChainSource {
16201639
log_bytes!(tx.encode())
16211640
);
16221641
},
1623-
}
1642+
},
1643+
Err(e) => {
1644+
log_error!(
1645+
logger,
1646+
"Failed to broadcast transaction due to timeout {}: {}",
1647+
txid,
1648+
e
1649+
);
1650+
log_trace!(
1651+
logger,
1652+
"Failed broadcast transaction bytes: {}",
1653+
log_bytes!(tx.encode())
1654+
);
1655+
},
16241656
}
16251657
}
16261658
},

src/lib.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -651,10 +651,7 @@ impl Node {
651651

652652
log_info!(self.logger, "Shutting down LDK Node with node ID {}...", self.node_id());
653653

654-
// Stop any runtime-dependant chain sources.
655-
self.chain_source.stop();
656-
657-
// Stop the runtime.
654+
// Stop background tasks.
658655
match self.stop_sender.send(()) {
659656
Ok(_) => (),
660657
Err(e) => {
@@ -670,6 +667,17 @@ impl Node {
670667
// Disconnect all peers.
671668
self.peer_manager.disconnect_all_peers();
672669

670+
// Ensure we process the broadcast queue one last time, as we may have generated
671+
// force-closures during peer disconnection.
672+
let chain_source = Arc::clone(&self.chain_source);
673+
let runtime_ref = &runtime;
674+
tokio::task::block_in_place(move || {
675+
runtime_ref.block_on(async { chain_source.try_process_broadcast_queue().await })
676+
});
677+
678+
// Stop any runtime-dependant chain sources.
679+
self.chain_source.stop();
680+
673681
// Wait until event handling stopped, at least until a timeout is reached.
674682
let event_handling_stopped_logger = Arc::clone(&self.logger);
675683
let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe();

src/tx_broadcaster.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use lightning::chain::chaininterface::BroadcasterInterface;
1212
use bitcoin::Transaction;
1313

1414
use tokio::sync::mpsc;
15-
use tokio::sync::{Mutex, MutexGuard};
15+
use tokio::sync::{Mutex, MutexGuard, TryLockError};
1616

1717
use std::ops::Deref;
1818

@@ -39,6 +39,12 @@ where
3939
pub(crate) async fn get_broadcast_queue(&self) -> MutexGuard<mpsc::Receiver<Vec<Transaction>>> {
4040
self.queue_receiver.lock().await
4141
}
42+
43+
pub(crate) async fn try_get_broadcast_queue(
44+
&self,
45+
) -> Result<MutexGuard<mpsc::Receiver<Vec<Transaction>>>, TryLockError> {
46+
self.queue_receiver.try_lock()
47+
}
4248
}
4349

4450
impl<L: Deref> BroadcasterInterface for TransactionBroadcaster<L>

0 commit comments

Comments
 (0)