Skip to content

Commit 5f34df8

Browse files
committed
bitcoind_rpc!: bring back CheckPoints to Emitter
* `bdk_chain` dependency is added. In the future, we will introduce a separate `bdk_core` crate to contain shared types. * replace `Emitter::new` with `from_height` and `from_checkpoint` * `from_height` emits from the given start height * `from_checkpoint` uses the provided cp to find agreement point * introduce logic that ensures emitted blocks can connect with receiver's `LocalChain` * in our rpc example, we can now `expect()` chain updates to always since we are using checkpoints and receiving blocks in order
1 parent 57590e0 commit 5f34df8

File tree

4 files changed

+84
-43
lines changed

4 files changed

+84
-43
lines changed

crates/bitcoind_rpc/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ edition = "2021"
99
# For no-std, remember to enable the bitcoin/no-std feature
1010
bitcoin = { version = "0.30", default-features = false }
1111
bitcoincore-rpc = { version = "0.17" }
12+
bdk_chain = { path = "../chain", version = "0.5", default-features = false }
1213

1314
[dev-dependencies]
14-
bdk_chain = { path = "../chain", version = "0.5", features = ["serde", "miniscript"] }
1515
bitcoind = { version = "0.33", features = ["25_0"] }
1616
anyhow = { version = "1" }
1717

1818
[features]
1919
default = ["std"]
20-
std = ["bitcoin/std"]
21-
serde = ["bitcoin/serde"]
20+
std = ["bitcoin/std", "bdk_chain/std"]
21+
serde = ["bitcoin/serde", "bdk_chain/serde"]

crates/bitcoind_rpc/src/lib.rs

Lines changed: 58 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@
99
//! mempool.
1010
#![warn(missing_docs)]
1111

12-
use std::collections::BTreeMap;
13-
12+
use bdk_chain::{local_chain::CheckPoint, BlockId};
1413
use bitcoin::{block::Header, Block, BlockHash, Transaction};
1514
pub use bitcoincore_rpc;
1615
use bitcoincore_rpc::bitcoincore_rpc_json;
@@ -24,7 +23,7 @@ pub struct Emitter<'c, C> {
2423
client: &'c C,
2524
start_height: u32,
2625

27-
emitted_blocks: BTreeMap<u32, BlockHash>,
26+
last_cp: Option<CheckPoint>,
2827
last_block: Option<bitcoincore_rpc_json::GetBlockResult>,
2928

3029
/// The latest first-seen epoch of emitted mempool transactions. This is used to determine
@@ -37,14 +36,29 @@ pub struct Emitter<'c, C> {
3736
}
3837

3938
impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
40-
/// Constructs a new [`Emitter`] with the provided [`bitcoincore_rpc::Client`].
39+
/// Construct a new [`Emitter`] with the given RPC `client` and `start_height`.
4140
///
4241
/// `start_height` is the block height to start emitting blocks from.
43-
pub fn new(client: &'c C, start_height: u32) -> Self {
42+
pub fn from_height(client: &'c C, start_height: u32) -> Self {
4443
Self {
4544
client,
4645
start_height,
47-
emitted_blocks: BTreeMap::new(),
46+
last_cp: None,
47+
last_block: None,
48+
last_mempool_time: 0,
49+
last_mempool_tip: None,
50+
}
51+
}
52+
53+
/// Construct a new [`Emitter`] with the given RPC `client` and `checkpoint`.
54+
///
55+
/// `checkpoint` is used to find the latest block which is still part of the best chain. The
56+
/// [`Emitter`] will emit blocks starting right above this block.
57+
pub fn from_checkpoint(client: &'c C, checkpoint: CheckPoint) -> Self {
58+
Self {
59+
client,
60+
start_height: 0,
61+
last_cp: Some(checkpoint),
4862
last_block: None,
4963
last_mempool_time: 0,
5064
last_mempool_tip: None,
@@ -114,7 +128,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
114128
.collect::<Result<Vec<_>, _>>()?;
115129

116130
self.last_mempool_time = latest_time;
117-
self.last_mempool_tip = self.emitted_blocks.iter().last().map(|(&height, _)| height);
131+
self.last_mempool_tip = self.last_cp.as_ref().map(|cp| cp.height());
118132

119133
Ok(txs_to_emit)
120134
}
@@ -135,7 +149,7 @@ enum PollResponse {
135149
NoMoreBlocks,
136150
/// Fetched block is not in the best chain.
137151
BlockNotInBestChain,
138-
AgreementFound(bitcoincore_rpc_json::GetBlockResult),
152+
AgreementFound(bitcoincore_rpc_json::GetBlockResult, CheckPoint),
139153
AgreementPointNotFound,
140154
}
141155

@@ -146,7 +160,10 @@ where
146160
let client = emitter.client;
147161

148162
if let Some(last_res) = &emitter.last_block {
149-
assert!(!emitter.emitted_blocks.is_empty());
163+
assert!(
164+
emitter.last_cp.is_some(),
165+
"must not have block result without last cp"
166+
);
150167

151168
let next_hash = match last_res.nextblockhash {
152169
None => return Ok(PollResponse::NoMoreBlocks),
@@ -160,7 +177,7 @@ where
160177
return Ok(PollResponse::Block(res));
161178
}
162179

163-
if emitter.emitted_blocks.is_empty() {
180+
if emitter.last_cp.is_none() {
164181
let hash = client.get_block_hash(emitter.start_height as _)?;
165182

166183
let res = client.get_block_info(&hash)?;
@@ -170,15 +187,15 @@ where
170187
return Ok(PollResponse::Block(res));
171188
}
172189

173-
for (&_, hash) in emitter.emitted_blocks.iter().rev() {
174-
let res = client.get_block_info(hash)?;
190+
for cp in emitter.last_cp.iter().flat_map(CheckPoint::iter) {
191+
let res = client.get_block_info(&cp.hash())?;
175192
if res.confirmations < 0 {
176193
// block is not in best chain
177194
continue;
178195
}
179196

180197
// agreement point found
181-
return Ok(PollResponse::AgreementFound(res));
198+
return Ok(PollResponse::AgreementFound(res, cp));
182199
}
183200

184201
Ok(PollResponse::AgreementPointNotFound)
@@ -196,9 +213,28 @@ where
196213
match poll_once(emitter)? {
197214
PollResponse::Block(res) => {
198215
let height = res.height as u32;
199-
let item = get_item(&res.hash)?;
200-
assert_eq!(emitter.emitted_blocks.insert(height, res.hash), None);
216+
let hash = res.hash;
217+
let item = get_item(&hash)?;
218+
219+
let this_id = BlockId { height, hash };
220+
let prev_id = res.previousblockhash.map(|prev_hash| BlockId {
221+
height: height - 1,
222+
hash: prev_hash,
223+
});
224+
225+
match (&mut emitter.last_cp, prev_id) {
226+
(Some(cp), _) => *cp = cp.clone().push(this_id).expect("must push"),
227+
(last_cp, None) => *last_cp = Some(CheckPoint::new(this_id)),
228+
// When the receiver constructs a local_chain update from a block, the previous
229+
// checkpoint is also included in the update. We need to reflect this state in
230+
// `Emitter::last_cp` as well.
231+
(last_cp, Some(prev_id)) => {
232+
*last_cp = Some(CheckPoint::new(prev_id).push(this_id).expect("must push"))
233+
}
234+
}
235+
201236
emitter.last_block = Some(res);
237+
202238
return Ok(Some((height, item)));
203239
}
204240
PollResponse::NoMoreBlocks => {
@@ -209,11 +245,11 @@ where
209245
emitter.last_block = None;
210246
continue;
211247
}
212-
PollResponse::AgreementFound(res) => {
248+
PollResponse::AgreementFound(res, cp) => {
213249
let agreement_h = res.height as u32;
214250

215251
// get rid of evicted blocks
216-
emitter.emitted_blocks.split_off(&(agreement_h + 1));
252+
emitter.last_cp = Some(cp);
217253

218254
// The tip during the last mempool emission needs to in the best chain, we reduce
219255
// it if it is not.
@@ -226,7 +262,11 @@ where
226262
continue;
227263
}
228264
PollResponse::AgreementPointNotFound => {
229-
emitter.emitted_blocks.clear();
265+
// We want to clear `last_cp` and set `start_height` to the first checkpoint's
266+
// height. This way, the first checkpoint in `LocalChain` can be replaced.
267+
if let Some(last_cp) = emitter.last_cp.take() {
268+
emitter.start_height = last_cp.height();
269+
}
230270
emitter.last_block = None;
231271
continue;
232272
}

crates/bitcoind_rpc/tests/test_emitter.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ fn block_to_chain_update(block: &bitcoin::Block, height: u32) -> local_chain::Up
189189
pub fn test_sync_local_chain() -> anyhow::Result<()> {
190190
let env = TestEnv::new()?;
191191
let mut local_chain = LocalChain::default();
192-
let mut emitter = Emitter::new(&env.client, 0);
192+
let mut emitter = Emitter::from_height(&env.client, 0);
193193

194194
// mine some blocks and returned the actual block hashes
195195
let exp_hashes = {
@@ -305,7 +305,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
305305
index
306306
});
307307

308-
let emitter = &mut Emitter::new(&env.client, 0);
308+
let emitter = &mut Emitter::from_height(&env.client, 0);
309309

310310
while let Some((height, block)) = emitter.next_block()? {
311311
let _ = chain.apply_update(block_to_chain_update(&block, height))?;
@@ -393,7 +393,7 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> {
393393
const CHAIN_TIP_HEIGHT: usize = 110;
394394

395395
let env = TestEnv::new()?;
396-
let mut emitter = Emitter::new(&env.client, EMITTER_START_HEIGHT as _);
396+
let mut emitter = Emitter::from_height(&env.client, EMITTER_START_HEIGHT as _);
397397

398398
env.mine_blocks(CHAIN_TIP_HEIGHT, None)?;
399399
while emitter.next_header()?.is_some() {}
@@ -461,7 +461,7 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> {
461461
const SEND_AMOUNT: Amount = Amount::from_sat(10_000);
462462

463463
let env = TestEnv::new()?;
464-
let mut emitter = Emitter::new(&env.client, 0);
464+
let mut emitter = Emitter::from_height(&env.client, 0);
465465

466466
// setup addresses
467467
let addr_to_mine = env.client.get_new_address(None, None)?.assume_checked();
@@ -542,7 +542,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
542542
const MEMPOOL_TX_COUNT: usize = 2;
543543

544544
let env = TestEnv::new()?;
545-
let mut emitter = Emitter::new(&env.client, 0);
545+
let mut emitter = Emitter::from_height(&env.client, 0);
546546

547547
// mine blocks and sync up emitter
548548
let addr = env.client.get_new_address(None, None)?.assume_checked();
@@ -597,7 +597,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
597597
const MEMPOOL_TX_COUNT: usize = 21;
598598

599599
let env = TestEnv::new()?;
600-
let mut emitter = Emitter::new(&env.client, 0);
600+
let mut emitter = Emitter::from_height(&env.client, 0);
601601

602602
// mine blocks to get initial balance, sync emitter up to tip
603603
let addr = env.client.get_new_address(None, None)?.assume_checked();
@@ -674,7 +674,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
674674
const PREMINE_COUNT: usize = 101;
675675

676676
let env = TestEnv::new()?;
677-
let mut emitter = Emitter::new(&env.client, 0);
677+
let mut emitter = Emitter::from_height(&env.client, 0);
678678

679679
// mine blocks to get initial balance
680680
let addr = env.client.get_new_address(None, None)?.assume_checked();
@@ -789,7 +789,7 @@ fn no_agreement_point() -> anyhow::Result<()> {
789789
let env = TestEnv::new()?;
790790

791791
// start height is 99
792-
let mut emitter = Emitter::new(&env.client, (PREMINE_COUNT - 2) as u32);
792+
let mut emitter = Emitter::from_height(&env.client, (PREMINE_COUNT - 2) as u32);
793793

794794
// mine 101 blocks
795795
env.mine_blocks(PREMINE_COUNT, None)?;

example-crates/example_bitcoind_rpc_polling/src/main.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ const DB_MAGIC: &[u8] = b"bdk_example_rpc";
2727
const DB_PATH: &str = ".bdk_example_rpc.db";
2828

2929
const CHANNEL_BOUND: usize = 10;
30-
/// The block depth which we assume no reorgs can happen at.
31-
const ASSUME_FINAL_DEPTH: u32 = 6;
3230
/// Delay for printing status to stdout.
3331
const STDOUT_PRINT_DELAY: Duration = Duration::from_secs(6);
3432
/// Delay between mempool emissions.
@@ -160,21 +158,22 @@ fn main() -> anyhow::Result<()> {
160158
let mut db = db.lock().unwrap();
161159

162160
graph.index.set_lookahead_for_all(lookahead);
163-
// we start at a height lower than last-seen tip in case of reorgs
164-
let start_height = chain.tip().as_ref().map_or(fallback_height, |cp| {
165-
cp.height().saturating_sub(ASSUME_FINAL_DEPTH)
166-
});
167161

168162
let rpc_client = rpc_args.new_client()?;
169-
let mut emitter = Emitter::new(&rpc_client, start_height);
163+
let mut emitter = match chain.tip() {
164+
Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
165+
None => Emitter::from_height(&rpc_client, fallback_height),
166+
};
170167

171168
let mut last_db_commit = Instant::now();
172169
let mut last_print = Instant::now();
173170

174171
while let Some((height, block)) = emitter.next_block()? {
175172
let chain_update =
176173
CheckPoint::from_header(&block.header, height).into_update(false);
177-
let chain_changeset = chain.apply_update(chain_update)?;
174+
let chain_changeset = chain
175+
.apply_update(chain_update)
176+
.expect("must always apply as we recieve blocks in order from emitter");
178177
let graph_changeset = graph.apply_block_relevant(block, height);
179178
db.stage((chain_changeset, graph_changeset));
180179

@@ -227,17 +226,17 @@ fn main() -> anyhow::Result<()> {
227226
let sigterm_flag = start_ctrlc_handler();
228227

229228
graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
230-
// we start at a height lower than last-seen tip in case of reorgs
231-
let start_height = chain.lock().unwrap().tip().map_or(fallback_height, |cp| {
232-
cp.height().saturating_sub(ASSUME_FINAL_DEPTH)
233-
});
229+
let last_cp = chain.lock().unwrap().tip();
234230

235231
let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND);
236232
let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
237233
println!("emitter thread started...");
238234

239235
let rpc_client = rpc_args.new_client()?;
240-
let mut emitter = Emitter::new(&rpc_client, start_height);
236+
let mut emitter = match last_cp {
237+
Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
238+
None => Emitter::from_height(&rpc_client, fallback_height),
239+
};
241240

242241
let mut block_count = rpc_client.get_block_count()? as u32;
243242
tx.send(Emission::Tip(block_count))?;
@@ -284,7 +283,9 @@ fn main() -> anyhow::Result<()> {
284283
Emission::Block { height, block } => {
285284
let chain_update =
286285
CheckPoint::from_header(&block.header, height).into_update(false);
287-
let chain_changeset = chain.apply_update(chain_update)?;
286+
let chain_changeset = chain
287+
.apply_update(chain_update)
288+
.expect("must always apply as we recieve blocks in order from emitter");
288289
let graph_changeset = graph.apply_block_relevant(block, height);
289290
(chain_changeset, graph_changeset)
290291
}

0 commit comments

Comments
 (0)