Skip to content

Commit 1125eb4

Browse files
authored
fix(evicition): Limit accumulation deleted bytes during eviction (#5995)
* fix(eviction): Limit accumulating deleted bytes during eviction Cache state variable accumulated_deleted_bytes_during_eviction need to have upper bound when accumulating to prevent over eviction. Adjust deleted bytes for each shard between two heartbeats by comparing previous shard used memory and current shard used memory. Fixes #5945 Signed-off-by: mkaruza <[email protected]>
1 parent fac42a0 commit 1125eb4

File tree

2 files changed

+86
-34
lines changed

2 files changed

+86
-34
lines changed

src/server/engine_shard.cc

Lines changed: 74 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ ABSL_FLAG(string, tiered_prefix, "",
5555
ABSL_FLAG(bool, enable_heartbeat_eviction, true,
5656
"Enable eviction during heartbeat when memory is under pressure.");
5757
ABSL_FLAG(bool, enable_heartbeat_rss_eviction, true,
58-
"Enable eviction during heartbeat when rss memory is under pressure. Evicition based "
58+
"Enable eviction during heartbeat when rss memory is under pressure. Eviction based "
5959
"on used_memory will still be enabled.");
6060
ABSL_FLAG(double, eviction_memory_budget_threshold, 0.1,
6161
"Eviction starts when the free memory (including RSS memory) drops below "
@@ -383,7 +383,7 @@ void EngineShard::StartPeriodicHeartbeatFiber(util::ProactorBase* pb) {
383383
}
384384
auto heartbeat = [this]() { Heartbeat(); };
385385

386-
eviction_state_.rss_eviction_enabled_ = GetFlag(FLAGS_enable_heartbeat_rss_eviction);
386+
eviction_state_.rss_eviction_enabled = GetFlag(FLAGS_enable_heartbeat_rss_eviction);
387387
std::chrono::milliseconds period_ms(*cycle_ms);
388388

389389
fb2::Fiber::Opts fb_opts{.priority = absl::GetFlag(FLAGS_background_heartbeat)
@@ -701,16 +701,57 @@ void EngineShard::RetireExpiredAndEvict() {
701701
}
702702

703703
// Track deleted bytes only if we expect to lower memory
704-
if (eviction_state_.track_deleted_bytes)
705-
eviction_state_.deleted_bytes_before_rss_update += deleted_bytes;
704+
if (eviction_state_.track_deleted_bytes) {
705+
eviction_state_.deleted_bytes_at_prev_eviction = deleted_bytes;
706+
}
707+
}
708+
709+
// Adjust deleted bytes w.r.t shard used memory. If we increase shard used
710+
// memory in current heartbeat we can invalidate deleted_bytes. Otherwise we adjust deleted
711+
// bytes by diff.
712+
void EngineShard::EvictionTaskState::AdjustDeletedBytes(size_t shard_used_memory) {
713+
if (shard_used_memory >= shard_used_memory_at_prev_eviction) {
714+
deleted_bytes_at_prev_eviction = 0;
715+
} else {
716+
deleted_bytes_at_prev_eviction = std::min(
717+
deleted_bytes_at_prev_eviction, shard_used_memory_at_prev_eviction - shard_used_memory);
718+
}
719+
}
720+
721+
// Check if adding value of previous deleted bytes will be higher than rss memory budget and
722+
// limit if needed.
723+
void EngineShard::EvictionTaskState::LimitAccumulatedDeletedBytes(
724+
size_t shard_rss_over_memory_budget) {
725+
const size_t next_acc_deleted_bytes =
726+
acc_deleted_bytes_during_eviction + deleted_bytes_at_prev_eviction;
727+
acc_deleted_bytes_during_eviction = shard_rss_over_memory_budget > next_acc_deleted_bytes
728+
? next_acc_deleted_bytes
729+
: shard_rss_over_memory_budget;
730+
}
731+
732+
// Once the rss memory is lowered we can start also decreasing accumulated total bytes.
733+
void EngineShard::EvictionTaskState::AdjustAccumulatedDeletedBytes(size_t global_used_rss_memory) {
734+
if (global_used_rss_memory < global_rss_memory_at_prev_eviction) {
735+
auto decrease_delete_bytes_before_rss_update =
736+
std::min(acc_deleted_bytes_during_eviction,
737+
(global_rss_memory_at_prev_eviction - global_used_rss_memory) / shard_set->size());
738+
VLOG(2) << "deleted_bytes_before_rss_update: " << acc_deleted_bytes_during_eviction
739+
<< " decrease_delete_bytes_before_rss_update: "
740+
<< decrease_delete_bytes_before_rss_update;
741+
acc_deleted_bytes_during_eviction -= decrease_delete_bytes_before_rss_update;
742+
}
743+
LOG_IF(DFATAL, global_used_rss_memory < (acc_deleted_bytes_during_eviction * shard_set->size()))
744+
<< "RSS eviction underflow "
745+
<< "global_used_rss_memory: " << global_used_rss_memory
746+
<< " total_deleted_bytes_on_eviction: " << acc_deleted_bytes_during_eviction;
706747
}
707748

708749
size_t EngineShard::CalculateEvictionBytes() {
709750
const size_t shards_count = shard_set->size();
710751
const double eviction_memory_budget_threshold = GetFlag(FLAGS_eviction_memory_budget_threshold);
711752

712753
// Calculate threshold for both used_memory and rss_memory
713-
size_t limit = max_memory_limit.load(memory_order_relaxed);
754+
const size_t limit = max_memory_limit.load(memory_order_relaxed);
714755
const size_t shard_memory_budget_threshold =
715756
size_t(limit * eviction_memory_budget_threshold) / shards_count;
716757

@@ -727,55 +768,56 @@ size_t EngineShard::CalculateEvictionBytes() {
727768
// Check for `enable_heartbeat_rss_eviction` flag since it dynamic. And reset
728769
// state if flag has changed.
729770
bool rss_eviction_enabled_flag = GetFlag(FLAGS_enable_heartbeat_rss_eviction);
730-
if (eviction_state_.rss_eviction_enabled_ != rss_eviction_enabled_flag) {
731-
eviction_state_.global_rss_memory_at_prev_eviction =
732-
eviction_state_.deleted_bytes_before_rss_update = 0;
733-
eviction_state_.rss_eviction_enabled_ = rss_eviction_enabled_flag;
771+
if (eviction_state_.rss_eviction_enabled != rss_eviction_enabled_flag) {
772+
eviction_state_.Reset(rss_eviction_enabled_flag);
734773
}
735-
if (eviction_state_.rss_eviction_enabled_) {
736-
// Calculate how much rss memory is used by all shards
774+
if (eviction_state_.rss_eviction_enabled) {
737775
const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed);
738-
auto& global_rss_memory_at_prev_eviction = eviction_state_.global_rss_memory_at_prev_eviction;
739-
auto& deleted_bytes_before_rss_update = eviction_state_.deleted_bytes_before_rss_update;
740-
if (global_used_rss_memory < eviction_state_.global_rss_memory_at_prev_eviction) {
741-
auto decrease_delete_bytes_before_rss_update =
742-
std::min(deleted_bytes_before_rss_update,
743-
(global_rss_memory_at_prev_eviction - global_used_rss_memory) / shards_count);
744-
VLOG(2) << "deleted_bytes_before_rss_update: " << deleted_bytes_before_rss_update
745-
<< " decrease_delete_bytes_before_rss_update: "
746-
<< decrease_delete_bytes_before_rss_update;
747-
deleted_bytes_before_rss_update -= decrease_delete_bytes_before_rss_update;
748-
}
776+
const size_t rss_memory_threshold_start = limit * (1. - eviction_memory_budget_threshold);
777+
const size_t shard_used_memory = UsedMemory();
778+
779+
// Adjust previous deleted bytes
780+
eviction_state_.AdjustDeletedBytes(shard_used_memory);
781+
782+
// Calculate memory budget that is higher than rss_memory_threshold_start. This is our limit
783+
// for accumulated_deleted_bytes.
784+
const size_t shard_rss_over_memory_budget =
785+
global_used_rss_memory > rss_memory_threshold_start
786+
? (global_used_rss_memory - rss_memory_threshold_start) / shards_count
787+
: 0;
788+
eviction_state_.LimitAccumulatedDeletedBytes(shard_rss_over_memory_budget);
749789

750-
global_rss_memory_at_prev_eviction = global_used_rss_memory;
790+
// Once the rss memory is lowered we can start also decreasing accumulated total bytes.
791+
eviction_state_.AdjustAccumulatedDeletedBytes(global_used_rss_memory);
751792

752-
LOG_IF(DFATAL, global_used_rss_memory < (deleted_bytes_before_rss_update * shards_count))
753-
<< "RSS evicition underflow "
754-
<< "global_used_rss_memory: " << global_used_rss_memory
755-
<< " deleted_bytes_before_rss_update: " << deleted_bytes_before_rss_update;
793+
// Update rss/used memory for this heartbeat
794+
eviction_state_.global_rss_memory_at_prev_eviction = global_used_rss_memory;
795+
eviction_state_.shard_used_memory_at_prev_eviction = shard_used_memory;
756796

757797
// If we underflow use limit as used_memory
758-
size_t used_rss_memory_with_deleted_bytes =
759-
std::min(global_used_rss_memory - deleted_bytes_before_rss_update * shards_count, limit);
798+
size_t used_rss_memory_with_deleted_bytes = std::min(
799+
global_used_rss_memory - eviction_state_.acc_deleted_bytes_during_eviction * shards_count,
800+
limit);
760801

761802
// Try to evict more bytes if we are close to the rss memory limit
762-
const size_t rss_goal_bytes = CalculateHowManyBytesToEvictOnShard(
803+
size_t rss_goal_bytes = CalculateHowManyBytesToEvictOnShard(
763804
limit, used_rss_memory_with_deleted_bytes, shard_memory_budget_threshold);
764805

765806
// RSS evictions starts so we should start tracking deleted_bytes
766807
if (rss_goal_bytes) {
767808
eviction_state_.track_deleted_bytes = true;
768809
} else {
769810
// There is no RSS eviction goal and we have cleared tracked deleted bytes
770-
if (!deleted_bytes_before_rss_update) {
811+
if (!eviction_state_.acc_deleted_bytes_during_eviction) {
771812
eviction_state_.track_deleted_bytes = false;
772813
}
773814
}
774815

775816
VLOG_IF(2, rss_goal_bytes > 0)
776817
<< "Rss memory goal bytes: " << rss_goal_bytes
777818
<< ", rss used memory: " << global_used_rss_memory << ", rss memory limit: " << limit
778-
<< ", deleted_bytes_before_rss_update: " << deleted_bytes_before_rss_update;
819+
<< ", accumulated_deleted_bytes_during_eviction: "
820+
<< eviction_state_.acc_deleted_bytes_during_eviction;
779821

780822
goal_bytes = std::max(goal_bytes, rss_goal_bytes);
781823
}

src/server/engine_shard.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,9 +226,19 @@ class EngineShard {
226226
};
227227

228228
struct EvictionTaskState {
229-
bool rss_eviction_enabled_ = true;
229+
void Reset(bool rss_eviction_enabled_flag) {
230+
rss_eviction_enabled = rss_eviction_enabled_flag;
231+
shard_used_memory_at_prev_eviction = global_rss_memory_at_prev_eviction =
232+
acc_deleted_bytes_during_eviction = deleted_bytes_at_prev_eviction = 0;
233+
}
234+
void AdjustDeletedBytes(size_t shard_used_memory);
235+
void LimitAccumulatedDeletedBytes(size_t shard_rss_over_memory_budget);
236+
void AdjustAccumulatedDeletedBytes(size_t global_used_rss_memory);
237+
bool rss_eviction_enabled = true;
230238
bool track_deleted_bytes = false;
231-
size_t deleted_bytes_before_rss_update = 0;
239+
size_t acc_deleted_bytes_during_eviction = 0; // Accumulated deleted bytes during eviction
240+
size_t deleted_bytes_at_prev_eviction = 0; // Bytes deleted in previous eviction
241+
size_t shard_used_memory_at_prev_eviction = 0;
232242
size_t global_rss_memory_at_prev_eviction = 0;
233243
};
234244

0 commit comments

Comments
 (0)