Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 49 additions & 8 deletions crates/bitcoind_rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
#![warn(missing_docs)]

use bdk_core::{BlockId, CheckPoint};
use bitcoin::{block::Header, Block, BlockHash, Transaction};
use bitcoin::{block::Header, Block, BlockHash, Transaction, Txid};
use bitcoincore_rpc::bitcoincore_rpc_json;
use std::collections::HashSet;

pub mod bip158;

Expand Down Expand Up @@ -64,17 +65,19 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
}
}

/// Emit mempool transactions, alongside their first-seen unix timestamps.
/// Emit mempool transactions and capture the initial snapshot of all mempool [`Txid`]s.
///
/// This method emits each transaction only once, unless we cannot guarantee the transaction's
/// ancestors are already emitted.
/// This method returns a [`MempoolEvent`] containing the full transactions (with their
/// first-seen unix timestamps) that were emitted, and the set of all [`Txid`]s present from the
/// initial mempool query. Each transaction is emitted only once, unless we cannot guarantee the
/// transaction's ancestors are already emitted.
///
/// To understand why, consider a receiver which filters transactions based on whether it
/// alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction spends a
/// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block
/// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
/// at height `h`.
pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
let client = self.client;

// This is the emitted tip height during the last mempool emission.
Expand All @@ -91,8 +94,11 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
let prev_mempool_time = self.last_mempool_time;
let mut latest_time = prev_mempool_time;

let txs_to_emit = client
.get_raw_mempool_verbose()?
// Get the raw mempool result from the RPC client.
let raw_mempool = client.get_raw_mempool_verbose()?;
let raw_mempool_txids = raw_mempool.keys().copied().collect::<HashSet<_>>();

let emitted_txs = raw_mempool
.into_iter()
.filter_map({
let latest_time = &mut latest_time;
Expand Down Expand Up @@ -128,7 +134,11 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
self.last_mempool_time = latest_time;
self.last_mempool_tip = Some(self.last_cp.height());

Ok(txs_to_emit)
Ok(MempoolEvent {
emitted_txs,
raw_mempool_txids,
last_seen: latest_time as u64,
})
}

/// Emit the next block height and header (if any).
Expand All @@ -144,6 +154,37 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
}
}

/// A new emission from mempool.
#[derive(Debug)]
pub struct MempoolEvent {
/// Emitted mempool transactions with their first‐seen unix timestamps.
pub emitted_txs: Vec<(Transaction, u64)>,

/// Set of all [`Txid`]s from the raw mempool result, including transactions that may have been
/// confirmed or evicted during processing. This is used to determine which expected
/// transactions are missing.
pub raw_mempool_txids: HashSet<Txid>,

/// The latest first-seen epoch of emitted mempool transactions.
pub last_seen: u64,
}

impl MempoolEvent {
/// Given an iterator of expected [`Txid`]s, return those that are missing from the mempool.
pub fn evicted_txids(
&self,
expected_unconfirmed_txids: impl IntoIterator<Item = Txid>,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For spk-based syncing we want to use all canonical txids (including confirmed) to determine evicted txs. The reason is because spk-based APIs returns both confirmed and unconfirmed under a given spk and that a missing previously-confirmed tx is also evicted from the mempool.

For rpc-based, we obviously want to only input unconfirmed txs as block (confirmed events) are handled separately...

We can add an include_confirmed: bool param on TxGraph/IndexedTxGraph methods that lists expected txs... but that does feel seen like it's easy to screw up.

Another idea is to compare the MempoolEvent txids to the previous MempoolEvent txids and the difference that does not end up in a block is evicted... However, when we initiate the Emitter, we still need the wallet state of unconfirmed txs as we no longer have all the data from the last MempoolEvent emission.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's best to combine those two ideas.

This way we don't have to get something from our receiving structures (expected_unconfirmed_txids) before applying the update.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that the entire txid history is strictly needed in the expected set.

a missing previously-confirmed tx is also evicted from the mempool.

Perhaps, but maybe we're only concerned with reorgs below some number of confirmations? For a busy wallet the set of expected txids could be quite large which could hinder sync performance over time, considering that the normal fate of a transaction is to gain confirmations and never disappear from the blockchain.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add an include_confirmed: bool param on TxGraph/IndexedTxGraph methods that lists expected txs... but that does feel seen like it's easy to screw up.

Or have a method that takes an iterator of canonical txs, that way the caller can filter them on a number of confirmations for example.

) -> HashSet<Txid> {
let expected_set = expected_unconfirmed_txids
.into_iter()
.collect::<HashSet<_>>();
expected_set
.difference(&self.raw_mempool_txids)
.copied()
.collect()
}
}

/// A newly emitted block from [`Emitter`].
#[derive(Debug)]
pub struct BlockEvent<B> {
Expand Down
117 changes: 114 additions & 3 deletions crates/bitcoind_rpc/tests/test_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
assert!(emitter.next_block()?.is_none());

let mempool_txs = emitter.mempool()?;
let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs);
let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs.emitted_txs);
assert_eq!(
indexed_additions
.tx_graph
Expand Down Expand Up @@ -437,6 +437,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
// the first emission should include all transactions
let emitted_txids = emitter
.mempool()?
.emitted_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<Txid>>();
Expand All @@ -447,7 +448,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {

// second emission should be empty
assert!(
emitter.mempool()?.is_empty(),
emitter.mempool()?.emitted_txs.is_empty(),
"second emission should be empty"
);

Expand All @@ -457,7 +458,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
}
while emitter.next_header()?.is_some() {}
assert!(
emitter.mempool()?.is_empty(),
emitter.mempool()?.emitted_txs.is_empty(),
"third emission, after chain tip is extended, should also be empty"
);

Expand Down Expand Up @@ -506,6 +507,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
assert_eq!(
emitter
.mempool()?
.emitted_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<_>>(),
Expand All @@ -515,6 +517,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
assert_eq!(
emitter
.mempool()?
.emitted_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<_>>(),
Expand All @@ -535,6 +538,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
.collect::<BTreeSet<_>>();
let emitted_txids = emitter
.mempool()?
.emitted_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<_>>();
Expand Down Expand Up @@ -593,6 +597,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
assert_eq!(
emitter
.mempool()?
.emitted_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<_>>(),
Expand Down Expand Up @@ -628,6 +633,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
// include mempool txs introduced at reorg height or greater
let mempool = emitter
.mempool()?
.emitted_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<_>>();
Expand All @@ -643,6 +649,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {

let mempool = emitter
.mempool()?
.emitted_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<_>>();
Expand Down Expand Up @@ -731,3 +738,107 @@ fn no_agreement_point() -> anyhow::Result<()> {

Ok(())
}

#[test]
fn test_expect_tx_evicted() -> anyhow::Result<()> {
use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoin;
use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoincore_rpc_json::CreateRawTransactionInput;
use bdk_chain::miniscript;
use bdk_chain::spk_txout::SpkTxOutIndex;
use bdk_chain::ConfirmationBlockTime;
use bitcoin::constants::genesis_block;
use bitcoin::secp256k1::Secp256k1;
use bitcoin::Network;
use std::collections::HashMap;
let env = TestEnv::new()?;

let s = bdk_testenv::utils::DESCRIPTORS[0];
let desc = miniscript::Descriptor::parse_descriptor(&Secp256k1::new(), s)
.unwrap()
.0;
let spk = desc.at_derivation_index(0)?.script_pubkey();

let chain = LocalChain::from_genesis_hash(genesis_block(Network::Regtest).block_hash()).0;
let chain_tip = chain.tip().block_id();

let mut index = SpkTxOutIndex::default();
index.insert_spk(("external", 0u32), spk.clone());
let mut graph = IndexedTxGraph::<ConfirmationBlockTime, _>::new(index);

// Receive tx1.
let _ = env.mine_blocks(100, None)?;
let txid_1 = env.send(
&Address::from_script(&spk, Network::Regtest)?,
Amount::ONE_BTC,
)?;

let mut emitter = Emitter::new(env.rpc_client(), chain.tip(), 1);
let changeset = graph.batch_insert_unconfirmed(emitter.mempool()?.emitted_txs);
assert!(changeset
.tx_graph
.txs
.iter()
.any(|tx| tx.compute_txid() == txid_1));
let seen_at = graph
.graph()
.get_tx_node(txid_1)
.unwrap()
.last_seen_unconfirmed
.unwrap();

// Double spend tx1.

// Get `prevout` from core.
let core = env.rpc_client();
let tx1 = &core.get_raw_transaction(&txid_1, None)?;
let txin = &tx1.input[0];
let op = txin.previous_output;

// Create `tx1b` using the previous output from tx1.
let utxo = CreateRawTransactionInput {
txid: op.txid,
vout: op.vout,
sequence: None,
};
let addr = core.get_new_address(None, None)?.assume_checked();
let tx = core.create_raw_transaction(
&[utxo],
&HashMap::from([(addr.to_string(), Amount::from_btc(49.99)?)]),
None,
None,
)?;
let res = core.sign_raw_transaction_with_wallet(&tx, None, None)?;
let tx1b = res.transaction()?;

// Send the tx.
let txid_2 = core.send_raw_transaction(&tx1b)?;

// Retrieve the expected unconfirmed txids and spks from the graph.
let exp_spk_txids = graph.expected_unconfirmed_spk_txids(&chain, chain_tip, ..)?;
assert_eq!(exp_spk_txids, vec![(txid_1, spk)]);

// Check that mempool emission contains evicted txid.
let mempool_event = emitter.mempool()?;
let unseen_txids: Vec<Txid> = mempool_event
.emitted_txs
.iter()
.map(|(tx, _)| tx.compute_txid())
.collect();
assert!(unseen_txids.contains(&txid_2));

// Update graph with evicted tx.
let exp_txids = exp_spk_txids.into_iter().map(|(txid, _)| txid);
let evicted_txids = mempool_event.evicted_txids(exp_txids);
for txid in evicted_txids {
let _ = graph.insert_evicted_at(txid, seen_at);
}

// tx1 should no longer be canonical.
assert!(graph
.graph()
.list_canonical_txs(&chain, chain_tip)
.next()
.is_none());

Ok(())
}
12 changes: 4 additions & 8 deletions crates/chain/benches/canonicalization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,8 @@ pub fn many_conflicting_unconfirmed(c: &mut Criterion) {
}],
..new_tx(i)
};
let update = TxUpdate {
txs: vec![Arc::new(tx)],
..Default::default()
};
let mut update = TxUpdate::default();
update.txs = vec![Arc::new(tx)];
let _ = tx_graph.apply_update_at(update, Some(i as u64));
}
}));
Expand Down Expand Up @@ -169,10 +167,8 @@ pub fn many_chained_unconfirmed(c: &mut Criterion) {
..new_tx(i)
};
let txid = tx.compute_txid();
let update = TxUpdate {
txs: vec![Arc::new(tx)],
..Default::default()
};
let mut update = TxUpdate::default();
update.txs = vec![Arc::new(tx)];
let _ = tx_graph.apply_update_at(update, Some(i as u64));
// Store the next prevout.
previous_output = OutPoint::new(txid, 0);
Expand Down
Loading
Loading