Skip to content

Commit 0530c9b

Browse files
authored
feat: implement parallel filter matching (#303)
1 parent 2621234 commit 0530c9b

File tree

9 files changed

+193
-155
lines changed

9 files changed

+193
-155
lines changed

dash-spv/src/sync/legacy/filters/matching.rs

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,27 +20,6 @@ use crate::network::NetworkManager;
2020
use crate::storage::StorageManager;
2121

2222
impl<S: StorageManager, N: NetworkManager> super::manager::FilterSyncManager<S, N> {
23-
pub async fn check_filter_for_matches<
24-
W: key_wallet_manager::wallet_interface::WalletInterface,
25-
>(
26-
&self,
27-
filter_data: &[u8],
28-
block_hash: &BlockHash,
29-
wallet: &mut W,
30-
) -> SyncResult<bool> {
31-
// Create the BlockFilter from the raw data
32-
let filter = dashcore::bip158::BlockFilter::new(filter_data);
33-
34-
// Use wallet's check_compact_filter method
35-
let matches = wallet.check_compact_filter(&filter, block_hash).await;
36-
if matches {
37-
tracing::info!("🎯 Filter match found for block {}", block_hash);
38-
Ok(true)
39-
} else {
40-
Ok(false)
41-
}
42-
}
43-
4423
/// Check if filter matches any of the provided scripts using BIP158 GCS filter.
4524
#[allow(dead_code)]
4625
fn filter_matches_scripts(

dash-spv/src/sync/legacy/message_handlers.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
//! Message handlers for synchronization phases.
22
3-
use std::ops::DerefMut;
4-
use std::time::Instant;
5-
3+
use dashcore::bip158::BlockFilter;
64
use dashcore::block::Block;
75
use dashcore::network::message::NetworkMessage;
86
use dashcore::network::message_blockdata::Inventory;
7+
use std::collections::HashMap;
8+
use std::time::Instant;
99

10+
use super::manager::SyncManager;
11+
use super::phases::SyncPhase;
1012
use crate::error::{SyncError, SyncResult};
1113
use crate::network::{Message, NetworkManager};
1214
use crate::storage::StorageManager;
1315
use key_wallet_manager::wallet_interface::WalletInterface;
14-
15-
use super::manager::SyncManager;
16-
use super::phases::SyncPhase;
16+
use key_wallet_manager::wallet_manager::{check_compact_filters_for_addresses, FilterMatchKey};
1717

1818
impl<S: StorageManager, N: NetworkManager, W: WalletInterface> SyncManager<S, N, W> {
1919
/// Handle incoming network messages with phase filtering
@@ -480,8 +480,6 @@ impl<S: StorageManager, N: NetworkManager, W: WalletInterface> SyncManager<S, N,
480480
) -> SyncResult<()> {
481481
tracing::debug!("📨 Received CFilter for block {}", cfilter.block_hash);
482482

483-
let mut wallet = self.wallet.write().await;
484-
485483
// Check filter against wallet if available
486484
// First, verify filter data matches expected filter header chain
487485
let height = storage
@@ -515,20 +513,18 @@ impl<S: StorageManager, N: NetworkManager, W: WalletInterface> SyncManager<S, N,
515513
.await
516514
.map_err(|e| SyncError::Storage(format!("Failed to store filter: {}", e)))?;
517515

518-
let matches = self
519-
.filter_sync
520-
.check_filter_for_matches(&cfilter.filter, &cfilter.block_hash, wallet.deref_mut())
521-
.await?;
522-
523-
drop(wallet);
516+
let key = FilterMatchKey::new(height, cfilter.block_hash);
517+
let input = HashMap::from([(key, BlockFilter::new(&cfilter.filter))]);
518+
let addresses = self.wallet.read().await.monitored_addresses();
519+
let matches = check_compact_filters_for_addresses(&input, addresses);
524520

525521
{
526522
let mut stats_lock = self.stats.write().await;
527523
stats_lock.filters_received += 1;
528524
stats_lock.last_filter_received_time = Some(std::time::Instant::now());
529525
}
530526

531-
if matches {
527+
if !matches.is_empty() {
532528
// Update filter match statistics
533529
{
534530
let mut stats = self.stats.write().await;

key-wallet-manager/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ serde = { version = "1.0", default-features = false, features = ["derive"], opti
2525
async-trait = "0.1"
2626
bincode = { version = "2.0.1", optional = true }
2727
zeroize = { version = "1.8", features = ["derive"] }
28+
rayon = "1.11"
2829
tokio = { version = "1.32", features = ["full"] }
2930

3031
[dev-dependencies]

key-wallet-manager/src/test_utils/wallet.rs

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
1+
use crate::{wallet_interface::WalletInterface, BlockProcessingResult};
2+
use dashcore::{Address, Block, Transaction, Txid};
13
use std::{collections::BTreeMap, sync::Arc};
2-
3-
use dashcore::{Block, Transaction, Txid};
44
use tokio::sync::Mutex;
55

6-
use crate::{wallet_interface::WalletInterface, BlockProcessingResult};
7-
86
// Type alias for transaction effects map
97
type TransactionEffectsMap = Arc<Mutex<BTreeMap<Txid, (i64, Vec<String>)>>>;
108

@@ -57,15 +55,6 @@ impl WalletInterface for MockWallet {
5755
processed.push(tx.txid());
5856
}
5957

60-
async fn check_compact_filter(
61-
&mut self,
62-
_filter: &dashcore::bip158::BlockFilter,
63-
_block_hash: &dashcore::BlockHash,
64-
) -> bool {
65-
// Return true for all filters in test
66-
true
67-
}
68-
6958
async fn describe(&self) -> String {
7059
"MockWallet (test implementation)".to_string()
7160
}
@@ -74,6 +63,10 @@ impl WalletInterface for MockWallet {
7463
let map = self.effects.lock().await;
7564
map.get(&tx.txid()).cloned()
7665
}
66+
67+
fn monitored_addresses(&self) -> Vec<Address> {
68+
Vec::new()
69+
}
7770
}
7871

7972
/// Mock wallet that returns false for filter checks
@@ -94,13 +87,8 @@ impl WalletInterface for NonMatchingMockWallet {
9487

9588
async fn process_mempool_transaction(&mut self, _tx: &Transaction) {}
9689

97-
async fn check_compact_filter(
98-
&mut self,
99-
_filter: &dashcore::bip158::BlockFilter,
100-
_block_hash: &dashcore::BlockHash,
101-
) -> bool {
102-
// Always return false - filter doesn't match
103-
false
90+
fn monitored_addresses(&self) -> Vec<Address> {
91+
Vec::new()
10492
}
10593

10694
async fn describe(&self) -> String {

key-wallet-manager/src/wallet_interface.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
use alloc::string::String;
66
use alloc::vec::Vec;
77
use async_trait::async_trait;
8-
use dashcore::bip158::BlockFilter;
98
use dashcore::prelude::CoreBlockHeight;
109
use dashcore::{Address, Block, Transaction, Txid};
1110

@@ -47,13 +46,8 @@ pub trait WalletInterface: Send + Sync + 'static {
4746
/// Called when a transaction is seen in the mempool
4847
async fn process_mempool_transaction(&mut self, tx: &Transaction);
4948

50-
/// Check if a compact filter matches any watched items
51-
/// Returns true if the block should be downloaded
52-
async fn check_compact_filter(
53-
&mut self,
54-
filter: &BlockFilter,
55-
block_hash: &dashcore::BlockHash,
56-
) -> bool;
49+
/// Get all addresses the wallet is monitoring for incoming transactions
50+
fn monitored_addresses(&self) -> Vec<Address>;
5751

5852
/// Return the wallet's per-transaction net change and involved addresses if known.
5953
/// Returns (net_amount, addresses) where net_amount is received - sent in satoshis.
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
use alloc::vec::Vec;
2+
use dashcore::bip158::BlockFilter;
3+
use dashcore::prelude::CoreBlockHeight;
4+
use dashcore::{Address, BlockHash};
5+
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
6+
use std::collections::{BTreeSet, HashMap};
7+
8+
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
9+
pub struct FilterMatchKey {
10+
height: CoreBlockHeight,
11+
hash: BlockHash,
12+
}
13+
14+
impl FilterMatchKey {
15+
pub fn new(height: CoreBlockHeight, hash: BlockHash) -> Self {
16+
Self {
17+
height,
18+
hash,
19+
}
20+
}
21+
pub fn height(&self) -> CoreBlockHeight {
22+
self.height
23+
}
24+
pub fn hash(&self) -> &BlockHash {
25+
&self.hash
26+
}
27+
}
28+
29+
/// Check compact filters for addresses and return the keys that matched.
30+
pub fn check_compact_filters_for_addresses(
31+
input: &HashMap<FilterMatchKey, BlockFilter>,
32+
addresses: Vec<Address>,
33+
) -> BTreeSet<FilterMatchKey> {
34+
let script_pubkey_bytes: Vec<Vec<u8>> =
35+
addresses.iter().map(|address| address.script_pubkey().to_bytes()).collect();
36+
37+
input
38+
.into_par_iter()
39+
.filter_map(|(key, filter)| {
40+
filter
41+
.match_any(key.hash(), script_pubkey_bytes.iter().map(|v| v.as_slice()))
42+
.unwrap_or(false)
43+
.then_some(key.clone())
44+
})
45+
.collect()
46+
}
47+
48+
#[cfg(test)]
49+
mod tests {
50+
use super::*;
51+
use crate::Network;
52+
use dashcore::{Block, Transaction};
53+
54+
#[test]
55+
fn test_empty_input_returns_empty() {
56+
let result = check_compact_filters_for_addresses(&HashMap::new(), vec![]);
57+
assert!(result.is_empty());
58+
}
59+
60+
#[test]
61+
fn test_empty_addresses_returns_empty() {
62+
let address = Address::dummy(Network::Regtest, 1);
63+
let tx = Transaction::dummy(&address, 0..0, &[1]);
64+
let block = Block::dummy(100, vec![tx]);
65+
let filter = BlockFilter::dummy(&block);
66+
let key = FilterMatchKey::new(100, block.block_hash());
67+
68+
let mut input = HashMap::new();
69+
input.insert(key.clone(), filter);
70+
71+
let output = check_compact_filters_for_addresses(&input, vec![]);
72+
assert!(!output.contains(&key));
73+
}
74+
75+
#[test]
76+
fn test_matching_filter() {
77+
let address = Address::dummy(Network::Regtest, 1);
78+
let tx = Transaction::dummy(&address, 0..0, &[1]);
79+
let block = Block::dummy(100, vec![tx]);
80+
let filter = BlockFilter::dummy(&block);
81+
let key = FilterMatchKey::new(100, block.block_hash());
82+
83+
let mut input = HashMap::new();
84+
input.insert(key.clone(), filter);
85+
86+
let output = check_compact_filters_for_addresses(&input, vec![address]);
87+
assert!(output.contains(&key));
88+
}
89+
90+
#[test]
91+
fn test_non_matching_filter() {
92+
let address = Address::dummy(Network::Regtest, 1);
93+
let address_other = Address::dummy(Network::Regtest, 2);
94+
95+
let tx = Transaction::dummy(&address_other, 0..0, &[1]);
96+
let block = Block::dummy(100, vec![tx]);
97+
let filter = BlockFilter::dummy(&block);
98+
let key = FilterMatchKey::new(100, block.block_hash());
99+
100+
let mut input = HashMap::new();
101+
input.insert(key.clone(), filter);
102+
103+
let output = check_compact_filters_for_addresses(&input, vec![address]);
104+
assert!(!output.contains(&key));
105+
}
106+
107+
#[test]
108+
fn test_batch_mixed_results() {
109+
let unrelated_address = Address::dummy(Network::Regtest, 0);
110+
let address_1 = Address::dummy(Network::Regtest, 1);
111+
let address_2 = Address::dummy(Network::Regtest, 2);
112+
113+
let tx_1 = Transaction::dummy(&address_1, 0..0, &[1]);
114+
let block_1 = Block::dummy(100, vec![tx_1]);
115+
let filter_1 = BlockFilter::dummy(&block_1);
116+
let key_1 = FilterMatchKey::new(100, block_1.block_hash());
117+
118+
let tx_2 = Transaction::dummy(&address_2, 0..0, &[2]);
119+
let block_2 = Block::dummy(200, vec![tx_2]);
120+
let filter_2 = BlockFilter::dummy(&block_2);
121+
let key_2 = FilterMatchKey::new(200, block_2.block_hash());
122+
123+
let tx_3 = Transaction::dummy(&unrelated_address, 0..0, &[10]);
124+
let block_3 = Block::dummy(300, vec![tx_3]);
125+
let filter_3 = BlockFilter::dummy(&block_3);
126+
let key_3 = FilterMatchKey::new(300, block_3.block_hash());
127+
128+
let mut input = HashMap::new();
129+
input.insert(key_1.clone(), filter_1);
130+
input.insert(key_2.clone(), filter_2);
131+
input.insert(key_3.clone(), filter_3);
132+
133+
let output = check_compact_filters_for_addresses(&input, vec![address_1, address_2]);
134+
assert_eq!(output.len(), 2);
135+
assert!(output.contains(&key_1));
136+
assert!(output.contains(&key_2));
137+
assert!(!output.contains(&key_3));
138+
}
139+
140+
#[test]
141+
fn test_output_sorted_by_height() {
142+
let address = Address::dummy(Network::Regtest, 1);
143+
144+
// Create blocks at different heights (inserted in non-sorted order)
145+
let heights = [500, 100, 300, 200, 400];
146+
let mut input = HashMap::new();
147+
148+
for (i, &height) in heights.iter().enumerate() {
149+
let tx = Transaction::dummy(&address, 0..0, &[i as u64]);
150+
let block = Block::dummy(height, vec![tx]);
151+
let filter = BlockFilter::dummy(&block);
152+
let key = FilterMatchKey::new(height, block.block_hash());
153+
input.insert(key, filter);
154+
}
155+
156+
let output = check_compact_filters_for_addresses(&input, vec![address]);
157+
158+
// Verify output is sorted by height (ascending)
159+
let heights_out: Vec<CoreBlockHeight> = output.iter().map(|k| k.height()).collect();
160+
let mut sorted_heights = heights_out.clone();
161+
sorted_heights.sort();
162+
163+
assert_eq!(heights_out, sorted_heights);
164+
assert_eq!(heights_out, vec![100, 200, 300, 400, 500]);
165+
}
166+
}

key-wallet-manager/src/wallet_manager/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
//! each of which can have multiple accounts. This follows the architecture
55
//! pattern where a manager oversees multiple distinct wallets.
66
7+
mod matching;
78
mod process_block;
89
mod transaction_building;
910

11+
pub use crate::wallet_manager::matching::{check_compact_filters_for_addresses, FilterMatchKey};
1012
use alloc::collections::BTreeMap;
1113
use alloc::string::String;
1214
use alloc::vec::Vec;

key-wallet-manager/src/wallet_manager/process_block.rs

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,8 @@ use alloc::string::String;
44
use alloc::vec::Vec;
55
use async_trait::async_trait;
66
use core::fmt::Write as _;
7-
use dashcore::bip158::BlockFilter;
87
use dashcore::prelude::CoreBlockHeight;
9-
use dashcore::{Block, BlockHash, Transaction};
8+
use dashcore::{Address, Block, Transaction};
109
use key_wallet::transaction_checking::transaction_router::TransactionRouter;
1110
use key_wallet::transaction_checking::TransactionContext;
1211
use key_wallet::wallet::managed_wallet_info::wallet_info_interface::WalletInfoInterface;
@@ -58,30 +57,8 @@ impl<T: WalletInfoInterface + Send + Sync + 'static> WalletInterface for WalletM
5857
.await;
5958
}
6059

61-
async fn check_compact_filter(&mut self, filter: &BlockFilter, block_hash: &BlockHash) -> bool {
62-
// Collect all scripts we're watching
63-
let mut script_bytes = Vec::new();
64-
65-
// Get all wallet addresses for this network
66-
for info in self.wallet_infos.values() {
67-
let monitored = info.monitored_addresses();
68-
for address in monitored {
69-
script_bytes.push(address.script_pubkey().as_bytes().to_vec());
70-
}
71-
}
72-
73-
// If we don't watch any scripts for this network, there can be no match.
74-
// Note: BlockFilterReader::match_any returns true for an empty query set,
75-
// so we must guard this case explicitly to avoid false positives.
76-
let hit = if script_bytes.is_empty() {
77-
false
78-
} else {
79-
filter
80-
.match_any(block_hash, &mut script_bytes.iter().map(|s| s.as_slice()))
81-
.unwrap_or(false)
82-
};
83-
84-
hit
60+
fn monitored_addresses(&self) -> Vec<Address> {
61+
self.monitored_addresses()
8562
}
8663

8764
async fn transaction_effect(&self, tx: &Transaction) -> Option<(i64, Vec<String>)> {

0 commit comments

Comments
 (0)