Skip to content

Commit e0512ac

Browse files
committed
feat(bitcoind_rpc)!: emissions include checkpoint and connected_to data
Previously, emissions are purely blocks + the block height. This means emitted blocks can only connect to previous-adjacent blocks. Hence, sync must start from genesis and include every block.
1 parent 8f2d4d9 commit e0512ac

File tree

3 files changed

+128
-74
lines changed

3 files changed

+128
-74
lines changed

crates/bitcoind_rpc/src/lib.rs

Lines changed: 59 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,13 @@ pub struct Emitter<'c, C> {
4343
}
4444

4545
impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
46-
/// Construct a new [`Emitter`] with the given RPC `client`, `last_cp` and `start_height`.
46+
/// Construct a new [`Emitter`].
4747
///
48-
/// * `last_cp` is the check point used to find the latest block which is still part of the best
49-
/// chain.
50-
/// * `start_height` is the block height to start emitting blocks from.
48+
/// `last_cp` informs the emitter of the chain we are starting off with. This way, the emitter
49+
/// can start emission from a block that connects to the original chain.
50+
///
51+
/// `start_height` starts emission from a given height (if there are no conflicts with the
52+
/// original chain).
5153
pub fn new(client: &'c C, last_cp: CheckPoint, start_height: u32) -> Self {
5254
Self {
5355
client,
@@ -127,13 +129,58 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
127129
}
128130

129131
/// Emit the next block height and header (if any).
130-
pub fn next_header(&mut self) -> Result<Option<(u32, Header)>, bitcoincore_rpc::Error> {
131-
poll(self, |hash| self.client.get_block_header(hash))
132+
pub fn next_header(&mut self) -> Result<Option<BlockEvent<Header>>, bitcoincore_rpc::Error> {
133+
Ok(poll(self, |hash| self.client.get_block_header(hash))?
134+
.map(|(checkpoint, block)| BlockEvent { block, checkpoint }))
132135
}
133136

134137
/// Emit the next block height and block (if any).
135-
pub fn next_block(&mut self) -> Result<Option<(u32, Block)>, bitcoincore_rpc::Error> {
136-
poll(self, |hash| self.client.get_block(hash))
138+
pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block>>, bitcoincore_rpc::Error> {
139+
Ok(poll(self, |hash| self.client.get_block(hash))?
140+
.map(|(checkpoint, block)| BlockEvent { block, checkpoint }))
141+
}
142+
}
143+
144+
/// A newly emitted block from [`Emitter`].
145+
#[derive(Debug)]
146+
pub struct BlockEvent<B> {
147+
/// Either a full [`Block`] or [`Header`] of the new block.
148+
pub block: B,
149+
150+
/// The checkpoint of the new block.
151+
///
152+
/// A [`CheckPoint`] is a node of a linked list of [`BlockId`]s. This checkpoint is linked to
153+
/// all [`BlockId`]s originally passed in [`Emitter::new`] as well as emitted blocks since then.
154+
/// These blocks are guaranteed to be of the same chain.
155+
///
156+
/// This is important as BDK structures require block-to-apply to be connected with another
157+
/// block in the original chain.
158+
pub checkpoint: CheckPoint,
159+
}
160+
161+
impl<B> BlockEvent<B> {
162+
/// The block height of this new block.
163+
pub fn block_height(&self) -> u32 {
164+
self.checkpoint.height()
165+
}
166+
167+
/// The block hash of this new block.
168+
pub fn block_hash(&self) -> BlockHash {
169+
self.checkpoint.hash()
170+
}
171+
172+
/// The [`BlockId`] of a previous block that this block connects to.
173+
///
174+
/// This either returns a [`BlockId`] of a previously emitted block or from the chain we started
175+
/// with (passed in as `last_cp` in [`Emitter::new`]).
176+
///
177+
/// This value is derived from [`BlockEvent::checkpoint`].
178+
pub fn connected_to(&self) -> BlockId {
179+
match self.checkpoint.prev() {
180+
Some(prev_cp) => prev_cp.block_id(),
181+
// there is no previous checkpoint, so just connect with itself
182+
None => self.checkpoint.block_id(),
183+
}
137184
}
138185
}
139186

@@ -203,7 +250,7 @@ where
203250
fn poll<C, V, F>(
204251
emitter: &mut Emitter<C>,
205252
get_item: F,
206-
) -> Result<Option<(u32, V)>, bitcoincore_rpc::Error>
253+
) -> Result<Option<(CheckPoint, V)>, bitcoincore_rpc::Error>
207254
where
208255
C: bitcoincore_rpc::RpcApi,
209256
F: Fn(&BlockHash) -> Result<V, bitcoincore_rpc::Error>,
@@ -215,13 +262,14 @@ where
215262
let hash = res.hash;
216263
let item = get_item(&hash)?;
217264

218-
emitter.last_cp = emitter
265+
let new_cp = emitter
219266
.last_cp
220267
.clone()
221268
.push(BlockId { height, hash })
222269
.expect("must push");
270+
emitter.last_cp = new_cp.clone();
223271
emitter.last_block = Some(res);
224-
return Ok(Some((height, item)));
272+
return Ok(Some((new_cp, item)));
225273
}
226274
PollResponse::NoMoreBlocks => {
227275
emitter.last_block = None;

crates/bitcoind_rpc/tests/test_emitter.rs

Lines changed: 48 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -157,28 +157,6 @@ impl TestEnv {
157157
}
158158
}
159159

160-
fn block_to_chain_update(block: &bitcoin::Block, height: u32) -> local_chain::Update {
161-
let this_id = BlockId {
162-
height,
163-
hash: block.block_hash(),
164-
};
165-
let tip = if block.header.prev_blockhash == BlockHash::all_zeros() {
166-
CheckPoint::new(this_id)
167-
} else {
168-
CheckPoint::new(BlockId {
169-
height: height - 1,
170-
hash: block.header.prev_blockhash,
171-
})
172-
.extend(core::iter::once(this_id))
173-
.expect("must construct checkpoint")
174-
};
175-
176-
local_chain::Update {
177-
tip,
178-
introduce_older_blocks: false,
179-
}
180-
}
181-
182160
/// Ensure that blocks are emitted in order even after reorg.
183161
///
184162
/// 1. Mine 101 blocks.
@@ -200,17 +178,21 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> {
200178

201179
// see if the emitter outputs the right blocks
202180
println!("first sync:");
203-
while let Some((height, block)) = emitter.next_block()? {
181+
while let Some(emission) = emitter.next_block()? {
182+
let height = emission.block_height();
183+
let hash = emission.block_hash();
204184
assert_eq!(
205-
block.block_hash(),
185+
emission.block_hash(),
206186
exp_hashes[height as usize],
207187
"emitted block hash is unexpected"
208188
);
209189

210-
let chain_update = block_to_chain_update(&block, height);
211190
assert_eq!(
212-
local_chain.apply_update(chain_update)?,
213-
BTreeMap::from([(height, Some(block.block_hash()))]),
191+
local_chain.apply_update(local_chain::Update {
192+
tip: emission.checkpoint,
193+
introduce_older_blocks: false,
194+
})?,
195+
BTreeMap::from([(height, Some(hash))]),
214196
"chain update changeset is unexpected",
215197
);
216198
}
@@ -237,27 +219,30 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> {
237219
// see if the emitter outputs the right blocks
238220
println!("after reorg:");
239221
let mut exp_height = exp_hashes.len() - reorged_blocks.len();
240-
while let Some((height, block)) = emitter.next_block()? {
222+
while let Some(emission) = emitter.next_block()? {
223+
let height = emission.block_height();
224+
let hash = emission.block_hash();
241225
assert_eq!(
242226
height, exp_height as u32,
243227
"emitted block has unexpected height"
244228
);
245229

246230
assert_eq!(
247-
block.block_hash(),
248-
exp_hashes[height as usize],
231+
hash, exp_hashes[height as usize],
249232
"emitted block is unexpected"
250233
);
251234

252-
let chain_update = block_to_chain_update(&block, height);
253235
assert_eq!(
254-
local_chain.apply_update(chain_update)?,
236+
local_chain.apply_update(local_chain::Update {
237+
tip: emission.checkpoint,
238+
introduce_older_blocks: false,
239+
})?,
255240
if exp_height == exp_hashes.len() - reorged_blocks.len() {
256-
core::iter::once((height, Some(block.block_hash())))
241+
core::iter::once((height, Some(hash)))
257242
.chain((height + 1..exp_hashes.len() as u32).map(|h| (h, None)))
258243
.collect::<bdk_chain::local_chain::ChangeSet>()
259244
} else {
260-
BTreeMap::from([(height, Some(block.block_hash()))])
245+
BTreeMap::from([(height, Some(hash))])
261246
},
262247
"chain update changeset is unexpected",
263248
);
@@ -307,9 +292,13 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
307292

308293
let emitter = &mut Emitter::new(&env.client, chain.tip(), 0);
309294

310-
while let Some((height, block)) = emitter.next_block()? {
311-
let _ = chain.apply_update(block_to_chain_update(&block, height))?;
312-
let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height);
295+
while let Some(emission) = emitter.next_block()? {
296+
let height = emission.block_height();
297+
let _ = chain.apply_update(local_chain::Update {
298+
tip: emission.checkpoint,
299+
introduce_older_blocks: false,
300+
})?;
301+
let indexed_additions = indexed_tx_graph.apply_block_relevant(emission.block, height);
313302
assert!(indexed_additions.is_empty());
314303
}
315304

@@ -367,10 +356,13 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
367356

368357
// must receive mined block which will confirm the transactions.
369358
{
370-
let (height, block) = emitter.next_block()?.expect("must get mined block");
371-
let _ = chain
372-
.apply_update(CheckPoint::from_header(&block.header, height).into_update(false))?;
373-
let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height);
359+
let emission = emitter.next_block()?.expect("must get mined block");
360+
let height = emission.block_height();
361+
let _ = chain.apply_update(local_chain::Update {
362+
tip: emission.checkpoint,
363+
introduce_older_blocks: false,
364+
})?;
365+
let indexed_additions = indexed_tx_graph.apply_block_relevant(emission.block, height);
374366
assert!(indexed_additions.graph.txs.is_empty());
375367
assert!(indexed_additions.graph.txouts.is_empty());
376368
assert_eq!(indexed_additions.graph.anchors, exp_anchors);
@@ -407,9 +399,12 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> {
407399

408400
for reorg_count in 1..=10 {
409401
let replaced_blocks = env.reorg_empty_blocks(reorg_count)?;
410-
let (height, next_header) = emitter.next_header()?.expect("must emit block after reorg");
402+
let next_emission = emitter.next_header()?.expect("must emit block after reorg");
411403
assert_eq!(
412-
(height as usize, next_header.block_hash()),
404+
(
405+
next_emission.block_height() as usize,
406+
next_emission.block_hash()
407+
),
413408
replaced_blocks[0],
414409
"block emitted after reorg should be at the reorg height"
415410
);
@@ -439,8 +434,9 @@ fn sync_from_emitter<C>(
439434
where
440435
C: bitcoincore_rpc::RpcApi,
441436
{
442-
while let Some((height, block)) = emitter.next_block()? {
443-
process_block(recv_chain, recv_graph, block, height)?;
437+
while let Some(emission) = emitter.next_block()? {
438+
let height = emission.block_height();
439+
process_block(recv_chain, recv_graph, emission.block, height)?;
444440
}
445441
Ok(())
446442
}
@@ -660,7 +656,8 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
660656

661657
// At this point, the emitter has seen all mempool transactions. It should only re-emit those
662658
// that have introduction heights less than the emitter's last-emitted block tip.
663-
while let Some((height, _)) = emitter.next_header()? {
659+
while let Some(emission) = emitter.next_header()? {
660+
let height = emission.block_height();
664661
// We call `mempool()` twice.
665662
// The second call (at height `h`) should skip the tx introduced at height `h`.
666663
for try_index in 0..2 {
@@ -754,7 +751,8 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
754751
.collect::<BTreeMap<_, _>>());
755752

756753
// `next_header` emits the replacement block of the reorg
757-
if let Some((height, _)) = emitter.next_header()? {
754+
if let Some(emission) = emitter.next_header()? {
755+
let height = emission.block_height();
758756
println!("\t- replacement height: {}", height);
759757

760758
// the mempool emission (that follows the first block emission after reorg) should only
@@ -835,12 +833,12 @@ fn no_agreement_point() -> anyhow::Result<()> {
835833
env.mine_blocks(PREMINE_COUNT, None)?;
836834

837835
// emit block 99a
838-
let (_, block_header_99a) = emitter.next_header()?.expect("block 99a header");
836+
let block_header_99a = emitter.next_header()?.expect("block 99a header").block;
839837
let block_hash_99a = block_header_99a.block_hash();
840838
let block_hash_98a = block_header_99a.prev_blockhash;
841839

842840
// emit block 100a
843-
let (_, block_header_100a) = emitter.next_header()?.expect("block 100a header");
841+
let block_header_100a = emitter.next_header()?.expect("block 100a header").block;
844842
let block_hash_100a = block_header_100a.block_hash();
845843

846844
// get hash for block 101a
@@ -855,7 +853,7 @@ fn no_agreement_point() -> anyhow::Result<()> {
855853
env.mine_blocks(3, None)?;
856854

857855
// emit block header 99b
858-
let (_, block_header_99b) = emitter.next_header()?.expect("block 99b header");
856+
let block_header_99b = emitter.next_header()?.expect("block 99b header").block;
859857
let block_hash_99b = block_header_99b.block_hash();
860858
let block_hash_98b = block_header_99b.prev_blockhash;
861859

example-crates/example_bitcoind_rpc_polling/src/main.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use bdk_bitcoind_rpc::{
1414
use bdk_chain::{
1515
bitcoin::{constants::genesis_block, Block, Transaction},
1616
indexed_tx_graph, keychain,
17-
local_chain::{self, CheckPoint, LocalChain},
17+
local_chain::{self, LocalChain},
1818
ConfirmationTimeHeightAnchor, IndexedTxGraph,
1919
};
2020
use example_cli::{
@@ -42,7 +42,7 @@ type ChangeSet = (
4242

4343
#[derive(Debug)]
4444
enum Emission {
45-
Block { height: u32, block: Block },
45+
Block(bdk_bitcoind_rpc::BlockEvent<Block>),
4646
Mempool(Vec<(Transaction, u64)>),
4747
Tip(u32),
4848
}
@@ -178,17 +178,20 @@ fn main() -> anyhow::Result<()> {
178178
let mut last_db_commit = Instant::now();
179179
let mut last_print = Instant::now();
180180

181-
while let Some((height, block)) = emitter.next_block()? {
181+
while let Some(emission) = emitter.next_block()? {
182+
let height = emission.block_height();
183+
182184
let mut chain = chain.lock().unwrap();
183185
let mut graph = graph.lock().unwrap();
184186
let mut db = db.lock().unwrap();
185187

186-
let chain_update =
187-
CheckPoint::from_header(&block.header, height).into_update(false);
188188
let chain_changeset = chain
189-
.apply_update(chain_update)
189+
.apply_update(local_chain::Update {
190+
tip: emission.checkpoint,
191+
introduce_older_blocks: false,
192+
})
190193
.expect("must always apply as we receive blocks in order from emitter");
191-
let graph_changeset = graph.apply_block_relevant(block, height);
194+
let graph_changeset = graph.apply_block_relevant(emission.block, height);
192195
db.stage((chain_changeset, graph_changeset));
193196

194197
// commit staged db changes in intervals
@@ -256,15 +259,16 @@ fn main() -> anyhow::Result<()> {
256259

257260
loop {
258261
match emitter.next_block()? {
259-
Some((height, block)) => {
262+
Some(block_emission) => {
263+
let height = block_emission.block_height();
260264
if sigterm_flag.load(Ordering::Acquire) {
261265
break;
262266
}
263267
if height > block_count {
264268
block_count = rpc_client.get_block_count()? as u32;
265269
tx.send(Emission::Tip(block_count))?;
266270
}
267-
tx.send(Emission::Block { height, block })?;
271+
tx.send(Emission::Block(block_emission))?;
268272
}
269273
None => {
270274
if await_flag(&sigterm_flag, MEMPOOL_EMIT_DELAY) {
@@ -293,13 +297,17 @@ fn main() -> anyhow::Result<()> {
293297
let mut chain = chain.lock().unwrap();
294298

295299
let changeset = match emission {
296-
Emission::Block { height, block } => {
297-
let chain_update =
298-
CheckPoint::from_header(&block.header, height).into_update(false);
300+
Emission::Block(block_emission) => {
301+
let height = block_emission.block_height();
302+
let chain_update = local_chain::Update {
303+
tip: block_emission.checkpoint,
304+
introduce_older_blocks: false,
305+
};
299306
let chain_changeset = chain
300307
.apply_update(chain_update)
301308
.expect("must always apply as we receive blocks in order from emitter");
302-
let graph_changeset = graph.apply_block_relevant(block, height);
309+
let graph_changeset =
310+
graph.apply_block_relevant(block_emission.block, height);
303311
(chain_changeset, graph_changeset)
304312
}
305313
Emission::Mempool(mempool_txs) => {

0 commit comments

Comments
 (0)