@@ -256,7 +256,7 @@ impl<I: Persistable> SegmentCache<I> {
256256 let end_segment = Self :: index_to_segment_id ( storage_end_idx) ;
257257
258258 for segment_id in start_segment..=end_segment {
259- let segment = self . get_segment ( & segment_id) . await ?;
259+ let segment = self . get_segment_mut ( & segment_id) . await ?;
260260 let item_count = segment. items . len ( ) as u32 ;
261261
262262 let seg_start_idx = if segment_id == start_segment {
@@ -271,11 +271,7 @@ impl<I: Persistable> SegmentCache<I> {
271271 item_count
272272 } ;
273273
274- if seg_start_idx < item_count && seg_end_idx <= item_count {
275- items. extend_from_slice (
276- & segment. items [ seg_start_idx as usize ..seg_end_idx as usize ] ,
277- ) ;
278- }
274+ items. extend_from_slice ( segment. get ( seg_start_idx..seg_end_idx) ) ;
279275 }
280276
281277 Ok ( items)
@@ -394,11 +390,14 @@ pub struct Segment<I: Persistable> {
394390impl < I : Persistable > Segment < I > {
395391 const ITEMS_PER_SEGMENT : u32 = 50_000 ;
396392
397- fn new ( segment_id : u32 , items : Vec < I > ) -> Self {
393+ fn new ( segment_id : u32 , mut items : Vec < I > , state : SegmentState ) -> Self {
394+ debug_assert ! ( items. len( ) <= Self :: ITEMS_PER_SEGMENT as usize ) ;
395+ items. truncate ( Self :: ITEMS_PER_SEGMENT as usize ) ;
396+
398397 Self {
399398 segment_id,
400399 items,
401- state : SegmentState :: Clean ,
400+ state,
402401 last_accessed : Instant :: now ( ) ,
403402 }
404403 }
@@ -407,7 +406,7 @@ impl<I: Persistable> Segment<I> {
407406 // Load segment from disk
408407 let segment_path = base_path. join ( I :: relative_disk_path ( segment_id) ) ;
409408
410- let items = if segment_path. exists ( ) {
409+ let ( items, state ) = if segment_path. exists ( ) {
411410 let file = File :: open ( & segment_path) ?;
412411 let mut reader = BufReader :: new ( file) ;
413412 let mut items = Vec :: with_capacity ( Segment :: < I > :: ITEMS_PER_SEGMENT as usize ) ;
@@ -429,12 +428,12 @@ impl<I: Persistable> Segment<I> {
429428 }
430429 }
431430
432- items
431+ ( items, SegmentState :: Clean )
433432 } else {
434- Vec :: with_capacity ( Self :: ITEMS_PER_SEGMENT as usize )
433+ ( Vec :: with_capacity ( Self :: ITEMS_PER_SEGMENT as usize ) , SegmentState :: Dirty )
435434 } ;
436435
437- Ok ( Self :: new ( segment_id, items) )
436+ Ok ( Self :: new ( segment_id, items, state ) )
438437 }
439438
440439 pub fn persist ( & mut self , base_path : & Path ) -> StorageResult < ( ) > {
@@ -466,6 +465,8 @@ impl<I: Persistable> Segment<I> {
466465 }
467466
468467 pub fn insert ( & mut self , item : I , offset : u32 ) {
468+ debug_assert ! ( offset < Self :: ITEMS_PER_SEGMENT ) ;
469+
469470 let offset = offset as usize ;
470471
471472 debug_assert ! ( offset <= self . items. len( ) ) ;
@@ -476,7 +477,8 @@ impl<I: Persistable> Segment<I> {
476477 self . items . push ( item) ;
477478 } else {
478479 tracing:: error!(
479- "Tried to store an item out of the allowed bounds in segment with id {}" ,
480+ "Tried to store an item out of the allowed bounds (offset {}) in segment with id {}" ,
481+ offset,
480482 self . segment_id
481483 ) ;
482484 }
@@ -485,4 +487,209 @@ impl<I: Persistable> Segment<I> {
485487 self . state = SegmentState :: Dirty ;
486488 self . last_accessed = std:: time:: Instant :: now ( ) ;
487489 }
490+
491+ pub fn get ( & mut self , range : Range < u32 > ) -> & [ I ] {
492+ self . last_accessed = std:: time:: Instant :: now ( ) ;
493+
494+ if range. start as usize >= self . items . len ( ) {
495+ return & [ ] ;
496+ } ;
497+
498+ let end = range. end . min ( self . items . len ( ) as u32 ) ;
499+
500+ & self . items [ range. start as usize ..end as usize ]
501+ }
502+ }
503+
504+ #[ cfg( test) ]
505+ mod tests {
506+ use dashcore_hashes:: Hash ;
507+ use tempfile:: TempDir ;
508+
509+ use super :: * ;
510+
/// Test-only constructor for fixture values that are identified by a numeric id.
trait TestStruct {
    /// Builds a value of the implementing type from `id`.
    fn new_test(id: u32) -> Self;
}
514+
515+ impl TestStruct for FilterHeader {
516+ fn new_test ( id : u32 ) -> Self {
517+ let mut bytes = [ 0u8 ; 32 ] ;
518+ bytes[ 0 ..4 ] . copy_from_slice ( & id. to_le_bytes ( ) ) ;
519+ FilterHeader :: from_raw_hash ( dashcore_hashes:: sha256d:: Hash :: from_byte_array ( bytes) )
520+ }
521+ }
522+
523+ #[ tokio:: test]
524+ async fn test_segment_cache_eviction ( ) {
525+ let tmp_dir = TempDir :: new ( ) . unwrap ( ) ;
526+
527+ const MAX_SEGMENTS : u32 = SegmentCache :: < FilterHeader > :: MAX_ACTIVE_SEGMENTS as u32 ;
528+
529+ let mut cache = SegmentCache :: < FilterHeader > :: new ( tmp_dir. path ( ) )
530+ . await
531+ . expect ( "Failed to create new segment_cache" ) ;
532+
533+ // This logic is a little tricky. Each cache can contain up to MAX_SEGMENTS segments in memory.
534+ // By storing MAX_SEGMENTS + 1 items, we ensure that the cache will evict the first introduced.
535+ // Then, by asking again in order starting in 0, we force the cache to load the evicted segment
536+ // from disk, evicting at the same time the next, 1 in this case. Then we ask for the 1 that we
537+ // know is evicted and so on.
538+
539+ for i in 0 ..=MAX_SEGMENTS {
540+ let segment = cache. get_segment_mut ( & i) . await . expect ( "Failed to create a new segment" ) ;
541+ assert ! ( segment. items. is_empty( ) ) ;
542+ assert ! ( segment. state == SegmentState :: Dirty ) ;
543+
544+ segment. items = vec ! [ FilterHeader :: new_test( i) ] ;
545+ }
546+
547+ for i in 0 ..=MAX_SEGMENTS {
548+ assert_eq ! ( cache. segments. len( ) , MAX_SEGMENTS as usize ) ;
549+
550+ let segment = cache. get_segment_mut ( & i) . await . expect ( "Failed to load segment" ) ;
551+
552+ assert_eq ! ( segment. items. len( ) , 1 ) ;
553+ assert_eq ! ( segment. get( 0 ..1 ) , [ FilterHeader :: new_test( i) ] ) ;
554+ assert ! ( segment. state == SegmentState :: Clean ) ;
555+ }
556+ }
557+
558+ #[ tokio:: test]
559+ async fn test_segment_cache_persist_load ( ) {
560+ let tmp_dir = TempDir :: new ( ) . unwrap ( ) ;
561+
562+ let items: Vec < _ > = ( 0 ..10 ) . map ( FilterHeader :: new_test) . collect ( ) ;
563+
564+ let mut cache = SegmentCache :: < FilterHeader > :: new ( tmp_dir. path ( ) )
565+ . await
566+ . expect ( "Failed to create new segment_cache" ) ;
567+
568+ let segment = cache. get_segment_mut ( & 0 ) . await . expect ( "Failed to create a new segment" ) ;
569+
570+ assert_eq ! ( segment. state, SegmentState :: Dirty ) ;
571+ segment. items = items. clone ( ) ;
572+
573+ assert ! ( segment. persist( tmp_dir. path( ) ) . is_ok( ) ) ;
574+
575+ cache. clear_in_memory ( ) ;
576+ assert ! ( cache. segments. is_empty( ) ) ;
577+
578+ let segment = cache. get_segment ( & 0 ) . await . expect ( "Failed to load segment" ) ;
579+
580+ assert_eq ! ( segment. items, items) ;
581+ assert_eq ! ( segment. state, SegmentState :: Clean ) ;
582+
583+ cache. clear_all ( ) . await . expect ( "Failed to clean on-memory and on-disk data" ) ;
584+ assert ! ( cache. segments. is_empty( ) ) ;
585+
586+ let segment = cache. get_segment ( & 0 ) . await . expect ( "Failed to create a new segment" ) ;
587+
588+ assert ! ( segment. items. is_empty( ) ) ;
589+ assert_eq ! ( segment. state, SegmentState :: Dirty ) ;
590+ }
591+
592+ #[ tokio:: test]
593+ async fn test_segment_cache_get_insert ( ) {
594+ let tmp_dir = TempDir :: new ( ) . unwrap ( ) ;
595+
596+ const ITEMS_PER_SEGMENT : u32 = Segment :: < FilterHeader > :: ITEMS_PER_SEGMENT ;
597+
598+ let mut cache = SegmentCache :: < FilterHeader > :: new ( tmp_dir. path ( ) )
599+ . await
600+ . expect ( "Failed to create new segment_cache" ) ;
601+
602+ let items = cache
603+ . get_items ( 0 ..ITEMS_PER_SEGMENT )
604+ . await
605+ . expect ( "segment cache couldn't return items" ) ;
606+
607+ assert ! ( items. is_empty( ) ) ;
608+
609+ let items = cache
610+ . get_items ( 0 ..ITEMS_PER_SEGMENT + 1 )
611+ . await
612+ . expect ( "segment cache couldn't return items" ) ;
613+
614+ assert ! ( items. is_empty( ) ) ;
615+
616+ // Cannot test the store logic bcs it depends on the DiskStorageManager, test that struct properly or
617+ // remove the necessity of it
618+ }
619+
620+ #[ tokio:: test]
621+ async fn test_segment_persist_load ( ) {
622+ let tmp_dir = TempDir :: new ( ) . unwrap ( ) ;
623+
624+ let segment_id = 10 ;
625+
626+ const MAX_ITEMS : u32 = Segment :: < FilterHeader > :: ITEMS_PER_SEGMENT ;
627+
628+ // Testing with half full segment
629+ let items = ( 0 ..MAX_ITEMS / 2 ) . map ( FilterHeader :: new_test) . collect ( ) ;
630+ let mut segment = Segment :: new ( segment_id, items, SegmentState :: Dirty ) ;
631+
632+ assert_eq ! ( segment. get( MAX_ITEMS ..MAX_ITEMS + 1 ) , [ ] ) ;
633+ assert_eq ! (
634+ segment. get( 0 ..MAX_ITEMS / 2 ) ,
635+ & ( 0 ..MAX_ITEMS / 2 ) . map( FilterHeader :: new_test) . collect:: <Vec <_>>( )
636+ ) ;
637+ assert_eq ! (
638+ segment. get( MAX_ITEMS / 2 - 1 ..MAX_ITEMS / 2 ) ,
639+ [ FilterHeader :: new_test( MAX_ITEMS / 2 - 1 ) ]
640+ ) ;
641+ assert_eq ! ( segment. get( MAX_ITEMS / 2 ..MAX_ITEMS / 2 + 1 ) , [ ] ) ;
642+ assert_eq ! ( segment. get( MAX_ITEMS - 1 ..MAX_ITEMS ) , [ ] ) ;
643+
644+ assert_eq ! ( segment. state, SegmentState :: Dirty ) ;
645+ assert ! ( segment. persist( tmp_dir. path( ) ) . is_ok( ) ) ;
646+ assert_eq ! ( segment. state, SegmentState :: Clean ) ;
647+
648+ let mut loaded_segment =
649+ Segment :: < FilterHeader > :: load ( tmp_dir. path ( ) , segment_id) . await . unwrap ( ) ;
650+
651+ assert_eq ! (
652+ loaded_segment. get( MAX_ITEMS ..MAX_ITEMS + 1 ) ,
653+ segment. get( MAX_ITEMS ..MAX_ITEMS + 1 )
654+ ) ;
655+ assert_eq ! ( loaded_segment. get( 0 ..1 ) , segment. get( 0 ..1 ) ) ;
656+ assert_eq ! (
657+ loaded_segment. get( MAX_ITEMS / 2 - 1 ..MAX_ITEMS / 2 ) ,
658+ segment. get( MAX_ITEMS / 2 - 1 ..MAX_ITEMS / 2 )
659+ ) ;
660+ assert_eq ! (
661+ loaded_segment. get( MAX_ITEMS / 2 ..MAX_ITEMS / 2 + 1 ) ,
662+ segment. get( MAX_ITEMS / 2 ..MAX_ITEMS / 2 + 1 )
663+ ) ;
664+ assert_eq ! (
665+ loaded_segment. get( MAX_ITEMS - 1 ..MAX_ITEMS ) ,
666+ segment. get( MAX_ITEMS - 1 ..MAX_ITEMS )
667+ ) ;
668+ }
669+
/// Inserts into a ten-item segment at an offset that is already populated and at the offset
/// just past the last item, checking the full contents before and after.
#[test]
fn test_segment_insert_get() {
    const MAX_ITEMS: u32 = Segment::<FilterHeader>::ITEMS_PER_SEGMENT;

    // Builds the fixture headers with ids `0..count`.
    let headers = |count: u32| (0..count).map(FilterHeader::new_test).collect::<Vec<_>>();

    // Deliberately one wider than a segment can hold, so `get` has to clamp it.
    let everything = 0..MAX_ITEMS + 1;

    let mut segment = Segment::new(10, headers(10), SegmentState::Dirty);

    assert_eq!(segment.items.len(), 10);
    assert_eq!(segment.get(everything.clone()), &headers(10));

    segment.insert(FilterHeader::new_test(4), 4);
    segment.insert(FilterHeader::new_test(10), 10);

    assert_eq!(segment.items.len(), 11);
    assert_eq!(segment.get(everything), &headers(11));
}
488695}
0 commit comments