Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 220 additions & 13 deletions dash-spv/src/storage/disk/segments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ impl<I: Persistable> SegmentCache<I> {
let end_segment = Self::index_to_segment_id(storage_end_idx);

for segment_id in start_segment..=end_segment {
let segment = self.get_segment(&segment_id).await?;
let segment = self.get_segment_mut(&segment_id).await?;
let item_count = segment.items.len() as u32;

let seg_start_idx = if segment_id == start_segment {
Expand All @@ -271,11 +271,7 @@ impl<I: Persistable> SegmentCache<I> {
item_count
};

if seg_start_idx < item_count && seg_end_idx <= item_count {
items.extend_from_slice(
&segment.items[seg_start_idx as usize..seg_end_idx as usize],
);
}
items.extend_from_slice(segment.get(seg_start_idx..seg_end_idx));
}

Ok(items)
Expand Down Expand Up @@ -394,11 +390,14 @@ pub struct Segment<I: Persistable> {
impl<I: Persistable> Segment<I> {
const ITEMS_PER_SEGMENT: u32 = 50_000;

fn new(segment_id: u32, items: Vec<I>) -> Self {
fn new(segment_id: u32, mut items: Vec<I>, state: SegmentState) -> Self {
debug_assert!(items.len() <= Self::ITEMS_PER_SEGMENT as usize);
items.truncate(Self::ITEMS_PER_SEGMENT as usize);

Self {
segment_id,
items,
state: SegmentState::Clean,
state,
last_accessed: Instant::now(),
}
}
Expand All @@ -407,7 +406,7 @@ impl<I: Persistable> Segment<I> {
// Load segment from disk
let segment_path = base_path.join(I::relative_disk_path(segment_id));

let items = if segment_path.exists() {
let (items, state) = if segment_path.exists() {
let file = File::open(&segment_path)?;
let mut reader = BufReader::new(file);
let mut items = Vec::with_capacity(Segment::<I>::ITEMS_PER_SEGMENT as usize);
Expand All @@ -429,12 +428,12 @@ impl<I: Persistable> Segment<I> {
}
}

items
(items, SegmentState::Clean)
} else {
Vec::with_capacity(Self::ITEMS_PER_SEGMENT as usize)
(Vec::with_capacity(Self::ITEMS_PER_SEGMENT as usize), SegmentState::Dirty)
};

Ok(Self::new(segment_id, items))
Ok(Self::new(segment_id, items, state))
}

pub fn persist(&mut self, base_path: &Path) -> StorageResult<()> {
Expand Down Expand Up @@ -466,6 +465,8 @@ impl<I: Persistable> Segment<I> {
}

pub fn insert(&mut self, item: I, offset: u32) {
debug_assert!(offset < Self::ITEMS_PER_SEGMENT);

let offset = offset as usize;

debug_assert!(offset <= self.items.len());
Expand All @@ -476,7 +477,8 @@ impl<I: Persistable> Segment<I> {
self.items.push(item);
} else {
tracing::error!(
"Tried to store an item out of the allowed bounds in segment with id {}",
"Tried to store an item out of the allowed bounds (offset {}) in segment with id {}",
offset,
self.segment_id
);
}
Expand All @@ -485,4 +487,209 @@ impl<I: Persistable> Segment<I> {
self.state = SegmentState::Dirty;
self.last_accessed = std::time::Instant::now();
}

/// Returns the items stored at the offsets in `range`, clamped to what the segment holds.
///
/// A `range.end` beyond the stored items is clamped to the number of items. A
/// `range.start` at or past the end, or an inverted range (`start > end`), yields an
/// empty slice instead of panicking.
///
/// Takes `&mut self` because every call refreshes `last_accessed`.
pub fn get(&mut self, range: Range<u32>) -> &[I] {
    self.last_accessed = std::time::Instant::now();

    let len = self.items.len();
    // Compare in usize so the item count is never narrowed to u32.
    let start = (range.start as usize).min(len);
    // `.max(start)` keeps the bounds ordered, so the slice below cannot panic.
    let end = (range.end as usize).min(len).max(start);

    &self.items[start..end]
}
}

#[cfg(test)]
mod tests {
use dashcore_hashes::Hash;
use tempfile::TempDir;

use super::*;

/// Test-only constructor for building distinguishable values from a numeric id.
trait TestStruct {
    /// Creates a value that is derived deterministically from `id`.
    fn new_test(id: u32) -> Self;
}

impl TestStruct for FilterHeader {
    /// Builds a header whose 32-byte hash starts with `id` in little-endian order
    /// and is zero everywhere else.
    fn new_test(id: u32) -> Self {
        let id_bytes = id.to_le_bytes();

        let mut raw = [0u8; 32];
        raw[..id_bytes.len()].copy_from_slice(&id_bytes);

        let hash = dashcore_hashes::sha256d::Hash::from_byte_array(raw);
        FilterHeader::from_raw_hash(hash)
    }
}

/// Fills the cache past its in-memory capacity and checks that evicted segments
/// come back from disk with their contents intact.
#[tokio::test]
async fn test_segment_cache_eviction() {
    let tmp_dir = TempDir::new().unwrap();

    const MAX_SEGMENTS: u32 = SegmentCache::<FilterHeader>::MAX_ACTIVE_SEGMENTS as u32;

    let mut cache = SegmentCache::<FilterHeader>::new(tmp_dir.path())
        .await
        .expect("Failed to create new segment_cache");

    // The cache keeps at most MAX_SEGMENTS segments in memory. Touching
    // MAX_SEGMENTS + 1 segments therefore forces it to evict the first one it created.
    // Asking for the segments again in order, starting at 0, makes the cache reload the
    // evicted segment from disk, which in turn evicts the next one (1 in this case).
    // We then ask for segment 1, which we know has just been evicted, and so on.

    for i in 0..=MAX_SEGMENTS {
        let segment = cache.get_segment_mut(&i).await.expect("Failed to create a new segment");
        assert!(segment.items.is_empty());
        assert_eq!(segment.state, SegmentState::Dirty);

        // One item that encodes the segment id, so each segment is recognisable later.
        segment.items = vec![FilterHeader::new_test(i)];
    }

    for i in 0..=MAX_SEGMENTS {
        // The cache must stay full but never exceed its limit.
        assert_eq!(cache.segments.len(), MAX_SEGMENTS as usize);

        let segment = cache.get_segment_mut(&i).await.expect("Failed to load segment");

        assert_eq!(segment.items.len(), 1);
        assert_eq!(segment.get(0..1), [FilterHeader::new_test(i)]);
        // The test expects a segment that was reloaded from disk to be clean.
        assert_eq!(segment.state, SegmentState::Clean);
    }
}

/// Persists a segment through the cache, drops the in-memory copy, reloads it, and
/// finally checks that wiping everything leaves a fresh empty segment.
#[tokio::test]
async fn test_segment_cache_persist_load() {
    let scratch = TempDir::new().unwrap();
    let expected: Vec<FilterHeader> = (0..10).map(FilterHeader::new_test).collect();

    let mut cache = SegmentCache::<FilterHeader>::new(scratch.path())
        .await
        .expect("Failed to create new segment_cache");

    // Step 1: a segment that has never been stored starts dirty; fill it and write it out.
    {
        let fresh = cache.get_segment_mut(&0).await.expect("Failed to create a new segment");
        assert_eq!(fresh.state, SegmentState::Dirty);

        fresh.items = expected.clone();
        assert!(fresh.persist(scratch.path()).is_ok());
    }

    // Step 2: forget the in-memory copy so the next lookup has to read from disk.
    cache.clear_in_memory();
    assert!(cache.segments.is_empty());

    {
        let reloaded = cache.get_segment(&0).await.expect("Failed to load segment");
        assert_eq!(reloaded.items, expected);
        assert_eq!(reloaded.state, SegmentState::Clean);
    }

    // Step 3: after clearing memory and disk, the same id yields an empty dirty segment.
    cache.clear_all().await.expect("Failed to clean on-memory and on-disk data");
    assert!(cache.segments.is_empty());

    let recreated = cache.get_segment(&0).await.expect("Failed to create a new segment");
    assert!(recreated.items.is_empty());
    assert_eq!(recreated.state, SegmentState::Dirty);
}

/// Reading from an empty cache returns no items, whether the requested range fits in
/// one segment or spills into the next.
#[tokio::test]
async fn test_segment_cache_get_insert() {
    const ITEMS_PER_SEGMENT: u32 = Segment::<FilterHeader>::ITEMS_PER_SEGMENT;

    let scratch = TempDir::new().unwrap();

    let mut cache = SegmentCache::<FilterHeader>::new(scratch.path())
        .await
        .expect("Failed to create new segment_cache");

    // First range ends exactly at the segment capacity, second one goes one item past it.
    for range in [0..ITEMS_PER_SEGMENT, 0..ITEMS_PER_SEGMENT + 1] {
        let fetched = cache.get_items(range).await.expect("segment cache couldn't return items");

        assert!(fetched.is_empty());
    }

    // The store logic cannot be tested here because it depends on the DiskStorageManager;
    // either test that struct properly or remove the dependency on it.
}

/// Round-trips a half-full segment through disk and checks that the reloaded segment
/// answers range queries exactly like the original one.
#[tokio::test]
async fn test_segment_persist_load() {
    const MAX_ITEMS: u32 = Segment::<FilterHeader>::ITEMS_PER_SEGMENT;
    const HALF: u32 = MAX_ITEMS / 2;

    let tmp_dir = TempDir::new().unwrap();
    let segment_id = 10;

    // Testing with half full segment
    let expected: Vec<FilterHeader> = (0..HALF).map(FilterHeader::new_test).collect();
    let mut segment = Segment::new(segment_id, expected.clone(), SegmentState::Dirty);

    // Ranges that start beyond the stored items come back empty.
    assert!(segment.get(MAX_ITEMS..MAX_ITEMS + 1).is_empty());
    assert!(segment.get(HALF..HALF + 1).is_empty());
    assert!(segment.get(MAX_ITEMS - 1..MAX_ITEMS).is_empty());

    // Ranges inside the stored items return exactly those items.
    assert_eq!(segment.get(0..HALF), expected.as_slice());
    assert_eq!(segment.get(HALF - 1..HALF), [FilterHeader::new_test(HALF - 1)]);

    // Persisting flips the state from dirty to clean.
    assert_eq!(segment.state, SegmentState::Dirty);
    assert!(segment.persist(tmp_dir.path()).is_ok());
    assert_eq!(segment.state, SegmentState::Clean);

    let mut loaded_segment =
        Segment::<FilterHeader>::load(tmp_dir.path(), segment_id).await.unwrap();

    // The reloaded segment must agree with the original on every probe range.
    for range in [
        MAX_ITEMS..MAX_ITEMS + 1,
        0..1,
        HALF - 1..HALF,
        HALF..HALF + 1,
        MAX_ITEMS - 1..MAX_ITEMS,
    ] {
        assert_eq!(loaded_segment.get(range.clone()), segment.get(range));
    }
}

/// Checks that `insert` followed by `get` yields the expected contents, both for an
/// offset that already holds an item and for the offset right after the last item.
#[test]
fn test_segment_insert_get() {
    const MAX_ITEMS: u32 = Segment::<FilterHeader>::ITEMS_PER_SEGMENT;

    // Builds the headers for ids 0..count.
    let headers = |count: u32| (0..count).map(FilterHeader::new_test).collect::<Vec<_>>();

    let mut segment = Segment::new(10, headers(10), SegmentState::Dirty);

    assert_eq!(segment.items.len(), 10);
    // Asking for more than the segment can hold returns only what is stored.
    assert_eq!(segment.get(0..MAX_ITEMS + 1), headers(10).as_slice());

    // Offset 4 is already occupied; offset 10 is one past the current last item.
    segment.insert(FilterHeader::new_test(4), 4);
    segment.insert(FilterHeader::new_test(10), 10);

    // Only the second insert is expected to grow the segment.
    assert_eq!(segment.items.len(), 11);
    assert_eq!(segment.get(0..MAX_ITEMS + 1), headers(11).as_slice());
}
}