Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 41 additions & 14 deletions rust/cubestore/cubestore/src/cachestore/cache_eviction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,10 @@ pub struct CacheEvictionManager {
persist_batch_size: usize,
eviction_batch_size: usize,
eviction_below_threshold: u8,
// if ttl of a key is less then this value, key will be evicted
// this help to delete upcoming keys for deleting
eviction_min_ttl_threshold: u32,
/// Proactive deletion of keys with upcoming expiration in the next N seconds. + it checks size, because
/// possible it can lead to a drop of refresh keys or touch flags
eviction_proactive_size_threshold: u32,
eviction_proactive_ttl_threshold: u32,
compaction_trigger_size: u64,
// background listener to track events
_ttl_tl_loop_join_handle: Arc<AbortingJoinHandle<()>>,
Expand Down Expand Up @@ -308,7 +309,10 @@ impl CacheEvictionManager {
persist_batch_size: config.cachestore_cache_persist_batch_size(),
eviction_batch_size: config.cachestore_cache_eviction_batch_size(),
eviction_below_threshold: config.cachestore_cache_eviction_below_threshold(),
eviction_min_ttl_threshold: config.cachestore_cache_eviction_min_ttl_threshold(),
eviction_proactive_size_threshold: config
.cachestore_cache_eviction_proactive_size_threshold(),
eviction_proactive_ttl_threshold: config
.cachestore_cache_eviction_proactive_ttl_threshold(),
compaction_trigger_size: config.cachestore_cache_compaction_trigger_size(),
//
_ttl_tl_loop_join_handle: Arc::new(AbortingJoinHandle::new(join_handle)),
Expand Down Expand Up @@ -616,7 +620,8 @@ impl CacheEvictionManager {
criteria: CacheEvictionWeightCriteria,
store: &Arc<RocksStore>,
) -> Result<(KeysVector, KeysVector), CubeError> {
let eviction_min_ttl_threshold = self.eviction_min_ttl_threshold as i64;
let eviction_proactive_ttl_threshold = self.eviction_proactive_ttl_threshold;
let eviction_proactive_size_threshold = self.eviction_proactive_size_threshold;

let (all_keys, stats_total_keys, stats_total_raw_size, expired_keys) = store
.read_operation_out_of_queue(move |db_ref| {
Expand All @@ -625,8 +630,7 @@ impl CacheEvictionManager {

let cache_schema = CacheItemRocksTable::new(db_ref.clone());

let now_with_threshold =
Utc::now() + chrono::Duration::seconds(eviction_min_ttl_threshold);
let now_at_start = Utc::now();

let mut expired_keys = KeysVector::with_capacity(64);
let mut all_keys: Vec<(
Expand All @@ -646,7 +650,19 @@ impl CacheEvictionManager {
stats_total_raw_size += raw_size as u64;

if let Some(ttl) = item.ttl {
if ttl < now_with_threshold {
let ready_to_delete = if ttl <= now_at_start {
true
} else if ttl - now_at_start
<= chrono::Duration::seconds(eviction_proactive_ttl_threshold as i64)
{
// Checking the size of the key, because it can be problematic to delete keys with small size, because
// it can be a refresh key.
raw_size > eviction_proactive_size_threshold
} else {
false
};

if ready_to_delete {
expired_keys.push((item.row_id, raw_size));
continue;
}
Expand Down Expand Up @@ -736,7 +752,7 @@ impl CacheEvictionManager {
let deletion_result = self.delete_items(pending, &store, false).await?;
result.add_eviction_result(deletion_result);

return Ok(EvictionResult::Finished(result));
Ok(EvictionResult::Finished(result))
}

async fn do_eviction_by_sampling(
Expand All @@ -748,14 +764,14 @@ impl CacheEvictionManager {
) -> Result<EvictionResult, CubeError> {
// move
let eviction_batch_size = self.eviction_batch_size;
let eviction_min_ttl_threshold = self.eviction_min_ttl_threshold as i64;
let eviction_proactive_ttl_threshold = self.eviction_proactive_ttl_threshold;
let eviction_proactive_size_threshold = self.eviction_proactive_size_threshold;

let to_delete: Vec<(u64, u32)> = store
.read_operation_out_of_queue(move |db_ref| {
let mut pending_volume_remove: u64 = 0;

let now_with_threshold =
Utc::now() + chrono::Duration::seconds(eviction_min_ttl_threshold);
let now_at_start = Utc::now();
let mut to_delete = Vec::with_capacity(eviction_batch_size);

let cache_schema = CacheItemRocksTable::new(db_ref.clone());
Expand All @@ -774,15 +790,26 @@ impl CacheEvictionManager {
Self::get_weight_and_size_by_criteria(&item, &criteria)?;

if let Some(ttl) = item.ttl {
if ttl < now_with_threshold {
let ready_to_delete = if ttl < now_at_start {
true
} else if ttl - now_at_start
<= chrono::Duration::seconds(eviction_proactive_ttl_threshold as i64)
{
// Checking the size of the key, because it can be problematic to delete keys with small size, because
// it can be a refresh key.
raw_size > eviction_proactive_size_threshold
} else {
false
};

if ready_to_delete {
if target_is_size {
pending_volume_remove += raw_size as u64;
} else {
pending_volume_remove += 1;
}

to_delete.push((item.row_id, raw_size));

continue;
}
}
Expand Down
30 changes: 23 additions & 7 deletions rust/cubestore/cubestore/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,9 @@ pub trait ConfigObj: DIService {

fn cachestore_cache_persist_batch_size(&self) -> usize;

fn cachestore_cache_eviction_min_ttl_threshold(&self) -> u32;
fn cachestore_cache_eviction_proactive_size_threshold(&self) -> u32;

fn cachestore_cache_eviction_proactive_ttl_threshold(&self) -> u32;

fn cachestore_cache_ttl_notify_channel(&self) -> usize;

Expand Down Expand Up @@ -609,7 +611,8 @@ pub struct ConfigObjImpl {
pub cachestore_cache_eviction_batch_size: usize,
pub cachestore_cache_eviction_below_threshold: u8,
pub cachestore_cache_persist_batch_size: usize,
pub cachestore_cache_eviction_min_ttl_threshold: u32,
pub cachestore_cache_eviction_proactive_size_threshold: u32,
pub cachestore_cache_eviction_proactive_ttl_threshold: u32,
pub cachestore_cache_ttl_notify_channel: usize,
pub cachestore_cache_ttl_buffer_max_size: usize,
pub upload_concurrency: u64,
Expand Down Expand Up @@ -855,8 +858,8 @@ impl ConfigObj for ConfigObjImpl {
self.cachestore_cache_persist_batch_size
}

fn cachestore_cache_eviction_min_ttl_threshold(&self) -> u32 {
self.cachestore_cache_eviction_min_ttl_threshold
fn cachestore_cache_eviction_proactive_ttl_threshold(&self) -> u32 {
self.cachestore_cache_eviction_proactive_ttl_threshold
}

fn cachestore_cache_ttl_notify_channel(&self) -> usize {
Expand Down Expand Up @@ -1027,6 +1030,10 @@ impl ConfigObj for ConfigObjImpl {
fn cachestore_cache_eviction_below_threshold(&self) -> u8 {
self.cachestore_cache_eviction_below_threshold
}

fn cachestore_cache_eviction_proactive_size_threshold(&self) -> u32 {
self.cachestore_cache_eviction_proactive_size_threshold
}
}

lazy_static! {
Expand Down Expand Up @@ -1421,8 +1428,16 @@ impl Config {
"CUBESTORE_CACHE_PERSIST_BATCH_SIZE",
150,
),
cachestore_cache_eviction_min_ttl_threshold: env_parse_duration(
"CUBESTORE_CACHE_EVICTION_TTL_THRESHOLD",
cachestore_cache_eviction_proactive_size_threshold: env_parse_duration(
"CUBESTORE_CACHE_EVICTION_PROACTIVE_SIZE_THRESHOLD",
// 256 kb
256 << 10,
Some(cachestore_cache_max_entry_size as u32),
// It's not allowed to be less than 128 kb, because it can proactively evict refresh keys
Some(128 << 10),
),
cachestore_cache_eviction_proactive_ttl_threshold: env_parse_duration(
"CUBESTORE_CACHE_EVICTION_PROACTIVE_TTL_THRESHOLD",
5,
Some(5 * 60),
Some(0),
Expand Down Expand Up @@ -1636,7 +1651,8 @@ impl Config {
cachestore_cache_eviction_batch_size: 150,
cachestore_cache_eviction_below_threshold: 15,
cachestore_cache_persist_batch_size: 200,
cachestore_cache_eviction_min_ttl_threshold: 5,
cachestore_cache_eviction_proactive_size_threshold: 4096,
cachestore_cache_eviction_proactive_ttl_threshold: 5,
cachestore_cache_ttl_notify_channel: 4_096,
cachestore_cache_ttl_buffer_max_size: 16_384,
upload_concurrency: 4,
Expand Down
2 changes: 1 addition & 1 deletion rust/cubestore/cubestore/src/metastore/rocks_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ pub trait RocksSecondaryIndex<T, K: Hash>: BaseRocksSecondaryIndex<T> {
expire,
RocksSecondaryIndexValueTTLExtended {
lfu: 0,
// Setup currect time as a protection for LRU eviction
// Specify the current time as protection from LRU eviction
lru: Some(Utc::now()),
raw_size: self.raw_value_size(row),
},
Expand Down
Loading