Skip to content

Commit 8074550

Browse files
committed
refactor(rpc)!: update mempool interface and test code
1 parent 1a70384 commit 8074550

File tree

3 files changed

+133
-35
lines changed

3 files changed

+133
-35
lines changed

crates/bitcoind_rpc/src/lib.rs

Lines changed: 100 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@
1010
#![warn(missing_docs)]
1111

1212
use bdk_core::{BlockId, CheckPoint};
13-
use bitcoin::{block::Header, Block, BlockHash, Transaction};
13+
use bitcoin::{block::Header, Block, BlockHash, Transaction, Txid};
1414
use bitcoincore_rpc::bitcoincore_rpc_json;
15+
use std::collections::HashSet;
1516

1617
pub mod bip158;
1718

@@ -44,8 +45,14 @@ pub struct Emitter<'c, C> {
4445
/// there has been a reorg since our last mempool emission.
4546
last_mempool_tip: Option<u32>,
4647

47-
/// Unconfirmed txids that are expected to appear in mempool. This is used to determine if any
48-
/// known txids have been evicted.
48+
/// A set of txids currently assumed to still be in the mempool.
49+
///
50+
/// This is used to detect mempool evictions by comparing the set against the latest mempool
51+
/// snapshot from bitcoind. Any txid in this set that is missing from the snapshot is considered
52+
/// evicted.
53+
///
54+
/// When the emitter emits a block, confirmed txids are removed from this set. This prevents
55+
/// confirmed transactions from being mistakenly marked with an `evicted_at` timestamp.
4956
expected_mempool_txids: HashSet<Txid>,
5057
}
5158

@@ -58,8 +65,8 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
5865
/// `start_height` starts emission from a given height (if there are no conflicts with the
5966
/// original chain).
6067
///
61-
/// `expected_mempool_txids` is the initial set of unconfirmed txids. Once at tip, any that are
62-
/// no longer in mempool are marked evicted.
68+
/// `expected_mempool_txids` is the initial set of unconfirmed txids provided by the wallet.
69+
/// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions.
6370
pub fn new(
6471
client: &'c C,
6572
last_cp: CheckPoint,
@@ -73,11 +80,11 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
7380
last_block: None,
7481
last_mempool_time: 0,
7582
last_mempool_tip: None,
83+
expected_mempool_txids,
7684
}
7785
}
7886

79-
/// Emit mempool transactions and any evicted [`Txid`]s. Returns a `latest_update_time` which is
80-
/// used for setting the timestamp for evicted transactions.
87+
/// Emit mempool transactions and any evicted [`Txid`]s.
8188
///
8289
/// This method returns a [`MempoolEvent`] containing the full transactions (with their
8390
/// first-seen unix timestamps) that were emitted, and [`MempoolEvent::evicted_txids`] which are
@@ -93,7 +100,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
93100
/// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block
94101
/// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
95102
/// at height `h`.
96-
pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
103+
pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
97104
let client = self.client;
98105

99106
// This is the emitted tip height during the last mempool emission.
@@ -120,7 +127,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
120127

121128
// If at tip, any expected txid missing from raw mempool is considered evicted;
122129
// if not at tip, we don't evict anything.
123-
let mut evicted_txids: HashSet<Txid> = if at_tip {
130+
let evicted_txids: HashSet<Txid> = if at_tip {
124131
self.expected_mempool_txids
125132
.difference(&raw_mempool_txids)
126133
.copied()
@@ -136,8 +143,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
136143
let prev_mempool_time = self.last_mempool_time;
137144
let mut latest_time = prev_mempool_time;
138145

139-
let txs_to_emit = client
140-
.get_raw_mempool_verbose()?
146+
let new_txs = raw_mempool
141147
.into_iter()
142148
.filter_map({
143149
let latest_time = &mut latest_time;
@@ -146,25 +152,25 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
146152
if tx_time > *latest_time {
147153
*latest_time = tx_time;
148154
}
149-
150-
// Avoid emitting transactions that are already emitted if we can guarantee
151-
// blocks containing ancestors are already emitted. The bitcoind rpc interface
152-
// provides us with the block height that the tx is introduced to the mempool.
153-
// If we have already emitted the block of height, we can assume that all
154-
// ancestor txs have been processed by the receiver.
155+
// Best-effort check to avoid re-emitting transactions we've already emitted.
156+
//
157+
// Complete suppression isn't possible, since a transaction may spend outputs
158+
// owned by the wallet. To determine if such a transaction is relevant, we must
159+
// have already seen its ancestor(s) that contain the spent prevouts.
160+
//
161+
// Fortunately, bitcoind provides the block height at which the transaction
162+
// entered the mempool. If we've already emitted that block height, we can
163+
// reasonably assume the receiver has seen all ancestor transactions.
155164
let is_already_emitted = tx_time <= prev_mempool_time;
156165
let is_within_height = tx_entry.height <= prev_mempool_tip as _;
157166
if is_already_emitted && is_within_height {
158167
return None;
159168
}
160-
161169
let tx = match client.get_raw_transaction(&txid, None) {
162170
Ok(tx) => tx,
163-
// the tx is confirmed or evicted since `get_raw_mempool_verbose`
164171
Err(err) if err.is_not_found_error() => return None,
165172
Err(err) => return Some(Err(err)),
166173
};
167-
168174
Some(Ok((tx, tx_time as u64)))
169175
}
170176
})
@@ -414,3 +420,77 @@ impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
414420
}
415421
}
416422
}
423+
424+
#[cfg(test)]
425+
mod test {
426+
use crate::{bitcoincore_rpc::RpcApi, Emitter};
427+
use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoin::Txid;
428+
use bdk_chain::local_chain::LocalChain;
429+
use bdk_testenv::{anyhow, TestEnv};
430+
use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, WScriptHash};
431+
use std::collections::HashSet;
432+
433+
#[test]
434+
fn test_expected_mempool_txids_accumulate_and_remove() -> anyhow::Result<()> {
435+
let env = TestEnv::new()?;
436+
let chain = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?).0;
437+
let chain_tip = chain.tip();
438+
let mut emitter = Emitter::new(env.rpc_client(), chain_tip.clone(), 1, HashSet::new());
439+
440+
env.mine_blocks(100, None)?;
441+
while emitter.next_block()?.is_some() {}
442+
443+
let spk_to_track = ScriptBuf::new_p2wsh(&WScriptHash::all_zeros());
444+
let addr_to_track = Address::from_script(&spk_to_track, bitcoin::Network::Regtest)?;
445+
let mut mempool_txids = HashSet::new();
446+
447+
// Send a tx at different heights and ensure txs are accumulating in expected_mempool_txids.
448+
for _ in 0..10 {
449+
let sent_txid = env.send(&addr_to_track, Amount::from_sat(1_000))?;
450+
mempool_txids.insert(sent_txid);
451+
emitter.mempool()?;
452+
env.mine_blocks(1, None)?;
453+
454+
for txid in &mempool_txids {
455+
assert!(
456+
emitter.expected_mempool_txids.contains(txid),
457+
"Expected txid {:?} missing",
458+
txid
459+
);
460+
}
461+
}
462+
463+
// Process each block and check that confirmed txids are removed from from
464+
// expected_mempool_txids.
465+
while let Some(block_event) = emitter.next_block()? {
466+
let confirmed_txids: HashSet<Txid> = block_event
467+
.block
468+
.txdata
469+
.iter()
470+
.map(|tx| tx.compute_txid())
471+
.collect();
472+
mempool_txids = mempool_txids
473+
.difference(&confirmed_txids)
474+
.copied()
475+
.collect::<HashSet<_>>();
476+
for txid in confirmed_txids {
477+
assert!(
478+
!emitter.expected_mempool_txids.contains(&txid),
479+
"Expected txid {:?} should have been removed",
480+
txid
481+
);
482+
}
483+
for txid in &mempool_txids {
484+
assert!(
485+
emitter.expected_mempool_txids.contains(txid),
486+
"Expected txid {:?} missing",
487+
txid
488+
);
489+
}
490+
}
491+
492+
assert!(emitter.expected_mempool_txids.is_empty());
493+
494+
Ok(())
495+
}
496+
}

crates/bitcoind_rpc/tests/test_emitter.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::{BTreeMap, BTreeSet};
1+
use std::collections::{BTreeMap, BTreeSet, HashSet};
22

33
use bdk_bitcoind_rpc::Emitter;
44
use bdk_chain::{
@@ -22,7 +22,7 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> {
2222
let env = TestEnv::new()?;
2323
let network_tip = env.rpc_client().get_block_count()?;
2424
let (mut local_chain, _) = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?);
25-
let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0);
25+
let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0, HashSet::new());
2626

2727
// Mine some blocks and return the actual block hashes.
2828
// Because initializing `ElectrsD` already mines some blocks, we must include those too when
@@ -156,7 +156,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
156156
index
157157
});
158158

159-
let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0);
159+
let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, HashSet::new());
160160

161161
while let Some(emission) = emitter.next_block()? {
162162
let height = emission.block_height();
@@ -189,7 +189,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
189189
assert!(emitter.next_block()?.is_none());
190190

191191
let mempool_txs = emitter.mempool()?;
192-
let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs);
192+
let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs.new_txs);
193193
assert_eq!(
194194
indexed_additions
195195
.tx_graph
@@ -252,6 +252,7 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> {
252252
hash: env.rpc_client().get_block_hash(0)?,
253253
}),
254254
EMITTER_START_HEIGHT as _,
255+
HashSet::new(),
255256
);
256257

257258
env.mine_blocks(CHAIN_TIP_HEIGHT, None)?;
@@ -328,6 +329,7 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> {
328329
hash: env.rpc_client().get_block_hash(0)?,
329330
}),
330331
0,
332+
HashSet::new(),
331333
);
332334

333335
// setup addresses
@@ -419,6 +421,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
419421
hash: env.rpc_client().get_block_hash(0)?,
420422
}),
421423
0,
424+
HashSet::new(),
422425
);
423426

424427
// mine blocks and sync up emitter
@@ -437,6 +440,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
437440
// the first emission should include all transactions
438441
let emitted_txids = emitter
439442
.mempool()?
443+
.new_txs
440444
.into_iter()
441445
.map(|(tx, _)| tx.compute_txid())
442446
.collect::<BTreeSet<Txid>>();
@@ -447,7 +451,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
447451

448452
// second emission should be empty
449453
assert!(
450-
emitter.mempool()?.is_empty(),
454+
emitter.mempool()?.new_txs.is_empty(),
451455
"second emission should be empty"
452456
);
453457

@@ -457,7 +461,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
457461
}
458462
while emitter.next_header()?.is_some() {}
459463
assert!(
460-
emitter.mempool()?.is_empty(),
464+
emitter.mempool()?.new_txs.is_empty(),
461465
"third emission, after chain tip is extended, should also be empty"
462466
);
463467

@@ -484,6 +488,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
484488
hash: env.rpc_client().get_block_hash(0)?,
485489
}),
486490
0,
491+
HashSet::new(),
487492
);
488493

489494
// mine blocks to get initial balance, sync emitter up to tip
@@ -506,6 +511,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
506511
assert_eq!(
507512
emitter
508513
.mempool()?
514+
.new_txs
509515
.into_iter()
510516
.map(|(tx, _)| tx.compute_txid())
511517
.collect::<BTreeSet<_>>(),
@@ -515,6 +521,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
515521
assert_eq!(
516522
emitter
517523
.mempool()?
524+
.new_txs
518525
.into_iter()
519526
.map(|(tx, _)| tx.compute_txid())
520527
.collect::<BTreeSet<_>>(),
@@ -535,6 +542,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
535542
.collect::<BTreeSet<_>>();
536543
let emitted_txids = emitter
537544
.mempool()?
545+
.new_txs
538546
.into_iter()
539547
.map(|(tx, _)| tx.compute_txid())
540548
.collect::<BTreeSet<_>>();
@@ -572,6 +580,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
572580
hash: env.rpc_client().get_block_hash(0)?,
573581
}),
574582
0,
583+
HashSet::new(),
575584
);
576585

577586
// mine blocks to get initial balance
@@ -593,6 +602,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
593602
assert_eq!(
594603
emitter
595604
.mempool()?
605+
.new_txs
596606
.into_iter()
597607
.map(|(tx, _)| tx.compute_txid())
598608
.collect::<BTreeSet<_>>(),
@@ -628,6 +638,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
628638
// include mempool txs introduced at reorg height or greater
629639
let mempool = emitter
630640
.mempool()?
641+
.new_txs
631642
.into_iter()
632643
.map(|(tx, _)| tx.compute_txid())
633644
.collect::<BTreeSet<_>>();
@@ -643,6 +654,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
643654

644655
let mempool = emitter
645656
.mempool()?
657+
.new_txs
646658
.into_iter()
647659
.map(|(tx, _)| tx.compute_txid())
648660
.collect::<BTreeSet<_>>();
@@ -696,6 +708,7 @@ fn no_agreement_point() -> anyhow::Result<()> {
696708
hash: env.rpc_client().get_block_hash(0)?,
697709
}),
698710
(PREMINE_COUNT - 2) as u32,
711+
HashSet::new(),
699712
);
700713

701714
// mine 101 blocks

0 commit comments

Comments
 (0)