Skip to content

Commit b166833

Browse files
committed
Implement process_broadcast_queue for ChainSource::Electrum
1 parent 9fc13c7 commit b166833

File tree

2 files changed

+69
-6
lines changed

2 files changed

+69
-6
lines changed

src/chain/electrum.rs

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,22 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8+
use crate::config::TX_BROADCAST_TIMEOUT_SECS;
89
use crate::error::Error;
9-
use crate::logger::{log_error, LdkLogger, Logger};
10+
use crate::logger::{log_bytes, log_error, log_trace, LdkLogger, Logger};
1011

1112
use lightning::chain::{Filter, WatchedOutput};
13+
use lightning::util::ser::Writeable;
1214
use lightning_transaction_sync::ElectrumSyncClient;
1315

1416
use bdk_electrum::BdkElectrumClient;
1517

16-
use electrum_client::Client as ElectrumClient;
18+
use electrum_client::{Client as ElectrumClient, ElectrumApi};
1719

18-
use bitcoin::{Script, Txid};
20+
use bitcoin::{Script, Transaction, Txid};
1921

2022
use std::sync::Arc;
23+
use std::time::Duration;
2124

2225
pub(crate) struct ElectrumRuntimeClient {
2326
electrum_client: Arc<ElectrumClient>,
@@ -48,6 +51,48 @@ impl ElectrumRuntimeClient {
4851
);
4952
Ok(Self { electrum_client, bdk_electrum_client, tx_sync, runtime, logger })
5053
}
54+
55+
pub(crate) async fn broadcast(&self, tx: Transaction) {
56+
let electrum_client = Arc::clone(&self.electrum_client);
57+
58+
let txid = tx.compute_txid();
59+
let tx_bytes = tx.encode();
60+
61+
let spawn_fut =
62+
self.runtime.spawn_blocking(move || electrum_client.transaction_broadcast(&tx));
63+
64+
let timeout_fut =
65+
tokio::time::timeout(Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS), spawn_fut);
66+
67+
match timeout_fut.await {
68+
Ok(res) => match res {
69+
Ok(_) => {
70+
log_trace!(self.logger, "Successfully broadcast transaction {}", txid);
71+
},
72+
Err(e) => {
73+
log_error!(self.logger, "Failed to broadcast transaction {}: {}", txid, e);
74+
log_trace!(
75+
self.logger,
76+
"Failed broadcast transaction bytes: {}",
77+
log_bytes!(tx_bytes)
78+
);
79+
},
80+
},
81+
Err(e) => {
82+
log_error!(
83+
self.logger,
84+
"Failed to broadcast transaction due to timeout {}: {}",
85+
txid,
86+
e
87+
);
88+
log_trace!(
89+
self.logger,
90+
"Failed broadcast transaction bytes: {}",
91+
log_bytes!(tx_bytes)
92+
);
93+
},
94+
}
95+
}
5196
}
5297

5398
impl Filter for ElectrumRuntimeClient {

src/chain/mod.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ pub(crate) enum ChainSource {
127127
Electrum {
128128
server_url: String,
129129
sync_config: ElectrumSyncConfig,
130-
electrum_runtime_client: RwLock<Option<ElectrumRuntimeClient>>,
130+
electrum_runtime_client: RwLock<Option<Arc<ElectrumRuntimeClient>>>,
131131
registered_txs_cache: Mutex<Vec<(Txid, ScriptBuf)>>,
132132
registered_outputs_cache: Mutex<Vec<WatchedOutput>>,
133133
onchain_wallet: Arc<Wallet>,
@@ -271,7 +271,7 @@ impl ChainSource {
271271
}
272272
}
273273

274-
*locked_client = Some(client);
274+
*locked_client = Some(Arc::new(client));
275275
},
276276
_ => {
277277
// Nothing to do for other chain sources.
@@ -1190,7 +1190,25 @@ impl ChainSource {
11901190
}
11911191
}
11921192
},
1193-
Self::Electrum { .. } => todo!(),
1193+
Self::Electrum { electrum_runtime_client, tx_broadcaster, .. } => {
1194+
let electrum_client: Arc<ElectrumRuntimeClient> =
1195+
if let Some(client) = electrum_runtime_client.read().unwrap().as_ref() {
1196+
Arc::clone(client)
1197+
} else {
1198+
debug_assert!(
1199+
false,
1200+
"We should have started the chain source before broadcasting"
1201+
);
1202+
return;
1203+
};
1204+
1205+
let mut receiver = tx_broadcaster.get_broadcast_queue().await;
1206+
while let Some(next_package) = receiver.recv().await {
1207+
for tx in next_package {
1208+
electrum_client.broadcast(tx).await;
1209+
}
1210+
}
1211+
},
11941212
Self::BitcoindRpc { bitcoind_rpc_client, tx_broadcaster, logger, .. } => {
11951213
// While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28
11961214
// features, we should eventually switch to use `submitpackage` via the

0 commit comments

Comments
 (0)