Skip to content

Commit 105d6c6

Browse files
committed
feat(bitcoind_rpc)!: simplified tx graph update
Replaced `into_tx_graph_update` with `indexed_tx_graph_update`. The latter returns a vec of `(tx, anchors, last_seen)` to be passed into `IndexedTxGraph::insert_relevant_txs`.
1 parent e6f2dbb commit 105d6c6

File tree

5 files changed

+78
-139
lines changed

5 files changed

+78
-139
lines changed

crates/bitcoind_rpc/src/lib.rs

Lines changed: 43 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
//! This crate is used for updating [`bdk_chain`] structures with data from the `bitcoind` RPC
2-
//! interface.
2+
//! interface (excluding the RPC wallet API).
33
//!
4-
//! The main structure is [`Emitter`], which sources blockchain data from
5-
//! [`bitcoincore_rpc::Client`].
4+
//! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`].
65
//!
76
//! To only get block updates (exlude mempool transactions), the caller can use
87
//! [`Emitter::emit_block`] until it returns `Ok(None)` (which means the chain tip is reached). A
@@ -43,9 +42,9 @@
4342

4443
use bdk_chain::{
4544
bitcoin::{Block, Transaction},
46-
indexed_tx_graph::Indexer,
45+
indexed_tx_graph::TxItem,
4746
local_chain::{self, CheckPoint},
48-
Append, BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, TxGraph,
47+
BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor,
4948
};
5049
pub use bitcoincore_rpc;
5150
use bitcoincore_rpc::{json::GetBlockResult, RpcApi};
@@ -92,24 +91,21 @@ impl EmittedUpdate {
9291
})
9392
}
9493

95-
/// Transforms the emitted update into a [`TxGraph`] update.
96-
///
97-
/// The `tx_filter` parameter takes in a closure that filters out irrelevant transactions so
98-
/// they do not get included in the [`TxGraph`] update. We have provided two closures;
99-
/// [`empty_filter`] and [`indexer_filter`] for this purpose.
94+
/// Return transaction items to be consumed by [`IndexedTxGraph::insert_relevant_txs`].
10095
///
10196
/// The `anchor_map` parameter takes in a closure that creates anchors of a specific type.
10297
/// [`confirmation_height_anchor`] and [`confirmation_time_anchor`] are avaliable to create
10398
/// updates with [`ConfirmationHeightAnchor`] and [`ConfirmationTimeAnchor`] respectively.
104-
pub fn into_tx_graph_update<F, M, A>(self, tx_filter: F, anchor_map: M) -> TxGraph<A>
99+
///
100+
/// [`IndexedTxGraph::insert_relevant_txs`]: bdk_chain::IndexedTxGraph::insert_relevant_txs
101+
pub fn indexed_tx_graph_update<M, A>(&self, anchor_map: M) -> Vec<TxItem<'_, Option<A>>>
105102
where
106-
F: FnMut(&Transaction) -> bool,
107103
M: Fn(&CheckPoint, &Block, usize) -> A,
108-
A: Clone + Ord + PartialOrd,
104+
A: Clone + Ord + PartialEq,
109105
{
110106
match self {
111-
EmittedUpdate::Block(e) => e.into_tx_graph_update(tx_filter, anchor_map),
112-
EmittedUpdate::Mempool(e) => e.into_tx_graph_update(tx_filter),
107+
EmittedUpdate::Block(e) => e.indexed_tx_graph_update(anchor_map).collect(),
108+
EmittedUpdate::Mempool(e) => e.indexed_tx_graph_update().collect(),
113109
}
114110
}
115111
}
@@ -129,87 +125,63 @@ impl EmittedBlock {
129125
self.cp.clone()
130126
}
131127

132-
/// Transforms the emitted update into a [`TxGraph`] update.
128+
/// Convenience method to get [`local_chain::Update`].
129+
pub fn chain_update(&self) -> local_chain::Update {
130+
local_chain::Update {
131+
tip: self.cp.clone(),
132+
introduce_older_blocks: false,
133+
}
134+
}
135+
136+
/// Return transaction items to be consumed by [`IndexedTxGraph::insert_relevant_txs`].
133137
///
134-
/// The `tx_filter` parameter takes in a closure that filters out irrelevant transactions so
135-
/// they do not get included in the [`TxGraph`] update. We have provided two closures;
136-
/// [`empty_filter`] and [`indexer_filter`] for this purpose.
138+
/// Refer to [`EmittedUpdate::indexed_tx_graph_update`] for more.
137139
///
138-
/// The `anchor_map` parameter takes in a closure that creates anchors of a specific type.
139-
/// [`confirmation_height_anchor`] and [`confirmation_time_anchor`] are avaliable to create
140-
/// updates with [`ConfirmationHeightAnchor`] and [`ConfirmationTimeAnchor`] respectively.
141-
pub fn into_tx_graph_update<F, M, A>(self, mut tx_filter: F, anchor_map: M) -> TxGraph<A>
140+
/// [`IndexedTxGraph::insert_relevant_txs`]: bdk_chain::IndexedTxGraph::insert_relevant_txs
141+
pub fn indexed_tx_graph_update<M, A>(
142+
&self,
143+
anchor_map: M,
144+
) -> impl Iterator<Item = TxItem<'_, Option<A>>>
142145
where
143-
F: FnMut(&Transaction) -> bool,
144146
M: Fn(&CheckPoint, &Block, usize) -> A,
145-
A: Clone + Ord + PartialOrd,
147+
A: Clone + Ord + PartialEq,
146148
{
147-
let mut tx_graph = TxGraph::default();
148-
let tx_iter = self
149-
.block
149+
self.block
150150
.txdata
151151
.iter()
152152
.enumerate()
153-
.filter(move |(_, tx)| tx_filter(tx));
154-
for (tx_pos, tx) in tx_iter {
155-
let txid = tx.txid();
156-
let _ = tx_graph.insert_anchor(txid, anchor_map(&self.cp, &self.block, tx_pos));
157-
let _ = tx_graph.insert_tx(tx.clone());
158-
}
159-
tx_graph
153+
.map(move |(i, tx)| (tx, Some(anchor_map(&self.cp, &self.block, i)), None))
160154
}
161155
}
162156

163157
/// An emitted subset of mempool transactions.
164158
#[derive(Debug, Clone)]
165159
pub struct EmittedMempool {
166-
/// Subset of mempool transactions.
160+
/// Subset of mempool transactions as tuples of `(tx, seen_at)`.
161+
///
162+
/// `seen_at` is the unix timestamp of when the transaction was first seen in the mempool.
167163
pub txs: Vec<(Transaction, u64)>,
168164
}
169165

170166
impl EmittedMempool {
171-
/// Transforms the emitted mempool into a [`TxGraph`] update.
167+
/// Return transaction items to be consumed by [`IndexedTxGraph::insert_relevant_txs`].
168+
///
169+
/// Refer to [`EmittedUpdate::indexed_tx_graph_update`] for more.
172170
///
173-
/// The `tx_filter` parameter takes in a closure that filters out irrelevant transactions so
174-
/// they do not get included in the [`TxGraph`] update. We have provided two closures;
175-
/// [`empty_filter`] and [`indexer_filter`] for this purpose.
176-
pub fn into_tx_graph_update<F, A>(self, mut tx_filter: F) -> TxGraph<A>
171+
/// [`IndexedTxGraph::insert_relevant_txs`]: bdk_chain::IndexedTxGraph::insert_relevant_txs
172+
pub fn indexed_tx_graph_update<A>(&self) -> impl Iterator<Item = TxItem<'_, Option<A>>>
177173
where
178-
F: FnMut(&Transaction) -> bool,
179-
A: Clone + Ord + PartialOrd,
174+
A: Clone + Ord + PartialEq,
180175
{
181-
let mut tx_graph = TxGraph::default();
182-
let tx_iter = self.txs.into_iter().filter(move |(tx, _)| tx_filter(tx));
183-
for (tx, seen_at) in tx_iter {
184-
let _ = tx_graph.insert_seen_at(tx.txid(), seen_at);
185-
let _ = tx_graph.insert_tx(tx);
186-
}
187-
tx_graph
188-
}
189-
}
190-
191-
/// Creates a closure that filters transactions based on an [`Indexer`] implementation.
192-
pub fn indexer_filter<'i, I: Indexer>(
193-
indexer: &'i mut I,
194-
changeset: &'i mut I::ChangeSet,
195-
) -> impl FnMut(&Transaction) -> bool + 'i
196-
where
197-
I::ChangeSet: bdk_chain::Append,
198-
{
199-
|tx| {
200-
changeset.append(indexer.index_tx(tx));
201-
indexer.is_tx_relevant(tx)
176+
self.txs
177+
.iter()
178+
.map(|(tx, seen_at)| (tx, None, Some(*seen_at)))
202179
}
203180
}
204181

205-
/// Returns an empty filter-closure.
206-
pub fn empty_filter() -> impl FnMut(&Transaction) -> bool {
207-
|_| true
208-
}
209-
210182
/// A closure that transforms a [`EmittedUpdate`] into a [`ConfirmationHeightAnchor`].
211183
///
212-
/// This is to be used as an input to [`EmittedUpdate::into_tx_graph_update`].
184+
/// This is to be used as an input to [`EmittedUpdate::indexed_tx_graph_update`].
213185
pub fn confirmation_height_anchor(
214186
cp: &CheckPoint,
215187
_block: &Block,
@@ -224,7 +196,7 @@ pub fn confirmation_height_anchor(
224196

225197
/// A closure that transforms a [`EmittedUpdate`] into a [`ConfirmationTimeAnchor`].
226198
///
227-
/// This is to be used as an input to [`EmittedUpdate::into_tx_graph_update`].
199+
/// This is to be used as an input to [`EmittedUpdate::indexed_tx_graph_update`].
228200
pub fn confirmation_time_anchor(
229201
cp: &CheckPoint,
230202
block: &Block,

crates/bitcoind_rpc/tests/test_emitter.rs

Lines changed: 9 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -215,15 +215,10 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
215215
let _ = chain.apply_update(chain_update)?;
216216
}
217217

218-
let tx_graph_update = update.into_tx_graph_update(
219-
bdk_bitcoind_rpc::indexer_filter(&mut indexed_tx_graph.index, &mut BTreeSet::new()),
220-
bdk_bitcoind_rpc::confirmation_height_anchor,
221-
);
222-
assert_eq!(tx_graph_update.full_txs().count(), 0);
223-
assert_eq!(tx_graph_update.all_txouts().count(), 0);
224-
assert_eq!(tx_graph_update.all_anchors().len(), 0);
218+
let tx_graph_update =
219+
update.indexed_tx_graph_update(bdk_bitcoind_rpc::confirmation_height_anchor);
225220

226-
let indexed_additions = indexed_tx_graph.apply_update(tx_graph_update);
221+
let indexed_additions = indexed_tx_graph.insert_relevant_txs(tx_graph_update);
227222
assert!(indexed_additions.is_empty());
228223
}
229224

@@ -250,20 +245,10 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
250245
let update = Emitter::new(&env.client, 0, chain.tip()).emit_update()?;
251246
assert!(update.is_mempool());
252247

253-
let tx_graph_update = update.into_tx_graph_update(
254-
bdk_bitcoind_rpc::indexer_filter(&mut indexed_tx_graph.index, &mut BTreeSet::new()),
255-
bdk_bitcoind_rpc::confirmation_height_anchor,
256-
);
257-
assert_eq!(
258-
tx_graph_update
259-
.full_txs()
260-
.map(|tx| tx.txid)
261-
.collect::<BTreeSet<Txid>>(),
262-
exp_txids,
263-
"the mempool update should have 3 relevant transactions",
264-
);
248+
let tx_graph_update =
249+
update.indexed_tx_graph_update(bdk_bitcoind_rpc::confirmation_height_anchor);
265250

266-
let indexed_additions = indexed_tx_graph.apply_update(tx_graph_update);
251+
let indexed_additions = indexed_tx_graph.insert_relevant_txs(tx_graph_update);
267252
assert_eq!(
268253
indexed_additions
269254
.graph
@@ -302,25 +287,10 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
302287
let _ = chain.apply_update(chain_update)?;
303288
}
304289

305-
let tx_graph_update = update.into_tx_graph_update(
306-
bdk_bitcoind_rpc::indexer_filter(&mut indexed_tx_graph.index, &mut BTreeSet::new()),
307-
bdk_bitcoind_rpc::confirmation_height_anchor,
308-
);
309-
assert_eq!(
310-
tx_graph_update
311-
.full_txs()
312-
.map(|tx| tx.txid)
313-
.collect::<BTreeSet<Txid>>(),
314-
exp_txids,
315-
"block update should have 3 relevant transactions",
316-
);
317-
assert_eq!(
318-
tx_graph_update.all_anchors(),
319-
&exp_anchors,
320-
"the block update should introduce anchors",
321-
);
290+
let tx_graph_update =
291+
update.indexed_tx_graph_update(bdk_bitcoind_rpc::confirmation_height_anchor);
322292

323-
let indexed_additions = indexed_tx_graph.apply_update(tx_graph_update);
293+
let indexed_additions = indexed_tx_graph.insert_relevant_txs(tx_graph_update);
324294
assert!(indexed_additions.graph.txs.is_empty());
325295
assert!(indexed_additions.graph.txouts.is_empty());
326296
assert_eq!(indexed_additions.graph.anchors, exp_anchors);

crates/chain/src/indexed_tx_graph.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,7 @@ where
135135
/// timestamp of when the transactions are last seen.
136136
pub fn insert_relevant_txs<'t>(
137137
&mut self,
138-
txs: impl IntoIterator<Item = (&'t Transaction, impl IntoIterator<Item = A>)>,
139-
seen_at: Option<u64>,
138+
txs: impl IntoIterator<Item = TxItem<'t, impl IntoIterator<Item = A>>>,
140139
) -> ChangeSet<A, I::ChangeSet> {
141140
// The algorithm below allows for non-topologically ordered transactions by using two loops.
142141
// This is achieved by:
@@ -146,17 +145,19 @@ where
146145
// returns true or not. (in a second loop).
147146
let mut changeset = ChangeSet::<A, I::ChangeSet>::default();
148147
let mut transactions = Vec::new();
149-
for (tx, anchors) in txs.into_iter() {
148+
for (tx, anchors, seen_at) in txs.into_iter() {
150149
changeset.indexer.append(self.index.index_tx(tx));
151-
transactions.push((tx, anchors));
150+
transactions.push((tx, anchors, seen_at));
152151
}
153152
changeset.append(
154153
transactions
155154
.into_iter()
156-
.filter_map(|(tx, anchors)| match self.index.is_tx_relevant(tx) {
157-
true => Some(self.insert_tx(tx, anchors, seen_at)),
158-
false => None,
159-
})
155+
.filter_map(
156+
|(tx, anchors, seen_at)| match self.index.is_tx_relevant(tx) {
157+
true => Some(self.insert_tx(tx, anchors, seen_at)),
158+
false => None,
159+
},
160+
)
160161
.fold(Default::default(), |mut acc, other| {
161162
acc.append(other);
162163
acc
@@ -166,6 +167,9 @@ where
166167
}
167168
}
168169

170+
/// Represents a single transaction update.
171+
pub type TxItem<'t, A> = (&'t Transaction, A, Option<u64>);
172+
169173
/// A structure that represents changes to an [`IndexedTxGraph`].
170174
#[derive(Clone, Debug, PartialEq)]
171175
#[cfg_attr(

crates/chain/tests/test_indexed_tx_graph.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ fn insert_relevant_txs() {
7474
};
7575

7676
assert_eq!(
77-
graph.insert_relevant_txs(txs.iter().map(|tx| (tx, None)), None),
77+
graph.insert_relevant_txs(txs.iter().map(|tx| (tx, None, None))),
7878
changeset,
7979
);
8080

@@ -211,8 +211,8 @@ fn test_list_owned_txouts() {
211211
// Insert transactions into graph with respective anchors
212212
// For unconfirmed txs we pass in `None`.
213213

214-
let _ = graph.insert_relevant_txs(
215-
[&tx1, &tx2, &tx3, &tx6].iter().enumerate().map(|(i, tx)| {
214+
let _ =
215+
graph.insert_relevant_txs([&tx1, &tx2, &tx3, &tx6].iter().enumerate().map(|(i, tx)| {
216216
let height = i as u32;
217217
(
218218
*tx,
@@ -225,12 +225,11 @@ fn test_list_owned_txouts() {
225225
anchor_block,
226226
confirmation_height: anchor_block.height,
227227
}),
228+
None,
228229
)
229-
}),
230-
None,
231-
);
230+
}));
232231

233-
let _ = graph.insert_relevant_txs([&tx4, &tx5].iter().map(|tx| (*tx, None)), Some(100));
232+
let _ = graph.insert_relevant_txs([&tx4, &tx5].iter().map(|tx| (*tx, None, Some(100))));
234233

235234
// A helper lambda to extract and filter data from the graph.
236235
let fetch =

example-crates/example_rpc/src/main.rs

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,9 @@ impl From<RpcArgs> for Auth {
6464

6565
#[derive(Subcommand, Debug, Clone)]
6666
enum RpcCommands {
67-
/// Scans blocks via RPC (starting from last point of agreement) and stores/indexes relevant
68-
/// transactions
69-
Scan {
67+
/// Syncs local state with remote state via RPC (starting from last point of agreement) and
68+
/// stores/indexes relevant transactions
69+
Sync {
7070
/// Starting block height to fallback to if no point of agreement if found
7171
#[clap(env = "FALLBACK_HEIGHT", long, default_value = "0")]
7272
fallback_height: u32,
@@ -93,7 +93,7 @@ enum RpcCommands {
9393
impl RpcCommands {
9494
fn rpc_args(&self) -> &RpcArgs {
9595
match self {
96-
RpcCommands::Scan { rpc_args, .. } => rpc_args,
96+
RpcCommands::Sync { rpc_args, .. } => rpc_args,
9797
RpcCommands::Tx { rpc_args, .. } => rpc_args,
9898
}
9999
}
@@ -145,7 +145,7 @@ fn main() -> anyhow::Result<()> {
145145
};
146146

147147
match rpc_cmd {
148-
RpcCommands::Scan {
148+
RpcCommands::Sync {
149149
fallback_height,
150150
lookahead,
151151
live,
@@ -200,15 +200,9 @@ fn main() -> anyhow::Result<()> {
200200
let mut chain = chain.lock().unwrap();
201201
let mut graph = graph.lock().unwrap();
202202

203-
let graph_update = {
204-
let tx_filter = bdk_bitcoind_rpc::indexer_filter(
205-
&mut graph.index,
206-
&mut indexed_changeset.indexer,
207-
);
208-
let anchor_map = bdk_bitcoind_rpc::confirmation_time_anchor;
209-
item.into_tx_graph_update(tx_filter, anchor_map)
210-
};
211-
indexed_changeset.append(graph.apply_update(graph_update));
203+
let graph_update =
204+
item.indexed_tx_graph_update(bdk_bitcoind_rpc::confirmation_time_anchor);
205+
indexed_changeset.append(graph.insert_relevant_txs(graph_update));
212206

213207
let chain_changeset = match chain_update {
214208
Some(update) => chain.apply_update(update)?,

0 commit comments

Comments
 (0)