Skip to content

Commit ca37ae2

Browse files
committed
refactor(rpc)!: update mempool interface and test code
1 parent c727cd2 commit ca37ae2

File tree

3 files changed

+135
-33
lines changed

3 files changed

+135
-33
lines changed

crates/bitcoind_rpc/src/lib.rs

Lines changed: 101 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@
1010
#![warn(missing_docs)]
1111

1212
use bdk_core::{BlockId, CheckPoint};
13-
use bitcoin::{block::Header, Block, BlockHash, Transaction};
13+
use bitcoin::{block::Header, Block, BlockHash, Transaction, Txid};
1414
use bitcoincore_rpc::bitcoincore_rpc_json;
15+
use std::collections::HashSet;
1516

1617
pub mod bip158;
1718

@@ -43,6 +44,9 @@ pub struct Emitter<'c, C> {
4344
/// The last emitted block during our last mempool emission. This is used to determine whether
4445
/// there has been a reorg since our last mempool emission.
4546
last_mempool_tip: Option<u32>,
47+
48+
/// Expected mempool txs. TODO: Docs.
49+
expected_mempool_txids: HashSet<Txid>,
4650
}
4751

4852
impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
@@ -53,28 +57,36 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
5357
///
5458
/// `start_height` starts emission from a given height (if there are no conflicts with the
5559
/// original chain).
56-
pub fn new(client: &'c C, last_cp: CheckPoint, start_height: u32) -> Self {
60+
pub fn new(
61+
client: &'c C,
62+
last_cp: CheckPoint,
63+
start_height: u32,
64+
expected_mempool_txids: HashSet<Txid>,
65+
) -> Self {
5766
Self {
5867
client,
5968
start_height,
6069
last_cp,
6170
last_block: None,
6271
last_mempool_time: 0,
6372
last_mempool_tip: None,
73+
expected_mempool_txids,
6474
}
6575
}
6676

67-
/// Emit mempool transactions, alongside their first-seen unix timestamps.
77+
/// Emit mempool transactions and capture the initial snapshot of all mempool [`Txid`]s.
6878
///
69-
/// This method emits each transaction only once, unless we cannot guarantee the transaction's
70-
/// ancestors are already emitted.
79+
/// This method returns a [`MempoolEvent`] containing the full transactions (with their
80+
/// first-seen unix timestamps) that were emitted, and the set of all [`Txid`]s present from the
81+
/// initial mempool query. Each transaction is emitted only once, unless we cannot guarantee the
82+
/// transaction's ancestors are already emitted.
7183
///
7284
/// To understand why, consider a receiver which filters transactions based on whether it
7385
/// alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction spends a
7486
/// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block
7587
/// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
7688
/// at height `h`.
77-
pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
89+
pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
7890
let client = self.client;
7991

8092
// This is the emitted tip height during the last mempool emission.
@@ -84,24 +96,64 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
8496
// `start_height` has been emitted.
8597
.unwrap_or(self.start_height.saturating_sub(1));
8698

99+
// Get the raw mempool result from the RPC client.
100+
let raw_mempool = client.get_raw_mempool_verbose()?;
101+
let raw_mempool_txids: HashSet<Txid> = raw_mempool.keys().copied().collect();
102+
103+
// Determine the latest block height from the raw mempool data.
104+
let latest_height = raw_mempool
105+
.values()
106+
.map(|entry| entry.height)
107+
.max()
108+
.unwrap_or(0);
109+
110+
// Determine if we are at tip: if the highest mempool height equals the current chain tip.
111+
let at_tip = latest_height == self.last_cp.height() as u64;
112+
113+
// Clear out `expected_mempool_txids` if we are at the latest block height or if we are on a
114+
// different block.
115+
if prev_mempool_tip != self.last_cp.height() {
116+
self.expected_mempool_txids.clear();
117+
} else if at_tip {
118+
let blockhash = client.get_block_hash(latest_height)?;
119+
let block = client.get_block(&blockhash)?;
120+
let confirmed_txids = block
121+
.txdata
122+
.into_iter()
123+
.map(|tx| tx.compute_txid())
124+
.collect::<Vec<_>>();
125+
self.expected_mempool_txids
126+
.retain(|txid| !confirmed_txids.contains(txid));
127+
}
128+
129+
// If at tip, any expected txid missing from raw mempool is considered evicted;
130+
// if not at tip, we don't evict anything.
131+
let mut evicted_txids: HashSet<Txid> = if at_tip {
132+
self.expected_mempool_txids
133+
.difference(&raw_mempool_txids)
134+
.copied()
135+
.collect()
136+
} else {
137+
HashSet::new()
138+
};
139+
87140
// Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
88141
// track of the latest mempool tx's timestamp to determine whether we have seen a tx
89142
// before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
90143
// be the new latest timestamp.
91144
let prev_mempool_time = self.last_mempool_time;
92145
let mut latest_time = prev_mempool_time;
93146

94-
let txs_to_emit = client
95-
.get_raw_mempool_verbose()?
147+
let new_txs = raw_mempool
96148
.into_iter()
97149
.filter_map({
98150
let latest_time = &mut latest_time;
151+
let evicted_txids = &mut evicted_txids;
99152
move |(txid, tx_entry)| -> Option<Result<_, bitcoincore_rpc::Error>> {
100153
let tx_time = tx_entry.time as usize;
101154
if tx_time > *latest_time {
102155
*latest_time = tx_time;
103156
}
104-
105157
// Avoid emitting transactions that are already emitted if we can guarantee
106158
// blocks containing ancestors are already emitted. The bitcoind rpc interface
107159
// provides us with the block height that the tx is introduced to the mempool.
@@ -112,23 +164,36 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
112164
if is_already_emitted && is_within_height {
113165
return None;
114166
}
115-
116167
let tx = match client.get_raw_transaction(&txid, None) {
117168
Ok(tx) => tx,
118-
// the tx is confirmed or evicted since `get_raw_mempool_verbose`
119-
Err(err) if err.is_not_found_error() => return None,
169+
Err(err) if err.is_not_found_error() => {
170+
// If at tip and the transaction isn't found, mark it as evicted.
171+
if at_tip {
172+
evicted_txids.insert(txid);
173+
}
174+
return None;
175+
}
120176
Err(err) => return Some(Err(err)),
121177
};
122-
123178
Some(Ok((tx, tx_time as u64)))
124179
}
125180
})
126181
.collect::<Result<Vec<_>, _>>()?;
127182

128183
self.last_mempool_time = latest_time;
129184
self.last_mempool_tip = Some(self.last_cp.height());
130-
131-
Ok(txs_to_emit)
185+
self.expected_mempool_txids.extend(
186+
new_txs
187+
.iter()
188+
.map(|(tx, _)| tx.compute_txid())
189+
.collect::<Vec<_>>(),
190+
);
191+
192+
Ok(MempoolEvent {
193+
new_txs,
194+
evicted_txids,
195+
latest_update_time: latest_time as u64,
196+
})
132197
}
133198

134199
/// Emit the next block height and header (if any).
@@ -144,6 +209,27 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
144209
}
145210
}
146211

212+
/// A new emission from mempool.
213+
#[derive(Debug)]
214+
pub struct MempoolEvent {
215+
/// Unemitted transactions or transactions with ancestors that are unseen by the receiver.
216+
///
217+
/// To understand the second condition, consider a receiver which filters transactions based on
218+
/// whether it alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction
219+
/// spends a tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to
220+
/// block of height `h-1`, we want to re-emit this transaction until the receiver has seen the
221+
/// block at height `h`.
222+
pub new_txs: Vec<(Transaction, u64)>,
223+
224+
/// [`Txid`]s of all transactions that have been evicted from mempool.
225+
pub evicted_txids: HashSet<Txid>,
226+
227+
/// The latest timestamp of when a transaction entered the mempool.
228+
///
229+
/// This is useful for setting the timestamp for evicted transactions.
230+
pub latest_update_time: u64,
231+
}
232+
147233
/// A newly emitted block from [`Emitter`].
148234
#[derive(Debug)]
149235
pub struct BlockEvent<B> {

crates/bitcoind_rpc/tests/test_emitter.rs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::{BTreeMap, BTreeSet};
1+
use std::collections::{BTreeMap, BTreeSet, HashSet};
22

33
use bdk_bitcoind_rpc::Emitter;
44
use bdk_chain::{
@@ -22,7 +22,7 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> {
2222
let env = TestEnv::new()?;
2323
let network_tip = env.rpc_client().get_block_count()?;
2424
let (mut local_chain, _) = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?);
25-
let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0);
25+
let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0, HashSet::new());
2626

2727
// Mine some blocks and return the actual block hashes.
2828
// Because initializing `ElectrsD` already mines some blocks, we must include those too when
@@ -156,7 +156,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
156156
index
157157
});
158158

159-
let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0);
159+
let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, HashSet::new());
160160

161161
while let Some(emission) = emitter.next_block()? {
162162
let height = emission.block_height();
@@ -189,7 +189,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
189189
assert!(emitter.next_block()?.is_none());
190190

191191
let mempool_txs = emitter.mempool()?;
192-
let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs);
192+
let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs.new_txs);
193193
assert_eq!(
194194
indexed_additions
195195
.tx_graph
@@ -252,6 +252,7 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> {
252252
hash: env.rpc_client().get_block_hash(0)?,
253253
}),
254254
EMITTER_START_HEIGHT as _,
255+
HashSet::new(),
255256
);
256257

257258
env.mine_blocks(CHAIN_TIP_HEIGHT, None)?;
@@ -328,6 +329,7 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> {
328329
hash: env.rpc_client().get_block_hash(0)?,
329330
}),
330331
0,
332+
HashSet::new(),
331333
);
332334

333335
// setup addresses
@@ -419,6 +421,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
419421
hash: env.rpc_client().get_block_hash(0)?,
420422
}),
421423
0,
424+
HashSet::new(),
422425
);
423426

424427
// mine blocks and sync up emitter
@@ -437,6 +440,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
437440
// the first emission should include all transactions
438441
let emitted_txids = emitter
439442
.mempool()?
443+
.new_txs
440444
.into_iter()
441445
.map(|(tx, _)| tx.compute_txid())
442446
.collect::<BTreeSet<Txid>>();
@@ -447,7 +451,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
447451

448452
// second emission should be empty
449453
assert!(
450-
emitter.mempool()?.is_empty(),
454+
emitter.mempool()?.new_txs.is_empty(),
451455
"second emission should be empty"
452456
);
453457

@@ -457,7 +461,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
457461
}
458462
while emitter.next_header()?.is_some() {}
459463
assert!(
460-
emitter.mempool()?.is_empty(),
464+
emitter.mempool()?.new_txs.is_empty(),
461465
"third emission, after chain tip is extended, should also be empty"
462466
);
463467

@@ -484,6 +488,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
484488
hash: env.rpc_client().get_block_hash(0)?,
485489
}),
486490
0,
491+
HashSet::new(),
487492
);
488493

489494
// mine blocks to get initial balance, sync emitter up to tip
@@ -506,6 +511,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
506511
assert_eq!(
507512
emitter
508513
.mempool()?
514+
.new_txs
509515
.into_iter()
510516
.map(|(tx, _)| tx.compute_txid())
511517
.collect::<BTreeSet<_>>(),
@@ -515,6 +521,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
515521
assert_eq!(
516522
emitter
517523
.mempool()?
524+
.new_txs
518525
.into_iter()
519526
.map(|(tx, _)| tx.compute_txid())
520527
.collect::<BTreeSet<_>>(),
@@ -535,6 +542,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
535542
.collect::<BTreeSet<_>>();
536543
let emitted_txids = emitter
537544
.mempool()?
545+
.new_txs
538546
.into_iter()
539547
.map(|(tx, _)| tx.compute_txid())
540548
.collect::<BTreeSet<_>>();
@@ -572,6 +580,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
572580
hash: env.rpc_client().get_block_hash(0)?,
573581
}),
574582
0,
583+
HashSet::new(),
575584
);
576585

577586
// mine blocks to get initial balance
@@ -593,6 +602,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
593602
assert_eq!(
594603
emitter
595604
.mempool()?
605+
.new_txs
596606
.into_iter()
597607
.map(|(tx, _)| tx.compute_txid())
598608
.collect::<BTreeSet<_>>(),
@@ -628,6 +638,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
628638
// include mempool txs introduced at reorg height or greater
629639
let mempool = emitter
630640
.mempool()?
641+
.new_txs
631642
.into_iter()
632643
.map(|(tx, _)| tx.compute_txid())
633644
.collect::<BTreeSet<_>>();
@@ -643,6 +654,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
643654

644655
let mempool = emitter
645656
.mempool()?
657+
.new_txs
646658
.into_iter()
647659
.map(|(tx, _)| tx.compute_txid())
648660
.collect::<BTreeSet<_>>();
@@ -696,6 +708,7 @@ fn no_agreement_point() -> anyhow::Result<()> {
696708
hash: env.rpc_client().get_block_hash(0)?,
697709
}),
698710
(PREMINE_COUNT - 2) as u32,
711+
HashSet::new(),
699712
);
700713

701714
// mine 101 blocks
@@ -821,9 +834,7 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> {
821834
assert!(mempool_event.evicted_txids.contains(&txid_1));
822835

823836
// Update graph with evicted tx.
824-
let exp_txids = exp_spk_txids.into_iter().map(|(_, txid)| txid);
825-
let evicted_txids = mempool_event.evicted_txids(exp_txids);
826-
for txid in evicted_txids {
837+
for txid in mempool_event.evicted_txids {
827838
let _ = graph.insert_evicted_at(txid, seen_at);
828839
}
829840

0 commit comments

Comments
 (0)