Skip to content

Commit 72fe65b

Browse files
evanlinjinLLFourn
andcommitted
feat(esplora)!: simplify chain update logic
Co-authored-by: LLFourn <[email protected]>
1 parent eded1a7 commit 72fe65b

File tree

3 files changed

+196
-211
lines changed

3 files changed

+196
-211
lines changed

crates/chain/src/local_chain.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,8 +220,7 @@ impl CheckPoint {
220220
cp = cp.prev().expect("will break before genesis block");
221221
};
222222

223-
base
224-
.extend(core::iter::once(block_id).chain(tail.into_iter().rev()))
223+
base.extend(core::iter::once(block_id).chain(tail.into_iter().rev()))
225224
.expect("tail is in order")
226225
}
227226
}

crates/esplora/src/async_ext.rs

Lines changed: 94 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::collections::BTreeSet;
22

33
use async_trait::async_trait;
4-
use bdk_chain::collections::btree_map;
54
use bdk_chain::Anchor;
65
use bdk_chain::{
76
bitcoin::{BlockHash, OutPoint, ScriptBuf, TxOut, Txid},
@@ -97,11 +96,11 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
9796
stop_gap: usize,
9897
parallel_requests: usize,
9998
) -> Result<FullScanUpdate<K>, Error> {
100-
let update_blocks = init_chain_update(self, &local_tip).await?;
99+
let latest_blocks = fetch_latest_blocks(self).await?;
101100
let (tx_graph, last_active_indices) =
102101
full_scan_for_index_and_graph(self, keychain_spks, stop_gap, parallel_requests).await?;
103102
let local_chain =
104-
finalize_chain_update(self, &local_tip, tx_graph.all_anchors(), update_blocks).await?;
103+
chain_update(self, &latest_blocks, &local_tip, tx_graph.all_anchors()).await?;
105104
Ok(FullScanUpdate {
106105
local_chain,
107106
tx_graph,
@@ -117,124 +116,117 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
117116
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
118117
parallel_requests: usize,
119118
) -> Result<SyncUpdate, Error> {
120-
let update_blocks = init_chain_update(self, &local_tip).await?;
119+
let latest_blocks = fetch_latest_blocks(self).await?;
121120
let tx_graph =
122121
sync_for_index_and_graph(self, misc_spks, txids, outpoints, parallel_requests).await?;
123122
let local_chain =
124-
finalize_chain_update(self, &local_tip, tx_graph.all_anchors(), update_blocks).await?;
123+
chain_update(self, &latest_blocks, &local_tip, tx_graph.all_anchors()).await?;
125124
Ok(SyncUpdate {
126125
tx_graph,
127126
local_chain,
128127
})
129128
}
130129
}
131130

132-
/// Create the initial chain update.
131+
/// Fetch latest blocks from Esplora in an atomic call.
133132
///
134-
/// This atomically fetches the latest blocks from Esplora and additional blocks to ensure the
135-
/// update can connect to the `start_tip`.
136-
///
137-
/// We want to do this before fetching transactions and anchors as we cannot fetch latest blocks and
133+
/// We want to do this before fetching transactions and anchors as we cannot fetch latest blocks AND
138134
/// transactions atomically, and the checkpoint tip is used to determine last-scanned block (for
139135
/// block-based chain-sources). Therefore it's better to be conservative when setting the tip (use
140136
/// an earlier tip rather than a later tip) otherwise the caller may accidentally skip blocks when
141137
/// alternating between chain-sources.
142-
async fn init_chain_update(
138+
async fn fetch_latest_blocks(
143139
client: &esplora_client::AsyncClient,
144-
local_tip: &CheckPoint,
145140
) -> Result<BTreeMap<u32, BlockHash>, Error> {
146-
// Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are
147-
// consistent.
148-
let mut fetched_blocks = client
141+
Ok(client
149142
.get_blocks(None)
150143
.await?
151144
.into_iter()
152145
.map(|b| (b.time.height, b.id))
153-
.collect::<BTreeMap<u32, BlockHash>>();
154-
let new_tip_height = fetched_blocks
155-
.keys()
156-
.last()
157-
.copied()
158-
.expect("must atleast have one block");
159-
160-
// Ensure `fetched_blocks` can create an update that connects with the original chain by
161-
// finding a "Point of Agreement".
162-
for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) {
163-
if height > new_tip_height {
164-
continue;
165-
}
146+
.collect())
147+
}
166148

167-
let fetched_hash = match fetched_blocks.entry(height) {
168-
btree_map::Entry::Occupied(entry) => *entry.get(),
169-
btree_map::Entry::Vacant(entry) => *entry.insert(client.get_block_hash(height).await?),
170-
};
149+
/// Used instead of [`esplora_client::BlockingClient::get_block_hash`].
150+
///
151+
/// This first checks the previously fetched `latest_blocks` before fetching from Esplora again.
152+
async fn fetch_block(
153+
client: &esplora_client::AsyncClient,
154+
latest_blocks: &BTreeMap<u32, BlockHash>,
155+
height: u32,
156+
) -> Result<Option<BlockHash>, Error> {
157+
if let Some(&hash) = latest_blocks.get(&height) {
158+
return Ok(Some(hash));
159+
}
171160

172-
// We have found point of agreement so the update will connect!
173-
if fetched_hash == local_hash {
174-
break;
175-
}
161+
// We avoid fetching blocks higher than previously fetched `latest_blocks` as the local chain
162+
// tip is used to signal for the last-synced-up-to-height.
163+
let &tip_height = latest_blocks
164+
.keys()
165+
.last()
166+
.expect("must have atleast one entry");
167+
if height > tip_height {
168+
return Ok(None);
176169
}
177170

178-
Ok(fetched_blocks)
171+
Ok(Some(client.get_block_hash(height).await?))
179172
}
180173

181-
/// Fetches missing checkpoints and finalizes the [`local_chain::Update`].
174+
/// Create the [`local_chain::Update`].
182175
///
183-
/// A checkpoint is considered "missing" if an anchor (of `anchors`) points to a height without an
184-
/// existing checkpoint/block under `local_tip` or `update_blocks`.
185-
async fn finalize_chain_update<A: Anchor>(
176+
/// We want to have a corresponding checkpoint per anchor height. However, checkpoints fetched
177+
/// should not surpass `latest_blocks`.
178+
async fn chain_update<A: Anchor>(
186179
client: &esplora_client::AsyncClient,
180+
latest_blocks: &BTreeMap<u32, BlockHash>,
187181
local_tip: &CheckPoint,
188182
anchors: &BTreeSet<(A, Txid)>,
189-
mut update_blocks: BTreeMap<u32, BlockHash>,
190183
) -> Result<local_chain::Update, Error> {
191-
let update_tip_height = update_blocks
192-
.keys()
193-
.last()
194-
.copied()
195-
.expect("must atleast have one block");
196-
197-
// We want to have a corresponding checkpoint per height. We iterate the heights of anchors
198-
// backwards, comparing it against our `local_tip`'s chain and our current set of
199-
// `update_blocks` to see if a corresponding checkpoint already exists.
200-
let anchor_heights = anchors
201-
.iter()
202-
.rev()
203-
.map(|(a, _)| a.anchor_block().height)
204-
// filter out heights that surpass the update tip
205-
.filter(|h| *h <= update_tip_height)
206-
// filter out duplicate heights
207-
.filter({
208-
let mut prev_height = Option::<u32>::None;
209-
move |h| match prev_height.replace(*h) {
210-
None => true,
211-
Some(prev_h) => prev_h != *h,
212-
}
213-
});
184+
let mut point_of_agreement = None;
185+
let mut conflicts = vec![];
186+
for local_cp in local_tip.iter() {
187+
let remote_hash = match fetch_block(client, latest_blocks, local_cp.height()).await? {
188+
Some(hash) => hash,
189+
None => continue,
190+
};
191+
if remote_hash == local_cp.hash() {
192+
point_of_agreement = Some(local_cp.clone());
193+
break;
194+
} else {
195+
// it is not strictly necessary to include all the conflicted heights (we do need the
196+
// first one) but it seems prudent to make sure the updated chain's heights are a
197+
// superset of the existing chain after update.
198+
conflicts.push(BlockId {
199+
height: local_cp.height(),
200+
hash: remote_hash,
201+
});
202+
}
203+
}
214204

215-
// We keep track of a checkpoint node of `local_tip` to make traversing the linked-list of
216-
// checkpoints more efficient.
217-
let mut curr_cp = local_tip.clone();
205+
let mut tip = point_of_agreement.expect("remote esplora should have same genesis block");
218206

219-
for h in anchor_heights {
220-
if let Some(cp) = curr_cp.range(h..).last() {
221-
curr_cp = cp.clone();
222-
if cp.height() == h {
223-
continue;
224-
}
225-
}
226-
if let btree_map::Entry::Vacant(entry) = update_blocks.entry(h) {
227-
entry.insert(client.get_block_hash(h).await?);
207+
tip = tip
208+
.extend(conflicts.into_iter().rev())
209+
.expect("evicted are in order");
210+
211+
for anchor in anchors {
212+
let height = anchor.0.anchor_block().height;
213+
if tip.get(height).is_none() {
214+
let hash = match fetch_block(client, latest_blocks, height).await? {
215+
Some(hash) => hash,
216+
None => continue,
217+
};
218+
tip = tip.insert(BlockId { height, hash });
228219
}
229220
}
230221

222+
// insert the most recent blocks at the tip to make sure we update the tip and make the update
223+
// robust.
224+
for (&height, &hash) in latest_blocks.iter() {
225+
tip = tip.insert(BlockId { height, hash });
226+
}
227+
231228
Ok(local_chain::Update {
232-
tip: CheckPoint::from_block_ids(
233-
update_blocks
234-
.into_iter()
235-
.map(|(height, hash)| BlockId { height, hash }),
236-
)
237-
.expect("must be in order"),
229+
tip,
238230
introduce_older_blocks: true,
239231
})
240232
}
@@ -424,7 +416,7 @@ mod test {
424416
use electrsd::bitcoind::bitcoincore_rpc::RpcApi;
425417
use esplora_client::Builder;
426418

427-
use crate::async_ext::{finalize_chain_update, init_chain_update};
419+
use crate::async_ext::{chain_update, fetch_latest_blocks};
428420

429421
macro_rules! h {
430422
($index:literal) => {{
@@ -493,9 +485,8 @@ mod test {
493485
// craft initial `local_chain`
494486
let local_chain = {
495487
let (mut chain, _) = LocalChain::from_genesis_hash(env.genesis_hash()?);
496-
let chain_tip = chain.tip();
497-
let update_blocks = init_chain_update(&client, &chain_tip).await?;
498-
let update_anchors = t
488+
// force `chain_update_blocking` to add all checkpoints in `t.initial_cps`
489+
let anchors = t
499490
.initial_cps
500491
.iter()
501492
.map(|&height| -> anyhow::Result<_> {
@@ -508,10 +499,14 @@ mod test {
508499
))
509500
})
510501
.collect::<anyhow::Result<BTreeSet<_>>>()?;
511-
let chain_update =
512-
finalize_chain_update(&client, &chain_tip, &update_anchors, update_blocks)
513-
.await?;
514-
chain.apply_update(chain_update)?;
502+
let update = chain_update(
503+
&client,
504+
&fetch_latest_blocks(&client).await?,
505+
&chain.tip(),
506+
&anchors,
507+
)
508+
.await?;
509+
chain.apply_update(update)?;
515510
chain
516511
};
517512
println!("local chain height: {}", local_chain.tip().height());
@@ -529,9 +524,7 @@ mod test {
529524

530525
// craft update
531526
let update = {
532-
let local_tip = local_chain.tip();
533-
let update_blocks = init_chain_update(&client, &local_tip).await?;
534-
let update_anchors = t
527+
let anchors = t
535528
.anchors
536529
.iter()
537530
.map(|&(height, txid)| -> anyhow::Result<_> {
@@ -544,7 +537,13 @@ mod test {
544537
))
545538
})
546539
.collect::<anyhow::Result<_>>()?;
547-
finalize_chain_update(&client, &local_tip, &update_anchors, update_blocks).await?
540+
chain_update(
541+
&client,
542+
&fetch_latest_blocks(&client).await?,
543+
&local_chain.tip(),
544+
&anchors,
545+
)
546+
.await?
548547
};
549548

550549
// apply update

0 commit comments

Comments
 (0)