Skip to content

Commit 5860704

Browse files
committed
Implement redesigned versions of EsploraExt and EsploraAsyncExt
All associated examples are also updated.
1 parent 2952341 commit 5860704

File tree

9 files changed

+704
-177
lines changed

9 files changed

+704
-177
lines changed

crates/esplora/src/async_ext.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use crate::map_confirmation_time;
1919
///
2020
/// [`EsploraExt`]: crate::EsploraExt
2121
/// [crate-level documentation]: crate
22-
#[cfg(feature = "async")]
2322
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
2423
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
2524
pub trait EsploraAsyncExt {
@@ -84,7 +83,6 @@ pub trait EsploraAsyncExt {
8483
}
8584
}
8685

87-
#[cfg(feature = "async")]
8886
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
8987
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
9088
impl EsploraAsyncExt for esplora_client::AsyncClient {
@@ -103,7 +101,7 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
103101
) -> Result<KeychainScan<K, ConfirmationTime>, Error> {
104102
let txids = txids.into_iter();
105103
let outpoints = outpoints.into_iter();
106-
let parallel_requests = parallel_requests.max(1);
104+
let parallel_requests = Ord::max(parallel_requests, 1);
107105
let mut scan = KeychainScan::default();
108106
let update = &mut scan.update;
109107
let last_active_indices = &mut scan.last_active_indices;
@@ -285,7 +283,7 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
285283
}
286284

287285
let reorg_occurred = {
288-
if let Some(checkpoint) = update.chain().latest_checkpoint() {
286+
if let Some(checkpoint) = ChainGraph::chain(update).latest_checkpoint() {
289287
self.get_block_hash(checkpoint.height).await? != checkpoint.hash
290288
} else {
291289
false
@@ -295,8 +293,7 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
295293
if reorg_occurred {
296294
// A reorg occurred, so let's find out where all the txids we found are in the chain now.
297295
// XXX: collect required because of weird type naming issues
298-
let txids_found = update
299-
.chain()
296+
let txids_found = ChainGraph::chain(update)
300297
.txids()
301298
.map(|(_, txid)| *txid)
302299
.collect::<Vec<_>>();

crates/esplora/src/blocking_ext.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ impl EsploraExt for esplora_client::BlockingClient {
8282
stop_gap: usize,
8383
parallel_requests: usize,
8484
) -> Result<KeychainScan<K, ConfirmationTime>, Error> {
85-
let parallel_requests = parallel_requests.max(1);
85+
let parallel_requests = Ord::max(parallel_requests, 1);
8686
let mut scan = KeychainScan::default();
8787
let update = &mut scan.update;
8888
let last_active_indices = &mut scan.last_active_indices;
@@ -260,7 +260,7 @@ impl EsploraExt for esplora_client::BlockingClient {
260260
}
261261

262262
let reorg_occurred = {
263-
if let Some(checkpoint) = update.chain().latest_checkpoint() {
263+
if let Some(checkpoint) = ChainGraph::chain(update).latest_checkpoint() {
264264
self.get_block_hash(checkpoint.height)? != checkpoint.hash
265265
} else {
266266
false
@@ -270,8 +270,7 @@ impl EsploraExt for esplora_client::BlockingClient {
270270
if reorg_occurred {
271271
// A reorg occurred, so let's find out where all the txids we found are now in the chain.
272272
// XXX: collect required because of weird type naming issues
273-
let txids_found = update
274-
.chain()
273+
let txids_found = ChainGraph::chain(update)
275274
.txids()
276275
.map(|(_, txid)| *txid)
277276
.collect::<Vec<_>>();

crates/esplora/src/lib.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
#![doc = include_str!("../README.md")]
2-
use bdk_chain::ConfirmationTime;
2+
use bdk_chain::{BlockId, ConfirmationTime, ConfirmationTimeAnchor};
33
use esplora_client::TxStatus;
44

55
pub use esplora_client;
6+
pub mod v2;
67

78
#[cfg(feature = "blocking")]
89
mod blocking_ext;
@@ -25,3 +26,17 @@ pub(crate) fn map_confirmation_time(
2526
_ => ConfirmationTime::Unconfirmed { last_seen: 0 },
2627
}
2728
}
29+
30+
pub(crate) fn map_confirmation_time_anchor(
31+
tx_status: &TxStatus,
32+
tip_at_start: BlockId,
33+
) -> Option<ConfirmationTimeAnchor> {
34+
match (tx_status.block_time, tx_status.block_height) {
35+
(Some(confirmation_time), Some(confirmation_height)) => Some(ConfirmationTimeAnchor {
36+
anchor_block: tip_at_start,
37+
confirmation_height,
38+
confirmation_time,
39+
}),
40+
_ => None,
41+
}
42+
}

crates/esplora/src/v2/async_ext.rs

Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
use async_trait::async_trait;
2+
use bdk_chain::{
3+
bitcoin::{BlockHash, OutPoint, Script, Txid},
4+
collections::BTreeMap,
5+
keychain::LocalUpdate,
6+
BlockId, ConfirmationTimeAnchor,
7+
};
8+
use esplora_client::{Error, OutputStatus};
9+
use futures::{stream::FuturesOrdered, TryStreamExt};
10+
11+
use crate::map_confirmation_time_anchor;
12+
13+
/// Trait to extend [`esplora_client::AsyncClient`] functionality.
14+
///
15+
/// This is the async version of [`EsploraExt`]. Refer to
16+
/// [crate-level documentation] for more.
17+
///
18+
/// [`EsploraExt`]: crate::EsploraExt
19+
/// [crate-level documentation]: crate
20+
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
21+
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
22+
pub trait EsploraAsyncExt {
23+
/// Scan the blockchain (via esplora) for the data specified and returns a
24+
/// [`LocalUpdate<K, ConfirmationTimeAnchor>`].
25+
///
26+
/// - `local_chain`: the most recent block hashes present locally
27+
/// - `keychain_spks`: keychains that we want to scan transactions for
28+
/// - `txids`: transactions for which we want updated [`ChainPosition`]s
29+
/// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
30+
/// want to included in the update
31+
///
32+
/// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
33+
/// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
34+
/// parallel.
35+
///
36+
/// [`ChainPosition`]: bdk_chain::sparse_chain::ChainPosition
37+
#[allow(clippy::result_large_err)] // FIXME
38+
async fn scan<K: Ord + Clone + Send>(
39+
&self,
40+
local_chain: &BTreeMap<u32, BlockHash>,
41+
keychain_spks: BTreeMap<
42+
K,
43+
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, Script)> + Send> + Send,
44+
>,
45+
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
46+
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
47+
stop_gap: usize,
48+
parallel_requests: usize,
49+
) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error>;
50+
51+
/// Convenience method to call [`scan`] without requiring a keychain.
52+
///
53+
/// [`scan`]: EsploraAsyncExt::scan
54+
#[allow(clippy::result_large_err)] // FIXME
55+
async fn scan_without_keychain(
56+
&self,
57+
local_chain: &BTreeMap<u32, BlockHash>,
58+
misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = Script> + Send> + Send,
59+
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
60+
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
61+
parallel_requests: usize,
62+
) -> Result<LocalUpdate<(), ConfirmationTimeAnchor>, Error> {
63+
self.scan(
64+
local_chain,
65+
[(
66+
(),
67+
misc_spks
68+
.into_iter()
69+
.enumerate()
70+
.map(|(i, spk)| (i as u32, spk)),
71+
)]
72+
.into(),
73+
txids,
74+
outpoints,
75+
usize::MAX,
76+
parallel_requests,
77+
)
78+
.await
79+
}
80+
}
81+
82+
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
83+
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
84+
impl EsploraAsyncExt for esplora_client::AsyncClient {
85+
#[allow(clippy::result_large_err)] // FIXME
86+
async fn scan<K: Ord + Clone + Send>(
87+
&self,
88+
local_chain: &BTreeMap<u32, BlockHash>,
89+
keychain_spks: BTreeMap<
90+
K,
91+
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, Script)> + Send> + Send,
92+
>,
93+
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
94+
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
95+
stop_gap: usize,
96+
parallel_requests: usize,
97+
) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error> {
98+
let parallel_requests = Ord::max(parallel_requests, 1);
99+
100+
let (mut update, tip_at_start) = loop {
101+
let mut update = LocalUpdate::<K, ConfirmationTimeAnchor>::default();
102+
103+
for (&height, &original_hash) in local_chain.iter().rev() {
104+
let update_block_id = BlockId {
105+
height,
106+
hash: self.get_block_hash(height).await?,
107+
};
108+
let _ = update
109+
.chain
110+
.insert_block(update_block_id)
111+
.expect("cannot repeat height here");
112+
if update_block_id.hash == original_hash {
113+
break;
114+
}
115+
}
116+
117+
let tip_at_start = BlockId {
118+
height: self.get_height().await?,
119+
hash: self.get_tip_hash().await?,
120+
};
121+
122+
if update.chain.insert_block(tip_at_start).is_ok() {
123+
break (update, tip_at_start);
124+
}
125+
};
126+
127+
for (keychain, spks) in keychain_spks {
128+
let mut spks = spks.into_iter();
129+
let mut last_active_index = None;
130+
let mut empty_scripts = 0;
131+
type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
132+
133+
loop {
134+
let futures = (0..parallel_requests)
135+
.filter_map(|_| {
136+
let (index, script) = spks.next()?;
137+
let client = self.clone();
138+
Some(async move {
139+
let mut related_txs = client.scripthash_txs(&script, None).await?;
140+
141+
let n_confirmed =
142+
related_txs.iter().filter(|tx| tx.status.confirmed).count();
143+
// esplora pages on 25 confirmed transactions. If there are 25 or more we
144+
// keep requesting to see if there's more.
145+
if n_confirmed >= 25 {
146+
loop {
147+
let new_related_txs = client
148+
.scripthash_txs(
149+
&script,
150+
Some(related_txs.last().unwrap().txid),
151+
)
152+
.await?;
153+
let n = new_related_txs.len();
154+
related_txs.extend(new_related_txs);
155+
// we've reached the end
156+
if n < 25 {
157+
break;
158+
}
159+
}
160+
}
161+
162+
Result::<_, esplora_client::Error>::Ok((index, related_txs))
163+
})
164+
})
165+
.collect::<FuturesOrdered<_>>();
166+
167+
let n_futures = futures.len();
168+
169+
for (index, related_txs) in futures.try_collect::<Vec<IndexWithTxs>>().await? {
170+
if related_txs.is_empty() {
171+
empty_scripts += 1;
172+
} else {
173+
last_active_index = Some(index);
174+
empty_scripts = 0;
175+
}
176+
for tx in related_txs {
177+
let anchor = map_confirmation_time_anchor(&tx.status, tip_at_start);
178+
179+
let _ = update.graph.insert_tx(tx.to_tx());
180+
if let Some(anchor) = anchor {
181+
let _ = update.graph.insert_anchor(tx.txid, anchor);
182+
}
183+
}
184+
}
185+
186+
if n_futures == 0 || empty_scripts >= stop_gap {
187+
break;
188+
}
189+
}
190+
191+
if let Some(last_active_index) = last_active_index {
192+
update.keychain.insert(keychain, last_active_index);
193+
}
194+
}
195+
196+
for txid in txids.into_iter() {
197+
if update.graph.get_tx(txid).is_none() {
198+
match self.get_tx(&txid).await? {
199+
Some(tx) => {
200+
let _ = update.graph.insert_tx(tx);
201+
}
202+
None => continue,
203+
}
204+
}
205+
match self.get_tx_status(&txid).await? {
206+
Some(tx_status) => {
207+
if let Some(anchor) = map_confirmation_time_anchor(&tx_status, tip_at_start) {
208+
let _ = update.graph.insert_anchor(txid, anchor);
209+
}
210+
}
211+
None => continue,
212+
}
213+
}
214+
215+
for op in outpoints.into_iter() {
216+
let mut op_txs = Vec::with_capacity(2);
217+
if let (Some(tx), Some(tx_status)) = (
218+
self.get_tx(&op.txid).await?,
219+
self.get_tx_status(&op.txid).await?,
220+
) {
221+
op_txs.push((tx, tx_status));
222+
if let Some(OutputStatus {
223+
txid: Some(txid),
224+
status: Some(spend_status),
225+
..
226+
}) = self.get_output_status(&op.txid, op.vout as _).await?
227+
{
228+
if let Some(spend_tx) = self.get_tx(&txid).await? {
229+
op_txs.push((spend_tx, spend_status));
230+
}
231+
}
232+
}
233+
234+
for (tx, status) in op_txs {
235+
let txid = tx.txid();
236+
let anchor = map_confirmation_time_anchor(&status, tip_at_start);
237+
238+
let _ = update.graph.insert_tx(tx);
239+
if let Some(anchor) = anchor {
240+
let _ = update.graph.insert_anchor(txid, anchor);
241+
}
242+
}
243+
}
244+
245+
if tip_at_start.hash != self.get_block_hash(tip_at_start.height).await? {
246+
// A reorg occurred, so let's find out where all the txids we found are now in the chain
247+
let txids_found = update
248+
.graph
249+
.full_txs()
250+
.map(|tx_node| tx_node.txid)
251+
.collect::<Vec<_>>();
252+
update.chain = EsploraAsyncExt::scan_without_keychain(
253+
self,
254+
local_chain,
255+
[],
256+
txids_found,
257+
[],
258+
parallel_requests,
259+
)
260+
.await?
261+
.chain;
262+
}
263+
264+
Ok(update)
265+
}
266+
}

0 commit comments

Comments
 (0)