Skip to content

Commit cd6de5e

Browse files
authored
refactor: Make flow control syncing default and drop the no flow parts (#211)
1 parent c76ddbb commit cd6de5e

21 files changed

+54
-2323
lines changed

dash-spv/src/client/config.rs

Lines changed: 0 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -78,42 +78,9 @@ pub struct ClientConfig {
7878
/// Maximum concurrent filter requests (default: 8).
7979
pub max_concurrent_filter_requests: usize,
8080

81-
/// Enable flow control for filter requests (default: true).
82-
pub enable_filter_flow_control: bool,
83-
8481
/// Delay between filter requests in milliseconds (default: 50).
8582
pub filter_request_delay_ms: u64,
8683

87-
/// Enable automatic CFHeader gap detection and restart
88-
pub enable_cfheader_gap_restart: bool,
89-
90-
/// Interval for checking CFHeader gaps (seconds)
91-
pub cfheader_gap_check_interval_secs: u64,
92-
93-
/// Cooldown between CFHeader restart attempts (seconds)
94-
pub cfheader_gap_restart_cooldown_secs: u64,
95-
96-
/// Maximum CFHeader gap restart attempts
97-
pub max_cfheader_gap_restart_attempts: u32,
98-
99-
/// Enable automatic filter gap detection and restart
100-
pub enable_filter_gap_restart: bool,
101-
102-
/// Interval for checking filter gaps (seconds)
103-
pub filter_gap_check_interval_secs: u64,
104-
105-
/// Minimum filter gap size to trigger restart (blocks)
106-
pub min_filter_gap_size: u32,
107-
108-
/// Cooldown between filter restart attempts (seconds)
109-
pub filter_gap_restart_cooldown_secs: u64,
110-
111-
/// Maximum filter gap restart attempts
112-
pub max_filter_gap_restart_attempts: u32,
113-
114-
/// Maximum number of filters to sync in a single gap sync batch
115-
pub max_filter_gap_sync_size: u32,
116-
11784
// Mempool configuration
11885
/// Enable tracking of unconfirmed (mempool) transactions.
11986
pub enable_mempool_tracking: bool,
@@ -159,9 +126,6 @@ pub struct ClientConfig {
159126
/// Maximum concurrent CFHeaders requests for parallel sync (default: 50).
160127
pub max_concurrent_cfheaders_requests_parallel: usize,
161128

162-
/// Enable flow control for CFHeaders requests (default: true).
163-
pub enable_cfheaders_flow_control: bool,
164-
165129
/// Timeout for CFHeaders requests in seconds (default: 30).
166130
pub cfheaders_request_timeout_secs: u64,
167131

@@ -210,18 +174,7 @@ impl Default for ClientConfig {
210174
log_level: "info".to_string(),
211175
user_agent: None,
212176
max_concurrent_filter_requests: 16,
213-
enable_filter_flow_control: true,
214177
filter_request_delay_ms: 0,
215-
enable_cfheader_gap_restart: true,
216-
cfheader_gap_check_interval_secs: 15,
217-
cfheader_gap_restart_cooldown_secs: 30,
218-
max_cfheader_gap_restart_attempts: 5,
219-
enable_filter_gap_restart: true,
220-
filter_gap_check_interval_secs: 20,
221-
min_filter_gap_size: 10,
222-
filter_gap_restart_cooldown_secs: 30,
223-
max_filter_gap_restart_attempts: 5,
224-
max_filter_gap_sync_size: 50000,
225178
// Mempool defaults
226179
enable_mempool_tracking: true,
227180
mempool_strategy: MempoolStrategy::FetchAll,
@@ -243,7 +196,6 @@ impl Default for ClientConfig {
243196
wallet_creation_time: None,
244197
// CFHeaders flow control defaults
245198
max_concurrent_cfheaders_requests_parallel: 50,
246-
enable_cfheaders_flow_control: true,
247199
cfheaders_request_timeout_secs: 30,
248200
max_cfheaders_retries: 3,
249201
// QRInfo defaults (simplified per plan)
@@ -341,12 +293,6 @@ impl ClientConfig {
341293
self
342294
}
343295

344-
/// Enable or disable filter flow control.
345-
pub fn with_filter_flow_control(mut self, enabled: bool) -> Self {
346-
self.enable_filter_flow_control = enabled;
347-
self
348-
}
349-
350296
/// Set delay between filter requests.
351297
pub fn with_filter_request_delay(mut self, delay_ms: u64) -> Self {
352298
self.filter_request_delay_ms = delay_ms;

dash-spv/src/client/config_test.rs

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ mod tests {
2828
assert!(config.enable_persistence);
2929
assert_eq!(config.log_level, "info");
3030
assert_eq!(config.max_concurrent_filter_requests, 16);
31-
assert!(config.enable_filter_flow_control);
3231
assert_eq!(config.filter_request_delay_ms, 0);
3332

3433
// Mempool defaults
@@ -66,7 +65,6 @@ mod tests {
6665
.with_connection_timeout(Duration::from_secs(10))
6766
.with_log_level("debug")
6867
.with_max_concurrent_filter_requests(32)
69-
.with_filter_flow_control(false)
7068
.with_filter_request_delay(100)
7169
.with_mempool_tracking(MempoolStrategy::BloomFilter)
7270
.with_max_mempool_transactions(500)
@@ -80,7 +78,6 @@ mod tests {
8078
assert_eq!(config.connection_timeout, Duration::from_secs(10));
8179
assert_eq!(config.log_level, "debug");
8280
assert_eq!(config.max_concurrent_filter_requests, 32);
83-
assert!(!config.enable_filter_flow_control);
8481
assert_eq!(config.filter_request_delay_ms, 100);
8582

8683
// Mempool settings
@@ -196,28 +193,6 @@ mod tests {
196193

197194
// Removed selective strategy validation test; Selective variant no longer exists
198195

199-
#[test]
200-
fn test_cfheader_gap_settings() {
201-
let config = ClientConfig::default();
202-
203-
assert!(config.enable_cfheader_gap_restart);
204-
assert_eq!(config.cfheader_gap_check_interval_secs, 15);
205-
assert_eq!(config.cfheader_gap_restart_cooldown_secs, 30);
206-
assert_eq!(config.max_cfheader_gap_restart_attempts, 5);
207-
}
208-
209-
#[test]
210-
fn test_filter_gap_settings() {
211-
let config = ClientConfig::default();
212-
213-
assert!(config.enable_filter_gap_restart);
214-
assert_eq!(config.filter_gap_check_interval_secs, 20);
215-
assert_eq!(config.min_filter_gap_size, 10);
216-
assert_eq!(config.filter_gap_restart_cooldown_secs, 30);
217-
assert_eq!(config.max_filter_gap_restart_attempts, 5);
218-
assert_eq!(config.max_filter_gap_sync_size, 50000);
219-
}
220-
221196
#[test]
222197
fn test_request_control_defaults() {
223198
let config = ClientConfig::default();

dash-spv/src/client/filter_sync.rs

Lines changed: 0 additions & 171 deletions
Original file line numberDiff line numberDiff line change
@@ -1,171 +0,0 @@
1-
//! Filter synchronization and management for the Dash SPV client.
2-
3-
use crate::error::{Result, SpvError};
4-
use crate::network::NetworkManager;
5-
use crate::storage::StorageManager;
6-
use crate::sync::SyncManager;
7-
use crate::types::FilterMatch;
8-
use crate::types::SpvStats;
9-
use key_wallet_manager::wallet_interface::WalletInterface;
10-
use std::sync::Arc;
11-
use tokio::sync::RwLock;
12-
13-
/// Filter synchronization manager for coordinating filter downloads and checking.
14-
pub struct FilterSyncCoordinator<'a, S: StorageManager, N: NetworkManager, W: WalletInterface> {
15-
sync_manager: &'a mut SyncManager<S, N, W>,
16-
storage: &'a mut S,
17-
network: &'a mut N,
18-
stats: &'a Arc<RwLock<SpvStats>>,
19-
running: &'a Arc<RwLock<bool>>,
20-
}
21-
22-
impl<
23-
'a,
24-
S: StorageManager + Send + Sync + 'static,
25-
N: NetworkManager + Send + Sync + 'static,
26-
W: WalletInterface,
27-
> FilterSyncCoordinator<'a, S, N, W>
28-
{
29-
/// Create a new filter sync coordinator.
30-
pub fn new(
31-
sync_manager: &'a mut SyncManager<S, N, W>,
32-
storage: &'a mut S,
33-
network: &'a mut N,
34-
stats: &'a Arc<RwLock<SpvStats>>,
35-
running: &'a Arc<RwLock<bool>>,
36-
) -> Self {
37-
Self {
38-
sync_manager,
39-
storage,
40-
network,
41-
stats,
42-
running,
43-
}
44-
}
45-
46-
/// Sync compact filters for recent blocks and check for matches.
47-
/// Sync and check filters with internal monitoring loop management.
48-
/// This method automatically handles the monitoring loop required for CFilter message processing.
49-
pub async fn sync_and_check_filters_with_monitoring(
50-
&mut self,
51-
num_blocks: Option<u32>,
52-
) -> Result<Vec<FilterMatch>> {
53-
// Just delegate to the regular method for now - the real fix is in sync_filters_coordinated
54-
self.sync_and_check_filters(num_blocks).await
55-
}
56-
57-
pub async fn sync_and_check_filters(
58-
&mut self,
59-
num_blocks: Option<u32>,
60-
) -> Result<Vec<FilterMatch>> {
61-
let running = self.running.read().await;
62-
if !*running {
63-
return Err(SpvError::Config("Client not running".to_string()));
64-
}
65-
drop(running);
66-
67-
// Get current filter tip height to determine range (use filter headers, not block headers)
68-
// This ensures consistency between range calculation and progress tracking
69-
let tip_height =
70-
self.storage.get_filter_tip_height().await.map_err(SpvError::Storage)?.unwrap_or(0);
71-
72-
// Determine how many blocks to request
73-
let num_blocks = num_blocks.unwrap_or(100).max(1);
74-
let default_start = tip_height.saturating_sub(num_blocks - 1);
75-
76-
// Ask the wallet for an earliest rescan height, falling back to the default window.
77-
let wallet_hint = self.sync_manager.wallet_birth_height_hint().await;
78-
let mut start_height = wallet_hint.unwrap_or(default_start).min(default_start);
79-
80-
// Respect any user-provided start height hint from the configuration.
81-
if let Some(config_start) = self.sync_manager.config_start_height() {
82-
let capped = config_start.min(tip_height);
83-
start_height = start_height.max(capped);
84-
}
85-
86-
// Make sure we never request past the current tip
87-
start_height = start_height.min(tip_height);
88-
89-
let actual_count = if start_height <= tip_height {
90-
tip_height - start_height + 1
91-
} else {
92-
0
93-
};
94-
95-
tracing::info!(
96-
"Requesting filters from height {} to {} ({} blocks based on filter tip height)",
97-
start_height,
98-
tip_height,
99-
actual_count
100-
);
101-
if let Some(hint) = wallet_hint {
102-
tracing::debug!("Wallet hint for earliest required height: {}", hint);
103-
}
104-
tracing::info!("Filter processing and matching will happen automatically in background thread as CFilter messages arrive");
105-
106-
// Send filter requests - processing will happen automatically in the background
107-
if actual_count > 0 {
108-
self.sync_filters_coordinated(start_height, actual_count).await?;
109-
} else {
110-
tracing::debug!("No filters requested because calculated range is empty");
111-
}
112-
113-
// Return empty vector since matching happens asynchronously in the filter processor thread
114-
// Actual matches will be processed and blocks requested automatically when CFilter messages arrive
115-
Ok(Vec::new())
116-
}
117-
118-
/// Sync filters for a specific height range.
119-
pub async fn sync_filters_range(
120-
&mut self,
121-
start_height: Option<u32>,
122-
count: Option<u32>,
123-
) -> Result<()> {
124-
// Get filter tip height to determine default values
125-
let filter_tip_height =
126-
self.storage.get_filter_tip_height().await.map_err(SpvError::Storage)?.unwrap_or(0);
127-
128-
let start = start_height.unwrap_or(filter_tip_height.saturating_sub(99));
129-
let num_blocks = count.unwrap_or(100);
130-
131-
tracing::info!(
132-
"Starting filter sync for specific range from height {} ({} blocks)",
133-
start,
134-
num_blocks
135-
);
136-
137-
self.sync_filters_coordinated(start, num_blocks).await
138-
}
139-
140-
/// Sync filters in coordination with the monitoring loop using flow control processing
141-
async fn sync_filters_coordinated(&mut self, start_height: u32, count: u32) -> Result<()> {
142-
tracing::info!("Starting coordinated filter sync with flow control from height {} to {} ({} filters expected)",
143-
start_height, start_height + count - 1, count);
144-
145-
// Start tracking filter sync progress
146-
crate::sync::filters::FilterSyncManager::<S, N>::start_filter_sync_tracking(
147-
self.stats,
148-
count as u64,
149-
)
150-
.await;
151-
152-
// Use the new flow control method
153-
self.sync_manager
154-
.filter_sync_mut()
155-
.sync_filters_with_flow_control(
156-
&mut *self.network,
157-
&mut *self.storage,
158-
Some(start_height),
159-
Some(count),
160-
)
161-
.await
162-
.map_err(SpvError::Sync)?;
163-
164-
let (pending_count, active_count, flow_enabled) =
165-
self.sync_manager.filter_sync().get_flow_control_status();
166-
tracing::info!("✅ Filter sync with flow control initiated (flow control enabled: {}, {} requests queued, {} active)",
167-
flow_enabled, pending_count, active_count);
168-
169-
Ok(())
170-
}
171-
}

dash-spv/src/client/message_handler.rs

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::mempool_filter::MempoolFilter;
66
use crate::network::NetworkManager;
77
use crate::storage::StorageManager;
88
use crate::sync::SyncManager;
9-
use crate::types::{MempoolState, SpvEvent, SpvStats};
9+
use crate::types::{MempoolState, SpvEvent};
1010
// Removed local ad-hoc compact filter construction in favor of always processing full blocks
1111
use key_wallet_manager::wallet_interface::WalletInterface;
1212
use std::sync::Arc;
@@ -18,7 +18,6 @@ pub struct MessageHandler<'a, S: StorageManager, N: NetworkManager, W: WalletInt
1818
storage: &'a mut S,
1919
network: &'a mut N,
2020
config: &'a ClientConfig,
21-
stats: &'a Arc<RwLock<SpvStats>>,
2221
block_processor_tx: &'a tokio::sync::mpsc::UnboundedSender<crate::client::BlockProcessingTask>,
2322
mempool_filter: &'a Option<Arc<MempoolFilter>>,
2423
mempool_state: &'a Arc<RwLock<MempoolState>>,
@@ -39,7 +38,6 @@ impl<
3938
storage: &'a mut S,
4039
network: &'a mut N,
4140
config: &'a ClientConfig,
42-
stats: &'a Arc<RwLock<SpvStats>>,
4341
block_processor_tx: &'a tokio::sync::mpsc::UnboundedSender<
4442
crate::client::BlockProcessingTask,
4543
>,
@@ -52,7 +50,6 @@ impl<
5250
storage,
5351
network,
5452
config,
55-
stats,
5653
block_processor_tx,
5754
mempool_filter,
5855
mempool_state,
@@ -338,17 +335,6 @@ impl<
338335
}
339336
NetworkMessage::CFilter(cfilter) => {
340337
tracing::debug!("Received CFilter for block {}", cfilter.block_hash);
341-
342-
// Record the height of this received filter for gap tracking
343-
crate::sync::filters::FilterSyncManager::<S, N>::record_filter_received_at_height(
344-
self.stats,
345-
&*self.storage,
346-
&cfilter.block_hash,
347-
)
348-
.await;
349-
350-
// Sequential sync manager handles the filter internally
351-
// For sequential sync, filter checking is done within the sync manager
352338
}
353339
NetworkMessage::SendDsq(wants_dsq) => {
354340
tracing::info!("Received SendDsq message - peer wants DSQ messages: {}", wants_dsq);

dash-spv/src/client/mod.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
//!
1818
//! - `block_processor.rs` (649 lines) - Block processing and validation
1919
//! - `config.rs` (484 lines) - Client configuration
20-
//! - `filter_sync.rs` (171 lines) - Filter synchronization
2120
//! - `message_handler.rs` (585 lines) - Network message handling
2221
//! - `status_display.rs` (242 lines) - Status display formatting
2322
//!
@@ -35,7 +34,6 @@
3534
// Existing extracted modules
3635
pub mod block_processor;
3736
pub mod config;
38-
pub mod filter_sync;
3937
pub mod interface;
4038
pub mod message_handler;
4139
pub mod status_display;
@@ -54,7 +52,6 @@ mod transactions;
5452
// Re-export public types from extracted modules
5553
pub use block_processor::{BlockProcessingTask, BlockProcessor};
5654
pub use config::ClientConfig;
57-
pub use filter_sync::FilterSyncCoordinator;
5855
pub use message_handler::MessageHandler;
5956
pub use status_display::StatusDisplay;
6057

0 commit comments

Comments
 (0)