Skip to content

Commit ec4fd97

Browse files
committed
feat(electrum): batched Headers and script_get_history
1 parent f21a21d commit ec4fd97

File tree

1 file changed

+133
-114
lines changed

1 file changed

+133
-114
lines changed

crates/electrum/src/bdk_electrum_client.rs

Lines changed: 133 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -70,33 +70,6 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
7070
Ok(tx)
7171
}
7272

73-
/// Fetch block header of given `height`.
74-
///
75-
/// If it hits the cache it will return the cached version and avoid making the request.
76-
fn fetch_header(&self, height: u32) -> Result<Header, Error> {
77-
let block_header_cache = self.block_header_cache.lock().unwrap();
78-
79-
if let Some(header) = block_header_cache.get(&height) {
80-
return Ok(*header);
81-
}
82-
83-
drop(block_header_cache);
84-
85-
self.update_header(height)
86-
}
87-
88-
/// Update a block header at given `height`. Returns the updated header.
89-
fn update_header(&self, height: u32) -> Result<Header, Error> {
90-
let header = self.inner.block_header(height as usize)?;
91-
92-
self.block_header_cache
93-
.lock()
94-
.unwrap()
95-
.insert(height, header);
96-
97-
Ok(header)
98-
}
99-
10073
/// Broadcasts a transaction to the network.
10174
///
10275
/// This is a re-export of [`ElectrumApi::transaction_broadcast`].
@@ -292,7 +265,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
292265
let mut unused_spk_count = 0;
293266
let mut last_active_index = None;
294267

295-
loop {
268+
'batch_loop: loop {
296269
let spks = (0..batch_size)
297270
.map_while(|_| spks_with_expected_txids.next())
298271
.collect::<Vec<_>>();
@@ -308,7 +281,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
308281
if spk_history.is_empty() {
309282
unused_spk_count += 1;
310283
if unused_spk_count >= stop_gap {
311-
break;
284+
break 'batch_loop;
312285
}
313286
} else {
314287
last_active_index = Some(spk_index);
@@ -339,13 +312,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
339312
}
340313
}
341314
}
342-
343-
if unused_spk_count >= stop_gap {
344-
break;
345-
}
346315
}
347-
348-
Ok(last_active_index)
349316
}
350317

351318
/// Populate the `tx_update` with associated transactions/anchors of `outpoints`.
@@ -359,56 +326,74 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
359326
outpoints: impl IntoIterator<Item = OutPoint>,
360327
pending_anchors: &mut Vec<(Txid, usize)>,
361328
) -> Result<(), Error> {
362-
for outpoint in outpoints {
363-
let op_txid = outpoint.txid;
364-
let op_tx = self.fetch_tx(op_txid)?;
365-
let op_txout = match op_tx.output.get(outpoint.vout as usize) {
366-
Some(txout) => txout,
367-
None => continue,
368-
};
369-
debug_assert_eq!(op_tx.compute_txid(), op_txid);
370-
371-
// attempt to find the following transactions (alongside their chain positions), and
372-
// add to our sparsechain `update`:
373-
let mut has_residing = false; // tx in which the outpoint resides
374-
let mut has_spending = false; // tx that spends the outpoint
375-
for res in self.inner.script_get_history(&op_txout.script_pubkey)? {
376-
if has_residing && has_spending {
377-
break;
329+
// Collect valid outpoints with their corresponding `spk` and `tx`.
330+
let mut ops_spks_txs = Vec::new();
331+
for op in outpoints {
332+
if let Ok(tx) = self.fetch_tx(op.txid) {
333+
if let Some(txout) = tx.output.get(op.vout as usize) {
334+
ops_spks_txs.push((op, txout.script_pubkey.clone(), tx));
378335
}
336+
}
337+
}
379338

380-
if !has_residing && res.tx_hash == op_txid {
381-
has_residing = true;
382-
tx_update.txs.push(Arc::clone(&op_tx));
383-
match res.height.try_into() {
384-
// Returned heights 0 & -1 are reserved for unconfirmed txs.
385-
Ok(height) if height > 0 => {
386-
pending_anchors.push((res.tx_hash, height));
387-
}
388-
_ => {
389-
tx_update.seen_ats.insert((res.tx_hash, start_time));
390-
}
339+
// Dedup `spk`s, batch-fetch all histories in one call, and store them in a map.
340+
let unique_spks: Vec<_> = ops_spks_txs
341+
.iter()
342+
.map(|(_, spk, _)| spk.clone())
343+
.collect::<HashSet<_>>()
344+
.into_iter()
345+
.collect();
346+
let histories = self
347+
.inner
348+
.batch_script_get_history(unique_spks.iter().map(|spk| spk.as_script()))?;
349+
let mut spk_map = HashMap::new();
350+
for (spk, history) in unique_spks.into_iter().zip(histories.into_iter()) {
351+
spk_map.insert(spk, history);
352+
}
353+
354+
for (outpoint, spk, tx) in ops_spks_txs {
355+
if let Some(spk_history) = spk_map.get(&spk) {
356+
let mut has_residing = false; // tx in which the outpoint resides
357+
let mut has_spending = false; // tx that spends the outpoint
358+
359+
for res in spk_history {
360+
if has_residing && has_spending {
361+
break;
391362
}
392-
}
393363

394-
if !has_spending && res.tx_hash != op_txid {
395-
let res_tx = self.fetch_tx(res.tx_hash)?;
396-
// we exclude txs/anchors that do not spend our specified outpoint(s)
397-
has_spending = res_tx
398-
.input
399-
.iter()
400-
.any(|txin| txin.previous_output == outpoint);
401-
if !has_spending {
402-
continue;
364+
if !has_residing && res.tx_hash == outpoint.txid {
365+
has_residing = true;
366+
tx_update.txs.push(Arc::clone(&tx));
367+
match res.height.try_into() {
368+
// Returned heights 0 & -1 are reserved for unconfirmed txs.
369+
Ok(height) if height > 0 => {
370+
pending_anchors.push((res.tx_hash, height));
371+
}
372+
_ => {
373+
tx_update.seen_ats.insert((res.tx_hash, start_time));
374+
}
375+
}
403376
}
404-
tx_update.txs.push(Arc::clone(&res_tx));
405-
match res.height.try_into() {
406-
// Returned heights 0 & -1 are reserved for unconfirmed txs.
407-
Ok(height) if height > 0 => {
408-
pending_anchors.push((res.tx_hash, height));
377+
378+
if !has_spending && res.tx_hash != outpoint.txid {
379+
let res_tx = self.fetch_tx(res.tx_hash)?;
380+
// we exclude txs/anchors that do not spend our specified outpoint(s)
381+
has_spending = res_tx
382+
.input
383+
.iter()
384+
.any(|txin| txin.previous_output == outpoint);
385+
if !has_spending {
386+
continue;
409387
}
410-
_ => {
411-
tx_update.seen_ats.insert((res.tx_hash, start_time));
388+
tx_update.txs.push(Arc::clone(&res_tx));
389+
match res.height.try_into() {
390+
// Returned heights 0 & -1 are reserved for unconfirmed txs.
391+
Ok(height) if height > 0 => {
392+
pending_anchors.push((res.tx_hash, height));
393+
}
394+
_ => {
395+
tx_update.seen_ats.insert((res.tx_hash, start_time));
396+
}
412397
}
413398
}
414399
}
@@ -426,39 +411,47 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
426411
txids: impl IntoIterator<Item = Txid>,
427412
pending_anchors: &mut Vec<(Txid, usize)>,
428413
) -> Result<(), Error> {
414+
let mut txs = Vec::<(Txid, Arc<Transaction>)>::new();
415+
let mut scripts = Vec::new();
429416
for txid in txids {
430-
let tx = match self.fetch_tx(txid) {
431-
Ok(tx) => tx,
432-
Err(electrum_client::Error::Protocol(_)) => continue,
433-
Err(other_err) => return Err(other_err),
434-
};
417+
match self.fetch_tx(txid) {
418+
Ok(tx) => {
419+
let spk = tx
420+
.output
421+
.first()
422+
.map(|txo| &txo.script_pubkey)
423+
.expect("tx must have an output")
424+
.clone();
425+
txs.push((txid, tx));
426+
scripts.push(spk);
427+
}
428+
Err(electrum_client::Error::Protocol(_)) => {
429+
continue;
430+
}
431+
Err(e) => return Err(e),
432+
}
433+
}
435434

436-
let spk = tx
437-
.output
438-
.first()
439-
.map(|txo| &txo.script_pubkey)
440-
.expect("tx must have an output");
435+
// because of restrictions of the Electrum API, we have to use the `script_get_history`
436+
// call to get confirmation status of our transaction
437+
let spk_histories = self
438+
.inner
439+
.batch_script_get_history(scripts.iter().map(|spk| spk.as_script()))?;
441440

442-
// because of restrictions of the Electrum API, we have to use the `script_get_history`
443-
// call to get confirmation status of our transaction
444-
if let Some(r) = self
445-
.inner
446-
.script_get_history(spk)?
447-
.into_iter()
448-
.find(|r| r.tx_hash == txid)
449-
{
450-
match r.height.try_into() {
441+
for (tx, spk_history) in txs.into_iter().zip(spk_histories) {
442+
if let Some(res) = spk_history.into_iter().find(|res| res.tx_hash == tx.0) {
443+
match res.height.try_into() {
451444
// Returned heights 0 & -1 are reserved for unconfirmed txs.
452445
Ok(height) if height > 0 => {
453-
pending_anchors.push((txid, height));
446+
pending_anchors.push((tx.0, height));
454447
}
455448
_ => {
456-
tx_update.seen_ats.insert((r.tx_hash, start_time));
449+
tx_update.seen_ats.insert((res.tx_hash, start_time));
457450
}
458451
}
459452
}
460453

461-
tx_update.txs.push(tx);
454+
tx_update.txs.push(tx.1);
462455
}
463456

464457
Ok(())
@@ -472,19 +465,37 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
472465
let mut results = Vec::with_capacity(txs_with_heights.len());
473466
let mut to_fetch = Vec::new();
474467

475-
// Build a map for height to block hash conversions. This is for obtaining block hash data
476-
// with minimum `fetch_header` calls.
477-
let mut height_to_hash: HashMap<u32, BlockHash> = HashMap::new();
478-
for &(_, height) in txs_with_heights {
479-
let h = height as u32;
480-
if !height_to_hash.contains_key(&h) {
481-
// Try to obtain hash from the header cache, or fetch the header if absent.
482-
let hash = self.fetch_header(h)?.block_hash();
483-
height_to_hash.insert(h, hash);
468+
// Figure out which block heights we need headers for.
469+
let mut needed_heights: Vec<u32> =
470+
txs_with_heights.iter().map(|&(_, h)| h as u32).collect();
471+
needed_heights.sort_unstable();
472+
needed_heights.dedup();
473+
474+
let mut height_to_hash = HashMap::with_capacity(needed_heights.len());
475+
476+
// Collect headers of missing heights, and build `height_to_hash` map.
477+
{
478+
let mut cache = self.block_header_cache.lock().unwrap();
479+
480+
let mut missing_heights = Vec::new();
481+
for &height in &needed_heights {
482+
if let Some(header) = cache.get(&height) {
483+
height_to_hash.insert(height, header.block_hash());
484+
} else {
485+
missing_heights.push(height);
486+
}
487+
}
488+
489+
if !missing_heights.is_empty() {
490+
let headers = self.inner.batch_block_header(missing_heights.clone())?;
491+
for (height, header) in missing_heights.into_iter().zip(headers) {
492+
height_to_hash.insert(height, header.block_hash());
493+
cache.insert(height, header);
494+
}
484495
}
485496
}
486497

487-
// Check cache.
498+
// Check our anchor cache and queue up any proofs we still need.
488499
{
489500
let anchor_cache = self.anchor_cache.lock().unwrap();
490501
for &(txid, height) in txs_with_heights {
@@ -505,14 +516,22 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
505516
let proof = self.inner.transaction_get_merkle(&txid, height)?;
506517

507518
// Validate against header, retrying once on stale header.
508-
let mut header = self.fetch_header(height as u32)?;
519+
let mut header = {
520+
let cache = self.block_header_cache.lock().unwrap();
521+
cache[&(height as u32)]
522+
};
509523
let mut valid = electrum_client::utils::validate_merkle_proof(
510524
&txid,
511525
&header.merkle_root,
512526
&proof,
513527
);
514528
if !valid {
515-
header = self.update_header(height as u32)?;
529+
let new_header = self.inner.block_header(height)?;
530+
self.block_header_cache
531+
.lock()
532+
.unwrap()
533+
.insert(height as u32, new_header);
534+
header = new_header;
516535
valid = electrum_client::utils::validate_merkle_proof(
517536
&txid,
518537
&header.merkle_root,
@@ -526,7 +545,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
526545
confirmation_time: header.time as u64,
527546
block_id: BlockId {
528547
height: height as u32,
529-
hash: header.block_hash(),
548+
hash,
530549
},
531550
};
532551
self.anchor_cache

0 commit comments

Comments
 (0)