Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions dash-spv/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,14 +880,12 @@ impl<

// Emit detailed progress update
if last_rate_calc.elapsed() >= Duration::from_secs(1) {
// Storage tip is the headers vector index (0-based).
let current_storage_tip = {
// Storage tip now represents the absolute blockchain height.
let current_tip_height = {
let storage = self.storage.lock().await;
storage.get_tip_height().await.ok().flatten().unwrap_or(0)
};
// Convert to absolute blockchain height: base + storage_tip
let sync_base_height = { self.state.read().await.sync_base_height };
let current_height = sync_base_height + current_storage_tip;
let current_height = current_tip_height;
let peer_best = self
.network
.get_peer_best_height()
Expand All @@ -897,9 +895,9 @@ impl<
.unwrap_or(current_height);

// Calculate headers downloaded this second
if current_storage_tip > last_height {
headers_this_second = current_storage_tip - last_height;
last_height = current_storage_tip;
if current_tip_height > last_height {
headers_this_second = current_tip_height - last_height;
last_height = current_tip_height;
}

let headers_per_second = headers_this_second as f64;
Expand Down Expand Up @@ -956,7 +954,7 @@ impl<
let storage_tip = storage.get_tip_height().await.ok().flatten().unwrap_or(0);
let filter_tip =
storage.get_filter_tip_height().await.ok().flatten().unwrap_or(0);
(self.state.read().await.sync_base_height + storage_tip, filter_tip)
(storage_tip, filter_tip)
};
if abs_header_height != last_emitted_header_height
|| filter_header_height != last_emitted_filter_header_height
Expand Down Expand Up @@ -1770,8 +1768,13 @@ impl<
let mut loaded_count = 0u32;
let target_height = saved_state.chain_tip.height;

// Start from height 1 (genesis is already in ChainState)
let mut current_height = 1u32;
// Determine first height to load. Skip genesis (already present) unless we started from a checkpoint base.
let mut current_height =
if saved_state.synced_from_checkpoint && saved_state.sync_base_height > 0 {
saved_state.sync_base_height
} else {
1u32
};

while current_height <= target_height {
let end_height = (current_height + BATCH_SIZE - 1).min(target_height);
Expand All @@ -1786,12 +1789,12 @@ impl<
};

if headers.is_empty() {
tracing::error!(
"Failed to load headers for range {}..{} - storage may be corrupted",
tracing::warn!(
"No headers found for range {}..{} when restoring from state",
current_height,
end_height + 1
);
return Ok(false);
break;
}

// Validate headers before adding to chain state
Expand Down
8 changes: 4 additions & 4 deletions dash-spv/src/client/status_display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ impl<'a, S: StorageManager + Send + Sync + 'static> StatusDisplay<'a, S> {
// For checkpoint sync: height = checkpoint_height + storage_count
let storage = self.storage.lock().await;
if let Ok(Some(storage_tip)) = storage.get_tip_height().await {
let blockchain_height = state.sync_base_height + storage_tip;
let blockchain_height = storage_tip;
if with_logging {
tracing::debug!(
"Status display: storage_tip={}, sync_base={}, blockchain_height={}",
storage_tip,
"Status display: reported tip height={}, sync_base={}, raw_storage_tip={}",
blockchain_height,
state.sync_base_height,
blockchain_height
storage_tip
);
}
blockchain_height
Expand Down
116 changes: 82 additions & 34 deletions dash-spv/src/storage/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1159,25 +1159,36 @@ impl StorageManager for DiskStorageManager {
async fn load_headers(&self, range: Range<u32>) -> StorageResult<Vec<BlockHeader>> {
let mut headers = Vec::new();

let start_segment = Self::get_segment_id(range.start);
let end_segment = Self::get_segment_id(range.end.saturating_sub(1));
// Convert blockchain height range to storage index range using sync_base_height
let sync_base_height = *self.sync_base_height.read().await;
let storage_start = if sync_base_height > 0 && range.start >= sync_base_height {
range.start - sync_base_height
} else {
range.start
};

let storage_end = if sync_base_height > 0 && range.end > sync_base_height {
range.end - sync_base_height
} else {
range.end
};

let start_segment = Self::get_segment_id(storage_start);
let end_segment = Self::get_segment_id(storage_end.saturating_sub(1));

for segment_id in start_segment..=end_segment {
self.ensure_segment_loaded(segment_id).await?;

let segments = self.active_segments.read().await;
if let Some(segment) = segments.get(&segment_id) {
let _segment_start_height = segment_id * HEADERS_PER_SEGMENT;
let _segment_end_height = _segment_start_height + segment.headers.len() as u32;

let start_idx = if segment_id == start_segment {
Self::get_segment_offset(range.start)
Self::get_segment_offset(storage_start)
} else {
0
};

let end_idx = if segment_id == end_segment {
Self::get_segment_offset(range.end.saturating_sub(1)) + 1
Self::get_segment_offset(storage_end.saturating_sub(1)) + 1
} else {
segment.headers.len()
};
Expand All @@ -1198,17 +1209,31 @@ impl StorageManager for DiskStorageManager {
}

async fn get_header(&self, height: u32) -> StorageResult<Option<BlockHeader>> {
// TODO: This method currently expects storage-relative heights (0-based from sync_base_height).
// Consider refactoring to accept blockchain heights and handle conversion internally for better UX.
// Accept blockchain (absolute) height and convert to storage index using sync_base_height.
let sync_base_height = *self.sync_base_height.read().await;

// First check if this height is within our known range
let tip_height = self.cached_tip_height.read().await;
if let Some(tip) = *tip_height {
if height > tip {
// Convert absolute height to storage index (base-inclusive mapping)
let storage_index = if sync_base_height > 0 {
if height >= sync_base_height {
height - sync_base_height
} else {
// If caller passes a small value (likely a pre-conversion storage index), use it directly
height
}
} else {
height
};

// First check if this storage index is within our known range
let tip_index_opt = *self.cached_tip_height.read().await;
if let Some(tip_index) = tip_index_opt {
if storage_index > tip_index {
tracing::trace!(
"Requested header at height {} is beyond tip height {}",
"Requested header at storage index {} is beyond tip index {} (abs height {} base {})",
storage_index,
tip_index,
height,
tip
sync_base_height
);
return Ok(None);
}
Expand All @@ -1217,8 +1242,8 @@ impl StorageManager for DiskStorageManager {
return Ok(None);
}

let segment_id = Self::get_segment_id(height);
let offset = Self::get_segment_offset(height);
let segment_id = Self::get_segment_id(storage_index);
let offset = Self::get_segment_offset(storage_index);

self.ensure_segment_loaded(segment_id).await?;

Expand All @@ -1235,18 +1260,30 @@ impl StorageManager for DiskStorageManager {

if header.is_none() {
tracing::debug!(
"Header not found at height {} (segment: {}, offset: {})",
height,
"Header not found at storage index {} (segment: {}, offset: {}, abs height {}, base {})",
storage_index,
segment_id,
offset
offset,
height,
sync_base_height
);
}

Ok(header)
}

async fn get_tip_height(&self) -> StorageResult<Option<u32>> {
Ok(*self.cached_tip_height.read().await)
let tip_index_opt = *self.cached_tip_height.read().await;
if let Some(tip_index) = tip_index_opt {
let base = *self.sync_base_height.read().await;
if base > 0 {
Ok(Some(base + tip_index))
} else {
Ok(Some(tip_index))
}
} else {
Ok(None)
}
}

async fn store_filter_headers(&mut self, headers: &[FilterHeader]) -> StorageResult<()> {
Expand Down Expand Up @@ -1487,7 +1524,12 @@ impl StorageManager for DiskStorageManager {

// Load all headers
if let Some(tip_height) = self.get_tip_height().await? {
state.headers = self.load_headers(0..tip_height + 1).await?;
let range_start = if state.synced_from_checkpoint && state.sync_base_height > 0 {
state.sync_base_height
} else {
0
};
state.headers = self.load_headers(range_start..tip_height + 1).await?;
}

// Load all filter headers
Expand Down Expand Up @@ -2032,16 +2074,22 @@ mod tests {
// Store headers using checkpoint sync method
storage.store_headers_from_height(&headers, checkpoint_height).await?;

// Verify headers are stored at correct storage indices
// Header at blockchain height 1,100,000 should be at storage index 0
let header_at_0 = storage.get_header(0).await?;
assert!(header_at_0.is_some(), "Header at storage index 0 should exist");
assert_eq!(header_at_0.unwrap(), headers[0]);
// Set sync base height so storage interprets heights as blockchain heights
let mut base_state = ChainState::new();
base_state.sync_base_height = checkpoint_height;
base_state.synced_from_checkpoint = true;
storage.store_chain_state(&base_state).await?;

// Verify headers are stored at correct blockchain heights
// Header at blockchain height 1,100,000 should be retrievable by that height
let header_at_base = storage.get_header(checkpoint_height).await?;
assert!(header_at_base.is_some(), "Header at base blockchain height should exist");
assert_eq!(header_at_base.unwrap(), headers[0]);

// Header at blockchain height 1,100,099 should be at storage index 99
let header_at_99 = storage.get_header(99).await?;
assert!(header_at_99.is_some(), "Header at storage index 99 should exist");
assert_eq!(header_at_99.unwrap(), headers[99]);
// Header at blockchain height 1,100,099 should be retrievable by that height
let header_at_ending = storage.get_header(checkpoint_height + 99).await?;
assert!(header_at_ending.is_some(), "Header at ending blockchain height should exist");
assert_eq!(header_at_ending.unwrap(), headers[99]);

// Test the reverse index (hash -> blockchain height)
let hash_0 = headers[0].block_hash();
Expand Down Expand Up @@ -2081,11 +2129,11 @@ mod tests {
"After index rebuild, hash should still map to blockchain height 1,100,000"
);

// Verify headers can still be retrieved by storage index
let header_after_reload = storage2.get_header(0).await?;
// Verify header can still be retrieved by blockchain height after reload
let header_after_reload = storage2.get_header(checkpoint_height).await?;
assert!(
header_after_reload.is_some(),
"Header at storage index 0 should exist after reload"
"Header at base blockchain height should exist after reload"
);
assert_eq!(header_after_reload.unwrap(), headers[0]);

Expand Down
88 changes: 75 additions & 13 deletions dash-spv/src/storage/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,26 +111,76 @@ impl StorageManager for MemoryStorageManager {
}

async fn load_headers(&self, range: Range<u32>) -> StorageResult<Vec<BlockHeader>> {
let start = range.start as usize;
let end = range.end.min(self.headers.len() as u32) as usize;
// Interpret range as blockchain (absolute) heights and map to storage indices
let (base, has_base) = match self.load_sync_state().await {
Ok(Some(state)) if state.synced_from_checkpoint && state.sync_base_height > 0 => {
(state.sync_base_height, true)
}
_ => (0u32, false),
};

let start_idx = if has_base {
if range.start < base {
0usize
} else {
(range.start - base) as usize
}
} else {
range.start as usize
};

let end_abs = range.end.min(if has_base {
base + self.headers.len() as u32
} else {
self.headers.len() as u32
});
let end_idx = if has_base {
if end_abs <= base {
0usize
} else {
(end_abs - base) as usize
}
} else {
end_abs as usize
};

if start > self.headers.len() {
if start_idx > self.headers.len() {
return Ok(Vec::new());
}

Ok(self.headers[start..end].to_vec())
let end_idx = end_idx.min(self.headers.len());
Ok(self.headers[start_idx..end_idx].to_vec())
}

async fn get_header(&self, height: u32) -> StorageResult<Option<BlockHeader>> {
Ok(self.headers.get(height as usize).copied())
// Accept blockchain (absolute) height; convert to storage index using base (if any)
let base = match self.load_sync_state().await {
Ok(Some(state)) if state.synced_from_checkpoint && state.sync_base_height > 0 => {
state.sync_base_height
}
_ => 0u32,
};

// If height >= base, treat as absolute; else accept it as a storage index for compatibility
let idx = if base > 0 && height >= base {
(height - base) as usize
} else {
height as usize
};

Ok(self.headers.get(idx).copied())
}

async fn get_tip_height(&self) -> StorageResult<Option<u32>> {
if self.headers.is_empty() {
Ok(None)
} else {
Ok(Some(self.headers.len() as u32 - 1))
return Ok(None);
}
let base = match self.load_sync_state().await {
Ok(Some(state)) if state.synced_from_checkpoint && state.sync_base_height > 0 => {
state.sync_base_height
}
_ => 0u32,
};
Ok(Some(base + self.headers.len() as u32 - 1))
}

async fn store_filter_headers(&mut self, headers: &[FilterHeader]) -> StorageResult<()> {
Expand Down Expand Up @@ -349,11 +399,23 @@ impl StorageManager for MemoryStorageManager {
return Ok(Vec::new());
}

let mut results = Vec::with_capacity((end_height - start_height + 1) as usize);
// Map absolute heights to storage indices
let base = match self.load_sync_state().await {
Ok(Some(state)) if state.synced_from_checkpoint && state.sync_base_height > 0 => {
state.sync_base_height
}
_ => 0u32,
};

for height in start_height..=end_height {
if let Some(header) = self.headers.get(height as usize) {
results.push((height, *header));
let mut results = Vec::with_capacity((end_height - start_height + 1) as usize);
for abs_h in start_height..=end_height {
let idx = if base > 0 && abs_h >= base {
(abs_h - base) as usize
} else {
abs_h as usize
};
if let Some(header) = self.headers.get(idx) {
results.push((abs_h, *header));
}
}

Expand Down
Loading
Loading