Skip to content

Commit 8e4b37d

Browse files
Merge branch 'v0.40-dev' into fix/syncProgress
2 parents 3e7ea3e + fdc4411 commit 8e4b37d

File tree

15 files changed

+518
-445
lines changed

15 files changed

+518
-445
lines changed

.github/workflows/rust.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ jobs:
329329

330330
integrations_tests:
331331
name: Integration Tests
332+
if: ${{ false }} # Temporarily disabled
332333
runs-on: ubuntu-latest
333334
strategy:
334335
matrix:
@@ -356,4 +357,3 @@ jobs:
356357
uses: reviewdog/action-actionlint@v1
357358
with:
358359
fail_on_error: true
359-

dash-network-ffi/src/lib.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
//! FFI bindings for dash-network library
22
3+
use std::fmt;
4+
35
use dash_network::Network as DashNetwork;
46

57
// Initialize function
@@ -70,10 +72,6 @@ impl NetworkInfo {
7072
self.network.magic()
7173
}
7274

73-
pub fn to_string(&self) -> String {
74-
self.network.to_string()
75-
}
76-
7775
pub fn is_core_v20_active(&self, block_height: u32) -> bool {
7876
self.network.core_v20_is_active_at(block_height)
7977
}
@@ -83,6 +81,12 @@ impl NetworkInfo {
8381
}
8482
}
8583

84+
impl fmt::Display for NetworkInfo {
85+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86+
self.network.fmt(f)
87+
}
88+
}
89+
8690
#[cfg(test)]
8791
mod tests {
8892
use super::*;

dash-spv-ffi/src/bin/ffi_cli.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,9 +215,6 @@ fn main() {
215215
break;
216216
}
217217
dash_spv_ffi_sync_progress_destroy(prog_ptr);
218-
} else {
219-
// If progress is unavailable, assume sync finished or errored
220-
break;
221218
}
222219
thread::sleep(Duration::from_millis(300));
223220
}

dash-spv/src/client/message_handler.rs

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::network::NetworkManager;
77
use crate::storage::StorageManager;
88
use crate::sync::sequential::SequentialSyncManager;
99
use crate::types::{MempoolState, SpvEvent, SpvStats};
10+
// Removed local ad-hoc compact filter construction in favor of always processing full blocks
1011
use key_wallet_manager::wallet_interface::WalletInterface;
1112
use std::sync::Arc;
1213
use tokio::sync::RwLock;
@@ -238,7 +239,23 @@ impl<
238239
block.txdata.len()
239240
);
240241

241-
// Process new block (update state, check watched items)
242+
// 1) Ensure header processing and chain tip update for this block
243+
// Route the header through the sequential sync manager as a Headers message
244+
let headers_msg = NetworkMessage::Headers(vec![block.header]);
245+
if let Err(e) = self
246+
.sync_manager
247+
.handle_message(headers_msg, &mut *self.network, &mut *self.storage)
248+
.await
249+
{
250+
tracing::error!(
251+
"❌ Failed to process header for block {} via sync manager: {}",
252+
block_hash,
253+
e
254+
);
255+
return Err(SpvError::Sync(e));
256+
}
257+
258+
// 2) Always process the full block (privacy and correctness)
242259
if let Err(e) = self.process_new_block(block).await {
243260
tracing::error!("❌ Failed to process new block {}: {}", block_hash, e);
244261
return Err(e);
@@ -434,31 +451,16 @@ impl<
434451
self.network.send_message(getdata).await.map_err(SpvError::Network)?;
435452
}
436453

437-
// Process new blocks immediately when detected
454+
// For blocks announced via inventory during tip sync, request full blocks for privacy
438455
if !blocks_to_request.is_empty() {
439456
tracing::info!(
440-
"🔄 Processing {} new block announcements to stay synchronized",
457+
"📥 Requesting {} new blocks announced via inventory",
441458
blocks_to_request.len()
442459
);
443460

444-
// Extract block hashes
445-
let block_hashes: Vec<dashcore::BlockHash> = blocks_to_request
446-
.iter()
447-
.filter_map(|inv| {
448-
if let Inventory::Block(hash) = inv {
449-
Some(*hash)
450-
} else {
451-
None
452-
}
453-
})
454-
.collect();
455-
456-
// Process each new block
457-
for block_hash in block_hashes {
458-
tracing::info!("📥 Requesting header for new block {}", block_hash);
459-
if let Err(e) = self.process_new_block_hash(block_hash).await {
460-
tracing::error!("❌ Failed to process new block {}: {}", block_hash, e);
461-
}
461+
let getdata = NetworkMessage::GetData(blocks_to_request);
462+
if let Err(e) = self.network.send_message(getdata).await {
463+
tracing::error!("Failed to request announced blocks: {}", e);
462464
}
463465
}
464466

dash-spv/src/client/mod.rs

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -935,14 +935,12 @@ impl<
935935

936936
// Emit detailed progress update
937937
if last_rate_calc.elapsed() >= Duration::from_secs(1) {
938-
// Storage tip is the headers vector index (0-based).
939-
let current_storage_tip = {
938+
// Storage tip now represents the absolute blockchain height.
939+
let current_tip_height = {
940940
let storage = self.storage.lock().await;
941941
storage.get_tip_height().await.ok().flatten().unwrap_or(0)
942942
};
943-
// Convert to absolute blockchain height: base + storage_tip
944-
let sync_base_height = { self.state.read().await.sync_base_height };
945-
let current_height = sync_base_height + current_storage_tip;
943+
let current_height = current_tip_height;
946944
let peer_best = self
947945
.network
948946
.get_peer_best_height()
@@ -952,9 +950,9 @@ impl<
952950
.unwrap_or(current_height);
953951

954952
// Calculate headers downloaded this second
955-
if current_storage_tip > last_height {
956-
headers_this_second = current_storage_tip - last_height;
957-
last_height = current_storage_tip;
953+
if current_tip_height > last_height {
954+
headers_this_second = current_tip_height - last_height;
955+
last_height = current_tip_height;
958956
}
959957

960958
let headers_per_second = headers_this_second as f64;
@@ -1011,18 +1009,13 @@ impl<
10111009
last_rate_calc = Instant::now();
10121010
}
10131011

1014-
// Emit a detailed progress snapshot when filter/header heights change
1015-
let (_sync_base_height, abs_header_height, filter_header_height) = {
1016-
let (storage_tip, filter_tip) = {
1017-
let storage = self.storage.lock().await;
1018-
let storage_tip =
1019-
storage.get_tip_height().await.ok().flatten().unwrap_or(0);
1020-
let filter_tip =
1021-
storage.get_filter_tip_height().await.ok().flatten().unwrap_or(0);
1022-
(storage_tip, filter_tip)
1023-
};
1024-
let base = { self.state.read().await.sync_base_height };
1025-
(base, base + storage_tip, filter_tip)
1012+
// Emit filter headers progress only when heights change
1013+
let (abs_header_height, filter_header_height) = {
1014+
let storage = self.storage.lock().await;
1015+
let storage_tip = storage.get_tip_height().await.ok().flatten().unwrap_or(0);
1016+
let filter_tip =
1017+
storage.get_filter_tip_height().await.ok().flatten().unwrap_or(0);
1018+
(storage_tip, filter_tip)
10261019
};
10271020

10281021
{
@@ -1879,8 +1872,13 @@ impl<
18791872
let mut loaded_count = 0u32;
18801873
let target_height = saved_state.chain_tip.height;
18811874

1882-
// Start from height 1 (genesis is already in ChainState)
1883-
let mut current_height = 1u32;
1875+
// Determine first height to load. Skip genesis (already present) unless we started from a checkpoint base.
1876+
let mut current_height =
1877+
if saved_state.synced_from_checkpoint && saved_state.sync_base_height > 0 {
1878+
saved_state.sync_base_height
1879+
} else {
1880+
1u32
1881+
};
18841882

18851883
while current_height <= target_height {
18861884
let end_height = (current_height + BATCH_SIZE - 1).min(target_height);
@@ -1895,12 +1893,12 @@ impl<
18951893
};
18961894

18971895
if headers.is_empty() {
1898-
tracing::error!(
1899-
"Failed to load headers for range {}..{} - storage may be corrupted",
1896+
tracing::warn!(
1897+
"No headers found for range {}..{} when restoring from state",
19001898
current_height,
19011899
end_height + 1
19021900
);
1903-
return Ok(false);
1901+
break;
19041902
}
19051903

19061904
// Validate headers before adding to chain state

dash-spv/src/client/status_display.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@ impl<'a, S: StorageManager + Send + Sync + 'static> StatusDisplay<'a, S> {
4848
// For checkpoint sync: height = checkpoint_height + storage_count
4949
let storage = self.storage.lock().await;
5050
if let Ok(Some(storage_tip)) = storage.get_tip_height().await {
51-
let blockchain_height = state.sync_base_height + storage_tip;
51+
let blockchain_height = storage_tip;
5252
if with_logging {
5353
tracing::debug!(
54-
"Status display: storage_tip={}, sync_base={}, blockchain_height={}",
55-
storage_tip,
54+
"Status display: reported tip height={}, sync_base={}, raw_storage_tip={}",
55+
blockchain_height,
5656
state.sync_base_height,
57-
blockchain_height
57+
storage_tip
5858
);
5959
}
6060
blockchain_height

dash-spv/src/network/multi_peer.rs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use std::collections::{HashMap, HashSet};
44
use std::net::SocketAddr;
55
use std::path::PathBuf;
6-
use std::sync::atomic::{AtomicBool, Ordering};
6+
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
77
use std::sync::Arc;
88
use std::time::{Duration, SystemTime};
99
use tokio::sync::{mpsc, Mutex};
@@ -70,6 +70,8 @@ pub struct MultiPeerNetworkManager {
7070
user_agent: Option<String>,
7171
/// Exclusive mode: restrict to configured peers only (no DNS or peer store)
7272
exclusive_mode: bool,
73+
/// Cached count of currently connected peers for fast, non-blocking queries
74+
connected_peer_count: Arc<AtomicUsize>,
7375
}
7476

7577
impl MultiPeerNetworkManager {
@@ -123,6 +125,7 @@ impl MultiPeerNetworkManager {
123125
peers_sent_headers2: Arc::new(Mutex::new(HashSet::new())),
124126
user_agent: config.user_agent.clone(),
125127
exclusive_mode,
128+
connected_peer_count: Arc::new(AtomicUsize::new(0)),
126129
})
127130
}
128131

@@ -209,6 +212,7 @@ impl MultiPeerNetworkManager {
209212
let mempool_strategy = self.mempool_strategy;
210213
let read_timeout = self.read_timeout;
211214
let user_agent = self.user_agent.clone();
215+
let connected_peer_count = self.connected_peer_count.clone();
212216

213217
// Spawn connection task
214218
let mut tasks = self.tasks.lock().await;
@@ -235,6 +239,9 @@ impl MultiPeerNetworkManager {
235239
return;
236240
}
237241

242+
// Increment connected peer counter on successful add
243+
connected_peer_count.fetch_add(1, Ordering::Relaxed);
244+
238245
// Add to known addresses
239246
addrv2_handler.add_known_address(addr, ServiceFlags::from(1)).await;
240247

@@ -246,6 +253,7 @@ impl MultiPeerNetworkManager {
246253
addrv2_handler,
247254
shutdown,
248255
reputation_manager.clone(),
256+
connected_peer_count.clone(),
249257
)
250258
.await;
251259
}
@@ -287,6 +295,7 @@ impl MultiPeerNetworkManager {
287295
addrv2_handler: Arc<AddrV2Handler>,
288296
shutdown: Arc<AtomicBool>,
289297
reputation_manager: Arc<PeerReputationManager>,
298+
connected_peer_count: Arc<AtomicUsize>,
290299
) {
291300
tokio::spawn(async move {
292301
log::debug!("Starting peer reader loop for {}", addr);
@@ -551,7 +560,11 @@ impl MultiPeerNetworkManager {
551560

552561
// Remove from pool
553562
log::warn!("Disconnecting from {} (peer reader loop ended)", addr);
554-
pool.remove_connection(&addr).await;
563+
let removed = pool.remove_connection(&addr).await;
564+
if removed.is_some() {
565+
// Decrement connected peer counter when a connection is removed
566+
connected_peer_count.fetch_sub(1, Ordering::Relaxed);
567+
}
555568

556569
// Give small positive reputation if peer maintained long connection
557570
let conn_duration = Duration::from_secs(60 * loop_iteration); // Rough estimate
@@ -576,6 +589,7 @@ impl MultiPeerNetworkManager {
576589
let peer_search_started = self.peer_search_started.clone();
577590
let initial_peers = self.initial_peers.clone();
578591
let data_dir = self.data_dir.clone();
592+
let connected_peer_count = self.connected_peer_count.clone();
579593

580594
// Check if we're in exclusive mode (explicit flag or peers configured)
581595
let exclusive_mode = self.exclusive_mode;
@@ -597,6 +611,8 @@ impl MultiPeerNetworkManager {
597611

598612
let count = pool.connection_count().await;
599613
log::debug!("Connected peers: {}", count);
614+
// Keep the cached counter in sync with actual pool count
615+
connected_peer_count.store(count, Ordering::Relaxed);
600616
if exclusive_mode {
601617
// In exclusive mode, only reconnect to originally specified peers
602618
for addr in initial_peers.iter() {
@@ -987,6 +1003,7 @@ impl Clone for MultiPeerNetworkManager {
9871003
peers_sent_headers2: self.peers_sent_headers2.clone(),
9881004
user_agent: self.user_agent.clone(),
9891005
exclusive_mode: self.exclusive_mode,
1006+
connected_peer_count: self.connected_peer_count.clone(),
9901007
}
9911008
}
9921009
}
@@ -1068,19 +1085,13 @@ impl NetworkManager for MultiPeerNetworkManager {
10681085
}
10691086

10701087
fn is_connected(&self) -> bool {
1071-
// We're "connected" if we have at least one peer
1072-
let pool = self.pool.clone();
1073-
let count = tokio::task::block_in_place(move || {
1074-
tokio::runtime::Handle::current().block_on(pool.connection_count())
1075-
});
1076-
count > 0
1088+
// Use cached counter to avoid blocking in async context
1089+
self.connected_peer_count.load(Ordering::Relaxed) > 0
10771090
}
10781091

10791092
fn peer_count(&self) -> usize {
1080-
let pool = self.pool.clone();
1081-
tokio::task::block_in_place(move || {
1082-
tokio::runtime::Handle::current().block_on(pool.connection_count())
1083-
})
1093+
// Use cached counter to avoid blocking in async context
1094+
self.connected_peer_count.load(Ordering::Relaxed)
10841095
}
10851096

10861097
fn peer_info(&self) -> Vec<PeerInfo> {

0 commit comments

Comments
 (0)