Skip to content

Commit 57590e0

Browse files
committed
bitcoind_rpc: rm BlockHash from Emitter::last_mempool_tip
Instead of comparing the blockhash against the emitted_blocks map to see whether the block is part of the emitter's best chain, we reduce the `last_mempool_tip` height to the last agreement height during the polling logic. The benefits of this is we have tighter bounds for avoiding re- emission. Also, it will be easier to replace `emitted_blocks` to a `CheckPoint` (since we no longer rely on map lookup).
1 parent 6d4b33e commit 57590e0

File tree

2 files changed

+66
-25
lines changed

2 files changed

+66
-25
lines changed

crates/bitcoind_rpc/src/lib.rs

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub struct Emitter<'c, C> {
3333

3434
/// The last emitted block during our last mempool emission. This is used to determine whether
3535
/// there has been a reorg since our last mempool emission.
36-
last_mempool_tip: Option<(u32, BlockHash)>,
36+
last_mempool_tip: Option<u32>,
3737
}
3838

3939
impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
@@ -65,12 +65,17 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
6565
pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
6666
let client = self.client;
6767

68-
let prev_mempool_tip = match self.last_mempool_tip {
69-
// use 'avoid-re-emission' logic if there is no reorg
70-
Some((height, hash)) if self.emitted_blocks.get(&height) == Some(&hash) => height,
71-
_ => 0,
72-
};
73-
68+
// This is the emitted tip height during the last mempool emission.
69+
let prev_mempool_tip = self
70+
.last_mempool_tip
71+
// We use `start_height - 1` as we cannot guarantee that the block at
72+
// `start_height` has been emitted.
73+
.unwrap_or(self.start_height.saturating_sub(1));
74+
75+
// Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
76+
// track of the latest mempool tx's timestamp to determine whether we have seen a tx
77+
// before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
78+
// be the new latest timestamp.
7479
let prev_mempool_time = self.last_mempool_time;
7580
let mut latest_time = prev_mempool_time;
7681

@@ -109,11 +114,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
109114
.collect::<Result<Vec<_>, _>>()?;
110115

111116
self.last_mempool_time = latest_time;
112-
self.last_mempool_tip = self
113-
.emitted_blocks
114-
.iter()
115-
.last()
116-
.map(|(&height, &hash)| (height, hash));
117+
self.last_mempool_tip = self.emitted_blocks.iter().last().map(|(&height, _)| height);
117118

118119
Ok(txs_to_emit)
119120
}
@@ -209,7 +210,18 @@ where
209210
continue;
210211
}
211212
PollResponse::AgreementFound(res) => {
212-
emitter.emitted_blocks.split_off(&(res.height as u32 + 1));
213+
let agreement_h = res.height as u32;
214+
215+
// get rid of evicted blocks
216+
emitter.emitted_blocks.split_off(&(agreement_h + 1));
217+
218+
// The tip during the last mempool emission needs to in the best chain, we reduce
219+
// it if it is not.
220+
if let Some(h) = emitter.last_mempool_tip.as_mut() {
221+
if *h > agreement_h {
222+
*h = agreement_h;
223+
}
224+
}
213225
emitter.last_block = Some(res);
214226
continue;
215227
}

crates/bitcoind_rpc/tests/test_emitter.rs

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,8 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
368368
// must receive mined block which will confirm the transactions.
369369
{
370370
let (height, block) = emitter.next_block()?.expect("must get mined block");
371-
let _ = chain.apply_update(block_to_chain_update(&block, height))?;
371+
let _ = chain
372+
.apply_update(CheckPoint::from_header(&block.header, height).into_update(false))?;
372373
let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height);
373374
assert!(indexed_additions.graph.txs.is_empty());
374375
assert!(indexed_additions.graph.txouts.is_empty());
@@ -685,34 +686,59 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
685686
env.send(&addr, Amount::from_sat(2100))?;
686687
}
687688

688-
// perform reorgs at different heights
689-
for reorg_count in 1..TIP_DIFF {
690-
// sync emitter to tip
691-
while emitter.next_header()?.is_some() {}
689+
// sync emitter to tip, first mempool emission should include all txs (as we haven't emitted
690+
// from the mempool yet)
691+
while emitter.next_header()?.is_some() {}
692+
assert_eq!(
693+
emitter
694+
.mempool()?
695+
.into_iter()
696+
.map(|(tx, _)| tx.txid())
697+
.collect::<BTreeSet<_>>(),
698+
env.client
699+
.get_raw_mempool()?
700+
.into_iter()
701+
.collect::<BTreeSet<_>>(),
702+
"first mempool emission should include all txs",
703+
);
692704

705+
// perform reorgs at different heights, these reorgs will not comfirm transactions in the
706+
// mempool
707+
for reorg_count in 1..TIP_DIFF {
693708
println!("REORG COUNT: {}", reorg_count);
694709
env.reorg_empty_blocks(reorg_count)?;
695710

696-
// we recalculate this at every loop as reorgs may evict transactions from mempool
697-
let tx_introductions = env
711+
// This is a map of mempool txids to tip height where the tx was introduced to the mempool
712+
// we recalculate this at every loop as reorgs may evict transactions from mempool. We use
713+
// the introduction height to determine whether we expect a tx to appear in a mempool
714+
// emission.
715+
// TODO: How can have have reorg logic in `TestEnv` NOT blacklast old blocks first?
716+
let tx_introductions = dbg!(env
698717
.client
699718
.get_raw_mempool_verbose()?
700719
.into_iter()
701720
.map(|(txid, entry)| (txid, entry.height as usize))
702-
.collect::<BTreeMap<_, _>>();
721+
.collect::<BTreeMap<_, _>>());
703722

723+
// `next_header` emits the replacement block of the reorg
704724
if let Some((height, _)) = emitter.next_header()? {
705-
// the mempool emission (that follows the first block emission after reorg) should return
706-
// the entire mempool contents
725+
println!("\t- replacement height: {}", height);
726+
727+
// the mempool emission (that follows the first block emission after reorg) should only
728+
// include mempool txs introduced at reorg height or greater
707729
let mempool = emitter
708730
.mempool()?
709731
.into_iter()
710732
.map(|(tx, _)| tx.txid())
711733
.collect::<BTreeSet<_>>();
712-
let exp_mempool = tx_introductions.keys().copied().collect::<BTreeSet<_>>();
734+
let exp_mempool = tx_introductions
735+
.iter()
736+
.filter(|(_, &intro_h)| intro_h >= (height as usize))
737+
.map(|(&txid, _)| txid)
738+
.collect::<BTreeSet<_>>();
713739
assert_eq!(
714740
mempool, exp_mempool,
715-
"the first mempool emission after reorg should include all mempool txs"
741+
"the first mempool emission after reorg should only include mempool txs introduced at reorg height or greater"
716742
);
717743

718744
let mempool = emitter
@@ -738,6 +764,9 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
738764
.collect::<Vec<_>>(),
739765
);
740766
}
767+
768+
// sync emitter to tip
769+
while emitter.next_header()?.is_some() {}
741770
}
742771

743772
Ok(())

0 commit comments

Comments
 (0)