@@ -148,9 +148,10 @@ pub struct CacheEvictionManager {
148148 persist_batch_size : usize ,
149149 eviction_batch_size : usize ,
150150 eviction_below_threshold : u8 ,
151- // if ttl of a key is less then this value, key will be evicted
152- // this help to delete upcoming keys for deleting
153- eviction_min_ttl_threshold : u32 ,
151+ /// Proactive deletion of keys with upcoming expiration in the next N seconds. + it checks size, because
152+ /// possible it can lead to a drop of refresh keys or touch flags
153+ eviction_proactive_size_threshold : u32 ,
154+ eviction_proactive_ttl_threshold : u32 ,
154155 compaction_trigger_size : u64 ,
155156 // background listener to track events
156157 _ttl_tl_loop_join_handle : Arc < AbortingJoinHandle < ( ) > > ,
@@ -308,7 +309,10 @@ impl CacheEvictionManager {
308309 persist_batch_size : config. cachestore_cache_persist_batch_size ( ) ,
309310 eviction_batch_size : config. cachestore_cache_eviction_batch_size ( ) ,
310311 eviction_below_threshold : config. cachestore_cache_eviction_below_threshold ( ) ,
311- eviction_min_ttl_threshold : config. cachestore_cache_eviction_min_ttl_threshold ( ) ,
312+ eviction_proactive_size_threshold : config
313+ . cachestore_cache_eviction_proactive_size_threshold ( ) ,
314+ eviction_proactive_ttl_threshold : config
315+ . cachestore_cache_eviction_proactive_ttl_threshold ( ) ,
312316 compaction_trigger_size : config. cachestore_cache_compaction_trigger_size ( ) ,
313317 //
314318 _ttl_tl_loop_join_handle : Arc :: new ( AbortingJoinHandle :: new ( join_handle) ) ,
@@ -616,7 +620,8 @@ impl CacheEvictionManager {
616620 criteria : CacheEvictionWeightCriteria ,
617621 store : & Arc < RocksStore > ,
618622 ) -> Result < ( KeysVector , KeysVector ) , CubeError > {
619- let eviction_min_ttl_threshold = self . eviction_min_ttl_threshold as i64 ;
623+ let eviction_proactive_ttl_threshold = self . eviction_proactive_ttl_threshold ;
624+ let eviction_proactive_size_threshold = self . eviction_proactive_size_threshold ;
620625
621626 let ( all_keys, stats_total_keys, stats_total_raw_size, expired_keys) = store
622627 . read_operation_out_of_queue ( move |db_ref| {
@@ -625,8 +630,7 @@ impl CacheEvictionManager {
625630
626631 let cache_schema = CacheItemRocksTable :: new ( db_ref. clone ( ) ) ;
627632
628- let now_with_threshold =
629- Utc :: now ( ) + chrono:: Duration :: seconds ( eviction_min_ttl_threshold) ;
633+ let now_at_start = Utc :: now ( ) ;
630634
631635 let mut expired_keys = KeysVector :: with_capacity ( 64 ) ;
632636 let mut all_keys: Vec < (
@@ -646,7 +650,19 @@ impl CacheEvictionManager {
646650 stats_total_raw_size += raw_size as u64 ;
647651
648652 if let Some ( ttl) = item. ttl {
649- if ttl < now_with_threshold {
653+ let ready_to_delete = if ttl <= now_at_start {
654+ true
655+ } else if ttl - now_at_start
656+ <= chrono:: Duration :: seconds ( eviction_proactive_ttl_threshold as i64 )
657+ {
658+ // Checking the size of the key, because it can be problematic to delete keys with small size, because
659+ // it can be a refresh key.
660+ raw_size > eviction_proactive_size_threshold
661+ } else {
662+ false
663+ } ;
664+
665+ if ready_to_delete {
650666 expired_keys. push ( ( item. row_id , raw_size) ) ;
651667 continue ;
652668 }
@@ -736,7 +752,7 @@ impl CacheEvictionManager {
736752 let deletion_result = self . delete_items ( pending, & store, false ) . await ?;
737753 result. add_eviction_result ( deletion_result) ;
738754
739- return Ok ( EvictionResult :: Finished ( result) ) ;
755+ Ok ( EvictionResult :: Finished ( result) )
740756 }
741757
742758 async fn do_eviction_by_sampling (
@@ -748,14 +764,14 @@ impl CacheEvictionManager {
748764 ) -> Result < EvictionResult , CubeError > {
749765 // move
750766 let eviction_batch_size = self . eviction_batch_size ;
751- let eviction_min_ttl_threshold = self . eviction_min_ttl_threshold as i64 ;
767+ let eviction_proactive_ttl_threshold = self . eviction_proactive_ttl_threshold ;
768+ let eviction_proactive_size_threshold = self . eviction_proactive_size_threshold ;
752769
753770 let to_delete: Vec < ( u64 , u32 ) > = store
754771 . read_operation_out_of_queue ( move |db_ref| {
755772 let mut pending_volume_remove: u64 = 0 ;
756773
757- let now_with_threshold =
758- Utc :: now ( ) + chrono:: Duration :: seconds ( eviction_min_ttl_threshold) ;
774+ let now_at_start = Utc :: now ( ) ;
759775 let mut to_delete = Vec :: with_capacity ( eviction_batch_size) ;
760776
761777 let cache_schema = CacheItemRocksTable :: new ( db_ref. clone ( ) ) ;
@@ -774,15 +790,26 @@ impl CacheEvictionManager {
774790 Self :: get_weight_and_size_by_criteria ( & item, & criteria) ?;
775791
776792 if let Some ( ttl) = item. ttl {
777- if ttl < now_with_threshold {
793+ let ready_to_delete = if ttl < now_at_start {
794+ true
795+ } else if ttl - now_at_start
796+ <= chrono:: Duration :: seconds ( eviction_proactive_ttl_threshold as i64 )
797+ {
798+ // Checking the size of the key, because it can be problematic to delete keys with small size, because
799+ // it can be a refresh key.
800+ raw_size > eviction_proactive_size_threshold
801+ } else {
802+ false
803+ } ;
804+
805+ if ready_to_delete {
778806 if target_is_size {
779807 pending_volume_remove += raw_size as u64 ;
780808 } else {
781809 pending_volume_remove += 1 ;
782810 }
783811
784812 to_delete. push ( ( item. row_id , raw_size) ) ;
785-
786813 continue ;
787814 }
788815 }
0 commit comments