Skip to content

Commit f0b807b

Browse files
committed
refactor(rpc)!: update mempool interface and test code
1 parent a32b3f2 commit f0b807b

File tree

4 files changed

+137
-35
lines changed

4 files changed

+137
-35
lines changed

crates/bitcoind_rpc/src/lib.rs

Lines changed: 81 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@
1010
#![warn(missing_docs)]
1111

1212
use bdk_core::{BlockId, CheckPoint};
13-
use bitcoin::{block::Header, Block, BlockHash, Transaction};
13+
use bitcoin::{block::Header, Block, BlockHash, Transaction, Txid};
1414
use bitcoincore_rpc::bitcoincore_rpc_json;
15+
use std::collections::HashSet;
1516

1617
pub mod bip158;
1718

@@ -43,6 +44,9 @@ pub struct Emitter<'c, C> {
4344
/// The last emitted block during our last mempool emission. This is used to determine whether
4445
/// there has been a reorg since our last mempool emission.
4546
last_mempool_tip: Option<u32>,
47+
48+
/// Expected mempool txs. TODO: Docs.
49+
expected_mempool_txids: HashSet<Txid>,
4650
}
4751

4852
impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
@@ -53,29 +57,38 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
5357
///
5458
/// `start_height` starts emission from a given height (if there are no conflicts with the
5559
/// original chain).
56-
pub fn new(client: &'c C, last_cp: CheckPoint, start_height: u32) -> Self {
60+
pub fn new(
61+
client: &'c C,
62+
last_cp: CheckPoint,
63+
start_height: u32,
64+
expected_mempool_txids: HashSet<Txid>,
65+
) -> Self {
5766
Self {
5867
client,
5968
start_height,
6069
last_cp,
6170
last_block: None,
6271
last_mempool_time: 0,
6372
last_mempool_tip: None,
73+
expected_mempool_txids,
6474
}
6575
}
6676

67-
/// Emit mempool transactions, alongside their first-seen unix timestamps.
77+
/// Emit mempool transactions and capture the initial snapshot of all mempool [`Txid`]s.
6878
///
69-
/// This method emits each transaction only once, unless we cannot guarantee the transaction's
70-
/// ancestors are already emitted.
79+
/// This method returns a [`MempoolEvent`] containing the full transactions (with their
80+
/// first-seen unix timestamps) that were emitted, and the set of all [`Txid`]s present from the
81+
/// initial mempool query. Each transaction is emitted only once, unless we cannot guarantee the
82+
/// transaction's ancestors are already emitted.
7183
///
7284
/// To understand why, consider a receiver which filters transactions based on whether it
7385
/// alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction spends a
7486
/// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block
7587
/// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
7688
/// at height `h`.
77-
pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
89+
pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
7890
let client = self.client;
91+
let mut evicted_txids = HashSet::<Txid>::new();
7992

8093
// This is the emitted tip height during the last mempool emission.
8194
let prev_mempool_tip = self
@@ -84,18 +97,39 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
8497
// `start_height` has been emitted.
8598
.unwrap_or(self.start_height.saturating_sub(1));
8699

100+
// Clear out `expected_mempool_txids` if we are on a different block.
101+
if prev_mempool_tip != self.last_cp.height() {
102+
self.expected_mempool_txids.clear();
103+
}
104+
87105
// Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
88106
// track of the latest mempool tx's timestamp to determine whether we have seen a tx
89107
// before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
90108
// be the new latest timestamp.
91109
let prev_mempool_time = self.last_mempool_time;
92110
let mut latest_time = prev_mempool_time;
93111

94-
let txs_to_emit = client
95-
.get_raw_mempool_verbose()?
112+
// Get the raw mempool result from the RPC client.
113+
let raw_mempool = client.get_raw_mempool_verbose()?;
114+
let raw_mempool_txids = raw_mempool.keys().copied().collect::<HashSet<_>>();
115+
116+
// Check if missing txs have been confirmed. If not, they have been evicted.
117+
for &txid in self.expected_mempool_txids.difference(&raw_mempool_txids) {
118+
// Check if missing tx was confirmed. If not, then the tx has been evicted. Tx is also
119+
// considered evicted if it was not found.
120+
if client
121+
.get_transaction(&txid, None)
122+
.map_or(true, |tx_res| tx_res.info.confirmations <= 0)
123+
{
124+
evicted_txids.insert(txid);
125+
}
126+
}
127+
128+
let new_txs = raw_mempool
96129
.into_iter()
97130
.filter_map({
98131
let latest_time = &mut latest_time;
132+
let evicted_txids = &mut evicted_txids;
99133
move |(txid, tx_entry)| -> Option<Result<_, bitcoincore_rpc::Error>> {
100134
let tx_time = tx_entry.time as usize;
101135
if tx_time > *latest_time {
@@ -115,8 +149,18 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
115149

116150
let tx = match client.get_raw_transaction(&txid, None) {
117151
Ok(tx) => tx,
118-
// the tx is confirmed or evicted since `get_raw_mempool_verbose`
119-
Err(err) if err.is_not_found_error() => return None,
152+
Err(err) if err.is_not_found_error() => {
153+
// Check if the tx was confirmed since `get_raw_mempool_verbose`. If
154+
// not, then the tx has been evicted. Tx is also considered evicted if
155+
// it was not found.
156+
if client
157+
.get_transaction(&txid, None)
158+
.map_or(true, |tx_res| tx_res.info.confirmations <= 0)
159+
{
160+
evicted_txids.insert(txid);
161+
}
162+
return None;
163+
}
120164
Err(err) => return Some(Err(err)),
121165
};
122166

@@ -127,8 +171,13 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
127171

128172
self.last_mempool_time = latest_time;
129173
self.last_mempool_tip = Some(self.last_cp.height());
174+
self.expected_mempool_txids = new_txs.iter().map(|(tx, _)| tx.compute_txid()).collect();
130175

131-
Ok(txs_to_emit)
176+
Ok(MempoolEvent {
177+
new_txs,
178+
evicted_txids,
179+
latest_update_time: latest_time as u64,
180+
})
132181
}
133182

134183
/// Emit the next block height and header (if any).
@@ -144,6 +193,27 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
144193
}
145194
}
146195

196+
/// A new emission from mempool.
197+
#[derive(Debug)]
198+
pub struct MempoolEvent {
199+
/// Unemitted transactions or transactions with ancestors that are unseen by the receiver.
200+
///
201+
/// To understand the second condition, consider a receiver which filters transactions based on
202+
/// whether it alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction
203+
/// spends a tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to
204+
/// block of height `h-1`, we want to re-emit this transaction until the receiver has seen the
205+
/// block at height `h`.
206+
pub new_txs: Vec<(Transaction, u64)>,
207+
208+
/// [`Txid`]s of all transactions that have been evicted from mempool.
209+
pub evicted_txids: HashSet<Txid>,
210+
211+
/// The latest timestamp of when a transaction entered the mempool.
212+
///
213+
/// This is useful for setting the timestamp for evicted transactions.
214+
pub latest_update_time: u64,
215+
}
216+
147217
/// A newly emitted block from [`Emitter`].
148218
#[derive(Debug)]
149219
pub struct BlockEvent<B> {

crates/bitcoind_rpc/tests/test_emitter.rs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::{BTreeMap, BTreeSet};
1+
use std::collections::{BTreeMap, BTreeSet, HashSet};
22

33
use bdk_bitcoind_rpc::Emitter;
44
use bdk_chain::{
@@ -22,7 +22,7 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> {
2222
let env = TestEnv::new()?;
2323
let network_tip = env.rpc_client().get_block_count()?;
2424
let (mut local_chain, _) = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?);
25-
let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0);
25+
let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0, HashSet::new());
2626

2727
// Mine some blocks and return the actual block hashes.
2828
// Because initializing `ElectrsD` already mines some blocks, we must include those too when
@@ -156,7 +156,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
156156
index
157157
});
158158

159-
let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0);
159+
let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, HashSet::new());
160160

161161
while let Some(emission) = emitter.next_block()? {
162162
let height = emission.block_height();
@@ -189,7 +189,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
189189
assert!(emitter.next_block()?.is_none());
190190

191191
let mempool_txs = emitter.mempool()?;
192-
let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs);
192+
let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs.new_txs);
193193
assert_eq!(
194194
indexed_additions
195195
.tx_graph
@@ -252,6 +252,7 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> {
252252
hash: env.rpc_client().get_block_hash(0)?,
253253
}),
254254
EMITTER_START_HEIGHT as _,
255+
HashSet::new(),
255256
);
256257

257258
env.mine_blocks(CHAIN_TIP_HEIGHT, None)?;
@@ -328,6 +329,7 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> {
328329
hash: env.rpc_client().get_block_hash(0)?,
329330
}),
330331
0,
332+
HashSet::new(),
331333
);
332334

333335
// setup addresses
@@ -419,6 +421,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
419421
hash: env.rpc_client().get_block_hash(0)?,
420422
}),
421423
0,
424+
HashSet::new(),
422425
);
423426

424427
// mine blocks and sync up emitter
@@ -437,6 +440,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
437440
// the first emission should include all transactions
438441
let emitted_txids = emitter
439442
.mempool()?
443+
.new_txs
440444
.into_iter()
441445
.map(|(tx, _)| tx.compute_txid())
442446
.collect::<BTreeSet<Txid>>();
@@ -447,7 +451,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
447451

448452
// second emission should be empty
449453
assert!(
450-
emitter.mempool()?.is_empty(),
454+
emitter.mempool()?.new_txs.is_empty(),
451455
"second emission should be empty"
452456
);
453457

@@ -457,7 +461,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
457461
}
458462
while emitter.next_header()?.is_some() {}
459463
assert!(
460-
emitter.mempool()?.is_empty(),
464+
emitter.mempool()?.new_txs.is_empty(),
461465
"third emission, after chain tip is extended, should also be empty"
462466
);
463467

@@ -484,6 +488,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
484488
hash: env.rpc_client().get_block_hash(0)?,
485489
}),
486490
0,
491+
HashSet::new(),
487492
);
488493

489494
// mine blocks to get initial balance, sync emitter up to tip
@@ -506,6 +511,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
506511
assert_eq!(
507512
emitter
508513
.mempool()?
514+
.new_txs
509515
.into_iter()
510516
.map(|(tx, _)| tx.compute_txid())
511517
.collect::<BTreeSet<_>>(),
@@ -515,6 +521,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
515521
assert_eq!(
516522
emitter
517523
.mempool()?
524+
.new_txs
518525
.into_iter()
519526
.map(|(tx, _)| tx.compute_txid())
520527
.collect::<BTreeSet<_>>(),
@@ -535,6 +542,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
535542
.collect::<BTreeSet<_>>();
536543
let emitted_txids = emitter
537544
.mempool()?
545+
.new_txs
538546
.into_iter()
539547
.map(|(tx, _)| tx.compute_txid())
540548
.collect::<BTreeSet<_>>();
@@ -572,6 +580,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
572580
hash: env.rpc_client().get_block_hash(0)?,
573581
}),
574582
0,
583+
HashSet::new(),
575584
);
576585

577586
// mine blocks to get initial balance
@@ -593,6 +602,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
593602
assert_eq!(
594603
emitter
595604
.mempool()?
605+
.new_txs
596606
.into_iter()
597607
.map(|(tx, _)| tx.compute_txid())
598608
.collect::<BTreeSet<_>>(),
@@ -628,6 +638,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
628638
// include mempool txs introduced at reorg height or greater
629639
let mempool = emitter
630640
.mempool()?
641+
.new_txs
631642
.into_iter()
632643
.map(|(tx, _)| tx.compute_txid())
633644
.collect::<BTreeSet<_>>();
@@ -643,6 +654,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
643654

644655
let mempool = emitter
645656
.mempool()?
657+
.new_txs
646658
.into_iter()
647659
.map(|(tx, _)| tx.compute_txid())
648660
.collect::<BTreeSet<_>>();
@@ -696,6 +708,7 @@ fn no_agreement_point() -> anyhow::Result<()> {
696708
hash: env.rpc_client().get_block_hash(0)?,
697709
}),
698710
(PREMINE_COUNT - 2) as u32,
711+
HashSet::new(),
699712
);
700713

701714
// mine 101 blocks
@@ -765,8 +778,8 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> {
765778
Amount::ONE_BTC,
766779
)?;
767780

768-
let mut emitter = Emitter::new(env.rpc_client(), chain.tip(), 1);
769-
let changeset = graph.batch_insert_unconfirmed(emitter.mempool()?);
781+
let mut emitter = Emitter::new(env.rpc_client(), chain.tip(), 1, HashSet::from([txid_1]));
782+
let changeset = graph.batch_insert_unconfirmed(emitter.mempool()?.new_txs);
770783
assert!(changeset
771784
.tx_graph
772785
.txs
@@ -822,12 +835,11 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> {
822835
assert!(unseen_txids.contains(&txid_2));
823836

824837
// Update graph with evicted tx.
825-
let exp_txids = exp_spk_txids.into_iter().map(|(_, txid)| txid);
826-
let evicted_txids = mempool_event.evicted_txids(exp_txids);
827-
for txid in evicted_txids {
838+
for txid in mempool_event.evicted_txids {
828839
let _ = graph.insert_evicted_at(txid, seen_at);
829840
}
830841

842+
// tx1 should no longer be canonical.
831843
assert!(graph
832844
.graph()
833845
.list_canonical_txs(&chain, chain_tip)

0 commit comments

Comments
 (0)