Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dash-spv/src/chain/chain_tip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl ChainTipManager {
Err(e) => {
// Restore the old tip if adding the new one failed
if let Some(tip) = old_tip {
self.tips.insert(tip_hash.clone(), tip);
self.tips.insert(*tip_hash, tip);
}
Err(e)
}
Expand Down
95 changes: 48 additions & 47 deletions dash-spv/src/chain/chainlock_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,59 +189,60 @@ impl ChainLockManager {
}

// Full validation with masternode engine if available
let engine_guard = self
.masternode_engine
.read()
.map_err(|_| ValidationError::InvalidChainLock("Lock poisoned".to_string()))?;

let mut validated = false;
{
let engine_guard = self
.masternode_engine
.read()
.map_err(|_| ValidationError::InvalidChainLock("Lock poisoned".to_string()))?;

if let Some(engine) = engine_guard.as_ref() {
// Use the masternode engine's verify_chain_lock method
match engine.verify_chain_lock(&chain_lock) {
Ok(()) => {
info!(
"✅ ChainLock validated with masternode engine for height {}",
chain_lock.block_height
);
validated = true;
}
Err(e) => {
// Check if the error is due to missing masternode lists
let error_string = e.to_string();
if error_string.contains("No masternode lists in engine") {
// ChainLock validation requires masternode list at (block_height - CHAINLOCK_VALIDATION_MASTERNODE_OFFSET)
let required_height = chain_lock
.block_height
.saturating_sub(CHAINLOCK_VALIDATION_MASTERNODE_OFFSET);
warn!("⚠️ Masternode engine exists but lacks required masternode lists for height {} (needs list at height {} for ChainLock validation), queueing ChainLock for later validation",
chain_lock.block_height, required_height);
drop(engine_guard); // Release the read lock before acquiring write lock
self.queue_pending_chainlock(chain_lock.clone()).map_err(|e| {
ValidationError::InvalidChainLock(format!(
"Failed to queue pending ChainLock: {}",
if let Some(engine) = engine_guard.as_ref() {
// Use the masternode engine's verify_chain_lock method
match engine.verify_chain_lock(&chain_lock) {
Ok(()) => {
info!(
"✅ ChainLock validated with masternode engine for height {}",
chain_lock.block_height
);
validated = true;
}
Err(e) => {
// Check if the error is due to missing masternode lists
let error_string = e.to_string();
if error_string.contains("No masternode lists in engine") {
// ChainLock validation requires masternode list at (block_height - CHAINLOCK_VALIDATION_MASTERNODE_OFFSET)
let required_height = chain_lock
.block_height
.saturating_sub(CHAINLOCK_VALIDATION_MASTERNODE_OFFSET);
warn!("⚠️ Masternode engine exists but lacks required masternode lists for height {} (needs list at height {} for ChainLock validation), queueing ChainLock for later validation",
chain_lock.block_height, required_height);
self.queue_pending_chainlock(chain_lock.clone()).map_err(|e| {
ValidationError::InvalidChainLock(format!(
"Failed to queue pending ChainLock: {}",
e
))
})?;
} else {
return Err(ValidationError::InvalidChainLock(format!(
"MasternodeListEngine validation failed: {:?}",
e
))
})?;
} else {
return Err(ValidationError::InvalidChainLock(format!(
"MasternodeListEngine validation failed: {:?}",
e
)));
)));
}
}
}
} else {
// Queue for later validation when engine becomes available
warn!(
"⚠️ Masternode engine not available, queueing ChainLock for later validation"
);
self.queue_pending_chainlock(chain_lock.clone()).map_err(|e| {
ValidationError::InvalidChainLock(format!(
"Failed to queue pending ChainLock: {}",
e
))
})?;
}
} else {
// Queue for later validation when engine becomes available
warn!("⚠️ Masternode engine not available, queueing ChainLock for later validation");
drop(engine_guard); // Release the read lock before acquiring write lock
self.queue_pending_chainlock(chain_lock.clone()).map_err(|e| {
ValidationError::InvalidChainLock(format!(
"Failed to queue pending ChainLock: {}",
e
))
})?;
}
} // engine_guard dropped before any await

// Store the chain lock with appropriate validation status
self.store_chain_lock_with_validation(chain_lock.clone(), storage, validated).await?;
Expand Down
14 changes: 2 additions & 12 deletions dash-spv/src/chain/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl Checkpoint {
}

/// Checkpoint override settings
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct CheckpointOverride {
/// Override checkpoint height for sync chain
pub sync_override_height: Option<u32>,
Expand All @@ -70,16 +70,6 @@ pub struct CheckpointOverride {
pub sync_from_genesis: bool,
}

impl Default for CheckpointOverride {
fn default() -> Self {
Self {
sync_override_height: None,
terminal_override_height: None,
sync_from_genesis: false,
}
}
}

/// Manages checkpoints for a specific network
pub struct CheckpointManager {
/// Checkpoints indexed by height
Expand Down Expand Up @@ -147,7 +137,7 @@ impl CheckpointManager {

/// Check if we're past the last checkpoint
pub fn is_past_last_checkpoint(&self, height: u32) -> bool {
self.sorted_heights.last().map_or(true, |&last| height > last)
self.sorted_heights.last().is_none_or(|&last| height > last)
}

/// Get the last checkpoint before a given timestamp
Expand Down
11 changes: 6 additions & 5 deletions dash-spv/src/chain/fork_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,16 @@ impl ForkDetector {
// Check if this connects to the main chain (creates new fork)
if let Ok(Some(height)) = storage.get_header_height(&prev_hash) {
// Check if this would create a fork from before our checkpoint
if chain_state.synced_from_checkpoint && chain_state.sync_base_height > 0 {
if height < chain_state.sync_base_height {
tracing::warn!(
if chain_state.synced_from_checkpoint
&& chain_state.sync_base_height > 0
&& height < chain_state.sync_base_height
{
tracing::warn!(
"Rejecting header that would create fork from height {} (before checkpoint base {}). \
This likely indicates headers from genesis were received during checkpoint sync.",
height, chain_state.sync_base_height
);
return ForkDetectionResult::Orphan;
}
return ForkDetectionResult::Orphan;
}

// Found connection point - this creates a new fork
Expand Down
2 changes: 1 addition & 1 deletion dash-spv/src/chain/orphan_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl OrphanPool {
if let Some(orphan) = self.orphans_by_hash.get_mut(&o.header.block_hash()) {
orphan.process_attempts += 1;
}
o.header.clone()
o.header
})
.collect()
})
Expand Down
8 changes: 4 additions & 4 deletions dash-spv/src/client/block_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ use key_wallet_manager::wallet_interface::WalletInterface;
#[derive(Debug)]
pub enum BlockProcessingTask {
ProcessBlock {
block: dashcore::Block,
block: Box<dashcore::Block>,
response_tx: oneshot::Sender<Result<()>>,
},
ProcessTransaction {
tx: dashcore::Transaction,
tx: Box<dashcore::Transaction>,
response_tx: oneshot::Sender<Result<()>>,
},
ProcessCompactFilter {
Expand Down Expand Up @@ -126,7 +126,7 @@ impl<W: WalletInterface + Send + Sync + 'static, S: StorageManager + Send + Sync
}

// Process block and handle errors
let result = self.process_block_internal(block).await;
let result = self.process_block_internal(*block).await;

match &result {
Ok(()) => {
Expand Down Expand Up @@ -160,7 +160,7 @@ impl<W: WalletInterface + Send + Sync + 'static, S: StorageManager + Send + Sync
response_tx,
} => {
let txid = tx.txid();
let result = self.process_transaction_internal(tx).await;
let result = self.process_transaction_internal(*tx).await;

if let Err(e) = &result {
tracing::error!("❌ TRANSACTION PROCESSING FAILED for tx {}: {}", txid, e);
Expand Down
6 changes: 3 additions & 3 deletions dash-spv/src/client/block_processor_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ mod tests {
let (response_tx, _response_rx) = oneshot::channel();
task_tx
.send(BlockProcessingTask::ProcessBlock {
block: block.clone(),
block: Box::new(block.clone()),
response_tx,
})
.unwrap();
Expand Down Expand Up @@ -300,7 +300,7 @@ mod tests {
let (response_tx, _response_rx) = oneshot::channel();
task_tx
.send(BlockProcessingTask::ProcessTransaction {
tx: tx.clone(),
tx: Box::new(tx.clone()),
response_tx,
})
.unwrap();
Expand Down Expand Up @@ -354,7 +354,7 @@ mod tests {
let (response_tx, _response_rx) = oneshot::channel();
task_tx
.send(BlockProcessingTask::ProcessBlock {
block: block.clone(),
block: Box::new(block.clone()),
response_tx,
})
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions dash-spv/src/client/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ impl<
// Emit event
let event = SpvEvent::MempoolTransactionAdded {
txid,
transaction: tx,
transaction: Box::new(tx),
amount,
addresses,
is_instant_send,
Expand Down Expand Up @@ -495,7 +495,7 @@ impl<
// Send block to the background processor without waiting for completion
let (response_tx, _response_rx) = tokio::sync::oneshot::channel();
let task = crate::client::BlockProcessingTask::ProcessBlock {
block,
block: Box::new(block),
response_tx,
};

Expand Down
2 changes: 1 addition & 1 deletion dash-spv/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1136,7 +1136,7 @@ impl<
// Send block to the background processor without waiting for completion
let (response_tx, _response_rx) = tokio::sync::oneshot::channel();
let task = BlockProcessingTask::ProcessBlock {
block,
block: Box::new(block),
response_tx,
};

Expand Down
9 changes: 3 additions & 6 deletions dash-spv/src/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,12 +513,9 @@ impl TcpConnection {
drop(state);

// Handle disconnection if needed
match &result {
Err(NetworkError::PeerDisconnected) => {
self.state = None;
self.connected_at = None;
}
_ => {}
if let Err(NetworkError::PeerDisconnected) = &result {
self.state = None;
self.connected_at = None;
}

result
Expand Down
6 changes: 6 additions & 0 deletions dash-spv/src/network/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ pub struct MessageHandler {
stats: MessageStats,
}

impl Default for MessageHandler {
fn default() -> Self {
Self::new()
}
}

impl MessageHandler {
/// Create a new message handler.
pub fn new() -> Self {
Expand Down
6 changes: 3 additions & 3 deletions dash-spv/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl NetworkManager for TcpNetworkManager {
}

fn is_connected(&self) -> bool {
self.connection.as_ref().map_or(false, |c| c.is_connected())
self.connection.as_ref().is_some_and(|c| c.is_connected())
}

fn peer_count(&self) -> usize {
Expand Down Expand Up @@ -285,7 +285,7 @@ impl NetworkManager for TcpNetworkManager {
}

fn should_ping(&self) -> bool {
self.connection.as_ref().map_or(false, |c| c.should_ping())
self.connection.as_ref().is_some_and(|c| c.should_ping())
}

fn cleanup_old_pings(&mut self) {
Expand All @@ -302,7 +302,7 @@ impl NetworkManager for TcpNetworkManager {
if let Some(connection) = &self.connection {
// For single peer connection, return the peer's best height
match connection.peer_info().best_height {
Some(height) if height > 0 => Ok(Some(height as u32)),
Some(height) if height > 0 => Ok(Some(height)),
_ => Ok(None),
}
} else {
Expand Down
6 changes: 3 additions & 3 deletions dash-spv/src/network/multi_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ impl MultiPeerNetworkManager {

let handle = tokio::spawn(async move {
let mut conn_guard = conn.write().await;
conn_guard.send_message(msg).await.map_err(|e| Error::Network(e))
conn_guard.send_message(msg).await.map_err(Error::Network)
});
handles.push(handle);
}
Expand Down Expand Up @@ -884,7 +884,7 @@ impl MultiPeerNetworkManager {
use std::hash::{Hash, Hasher};
let mut hasher = std::collections::hash_map::DefaultHasher::new();
addr.hash(&mut hasher);
crate::types::PeerId(hasher.finish() as u64)
crate::types::PeerId(hasher.finish())
} else {
// Default to PeerId(0) if no peer available
crate::types::PeerId(0)
Expand Down Expand Up @@ -1176,7 +1176,7 @@ impl NetworkManager for MultiPeerNetworkManager {

if let Some(peer_height) = peer_info.best_height {
if peer_height > 0 {
best_height = best_height.max(peer_height as u32);
best_height = best_height.max(peer_height);
log::debug!(
"get_peer_best_height: Updated best_height to {} from peer {}",
best_height,
Expand Down
10 changes: 8 additions & 2 deletions dash-spv/src/network/reputation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl Default for PeerReputation {
impl PeerReputation {
/// Check if the peer is currently banned
pub fn is_banned(&self) -> bool {
self.banned_until.map_or(false, |until| Instant::now() < until)
self.banned_until.is_some_and(|until| Instant::now() < until)
}

/// Get remaining ban time
Expand Down Expand Up @@ -198,6 +198,12 @@ pub struct PeerReputationManager {
max_events: usize,
}

impl Default for PeerReputationManager {
fn default() -> Self {
Self::new()
}
}

impl PeerReputationManager {
/// Create a new reputation manager
pub fn new() -> Self {
Expand All @@ -224,7 +230,7 @@ impl PeerReputationManager {
// Update score
let old_score = reputation.score;
reputation.score =
(reputation.score + score_change).max(MIN_SCORE).min(MAX_MISBEHAVIOR_SCORE);
(reputation.score + score_change).clamp(MIN_SCORE, MAX_MISBEHAVIOR_SCORE);

// Track positive/negative actions
if score_change > 0 {
Expand Down
6 changes: 6 additions & 0 deletions dash-spv/src/storage/sync_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ pub struct MemoryStorage {
block_txs: RwLock<HashMap<BlockHash, Vec<Txid>>>,
}

impl Default for MemoryStorage {
fn default() -> Self {
Self::new()
}
}

impl MemoryStorage {
pub fn new() -> Self {
Self {
Expand Down
Loading
Loading