Skip to content

Commit 28ef7c9

Browse files
ValuedMammalLagginTimes
authored andcommitted
refactor(rpc)!: update mempool interface and test code
1 parent 86cdaf4 commit 28ef7c9

File tree

3 files changed

+350
-51
lines changed

3 files changed

+350
-51
lines changed

crates/bitcoind_rpc/src/lib.rs

Lines changed: 189 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@
1010
#![warn(missing_docs)]
1111

1212
use bdk_core::{BlockId, CheckPoint};
13-
use bitcoin::{block::Header, Block, BlockHash, Transaction};
13+
use bitcoin::{Block, BlockHash, Transaction, Txid};
1414
use bitcoincore_rpc::bitcoincore_rpc_json;
15+
use std::collections::HashSet;
1516

1617
pub mod bip158;
1718

@@ -43,6 +44,16 @@ pub struct Emitter<'c, C> {
4344
/// The last emitted block during our last mempool emission. This is used to determine whether
4445
/// there has been a reorg since our last mempool emission.
4546
last_mempool_tip: Option<u32>,
47+
48+
/// A set of txids currently assumed to still be in the mempool.
49+
///
50+
/// This is used to detect mempool evictions by comparing the set against the latest mempool
51+
/// snapshot from bitcoind. Any txid in this set that is missing from the snapshot is considered
52+
/// evicted.
53+
///
54+
/// When the emitter emits a block, confirmed txids are removed from this set. This prevents
55+
/// confirmed transactions from being mistakenly marked with an `evicted_at` timestamp.
56+
expected_mempool_txids: HashSet<Txid>,
4657
}
4758

4859
impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
@@ -53,18 +64,34 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
5364
///
5465
/// `start_height` starts emission from a given height (if there are no conflicts with the
5566
/// original chain).
56-
pub fn new(client: &'c C, last_cp: CheckPoint, start_height: u32) -> Self {
67+
///
68+
/// `expected_mempool_txids` is the initial set of unconfirmed txids provided by the wallet.
69+
/// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions.
70+
pub fn new(
71+
client: &'c C,
72+
last_cp: CheckPoint,
73+
start_height: u32,
74+
expected_mempool_txids: HashSet<Txid>,
75+
) -> Self {
5776
Self {
5877
client,
5978
start_height,
6079
last_cp,
6180
last_block: None,
6281
last_mempool_time: 0,
6382
last_mempool_tip: None,
83+
expected_mempool_txids,
6484
}
6585
}
6686

67-
/// Emit mempool transactions, alongside their first-seen unix timestamps.
87+
/// Emit mempool transactions and any evicted [`Txid`]s.
88+
///
89+
/// This method returns a [`MempoolEvent`] containing the full transactions (with their
90+
/// first-seen unix timestamps) that were emitted, and [`MempoolEvent::evicted_txids`] which are
91+
/// any [`Txid`]s which were previously seen in the mempool and are now missing. Evicted txids
92+
/// are only reported once the emitter’s checkpoint matches the RPC’s best block in both height
93+
/// and hash. Until `next_block()` advances the checkpoint to tip, `mempool()` will always
94+
/// return an empty `evicted_txids` set.
6895
///
6996
/// This method emits each transaction only once, unless we cannot guarantee the transaction's
7097
/// ancestors are already emitted.
@@ -74,7 +101,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
74101
/// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block
75102
/// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
76103
/// at height `h`.
77-
pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
104+
pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
78105
let client = self.client;
79106

80107
// This is the emitted tip height during the last mempool emission.
@@ -84,15 +111,46 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
84111
// `start_height` has been emitted.
85112
.unwrap_or(self.start_height.saturating_sub(1));
86113

114+
// Loop to make sure that the fetched mempool content and the fetched tip are consistent
115+
// with one another.
116+
let (raw_mempool, raw_mempool_txids, rpc_height, rpc_block_hash) = loop {
117+
// Determine if height and hash matches the best block from the RPC. Evictions are deferred
118+
// if we are not at the best block.
119+
let height = client.get_block_count()?;
120+
let hash = client.get_block_hash(height)?;
121+
122+
// Get the raw mempool result from the RPC client which will be used to determine if any
123+
// transactions have been evicted.
124+
let mp = client.get_raw_mempool_verbose()?;
125+
let mp_txids: HashSet<Txid> = mp.keys().copied().collect();
126+
127+
if height == client.get_block_count()? && hash == client.get_block_hash(height)? {
128+
break (mp, mp_txids, height, hash);
129+
}
130+
};
131+
132+
let at_tip =
133+
rpc_height == self.last_cp.height() as u64 && rpc_block_hash == self.last_cp.hash();
134+
135+
// If at tip, any expected txid missing from raw mempool is considered evicted;
136+
// if not at tip, we don't evict anything.
137+
let evicted_txids: HashSet<Txid> = if at_tip {
138+
self.expected_mempool_txids
139+
.difference(&raw_mempool_txids)
140+
.copied()
141+
.collect()
142+
} else {
143+
HashSet::new()
144+
};
145+
87146
// Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
88147
// track of the latest mempool tx's timestamp to determine whether we have seen a tx
89148
// before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
90149
// be the new latest timestamp.
91150
let prev_mempool_time = self.last_mempool_time;
92151
let mut latest_time = prev_mempool_time;
93152

94-
let txs_to_emit = client
95-
.get_raw_mempool_verbose()?
153+
let new_txs = raw_mempool
96154
.into_iter()
97155
.filter_map({
98156
let latest_time = &mut latest_time;
@@ -101,25 +159,25 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
101159
if tx_time > *latest_time {
102160
*latest_time = tx_time;
103161
}
104-
105-
// Avoid emitting transactions that are already emitted if we can guarantee
106-
// blocks containing ancestors are already emitted. The bitcoind rpc interface
107-
// provides us with the block height that the tx is introduced to the mempool.
108-
// If we have already emitted the block of height, we can assume that all
109-
// ancestor txs have been processed by the receiver.
162+
// Best-effort check to avoid re-emitting transactions we've already emitted.
163+
//
164+
// Complete suppression isn't possible, since a transaction may spend outputs
165+
// owned by the wallet. To determine if such a transaction is relevant, we must
166+
// have already seen its ancestor(s) that contain the spent prevouts.
167+
//
168+
// Fortunately, bitcoind provides the block height at which the transaction
169+
// entered the mempool. If we've already emitted that block height, we can
170+
// reasonably assume the receiver has seen all ancestor transactions.
110171
let is_already_emitted = tx_time <= prev_mempool_time;
111172
let is_within_height = tx_entry.height <= prev_mempool_tip as _;
112173
if is_already_emitted && is_within_height {
113174
return None;
114175
}
115-
116176
let tx = match client.get_raw_transaction(&txid, None) {
117177
Ok(tx) => tx,
118-
// the tx is confirmed or evicted since `get_raw_mempool_verbose`
119178
Err(err) if err.is_not_found_error() => return None,
120179
Err(err) => return Some(Err(err)),
121180
};
122-
123181
Some(Ok((tx, tx_time as u64)))
124182
}
125183
})
@@ -128,22 +186,56 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
128186
self.last_mempool_time = latest_time;
129187
self.last_mempool_tip = Some(self.last_cp.height());
130188

131-
Ok(txs_to_emit)
132-
}
189+
// If at tip, we replace `expected_mempool_txids` with just the new txids. Otherwise, we’re
190+
// still catching up to the tip and keep accumulating.
191+
if at_tip {
192+
self.expected_mempool_txids = new_txs.iter().map(|(tx, _)| tx.compute_txid()).collect();
193+
} else {
194+
self.expected_mempool_txids
195+
.extend(new_txs.iter().map(|(tx, _)| tx.compute_txid()));
196+
}
133197

134-
/// Emit the next block height and header (if any).
135-
pub fn next_header(&mut self) -> Result<Option<BlockEvent<Header>>, bitcoincore_rpc::Error> {
136-
Ok(poll(self, |hash| self.client.get_block_header(hash))?
137-
.map(|(checkpoint, block)| BlockEvent { block, checkpoint }))
198+
Ok(MempoolEvent {
199+
new_txs,
200+
evicted_txids,
201+
latest_update_time: latest_time as u64,
202+
})
138203
}
139204

140205
/// Emit the next block height and block (if any).
141206
pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block>>, bitcoincore_rpc::Error> {
142-
Ok(poll(self, |hash| self.client.get_block(hash))?
143-
.map(|(checkpoint, block)| BlockEvent { block, checkpoint }))
207+
if let Some((checkpoint, block)) = poll(self, |hash| self.client.get_block(hash))? {
208+
// Stop tracking unconfirmed transactions that have been confirmed in this block.
209+
for tx in &block.txdata {
210+
self.expected_mempool_txids.remove(&tx.compute_txid());
211+
}
212+
return Ok(Some(BlockEvent { block, checkpoint }));
213+
}
214+
Ok(None)
144215
}
145216
}
146217

218+
/// A new emission from mempool.
219+
#[derive(Debug)]
220+
pub struct MempoolEvent {
221+
/// Unemitted transactions or transactions with ancestors that are unseen by the receiver.
222+
///
223+
/// To understand the second condition, consider a receiver which filters transactions based on
224+
/// whether it alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction
225+
/// spends a tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to
226+
/// block of height `h-1`, we want to re-emit this transaction until the receiver has seen the
227+
/// block at height `h`.
228+
pub new_txs: Vec<(Transaction, u64)>,
229+
230+
/// [`Txid`]s of all transactions that have been evicted from mempool.
231+
pub evicted_txids: HashSet<Txid>,
232+
233+
/// The latest timestamp of when a transaction entered the mempool.
234+
///
235+
/// This is useful for setting the timestamp for evicted transactions.
236+
pub latest_update_time: u64,
237+
}
238+
147239
/// A newly emitted block from [`Emitter`].
148240
#[derive(Debug)]
149241
pub struct BlockEvent<B> {
@@ -329,3 +421,77 @@ impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
329421
}
330422
}
331423
}
424+
425+
#[cfg(test)]
426+
mod test {
427+
use crate::{bitcoincore_rpc::RpcApi, Emitter};
428+
use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoin::Txid;
429+
use bdk_chain::local_chain::LocalChain;
430+
use bdk_testenv::{anyhow, TestEnv};
431+
use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, WScriptHash};
432+
use std::collections::HashSet;
433+
434+
#[test]
435+
fn test_expected_mempool_txids_accumulate_and_remove() -> anyhow::Result<()> {
436+
let env = TestEnv::new()?;
437+
let chain = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?).0;
438+
let chain_tip = chain.tip();
439+
let mut emitter = Emitter::new(env.rpc_client(), chain_tip.clone(), 1, HashSet::new());
440+
441+
env.mine_blocks(100, None)?;
442+
while emitter.next_block()?.is_some() {}
443+
444+
let spk_to_track = ScriptBuf::new_p2wsh(&WScriptHash::all_zeros());
445+
let addr_to_track = Address::from_script(&spk_to_track, bitcoin::Network::Regtest)?;
446+
let mut mempool_txids = HashSet::new();
447+
448+
// Send a tx at different heights and ensure txs are accumulating in expected_mempool_txids.
449+
for _ in 0..10 {
450+
let sent_txid = env.send(&addr_to_track, Amount::from_sat(1_000))?;
451+
mempool_txids.insert(sent_txid);
452+
emitter.mempool()?;
453+
env.mine_blocks(1, None)?;
454+
455+
for txid in &mempool_txids {
456+
assert!(
457+
emitter.expected_mempool_txids.contains(txid),
458+
"Expected txid {:?} missing",
459+
txid
460+
);
461+
}
462+
}
463+
464+
// Process each block and check that confirmed txids are removed from from
465+
// expected_mempool_txids.
466+
while let Some(block_event) = emitter.next_block()? {
467+
let confirmed_txids: HashSet<Txid> = block_event
468+
.block
469+
.txdata
470+
.iter()
471+
.map(|tx| tx.compute_txid())
472+
.collect();
473+
mempool_txids = mempool_txids
474+
.difference(&confirmed_txids)
475+
.copied()
476+
.collect::<HashSet<_>>();
477+
for txid in confirmed_txids {
478+
assert!(
479+
!emitter.expected_mempool_txids.contains(&txid),
480+
"Expected txid {:?} should have been removed",
481+
txid
482+
);
483+
}
484+
for txid in &mempool_txids {
485+
assert!(
486+
emitter.expected_mempool_txids.contains(txid),
487+
"Expected txid {:?} missing",
488+
txid
489+
);
490+
}
491+
}
492+
493+
assert!(emitter.expected_mempool_txids.is_empty());
494+
495+
Ok(())
496+
}
497+
}

0 commit comments

Comments
 (0)