Skip to content

Commit c0374a0

Browse files
committed
feat(chain): SyncRequest now uses ExactSizeIterators
This allows the caller to track sync progress.
1 parent 0f94f24 commit c0374a0

File tree

2 files changed

+117
-23
lines changed

2 files changed

+117
-23
lines changed

crates/chain/src/spk_client.rs

Lines changed: 88 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Helper types for spk-based blockchain clients.
22
3-
use core::{fmt::Debug, ops::RangeBounds};
3+
use core::{fmt::Debug, marker::PhantomData, ops::RangeBounds};
44

55
use alloc::{boxed::Box, collections::BTreeMap, vec::Vec};
66
use bitcoin::{OutPoint, Script, ScriptBuf, Txid};
@@ -18,11 +18,11 @@ pub struct SyncRequest {
1818
/// [`LocalChain::tip`]: crate::local_chain::LocalChain::tip
1919
pub chain_tip: CheckPoint,
2020
/// Transactions that spend from or to these indexed script pubkeys.
21-
pub spks: Box<dyn Iterator<Item = ScriptBuf> + Send>,
21+
pub spks: Box<dyn ExactSizeIterator<Item = ScriptBuf> + Send>,
2222
/// Transactions with these txids.
23-
pub txids: Box<dyn Iterator<Item = Txid> + Send>,
23+
pub txids: Box<dyn ExactSizeIterator<Item = Txid> + Send>,
2424
/// Transactions with these outpoints or spent from these outpoints.
25-
pub outpoints: Box<dyn Iterator<Item = OutPoint> + Send>,
25+
pub outpoints: Box<dyn ExactSizeIterator<Item = OutPoint> + Send>,
2626
}
2727

2828
impl SyncRequest {
@@ -42,7 +42,7 @@ impl SyncRequest {
4242
#[must_use]
4343
pub fn set_spks(
4444
mut self,
45-
spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send + 'static>,
45+
spks: impl IntoIterator<IntoIter = impl ExactSizeIterator<Item = ScriptBuf> + Send + 'static>,
4646
) -> Self {
4747
self.spks = Box::new(spks.into_iter());
4848
self
@@ -54,7 +54,7 @@ impl SyncRequest {
5454
#[must_use]
5555
pub fn set_txids(
5656
mut self,
57-
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send + 'static>,
57+
txids: impl IntoIterator<IntoIter = impl ExactSizeIterator<Item = Txid> + Send + 'static>,
5858
) -> Self {
5959
self.txids = Box::new(txids.into_iter());
6060
self
@@ -66,7 +66,9 @@ impl SyncRequest {
6666
#[must_use]
6767
pub fn set_outpoints(
6868
mut self,
69-
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send + 'static>,
69+
outpoints: impl IntoIterator<
70+
IntoIter = impl ExactSizeIterator<Item = OutPoint> + Send + 'static,
71+
>,
7072
) -> Self {
7173
self.outpoints = Box::new(outpoints.into_iter());
7274
self
@@ -79,11 +81,11 @@ impl SyncRequest {
7981
pub fn chain_spks(
8082
mut self,
8183
spks: impl IntoIterator<
82-
IntoIter = impl Iterator<Item = ScriptBuf> + Send + 'static,
84+
IntoIter = impl ExactSizeIterator<Item = ScriptBuf> + Send + 'static,
8385
Item = ScriptBuf,
8486
>,
8587
) -> Self {
86-
self.spks = Box::new(self.spks.chain(spks));
88+
self.spks = Box::new(ExactSizeChain::new(self.spks, spks.into_iter()));
8789
self
8890
}
8991

@@ -93,9 +95,12 @@ impl SyncRequest {
9395
#[must_use]
9496
pub fn chain_txids(
9597
mut self,
96-
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send + 'static, Item = Txid>,
98+
txids: impl IntoIterator<
99+
IntoIter = impl ExactSizeIterator<Item = Txid> + Send + 'static,
100+
Item = Txid,
101+
>,
97102
) -> Self {
98-
self.txids = Box::new(self.txids.chain(txids));
103+
self.txids = Box::new(ExactSizeChain::new(self.txids, txids.into_iter()));
99104
self
100105
}
101106

@@ -106,39 +111,42 @@ impl SyncRequest {
106111
pub fn chain_outpoints(
107112
mut self,
108113
outpoints: impl IntoIterator<
109-
IntoIter = impl Iterator<Item = OutPoint> + Send + 'static,
114+
IntoIter = impl ExactSizeIterator<Item = OutPoint> + Send + 'static,
110115
Item = OutPoint,
111116
>,
112117
) -> Self {
113-
self.outpoints = Box::new(self.outpoints.chain(outpoints));
118+
self.outpoints = Box::new(ExactSizeChain::new(self.outpoints, outpoints.into_iter()));
114119
self
115120
}
116121

117-
/// Add a closure that will be called for each [`Script`] synced in this request.
122+
/// Add a closure that will be called for [`Script`]s previously added to this request.
118123
///
119124
/// This consumes the [`SyncRequest`] and returns the updated one.
120125
#[must_use]
121-
pub fn inspect_spks(mut self, inspect: impl Fn(&Script) + Send + Sync + 'static) -> Self {
126+
pub fn inspect_spks(
127+
mut self,
128+
mut inspect: impl FnMut(&Script) + Send + Sync + 'static,
129+
) -> Self {
122130
self.spks = Box::new(self.spks.inspect(move |spk| inspect(spk)));
123131
self
124132
}
125133

126-
/// Add a closure that will be called for each [`Txid`] synced in this request.
134+
/// Add a closure that will be called for [`Txid`]s previously added to this request.
127135
///
128136
/// This consumes the [`SyncRequest`] and returns the updated one.
129137
#[must_use]
130-
pub fn inspect_txids(mut self, inspect: impl Fn(&Txid) + Send + Sync + 'static) -> Self {
138+
pub fn inspect_txids(mut self, mut inspect: impl FnMut(&Txid) + Send + Sync + 'static) -> Self {
131139
self.txids = Box::new(self.txids.inspect(move |txid| inspect(txid)));
132140
self
133141
}
134142

135-
/// Add a closure that will be called for each [`OutPoint`] synced in this request.
143+
/// Add a closure that will be called for [`OutPoint`]s previously added to this request.
136144
///
137145
/// This consumes the [`SyncRequest`] and returns the updated one.
138146
#[must_use]
139147
pub fn inspect_outpoints(
140148
mut self,
141-
inspect: impl Fn(&OutPoint) + Send + Sync + 'static,
149+
mut inspect: impl FnMut(&OutPoint) + Send + Sync + 'static,
142150
) -> Self {
143151
self.outpoints = Box::new(self.outpoints.inspect(move |op| inspect(op)));
144152
self
@@ -313,3 +321,64 @@ pub struct FullScanResult<K> {
313321
/// Last active indices for the corresponding keychains (`K`).
314322
pub last_active_indices: BTreeMap<K, u32>,
315323
}
324+
325+
/// A version of [`core::iter::Chain`] which can combine two [`ExactSizeIterator`]s to form a new
326+
/// [`ExactSizeIterator`].
327+
///
328+
/// The danger of this is explained in [the `ExactSizeIterator` docs]
329+
/// (https://doc.rust-lang.org/core/iter/trait.ExactSizeIterator.html#when-shouldnt-an-adapter-be-exactsizeiterator).
330+
/// This does not apply here since it would be impossible to scan an item count that overflows
331+
/// `usize` anyway.
332+
struct ExactSizeChain<A, B, I> {
333+
a: Option<A>,
334+
b: Option<B>,
335+
i: PhantomData<I>,
336+
}
337+
338+
impl<A, B, I> ExactSizeChain<A, B, I> {
339+
fn new(a: A, b: B) -> Self {
340+
ExactSizeChain {
341+
a: Some(a),
342+
b: Some(b),
343+
i: PhantomData,
344+
}
345+
}
346+
}
347+
348+
impl<A, B, I> Iterator for ExactSizeChain<A, B, I>
349+
where
350+
A: Iterator<Item = I>,
351+
B: Iterator<Item = I>,
352+
{
353+
type Item = I;
354+
355+
fn next(&mut self) -> Option<Self::Item> {
356+
if let Some(a) = &mut self.a {
357+
let item = a.next();
358+
if item.is_some() {
359+
return item;
360+
}
361+
self.a = None;
362+
}
363+
if let Some(b) = &mut self.b {
364+
let item = b.next();
365+
if item.is_some() {
366+
return item;
367+
}
368+
self.b = None;
369+
}
370+
None
371+
}
372+
}
373+
374+
impl<A, B, I> ExactSizeIterator for ExactSizeChain<A, B, I>
375+
where
376+
A: ExactSizeIterator<Item = I>,
377+
B: ExactSizeIterator<Item = I>,
378+
{
379+
fn len(&self) -> usize {
380+
let a_len = self.a.as_ref().map(|a| a.len()).unwrap_or(0);
381+
let b_len = self.b.as_ref().map(|a| a.len()).unwrap_or(0);
382+
a_len + b_len
383+
}
384+
}

example-crates/example_esplora/src/main.rs

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ fn main() -> anyhow::Result<()> {
248248
.map(|(k, i, spk)| (k.to_owned(), i, spk.to_owned()))
249249
.collect::<Vec<_>>();
250250
request = request.chain_spks(all_spks.into_iter().map(|(k, i, spk)| {
251-
eprintln!("scanning {}:{}", k, i);
251+
eprint!("scanning {}:{}", k, i);
252252
// Flush early to ensure we print at every iteration.
253253
let _ = io::stderr().flush();
254254
spk
@@ -262,7 +262,7 @@ fn main() -> anyhow::Result<()> {
262262
.collect::<Vec<_>>();
263263
request =
264264
request.chain_spks(unused_spks.into_iter().map(move |(k, i, spk)| {
265-
eprintln!(
265+
eprint!(
266266
"Checking if address {} {}:{} has been used",
267267
Address::from_script(&spk, args.network).unwrap(),
268268
k,
@@ -287,7 +287,7 @@ fn main() -> anyhow::Result<()> {
287287
utxos
288288
.into_iter()
289289
.inspect(|utxo| {
290-
eprintln!(
290+
eprint!(
291291
"Checking if outpoint {} (value: {}) has been spent",
292292
utxo.outpoint, utxo.txout.value
293293
);
@@ -308,13 +308,38 @@ fn main() -> anyhow::Result<()> {
308308
.map(|canonical_tx| canonical_tx.tx_node.txid)
309309
.collect::<Vec<Txid>>();
310310
request = request.chain_txids(unconfirmed_txids.into_iter().inspect(|txid| {
311-
eprintln!("Checking if {} is confirmed yet", txid);
311+
eprint!("Checking if {} is confirmed yet", txid);
312312
// Flush early to ensure we print at every iteration.
313313
let _ = io::stderr().flush();
314314
}));
315315
}
316316
}
317317

318+
let total_spks = request.spks.len();
319+
let total_txids = request.txids.len();
320+
let total_ops = request.outpoints.len();
321+
request = request
322+
.inspect_spks({
323+
let mut visited = 0;
324+
move |_| {
325+
visited += 1;
326+
eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_spks as f32)
327+
}
328+
})
329+
.inspect_txids({
330+
let mut visited = 0;
331+
move |_| {
332+
visited += 1;
333+
eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_txids as f32)
334+
}
335+
})
336+
.inspect_outpoints({
337+
let mut visited = 0;
338+
move |_| {
339+
visited += 1;
340+
eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_ops as f32)
341+
}
342+
});
318343
let mut update = client.sync(request, scan_options.parallel_requests)?;
319344

320345
// Update last seen unconfirmed

0 commit comments

Comments
 (0)