Skip to content

Commit 7a18cad

Browse files
feat(electrum): optimize merkle proof validation with batching
Co-authored-by: keerthi <[email protected]>
1 parent 63923c6 commit 7a18cad

File tree

2 files changed

+185
-45
lines changed

2 files changed

+185
-45
lines changed

crates/electrum/src/bdk_electrum_client.rs

Lines changed: 136 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ use std::sync::{Arc, Mutex};
1212
/// We include a chain suffix of a certain length for the purpose of robustness.
1313
const CHAIN_SUFFIX_LENGTH: u32 = 8;
1414

15+
/// Maximum batch size for proof validation requests
16+
const MAX_BATCH_SIZE: usize = 100;
17+
1518
/// Wrapper around an [`electrum_client::ElectrumApi`] which includes an internal in-memory
1619
/// transaction cache to avoid re-fetching already downloaded transactions.
1720
#[derive(Debug)]
@@ -22,6 +25,8 @@ pub struct BdkElectrumClient<E> {
2225
tx_cache: Mutex<HashMap<Txid, Arc<Transaction>>>,
2326
/// The header cache
2427
block_header_cache: Mutex<HashMap<u32, Header>>,
28+
/// Cache of transaction anchors
29+
anchor_cache: Mutex<HashMap<(Txid, BlockHash), ConfirmationBlockTime>>,
2530
}
2631

2732
impl<E: ElectrumApi> BdkElectrumClient<E> {
@@ -31,6 +36,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
3136
inner: client,
3237
tx_cache: Default::default(),
3338
block_header_cache: Default::default(),
39+
anchor_cache: Default::default(),
3440
}
3541
}
3642

@@ -135,13 +141,19 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
135141

136142
let mut tx_update = TxUpdate::<ConfirmationBlockTime>::default();
137143
let mut last_active_indices = BTreeMap::<K, u32>::default();
144+
let mut pending_anchors = Vec::new();
138145
for keychain in request.keychains() {
139146
let spks = request
140147
.iter_spks(keychain.clone())
141148
.map(|(spk_i, spk)| (spk_i, SpkWithExpectedTxids::from(spk)));
142-
if let Some(last_active_index) =
143-
self.populate_with_spks(start_time, &mut tx_update, spks, stop_gap, batch_size)?
144-
{
149+
if let Some(last_active_index) = self.populate_with_spks(
150+
start_time,
151+
&mut tx_update,
152+
spks,
153+
stop_gap,
154+
batch_size,
155+
&mut pending_anchors,
156+
)? {
145157
last_active_indices.insert(keychain, last_active_index);
146158
}
147159
}
@@ -151,6 +163,13 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
151163
self.fetch_prev_txout(&mut tx_update)?;
152164
}
153165

166+
if !pending_anchors.is_empty() {
167+
let anchors = self.batch_fetch_anchors(&pending_anchors)?;
168+
for (txid, anchor) in anchors {
169+
tx_update.anchors.insert((anchor, txid));
170+
}
171+
}
172+
154173
let chain_update = match tip_and_latest_blocks {
155174
Some((chain_tip, latest_blocks)) => Some(chain_update(
156175
chain_tip,
@@ -204,6 +223,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
204223
};
205224

206225
let mut tx_update = TxUpdate::<ConfirmationBlockTime>::default();
226+
let mut pending_anchors = Vec::new();
207227
self.populate_with_spks(
208228
start_time,
209229
&mut tx_update,
@@ -213,15 +233,33 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
213233
.map(|(i, spk)| (i as u32, spk)),
214234
usize::MAX,
215235
batch_size,
236+
&mut pending_anchors,
237+
)?;
238+
self.populate_with_txids(
239+
start_time,
240+
&mut tx_update,
241+
request.iter_txids(),
242+
&mut pending_anchors,
243+
)?;
244+
self.populate_with_outpoints(
245+
start_time,
246+
&mut tx_update,
247+
request.iter_outpoints(),
248+
&mut pending_anchors,
216249
)?;
217-
self.populate_with_txids(start_time, &mut tx_update, request.iter_txids())?;
218-
self.populate_with_outpoints(start_time, &mut tx_update, request.iter_outpoints())?;
219250

220251
// Fetch previous `TxOut`s for fee calculation if flag is enabled.
221252
if fetch_prev_txouts {
222253
self.fetch_prev_txout(&mut tx_update)?;
223254
}
224255

256+
if !pending_anchors.is_empty() {
257+
let anchors = self.batch_fetch_anchors(&pending_anchors)?;
258+
for (txid, anchor) in anchors {
259+
tx_update.anchors.insert((anchor, txid));
260+
}
261+
}
262+
225263
let chain_update = match tip_and_latest_blocks {
226264
Some((chain_tip, latest_blocks)) => Some(chain_update(
227265
chain_tip,
@@ -249,16 +287,17 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
249287
mut spks_with_expected_txids: impl Iterator<Item = (u32, SpkWithExpectedTxids)>,
250288
stop_gap: usize,
251289
batch_size: usize,
290+
pending_anchors: &mut Vec<(Txid, usize)>,
252291
) -> Result<Option<u32>, Error> {
253-
let mut unused_spk_count = 0_usize;
254-
let mut last_active_index = Option::<u32>::None;
292+
let mut unused_spk_count = 0;
293+
let mut last_active_index = None;
255294

256295
loop {
257296
let spks = (0..batch_size)
258297
.map_while(|_| spks_with_expected_txids.next())
259298
.collect::<Vec<_>>();
260299
if spks.is_empty() {
261-
return Ok(last_active_index);
300+
break;
262301
}
263302

264303
let spk_histories = self
@@ -267,9 +306,9 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
267306

268307
for ((spk_index, spk), spk_history) in spks.into_iter().zip(spk_histories) {
269308
if spk_history.is_empty() {
270-
unused_spk_count = unused_spk_count.saturating_add(1);
309+
unused_spk_count += 1;
271310
if unused_spk_count >= stop_gap {
272-
return Ok(last_active_index);
311+
break;
273312
}
274313
} else {
275314
last_active_index = Some(spk_index);
@@ -292,7 +331,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
292331
match tx_res.height.try_into() {
293332
// Returned heights 0 & -1 are reserved for unconfirmed txs.
294333
Ok(height) if height > 0 => {
295-
self.validate_merkle_for_anchor(tx_update, tx_res.tx_hash, height)?;
334+
pending_anchors.push((tx_res.tx_hash, height));
296335
}
297336
_ => {
298337
tx_update.seen_ats.insert((tx_res.tx_hash, start_time));
@@ -301,6 +340,8 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
301340
}
302341
}
303342
}
343+
344+
Ok(last_active_index)
304345
}
305346

306347
/// Populate the `tx_update` with associated transactions/anchors of `outpoints`.
@@ -312,6 +353,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
312353
start_time: u64,
313354
tx_update: &mut TxUpdate<ConfirmationBlockTime>,
314355
outpoints: impl IntoIterator<Item = OutPoint>,
356+
pending_anchors: &mut Vec<(Txid, usize)>,
315357
) -> Result<(), Error> {
316358
for outpoint in outpoints {
317359
let op_txid = outpoint.txid;
@@ -337,7 +379,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
337379
match res.height.try_into() {
338380
// Returned heights 0 & -1 are reserved for unconfirmed txs.
339381
Ok(height) if height > 0 => {
340-
self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?;
382+
pending_anchors.push((res.tx_hash, height));
341383
}
342384
_ => {
343385
tx_update.seen_ats.insert((res.tx_hash, start_time));
@@ -359,7 +401,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
359401
match res.height.try_into() {
360402
// Returned heights 0 & -1 are reserved for unconfirmed txs.
361403
Ok(height) if height > 0 => {
362-
self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?;
404+
pending_anchors.push((res.tx_hash, height));
363405
}
364406
_ => {
365407
tx_update.seen_ats.insert((res.tx_hash, start_time));
@@ -368,6 +410,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
368410
}
369411
}
370412
}
413+
371414
Ok(())
372415
}
373416

@@ -377,6 +420,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
377420
start_time: u64,
378421
tx_update: &mut TxUpdate<ConfirmationBlockTime>,
379422
txids: impl IntoIterator<Item = Txid>,
423+
pending_anchors: &mut Vec<(Txid, usize)>,
380424
) -> Result<(), Error> {
381425
for txid in txids {
382426
let tx = match self.fetch_tx(txid) {
@@ -402,7 +446,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
402446
match r.height.try_into() {
403447
// Returned heights 0 & -1 are reserved for unconfirmed txs.
404448
Ok(height) if height > 0 => {
405-
self.validate_merkle_for_anchor(tx_update, txid, height)?;
449+
pending_anchors.push((txid, height));
406450
}
407451
_ => {
408452
tx_update.seen_ats.insert((r.tx_hash, start_time));
@@ -412,52 +456,99 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
412456

413457
tx_update.txs.push(tx);
414458
}
459+
415460
Ok(())
416461
}
417462

418-
// Helper function which checks if a transaction is confirmed by validating the merkle proof.
419-
// An anchor is inserted if the transaction is validated to be in a confirmed block.
420-
fn validate_merkle_for_anchor(
463+
/// Batch validate Merkle proofs, cache each confirmation anchor, and return them.
464+
fn batch_fetch_anchors(
421465
&self,
422-
tx_update: &mut TxUpdate<ConfirmationBlockTime>,
423-
txid: Txid,
424-
confirmation_height: usize,
425-
) -> Result<(), Error> {
426-
if let Ok(merkle_res) = self
427-
.inner
428-
.transaction_get_merkle(&txid, confirmation_height)
466+
txs_with_heights: &[(Txid, usize)],
467+
) -> Result<Vec<(Txid, ConfirmationBlockTime)>, Error> {
468+
let mut results = Vec::with_capacity(txs_with_heights.len());
469+
let mut to_fetch = Vec::new();
470+
471+
// Build a map for height to block hash conversions. This is for obtaining block hash data
472+
// with minimum `fetch_header` calls.
473+
let mut height_to_hash: HashMap<u32, BlockHash> = HashMap::new();
474+
for &(_, height) in txs_with_heights {
475+
let h = height as u32;
476+
if !height_to_hash.contains_key(&h) {
477+
// Try to obtain hash from the header cache, or fetch the header if absent.
478+
let hash = self.fetch_header(h)?.block_hash();
479+
height_to_hash.insert(h, hash);
480+
}
481+
}
482+
483+
// Check cache.
429484
{
430-
let mut header = self.fetch_header(merkle_res.block_height as u32)?;
431-
let mut is_confirmed_tx = electrum_client::utils::validate_merkle_proof(
432-
&txid,
433-
&header.merkle_root,
434-
&merkle_res,
435-
);
436-
437-
// Merkle validation will fail if the header in `block_header_cache` is outdated, so we
438-
// want to check if there is a new header and validate against the new one.
439-
if !is_confirmed_tx {
440-
header = self.update_header(merkle_res.block_height as u32)?;
441-
is_confirmed_tx = electrum_client::utils::validate_merkle_proof(
485+
let anchor_cache = self.anchor_cache.lock().unwrap();
486+
for &(txid, height) in txs_with_heights {
487+
let h = height as u32;
488+
let hash = height_to_hash[&h];
489+
if let Some(anchor) = anchor_cache.get(&(txid, hash)) {
490+
results.push((txid, *anchor));
491+
} else {
492+
to_fetch.push((txid, height, hash));
493+
}
494+
}
495+
}
496+
497+
// Fetch missing proofs in batches
498+
for chunk in to_fetch.chunks(MAX_BATCH_SIZE) {
499+
for &(txid, height, hash) in chunk {
500+
// Fetch the raw proof.
501+
let proof = self.inner.transaction_get_merkle(&txid, height)?;
502+
503+
// Validate against header, retrying once on stale header.
504+
let mut header = self.fetch_header(height as u32)?;
505+
let mut valid = electrum_client::utils::validate_merkle_proof(
442506
&txid,
443507
&header.merkle_root,
444-
&merkle_res,
508+
&proof,
445509
);
446-
}
510+
if !valid {
511+
header = self.update_header(height as u32)?;
512+
valid = electrum_client::utils::validate_merkle_proof(
513+
&txid,
514+
&header.merkle_root,
515+
&proof,
516+
);
517+
}
447518

448-
if is_confirmed_tx {
449-
tx_update.anchors.insert((
450-
ConfirmationBlockTime {
519+
// Build and cache the anchor if merkle proof is valid.
520+
if valid {
521+
let anchor = ConfirmationBlockTime {
451522
confirmation_time: header.time as u64,
452523
block_id: BlockId {
453-
height: merkle_res.block_height as u32,
524+
height: height as u32,
454525
hash: header.block_hash(),
455526
},
456-
},
457-
txid,
458-
));
527+
};
528+
self.anchor_cache
529+
.lock()
530+
.unwrap()
531+
.insert((txid, hash), anchor);
532+
results.push((txid, anchor));
533+
}
459534
}
460535
}
536+
537+
Ok(results)
538+
}
539+
540+
/// Validate a single transaction’s Merkle proof, cache its confirmation anchor, and update.
541+
#[allow(dead_code)]
542+
fn validate_anchor_for_update(
543+
&self,
544+
tx_update: &mut TxUpdate<ConfirmationBlockTime>,
545+
txid: Txid,
546+
confirmation_height: usize,
547+
) -> Result<(), Error> {
548+
let anchors = self.batch_fetch_anchors(&[(txid, confirmation_height)])?;
549+
for (txid, anchor) in anchors {
550+
tx_update.anchors.insert((anchor, txid));
551+
}
461552
Ok(())
462553
}
463554

0 commit comments

Comments
 (0)