Skip to content

Commit 38d69c9

Browse files
committed
Merge #1041: Add bitcoind_rpc chain source module.
85c6253 docs(bitcoind_rpc): better `Emitter::mempool` explanation (志宇) b69c13d example_bitcoind_rpc: tweaks (志宇) 5f34df8 bitcoind_rpc!: bring back `CheckPoint`s to `Emitter` (志宇) 57590e0 bitcoind_rpc: rm `BlockHash` from `Emitter::last_mempool_tip` (志宇) 6d4b33e chain: split `IndexedTxGraph::insert_tx` into 3 methods (志宇) 4f5695d chain: improvements to `IndexedTxGraph` and `TxGraph` APIs (志宇) 150d6f8 feat(example_bitcoind_rpc_polling): add example for RPC polling (志宇) 4f10463 test(bitcoind_rpc): add no_agreement_point test (志宇) a73dac2 test(bitcoind_rpc): initial tests for `Emitter` (志宇) bb7424d feat(bitcoind_rpc): introduce `bitcoind_rpc` crate (志宇) 240657b chain: add batch-insert methods for `IndexedTxGraph` (志宇) 43bc813 chain: add helper methods on `CheckPoint` (志宇) b3db5ca feat(chain): add `AnchorFromBlockPosition` trait (志宇) f795a43 feat(example_cli): allow chain specific args in examples (志宇) Pull request description: ### Description This PR builds on top of #1034 and adds the `bitcoind_rpc` chain-src module and example. ### Notes to the reviewers Don't merge this until #1034 is in! ### Changelog notice * Add `bitcoind_rpc` chain-source module. * Add `example_bitcoind_rpc` example module. * Add `AnchorFromBlockPosition` trait which are for anchors that can be constructed from a given block, height and position in block. * Add helper methods to `IndexedTxGraph` and `TxGraph` for batch operations and applying blocks directly. * Add helper methods to `CheckPoint` for easier construction from a block `Header`. ### Checklists * [x] Add test: we should detect when an initially-confirmed transaction is "unconfirmed" during a reorg. * [x] Improve `example_bitcoind_rpc`: add `live` command. * [x] Improve docs. * [x] Reintroduce `CheckPoint`. #### All Submissions: * [x] I've signed all my commits * [x] I followed the [contribution guidelines](https://github.com/bitcoindevkit/bdk/blob/master/CONTRIBUTING.md) * [x] I ran `cargo fmt` and `cargo clippy` before committing #### New Features: * [x] I've added tests for the new feature * [x] I've added docs for the new feature ACKs for top commit: notmandatory: Re ACK 85c6253 Tree-SHA512: 88dbafbebaf227b18c69f2ea884e3e586bf9c11e5e450eb4872ade1d1ccd5cf1e33ce9930a6f5aa918baa3e92add7503858b039b8c9d553a281ad6d833f08a49
2 parents 67eec36 + 85c6253 commit 38d69c9

File tree

16 files changed

+2092
-345
lines changed

16 files changed

+2092
-345
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ members = [
66
"crates/file_store",
77
"crates/electrum",
88
"crates/esplora",
9+
"crates/bitcoind_rpc",
910
"example-crates/example_cli",
1011
"example-crates/example_electrum",
1112
"example-crates/example_esplora",
13+
"example-crates/example_bitcoind_rpc_polling",
1214
"example-crates/wallet_electrum",
1315
"example-crates/wallet_esplora_blocking",
1416
"example-crates/wallet_esplora_async",

crates/bdk/src/wallet/mod.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,7 @@ impl<D> Wallet<D> {
509509
where
510510
D: PersistBackend<ChangeSet>,
511511
{
512-
let additions = self.indexed_graph.insert_txout(outpoint, &txout);
512+
let additions = self.indexed_graph.insert_txout(outpoint, txout);
513513
self.persist.stage(ChangeSet::from(additions));
514514
}
515515

@@ -738,7 +738,16 @@ impl<D> Wallet<D> {
738738
ConfirmationTime::Unconfirmed { last_seen } => (None, Some(last_seen)),
739739
};
740740

741-
let changeset: ChangeSet = self.indexed_graph.insert_tx(&tx, anchor, last_seen).into();
741+
let mut changeset = ChangeSet::default();
742+
let txid = tx.txid();
743+
changeset.append(self.indexed_graph.insert_tx(tx).into());
744+
if let Some(anchor) = anchor {
745+
changeset.append(self.indexed_graph.insert_anchor(txid, anchor).into());
746+
}
747+
if let Some(last_seen) = last_seen {
748+
changeset.append(self.indexed_graph.insert_seen_at(txid, last_seen).into());
749+
}
750+
742751
let changed = !changeset.is_empty();
743752
self.persist.stage(changeset);
744753
Ok(changed)

crates/bitcoind_rpc/Cargo.toml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[package]
2+
name = "bdk_bitcoind_rpc"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]
9+
# For no-std, remember to enable the bitcoin/no-std feature
10+
bitcoin = { version = "0.30", default-features = false }
11+
bitcoincore-rpc = { version = "0.17" }
12+
bdk_chain = { path = "../chain", version = "0.5", default-features = false }
13+
14+
[dev-dependencies]
15+
bitcoind = { version = "0.33", features = ["25_0"] }
16+
anyhow = { version = "1" }
17+
18+
[features]
19+
default = ["std"]
20+
std = ["bitcoin/std", "bdk_chain/std"]
21+
serde = ["bitcoin/serde", "bdk_chain/serde"]

crates/bitcoind_rpc/src/lib.rs

Lines changed: 301 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,301 @@
1+
//! This crate is used for emitting blockchain data from the `bitcoind` RPC interface. It does not
2+
//! use the wallet RPC API, so this crate can be used with wallet-disabled Bitcoin Core nodes.
3+
//!
4+
//! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`].
5+
//!
6+
//! To only get block updates (exclude mempool transactions), the caller can use
7+
//! [`Emitter::next_block`] or/and [`Emitter::next_header`] until it returns `Ok(None)` (which means
8+
//! the chain tip is reached). A separate method, [`Emitter::mempool`] can be used to emit the whole
9+
//! mempool.
10+
#![warn(missing_docs)]
11+
12+
use bdk_chain::{local_chain::CheckPoint, BlockId};
13+
use bitcoin::{block::Header, Block, BlockHash, Transaction};
14+
pub use bitcoincore_rpc;
15+
use bitcoincore_rpc::bitcoincore_rpc_json;
16+
17+
/// A structure that emits data sourced from [`bitcoincore_rpc::Client`].
18+
///
19+
/// Refer to [module-level documentation] for more.
20+
///
21+
/// [module-level documentation]: crate
22+
pub struct Emitter<'c, C> {
23+
client: &'c C,
24+
start_height: u32,
25+
26+
/// The checkpoint of the last-emitted block that is in the best chain. If it is later found
27+
/// that the block is no longer in the best chain, it will be popped off from here.
28+
last_cp: Option<CheckPoint>,
29+
30+
/// The block result returned from rpc of the last-emitted block. As this result contains the
31+
/// next block's block hash (which we use to fetch the next block), we set this to `None`
32+
/// whenever there are no more blocks, or the next block is no longer in the best chain. This
33+
/// gives us an opportunity to re-fetch this result.
34+
last_block: Option<bitcoincore_rpc_json::GetBlockResult>,
35+
36+
/// The latest first-seen epoch of emitted mempool transactions. This is used to determine
37+
/// whether a mempool transaction is already emitted.
38+
last_mempool_time: usize,
39+
40+
/// The last emitted block during our last mempool emission. This is used to determine whether
41+
/// there has been a reorg since our last mempool emission.
42+
last_mempool_tip: Option<u32>,
43+
}
44+
45+
impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
46+
/// Construct a new [`Emitter`] with the given RPC `client` and `start_height`.
47+
///
48+
/// `start_height` is the block height to start emitting blocks from.
49+
pub fn from_height(client: &'c C, start_height: u32) -> Self {
50+
Self {
51+
client,
52+
start_height,
53+
last_cp: None,
54+
last_block: None,
55+
last_mempool_time: 0,
56+
last_mempool_tip: None,
57+
}
58+
}
59+
60+
/// Construct a new [`Emitter`] with the given RPC `client` and `checkpoint`.
61+
///
62+
/// `checkpoint` is used to find the latest block which is still part of the best chain. The
63+
/// [`Emitter`] will emit blocks starting right above this block.
64+
pub fn from_checkpoint(client: &'c C, checkpoint: CheckPoint) -> Self {
65+
Self {
66+
client,
67+
start_height: 0,
68+
last_cp: Some(checkpoint),
69+
last_block: None,
70+
last_mempool_time: 0,
71+
last_mempool_tip: None,
72+
}
73+
}
74+
75+
/// Emit mempool transactions, alongside their first-seen unix timestamps.
76+
///
77+
/// This method emits each transaction only once, unless we cannot guarantee the transaction's
78+
/// ancestors are already emitted.
79+
///
80+
/// To understand why, consider a receiver which filters transactions based on whether it
81+
/// alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction spends a
82+
/// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block
83+
/// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
84+
/// at height `h`.
85+
pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
86+
let client = self.client;
87+
88+
// This is the emitted tip height during the last mempool emission.
89+
let prev_mempool_tip = self
90+
.last_mempool_tip
91+
// We use `start_height - 1` as we cannot guarantee that the block at
92+
// `start_height` has been emitted.
93+
.unwrap_or(self.start_height.saturating_sub(1));
94+
95+
// Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
96+
// track of the latest mempool tx's timestamp to determine whether we have seen a tx
97+
// before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
98+
// be the new latest timestamp.
99+
let prev_mempool_time = self.last_mempool_time;
100+
let mut latest_time = prev_mempool_time;
101+
102+
let txs_to_emit = client
103+
.get_raw_mempool_verbose()?
104+
.into_iter()
105+
.filter_map({
106+
let latest_time = &mut latest_time;
107+
move |(txid, tx_entry)| -> Option<Result<_, bitcoincore_rpc::Error>> {
108+
let tx_time = tx_entry.time as usize;
109+
if tx_time > *latest_time {
110+
*latest_time = tx_time;
111+
}
112+
113+
// Avoid emitting transactions that are already emitted if we can guarantee
114+
// blocks containing ancestors are already emitted. The bitcoind rpc interface
115+
// provides us with the block height that the tx is introduced to the mempool.
116+
// If we have already emitted the block of height, we can assume that all
117+
// ancestor txs have been processed by the receiver.
118+
let is_already_emitted = tx_time <= prev_mempool_time;
119+
let is_within_height = tx_entry.height <= prev_mempool_tip as _;
120+
if is_already_emitted && is_within_height {
121+
return None;
122+
}
123+
124+
let tx = match client.get_raw_transaction(&txid, None) {
125+
Ok(tx) => tx,
126+
// the tx is confirmed or evicted since `get_raw_mempool_verbose`
127+
Err(err) if err.is_not_found_error() => return None,
128+
Err(err) => return Some(Err(err)),
129+
};
130+
131+
Some(Ok((tx, tx_time as u64)))
132+
}
133+
})
134+
.collect::<Result<Vec<_>, _>>()?;
135+
136+
self.last_mempool_time = latest_time;
137+
self.last_mempool_tip = self.last_cp.as_ref().map(|cp| cp.height());
138+
139+
Ok(txs_to_emit)
140+
}
141+
142+
/// Emit the next block height and header (if any).
143+
pub fn next_header(&mut self) -> Result<Option<(u32, Header)>, bitcoincore_rpc::Error> {
144+
poll(self, |hash| self.client.get_block_header(hash))
145+
}
146+
147+
/// Emit the next block height and block (if any).
148+
pub fn next_block(&mut self) -> Result<Option<(u32, Block)>, bitcoincore_rpc::Error> {
149+
poll(self, |hash| self.client.get_block(hash))
150+
}
151+
}
152+
153+
enum PollResponse {
154+
Block(bitcoincore_rpc_json::GetBlockResult),
155+
NoMoreBlocks,
156+
/// Fetched block is not in the best chain.
157+
BlockNotInBestChain,
158+
AgreementFound(bitcoincore_rpc_json::GetBlockResult, CheckPoint),
159+
AgreementPointNotFound,
160+
}
161+
162+
fn poll_once<C>(emitter: &Emitter<C>) -> Result<PollResponse, bitcoincore_rpc::Error>
163+
where
164+
C: bitcoincore_rpc::RpcApi,
165+
{
166+
let client = emitter.client;
167+
168+
if let Some(last_res) = &emitter.last_block {
169+
assert!(
170+
emitter.last_cp.is_some(),
171+
"must not have block result without last cp"
172+
);
173+
174+
let next_hash = match last_res.nextblockhash {
175+
None => return Ok(PollResponse::NoMoreBlocks),
176+
Some(next_hash) => next_hash,
177+
};
178+
179+
let res = client.get_block_info(&next_hash)?;
180+
if res.confirmations < 0 {
181+
return Ok(PollResponse::BlockNotInBestChain);
182+
}
183+
return Ok(PollResponse::Block(res));
184+
}
185+
186+
if emitter.last_cp.is_none() {
187+
let hash = client.get_block_hash(emitter.start_height as _)?;
188+
189+
let res = client.get_block_info(&hash)?;
190+
if res.confirmations < 0 {
191+
return Ok(PollResponse::BlockNotInBestChain);
192+
}
193+
return Ok(PollResponse::Block(res));
194+
}
195+
196+
for cp in emitter.last_cp.iter().flat_map(CheckPoint::iter) {
197+
let res = client.get_block_info(&cp.hash())?;
198+
if res.confirmations < 0 {
199+
// block is not in best chain
200+
continue;
201+
}
202+
203+
// agreement point found
204+
return Ok(PollResponse::AgreementFound(res, cp));
205+
}
206+
207+
Ok(PollResponse::AgreementPointNotFound)
208+
}
209+
210+
fn poll<C, V, F>(
211+
emitter: &mut Emitter<C>,
212+
get_item: F,
213+
) -> Result<Option<(u32, V)>, bitcoincore_rpc::Error>
214+
where
215+
C: bitcoincore_rpc::RpcApi,
216+
F: Fn(&BlockHash) -> Result<V, bitcoincore_rpc::Error>,
217+
{
218+
loop {
219+
match poll_once(emitter)? {
220+
PollResponse::Block(res) => {
221+
let height = res.height as u32;
222+
let hash = res.hash;
223+
let item = get_item(&hash)?;
224+
225+
let this_id = BlockId { height, hash };
226+
let prev_id = res.previousblockhash.map(|prev_hash| BlockId {
227+
height: height - 1,
228+
hash: prev_hash,
229+
});
230+
231+
match (&mut emitter.last_cp, prev_id) {
232+
(Some(cp), _) => *cp = cp.clone().push(this_id).expect("must push"),
233+
(last_cp, None) => *last_cp = Some(CheckPoint::new(this_id)),
234+
// When the receiver constructs a local_chain update from a block, the previous
235+
// checkpoint is also included in the update. We need to reflect this state in
236+
// `Emitter::last_cp` as well.
237+
(last_cp, Some(prev_id)) => {
238+
*last_cp = Some(CheckPoint::new(prev_id).push(this_id).expect("must push"))
239+
}
240+
}
241+
242+
emitter.last_block = Some(res);
243+
244+
return Ok(Some((height, item)));
245+
}
246+
PollResponse::NoMoreBlocks => {
247+
emitter.last_block = None;
248+
return Ok(None);
249+
}
250+
PollResponse::BlockNotInBestChain => {
251+
emitter.last_block = None;
252+
continue;
253+
}
254+
PollResponse::AgreementFound(res, cp) => {
255+
let agreement_h = res.height as u32;
256+
257+
// get rid of evicted blocks
258+
emitter.last_cp = Some(cp);
259+
260+
// The tip during the last mempool emission needs to in the best chain, we reduce
261+
// it if it is not.
262+
if let Some(h) = emitter.last_mempool_tip.as_mut() {
263+
if *h > agreement_h {
264+
*h = agreement_h;
265+
}
266+
}
267+
emitter.last_block = Some(res);
268+
continue;
269+
}
270+
PollResponse::AgreementPointNotFound => {
271+
// We want to clear `last_cp` and set `start_height` to the first checkpoint's
272+
// height. This way, the first checkpoint in `LocalChain` can be replaced.
273+
if let Some(last_cp) = emitter.last_cp.take() {
274+
emitter.start_height = last_cp.height();
275+
}
276+
emitter.last_block = None;
277+
continue;
278+
}
279+
}
280+
}
281+
}
282+
283+
/// Extends [`bitcoincore_rpc::Error`].
284+
pub trait BitcoindRpcErrorExt {
285+
/// Returns whether the error is a "not found" error.
286+
///
287+
/// This is useful since [`Emitter`] emits [`Result<_, bitcoincore_rpc::Error>`]s as
288+
/// [`Iterator::Item`].
289+
fn is_not_found_error(&self) -> bool;
290+
}
291+
292+
impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
293+
fn is_not_found_error(&self) -> bool {
294+
if let bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(rpc_err)) = self
295+
{
296+
rpc_err.code == -5
297+
} else {
298+
false
299+
}
300+
}
301+
}

0 commit comments

Comments
 (0)