From 8dd9e559aab22947c14ac1e6003e9ce43ce5c410 Mon Sep 17 00:00:00 2001 From: Stepan Bagritsevich <43710058+BagritsevichStepan@users.noreply.github.com> Date: Sat, 27 Sep 2025 14:12:45 +0300 Subject: [PATCH] chore: Limit expiry value to 32 bits. We use adaptive precision, where we keep "millis" precision if we can, and switch to seconds precision if the deadline is too large. This actually was before but now I reduced the cut-off to 4B ms, or 49 days. Also, the ttl in seconds is now limited to 4B sec, with larger values quietly rounded down to this limit. Signed-off-by: Roman Gershman --- src/core/expire_period.h | 34 ++++++++++++++++++++-------------- src/server/db_slice.cc | 2 ++ src/server/db_slice.h | 8 ++++---- src/server/engine_shard.cc | 10 +++++++++- src/server/engine_shard.h | 5 ++++- 5 files changed, 39 insertions(+), 20 deletions(-) diff --git a/src/core/expire_period.h b/src/core/expire_period.h index 6f8511eb2f36..a73ee040558d 100644 --- a/src/core/expire_period.h +++ b/src/core/expire_period.h @@ -8,9 +8,13 @@ namespace dfly { +// ExpirePeriod encapsulates the expiration period of a key. +// It can represent periods in milliseconds up to ~49 days and in seconds up to ~136 years. +// If value in milliseconds is too high, it switches to seconds precision. +// And if it's still too high, it silently clamps to the max value. class ExpirePeriod { public: - static constexpr size_t kMaxGenId = 15; + static constexpr uint32_t kMaxGenId = 15; ExpirePeriod() : val_(0), gen_(0), precision_(0) { static_assert(sizeof(ExpirePeriod) == 8); // TODO @@ -35,28 +39,30 @@ class ExpirePeriod { void Set(uint64_t ms); - bool is_second_precision() { return precision_ == 1;} + bool is_second_precision() const { + return precision_ == 1; + } private: - uint64_t val_ : 59; - uint64_t gen_ : 4; - uint64_t precision_ : 1; // 0 - ms, 1 - sec. + uint32_t val_; + uint8_t gen_ : 2; // generation id. + uint8_t precision_ : 1; // 0 - ms, 1 - sec. }; inline void ExpirePeriod::Set(uint64_t ms) { - constexpr uint64_t kBarrier = (1ULL << 48); - - if (ms < kBarrier) { - val_ = ms; - precision_ = 0; // ms + if (ms < UINT32_MAX) { + val_ = ms; // about 49 days in ms. + precision_ = 0; // ms return; } - precision_ = 1; - if (ms < kBarrier << 10) { - ms = (ms + 500) / 1000; // seconds + precision_ = 1; // seconds + if (ms < UINT64_MAX / 2) { + ms = (ms + 500) / 1000; + val_ = ms >= UINT32_MAX ? UINT32_MAX - 1 : ms; + } else { + val_ = UINT32_MAX - 1; } - val_ = ms >= kBarrier ? kBarrier - 1 : ms; } } // namespace dfly diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index cabbdf25de7a..8dc7259ce378 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -398,6 +398,8 @@ class DbSlice::PrimeBumpPolicy { DbSlice::DbSlice(uint32_t index, bool cache_mode, EngineShard* owner) : shard_id_(index), cache_mode_(cache_mode), + expire_allowed_(true), + expire_base_index_(0), owner_(owner), client_tracking_map_(owner->memory_resource()) { db_arr_.emplace_back(); diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 3a1933b57a9f..571532fbfd47 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -252,11 +252,11 @@ class DbSlice { } int64_t ExpireTime(const ExpirePeriod& val) const { - return expire_base_[0] + val.duration_ms(); + return expire_base_[val.generation_id()] + val.duration_ms(); } ExpirePeriod FromAbsoluteTime(uint64_t time_ms) const { - return ExpirePeriod{time_ms - expire_base_[0]}; + return ExpirePeriod{time_ms - expire_base_[expire_base_index_], expire_base_index_}; } struct ItAndUpdater { @@ -607,11 +607,11 @@ class DbSlice { ShardId shard_id_; uint8_t cache_mode_ : 1; - + uint8_t expire_allowed_ : 1; + uint8_t expire_base_index_ : 1; EngineShard* owner_; int64_t expire_base_[2]; // Used for expire logic, represents a real clock. - bool expire_allowed_ = true; uint64_t version_ = 1; // Used to version entries in the PrimeTable. uint64_t next_moved_id_ = 1; diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index 945d40350492..3fdd815c8833 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -301,6 +301,8 @@ std::optional EngineShard::DoDefrag(CollectPageStats collect return page_usage.CollectedStats(); } +constexpr uint32_t kRunAtLowPriority = 0u; + // the memory defragmentation task is as follow: // 1. Check if memory usage is high enough // 2. Check if diff between commited and used memory is high enough @@ -310,7 +312,6 @@ std::optional EngineShard::DoDefrag(CollectPageStats collect // priority. // otherwise lower the task priority so that it would not use the CPU when not required uint32_t EngineShard::DefragTask() { - constexpr uint32_t kRunAtLowPriority = 0u; if (!namespaces) { return kRunAtLowPriority; } @@ -326,6 +327,10 @@ uint32_t EngineShard::DefragTask() { return 6; // priority. } +uint32_t EngineShard::UpdateExpiresTask() { + return kRunAtLowPriority; +} + EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap) : txq_([](const Transaction* t) { return t->txid(); }), queue_(kQueueLen, 1, 1), @@ -347,6 +352,8 @@ void EngineShard::Shutdown() { void EngineShard::StopPeriodicFiber() { ProactorBase::me()->RemoveOnIdleTask(defrag_task_); + ProactorBase::me()->RemoveOnIdleTask(update_expire_base_task_); + fiber_heartbeat_periodic_done_.Notify(); if (fiber_heartbeat_periodic_.IsJoinable()) { fiber_heartbeat_periodic_.Join(); @@ -394,6 +401,7 @@ void EngineShard::StartPeriodicHeartbeatFiber(util::ProactorBase* pb) { RunFPeriodically(heartbeat, period_ms, "heartbeat", &fiber_heartbeat_periodic_done_); }); defrag_task_ = pb->AddOnIdleTask([this]() { return DefragTask(); }); + update_expire_base_task_ = pb->AddOnIdleTask([this]() { return UpdateExpiresTask(); }); } void EngineShard::StartPeriodicShardHandlerFiber(util::ProactorBase* pb, diff --git a/src/server/engine_shard.h b/src/server/engine_shard.h index ce0a211f7959..6707634a4a82 100644 --- a/src/server/engine_shard.h +++ b/src/server/engine_shard.h @@ -257,6 +257,7 @@ class EngineShard { // context of the controlling thread will access this shard! // -------------------------------------------------------------------------- uint32_t DefragTask(); + uint32_t UpdateExpiresTask(); TxQueue txq_; TaskQueue queue_, queue2_; @@ -283,7 +284,9 @@ class EngineShard { journal::Journal* journal_ = nullptr; IntentLock shard_lock_; - uint32_t defrag_task_ = 0; + // Idle tasks. + uint32_t defrag_task_ = 0, update_expire_base_task_ = 0; + EvictionTaskState eviction_state_; // Used on eviction fiber util::fb2::Fiber fiber_heartbeat_periodic_; util::fb2::Done fiber_heartbeat_periodic_done_;