diff --git a/rust/cubestore/cubestore/src/cachestore/cache_eviction_manager.rs b/rust/cubestore/cubestore/src/cachestore/cache_eviction_manager.rs index f3cbdb89a9f3d..291600a9f72a8 100644 --- a/rust/cubestore/cubestore/src/cachestore/cache_eviction_manager.rs +++ b/rust/cubestore/cubestore/src/cachestore/cache_eviction_manager.rs @@ -148,9 +148,10 @@ pub struct CacheEvictionManager { persist_batch_size: usize, eviction_batch_size: usize, eviction_below_threshold: u8, - // if ttl of a key is less then this value, key will be evicted - // this help to delete upcoming keys for deleting - eviction_min_ttl_threshold: u32, + /// Proactive deletion of keys with upcoming expiration in the next N seconds. + it checks size, because + /// possible it can lead to a drop of refresh keys or touch flags + eviction_proactive_size_threshold: u32, + eviction_proactive_ttl_threshold: u32, compaction_trigger_size: u64, // background listener to track events _ttl_tl_loop_join_handle: Arc>, @@ -308,7 +309,10 @@ impl CacheEvictionManager { persist_batch_size: config.cachestore_cache_persist_batch_size(), eviction_batch_size: config.cachestore_cache_eviction_batch_size(), eviction_below_threshold: config.cachestore_cache_eviction_below_threshold(), - eviction_min_ttl_threshold: config.cachestore_cache_eviction_min_ttl_threshold(), + eviction_proactive_size_threshold: config + .cachestore_cache_eviction_proactive_size_threshold(), + eviction_proactive_ttl_threshold: config + .cachestore_cache_eviction_proactive_ttl_threshold(), compaction_trigger_size: config.cachestore_cache_compaction_trigger_size(), // _ttl_tl_loop_join_handle: Arc::new(AbortingJoinHandle::new(join_handle)), @@ -616,7 +620,8 @@ impl CacheEvictionManager { criteria: CacheEvictionWeightCriteria, store: &Arc, ) -> Result<(KeysVector, KeysVector), CubeError> { - let eviction_min_ttl_threshold = self.eviction_min_ttl_threshold as i64; + let eviction_proactive_ttl_threshold = self.eviction_proactive_ttl_threshold; + let eviction_proactive_size_threshold = self.eviction_proactive_size_threshold; let (all_keys, stats_total_keys, stats_total_raw_size, expired_keys) = store .read_operation_out_of_queue(move |db_ref| { @@ -625,8 +630,7 @@ impl CacheEvictionManager { let cache_schema = CacheItemRocksTable::new(db_ref.clone()); - let now_with_threshold = - Utc::now() + chrono::Duration::seconds(eviction_min_ttl_threshold); + let now_at_start = Utc::now(); let mut expired_keys = KeysVector::with_capacity(64); let mut all_keys: Vec<( @@ -646,7 +650,19 @@ impl CacheEvictionManager { stats_total_raw_size += raw_size as u64; if let Some(ttl) = item.ttl { - if ttl < now_with_threshold { + let ready_to_delete = if ttl <= now_at_start { + true + } else if ttl - now_at_start + <= chrono::Duration::seconds(eviction_proactive_ttl_threshold as i64) + { + // Checking the size of the key, because it can be problematic to delete keys with small size, because + // it can be a refresh key. + raw_size > eviction_proactive_size_threshold + } else { + false + }; + + if ready_to_delete { expired_keys.push((item.row_id, raw_size)); continue; } @@ -736,7 +752,7 @@ impl CacheEvictionManager { let deletion_result = self.delete_items(pending, &store, false).await?; result.add_eviction_result(deletion_result); - return Ok(EvictionResult::Finished(result)); + Ok(EvictionResult::Finished(result)) } async fn do_eviction_by_sampling( @@ -748,14 +764,14 @@ impl CacheEvictionManager { ) -> Result { // move let eviction_batch_size = self.eviction_batch_size; - let eviction_min_ttl_threshold = self.eviction_min_ttl_threshold as i64; + let eviction_proactive_ttl_threshold = self.eviction_proactive_ttl_threshold; + let eviction_proactive_size_threshold = self.eviction_proactive_size_threshold; let to_delete: Vec<(u64, u32)> = store .read_operation_out_of_queue(move |db_ref| { let mut pending_volume_remove: u64 = 0; - let now_with_threshold = - Utc::now() + chrono::Duration::seconds(eviction_min_ttl_threshold); + let now_at_start = Utc::now(); let mut to_delete = Vec::with_capacity(eviction_batch_size); let cache_schema = CacheItemRocksTable::new(db_ref.clone()); @@ -774,7 +790,19 @@ impl CacheEvictionManager { Self::get_weight_and_size_by_criteria(&item, &criteria)?; if let Some(ttl) = item.ttl { - if ttl < now_with_threshold { + let ready_to_delete = if ttl < now_at_start { + true + } else if ttl - now_at_start + <= chrono::Duration::seconds(eviction_proactive_ttl_threshold as i64) + { + // Checking the size of the key, because it can be problematic to delete keys with small size, because + // it can be a refresh key. + raw_size > eviction_proactive_size_threshold + } else { + false + }; + + if ready_to_delete { if target_is_size { pending_volume_remove += raw_size as u64; } else { @@ -782,7 +810,6 @@ impl CacheEvictionManager { } to_delete.push((item.row_id, raw_size)); - continue; } } diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs index 5bc27d27a5021..fb433a3fd452c 100644 --- a/rust/cubestore/cubestore/src/config/mod.rs +++ b/rust/cubestore/cubestore/src/config/mod.rs @@ -464,7 +464,9 @@ pub trait ConfigObj: DIService { fn cachestore_cache_persist_batch_size(&self) -> usize; - fn cachestore_cache_eviction_min_ttl_threshold(&self) -> u32; + fn cachestore_cache_eviction_proactive_size_threshold(&self) -> u32; + + fn cachestore_cache_eviction_proactive_ttl_threshold(&self) -> u32; fn cachestore_cache_ttl_notify_channel(&self) -> usize; @@ -609,7 +611,8 @@ pub struct ConfigObjImpl { pub cachestore_cache_eviction_batch_size: usize, pub cachestore_cache_eviction_below_threshold: u8, pub cachestore_cache_persist_batch_size: usize, - pub cachestore_cache_eviction_min_ttl_threshold: u32, + pub cachestore_cache_eviction_proactive_size_threshold: u32, + pub cachestore_cache_eviction_proactive_ttl_threshold: u32, pub cachestore_cache_ttl_notify_channel: usize, pub cachestore_cache_ttl_buffer_max_size: usize, pub upload_concurrency: u64, @@ -855,8 +858,8 @@ impl ConfigObj for ConfigObjImpl { self.cachestore_cache_persist_batch_size } - fn cachestore_cache_eviction_min_ttl_threshold(&self) -> u32 { - self.cachestore_cache_eviction_min_ttl_threshold + fn cachestore_cache_eviction_proactive_ttl_threshold(&self) -> u32 { + self.cachestore_cache_eviction_proactive_ttl_threshold } fn cachestore_cache_ttl_notify_channel(&self) -> usize { @@ -1027,6 +1030,10 @@ impl ConfigObj for ConfigObjImpl { fn cachestore_cache_eviction_below_threshold(&self) -> u8 { self.cachestore_cache_eviction_below_threshold } + + fn cachestore_cache_eviction_proactive_size_threshold(&self) -> u32 { + self.cachestore_cache_eviction_proactive_size_threshold + } } lazy_static! { @@ -1421,8 +1428,16 @@ impl Config { "CUBESTORE_CACHE_PERSIST_BATCH_SIZE", 150, ), - cachestore_cache_eviction_min_ttl_threshold: env_parse_duration( - "CUBESTORE_CACHE_EVICTION_TTL_THRESHOLD", + cachestore_cache_eviction_proactive_size_threshold: env_parse_duration( + "CUBESTORE_CACHE_EVICTION_PROACTIVE_SIZE_THRESHOLD", + // 256 kb + 256 << 10, + Some(cachestore_cache_max_entry_size as u32), + // It's not allowed to be less than 128 kb, because it can proactively evict refresh keys + Some(128 << 10), + ), + cachestore_cache_eviction_proactive_ttl_threshold: env_parse_duration( + "CUBESTORE_CACHE_EVICTION_PROACTIVE_TTL_THRESHOLD", 5, Some(5 * 60), Some(0), @@ -1636,7 +1651,8 @@ impl Config { cachestore_cache_eviction_batch_size: 150, cachestore_cache_eviction_below_threshold: 15, cachestore_cache_persist_batch_size: 200, - cachestore_cache_eviction_min_ttl_threshold: 5, + cachestore_cache_eviction_proactive_size_threshold: 4096, + cachestore_cache_eviction_proactive_ttl_threshold: 5, cachestore_cache_ttl_notify_channel: 4_096, cachestore_cache_ttl_buffer_max_size: 16_384, upload_concurrency: 4, diff --git a/rust/cubestore/cubestore/src/metastore/rocks_table.rs b/rust/cubestore/cubestore/src/metastore/rocks_table.rs index 13a8ccaba8fb5..4124c40474b57 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_table.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_table.rs @@ -177,7 +177,7 @@ pub trait RocksSecondaryIndex: BaseRocksSecondaryIndex { expire, RocksSecondaryIndexValueTTLExtended { lfu: 0, - // Setup currect time as a protection for LRU eviction + // Specify the current time as protection from LRU eviction lru: Some(Utc::now()), raw_size: self.raw_value_size(row), },